Skip to content

Commit

Permalink
[FLINK-2807] Add Javadocs for new windowing semantics/internals
Browse files Browse the repository at this point in the history
  • Loading branch information
aljoscha committed Oct 5, 2015
1 parent 8c2c769 commit 62df0a0
Show file tree
Hide file tree
Showing 29 changed files with 483 additions and 27 deletions.
Expand Up @@ -23,6 +23,14 @@
import java.util.Collection;
import java.util.Collections;

/**
* A {@link WindowAssigner} that assigns all elements to the same global window.
*
* <p>
* Use this if you want to use a {@link Trigger} and
* {@link org.apache.flink.streaming.api.windowing.evictors.Evictor} to to flexible, policy based
* windows.
*/
public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
private static final long serialVersionUID = 1L;

Expand Down
Expand Up @@ -28,6 +28,19 @@
import java.util.Collection;
import java.util.List;

/**
* A {@link WindowAssigner} that windows elements into sliding, time-based windows. The windowing
* is based on system time. Windows can possibly overlap.
*
* <p>
* For example, in order to window into windows of 1 minute, every 10 seconds:
* <pre> {@code
* DataStream<Tuple2<String, Integer>> in = ...;
* KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
* WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
* keyed.window(SlidingProcessingTimeWindows.of(Time.of(1, MINUTES), Time.of(10, SECONDS));
* } </pre>
*/
public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
private static final long serialVersionUID = 1L;

Expand Down
Expand Up @@ -26,6 +26,19 @@
import java.util.Collection;
import java.util.List;

