diff --git a/examples/pom.xml b/examples/pom.xml index a706a51e2..7a8bd1538 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -29,6 +29,7 @@ spring-reactive spring-rsocket spring-function + rocketmq diff --git a/examples/rocketmq/README.md b/examples/rocketmq/README.md new file mode 100644 index 000000000..8e4a98e60 --- /dev/null +++ b/examples/rocketmq/README.md @@ -0,0 +1,27 @@ +# RocketMQ + CloudEvents Sample + +This example demonstrates the integration of [RocketMQ 5.x client library](https://github.com/apache/rocketmq-clients) +with CloudEvents to create a RocketMQ binding. + +## Building the Project + +```shell +mvn package +``` + +## Setting Up a RocketMQ Instance + +Follow the [quickstart guide](https://rocketmq.apache.org/docs/quick-start/01quickstart) on the official RocketMQ +website to set up the necessary components, including nameserver, proxy, and broker. + +## Event Production + +```shell +mvn exec:java -Dexec.mainClass="io.cloudevents.examples.rocketmq.RocketmqProducer" -Dexec.args="foobar:8081 sample-topic" +``` + +## Event Consumption + +```shell +mvn exec:java -Dexec.mainClass="io.cloudevents.examples.rocketmq.RocketmqConsumer" -Dexec.args="foobar:8081 sample-topic sample-consumer-group" +``` diff --git a/examples/rocketmq/pom.xml b/examples/rocketmq/pom.xml new file mode 100644 index 000000000..c40f24a6a --- /dev/null +++ b/examples/rocketmq/pom.xml @@ -0,0 +1,21 @@ + + + + cloudevents-examples + io.cloudevents + 2.5.0-SNAPSHOT + + 4.0.0 + + cloudevents-rocketmq-example + + + + io.cloudevents + cloudevents-rocketmq + ${project.version} + + + diff --git a/examples/rocketmq/src/main/java/io/cloudevents/examples/rocketmq/RocketmqConsumer.java b/examples/rocketmq/src/main/java/io/cloudevents/examples/rocketmq/RocketmqConsumer.java new file mode 100644 index 000000000..3f539b4ac --- /dev/null +++ b/examples/rocketmq/src/main/java/io/cloudevents/examples/rocketmq/RocketmqConsumer.java @@ -0,0 +1,50 @@ +package io.cloudevents.examples.rocketmq; + +import io.cloudevents.CloudEvent; +import io.cloudevents.core.message.MessageReader; +import io.cloudevents.rocketmq.RocketMqMessageFactory; +import java.io.IOException; +import java.util.Collections; +import org.apache.rocketmq.client.apis.ClientConfiguration; +import org.apache.rocketmq.client.apis.ClientException; +import org.apache.rocketmq.client.apis.ClientServiceProvider; +import org.apache.rocketmq.client.apis.consumer.ConsumeResult; +import org.apache.rocketmq.client.apis.consumer.FilterExpression; +import org.apache.rocketmq.client.apis.consumer.PushConsumer; + +public class RocketmqConsumer { + private RocketmqConsumer() { + } + + public static void main(String[] args) throws InterruptedException, ClientException, IOException { + if (args.length < 3) { + System.out.println("Usage: rocketmq_consumer "); + return; + } + final ClientServiceProvider provider = ClientServiceProvider.loadService(); + String endpoints = args[0]; + ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() + .setEndpoints(endpoints) + .build(); + FilterExpression filterExpression = new FilterExpression(); + String topic = args[1]; + String consumerGroup = args[2]; + + // Create the RocketMQ Consumer. + PushConsumer pushConsumer = provider.newPushConsumerBuilder() + .setClientConfiguration(clientConfiguration) + .setConsumerGroup(consumerGroup) + .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) + .setMessageListener(messageView -> { + final MessageReader reader = RocketMqMessageFactory.createReader(messageView); + final CloudEvent event = reader.toEvent(); + System.out.println("Received event=" + event + ", messageId=" + messageView.getMessageId()); + return ConsumeResult.SUCCESS; + }) + .build(); + // Block the main thread, no need for production environment. + Thread.sleep(Long.MAX_VALUE); + // Close the push consumer when you don't need it anymore. + pushConsumer.close(); + } +} diff --git a/examples/rocketmq/src/main/java/io/cloudevents/examples/rocketmq/RocketmqProducer.java b/examples/rocketmq/src/main/java/io/cloudevents/examples/rocketmq/RocketmqProducer.java new file mode 100644 index 000000000..adeadb7f7 --- /dev/null +++ b/examples/rocketmq/src/main/java/io/cloudevents/examples/rocketmq/RocketmqProducer.java @@ -0,0 +1,64 @@ +package io.cloudevents.examples.rocketmq; + +import io.cloudevents.CloudEvent; +import io.cloudevents.core.v1.CloudEventBuilder; +import io.cloudevents.rocketmq.RocketMqMessageFactory; +import io.cloudevents.types.Time; +import java.io.IOException; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import org.apache.rocketmq.client.apis.ClientConfiguration; +import org.apache.rocketmq.client.apis.ClientException; +import org.apache.rocketmq.client.apis.ClientServiceProvider; +import org.apache.rocketmq.client.apis.message.Message; +import org.apache.rocketmq.client.apis.producer.Producer; +import org.apache.rocketmq.client.apis.producer.SendReceipt; +import org.apache.rocketmq.shaded.com.google.gson.Gson; + +public class RocketmqProducer { + private RocketmqProducer() { + } + + public static void main(String[] args) throws ClientException, IOException { + if (args.length < 2) { + System.out.println("Usage: rocketmq_producer "); + return; + } + final ClientServiceProvider provider = ClientServiceProvider.loadService(); + String endpoints = args[0]; + ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() + .setEndpoints(endpoints) + .build(); + String topic = args[1]; + + // Create the RocketMQ Producer. + final Producer producer = provider.newProducerBuilder() + .setClientConfiguration(clientConfiguration) + .setTopics(topic) + .build(); + final Gson gson = new Gson(); + Map payload = new HashMap<>(); + payload.put("foo", "bar"); + final CloudEvent event = new CloudEventBuilder() + .withId("client-id") + .withSource(URI.create("http://127.0.0.1/rocketmq-client")) + .withType("com.foobar") + .withTime(Time.parseTime("2022-11-09T21:47:12.032198+00:00")) + .withData(gson.toJson(payload).getBytes(StandardCharsets.UTF_8)) + .build(); + // Transform event into message. + final Message message = RocketMqMessageFactory.createWriter(topic).writeBinary(event); + try { + // Send the message. + final SendReceipt sendReceipt = producer.send(message); + System.out.println("Send message successfully, messageId=" + sendReceipt.getMessageId()); + } catch (Exception e) { + System.out.println("Failed to send message"); + e.printStackTrace(); + } + // Close the producer when you don't need it anymore. + producer.close(); + } +} diff --git a/pom.xml b/pom.xml index b0f706009..d7fd69119 100644 --- a/pom.xml +++ b/pom.xml @@ -80,6 +80,7 @@ spring sql bom + rocketmq diff --git a/rocketmq/pom.xml b/rocketmq/pom.xml new file mode 100644 index 000000000..ff26dfb0a --- /dev/null +++ b/rocketmq/pom.xml @@ -0,0 +1,73 @@ + + + + 4.0.0 + + + cloudevents-parent + io.cloudevents + 2.5.0-SNAPSHOT + + + cloudevents-rocketmq + CloudEvents - RocketMQ Binding + jar + + + 5.0.4 + io.cloudevents.rocketmq + + + + + io.cloudevents + cloudevents-core + ${project.version} + + + + org.apache.rocketmq + rocketmq-client-java + ${rocketmq.version} + + + + + org.junit.jupiter + junit-jupiter + ${junit-jupiter.version} + test + + + io.cloudevents + cloudevents-core + tests + test-jar + ${project.version} + test + + + org.assertj + assertj-core + ${assertj-core.version} + test + + + diff --git a/rocketmq/src/main/java/io/cloudevents/rocketmq/RocketMqMessageFactory.java b/rocketmq/src/main/java/io/cloudevents/rocketmq/RocketMqMessageFactory.java new file mode 100644 index 000000000..5eb454baf --- /dev/null +++ b/rocketmq/src/main/java/io/cloudevents/rocketmq/RocketMqMessageFactory.java @@ -0,0 +1,80 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.cloudevents.rocketmq; + +import io.cloudevents.core.message.MessageReader; +import io.cloudevents.core.message.MessageWriter; +import io.cloudevents.core.message.impl.GenericStructuredMessageReader; +import io.cloudevents.core.message.impl.MessageUtils; +import io.cloudevents.rw.CloudEventWriter; +import java.nio.ByteBuffer; +import java.util.Map; +import org.apache.rocketmq.client.apis.message.Message; +import org.apache.rocketmq.client.apis.message.MessageView; + +/** + * A factory class providing convenience methods for creating {@link MessageReader} and {@link MessageWriter} instances + * based on RocketMQ {@link MessageView} and {@link Message}. + */ +public class RocketMqMessageFactory { + private RocketMqMessageFactory() { + // prevent instantiation + } + + /** + * Creates a {@link MessageReader} to read a RocketMQ {@link MessageView}. + * + * @param message The RocketMQ {@link MessageView} to read from. + * @return A {@link MessageReader} that can read the given {@link MessageView} to a {@link io.cloudevents.CloudEvent} representation. + */ + public static MessageReader createReader(final MessageView message) { + final ByteBuffer byteBuffer = message.getBody(); + byte[] body = new byte[byteBuffer.remaining()]; + byteBuffer.get(body); + final Map properties = message.getProperties(); + final String contentType = properties.get(RocketmqConstants.PROPERTY_CONTENT_TYPE); + return createReader(contentType, properties, body); + } + + /** + * Creates a {@link MessageReader} using the content type, properties, and body of a RocketMQ {@link MessageView}. + * + * @param contentType The content type of the message payload. + * @param properties The properties of the RocketMQ message containing CloudEvent metadata (attributes and/or extensions). + * @param body The message body as byte array. + * @return A {@link MessageReader} capable of parsing a {@link io.cloudevents.CloudEvent} from the content-type, properties, and payload of a RocketMQ message. + */ + public static MessageReader createReader(final String contentType, final Map properties, final byte[] body) { + return MessageUtils.parseStructuredOrBinaryMessage( + () -> contentType, + format -> new GenericStructuredMessageReader(format, body), + () -> properties.get(RocketmqConstants.MESSAGE_PROPERTY_SPEC_VERSION), + sv -> new RocketmqBinaryMessageReader(sv, properties, contentType, body) + ); + } + + /** + * Creates a {@link MessageWriter} instance capable of translating a {@link io.cloudevents.CloudEvent} to a RocketMQ {@link Message}. + * + * @param topic The topic to which the created RocketMQ message will be sent. + * @return A {@link MessageWriter} capable of converting a {@link io.cloudevents.CloudEvent} to a RocketMQ {@link Message}. + */ + public static MessageWriter, Message> createWriter(final String topic) { + return new RocketmqMessageWriter(topic); + } +} diff --git a/rocketmq/src/main/java/io/cloudevents/rocketmq/RocketmqBinaryMessageReader.java b/rocketmq/src/main/java/io/cloudevents/rocketmq/RocketmqBinaryMessageReader.java new file mode 100644 index 000000000..5439f44ab --- /dev/null +++ b/rocketmq/src/main/java/io/cloudevents/rocketmq/RocketmqBinaryMessageReader.java @@ -0,0 +1,107 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.cloudevents.rocketmq; + +import io.cloudevents.SpecVersion; +import io.cloudevents.core.data.BytesCloudEventData; +import io.cloudevents.core.message.impl.BaseGenericBinaryMessageReaderImpl; +import java.util.Arrays; +import java.util.Map; +import java.util.function.BiConsumer; + +/** + * A RocketMQ message reader that can be read as a CloudEvent. + */ +final class RocketmqBinaryMessageReader extends BaseGenericBinaryMessageReaderImpl { + private final String contentType; + private final Map messageProperties; + + /** + * Create an instance of an RocketMQ message reader. + * + * @param specVersion The version of the cloud event message. + * @param messageProperties The properties of the RocketMQ message that contains. + * @param contentType The content-type property of the RocketMQ message or {@code null} if the message content type if unknown. + * @param body The message payload or {@code null}/{@link RocketmqConstants#EMPTY_BODY} if the message does not contain any payload. + */ + RocketmqBinaryMessageReader(final SpecVersion specVersion, Map messageProperties, + final String contentType, final byte[] body) { + super(specVersion, body != null && !Arrays.equals(RocketmqConstants.EMPTY_BODY, body) && body.length > 0 ? BytesCloudEventData.wrap(body) : null); + this.contentType = contentType; + this.messageProperties = messageProperties; + } + + @Override + protected boolean isContentTypeHeader(String key) { + return key.equals(RocketmqConstants.PROPERTY_CONTENT_TYPE); + } + + /** + * Tests whether the given property key belongs to cloud events headers. + * + * @param key The key to test for. + * @return True if the specified key belongs to cloud events headers. + */ + @Override + protected boolean isCloudEventsHeader(String key) { + final int prefixLength = RocketmqConstants.CE_PREFIX.length(); + return key.length() > prefixLength && key.startsWith(RocketmqConstants.CE_PREFIX); + } + + /** + * Transforms a RocketMQ message property key into a CloudEvents attribute or extension key. + *

