From 0d42f0084e7918d69816217c962aacc85259c9c8 Mon Sep 17 00:00:00 2001
From: Sivabalan Narayanan
Date: Wed, 15 Dec 2021 17:12:31 -0800
Subject: [PATCH 1/2] Adding ability to read entire data with HoodieIncrSource
 with empty checkpoint

---
 .../testutils/HoodieClientTestHarness.java    |   3 +
 .../utilities/sources/HoodieIncrSource.java   |  16 ++-
 .../sources/S3EventsHoodieIncrSource.java     |   7 +-
 .../sources/helpers/IncrSourceHelper.java     |  53 ++++---
 .../sources/TestHoodieIncrSource.java         | 129 ++++++++++++++++++
 5 files changed, 189 insertions(+), 19 deletions(-)
 create mode 100644 hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
index 1e07485d433f..3c023eba38e3 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
@@ -71,6 +71,7 @@
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
@@ -109,6 +110,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
   private String testMethodName;
   protected transient JavaSparkContext jsc = null;
   protected transient HoodieSparkEngineContext context = null;
+  protected transient SparkSession sparkSession = null;
   protected transient Configuration hadoopConf = null;
   protected transient SQLContext sqlContext;
   protected transient FileSystem fs;
@@ -182,6 +184,7 @@ protected void initSparkContexts(String appName) {
     sqlContext = new SQLContext(jsc);
     context = new HoodieSparkEngineContext(jsc);
     hadoopConf = context.getHadoopConf().get();
+    sparkSession = SparkSession.builder().config(jsc.getConf()).getOrCreate();
   }

   /**
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
index a217e6b7a800..ebb359390be0 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
@@ -72,12 +72,19 @@ static class Config {

   /**
    * {@value #READ_LATEST_INSTANT_ON_MISSING_CKPT} allows delta-streamer to incrementally fetch from latest committed
-   * instant when checkpoint is not provided.
+   * instant when checkpoint is not provided. This config is deprecated. Please refer to {@link #MISSING_CHECKPOINT_STRATEGY}.
    */
+  @Deprecated
   static final String READ_LATEST_INSTANT_ON_MISSING_CKPT = "hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt";
   static final Boolean DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT = false;

+  /**
+   * {@value #MISSING_CHECKPOINT_STRATEGY} allows delta-streamer to decide the checkpoint to consume from when checkpoint is not set:
+   * either resume from the latest committed instant or consume the entire table up to the latest commit.
+   */
+  static final String MISSING_CHECKPOINT_STRATEGY = "hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy";
+
   /**
    * {@value #SOURCE_FILE_FORMAT} is passed to the reader while loading dataset. Default value is parquet.
    */
@@ -106,13 +113,18 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
     int numInstantsPerFetch = props.getInteger(Config.NUM_INSTANTS_PER_FETCH, Config.DEFAULT_NUM_INSTANTS_PER_FETCH);
     boolean readLatestOnMissingCkpt = props.getBoolean(Config.READ_LATEST_INSTANT_ON_MISSING_CKPT,
         Config.DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT);
+    IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy = (props.containsKey(Config.MISSING_CHECKPOINT_STRATEGY))
+        ? IncrSourceHelper.MissingCheckpointStrategy.valueOf(props.getString(Config.MISSING_CHECKPOINT_STRATEGY)) : null;
+    if (readLatestOnMissingCkpt) {
+      missingCheckpointStrategy = IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST;
+    }

     // Use begin Instant if set and non-empty
     Option<String> beginInstant =
         lastCkptStr.isPresent() ? lastCkptStr.get().isEmpty() ? Option.empty() : lastCkptStr : Option.empty();

     Pair<String, String> instantEndpts = IncrSourceHelper.calculateBeginAndEndInstants(sparkContext, srcPath,
-        numInstantsPerFetch, beginInstant, readLatestOnMissingCkpt);
+        numInstantsPerFetch, beginInstant, missingCheckpointStrategy);

     if (instantEndpts.getKey().equals(instantEndpts.getValue())) {
       LOG.warn("Already caught up. Begin Checkpoint was :" + instantEndpts.getKey());
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
index ec789ab28f49..c011157cc06f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
@@ -83,6 +83,11 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
     int numInstantsPerFetch = props.getInteger(NUM_INSTANTS_PER_FETCH, DEFAULT_NUM_INSTANTS_PER_FETCH);
     boolean readLatestOnMissingCkpt = props.getBoolean(
         READ_LATEST_INSTANT_ON_MISSING_CKPT, DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT);
+    IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy = (props.containsKey(HoodieIncrSource.Config.MISSING_CHECKPOINT_STRATEGY))
+        ? IncrSourceHelper.MissingCheckpointStrategy.valueOf(props.getString(HoodieIncrSource.Config.MISSING_CHECKPOINT_STRATEGY)) : null;
+    if (readLatestOnMissingCkpt) {
+      missingCheckpointStrategy = IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST;
+    }

     // Use begin Instant if set and non-empty
     Option<String> beginInstant =
@@ -92,7 +97,7 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt

     Pair<String, String> instantEndpts =
         IncrSourceHelper.calculateBeginAndEndInstants(
-            sparkContext, srcPath, numInstantsPerFetch, beginInstant, readLatestOnMissingCkpt);
+            sparkContext, srcPath, numInstantsPerFetch, beginInstant, missingCheckpointStrategy);

     if (instantEndpts.getKey().equals(instantEndpts.getValue())) {
       LOG.warn("Already caught up. Begin Checkpoint was :" + instantEndpts.getKey());
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
index 8f434a007a0b..3cb0b0b33e58 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
@@ -32,6 +32,16 @@

 public class IncrSourceHelper {

+  /**
+   * Strategies for resolving the begin instant when no checkpoint is set.
+   */
+  public enum MissingCheckpointStrategy {
+    // read from latest commit in hoodie source table
+    READ_LATEST,
+    // read everything until latest commit
+    READ_EVERYTHING_UNTIL_LATEST
+  }
+
   /**
    * Get a timestamp which is the next value in a descending sequence.
    *
@@ -47,15 +57,15 @@ private static String getStrictlyLowerTimestamp(String timestamp) {
   /**
    * Find begin and end instants to be set for the next fetch.
    *
-   * @param jssc Java Spark Context
-   * @param srcBasePath Base path of Hudi source table
-   * @param numInstantsPerFetch Max Instants per fetch
-   * @param beginInstant Last Checkpoint String
-   * @param readLatestOnMissingBeginInstant when begin instant is missing, allow reading from latest committed instant
+   * @param jssc                      Java Spark Context
+   * @param srcBasePath               Base path of Hudi source table
+   * @param numInstantsPerFetch       Max Instants per fetch
+   * @param beginInstant              Last Checkpoint String
+   * @param missingCheckpointStrategy strategy to decide where to read from when the begin instant is missing
    * @return begin and end instants
    */
  public static Pair<String, String> calculateBeginAndEndInstants(JavaSparkContext jssc, String srcBasePath,
-                                                                 int numInstantsPerFetch, Option<String> beginInstant, boolean readLatestOnMissingBeginInstant) {
+                                                                 int numInstantsPerFetch, Option<String> beginInstant, MissingCheckpointStrategy missingCheckpointStrategy) {
     ValidationUtils.checkArgument(numInstantsPerFetch > 0,
         "Make sure the config hoodie.deltastreamer.source.hoodieincr.num_instants is set to a positive value");
     HoodieTableMetaClient srcMetaClient = HoodieTableMetaClient.builder().setConf(jssc.hadoopConfiguration()).setBasePath(srcBasePath).setLoadActiveTimelineOnLoad(true).build();
@@ -64,27 +74,38 @@ public static Pair<String, String> calculateBeginAndEndInstants(JavaSparkContext
         srcMetaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants();

     String beginInstantTime = beginInstant.orElseGet(() -> {
-      if (readLatestOnMissingBeginInstant) {
-        Option<HoodieInstant> lastInstant = activeCommitTimeline.lastInstant();
-        return lastInstant.map(hoodieInstant -> getStrictlyLowerTimestamp(hoodieInstant.getTimestamp())).orElse("000");
+      if (missingCheckpointStrategy != null) {
+        if (missingCheckpointStrategy == MissingCheckpointStrategy.READ_LATEST) {
+          Option<HoodieInstant> lastInstant = activeCommitTimeline.lastInstant();
+          return lastInstant.map(hoodieInstant -> getStrictlyLowerTimestamp(hoodieInstant.getTimestamp())).orElse("000");
+        } else {
+          return "000";
+        }
       } else {
         throw new IllegalArgumentException("Missing begin instant for incremental pull. For reading from latest "
-            + "committed instant set hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt to true");
+            + "committed instant set hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy to a valid value");
       }
     });

-    Option<HoodieInstant> nthInstant = Option.fromJavaOptional(activeCommitTimeline
-        .findInstantsAfter(beginInstantTime, numInstantsPerFetch).getInstants().reduce((x, y) -> y));
-    return Pair.of(beginInstantTime, nthInstant.map(HoodieInstant::getTimestamp).orElse(beginInstantTime));
+    if (!beginInstantTime.equals("000")) {
+      Option<HoodieInstant> nthInstant = Option.fromJavaOptional(activeCommitTimeline
+          .findInstantsAfter(beginInstantTime, numInstantsPerFetch).getInstants().reduce((x, y) -> y));
+      return Pair.of(beginInstantTime, nthInstant.map(HoodieInstant::getTimestamp).orElse(beginInstantTime));
+    } else {
+      // if beginInstant is null, MissingCheckpointStrategy should be set.
+      // when MissingCheckpointStrategy is set to read everything until latest, use the latest instant as the end instant.
+      Option<HoodieInstant> lastInstant = activeCommitTimeline.lastInstant();
+      return Pair.of(beginInstantTime, lastInstant.get().getTimestamp());
+    }
   }

   /**
    * Validate instant time seen in the incoming row.
    *
-   * @param row Input Row
-   * @param instantTime Hoodie Instant time of the row
+   * @param row          Input Row
+   * @param instantTime  Hoodie Instant time of the row
    * @param sinceInstant begin instant of the batch
-   * @param endInstant end instant of the batch
+   * @param endInstant   end instant of the batch
    */
   public static void validateInstantTime(Row row, String instantTime, String sinceInstant, String endInstant) {
     Objects.requireNonNull(instantTime);
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
new file mode 100644
index 000000000000..c12fc929b92f
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
+
+import org.apache.avro.Schema;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestHoodieIncrSource extends HoodieClientTestHarness {
+
+  @BeforeEach
+  public void setUp() throws IOException {
+    initResources();
+  }
+
+  @AfterEach
+  public void tearDown() throws IOException {
+    cleanupResources();
+  }
+
+  @Test
+  public void testHoodieIncrSource() throws IOException {
+    HoodieWriteConfig writeConfig = getConfigBuilder(basePath).build();
+
+    SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context, writeConfig);
+    Pair<String, List<HoodieRecord>> inserts = writeRecords(writeClient, true, null);
+    Pair<String, List<HoodieRecord>> inserts2 = writeRecords(writeClient, true, null);
+    Pair<String, List<HoodieRecord>> inserts3 = writeRecords(writeClient, true, null);
+
+    Properties properties = new Properties();
+    properties.setProperty("hoodie.deltastreamer.source.hoodieincr.path", basePath);
+    properties.setProperty("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy", IncrSourceHelper.MissingCheckpointStrategy.READ_EVERYTHING_UNTIL_LATEST.name());
+    TypedProperties typedProperties = new TypedProperties(properties);
+    HoodieIncrSource incrSource = new HoodieIncrSource(typedProperties, jsc, sparkSession, new TestSchemaProvider(HoodieTestDataGenerator.AVRO_SCHEMA));
+
+    // read everything until latest
+    readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_EVERYTHING_UNTIL_LATEST, 300, inserts3.getKey());
+
+    // read just the latest
+    readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, 100, inserts3.getKey());
+  }
+
+  private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, int expectedCount, String expectedCheckpoint) {
+
+    Properties properties = new Properties();
+    properties.setProperty("hoodie.deltastreamer.source.hoodieincr.path", basePath);
+    properties.setProperty("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy", missingCheckpointStrategy.name());
+    TypedProperties typedProperties = new TypedProperties(properties);
+    HoodieIncrSource incrSource = new HoodieIncrSource(typedProperties, jsc, sparkSession, new TestSchemaProvider(HoodieTestDataGenerator.AVRO_SCHEMA));
+
+    // fetch the next batch using the given missing checkpoint strategy
+    Pair<Option<Dataset<Row>>, String> batchCheckPoint = incrSource.fetchNextBatch(Option.empty(), 500);
+    Assertions.assertNotNull(batchCheckPoint.getValue());
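+    // the returned batch should contain exactly the records implied by the strategy,
+    // and the returned checkpoint should point at the last commit that was consumed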
+    assertEquals(expectedCount, batchCheckPoint.getKey().get().count());
+    Assertions.assertEquals(expectedCheckpoint, batchCheckPoint.getRight());
+  }
+
+  public Pair<String, List<HoodieRecord>> writeRecords(SparkRDDWriteClient writeClient, boolean insert, List<HoodieRecord> insertRecords) throws IOException {
+    String commit = writeClient.startCommit();
+    List<HoodieRecord> records = insert ? dataGen.generateInserts(commit, 100) : dataGen.generateUpdates(commit, insertRecords);
+    JavaRDD<WriteStatus> result = writeClient.upsert(jsc.parallelize(records, 1), commit);
+    List<WriteStatus> statuses = result.collect();
+    assertNoWriteErrors(statuses);
+    return Pair.of(commit, records);
+  }
+
+  public HoodieWriteConfig.Builder getConfigBuilder(String basePath) {
+    return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+        .withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2).withDeleteParallelism(2)
+        .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
+        .forTable("test-hoodie-incr-source");
+  }
+
+  class TestSchemaProvider extends SchemaProvider {
+
+    private final Schema schema;
+
+    public TestSchemaProvider(Schema schema) {
+      super(new TypedProperties());
+      this.schema = schema;
+    }
+
+    @Override
+    public Schema getSourceSchema() {
+      return schema;
+    }
+  }
+}

From 6efdf3a650aa54ad909252e28a5255d268209e06 Mon Sep 17 00:00:00 2001
From: Sivabalan Narayanan
Date: Tue, 21 Dec 2021 16:16:07 -0800
Subject: [PATCH 2/2] Addressing comments

---
 .../utilities/sources/helpers/IncrSourceHelper.java | 13 +++++++------
 .../utilities/sources/TestHoodieIncrSource.java     | 10 ++--------
 2 files changed, 9 insertions(+), 14 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
index 3cb0b0b33e58..a370c314a168 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
@@ -32,14 +32,15 @@

 public class IncrSourceHelper {

+  private static final String DEFAULT_BEGIN_TIMESTAMP = "000";
   /**
    * Strategies for resolving the begin instant when no checkpoint is set.
    */
   public enum MissingCheckpointStrategy {
     // read from latest commit in hoodie source table
     READ_LATEST,
-    // read everything until latest commit
-    READ_EVERYTHING_UNTIL_LATEST
+    // read everything up to the latest commit
+    READ_UPTO_LATEST_COMMIT
   }

   /**
@@ -77,9 +78,9 @@ public static Pair<String, String> calculateBeginAndEndInstants(JavaSparkContext
       if (missingCheckpointStrategy != null) {
         if (missingCheckpointStrategy == MissingCheckpointStrategy.READ_LATEST) {
           Option<HoodieInstant> lastInstant = activeCommitTimeline.lastInstant();
-          return lastInstant.map(hoodieInstant -> getStrictlyLowerTimestamp(hoodieInstant.getTimestamp())).orElse("000");
+          return lastInstant.map(hoodieInstant -> getStrictlyLowerTimestamp(hoodieInstant.getTimestamp())).orElse(DEFAULT_BEGIN_TIMESTAMP);
         } else {
-          return "000";
+          return DEFAULT_BEGIN_TIMESTAMP;
         }
       } else {
         throw new IllegalArgumentException("Missing begin instant for incremental pull. For reading from latest "
@@ -87,12 +88,12 @@ public static Pair<String, String> calculateBeginAndEndInstants(JavaSparkContext
             + "committed instant set hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy to a valid value");
       }
     });

-    if (!beginInstantTime.equals("000")) {
+    if (!beginInstantTime.equals(DEFAULT_BEGIN_TIMESTAMP)) {
       Option<HoodieInstant> nthInstant = Option.fromJavaOptional(activeCommitTimeline
           .findInstantsAfter(beginInstantTime, numInstantsPerFetch).getInstants().reduce((x, y) -> y));
       return Pair.of(beginInstantTime, nthInstant.map(HoodieInstant::getTimestamp).orElse(beginInstantTime));
     } else {
-      // if beginInstant is null, MissingCheckpointStrategy should be set.
+      // if beginInstant is DEFAULT_BEGIN_TIMESTAMP, MissingCheckpointStrategy should be set.
       // when MissingCheckpointStrategy is set to read everything until latest, use the latest instant as the end instant.
       Option<HoodieInstant> lastInstant = activeCommitTimeline.lastInstant();
       return Pair.of(beginInstantTime, lastInstant.get().getTimestamp());
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
index c12fc929b92f..250e288294ac 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
@@ -68,14 +68,8 @@ public void testHoodieIncrSource() throws IOException {
     Pair<String, List<HoodieRecord>> inserts2 = writeRecords(writeClient, true, null);
     Pair<String, List<HoodieRecord>> inserts3 = writeRecords(writeClient, true, null);

-    Properties properties = new Properties();
-    properties.setProperty("hoodie.deltastreamer.source.hoodieincr.path", basePath);
-    properties.setProperty("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy", IncrSourceHelper.MissingCheckpointStrategy.READ_EVERYTHING_UNTIL_LATEST.name());
-    TypedProperties typedProperties = new TypedProperties(properties);
-    HoodieIncrSource incrSource = new HoodieIncrSource(typedProperties, jsc, sparkSession, new TestSchemaProvider(HoodieTestDataGenerator.AVRO_SCHEMA));
-
-    // read everything until latest
-    readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_EVERYTHING_UNTIL_LATEST, 300, inserts3.getKey());
+    // read everything up to latest
+    readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, 300, inserts3.getKey());

     // read just the latest
     readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, 100, inserts3.getKey());
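
A minimal sketch of how a pipeline would opt into the new behavior. The two property keys and the enum come from this patch; the table path, variable name, and the standalone snippet itself are illustrative only, not part of the change:

    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;

    // No checkpoint has been persisted yet, so ask HoodieIncrSource to consume
    // the source table from the beginning up to its latest commit.
    TypedProperties props = new TypedProperties();
    props.setProperty("hoodie.deltastreamer.source.hoodieincr.path", "/data/src_hudi_table");
    props.setProperty("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy",
        IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT.name());

Since delta-streamer persists the checkpoint returned by fetchNextBatch after each round, the strategy should only take effect on the first run, or whenever the stored checkpoint comes back empty.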