[HUDI-1367] Make deltaStreamer transition from dfsSource to kafkaSource (#2227)


Co-authored-by: Sivabalan Narayanan <sivabala@uber.com>
liujinhui1994 and nsivabalan committed Feb 25, 2021
1 parent 06dc7c7 commit 617cc24
Showing 5 changed files with 213 additions and 30 deletions.
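
The new tests below exercise the transition by first running DeltaStreamer against a parquet DFS source and then re-pointing the same target table at a JsonKafkaSource. A minimal sketch of the Kafka-side properties such a job carries, using only config keys that appear in this diff (broker address, topic name, and schema path are hypothetical placeholders):

import org.apache.hudi.common.config.TypedProperties;

public class JsonKafkaSourcePropsSketch {
  // Sketch only: keys come from this commit's test setup; values are placeholders.
  public static TypedProperties build() {
    TypedProperties props = new TypedProperties();
    props.setProperty("bootstrap.servers", "localhost:9092");                 // hypothetical broker
    props.setProperty("hoodie.deltastreamer.source.kafka.topic", "my_topic"); // hypothetical topic
    // New in this commit: offset reset policy via a Hudi-level key ("earliest" or "latest").
    props.setProperty("hoodie.deltastreamer.source.kafka.auto.reset.offsets", "earliest");
    props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", "5000");
    props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", "/path/to/source.avsc"); // hypothetical path
    return props;
  }
}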
@@ -21,6 +21,7 @@
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieDeltaStreamerException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieNotSupportedException;

@@ -40,6 +41,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/**
@@ -49,6 +52,12 @@ public class KafkaOffsetGen {

private static final Logger LOG = LogManager.getLogger(KafkaOffsetGen.class);

/**
* Kafka checkpoint pattern.
* Format: topic_name,partition_num:offset,partition_num:offset,....
*/
private final Pattern pattern = Pattern.compile(".*,.*:.*");

public static class CheckpointUtils {

/**
@@ -148,23 +157,38 @@ public static class Config {

private static final String KAFKA_TOPIC_NAME = "hoodie.deltastreamer.source.kafka.topic";
private static final String MAX_EVENTS_FROM_KAFKA_SOURCE_PROP = "hoodie.deltastreamer.kafka.source.maxEvents";
private static final KafkaResetOffsetStrategies DEFAULT_AUTO_RESET_OFFSET = KafkaResetOffsetStrategies.LATEST;
private static final String KAFKA_AUTO_RESET_OFFSETS = "hoodie.deltastreamer.source.kafka.auto.reset.offsets";

nsivabalan (Author, Contributor) commented on Mar 15, 2021:

@liujinhui1994: Guess we might have to revisit this. I didn't realize this param ("auto.reset.offsets") is actually a Kafka config, so we have to maintain the same config name. Don't think we can change the param name here. My bad.
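
For context on the reviewer's naming concern: before this change the reset policy was read from the raw Kafka consumer key, and after it comes from a Hudi-prefixed key. A small sketch contrasting the two lookups, with both key names and the LATEST default taken from this diff:

import org.apache.hudi.common.config.TypedProperties;

class ResetOffsetKeyContrast {
  // Sketch only: shows the old vs. new property lookup; not part of the commit.
  static void show(TypedProperties props) {
    // Old behaviour: read the Kafka consumer config key directly.
    String viaKafkaKey = props.getString("auto.offset.reset", "LATEST").toUpperCase();
    // New behaviour in this commit: read the Hudi-level key instead (validated
    // against KafkaResetOffsetStrategies in the KafkaOffsetGen constructor).
    String viaHudiKey = props.getString("hoodie.deltastreamer.source.kafka.auto.reset.offsets", "LATEST");
    System.out.println(viaKafkaKey + " vs " + viaHudiKey);
  }
}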

private static final KafkaResetOffsetStrategies DEFAULT_KAFKA_AUTO_RESET_OFFSETS = KafkaResetOffsetStrategies.LATEST;
public static final long DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE = 5000000;
public static long maxEventsFromKafkaSource = DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE;
}

private final HashMap<String, Object> kafkaParams;
private final TypedProperties props;
protected final String topicName;
private KafkaResetOffsetStrategies autoResetValue;

public KafkaOffsetGen(TypedProperties props) {
this.props = props;

kafkaParams = new HashMap<>();
for (Object prop : props.keySet()) {
kafkaParams.put(prop.toString(), props.get(prop.toString()));
}
DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.KAFKA_TOPIC_NAME));
topicName = props.getString(Config.KAFKA_TOPIC_NAME);
String kafkaAutoResetOffsetsStr = props.getString(Config.KAFKA_AUTO_RESET_OFFSETS, Config.DEFAULT_KAFKA_AUTO_RESET_OFFSETS.name());
boolean found = false;
for (KafkaResetOffsetStrategies entry: KafkaResetOffsetStrategies.values()) {
if (entry.name().toLowerCase().equals(kafkaAutoResetOffsetsStr)) {
found = true;
autoResetValue = entry;
break;
}
}
if (!found) {
throw new HoodieDeltaStreamerException(Config.KAFKA_AUTO_RESET_OFFSETS + " config set to unknown value " + kafkaAutoResetOffsetsStr);
}
}

public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long sourceLimit, HoodieDeltaStreamerMetrics metrics) {
@@ -186,8 +210,6 @@ public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long
fromOffsets = checkupValidOffsets(consumer, lastCheckpointStr, topicPartitions);

nsivabalan (Author, Contributor) commented on Mar 15, 2021:

Also, we could simplify the changes by making this if condition:

if (lastCheckpointStr.isPresent() && !lastCheckpointStr.get().isEmpty() && checkTopicCheckpoint(lastCheckpointStr.get())) {
   // fetch offsets from the checkpoint
} else {
   // reset to earliest or latest
}

By doing this, we don't need the reset in lines 260 to 268.

nsivabalan (Author, Contributor) commented on Mar 15, 2021:

@liujinhui1994: do you think you can put up a PR for this, if you agree?
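
A sketch of the consolidated branch the reviewer is suggesting for getNextOffsetRanges, assuming the members already visible in this diff (lastCheckpointStr, checkTopicCheckpoint, checkupValidOffsets, autoResetValue, consumer, topicPartitions); this reflects the reviewer's proposal, not what the commit merged:

// Sketch only: folds the checkpoint-format check into the branch condition, so the
// reset fallback below is the single place that handles missing or non-Kafka checkpoints.
if (lastCheckpointStr.isPresent() && !lastCheckpointStr.get().isEmpty()
    && checkTopicCheckpoint(lastCheckpointStr)) {
  // Resume from the stored checkpoint (clamped to the earliest available offsets).
  fromOffsets = checkupValidOffsets(consumer, lastCheckpointStr, topicPartitions);
} else {
  // No usable Kafka checkpoint: fall back to the configured reset policy.
  switch (autoResetValue) {
    case EARLIEST:
      fromOffsets = consumer.beginningOffsets(topicPartitions);
      break;
    case LATEST:
      fromOffsets = consumer.endOffsets(topicPartitions);
      break;
    default:
      throw new HoodieNotSupportedException("Auto reset value must be one of 'earliest' or 'latest' ");
  }
}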

metrics.updateDeltaStreamerKafkaDelayCountMetrics(delayOffsetCalculation(lastCheckpointStr, topicPartitions, consumer));
} else {
KafkaResetOffsetStrategies autoResetValue = KafkaResetOffsetStrategies
.valueOf(props.getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET.toString()).toUpperCase());
switch (autoResetValue) {
case EARLIEST:
fromOffsets = consumer.beginningOffsets(topicPartitions);
@@ -227,12 +249,23 @@ public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long
// else return earliest offsets
private Map<TopicPartition, Long> checkupValidOffsets(KafkaConsumer consumer,
Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions) {
Map<TopicPartition, Long> checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get());
Map<TopicPartition, Long> earliestOffsets = consumer.beginningOffsets(topicPartitions);
if (checkTopicCheckpoint(lastCheckpointStr)) {
Map<TopicPartition, Long> checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get());
boolean checkpointOffsetReseter = checkpointOffsets.entrySet().stream()
.anyMatch(offset -> offset.getValue() < earliestOffsets.get(offset.getKey()));
return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets;
}

switch (autoResetValue) {
case EARLIEST:
return earliestOffsets;
case LATEST:
return consumer.endOffsets(topicPartitions);
default:
throw new HoodieNotSupportedException("Auto reset value must be one of 'earliest' or 'latest' ");
}

boolean checkpointOffsetReseter = checkpointOffsets.entrySet().stream()
.anyMatch(offset -> offset.getValue() < earliestOffsets.get(offset.getKey()));
return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets;
}

private Long delayOffsetCalculation(Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions, KafkaConsumer consumer) {
@@ -257,6 +290,11 @@ public boolean checkTopicExists(KafkaConsumer consumer) {
return result.containsKey(topicName);
}

private boolean checkTopicCheckpoint(Option<String> lastCheckpointStr) {
Matcher matcher = pattern.matcher(lastCheckpointStr.get());
return matcher.matches();
}

public String getTopicName() {
return topicName;
}
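
The checkpoint pattern added above is what lets KafkaOffsetGen tell a Kafka-style checkpoint apart from one written by a DFS source. A small self-contained sketch of the match (topic, partition, and offset values are hypothetical):

import java.util.regex.Pattern;

public class CheckpointPatternSketch {
  public static void main(String[] args) {
    // Same regex as the `pattern` field added to KafkaOffsetGen above.
    Pattern pattern = Pattern.compile(".*,.*:.*");

    String kafkaCheckpoint = "my_topic,0:100,1:250"; // hypothetical topic_name,partition:offset,...
    String dfsCheckpoint = "1613936400000";          // hypothetical DFS-style checkpoint (no ',' or ':')

    System.out.println(pattern.matcher(kafkaCheckpoint).matches()); // true  -> resume from Kafka offsets
    System.out.println(pattern.matcher(dfsCheckpoint).matches());   // false -> fall back to the reset policy
  }
}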
@@ -48,6 +48,7 @@
import org.apache.hudi.utilities.sources.CsvDFSSource;
import org.apache.hudi.utilities.sources.HoodieIncrSource;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.JsonKafkaSource;
import org.apache.hudi.utilities.sources.ParquetDFSSource;
import org.apache.hudi.utilities.sources.TestDataSource;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
@@ -56,10 +57,12 @@
import org.apache.hudi.utilities.transform.SqlQueryBasedTransformer;
import org.apache.hudi.utilities.transform.Transformer;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
@@ -116,9 +119,12 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
private static final String PROPS_FILENAME_TEST_INVALID = "test-invalid.properties";
private static final String PROPS_FILENAME_TEST_CSV = "test-csv-dfs-source.properties";
private static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties";
private static final String PROPS_FILENAME_TEST_JSON_KAFKA = "test-json-kafka-dfs-source.properties";
private static String PARQUET_SOURCE_ROOT;
private static String JSON_KAFKA_SOURCE_ROOT;
private static final int PARQUET_NUM_RECORDS = 5;
private static final int CSV_NUM_RECORDS = 3;
private static final int JSON_KAFKA_NUM_RECORDS = 5;
// Required fields
private static final String TGT_BASE_PATH_PARAM = "--target-base-path";
private static final String TGT_BASE_PATH_VALUE = "s3://mybucket/blah";
@@ -136,15 +142,18 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
private static final String HOODIE_CONF_VALUE2 = "hoodie.datasource.write.recordkey.field=Field1,Field2,Field3";
private static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamer.class);
public static KafkaTestUtils testUtils;
protected static String topicName;

private static int testNum = 1;
protected static int testNum = 1;

@BeforeAll
public static void initClass() throws Exception {
UtilitiesTestBase.initClass(true);
PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
JSON_KAFKA_SOURCE_ROOT = dfsBasePath + "/jsonKafkaFiles";
testUtils = new KafkaTestUtils();
testUtils.setup();
topicName = "topic" + testNum;

// prepare the configs.
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties", dfs, dfsBasePath + "/base.properties");
@@ -236,7 +245,7 @@ private static void populateCommonProps(TypedProperties props) {

//Kafka source properties
props.setProperty("bootstrap.servers", testUtils.brokerAddress());
props.setProperty("auto.offset.reset", "earliest");
props.setProperty("hoodie.deltastreamer.source.kafka.auto.reset.offsets", "earliest");
props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", String.valueOf(5000));
@@ -966,27 +975,56 @@ public void testDistributedTestDataSource() {
}

private static void prepareParquetDFSFiles(int numRecords) throws IOException {
String path = PARQUET_SOURCE_ROOT + "/1.parquet";
prepareParquetDFSFiles(numRecords, "1.parquet", false, null, null);
}

private static void prepareParquetDFSFiles(int numRecords, String fileName, boolean useCustomSchema,
String schemaStr, Schema schema) throws IOException {
String path = PARQUET_SOURCE_ROOT + "/" + fileName;
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
Helpers.saveParquetToDFS(Helpers.toGenericRecords(
dataGenerator.generateInserts("000", numRecords)), new Path(path));
if (useCustomSchema) {
Helpers.saveParquetToDFS(Helpers.toGenericRecords(
dataGenerator.generateInsertsAsPerSchema("000", numRecords, schemaStr),
schema), new Path(path), HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
} else {
Helpers.saveParquetToDFS(Helpers.toGenericRecords(
dataGenerator.generateInserts("000", numRecords)), new Path(path));
}
}

private static void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, String topicName) throws IOException {
if (createTopic) {
try {
testUtils.createTopic(topicName, 2);
} catch (TopicExistsException e) {
// no op
}
}
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
testUtils.sendMessages(topicName, Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", numRecords, HoodieTestDataGenerator.TRIP_SCHEMA)));
}

private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer) throws IOException {
prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc",
PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT);
}

private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile,
String propsFileName, String parquetSourceRoot) throws IOException {
// Properties used for testing delta-streamer with Parquet source
TypedProperties parquetProps = new TypedProperties();
parquetProps.setProperty("include", "base.properties");
parquetProps.setProperty("hoodie.embed.timeline.server","false");
parquetProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
parquetProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
if (useSchemaProvider) {
parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/" + sourceSchemaFile);
if (hasTransformer) {
parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/" + targetSchemaFile);
}
}
parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root", PARQUET_SOURCE_ROOT);

UtilitiesTestBase.Helpers.savePropsToDFS(parquetProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_PARQUET);
parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root", parquetSourceRoot);
UtilitiesTestBase.Helpers.savePropsToDFS(parquetProps, dfs, dfsBasePath + "/" + propsFileName);
}

private void testParquetDFSSource(boolean useSchemaProvider, List<String> transformerClassNames) throws Exception {
@@ -1001,6 +1039,99 @@ private void testParquetDFSSource(boolean useSchemaProvider, List<String> transf
testNum++;
}

private void prepareJsonKafkaDFSSource(String propsFileName, String autoResetValue, String topicName) throws IOException {
// Properties used for testing delta-streamer with JsonKafka source
TypedProperties props = new TypedProperties();
populateCommonProps(props);
props.setProperty("include", "base.properties");
props.setProperty("hoodie.embed.timeline.server","false");
props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
props.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
props.setProperty("hoodie.deltastreamer.source.dfs.root", JSON_KAFKA_SOURCE_ROOT);
props.setProperty("hoodie.deltastreamer.source.kafka.topic",topicName);
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source_uber.avsc");
props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target_uber.avsc");
props.setProperty("hoodie.deltastreamer.source.kafka.auto.reset.offsets", autoResetValue);

UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + propsFileName);
}

/**
* Tests DeltaStreamer with a parquet DFS source and a transition to JsonKafkaSource.
* @param autoResetToLatest true if the auto reset value should be set to LATEST; false to leave it as the default (i.e. EARLIEST)
* @throws Exception
*/
private void testDeltaStreamerTransitionFromParquetToKafkaSource(boolean autoResetToLatest) throws Exception {
// prep parquet source
PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFilesDfsToKafka" + testNum;
int parquetRecords = 10;
prepareParquetDFSFiles(parquetRecords,"1.parquet", true, HoodieTestDataGenerator.TRIP_SCHEMA, HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);

prepareParquetDFSSource(true, false,"source_uber.avsc", "target_uber.avsc", PROPS_FILENAME_TEST_PARQUET,
PARQUET_SOURCE_ROOT);
// delta streamer w/ parquet source
String tableBasePath = dfsBasePath + "/test_dfs_to_kakfa" + testNum;
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
Collections.EMPTY_LIST, PROPS_FILENAME_TEST_PARQUET, false,
false, 100000, false, null, null, "timestamp"), jsc);
deltaStreamer.sync();
TestHelpers.assertRecordCount(parquetRecords, tableBasePath + "/*/*.parquet", sqlContext);
deltaStreamer.shutdownGracefully();

// prep json kafka source
topicName = "topic" + testNum;
prepareJsonKafkaDFSFiles(JSON_KAFKA_NUM_RECORDS, true, topicName);
prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA, autoResetToLatest ? "latest" : "earliest", topicName);
// delta streamer w/ json kafka source
deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(),
Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA, false,
true, 100000, false, null, null, "timestamp"), jsc);
deltaStreamer.sync();
// if auto reset value is set to LATEST, the kafka records published so far may not be synced.
int totalExpectedRecords = parquetRecords + ((autoResetToLatest) ? 0 : JSON_KAFKA_NUM_RECORDS);
TestHelpers.assertRecordCount(totalExpectedRecords, tableBasePath + "/*/*.parquet", sqlContext);

// verify 2nd batch to test LATEST auto reset value.
prepareJsonKafkaDFSFiles(20, false, topicName);
totalExpectedRecords += 20;
deltaStreamer.sync();
TestHelpers.assertRecordCount(totalExpectedRecords, tableBasePath + "/*/*.parquet", sqlContext);
testNum++;
}

@Test
public void testJsonKafkaDFSSource() throws Exception {
topicName = "topic" + testNum;
prepareJsonKafkaDFSFiles(JSON_KAFKA_NUM_RECORDS, true, topicName);
prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA, "earliest",topicName);
String tableBasePath = dfsBasePath + "/test_json_kafka_table" + testNum;
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(),
Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA, false,
true, 100000, false, null, null, "timestamp"), jsc);
deltaStreamer.sync();
TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);

int totalRecords = JSON_KAFKA_NUM_RECORDS;
int records = 10;
totalRecords += records;
prepareJsonKafkaDFSFiles(records, false, topicName);
deltaStreamer.sync();
TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext);
}

@Test
public void testParquetSourceToKafkaSourceEarliestAutoResetValue() throws Exception {
testDeltaStreamerTransitionFromParquetToKafkaSource(false);
}

@Test
public void testParquetSourceToKafkaSourceLatestAutoResetValue() throws Exception {
testDeltaStreamerTransitionFromParquetToKafkaSource(true);
}

@Test
public void testParquetDFSSourceWithoutSchemaProviderAndNoTransformer() throws Exception {
testParquetDFSSource(false, null);
