Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #4869] Add Webhook support for HTTP Source Connector #4913

Merged
merged 14 commits into from
Jun 27, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.eventmesh.connector.http.sink.data.HttpExportMetadata;
import org.apache.eventmesh.connector.http.sink.data.HttpExportRecord;
import org.apache.eventmesh.connector.http.sink.data.HttpExportRecordPage;
import org.apache.eventmesh.connector.http.util.QueueUtils;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;

import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -253,23 +254,8 @@ public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord httpConne
* @param exportRecord the received data to add to the queue
*/
public void addDataToQueue(HttpExportRecord exportRecord) {
// If the current queue size is greater than or equal to the maximum queue size, remove the oldest element
if (currentQueueSize.get() >= maxQueueSize) {
Object removedData = receivedDataQueue.poll();
if (log.isDebugEnabled()) {
log.debug("The queue is full, remove the oldest element: {}", removedData);
} else {
log.info("The queue is full, remove the oldest element");
}
currentQueueSize.decrementAndGet();
}
// Try to put the received data into the queue
if (receivedDataQueue.offer(exportRecord)) {
currentQueueSize.incrementAndGet();
log.debug("Successfully put the received data into the queue: {}", exportRecord);
} else {
log.error("Failed to put the received data into the queue: {}", exportRecord);
}
// add exportRecord to the queue, thread-safe
QueueUtils.addWithCover(receivedDataQueue, exportRecord, maxQueueSize, currentQueueSize);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@

package org.apache.eventmesh.connector.http.source.config;

import org.apache.eventmesh.connector.http.source.protocol.impl.CloudEventProtocol;

import java.util.HashMap;
import java.util.Map;

import lombok.Data;

@Data
Expand All @@ -28,5 +33,24 @@ public class SourceConnectorConfig {

private int port;

private int idleTimeout;
// timeunit: ms, default 5000ms
private int idleTimeout = 5000;

// The maximum size allowed for form attributes when Content-Type is application/x-www-form-urlencoded or multipart/form-data
// Default is 1MB (1024 * 1024 bytes).
// If you receive a "size exceed allowed maximum capacity" error, you can increase this value.
// Note: This applies only when handling form data submissions.
private int maxFormAttributeSize = 1024 * 1024;
cnzakii marked this conversation as resolved.
Show resolved Hide resolved

// max size of the queue, default 1000
private int maxStorageSize = 1000;

// batch size, default 10
private int batchSize = 10;

// protocol, default CloudEvent
private String protocol = CloudEventProtocol.PROTOCOL_NAME;

// extra config, e.g. GitHub secret
private Map<String, String> extraConfig = new HashMap<>();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,25 @@

import org.apache.eventmesh.common.exception.EventMeshException;
import org.apache.eventmesh.connector.http.source.config.HttpSourceConfig;
import org.apache.eventmesh.connector.http.source.protocol.Protocol;
import org.apache.eventmesh.connector.http.source.protocol.ProtocolFactory;
import org.apache.eventmesh.openconnect.api.config.Config;
import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
import org.apache.eventmesh.openconnect.api.source.Source;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import org.apache.eventmesh.openconnect.util.CloudEventUtil;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import io.cloudevents.CloudEvent;
import io.cloudevents.http.vertx.VertxMessageFactory;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.ext.web.Route;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.LoggerHandler;

Expand All @@ -47,12 +46,23 @@
@Slf4j
public class HttpSourceConnector implements Source {

private static final int DEFAULT_BATCH_SIZE = 10;

private HttpSourceConfig sourceConfig;
private BlockingQueue<CloudEvent> queue;

private ConcurrentLinkedQueue<Object> queue;

private int maxQueueSize;

private AtomicInteger currQueueSize;

private int batchSize;

private ProtocolFactory protocolFactory;

private Protocol protocol;

private HttpServer server;


@Override
public Class<? extends Config> configClass() {
return HttpSourceConfig.class;
Expand All @@ -72,42 +82,34 @@ public void init(ConnectorContext connectorContext) {
}

private void doInit() {
this.queue = new LinkedBlockingQueue<>(1000);
// init queue size
this.maxQueueSize = this.sourceConfig.getConnectorConfig().getMaxStorageSize();
this.currQueueSize = new AtomicInteger(0);
this.batchSize = this.sourceConfig.getConnectorConfig().getBatchSize();

// init queue
this.queue = new ConcurrentLinkedQueue<>();

// init protocol
String protocolName = this.sourceConfig.getConnectorConfig().getProtocol();
this.protocolFactory = new ProtocolFactory(this.sourceConfig.connectorConfig);
this.protocol = protocolFactory.getInstance(protocolName);

final Vertx vertx = Vertx.vertx();
final Router router = Router.router(vertx);
router.route()
final Route route = router.route()
.path(this.sourceConfig.connectorConfig.getPath())
.method(HttpMethod.POST)
.handler(LoggerHandler.create())
.handler(ctx -> {
VertxMessageFactory.createReader(ctx.request())
.map(reader -> {
CloudEvent event = reader.toEvent();
if (event.getSubject() == null) {
throw new IllegalStateException("attribute 'subject' cannot be null");
}
if (event.getDataContentType() == null) {
throw new IllegalStateException("attribute 'datacontenttype' cannot be null");
}
if (event.getData() == null) {
throw new IllegalStateException("attribute 'data' cannot be null");
}
return event;
})
.onSuccess(event -> {
queue.add(event);
log.info("[HttpSourceConnector] Succeed to convert payload into CloudEvent. StatusCode={}", HttpResponseStatus.OK.code());
ctx.response().setStatusCode(HttpResponseStatus.OK.code()).end();
})
.onFailure(t -> {
log.error("[HttpSourceConnector] Malformed request. StatusCode={}", HttpResponseStatus.BAD_REQUEST.code(), t);
ctx.response().setStatusCode(HttpResponseStatus.BAD_REQUEST.code()).end();
});
});
.handler(LoggerHandler.create());

// set protocol handler
this.protocol.setHandler(route, this.queue, this.maxQueueSize, this.currQueueSize);

// create server
this.server = vertx.createHttpServer(new HttpServerOptions()
.setPort(this.sourceConfig.connectorConfig.getPort())
.setIdleTimeout(this.sourceConfig.connectorConfig.getIdleTimeout())).requestHandler(router);
.setMaxFormAttributeSize(this.sourceConfig.connectorConfig.getMaxFormAttributeSize())
.setIdleTimeout(this.sourceConfig.connectorConfig.getIdleTimeout())
.setIdleTimeoutUnit(TimeUnit.MILLISECONDS)).requestHandler(router);
}

@Override
Expand Down Expand Up @@ -138,17 +140,21 @@ public void stop() {

@Override
public List<ConnectRecord> poll() {
List<ConnectRecord> connectRecords = new ArrayList<>(DEFAULT_BATCH_SIZE);
for (int i = 0; i < DEFAULT_BATCH_SIZE; i++) {
try {
CloudEvent event = queue.poll(3, TimeUnit.SECONDS);
if (event == null) {
break;
}
connectRecords.add(CloudEventUtil.convertEventToRecord(event));
} catch (InterruptedException e) {
// if queue is empty, return empty list
if (currQueueSize.get() == 0) {
return Collections.emptyList();
}
// poll from queue
List<ConnectRecord> connectRecords = new ArrayList<>(batchSize);
for (int i = 0; i < batchSize; i++) {
Object obj = queue.poll();
if (obj == null) {
break;
}
currQueueSize.decrementAndGet();
// convert to ConnectRecord
ConnectRecord connectRecord = protocol.convertToConnectRecord(obj);
connectRecords.add(connectRecord);
}
return connectRecords;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.http.source.data;

import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;

import java.util.Map;

import lombok.Builder;
import lombok.Data;

/**
* Webhook Protocol Request.
*/
@Data
@Builder
public class WebhookRequest {

private String protocolName;

private String url;

private Map<String, String> headers;

private String payload;


/**
* Convert to ConnectRecord.
*
* @return ConnectRecord
*/
public ConnectRecord convertToConnectRecord() {
ConnectRecord connectRecord = new ConnectRecord(null, null, System.currentTimeMillis(), payload);
connectRecord.addExtension("protocolName", protocolName);
connectRecord.addExtension("url", url);
connectRecord.addExtension("headers", headers);
return connectRecord;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.http.source.data;

import java.io.Serializable;
import java.time.LocalDateTime;

import lombok.Builder;
import lombok.Data;

/**
* Webhook response.
*/
@Data
@Builder
public class WebhookResponse implements Serializable {

private static final long serialVersionUID = 8616938575207104455L;

private String msg;

private LocalDateTime handleTime;


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.http.source.protocol;

import org.apache.eventmesh.connector.http.source.config.SourceConnectorConfig;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

import io.vertx.ext.web.Route;


/**
* <p>
* Protocol Interface.
* </p>
* All protocols should implement this interface.
*/
public interface Protocol {


/**
* Initialize the protocol.
*
* @param sourceConnectorConfig source connector config
*/
void initialize(SourceConnectorConfig sourceConnectorConfig);


/**
* Handle the protocol message.
*
* @param route route
* @param queue queue
* @param maxSize max size of the queue
* @param currSize current size of the queue
*/
void setHandler(Route route, ConcurrentLinkedQueue<Object> queue, int maxSize, AtomicInteger currSize);


/**
* Convert the message to ConnectRecord.
*
* @param message message
* @return ConnectRecord
*/
ConnectRecord convertToConnectRecord(Object message);
}
Loading
Loading