Skip to content

Commit

Permalink
[ISSUE #4659] Fix HTTP source connector stop after receiving an inval…
Browse files Browse the repository at this point in the history
…id request (#4666)
  • Loading branch information
Fungx committed Dec 16, 2023
1 parent 8d5b39b commit d625790
Showing 1 changed file with 14 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.concurrent.TimeUnit;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.message.MessageReader;
import io.cloudevents.http.vertx.VertxMessageFactory;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.Vertx;
Expand Down Expand Up @@ -83,15 +82,27 @@ private void doInit() {
.handler(LoggerHandler.create())
.handler(ctx -> {
VertxMessageFactory.createReader(ctx.request())
.map(MessageReader::toEvent)
.map(reader -> {
CloudEvent event = reader.toEvent();
if (event.getSubject() == null) {
throw new IllegalStateException("attribute 'subject' cannot be null");
}
if (event.getDataContentType() == null) {
throw new IllegalStateException("attribute 'datacontenttype' cannot be null");
}
if (event.getData() == null) {
throw new IllegalStateException("attribute 'data' cannot be null");
}
return event;
})
.onSuccess(event -> {
queue.add(event);
log.info("[HttpSourceConnector] Succeed to convert payload into CloudEvent. StatusCode={}", HttpResponseStatus.OK.code());
ctx.response().setStatusCode(HttpResponseStatus.OK.code()).end();
})
.onFailure(t -> {
log.error("[HttpSourceConnector] Malformed request. StatusCode={}", HttpResponseStatus.BAD_REQUEST.code(), t);
ctx.response().setStatusCode(HttpResponseStatus.BAD_REQUEST.code()).setStatusMessage(t.getMessage()).end();
ctx.response().setStatusCode(HttpResponseStatus.BAD_REQUEST.code()).end();
});
});
this.server = vertx.createHttpServer(new HttpServerOptions()
Expand Down

0 comments on commit d625790

Please sign in to comment.