Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Commit

Permalink
[Java Streamlet API] Support abstractions on Streamlet Operators (#3112)
Browse files Browse the repository at this point in the history
* [Java Streamlet API] Support Abstractions on Streamlet Operators
  • Loading branch information
erenavsarogullari authored and nwangtw committed Nov 19, 2018
1 parent cbf7e51 commit 8f44c11
Show file tree
Hide file tree
Showing 15 changed files with 35 additions and 115 deletions.
Expand Up @@ -17,7 +17,6 @@
* under the License.
*/


package org.apache.heron.streamlet.impl;

import java.io.Serializable;
Expand Down
Expand Up @@ -17,10 +17,8 @@
* under the License.
*/


package org.apache.heron.streamlet.impl;


import java.time.Duration;

import org.apache.heron.api.bolt.BaseWindowedBolt;
Expand All @@ -42,7 +40,7 @@ private enum WindowType { TIME, COUNT, CUSTOM }
private TriggerPolicy<Tuple, ?> triggerPolicy;
private EvictionPolicy<Tuple, ?> evictionPolicy;

public WindowConfigImpl(Duration windowDuration, Duration slidingIntervalDuration) {
public WindowConfigImpl(Duration windowDuration, Duration slidingIntervalDuration) {
this.windowType = WindowType.TIME;
this.windowDuration = windowDuration;
this.slidingIntervalDuration = slidingIntervalDuration;
Expand Down
Expand Up @@ -17,13 +17,8 @@
* under the License.
*/


package org.apache.heron.streamlet.impl.operators;

import java.util.Map;

import org.apache.heron.api.bolt.OutputCollector;
import org.apache.heron.api.topology.TopologyContext;
import org.apache.heron.api.tuple.Tuple;
import org.apache.heron.api.tuple.Values;
import org.apache.heron.streamlet.SerializablePredicate;
Expand All @@ -38,18 +33,10 @@ public class FilterOperator<R> extends StreamletOperator<R, R> {
private static final long serialVersionUID = -4748646871471052706L;
private SerializablePredicate<? super R> filterFn;

private OutputCollector collector;

public FilterOperator(SerializablePredicate<? super R> filterFn) {
this.filterFn = filterFn;
}

@SuppressWarnings("rawtypes")
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
collector = outputCollector;
}

@SuppressWarnings("unchecked")
@Override
public void execute(Tuple tuple) {
Expand Down
Expand Up @@ -17,13 +17,8 @@
* under the License.
*/


package org.apache.heron.streamlet.impl.operators;

import java.util.Map;

import org.apache.heron.api.bolt.OutputCollector;
import org.apache.heron.api.topology.TopologyContext;
import org.apache.heron.api.tuple.Tuple;
import org.apache.heron.api.tuple.Values;
import org.apache.heron.streamlet.SerializableFunction;
Expand All @@ -38,19 +33,11 @@ public class FlatMapOperator<R, T> extends StreamletOperator<R, T> {
private static final long serialVersionUID = -2418329215159618998L;
private SerializableFunction<? super R, ? extends Iterable<? extends T>> flatMapFn;

private OutputCollector collector;

public FlatMapOperator(
SerializableFunction<? super R, ? extends Iterable<? extends T>> flatMapFn) {
this.flatMapFn = flatMapFn;
}

@SuppressWarnings("rawtypes")
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
collector = outputCollector;
}

@SuppressWarnings("unchecked")
@Override
public void execute(Tuple tuple) {
Expand Down
Expand Up @@ -17,14 +17,11 @@
* under the License.
*/


package org.apache.heron.streamlet.impl.operators;

import java.util.HashMap;
import java.util.Map;

import org.apache.heron.api.bolt.OutputCollector;
import org.apache.heron.api.topology.TopologyContext;
import org.apache.heron.api.tuple.Tuple;
import org.apache.heron.api.tuple.Values;
import org.apache.heron.api.windowing.TupleWindow;
Expand All @@ -45,7 +42,6 @@ public class GeneralReduceByKeyAndWindowOperator<K, V, VR> extends StreamletWind
private SerializableFunction<V, K> keyExtractor;
private VR identity;
private SerializableBiFunction<VR, V, ? extends VR> reduceFn;
private OutputCollector collector;

public GeneralReduceByKeyAndWindowOperator(SerializableFunction<V, K> keyExtractor, VR identity,
SerializableBiFunction<VR, V, ? extends VR> reduceFn) {
Expand All @@ -54,12 +50,6 @@ public GeneralReduceByKeyAndWindowOperator(SerializableFunction<V, K> keyExtract
this.reduceFn = reduceFn;
}

@SuppressWarnings("rawtypes")
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
collector = outputCollector;
}

@SuppressWarnings("unchecked")
@Override
public void execute(TupleWindow inputWindow) {
Expand Down
Expand Up @@ -17,7 +17,6 @@
* under the License.
*/


package org.apache.heron.streamlet.impl.operators;

import java.util.HashMap;
Expand All @@ -26,8 +25,6 @@
import java.util.Map;

import org.apache.heron.api.Pair;
import org.apache.heron.api.bolt.OutputCollector;
import org.apache.heron.api.topology.TopologyContext;
import org.apache.heron.api.tuple.Tuple;
import org.apache.heron.api.tuple.Values;
import org.apache.heron.api.windowing.TupleWindow;
Expand Down Expand Up @@ -58,7 +55,6 @@ public class JoinOperator<K, V1, V2, VR> extends StreamletWindowOperator<V1, VR>
private SerializableFunction<V2, K> rightKeyExtractor;
// The user supplied join function
private SerializableBiFunction<V1, V2, ? extends VR> joinFn;
private OutputCollector collector;

public JoinOperator(JoinType joinType, String leftComponent, String rightComponent,
SerializableFunction<V1, K> leftKeyExtractor,
Expand All @@ -72,12 +68,6 @@ public JoinOperator(JoinType joinType, String leftComponent, String rightCompone
this.joinFn = joinFn;
}

@SuppressWarnings("rawtypes")
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
collector = outputCollector;
}

@Override
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> cfg = super.getComponentConfiguration();
Expand Down
Expand Up @@ -17,13 +17,8 @@
* under the License.
*/


package org.apache.heron.streamlet.impl.operators;

import java.util.Map;

import org.apache.heron.api.bolt.OutputCollector;
import org.apache.heron.api.topology.TopologyContext;
import org.apache.heron.api.tuple.Tuple;
import org.apache.heron.api.tuple.Values;
import org.apache.heron.streamlet.SerializableFunction;
Expand All @@ -37,18 +32,10 @@ public class MapOperator<R, T> extends StreamletOperator<R, T> {
private static final long serialVersionUID = -1303096133107278700L;
private SerializableFunction<? super R, ? extends T> mapFn;

private OutputCollector collector;

public MapOperator(SerializableFunction<? super R, ? extends T> mapFn) {
this.mapFn = mapFn;
}

@SuppressWarnings("rawtypes")
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
collector = outputCollector;
}

@SuppressWarnings("unchecked")
@Override
public void execute(Tuple tuple) {
Expand Down
Expand Up @@ -17,14 +17,11 @@
* under the License.
*/


package org.apache.heron.streamlet.impl.operators;

import java.util.HashMap;
import java.util.Map;

import org.apache.heron.api.bolt.OutputCollector;
import org.apache.heron.api.topology.TopologyContext;
import org.apache.heron.api.tuple.Tuple;
import org.apache.heron.api.tuple.Values;
import org.apache.heron.api.windowing.TupleWindow;
Expand All @@ -41,11 +38,11 @@
* function grouped by keys. It emits a KeyedWindow, reduced Value KeyPairs as outputs
*/
public class ReduceByKeyAndWindowOperator<K, V, R> extends StreamletWindowOperator<R, V> {

private static final long serialVersionUID = 2833576046687750496L;
private SerializableFunction<R, K> keyExtractor;
private SerializableFunction<R, V> valueExtractor;
private SerializableBinaryOperator<V> reduceFn;
private OutputCollector collector;

public ReduceByKeyAndWindowOperator(SerializableFunction<R, K> keyExtractor,
SerializableFunction<R, V> valueExtractor,
Expand All @@ -55,12 +52,6 @@ public ReduceByKeyAndWindowOperator(SerializableFunction<R, K> keyExtractor,
this.reduceFn = reduceFn;
}

@SuppressWarnings("rawtypes")
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
collector = outputCollector;
}

@SuppressWarnings("unchecked")
@Override
public void execute(TupleWindow inputWindow) {
Expand Down
Expand Up @@ -17,11 +17,14 @@
* under the License.
*/


package org.apache.heron.streamlet.impl.operators;

import java.util.Map;

import org.apache.heron.api.bolt.BaseRichBolt;
import org.apache.heron.api.bolt.OutputCollector;
import org.apache.heron.api.topology.OutputFieldsDeclarer;
import org.apache.heron.api.topology.TopologyContext;
import org.apache.heron.api.tuple.Fields;
import org.apache.heron.streamlet.IStreamletRichOperator;

Expand All @@ -35,6 +38,16 @@ public abstract class StreamletOperator<R, T>
private static final long serialVersionUID = 8524238140745238942L;
private static final String OUTPUT_FIELD_NAME = "output";

protected OutputCollector collector;

@SuppressWarnings("rawtypes")
@Override
public void prepare(Map<String, Object> map,
TopologyContext topologyContext,
OutputCollector outputCollector) {
collector = outputCollector;
}

/**
* The operators implementing streamlet functionality have some properties.
* 1. They all output only one stream
Expand Down
Expand Up @@ -17,11 +17,14 @@
* under the License.
*/


package org.apache.heron.streamlet.impl.operators;

import java.util.Map;

import org.apache.heron.api.bolt.BaseWindowedBolt;
import org.apache.heron.api.bolt.OutputCollector;
import org.apache.heron.api.topology.OutputFieldsDeclarer;
import org.apache.heron.api.topology.TopologyContext;
import org.apache.heron.api.tuple.Fields;
import org.apache.heron.streamlet.IStreamletWindowOperator;

Expand All @@ -32,8 +35,18 @@
public abstract class StreamletWindowOperator<R, T>
extends BaseWindowedBolt
implements IStreamletWindowOperator<R, T> {

private static final long serialVersionUID = -4836560876041237959L;
private static final String OUTPUT_FIELD_NAME = "output";
protected OutputCollector collector;

@SuppressWarnings("rawtypes")
@Override
public void prepare(Map<String, Object> map,
TopologyContext topologyContext,
OutputCollector outputCollector) {
collector = outputCollector;
}

/**
* The operators implementing streamlet functionality have some properties.
Expand Down
Expand Up @@ -17,7 +17,6 @@
* under the License.
*/


package org.apache.heron.streamlet.impl.operators;

import java.io.Serializable;
Expand All @@ -41,10 +40,10 @@
*/
public class TransformOperator<R, T> extends StreamletOperator<R, T>
implements IStatefulComponent<Serializable, Serializable> {

private static final long serialVersionUID = 429297144878185182L;
private SerializableTransformer<? super R, ? extends T> serializableTransformer;

private OutputCollector collector;
private State<Serializable, Serializable> state;

public TransformOperator(
Expand All @@ -71,7 +70,7 @@ public void cleanup() {
public void prepare(Map<String, Object> map,
TopologyContext topologyContext,
OutputCollector outputCollector) {
collector = outputCollector;
super.prepare(map, topologyContext, outputCollector);
Context context = new ContextImpl(topologyContext, map, state);
serializableTransformer.setup(context);
}
Expand Down
Expand Up @@ -17,13 +17,8 @@
* under the License.
*/


package org.apache.heron.streamlet.impl.operators;

import java.util.Map;

import org.apache.heron.api.bolt.OutputCollector;
import org.apache.heron.api.topology.TopologyContext;
import org.apache.heron.api.tuple.Tuple;
import org.apache.heron.api.tuple.Values;

Expand All @@ -33,17 +28,10 @@
*/
public class UnionOperator<I> extends StreamletOperator<I, I> {
private static final long serialVersionUID = -7326832064961413315L;
private OutputCollector collector;

public UnionOperator() {
}

@SuppressWarnings("rawtypes")
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
collector = outputCollector;
}

@SuppressWarnings("unchecked")
@Override
public void execute(Tuple tuple) {
Expand Down
Expand Up @@ -17,7 +17,6 @@
* under the License.
*/


package org.apache.heron.streamlet.impl.sinks;

import java.io.Serializable;
Expand All @@ -39,9 +38,9 @@
*/
public class ComplexSink<R> extends StreamletOperator<R, R>
implements IStatefulComponent<Serializable, Serializable> {

private static final long serialVersionUID = 8717991188885786658L;
private Sink<R> sink;
private OutputCollector collector;
private State<Serializable, Serializable> state;

public ComplexSink(Sink<R> sink) {
Expand Down

0 comments on commit 8f44c11

Please sign in to comment.