From f45438dc0cbfc9a6f5128da4ef9edf7fdf5b17e1 Mon Sep 17 00:00:00 2001 From: mxsm Date: Sat, 2 Dec 2023 23:27:05 +0800 Subject: [PATCH 1/2] [ISSUE #4596]Fix SourceWorker#convertRecordToEvent method converts ConnectRecord to CloudEvent throw NPE --- .../apache/eventmesh/openconnect/SourceWorker.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java index 71c3fea4d4..2445382e72 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java +++ b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java @@ -244,12 +244,13 @@ private CloudEvent convertRecordToEvent(ConnectRecord connectRecord) { .withData(Objects.requireNonNull(JsonUtils.toJSONString(connectRecord.getData())).getBytes(StandardCharsets.UTF_8)) .withExtension("ttl", 10000); - for (String key : connectRecord.getExtensions().keySet()) { - if (CloudEventUtil.validateExtensionType(connectRecord.getExtensionObj(key))) { - cloudEventBuilder.withExtension(key, connectRecord.getExtension(key)); + if (connectRecord.getExtensions() != null) { + for (String key : connectRecord.getExtensions().keySet()) { + if (CloudEventUtil.validateExtensionType(connectRecord.getExtensionObj(key))) { + cloudEventBuilder.withExtension(key, connectRecord.getExtension(key)); + } } } - return cloudEventBuilder.build(); } @@ -329,7 +330,7 @@ public boolean commitOffsets() { log.info("{} Committing offsets for {} acknowledged messages", this, committableOffsets.numCommittableMessages()); if (committableOffsets.hasPending()) { log.debug("{} There are currently {} pending messages spread across {} source partitions whose offsets will not be committed. " - + "The source partition with the most pending messages is {}, with {} pending messages", + + "The source partition with the most pending messages is {}, with {} pending messages", this, committableOffsets.numUncommittableMessages(), committableOffsets.numDeques(), @@ -337,7 +338,7 @@ public boolean commitOffsets() { committableOffsets.largestDequeSize()); } else { log.debug("{} There are currently no pending messages for this offset commit; " - + "all messages dispatched to the task's producer since the last commit have been acknowledged", + + "all messages dispatched to the task's producer since the last commit have been acknowledged", this); } } From 3a9f1925a6ee5fe76d70153e8395bc75f70df7b6 Mon Sep 17 00:00:00 2001 From: mxsm Date: Sun, 3 Dec 2023 00:41:41 +0800 Subject: [PATCH 2/2] fix CloudEventUtil#convertRecordToEvent method converts ConnectRecord to CloudEvent throw NPE --- .../openconnect/util/CloudEventUtil.java | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/util/CloudEventUtil.java b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/util/CloudEventUtil.java index 5691d43fde..64e5a91673 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/util/CloudEventUtil.java +++ b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/util/CloudEventUtil.java @@ -23,6 +23,7 @@ import java.net.URISyntaxException; import java.time.OffsetDateTime; import java.util.Objects; +import java.util.Optional; import io.cloudevents.CloudEvent; import io.cloudevents.core.builder.CloudEventBuilder; @@ -33,32 +34,31 @@ public class CloudEventUtil { public static CloudEvent convertRecordToEvent(ConnectRecord connectRecord) { - CloudEventBuilder cloudEventBuilder = CloudEventBuilder.v1() - .withData((byte[]) connectRecord.getData()); - connectRecord.getExtensions().keySet().forEach(s -> { - switch (s) { + final CloudEventBuilder cloudEventBuilder = CloudEventBuilder.v1().withData((byte[]) connectRecord.getData()); + Optional.ofNullable(connectRecord.getExtensions()).ifPresent((extensions) -> extensions.keySet().forEach(key -> { + switch (key) { case "id": - cloudEventBuilder.withId(connectRecord.getExtension(s)); + cloudEventBuilder.withId(connectRecord.getExtension(key)); break; case "topic": - cloudEventBuilder.withSubject(connectRecord.getExtension(s)); + cloudEventBuilder.withSubject(connectRecord.getExtension(key)); break; case "source": try { - cloudEventBuilder.withSource(new URI(connectRecord.getExtension(s))); + cloudEventBuilder.withSource(new URI(connectRecord.getExtension(key))); } catch (URISyntaxException e) { throw new RuntimeException(e); } break; case "type": - cloudEventBuilder.withType(connectRecord.getExtension(s)); + cloudEventBuilder.withType(connectRecord.getExtension(key)); break; default: - if (validateExtensionType(connectRecord.getExtensionObj(s))) { - cloudEventBuilder.withExtension(s, connectRecord.getExtension(s)); + if (validateExtensionType(connectRecord.getExtensionObj(key))) { + cloudEventBuilder.withExtension(key, connectRecord.getExtension(key)); } } - }); + })); return cloudEventBuilder.build(); }