Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Commit

Permalink
Nwang/fix streamwindowoperator generic types (#3124)
Browse files Browse the repository at this point in the history
* Fix StreamletWindowOperator generic types

* Clean up generic types in reduce operators and groupings
  • Loading branch information
nwangtw committed Dec 19, 2018
1 parent 5c42171 commit 8cef739
Show file tree
Hide file tree
Showing 10 changed files with 66 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -454,15 +454,15 @@ public List<Streamlet<R>> clone(int numClones) {
* @param reduceFn The reduce function that you want to apply to all the values of a key.
*/
@Override
public <K, V> Streamlet<KeyValue<KeyedWindow<K>, V>> reduceByKeyAndWindow(
SerializableFunction<R, K> keyExtractor, SerializableFunction<R, V> valueExtractor,
WindowConfig windowCfg, SerializableBinaryOperator<V> reduceFn) {
public <K, T> Streamlet<KeyValue<KeyedWindow<K>, T>> reduceByKeyAndWindow(
SerializableFunction<R, K> keyExtractor, SerializableFunction<R, T> valueExtractor,
WindowConfig windowCfg, SerializableBinaryOperator<T> reduceFn) {
checkNotNull(keyExtractor, "keyExtractor cannot be null");
checkNotNull(valueExtractor, "valueExtractor cannot be null");
checkNotNull(windowCfg, "windowCfg cannot be null");
checkNotNull(reduceFn, "reduceFn cannot be null");

ReduceByKeyAndWindowStreamlet<K, V, R> retval =
ReduceByKeyAndWindowStreamlet<R, K, T> retval =
new ReduceByKeyAndWindowStreamlet<>(this, keyExtractor, valueExtractor,
windowCfg, reduceFn);
addChild(retval);
Expand Down Expand Up @@ -491,7 +491,7 @@ public <K, T> Streamlet<KeyValue<KeyedWindow<K>, T>> reduceByKeyAndWindow(
checkNotNull(identity, "identity cannot be null");
checkNotNull(reduceFn, "reduceFn cannot be null");

GeneralReduceByKeyAndWindowStreamlet<K, R, T> retval =
GeneralReduceByKeyAndWindowStreamlet<R, K, T> retval =
new GeneralReduceByKeyAndWindowStreamlet<>(this, keyExtractor, windowCfg,
identity, reduceFn);
addChild(retval);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@
* JoinOperator. It essentially ensures that the values being routed are of type
* KeyValue uses the key to route the tuple to the destination.
*/
public class JoinCustomGrouping<K, V> implements CustomStreamGrouping {
public class JoinCustomGrouping<R, K> implements CustomStreamGrouping {
private static final long serialVersionUID = 2007892247960031525L;
private SerializableFunction<V, K> keyExtractor;
private SerializableFunction<R, K> keyExtractor;
private List<Integer> taskIds;

public JoinCustomGrouping(SerializableFunction<V, K> keyExtractor) {
public JoinCustomGrouping(SerializableFunction<R, K> keyExtractor) {
this.keyExtractor = keyExtractor;
}

Expand All @@ -52,7 +52,7 @@ public void prepare(TopologyContext context, String component,
@Override
public List<Integer> chooseTasks(List<Object> values) {
List<Integer> ret = new ArrayList<>();
V obj = (V) values.get(0);
R obj = (R) values.get(0);
int key = keyExtractor.apply(obj).hashCode();
ret.add(Utils.assignKeyToTask(key, taskIds));
return ret;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@
* The current implementation is identical to JoinCustomGrouping but it might
* evolve in the future.
*/
public class ReduceByKeyAndWindowCustomGrouping<K, V> implements CustomStreamGrouping {
public class ReduceByKeyAndWindowCustomGrouping<R, K> implements CustomStreamGrouping {
private static final long serialVersionUID = -7630948017550637716L;
private SerializableFunction<V, K> keyExtractor;
private SerializableFunction<R, K> keyExtractor;
private List<Integer> taskIds;

public ReduceByKeyAndWindowCustomGrouping(SerializableFunction<V, K> keyExtractor) {
public ReduceByKeyAndWindowCustomGrouping(SerializableFunction<R, K> keyExtractor) {
this.keyExtractor = keyExtractor;
}

Expand All @@ -55,7 +55,7 @@ public void prepare(TopologyContext context, String component,
@Override
public List<Integer> chooseTasks(List<Object> values) {
List<Integer> ret = new ArrayList<>();
V obj = (V) values.get(0);
R obj = (R) values.get(0);
int key = keyExtractor.apply(obj).hashCode();
ret.add(Utils.assignKeyToTask(key, taskIds));
return ret;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,18 @@
* It takes in a reduceFunction Function as an input.
* For every time window, the bolt goes over all the tuples in that window and applies the reduce
* function grouped by keys. It emits a KeyedWindow, reduced Value KeyPairs as outputs
* R: Incoming data type, K: Key type, T: Result data type
*/
public class GeneralReduceByKeyAndWindowOperator<K, V, VR> extends StreamletWindowOperator<V, V> {
private static final long serialVersionUID = 2833576046687752396L;
private SerializableFunction<V, K> keyExtractor;
private VR identity;
private SerializableBiFunction<VR, V, ? extends VR> reduceFn;
public class GeneralReduceByKeyAndWindowOperator<R, K, T>
extends StreamletWindowOperator<R, KeyValue<KeyedWindow<K>, T>> {

public GeneralReduceByKeyAndWindowOperator(SerializableFunction<V, K> keyExtractor,
VR identity,
SerializableBiFunction<VR, V, ? extends VR> reduceFn) {
private SerializableFunction<R, K> keyExtractor;
private T identity;
private SerializableBiFunction<T, R, ? extends T> reduceFn;

public GeneralReduceByKeyAndWindowOperator(SerializableFunction<R, K> keyExtractor,
T identity,
SerializableBiFunction<T, R, ? extends T> reduceFn) {
this.keyExtractor = keyExtractor;
this.identity = identity;
this.reduceFn = reduceFn;
Expand All @@ -54,10 +56,10 @@ public GeneralReduceByKeyAndWindowOperator(SerializableFunction<V, K> keyExtract
@SuppressWarnings("unchecked")
@Override
public void execute(TupleWindow inputWindow) {
Map<K, VR> reduceMap = new HashMap<>();
Map<K, T> reduceMap = new HashMap<>();
Map<K, Integer> windowCountMap = new HashMap<>();
for (Tuple tuple : inputWindow.get()) {
V tup = (V) tuple.getValue(0);
R tup = (R) tuple.getValue(0);
addMap(reduceMap, windowCountMap, tup);
}
long startWindow;
Expand All @@ -79,7 +81,7 @@ public void execute(TupleWindow inputWindow) {
}
}

private void addMap(Map<K, VR> reduceMap, Map<K, Integer> windowCountMap, V tup) {
private void addMap(Map<K, T> reduceMap, Map<K, Integer> windowCountMap, R tup) {
K key = keyExtractor.apply(tup);
if (!reduceMap.containsKey(key)) {
reduceMap.put(key, identity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,18 @@
* It takes in a reduceFunction Function as an input.
* For every time window, the bolt goes over all the tuples in that window and applies the reduce
* function grouped by keys. It emits a KeyedWindow, reduced Value KeyPairs as outputs
* R: Incoming data type, K: Key type, T: Result data type
*/
public class ReduceByKeyAndWindowOperator<K, V, R> extends StreamletWindowOperator<R, V> {
public class ReduceByKeyAndWindowOperator<R, K, T>
extends StreamletWindowOperator<R, KeyValue<KeyedWindow<K>, T>> {

private static final long serialVersionUID = 2833576046687750496L;
private SerializableFunction<R, K> keyExtractor;
private SerializableFunction<R, V> valueExtractor;
private SerializableBinaryOperator<V> reduceFn;
private SerializableFunction<R, T> valueExtractor;
private SerializableBinaryOperator<T> reduceFn;

public ReduceByKeyAndWindowOperator(SerializableFunction<R, K> keyExtractor,
SerializableFunction<R, V> valueExtractor,
SerializableBinaryOperator<V> reduceFn) {
SerializableFunction<R, T> valueExtractor,
SerializableBinaryOperator<T> reduceFn) {
this.keyExtractor = keyExtractor;
this.valueExtractor = valueExtractor;
this.reduceFn = reduceFn;
Expand All @@ -55,7 +56,7 @@ public ReduceByKeyAndWindowOperator(SerializableFunction<R, K> keyExtractor,
@SuppressWarnings("unchecked")
@Override
public void execute(TupleWindow inputWindow) {
Map<K, V> reduceMap = new HashMap<>();
Map<K, T> reduceMap = new HashMap<>();
Map<K, Integer> windowCountMap = new HashMap<>();
for (Tuple tuple : inputWindow.get()) {
R tup = (R) tuple.getValue(0);
Expand All @@ -80,7 +81,7 @@ public void execute(TupleWindow inputWindow) {
}
}

private void addMap(Map<K, V> reduceMap, Map<K, Integer> windowCountMap, R tup) {
private void addMap(Map<K, T> reduceMap, Map<K, Integer> windowCountMap, R tup) {
K key = keyExtractor.apply(tup);
if (reduceMap.containsKey(key)) {
reduceMap.put(key, reduceFn.apply(reduceMap.get(key), valueExtractor.apply(tup)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,21 @@
* applying user supplied reduceFn on all elements within each window defined by a
* user supplied Window Config.
* ReduceByKeyAndWindowStreamlet's elements are of KeyValue type where the key is
* KeyWindowInfo&lt;K&gt; type and the value is of type V.
* KeyWindowInfo&lt;K&gt; type and the value is of type T.
*/
public class GeneralReduceByKeyAndWindowStreamlet<K, V, VR>
extends StreamletImpl<KeyValue<KeyedWindow<K>, VR>> {
private StreamletImpl<V> parent;
private SerializableFunction<V, K> keyExtractor;
public class GeneralReduceByKeyAndWindowStreamlet<R, K, T>
extends StreamletImpl<KeyValue<KeyedWindow<K>, T>> {
private StreamletImpl<R> parent;
private SerializableFunction<R, K> keyExtractor;
private WindowConfig windowCfg;
private VR identity;
private SerializableBiFunction<VR, V, ? extends VR> reduceFn;
private T identity;
private SerializableBiFunction<T, R, ? extends T> reduceFn;

public GeneralReduceByKeyAndWindowStreamlet(StreamletImpl<V> parent,
SerializableFunction<V, K> keyExtractor,
public GeneralReduceByKeyAndWindowStreamlet(StreamletImpl<R> parent,
SerializableFunction<R, K> keyExtractor,
WindowConfig windowCfg,
VR identity,
SerializableBiFunction<VR, V, ? extends VR> reduceFn) {
T identity,
SerializableBiFunction<T, R, ? extends T> reduceFn) {
this.parent = parent;
this.keyExtractor = keyExtractor;
this.windowCfg = windowCfg;
Expand All @@ -63,12 +63,12 @@ public GeneralReduceByKeyAndWindowStreamlet(StreamletImpl<V> parent,
@Override
public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
setDefaultNameIfNone(StreamletNamePrefix.REDUCE, stageNames);
GeneralReduceByKeyAndWindowOperator<K, V, VR> bolt =
new GeneralReduceByKeyAndWindowOperator<K, V, VR>(keyExtractor, identity, reduceFn);
GeneralReduceByKeyAndWindowOperator<R, K, T> bolt =
new GeneralReduceByKeyAndWindowOperator<R, K, T>(keyExtractor, identity, reduceFn);
windowCfg.applyTo(bolt);
bldr.setBolt(getName(), bolt, getNumPartitions())
.customGrouping(parent.getName(), parent.getStreamId(),
new ReduceByKeyAndWindowCustomGrouping<K, V>(keyExtractor));
new ReduceByKeyAndWindowCustomGrouping<R, K>(keyExtractor));
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,9 @@ public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
windowCfg.applyTo(bolt);
bldr.setBolt(getName(), bolt, getNumPartitions())
.customGrouping(left.getName(), left.getStreamId(),
new JoinCustomGrouping<K, R>(leftKeyExtractor))
new JoinCustomGrouping<R, K>(leftKeyExtractor))
.customGrouping(right.getName(), right.getStreamId(),
new JoinCustomGrouping<K, S>(rightKeyExtractor));
new JoinCustomGrouping<S, K>(rightKeyExtractor));
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,21 @@
* applying user supplied reduceFn on all elements within each window defined by a
* user supplied Window Config.
* ReduceByKeyAndWindowStreamlet's elements are of KeyValue type where the key is
* KeyWindowInfo&lt;K&gt; type and the value is of type V.
* KeyWindowInfo&lt;K&gt; type and the value is of type T.
*/
public class ReduceByKeyAndWindowStreamlet<K, V, R>
extends StreamletImpl<KeyValue<KeyedWindow<K>, V>> {
public class ReduceByKeyAndWindowStreamlet<R, K, T>
extends StreamletImpl<KeyValue<KeyedWindow<K>, T>> {
private StreamletImpl<R> parent;
private SerializableFunction<R, K> keyExtractor;
private SerializableFunction<R, V> valueExtractor;
private SerializableFunction<R, T> valueExtractor;
private WindowConfig windowCfg;
private SerializableBinaryOperator<V> reduceFn;
private SerializableBinaryOperator<T> reduceFn;

public ReduceByKeyAndWindowStreamlet(StreamletImpl<R> parent,
SerializableFunction<R, K> keyExtractor,
SerializableFunction<R, V> valueExtractor,
SerializableFunction<R, T> valueExtractor,
WindowConfig windowCfg,
SerializableBinaryOperator<V> reduceFn) {
SerializableBinaryOperator<T> reduceFn) {
this.parent = parent;
this.keyExtractor = keyExtractor;
this.valueExtractor = valueExtractor;
Expand All @@ -63,12 +63,12 @@ public ReduceByKeyAndWindowStreamlet(StreamletImpl<R> parent,
@Override
public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
setDefaultNameIfNone(StreamletNamePrefix.REDUCE, stageNames);
ReduceByKeyAndWindowOperator<K, V, R> bolt = new ReduceByKeyAndWindowOperator<>(keyExtractor,
ReduceByKeyAndWindowOperator<R, K, T> bolt = new ReduceByKeyAndWindowOperator<>(keyExtractor,
valueExtractor, reduceFn);
windowCfg.applyTo(bolt);
bldr.setBolt(getName(), bolt, getNumPartitions())
.customGrouping(parent.getName(), parent.getStreamId(),
new ReduceByKeyAndWindowCustomGrouping<K, R>(keyExtractor));
new ReduceByKeyAndWindowCustomGrouping<R, K>(keyExtractor));
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void setUp() {
@Test
@SuppressWarnings({"rawtypes", "unchecked"})
public void testReduceByWindowOperator() {
GeneralReduceByKeyAndWindowOperator<String, KeyValue<String, Integer>, Integer> reduceOperator =
GeneralReduceByKeyAndWindowOperator<KeyValue<String, Integer>, String, Integer> reduceOperator =
getReduceByWindowOperator(12);

TupleWindow tupleWindow = getTupleWindow(3, 5);
Expand Down Expand Up @@ -106,9 +106,9 @@ private TupleWindow getTupleWindow(int nkeys, int count) {


@SuppressWarnings({"rawtypes", "unchecked"})
private GeneralReduceByKeyAndWindowOperator<String, KeyValue<String, Integer>, Integer>
private GeneralReduceByKeyAndWindowOperator<KeyValue<String, Integer>, String, Integer>
getReduceByWindowOperator(Integer identity) {
GeneralReduceByKeyAndWindowOperator<String, KeyValue<String, Integer>, Integer>
GeneralReduceByKeyAndWindowOperator<KeyValue<String, Integer>, String, Integer>
reduceByWindowOperator = new GeneralReduceByKeyAndWindowOperator<>(
x -> x.getKey(), identity, (o, o2) -> o + o2.getValue());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void setUp() {
@Test
@SuppressWarnings({"rawtypes", "unchecked"})
public void testReduceByWindowOperator() {
ReduceByKeyAndWindowOperator<String, Integer, String> reduceOperator =
ReduceByKeyAndWindowOperator<String, String, Integer> reduceOperator =
getReduceByWindowOperator();

TupleWindow tupleWindow = getTupleWindow(3, 5);
Expand Down Expand Up @@ -106,8 +106,8 @@ private TupleWindow getTupleWindow(int nkeys, int count) {


@SuppressWarnings({"rawtypes", "unchecked"})
private ReduceByKeyAndWindowOperator<String, Integer, String> getReduceByWindowOperator() {
ReduceByKeyAndWindowOperator<String, Integer, String> reduceByWindowOperator =
private ReduceByKeyAndWindowOperator<String, String, Integer> getReduceByWindowOperator() {
ReduceByKeyAndWindowOperator<String, String, Integer> reduceByWindowOperator =
new ReduceByKeyAndWindowOperator<>(x -> x, x -> 1, (o, o2) -> o + o2);

reduceByWindowOperator.prepare(new Config(), PowerMockito.mock(TopologyContext.class),
Expand Down

0 comments on commit 8cef739

Please sign in to comment.