Skip to content

Commit

Permalink
[FLINK-1909] [streaming] Type handling refactor for sources + scala api
Browse files Browse the repository at this point in the history
  • Loading branch information
gyfora committed Apr 22, 2015
1 parent bad77a3 commit 6df1dd2
Show file tree
Hide file tree
Showing 8 changed files with 99 additions and 100 deletions.
Expand Up @@ -123,7 +123,7 @@ public DataStream<IN2> getSecond() {
* *
* @return The type of the first input * @return The type of the first input
*/ */
public TypeInformation<IN1> getInputType1() { public TypeInformation<IN1> getType1() {
return dataStream1.getType(); return dataStream1.getType();
} }


Expand All @@ -132,7 +132,7 @@ public TypeInformation<IN1> getInputType1() {
* *
* @return The type of the second input * @return The type of the second input
*/ */
public TypeInformation<IN2> getInputType2() { public TypeInformation<IN2> getType2() {
return dataStream2.getType(); return dataStream2.getType();
} }


Expand Down Expand Up @@ -244,10 +244,10 @@ public ConnectedDataStream<IN1, IN2> groupBy(KeySelector<IN1, ?> keySelector1,
public <OUT> SingleOutputStreamOperator<OUT, ?> map(CoMapFunction<IN1, IN2, OUT> coMapper) { public <OUT> SingleOutputStreamOperator<OUT, ?> map(CoMapFunction<IN1, IN2, OUT> coMapper) {


TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coMapper, TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coMapper,
CoMapFunction.class, false, true, getInputType1(), getInputType2(), CoMapFunction.class, false, true, getType1(), getType2(),
Utils.getCallLocationName(), true); Utils.getCallLocationName(), true);


return addCoFunction("Co-Map", outTypeInfo, new CoStreamMap<IN1, IN2, OUT>( return transform("Co-Map", outTypeInfo, new CoStreamMap<IN1, IN2, OUT>(
clean(coMapper))); clean(coMapper)));


} }
Expand All @@ -271,10 +271,10 @@ CoMapFunction.class, false, true, getInputType1(), getInputType2(),
CoFlatMapFunction<IN1, IN2, OUT> coFlatMapper) { CoFlatMapFunction<IN1, IN2, OUT> coFlatMapper) {


TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coFlatMapper, TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coFlatMapper,
CoFlatMapFunction.class, false, true, getInputType1(), getInputType2(), CoFlatMapFunction.class, false, true, getType1(), getType2(),
Utils.getCallLocationName(), true); Utils.getCallLocationName(), true);


return addCoFunction("Co-Flat Map", outTypeInfo, new CoStreamFlatMap<IN1, IN2, OUT>( return transform("Co-Flat Map", outTypeInfo, new CoStreamFlatMap<IN1, IN2, OUT>(
clean(coFlatMapper))); clean(coFlatMapper)));
} }


Expand All @@ -297,10 +297,10 @@ CoFlatMapFunction.class, false, true, getInputType1(), getInputType2(),
public <OUT> SingleOutputStreamOperator<OUT, ?> reduce(CoReduceFunction<IN1, IN2, OUT> coReducer) { public <OUT> SingleOutputStreamOperator<OUT, ?> reduce(CoReduceFunction<IN1, IN2, OUT> coReducer) {


TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coReducer, TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coReducer,
CoReduceFunction.class, false, true, getInputType1(), getInputType2(), CoReduceFunction.class, false, true, getType1(), getType2(),
Utils.getCallLocationName(), true); Utils.getCallLocationName(), true);


return addCoFunction("Co-Reduce", outTypeInfo, getReduceOperator(clean(coReducer))); return transform("Co-Reduce", outTypeInfo, getReduceOperator(clean(coReducer)));


} }


Expand Down Expand Up @@ -365,10 +365,10 @@ CoReduceFunction.class, false, true, getInputType1(), getInputType2(),
} }


TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coWindowFunction, TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coWindowFunction,
CoWindowFunction.class, false, true, getInputType1(), getInputType2(), CoWindowFunction.class, false, true, getType1(), getType2(),
Utils.getCallLocationName(), true); Utils.getCallLocationName(), true);


