diff --git a/rocketmq/pom.xml b/rocketmq/pom.xml
new file mode 100644
index 000000000..ff26dfb0a
--- /dev/null
+++ b/rocketmq/pom.xml
@@ -0,0 +1,73 @@
+
+
+
+ 4.0.0
+
+
+ cloudevents-parent
+ io.cloudevents
+ 2.5.0-SNAPSHOT
+
+
+ cloudevents-rocketmq
+ CloudEvents - RocketMQ Binding
+ jar
+
+
+ 5.0.4
+ io.cloudevents.rocketmq
+
+
+
+
+ io.cloudevents
+ cloudevents-core
+ ${project.version}
+
+
+
+ org.apache.rocketmq
+ rocketmq-client-java
+ ${rocketmq.version}
+
+
+
+
+ org.junit.jupiter
+ junit-jupiter
+ ${junit-jupiter.version}
+ test
+
+
+ io.cloudevents
+ cloudevents-core
+ tests
+ test-jar
+ ${project.version}
+ test
+
+
+ org.assertj
+ assertj-core
+ ${assertj-core.version}
+ test
+
+
+
diff --git a/rocketmq/src/main/java/io/cloudevents/rocketmq/RocketMqMessageFactory.java b/rocketmq/src/main/java/io/cloudevents/rocketmq/RocketMqMessageFactory.java
new file mode 100644
index 000000000..5eb454baf
--- /dev/null
+++ b/rocketmq/src/main/java/io/cloudevents/rocketmq/RocketMqMessageFactory.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2018-Present The CloudEvents Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package io.cloudevents.rocketmq;
+
+import io.cloudevents.core.message.MessageReader;
+import io.cloudevents.core.message.MessageWriter;
+import io.cloudevents.core.message.impl.GenericStructuredMessageReader;
+import io.cloudevents.core.message.impl.MessageUtils;
+import io.cloudevents.rw.CloudEventWriter;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import org.apache.rocketmq.client.apis.message.Message;
+import org.apache.rocketmq.client.apis.message.MessageView;
+
+/**
+ * A factory class providing convenience methods for creating {@link MessageReader} and {@link MessageWriter} instances
+ * based on RocketMQ {@link MessageView} and {@link Message}.
+ */
+public class RocketMqMessageFactory {
+ private RocketMqMessageFactory() {
+ // prevent instantiation
+ }
+
+ /**
+ * Creates a {@link MessageReader} to read a RocketMQ {@link MessageView}.
+ *
+ * @param message The RocketMQ {@link MessageView} to read from.
+ * @return A {@link MessageReader} that can read the given {@link MessageView} to a {@link io.cloudevents.CloudEvent} representation.
+ */
+ public static MessageReader createReader(final MessageView message) {
+ final ByteBuffer byteBuffer = message.getBody();
+ byte[] body = new byte[byteBuffer.remaining()];
+ byteBuffer.get(body);
+ final Map properties = message.getProperties();
+ final String contentType = properties.get(RocketmqConstants.PROPERTY_CONTENT_TYPE);
+ return createReader(contentType, properties, body);
+ }
+
+ /**
+ * Creates a {@link MessageReader} using the content type, properties, and body of a RocketMQ {@link MessageView}.
+ *
+ * @param contentType The content type of the message payload.
+ * @param properties The properties of the RocketMQ message containing CloudEvent metadata (attributes and/or extensions).
+ * @param body The message body as byte array.
+ * @return A {@link MessageReader} capable of parsing a {@link io.cloudevents.CloudEvent} from the content-type, properties, and payload of a RocketMQ message.
+ */
+ public static MessageReader createReader(final String contentType, final Map properties, final byte[] body) {
+ return MessageUtils.parseStructuredOrBinaryMessage(
+ () -> contentType,
+ format -> new GenericStructuredMessageReader(format, body),
+ () -> properties.get(RocketmqConstants.MESSAGE_PROPERTY_SPEC_VERSION),
+ sv -> new RocketmqBinaryMessageReader(sv, properties, contentType, body)
+ );
+ }
+
+ /**
+ * Creates a {@link MessageWriter} instance capable of translating a {@link io.cloudevents.CloudEvent} to a RocketMQ {@link Message}.
+ *
+ * @param topic The topic to which the created RocketMQ message will be sent.
+ * @return A {@link MessageWriter} capable of converting a {@link io.cloudevents.CloudEvent} to a RocketMQ {@link Message}.
+ */
+ public static MessageWriter, Message> createWriter(final String topic) {
+ return new RocketmqMessageWriter(topic);
+ }
+}
diff --git a/rocketmq/src/main/java/io/cloudevents/rocketmq/RocketmqBinaryMessageReader.java b/rocketmq/src/main/java/io/cloudevents/rocketmq/RocketmqBinaryMessageReader.java
new file mode 100644
index 000000000..5439f44ab
--- /dev/null
+++ b/rocketmq/src/main/java/io/cloudevents/rocketmq/RocketmqBinaryMessageReader.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2018-Present The CloudEvents Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package io.cloudevents.rocketmq;
+
+import io.cloudevents.SpecVersion;
+import io.cloudevents.core.data.BytesCloudEventData;
+import io.cloudevents.core.message.impl.BaseGenericBinaryMessageReaderImpl;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.function.BiConsumer;
+
+/**
+ * A RocketMQ message reader that can be read as a CloudEvent.
+ */
+final class RocketmqBinaryMessageReader extends BaseGenericBinaryMessageReaderImpl {
+ private final String contentType;
+ private final Map messageProperties;
+
+ /**
+ * Create an instance of an RocketMQ message reader.
+ *
+ * @param specVersion The version of the cloud event message.
+ * @param messageProperties The properties of the RocketMQ message that contains.
+ * @param contentType The content-type property of the RocketMQ message or {@code null} if the message content type if unknown.
+ * @param body The message payload or {@code null}/{@link RocketmqConstants#EMPTY_BODY} if the message does not contain any payload.
+ */
+ RocketmqBinaryMessageReader(final SpecVersion specVersion, Map messageProperties,
+ final String contentType, final byte[] body) {
+ super(specVersion, body != null && !Arrays.equals(RocketmqConstants.EMPTY_BODY, body) && body.length > 0 ? BytesCloudEventData.wrap(body) : null);
+ this.contentType = contentType;
+ this.messageProperties = messageProperties;
+ }
+
+ @Override
+ protected boolean isContentTypeHeader(String key) {
+ return key.equals(RocketmqConstants.PROPERTY_CONTENT_TYPE);
+ }
+
+ /**
+ * Tests whether the given property key belongs to cloud events headers.
+ *
+ * @param key The key to test for.
+ * @return True if the specified key belongs to cloud events headers.
+ */
+ @Override
+ protected boolean isCloudEventsHeader(String key) {
+ final int prefixLength = RocketmqConstants.CE_PREFIX.length();
+ return key.length() > prefixLength && key.startsWith(RocketmqConstants.CE_PREFIX);
+ }
+
+ /**
+ * Transforms a RocketMQ message property key into a CloudEvents attribute or extension key.
+ *
+ * This method removes the {@link RocketmqConstants#CE_PREFIX} prefix from the given key,
+ * assuming that the key has already been determined to be a CloudEvents header by
+ * {@link #isCloudEventsHeader(String)}.
+ *
+ * @param key The RocketMQ message property key with the CloudEvents header prefix.
+ * @return The CloudEvents attribute or extension key without the prefix.
+ */
+ @Override
+ protected String toCloudEventsKey(String key) {
+ return key.substring(RocketmqConstants.CE_PREFIX.length());
+ }
+
+ @Override
+ protected void forEachHeader(BiConsumer fn) {
+ if (contentType != null) {
+ // visit the content-type message property
+ fn.accept(RocketmqConstants.PROPERTY_CONTENT_TYPE, contentType);
+ }
+ // visit message properties
+ messageProperties.forEach((k, v) -> {
+ if (k != null && v != null) {
+ fn.accept(k, v);
+ }
+ });
+ }
+
+ /**
+ * Gets the cloud event representation of the value.
+ *
+ * This method simply returns the string representation of the type of value passed as argument.
+ *
+ * @param value The value of a CloudEvent attribute or extension.
+ * @return The string representation of the specified value.
+ */
+ @Override
+ protected String toCloudEventsValue(Object value) {
+ return value.toString();
+ }
+}
diff --git a/rocketmq/src/main/java/io/cloudevents/rocketmq/RocketmqConstants.java b/rocketmq/src/main/java/io/cloudevents/rocketmq/RocketmqConstants.java
new file mode 100644
index 000000000..f10098521
--- /dev/null
+++ b/rocketmq/src/main/java/io/cloudevents/rocketmq/RocketmqConstants.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2018-Present The CloudEvents Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package io.cloudevents.rocketmq;
+
+import io.cloudevents.core.message.impl.MessageUtils;
+import java.util.Map;
+
+/**
+ * Constants and methods used throughout the RocketMQ binding for cloud events.
+ */
+final class RocketmqConstants {
+ private RocketmqConstants() {
+ // prevent instantiation
+ }
+
+ static final byte[] EMPTY_BODY = new byte[] {(byte) '\0'};
+
+ /**
+ * The prefix name for CloudEvent attributes for use in properties of a RocketMQ message.
+ */
+ static final String CE_PREFIX = "CE_";
+
+ static final Map ATTRIBUTES_TO_PROPERTY_NAMES = MessageUtils.generateAttributesToHeadersMapping(CEA -> CE_PREFIX + CEA);
+
+ static final String PROPERTY_CONTENT_TYPE = "CE_contenttype";
+ static final String MESSAGE_PROPERTY_SPEC_VERSION = ATTRIBUTES_TO_PROPERTY_NAMES.get("specversion");
+}
diff --git a/rocketmq/src/main/java/io/cloudevents/rocketmq/RocketmqMessageWriter.java b/rocketmq/src/main/java/io/cloudevents/rocketmq/RocketmqMessageWriter.java
new file mode 100644
index 000000000..8f6561f90
--- /dev/null
+++ b/rocketmq/src/main/java/io/cloudevents/rocketmq/RocketmqMessageWriter.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2018-Present The CloudEvents Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package io.cloudevents.rocketmq;
+
+import io.cloudevents.CloudEventData;
+import io.cloudevents.SpecVersion;
+import io.cloudevents.core.format.EventFormat;
+import io.cloudevents.core.message.MessageWriter;
+import io.cloudevents.core.v1.CloudEventV1;
+import io.cloudevents.rw.CloudEventContextWriter;
+import io.cloudevents.rw.CloudEventRWException;
+import io.cloudevents.rw.CloudEventWriter;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.rocketmq.client.apis.ClientServiceProvider;
+import org.apache.rocketmq.client.apis.message.Message;
+import org.apache.rocketmq.client.apis.message.MessageBuilder;
+
+/**
+ * The RocketmqMessageWriter class is a CloudEvents message writer for RocketMQ.
+ * It allows CloudEvents attributes, context attributes, and the event payload to be populated
+ * in a RocketMQ {@link Message} instance. This class implements the
+ * {@link MessageWriter} interface for creating and completing CloudEvents messages in a
+ * RocketMQ-compatible format.
+ */
+final class RocketmqMessageWriter implements MessageWriter, Message>, CloudEventWriter {
+ private final Map messageProperties;
+ private final MessageBuilder messageBuilder;
+
+ /**
+ * Create a RocketMQ message writer.
+ *
+ * @param topic message's topic.
+ */
+ RocketmqMessageWriter(String topic) {
+ this.messageProperties = new HashMap<>();
+ final ClientServiceProvider provider = ClientServiceProvider.loadService();
+ this.messageBuilder = provider.newMessageBuilder();
+ messageBuilder.setTopic(topic);
+ }
+
+ @Override
+ public CloudEventContextWriter withContextAttribute(String name, String value) throws CloudEventRWException {
+ if (name.equals(CloudEventV1.DATACONTENTTYPE)) {
+ messageProperties.put(RocketmqConstants.PROPERTY_CONTENT_TYPE, value);
+ return this;
+ }
+ String propertyName = RocketmqConstants.ATTRIBUTES_TO_PROPERTY_NAMES.get(name);
+ if (propertyName == null) {
+ propertyName = name;
+ }
+ messageProperties.put(propertyName, value);
+ return this;
+ }
+
+ @Override
+ public CloudEventWriter create(SpecVersion version) throws CloudEventRWException {
+ messageProperties.put(RocketmqConstants.MESSAGE_PROPERTY_SPEC_VERSION, version.toString());
+ return this;
+ }
+
+ @Override
+ public Message setEvent(EventFormat format, byte[] value) throws CloudEventRWException {
+ messageProperties.put(RocketmqConstants.PROPERTY_CONTENT_TYPE, format.serializedContentType());
+ messageBuilder.setBody(value);
+ messageProperties.forEach(messageBuilder::addProperty);
+ return messageBuilder.build();
+ }
+
+ @Override
+ public Message end(CloudEventData data) throws CloudEventRWException {
+ messageBuilder.setBody(data.toBytes());
+ messageProperties.forEach(messageBuilder::addProperty);
+ return messageBuilder.build();
+ }
+
+ @Override
+ public Message end() throws CloudEventRWException {
+ messageBuilder.setBody(RocketmqConstants.EMPTY_BODY);
+ messageProperties.forEach(messageBuilder::addProperty);
+ return messageBuilder.build();
+ }
+}
diff --git a/rocketmq/src/test/java/io/cloudevents/rocketmq/RocketmqMessageFactoryTest.java b/rocketmq/src/test/java/io/cloudevents/rocketmq/RocketmqMessageFactoryTest.java
new file mode 100644
index 000000000..ea11e2a8f
--- /dev/null
+++ b/rocketmq/src/test/java/io/cloudevents/rocketmq/RocketmqMessageFactoryTest.java
@@ -0,0 +1,233 @@
+/*
+ * Copyright 2018-Present The CloudEvents Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package io.cloudevents.rocketmq;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.SpecVersion;
+import io.cloudevents.core.message.Encoding;
+import io.cloudevents.core.message.MessageReader;
+import io.cloudevents.core.mock.CSVFormat;
+import io.cloudevents.core.test.Data;
+import io.cloudevents.core.v03.CloudEventV03;
+import io.cloudevents.core.v1.CloudEventV1;
+import io.cloudevents.types.Time;
+import java.util.AbstractMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests verifying the behavior of the {@code RocketmqMessageFactory}.
+ */
+public class RocketmqMessageFactoryTest {
+ private static final String PREFIX_TEMPLATE = RocketmqConstants.CE_PREFIX + "%s";
+
+ private static final String DATA_CONTENT_TYPE_NULL = null;
+ private static final byte[] DATA_PAYLOAD_NULL = null;
+
+ @ParameterizedTest()
+ @MethodSource("binaryTestArguments")
+ public void readBinary(final Map props, final String contentType, final byte[] body, final CloudEvent event) {
+ final MessageReader reader = RocketMqMessageFactory.createReader(contentType, props, body);
+ assertThat(reader.getEncoding()).isEqualTo(Encoding.BINARY);
+ assertThat(reader.toEvent()).isEqualTo(event);
+ }
+
+ @ParameterizedTest()
+ @MethodSource("io.cloudevents.core.test.Data#allEventsWithoutExtensions")
+ public void readStructured(final CloudEvent event) {
+ final String contentType = CSVFormat.INSTANCE.serializedContentType() + "; charset=utf8";
+ final byte[] contentPayload = CSVFormat.INSTANCE.serialize(event);
+
+ final MessageReader reader = RocketMqMessageFactory.createReader(contentType, null, contentPayload);
+ assertThat(reader.getEncoding()).isEqualTo(Encoding.STRUCTURED);
+ assertThat(reader.toEvent()).isEqualTo(event);
+ }
+
+ private static Stream binaryTestArguments() {
+
+ return Stream.of(
+ // V03
+ Arguments.of(
+ properties(
+ property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
+ property(CloudEventV03.ID, Data.ID),
+ property(CloudEventV03.TYPE, Data.TYPE),
+ property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
+ property("ignored", "ignore")
+ ),
+ DATA_CONTENT_TYPE_NULL,
+ DATA_PAYLOAD_NULL,
+ Data.V03_MIN
+ ),
+ Arguments.of(
+ properties(
+ property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
+ property(CloudEventV03.ID, Data.ID),
+ property(CloudEventV03.TYPE, Data.TYPE),
+ property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
+ property(CloudEventV03.SCHEMAURL, Data.DATASCHEMA.toString()),
+ property(CloudEventV03.SUBJECT, Data.SUBJECT),
+ property(CloudEventV03.TIME, Time.writeTime(Data.TIME)),
+ property("ignored", "ignore")
+ ),
+ Data.DATACONTENTTYPE_JSON,
+ Data.DATA_JSON_SERIALIZED,
+ Data.V03_WITH_JSON_DATA
+ ),
+ Arguments.of(
+ properties(
+ property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
+ property(CloudEventV03.ID, Data.ID),
+ property(CloudEventV03.TYPE, Data.TYPE),
+ property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
+ property(CloudEventV03.SCHEMAURL, Data.DATASCHEMA.toString()),
+ property(CloudEventV03.SUBJECT, Data.SUBJECT),
+ property(CloudEventV03.TIME, Time.writeTime(Data.TIME)),
+ property("astring", "aaa"),
+ property("aboolean", "true"),
+ property("anumber", "10"),
+ property("ignored", "ignored")
+ ),
+ Data.DATACONTENTTYPE_JSON,
+ Data.DATA_JSON_SERIALIZED,
+ Data.V03_WITH_JSON_DATA_WITH_EXT_STRING
+ ),
+ Arguments.of(
+ properties(
+ property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
+ property(CloudEventV03.ID, Data.ID),
+ property(CloudEventV03.TYPE, Data.TYPE),
+ property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
+ property(CloudEventV03.SUBJECT, Data.SUBJECT),
+ property(CloudEventV03.TIME, Time.writeTime(Data.TIME)),
+ property("ignored", "ignored")
+ ),
+ Data.DATACONTENTTYPE_XML,
+ Data.DATA_XML_SERIALIZED,
+ Data.V03_WITH_XML_DATA
+ ),
+ Arguments.of(
+ properties(
+ property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
+ property(CloudEventV03.ID, Data.ID),
+ property(CloudEventV03.TYPE, Data.TYPE),
+ property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
+ property(CloudEventV03.SUBJECT, Data.SUBJECT),
+ property(CloudEventV03.TIME, Time.writeTime(Data.TIME)),
+ property("ignored", "ignored")
+ ),
+ Data.DATACONTENTTYPE_TEXT,
+ Data.DATA_TEXT_SERIALIZED,
+ Data.V03_WITH_TEXT_DATA
+ ),
+ // V1
+ Arguments.of(
+ properties(
+ property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
+ property(CloudEventV1.ID, Data.ID),
+ property(CloudEventV1.TYPE, Data.TYPE),
+ property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
+ property("ignored", "ignored")
+ ),
+ DATA_CONTENT_TYPE_NULL,
+ DATA_PAYLOAD_NULL,
+ Data.V1_MIN
+ ),
+ Arguments.of(
+ properties(
+ property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
+ property(CloudEventV1.ID, Data.ID),
+ property(CloudEventV1.TYPE, Data.TYPE),
+ property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
+ property(CloudEventV1.DATASCHEMA, Data.DATASCHEMA.toString()),
+ property(CloudEventV1.SUBJECT, Data.SUBJECT),
+ property(CloudEventV1.TIME, Time.writeTime(Data.TIME)),
+ property("ignored", "ignored")
+ ),
+ Data.DATACONTENTTYPE_JSON,
+ Data.DATA_JSON_SERIALIZED,
+ Data.V1_WITH_JSON_DATA
+ ),
+ Arguments.of(
+ properties(
+ property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
+ property(CloudEventV1.ID, Data.ID),
+ property(CloudEventV1.TYPE, Data.TYPE),
+ property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
+ property(CloudEventV1.DATASCHEMA, Data.DATASCHEMA.toString()),
+ property(CloudEventV1.SUBJECT, Data.SUBJECT),
+ property(CloudEventV1.TIME, Time.writeTime(Data.TIME)),
+ property("astring", "aaa"),
+ property("aboolean", "true"),
+ property("anumber", "10"),
+ property("ignored", "ignored")
+ ),
+ Data.DATACONTENTTYPE_JSON,
+ Data.DATA_JSON_SERIALIZED,
+ Data.V1_WITH_JSON_DATA_WITH_EXT_STRING
+ ),
+ Arguments.of(
+ properties(
+ property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
+ property(CloudEventV1.ID, Data.ID),
+ property(CloudEventV1.TYPE, Data.TYPE),
+ property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
+ property(CloudEventV1.SUBJECT, Data.SUBJECT),
+ property(CloudEventV1.TIME, Time.writeTime(Data.TIME)),
+ property("ignored", "ignored")
+ ),
+ Data.DATACONTENTTYPE_XML,
+ Data.DATA_XML_SERIALIZED,
+ Data.V1_WITH_XML_DATA
+ ),
+ Arguments.of(
+ properties(
+ property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
+ property(CloudEventV1.ID, Data.ID),
+ property(CloudEventV1.TYPE, Data.TYPE),
+ property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
+ property(CloudEventV1.SUBJECT, Data.SUBJECT),
+ property(CloudEventV1.TIME, Time.writeTime(Data.TIME)),
+ property("ignored", "ignored")
+ ),
+ Data.DATACONTENTTYPE_TEXT,
+ Data.DATA_TEXT_SERIALIZED,
+ Data.V1_WITH_TEXT_DATA
+ )
+ );
+ }
+
+ private static AbstractMap.SimpleEntry property(final String name, final String value) {
+ return name.equalsIgnoreCase("ignored") ?
+ new AbstractMap.SimpleEntry<>(name, value) :
+ new AbstractMap.SimpleEntry<>(String.format(PREFIX_TEMPLATE, name), value);
+ }
+
+ @SafeVarargs
+ private static Map properties(final AbstractMap.SimpleEntry... entries) {
+ return Stream.of(entries)
+ .collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue));
+
+ }
+}
diff --git a/rocketmq/src/test/java/io/cloudevents/rocketmq/RocketmqMessageWriterTest.java b/rocketmq/src/test/java/io/cloudevents/rocketmq/RocketmqMessageWriterTest.java
new file mode 100644
index 000000000..c6d1b2c4e
--- /dev/null
+++ b/rocketmq/src/test/java/io/cloudevents/rocketmq/RocketmqMessageWriterTest.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2018-Present The CloudEvents Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package io.cloudevents.rocketmq;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.format.EventFormat;
+import io.cloudevents.core.message.MessageWriter;
+import io.cloudevents.core.mock.CSVFormat;
+import io.cloudevents.core.v1.CloudEventV1;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.rocketmq.client.apis.ClientServiceProvider;
+import org.apache.rocketmq.client.apis.message.Message;
+import org.apache.rocketmq.client.apis.message.MessageBuilder;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class RocketmqMessageWriterTest {
+
+ /**
+ * Verifies that a binary CloudEvent message can be successfully represented
+ * as a RocketMQ message.
+ */
+ @ParameterizedTest()
+ @MethodSource("io.cloudevents.core.test.Data#allEventsWithStringExtensions")
+ public void testWriteBinaryCloudEventToRocketmqRepresentation(final CloudEvent binaryEvent) {
+
+ String topic = "foobar";
+ final Message expectedMessage = translateBinaryEvent(topic, binaryEvent);
+
+ final MessageWriter, Message> writer = RocketMqMessageFactory.createWriter(topic);
+ final Message actualMessage = writer.writeBinary(binaryEvent);
+
+ assertThat(Objects.toString(actualMessage.getBody())).isEqualTo(Objects.toString(expectedMessage.getBody()));
+ assertThat(actualMessage.getProperties()).isEqualTo(expectedMessage.getProperties());
+ }
+
+ /**
+ * Verifies that a structured CloudEvent message (in CSV format) can be successfully represented
+ * as a RocketMQ message.
+ */
+ @ParameterizedTest()
+ @MethodSource("io.cloudevents.core.test.Data#allEventsWithoutExtensions")
+ public void testWriteStructuredCloudEventToRocketmqRepresentation(final CloudEvent event) {
+ final EventFormat format = CSVFormat.INSTANCE;
+ final Message expectedMessage = translateStructured(event, format);
+
+ String topic = "foobar";
+ final MessageWriter, Message> writer = RocketMqMessageFactory.createWriter(topic);
+ final Message actualMessage = writer.writeStructured(event, format.serializedContentType());
+
+ assertThat(Objects.toString(actualMessage.getBody())).isEqualTo(Objects.toString(expectedMessage.getBody()));
+ assertThat(actualMessage.getProperties()).isEqualTo(expectedMessage.getProperties());
+ }
+
+ private Message translateBinaryEvent(final String topic, final CloudEvent event) {
+ final ClientServiceProvider provider = ClientServiceProvider.loadService();
+
+ final MessageBuilder messageBuilder = provider.newMessageBuilder();
+ messageBuilder.setTopic(topic);
+ messageBuilder.setBody(RocketmqConstants.EMPTY_BODY);
+
+ final Map map = new HashMap<>();
+ if (!event.getAttributeNames().isEmpty()) {
+ event.getAttributeNames().forEach(name -> {
+ if (name.equals(CloudEventV1.DATACONTENTTYPE) && event.getAttribute(name) != null) {
+ map.put(RocketmqConstants.PROPERTY_CONTENT_TYPE, event.getAttribute(name).toString());
+ } else {
+ addProperty(map, name, Objects.toString(event.getAttribute(name)), true);
+ }
+ });
+ }
+ if (!event.getExtensionNames().isEmpty()) {
+ event.getExtensionNames().forEach(name -> addProperty(map, name, Objects.toString(event.getExtension(name)), false));
+ }
+ map.forEach(messageBuilder::addProperty);
+ if (event.getData() != null) {
+ messageBuilder.setBody(event.getData().toBytes());
+ }
+ return messageBuilder.build();
+ }
+
+ private Message translateStructured(final CloudEvent event, final EventFormat format) {
+ final ClientServiceProvider provider = ClientServiceProvider.loadService();
+ final MessageBuilder messageBuilder = provider.newMessageBuilder();
+ messageBuilder.setTopic("foobar");
+ messageBuilder.addProperty(RocketmqConstants.PROPERTY_CONTENT_TYPE, format.serializedContentType());
+ messageBuilder.setBody(format.serialize(event));
+ return messageBuilder.build();
+ }
+
+ private void addProperty(final Map map, final String name, final String value, final boolean prefix) {
+ if (prefix) {
+ map.put(String.format(RocketmqConstants.CE_PREFIX + "%s", name), value);
+ } else {
+ map.put(name, value);
+ }
+ }
+}