diff --git a/amqp/pom.xml b/amqp/pom.xml
index 23853c594..865acfdc5 100644
--- a/amqp/pom.xml
+++ b/amqp/pom.xml
@@ -6,7 +6,7 @@
io.cloudevents
cloudevents-parent
- 2.5.0-SNAPSHOT
+ 3.0.0-SNAPSHOT
cloudevents-amqp-proton
diff --git a/api/pom.xml b/api/pom.xml
index cb657fa82..adb849480 100644
--- a/api/pom.xml
+++ b/api/pom.xml
@@ -24,7 +24,7 @@
io.cloudevents
cloudevents-parent
- 2.5.0-SNAPSHOT
+ 3.0.0-SNAPSHOT
cloudevents-api
diff --git a/benchmarks/pom.xml b/benchmarks/pom.xml
index 78e6b87cd..2f150fbf7 100644
--- a/benchmarks/pom.xml
+++ b/benchmarks/pom.xml
@@ -21,7 +21,7 @@
io.cloudevents
cloudevents-parent
- 2.5.0-SNAPSHOT
+ 3.0.0-SNAPSHOT
cloudevents-benchmarks
diff --git a/bom/pom.xml b/bom/pom.xml
index 8441369ce..8343f7644 100644
--- a/bom/pom.xml
+++ b/bom/pom.xml
@@ -22,7 +22,7 @@
io.cloudevents
cloudevents-parent
- 2.5.0-SNAPSHOT
+ 3.0.0-SNAPSHOT
cloudevents-bom
diff --git a/core/pom.xml b/core/pom.xml
index 5104d4ff8..2e777d559 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -22,7 +22,7 @@
io.cloudevents
cloudevents-parent
- 2.5.0-SNAPSHOT
+ 3.0.0-SNAPSHOT
cloudevents-core
diff --git a/core/src/main/java/io/cloudevents/core/format/ContentType.java b/core/src/main/java/io/cloudevents/core/format/ContentType.java
new file mode 100644
index 000000000..1d8656393
--- /dev/null
+++ b/core/src/main/java/io/cloudevents/core/format/ContentType.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2018-Present The CloudEvents Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package io.cloudevents.core.format;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.CloudEventData;
+import io.cloudevents.rw.CloudEventDataMapper;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ *
A construct that aggregates a two-part identifier of file formats and format contents transmitted on the Internet.
+ *
+ *
The two parts of a {@code ContentType} are its type and a subtype; separated by a forward slash ({@code /}).
+ *
+ *
The constants enumerated by {@code ContentType} correspond only to the specialized formats supported by the Java™ SDK for CloudEvents.
+ *
+ * @see io.cloudevents.core.format.EventFormat
+ */
+@ParametersAreNonnullByDefault
+public enum ContentType {
+
+ /**
+ * Content type associated with the JSON event format
+ */
+ JSON("application/cloudevents+json"),
+ /**
+ * The content type for transports sending cloudevents in the protocol buffer format.
+ */
+ PROTO("application/cloudevents+protobuf"),
+ /**
+ * The content type for transports sending cloudevents in XML format.
+ */
+ XML("application/cloudevents+xml");
+
+ private String value;
+
+ private ContentType(String value) { this.value = value; }
+
+ /**
+ * Return a string consisting of the slash-delimited ({@code /}) two-part identifier for this {@code enum} constant.
+ */
+ public String value() { return value; }
+
+ /**
+ * Return a string consisting of the slash-delimited ({@code /}) two-part identifier for this {@code enum} constant.
+ */
+ @Override
+ public String toString() { return value(); }
+
+}
diff --git a/core/src/main/java/io/cloudevents/core/provider/EventFormatProvider.java b/core/src/main/java/io/cloudevents/core/provider/EventFormatProvider.java
index 96f3ad678..d961d2f85 100644
--- a/core/src/main/java/io/cloudevents/core/provider/EventFormatProvider.java
+++ b/core/src/main/java/io/cloudevents/core/provider/EventFormatProvider.java
@@ -25,6 +25,7 @@
import javax.annotation.ParametersAreNonnullByDefault;
+import io.cloudevents.core.format.ContentType;
import io.cloudevents.core.format.EventFormat;
import io.cloudevents.lang.Nullable;
@@ -98,4 +99,14 @@ public EventFormat resolveFormat(String contentType) {
return this.formats.get(contentType);
}
+ /**
+ * Resolve an event format starting from the content type.
+ *
+ * @param contentType the content type to resolve the event format
+ * @return null if no format was found for the provided content type
+ */
+ @Nullable
+ public EventFormat resolveFormat(ContentType contentType) {
+ return this.formats.get(contentType.value());
+ }
}
diff --git a/docs/json-jackson.md b/docs/json-jackson.md
index 6bb2a7c6a..823436f02 100644
--- a/docs/json-jackson.md
+++ b/docs/json-jackson.md
@@ -28,9 +28,9 @@ adding the dependency to your project:
```java
import io.cloudevents.CloudEvent;
+import io.cloudevents.core.format.ContentType;
import io.cloudevents.core.format.EventFormatProvider;
import io.cloudevents.core.builder.CloudEventBuilder;
-import io.cloudevents.jackson.JsonFormat;
CloudEvent event = CloudEventBuilder.v1()
.withId("hello")
@@ -40,7 +40,7 @@ CloudEvent event = CloudEventBuilder.v1()
byte[]serialized = EventFormatProvider
.getInstance()
- .resolveFormat(JsonFormat.CONTENT_TYPE)
+ .resolveFormat(ContentType.JSON)
.serialize(event);
```
diff --git a/docs/protobuf.md b/docs/protobuf.md
index d67a0d708..6a49ec86b 100644
--- a/docs/protobuf.md
+++ b/docs/protobuf.md
@@ -30,9 +30,9 @@ No further configuration is required is use the module.
```java
import io.cloudevents.CloudEvent;
+import io.cloudevents.core.format.ContentType;
import io.cloudevents.core.format.EventFormatProvider;
import io.cloudevents.core.builder.CloudEventBuilder;
-import io.cloudevents.protobuf.ProtobufFormat;
CloudEvent event = CloudEventBuilder.v1()
.withId("hello")
@@ -42,7 +42,7 @@ CloudEvent event = CloudEventBuilder.v1()
byte[]serialized = EventFormatProvider
.getInstance()
- .resolveFormat(ProtobufFormat.CONTENT_TYPE)
+ .resolveFormat(ContentType.PROTO)
.serialize(event);
```
diff --git a/docs/xml.md b/docs/xml.md
index 6d0640157..8e365ac1f 100644
--- a/docs/xml.md
+++ b/docs/xml.md
@@ -29,9 +29,9 @@ adding the dependency to your project:
```java
import io.cloudevents.CloudEvent;
+import io.cloudevents.core.format.ContentType;
import io.cloudevents.core.format.EventFormatProvider;
import io.cloudevents.core.builder.CloudEventBuilder;
-import io.cloudevents.xml.XMLFormat;
CloudEvent event = CloudEventBuilder.v1()
.withId("hello")
@@ -41,7 +41,7 @@ CloudEvent event = CloudEventBuilder.v1()
byte[] serialized = EventFormatProvider
.getInstance()
- .resolveFormat(XMLFormat.CONTENT_TYPE)
+ .resolveFormat(ContentType.XML)
.serialize(event);
```
diff --git a/examples/amqp-proton/pom.xml b/examples/amqp-proton/pom.xml
index 6919bf4f2..4ef09c5ce 100644
--- a/examples/amqp-proton/pom.xml
+++ b/examples/amqp-proton/pom.xml
@@ -3,7 +3,7 @@
cloudevents-examples
io.cloudevents
- 2.5.0-SNAPSHOT
+ 3.0.0-SNAPSHOT
4.0.0
diff --git a/examples/basic-http/pom.xml b/examples/basic-http/pom.xml
index 7b57dbe25..153c0de78 100644
--- a/examples/basic-http/pom.xml
+++ b/examples/basic-http/pom.xml
@@ -21,7 +21,7 @@
cloudevents-examples
io.cloudevents
- 2.5.0-SNAPSHOT
+ 3.0.0-SNAPSHOT
4.0.0
diff --git a/examples/kafka/pom.xml b/examples/kafka/pom.xml
index e39e9d8eb..018384e61 100644
--- a/examples/kafka/pom.xml
+++ b/examples/kafka/pom.xml
@@ -5,7 +5,7 @@
cloudevents-examples
io.cloudevents
- 2.5.0-SNAPSHOT
+ 3.0.0-SNAPSHOT
4.0.0
diff --git a/examples/pom.xml b/examples/pom.xml
index a706a51e2..de74f57fd 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -5,7 +5,7 @@
cloudevents-parent
io.cloudevents
- 2.5.0-SNAPSHOT
+ 3.0.0-SNAPSHOT
4.0.0
@@ -29,6 +29,7 @@
spring-reactive
spring-rsocket
spring-function
+ rocketmq
diff --git a/examples/restful-ws-microprofile-liberty/pom.xml b/examples/restful-ws-microprofile-liberty/pom.xml
index 0cfbd64fc..86680c0b9 100644
--- a/examples/restful-ws-microprofile-liberty/pom.xml
+++ b/examples/restful-ws-microprofile-liberty/pom.xml
@@ -3,7 +3,7 @@
cloudevents-examples
io.cloudevents
- 2.5.0-SNAPSHOT
+ 3.0.0-SNAPSHOT
../
4.0.0
diff --git a/examples/restful-ws-quarkus/pom.xml b/examples/restful-ws-quarkus/pom.xml
index c6a145837..68b11715d 100644
--- a/examples/restful-ws-quarkus/pom.xml
+++ b/examples/restful-ws-quarkus/pom.xml
@@ -5,7 +5,7 @@
cloudevents-examples
io.cloudevents
- 2.5.0-SNAPSHOT
+ 3.0.0-SNAPSHOT
4.0.0
cloudevents-restful-ws-quarkus-example
diff --git a/examples/restful-ws-spring-boot/pom.xml b/examples/restful-ws-spring-boot/pom.xml
index 5a66b8f89..2475adc0f 100644
--- a/examples/restful-ws-spring-boot/pom.xml
+++ b/examples/restful-ws-spring-boot/pom.xml
@@ -5,7 +5,7 @@
cloudevents-examples
io.cloudevents
- 2.5.0-SNAPSHOT
+ 3.0.0-SNAPSHOT
4.0.0
diff --git a/examples/rocketmq/README.md b/examples/rocketmq/README.md
new file mode 100644
index 000000000..8e4a98e60
--- /dev/null
+++ b/examples/rocketmq/README.md
@@ -0,0 +1,27 @@
+# RocketMQ + CloudEvents Sample
+
+This example demonstrates the integration of [RocketMQ 5.x client library](https://github.com/apache/rocketmq-clients)
+with CloudEvents to create a RocketMQ binding.
+
+## Building the Project
+
+```shell
+mvn package
+```
+
+## Setting Up a RocketMQ Instance
+
+Follow the [quickstart guide](https://rocketmq.apache.org/docs/quick-start/01quickstart) on the official RocketMQ
+website to set up the necessary components, including nameserver, proxy, and broker.
+
+## Event Production
+
+```shell
+mvn exec:java -Dexec.mainClass="io.cloudevents.examples.rocketmq.RocketmqProducer" -Dexec.args="foobar:8081 sample-topic"
+```
+
+## Event Consumption
+
+```shell
+mvn exec:java -Dexec.mainClass="io.cloudevents.examples.rocketmq.RocketmqConsumer" -Dexec.args="foobar:8081 sample-topic sample-consumer-group"
+```
diff --git a/examples/rocketmq/pom.xml b/examples/rocketmq/pom.xml
new file mode 100644
index 000000000..022ac49cc
--- /dev/null
+++ b/examples/rocketmq/pom.xml
@@ -0,0 +1,21 @@
+
+
+
+ cloudevents-examples
+ io.cloudevents
+ 3.0.0-SNAPSHOT
+
+ 4.0.0
+
+ cloudevents-rocketmq-example
+
+
+
+ io.cloudevents
+ cloudevents-rocketmq
+ ${project.version}
+
+
+
diff --git a/examples/rocketmq/src/main/java/io/cloudevents/examples/rocketmq/RocketmqConsumer.java b/examples/rocketmq/src/main/java/io/cloudevents/examples/rocketmq/RocketmqConsumer.java
new file mode 100644
index 000000000..3f539b4ac
--- /dev/null
+++ b/examples/rocketmq/src/main/java/io/cloudevents/examples/rocketmq/RocketmqConsumer.java
@@ -0,0 +1,50 @@
+package io.cloudevents.examples.rocketmq;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.message.MessageReader;
+import io.cloudevents.rocketmq.RocketMqMessageFactory;
+import java.io.IOException;
+import java.util.Collections;
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.ClientServiceProvider;
+import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
+import org.apache.rocketmq.client.apis.consumer.FilterExpression;
+import org.apache.rocketmq.client.apis.consumer.PushConsumer;
+
+public class RocketmqConsumer {
+ private RocketmqConsumer() {
+ }
+
+ public static void main(String[] args) throws InterruptedException, ClientException, IOException {
+ if (args.length < 3) {
+ System.out.println("Usage: rocketmq_consumer ");
+ return;
+ }
+ final ClientServiceProvider provider = ClientServiceProvider.loadService();
+ String endpoints = args[0];
+ ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
+ .setEndpoints(endpoints)
+ .build();
+ FilterExpression filterExpression = new FilterExpression();
+ String topic = args[1];
+ String consumerGroup = args[2];
+
+ // Create the RocketMQ Consumer.
+ PushConsumer pushConsumer = provider.newPushConsumerBuilder()
+ .setClientConfiguration(clientConfiguration)
+ .setConsumerGroup(consumerGroup)
+ .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
+ .setMessageListener(messageView -> {
+ final MessageReader reader = RocketMqMessageFactory.createReader(messageView);
+ final CloudEvent event = reader.toEvent();
+ System.out.println("Received event=" + event + ", messageId=" + messageView.getMessageId());
+ return ConsumeResult.SUCCESS;
+ })
+ .build();
+ // Block the main thread, no need for production environment.
+ Thread.sleep(Long.MAX_VALUE);
+ // Close the push consumer when you don't need it anymore.
+ pushConsumer.close();
+ }
+}
diff --git a/examples/rocketmq/src/main/java/io/cloudevents/examples/rocketmq/RocketmqProducer.java b/examples/rocketmq/src/main/java/io/cloudevents/examples/rocketmq/RocketmqProducer.java
new file mode 100644
index 000000000..adeadb7f7
--- /dev/null
+++ b/examples/rocketmq/src/main/java/io/cloudevents/examples/rocketmq/RocketmqProducer.java
@@ -0,0 +1,64 @@
+package io.cloudevents.examples.rocketmq;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.v1.CloudEventBuilder;
+import io.cloudevents.rocketmq.RocketMqMessageFactory;
+import io.cloudevents.types.Time;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.ClientServiceProvider;
+import org.apache.rocketmq.client.apis.message.Message;
+import org.apache.rocketmq.client.apis.producer.Producer;
+import org.apache.rocketmq.client.apis.producer.SendReceipt;
+import org.apache.rocketmq.shaded.com.google.gson.Gson;
+
+public class RocketmqProducer {
+ private RocketmqProducer() {
+ }
+
+ public static void main(String[] args) throws ClientException, IOException {
+ if (args.length < 2) {
+ System.out.println("Usage: rocketmq_producer ");
+ return;
+ }
+ final ClientServiceProvider provider = ClientServiceProvider.loadService();
+ String endpoints = args[0];
+ ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
+ .setEndpoints(endpoints)
+ .build();
+ String topic = args[1];
+
+ // Create the RocketMQ Producer.
+ final Producer producer = provider.newProducerBuilder()
+ .setClientConfiguration(clientConfiguration)
+ .setTopics(topic)
+ .build();
+ final Gson gson = new Gson();
+ Map payload = new HashMap<>();
+ payload.put("foo", "bar");
+ final CloudEvent event = new CloudEventBuilder()
+ .withId("client-id")
+ .withSource(URI.create("http://127.0.0.1/rocketmq-client"))
+ .withType("com.foobar")
+ .withTime(Time.parseTime("2022-11-09T21:47:12.032198+00:00"))
+ .withData(gson.toJson(payload).getBytes(StandardCharsets.UTF_8))
+ .build();
+ // Transform event into message.
+ final Message message = RocketMqMessageFactory.createWriter(topic).writeBinary(event);
+ try {
+ // Send the message.
+ final SendReceipt sendReceipt = producer.send(message);
+ System.out.println("Send message successfully, messageId=" + sendReceipt.getMessageId());
+ } catch (Exception e) {
+ System.out.println("Failed to send message");
+ e.printStackTrace();
+ }
+ // Close the producer when you don't need it anymore.
+ producer.close();
+ }
+}
diff --git a/examples/spring-function/pom.xml b/examples/spring-function/pom.xml
index 42a071d1f..5a0047f1a 100644
--- a/examples/spring-function/pom.xml
+++ b/examples/spring-function/pom.xml
@@ -5,7 +5,7 @@
cloudevents-examples
io.cloudevents
- 2.5.0-SNAPSHOT
+ 3.0.0-SNAPSHOT
4.0.0
diff --git a/examples/spring-reactive/pom.xml b/examples/spring-reactive/pom.xml
index 86f2bdac7..2a4489f04 100644
--- a/examples/spring-reactive/pom.xml
+++ b/examples/spring-reactive/pom.xml
@@ -5,7 +5,7 @@
cloudevents-examples
io.cloudevents
- 2.5.0-SNAPSHOT
+ 3.0.0-SNAPSHOT
4.0.0
diff --git a/examples/spring-rsocket/pom.xml b/examples/spring-rsocket/pom.xml
index 505339f0e..151a7fe84 100644
--- a/examples/spring-rsocket/pom.xml
+++ b/examples/spring-rsocket/pom.xml
@@ -5,7 +5,7 @@
cloudevents-examples
io.cloudevents
- 2.5.0-SNAPSHOT
+ 3.0.0-SNAPSHOT
4.0.0
diff --git a/examples/vertx/pom.xml b/examples/vertx/pom.xml
index 76601c4c0..7fccab33e 100644
--- a/examples/vertx/pom.xml
+++ b/examples/vertx/pom.xml
@@ -5,7 +5,7 @@
cloudevents-examples
io.cloudevents
- 2.5.0-SNAPSHOT
+ 3.0.0-SNAPSHOT
4.0.0
diff --git a/formats/json-jackson/pom.xml b/formats/json-jackson/pom.xml
index 1994883af..b39374299 100644
--- a/formats/json-jackson/pom.xml
+++ b/formats/json-jackson/pom.xml
@@ -22,7 +22,7 @@
io.cloudevents
cloudevents-parent
- 2.5.0-SNAPSHOT
+ 3.0.0-SNAPSHOT
../../pom.xml
diff --git a/formats/json-jackson/src/main/java/io/cloudevents/jackson/JsonFormat.java b/formats/json-jackson/src/main/java/io/cloudevents/jackson/JsonFormat.java
index 542405894..d73edbdf9 100644
--- a/formats/json-jackson/src/main/java/io/cloudevents/jackson/JsonFormat.java
+++ b/formats/json-jackson/src/main/java/io/cloudevents/jackson/JsonFormat.java
@@ -22,6 +22,7 @@
import io.cloudevents.CloudEvent;
import io.cloudevents.CloudEventData;
import io.cloudevents.core.builder.CloudEventBuilder;
+import io.cloudevents.core.format.ContentType;
import io.cloudevents.core.format.EventDeserializationException;
import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.format.EventSerializationException;
diff --git a/formats/json-jackson/src/test/java/io/cloudevents/jackson/JsonFormatTest.java b/formats/json-jackson/src/test/java/io/cloudevents/jackson/JsonFormatTest.java
index f82d25926..59c2d70c0 100644
--- a/formats/json-jackson/src/test/java/io/cloudevents/jackson/JsonFormatTest.java
+++ b/formats/json-jackson/src/test/java/io/cloudevents/jackson/JsonFormatTest.java
@@ -24,6 +24,7 @@
import io.cloudevents.CloudEvent;
import io.cloudevents.SpecVersion;
import io.cloudevents.core.builder.CloudEventBuilder;
+import io.cloudevents.core.format.ContentType;
import io.cloudevents.core.format.EventDeserializationException;
import io.cloudevents.core.provider.EventFormatProvider;
import io.cloudevents.rw.CloudEventRWException;
@@ -40,6 +41,7 @@
import java.util.Objects;
import java.util.stream.Stream;
+import static io.cloudevents.core.format.ContentType.*;
import static io.cloudevents.core.test.Data.*;
import static org.assertj.core.api.Assertions.*;
@@ -185,7 +187,10 @@ static Stream jsonContentTypes() {
//https://www.rfc-editor.org/rfc/rfc2045#section-5.1
// any us-ascii char can be part of parameters (except CTRLs and tspecials)
Arguments.of("text/json; char-set = $!#$%&'*+.^_`|"),
- Arguments.of((Object) null)
+ Arguments.of((Object) null),
+ Arguments.of(JSON + ""),
+ Arguments.of(JSON.value()),
+ Arguments.of(JSON.toString())
);
}
@@ -307,7 +312,7 @@ public static Stream badJsonContent() {
}
private JsonFormat getFormat() {
- return (JsonFormat) EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE);
+ return (JsonFormat) EventFormatProvider.getInstance().resolveFormat(JSON);
}
private static byte[] loadFile(String input) {
diff --git a/formats/protobuf/pom.xml b/formats/protobuf/pom.xml
index e76c935f1..0e4dc3da9 100644
--- a/formats/protobuf/pom.xml
+++ b/formats/protobuf/pom.xml
@@ -23,7 +23,7 @@
io.cloudevents
cloudevents-parent
- 2.5.0-SNAPSHOT
+ 3.0.0-SNAPSHOT
../../pom.xml
diff --git a/formats/protobuf/src/main/java/io/cloudevents/protobuf/ProtobufFormat.java b/formats/protobuf/src/main/java/io/cloudevents/protobuf/ProtobufFormat.java
index 0d3b65d4e..170d92f20 100644
--- a/formats/protobuf/src/main/java/io/cloudevents/protobuf/ProtobufFormat.java
+++ b/formats/protobuf/src/main/java/io/cloudevents/protobuf/ProtobufFormat.java
@@ -20,6 +20,7 @@
import io.cloudevents.CloudEvent;
import io.cloudevents.CloudEventData;
import io.cloudevents.core.builder.CloudEventBuilder;
+import io.cloudevents.core.format.ContentType;
import io.cloudevents.core.format.EventDeserializationException;
import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.format.EventSerializationException;
diff --git a/formats/xml/pom.xml b/formats/xml/pom.xml
index 9f57fbf23..9c0bf6319 100644
--- a/formats/xml/pom.xml
+++ b/formats/xml/pom.xml
@@ -23,7 +23,7 @@
io.cloudevents
cloudevents-parent
- 2.5.0-SNAPSHOT
+ 3.0.0-SNAPSHOT
../../pom.xml
diff --git a/formats/xml/src/main/java/io/cloudevents/xml/XMLFormat.java b/formats/xml/src/main/java/io/cloudevents/xml/XMLFormat.java
index 996f07cf3..d0e4c0bc7 100644
--- a/formats/xml/src/main/java/io/cloudevents/xml/XMLFormat.java
+++ b/formats/xml/src/main/java/io/cloudevents/xml/XMLFormat.java
@@ -19,6 +19,7 @@
import io.cloudevents.CloudEvent;
import io.cloudevents.CloudEventData;
import io.cloudevents.core.builder.CloudEventBuilder;
+import io.cloudevents.core.format.ContentType;
import io.cloudevents.core.format.EventDeserializationException;
import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.format.EventSerializationException;
diff --git a/http/basic/pom.xml b/http/basic/pom.xml
index 446744f86..b473c81b0 100644
--- a/http/basic/pom.xml
+++ b/http/basic/pom.xml
@@ -21,7 +21,7 @@
io.cloudevents
cloudevents-parent
- 2.5.0-SNAPSHOT
+ 3.0.0-SNAPSHOT
../../pom.xml
diff --git a/http/restful-ws-integration-tests/pom.xml b/http/restful-ws-integration-tests/pom.xml
index e35b4b383..fa0b5b307 100644
--- a/http/restful-ws-integration-tests/pom.xml
+++ b/http/restful-ws-integration-tests/pom.xml
@@ -22,7 +22,7 @@
cloudevents-parent
io.cloudevents
- 2.5.0-SNAPSHOT
+ 3.0.0-SNAPSHOT
../../pom.xml
4.0.0
diff --git a/http/restful-ws-integration-tests/restful-ws-common/pom.xml b/http/restful-ws-integration-tests/restful-ws-common/pom.xml
index 1f2b7bb55..ad95ba0e5 100644
--- a/http/restful-ws-integration-tests/restful-ws-common/pom.xml
+++ b/http/restful-ws-integration-tests/restful-ws-common/pom.xml
@@ -22,7 +22,7 @@
cloudevents-http-restful-ws-integration-tests
io.cloudevents
- 2.5.0-SNAPSHOT
+ 3.0.0-SNAPSHOT
../
4.0.0
diff --git a/http/restful-ws-integration-tests/restful-ws-jersey/pom.xml b/http/restful-ws-integration-tests/restful-ws-jersey/pom.xml
index 601afea1b..d1383d4e9 100644
--- a/http/restful-ws-integration-tests/restful-ws-jersey/pom.xml
+++ b/http/restful-ws-integration-tests/restful-ws-jersey/pom.xml
@@ -22,7 +22,7 @@
cloudevents-http-restful-ws-integration-tests
io.cloudevents
- 2.5.0-SNAPSHOT
+ 3.0.0-SNAPSHOT
../
4.0.0
diff --git a/http/restful-ws-integration-tests/restful-ws-resteasy/pom.xml b/http/restful-ws-integration-tests/restful-ws-resteasy/pom.xml
index d1f3c5a73..0ed365e9b 100644
--- a/http/restful-ws-integration-tests/restful-ws-resteasy/pom.xml
+++ b/http/restful-ws-integration-tests/restful-ws-resteasy/pom.xml
@@ -22,7 +22,7 @@
cloudevents-http-restful-ws-integration-tests
io.cloudevents
- 2.5.0-SNAPSHOT
+ 3.0.0-SNAPSHOT
../
4.0.0
diff --git a/http/restful-ws-integration-tests/restful-ws-spring/pom.xml b/http/restful-ws-integration-tests/restful-ws-spring/pom.xml
index 54a4e7e4e..a4d0c15f3 100644
--- a/http/restful-ws-integration-tests/restful-ws-spring/pom.xml
+++ b/http/restful-ws-integration-tests/restful-ws-spring/pom.xml
@@ -22,7 +22,7 @@
cloudevents-http-restful-ws-integration-tests
io.cloudevents
- 2.5.0-SNAPSHOT
+ 3.0.0-SNAPSHOT
../
4.0.0
diff --git a/http/restful-ws-jakarta-integration-tests/pom.xml b/http/restful-ws-jakarta-integration-tests/pom.xml
index 11e49266c..683c25981 100644
--- a/http/restful-ws-jakarta-integration-tests/pom.xml
+++ b/http/restful-ws-jakarta-integration-tests/pom.xml
@@ -5,7 +5,7 @@
cloudevents-parent
io.cloudevents
- 2.5.0-SNAPSHOT
+ 3.0.0-SNAPSHOT
../../pom.xml
4.0.0
diff --git a/http/restful-ws-jakarta-integration-tests/restful-ws-jakarta-common/pom.xml b/http/restful-ws-jakarta-integration-tests/restful-ws-jakarta-common/pom.xml
index d9d63d571..1a010d5b2 100644
--- a/http/restful-ws-jakarta-integration-tests/restful-ws-jakarta-common/pom.xml
+++ b/http/restful-ws-jakarta-integration-tests/restful-ws-jakarta-common/pom.xml
@@ -3,7 +3,7 @@
cloudevents-http-restful-ws-jakarta-integration-tests
io.cloudevents
- 2.5.0-SNAPSHOT
+ 3.0.0-SNAPSHOT
../
4.0.0
diff --git a/http/restful-ws-jakarta-integration-tests/restful-ws-liberty/pom.xml b/http/restful-ws-jakarta-integration-tests/restful-ws-liberty/pom.xml
index 1789438fb..c3aca0774 100644
--- a/http/restful-ws-jakarta-integration-tests/restful-ws-liberty/pom.xml
+++ b/http/restful-ws-jakarta-integration-tests/restful-ws-liberty/pom.xml
@@ -5,7 +5,7 @@
cloudevents-http-restful-ws-jakarta-integration-tests
io.cloudevents
- 2.5.0-SNAPSHOT
+ 3.0.0-SNAPSHOT
../
4.0.0
diff --git a/http/restful-ws-jakarta-integration-tests/restful-ws-resteasy/pom.xml b/http/restful-ws-jakarta-integration-tests/restful-ws-resteasy/pom.xml
index 08a87fa73..95fa033e9 100644
--- a/http/restful-ws-jakarta-integration-tests/restful-ws-resteasy/pom.xml
+++ b/http/restful-ws-jakarta-integration-tests/restful-ws-resteasy/pom.xml
@@ -3,7 +3,7 @@
cloudevents-http-restful-ws-jakarta-integration-tests
io.cloudevents
- 2.5.0-SNAPSHOT
+ 3.0.0-SNAPSHOT
../
4.0.0
diff --git a/http/restful-ws-jakarta/pom.xml b/http/restful-ws-jakarta/pom.xml
index 6d64b836c..5f6367f3d 100644
--- a/http/restful-ws-jakarta/pom.xml
+++ b/http/restful-ws-jakarta/pom.xml
@@ -21,7 +21,7 @@
io.cloudevents
cloudevents-parent
- 2.5.0-SNAPSHOT
+ 3.0.0-SNAPSHOT
../../pom.xml
diff --git a/http/restful-ws/pom.xml b/http/restful-ws/pom.xml
index fd6550a6f..d9ab6ad73 100644
--- a/http/restful-ws/pom.xml
+++ b/http/restful-ws/pom.xml
@@ -21,7 +21,7 @@
io.cloudevents
cloudevents-parent
- 2.5.0-SNAPSHOT
+ 3.0.0-SNAPSHOT
../../pom.xml
diff --git a/http/vertx/pom.xml b/http/vertx/pom.xml
index 7ddcec1c7..123a41d37 100644
--- a/http/vertx/pom.xml
+++ b/http/vertx/pom.xml
@@ -22,7 +22,7 @@
io.cloudevents
cloudevents-parent
- 2.5.0-SNAPSHOT
+ 3.0.0-SNAPSHOT
../../pom.xml
diff --git a/kafka/pom.xml b/kafka/pom.xml
index e4309f781..787062c40 100644
--- a/kafka/pom.xml
+++ b/kafka/pom.xml
@@ -23,7 +23,7 @@
io.cloudevents
cloudevents-parent
- 2.5.0-SNAPSHOT
+ 3.0.0-SNAPSHOT
cloudevents-kafka
diff --git a/pom.xml b/pom.xml
index c927e409a..457c9d95a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -21,7 +21,7 @@
io.cloudevents
cloudevents-parent
- 2.5.0-SNAPSHOT
+ 3.0.0-SNAPSHOT
pom
CloudEvents
@@ -81,6 +81,7 @@
spring
sql
bom
+ rocketmq
@@ -243,7 +244,7 @@
org.apache.maven.plugins
maven-gpg-plugin
- 1.6
+ 3.1.0
sign-artifacts
diff --git a/rocketmq/pom.xml b/rocketmq/pom.xml
new file mode 100644
index 000000000..e7d2bad2c
--- /dev/null
+++ b/rocketmq/pom.xml
@@ -0,0 +1,73 @@
+
+
+
+ 4.0.0
+
+
+ cloudevents-parent
+ io.cloudevents
+ 3.0.0-SNAPSHOT
+
+
+ cloudevents-rocketmq
+ CloudEvents - RocketMQ Binding
+ jar
+
+
+ 5.0.4
+ io.cloudevents.rocketmq
+
+
+
+
+ io.cloudevents
+ cloudevents-core
+ ${project.version}
+
+
+
+ org.apache.rocketmq
+ rocketmq-client-java
+ ${rocketmq.version}
+
+
+
+
+ org.junit.jupiter
+ junit-jupiter
+ ${junit-jupiter.version}
+ test
+
+
+ io.cloudevents
+ cloudevents-core
+ tests
+ test-jar
+ ${project.version}
+ test
+
+
+ org.assertj
+ assertj-core
+ ${assertj-core.version}
+ test
+
+
+
diff --git a/rocketmq/src/main/java/io/cloudevents/rocketmq/RocketMqMessageFactory.java b/rocketmq/src/main/java/io/cloudevents/rocketmq/RocketMqMessageFactory.java
new file mode 100644
index 000000000..5eb454baf
--- /dev/null
+++ b/rocketmq/src/main/java/io/cloudevents/rocketmq/RocketMqMessageFactory.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2018-Present The CloudEvents Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package io.cloudevents.rocketmq;
+
+import io.cloudevents.core.message.MessageReader;
+import io.cloudevents.core.message.MessageWriter;
+import io.cloudevents.core.message.impl.GenericStructuredMessageReader;
+import io.cloudevents.core.message.impl.MessageUtils;
+import io.cloudevents.rw.CloudEventWriter;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import org.apache.rocketmq.client.apis.message.Message;
+import org.apache.rocketmq.client.apis.message.MessageView;
+
+/**
+ * A factory class providing convenience methods for creating {@link MessageReader} and {@link MessageWriter} instances
+ * based on RocketMQ {@link MessageView} and {@link Message}.
+ */
+public class RocketMqMessageFactory {
+ private RocketMqMessageFactory() {
+ // prevent instantiation
+ }
+
+ /**
+ * Creates a {@link MessageReader} to read a RocketMQ {@link MessageView}.
+ *
+ * @param message The RocketMQ {@link MessageView} to read from.
+ * @return A {@link MessageReader} that can read the given {@link MessageView} to a {@link io.cloudevents.CloudEvent} representation.
+ */
+ public static MessageReader createReader(final MessageView message) {
+ final ByteBuffer byteBuffer = message.getBody();
+ byte[] body = new byte[byteBuffer.remaining()];
+ byteBuffer.get(body);
+ final Map properties = message.getProperties();
+ final String contentType = properties.get(RocketmqConstants.PROPERTY_CONTENT_TYPE);
+ return createReader(contentType, properties, body);
+ }
+
+ /**
+ * Creates a {@link MessageReader} using the content type, properties, and body of a RocketMQ {@link MessageView}.
+ *
+ * @param contentType The content type of the message payload.
+ * @param properties The properties of the RocketMQ message containing CloudEvent metadata (attributes and/or extensions).
+ * @param body The message body as byte array.
+ * @return A {@link MessageReader} capable of parsing a {@link io.cloudevents.CloudEvent} from the content-type, properties, and payload of a RocketMQ message.
+ */
+ public static MessageReader createReader(final String contentType, final Map properties, final byte[] body) {
+ return MessageUtils.parseStructuredOrBinaryMessage(
+ () -> contentType,
+ format -> new GenericStructuredMessageReader(format, body),
+ () -> properties.get(RocketmqConstants.MESSAGE_PROPERTY_SPEC_VERSION),
+ sv -> new RocketmqBinaryMessageReader(sv, properties, contentType, body)
+ );
+ }
+
+ /**
+ * Creates a {@link MessageWriter} instance capable of translating a {@link io.cloudevents.CloudEvent} to a RocketMQ {@link Message}.
+ *
+ * @param topic The topic to which the created RocketMQ message will be sent.
+ * @return A {@link MessageWriter} capable of converting a {@link io.cloudevents.CloudEvent} to a RocketMQ {@link Message}.
+ */
+ public static MessageWriter, Message> createWriter(final String topic) {
+ return new RocketmqMessageWriter(topic);
+ }
+}
diff --git a/rocketmq/src/main/java/io/cloudevents/rocketmq/RocketmqBinaryMessageReader.java b/rocketmq/src/main/java/io/cloudevents/rocketmq/RocketmqBinaryMessageReader.java
new file mode 100644
index 000000000..5439f44ab
--- /dev/null
+++ b/rocketmq/src/main/java/io/cloudevents/rocketmq/RocketmqBinaryMessageReader.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2018-Present The CloudEvents Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package io.cloudevents.rocketmq;
+
+import io.cloudevents.SpecVersion;
+import io.cloudevents.core.data.BytesCloudEventData;
+import io.cloudevents.core.message.impl.BaseGenericBinaryMessageReaderImpl;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.function.BiConsumer;
+
+/**
+ * A RocketMQ message reader that can be read as a CloudEvent.
+ */
+final class RocketmqBinaryMessageReader extends BaseGenericBinaryMessageReaderImpl {
+ private final String contentType;
+ private final Map messageProperties;
+
+ /**
+ * Create an instance of an RocketMQ message reader.
+ *
+ * @param specVersion The version of the cloud event message.
+ * @param messageProperties The properties of the RocketMQ message that contains.
+ * @param contentType The content-type property of the RocketMQ message or {@code null} if the message content type if unknown.
+ * @param body The message payload or {@code null}/{@link RocketmqConstants#EMPTY_BODY} if the message does not contain any payload.
+ */
+ RocketmqBinaryMessageReader(final SpecVersion specVersion, Map messageProperties,
+ final String contentType, final byte[] body) {
+ super(specVersion, body != null && !Arrays.equals(RocketmqConstants.EMPTY_BODY, body) && body.length > 0 ? BytesCloudEventData.wrap(body) : null);
+ this.contentType = contentType;
+ this.messageProperties = messageProperties;
+ }
+
+ @Override
+ protected boolean isContentTypeHeader(String key) {
+ return key.equals(RocketmqConstants.PROPERTY_CONTENT_TYPE);
+ }
+
+ /**
+ * Tests whether the given property key belongs to cloud events headers.
+ *
+ * @param key The key to test for.
+ * @return True if the specified key belongs to cloud events headers.
+ */
+ @Override
+ protected boolean isCloudEventsHeader(String key) {
+ final int prefixLength = RocketmqConstants.CE_PREFIX.length();
+ return key.length() > prefixLength && key.startsWith(RocketmqConstants.CE_PREFIX);
+ }
+
+ /**
+ * Transforms a RocketMQ message property key into a CloudEvents attribute or extension key.
+ *
+ * This method removes the {@link RocketmqConstants#CE_PREFIX} prefix from the given key,
+ * assuming that the key has already been determined to be a CloudEvents header by
+ * {@link #isCloudEventsHeader(String)}.
+ *
+ * @param key The RocketMQ message property key with the CloudEvents header prefix.
+ * @return The CloudEvents attribute or extension key without the prefix.
+ */
+ @Override
+ protected String toCloudEventsKey(String key) {
+ return key.substring(RocketmqConstants.CE_PREFIX.length());
+ }
+
+ @Override
+ protected void forEachHeader(BiConsumer fn) {
+ if (contentType != null) {
+ // visit the content-type message property
+ fn.accept(RocketmqConstants.PROPERTY_CONTENT_TYPE, contentType);
+ }
+ // visit message properties
+ messageProperties.forEach((k, v) -> {
+ if (k != null && v != null) {
+ fn.accept(k, v);
+ }
+ });
+ }
+
+ /**
+ * Gets the cloud event representation of the value.
+ *
+ * This method simply returns the string representation of the type of value passed as argument.
+ *
+ * @param value The value of a CloudEvent attribute or extension.
+ * @return The string representation of the specified value.
+ */
+ @Override
+ protected String toCloudEventsValue(Object value) {
+ return value.toString();
+ }
+}
diff --git a/rocketmq/src/main/java/io/cloudevents/rocketmq/RocketmqConstants.java b/rocketmq/src/main/java/io/cloudevents/rocketmq/RocketmqConstants.java
new file mode 100644
index 000000000..f10098521
--- /dev/null
+++ b/rocketmq/src/main/java/io/cloudevents/rocketmq/RocketmqConstants.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2018-Present The CloudEvents Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package io.cloudevents.rocketmq;
+
+import io.cloudevents.core.message.impl.MessageUtils;
+import java.util.Map;
+
+/**
+ * Constants and methods used throughout the RocketMQ binding for cloud events.
+ */
+final class RocketmqConstants {
+ private RocketmqConstants() {
+ // prevent instantiation
+ }
+
+ static final byte[] EMPTY_BODY = new byte[] {(byte) '\0'};
+
+ /**
+ * The prefix name for CloudEvent attributes for use in properties of a RocketMQ message.
+ */
+ static final String CE_PREFIX = "CE_";
+
+ static final Map ATTRIBUTES_TO_PROPERTY_NAMES = MessageUtils.generateAttributesToHeadersMapping(CEA -> CE_PREFIX + CEA);
+
+ static final String PROPERTY_CONTENT_TYPE = "CE_contenttype";
+ static final String MESSAGE_PROPERTY_SPEC_VERSION = ATTRIBUTES_TO_PROPERTY_NAMES.get("specversion");
+}
diff --git a/rocketmq/src/main/java/io/cloudevents/rocketmq/RocketmqMessageWriter.java b/rocketmq/src/main/java/io/cloudevents/rocketmq/RocketmqMessageWriter.java
new file mode 100644
index 000000000..8f6561f90
--- /dev/null
+++ b/rocketmq/src/main/java/io/cloudevents/rocketmq/RocketmqMessageWriter.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2018-Present The CloudEvents Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package io.cloudevents.rocketmq;
+
+import io.cloudevents.CloudEventData;
+import io.cloudevents.SpecVersion;
+import io.cloudevents.core.format.EventFormat;
+import io.cloudevents.core.message.MessageWriter;
+import io.cloudevents.core.v1.CloudEventV1;
+import io.cloudevents.rw.CloudEventContextWriter;
+import io.cloudevents.rw.CloudEventRWException;
+import io.cloudevents.rw.CloudEventWriter;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.rocketmq.client.apis.ClientServiceProvider;
+import org.apache.rocketmq.client.apis.message.Message;
+import org.apache.rocketmq.client.apis.message.MessageBuilder;
+
+/**
+ * The RocketmqMessageWriter class is a CloudEvents message writer for RocketMQ.
+ * It allows CloudEvents attributes, context attributes, and the event payload to be populated
+ * in a RocketMQ {@link Message} instance. This class implements the
+ * {@link MessageWriter} interface for creating and completing CloudEvents messages in a
+ * RocketMQ-compatible format.
+ */
+final class RocketmqMessageWriter implements MessageWriter, Message>, CloudEventWriter {
+ private final Map messageProperties;
+ private final MessageBuilder messageBuilder;
+
+ /**
+ * Create a RocketMQ message writer.
+ *
+ * @param topic message's topic.
+ */
+ RocketmqMessageWriter(String topic) {
+ this.messageProperties = new HashMap<>();
+ final ClientServiceProvider provider = ClientServiceProvider.loadService();
+ this.messageBuilder = provider.newMessageBuilder();
+ messageBuilder.setTopic(topic);
+ }
+
+ @Override
+ public CloudEventContextWriter withContextAttribute(String name, String value) throws CloudEventRWException {
+ if (name.equals(CloudEventV1.DATACONTENTTYPE)) {
+ messageProperties.put(RocketmqConstants.PROPERTY_CONTENT_TYPE, value);
+ return this;
+ }
+ String propertyName = RocketmqConstants.ATTRIBUTES_TO_PROPERTY_NAMES.get(name);
+ if (propertyName == null) {
+ propertyName = name;
+ }
+ messageProperties.put(propertyName, value);
+ return this;
+ }
+
+ @Override
+ public CloudEventWriter create(SpecVersion version) throws CloudEventRWException {
+ messageProperties.put(RocketmqConstants.MESSAGE_PROPERTY_SPEC_VERSION, version.toString());
+ return this;
+ }
+
+ @Override
+ public Message setEvent(EventFormat format, byte[] value) throws CloudEventRWException {
+ messageProperties.put(RocketmqConstants.PROPERTY_CONTENT_TYPE, format.serializedContentType());
+ messageBuilder.setBody(value);
+ messageProperties.forEach(messageBuilder::addProperty);
+ return messageBuilder.build();
+ }
+
+ @Override
+ public Message end(CloudEventData data) throws CloudEventRWException {
+ messageBuilder.setBody(data.toBytes());
+ messageProperties.forEach(messageBuilder::addProperty);
+ return messageBuilder.build();
+ }
+
+ @Override
+ public Message end() throws CloudEventRWException {
+ messageBuilder.setBody(RocketmqConstants.EMPTY_BODY);
+ messageProperties.forEach(messageBuilder::addProperty);
+ return messageBuilder.build();
+ }
+}
diff --git a/rocketmq/src/test/java/io/cloudevents/rocketmq/RocketmqMessageFactoryTest.java b/rocketmq/src/test/java/io/cloudevents/rocketmq/RocketmqMessageFactoryTest.java
new file mode 100644
index 000000000..ea11e2a8f
--- /dev/null
+++ b/rocketmq/src/test/java/io/cloudevents/rocketmq/RocketmqMessageFactoryTest.java
@@ -0,0 +1,233 @@
+/*
+ * Copyright 2018-Present The CloudEvents Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package io.cloudevents.rocketmq;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.SpecVersion;
+import io.cloudevents.core.message.Encoding;
+import io.cloudevents.core.message.MessageReader;
+import io.cloudevents.core.mock.CSVFormat;
+import io.cloudevents.core.test.Data;
+import io.cloudevents.core.v03.CloudEventV03;
+import io.cloudevents.core.v1.CloudEventV1;
+import io.cloudevents.types.Time;
+import java.util.AbstractMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests verifying the behavior of the {@code RocketmqMessageFactory}.
+ */
+public class RocketmqMessageFactoryTest {
+ private static final String PREFIX_TEMPLATE = RocketmqConstants.CE_PREFIX + "%s";
+
+ private static final String DATA_CONTENT_TYPE_NULL = null;
+ private static final byte[] DATA_PAYLOAD_NULL = null;
+
+ @ParameterizedTest()
+ @MethodSource("binaryTestArguments")
+ public void readBinary(final Map props, final String contentType, final byte[] body, final CloudEvent event) {
+ final MessageReader reader = RocketMqMessageFactory.createReader(contentType, props, body);
+ assertThat(reader.getEncoding()).isEqualTo(Encoding.BINARY);
+ assertThat(reader.toEvent()).isEqualTo(event);
+ }
+
+ @ParameterizedTest()
+ @MethodSource("io.cloudevents.core.test.Data#allEventsWithoutExtensions")
+ public void readStructured(final CloudEvent event) {
+ final String contentType = CSVFormat.INSTANCE.serializedContentType() + "; charset=utf8";
+ final byte[] contentPayload = CSVFormat.INSTANCE.serialize(event);
+
+ final MessageReader reader = RocketMqMessageFactory.createReader(contentType, null, contentPayload);
+ assertThat(reader.getEncoding()).isEqualTo(Encoding.STRUCTURED);
+ assertThat(reader.toEvent()).isEqualTo(event);
+ }
+
+ private static Stream binaryTestArguments() {
+
+ return Stream.of(
+ // V03
+ Arguments.of(
+ properties(
+ property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
+ property(CloudEventV03.ID, Data.ID),
+ property(CloudEventV03.TYPE, Data.TYPE),
+ property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
+ property("ignored", "ignore")
+ ),
+ DATA_CONTENT_TYPE_NULL,
+ DATA_PAYLOAD_NULL,
+ Data.V03_MIN
+ ),
+ Arguments.of(
+ properties(
+ property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
+ property(CloudEventV03.ID, Data.ID),
+ property(CloudEventV03.TYPE, Data.TYPE),
+ property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
+ property(CloudEventV03.SCHEMAURL, Data.DATASCHEMA.toString()),
+ property(CloudEventV03.SUBJECT, Data.SUBJECT),
+ property(CloudEventV03.TIME, Time.writeTime(Data.TIME)),
+ property("ignored", "ignore")
+ ),
+ Data.DATACONTENTTYPE_JSON,
+ Data.DATA_JSON_SERIALIZED,
+ Data.V03_WITH_JSON_DATA
+ ),
+ Arguments.of(
+ properties(
+ property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
+ property(CloudEventV03.ID, Data.ID),
+ property(CloudEventV03.TYPE, Data.TYPE),
+ property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
+ property(CloudEventV03.SCHEMAURL, Data.DATASCHEMA.toString()),
+ property(CloudEventV03.SUBJECT, Data.SUBJECT),
+ property(CloudEventV03.TIME, Time.writeTime(Data.TIME)),
+ property("astring", "aaa"),
+ property("aboolean", "true"),
+ property("anumber", "10"),
+ property("ignored", "ignored")
+ ),
+ Data.DATACONTENTTYPE_JSON,
+ Data.DATA_JSON_SERIALIZED,
+ Data.V03_WITH_JSON_DATA_WITH_EXT_STRING
+ ),
+ Arguments.of(
+ properties(
+ property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
+ property(CloudEventV03.ID, Data.ID),
+ property(CloudEventV03.TYPE, Data.TYPE),
+ property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
+ property(CloudEventV03.SUBJECT, Data.SUBJECT),
+ property(CloudEventV03.TIME, Time.writeTime(Data.TIME)),
+ property("ignored", "ignored")
+ ),
+ Data.DATACONTENTTYPE_XML,
+ Data.DATA_XML_SERIALIZED,
+ Data.V03_WITH_XML_DATA
+ ),
+ Arguments.of(
+ properties(
+ property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
+ property(CloudEventV03.ID, Data.ID),
+ property(CloudEventV03.TYPE, Data.TYPE),
+ property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
+ property(CloudEventV03.SUBJECT, Data.SUBJECT),
+ property(CloudEventV03.TIME, Time.writeTime(Data.TIME)),
+ property("ignored", "ignored")
+ ),
+ Data.DATACONTENTTYPE_TEXT,
+ Data.DATA_TEXT_SERIALIZED,
+ Data.V03_WITH_TEXT_DATA
+ ),
+ // V1
+ Arguments.of(
+ properties(
+ property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
+ property(CloudEventV1.ID, Data.ID),
+ property(CloudEventV1.TYPE, Data.TYPE),
+ property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
+ property("ignored", "ignored")
+ ),
+ DATA_CONTENT_TYPE_NULL,
+ DATA_PAYLOAD_NULL,
+ Data.V1_MIN
+ ),
+ Arguments.of(
+ properties(
+ property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
+ property(CloudEventV1.ID, Data.ID),
+ property(CloudEventV1.TYPE, Data.TYPE),
+ property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
+ property(CloudEventV1.DATASCHEMA, Data.DATASCHEMA.toString()),
+ property(CloudEventV1.SUBJECT, Data.SUBJECT),
+ property(CloudEventV1.TIME, Time.writeTime(Data.TIME)),
+ property("ignored", "ignored")
+ ),
+ Data.DATACONTENTTYPE_JSON,
+ Data.DATA_JSON_SERIALIZED,
+ Data.V1_WITH_JSON_DATA
+ ),
+ Arguments.of(
+ properties(
+ property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
+ property(CloudEventV1.ID, Data.ID),
+ property(CloudEventV1.TYPE, Data.TYPE),
+ property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
+ property(CloudEventV1.DATASCHEMA, Data.DATASCHEMA.toString()),
+ property(CloudEventV1.SUBJECT, Data.SUBJECT),
+ property(CloudEventV1.TIME, Time.writeTime(Data.TIME)),
+ property("astring", "aaa"),
+ property("aboolean", "true"),
+ property("anumber", "10"),
+ property("ignored", "ignored")
+ ),
+ Data.DATACONTENTTYPE_JSON,
+ Data.DATA_JSON_SERIALIZED,
+ Data.V1_WITH_JSON_DATA_WITH_EXT_STRING
+ ),
+ Arguments.of(
+ properties(
+ property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
+ property(CloudEventV1.ID, Data.ID),
+ property(CloudEventV1.TYPE, Data.TYPE),
+ property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
+ property(CloudEventV1.SUBJECT, Data.SUBJECT),
+ property(CloudEventV1.TIME, Time.writeTime(Data.TIME)),
+ property("ignored", "ignored")
+ ),
+ Data.DATACONTENTTYPE_XML,
+ Data.DATA_XML_SERIALIZED,
+ Data.V1_WITH_XML_DATA
+ ),
+ Arguments.of(
+ properties(
+ property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
+ property(CloudEventV1.ID, Data.ID),
+ property(CloudEventV1.TYPE, Data.TYPE),
+ property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
+ property(CloudEventV1.SUBJECT, Data.SUBJECT),
+ property(CloudEventV1.TIME, Time.writeTime(Data.TIME)),
+ property("ignored", "ignored")
+ ),
+ Data.DATACONTENTTYPE_TEXT,
+ Data.DATA_TEXT_SERIALIZED,
+ Data.V1_WITH_TEXT_DATA
+ )
+ );
+ }
+
+ private static AbstractMap.SimpleEntry property(final String name, final String value) {
+ return name.equalsIgnoreCase("ignored") ?
+ new AbstractMap.SimpleEntry<>(name, value) :
+ new AbstractMap.SimpleEntry<>(String.format(PREFIX_TEMPLATE, name), value);
+ }
+
+ @SafeVarargs
+ private static Map properties(final AbstractMap.SimpleEntry... entries) {
+ return Stream.of(entries)
+ .collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue));
+
+ }
+}
diff --git a/rocketmq/src/test/java/io/cloudevents/rocketmq/RocketmqMessageWriterTest.java b/rocketmq/src/test/java/io/cloudevents/rocketmq/RocketmqMessageWriterTest.java
new file mode 100644
index 000000000..c6d1b2c4e
--- /dev/null
+++ b/rocketmq/src/test/java/io/cloudevents/rocketmq/RocketmqMessageWriterTest.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2018-Present The CloudEvents Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package io.cloudevents.rocketmq;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.format.EventFormat;
+import io.cloudevents.core.message.MessageWriter;
+import io.cloudevents.core.mock.CSVFormat;
+import io.cloudevents.core.v1.CloudEventV1;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.rocketmq.client.apis.ClientServiceProvider;
+import org.apache.rocketmq.client.apis.message.Message;
+import org.apache.rocketmq.client.apis.message.MessageBuilder;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class RocketmqMessageWriterTest {
+
+ /**
+ * Verifies that a binary CloudEvent message can be successfully represented
+ * as a RocketMQ message.
+ */
+ @ParameterizedTest()
+ @MethodSource("io.cloudevents.core.test.Data#allEventsWithStringExtensions")
+ public void testWriteBinaryCloudEventToRocketmqRepresentation(final CloudEvent binaryEvent) {
+
+ String topic = "foobar";
+ final Message expectedMessage = translateBinaryEvent(topic, binaryEvent);
+
+ final MessageWriter, Message> writer = RocketMqMessageFactory.createWriter(topic);
+ final Message actualMessage = writer.writeBinary(binaryEvent);
+
+ assertThat(Objects.toString(actualMessage.getBody())).isEqualTo(Objects.toString(expectedMessage.getBody()));
+ assertThat(actualMessage.getProperties()).isEqualTo(expectedMessage.getProperties());
+ }
+
+ /**
+ * Verifies that a structured CloudEvent message (in CSV format) can be successfully represented
+ * as a RocketMQ message.
+ */
+ @ParameterizedTest()
+ @MethodSource("io.cloudevents.core.test.Data#allEventsWithoutExtensions")
+ public void testWriteStructuredCloudEventToRocketmqRepresentation(final CloudEvent event) {
+ final EventFormat format = CSVFormat.INSTANCE;
+ final Message expectedMessage = translateStructured(event, format);
+
+ String topic = "foobar";
+ final MessageWriter, Message> writer = RocketMqMessageFactory.createWriter(topic);
+ final Message actualMessage = writer.writeStructured(event, format.serializedContentType());
+
+ assertThat(Objects.toString(actualMessage.getBody())).isEqualTo(Objects.toString(expectedMessage.getBody()));
+ assertThat(actualMessage.getProperties()).isEqualTo(expectedMessage.getProperties());
+ }
+
+ private Message translateBinaryEvent(final String topic, final CloudEvent event) {
+ final ClientServiceProvider provider = ClientServiceProvider.loadService();
+
+ final MessageBuilder messageBuilder = provider.newMessageBuilder();
+ messageBuilder.setTopic(topic);
+ messageBuilder.setBody(RocketmqConstants.EMPTY_BODY);
+
+ final Map map = new HashMap<>();
+ if (!event.getAttributeNames().isEmpty()) {
+ event.getAttributeNames().forEach(name -> {
+ if (name.equals(CloudEventV1.DATACONTENTTYPE) && event.getAttribute(name) != null) {
+ map.put(RocketmqConstants.PROPERTY_CONTENT_TYPE, event.getAttribute(name).toString());
+ } else {
+ addProperty(map, name, Objects.toString(event.getAttribute(name)), true);
+ }
+ });
+ }
+ if (!event.getExtensionNames().isEmpty()) {
+ event.getExtensionNames().forEach(name -> addProperty(map, name, Objects.toString(event.getExtension(name)), false));
+ }
+ map.forEach(messageBuilder::addProperty);
+ if (event.getData() != null) {
+ messageBuilder.setBody(event.getData().toBytes());
+ }
+ return messageBuilder.build();
+ }
+
+ private Message translateStructured(final CloudEvent event, final EventFormat format) {
+ final ClientServiceProvider provider = ClientServiceProvider.loadService();
+ final MessageBuilder messageBuilder = provider.newMessageBuilder();
+ messageBuilder.setTopic("foobar");
+ messageBuilder.addProperty(RocketmqConstants.PROPERTY_CONTENT_TYPE, format.serializedContentType());
+ messageBuilder.setBody(format.serialize(event));
+ return messageBuilder.build();
+ }
+
+ private void addProperty(final Map map, final String name, final String value, final boolean prefix) {
+ if (prefix) {
+ map.put(String.format(RocketmqConstants.CE_PREFIX + "%s", name), value);
+ } else {
+ map.put(name, value);
+ }
+ }
+}
diff --git a/spring/pom.xml b/spring/pom.xml
index 84c78f0dc..038fecb8d 100644
--- a/spring/pom.xml
+++ b/spring/pom.xml
@@ -23,7 +23,7 @@
io.cloudevents
cloudevents-parent
- 2.5.0-SNAPSHOT
+ 3.0.0-SNAPSHOT
cloudevents-spring
diff --git a/sql/pom.xml b/sql/pom.xml
index 63d4f5551..9c2f5f7fb 100644
--- a/sql/pom.xml
+++ b/sql/pom.xml
@@ -5,7 +5,7 @@
cloudevents-parent
io.cloudevents
- 2.5.0-SNAPSHOT
+ 3.0.0-SNAPSHOT
4.0.0