Skip to content
Permalink
Browse files
[HUDI-4122] Fix NPE caused by adding kafka nodes (#5632)
  • Loading branch information
wangxianghu committed May 21, 2022
1 parent 7d02b1f commit 2af98303d3881e5d1da7d2e08f904b18f8b79488
Showing 1 changed file with 39 additions and 8 deletions.
@@ -22,17 +22,17 @@
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieNotSupportedException;

import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException;
import org.apache.hudi.utilities.sources.AvroKafkaSource;

import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
@@ -48,6 +48,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -169,9 +170,14 @@ public static class Config {
.withDocumentation("Kafka topic name.");

public static final ConfigProperty<String> KAFKA_CHECKPOINT_TYPE = ConfigProperty
.key("hoodie.deltastreamer.source.kafka.checkpoint.type")
.defaultValue("string")
.withDocumentation("Kafka chepoint type.");
.key("hoodie.deltastreamer.source.kafka.checkpoint.type")
.defaultValue("string")
.withDocumentation("Kafka checkpoint type.");

public static final ConfigProperty<Long> KAFKA_FETCH_PARTITION_TIME_OUT = ConfigProperty
.key("hoodie.deltastreamer.source.kafka.fetch_partition.time.out")
.defaultValue(300 * 1000L)
.withDocumentation("Time out for fetching partitions. 5min by default");

public static final ConfigProperty<Boolean> ENABLE_KAFKA_COMMIT_OFFSET = ConfigProperty
.key("hoodie.deltastreamer.source.kafka.enable.commit.offset")
@@ -236,8 +242,7 @@ public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long
if (!checkTopicExists(consumer)) {
throw new HoodieException("Kafka topic:" + topicName + " does not exist");
}
List<PartitionInfo> partitionInfoList;
partitionInfoList = consumer.partitionsFor(topicName);
List<PartitionInfo> partitionInfoList = fetchPartitionInfos(consumer, topicName);
Set<TopicPartition> topicPartitions = partitionInfoList.stream()
.map(x -> new TopicPartition(x.topic(), x.partition())).collect(Collectors.toSet());

@@ -287,6 +292,32 @@ public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long
return CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents);
}

/**
* Fetch partition infos for given topic.
*
* @param consumer
* @param topicName
*/
private List<PartitionInfo> fetchPartitionInfos(KafkaConsumer consumer, String topicName) {
long timeout = this.props.getLong(Config.KAFKA_FETCH_PARTITION_TIME_OUT.key(), Config.KAFKA_FETCH_PARTITION_TIME_OUT.defaultValue());
long start = System.currentTimeMillis();

List<PartitionInfo> partitionInfos;
do {
partitionInfos = consumer.partitionsFor(topicName);
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
LOG.error("Sleep failed while fetching partitions");
}
} while (partitionInfos == null && (System.currentTimeMillis() <= (start + timeout)));

if (partitionInfos == null) {
throw new HoodieDeltaStreamerException(String.format("Can not find metadata for topic %s from kafka cluster", topicName));
}
return partitionInfos;
}

/**
* Fetch checkpoint offsets for each partition.
* @param consumer instance of {@link KafkaConsumer} to fetch offsets from.

0 comments on commit 2af9830

Please sign in to comment.