return addCoFunction("Co-Window", outTypeInfo, new CoStreamWindow<IN1, IN2, OUT>( return transform("Co-Window", outTypeInfo, new CoStreamWindow<IN1, IN2, OUT>(
clean(coWindowFunction), windowSize, slideInterval, timestamp1, timestamp2)); clean(coWindowFunction), windowSize, slideInterval, timestamp1, timestamp2));


} }
Expand Down Expand Up @@ -397,20 +397,20 @@ protected <OUT> CoStreamOperator<IN1, IN2, OUT> getReduceOperator(
throw new IllegalArgumentException("Slide interval must be positive"); throw new IllegalArgumentException("Slide interval must be positive");
} }


return addCoFunction("Co-Window", outTypeInfo, new CoStreamWindow<IN1, IN2, OUT>( return transform("Co-Window", outTypeInfo, new CoStreamWindow<IN1, IN2, OUT>(
clean(coWindowFunction), windowSize, slideInterval, timestamp1, timestamp2)); clean(coWindowFunction), windowSize, slideInterval, timestamp1, timestamp2));


} }


public <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(String functionName, public <OUT> SingleOutputStreamOperator<OUT, ?> transform(String functionName,
TypeInformation<OUT> outTypeInfo, CoStreamOperator<IN1, IN2, OUT> operator) { TypeInformation<OUT> outTypeInfo, CoStreamOperator<IN1, IN2, OUT> operator) {


@SuppressWarnings({ "unchecked", "rawtypes" }) @SuppressWarnings({ "unchecked", "rawtypes" })
SingleOutputStreamOperator<OUT, ?> returnStream = new SingleOutputStreamOperator( SingleOutputStreamOperator<OUT, ?> returnStream = new SingleOutputStreamOperator(
environment, functionName, outTypeInfo, operator); environment, functionName, outTypeInfo, operator);


dataStream1.streamGraph.addCoOperator(returnStream.getId(), operator, getInputType1(), dataStream1.streamGraph.addCoOperator(returnStream.getId(), operator, getType1(),
getInputType2(), outTypeInfo, functionName); getType2(), outTypeInfo, functionName);


dataStream1.connectGraph(dataStream1, returnStream.getId(), 1); dataStream1.connectGraph(dataStream1, returnStream.getId(), 1);
dataStream1.connectGraph(dataStream2, returnStream.getId(), 2); dataStream1.connectGraph(dataStream2, returnStream.getId(), 2);
Expand Down
Expand Up @@ -93,7 +93,7 @@
* <ul> * <ul>
* <li>{@link DataStream#map},</li> * <li>{@link DataStream#map},</li>
* <li>{@link DataStream#filter}, or</li> * <li>{@link DataStream#filter}, or</li>
* <li>{@link DataStream#aggregate}.</li> * <li>{@link DataStream#sum}.</li>
* </ul> * </ul>
* *
* @param <OUT> * @param <OUT>
Expand Down
Expand Up @@ -133,7 +133,7 @@ protected DiscretizedStream<OUT> timeReduce(ReduceFunction<OUT> reduceFunction)
return reduced.discretizedStream return reduced.discretizedStream
.groupBy(new WindowKey<OUT>()) .groupBy(new WindowKey<OUT>())
.connect(numOfParts.groupBy(0)) .connect(numOfParts.groupBy(0))
.addCoFunction( .transform(
"CoFlatMap", "CoFlatMap",
reduced.discretizedStream.getType(), reduced.discretizedStream.getType(),
new CoStreamFlatMap<StreamWindow<OUT>, Tuple2<Integer, Integer>, StreamWindow<OUT>>( new CoStreamFlatMap<StreamWindow<OUT>, Tuple2<Integer, Integer>, StreamWindow<OUT>>(
Expand Down
Expand Up @@ -22,18 +22,18 @@
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;


import com.esotericsoftware.kryo.Serializer;

import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.Validate;
import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.TextInputFormat; import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.MissingTypeInfo;
import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.client.program.Client; import org.apache.flink.client.program.Client;
Expand All @@ -44,6 +44,7 @@
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction; import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction;
import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType;
import org.apache.flink.streaming.api.functions.source.FileReadFunction; import org.apache.flink.streaming.api.functions.source.FileReadFunction;
import org.apache.flink.streaming.api.functions.source.FileSourceFunction; import org.apache.flink.streaming.api.functions.source.FileSourceFunction;
import org.apache.flink.streaming.api.functions.source.FromElementsFunction; import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
Expand All @@ -53,11 +54,12 @@
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction; import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType;
import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.operators.StreamSource;


import com.esotericsoftware.kryo.Serializer;

/** /**
* {@link ExecutionEnvironment} for streaming jobs. An instance of it is * {@link ExecutionEnvironment} for streaming jobs. An instance of it is
* necessary to construct streaming topologies. * necessary to construct streaming topologies.
Expand Down Expand Up @@ -420,7 +422,7 @@ public DataStreamSource<String> readTextFile(String filePath, String charsetName
public DataStream<String> readFileStream(String filePath, long intervalMillis, public DataStream<String> readFileStream(String filePath, long intervalMillis,
WatchType watchType) { WatchType watchType) {
DataStream<Tuple3<String, Long, Long>> source = addSource(new FileMonitoringFunction( DataStream<Tuple3<String, Long, Long>> source = addSource(new FileMonitoringFunction(
filePath, intervalMillis, watchType), null, "File Stream"); filePath, intervalMillis, watchType), "File Stream");


return source.flatMap(new FileReadFunction()); return source.flatMap(new FileReadFunction());
} }
Expand Down Expand Up @@ -448,7 +450,7 @@ public <OUT extends Serializable> DataStreamSource<OUT> fromElements(OUT... data


SourceFunction<OUT> function = new FromElementsFunction<OUT>(data); SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);


return addSource(function, outTypeInfo, "Elements source"); return addSource(function, "Elements source").returns(outTypeInfo);
} }


/** /**
Expand All @@ -475,7 +477,7 @@ public <OUT extends Serializable> DataStreamSource<OUT> fromCollection(Collectio
TypeInformation<OUT> outTypeInfo = TypeExtractor.getForObject(data.iterator().next()); TypeInformation<OUT> outTypeInfo = TypeExtractor.getForObject(data.iterator().next());
SourceFunction<OUT> function = new FromElementsFunction<OUT>(data); SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);


return addSource(function, outTypeInfo, "Collection Source"); return addSource(function, "Collection Source").returns(outTypeInfo);
} }


/** /**
Expand Down Expand Up @@ -508,7 +510,7 @@ public <OUT extends Serializable> DataStreamSource<OUT> fromCollection(Collectio
*/ */
public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter, public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter,
long maxRetry) { long maxRetry) {
return addSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry), null, return addSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry),
"Socket Stream"); "Socket Stream");
} }


Expand Down Expand Up @@ -560,13 +562,13 @@ public DataStreamSource<Long> generateSequence(long from, long to) {
if (from > to) { if (from > to) {
throw new IllegalArgumentException("Start of sequence must not be greater than the end"); throw new IllegalArgumentException("Start of sequence must not be greater than the end");
} }
return addSource(new GenSequenceFunction(from, to), null, "Sequence Source"); return addSource(new GenSequenceFunction(from, to), "Sequence Source");
} }


private DataStreamSource<String> addFileSource(InputFormat<String, ?> inputFormat, private DataStreamSource<String> addFileSource(InputFormat<String, ?> inputFormat,
TypeInformation<String> typeInfo) { TypeInformation<String> typeInfo) {
FileSourceFunction function = new FileSourceFunction(inputFormat, typeInfo); FileSourceFunction function = new FileSourceFunction(inputFormat, typeInfo);
DataStreamSource<String> returnStream = addSource(function, null, "File Source"); DataStreamSource<String> returnStream = addSource(function, "File Source");
streamGraph.setInputFormat(returnStream.getId(), inputFormat); streamGraph.setInputFormat(returnStream.getId(), inputFormat);
return returnStream; return returnStream;
} }
Expand All @@ -588,31 +590,7 @@ private DataStreamSource<String> addFileSource(InputFormat<String, ?> inputForma
* @return the data stream constructed * @return the data stream constructed
*/ */
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function) { public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function) {
return addSource(function, null); return addSource(function, "Custom source");
}

/**
* Ads a data source with a custom type information thus opening a
* {@link DataStream}. Only in very special cases does the user need to
* support type information. Otherwise use
* {@link #addSource(SourceFunction)} </p> By default sources have a
* parallelism of 1. To enable parallel execution, the user defined source
* should implement {@link ParallelSourceFunction} or extend
* {@link RichParallelSourceFunction}. In these cases the resulting source
* will have the parallelism of the environment. To change this afterwards
* call {@link DataStreamSource#setParallelism(int)}
*
* @param function
* the user defined function
* @param outTypeInfo
* the user defined type information for the stream
* @param <OUT>
* type of the returned stream
* @return the data stream constructed
*/
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function,
TypeInformation<OUT> outTypeInfo) {
return addSource(function, outTypeInfo, "Custom Source");
} }


