Skip to content

Commit

Permalink
[scala] [streaming] Base functionality added for streaming scala api
Browse files Browse the repository at this point in the history
  • Loading branch information
gyfora committed Jan 2, 2015
1 parent 87d699d commit 34353f6
Show file tree
Hide file tree
Showing 13 changed files with 425 additions and 292 deletions.
Expand Up @@ -72,7 +72,6 @@ public class JobGraphBuilder {
private Map<String, StreamRecordSerializer<?>> typeSerializersIn2;
private Map<String, StreamRecordSerializer<?>> typeSerializersOut1;
private Map<String, StreamRecordSerializer<?>> typeSerializersOut2;
private Map<String, byte[]> serializedFunctions;
private Map<String, byte[]> outputSelectors;
private Map<String, Class<? extends AbstractInvokable>> vertexClasses;
private Map<String, Integer> iterationIds;
Expand Down Expand Up @@ -104,7 +103,6 @@ public JobGraphBuilder() {
typeSerializersIn2 = new HashMap<String, StreamRecordSerializer<?>>();
typeSerializersOut1 = new HashMap<String, StreamRecordSerializer<?>>();
typeSerializersOut2 = new HashMap<String, StreamRecordSerializer<?>>();
serializedFunctions = new HashMap<String, byte[]>();
outputSelectors = new HashMap<String, byte[]>();
vertexClasses = new HashMap<String, Class<? extends AbstractInvokable>>();
iterationIds = new HashMap<String, Integer>();
Expand Down Expand Up @@ -133,18 +131,14 @@ public JobGraphBuilder() {
* Output type for serialization
* @param operatorName
* Operator type
* @param serializedFunction
* Serialized udf
* @param parallelism
* Number of parallel instances created
*/
public <IN, OUT> void addStreamVertex(String vertexName,
StreamInvokable<IN, OUT> invokableObject, TypeInformation<IN> inTypeInfo,
TypeInformation<OUT> outTypeInfo, String operatorName, byte[] serializedFunction,
int parallelism) {
TypeInformation<OUT> outTypeInfo, String operatorName, int parallelism) {

addVertex(vertexName, StreamVertex.class, invokableObject, operatorName,
serializedFunction, parallelism);
addVertex(vertexName, StreamVertex.class, invokableObject, operatorName, parallelism);

StreamRecordSerializer<IN> inSerializer = inTypeInfo != null ? new StreamRecordSerializer<IN>(
inTypeInfo) : null;
Expand All @@ -171,8 +165,6 @@ public <IN, OUT> void addStreamVertex(String vertexName,
* Output type for serialization
* @param operatorName
* Operator type
* @param serializedFunction
* Serialized udf
* @param parallelism
* Number of parallel instances created
*/
Expand All @@ -185,7 +177,7 @@ public <IN, OUT> void addSourceVertex(String vertexName, SourceFunction<OUT> fun
function);

addStreamVertex(vertexName, invokableObject, inTypeInfo, outTypeInfo, operatorName,
serializedFunction, parallelism);
parallelism);
}

/**
Expand All @@ -206,7 +198,7 @@ public <IN, OUT> void addSourceVertex(String vertexName, SourceFunction<OUT> fun
public void addIterationHead(String vertexName, String iterationHead, Integer iterationID,
int parallelism, long waitTime) {

addVertex(vertexName, StreamIterationHead.class, null, null, null, parallelism);
addVertex(vertexName, StreamIterationHead.class, null, null, parallelism);

iterationIds.put(vertexName, iterationID);
iterationIDtoHeadName.put(iterationID, vertexName);
Expand Down Expand Up @@ -247,7 +239,7 @@ public void addIterationTail(String vertexName, String iterationTail, Integer it
throw new RuntimeException("Buffer timeout 0 at iteration tail is not supported.");
}

addVertex(vertexName, StreamIterationTail.class, null, null, null, parallelism);
addVertex(vertexName, StreamIterationTail.class, null, null, parallelism);

iterationIds.put(vertexName, iterationID);
iterationIDtoTailName.put(iterationID, vertexName);
Expand All @@ -264,10 +256,9 @@ public void addIterationTail(String vertexName, String iterationTail, Integer it
public <IN1, IN2, OUT> void addCoTask(String vertexName,
CoInvokable<IN1, IN2, OUT> taskInvokableObject, TypeInformation<IN1> in1TypeInfo,
TypeInformation<IN2> in2TypeInfo, TypeInformation<OUT> outTypeInfo,
String operatorName, byte[] serializedFunction, int parallelism) {
String operatorName, int parallelism) {

addVertex(vertexName, CoStreamVertex.class, taskInvokableObject, operatorName,
serializedFunction, parallelism);
addVertex(vertexName, CoStreamVertex.class, taskInvokableObject, operatorName, parallelism);

addTypeSerializers(vertexName, new StreamRecordSerializer<IN1>(in1TypeInfo),
new StreamRecordSerializer<IN2>(in2TypeInfo), new StreamRecordSerializer<OUT>(
Expand All @@ -289,20 +280,16 @@ public <IN1, IN2, OUT> void addCoTask(String vertexName,
* The user defined invokable object
* @param operatorName
* Type of the user defined operator
* @param serializedFunction
* Serialized operator
* @param parallelism
* Number of parallel instances created
*/
private void addVertex(String vertexName, Class<? extends AbstractInvokable> vertexClass,
StreamInvokable<?, ?> invokableObject, String operatorName, byte[] serializedFunction,
int parallelism) {
StreamInvokable<?, ?> invokableObject, String operatorName, int parallelism) {

vertexClasses.put(vertexName, vertexClass);
setParallelism(vertexName, parallelism);
invokableObjects.put(vertexName, invokableObject);
operatorNames.put(vertexName, operatorName);
serializedFunctions.put(vertexName, serializedFunction);
outEdgeList.put(vertexName, new ArrayList<String>());
outEdgeType.put(vertexName, new ArrayList<Integer>());
outEdgeNames.put(vertexName, new ArrayList<List<String>>());
Expand Down Expand Up @@ -333,8 +320,6 @@ private void createVertex(String vertexName) {
// Get vertex attributes
Class<? extends AbstractInvokable> vertexClass = vertexClasses.get(vertexName);
StreamInvokable<?, ?> invokableObject = invokableObjects.get(vertexName);
String operatorName = operatorNames.get(vertexName);
byte[] serializedFunction = serializedFunctions.get(vertexName);
int parallelism = vertexParallelism.get(vertexName);
byte[] outputSelector = outputSelectors.get(vertexName);
Map<String, OperatorState<?>> state = operatorStates.get(vertexName);
Expand Down Expand Up @@ -362,7 +347,6 @@ private void createVertex(String vertexName) {
// Set vertex config
config.setUserInvokable(invokableObject);
config.setVertexName(vertexName);
config.setFunction(serializedFunction, operatorName);
config.setOutputSelector(outputSelector);
config.setOperatorStates(state);

Expand Down Expand Up @@ -522,8 +506,8 @@ public <T> void setOutputSelector(String vertexName, byte[] serializedOutputSele
}

/**
* Sets udf operator and TypeSerializerWrapper from one vertex to another,
* used with some sinks.
* Sets TypeSerializerWrapper from one vertex to another, used with some
* sinks.
*
* @param from
* from
Expand All @@ -532,7 +516,6 @@ public <T> void setOutputSelector(String vertexName, byte[] serializedOutputSele
*/
public void setBytesFrom(String from, String to) {
operatorNames.put(to, operatorNames.get(from));
serializedFunctions.put(to, serializedFunctions.get(from));

typeSerializersIn1.put(to, typeSerializersOut1.get(from));
typeSerializersIn2.put(to, typeSerializersOut2.get(from));
Expand Down
Expand Up @@ -47,7 +47,6 @@ public class StreamConfig {
private static final String OUTPUT_SELECTOR = "outputSelector";
private static final String DIRECTED_EMIT = "directedEmit";
private static final String FUNCTION_NAME = "operatorName";
private static final String FUNCTION = "operator";
private static final String VERTEX_NAME = "vertexName";
private static final String SERIALIZEDUDF = "serializedudf";
private static final String USER_FUNCTION = "userfunction";
Expand Down Expand Up @@ -173,21 +172,6 @@ public String getVertexName() {
return config.getString(VERTEX_NAME, null);
}

public void setFunction(byte[] serializedFunction, String functionName) {
if (serializedFunction != null) {
config.setBytes(FUNCTION, serializedFunction);
config.setString(FUNCTION_NAME, functionName);
}
}

public Object getFunction(ClassLoader cl) {
try {
return InstantiationUtil.readObjectFromConfig(this.config, FUNCTION, cl);
} catch (Exception e) {
throw new RuntimeException("Cannot deserialize invokable object", e);
}
}

public String getFunctionName() {
return config.getString(FUNCTION_NAME, "");
}
Expand Down
Expand Up @@ -17,13 +17,9 @@

package org.apache.flink.streaming.api.datastream;

import java.io.Serializable;
import java.util.List;

import org.apache.commons.lang3.SerializationException;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ClosureCleaner;
Expand Down Expand Up @@ -53,7 +49,7 @@
* The ConnectedDataStream represents a stream for two different data types. It
* can be used to apply transformations like {@link CoMapFunction} on two
* {@link DataStream}s
*
*
* @param <IN1>
* Type of the first input data steam.
* @param <IN2>
Expand Down Expand Up @@ -417,8 +413,9 @@ public CoWindowDataStream<IN1, IN2> window(long windowSize1, long windowSize2) {
TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoMapFunction.class,
coMapper.getClass(), 2, null, null);

return addCoFunction("coMap", clean(coMapper), outTypeInfo,
new CoMapInvokable<IN1, IN2, OUT>(clean(coMapper)));
return addCoFunction("coMap", outTypeInfo, new CoMapInvokable<IN1, IN2, OUT>(
clean(coMapper)));

}

/**
Expand All @@ -441,8 +438,8 @@ public CoWindowDataStream<IN1, IN2> window(long windowSize1, long windowSize2) {
TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoFlatMapFunction.class,
coFlatMapper.getClass(), 2, null, null);

return addCoFunction("coFlatMap", clean(coFlatMapper), outTypeInfo,
new CoFlatMapInvokable<IN1, IN2, OUT>(clean(coFlatMapper)));
return addCoFunction("coFlatMap", outTypeInfo, new CoFlatMapInvokable<IN1, IN2, OUT>(
clean(coFlatMapper)));
}

/**
Expand All @@ -466,8 +463,8 @@ public CoWindowDataStream<IN1, IN2> window(long windowSize1, long windowSize2) {
TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoReduceFunction.class,
coReducer.getClass(), 2, null, null);

return addCoFunction("coReduce", clean(coReducer), outTypeInfo,
getReduceInvokable(clean(coReducer)));
return addCoFunction("coReduce", outTypeInfo, getReduceInvokable(clean(coReducer)));

}

/**
Expand Down Expand Up @@ -531,9 +528,9 @@ public CoWindowDataStream<IN1, IN2> window(long windowSize1, long windowSize2) {
TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoWindowFunction.class,
coWindowFunction.getClass(), 2, null, null);

return addCoFunction("coWindowReduce", clean(coWindowFunction), outTypeInfo,
new CoWindowInvokable<IN1, IN2, OUT>(clean(coWindowFunction), windowSize, slideInterval,
timestamp1, timestamp2));
return addCoFunction("coWindowReduce", outTypeInfo, new CoWindowInvokable<IN1, IN2, OUT>(
clean(coWindowFunction), windowSize, slideInterval, timestamp1, timestamp2));

}

protected <OUT> CoInvokable<IN1, IN2, OUT> getReduceInvokable(
Expand Down Expand Up @@ -607,27 +604,21 @@ public void coWindow(List<IN1> first, List<IN2> second, Collector<OUT> out)
throw new IllegalArgumentException("Slide interval must be positive");
}

return addCoFunction("coWindowReduce", clean(coWindowFunction), outTypeInfo,
new CoWindowInvokable<IN1, IN2, OUT>(clean(coWindowFunction), windowSize, slideInterval,
timestamp1, timestamp2));
return addCoFunction("coWindowReduce", outTypeInfo, new CoWindowInvokable<IN1, IN2, OUT>(
clean(coWindowFunction), windowSize, slideInterval, timestamp1, timestamp2));

}

protected <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(String functionName,
final Function function, TypeInformation<OUT> outTypeInfo,
CoInvokable<IN1, IN2, OUT> functionInvokable) {
public <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(String functionName,
TypeInformation<OUT> outTypeInfo, CoInvokable<IN1, IN2, OUT> functionInvokable) {

@SuppressWarnings({ "unchecked", "rawtypes" })
SingleOutputStreamOperator<OUT, ?> returnStream = new SingleOutputStreamOperator(
environment, functionName, outTypeInfo);

try {
dataStream1.jobGraphBuilder.addCoTask(returnStream.getId(), functionInvokable,
getInputType1(), getInputType2(), outTypeInfo, functionName,
SerializationUtils.serialize((Serializable) function),
environment.getDegreeOfParallelism());
} catch (SerializationException e) {
throw new RuntimeException("Cannot serialize user defined function");
}
dataStream1.jobGraphBuilder.addCoTask(returnStream.getId(), functionInvokable,
getInputType1(), getInputType2(), outTypeInfo, functionName,
environment.getDegreeOfParallelism());

dataStream1.connectGraph(dataStream1, returnStream.getId(), 1);
dataStream1.connectGraph(dataStream2, returnStream.getId(), 2);
Expand Down

0 comments on commit 34353f6

Please sign in to comment.