Skip to content

Commit 385e1f4

Browse files
authored
[feature][connector][kafka] Support extract partition from SeaTunnelRow fields (#3085)
1 parent 0fd8da9 commit 385e1f4

File tree

5 files changed

+73
-3
lines changed

5 files changed

+73
-3
lines changed

docs/en/connector-v2/sink/Kafka.md

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ By default, we will use 2pc to guarantee the message is sent to kafka exactly on
2121
| bootstrap.servers | string | yes | - |
2222
| kafka.* | kafka producer config | no | - |
2323
| semantic | string | no | NON |
24+
| partition_key | string | no | - |
2425
| partition | int | no | - |
2526
| assign_partitions | list | no | - |
2627
| transaction_prefix | string | no | - |
@@ -50,6 +51,23 @@ In AT_LEAST_ONCE, producer will wait for all outstanding messages in the Kafka b
5051

5152
NON does not provide any guarantees: messages may be lost in case of issues on the Kafka broker and messages may be duplicated.
5253

54+
### partition_key [string]
55+
56+
Configure which field is used as the key of the kafka message.
57+
58+
For example, if you want to use the value of an upstream field as the key, assign that field's name to this property.
59+
60+
Upstream data is the following:
61+
62+
| name | age | data |
63+
| ---- | ---- | ------------- |
64+
| Jack | 16 | data-example1 |
65+
| Mary | 23 | data-example2 |
66+
67+
If name is set as the key, then the hash value of the name column will determine which partition the message is sent to.
68+
69+
If the field name does not exist in the upstream data, the configured parameter will be used as the key.
70+
5371
### partition [int]
5472

5573
We can specify the partition, all messages will be sent to this partition.
@@ -93,7 +111,9 @@ sink {
93111

94112
### change log
95113
#### next version
96-
114+
97115
- Add kafka sink doc
98116
- New feature : Kafka specified partition to send
99-
- New feature : Determine the partition that kafka send based on the message content
117+
- New feature : Determine the partition that kafka sends the message to based on the message content
118+
- New feature : Configure which field is used as the key of the kafka message
119+

seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,4 +59,9 @@ public class Config {
5959
* Determine the partition to send based on the content of the message.
6060
*/
6161
public static final String ASSIGN_PARTITIONS = "assign_partitions";
62+
63+
    /**
     * Name of the upstream field whose value is used as the kafka message key
     * (the key's hash determines the target partition). If the configured value
     * is not an existing field name, the value itself is used as the key.
     */
    public static final String PARTITION_KEY = "partition_key";
6267
}

seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,4 +48,11 @@ public ProducerRecord<byte[], byte[]> serializeRow(SeaTunnelRow row) {
4848
return new ProducerRecord<>(topic, null, jsonSerializationSchema.serialize(row));
4949
}
5050
}
51+
52+
@Override
53+
public ProducerRecord<byte[], byte[]> serializeRowByKey(String key, SeaTunnelRow row) {
54+
//if the key is null, kafka will send message to a random partition
55+
return new ProducerRecord<>(topic, key == null ? null : key.getBytes(), jsonSerializationSchema.serialize(row));
56+
}
57+
5158
}

seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,13 @@ public interface SeaTunnelRowSerializer<K, V> {
3030
* @return kafka record.
3131
*/
3232
ProducerRecord<K, V> serializeRow(SeaTunnelRow row);
33+
34+
    /**
     * Serialize the {@link SeaTunnelRow} to a Kafka {@link ProducerRecord} with the given key.
     *
     * @param key record key used by kafka to choose the partition; may be null
     * @param row seatunnel row
     * @return kafka record.
     */
    ProducerRecord<K, V> serializeRowByKey(String key, SeaTunnelRow row);
3342
}

seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.ASSIGN_PARTITIONS;
2121
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION;
22+
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION_KEY;
2223
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
2324
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TRANSACTION_PREFIX;
2425

@@ -38,10 +39,12 @@
3839
import org.apache.kafka.clients.producer.ProducerRecord;
3940
import org.apache.kafka.common.serialization.ByteArraySerializer;
4041

42+
import java.util.Arrays;
4143
import java.util.List;
4244
import java.util.Optional;
4345
import java.util.Properties;
4446
import java.util.Random;
47+
import java.util.function.Function;
4548

4649
/**
4750
* KafkaSinkWriter is a sink writer that will write {@link SeaTunnelRow} to Kafka.
@@ -50,6 +53,7 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
5053

5154
private final SinkWriter.Context context;
5255
private final Config pluginConfig;
56+
private final Function<SeaTunnelRow, String> partitionExtractor;
5357

5458
private String transactionPrefix;
5559
private long lastCheckpointId = 0;
@@ -63,7 +67,15 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
6367
// check config
6468
@Override
6569
public void write(SeaTunnelRow element) {
66-
ProducerRecord<byte[], byte[]> producerRecord = seaTunnelRowSerializer.serializeRow(element);
70+
ProducerRecord<byte[], byte[]> producerRecord = null;
71+
//Determine the partition of the kafka send message based on the field name
72+
if (pluginConfig.hasPath(PARTITION_KEY)){
73+
String key = partitionExtractor.apply(element);
74+
producerRecord = seaTunnelRowSerializer.serializeRowByKey(key, element);
75+
}
76+
else {
77+
producerRecord = seaTunnelRowSerializer.serializeRow(element);
78+
}
6779
kafkaProducerSender.send(producerRecord);
6880
}
6981

@@ -74,6 +86,7 @@ public KafkaSinkWriter(
7486
List<KafkaSinkState> kafkaStates) {
7587
this.context = context;
7688
this.pluginConfig = pluginConfig;
89+
this.partitionExtractor = createPartitionExtractor(pluginConfig, seaTunnelRowType);
7790
if (pluginConfig.hasPath(PARTITION)) {
7891
this.partition = pluginConfig.getInt(PARTITION);
7992
}
@@ -175,4 +188,20 @@ private void restoreState(List<KafkaSinkState> states) {
175188
}
176189
}
177190

191+
private Function<SeaTunnelRow, String> createPartitionExtractor(Config pluginConfig,
192+
SeaTunnelRowType seaTunnelRowType) {
193+
String partitionKey = pluginConfig.getString(PARTITION_KEY);
194+
List<String> fieldNames = Arrays.asList(seaTunnelRowType.getFieldNames());
195+
if (!fieldNames.contains(partitionKey)) {
196+
return row -> partitionKey;
197+
}
198+
int partitionFieldIndex = seaTunnelRowType.indexOf(partitionKey);
199+
return row -> {
200+
Object partitionFieldValue = row.getField(partitionFieldIndex);
201+
if (partitionFieldValue != null) {
202+
return partitionFieldValue.toString();
203+
}
204+
return null;
205+
};
206+
}
178207
}

0 commit comments

Comments
 (0)