Skip to content

Commit

Permalink
Modified KinesisIO to use WatermarkPolicyFactory for watermark comput…
Browse files Browse the repository at this point in the history
…ation. Added tests
  • Loading branch information
Ajo Thomas committed May 16, 2019
1 parent 02ac871 commit 9fe0a03
Show file tree
Hide file tree
Showing 16 changed files with 639 additions and 334 deletions.
1 change: 1 addition & 0 deletions sdks/java/io/kinesis/build.gradle
Expand Up @@ -54,6 +54,7 @@ dependencies {
testCompile library.java.guava_testlib
testCompile library.java.hamcrest_core
testCompile library.java.hamcrest_library
testCompile library.java.powermock
testCompile "org.assertj:assertj-core:3.11.1"
testRuntimeOnly library.java.slf4j_jdk14
testRuntimeOnly project(path: ":runners:direct-java")
Expand Down
Expand Up @@ -125,6 +125,52 @@
* .apply( ... ) // other transformations
* }</pre>
*
* <p>Kinesis IO uses ArrivalTimeWatermarkPolicy by default. To use Processing time as event time:
*
* <pre>{@code
* p.apply(KinesisIO.read()
* .withStreamName("streamName")
* .withInitialPositionInStream(InitialPositionInStream.LATEST)
* .withProcessingTimeWatermarkPolicy())
* }</pre>
*
* <p>It is also possible to specify a custom watermark policy to control watermark computation.
* Below is an example:
*
* <pre>{@code
* // custom policy
* class MyCustomPolicy implements WatermarkPolicy {
* private WatermarkPolicyFactory.CustomWatermarkPolicy customWatermarkPolicy;
*
* MyCustomPolicy() {
* this.customWatermarkPolicy = new WatermarkPolicyFactory.CustomWatermarkPolicy(WatermarkParameters.create());
* }
*
* @Override
* public Instant getWatermark() {
* return customWatermarkPolicy.getWatermark();
* }
*
* @Override
* public void update(KinesisRecord record) {
* customWatermarkPolicy.update(record);
* }
* }
*
* // custom factory
* class MyCustomPolicyFactory implements WatermarkPolicyFactory {
* @Override
* public WatermarkPolicy createWatermarkPolicy() {
* return new MyCustomPolicy();
* }
* }
*
* p.apply(KinesisIO.read()
* .withStreamName("streamName")
* .withInitialPositionInStream(InitialPositionInStream.LATEST)
* .withCustomWatermarkPolicy(new MyCustomPolicyFactory()))
* }</pre>
*
* <h3>Writing to Kinesis</h3>
*
* <p>Example usage:
Expand Down Expand Up @@ -190,6 +236,7 @@ public static Read read() {
return new AutoValue_KinesisIO_Read.Builder()
.setMaxNumRecords(Long.MAX_VALUE)
.setUpToDateThreshold(Duration.ZERO)
.setWatermarkPolicyFactory(WatermarkPolicyFactory.withArrivalTimePolicy())
.build();
}

Expand Down Expand Up @@ -221,6 +268,8 @@ public abstract static class Read extends PTransform<PBegin, PCollection<Kinesis
@Nullable
abstract Integer getRequestRecordsLimit();

abstract WatermarkPolicyFactory getWatermarkPolicyFactory();

abstract Builder toBuilder();

@AutoValue.Builder
Expand All @@ -240,6 +289,8 @@ abstract static class Builder {

abstract Builder setRequestRecordsLimit(Integer limit);

abstract Builder setWatermarkPolicyFactory(WatermarkPolicyFactory watermarkPolicyFactory);

abstract Read build();
}

Expand Down Expand Up @@ -330,6 +381,43 @@ public Read withRequestRecordsLimit(int limit) {
return toBuilder().setRequestRecordsLimit(limit).build();
}

/**
 * Selects the arrival-time {@code WatermarkPolicyFactory} (the one {@code read()} installs by
 * default) for this transform.
 */
