Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #4869] Add Webhook support for HTTP Source Connector #4913

Merged
merged 14 commits into from
Jun 27, 2024
Merged
cnzakii marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.http.common;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
* <p>BoundedConcurrentQueue is a wrapper class for ConcurrentLinkedQueue</p>
* <ol>
* <li>Limit the maximum size of the queue</li>
* <li>Add an object to the queue, if the queue is full, remove the head element and add the new element.(thread safe)</li>
Pil0tXia marked this conversation as resolved.
Show resolved Hide resolved
* <li>Poll an object from the queue.(thread safe)</li>
* <li>Fetch a range of elements from the queue.(weak consistency)</li>
* </ol>
*/
public class BoundedConcurrentQueue<T> {

private final int maxSize;

private final AtomicInteger currSize;

private final ConcurrentLinkedQueue<T> queue;

// Lock for add operation
private final Object addLock = new Object();

private final AtomicBoolean doReplace = new AtomicBoolean(false);


public BoundedConcurrentQueue(int maxSize) {
if (maxSize <= 0) {
throw new IllegalArgumentException("maxQueueSize must be greater than 0");
}
this.maxSize = maxSize;
this.currSize = new AtomicInteger(0);
this.queue = new ConcurrentLinkedQueue<>();
}

public int getMaxSize() {
return maxSize;
}

public int getCurrSize() {
return currSize.get();
}

/**
* <p>Add an object to the queue</p>
* <ul>
* <li>If the queue is full, remove the head element and add the new element.</li>
* <li>When the queue capacity is continuously saturated, each add operation generates a lock contention,
* and means are needed to avoid this approach. e.g. Suitable capacity and equal-rate {@link #poll()}</li>
* </ul>
*
* @param obj object to be added
*/
public void offerWithReplace(T obj) {
if (obj == null) {
return;
}

// try to do offer(no-blocking)
// 1. currSize < maxSize
// 2. no old values are being deleted(queue is not full)
// 3. try to add the new element
if (currSize.get() < maxSize && !doReplace.get() && queue.offer(obj)) {
currSize.incrementAndGet();
return;
}

// try to do offer again(blocking)
synchronized (addLock) {
// double check inside the lock
if (currSize.get() < maxSize && queue.offer(obj)) {
currSize.incrementAndGet();
return;
}
// remove the head element and add the new element
doReplace.set(true);
try {
T removedObj = queue.poll();
if (!queue.offer(obj)) {
// abnormal behavior
throw new IllegalStateException("Unable to add element to queue");
} else if (removedObj == null) {
// it is equivalent to just adding new element
currSize.incrementAndGet();
}
} finally {
// finish replace
doReplace.set(false);
}
}

}


/**
* <p>Poll an object from the queue.</p>
*
* @return object
*/
public T poll() {
T obj = queue.poll();
if (obj != null) {
currSize.decrementAndGet();
}
return obj;
}


/**
* <p>Fetch a range of elements from the queue(weakly consistent).</p>
* <ul>
* <li> In the case of concurrent modification, the elements may not be fetched as expected.</li>
* <li>Avoiding simultaneous use with the {@link #poll()} method can greatly reduce the risk of corresponding</li>
* </ul>
*
* @param start start index
* @param end end index
* @param removed whether to remove the elements from the queue
* @return list of elements
*/
public List<T> fetchRange(int start, int end, boolean removed) {
if (start < 0 || end > maxSize || start > end) {
cnzakii marked this conversation as resolved.
Show resolved Hide resolved
throw new IllegalArgumentException("Invalid range");
}

Iterator<T> iterator = queue.iterator();
List<T> items = new ArrayList<>(end - start);

int count = 0;
while (iterator.hasNext() && count < end) {
T item = iterator.next();
if (item != null && count >= start) {
// Add the element to the list
items.add(item);
if (removed) {
// Remove the element from the queue
iterator.remove();
currSize.decrementAndGet();
}
}
count++;
}

return items;

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord httpConne
// convert the result to an HttpExportRecord
HttpExportRecord exportRecord = covertToExportRecord(httpConnectRecord, event, event.getResult(), event.getException(), url, id);
// add the data to the queue
((WebhookHttpSinkHandler) sinkHandler).addDataToQueue(exportRecord);
((WebhookHttpSinkHandler) sinkHandler).getReceivedDataQueue().offerWithReplace(exportRecord);
}
})
.onRetry(event -> {
Expand All @@ -144,7 +144,7 @@ public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord httpConne
if (connectorConfig.getWebhookConfig().isActivate()) {
HttpExportRecord exportRecord =
covertToExportRecord(httpConnectRecord, event, event.getLastResult(), event.getLastException(), url, id);
((WebhookHttpSinkHandler) sinkHandler).addDataToQueue(exportRecord);
((WebhookHttpSinkHandler) sinkHandler).getReceivedDataQueue().offerWithReplace(exportRecord);
}
// update the HttpConnectRecord
httpConnectRecord.setTime(LocalDateTime.now().toString());
Expand All @@ -159,7 +159,7 @@ public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord httpConne
}
if (connectorConfig.getWebhookConfig().isActivate()) {
HttpExportRecord exportRecord = covertToExportRecord(httpConnectRecord, event, event.getResult(), event.getException(), url, id);
((WebhookHttpSinkHandler) sinkHandler).addDataToQueue(exportRecord);
((WebhookHttpSinkHandler) sinkHandler).getReceivedDataQueue().offerWithReplace(exportRecord);
}
}).build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.eventmesh.connector.http.sink.handle;

