
Adding ability to read entire data with HoodieIncrSource with empty checkpoint
nsivabalan committed Dec 16, 2021
1 parent 3b89457 commit 0d42f00
Showing 5 changed files with 189 additions and 19 deletions.
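
For context, the commit introduces a new DeltaStreamer source property, hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy, which tells HoodieIncrSource what to do when no checkpoint has been recorded yet. The sketch below is a minimal, hypothetical usage example assembled from the diff: the property names, enum, constructor and fetchNextBatch signature come from the changes shown below, while srcBasePath, jsc, sparkSession and schemaProvider are assumed to be set up by the caller.

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.sources.HoodieIncrSource;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Read the whole source table in one batch when no checkpoint exists yet.
TypedProperties props = new TypedProperties();
props.setProperty("hoodie.deltastreamer.source.hoodieincr.path", srcBasePath); // assumed base path of the source Hudi table
props.setProperty("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy",
    IncrSourceHelper.MissingCheckpointStrategy.READ_EVERYTHING_UNTIL_LATEST.name());
// READ_LATEST instead reproduces the behaviour of the now-deprecated
// hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt=true.

HoodieIncrSource incrSource = new HoodieIncrSource(props, jsc, sparkSession, schemaProvider);
Pair<Option<Dataset<Row>>, String> batch = incrSource.fetchNextBatch(Option.empty(), Long.MAX_VALUE);
// batch.getKey() holds the rows that were read; batch.getValue() is the new checkpoint (latest commit time).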
@@ -71,6 +71,7 @@
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
@@ -109,6 +110,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
private String testMethodName;
protected transient JavaSparkContext jsc = null;
protected transient HoodieSparkEngineContext context = null;
protected transient SparkSession sparkSession = null;
protected transient Configuration hadoopConf = null;
protected transient SQLContext sqlContext;
protected transient FileSystem fs;
@@ -182,6 +184,7 @@ protected void initSparkContexts(String appName) {
sqlContext = new SQLContext(jsc);
context = new HoodieSparkEngineContext(jsc);
hadoopConf = context.getHadoopConf().get();
sparkSession = SparkSession.builder().config(jsc.getConf()).getOrCreate();
}

/**
@@ -72,12 +72,19 @@ static class Config {

/**
* {@value #READ_LATEST_INSTANT_ON_MISSING_CKPT} allows delta-streamer to incrementally fetch from latest committed
* instant when checkpoint is not provided.
* instant when checkpoint is not provided. This config is deprecated. Please refer to {@link #MISSING_CHECKPOINT_STRATEGY}.
*/
@Deprecated
static final String READ_LATEST_INSTANT_ON_MISSING_CKPT =
"hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt";
static final Boolean DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT = false;

/**
* {@value #MISSING_CHECKPOINT_STRATEGY} allows delta-streamer to decide the instant to consume from when checkpoint is not set.
*/
static final String MISSING_CHECKPOINT_STRATEGY = "hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy";

/**
* {@value #SOURCE_FILE_FORMAT} is passed to the reader while loading dataset. Default value is parquet.
*/
@@ -106,13 +113,18 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
int numInstantsPerFetch = props.getInteger(Config.NUM_INSTANTS_PER_FETCH, Config.DEFAULT_NUM_INSTANTS_PER_FETCH);
boolean readLatestOnMissingCkpt = props.getBoolean(Config.READ_LATEST_INSTANT_ON_MISSING_CKPT,
Config.DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT);
IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy = (props.containsKey(Config.MISSING_CHECKPOINT_STRATEGY))
? IncrSourceHelper.MissingCheckpointStrategy.valueOf(props.getString(Config.MISSING_CHECKPOINT_STRATEGY)) : null;
if (readLatestOnMissingCkpt) {
missingCheckpointStrategy = IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST;
}

// Use begin Instant if set and non-empty
Option<String> beginInstant =
lastCkptStr.isPresent() ? lastCkptStr.get().isEmpty() ? Option.empty() : lastCkptStr : Option.empty();

Pair<String, String> instantEndpts = IncrSourceHelper.calculateBeginAndEndInstants(sparkContext, srcPath,
numInstantsPerFetch, beginInstant, readLatestOnMissingCkpt);
numInstantsPerFetch, beginInstant, missingCheckpointStrategy);

if (instantEndpts.getKey().equals(instantEndpts.getValue())) {
LOG.warn("Already caught up. Begin Checkpoint was :" + instantEndpts.getKey());
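
Both incremental sources resolve the deprecated boolean and the new strategy the same way; the snippet below is a hedged restatement of that resolution order, mirroring the logic in the hunks around this point rather than adding any new API. It shows that the new property is parsed first and that read_latest_on_missing_ckpt=true still wins by forcing READ_LATEST, so existing pipelines keep their old behaviour.

// Resolution order mirrored from the diff (not new API):
IncrSourceHelper.MissingCheckpointStrategy strategy =
    props.containsKey("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy")
        ? IncrSourceHelper.MissingCheckpointStrategy.valueOf(
            props.getString("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy"))
        : null;
if (props.getBoolean("hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt", false)) {
  // the deprecated flag takes precedence and maps to READ_LATEST
  strategy = IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST;
}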
@@ -83,6 +83,11 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
int numInstantsPerFetch = props.getInteger(NUM_INSTANTS_PER_FETCH, DEFAULT_NUM_INSTANTS_PER_FETCH);
boolean readLatestOnMissingCkpt = props.getBoolean(
READ_LATEST_INSTANT_ON_MISSING_CKPT, DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT);
IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy = (props.containsKey(HoodieIncrSource.Config.MISSING_CHECKPOINT_STRATEGY))
? IncrSourceHelper.MissingCheckpointStrategy.valueOf(props.getString(HoodieIncrSource.Config.MISSING_CHECKPOINT_STRATEGY)) : null;
if (readLatestOnMissingCkpt) {
missingCheckpointStrategy = IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST;
}

// Use begin Instant if set and non-empty
Option<String> beginInstant =
@@ -92,7 +97,7 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt

Pair<String, String> instantEndpts =
IncrSourceHelper.calculateBeginAndEndInstants(
sparkContext, srcPath, numInstantsPerFetch, beginInstant, readLatestOnMissingCkpt);
sparkContext, srcPath, numInstantsPerFetch, beginInstant, missingCheckpointStrategy);

if (instantEndpts.getKey().equals(instantEndpts.getValue())) {
LOG.warn("Already caught up. Begin Checkpoint was :" + instantEndpts.getKey());
@@ -32,6 +32,16 @@

public class IncrSourceHelper {

/**
* Strategies for deciding the begin instant when the checkpoint is missing.
*/
public enum MissingCheckpointStrategy {
// read from latest commit in hoodie source table
READ_LATEST,
// read everything until latest commit
READ_EVERYTHING_UNTIL_LATEST
}

/**
* Get a timestamp which is the next value in a descending sequence.
*
@@ -47,15 +57,15 @@ private static String getStrictlyLowerTimestamp(String timestamp) {
/**
* Find begin and end instants to be set for the next fetch.
*
* @param jssc Java Spark Context
* @param srcBasePath Base path of Hudi source table
* @param numInstantsPerFetch Max Instants per fetch
* @param beginInstant Last Checkpoint String
* @param readLatestOnMissingBeginInstant when begin instant is missing, allow reading from latest committed instant
* @param jssc Java Spark Context
* @param srcBasePath Base path of Hudi source table
* @param numInstantsPerFetch Max Instants per fetch
* @param beginInstant Last Checkpoint String
* @param missingCheckpointStrategy when begin instant is missing, allow reading based on missing checkpoint strategy
* @return begin and end instants
*/
public static Pair<String, String> calculateBeginAndEndInstants(JavaSparkContext jssc, String srcBasePath,
int numInstantsPerFetch, Option<String> beginInstant, boolean readLatestOnMissingBeginInstant) {
int numInstantsPerFetch, Option<String> beginInstant, MissingCheckpointStrategy missingCheckpointStrategy) {
ValidationUtils.checkArgument(numInstantsPerFetch > 0,
"Make sure the config hoodie.deltastreamer.source.hoodieincr.num_instants is set to a positive value");
HoodieTableMetaClient srcMetaClient = HoodieTableMetaClient.builder().setConf(jssc.hadoopConfiguration()).setBasePath(srcBasePath).setLoadActiveTimelineOnLoad(true).build();
@@ -64,27 +74,38 @@ public static Pair<String, String> calculateBeginAndEndInstants(JavaSparkContext
srcMetaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants();

String beginInstantTime = beginInstant.orElseGet(() -> {
if (readLatestOnMissingBeginInstant) {
Option<HoodieInstant> lastInstant = activeCommitTimeline.lastInstant();
return lastInstant.map(hoodieInstant -> getStrictlyLowerTimestamp(hoodieInstant.getTimestamp())).orElse("000");
if (missingCheckpointStrategy != null) {
if (missingCheckpointStrategy == MissingCheckpointStrategy.READ_LATEST) {
Option<HoodieInstant> lastInstant = activeCommitTimeline.lastInstant();
return lastInstant.map(hoodieInstant -> getStrictlyLowerTimestamp(hoodieInstant.getTimestamp())).orElse("000");
} else {
return "000";
}
} else {
throw new IllegalArgumentException("Missing begin instant for incremental pull. For reading from latest "
+ "committed instant set hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt to true");
+ "committed instant set hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy to a valid value");
}
});

Option<HoodieInstant> nthInstant = Option.fromJavaOptional(activeCommitTimeline
.findInstantsAfter(beginInstantTime, numInstantsPerFetch).getInstants().reduce((x, y) -> y));
return Pair.of(beginInstantTime, nthInstant.map(HoodieInstant::getTimestamp).orElse(beginInstantTime));
if (!beginInstantTime.equals("000")) {
Option<HoodieInstant> nthInstant = Option.fromJavaOptional(activeCommitTimeline
.findInstantsAfter(beginInstantTime, numInstantsPerFetch).getInstants().reduce((x, y) -> y));
return Pair.of(beginInstantTime, nthInstant.map(HoodieInstant::getTimestamp).orElse(beginInstantTime));
} else {
// a beginInstantTime of "000" means the source table should be read from the beginning
// (e.g. MissingCheckpointStrategy.READ_EVERYTHING_UNTIL_LATEST), so end the batch at the latest commit.
Option<HoodieInstant> lastInstant = activeCommitTimeline.lastInstant();
return Pair.of(beginInstantTime, lastInstant.get().getTimestamp());
}
}

/**
* Validate instant time seen in the incoming row.
*
* @param row Input Row
* @param instantTime Hoodie Instant time of the row
* @param row Input Row
* @param instantTime Hoodie Instant time of the row
* @param sinceInstant begin instant of the batch
* @param endInstant end instant of the batch
* @param endInstant end instant of the batch
*/
public static void validateInstantTime(Row row, String instantTime, String sinceInstant, String endInstant) {
Objects.requireNonNull(instantTime);
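
To make the effect of the new parameter on calculateBeginAndEndInstants concrete, here is a small sketch against a hypothetical timeline with three completed commits (commit times are invented; jssc and srcBasePath are assumed to be set up by the caller, and the call signature is the one shown in the hunk above).

// Hypothetical completed commits on the source table: "001", "002", "003"; no checkpoint is passed.

// READ_LATEST: begin is strictly below the last commit, so only data written by "003" is fetched.
Pair<String, String> latestOnly = IncrSourceHelper.calculateBeginAndEndInstants(
    jssc, srcBasePath, 5, Option.empty(), IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST);

// READ_EVERYTHING_UNTIL_LATEST: begin is "000" and end is the last commit ("003"),
// so the entire table up to the latest commit is fetched in a single batch.
Pair<String, String> everything = IncrSourceHelper.calculateBeginAndEndInstants(
    jssc, srcBasePath, 5, Option.empty(), IncrSourceHelper.MissingCheckpointStrategy.READ_EVERYTHING_UNTIL_LATEST);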
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.utilities.sources;

import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;

import org.apache.avro.Schema;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.List;
import java.util.Properties;

import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertEquals;

public class TestHoodieIncrSource extends HoodieClientTestHarness {

@BeforeEach
public void setUp() throws IOException {
initResources();
}

@AfterEach
public void tearDown() throws IOException {
cleanupResources();
}

@Test
public void testHoodieIncrSource() throws IOException {
HoodieWriteConfig writeConfig = getConfigBuilder(basePath).build();

SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context, writeConfig);
Pair<String, List<HoodieRecord>> inserts = writeRecords(writeClient, true, null);
Pair<String, List<HoodieRecord>> inserts2 = writeRecords(writeClient, true, null);
Pair<String, List<HoodieRecord>> inserts3 = writeRecords(writeClient, true, null);

Properties properties = new Properties();
properties.setProperty("hoodie.deltastreamer.source.hoodieincr.path", basePath);
properties.setProperty("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy", IncrSourceHelper.MissingCheckpointStrategy.READ_EVERYTHING_UNTIL_LATEST.name());
TypedProperties typedProperties = new TypedProperties(properties);
HoodieIncrSource incrSource = new HoodieIncrSource(typedProperties, jsc, sparkSession, new TestSchemaProvider(HoodieTestDataGenerator.AVRO_SCHEMA));

// read everything until latest
readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_EVERYTHING_UNTIL_LATEST, 300, inserts3.getKey());

// read just the latest
readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, 100, inserts3.getKey());
}

private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, int expectedCount, String expectedCheckpoint) {

Properties properties = new Properties();
properties.setProperty("hoodie.deltastreamer.source.hoodieincr.path", basePath);
properties.setProperty("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy", missingCheckpointStrategy.name());
TypedProperties typedProperties = new TypedProperties(properties);
HoodieIncrSource incrSource = new HoodieIncrSource(typedProperties, jsc, sparkSession, new TestSchemaProvider(HoodieTestDataGenerator.AVRO_SCHEMA));

// fetch the next batch using the given missing checkpoint strategy
Pair<Option<Dataset<Row>>, String> batchCheckPoint = incrSource.fetchNextBatch(Option.empty(), 500);
Assertions.assertNotNull(batchCheckPoint.getValue());
assertEquals(batchCheckPoint.getKey().get().count(), expectedCount);
Assertions.assertEquals(batchCheckPoint.getRight(), expectedCheckpoint);
}

public Pair<String, List<HoodieRecord>> writeRecords(SparkRDDWriteClient writeClient, boolean insert, List<HoodieRecord> insertRecords) throws IOException {
String commit = writeClient.startCommit();
List<HoodieRecord> records = insert ? dataGen.generateInserts(commit, 100) : dataGen.generateUpdates(commit, insertRecords);
JavaRDD<WriteStatus> result = writeClient.upsert(jsc.parallelize(records, 1), commit);
List<WriteStatus> statuses = result.collect();
assertNoWriteErrors(statuses);
return Pair.of(commit, records);
}

public HoodieWriteConfig.Builder getConfigBuilder(String basePath) {
return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2).withDeleteParallelism(2)
.withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
.forTable("test-hoodie-incr-source");
}

class TestSchemaProvider extends SchemaProvider {

private final Schema schema;

public TestSchemaProvider(Schema schema) {
super(new TypedProperties());
this.schema = schema;
}

@Override
public Schema getSourceSchema() {
return schema;
}
}
}
