[HUDI-3011] Adding ability to read entire data with HoodieIncrSource with empty checkpoint #4334

Merged 2 commits on Dec 22, 2021
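For context, a minimal sketch (not part of the diff below) of how a deltastreamer pipeline might opt into the new behavior. The property keys are the ones introduced in this PR; the source table path is hypothetical, and READ_LATEST could be used instead to pick up only the most recent commit:

TypedProperties props = new TypedProperties();
// Hypothetical source table path for the HoodieIncrSource.
props.setProperty("hoodie.deltastreamer.source.hoodieincr.path", "/data/source_hudi_table");
// New in this PR: with no checkpoint recorded, read the entire table up to the latest commit.
props.setProperty("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy", "READ_UPTO_LATEST_COMMIT");
// The older boolean flag is deprecated but still honored; when true it maps to READ_LATEST.
// props.setProperty("hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt", "true");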
@@ -71,6 +71,7 @@
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
@@ -109,6 +110,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
private String testMethodName;
protected transient JavaSparkContext jsc = null;
protected transient HoodieSparkEngineContext context = null;
protected transient SparkSession sparkSession = null;
protected transient Configuration hadoopConf = null;
protected transient SQLContext sqlContext;
protected transient FileSystem fs;
@@ -182,6 +184,7 @@ protected void initSparkContexts(String appName) {
sqlContext = new SQLContext(jsc);
context = new HoodieSparkEngineContext(jsc);
hadoopConf = context.getHadoopConf().get();
sparkSession = SparkSession.builder().config(jsc.getConf()).getOrCreate();
}

/**
@@ -72,12 +72,19 @@ static class Config {

/**
* {@value #READ_LATEST_INSTANT_ON_MISSING_CKPT} allows delta-streamer to incrementally fetch from latest committed
* instant when checkpoint is not provided.
* instant when checkpoint is not provided. This config is deprecated. Please refer to {@link #MISSING_CHECKPOINT_STRATEGY}.
*/
@Deprecated
static final String READ_LATEST_INSTANT_ON_MISSING_CKPT =
"hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt";
static final Boolean DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT = false;

/**
* {@value #MISSING_CHECKPOINT_STRATEGY} allows delta-streamer to decide which checkpoint to consume from when no checkpoint is set.
*/
static final String MISSING_CHECKPOINT_STRATEGY = "hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy";

/**
* {@value #SOURCE_FILE_FORMAT} is passed to the reader while loading dataset. Default value is parquet.
*/
@@ -106,13 +113,18 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
int numInstantsPerFetch = props.getInteger(Config.NUM_INSTANTS_PER_FETCH, Config.DEFAULT_NUM_INSTANTS_PER_FETCH);
boolean readLatestOnMissingCkpt = props.getBoolean(Config.READ_LATEST_INSTANT_ON_MISSING_CKPT,
Config.DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT);
IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy = (props.containsKey(Config.MISSING_CHECKPOINT_STRATEGY))
? IncrSourceHelper.MissingCheckpointStrategy.valueOf(props.getString(Config.MISSING_CHECKPOINT_STRATEGY)) : null;
if (readLatestOnMissingCkpt) {
missingCheckpointStrategy = IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST;
}

// Use begin Instant if set and non-empty
Option<String> beginInstant =
lastCkptStr.isPresent() ? lastCkptStr.get().isEmpty() ? Option.empty() : lastCkptStr : Option.empty();

Pair<String, String> instantEndpts = IncrSourceHelper.calculateBeginAndEndInstants(sparkContext, srcPath,
numInstantsPerFetch, beginInstant, readLatestOnMissingCkpt);
numInstantsPerFetch, beginInstant, missingCheckpointStrategy);

if (instantEndpts.getKey().equals(instantEndpts.getValue())) {
LOG.warn("Already caught up. Begin Checkpoint was :" + instantEndpts.getKey());
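Both fetchNextBatch implementations touched by this PR resolve the strategy the same way: prefer the new MISSING_CHECKPOINT_STRATEGY property, and map the deprecated boolean flag to READ_LATEST when it is set. A hedged sketch of that shared pattern pulled out into a helper (illustration only; the PR keeps the logic inline in each source):

static IncrSourceHelper.MissingCheckpointStrategy resolveMissingCheckpointStrategy(TypedProperties props) {
  IncrSourceHelper.MissingCheckpointStrategy strategy =
      props.containsKey(Config.MISSING_CHECKPOINT_STRATEGY)
          ? IncrSourceHelper.MissingCheckpointStrategy.valueOf(props.getString(Config.MISSING_CHECKPOINT_STRATEGY))
          : null;
  // The deprecated flag still takes effect and is interpreted as "read only the latest commit".
  if (props.getBoolean(Config.READ_LATEST_INSTANT_ON_MISSING_CKPT, Config.DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT)) {
    strategy = IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST;
  }
  return strategy;
}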
@@ -83,6 +83,11 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
int numInstantsPerFetch = props.getInteger(NUM_INSTANTS_PER_FETCH, DEFAULT_NUM_INSTANTS_PER_FETCH);
boolean readLatestOnMissingCkpt = props.getBoolean(
READ_LATEST_INSTANT_ON_MISSING_CKPT, DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT);
IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy = (props.containsKey(HoodieIncrSource.Config.MISSING_CHECKPOINT_STRATEGY))
? IncrSourceHelper.MissingCheckpointStrategy.valueOf(props.getString(HoodieIncrSource.Config.MISSING_CHECKPOINT_STRATEGY)) : null;
if (readLatestOnMissingCkpt) {
missingCheckpointStrategy = IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST;
}

// Use begin Instant if set and non-empty
Option<String> beginInstant =
@@ -92,7 +97,7 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt

Pair<String, String> instantEndpts =
IncrSourceHelper.calculateBeginAndEndInstants(
sparkContext, srcPath, numInstantsPerFetch, beginInstant, readLatestOnMissingCkpt);
sparkContext, srcPath, numInstantsPerFetch, beginInstant, missingCheckpointStrategy);

if (instantEndpts.getKey().equals(instantEndpts.getValue())) {
LOG.warn("Already caught up. Begin Checkpoint was :" + instantEndpts.getKey());
@@ -32,6 +32,17 @@

public class IncrSourceHelper {

private static final String DEFAULT_BEGIN_TIMESTAMP = "000";
/**
* Strategies for deciding which checkpoint to resume from when no checkpoint is set.
*/
public enum MissingCheckpointStrategy {
// read from latest commit in hoodie source table
READ_LATEST,
// read everything up to the latest commit
READ_UPTO_LATEST_COMMIT
}

/**
* Get a timestamp which is the next value in a descending sequence.
*
@@ -47,15 +58,15 @@ private static String getStrictlyLowerTimestamp(String timestamp) {
/**
* Find begin and end instants to be set for the next fetch.
*
* @param jssc Java Spark Context
* @param srcBasePath Base path of Hudi source table
* @param numInstantsPerFetch Max Instants per fetch
* @param beginInstant Last Checkpoint String
* @param readLatestOnMissingBeginInstant when begin instant is missing, allow reading from latest committed instant
* @param jssc Java Spark Context
* @param srcBasePath Base path of Hudi source table
* @param numInstantsPerFetch Max Instants per fetch
* @param beginInstant Last Checkpoint String
* @param missingCheckpointStrategy when begin instant is missing, allow reading based on missing checkpoint strategy
* @return begin and end instants
*/
public static Pair<String, String> calculateBeginAndEndInstants(JavaSparkContext jssc, String srcBasePath,
int numInstantsPerFetch, Option<String> beginInstant, boolean readLatestOnMissingBeginInstant) {
int numInstantsPerFetch, Option<String> beginInstant, MissingCheckpointStrategy missingCheckpointStrategy) {
ValidationUtils.checkArgument(numInstantsPerFetch > 0,
"Make sure the config hoodie.deltastreamer.source.hoodieincr.num_instants is set to a positive value");
HoodieTableMetaClient srcMetaClient = HoodieTableMetaClient.builder().setConf(jssc.hadoopConfiguration()).setBasePath(srcBasePath).setLoadActiveTimelineOnLoad(true).build();
@@ -64,27 +75,38 @@ public static Pair<String, String> calculateBeginAndEndInstants(JavaSparkContext
srcMetaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants();

String beginInstantTime = beginInstant.orElseGet(() -> {
if (readLatestOnMissingBeginInstant) {
Option<HoodieInstant> lastInstant = activeCommitTimeline.lastInstant();
return lastInstant.map(hoodieInstant -> getStrictlyLowerTimestamp(hoodieInstant.getTimestamp())).orElse("000");
if (missingCheckpointStrategy != null) {
if (missingCheckpointStrategy == MissingCheckpointStrategy.READ_LATEST) {
Option<HoodieInstant> lastInstant = activeCommitTimeline.lastInstant();
return lastInstant.map(hoodieInstant -> getStrictlyLowerTimestamp(hoodieInstant.getTimestamp())).orElse(DEFAULT_BEGIN_TIMESTAMP);
} else {
return DEFAULT_BEGIN_TIMESTAMP;
}
} else {
throw new IllegalArgumentException("Missing begin instant for incremental pull. For reading from latest "
+ "committed instant set hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt to true");
+ "committed instant set hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy to a valid value");
}
});

Option<HoodieInstant> nthInstant = Option.fromJavaOptional(activeCommitTimeline
.findInstantsAfter(beginInstantTime, numInstantsPerFetch).getInstants().reduce((x, y) -> y));
return Pair.of(beginInstantTime, nthInstant.map(HoodieInstant::getTimestamp).orElse(beginInstantTime));
if (!beginInstantTime.equals(DEFAULT_BEGIN_TIMESTAMP)) {
Option<HoodieInstant> nthInstant = Option.fromJavaOptional(activeCommitTimeline
.findInstantsAfter(beginInstantTime, numInstantsPerFetch).getInstants().reduce((x, y) -> y));
return Pair.of(beginInstantTime, nthInstant.map(HoodieInstant::getTimestamp).orElse(beginInstantTime));
} else {
// beginInstantTime is DEFAULT_BEGIN_TIMESTAMP when no checkpoint is set and the missing
// checkpoint strategy asks to read everything, so consume up to the latest commit.
Option<HoodieInstant> lastInstant = activeCommitTimeline.lastInstant();
return Pair.of(beginInstantTime, lastInstant.get().getTimestamp());
}
}

/**
* Validate instant time seen in the incoming row.
*
* @param row Input Row
* @param instantTime Hoodie Instant time of the row
* @param row Input Row
* @param instantTime Hoodie Instant time of the row
* @param sinceInstant begin instant of the batch
* @param endInstant end instant of the batch
* @param endInstant end instant of the batch
*/
public static void validateInstantTime(Row row, String instantTime, String sinceInstant, String endInstant) {
Objects.requireNonNull(instantTime);
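To make the two strategies concrete, a hedged walk-through of calculateBeginAndEndInstants under assumed inputs (the JavaSparkContext, table path, and commit times below are made up; the behavior follows the code above):

// Assume the source table's completed commit timeline is 20211201, 20211202, 20211203
// and no checkpoint has been recorded yet (beginInstant is empty).
//
// READ_LATEST: begin is a timestamp strictly lower than 20211203, so only the data written
// by the most recent commit lands in the next batch.
//
// READ_UPTO_LATEST_COMMIT: begin is DEFAULT_BEGIN_TIMESTAMP ("000") and end is 20211203,
// so the whole table up to the latest commit is consumed, regardless of numInstantsPerFetch.
Pair<String, String> range = IncrSourceHelper.calculateBeginAndEndInstants(
    jssc, "/data/source_hudi_table", 5, Option.empty(),
    IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT);
// range => ("000", "20211203")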
@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.utilities.sources;

import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;

import org.apache.avro.Schema;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.List;
import java.util.Properties;

import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertEquals;

public class TestHoodieIncrSource extends HoodieClientTestHarness {

@BeforeEach
public void setUp() throws IOException {
initResources();
}

@AfterEach
public void tearDown() throws IOException {
cleanupResources();
}

@Test
public void testHoodieIncrSource() throws IOException {
Contributor: should we add a parameterized test for configs archive.min_commits=2 clean.retain_commits=2?
Contributor (Author): I don't think it's required for this test. Thanks for the reminder though.
HoodieWriteConfig writeConfig = getConfigBuilder(basePath).build();

SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context, writeConfig);
Pair<String, List<HoodieRecord>> inserts = writeRecords(writeClient, true, null);
Pair<String, List<HoodieRecord>> inserts2 = writeRecords(writeClient, true, null);
Pair<String, List<HoodieRecord>> inserts3 = writeRecords(writeClient, true, null);

// read everything upto latest
readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, 300, inserts3.getKey());

// read just the latest
readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, 100, inserts3.getKey());
}

private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, int expectedCount, String expectedCheckpoint) {

Properties properties = new Properties();
properties.setProperty("hoodie.deltastreamer.source.hoodieincr.path", basePath);
properties.setProperty("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy", missingCheckpointStrategy.name());
TypedProperties typedProperties = new TypedProperties(properties);
HoodieIncrSource incrSource = new HoodieIncrSource(typedProperties, jsc, sparkSession, new TestSchemaProvider(HoodieTestDataGenerator.AVRO_SCHEMA));

// fetch the next batch with no prior checkpoint, honoring the given strategy
Pair<Option<Dataset<Row>>, String> batchCheckPoint = incrSource.fetchNextBatch(Option.empty(), 500);
Assertions.assertNotNull(batchCheckPoint.getValue());
assertEquals(batchCheckPoint.getKey().get().count(), expectedCount);
Assertions.assertEquals(batchCheckPoint.getRight(), expectedCheckpoint);
}

public Pair<String, List<HoodieRecord>> writeRecords(SparkRDDWriteClient writeClient, boolean insert, List<HoodieRecord> insertRecords) throws IOException {
String commit = writeClient.startCommit();
List<HoodieRecord> records = insert ? dataGen.generateInserts(commit, 100) : dataGen.generateUpdates(commit, insertRecords);
JavaRDD<WriteStatus> result = writeClient.upsert(jsc.parallelize(records, 1), commit);
List<WriteStatus> statuses = result.collect();
assertNoWriteErrors(statuses);
return Pair.of(commit, records);
}

public HoodieWriteConfig.Builder getConfigBuilder(String basePath) {
return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2).withDeleteParallelism(2)
.withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
.forTable("test-hoodie-incr-source");
}

class TestSchemaProvider extends SchemaProvider {

private final Schema schema;

public TestSchemaProvider(Schema schema) {
super(new TypedProperties());
this.schema = schema;
}

@Override
public Schema getSourceSchema() {
return schema;
}
}
}
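Following up on the review thread above, a hedged sketch of what a parameterized variant of this test could look like if it were added inside TestHoodieIncrSource, using JUnit 5's @ParameterizedTest with @EnumSource (imports from org.junit.jupiter.params would be needed). The archival/cleaner settings the reviewer mentions are only noted in a comment rather than wired into the write config:

@ParameterizedTest
@EnumSource(IncrSourceHelper.MissingCheckpointStrategy.class)
public void testHoodieIncrSourceWithMissingCheckpointStrategy(
    IncrSourceHelper.MissingCheckpointStrategy strategy) throws IOException {
  // The suggested archive.min_commits=2 / clean.retain_commits=2 settings would be layered
  // onto this write config; the exact builder calls are intentionally left out of this sketch.
  HoodieWriteConfig writeConfig = getConfigBuilder(basePath).build();
  SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context, writeConfig);
  writeRecords(writeClient, true, null);
  writeRecords(writeClient, true, null);
  Pair<String, List<HoodieRecord>> lastInserts = writeRecords(writeClient, true, null);

  // READ_UPTO_LATEST_COMMIT sees all three commits (300 records); READ_LATEST only the last (100).
  int expectedCount = strategy == IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT ? 300 : 100;
  readAndAssert(strategy, expectedCount, lastInserts.getKey());
}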