Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions connectors/rocketmq-connect-kafka-connector-adapter/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
**rocketmq-connect-kafka-connector-adapter**

本项目的目标是让kafka connector运行在rocketmq-connect,使得数据在rocketmq导入导出。

**参数说明**

参数分为3类:rocketmq connect runtime参数、 kafka-connector-adapter参数,以及 具体kafka connector参数

rocketmq connect runtime参数:
- **connector-class**: kafka-connector-adapter的类名

如果是SourceConnector,对应为org.apache.rocketmq.connect.kafka.connector.KafkaRocketmqSourceConnector。

如果是SinkConnector,对应为org.apache.rocketmq.connect.kafka.connector.KafkaRocketmqSinkConnector。

- **connect-topicname**: 要导入导出数据的rocketmq topic
- **tasks.num**: 启动的task数目

kafka-connector-adapter参数:
- **connector.class**: kafka connector的类名
- **plugin.path**: kafka connector插件路径

具体kafka connector参数:

参考具体kafka connector的文档


# 快速开始

demo展示如何启动kafka-file-connector

适配的kafka-file-connector的主要作用是从源文件中读取数据发送到RocketMQ集群 然后从Topic中读取消息,写入到目标文件

## 1.获取kafka-file-connector

1. 下载kafka的二进制包:https://kafka.apache.org/downloads
2. 解压后到libs目录找到kafka-file-connector的jar包:connect-file-{version}.jar
3. 将jar拷贝到专门目录,这个目录作为kafka connector插件路径:plugin.path,比如:/tmp/kafka-plugins


## 2.构建rocketmq-connect-kafka-connector-adapter

```
git clone https://github.com/apache/rocketmq-connect.git

cd connectors/rocketmq-connect-kafka-connector-adapter/

mvn package

```
最后将/target/rocketmq-connect-kafka-connector-adapter-0.0.1-SNAPSHOT-jar-with-dependencies.jar拷贝到rocketmq插件目录下,并修改connect-standalone.conf的pluginPaths为对应的rocketmq插件目录
,比如/tmp/rocketmq-plugins

## 3.运行Worker

```
cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT

sh bin/connect-standalone.sh -c conf/connect-standalone.conf &

```

## 4.启动source connector

```
touch /tmp/test-source-file.txt

echo "Hello \r\nRocketMQ\r\n Connect" >> /tmp/test-source-file.txt

curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSourceConnector -d '{"connector-class":"org.apache.rocketmq.connect.kafka.connector.KafkaRocketmqSourceConnector","connect-topicname":"fileTopic","connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","plugin.path":"/tmp/kafka-plugins","topic":"fileTopic","file":"/tmp/test-source-file.txt"}'
```

## 5.启动sink connector

```
curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSinkConnector -d '{"connector-class":"org.apache.rocketmq.connect.kafka.connector.KafkaRocketmqSinkConnector","connect-topicname":"fileTopic","connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector","plugin.path":"/tmp/kafka-plugins","file":"/tmp/test-sink-file.txt"}'

cat /tmp/test-sink-file.txt
```

# kafka connect transform

todo

# 如何运行kafka-mongo-connector

todo

# 如何运行kafka-jdbc-connector

todo
192 changes: 192 additions & 0 deletions connectors/rocketmq-connect-kafka-connector-adapter/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-connect-kafka-connector-adapter</artifactId>
<version>0.0.1-SNAPSHOT</version>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

<!-- Compiler settings properties -->
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>

<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>versions-maven-plugin</artifactId>
<version>2.3</version>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>clirr-maven-plugin</artifactId>
<version>2.7</version>
</plugin>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
<excludeTransitive>false</excludeTransitive>
<stripVersion>true</stripVersion>
</configuration>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
<compilerVersion>${maven.compiler.source}</compilerVersion>
<showDeprecation>true</showDeprecation>
<showWarnings>true</showWarnings>
</configuration>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.19.1</version>
<configuration>
<argLine>-Xms512m -Xmx1024m</argLine>
<forkMode>always</forkMode>
<includes>
<include>**/*Test.java</include>
</includes>
</configuration>
</plugin>
<plugin>
<artifactId>maven-site-plugin</artifactId>
<version>3.6</version>
<configuration>
<locales>en_US</locales>
<outputEncoding>UTF-8</outputEncoding>
<inputEncoding>UTF-8</inputEncoding>
</configuration>
</plugin>
<plugin>
<artifactId>maven-source-plugin</artifactId>
<version>3.0.1</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<version>0.12</version>
<configuration>
<excludes>
<exclude>README.md</exclude>
</excludes>
</configuration>
</plugin>
<plugin>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.10.4</version>
<configuration>
<charset>UTF-8</charset>
<locale>en_US</locale>
<excludePackageNames>io.openmessaging.internal</excludePackageNames>
</configuration>
<executions>
<execution>
<id>aggregate</id>
<goals>
<goal>aggregate</goal>
</goals>
<phase>site</phase>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
<configuration>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>findbugs-maven-plugin</artifactId>
<version>3.0.4</version>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<archive>
<!-- The Main Class Here doesn't make a lot sense since it was dynamically loaded-->
<manifest>
<mainClass>org.apache.rocketmq.connect.hudi.connector.HudiSinkConnector</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>2.17</version>
<executions>
<execution>
<id>verify</id>
<phase>verify</phase>
<configuration>
<configLocation>style/rmq_checkstyle.xml</configLocation>
<encoding>UTF-8</encoding>
<consoleOutput>true</consoleOutput>
<failsOnError>true</failsOnError>
<includeTestSourceDirectory>false</includeTestSourceDirectory>
<includeTestResources>false</includeTestResources>
</configuration>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-runtime</artifactId>
<version>3.2.0</version>
</dependency>

<dependency>
<groupId>io.openmessaging</groupId>
<artifactId>openmessaging-connector</artifactId>
<version>0.1.4</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.7</version>
</dependency>

</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package org.apache.rocketmq.connect.kafka.config;

import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.kafka.connect.runtime.WorkerConfig;

import java.util.HashSet;
import java.util.Set;

public class ConfigDefine {
public static String ROCKETMQ_CONNECTOR_CLASS = "connector-class";
public static String CONNECTOR_CLASS = ConnectorConfig.CONNECTOR_CLASS_CONFIG;
public static String PLUGIN_PATH = "plugin.path";

public static final String TASK_CLASS = TaskConfig.TASK_CLASS_CONFIG;

public static final String KEY_CONVERTER = WorkerConfig.KEY_CONVERTER_CLASS_CONFIG;
public static final String VALUE_CONVERTER = WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG;
public static final String HEADER_CONVERTER = WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG;


public static final Set<String> REQUEST_CONFIG = new HashSet<String>(){
{
add(CONNECTOR_CLASS);
add(PLUGIN_PATH);
}
};
}
Loading