Skip to content

Commit

Permalink
[Feature][seatunnel-connectors-v2][connector-kafka] Kafka supports cu…
Browse files Browse the repository at this point in the history
…stom schema
  • Loading branch information
eyys committed Oct 6, 2022
1 parent 15636bd commit 87c53d3
Show file tree
Hide file tree
Showing 13 changed files with 1,189 additions and 0 deletions.
90 changes: 90 additions & 0 deletions docs/en/connector-v2/source/Kafka.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# Apache Kafka

> Apache Kafka source connector
## Description

Source connector for Apache Kafka.

## Key features

- [x] [batch](../../concept/connector-v2-features.md)
- [x] [stream](../../concept/connector-v2-features.md)
- [x] [exactly-once](../../concept/connector-v2-features.md)
- [x] [schema projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)

## Options

| name | type | required | default value |
| --- | --- | --- | --- |
| topic | String | Yes | - |
| pattern | Boolean | No | - |
| bootstrap.servers | String | Yes | - |
| consumer.group | String | No | SeaTunnel-Consumer-Group |
| commit_on_checkpoint | Boolean | No | - |
| schema | Config | No | content |
| format | String | No | json |
| result_table_name | String | No | - |


### topic [String]

Kafka topic name, If there are multiple topics, use , to split, for example: "tpc1,tpc2", If Pattern is set to True,
Support regular matching topic, for example: `tpc.*`;

### pattern [Boolean]

Whether to enable the regular matching topic, use java pattern match topic, Set to `true` to start the regular matching topic;

### bootstrap.servers [String]

The server address of kafka cluster, for example: `hadoop101:9092,hadoop102:9092,hadoop103:9092`;

### consumer.group [String]

Kafka consumer group. The default value is `SeaTunnel-Consumer-Group`;

### commit_on_checkpoint [Boolean]

If `true` the consumer's offset will be periodically committed in the background;

### schema [Config]

User - defined data type, refer to the article: Schema ;

### format [String]

Data format, By default, data of the JSON type is read. Other Settings will be treated as strings, for example `json`;

### kafka. [String]

Used to set up Kafka's configuration, for example: `kafka.max.poll.records = 500`, You can configure multiple, Will be added to the consumer's configuration;
For details, see Configuration of KafkaConsumer;

### result_table_name [String]

The table name that is converted after reading and used in the transformed SQL query;


## Example

```kafka {
source {
Kafka {
result_table_name = "kafka_name"
schema = {
fields {
name = "string"
age = "int"
}
}
format = json
topic = "topic_1,topic_2,topic_3"
bootstrap.server = "hadoop101:9092,hadoop102:9092,hadoop103:9092"
kafka.max.poll.records = 500
kafka.client.id = client_1
}
}
```
89 changes: 89 additions & 0 deletions docs/zh-CN/connector-v2/source/kafka.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Apache Kafka

> Apache Kafka 源连接器
## 描述

Apache Kafka 的源连接器。

## 主要特性

- [x] [](../../concept/connector-v2-features.md)
- [x] [](../../concept/connector-v2-features.md)
- [x] [精准一次](../../concept/connector-v2-features.md)
- [x] [模式投影](../../concept/connector-v2-features.md)
- [x] [并行度](../../concept/connector-v2-features.md)
- [ ] [支持用户自定义切分](../../concept/connector-v2-features.md)

## 选项

| 名字 | 类型 | 是否必须 | 默认值 |
| --- | --- | --- | --- |
| topic | String | Yes | - |
| pattern | Boolean | No | - |
| bootstrap.servers | String | Yes | - |
| consumer.group | String | No | SeaTunnel-Consumer-Group |
| commit_on_checkpoint | Boolean | No | - |
| schema | Config | No | content |
| format | String | No | json |
| result_table_name | String | No | - |


### topic [String]

Kafka topic 名称,如果有多个 topic,使用`,`来分割,例如:`tpc1,tpc2`,如果Pattern设置为`true`
支持常规匹配主题,例如: `tpc.*`;

### pattern [Boolean]

是否启用常规匹配主题,使用java模式匹配 topic,设置为`true`启动常规匹配主题;

### bootstrap.servers [String]

kafka 集群的服务器地址,例如 : `hadoop101:9092,hadoop102:9092,hadoop103:9092`;

### consumer.group [String]

Kafka 消费者组,默认值是 `SeaTunnel-Consumer-Group`;

### commit_on_checkpoint [Boolean]

设置为`true`,消费者的偏移量将在后台定期提交;

### schema [Config]

用户定义的数据类型,参见文章:Schema;

### format [String]

数据格式,缺省情况下,读取`json`类型的数据。其他设置将被视为字符串,例如`json`;

### kafka. [String]

用于设置 Kafka 的配置,例如:`Kafka .max.poll.records = 500`,可以配置多个,依次添加到消费者的配置中;
详情请参见KafkaConsumer的配置;

### result_table_name [String]

读取后转换并在转换后的SQL查询中使用的表名;

## 比如

```kafka {
source {
Kafka {
result_table_name = "kafka_name"
schema = {
fields {
name = "string"
age = "int"
}
}
format = json
topic = "topic_1,topic_2,topic_3"
bootstrap.server = "hadoop101:9092,hadoop102:9092,hadoop103:9092"
kafka.max.poll.records = 500
kafka.client.id = client_1
}
}
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.kafka.config;

public class KafkaConfig {
/**
* The topic of kafka.
*/
public static final String TOPIC = "topic";

/**
* The topic of kafka is java pattern or list.
*/
public static final String PATTERN = "pattern";

/**
* The server address of kafka cluster.
*/
public static final String BOOTSTRAP_SERVERS = "bootstrap.servers";

public static final String KAFKA_CONFIG_PREFIX = "kafka.";

/**
* consumer group of kafka client consume message.
*/
public static final String CONSUMER_GROUP = "consumer.group";


/**
* consumer group of kafka client consume message.
*/
public static final String COMMIT_ON_CHECKPOINT = "commit_on_checkpoint";

/**
* The prefix of kafka's transactionId, make sure different job use different prefix.
*/
public static final String TRANSACTION_PREFIX = "transaction_prefix";

/**
* User-defined schema
*/
public static final String SCHEMA = "schema";

/**
* data format
*/
public static final String FORMAT = "format";

/**
* The default data format is JSON
*/
public static final String DEFAULT_FORMAT = "json";

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-flink-connector-v2-e2e</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>connector-kafka-flink-e2e</artifactId>

<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-flink-e2e-base</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<!-- SeaTunnel connectors -->
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-kafka</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-console</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-assert</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Loading

0 comments on commit 87c53d3

Please sign in to comment.