
Commit

[Feature][seatunnel-connectors-v2][connector-kafka] Kafka supports custom schema
eyys committed Sep 24, 2022
1 parent 255533d commit c4321a4
Showing 17 changed files with 960 additions and 18 deletions.
90 changes: 90 additions & 0 deletions docs/en/connector-v2/source/Kafka.md
@@ -0,0 +1,90 @@
# Apache Kafka

> Apache Kafka source connector
## Description

Source connector for Apache Kafka.

## Key features

- [x] [batch](../../concept/connector-v2-features.md)
- [x] [stream](../../concept/connector-v2-features.md)
- [x] [exactly-once](../../concept/connector-v2-features.md)
- [x] [schema projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)

## Options

| name | type | required | default value |
| --- | --- | --- | --- |
| topic | String | Yes | - |
| pattern | Boolean | No | - |
| bootstrap.servers | String | Yes | - |
| consumer.group | String | No | SeaTunnel-Consumer-Group |
| commit_on_checkpoint | Boolean | No | - |
| schema | Config | No | content |
| format | String | No | json |
| kafka. | String | No | - |
| result_table_name | String | No | - |


### topic [String]

Kafka topic name. To consume multiple topics, separate them with commas, for example: `tpc1,tpc2`.
If `pattern` is set to `true`, the topic is treated as a regular expression, for example: `tpc.*`.

### pattern [Boolean]

Whether to treat the topic as a regular expression. When set to `true`, topics are matched using a Java regex pattern.

### bootstrap.servers [String]

The server addresses of the Kafka cluster, for example: `hadoop101:9092,hadoop102:9092,hadoop103:9092`.

### consumer.group [String]

Kafka consumer group. The default value is `SeaTunnel-Consumer-Group`.

### commit_on_checkpoint [Boolean]

If `true`, the consumer's offsets will be committed periodically in the background.

### schema [Config]

User-defined data type of the message value; refer to the Schema documentation for details.
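
A minimal sketch of a `schema` block, declaring field names mapped to type-name strings (the `id`/`bigint` field here is illustrative and assumes `bigint` is among the supported type names):

```hocon
schema = {
  fields {
    id = "bigint"   # illustrative field; assumes bigint is a supported type name
    name = "string"
    age = "int"
  }
}
```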

### format [String]

Data format. By default messages are read as JSON (`json`); any other setting causes the message value to be treated as a plain string.

### kafka. [String]

Options prefixed with `kafka.` are used to set up the Kafka consumer configuration, for example: `kafka.max.poll.records = 500`. Multiple such options can be configured, and all of them are added to the consumer's configuration.
For details, see the KafkaConsumer configuration documentation.
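
For example, a source definition could forward several consumer settings this way (the `session.timeout.ms` property is a standard KafkaConsumer setting used here purely as an illustration):

```hocon
# every option that starts with "kafka." is added to the consumer configuration
kafka.max.poll.records = 500
kafka.session.timeout.ms = 30000
```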

### result_table_name [String]

The table name registered for the data read from Kafka; it can be referenced in subsequent SQL transforms.


## Example

```hocon
source {
  Kafka {
    result_table_name = "kafka_name"
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
    format = json
    topic = "topic_1,topic_2,topic_3"
    bootstrap.servers = "hadoop101:9092,hadoop102:9092,hadoop103:9092"
    kafka.max.poll.records = 500
    kafka.client.id = client_1
  }
}
```
89 changes: 89 additions & 0 deletions docs/zh-CN/connector-v2/source/kafka.md
@@ -0,0 +1,89 @@
# Apache Kafka

> Apache Kafka source connector
## Description

Source connector for Apache Kafka.

## Key features

- [x] [batch](../../concept/connector-v2-features.md)
- [x] [stream](../../concept/connector-v2-features.md)
- [x] [exactly-once](../../concept/connector-v2-features.md)
- [x] [schema projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)

## Options

| name | type | required | default value |
| --- | --- | --- | --- |
| topic | String | Yes | - |
| pattern | Boolean | No | - |
| bootstrap.servers | String | Yes | - |
| consumer.group | String | No | SeaTunnel-Consumer-Group |
| commit_on_checkpoint | Boolean | No | - |
| schema | Config | No | content |
| format | String | No | json |
| kafka. | String | No | - |
| result_table_name | String | No | - |


### topic [String]

Kafka topic name. To consume multiple topics, separate them with commas, for example: `tpc1,tpc2`.
If `pattern` is set to `true`, the topic is treated as a regular expression, for example: `tpc.*`.

### pattern [Boolean]

Whether to treat the topic as a regular expression. When set to `true`, topics are matched using a Java regex pattern.

### bootstrap.servers [String]

The server addresses of the Kafka cluster, for example: `hadoop101:9092,hadoop102:9092,hadoop103:9092`.

### consumer.group [String]

Kafka consumer group. The default value is `SeaTunnel-Consumer-Group`.

### commit_on_checkpoint [Boolean]

If `true`, the consumer's offsets will be committed periodically in the background.

### schema [Config]

User-defined data type of the message value; refer to the Schema documentation for details.

### format [String]

Data format. By default messages are read as JSON (`json`); any other setting causes the message value to be treated as a plain string.

### kafka. [String]

Options prefixed with `kafka.` are used to set up the Kafka consumer configuration, for example: `kafka.max.poll.records = 500`. Multiple such options can be configured, and all of them are added to the consumer's configuration.
For details, see the KafkaConsumer configuration documentation.

### result_table_name [String]

The table name registered for the data read from Kafka; it can be referenced in subsequent SQL transforms.

## Example

```hocon
source {
  Kafka {
    result_table_name = "kafka_name"
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
    format = json
    topic = "topic_1,topic_2,topic_3"
    bootstrap.servers = "hadoop101:9092,hadoop102:9092,hadoop103:9092"
    kafka.max.poll.records = 500
    kafka.client.id = client_1
  }
}
```
5 changes: 5 additions & 0 deletions seatunnel-connectors-v2/connector-kafka/pom.xml
@@ -46,6 +46,11 @@
<artifactId>seatunnel-format-json</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-common</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

</project>
@@ -17,7 +17,7 @@

package org.apache.seatunnel.connectors.seatunnel.kafka.config;

public class Config {
public class KafkaConfig {
/**
* The topic of kafka.
*/
@@ -50,4 +50,20 @@ public class Config {
* The prefix of kafka's transactionId, make sure different job use different prefix.
*/
public static final String TRANSACTION_PREFIX = "transaction_prefix";

/**
* User-defined schema
*/
public static final String SCHEMA = "schema";

/**
* data format
*/
public static final String FORMAT = "format";

/**
* The default data format is JSON
*/
public static final String DEFAULT_FORMAT = "json";

}
@@ -17,12 +17,13 @@

package org.apache.seatunnel.connectors.seatunnel.kafka.sink;

import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TRANSACTION_PREFIX;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaConfig.TRANSACTION_PREFIX;

import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaConfig;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSemantics;
import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.SeaTunnelRowSerializer;
@@ -123,7 +124,7 @@ public void close() {

private Properties getKafkaProperties(Config pluginConfig) {
Config kafkaConfig = TypesafeConfigUtils.extractSubConfig(pluginConfig,
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG_PREFIX, false);
KafkaConfig.KAFKA_CONFIG_PREFIX, false);
Properties kafkaProperties = new Properties();
kafkaConfig.entrySet().forEach(entry -> {
kafkaProperties.put(entry.getKey(), entry.getValue().unwrapped());
@@ -17,29 +17,33 @@

package org.apache.seatunnel.connectors.seatunnel.kafka.source;

import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVERS;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.COMMIT_ON_CHECKPOINT;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONSUMER_GROUP;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PATTERN;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaConfig.BOOTSTRAP_SERVERS;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaConfig.COMMIT_ON_CHECKPOINT;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaConfig.CONSUMER_GROUP;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaConfig.PATTERN;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaConfig.TOPIC;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaConfig.SCHEMA;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaConfig.FORMAT;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaConfig.DEFAULT_FORMAT;

import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;

import org.apache.seatunnel.format.json.JsonDeserializationSchema;
import org.apache.seatunnel.shade.com.typesafe.config.Config;

import com.google.auto.service.AutoService;
@@ -54,6 +58,7 @@ public class KafkaSource implements SeaTunnelSource<SeaTunnelRow, KafkaSourceSpl
private final ConsumerMetadata metadata = new ConsumerMetadata();
private SeaTunnelRowType typeInfo;
private JobContext jobContext;
private DeserializationSchema<SeaTunnelRow> deserializationSchema;

@Override
public Boundedness getBoundedness() {
@@ -92,10 +97,7 @@ public void prepare(Config config) throws PrepareFailException {
this.metadata.getProperties().put(e.getKey(), String.valueOf(e.getValue().unwrapped()));
});

// TODO support user custom row type
this.typeInfo = new SeaTunnelRowType(new String[]{"topic", "raw_message"},
new SeaTunnelDataType[]{BasicType.STRING_TYPE, BasicType.STRING_TYPE});

setDeserialization(config);
}

@Override
@@ -105,7 +107,7 @@ public SeaTunnelRowType getProducedType() {

@Override
public SourceReader<SeaTunnelRow, KafkaSourceSplit> createReader(SourceReader.Context readerContext) throws Exception {
return new KafkaSourceReader(this.metadata, this.typeInfo, readerContext);
return new KafkaSourceReader(this.metadata, this.typeInfo, deserializationSchema, readerContext);
}

@Override
@@ -122,4 +124,27 @@ public SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState> restoreEnumerat
public void setJobContext(JobContext jobContext) {
this.jobContext = jobContext;
}

private void setDeserialization(Config config) {
    if (config.hasPath(SCHEMA)) {
        Config schema = config.getConfig(SCHEMA);
        typeInfo = SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
    } else {
        typeInfo = SeaTunnelSchema.buildSimpleTextSchema();
    }
    // TODO: use format SPI
    // Only the default JSON format is deserialized into typed rows for now;
    // any other format falls back to the raw string path in the reader.
    String format = config.hasPath(FORMAT) ? config.getString(FORMAT) : DEFAULT_FORMAT;
    if (DEFAULT_FORMAT.equals(format)) {
        deserializationSchema = new JsonDeserializationSchema(false, false, typeInfo);
    } else {
        deserializationSchema = null;
    }
}
}
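
When no `schema` option is configured, the fallback path above relies on `SeaTunnelSchema.buildSimpleTextSchema()`. A minimal sketch of the row type this is expected to produce — the single field name `content` is an assumption based on the option table's default value, not something shown in this diff:

```java
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;

public class SimpleTextSchemaSketch {
    public static void main(String[] args) {
        // Hypothetical illustration, not part of the commit: the simple text schema is
        // assumed to be a single string column named "content", so each Kafka record
        // value is emitted as one string field by the reader's fallback branch.
        SeaTunnelRowType simpleTextType = new SeaTunnelRowType(
                new String[]{"content"},
                new SeaTunnelDataType[]{BasicType.STRING_TYPE});
        System.out.println(simpleTextType);
    }
}
```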
Expand Up @@ -17,6 +17,7 @@

package org.apache.seatunnel.connectors.seatunnel.kafka.source;

import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
@@ -63,14 +64,17 @@ public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSource
private final ConcurrentMap<TopicPartition, KafkaSourceSplit> sourceSplitMap;
private final Map<TopicPartition, KafkaConsumerThread> consumerThreadMap;
private final ExecutorService executorService;
private final DeserializationSchema<SeaTunnelRow> deserializationSchema;
// TODO support user custom type
private SeaTunnelRowType typeInfo;

KafkaSourceReader(ConsumerMetadata metadata, SeaTunnelRowType typeInfo,
DeserializationSchema<SeaTunnelRow> deserializationSchema,
SourceReader.Context context) {
this.metadata = metadata;
this.context = context;
this.typeInfo = typeInfo;
this.deserializationSchema = deserializationSchema;
this.sourceSplits = new HashSet<>();
this.consumerThreadMap = new ConcurrentHashMap<>();
this.sourceSplitMap = new ConcurrentHashMap<>();
@@ -118,9 +122,12 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
List<ConsumerRecord<byte[], byte[]>> recordList = records.records(partition);
for (ConsumerRecord<byte[], byte[]> record : recordList) {

String v = stringDeserializer.deserialize(partition.topic(), record.value());
String t = partition.topic();
output.collect(new SeaTunnelRow(new Object[]{t, v}));
if (deserializationSchema != null) {
    deserializationSchema.deserialize(record.value(), output);
} else {
    String content = stringDeserializer.deserialize(partition.topic(), record.value());
    output.collect(new SeaTunnelRow(new Object[]{content}));
}

if (Boundedness.BOUNDED.equals(context.getBoundedness()) &&
record.offset() >= sourceSplit.getEndOffset()) {