Skip to content

Commit

Permalink
fix code to improve efficiency
Browse files Browse the repository at this point in the history
  • Loading branch information
TaoZex committed Jan 20, 2023
1 parent 920d843 commit a6d9635
Showing 1 changed file with 23 additions and 31 deletions.
Expand Up @@ -42,6 +42,7 @@

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
Expand All @@ -52,7 +53,6 @@
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand All @@ -67,10 +67,10 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
private String topic;
private long lastCheckpointId = 0;
private boolean isExtractTopic;
private SeaTunnelRowType seaTunnelRowType;

private final KafkaProduceSender<byte[], byte[]> kafkaProducerSender;
private final SeaTunnelRowSerializer<byte[], byte[]> seaTunnelRowSerializer;
private final Function<SeaTunnelRow, String> topicExtractor;

private static final int PREFIX_RANGE = 10000;

Expand All @@ -80,9 +80,10 @@ public KafkaSinkWriter(
Config pluginConfig,
List<KafkaSinkState> kafkaStates) {
this.context = context;
this.topic = pluginConfig.getString(TOPIC.key());
this.isExtractTopic = setExtractTopic(this.topic);
this.topicExtractor = createTopicExtractor(this.topic, seaTunnelRowType);
this.seaTunnelRowType = seaTunnelRowType;
Pair<Boolean, String> topicResult = isExtractTopic(pluginConfig.getString(TOPIC.key()));
this.isExtractTopic = topicResult.getKey();
this.topic = topicResult.getRight();
if (pluginConfig.hasPath(ASSIGN_PARTITIONS.key())) {
MessageContentPartitioner.setAssignPartitions(pluginConfig.getStringList(ASSIGN_PARTITIONS.key()));
}
Expand Down Expand Up @@ -111,10 +112,7 @@ public KafkaSinkWriter(

@Override
public void write(SeaTunnelRow element) {
if (isExtractTopic) {
topic = topicExtractor.apply(element);
}
ProducerRecord<byte[], byte[]> producerRecord = seaTunnelRowSerializer.serializeRow(topic, element);
ProducerRecord<byte[], byte[]> producerRecord = seaTunnelRowSerializer.serializeRow(extractTopic(element), element);
kafkaProducerSender.send(producerRecord);
}

Expand Down Expand Up @@ -212,37 +210,31 @@ private List<String> getPartitionKeyFields(Config pluginConfig, SeaTunnelRowType
return Collections.emptyList();
}

private boolean setExtractTopic(String topicConfig){
private Pair<Boolean, String> isExtractTopic(String topicConfig){
String regex = "\\$\\{(.*?)\\}";
Pattern pattern = Pattern.compile(regex, Pattern.DOTALL);
Matcher matcher = pattern.matcher(topicConfig);
if (matcher.find()) {
return true;
return Pair.of(true, matcher.group(1));
}
return false;
return Pair.of(false, topicConfig);
}

private Function<SeaTunnelRow, String> createTopicExtractor(String topicConfig, SeaTunnelRowType seaTunnelRowType) {
String regex = "\\$\\{(.*?)\\}";
Pattern pattern = Pattern.compile(regex, Pattern.DOTALL);
Matcher matcher = pattern.matcher(topicConfig);
if (!matcher.find()) {
return row -> topicConfig;
private String extractTopic(SeaTunnelRow row) {
if (!isExtractTopic) {
return topic;
}
String topicField = matcher.group(1);
List<String> fieldNames = Arrays.asList(seaTunnelRowType.getFieldNames());
if (!fieldNames.contains(topicField)) {
if (!fieldNames.contains(topic)) {
throw new KafkaConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
String.format("Field name { %s } is not found!", topicField));
}
int topicFieldIndex = seaTunnelRowType.indexOf(topicField);
return row -> {
Object topicFieldValue = row.getField(topicFieldIndex);
if (topicFieldValue == null) {
throw new KafkaConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
"The column value is empty!");
}
return topicFieldValue.toString();
};
String.format("Field name { %s } is not found!", topic));
}
int topicFieldIndex = seaTunnelRowType.indexOf(topic);
Object topicFieldValue = row.getField(topicFieldIndex);
if (topicFieldValue == null) {
throw new KafkaConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
"The column value is empty!");
}
return topicFieldValue.toString();
}
}

0 comments on commit a6d9635

Please sign in to comment.