Commit f65f44f

[Improve][Connector-V2][Kafka] Support to specify multiple partition keys (#3230)
1 parent e4b97b7 commit f65f44f

File tree: 35 files changed, +508 -1522 lines changed

docs/en/connector-v2/sink/Kafka.md

Lines changed: 19 additions & 16 deletions
@@ -15,17 +15,17 @@ By default, we will use 2pc to guarantee the message is sent to kafka exactly once

 ## Options

-| name               | type                  | required | default value |
-| ------------------ | --------------------- | -------- | ------------- |
-| topic              | string                | yes      | -             |
-| bootstrap.servers  | string                | yes      | -             |
-| kafka.*            | kafka producer config | no       | -             |
-| semantic           | string                | no       | NON           |
-| partition_key      | string                | no       | -             |
-| partition          | int                   | no       | -             |
-| assign_partitions  | list                  | no       | -             |
-| transaction_prefix | string                | no       | -             |
-| common-options     | config                | no       | -             |
+| name                 | type                  | required | default value |
+|----------------------|-----------------------|----------|---------------|
+| topic                | string                | yes      | -             |
+| bootstrap.servers    | string                | yes      | -             |
+| kafka.*              | kafka producer config | no       | -             |
+| semantic             | string                | no       | NON           |
+| partition_key_fields | array                 | no       | -             |
+| partition            | int                   | no       | -             |
+| assign_partitions    | array                 | no       | -             |
+| transaction_prefix   | string                | no       | -             |
+| common-options       | config                | no       | -             |

 ### topic [string]

@@ -51,11 +51,11 @@ In AT_LEAST_ONCE, producer will wait for all outstanding messages in the Kafka buffer

 NON does not provide any guarantees: messages may be lost in case of issues on the Kafka broker and messages may be duplicated.

-### partition_key [string]
+### partition_key_fields [array]

-Configure which field is used as the key of the kafka message.
+Configure which fields are used as the key of the kafka message.

-For example, if you want to use value of a field from upstream data as key, you can assign it to the field name.
+For example, if you want to use the values of fields from upstream data as keys, assign the field names to this property.

 Upstream data is the following:

@@ -66,13 +66,13 @@ Upstream data is the following:

 If name is set as the key, then the hash value of the name column will determine which partition the message is sent to.

-If the field name does not exist in the upstream data, the configured parameter will be used as the key.
+If the partition key fields are not set, the message key will be null.

 ### partition [int]

 We can specify the partition, all messages will be sent to this partition.

-### assign_partitions [list]
+### assign_partitions [array]

 We can decide which partition to send based on the content of the message. The function of this parameter is to distribute information.

@@ -113,3 +113,6 @@ sink {

 ### 2.3.0-beta 2022-10-20

 - Add Kafka Sink Connector
+### next version
+
+- [Feature] Support to specify multiple partition keys [3230](https://github.com/apache/incubator-seatunnel/pull/3230)
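To make the "hash of the key decides the partition" statement above concrete: with Kafka's default partitioner, the producer murmur2-hashes the serialized key bytes and takes the result modulo the partition count, so every record with the same key lands in the same partition. A minimal sketch of that formula, using the `org.apache.kafka.common.utils.Utils` helpers from kafka-clients; the key JSON and the six-partition topic are hypothetical:

```java
import org.apache.kafka.common.utils.Utils;

import java.nio.charset.StandardCharsets;

public class KeyHashPartitionDemo {
    public static void main(String[] args) {
        // Key bytes as the sink would produce them for partition_key_fields = [name].
        byte[] keyBytes = "{\"name\":\"Jack\"}".getBytes(StandardCharsets.UTF_8);
        int numPartitions = 6; // hypothetical topic layout

        // The formula Kafka's default partitioner applies to keyed records.
        int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        System.out.println("Records with this key go to partition " + partition);
    }
}
```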

seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java

Lines changed: 3 additions & 3 deletions
@@ -101,10 +101,10 @@ public class Config {
         .withDescription("We can decide which partition to send based on the content of the message. " +
             "The function of this parameter is to distribute information.");

-    public static final Option<String> PARTITION_KEY = Options.key("partition_key")
-        .stringType()
+    public static final Option<List<String>> PARTITION_KEY_FIELDS = Options.key("partition_key_fields")
+        .listType()
         .noDefaultValue()
-        .withDescription("Configure which field is used as the key of the kafka message.");
+        .withDescription("Configure which fields are used as the key of the kafka message.");

     public static final Option<StartMode> START_MODE = Options.key("start_mode")
         .objectType(StartMode.class)
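Since the option is now list-typed, the writer reads it with Typesafe Config's `getStringList` (see the KafkaSinkWriter diff below). A small sketch of the parsing this enables, assuming the plain `com.typesafe.config` artifact rather than SeaTunnel's shaded copy:

```java
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import java.util.List;

public class PartitionKeyFieldsParsingDemo {
    public static void main(String[] args) {
        // HOCON list syntax, as it would appear in a SeaTunnel job config file.
        Config conf = ConfigFactory.parseString("partition_key_fields = [name, age]");
        List<String> fields = conf.getStringList("partition_key_fields");
        System.out.println(fields); // prints [name, age]
    }
}
```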

seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java

Lines changed: 52 additions & 16 deletions
@@ -17,42 +17,78 @@

 package org.apache.seatunnel.connectors.seatunnel.kafka.serialize;

+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.format.json.JsonSerializationSchema;

 import org.apache.kafka.clients.producer.ProducerRecord;

+import java.util.List;
+import java.util.function.Function;
+
 public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer<byte[], byte[]> {

-    private int partation = -1;
+    private Integer partition;
     private final String topic;
-    private final JsonSerializationSchema jsonSerializationSchema;
+    private final SerializationSchema keySerialization;
+    private final SerializationSchema valueSerialization;

     public DefaultSeaTunnelRowSerializer(String topic, SeaTunnelRowType seaTunnelRowType) {
-        this.topic = topic;
-        this.jsonSerializationSchema = new JsonSerializationSchema(seaTunnelRowType);
+        this(topic, element -> null, createSerializationSchema(seaTunnelRowType));
     }

-    public DefaultSeaTunnelRowSerializer(String topic, int partation, SeaTunnelRowType seaTunnelRowType) {
+    public DefaultSeaTunnelRowSerializer(String topic, Integer partition, SeaTunnelRowType seaTunnelRowType) {
         this(topic, seaTunnelRowType);
-        this.partation = partation;
+        this.partition = partition;
+    }
+
+    public DefaultSeaTunnelRowSerializer(String topic,
+                                         List<String> keyFieldNames,
+                                         SeaTunnelRowType seaTunnelRowType) {
+        this(topic, createKeySerializationSchema(keyFieldNames, seaTunnelRowType),
+            createSerializationSchema(seaTunnelRowType));
+    }
+
+    public DefaultSeaTunnelRowSerializer(String topic,
+                                         SerializationSchema keySerialization,
+                                         SerializationSchema valueSerialization) {
+        this.topic = topic;
+        this.keySerialization = keySerialization;
+        this.valueSerialization = valueSerialization;
     }

     @Override
     public ProducerRecord<byte[], byte[]> serializeRow(SeaTunnelRow row) {
-        if (this.partation != -1) {
-            return new ProducerRecord<>(topic, this.partation, null, jsonSerializationSchema.serialize(row));
-        }
-        else {
-            return new ProducerRecord<>(topic, null, jsonSerializationSchema.serialize(row));
-        }
+        return new ProducerRecord<>(topic, partition,
+            keySerialization.serialize(row), valueSerialization.serialize(row));
     }

-    @Override
-    public ProducerRecord<byte[], byte[]> serializeRowByKey(String key, SeaTunnelRow row) {
-        //if the key is null, kafka will send message to a random partition
-        return new ProducerRecord<>(topic, key == null ? null : key.getBytes(), jsonSerializationSchema.serialize(row));
+    private static SerializationSchema createSerializationSchema(SeaTunnelRowType rowType) {
+        return new JsonSerializationSchema(rowType);
     }

+    private static SerializationSchema createKeySerializationSchema(List<String> keyFieldNames,
+                                                                    SeaTunnelRowType seaTunnelRowType) {
+        int[] keyFieldIndexArr = new int[keyFieldNames.size()];
+        SeaTunnelDataType[] keyFieldDataTypeArr = new SeaTunnelDataType[keyFieldNames.size()];
+        for (int i = 0; i < keyFieldNames.size(); i++) {
+            String keyFieldName = keyFieldNames.get(i);
+            int rowFieldIndex = seaTunnelRowType.indexOf(keyFieldName);
+            keyFieldIndexArr[i] = rowFieldIndex;
+            keyFieldDataTypeArr[i] = seaTunnelRowType.getFieldType(rowFieldIndex);
+        }
+        SeaTunnelRowType keyType = new SeaTunnelRowType(keyFieldNames.toArray(new String[0]), keyFieldDataTypeArr);
+        SerializationSchema keySerializationSchema = createSerializationSchema(keyType);
+
+        Function<SeaTunnelRow, SeaTunnelRow> keyDataExtractor = row -> {
+            Object[] keyFields = new Object[keyFieldIndexArr.length];
+            for (int i = 0; i < keyFieldIndexArr.length; i++) {
+                keyFields[i] = row.getField(keyFieldIndexArr[i]);
+            }
+            return new SeaTunnelRow(keyFields);
+        };
+        return row -> keySerializationSchema.serialize(keyDataExtractor.apply(row));
+    }
 }
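The net effect of the new constructors: the configured key fields are projected into a smaller SeaTunnelRow and JSON-serialized with their own JsonSerializationSchema, while the value keeps the full row. A hedged usage sketch; the topic, field names, and values are illustrative, and the printed JSON assumes JsonSerializationSchema emits fields in declaration order:

```java
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;

import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.charset.StandardCharsets;
import java.util.Collections;

public class SerializerUsageDemo {
    public static void main(String[] args) {
        SeaTunnelRowType rowType = new SeaTunnelRowType(
            new String[]{"name", "age"},
            new SeaTunnelDataType[]{BasicType.STRING_TYPE, BasicType.INT_TYPE});

        // Single key field here; a multi-field list works the same way.
        DefaultSeaTunnelRowSerializer serializer = new DefaultSeaTunnelRowSerializer(
            "test_topic", Collections.singletonList("name"), rowType);

        ProducerRecord<byte[], byte[]> record =
            serializer.serializeRow(new SeaTunnelRow(new Object[]{"Jack", 16}));

        // Expected: key {"name":"Jack"}, value {"name":"Jack","age":16}
        System.out.println(new String(record.key(), StandardCharsets.UTF_8));
        System.out.println(new String(record.value(), StandardCharsets.UTF_8));
    }
}
```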

seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java

Lines changed: 0 additions & 9 deletions
@@ -30,13 +30,4 @@ public interface SeaTunnelRowSerializer<K, V> {
      * @return kafka record.
      */
     ProducerRecord<K, V> serializeRow(SeaTunnelRow row);
-
-    /**
-     * Use Key serialize the {@link SeaTunnelRow} to a Kafka {@link ProducerRecord}.
-     *
-     * @param key String
-     * @param row seatunnel row
-     * @return kafka record.
-     */
-    ProducerRecord<K, V> serializeRowByKey(String key, SeaTunnelRow row);
 }

seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java

Lines changed: 1 addition & 1 deletion
@@ -36,7 +36,7 @@ public OptionRule optionRule() {
         return OptionRule.builder()
             .required(Config.TOPIC, Config.BOOTSTRAP_SERVERS)
             .optional(Config.KAFKA_CONFIG_PREFIX, Config.ASSIGN_PARTITIONS, Config.TRANSACTION_PREFIX)
-            .exclusive(Config.PARTITION, Config.PARTITION_KEY)
+            .exclusive(Config.PARTITION, Config.PARTITION_KEY_FIELDS)
             .build();
     }
 }

seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java

Lines changed: 28 additions & 50 deletions
@@ -18,8 +18,9 @@
 package org.apache.seatunnel.connectors.seatunnel.kafka.sink;

 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.ASSIGN_PARTITIONS;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG_PREFIX;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION;
-import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION_KEY;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION_KEY_FIELDS;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TRANSACTION_PREFIX;

@@ -40,56 +41,33 @@
 import org.apache.kafka.common.serialization.ByteArraySerializer;

 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Random;
-import java.util.function.Function;

 /**
  * KafkaSinkWriter is a sink writer that will write {@link SeaTunnelRow} to Kafka.
  */
 public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo, KafkaSinkState> {

     private final SinkWriter.Context context;
-    private final Config pluginConfig;
-    private final Function<SeaTunnelRow, String> partitionExtractor;

     private String transactionPrefix;
     private long lastCheckpointId = 0;
-    private int partition;

     private final KafkaProduceSender<byte[], byte[]> kafkaProducerSender;
     private final SeaTunnelRowSerializer<byte[], byte[]> seaTunnelRowSerializer;

     private static final int PREFIX_RANGE = 10000;

-    // check config
-    @Override
-    public void write(SeaTunnelRow element) {
-        ProducerRecord<byte[], byte[]> producerRecord = null;
-        //Determine the partition of the kafka send message based on the field name
-        if (pluginConfig.hasPath(PARTITION_KEY.key())){
-            String key = partitionExtractor.apply(element);
-            producerRecord = seaTunnelRowSerializer.serializeRowByKey(key, element);
-        }
-        else {
-            producerRecord = seaTunnelRowSerializer.serializeRow(element);
-        }
-        kafkaProducerSender.send(producerRecord);
-    }
-
     public KafkaSinkWriter(
         SinkWriter.Context context,
         SeaTunnelRowType seaTunnelRowType,
         Config pluginConfig,
         List<KafkaSinkState> kafkaStates) {
         this.context = context;
-        this.pluginConfig = pluginConfig;
-        this.partitionExtractor = createPartitionExtractor(pluginConfig, seaTunnelRowType);
-        if (pluginConfig.hasPath(PARTITION.key())) {
-            this.partition = pluginConfig.getInt(PARTITION.key());
-        }
         if (pluginConfig.hasPath(ASSIGN_PARTITIONS.key())) {
             MessageContentPartitioner.setAssignPartitions(pluginConfig.getStringList(ASSIGN_PARTITIONS.key()));
         }

@@ -116,6 +94,12 @@ public KafkaSinkWriter(
         }
     }

+    @Override
+    public void write(SeaTunnelRow element) {
+        ProducerRecord<byte[], byte[]> producerRecord = seaTunnelRowSerializer.serializeRow(element);
+        kafkaProducerSender.send(producerRecord);
+    }
+
     @Override
     public List<KafkaSinkState> snapshotState(long checkpointId) {
         List<KafkaSinkState> states = kafkaProducerSender.snapshotState(checkpointId);

@@ -145,8 +129,7 @@ public void close() {
     }

     private Properties getKafkaProperties(Config pluginConfig) {
-        Config kafkaConfig = TypesafeConfigUtils.extractSubConfig(pluginConfig,
-            org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG_PREFIX.key(), false);
+        Config kafkaConfig = TypesafeConfigUtils.extractSubConfig(pluginConfig, KAFKA_CONFIG_PREFIX.key(), false);
         Properties kafkaProperties = new Properties();
         kafkaConfig.entrySet().forEach(entry -> {
             kafkaProperties.put(entry.getKey(), entry.getValue().unwrapped());

@@ -160,13 +143,13 @@ private Properties getKafkaProperties(Config pluginConfig) {
         return kafkaProperties;
     }

-    // todo: parse the target field from config
     private SeaTunnelRowSerializer<byte[], byte[]> getSerializer(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
-        if (pluginConfig.hasPath(PARTITION.key())){
-            return new DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC.key()), this.partition, seaTunnelRowType);
-        }
-        else {
-            return new DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC.key()), seaTunnelRowType);
+        if (pluginConfig.hasPath(PARTITION.key())) {
+            return new DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC.key()),
+                pluginConfig.getInt(PARTITION.key()), seaTunnelRowType);
+        } else {
+            return new DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC.key()),
+                getPartitionKeyFields(pluginConfig, seaTunnelRowType), seaTunnelRowType);
         }
     }

@@ -188,23 +171,18 @@ private void restoreState(List<KafkaSinkState> states) {
         }
     }

-    private Function<SeaTunnelRow, String> createPartitionExtractor(Config pluginConfig,
-                                                                    SeaTunnelRowType seaTunnelRowType) {
-        if (!pluginConfig.hasPath(PARTITION_KEY.key())){
-            return row -> null;
-        }
-        String partitionKey = pluginConfig.getString(PARTITION_KEY.key());
-        List<String> fieldNames = Arrays.asList(seaTunnelRowType.getFieldNames());
-        if (!fieldNames.contains(partitionKey)) {
-            return row -> partitionKey;
-        }
-        int partitionFieldIndex = seaTunnelRowType.indexOf(partitionKey);
-        return row -> {
-            Object partitionFieldValue = row.getField(partitionFieldIndex);
-            if (partitionFieldValue != null) {
-                return partitionFieldValue.toString();
+    private List<String> getPartitionKeyFields(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
+        if (pluginConfig.hasPath(PARTITION_KEY_FIELDS.key())) {
+            List<String> partitionKeyFields = pluginConfig.getStringList(PARTITION_KEY_FIELDS.key());
+            List<String> rowTypeFieldNames = Arrays.asList(seaTunnelRowType.getFieldNames());
+            for (String partitionKeyField : partitionKeyFields) {
+                if (!rowTypeFieldNames.contains(partitionKeyField)) {
+                    throw new IllegalArgumentException(String.format(
+                        "Partition key field not found: %s, rowType: %s", partitionKeyField, rowTypeFieldNames));
+                }
             }
-            return null;
-        };
+            return partitionKeyFields;
+        }
+        return Collections.emptyList();
     }
 }
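One behavioral change worth noting: the old createPartitionExtractor silently fell back to using the literal partition_key string as the message key when it matched no field name, while getPartitionKeyFields now rejects unknown fields up front. A standalone sketch of that validation logic (hypothetical class, mirroring the method above):

```java
import java.util.Arrays;
import java.util.List;

public class PartitionKeyFieldsValidationDemo {

    // Mirrors getPartitionKeyFields: every configured key field must exist in the row type.
    static List<String> validate(List<String> keyFields, String[] rowFieldNames) {
        List<String> names = Arrays.asList(rowFieldNames);
        for (String field : keyFields) {
            if (!names.contains(field)) {
                throw new IllegalArgumentException(String.format(
                    "Partition key field not found: %s, rowType: %s", field, names));
            }
        }
        return keyFields;
    }

    public static void main(String[] args) {
        String[] rowFields = {"name", "age"};
        System.out.println(validate(Arrays.asList("name", "age"), rowFields)); // [name, age]
        validate(Arrays.asList("missing"), rowFields); // throws IllegalArgumentException
    }
}
```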
Lines changed: 7 additions & 10 deletions
@@ -18,39 +18,36 @@
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <groupId>org.apache.seatunnel</groupId>
-        <artifactId>seatunnel-flink-connector-v2-e2e</artifactId>
+        <artifactId>seatunnel-connector-v2-e2e</artifactId>
         <version>${revision}</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>

-    <artifactId>connector-kafka-flink-e2e</artifactId>
+    <artifactId>connector-kafka-e2e</artifactId>

     <dependencies>
+        <!-- SeaTunnel connectors -->
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>connector-flink-e2e-base</artifactId>
+            <artifactId>connector-kafka</artifactId>
             <version>${project.version}</version>
-            <classifier>tests</classifier>
-            <type>test-jar</type>
             <scope>test</scope>
         </dependency>
-
-        <!-- SeaTunnel connectors -->
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>connector-kafka</artifactId>
+            <artifactId>connector-console</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>connector-console</artifactId>
+            <artifactId>connector-assert</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>connector-assert</artifactId>
+            <artifactId>connector-fake</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>