diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java index 1257af0423d..ffec8130e06 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java @@ -42,6 +42,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.commons.lang3.tuple.Pair; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.ByteArraySerializer; @@ -52,7 +53,6 @@ import java.util.Optional; import java.util.Properties; import java.util.Random; -import java.util.function.Function; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -67,10 +67,10 @@ public class KafkaSinkWriter implements SinkWriter kafkaProducerSender; private final SeaTunnelRowSerializer seaTunnelRowSerializer; - private final Function topicExtractor; private static final int PREFIX_RANGE = 10000; @@ -80,9 +80,10 @@ public KafkaSinkWriter( Config pluginConfig, List kafkaStates) { this.context = context; - this.topic = pluginConfig.getString(TOPIC.key()); - this.isExtractTopic = setExtractTopic(this.topic); - this.topicExtractor = createTopicExtractor(this.topic, seaTunnelRowType); + this.seaTunnelRowType = seaTunnelRowType; + Pair topicResult = isExtractTopic(pluginConfig.getString(TOPIC.key())); + this.isExtractTopic = topicResult.getKey(); + this.topic = topicResult.getRight(); if (pluginConfig.hasPath(ASSIGN_PARTITIONS.key())) { MessageContentPartitioner.setAssignPartitions(pluginConfig.getStringList(ASSIGN_PARTITIONS.key())); } @@ -111,10 +112,7 @@ public KafkaSinkWriter( @Override public void write(SeaTunnelRow element) { - if (isExtractTopic) { - topic = topicExtractor.apply(element); - } - ProducerRecord producerRecord = seaTunnelRowSerializer.serializeRow(topic, element); + ProducerRecord producerRecord = seaTunnelRowSerializer.serializeRow(extractTopic(element), element); kafkaProducerSender.send(producerRecord); } @@ -212,37 +210,31 @@ private List getPartitionKeyFields(Config pluginConfig, SeaTunnelRowType return Collections.emptyList(); } - private boolean setExtractTopic(String topicConfig){ + private Pair isExtractTopic(String topicConfig){ String regex = "\\$\\{(.*?)\\}"; Pattern pattern = Pattern.compile(regex, Pattern.DOTALL); Matcher matcher = pattern.matcher(topicConfig); if (matcher.find()) { - return true; + return Pair.of(true, matcher.group(1)); } - return false; + return Pair.of(false, topicConfig); } - private Function createTopicExtractor(String topicConfig, SeaTunnelRowType seaTunnelRowType) { - String regex = "\\$\\{(.*?)\\}"; - Pattern pattern = Pattern.compile(regex, Pattern.DOTALL); - Matcher matcher = pattern.matcher(topicConfig); - if (!matcher.find()) { - return row -> topicConfig; + private String extractTopic(SeaTunnelRow row) { + if (!isExtractTopic) { + return topic; } - String topicField = matcher.group(1); List fieldNames = Arrays.asList(seaTunnelRowType.getFieldNames()); - if (!fieldNames.contains(topicField)) { + if (!fieldNames.contains(topic)) { throw new KafkaConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, - String.format("Field name { %s } is not found!", topicField)); - } - int topicFieldIndex = seaTunnelRowType.indexOf(topicField); - return row -> { - Object topicFieldValue = row.getField(topicFieldIndex); - if (topicFieldValue == null) { - throw new KafkaConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, - "The column value is empty!"); - } - return topicFieldValue.toString(); - }; + String.format("Field name { %s } is not found!", topic)); + } + int topicFieldIndex = seaTunnelRowType.indexOf(topic); + Object topicFieldValue = row.getField(topicFieldIndex); + if (topicFieldValue == null) { + throw new KafkaConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, + "The column value is empty!"); + } + return topicFieldValue.toString(); } }