Skip to content

Commit

Permalink
[ISSUE #3913] Add Source Connector RocketMQ Module (#3914)
Browse files Browse the repository at this point in the history
  • Loading branch information
xwm1992 committed May 12, 2023
1 parent 9f3f835 commit e81a1f4
Show file tree
Hide file tree
Showing 8 changed files with 454 additions and 0 deletions.
41 changes: 41 additions & 0 deletions eventmesh-connectors/source-connector-rocketmq/build.gradle
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

List rocketmq = [
"org.apache.rocketmq:rocketmq-client:$rocketmq_version",
"org.apache.rocketmq:rocketmq-broker:$rocketmq_version",
"org.apache.rocketmq:rocketmq-common:$rocketmq_version",
"org.apache.rocketmq:rocketmq-store:$rocketmq_version",
"org.apache.rocketmq:rocketmq-namesrv:$rocketmq_version",
"org.apache.rocketmq:rocketmq-tools:$rocketmq_version",
"org.apache.rocketmq:rocketmq-remoting:$rocketmq_version",
"org.apache.rocketmq:rocketmq-logging:$rocketmq_version",
"org.apache.rocketmq:rocketmq-test:$rocketmq_version",
"org.apache.rocketmq:rocketmq-srvutil:$rocketmq_version",
"org.apache.rocketmq:rocketmq-filter:$rocketmq_version",
"org.apache.rocketmq:rocketmq-acl:$rocketmq_version",
"org.apache.rocketmq:rocketmq-srvutil:$rocketmq_version",

]

dependencies {
implementation project(":eventmesh-connectors:eventmesh-connector-api")
implementation project(":eventmesh-sdk-java")
implementation rocketmq
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
}
17 changes: 17 additions & 0 deletions eventmesh-connectors/source-connector-rocketmq/gradle.properties
@@ -0,0 +1,17 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
rocketmq_version=4.9.5
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.source.connector.rocketmq;

import static org.apache.eventmesh.common.protocol.tcp.Command.RESPONSE_TO_SERVER;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
import org.apache.eventmesh.client.tcp.common.MessageUtils;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.Header;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.common.utils.JsonUtils;

public class EventMeshTestUtils {

private static final int SEQ_LENGTH = 10;

private static final String ASYNC_MSG_BODY = "testAsyncMessage";

private static final String DEFAULT_TTL_MS = "30000";

// generate pub-client
public static UserAgent generateClient1() {
final UserAgent agent = UserAgent.builder()
.env(UtilsConstants.ENV)
.host(UtilsConstants.HOST)
.password(generateRandomString(UtilsConstants.PASSWORD_LENGTH))
.username(UtilsConstants.USER_NAME)
.group(UtilsConstants.GROUP)
.path(UtilsConstants.PATH)
.port(UtilsConstants.PORT_1)
.subsystem(UtilsConstants.SUB_SYSTEM_1)
.pid(UtilsConstants.PID_1)
.version(UtilsConstants.VERSION)
.idc(UtilsConstants.IDC)
.build();
return MessageUtils.generatePubClient(agent);
}

// generate sub-client
public static UserAgent generateClient2() {
final UserAgent agent = UserAgent.builder()
.env(UtilsConstants.ENV)
.host(UtilsConstants.HOST)
.password(generateRandomString(UtilsConstants.PASSWORD_LENGTH))
.username(UtilsConstants.USER_NAME)
.group(UtilsConstants.GROUP)
.path(UtilsConstants.PATH)
.port(UtilsConstants.PORT_2)
.subsystem(UtilsConstants.SUB_SYSTEM_2)
.pid(UtilsConstants.PID_2)
.version(UtilsConstants.VERSION)
.idc(UtilsConstants.IDC)
.build();
return MessageUtils.generateSubClient(agent);
}

public static Package rrResponse(final EventMeshMessage request) {
final Package msg = new Package();
msg.setHeader(new Header(RESPONSE_TO_SERVER, 0, null, generateRandomString(SEQ_LENGTH)));
msg.setBody(request);
return msg;
}

private static String generateRandomString(final int length) {
final StringBuilder builder = new StringBuilder(length);
for (int i = 0; i < length; i++) {
builder.append((char) ThreadLocalRandom.current().nextInt(48, 57));
}
return builder.toString();
}

public static CloudEvent generateCloudEventV1(String destination, String message) {
final Map<String, String> content = new HashMap<>();
content.put(UtilsConstants.CONTENT, message);

return CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
.withSubject(destination)
.withSource(URI.create("/"))
.withDataContentType("application/cloudevents+json")
.withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME)
.withData(Objects.requireNonNull(JsonUtils.toJSONString(content)).getBytes(StandardCharsets.UTF_8))
.withExtension(UtilsConstants.TTL, DEFAULT_TTL_MS)
.build();
}

}
@@ -0,0 +1,60 @@
package org.apache.eventmesh.source.connector.rocketmq;

