Skip to content

Commit

Permalink
feat: add rocketmq binding
Browse files Browse the repository at this point in the history
Signed-off-by: Aaron Ai <yangkun.ayk@alibaba-inc.com>
  • Loading branch information
aaron-ai committed Apr 26, 2023
1 parent 4ebeab0 commit 8db234c
Show file tree
Hide file tree
Showing 12 changed files with 918 additions and 0 deletions.
27 changes: 27 additions & 0 deletions examples/rocketmq/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# RocketMQ + CloudEvents Sample

This example demonstrates the integration of [RocketMQ 5.x client library](https://github.com/apache/rocketmq-clients)
with CloudEvents to create a RocketMQ binding.

## Building the Project

```shell
mvn package
```

## Setting Up a RocketMQ Instance

Follow the [quickstart guide](https://rocketmq.apache.org/docs/quick-start/01quickstart) on the official RocketMQ
website to set up the necessary components, including nameserver, proxy, and broker.

## Event Production

```shell
mvn exec:java -Dexec.mainClass="io.cloudevents.examples.rocketmq.RocketmqProducer" -Dexec.args="foobar:8081 sample-topic"
```

## Event Consumption

```shell
mvn exec:java -Dexec.mainClass="io.cloudevents.examples.rocketmq.RocketmqConsumer" -Dexec.args="foobar:8081 sample-topic sample-consumer-group"
```
21 changes: 21 additions & 0 deletions examples/rocketmq/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cloudevents-examples</artifactId>
<groupId>io.cloudevents</groupId>
<version>2.5.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>cloudevents-rocketmq-example</artifactId>

<dependencies>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-rocketmq</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package io.cloudevents.examples.rocketmq;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.message.MessageReader;
import io.cloudevents.rocketmq.RocketMqMessageFactory;
import java.io.IOException;
import java.util.Collections;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;

public class RocketmqConsumer {
private RocketmqConsumer() {
}

public static void main(String[] args) throws InterruptedException, ClientException, IOException {
if (args.length < 3) {
System.out.println("Usage: rocketmq_consumer <endpoints> <topic> <consumer_group>");
return;
}
final ClientServiceProvider provider = ClientServiceProvider.loadService();
String endpoints = args[0];
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
.build();
FilterExpression filterExpression = new FilterExpression();
String topic = args[1];
String consumerGroup = args[2];

// Create the RocketMQ Consumer.
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
.setConsumerGroup(consumerGroup)
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.setMessageListener(messageView -> {
final MessageReader reader = RocketMqMessageFactory.createReader(messageView);
final CloudEvent event = reader.toEvent();
System.out.println("Received event=" + event + ", messageId=" + messageView.getMessageId());
return ConsumeResult.SUCCESS;
})
.build();
// Block the main thread, no need for production environment.
Thread.sleep(Long.MAX_VALUE);
// Close the push consumer when you don't need it anymore.
pushConsumer.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package io.cloudevents.examples.rocketmq;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.v1.CloudEventBuilder;
import io.cloudevents.rocketmq.RocketMqMessageFactory;
import io.cloudevents.types.Time;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.shaded.com.google.gson.Gson;

public class RocketmqProducer {
private RocketmqProducer() {
}

public static void main(String[] args) throws ClientException, IOException {
if (args.length < 2) {
System.out.println("Usage: rocketmq_producer <endpoints> <topic>");
return;
}
final ClientServiceProvider provider = ClientServiceProvider.loadService();
String endpoints = args[0];
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
.build();
String topic = args[1];

// Create the RocketMQ Producer.
final Producer producer = provider.newProducerBuilder()
.setClientConfiguration(clientConfiguration)
.setTopics(topic)
.build();
final Gson gson = new Gson();
Map<String, String> payload = new HashMap<>();
payload.put("foo", "bar");
final CloudEvent event = new CloudEventBuilder()
.withId("client-id")
.withSource(URI.create("http://127.0.0.1/rocketmq-client"))
.withType("com.foobar")
.withTime(Time.parseTime("2022-11-09T21:47:12.032198+00:00"))
.withData(gson.toJson(payload).getBytes(StandardCharsets.UTF_8))
.build();
// Transform event into message.
final Message message = RocketMqMessageFactory.createWriter(topic).writeBinary(event);
try {
// Send the message.
final SendReceipt sendReceipt = producer.send(message);
System.out.println("Send message successfully, messageId=" + sendReceipt.getMessageId());
} catch (Exception e) {
System.out.println("Failed to send message");
e.printStackTrace();
}
// Close the producer when you don't need it anymore.
producer.close();
}
}
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
<module>spring</module>
<module>sql</module>
<module>bom</module>
<module>rocketmq</module>
</modules>

<properties>
Expand Down
73 changes: 73 additions & 0 deletions rocketmq/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2018-Present The CloudEvents Authors
~ <p>
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~ <p>
~ http://www.apache.org/licenses/LICENSE-2.0
~ <p>
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<artifactId>cloudevents-parent</artifactId>
<groupId>io.cloudevents</groupId>
<version>2.5.0-SNAPSHOT</version>
</parent>

<artifactId>cloudevents-rocketmq</artifactId>
<name>CloudEvents - RocketMQ Binding</name>
<packaging>jar</packaging>

<properties>
<rocketmq.version>5.0.4</rocketmq.version>
<module-name>io.cloudevents.rocketmq</module-name>
</properties>

<dependencies>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-core</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>${rocketmq.version}</version>
</dependency>

<!-- test dependencies -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${junit-jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-core</artifactId>
<classifier>tests</classifier>
<type>test-jar</type>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj-core.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package io.cloudevents.rocketmq;

import io.cloudevents.core.message.MessageReader;
import io.cloudevents.core.message.MessageWriter;
import io.cloudevents.core.message.impl.GenericStructuredMessageReader;
import io.cloudevents.core.message.impl.MessageUtils;
import io.cloudevents.rocketmq.impl.RocketmqBinaryMessageReader;
import io.cloudevents.rocketmq.impl.RocketmqConstants;
import io.cloudevents.rocketmq.impl.RocketmqMessageWriter;
import io.cloudevents.rw.CloudEventWriter;
import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.message.MessageView;

/**
* A factory class providing convenience methods for creating {@link MessageReader} and {@link MessageWriter} instances
* based on RocketMQ {@link MessageView} and {@link Message}.
*/
public class RocketMqMessageFactory {
private RocketMqMessageFactory() {
// prevent instantiation
}

/**
* Creates a {@link MessageReader} to read a RocketMQ {@link MessageView}.
*
* @param message The RocketMQ {@link MessageView} to read from.
* @return A {@link MessageReader} that can read the given {@link MessageView} to a {@link io.cloudevents.CloudEvent} representation.
*/
public static MessageReader createReader(final MessageView message) {
final ByteBuffer byteBuffer = message.getBody();
byte[] body = new byte[byteBuffer.remaining()];
byteBuffer.get(body);
final Map<String, String> properties = message.getProperties();
final String contentType = properties.get(RocketmqConstants.PROPERTY_CONTENT_TYPE);
return createReader(contentType, properties, body);
}

/**
* Creates a {@link MessageReader} using the content type, properties, and body of a RocketMQ {@link MessageView}.
*
* @param contentType The content type of the message payload.
* @param properties The properties of the RocketMQ message containing CloudEvent metadata (attributes and/or extensions).
* @param body The message body as byte array.
* @return A {@link MessageReader} capable of parsing a {@link io.cloudevents.CloudEvent} from the content-type, properties, and payload of a RocketMQ message.
*/
public static MessageReader createReader(final String contentType, final Map<String, String> properties, final byte[] body) {
return MessageUtils.parseStructuredOrBinaryMessage(
() -> contentType,
format -> new GenericStructuredMessageReader(format, body),
() -> properties.get(RocketmqConstants.MESSAGE_PROPERTY_SPEC_VERSION),
sv -> new RocketmqBinaryMessageReader(sv, properties, contentType, body)
);
}

/**
* Creates a {@link MessageWriter} instance capable of translating a {@link io.cloudevents.CloudEvent} to a RocketMQ {@link Message}.
*
* @param topic The topic to which the created RocketMQ message will be sent.
* @return A {@link MessageWriter} capable of converting a {@link io.cloudevents.CloudEvent} to a RocketMQ {@link Message}.
*/
public static MessageWriter<CloudEventWriter<Message>, Message> createWriter(final String topic) {
return new RocketmqMessageWriter(topic);
}
}
Loading

0 comments on commit 8db234c

Please sign in to comment.