/**
* A {@link WindowAssigner} that windows elements into sliding windows based on the timestamp of the
* elements. Windows can possibly overlap.
*
* <p>
* For example, in order to window into windows of 1 minute, every 10 seconds:
* <pre> {@code
* DataStream<Tuple2<String, Integer>> in = ...;
* KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
* WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
* keyed.window(SlidingTimeWindows.of(Time.of(1, MINUTES), Time.of(10, SECONDS));
* } </pre>
*/
public class SlidingTimeWindows extends WindowAssigner<Object, TimeWindow> {
private static final long serialVersionUID = 1L;

Expand Down
Expand Up @@ -25,6 +25,19 @@
import java.util.Collection;
import java.util.Collections;

/**
* A {@link WindowAssigner} that windows elements into time-based windows. The windowing is
* based on system time. Windows cannot overlap.
*
* <p>
* For example, in order to window into windows of 1 minute, every 10 seconds:
* <pre> {@code
* DataStream<Tuple2<String, Integer>> in = ...;
* KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
* WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
* keyed.window(TumblingProcessingTimeWindows.of(Time.of(1, MINUTES), Time.of(10, SECONDS));
* } </pre>
*/
public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
private static final long serialVersionUID = 1L;

Expand Down
Expand Up @@ -25,6 +25,19 @@
import java.util.Collection;
import java.util.Collections;

/**
* A {@link WindowAssigner} that windows elements into windows based on the timestamp of the
* elements. Windows cannot overlap.
*
* <p>
* For example, in order to window into windows of 1 minute, every 10 seconds:
* <pre> {@code
* DataStream<Tuple2<String, Integer>> in = ...;
* KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
* WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
* keyed.window(TumblingTimeWindows.of(Time.of(1, MINUTES), Time.of(10, SECONDS));
* } </pre>
*/
public class TumblingTimeWindows extends WindowAssigner<Object, TimeWindow> {
private static final long serialVersionUID = 1L;

Expand Down
Expand Up @@ -23,10 +23,32 @@

import java.util.Collection;

/**
* A {@code WindowAssigner} assigns zero or more {@link Window Windows} to an element.
*
* <p>
* In a window operation, elements are grouped by their key (if available) and by the windows to
* which it was assigned. The set of elements with the same key and window is called a pane.
* When a {@link Trigger} decides that a certain pane should fire the
* {@link org.apache.flink.streaming.api.functions.windowing.WindowFunction} is applied
* to produce output elements for that pane.
*
* @param <T> The type of elements that this WindowAssigner can assign windows to.
* @param <W> The type of {@code Window} that this assigner assigns.
*/
public abstract class WindowAssigner<T, W extends Window> implements Serializable {
private static final long serialVersionUID = 1L;

/**
* Returns a {@code Collection} of windows that should be assigned to the element.
*
* @param element The element to which windows should be assigned.
* @param timestamp The timestamp of the element.
*/
public abstract Collection<W> assignWindows(T element, long timestamp);

/**
* Returns the default trigger associated with this {@code WindowAssigner}.
*/
public abstract Trigger<T, W> getDefaultTrigger();
}
Expand Up @@ -20,6 +20,11 @@
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/**
* An {@link Evictor} that keeps only a certain amount of elements.
*
* @param <W> The type of {@link Window Windows} on which this {@code Evictor} can operate.
*/
public class CountEvictor<W extends Window> implements Evictor<Object, W> {
private static final long serialVersionUID = 1L;

Expand All @@ -38,6 +43,11 @@ public int evict(Iterable<StreamRecord<Object>> elements, int size, W window) {
}
}

/**
* Creates a {@code CountEvictor} that keeps the given number of elements.
*
* @param maxCount The number of elements to keep in the pane.
*/
public static <W extends Window> CountEvictor<W> of(long maxCount) {
return new CountEvictor<>(maxCount);
}
Expand Down
Expand Up @@ -22,6 +22,16 @@
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/**
* An {@link Evictor} that keeps elements based on a {@link DeltaFunction} and a threshold.
*
* <p>
* Eviction starts from the first element of the buffer and removes all elements from the buffer
* which have a higher delta then the threshold. As soon as there is an element with a lower delta,
* the eviction stops.
*
* @param <W> The type of {@link Window Windows} on which this {@code Evictor} can operate.
*/
public class DeltaEvictor<T, W extends Window> implements Evictor<T, W> {
private static final long serialVersionUID = 1L;

Expand Down Expand Up @@ -52,6 +62,12 @@ public String toString() {
return "DeltaEvictor(" + deltaFunction + ", " + threshold + ")";
}

/**
* Creates a {@code DeltaEvictor} from the given threshold and {@code DeltaFunction}.
*
* @param threshold The threshold
* @param deltaFunction The {@code DeltaFunction}
*/
public static <T, W extends Window> DeltaEvictor<T, W> of(double threshold, DeltaFunction<T> deltaFunction) {
return new DeltaEvictor<>(threshold, deltaFunction);
}
Expand Down
Expand Up @@ -21,8 +21,31 @@
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import scala.Serializable;

/**
* An {@code Evictor} can remove elements from a pane before it is being processed and after
* window evaluation was triggered by a
* {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
*
* <p>
* A pane is the bucket of elements that have the same key (assigned by the
* {@link org.apache.flink.api.java.functions.KeySelector}) and same {@link Window}. An element can
* be in multiple panes of it was assigned to multiple windows by the
* {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. These panes all
* have their own instance of the {@code Evictor}.
*
* @param <T> The type of elements that this {@code Evictor} can evict.
* @param <W> The type of {@link Window Windows} on which this {@code Evictor} can operate.
*/
public interface Evictor<T, W extends Window> extends Serializable {

public abstract int evict(Iterable<StreamRecord<T>> elements, int size, W window);
/**
* Computes how many elements should be removed from the pane. The result specifies how
* many elements should be removed from the beginning.
*
* @param elements The elements currently in the pane.
* @param size The current number of elements in the pane.
* @param window The {@link Window}
*/
int evict(Iterable<StreamRecord<T>> elements, int size, W window);
}

Expand Up @@ -22,6 +22,12 @@
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/**
* An {@link Evictor} that keeps elements for a certain amount of time. Elements older
* than {@code current_time - keep_time} are evicted.
*
* @param <W> The type of {@link Window Windows} on which this {@code Evictor} can operate.
*/
public class TimeEvictor<W extends Window> implements Evictor<Object, W> {
private static final long serialVersionUID = 1L;

Expand Down Expand Up @@ -55,6 +61,11 @@ public long getWindowSize() {
return windowSize;
}

/**
* Creates a {@code TimeEvictor} that keeps the given number of elements.
*
* @param windowSize The amount of time for which to keep elements.
*/
public static <W extends Window> TimeEvictor<W> of(AbstractTime windowSize) {
return new TimeEvictor<>(windowSize.toMilliseconds());
}
Expand Down
Expand Up @@ -21,6 +21,12 @@
import org.apache.flink.streaming.api.windowing.time.AbstractTime;
import org.apache.flink.streaming.api.windowing.windows.Window;

/**
* A {@link Trigger} that continuously fires based on a given time interval. The time is the current
* system time.
*
* @param <W> The type of {@link Window Windows} on which this trigger can operate.
*/
public class ContinuousProcessingTimeTrigger<W extends Window> implements Trigger<Object, W> {
private static final long serialVersionUID = 1L;

Expand Down Expand Up @@ -80,6 +86,12 @@ public String toString() {
return "ContinuousProcessingTimeTrigger(" + interval + ")";
}

/**
* Creates a trigger that continuously fires based on the given interval.
*
* @param interval The time interval at which to fire.
* @param <W> The type of {@link Window Windows} on which this trigger can operate.
*/
public static <W extends Window> ContinuousProcessingTimeTrigger<W> of(AbstractTime interval) {
return new ContinuousProcessingTimeTrigger<>(interval.toMilliseconds());
}
Expand Down
Expand Up @@ -21,6 +21,14 @@
import org.apache.flink.streaming.api.windowing.time.AbstractTime;
import org.apache.flink.streaming.api.windowing.windows.Window;

/**
* A {@link Trigger} that continuously fires based on a given time interval. This fires based
* on {@link org.apache.flink.streaming.api.watermark.Watermark Watermarks}.
*
* @see org.apache.flink.streaming.api.watermark.Watermark
*
* @param <W> The type of {@link Window Windows} on which this trigger can operate.
*/
public class ContinuousWatermarkTrigger<W extends Window> implements Trigger<Object, W> {
private static final long serialVersionUID = 1L;

Expand Down Expand Up @@ -66,6 +74,12 @@ public long getInterval() {
return interval;
}

/**
* Creates a trigger that continuously fires based on the given interval.
*
* @param interval The time interval at which to fire.
* @param <W> The type of {@link Window Windows} on which this trigger can operate.
*/
public static <W extends Window> ContinuousWatermarkTrigger<W> of(AbstractTime interval) {
return new ContinuousWatermarkTrigger<>(interval.toMilliseconds());
}
Expand Down
Expand Up @@ -19,6 +19,11 @@

import org.apache.flink.streaming.api.windowing.windows.Window;

/**
* A {@link Trigger} that fires once the count of elements in a pane reaches the given count.
*
* @param <W> The type of {@link Window Windows} on which this trigger can operate.
*/
public class CountTrigger<W extends Window> implements Trigger<Object, W> {
private static final long serialVersionUID = 1L;

Expand Down Expand Up @@ -55,6 +60,12 @@ public String toString() {
return "CountTrigger(" + maxCount + ")";
}

/**
* Creates a trigger that fires once the number of elements in a pane reaches the given count.
*
* @param maxCount The count of elements at which to fire.
* @param <W> The type of {@link Window Windows} on which this trigger can operate.
*/
public static <W extends Window> CountTrigger<W> of(long maxCount) {
return new CountTrigger<>(maxCount);
}
Expand Down
Expand Up @@ -20,6 +20,16 @@
import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
import org.apache.flink.streaming.api.windowing.windows.Window;

/**
* A {@link Trigger} that fires based on a {@link DeltaFunction} and a threshold.
*
* <p>
* This trigger calculates a delta between the data point which triggered last
* and the currently arrived data point. It triggers if the delta is higher than
* a specified threshold.
*
* @param <W> The type of {@link Window Windows} on which this trigger can operate.
*/
public class DeltaTrigger<T, W extends Window> implements Trigger<T, W> {
private static final long serialVersionUID = 1L;

Expand Down Expand Up @@ -60,6 +70,16 @@ public String toString() {
return "DeltaTrigger(" + deltaFunction + ", " + threshold + ")";
}

/**
* Creates a delta trigger from the given threshold and {@code DeltaFunction}.
*
* @param threshold The threshold at which to trigger.
* @param deltaFunction The delta function to use
*
* @param <T> The type of elements on which this trigger can operate.
* @param <W> The type of {@link Window Windows} on which this trigger can operate.
* @return
*/
public static <T, W extends Window> DeltaTrigger<T, W> of(double threshold, DeltaFunction<T> deltaFunction) {
return new DeltaTrigger<>(threshold, deltaFunction);
}
Expand Down
Expand Up @@ -19,6 +19,10 @@

import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/**
* A {@link Trigger} that fires once the current system time passes the end of the window
* to which a pane belongs.
*/
public class ProcessingTimeTrigger implements Trigger<Object, TimeWindow> {
private static final long serialVersionUID = 1L;

Expand Down Expand Up @@ -50,6 +54,9 @@ public String toString() {
return "ProcessingTimeTrigger()";
}

/**
* Creates a new trigger that fires once system time passes the end of the window.
*/
public static ProcessingTimeTrigger create() {
return new ProcessingTimeTrigger();
}
Expand Down

0 comments on commit 62df0a0

Please sign in to comment.