diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SingleTopicFetcherBuilder.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/AbstractTopicFetcherBuilder.java similarity index 59% rename from inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SingleTopicFetcherBuilder.java rename to inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/AbstractTopicFetcherBuilder.java index ce90e34e232..b7c0cfde184 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SingleTopicFetcherBuilder.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/AbstractTopicFetcherBuilder.java @@ -20,32 +20,55 @@ import org.apache.inlong.sdk.sort.entity.InLongTopic; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + /** - * Builder to build single topic fetcher + * Abstract Builder of topic fetcher */ -public abstract class SingleTopicFetcherBuilder implements TopicFetcherBuilder { +public abstract class AbstractTopicFetcherBuilder implements TopicFetcherBuilder { protected Interceptor interceptor; protected Deserializer deserializer; protected ClientContext context; protected InLongTopic topic; + protected List topics; + protected String fetchKey; - public SingleTopicFetcherBuilder interceptor(Interceptor interceptor) { + @Override + public TopicFetcherBuilder interceptor(Interceptor interceptor) { this.interceptor = interceptor; return this; } - public SingleTopicFetcherBuilder topic(InLongTopic topic) { + @Override + public TopicFetcherBuilder topic(InLongTopic topic) { this.topic = topic; return this; } - public SingleTopicFetcherBuilder deserializer(Deserializer deserializer) { + @Override + public TopicFetcherBuilder topic(Collection topics) { + this.topics = new ArrayList<>(topics); + return this; + } + + @Override + public TopicFetcherBuilder deserializer(Deserializer deserializer) { this.deserializer = deserializer; return this; } - 
public SingleTopicFetcherBuilder context(ClientContext context) { + @Override + public TopicFetcherBuilder context(ClientContext context) { this.context = context; return this; } + + @Override + public TopicFetcherBuilder fetchKey(String fetchKey) { + this.fetchKey = fetchKey; + return this; + } + } diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/MultiTopicsFetcher.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/MultiTopicsFetcher.java index 557ebaa1bbd..7c49cd6f7bc 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/MultiTopicsFetcher.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/MultiTopicsFetcher.java @@ -39,7 +39,9 @@ public abstract class MultiTopicsFetcher implements TopicFetcher { protected final ReentrantReadWriteLock mainLock = new ReentrantReadWriteLock(true); protected final ScheduledExecutorService executor; + protected final String fetchKey; protected Map onlineTopics; + protected List newTopics; protected ClientContext context; protected Deserializer deserializer; protected volatile Thread fetchThread; @@ -56,7 +58,8 @@ public MultiTopicsFetcher( List topics, ClientContext context, Interceptor interceptor, - Deserializer deserializer) { + Deserializer deserializer, + String fetchKey) { this.onlineTopics = topics.stream() .filter(Objects::nonNull) .collect((Collectors.toMap(InLongTopic::getTopic, t -> t))); @@ -64,6 +67,12 @@ public MultiTopicsFetcher( this.interceptor = interceptor; this.deserializer = deserializer; this.executor = Executors.newSingleThreadScheduledExecutor(); + this.fetchKey = fetchKey; + } + + @Override + public String getFetchKey() { + return fetchKey; } protected boolean needUpdate(Collection newTopics) { diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SingleTopicFetcher.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SingleTopicFetcher.java index 
03c965f5324..6ffb65059ad 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SingleTopicFetcher.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SingleTopicFetcher.java @@ -56,6 +56,11 @@ public SingleTopicFetcher( this.interceptor = Optional.ofNullable(interceptor).orElse(new MsgTimeInterceptor()); } + @Override + public String getFetchKey() { + return topic.getTopicKey(); + } + @Override public boolean updateTopics(List topics) { if (topics.size() != 1) { diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java index 9bf8b5adda0..2a9a1efb05c 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java @@ -59,9 +59,14 @@ public class SortClientConfig implements Serializable { private int maxEmptyPollSleepMs = 500; private int emptyPollTimes = 10; private int cleanOldConsumerIntervalSec = 60; - - public SortClientConfig(String sortTaskId, String sortClusterName, InLongTopicChangeListener assignmentsListener, - ConsumeStrategy consumeStrategy, String localIp) { + private int maxConsumerSize = 5; + + public SortClientConfig( + String sortTaskId, + String sortClusterName, + InLongTopicChangeListener assignmentsListener, + ConsumeStrategy consumeStrategy, + String localIp) { this.sortTaskId = sortTaskId; this.sortClusterName = sortClusterName; this.assignmentsListener = assignmentsListener; @@ -323,6 +328,14 @@ public void setCleanOldConsumerIntervalSec(int cleanOldConsumerIntervalSec) { this.cleanOldConsumerIntervalSec = cleanOldConsumerIntervalSec; } + public int getMaxConsumerSize() { + return maxConsumerSize; + } + + public void setMaxConsumerSize(int maxConsumerSize) { + this.maxConsumerSize = maxConsumerSize; + } + /** * ConsumeStrategy */ @@ -367,7 
+380,7 @@ public void setParameters(Map sortSdkParams) { this.managerApiVersion = sortSdkParams.getOrDefault("managerApiVersion", managerApiVersion); String strConsumeStrategy = sortSdkParams.getOrDefault("consumeStrategy", consumeStrategy.name()); String strManagerType = sortSdkParams.getOrDefault("topicManagerType", - TopicType.SINGLE_TOPIC.toString()); + TopicType.MULTI_TOPIC.toString()); this.consumeStrategy = ConsumeStrategy.valueOf(strConsumeStrategy); this.topicType = TopicType.valueOf(strManagerType); @@ -385,5 +398,6 @@ public void setParameters(Map sortSdkParams) { this.emptyPollSleepStepMs = NumberUtils.toInt(sortSdkParams.get("emptyPollSleepStepMs"), emptyPollSleepStepMs); this.maxEmptyPollSleepMs = NumberUtils.toInt(sortSdkParams.get("maxEmptyPollSleepMs"), maxEmptyPollSleepMs); this.emptyPollTimes = NumberUtils.toInt(sortSdkParams.get("emptyPollTimes"), emptyPollTimes); + this.maxConsumerSize = NumberUtils.toInt(sortSdkParams.get("maxConsumerSize"), maxConsumerSize); } } diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/TopicFetcher.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/TopicFetcher.java index a2a21659922..b0141a3edf6 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/TopicFetcher.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/TopicFetcher.java @@ -40,6 +40,12 @@ public interface TopicFetcher { */ void ack(String msgOffset) throws Exception; + /** + * Get the unique fetcher key to specify the fetcher who consume this message. + * @return Message key. 
+ */ + String getFetchKey(); + /** * Pause the consuming */ diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/TopicFetcherBuilder.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/TopicFetcherBuilder.java index 5643ba1cced..da6ca2b40cd 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/TopicFetcherBuilder.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/TopicFetcherBuilder.java @@ -18,9 +18,14 @@ package org.apache.inlong.sdk.sort.api; -import org.apache.inlong.sdk.sort.fetcher.kafka.KafkaSingleTopicFetcherBuilder; -import org.apache.inlong.sdk.sort.fetcher.pulsar.PulsarSingleTopicFetcherBuilder; -import org.apache.inlong.sdk.sort.fetcher.tube.TubeSingleTopicFetcherBuilder; +import org.apache.inlong.sdk.sort.entity.InLongMessage; +import org.apache.inlong.sdk.sort.entity.InLongTopic; +import org.apache.inlong.sdk.sort.fetcher.kafka.KafkaTopicFetcherBuilderImpl; +import org.apache.inlong.sdk.sort.fetcher.pulsar.PulsarTopicFetcherBuilderImpl; +import org.apache.inlong.sdk.sort.fetcher.tube.TubeTopicFetcherBuilderImpl; +import org.apache.inlong.sdk.sort.impl.decode.MessageDeserializer; + +import java.util.Collection; /** * Interface of topic fetcher builder. @@ -29,19 +34,88 @@ public interface TopicFetcherBuilder { /** * Subscribe topics and build the {@link TopicFetcher} + * * @return The prepared topic fetcher */ TopicFetcher subscribe(); - static KafkaSingleTopicFetcherBuilder kafkaSingleTopic() { - return new KafkaSingleTopicFetcherBuilder(); + /** + * Specify the interceptor of TopicFetcher. + * The interceptor is used to filter or modify the message fetched from MQ. + * The default interceptor is {@link MessageInterceptor}. + * + * @param interceptor Interceptor + * @return TopicFetcherBuilder + */ + TopicFetcherBuilder interceptor(Interceptor interceptor); + + /** + * Specify the topic to be subscribed. + * Repeated call will replace the previous topic. 
+ * + * @param topic Topic to be subscribed. + * @return TopicFetcherBuilder + */ + TopicFetcherBuilder topic(InLongTopic topic); + + /** + * Specify the topics to be subscribed. + * Repeated call will replace the previous topics. + *

 + * This method will remove the topic which is added through TopicFetcherBuilder::topic(InLongTopic topic) + *

+ * + * @param topics Topics to be subscribed. + * @return TopicFetcherBuilder + */ + TopicFetcherBuilder topic(Collection topics); + + /** + * Specify the deserializer of fetcher. + * Deserializer is used to decode the messages fetched from MQ, and arrange them to {@link InLongMessage} format. + * The default deserializer is {@link MessageDeserializer} + * + * @param deserializer Deserializer. + * @return TopicFetcherBuilder + */ + TopicFetcherBuilder deserializer(Deserializer deserializer); + + /** + * Specify the clientContext of topic fetcher + * + * @param context ClientContext. + * @return TopicFetcherBuilder + */ + TopicFetcherBuilder context(ClientContext context); + + /** + * The fetchKey to specify that which one fetcher this message belongs to. + * @param fetchKey Key of fetcher. + * @return TopicFetcherBuilder + */ + TopicFetcherBuilder fetchKey(String fetchKey); + + /** + * Got a kafka topic fetcher builder + * @return KafkaTopicFetcherBuilderImpl + */ + static KafkaTopicFetcherBuilderImpl newKafkaBuilder() { + return new KafkaTopicFetcherBuilderImpl(); } - static PulsarSingleTopicFetcherBuilder pulsarSingleTopic() { - return new PulsarSingleTopicFetcherBuilder(); + /** + * Got a pulsar topic fetcher builder + * @return KafkaTopicFetcherBuilderImpl + */ + static PulsarTopicFetcherBuilderImpl newPulsarBuilder() { + return new PulsarTopicFetcherBuilderImpl(); } - static TubeSingleTopicFetcherBuilder tubeSingleTopic() { - return new TubeSingleTopicFetcherBuilder(); + /** + * Got a tube topic fetcher builder + * @return KafkaTopicFetcherBuilderImpl + */ + static TubeTopicFetcherBuilderImpl newTubeBuilder() { + return new TubeTopicFetcherBuilderImpl(); } } diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/AckOffsetOnRebalance.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/AckOffsetOnRebalance.java index 4c8d456c66d..0cf27bc8254 100644 --- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/AckOffsetOnRebalance.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/AckOffsetOnRebalance.java @@ -18,79 +18,150 @@ package org.apache.inlong.sdk.sort.fetcher.kafka; +import org.apache.commons.collections.CollectionUtils; import org.apache.inlong.sdk.sort.api.Seeker; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Collection; +import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; public class AckOffsetOnRebalance implements ConsumerRebalanceListener { private static final Logger LOGGER = LoggerFactory.getLogger(AckOffsetOnRebalance.class); + private static final long DEFAULT_MAX_WAIT_FOR_ACK_TIME = 15000L; + private final long maxWaitForAckTime; private final String clusterId; private final Seeker seeker; private final ConcurrentHashMap commitOffsetMap; private final ConcurrentHashMap> ackOffsetMap; + private final KafkaConsumer consumer; + private final AtomicLong revokedNum = new AtomicLong(0); + private final AtomicLong assignedNum = new AtomicLong(0); public AckOffsetOnRebalance(String clusterId, Seeker seeker, ConcurrentHashMap commitOffsetMap) { - this(clusterId, seeker, commitOffsetMap, null); + this(clusterId, seeker, commitOffsetMap, null, null); } public AckOffsetOnRebalance( String clusterId, Seeker seeker, ConcurrentHashMap commitOffsetMap, - ConcurrentHashMap> ackOffsetMap) { + ConcurrentHashMap> ackOffsetMap, + KafkaConsumer consumer) 
{ + this(clusterId, seeker, commitOffsetMap, ackOffsetMap, consumer, DEFAULT_MAX_WAIT_FOR_ACK_TIME); + } + + public AckOffsetOnRebalance( + String clusterId, + Seeker seeker, + ConcurrentHashMap commitOffsetMap, + ConcurrentHashMap> ackOffsetMap, + KafkaConsumer consumer, + long maxWaitForAckTime) { this.clusterId = clusterId; this.seeker = seeker; this.commitOffsetMap = commitOffsetMap; this.ackOffsetMap = ackOffsetMap; + this.consumer = consumer; + this.maxWaitForAckTime = maxWaitForAckTime; } @Override public void onPartitionsRevoked(Collection collection) { - LOGGER.debug("*- in re-balance:onPartitionsRevoked"); + LOGGER.info("*- in re-balance:onPartitionsRevoked, it's the {} time", revokedNum.incrementAndGet()); collection.forEach((v) -> { - LOGGER.info("clusterId:{},onPartitionsRevoked:{}", clusterId, v.toString()); + LOGGER.debug("clusterId:{},onPartitionsRevoked:{}, position is {}", + clusterId, v.toString(), consumer.position(v)); }); - if (Objects.nonNull(ackOffsetMap) && Objects.nonNull(commitOffsetMap)) { - ackRevokedPartitions(collection); + + try { + if (Objects.nonNull(ackOffsetMap) && Objects.nonNull(commitOffsetMap)) { + // sleep 15s to wait un-ack messages + long startTime = System.currentTimeMillis(); + while (System.currentTimeMillis() - startTime < maxWaitForAckTime && !ackReady(collection)) { + TimeUnit.MILLISECONDS.sleep(1000L); + } + ackRemovedTopicPartitions(collection); + } + } catch (Throwable t) { + LOGGER.warn("got exception in onPartitionsRevoked : ", t); } } - private void ackRevokedPartitions(Collection collection) { - collection.forEach(tp -> { - if (!ackOffsetMap.containsKey(tp)) { - return; + private boolean ackReady(Collection revoked) { + for (TopicPartition tp : revoked) { + ConcurrentSkipListMap tpMap = ackOffsetMap.get(tp); + if (Objects.isNull(tpMap)) { + continue; + } + for (Map.Entry entry : tpMap.entrySet()) { + if (!entry.getValue()) { + LOGGER.info("tp {}, offset {} has not been ack, wait", tp, entry.getKey()); + return 
false; + } } - ConcurrentSkipListMap tpOffsetMap = ackOffsetMap.remove(tp); + } + LOGGER.info("all revoked tp have been ack, re-balance right now."); + return true; + } + + private void ackRemovedTopicPartitions(Collection revoked) { + LOGGER.info("ack revoked topic partitions"); + prepareCommit(); + consumer.commitSync(commitOffsetMap); + // remove revoked topic partitions + Set keySet = ackOffsetMap.keySet(); + revoked.stream() + .filter(keySet::contains) + .forEach(ackOffsetMap::remove); + } + + private void prepareCommit() { + List removeOffsets = new ArrayList<>(); + ackOffsetMap.forEach((topicPartition, tpOffsetMap) -> { + // get the remove list long commitOffset = -1; for (Long ackOffset : tpOffsetMap.keySet()) { if (!tpOffsetMap.get(ackOffset)) { break; } + removeOffsets.add(ackOffset); commitOffset = ackOffset; } // the first haven't ack, do nothing - if (commitOffset == -1) { + if (CollectionUtils.isEmpty(removeOffsets)) { return; } - commitOffsetMap.put(tp, new OffsetAndMetadata(commitOffset)); + + // remove offset and commit offset + removeOffsets.forEach(tpOffsetMap::remove); + removeOffsets.clear(); + commitOffsetMap.put(topicPartition, new OffsetAndMetadata(commitOffset)); }); } @Override public void onPartitionsAssigned(Collection collection) { - LOGGER.debug("*- in re-balance:onPartitionsAssigned "); + LOGGER.info("*- in re-balance:onPartitionsAssigned, it is the {} time", assignedNum.incrementAndGet()); collection.forEach((v) -> { - LOGGER.info("clusterId:{},onPartitionsAssigned:{}", clusterId, v.toString()); + long position = consumer.position(v); + LOGGER.debug("clusterId:{},onPartitionsAssigned:{}, position is {}", + clusterId, v.toString(), position); + consumer.seek(v, position + 1); }); seeker.seek(); } diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaMultiTopicsFetcher.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaMultiTopicsFetcher.java index 
2a4bbdd1530..77fd169f557 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaMultiTopicsFetcher.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaMultiTopicsFetcher.java @@ -44,7 +44,6 @@ import java.time.Duration; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -72,8 +71,9 @@ public KafkaMultiTopicsFetcher( ClientContext context, Interceptor interceptor, Deserializer deserializer, - String bootstrapServers) { - super(topics, context, interceptor, deserializer); + String bootstrapServers, + String fetchKey) { + super(topics, context, interceptor, deserializer, fetchKey); this.bootstrapServers = bootstrapServers; this.commitOffsetMap = new ConcurrentHashMap<>(); this.ackOffsetMap = new ConcurrentHashMap<>(); @@ -86,8 +86,14 @@ public boolean init() { InLongTopic topic = onlineTopics.values().stream().findFirst().get(); this.seeker = SeekerFactory.createKafkaSeeker(consumer, topic); this.listener = new AckOffsetOnRebalance(topic.getInLongCluster().getClusterId(), seeker, - commitOffsetMap); + commitOffsetMap, ackOffsetMap, consumer); consumer.subscribe(onlineTopics.keySet(), listener); + LOGGER.info("init kafka multi topic fetcher success, bootstrap is {}, fetchKey is {}", + bootstrapServers, fetchKey); + String threadName = String.format("sort_sdk_kafka_multi_topic_fetch_thread_%s_%s", + topic.getInLongCluster().getClusterId(), this.fetchKey); + this.fetchThread = new Thread(new KafkaMultiTopicsFetcher.Fetcher(), threadName); + fetchThread.start(); return true; } catch (Throwable t) { LOGGER.error("failed to init kafka consumer: ", t); @@ -120,7 +126,6 @@ private KafkaConsumer createKafkaConsumer() { context.getConfig().getKafkaFetchSizeBytes()); properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, context.getConfig().getKafkaFetchWaitMs()); - properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
false); properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName()); properties.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 120000L); @@ -130,6 +135,7 @@ private KafkaConsumer createKafkaConsumer() { @Override public void ack(String msgOffset) throws Exception { + LOGGER.info("ack {}", msgOffset); // the format of multi topic kafka fetcher msg offset is topic:partitionId:offset, such as topic1:20:1746839 String[] offset = msgOffset.split(":"); if (offset.length != 3) { @@ -143,7 +149,7 @@ public void ack(String msgOffset) throws Exception { // ack if (!ackOffsetMap.containsKey(topicPartition) || !ackOffsetMap.get(topicPartition).containsKey(ackOffset)) { - LOGGER.warn("did not find offsetMap or ack offset of {}, offset {}, just ignore it", + LOGGER.warn("did not find offsetMap to ack offset of {}, offset {}, just ignore it", topicPartition, ackOffset); return; } @@ -178,6 +184,7 @@ public boolean close() { consumer.commitSync(commitOffsetMap); consumer.close(); } + ackOffsetMap.clear(); commitOffsetMap.clear(); } catch (Throwable t) { LOGGER.warn("got exception in multi topic fetcher close: ", t); @@ -209,69 +216,66 @@ public List getTopics() { @Override public boolean updateTopics(List topics) { if (needUpdate(topics)) { - return updateAll(topics); + LOGGER.info("need to update topic"); + newTopics = topics; + return true; } LOGGER.info("no need to update topics"); return false; } - private boolean updateAll(Collection newTopics) { - if (CollectionUtils.isEmpty(newTopics)) { - LOGGER.error("new topics is empty or null"); - return false; - } - - // stop - this.setStopConsume(true); - - // update - this.onlineTopics = newTopics.stream().collect(Collectors.toMap(InLongTopic::getTopic, t -> t)); - InLongTopic topic = onlineTopics.values().stream().findFirst().get(); - this.seeker = SeekerFactory.createKafkaSeeker(consumer, topic); - this.listener = new AckOffsetOnRebalance(topic.getInLongCluster().getClusterId(), seeker, - 
commitOffsetMap, ackOffsetMap); - Optional.ofNullable(interceptor).ifPresent(i -> i.configure(topic)); - - // subscribe new - consumer.subscribe(onlineTopics.keySet(), listener); - - // resume - this.setStopConsume(false); - return true; - } - private void prepareCommit() { + List removeOffsets = new ArrayList<>(); ackOffsetMap.forEach((topicPartition, tpOffsetMap) -> { - synchronized (tpOffsetMap) { - // get the remove list - List removeOffsets = new ArrayList<>(); - long commitOffset = -1; - for (Long ackOffset : tpOffsetMap.keySet()) { - if (!tpOffsetMap.get(ackOffset)) { - break; - } - removeOffsets.add(ackOffset); - commitOffset = ackOffset; - } - // the first haven't ack, do nothing - if (commitOffset == -1) { - return; + // get the remove list + long commitOffset = -1; + for (Long ackOffset : tpOffsetMap.keySet()) { + if (!tpOffsetMap.get(ackOffset)) { + break; } - - // remove offset and commit offset - removeOffsets.forEach(tpOffsetMap::remove); - commitOffsetMap.put(topicPartition, new OffsetAndMetadata(commitOffset)); + removeOffsets.add(ackOffset); + commitOffset = ackOffset; + } + // the first haven't ack, do nothing + if (CollectionUtils.isEmpty(removeOffsets)) { + return; } + + // remove offset and commit offset + removeOffsets.forEach(tpOffsetMap::remove); + removeOffsets.clear(); + commitOffsetMap.put(topicPartition, new OffsetAndMetadata(commitOffset)); }); } public class Fetcher implements Runnable { + private boolean subscribeNew() { + if (CollectionUtils.isEmpty(newTopics)) { + return false; + } + LOGGER.info("start to update topics"); + try { + // update + onlineTopics = newTopics.stream().collect(Collectors.toMap(InLongTopic::getTopic, t -> t)); + InLongTopic topic = onlineTopics.values().stream().findFirst().get(); + seeker = SeekerFactory.createKafkaSeeker(consumer, topic); + Optional.ofNullable(interceptor).ifPresent(i -> i.configure(topic)); + LOGGER.info("new subscribe topic is {}", onlineTopics.keySet()); + // subscribe new + 
consumer.subscribe(onlineTopics.keySet(), listener); + return true; + } finally { + newTopics = null; + } + } + private void commitKafkaOffset() { prepareCommit(); if (consumer != null) { try { - consumer.commitAsync(commitOffsetMap, null); + LOGGER.info("commit {}", commitOffsetMap); + consumer.commitSync(commitOffsetMap); commitOffsetMap.clear(); } catch (Exception e) { LOGGER.error("commit kafka offset failed: ", e); @@ -329,10 +333,12 @@ public void run() { context.acquireRequestPermit(); hasPermit = true; - // fetch from kafka - fetchFromKafka(); + // update topics + subscribeNew(); // commit commitKafkaOffset(); + // fetch from kafka + fetchFromKafka(); } catch (Exception e) { context.getDefaultStateCounter().addFetchErrorTimes(1); LOGGER.error("failed in kafka multi topic fetcher: ", e); @@ -351,6 +357,7 @@ private void fetchFromKafka() throws Exception { ConsumerRecords records = consumer .poll(Duration.ofMillis(context.getConfig().getKafkaFetchWaitMs())); context.getDefaultStateCounter().addFetchTimeCost(System.currentTimeMillis() - startFetchTime); + LOGGER.info("fetch time is {}", System.currentTimeMillis() - startFetchTime); if (null != records && !records.isEmpty()) { for (ConsumerRecord msg : records) { @@ -366,7 +373,7 @@ private void fetchFromKafka() throws Exception { continue; } - msgs.add(new MessageRecord(topic.getTopicKey(), + msgs.add(new MessageRecord(fetchKey, inLongMessages, offsetKey, System.currentTimeMillis())); context.getStateCounterByTopic(topic).addConsumeSize(msg.value().length); diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSingleTopicFetcherBuilder.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaTopicFetcherBuilderImpl.java similarity index 57% rename from inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSingleTopicFetcherBuilder.java rename to 
inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaTopicFetcherBuilderImpl.java index 0fb5bedd25b..a623bde6a03 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSingleTopicFetcherBuilder.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaTopicFetcherBuilderImpl.java @@ -18,36 +18,48 @@ package org.apache.inlong.sdk.sort.fetcher.kafka; -import org.apache.inlong.sdk.sort.api.SingleTopicFetcherBuilder; +import org.apache.commons.collections.CollectionUtils; +import org.apache.inlong.sdk.sort.api.AbstractTopicFetcherBuilder; import org.apache.inlong.sdk.sort.api.TopicFetcher; +import org.apache.inlong.sdk.sort.entity.InLongTopic; import org.apache.inlong.sdk.sort.impl.decode.MessageDeserializer; import org.apache.inlong.sdk.sort.interceptor.MsgTimeInterceptor; +import java.util.Objects; import java.util.Optional; +import java.util.Random; /** - * Builder of kafka single topic fetcher. + * Builder of kafka topic fetcher. 
*/ -public class KafkaSingleTopicFetcherBuilder extends SingleTopicFetcherBuilder { +public class KafkaTopicFetcherBuilderImpl extends AbstractTopicFetcherBuilder { private String bootstrapServers; - public KafkaSingleTopicFetcherBuilder bootstrapServers(String bootstrapServers) { + public KafkaTopicFetcherBuilderImpl bootstrapServers(String bootstrapServers) { this.bootstrapServers = bootstrapServers; return this; } @Override public TopicFetcher subscribe() { - Optional.ofNullable(topic) - .orElseThrow(() -> new IllegalStateException("subscribe kafka single topic, but never assign topic")); Optional.ofNullable(context) .orElseThrow(() -> new IllegalStateException("context is null")); Optional.ofNullable(bootstrapServers) .orElseThrow(() -> new IllegalStateException("kafka bootstrapServers is null")); + deserializer = Optional.ofNullable(deserializer).orElse(new MessageDeserializer()); + if (CollectionUtils.isNotEmpty(topics)) { + return subscribeMultiTopic(); + } else if (Objects.nonNull(topic)) { + return subscribeSingleTopic(); + } else { + throw new IllegalArgumentException("subscribe kafka fetcher, but never assign any topic"); + } + } + + private TopicFetcher subscribeSingleTopic() { interceptor = Optional.ofNullable(interceptor).orElse(new MsgTimeInterceptor()); interceptor.configure(topic); - deserializer = Optional.ofNullable(deserializer).orElse(new MessageDeserializer()); TopicFetcher kafkaSingleTopicFetcher = new KafkaSingleTopicFetcher(topic, context, interceptor, deserializer, bootstrapServers); if (!kafkaSingleTopicFetcher.init()) { @@ -56,4 +68,18 @@ public TopicFetcher subscribe() { return kafkaSingleTopicFetcher; } + private TopicFetcher subscribeMultiTopic() { + InLongTopic firstTopic = topics.stream().findFirst().get(); + interceptor = Optional.ofNullable(interceptor).orElse(new MsgTimeInterceptor()); + interceptor.configure(firstTopic); + String key = Optional.ofNullable(fetchKey) + .orElse(firstTopic.getInLongCluster().getClusterId() + new 
Random().nextLong()); + TopicFetcher kafkaMultiTopicFetcher = + new KafkaMultiTopicsFetcher(topics, context, interceptor, deserializer, bootstrapServers, key); + if (!kafkaMultiTopicFetcher.init()) { + throw new IllegalStateException("init kafka multi topic fetcher failed"); + } + return kafkaMultiTopicFetcher; + } + } diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarMultiTopicsFetcher.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarMultiTopicsFetcher.java index b3f263087d2..4ae98f2f174 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarMultiTopicsFetcher.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarMultiTopicsFetcher.java @@ -65,8 +65,9 @@ public PulsarMultiTopicsFetcher( ClientContext context, Interceptor interceptor, Deserializer deserializer, - PulsarClient pulsarClient) { - super(topics, context, interceptor, deserializer); + PulsarClient pulsarClient, + String fetchKey) { + super(topics, context, interceptor, deserializer, fetchKey); this.pulsarClient = Preconditions.checkNotNull(pulsarClient); } @@ -80,7 +81,8 @@ public boolean init() { this.currentConsumer = new PulsarConsumer(newConsumer); InLongTopic firstTopic = onlineTopics.values().stream().findFirst().get(); this.seeker = SeekerFactory.createPulsarSeeker(newConsumer, firstTopic); - String threadName = String.format("sort_sdk_pulsar_multi_topic_fetch_thread_%d", this.hashCode()); + String threadName = String.format("sort_sdk_pulsar_multi_topic_fetch_thread_%s_%s", + firstTopic.getInLongCluster().getClusterId(), this.fetchKey); this.fetchThread = new Thread(new PulsarMultiTopicsFetcher.Fetcher(), threadName); this.fetchThread.start(); this.executor.scheduleWithFixedDelay(this::clearRemovedConsumerList, @@ -245,18 +247,14 @@ public boolean close() { mainLock.writeLock().lock(); try { this.setStopConsume(true); + 
toBeRemovedConsumers.add(currentConsumer); LOGGER.info("closed online topics {}", onlineTopics); try { - if (currentConsumer != null) { - currentConsumer.close(); - } if (fetchThread != null) { fetchThread.interrupt(); } - } catch (PulsarClientException e) { - LOGGER.warn("close pulsar client: ", e); } catch (Throwable t) { - LOGGER.warn("got exception in close multi topic fetcher: ", t); + LOGGER.warn("got exception in close fetcher thread: ", t); } toBeRemovedConsumers.stream() .filter(Objects::nonNull) @@ -264,10 +262,15 @@ public boolean close() { try { c.close(); } catch (PulsarClientException e) { - LOGGER.warn("close pulsar client: ", e); + LOGGER.warn("got exception in close pulsar consumer: ", e); } }); toBeRemovedConsumers.clear(); + try { + pulsarClient.close(); + } catch (PulsarClientException e) { + LOGGER.warn("got exception in close pulsar client: ", e); + } return true; } finally { this.closed = true; @@ -354,7 +357,7 @@ private void processPulsarMsg(Messages messages) throws Exception { continue; } List msgs = new ArrayList<>(); - msgs.add(new MessageRecord(topic.getTopicKey(), + msgs.add(new MessageRecord(fetchKey, inLongMessages, offsetKey, System.currentTimeMillis())); context.getStateCounterByTopic(topic).addConsumeSize(msg.getData().length); diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcherBuilder.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarTopicFetcherBuilderImpl.java similarity index 58% rename from inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcherBuilder.java rename to inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarTopicFetcherBuilderImpl.java index 1c29b039158..82482a8bb3e 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcherBuilder.java +++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarTopicFetcherBuilderImpl.java @@ -18,38 +18,50 @@ package org.apache.inlong.sdk.sort.fetcher.pulsar; -import org.apache.inlong.sdk.sort.api.SingleTopicFetcherBuilder; +import org.apache.commons.collections.CollectionUtils; +import org.apache.inlong.sdk.sort.api.AbstractTopicFetcherBuilder; import org.apache.inlong.sdk.sort.api.TopicFetcher; +import org.apache.inlong.sdk.sort.entity.InLongTopic; import org.apache.inlong.sdk.sort.impl.decode.MessageDeserializer; import org.apache.inlong.sdk.sort.interceptor.MsgTimeInterceptor; import org.apache.pulsar.client.api.PulsarClient; +import java.util.Objects; import java.util.Optional; +import java.util.Random; /** * Builder of pulsar single topic fetcher. */ -public class PulsarSingleTopicFetcherBuilder extends SingleTopicFetcherBuilder { +public class PulsarTopicFetcherBuilderImpl extends AbstractTopicFetcherBuilder { PulsarClient pulsarClient; - public PulsarSingleTopicFetcherBuilder pulsarClient(PulsarClient pulsarClient) { + public PulsarTopicFetcherBuilderImpl pulsarClient(PulsarClient pulsarClient) { this.pulsarClient = pulsarClient; return this; } @Override public TopicFetcher subscribe() { - Optional.ofNullable(topic) - .orElseThrow(() -> new IllegalStateException("subscribe pulsar single topic, but never assign topic")); Optional.ofNullable(context) .orElseThrow(() -> new IllegalStateException("context is null")); Optional.ofNullable(pulsarClient) .orElseThrow(() -> new IllegalStateException("pulsar client is null")); + deserializer = Optional.ofNullable(deserializer).orElse(new MessageDeserializer()); + if (CollectionUtils.isNotEmpty(topics)) { + return subscribeMultiTopic(); + } else if (Objects.nonNull(topic)) { + return subscribeSingleTopic(); + } else { + throw new IllegalArgumentException("subscribe pulsar fetcher, but never assign any topic"); + } + } + + private TopicFetcher subscribeSingleTopic() { interceptor = 
Optional.ofNullable(interceptor).orElse(new MsgTimeInterceptor()); interceptor.configure(topic); - deserializer = Optional.ofNullable(deserializer).orElse(new MessageDeserializer()); - PulsarSingleTopicFetcher fetcher = + TopicFetcher fetcher = new PulsarSingleTopicFetcher(topic, context, interceptor, deserializer, pulsarClient); if (!fetcher.init()) { throw new IllegalStateException("init pulsar single topic fetcher failed"); @@ -57,4 +69,18 @@ public TopicFetcher subscribe() { return fetcher; } + private TopicFetcher subscribeMultiTopic() { + InLongTopic firstTopic = topics.stream().findFirst().get(); + interceptor = Optional.ofNullable(interceptor).orElse(new MsgTimeInterceptor()); + interceptor.configure(firstTopic); + String key = Optional.ofNullable(fetchKey) + .orElse(firstTopic.getInLongCluster().getClusterId() + new Random().nextLong()); + TopicFetcher fetcher = + new PulsarMultiTopicsFetcher(topics, context, interceptor, deserializer, pulsarClient, key); + if (!fetcher.init()) { + throw new IllegalStateException("init pulsar multi topic fetcher failed"); + } + return fetcher; + } + } diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/tube/TubeSingleTopicFetcherBuilder.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/tube/TubeTopicFetcherBuilderImpl.java similarity index 78% rename from inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/tube/TubeSingleTopicFetcherBuilder.java rename to inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/tube/TubeTopicFetcherBuilderImpl.java index 160548d5ce0..5edcf3f3433 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/tube/TubeSingleTopicFetcherBuilder.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/tube/TubeTopicFetcherBuilderImpl.java @@ -18,25 +18,34 @@ package org.apache.inlong.sdk.sort.fetcher.tube; -import 
org.apache.inlong.sdk.sort.api.SingleTopicFetcherBuilder; +import org.apache.inlong.sdk.sort.api.AbstractTopicFetcherBuilder; import org.apache.inlong.sdk.sort.api.TopicFetcher; +import org.apache.inlong.sdk.sort.api.TopicFetcherBuilder; +import org.apache.inlong.sdk.sort.entity.InLongTopic; import org.apache.inlong.sdk.sort.impl.decode.MessageDeserializer; import org.apache.inlong.sdk.sort.interceptor.MsgTimeInterceptor; +import java.util.Collection; import java.util.Optional; /** * Builder of tube single topic fetcher. */ -public class TubeSingleTopicFetcherBuilder extends SingleTopicFetcherBuilder { +public class TubeTopicFetcherBuilderImpl extends AbstractTopicFetcherBuilder { private TubeConsumerCreator tubeConsumerCreator; - public TubeSingleTopicFetcherBuilder tubeConsumerCreater(TubeConsumerCreator tubeConsumerCreator) { + public TubeTopicFetcherBuilderImpl tubeConsumerCreater(TubeConsumerCreator tubeConsumerCreator) { this.tubeConsumerCreator = tubeConsumerCreator; return this; } + @Override + public TopicFetcherBuilder topic(Collection topics) { + throw new IllegalArgumentException("tube topic fetcher do not support multi topics, " + + "plz call the single topic method"); + } + @Override public TopicFetcher subscribe() { Optional.ofNullable(topic) diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java index 1e070d10d16..0079c205f6c 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java @@ -112,21 +112,21 @@ public TopicFetcher addTopic(InLongTopic topic) { private TopicFetcher createInLongTopicFetcher(InLongTopic topic) { if (InlongTopicTypeEnum.PULSAR.getName().equalsIgnoreCase(topic.getTopicType())) { LOGGER.info("the topic is pulsar {}", 
topic); - return TopicFetcherBuilder.pulsarSingleTopic() + return TopicFetcherBuilder.newPulsarBuilder() .pulsarClient(pulsarClients.get(topic.getInLongCluster().getClusterId())) .topic(topic) .context(context) .subscribe(); } else if (InlongTopicTypeEnum.KAFKA.getName().equalsIgnoreCase(topic.getTopicType())) { LOGGER.info("the topic is kafka {}", topic); - return TopicFetcherBuilder.kafkaSingleTopic() + return TopicFetcherBuilder.newKafkaBuilder() .bootstrapServers(topic.getInLongCluster().getBootstraps()) .topic(topic) .context(context) .subscribe(); } else if (InlongTopicTypeEnum.TUBE.getName().equalsIgnoreCase(topic.getTopicType())) { LOGGER.info("the topic is tube {}", topic); - return TopicFetcherBuilder.tubeSingleTopic() + return TopicFetcherBuilder.newTubeBuilder() .tubeConsumerCreater(tubeFactories.get(topic.getInLongCluster().getClusterId())) .topic(topic) .context(context)