Skip to content

Commit

Permalink
Addressing comments
Browse files Browse the repository at this point in the history
  • Loading branch information
nsivabalan committed Dec 22, 2021
1 parent 0d42f00 commit 6efdf3a
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,15 @@

public class IncrSourceHelper {

private static final String DEFAULT_BEGIN_TIMESTAMP = "000";
/**
* Kafka reset offset strategies.
*/
public enum MissingCheckpointStrategy {
// read from latest commit in hoodie source table
READ_LATEST,
// read everything until latest commit
READ_EVERYTHING_UNTIL_LATEST
// read everything upto latest commit
READ_UPTO_LATEST_COMMIT
}

/**
Expand Down Expand Up @@ -77,22 +78,22 @@ public static Pair<String, String> calculateBeginAndEndInstants(JavaSparkContext
if (missingCheckpointStrategy != null) {
if (missingCheckpointStrategy == MissingCheckpointStrategy.READ_LATEST) {
Option<HoodieInstant> lastInstant = activeCommitTimeline.lastInstant();
return lastInstant.map(hoodieInstant -> getStrictlyLowerTimestamp(hoodieInstant.getTimestamp())).orElse("000");
return lastInstant.map(hoodieInstant -> getStrictlyLowerTimestamp(hoodieInstant.getTimestamp())).orElse(DEFAULT_BEGIN_TIMESTAMP);
} else {
return "000";
return DEFAULT_BEGIN_TIMESTAMP;
}
} else {
throw new IllegalArgumentException("Missing begin instant for incremental pull. For reading from latest "
+ "committed instant set hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy to a valid value");
}
});

if (!beginInstantTime.equals("000")) {
if (!beginInstantTime.equals(DEFAULT_BEGIN_TIMESTAMP)) {
Option<HoodieInstant> nthInstant = Option.fromJavaOptional(activeCommitTimeline
.findInstantsAfter(beginInstantTime, numInstantsPerFetch).getInstants().reduce((x, y) -> y));
return Pair.of(beginInstantTime, nthInstant.map(HoodieInstant::getTimestamp).orElse(beginInstantTime));
} else {
// if beginInstant is null, MissingCheckpointStrategy should be set.
// if beginInstant is DEFAULT_BEGIN_TIMESTAMP, MissingCheckpointStrategy should be set.
// when MissingCheckpointStrategy is set to read everything until latest.
Option<HoodieInstant> lastInstant = activeCommitTimeline.lastInstant();
return Pair.of(beginInstantTime, lastInstant.get().getTimestamp());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,8 @@ public void testHoodieIncrSource() throws IOException {
Pair<String, List<HoodieRecord>> inserts2 = writeRecords(writeClient, true, null);
Pair<String, List<HoodieRecord>> inserts3 = writeRecords(writeClient, true, null);

Properties properties = new Properties();
properties.setProperty("hoodie.deltastreamer.source.hoodieincr.path", basePath);
properties.setProperty("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy", IncrSourceHelper.MissingCheckpointStrategy.READ_EVERYTHING_UNTIL_LATEST.name());
TypedProperties typedProperties = new TypedProperties(properties);
HoodieIncrSource incrSource = new HoodieIncrSource(typedProperties, jsc, sparkSession, new TestSchemaProvider(HoodieTestDataGenerator.AVRO_SCHEMA));

// read everything until latest
readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_EVERYTHING_UNTIL_LATEST, 300, inserts3.getKey());
// read everything upto latest
readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, 300, inserts3.getKey());

// read just the latest
readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, 100, inserts3.getKey());
Expand Down

0 comments on commit 6efdf3a

Please sign in to comment.