[HUDI-1944] Support Hudi to read from committed offset (#3175)
* [HUDI-1944] Support Hudi to read from committed offset

* [HUDI-1944] Adding group option to KafkaResetOffsetStrategies

* [HUDI-1944] Update Exception msg
veenaypatil committed Jun 30, 2021
1 parent 1cbf43b commit 94f0f40
Showing 2 changed files with 51 additions and 3 deletions.
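
As context for the change, here is a minimal sketch of how a DeltaStreamer Kafka source could opt into the new strategy. The topic property key, the literal "group" value, and the import paths are assumptions read off this diff and the test below, not something the commit documents; verify them against the release you run.

  // Assumed imports: org.apache.hudi.common.config.TypedProperties,
  // org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen,
  // org.apache.kafka.clients.consumer.ConsumerConfig,
  // org.apache.kafka.common.serialization.StringDeserializer
  TypedProperties props = new TypedProperties();
  props.setProperty("bootstrap.servers", "localhost:9092");
  props.setProperty("key.deserializer", StringDeserializer.class.getName());
  props.setProperty("value.deserializer", StringDeserializer.class.getName());
  // group.id is mandatory for this strategy: committed offsets are looked up for this consumer group.
  props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "hudi-deltastreamer");
  // Assumed topic config key for KafkaOffsetGen; it is not shown in this diff.
  props.setProperty("hoodie.deltastreamer.source.kafka.topic", "my_topic");
  // "group" maps to the new KafkaResetOffsetStrategies.GROUP value added by this commit.
  props.setProperty("auto.offset.reset", "group");
  KafkaOffsetGen offsetGen = new KafkaOffsetGen(props);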
KafkaOffsetGen.java
@@ -151,7 +151,7 @@ public static long totalNewMessages(OffsetRange[] ranges) {
    * Kafka reset offset strategies.
    */
   enum KafkaResetOffsetStrategies {
-    LATEST, EARLIEST
+    LATEST, EARLIEST, GROUP
   }

   /**
@@ -192,6 +192,9 @@ public KafkaOffsetGen(TypedProperties props) {
     if (!found) {
       throw new HoodieDeltaStreamerException(Config.KAFKA_AUTO_OFFSET_RESET + " config set to unknown value " + kafkaAutoResetOffsetsStr);
     }
+    if (autoResetValue.equals(KafkaResetOffsetStrategies.GROUP)) {
+      this.kafkaParams.put(Config.KAFKA_AUTO_OFFSET_RESET, Config.DEFAULT_KAFKA_AUTO_OFFSET_RESET.name().toLowerCase());
+    }
   }

   public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long sourceLimit, HoodieDeltaStreamerMetrics metrics) {
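
The extra branch in the constructor above is needed because "group" is a Hudi-side sentinel, not a value the Kafka client accepts: the consumer's own auto.offset.reset config only allows "earliest", "latest" and "none", so the sentinel is swapped for the Hudi default reset value before kafkaParams ever reaches a KafkaConsumer. A standalone illustration, not part of the commit:

  // Assumed imports: java.util.Properties, org.apache.kafka.clients.consumer.KafkaConsumer,
  // org.apache.kafka.common.serialization.StringDeserializer
  Properties p = new Properties();
  p.put("bootstrap.servers", "localhost:9092");
  p.put("group.id", "demo");
  p.put("key.deserializer", StringDeserializer.class.getName());
  p.put("value.deserializer", StringDeserializer.class.getName());
  p.put("auto.offset.reset", "group");  // not one of latest/earliest/none
  new KafkaConsumer<String, String>(p); // throws org.apache.kafka.common.config.ConfigException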
@@ -220,8 +223,11 @@ public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long
         case LATEST:
           fromOffsets = consumer.endOffsets(topicPartitions);
           break;
+        case GROUP:
+          fromOffsets = getGroupOffsets(consumer, topicPartitions);
+          break;
         default:
-          throw new HoodieNotSupportedException("Auto reset value must be one of 'earliest' or 'latest' ");
+          throw new HoodieNotSupportedException("Auto reset value must be one of 'earliest' or 'latest' or 'group' ");
       }
     }
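
Note that this switch only decides the starting offsets when no usable Hudi checkpoint is supplied; when a checkpoint is present, fromOffsets is normally derived from it and the reset strategy is not consulted, which is why the new test below passes Option.empty(). A rough illustration of the two entry paths, with variable names and the checkpoint string format taken from that test:

  // First run, or checkpoint intentionally omitted: the reset strategy (EARLIEST/LATEST/GROUP) decides.
  OffsetRange[] fresh = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 300, metrics);
  // Later runs: the Hudi checkpoint wins, e.g. "<topic>,<partition>:<offset>,...".
  OffsetRange[] resumed = kafkaOffsetGen.getNextOffsetRanges(Option.of("my_topic,0:250,1:249"), 300, metrics);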

@@ -318,7 +324,6 @@ private Map<String, Object> excludeHoodieConfigs(TypedProperties props) {
   public void commitOffsetToKafka(String checkpointStr) {
     DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(ConsumerConfig.GROUP_ID_CONFIG));
     Map<TopicPartition, Long> offsetMap = CheckpointUtils.strToOffsets(checkpointStr);
-    Map<String, Object> kafkaParams = excludeHoodieConfigs(props);
     Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap = new HashMap<>(offsetMap.size());
     try (KafkaConsumer consumer = new KafkaConsumer(kafkaParams)) {
       offsetMap.forEach((topicPartition, offset) -> offsetAndMetadataMap.put(topicPartition, new OffsetAndMetadata(offset)));
@@ -327,4 +332,19 @@ public void commitOffsetToKafka(String checkpointStr) {
       LOG.warn("Committing offsets to Kafka failed, this does not impact processing of records", e);
     }
   }
+
+  private Map<TopicPartition, Long> getGroupOffsets(KafkaConsumer consumer, Set<TopicPartition> topicPartitions) {
+    Map<TopicPartition, Long> fromOffsets = new HashMap<>();
+    for (TopicPartition topicPartition : topicPartitions) {
+      OffsetAndMetadata committedOffsetAndMetadata = consumer.committed(topicPartition);
+      if (committedOffsetAndMetadata != null) {
+        fromOffsets.put(topicPartition, committedOffsetAndMetadata.offset());
+      } else {
+        LOG.warn("There are no commits associated with this consumer group, starting to consume from latest offset");
+        fromOffsets = consumer.endOffsets(topicPartitions);
+        break;
+      }
+    }
+    return fromOffsets;
+  }
 }
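
One note on the new getGroupOffsets helper: KafkaConsumer.committed(TopicPartition) is deprecated in Kafka clients 2.4+ in favor of a batch overload that looks up all partitions in one call. A sketch of the same lookup with that overload, assuming a 2.4+ client; getGroupOffsetsBatch is a hypothetical name, not part of this commit:

  // Requires java.util.{Map, Objects, Set}, java.util.stream.Collectors,
  // org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}, org.apache.kafka.common.TopicPartition.
  private Map<TopicPartition, Long> getGroupOffsetsBatch(KafkaConsumer<?, ?> consumer, Set<TopicPartition> topicPartitions) {
    Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(topicPartitions);
    if (committed.values().stream().anyMatch(Objects::isNull)) {
      // At least one partition has no committed offset for this group.id: fall back to the
      // latest offsets, mirroring the behaviour of getGroupOffsets above.
      return consumer.endOffsets(topicPartitions);
    }
    return committed.entrySet().stream()
        .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
  }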
TestKafkaOffsetGen.java
@@ -25,6 +25,7 @@
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config;
 import org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.spark.streaming.kafka010.KafkaTestUtils;
@@ -33,6 +34,8 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;

+import java.util.UUID;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -66,6 +69,7 @@ private TypedProperties getConsumerConfigs(String autoOffsetReset) {
     props.setProperty("bootstrap.servers", testUtils.brokerAddress());
     props.setProperty("key.deserializer", StringDeserializer.class.getName());
     props.setProperty("value.deserializer", StringDeserializer.class.getName());
+    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
     return props;
   }

@@ -127,6 +131,30 @@ public void testGetNextOffsetRangesFromMultiplePartitions() {
     assertEquals(249, nextOffsetRanges[1].untilOffset());
   }

+  @Test
+  public void testGetNextOffsetRangesFromGroup() {
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    testUtils.createTopic(TEST_TOPIC_NAME, 2);
+    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+    KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("group"));
+    String lastCheckpointString = TEST_TOPIC_NAME + ",0:250,1:249";
+    kafkaOffsetGen.commitOffsetToKafka(lastCheckpointString);
+    // don't pass lastCheckpointString as we want to read from group committed offset
+    OffsetRange[] nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 300, metrics);
+    assertEquals(250, nextOffsetRanges[0].fromOffset());
+    assertEquals(400, nextOffsetRanges[0].untilOffset());
+    assertEquals(249, nextOffsetRanges[1].fromOffset());
+    assertEquals(399, nextOffsetRanges[1].untilOffset());
+
+    // committed offsets are not present for the consumer group
+    kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("group"));
+    nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 300, metrics);
+    assertEquals(500, nextOffsetRanges[0].fromOffset());
+    assertEquals(500, nextOffsetRanges[0].untilOffset());
+    assertEquals(500, nextOffsetRanges[1].fromOffset());
+    assertEquals(500, nextOffsetRanges[1].untilOffset());
+  }
+
   @Test
   public void testCheckTopicExists() {
     TypedProperties props = getConsumerConfigs("latest");
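
A back-of-the-envelope check of the numbers asserted in testGetNextOffsetRangesFromGroup above, assuming the generator splits sourceLimit evenly across the topic's partitions (an illustration, not code from the commit):

  // 1000 generated records over 2 partitions -> 500 messages per partition on the broker.
  long sourceLimit = 300;               // third argument to getNextOffsetRanges
  long perPartition = sourceLimit / 2;  // 150 events allotted to each partition
  long untilP0 = 250 + perPartition;    // committed offset 250 -> untilOffset 400
  long untilP1 = 249 + perPartition;    // committed offset 249 -> untilOffset 399
  // With a fresh random group.id (second half of the test) there are no committed offsets,
  // so GROUP falls back to the latest offsets: fromOffset == untilOffset == 500, i.e. empty ranges.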
