Skip to content

Commit

Permalink
[streaming] StreamDiscretizer rework to support only 1 eviction and t…
Browse files Browse the repository at this point in the history
…rigger for robustness + test cleanup
  • Loading branch information
gyfora authored and mbalassi committed Feb 16, 2015
1 parent c560d76 commit 412779f
Show file tree
Hide file tree
Showing 13 changed files with 382 additions and 1,728 deletions.
Expand Up @@ -865,7 +865,7 @@ public <IN2> StreamJoinOperator<OUT, IN2> join(DataStream<IN2> dataStreamToJoin)
* @return A {@link WindowedDataStream} providing further operations. * @return A {@link WindowedDataStream} providing further operations.
*/ */
@SuppressWarnings({ "rawtypes", "unchecked" }) @SuppressWarnings({ "rawtypes", "unchecked" })
public WindowedDataStream<OUT> window(WindowingHelper... policyHelpers) { public WindowedDataStream<OUT> window(WindowingHelper policyHelpers) {
return new WindowedDataStream<OUT>(this, policyHelpers); return new WindowedDataStream<OUT>(this, policyHelpers);
} }


Expand All @@ -884,9 +884,8 @@ public WindowedDataStream<OUT> window(WindowingHelper... policyHelpers) {
* number of elements in each time window. * number of elements in each time window.
* @return A {@link WindowedDataStream} providing further operations. * @return A {@link WindowedDataStream} providing further operations.
*/ */
public WindowedDataStream<OUT> window(List<TriggerPolicy<OUT>> triggers, public WindowedDataStream<OUT> window(TriggerPolicy<OUT> trigger, EvictionPolicy<OUT> evicter) {
List<EvictionPolicy<OUT>> evicters) { return new WindowedDataStream<OUT>(this, trigger, evicter);
return new WindowedDataStream<OUT>(this, triggers, evicters);
} }


/** /**
Expand Down
Expand Up @@ -17,10 +17,6 @@


package org.apache.flink.streaming.api.datastream; package org.apache.flink.streaming.api.datastream;


import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction; import org.apache.flink.api.common.functions.RichGroupReduceFunction;
Expand All @@ -44,7 +40,6 @@
import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy; import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy; import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy; import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy; import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy; import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
import org.apache.flink.streaming.util.keys.KeySelectorUtil; import org.apache.flink.streaming.util.keys.KeySelectorUtil;
Expand All @@ -64,69 +59,51 @@ public class WindowedDataStream<OUT> {
protected DataStream<OUT> dataStream; protected DataStream<OUT> dataStream;


protected boolean isLocal = false; protected boolean isLocal = false;
protected boolean isCentral = true;


protected KeySelector<OUT, ?> discretizerKey; protected KeySelector<OUT, ?> discretizerKey;
protected KeySelector<OUT, ?> groupByKey; protected KeySelector<OUT, ?> groupByKey;


protected List<WindowingHelper<OUT>> triggerHelpers; protected WindowingHelper<OUT> triggerHelper;
protected List<WindowingHelper<OUT>> evictionHelpers; protected WindowingHelper<OUT> evictionHelper;

protected LinkedList<TriggerPolicy<OUT>> userTriggers;
protected LinkedList<EvictionPolicy<OUT>> userEvicters;

protected WindowedDataStream() {


} protected TriggerPolicy<OUT> userTrigger;
protected EvictionPolicy<OUT> userEvicter;


protected WindowedDataStream(DataStream<OUT> dataStream, WindowingHelper<OUT>... policyHelpers) { protected WindowedDataStream(DataStream<OUT> dataStream, WindowingHelper<OUT> policyHelper) {
this.dataStream = dataStream.copy(); this.dataStream = dataStream.copy();
this.triggerHelpers = new ArrayList<WindowingHelper<OUT>>(); this.triggerHelper = policyHelper;
for (WindowingHelper<OUT> helper : policyHelpers) {
this.triggerHelpers.add(helper);
}


if (dataStream instanceof GroupedDataStream) { if (dataStream instanceof GroupedDataStream) {
this.discretizerKey = ((GroupedDataStream<OUT>) dataStream).keySelector; this.discretizerKey = ((GroupedDataStream<OUT>) dataStream).keySelector;
// set all policies distributed
this.isCentral = false;
} }
} }


protected WindowedDataStream(DataStream<OUT> dataStream, List<TriggerPolicy<OUT>> triggers, protected WindowedDataStream(DataStream<OUT> dataStream, TriggerPolicy<OUT> trigger,
List<EvictionPolicy<OUT>> evicters) { EvictionPolicy<OUT> evicter) {
this.dataStream = dataStream.copy(); this.dataStream = dataStream.copy();


if (triggers != null) { this.userTrigger = trigger;
this.userTriggers = new LinkedList<TriggerPolicy<OUT>>(); this.userEvicter = evicter;
this.userTriggers.addAll(triggers);
}

if (evicters != null) {
this.userEvicters = new LinkedList<EvictionPolicy<OUT>>();
this.userEvicters.addAll(evicters);
}


if (dataStream instanceof GroupedDataStream) { if (dataStream instanceof GroupedDataStream) {
this.discretizerKey = ((GroupedDataStream<OUT>) dataStream).keySelector; this.discretizerKey = ((GroupedDataStream<OUT>) dataStream).keySelector;
// set all policies distributed
this.isCentral = false;

} }
} }


protected WindowedDataStream(WindowedDataStream<OUT> windowedDataStream) { protected WindowedDataStream(WindowedDataStream<OUT> windowedDataStream) {
this.dataStream = windowedDataStream.dataStream.copy(); this.dataStream = windowedDataStream.dataStream.copy();
this.discretizerKey = windowedDataStream.discretizerKey; this.discretizerKey = windowedDataStream.discretizerKey;
this.groupByKey = windowedDataStream.groupByKey; this.groupByKey = windowedDataStream.groupByKey;
this.triggerHelpers = windowedDataStream.triggerHelpers; this.triggerHelper = windowedDataStream.triggerHelper;
this.evictionHelpers = windowedDataStream.evictionHelpers; this.evictionHelper = windowedDataStream.evictionHelper;
this.userTriggers = windowedDataStream.userTriggers; this.userTrigger = windowedDataStream.userTrigger;
this.userEvicters = windowedDataStream.userEvicters; this.userEvicter = windowedDataStream.userEvicter;
this.isCentral = windowedDataStream.isCentral;
this.isLocal = windowedDataStream.isLocal; this.isLocal = windowedDataStream.isLocal;
} }


public WindowedDataStream() {
}

public <F> F clean(F f) { public <F> F clean(F f) {
return dataStream.clean(f); return dataStream.clean(f);
} }
Expand All @@ -146,15 +123,13 @@ public <F> F clean(F f) {
* @return The windowed data stream with triggering set * @return The windowed data stream with triggering set
*/ */
@SuppressWarnings({ "unchecked", "rawtypes" }) @SuppressWarnings({ "unchecked", "rawtypes" })
public WindowedDataStream<OUT> every(WindowingHelper... policyHelpers) { public WindowedDataStream<OUT> every(WindowingHelper policyHelper) {
WindowedDataStream<OUT> ret = this.copy(); WindowedDataStream<OUT> ret = this.copy();
if (ret.evictionHelpers == null) { if (ret.evictionHelper == null) {
ret.evictionHelpers = ret.triggerHelpers; ret.evictionHelper = ret.triggerHelper;
ret.triggerHelpers = new ArrayList<WindowingHelper<OUT>>(); ret.triggerHelper = policyHelper;
}
for (WindowingHelper<OUT> helper : policyHelpers) {
ret.triggerHelpers.add(helper);
} }

return ret; return ret;
} }


Expand Down Expand Up @@ -238,11 +213,11 @@ private DiscretizedStream<OUT> discretize(boolean isMap) {
StreamInvokable<OUT, StreamWindow<OUT>> discretizer; StreamInvokable<OUT, StreamWindow<OUT>> discretizer;


if (discretizerKey == null) { if (discretizerKey == null) {
discretizer = new StreamDiscretizer<OUT>(getTriggers(), getEvicters()); discretizer = new StreamDiscretizer<OUT>(getTrigger(), getEvicter());
} else { } else {
discretizer = new GroupedStreamDiscretizer<OUT>(discretizerKey, discretizer = new GroupedStreamDiscretizer<OUT>(discretizerKey,
getDistributedTriggers(), getDistributedEvicters(), getCentralTriggers(), (CloneableTriggerPolicy<OUT>) getTrigger(),
getCentralEvicters()); (CloneableEvictionPolicy<OUT>) getEvicter());
} }


int parallelism = isLocal || (discretizerKey != null) ? dataStream.environment int parallelism = isLocal || (discretizerKey != null) ? dataStream.environment
Expand Down Expand Up @@ -537,103 +512,32 @@ private WindowedDataStream<OUT> aggregate(AggregationFunction<OUT> aggregator) {
return reduceWindow(aggregator); return reduceWindow(aggregator);
} }


protected LinkedList<TriggerPolicy<OUT>> getTriggers() { protected TriggerPolicy<OUT> getTrigger() {

LinkedList<TriggerPolicy<OUT>> triggers = new LinkedList<TriggerPolicy<OUT>>();

if (triggerHelpers != null) {
for (WindowingHelper<OUT> helper : triggerHelpers) {
triggers.add(helper.toTrigger());
}
}

if (userTriggers != null) {
triggers.addAll(userTriggers);
}

return triggers;

}

protected LinkedList<EvictionPolicy<OUT>> getEvicters() {


LinkedList<EvictionPolicy<OUT>> evicters = new LinkedList<EvictionPolicy<OUT>>(); if (triggerHelper != null) {

return triggerHelper.toTrigger();
if (evictionHelpers != null) { } else if (userTrigger != null) {
for (WindowingHelper<OUT> helper : evictionHelpers) { return userTrigger;
evicters.add(helper.toEvict());
}
} else { } else {
if (userEvicters == null) { throw new RuntimeException("Trigger must not be null");
boolean notOnlyTime = false;
for (WindowingHelper<OUT> helper : triggerHelpers) {
if (helper instanceof Time<?>) {
evicters.add(helper.toEvict());
} else {
notOnlyTime = true;
}
}
if (notOnlyTime) {
evicters.add(new TumblingEvictionPolicy<OUT>());
}
}
}

if (userEvicters != null) {
evicters.addAll(userEvicters);
}

return evicters;
}

protected LinkedList<TriggerPolicy<OUT>> getCentralTriggers() {
LinkedList<TriggerPolicy<OUT>> cTriggers = new LinkedList<TriggerPolicy<OUT>>();
if (isCentral) {
cTriggers.addAll(getTriggers());
} else {
for (TriggerPolicy<OUT> trigger : getTriggers()) {
if (trigger instanceof TimeTriggerPolicy) {
cTriggers.add(trigger);
}
}
}
return cTriggers;
}

protected LinkedList<CloneableTriggerPolicy<OUT>> getDistributedTriggers() {
LinkedList<CloneableTriggerPolicy<OUT>> dTriggers = null;

if (!isCentral) {
dTriggers = new LinkedList<CloneableTriggerPolicy<OUT>>();
for (TriggerPolicy<OUT> trigger : getTriggers()) {
if (!(trigger instanceof TimeTriggerPolicy)) {
dTriggers.add((CloneableTriggerPolicy<OUT>) trigger);
}
}
} }


return dTriggers;
} }


protected LinkedList<CloneableEvictionPolicy<OUT>> getDistributedEvicters() { protected EvictionPolicy<OUT> getEvicter() {
LinkedList<CloneableEvictionPolicy<OUT>> evicters = null;


if (!isCentral) { if (evictionHelper != null) {
evicters = new LinkedList<CloneableEvictionPolicy<OUT>>(); return evictionHelper.toEvict();
for (EvictionPolicy<OUT> evicter : getEvicters()) { } else if (userEvicter == null) {
evicters.add((CloneableEvictionPolicy<OUT>) evicter); if (triggerHelper instanceof Time) {
return triggerHelper.toEvict();
} else {
return new TumblingEvictionPolicy<OUT>();
} }
}

return evicters;
}

protected LinkedList<EvictionPolicy<OUT>> getCentralEvicters() {
if (isCentral) {
return getEvicters();
} else { } else {
return null; return userEvicter;
} }

} }


/** /**
Expand Down Expand Up @@ -664,7 +568,7 @@ public DataStream<StreamWindow<OUT>> getDiscretizedStream() {
protected static class WindowKey<R> implements KeySelector<StreamWindow<R>, Integer> { protected static class WindowKey<R> implements KeySelector<StreamWindow<R>, Integer> {


private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;

@Override @Override
public Integer getKey(StreamWindow<R> value) throws Exception { public Integer getKey(StreamWindow<R> value) throws Exception {
return value.windowID; return value.windowID;
Expand Down
Expand Up @@ -58,7 +58,7 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
protected IN nextObject; protected IN nextObject;
protected boolean isMutable; protected boolean isMutable;


protected Collector<OUT> collector; public Collector<OUT> collector;
protected Function userFunction; protected Function userFunction;
protected volatile boolean isRunning; protected volatile boolean isRunning;


Expand Down

0 comments on commit 412779f

Please sign in to comment.