Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #4869] Add Webhook support for HTTP Source Connector #4913

Merged
merged 14 commits into from
Jun 27, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.eventmesh.common.config.connector.http;

import java.util.HashMap;
import java.util.Map;

import lombok.Data;

@Data
Expand All @@ -28,5 +31,28 @@ public class SourceConnectorConfig {

private int port;

private int idleTimeout;
// timeunit: ms, default 5000ms
private int idleTimeout = 5000;

/**
* <ul>
* <li>The maximum size allowed for form attributes when Content-Type is application/x-www-form-urlencoded or multipart/form-data </li>
* <li>Default is 1MB (1024 * 1024 bytes). </li>
* <li>If you receive a "size exceed allowed maximum capacity" error, you can increase this value. </li>
* <li>Note: This applies only when handling form data submissions.</li>
* </ul>
*/
private int maxFormAttributeSize = 1024 * 1024;

// max size of the queue, default 1000
private int maxStorageSize = 1000;

// batch size, default 10
private int batchSize = 10;

// protocol, default CloudEvent
private String protocol = "CloudEvent";

// extra config, e.g. GitHub secret
private Map<String, String> extraConfig = new HashMap<>();
}
1 change: 0 additions & 1 deletion eventmesh-connectors/eventmesh-connector-http/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ dependencies {
implementation 'io.vertx:vertx-web-client:4.5.8'
implementation 'dev.failsafe:failsafe:3.3.2'

testImplementation "org.apache.httpcomponents:httpclient"
testImplementation 'org.mock-server:mockserver-netty:5.15.0'
testImplementation 'com.squareup.okhttp3:okhttp:4.12.0'
compileOnly 'org.projectlombok:lombok'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.http.common;

import org.apache.commons.collections4.queue.CircularFifoQueue;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;


/**
* SynchronizedCircularFifoQueue is a synchronized version of CircularFifoQueue.
*/
public class SynchronizedCircularFifoQueue<E> extends CircularFifoQueue<E> {

/**
* <p>Default constructor. capacity = 32</p>
*/
public SynchronizedCircularFifoQueue() {
super();
}

public SynchronizedCircularFifoQueue(Collection<? extends E> coll) {
super(coll);
}

public SynchronizedCircularFifoQueue(int size) {
super(size);
}

@Override
public synchronized boolean add(E element) {
return super.add(element);
}

@Override
public synchronized void clear() {
super.clear();
}

@Override
public synchronized E element() {
return super.element();
}

@Override
public synchronized E get(int index) {
return super.get(index);
}

@Override
public synchronized boolean isAtFullCapacity() {
return super.isAtFullCapacity();
}

@Override
public synchronized boolean isEmpty() {
return super.isEmpty();
}

@Override
public synchronized boolean isFull() {
return super.isFull();
}

@Override
public synchronized int maxSize() {
return super.maxSize();
}

@Override
public synchronized boolean offer(E element) {
return super.offer(element);
}

@Override
public synchronized E peek() {
return super.peek();
}

@Override
public synchronized E poll() {
return super.poll();
}

@Override
public synchronized E remove() {
return super.remove();
}

@Override
public synchronized int size() {
return super.size();
}

/**
* <p>Fetch a range of elements from the queue.</p>
*
* @param start start index
* @param end end index
* @param removed whether to remove the elements from the queue
* @return list of elements
*/
public synchronized List<E> fetchRange(int start, int end, boolean removed) {

if (start < 0 || end > this.size() || start > end) {
throw new IllegalArgumentException("Invalid range");
}

Iterator<E> iterator = this.iterator();
List<E> items = new ArrayList<>(end - start);

int count = 0;
while (iterator.hasNext() && count < end) {
E item = iterator.next();
if (item != null && count >= start) {
// Add the element to the list
items.add(item);
if (removed) {
// Remove the element from the queue
iterator.remove();
}
}
count++;
}
return items;

}


}
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord httpConne
// convert the result to an HttpExportRecord
HttpExportRecord exportRecord = covertToExportRecord(httpConnectRecord, event, event.getResult(), event.getException(), url, id);
// add the data to the queue
((WebhookHttpSinkHandler) sinkHandler).addDataToQueue(exportRecord);
((WebhookHttpSinkHandler) sinkHandler).getReceivedDataQueue().offer(exportRecord);
}
})
.onRetry(event -> {
Expand All @@ -144,7 +144,7 @@ public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord httpConne
if (connectorConfig.getWebhookConfig().isActivate()) {
HttpExportRecord exportRecord =
covertToExportRecord(httpConnectRecord, event, event.getLastResult(), event.getLastException(), url, id);
((WebhookHttpSinkHandler) sinkHandler).addDataToQueue(exportRecord);
((WebhookHttpSinkHandler) sinkHandler).getReceivedDataQueue().offer(exportRecord);
}
// update the HttpConnectRecord
httpConnectRecord.setTime(LocalDateTime.now().toString());
Expand All @@ -159,7 +159,7 @@ public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord httpConne
}
if (connectorConfig.getWebhookConfig().isActivate()) {
HttpExportRecord exportRecord = covertToExportRecord(httpConnectRecord, event, event.getResult(), event.getException(), url, id);
((WebhookHttpSinkHandler) sinkHandler).addDataToQueue(exportRecord);
((WebhookHttpSinkHandler) sinkHandler).getReceivedDataQueue().offer(exportRecord);
}
}).build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.eventmesh.connector.http.sink.handle;

