From 2c5a4cd080eeaaf3505c64b773589c2da8217381 Mon Sep 17 00:00:00 2001 From: David Desberg Date: Mon, 11 Jul 2016 12:24:18 -0700 Subject: [PATCH 1/2] Allow for custom timestamp/watermark function in FlinkPipelineRunner --- .../FlinkStreamingTransformTranslators.java | 18 ++++++++++++++++-- .../streaming/io/UnboundedFlinkSource.java | 13 +++++++++++++ 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java index 5d040684f5cb..ca6a95bdeae3 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java @@ -70,7 +70,9 @@ import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.functions.IngestionTimeExtractor; +import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.util.Collector; import org.joda.time.Instant; import org.slf4j.Logger; @@ -252,6 +254,8 @@ public void translateNode(Read.Unbounded transform, FlinkStreamingTranslation if (transform.getSource().getClass().equals(UnboundedFlinkSource.class)) { @SuppressWarnings("unchecked") UnboundedFlinkSource flinkSourceFunction = (UnboundedFlinkSource) transform.getSource(); + final AssignerWithPeriodicWatermarks flinkAssigner = flinkSourceFunction.getFlinkTimestampAssigner(); + DataStream flinkSource = context.getExecutionEnvironment() .addSource(flinkSourceFunction.getFlinkSource()); @@ -266,11 +270,21 @@ public void flatMap(T s, Collector> collector) throws Exception collector.collect( WindowedValue.of( s, - Instant.now(), + new Instant(flinkAssigner.extractTimestamp(s, -1)), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)); } - }).assignTimestampsAndWatermarks(new IngestionTimeExtractor>()); + }).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks>() { + @Override + public Watermark getCurrentWatermark() { + return flinkAssigner.getCurrentWatermark(); + } + + @Override + public long extractTimestamp(WindowedValue element, long previousElementTimestamp) { + return flinkAssigner.extractTimestamp(element.getValue(), previousElementTimestamp); + } + }); } else { try { transform.getSource(); diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java index 94b73cebaae8..bd111a0bf1e3 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java @@ -23,6 +23,8 @@ import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; +import org.apache.flink.streaming.api.functions.IngestionTimeExtractor; import org.apache.flink.streaming.api.functions.source.SourceFunction; import java.util.List; @@ -40,6 +42,9 @@ public class UnboundedFlinkSource extends UnboundedSource coder; + /** Timestamp / watermark assigner for source; defaults to ingestion time */ + private AssignerWithPeriodicWatermarks flinkTimestampAssigner = new IngestionTimeExtractor(); + public UnboundedFlinkSource(SourceFunction source) { flinkSource = checkNotNull(source); } @@ -48,6 +53,10 @@ public SourceFunction getFlinkSource() { return this.flinkSource; } + public AssignerWithPeriodicWatermarks getFlinkTimestampAssigner() { + return flinkTimestampAssigner; + } + @Override public List> generateInitialSplits(int desiredNumSplits, PipelineOptions options) throws Exception { throw new RuntimeException("Flink Sources are supported only when running with the FlinkRunner."); @@ -79,6 +88,10 @@ public void setCoder(Coder coder) { this.coder = coder; } + public void setFlinkTimestampAssigner(AssignerWithPeriodicWatermarks flinkTimestampAssigner) { + this.flinkTimestampAssigner = flinkTimestampAssigner; + } + /** * Creates a new unbounded source from a Flink source. * @param flinkSource The Flink source function From 5d274e4ee5b5fc498bcabcab75bd7be30210bb3c Mon Sep 17 00:00:00 2001 From: David Desberg Date: Mon, 11 Jul 2016 12:56:21 -0700 Subject: [PATCH 2/2] Added new "of" signature and constructor for UnboundedFlinkSource to allow event timestamping --- .../wrappers/streaming/io/UnboundedFlinkSource.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java index bd111a0bf1e3..716ca304e539 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java @@ -49,6 +49,11 @@ public UnboundedFlinkSource(SourceFunction source) { flinkSource = checkNotNull(source); } + public UnboundedFlinkSource(SourceFunction source, AssignerWithPeriodicWatermarks timestampAssigner) { + flinkSource = checkNotNull(source); + flinkTimestampAssigner = checkNotNull(timestampAssigner); + } + public SourceFunction getFlinkSource() { return this.flinkSource; } @@ -101,4 +106,9 @@ public void setFlinkTimestampAssigner(AssignerWithPeriodicWatermarks flinkTim public static UnboundedSource of(SourceFunction flinkSource) { return new UnboundedFlinkSource<>(flinkSource); } + + public static UnboundedSource of( + SourceFunction flinkSource, AssignerWithPeriodicWatermarks flinkTimestampAssigner) { + return new UnboundedFlinkSource<>(flinkSource, flinkTimestampAssigner); + } }