public Read withArrivalTimeWatermarkPolicy() {
  WatermarkPolicyFactory arrivalTimeFactory = WatermarkPolicyFactory.withArrivalTimePolicy();
  return toBuilder().setWatermarkPolicyFactory(arrivalTimeFactory).build();
}

/**
 * Specifies the {@code WatermarkPolicyFactory} as ArrivalTimeWatermarkPolicyFactory.
 *
 * @param watermarkIdleDurationThreshold Denotes the duration for which the watermark can be
 *     idle.
 * @return a {@code Read} rebuilt from this one with the arrival-time watermark policy factory
 *     set
 */
public Read withArrivalTimeWatermarkPolicy(Duration watermarkIdleDurationThreshold) {
  return toBuilder()
      .setWatermarkPolicyFactory(
          WatermarkPolicyFactory.withArrivalTimePolicy(watermarkIdleDurationThreshold))
      .build();
}

/**
 * Selects the processing-time {@code WatermarkPolicyFactory} for this transform, replacing
 * whichever factory was configured before.
 */
public Read withProcessingTimeWatermarkPolicy() {
  WatermarkPolicyFactory processingTimeFactory =
      WatermarkPolicyFactory.withProcessingTimePolicy();
  return toBuilder().setWatermarkPolicyFactory(processingTimeFactory).build();
}

/**
 * Installs a caller-supplied {@code WatermarkPolicyFactory} on this transform.
 *
 * @param watermarkPolicyFactory Custom Watermark policy factory; must not be {@code null}.
 */
public Read withCustomWatermarkPolicy(WatermarkPolicyFactory watermarkPolicyFactory) {
  // Reject null up front so the failure points at the caller rather than at build time.
  checkArgument(watermarkPolicyFactory != null, "watermarkPolicyFactory cannot be null");
  Builder configured = toBuilder().setWatermarkPolicyFactory(watermarkPolicyFactory);
  return configured.build();
}

@Override
public PCollection<KinesisRecord> expand(PBegin input) {
Unbounded<KinesisRecord> unbounded =
Expand All @@ -339,6 +427,7 @@ public PCollection<KinesisRecord> expand(PBegin input) {
getStreamName(),
getInitialPosition(),
getUpToDateThreshold(),
getWatermarkPolicyFactory(),
getRequestRecordsLimit()));

