From 7292a72d11128c4f5cd433764924ceef0e600e00 Mon Sep 17 00:00:00 2001 From: Rohit Agarwal Date: Sat, 3 Dec 2016 12:15:45 -0800 Subject: [PATCH 1/2] [FLINK-5247] Fix check to make sure that we throw error when allowed lateness is set for non event-time windows. Also, fix outdated documentation. --- .../flink/streaming/api/datastream/AllWindowedStream.java | 4 ++-- .../apache/flink/streaming/api/datastream/WindowedStream.java | 4 ++-- .../apache/flink/streaming/api/scala/AllWindowedStream.scala | 2 +- .../org/apache/flink/streaming/api/scala/WindowedStream.scala | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java index ae71ce53a5f67..a8d8ff40f8b51 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java @@ -123,9 +123,9 @@ public AllWindowedStream trigger(Trigger trigger) { @PublicEvolving public AllWindowedStream allowedLateness(Time lateness) { long millis = lateness.toMilliseconds(); - if (allowedLateness < 0) { + if (millis < 0) { throw new IllegalArgumentException("The allowed lateness cannot be negative."); - } else if (allowedLateness != 0 && !windowAssigner.isEventTime()) { + } else if (millis != 0 && !windowAssigner.isEventTime()) { throw new IllegalArgumentException("Setting the allowed lateness is only valid for event-time windows."); } else { this.allowedLateness = millis; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java index 98bf89ae6add5..9d9b1bda3ac8f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java @@ -131,9 +131,9 @@ public WindowedStream trigger(Trigger trigger) { @PublicEvolving public WindowedStream allowedLateness(Time lateness) { long millis = lateness.toMilliseconds(); - if (allowedLateness < 0) { + if (millis < 0) { throw new IllegalArgumentException("The allowed lateness cannot be negative."); - } else if (allowedLateness != 0 && !windowAssigner.isEventTime()) { + } else if (millis != 0 && !windowAssigner.isEventTime()) { throw new IllegalArgumentException("Setting the allowed lateness is only valid for event-time windows."); } else { this.allowedLateness = millis; diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala index 83104e8318f8f..324689a108a1b 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala @@ -58,7 +58,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { /** * Sets the allowed lateness to a user-specified value. - * If not explicitly set, the allowed lateness is [[Long.MaxValue]]. + * If not explicitly set, the allowed lateness is [[0L]]. * Setting the allowed lateness is only valid for event-time windows. * If a value different than 0 is provided with a processing-time * [[org.apache.flink.streaming.api.windowing.assigners.WindowAssigner]], diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala index 76d9cdab0e829..db187eaa815e8 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala @@ -61,7 +61,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { /** * Sets the allowed lateness to a user-specified value. - * If not explicitly set, the allowed lateness is [[Long.MaxValue]]. + * If not explicitly set, the allowed lateness is [[0L]]. * Setting the allowed lateness is only valid for event-time windows. * If a value different than 0 is provided with a processing-time * [[org.apache.flink.streaming.api.windowing.assigners.WindowAssigner]], From ea83df290a376c1750b12eb71c3947e80992de12 Mon Sep 17 00:00:00 2001 From: Rohit Agarwal Date: Tue, 20 Dec 2016 20:22:19 -0800 Subject: [PATCH 2/2] [FLINK-5247] Make allowedLateness() method a no-op for processing-time windows. --- .../flink/streaming/api/datastream/AllWindowedStream.java | 4 +--- .../apache/flink/streaming/api/datastream/WindowedStream.java | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java index a8d8ff40f8b51..0f0e947fb0bb8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java @@ -125,9 +125,7 @@ public AllWindowedStream allowedLateness(Time lateness) { long millis = lateness.toMilliseconds(); if (millis < 0) { throw new IllegalArgumentException("The allowed lateness cannot be negative."); - } else if (millis != 0 && !windowAssigner.isEventTime()) { - throw new IllegalArgumentException("Setting the allowed lateness is only valid for event-time windows."); - } else { + } else if (windowAssigner.isEventTime()) { this.allowedLateness = millis; } return this; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java index 9d9b1bda3ac8f..7210823d53ec3 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java @@ -133,9 +133,7 @@ public WindowedStream allowedLateness(Time lateness) { long millis = lateness.toMilliseconds(); if (millis < 0) { throw new IllegalArgumentException("The allowed lateness cannot be negative."); - } else if (millis != 0 && !windowAssigner.isEventTime()) { - throw new IllegalArgumentException("Setting the allowed lateness is only valid for event-time windows."); - } else { + } else if (windowAssigner.isEventTime()) { this.allowedLateness = millis; } return this;