import io.cloudevents.CloudEvent;
import java.util.List;
import org.apache.eventmesh.client.tcp.EventMeshTCPClient;
import org.apache.eventmesh.client.tcp.EventMeshTCPClientFactory;
import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.connector.api.data.ConnectRecord;
import org.apache.eventmesh.source.connector.rocketmq.config.RocketMQSourceConfig;
import org.apache.eventmesh.source.connector.rocketmq.connector.RocketMQSourceConnector;

public class RocketMQSourceWorker {

public static final String SOURCE_CONSUMER_GROUP = "DEFAULT-CONSUMER-GROUP";
public static final String SOURCE_CONNECT_NAMESRVADDR = "127.0.0.1:9877";
public static final String SOURCE_TOPIC = "TopicTest";

public static final String DESTINATION = "SourceTopic";


public static void main(String[] args) throws Exception {

UserAgent userAgent = EventMeshTestUtils.generateClient1();
EventMeshTCPClientConfig eventMeshTcpClientConfig = EventMeshTCPClientConfig.builder()
.host("127.0.0.1")
.port(10002)
.userAgent(userAgent)
.build();

final EventMeshTCPClient<CloudEvent> client =
EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig, CloudEvent.class);

client.init();

RocketMQSourceConnector rocketMQSourceConnector = new RocketMQSourceConnector();

RocketMQSourceConfig rocketMQSourceConfig = new RocketMQSourceConfig();

rocketMQSourceConfig.setSourceNameserver(SOURCE_CONNECT_NAMESRVADDR);
rocketMQSourceConfig.setSourceTopic(SOURCE_TOPIC);
rocketMQSourceConfig.setSourceGroup(SOURCE_CONSUMER_GROUP);

rocketMQSourceConnector.init(rocketMQSourceConfig);

rocketMQSourceConnector.start();

while(true) {
List<ConnectRecord> connectorRecordList = rocketMQSourceConnector.poll();
for(ConnectRecord connectRecord : connectorRecordList) {
// todo:connectorRecord 转换 cloudEvents
CloudEvent event = EventMeshTestUtils.generateCloudEventV1(connectRecord.getExtension("topic"), connectRecord.getData().toString());
client.publish(event, 3000);
Thread.sleep(500);
}
}

}

}
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.source.connector.rocketmq;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;


@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class UtilsConstants {

public static final String ENV = "test";
public static final String HOST = "localhost";
public static final Integer PASSWORD_LENGTH = 8;
public static final String USER_NAME = "PU4283";
public static final String GROUP = "EventmeshTestGroup";
public static final String PATH = "/data/app/umg_proxy";
public static final Integer PORT_1 = 8362;
public static final Integer PORT_2 = 9362;
public static final String SUB_SYSTEM_1 = "5023";
public static final String SUB_SYSTEM_2 = "5017";
public static final Integer PID_1 = 32_893;
public static final Integer PID_2 = 42_893;
public static final String VERSION = "2.0.11";
public static final String IDC = "FT";
/**
* PROPERTY KEY NAME .
*/
public static final String MSG_TYPE = "msgtype";
public static final String TTL = "ttl";
public static final String KEYS = "keys";
public static final String REPLY_TO = "replyto";
public static final String PROPERTY_MESSAGE_REPLY_TO = "propertymessagereplyto";
public static final String CONTENT = "content";


}
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.source.connector.rocketmq.config;

import org.apache.eventmesh.connector.api.config.SourceConfig;

public class RocketMQSourceConfig extends SourceConfig {

String connectorName;

String sourceNameserver;

String sourceTopic;

String sourceGroup;

public String getConnectorName() {
return connectorName;
}

public void setConnectorName(String connectorName) {
this.connectorName = connectorName;
}

public String getSourceNameserver() {
return sourceNameserver;
}

public void setSourceNameserver(String sourceNameserver) {
this.sourceNameserver = sourceNameserver;
}

public String getSourceTopic() {
return sourceTopic;
}

public void setSourceTopic(String sourceTopic) {
this.sourceTopic = sourceTopic;
}

public String getSourceGroup() {
return sourceGroup;
}

public void setSourceGroup(String sourceGroup) {
this.sourceGroup = sourceGroup;
}
}

0 comments on commit e81a1f4

Please sign in to comment.