+ * This method removes the {@link RocketmqConstants#CE_PREFIX} prefix from the given key, + * assuming that the key has already been determined to be a CloudEvents header by + * {@link #isCloudEventsHeader(String)}. + * + * @param key The RocketMQ message property key with the CloudEvents header prefix. + * @return The CloudEvents attribute or extension key without the prefix. + */ + @Override + protected String toCloudEventsKey(String key) { + return key.substring(RocketmqConstants.CE_PREFIX.length()); + } + + @Override + protected void forEachHeader(BiConsumer fn) { + if (contentType != null) { + // visit the content-type message property + fn.accept(RocketmqConstants.PROPERTY_CONTENT_TYPE, contentType); + } + // visit message properties + messageProperties.forEach((k, v) -> { + if (k != null && v != null) { + fn.accept(k, v); + } + }); + } + + /** + * Gets the cloud event representation of the value. + *

+ * This method simply returns the string representation of the type of value passed as argument. + * + * @param value The value of a CloudEvent attribute or extension. + * @return The string representation of the specified value. + */ + @Override + protected String toCloudEventsValue(Object value) { + return value.toString(); + } +} diff --git a/rocketmq/src/main/java/io/cloudevents/rocketmq/RocketmqConstants.java b/rocketmq/src/main/java/io/cloudevents/rocketmq/RocketmqConstants.java new file mode 100644 index 000000000..f10098521 --- /dev/null +++ b/rocketmq/src/main/java/io/cloudevents/rocketmq/RocketmqConstants.java @@ -0,0 +1,42 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.cloudevents.rocketmq; + +import io.cloudevents.core.message.impl.MessageUtils; +import java.util.Map; + +/** + * Constants and methods used throughout the RocketMQ binding for cloud events. + */ +final class RocketmqConstants { + private RocketmqConstants() { + // prevent instantiation + } + + static final byte[] EMPTY_BODY = new byte[] {(byte) '\0'}; + + /** + * The prefix name for CloudEvent attributes for use in properties of a RocketMQ message. + */ + static final String CE_PREFIX = "CE_"; + + static final Map ATTRIBUTES_TO_PROPERTY_NAMES = MessageUtils.generateAttributesToHeadersMapping(CEA -> CE_PREFIX + CEA); + + static final String PROPERTY_CONTENT_TYPE = "CE_contenttype"; + static final String MESSAGE_PROPERTY_SPEC_VERSION = ATTRIBUTES_TO_PROPERTY_NAMES.get("specversion"); +} diff --git a/rocketmq/src/main/java/io/cloudevents/rocketmq/RocketmqMessageWriter.java b/rocketmq/src/main/java/io/cloudevents/rocketmq/RocketmqMessageWriter.java new file mode 100644 index 000000000..8f6561f90 --- /dev/null +++ b/rocketmq/src/main/java/io/cloudevents/rocketmq/RocketmqMessageWriter.java @@ -0,0 +1,98 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.cloudevents.rocketmq; + +import io.cloudevents.CloudEventData; +import io.cloudevents.SpecVersion; +import io.cloudevents.core.format.EventFormat; +import io.cloudevents.core.message.MessageWriter; +import io.cloudevents.core.v1.CloudEventV1; +import io.cloudevents.rw.CloudEventContextWriter; +import io.cloudevents.rw.CloudEventRWException; +import io.cloudevents.rw.CloudEventWriter; +import java.util.HashMap; +import java.util.Map; +import org.apache.rocketmq.client.apis.ClientServiceProvider; +import org.apache.rocketmq.client.apis.message.Message; +import org.apache.rocketmq.client.apis.message.MessageBuilder; + +/** + * The RocketmqMessageWriter class is a CloudEvents message writer for RocketMQ. + * It allows CloudEvents attributes, context attributes, and the event payload to be populated + * in a RocketMQ {@link Message} instance. This class implements the + * {@link MessageWriter} interface for creating and completing CloudEvents messages in a + * RocketMQ-compatible format. + */ +final class RocketmqMessageWriter implements MessageWriter, Message>, CloudEventWriter { + private final Map messageProperties; + private final MessageBuilder messageBuilder; + + /** + * Create a RocketMQ message writer. + * + * @param topic message's topic. + */ + RocketmqMessageWriter(String topic) { + this.messageProperties = new HashMap<>(); + final ClientServiceProvider provider = ClientServiceProvider.loadService(); + this.messageBuilder = provider.newMessageBuilder(); + messageBuilder.setTopic(topic); + } + + @Override + public CloudEventContextWriter withContextAttribute(String name, String value) throws CloudEventRWException { + if (name.equals(CloudEventV1.DATACONTENTTYPE)) { + messageProperties.put(RocketmqConstants.PROPERTY_CONTENT_TYPE, value); + return this; + } + String propertyName = RocketmqConstants.ATTRIBUTES_TO_PROPERTY_NAMES.get(name); + if (propertyName == null) { + propertyName = name; + } + messageProperties.put(propertyName, value); + return this; + } + + @Override + public CloudEventWriter create(SpecVersion version) throws CloudEventRWException { + messageProperties.put(RocketmqConstants.MESSAGE_PROPERTY_SPEC_VERSION, version.toString()); + return this; + } + + @Override + public Message setEvent(EventFormat format, byte[] value) throws CloudEventRWException { + messageProperties.put(RocketmqConstants.PROPERTY_CONTENT_TYPE, format.serializedContentType()); + messageBuilder.setBody(value); + messageProperties.forEach(messageBuilder::addProperty); + return messageBuilder.build(); + } + + @Override + public Message end(CloudEventData data) throws CloudEventRWException { + messageBuilder.setBody(data.toBytes()); + messageProperties.forEach(messageBuilder::addProperty); + return messageBuilder.build(); + } + + @Override + public Message end() throws CloudEventRWException { + messageBuilder.setBody(RocketmqConstants.EMPTY_BODY); + messageProperties.forEach(messageBuilder::addProperty); + return messageBuilder.build(); + } +} diff --git a/rocketmq/src/test/java/io/cloudevents/rocketmq/RocketmqMessageFactoryTest.java b/rocketmq/src/test/java/io/cloudevents/rocketmq/RocketmqMessageFactoryTest.java new file mode 100644 index 000000000..ea11e2a8f --- /dev/null +++ b/rocketmq/src/test/java/io/cloudevents/rocketmq/RocketmqMessageFactoryTest.java @@ -0,0 +1,233 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.cloudevents.rocketmq; + +import io.cloudevents.CloudEvent; +import io.cloudevents.SpecVersion; +import io.cloudevents.core.message.Encoding; +import io.cloudevents.core.message.MessageReader; +import io.cloudevents.core.mock.CSVFormat; +import io.cloudevents.core.test.Data; +import io.cloudevents.core.v03.CloudEventV03; +import io.cloudevents.core.v1.CloudEventV1; +import io.cloudevents.types.Time; +import java.util.AbstractMap; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests verifying the behavior of the {@code RocketmqMessageFactory}. + */ +public class RocketmqMessageFactoryTest { + private static final String PREFIX_TEMPLATE = RocketmqConstants.CE_PREFIX + "%s"; + + private static final String DATA_CONTENT_TYPE_NULL = null; + private static final byte[] DATA_PAYLOAD_NULL = null; + + @ParameterizedTest() + @MethodSource("binaryTestArguments") + public void readBinary(final Map props, final String contentType, final byte[] body, final CloudEvent event) { + final MessageReader reader = RocketMqMessageFactory.createReader(contentType, props, body); + assertThat(reader.getEncoding()).isEqualTo(Encoding.BINARY); + assertThat(reader.toEvent()).isEqualTo(event); + } + + @ParameterizedTest() + @MethodSource("io.cloudevents.core.test.Data#allEventsWithoutExtensions") + public void readStructured(final CloudEvent event) { + final String contentType = CSVFormat.INSTANCE.serializedContentType() + "; charset=utf8"; + final byte[] contentPayload = CSVFormat.INSTANCE.serialize(event); + + final MessageReader reader = RocketMqMessageFactory.createReader(contentType, null, contentPayload); + assertThat(reader.getEncoding()).isEqualTo(Encoding.STRUCTURED); + assertThat(reader.toEvent()).isEqualTo(event); + } + + private static Stream binaryTestArguments() { + + return Stream.of( + // V03 + Arguments.of( + properties( + property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()), + property(CloudEventV03.ID, Data.ID), + property(CloudEventV03.TYPE, Data.TYPE), + property(CloudEventV03.SOURCE, Data.SOURCE.toString()), + property("ignored", "ignore") + ), + DATA_CONTENT_TYPE_NULL, + DATA_PAYLOAD_NULL, + Data.V03_MIN + ), + Arguments.of( + properties( + property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()), + property(CloudEventV03.ID, Data.ID), + property(CloudEventV03.TYPE, Data.TYPE), + property(CloudEventV03.SOURCE, Data.SOURCE.toString()), + property(CloudEventV03.SCHEMAURL, Data.DATASCHEMA.toString()), + property(CloudEventV03.SUBJECT, Data.SUBJECT), + property(CloudEventV03.TIME, Time.writeTime(Data.TIME)), + property("ignored", "ignore") + ), + Data.DATACONTENTTYPE_JSON, + Data.DATA_JSON_SERIALIZED, + Data.V03_WITH_JSON_DATA + ), + Arguments.of( + properties( + property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()), + property(CloudEventV03.ID, Data.ID), + property(CloudEventV03.TYPE, Data.TYPE), + property(CloudEventV03.SOURCE, Data.SOURCE.toString()), + property(CloudEventV03.SCHEMAURL, Data.DATASCHEMA.toString()), + property(CloudEventV03.SUBJECT, Data.SUBJECT), + property(CloudEventV03.TIME, Time.writeTime(Data.TIME)), + property("astring", "aaa"), + property("aboolean", "true"), + property("anumber", "10"), + property("ignored", "ignored") + ), + Data.DATACONTENTTYPE_JSON, + Data.DATA_JSON_SERIALIZED, + Data.V03_WITH_JSON_DATA_WITH_EXT_STRING + ), + Arguments.of( + properties( + property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()), + property(CloudEventV03.ID, Data.ID), + property(CloudEventV03.TYPE, Data.TYPE), + property(CloudEventV03.SOURCE, Data.SOURCE.toString()), + property(CloudEventV03.SUBJECT, Data.SUBJECT), + property(CloudEventV03.TIME, Time.writeTime(Data.TIME)), + property("ignored", "ignored") + ), + Data.DATACONTENTTYPE_XML, + Data.DATA_XML_SERIALIZED, + Data.V03_WITH_XML_DATA + ), + Arguments.of( + properties( + property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()), + property(CloudEventV03.ID, Data.ID), + property(CloudEventV03.TYPE, Data.TYPE), + property(CloudEventV03.SOURCE, Data.SOURCE.toString()), + property(CloudEventV03.SUBJECT, Data.SUBJECT), + property(CloudEventV03.TIME, Time.writeTime(Data.TIME)), + property("ignored", "ignored") + ), + Data.DATACONTENTTYPE_TEXT, + Data.DATA_TEXT_SERIALIZED, + Data.V03_WITH_TEXT_DATA + ), + // V1 + Arguments.of( + properties( + property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()), + property(CloudEventV1.ID, Data.ID), + property(CloudEventV1.TYPE, Data.TYPE), + property(CloudEventV1.SOURCE, Data.SOURCE.toString()), + property("ignored", "ignored") + ), + DATA_CONTENT_TYPE_NULL, + DATA_PAYLOAD_NULL, + Data.V1_MIN + ), + Arguments.of( + properties( + property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()), + property(CloudEventV1.ID, Data.ID), + property(CloudEventV1.TYPE, Data.TYPE), + property(CloudEventV1.SOURCE, Data.SOURCE.toString()), + property(CloudEventV1.DATASCHEMA, Data.DATASCHEMA.toString()), + property(CloudEventV1.SUBJECT, Data.SUBJECT), + property(CloudEventV1.TIME, Time.writeTime(Data.TIME)), + property("ignored", "ignored") + ), + Data.DATACONTENTTYPE_JSON, + Data.DATA_JSON_SERIALIZED, + Data.V1_WITH_JSON_DATA + ), + Arguments.of( + properties( + property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()), + property(CloudEventV1.ID, Data.ID), + property(CloudEventV1.TYPE, Data.TYPE), + property(CloudEventV1.SOURCE, Data.SOURCE.toString()), + property(CloudEventV1.DATASCHEMA, Data.DATASCHEMA.toString()), + property(CloudEventV1.SUBJECT, Data.SUBJECT), + property(CloudEventV1.TIME, Time.writeTime(Data.TIME)), + property("astring", "aaa"), + property("aboolean", "true"), + property("anumber", "10"), + property("ignored", "ignored") + ), + Data.DATACONTENTTYPE_JSON, + Data.DATA_JSON_SERIALIZED, + Data.V1_WITH_JSON_DATA_WITH_EXT_STRING + ), + Arguments.of( + properties( + property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()), + property(CloudEventV1.ID, Data.ID), + property(CloudEventV1.TYPE, Data.TYPE), + property(CloudEventV1.SOURCE, Data.SOURCE.toString()), + property(CloudEventV1.SUBJECT, Data.SUBJECT), + property(CloudEventV1.TIME, Time.writeTime(Data.TIME)), + property("ignored", "ignored") + ), + Data.DATACONTENTTYPE_XML, + Data.DATA_XML_SERIALIZED, + Data.V1_WITH_XML_DATA + ), + Arguments.of( + properties( + property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()), + property(CloudEventV1.ID, Data.ID), + property(CloudEventV1.TYPE, Data.TYPE), + property(CloudEventV1.SOURCE, Data.SOURCE.toString()), + property(CloudEventV1.SUBJECT, Data.SUBJECT), + property(CloudEventV1.TIME, Time.writeTime(Data.TIME)), + property("ignored", "ignored") + ), + Data.DATACONTENTTYPE_TEXT, + Data.DATA_TEXT_SERIALIZED, + Data.V1_WITH_TEXT_DATA + ) + ); + } + + private static AbstractMap.SimpleEntry property(final String name, final String value) { + return name.equalsIgnoreCase("ignored") ? + new AbstractMap.SimpleEntry<>(name, value) : + new AbstractMap.SimpleEntry<>(String.format(PREFIX_TEMPLATE, name), value); + } + + @SafeVarargs + private static Map properties(final AbstractMap.SimpleEntry... entries) { + return Stream.of(entries) + .collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue)); + + } +} diff --git a/rocketmq/src/test/java/io/cloudevents/rocketmq/RocketmqMessageWriterTest.java b/rocketmq/src/test/java/io/cloudevents/rocketmq/RocketmqMessageWriterTest.java new file mode 100644 index 000000000..c6d1b2c4e --- /dev/null +++ b/rocketmq/src/test/java/io/cloudevents/rocketmq/RocketmqMessageWriterTest.java @@ -0,0 +1,117 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.cloudevents.rocketmq; + +import io.cloudevents.CloudEvent; +import io.cloudevents.core.format.EventFormat; +import io.cloudevents.core.message.MessageWriter; +import io.cloudevents.core.mock.CSVFormat; +import io.cloudevents.core.v1.CloudEventV1; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import org.apache.rocketmq.client.apis.ClientServiceProvider; +import org.apache.rocketmq.client.apis.message.Message; +import org.apache.rocketmq.client.apis.message.MessageBuilder; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import static org.assertj.core.api.Assertions.assertThat; + +public class RocketmqMessageWriterTest { + + /** + * Verifies that a binary CloudEvent message can be successfully represented + * as a RocketMQ message. + */ + @ParameterizedTest() + @MethodSource("io.cloudevents.core.test.Data#allEventsWithStringExtensions") + public void testWriteBinaryCloudEventToRocketmqRepresentation(final CloudEvent binaryEvent) { + + String topic = "foobar"; + final Message expectedMessage = translateBinaryEvent(topic, binaryEvent); + + final MessageWriter writer = RocketMqMessageFactory.createWriter(topic); + final Message actualMessage = writer.writeBinary(binaryEvent); + + assertThat(Objects.toString(actualMessage.getBody())).isEqualTo(Objects.toString(expectedMessage.getBody())); + assertThat(actualMessage.getProperties()).isEqualTo(expectedMessage.getProperties()); + } + + /** + * Verifies that a structured CloudEvent message (in CSV format) can be successfully represented + * as a RocketMQ message. + */ + @ParameterizedTest() + @MethodSource("io.cloudevents.core.test.Data#allEventsWithoutExtensions") + public void testWriteStructuredCloudEventToRocketmqRepresentation(final CloudEvent event) { + final EventFormat format = CSVFormat.INSTANCE; + final Message expectedMessage = translateStructured(event, format); + + String topic = "foobar"; + final MessageWriter writer = RocketMqMessageFactory.createWriter(topic); + final Message actualMessage = writer.writeStructured(event, format.serializedContentType()); + + assertThat(Objects.toString(actualMessage.getBody())).isEqualTo(Objects.toString(expectedMessage.getBody())); + assertThat(actualMessage.getProperties()).isEqualTo(expectedMessage.getProperties()); + } + + private Message translateBinaryEvent(final String topic, final CloudEvent event) { + final ClientServiceProvider provider = ClientServiceProvider.loadService(); + + final MessageBuilder messageBuilder = provider.newMessageBuilder(); + messageBuilder.setTopic(topic); + messageBuilder.setBody(RocketmqConstants.EMPTY_BODY); + + final Map map = new HashMap<>(); + if (!event.getAttributeNames().isEmpty()) { + event.getAttributeNames().forEach(name -> { + if (name.equals(CloudEventV1.DATACONTENTTYPE) && event.getAttribute(name) != null) { + map.put(RocketmqConstants.PROPERTY_CONTENT_TYPE, event.getAttribute(name).toString()); + } else { + addProperty(map, name, Objects.toString(event.getAttribute(name)), true); + } + }); + } + if (!event.getExtensionNames().isEmpty()) { + event.getExtensionNames().forEach(name -> addProperty(map, name, Objects.toString(event.getExtension(name)), false)); + } + map.forEach(messageBuilder::addProperty); + if (event.getData() != null) { + messageBuilder.setBody(event.getData().toBytes()); + } + return messageBuilder.build(); + } + + private Message translateStructured(final CloudEvent event, final EventFormat format) { + final ClientServiceProvider provider = ClientServiceProvider.loadService(); + final MessageBuilder messageBuilder = provider.newMessageBuilder(); + messageBuilder.setTopic("foobar"); + messageBuilder.addProperty(RocketmqConstants.PROPERTY_CONTENT_TYPE, format.serializedContentType()); + messageBuilder.setBody(format.serialize(event)); + return messageBuilder.build(); + } + + private void addProperty(final Map map, final String name, final String value, final boolean prefix) { + if (prefix) { + map.put(String.format(RocketmqConstants.CE_PREFIX + "%s", name), value); + } else { + map.put(name, value); + } + } +}