import org.apache.eventmesh.common.exception.EventMeshException;
import org.apache.eventmesh.connector.http.common.BoundedConcurrentQueue;
import org.apache.eventmesh.connector.http.sink.config.HttpWebhookConfig;
import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
Expand All @@ -30,13 +31,9 @@

import java.net.URI;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.Future;
Expand Down Expand Up @@ -73,25 +70,22 @@ public class WebhookHttpSinkHandler extends CommonHttpSinkHandler {
private HttpServer exportServer;

// store the received data, when webhook is enabled
private final ConcurrentLinkedQueue<HttpExportRecord> receivedDataQueue;

// the maximum queue size
private final int maxQueueSize;

// the current queue size
private final AtomicInteger currentQueueSize;
private final BoundedConcurrentQueue<HttpExportRecord> receivedDataQueue;

public WebhookHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) {
super(sinkConnectorConfig);
this.sinkConnectorConfig = sinkConnectorConfig;
this.webhookConfig = sinkConnectorConfig.getWebhookConfig();
this.maxQueueSize = this.webhookConfig.getMaxStorageSize();
this.currentQueueSize = new AtomicInteger(0);
this.receivedDataQueue = new ConcurrentLinkedQueue<>();
int maxQueueSize = this.webhookConfig.getMaxStorageSize();
this.receivedDataQueue = new BoundedConcurrentQueue<>(maxQueueSize);
// init the export server
doInitExportServer();
}

public BoundedConcurrentQueue<HttpExportRecord> getReceivedDataQueue() {
return receivedDataQueue;
}