/** /**
Expand All @@ -623,24 +601,25 @@ public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function,
* *
* @param function * @param function
* the user defined function * the user defined function
* @param outTypeInfo
* the user defined type information for the stream
* @param sourceName * @param sourceName
* Name of the data source * Name of the data source
* @param <OUT> * @param <OUT>
* type of the returned stream * type of the returned stream
* @return the data stream constructed * @return the data stream constructed
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, private <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName) {
TypeInformation<OUT> outTypeInfo, String sourceName) {
TypeInformation<OUT> outTypeInfo;


if (outTypeInfo == null) { if (function instanceof GenericSourceFunction) {
if (function instanceof GenericSourceFunction) { outTypeInfo = ((GenericSourceFunction<OUT>) function).getType();
outTypeInfo = ((GenericSourceFunction<OUT>) function).getType(); } else {
} else { try {
outTypeInfo = TypeExtractor.createTypeInfo(SourceFunction.class, outTypeInfo = TypeExtractor.createTypeInfo(SourceFunction.class,
function.getClass(), 0, null, null); function.getClass(), 0, null, null);
} catch (InvalidTypesException e) {
outTypeInfo = (TypeInformation<OUT>) new MissingTypeInfo("Custom source", e);
} }
} }


Expand All @@ -649,8 +628,8 @@ private <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function,
ClosureCleaner.clean(function, true); ClosureCleaner.clean(function, true);
StreamOperator<OUT, OUT> sourceOperator = new StreamSource<OUT>(function); StreamOperator<OUT, OUT> sourceOperator = new StreamSource<OUT>(function);


return new DataStreamSource<OUT>(this, sourceName, outTypeInfo, sourceOperator, return new DataStreamSource<OUT>(this, sourceName, outTypeInfo, sourceOperator, isParallel,
isParallel, sourceName); sourceName);
} }


// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
Expand Down
Expand Up @@ -32,6 +32,7 @@
import org.apache.flink.streaming.api.functions.co.CoMapFunction; import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.co.CoReduceFunction; import org.apache.flink.streaming.api.functions.co.CoReduceFunction;
import org.apache.flink.streaming.api.functions.co.CoWindowFunction; import org.apache.flink.streaming.api.functions.co.CoWindowFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector; import org.apache.flink.util.Collector;
import org.junit.Test; import org.junit.Test;


Expand All @@ -42,6 +43,12 @@ public class TypeFillTest {
public void test() { public void test() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


try {
env.addSource(new TestSource<Integer>()).print();
fail();
} catch (Exception e) {
}

DataStream<Long> source = env.generateSequence(1, 10); DataStream<Long> source = env.generateSequence(1, 10);


try { try {
Expand Down Expand Up @@ -76,6 +83,7 @@ public void test() {
} catch (Exception e) { } catch (Exception e) {
} }


env.addSource(new TestSource<Integer>()).returns("Integer");
source.map(new TestMap<Long, Long>()).returns(Long.class).print(); source.map(new TestMap<Long, Long>()).returns(Long.class).print();
source.flatMap(new TestFlatMap<Long, Long>()).returns("Long").print(); source.flatMap(new TestFlatMap<Long, Long>()).returns("Long").print();
source.connect(source).map(new TestCoMap<Long, Long, Integer>()).returns("Integer").print(); source.connect(source).map(new TestCoMap<Long, Long, Integer>()).returns("Integer").print();
Expand Down Expand Up @@ -106,6 +114,19 @@ public String map(Long value) throws Exception {


} }


private class TestSource<T> implements SourceFunction<T> {

@Override
public void run(Collector<T> collector) throws Exception {

}

@Override
public void cancel() {
}

}

private class TestMap<T, O> implements MapFunction<T, O> { private class TestMap<T, O> implements MapFunction<T, O> {
@Override @Override
public O map(T value) throws Exception { public O map(T value) throws Exception {
Expand Down

0 comments on commit 6df1dd2

Please sign in to comment.