diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/MultiTopicsFetcher.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/MultiTopicsFetcher.java index 7f7ab6ec40f..557ebaa1bbd 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/MultiTopicsFetcher.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/MultiTopicsFetcher.java @@ -19,6 +19,7 @@ import org.apache.inlong.sdk.sort.entity.InLongTopic; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Objects; @@ -65,4 +66,20 @@ public MultiTopicsFetcher( this.executor = Executors.newSingleThreadScheduledExecutor(); } + protected boolean needUpdate(Collection newTopics) { + if (newTopics.size() != onlineTopics.size()) { + return true; + } + // all topics in one task share the same properties, so compare the first of each; update only when they differ + if (!Objects.equals(newTopics.stream().findFirst(), onlineTopics.values().stream().findFirst())) { + return true; + } + for (InLongTopic topic : newTopics) { + if (!onlineTopics.containsKey(topic.getTopic())) { + return true; + } + } + return false; + } + } diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/AckOffsetOnRebalance.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/AckOffsetOnRebalance.java index 1aec582fb33..4c8d456c66d 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/AckOffsetOnRebalance.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/AckOffsetOnRebalance.java @@ -26,7 +26,9 @@ import org.slf4j.LoggerFactory; import java.util.Collection; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; public class AckOffsetOnRebalance implements ConsumerRebalanceListener { @@ -34,12 +36,22 @@ public class AckOffsetOnRebalance implements ConsumerRebalanceListener { private final
String clusterId; private final Seeker seeker; private final ConcurrentHashMap commitOffsetMap; + private final ConcurrentHashMap> ackOffsetMap; public AckOffsetOnRebalance(String clusterId, Seeker seeker, ConcurrentHashMap commitOffsetMap) { + this(clusterId, seeker, commitOffsetMap, null); + } + + public AckOffsetOnRebalance( + String clusterId, + Seeker seeker, + ConcurrentHashMap commitOffsetMap, + ConcurrentHashMap> ackOffsetMap) { this.clusterId = clusterId; this.seeker = seeker; this.commitOffsetMap = commitOffsetMap; + this.ackOffsetMap = ackOffsetMap; } @Override @@ -48,6 +60,30 @@ public void onPartitionsRevoked(Collection collection) { collection.forEach((v) -> { LOGGER.info("clusterId:{},onPartitionsRevoked:{}", clusterId, v.toString()); }); + if (Objects.nonNull(ackOffsetMap) && Objects.nonNull(commitOffsetMap)) { + ackRevokedPartitions(collection); + } + } + + private void ackRevokedPartitions(Collection collection) { + collection.forEach(tp -> { + if (!ackOffsetMap.containsKey(tp)) { + return; + } + ConcurrentSkipListMap tpOffsetMap = ackOffsetMap.remove(tp); + long commitOffset = -1; + for (Long ackOffset : tpOffsetMap.keySet()) { + if (!tpOffsetMap.get(ackOffset)) { + break; + } + commitOffset = ackOffset; + } + // the first haven't ack, do nothing + if (commitOffset == -1) { + return; + } + commitOffsetMap.put(tp, new OffsetAndMetadata(commitOffset)); + }); } @Override diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaMultiTopicsFetcher.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaMultiTopicsFetcher.java new file mode 100644 index 00000000000..2a4bbdd1530 --- /dev/null +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaMultiTopicsFetcher.java @@ -0,0 +1,388 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.sort.fetcher.kafka; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.inlong.sdk.sort.api.ClientContext; +import org.apache.inlong.sdk.sort.api.Deserializer; +import org.apache.inlong.sdk.sort.api.Interceptor; +import org.apache.inlong.sdk.sort.api.MultiTopicsFetcher; +import org.apache.inlong.sdk.sort.api.SeekerFactory; +import org.apache.inlong.sdk.sort.api.SortClientConfig; +import org.apache.inlong.sdk.sort.entity.InLongMessage; +import org.apache.inlong.sdk.sort.entity.InLongTopic; +import org.apache.inlong.sdk.sort.entity.MessageRecord; +import org.apache.inlong.sdk.sort.fetcher.pulsar.PulsarMultiTopicsFetcher; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.RangeAssignor; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import 
org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * Kafka multi topics fetcher + */ +public class KafkaMultiTopicsFetcher extends MultiTopicsFetcher { + private static final Logger LOGGER = LoggerFactory.getLogger(PulsarMultiTopicsFetcher.class); + private final ConcurrentHashMap commitOffsetMap; + private final ConcurrentHashMap> ackOffsetMap; + private final String bootstrapServers; + private ConsumerRebalanceListener listener; + private KafkaConsumer consumer; + + public KafkaMultiTopicsFetcher( + List topics, + ClientContext context, + Interceptor interceptor, + Deserializer deserializer, + String bootstrapServers) { + super(topics, context, interceptor, deserializer); + this.bootstrapServers = bootstrapServers; + this.commitOffsetMap = new ConcurrentHashMap<>(); + this.ackOffsetMap = new ConcurrentHashMap<>(); + } + + @Override + public boolean init() { + try { + this.consumer = createKafkaConsumer(); + InLongTopic topic = onlineTopics.values().stream().findFirst().get(); + this.seeker = SeekerFactory.createKafkaSeeker(consumer, topic); + this.listener = new AckOffsetOnRebalance(topic.getInLongCluster().getClusterId(), seeker, + commitOffsetMap); + consumer.subscribe(onlineTopics.keySet(), listener); + return true; + } catch (Throwable t) { + LOGGER.error("failed to init kafka consumer: ", t); + return false; + } + } + + private KafkaConsumer createKafkaConsumer() { + Properties properties = new Properties(); + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers); + properties.put(ConsumerConfig.GROUP_ID_CONFIG, context.getConfig().getSortTaskId()); + properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + ByteArrayDeserializer.class.getName()); + properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + ByteArrayDeserializer.class.getName()); + properties.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, + context.getConfig().getKafkaSocketRecvBufferSize()); + properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + SortClientConfig.ConsumeStrategy offsetResetStrategy = context.getConfig().getOffsetResetStrategy(); + if (offsetResetStrategy == SortClientConfig.ConsumeStrategy.lastest + || offsetResetStrategy == SortClientConfig.ConsumeStrategy.lastest_absolutely) { + properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); + } else if (offsetResetStrategy == SortClientConfig.ConsumeStrategy.earliest + || offsetResetStrategy == SortClientConfig.ConsumeStrategy.earliest_absolutely) { + properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + } else { + properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none"); + } + properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, + context.getConfig().getKafkaFetchSizeBytes()); + properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, + context.getConfig().getKafkaFetchWaitMs()); + // enable.auto.commit is already disabled above; offsets are committed manually after ack + properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, + RangeAssignor.class.getName()); + properties.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 120000L); + LOGGER.info("start to create kafka consumer:{}", properties); + return new KafkaConsumer<>(properties); + } + + @Override + public void ack(String msgOffset) throws Exception { + // the format of multi topic kafka fetcher msg offset is topic:partitionId:offset, such as topic1:20:1746839 + String[] offset = msgOffset.split(":"); + if (offset.length != 3) { + throw new Exception("offset is illegal, the
correct format is topic:partitionId:offset, " + + "the error offset is:" + msgOffset); + } + + // parse topic partition offset + TopicPartition topicPartition = new TopicPartition(offset[0], Integer.parseInt(offset[1])); + long ackOffset = Long.parseLong(offset[2]); + + // ack + if (!ackOffsetMap.containsKey(topicPartition) || !ackOffsetMap.get(topicPartition).containsKey(ackOffset)) { + LOGGER.warn("did not find offsetMap or ack offset of {}, offset {}, just ignore it", + topicPartition, ackOffset); + return; + } + + // mark this offset has been ack. + ConcurrentSkipListMap tpOffsetMap = ackOffsetMap.get(topicPartition); + // to prevent race condition in AckOffsetOnRebalance::onPartitionsRevoked + if (Objects.nonNull(tpOffsetMap)) { + tpOffsetMap.put(ackOffset, true); + } + } + + @Override + public void pause() { + consumer.pause(consumer.assignment()); + } + + @Override + public void resume() { + consumer.resume(consumer.assignment()); + } + + @Override + public boolean close() { + this.closed = true; + try { + if (fetchThread != null) { + fetchThread.interrupt(); + } + if (consumer != null) { + prepareCommit(); + consumer.commitSync(commitOffsetMap); + consumer.close(); + } + commitOffsetMap.clear(); + } catch (Throwable t) { + LOGGER.warn("got exception in multi topic fetcher close: ", t); + } + LOGGER.info("closed kafka multi topic fetcher"); + return true; + } + + @Override + public boolean isClosed() { + return closed; + } + + @Override + public void setStopConsume(boolean stopConsume) { + this.stopConsume = stopConsume; + } + + @Override + public boolean isStopConsume() { + return stopConsume; + } + + @Override + public List getTopics() { + return new ArrayList<>(onlineTopics.values()); + } + + @Override + public boolean updateTopics(List topics) { + if (needUpdate(topics)) { + return updateAll(topics); + } + LOGGER.info("no need to update topics"); + return false; + } + + private boolean updateAll(Collection newTopics) { + if 
(CollectionUtils.isEmpty(newTopics)) { + LOGGER.error("new topics is empty or null"); + return false; + } + + // stop + this.setStopConsume(true); + + // update + this.onlineTopics = newTopics.stream().collect(Collectors.toMap(InLongTopic::getTopic, t -> t)); + InLongTopic topic = onlineTopics.values().stream().findFirst().get(); + this.seeker = SeekerFactory.createKafkaSeeker(consumer, topic); + this.listener = new AckOffsetOnRebalance(topic.getInLongCluster().getClusterId(), seeker, + commitOffsetMap, ackOffsetMap); + Optional.ofNullable(interceptor).ifPresent(i -> i.configure(topic)); + + // subscribe new + consumer.subscribe(onlineTopics.keySet(), listener); + + // resume + this.setStopConsume(false); + return true; + } + + private void prepareCommit() { + ackOffsetMap.forEach((topicPartition, tpOffsetMap) -> { + synchronized (tpOffsetMap) { + // get the remove list + List removeOffsets = new ArrayList<>(); + long commitOffset = -1; + for (Long ackOffset : tpOffsetMap.keySet()) { + if (!tpOffsetMap.get(ackOffset)) { + break; + } + removeOffsets.add(ackOffset); + commitOffset = ackOffset; + } + // the first haven't ack, do nothing + if (commitOffset == -1) { + return; + } + + // remove offset and commit offset + removeOffsets.forEach(tpOffsetMap::remove); + commitOffsetMap.put(topicPartition, new OffsetAndMetadata(commitOffset)); + } + }); + } + + public class Fetcher implements Runnable { + + private void commitKafkaOffset() { + prepareCommit(); + if (consumer != null) { + try { + consumer.commitAsync(commitOffsetMap, null); + commitOffsetMap.clear(); + } catch (Exception e) { + LOGGER.error("commit kafka offset failed: ", e); + } + } + } + + /** + * put the received msg to onFinished method + * + * @param messageRecords {@link List < MessageRecord >} + */ + private void handleAndCallbackMsg(List messageRecords) { + long start = System.currentTimeMillis(); + try { + context.getDefaultStateCounter().addCallbackTimes(1); + 
context.getConfig().getCallback().onFinishedBatch(messageRecords); + context.getDefaultStateCounter() + .addCallbackTimeCost(System.currentTimeMillis() - start) + .addCallbackDoneTimes(1); + } catch (Exception e) { + context.getDefaultStateCounter().addCallbackErrorTimes(1); + LOGGER.error("failed to callback: ", e); + } + } + + private String getOffset(String topic, int partitionId, long offset) { + TopicPartition topicPartition = new TopicPartition(topic, partitionId); + ackOffsetMap.computeIfAbsent(topicPartition, k -> new ConcurrentSkipListMap<>()).put(offset, false); + return topic + ":" + partitionId + ":" + offset; + } + + private Map getMsgHeaders(Headers headers) { + Map headerMap = new HashMap<>(); + for (Header header : headers) { + headerMap.put(header.key(), new String(header.value())); + } + return headerMap; + } + + @Override + public void run() { + boolean hasPermit; + while (true) { + hasPermit = false; + try { + if (context.getConfig().isStopConsume() || stopConsume) { + TimeUnit.MILLISECONDS.sleep(50); + continue; + } + + if (sleepTime > 0) { + TimeUnit.MILLISECONDS.sleep(sleepTime); + } + + context.acquireRequestPermit(); + hasPermit = true; + // fetch from kafka + fetchFromKafka(); + // commit + commitKafkaOffset(); + } catch (Exception e) { + context.getDefaultStateCounter().addFetchErrorTimes(1); + LOGGER.error("failed in kafka multi topic fetcher: ", e); + } finally { + if (hasPermit) { + context.releaseRequestPermit(); + } + } + } + } + + private void fetchFromKafka() throws Exception { + context.getDefaultStateCounter().addMsgCount(1).addFetchTimes(1); + + long startFetchTime = System.currentTimeMillis(); + ConsumerRecords records = consumer + .poll(Duration.ofMillis(context.getConfig().getKafkaFetchWaitMs())); + context.getDefaultStateCounter().addFetchTimeCost(System.currentTimeMillis() - startFetchTime); + if (null != records && !records.isEmpty()) { + + for (ConsumerRecord msg : records) { + List msgs = new ArrayList<>(); + String 
topicName = msg.topic(); + InLongTopic topic = onlineTopics.get(topicName); + String offsetKey = getOffset(topicName, msg.partition(), msg.offset()); + List inLongMessages = deserializer + .deserialize(context, topic, getMsgHeaders(msg.headers()), msg.value()); + inLongMessages = interceptor.intercept(inLongMessages); + if (inLongMessages.isEmpty()) { + ack(offsetKey); + continue; + } + + msgs.add(new MessageRecord(topic.getTopicKey(), + inLongMessages, + offsetKey, System.currentTimeMillis())); + context.getStateCounterByTopic(topic).addConsumeSize(msg.value().length); + context.getStateCounterByTopic(topic).addMsgCount(msgs.size()); + handleAndCallbackMsg(msgs); + } + sleepTime = 0L; + } else { + context.getDefaultStateCounter().addEmptyFetchTimes(1); + emptyFetchTimes++; + if (emptyFetchTimes >= context.getConfig().getEmptyPollTimes()) { + sleepTime = Math.min((sleepTime += context.getConfig().getEmptyPollSleepStepMs()), + context.getConfig().getMaxEmptyPollSleepMs()); + emptyFetchTimes = 0; + } + } + } + } +} diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarMultiTopicsFetcher.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarMultiTopicsFetcher.java index 3244e9d5bcc..938869bf742 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarMultiTopicsFetcher.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarMultiTopicsFetcher.java @@ -304,22 +304,6 @@ public boolean updateTopics(List topics) { return false; } - private boolean needUpdate(Collection newTopics) { - if (newTopics.size() != onlineTopics.size()) { - return true; - } - // all topic should share the same properties in one task - if (Objects.equals(newTopics.stream().findFirst(), onlineTopics.values().stream().findFirst())) { - return true; - } - for (InLongTopic topic : newTopics) { - if (!onlineTopics.containsKey(topic.getTopic())) { - return 
true; - } - } - return false; - } - public class Fetcher implements Runnable { /**