PTransform<PBegin, PCollection<KinesisRecord>> transform = unbounded;
Expand Down
Expand Up @@ -38,7 +38,7 @@ class KinesisReader extends UnboundedSource.UnboundedReader<KinesisRecord> {
private final SimplifiedKinesisClient kinesis;
private final KinesisSource source;
private final CheckpointGenerator initialCheckpointGenerator;
private final KinesisWatermark watermark;
private final WatermarkPolicyFactory watermarkPolicyFactory;
private final Duration upToDateThreshold;
private final Duration backlogBytesCheckThreshold;
private CustomOptional<KinesisRecord> currentRecord = CustomOptional.absent();
Expand All @@ -50,12 +50,13 @@ class KinesisReader extends UnboundedSource.UnboundedReader<KinesisRecord> {
SimplifiedKinesisClient kinesis,
CheckpointGenerator initialCheckpointGenerator,
KinesisSource source,
WatermarkPolicyFactory watermarkPolicyFactory,
Duration upToDateThreshold) {
this(
kinesis,
initialCheckpointGenerator,
source,
new KinesisWatermark(),
watermarkPolicyFactory,
upToDateThreshold,
Duration.standardSeconds(30));
}
Expand All @@ -64,13 +65,13 @@ class KinesisReader extends UnboundedSource.UnboundedReader<KinesisRecord> {
SimplifiedKinesisClient kinesis,
CheckpointGenerator initialCheckpointGenerator,
KinesisSource source,
KinesisWatermark watermark,
WatermarkPolicyFactory watermarkPolicyFactory,
Duration upToDateThreshold,
Duration backlogBytesCheckThreshold) {
this.kinesis = checkNotNull(kinesis, "kinesis");
this.initialCheckpointGenerator =
checkNotNull(initialCheckpointGenerator, "initialCheckpointGenerator");
this.watermark = watermark;
this.watermarkPolicyFactory = watermarkPolicyFactory;
this.source = source;
this.upToDateThreshold = upToDateThreshold;
this.backlogBytesCheckThreshold = backlogBytesCheckThreshold;
Expand All @@ -95,12 +96,7 @@ public boolean start() throws IOException {
/**
 * Fetches the next record from the shard readers pool and stores it as the current record.
 *
 * <p>No watermark bookkeeping happens here; {@code getWatermark()} obtains the watermark from
 * the shard readers pool instead.
 *
 * @return {@code true} if a record was available, {@code false} otherwise
 */
@Override
public boolean advance() throws IOException {
  currentRecord = shardReadersPool.nextRecord();
  return currentRecord.isPresent();
}

@Override
Expand Down Expand Up @@ -131,7 +127,7 @@ public void close() throws IOException {

/** Returns the watermark reported by the shard readers pool. */
@Override
public Instant getWatermark() {
  return shardReadersPool.getWatermark();
}

@Override
Expand Down Expand Up @@ -173,6 +169,7 @@ public long getTotalBacklogBytes() {
}

/**
 * Builds the pool of shard readers from the initial checkpoint, handing it the configured
 * {@code WatermarkPolicyFactory}.
 *
 * @throws TransientKinesisException declared by this method; presumably raised while generating
 *     the initial checkpoint (the throwing call is not visible here)
 */
ShardReadersPool createShardReadersPool() throws TransientKinesisException {
  return new ShardReadersPool(
      kinesis, initialCheckpointGenerator.generate(kinesis), watermarkPolicyFactory);
}
}
Expand Up @@ -37,6 +37,7 @@ class KinesisSource extends UnboundedSource<KinesisRecord, KinesisReaderCheckpoi
private final AWSClientsProvider awsClientsProvider;
private final String streamName;
private final Duration upToDateThreshold;
private final WatermarkPolicyFactory watermarkPolicyFactory;
private CheckpointGenerator initialCheckpointGenerator;
private final Integer limit;

Expand All @@ -45,12 +46,14 @@ class KinesisSource extends UnboundedSource<KinesisRecord, KinesisReaderCheckpoi
String streamName,
StartingPoint startingPoint,
Duration upToDateThreshold,
WatermarkPolicyFactory watermarkPolicyFactory,
Integer limit) {
this(
awsClientsProvider,
new DynamicCheckpointGenerator(streamName, startingPoint),
streamName,
upToDateThreshold,
watermarkPolicyFactory,
limit);
}

Expand All @@ -59,11 +62,13 @@ private KinesisSource(
CheckpointGenerator initialCheckpoint,
String streamName,
Duration upToDateThreshold,
WatermarkPolicyFactory watermarkPolicyFactory,
Integer limit) {
this.awsClientsProvider = awsClientsProvider;
this.initialCheckpointGenerator = initialCheckpoint;
this.streamName = streamName;
this.upToDateThreshold = upToDateThreshold;
this.watermarkPolicyFactory = watermarkPolicyFactory;
this.limit = limit;
validate();
}
Expand All @@ -87,6 +92,7 @@ public List<KinesisSource> split(int desiredNumSplits, PipelineOptions options)
new StaticCheckpointGenerator(partition),
streamName,
upToDateThreshold,
watermarkPolicyFactory,
limit));
}
return sources;
Expand All @@ -113,6 +119,7 @@ public UnboundedReader<KinesisRecord> createReader(
SimplifiedKinesisClient.from(awsClientsProvider, limit),
checkpointGenerator,
this,
watermarkPolicyFactory,
upToDateThreshold);
}

Expand Down

This file was deleted.

0 comments on commit 9fe0a03

Please sign in to comment.