[Improve][Connector-V2][Kafka] Support extract topic from SeaTunnelRow field (#3742)
TaoZex committed Jan 20, 2023
1 parent 73b523d commit 8aff807
Showing 7 changed files with 197 additions and 56 deletions.
18 changes: 17 additions & 1 deletion docs/en/connector-v2/sink/Kafka.md
@@ -31,6 +31,21 @@ By default, we will use 2pc to guarantee the message is sent to kafka exactly on

Kafka Topic.

Currently two formats are supported:

1. Fill in the name of the topic.

2. Use the value of a field from the upstream data as the topic. The format is `${your field name}`, where the topic for each row is taken from the value of that column in the upstream data.

For example, suppose the upstream data is the following:

| name | age | data |
| ---- | ---- | ------------- |
| Jack | 16 | data-example1 |
| Mary | 23 | data-example2 |

If `${name}` is set as the topic, the first row is sent to the `Jack` topic and the second row is sent to the `Mary` topic.
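
A minimal sink block using this feature might look like the following (a sketch only; the broker address and format are illustrative):

```hocon
sink {
  Kafka {
    bootstrap.servers = "localhost:9092"
    topic = "${name}"
    format = json
  }
}
```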

### bootstrap.servers [string]

Kafka Brokers List.
@@ -190,4 +205,5 @@ sink {

- [Improve] Support to specify multiple partition keys [3230](https://github.com/apache/incubator-seatunnel/pull/3230)
- [Improve] Add text format for kafka sink connector [3711](https://github.com/apache/incubator-seatunnel/pull/3711)
- [Improve] Support extract topic from SeaTunnelRow fields [3742](https://github.com/apache/incubator-seatunnel/pull/3742)
- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/incubator-seatunnel/pull/3719)
@@ -37,36 +37,37 @@
public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer<byte[], byte[]> {

private Integer partition;
private final String topic;
private final SerializationSchema keySerialization;
private final SerializationSchema valueSerialization;

public DefaultSeaTunnelRowSerializer(String topic, SeaTunnelRowType seaTunnelRowType, String format, String delimiter) {
this(topic, element -> null, createSerializationSchema(seaTunnelRowType, format, delimiter));
public DefaultSeaTunnelRowSerializer(SeaTunnelRowType seaTunnelRowType,
String format,
String delimiter) {
this(element -> null, createSerializationSchema(seaTunnelRowType, format, delimiter));
}

public DefaultSeaTunnelRowSerializer(String topic, Integer partition, SeaTunnelRowType seaTunnelRowType, String format, String delimiter) {
this(topic, seaTunnelRowType, format, delimiter);
public DefaultSeaTunnelRowSerializer(Integer partition,
SeaTunnelRowType seaTunnelRowType,
String format, String delimiter) {
this(seaTunnelRowType, format, delimiter);
this.partition = partition;
}

public DefaultSeaTunnelRowSerializer(String topic, List<String> keyFieldNames,
public DefaultSeaTunnelRowSerializer(List<String> keyFieldNames,
SeaTunnelRowType seaTunnelRowType,
String format, String delimiter) {
this(topic, createKeySerializationSchema(keyFieldNames, seaTunnelRowType),
this(createKeySerializationSchema(keyFieldNames, seaTunnelRowType),
createSerializationSchema(seaTunnelRowType, format, delimiter));
}

public DefaultSeaTunnelRowSerializer(String topic,
SerializationSchema keySerialization,
public DefaultSeaTunnelRowSerializer(SerializationSchema keySerialization,
SerializationSchema valueSerialization) {
this.topic = topic;
this.keySerialization = keySerialization;
this.valueSerialization = valueSerialization;
}

@Override
public ProducerRecord<byte[], byte[]> serializeRow(SeaTunnelRow row) {
public ProducerRecord<byte[], byte[]> serializeRow(String topic, SeaTunnelRow row) {
return new ProducerRecord<>(topic, partition,
keySerialization.serialize(row), valueSerialization.serialize(row));
}
@@ -29,5 +29,5 @@ public interface SeaTunnelRowSerializer<K, V> {
* @param row seatunnel row
* @return kafka record.
*/
ProducerRecord<K, V> serializeRow(SeaTunnelRow row);
ProducerRecord<K, V> serializeRow(String topic, SeaTunnelRow row);
}
@@ -42,6 +42,7 @@

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -52,6 +53,8 @@
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* KafkaSinkWriter is a sink writer that will write {@link SeaTunnelRow} to Kafka.
@@ -61,7 +64,10 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
private final SinkWriter.Context context;

private String transactionPrefix;
private String topic;
private long lastCheckpointId = 0;
private boolean isExtractTopic;
private SeaTunnelRowType seaTunnelRowType;

private final KafkaProduceSender<byte[], byte[]> kafkaProducerSender;
private final SeaTunnelRowSerializer<byte[], byte[]> seaTunnelRowSerializer;
@@ -74,6 +80,10 @@ public KafkaSinkWriter(
Config pluginConfig,
List<KafkaSinkState> kafkaStates) {
this.context = context;
this.seaTunnelRowType = seaTunnelRowType;
Pair<Boolean, String> topicResult = isExtractTopic(pluginConfig.getString(TOPIC.key()));
this.isExtractTopic = topicResult.getKey();
this.topic = topicResult.getRight();
if (pluginConfig.hasPath(ASSIGN_PARTITIONS.key())) {
MessageContentPartitioner.setAssignPartitions(pluginConfig.getStringList(ASSIGN_PARTITIONS.key()));
}
@@ -102,7 +112,7 @@ public KafkaSinkWriter(

@Override
public void write(SeaTunnelRow element) {
ProducerRecord<byte[], byte[]> producerRecord = seaTunnelRowSerializer.serializeRow(element);
ProducerRecord<byte[], byte[]> producerRecord = seaTunnelRowSerializer.serializeRow(extractTopic(element), element);
kafkaProducerSender.send(producerRecord);
}

@@ -159,10 +169,10 @@ private SeaTunnelRowSerializer<byte[], byte[]> getSerializer(Config pluginConfig
delimiter = pluginConfig.getString(FIELD_DELIMITER.key());
}
if (pluginConfig.hasPath(PARTITION.key())) {
return new DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC.key()),
return new DefaultSeaTunnelRowSerializer(
pluginConfig.getInt(PARTITION.key()), seaTunnelRowType, format, delimiter);
} else {
return new DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC.key()),
return new DefaultSeaTunnelRowSerializer(
getPartitionKeyFields(pluginConfig, seaTunnelRowType), seaTunnelRowType, format, delimiter);
}
}
@@ -199,4 +209,32 @@ private List<String> getPartitionKeyFields(Config pluginConfig, SeaTunnelRowType
}
return Collections.emptyList();
}

private Pair<Boolean, String> isExtractTopic(String topicConfig) {
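// Detects the ${field} placeholder syntax in the topic config: returns (true, fieldName)
// when the placeholder is present, otherwise (false, literalTopicName).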
String regex = "\\$\\{(.*?)\\}";
Pattern pattern = Pattern.compile(regex, Pattern.DOTALL);
Matcher matcher = pattern.matcher(topicConfig);
if (matcher.find()) {
return Pair.of(true, matcher.group(1));
}
return Pair.of(false, topicConfig);
}

private String extractTopic(SeaTunnelRow row) {
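// Resolves the topic for a row: returns the static topic name, or, when the ${field}
// syntax is used, the row's value of that field (which must exist and be non-null).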
if (!isExtractTopic) {
return topic;
}
List<String> fieldNames = Arrays.asList(seaTunnelRowType.getFieldNames());
if (!fieldNames.contains(topic)) {
throw new KafkaConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
String.format("Field name { %s } is not found!", topic));
}
int topicFieldIndex = seaTunnelRowType.indexOf(topic);
Object topicFieldValue = row.getField(topicFieldIndex);
if (topicFieldValue == null) {
throw new KafkaConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
"The column value is empty!");
}
return topicFieldValue.toString();
}
}
@@ -51,6 +51,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-transforms-v2</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>
@@ -106,8 +106,8 @@ public void startUp() throws Exception {
.untilAsserted(() -> initKafkaProducer());

log.info("Write 100 records to topic test_topic_source");
DefaultSeaTunnelRowSerializer serializer = new DefaultSeaTunnelRowSerializer("test_topic_source", SEATUNNEL_ROW_TYPE, DEFAULT_FORMAT, DEFAULT_FIELD_DELIMITER);
generateTestData(row -> serializer.serializeRow(row), 0, 100);
DefaultSeaTunnelRowSerializer serializer = new DefaultSeaTunnelRowSerializer(SEATUNNEL_ROW_TYPE, DEFAULT_FORMAT, DEFAULT_FIELD_DELIMITER);
generateTestData(row -> serializer.serializeRow("test_topic_source", row), 0, 100);
}

@AfterAll
@@ -127,25 +127,8 @@ public void testSinkKafka(TestContainer container) throws IOException, Interrupt
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());

String topicName = "test_topic";
Map<String, String> data = new HashMap<>();
Map<String, String> data = getKafkaConsumerData(topicName);
ObjectMapper objectMapper = new ObjectMapper();
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerConfig())) {
consumer.subscribe(Arrays.asList(topicName));
Map<TopicPartition, Long> offsets = consumer.endOffsets(Arrays.asList(new TopicPartition(topicName, 0)));
Long endOffset = offsets.entrySet().iterator().next().getValue();
Long lastProcessedOffset = -1L;

do {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
if (lastProcessedOffset < record.offset()) {

data.put(record.key(), record.value());
}
lastProcessedOffset = record.offset();
}
} while (lastProcessedOffset < endOffset - 1);
}
String key = data.keySet().iterator().next();
ObjectNode objectNode = objectMapper.readValue(key, ObjectNode.class);
Assertions.assertTrue(objectNode.has("c_map"));
@@ -159,23 +142,22 @@ public void testTextFormatSinkKafka(TestContainer container) throws IOException,
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());

String topicName = "test_text_topic";
Map<String, String> data = new HashMap<>();
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerConfig())) {
consumer.subscribe(Arrays.asList(topicName));
Map<TopicPartition, Long> offsets = consumer.endOffsets(Arrays.asList(new TopicPartition(topicName, 0)));
Long endOffset = offsets.entrySet().iterator().next().getValue();
Long lastProcessedOffset = -1L;
Map<String, String> data = getKafkaConsumerData(topicName);
Assertions.assertEquals(10, data.size());
}

do {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
if (lastProcessedOffset < record.offset()) {
data.put(record.key(), record.value());
}
lastProcessedOffset = record.offset();
}
} while (lastProcessedOffset < endOffset - 1);
}
@TestTemplate
public void testExtractTopicFunction(TestContainer container) throws IOException, InterruptedException {
Container.ExecResult execResult = container.executeJob("/extractTopic_fake_to_kafka.conf");
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());

String topicName = "test_extract_topic";
Map<String, String> data = getKafkaConsumerData(topicName);
ObjectMapper objectMapper = new ObjectMapper();
String key = data.keySet().iterator().next();
ObjectNode objectNode = objectMapper.readValue(key, ObjectNode.class);
Assertions.assertTrue(objectNode.has("c_map"));
Assertions.assertTrue(objectNode.has("c_string"));
Assertions.assertEquals(10, data.size());
}

@@ -192,8 +174,8 @@ public void testSourceKafkaTextToConsole(TestContainer container) throws IOExcep

@TestTemplate
public void testSourceKafkaJsonToConsole(TestContainer container) throws IOException, InterruptedException {
DefaultSeaTunnelRowSerializer serializer = new DefaultSeaTunnelRowSerializer("test_topic_json", SEATUNNEL_ROW_TYPE, DEFAULT_FORMAT, DEFAULT_FIELD_DELIMITER);
generateTestData(row -> serializer.serializeRow(row), 0, 100);
DefaultSeaTunnelRowSerializer serializer = new DefaultSeaTunnelRowSerializer(SEATUNNEL_ROW_TYPE, DEFAULT_FORMAT, DEFAULT_FIELD_DELIMITER);
generateTestData(row -> serializer.serializeRow("test_topic_json", row), 0, 100);
Container.ExecResult execResult = container.executeJob("/kafkasource_json_to_console.conf");
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
}
@@ -208,8 +190,8 @@ public void testSourceKafka(TestContainer container) throws IOException, Interru

@TestTemplate
public void testSourceKafkaStartConfig(TestContainer container) throws IOException, InterruptedException {
DefaultSeaTunnelRowSerializer serializer = new DefaultSeaTunnelRowSerializer("test_topic_group", SEATUNNEL_ROW_TYPE, DEFAULT_FORMAT, DEFAULT_FIELD_DELIMITER);
generateTestData(row -> serializer.serializeRow(row), 100, 150);
DefaultSeaTunnelRowSerializer serializer = new DefaultSeaTunnelRowSerializer(SEATUNNEL_ROW_TYPE, DEFAULT_FORMAT, DEFAULT_FIELD_DELIMITER);
generateTestData(row -> serializer.serializeRow("test_topic_group", row), 100, 150);
testKafkaGroupOffsetsToConsole(container);
}

@@ -320,6 +302,27 @@ private void generateTestData(ProducerRecordConverter converter, int start, int
}
);

private Map<String, String> getKafkaConsumerData(String topicName) {
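// Consumes partition 0 of the given topic up to its current end offset and returns
// the records as a key -> value map.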
Map<String, String> data = new HashMap<>();
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerConfig())) {
consumer.subscribe(Arrays.asList(topicName));
Map<TopicPartition, Long> offsets = consumer.endOffsets(Arrays.asList(new TopicPartition(topicName, 0)));
Long endOffset = offsets.entrySet().iterator().next().getValue();
Long lastProcessedOffset = -1L;

do {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
if (lastProcessedOffset < record.offset()) {
data.put(record.key(), record.value());
}
lastProcessedOffset = record.offset();
}
} while (lastProcessedOffset < endOffset - 1);
}
return data;
}

interface ProducerRecordConverter {
ProducerRecord<byte[], byte[]> convert(SeaTunnelRow row);
}
@@ -0,0 +1,77 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

env {
execution.parallelism = 1
job.mode = "BATCH"

#spark config
spark.app.name = "SeaTunnel"
spark.executor.instances = 1
spark.executor.cores = 1
spark.executor.memory = "1g"
spark.master = local
}

source {
FakeSource {
result_table_name = "fake"
row.num = 10
map.size = 10
array.size = 10
bytes.length = 10
string.length = 10
schema = {
fields {
c_map = "map<string, smallint>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_decimal = "decimal(30, 8)"
c_bytes = bytes
c_date = date
c_timestamp = timestamp
}
}
}
}

transform {
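# Rewrite every c_string value to "test_extract_topic" so that the sink's
# topic = "${c_string}" setting resolves to that topic for all rows.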
Replace {
source_table_name = "fake"
result_table_name = "fake1"
replace_field = "c_string"
pattern = ".+"
replacement = "test_extract_topic"
is_regex = true
}
}

sink {
Kafka {
source_table_name = "fake1"
bootstrap.servers = "kafkaCluster:9092"
topic = "${c_string}"
partition_key_fields = ["c_map","c_string"]
}
}