import org.apache.eventmesh.common.exception.EventMeshException;
import org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue;
import org.apache.eventmesh.connector.http.sink.config.HttpWebhookConfig;
import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
Expand All @@ -30,13 +31,9 @@

import java.net.URI;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.Future;
Expand Down Expand Up @@ -73,25 +70,22 @@ public class WebhookHttpSinkHandler extends CommonHttpSinkHandler {
private HttpServer exportServer;

// store the received data, when webhook is enabled
private final ConcurrentLinkedQueue<HttpExportRecord> receivedDataQueue;

// the maximum queue size
private final int maxQueueSize;

// the current queue size
private final AtomicInteger currentQueueSize;
private final SynchronizedCircularFifoQueue<HttpExportRecord> receivedDataQueue;

public WebhookHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) {
super(sinkConnectorConfig);
this.sinkConnectorConfig = sinkConnectorConfig;
this.webhookConfig = sinkConnectorConfig.getWebhookConfig();
this.maxQueueSize = this.webhookConfig.getMaxStorageSize();
this.currentQueueSize = new AtomicInteger(0);
this.receivedDataQueue = new ConcurrentLinkedQueue<>();
int maxQueueSize = this.webhookConfig.getMaxStorageSize();
this.receivedDataQueue = new SynchronizedCircularFifoQueue<>(maxQueueSize);
// init the export server
doInitExportServer();
}

public SynchronizedCircularFifoQueue<HttpExportRecord> getReceivedDataQueue() {
return receivedDataQueue;
}

/**
* Initialize the server for exporting the received data
*/
Expand Down Expand Up @@ -135,7 +129,7 @@ private void doInitExportServer() {
int pageNum = StringUtils.isBlank(pageNumStr) ? 1 : Integer.parseInt(pageNumStr);
int pageSize = Integer.parseInt(pageSizeStr);

if (currentQueueSize.get() == 0) {
if (receivedDataQueue.isEmpty()) {
ctx.response()
.putHeader(HttpHeaders.CONTENT_TYPE, "application/json; charset=utf-8")
.setStatusCode(HttpResponseStatus.NO_CONTENT.code())
Expand All @@ -148,12 +142,12 @@ private void doInitExportServer() {
List<HttpExportRecord> exportRecords;
if (Objects.equals(type, TypeEnum.POLL.getValue())) {
// If the type is poll, only the first page of data is exported and removed
exportRecords = getDataFromQueue(0, pageSize, true);
exportRecords = receivedDataQueue.fetchRange(0, pageSize, true);
} else {
// If the type is peek, the specified page of data is exported without removing
int startIndex = (pageNum - 1) * pageSize;
int endIndex = startIndex + pageSize;
exportRecords = getDataFromQueue(startIndex, endIndex, false);
exportRecords = receivedDataQueue.fetchRange(startIndex, endIndex, false);
}

// Create HttpExportRecordPage
Expand Down Expand Up @@ -242,63 +236,11 @@ public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord httpConne
// create ExportRecord
HttpExportRecord exportRecord = new HttpExportRecord(httpExportMetadata, arr.succeeded() ? arr.result().bodyAsString() : null);
// add the data to the queue
addDataToQueue(exportRecord);
receivedDataQueue.offer(exportRecord);
});
}


/**
* Adds the received data to the queue.
*
* @param exportRecord the received data to add to the queue
*/
public void addDataToQueue(HttpExportRecord exportRecord) {
// If the current queue size is greater than or equal to the maximum queue size, remove the oldest element
if (currentQueueSize.get() >= maxQueueSize) {
Object removedData = receivedDataQueue.poll();
if (log.isDebugEnabled()) {
log.debug("The queue is full, remove the oldest element: {}", removedData);
} else {
log.info("The queue is full, remove the oldest element");
}
currentQueueSize.decrementAndGet();
}
// Try to put the received data into the queue
if (receivedDataQueue.offer(exportRecord)) {
currentQueueSize.incrementAndGet();
log.debug("Successfully put the received data into the queue: {}", exportRecord);
} else {
log.error("Failed to put the received data into the queue: {}", exportRecord);
}
}

/**
* Gets the received data from the queue.
*
* @param startIndex the start index of the data to get
* @param endIndex the end index of the data to get
* @param removed whether to remove the data from the queue
* @return the received data
*/
private List<HttpExportRecord> getDataFromQueue(int startIndex, int endIndex, boolean removed) {
Iterator<HttpExportRecord> iterator = receivedDataQueue.iterator();

List<HttpExportRecord> pageItems = new ArrayList<>(endIndex - startIndex);
int count = 0;
while (iterator.hasNext() && count < endIndex) {
HttpExportRecord item = iterator.next();
if (count >= startIndex) {
pageItems.add(item);
if (removed) {
iterator.remove();
currentQueueSize.decrementAndGet();
}
}
count++;
}
return pageItems;
}

/**
* Cleans up and releases resources used by the HTTP/HTTPS handler.
*/
Expand Down
Loading
Loading