/**
* Initialize the server for exporting the received data
*/
Expand Down Expand Up @@ -135,7 +129,7 @@ private void doInitExportServer() {
int pageNum = StringUtils.isBlank(pageNumStr) ? 1 : Integer.parseInt(pageNumStr);
int pageSize = Integer.parseInt(pageSizeStr);

if (currentQueueSize.get() == 0) {
if (receivedDataQueue.getCurrSize() == 0) {
ctx.response()
.putHeader(HttpHeaders.CONTENT_TYPE, "application/json; charset=utf-8")
.setStatusCode(HttpResponseStatus.NO_CONTENT.code())
Expand All @@ -148,12 +142,12 @@ private void doInitExportServer() {
List<HttpExportRecord> exportRecords;
if (Objects.equals(type, TypeEnum.POLL.getValue())) {
// If the type is poll, only the first page of data is exported and removed
exportRecords = getDataFromQueue(0, pageSize, true);
exportRecords = receivedDataQueue.fetchRange(0, pageSize, true);
} else {
// If the type is peek, the specified page of data is exported without removing
int startIndex = (pageNum - 1) * pageSize;
int endIndex = startIndex + pageSize;
exportRecords = getDataFromQueue(startIndex, endIndex, false);
exportRecords = receivedDataQueue.fetchRange(startIndex, endIndex, false);
}

// Create HttpExportRecordPage
Expand Down Expand Up @@ -242,63 +236,11 @@ public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord httpConne
// create ExportRecord
HttpExportRecord exportRecord = new HttpExportRecord(httpExportMetadata, arr.succeeded() ? arr.result().bodyAsString() : null);
// add the data to the queue
addDataToQueue(exportRecord);
receivedDataQueue.offerWithReplace(exportRecord);
});
}


/**
* Adds the received data to the queue.
*
* @param exportRecord the received data to add to the queue
*/
public void addDataToQueue(HttpExportRecord exportRecord) {
// If the current queue size is greater than or equal to the maximum queue size, remove the oldest element
if (currentQueueSize.get() >= maxQueueSize) {
Object removedData = receivedDataQueue.poll();
if (log.isDebugEnabled()) {
log.debug("The queue is full, remove the oldest element: {}", removedData);
} else {
log.info("The queue is full, remove the oldest element");
}
currentQueueSize.decrementAndGet();
}
// Try to put the received data into the queue
if (receivedDataQueue.offer(exportRecord)) {
currentQueueSize.incrementAndGet();
log.debug("Successfully put the received data into the queue: {}", exportRecord);
} else {
log.error("Failed to put the received data into the queue: {}", exportRecord);
}
}

/**
* Gets the received data from the queue.
*
* @param startIndex the start index of the data to get
* @param endIndex the end index of the data to get
* @param removed whether to remove the data from the queue
* @return the received data
*/
private List<HttpExportRecord> getDataFromQueue(int startIndex, int endIndex, boolean removed) {
Iterator<HttpExportRecord> iterator = receivedDataQueue.iterator();

List<HttpExportRecord> pageItems = new ArrayList<>(endIndex - startIndex);
int count = 0;
while (iterator.hasNext() && count < endIndex) {
HttpExportRecord item = iterator.next();
if (count >= startIndex) {
pageItems.add(item);
if (removed) {
iterator.remove();
currentQueueSize.decrementAndGet();
}
}
count++;
}
return pageItems;
}

/**
* Cleans up and releases resources used by the HTTP/HTTPS handler.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@

package org.apache.eventmesh.connector.http.source.config;

import org.apache.eventmesh.connector.http.source.protocol.impl.CloudEventProtocol;

import java.util.HashMap;
import java.util.Map;

import lombok.Data;

@Data
Expand All @@ -28,5 +33,24 @@ public class SourceConnectorConfig {

private int port;

private int idleTimeout;
// timeunit: ms, default 5000ms
private int idleTimeout = 5000;

// The maximum size allowed for form attributes when Content-Type is application/x-www-form-urlencoded or multipart/form-data
// Default is 1MB (1024 * 1024 bytes).
// If you receive a "size exceed allowed maximum capacity" error, you can increase this value.
// Note: This applies only when handling form data submissions.
private int maxFormAttributeSize = 1024 * 1024;
cnzakii marked this conversation as resolved.
Show resolved Hide resolved

// max size of the queue, default 1000
private int maxStorageSize = 1000;

// batch size, default 10
private int batchSize = 10;

// protocol, default CloudEvent
private String protocol = CloudEventProtocol.PROTOCOL_NAME;

// extra config, e.g. GitHub secret
private Map<String, String> extraConfig = new HashMap<>();
}
Loading
Loading