Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public DataStream<IN2> getSecond() {
*
* @return The type of the first input
*/
public TypeInformation<IN1> getInputType1() {
public TypeInformation<IN1> getType1() {
return dataStream1.getType();
}

Expand All @@ -132,7 +132,7 @@ public TypeInformation<IN1> getInputType1() {
*
* @return The type of the second input
*/
public TypeInformation<IN2> getInputType2() {
public TypeInformation<IN2> getType2() {
return dataStream2.getType();
}

Expand Down Expand Up @@ -244,10 +244,10 @@ public ConnectedDataStream<IN1, IN2> groupBy(KeySelector<IN1, ?> keySelector1,
public <OUT> SingleOutputStreamOperator<OUT, ?> map(CoMapFunction<IN1, IN2, OUT> coMapper) {

TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coMapper,
CoMapFunction.class, false, true, getInputType1(), getInputType2(),
CoMapFunction.class, false, true, getType1(), getType2(),
Utils.getCallLocationName(), true);

return addCoFunction("Co-Map", outTypeInfo, new CoStreamMap<IN1, IN2, OUT>(
return transform("Co-Map", outTypeInfo, new CoStreamMap<IN1, IN2, OUT>(
clean(coMapper)));

}
Expand All @@ -271,10 +271,10 @@ CoMapFunction.class, false, true, getInputType1(), getInputType2(),
CoFlatMapFunction<IN1, IN2, OUT> coFlatMapper) {

TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coFlatMapper,
CoFlatMapFunction.class, false, true, getInputType1(), getInputType2(),
CoFlatMapFunction.class, false, true, getType1(), getType2(),
Utils.getCallLocationName(), true);

return addCoFunction("Co-Flat Map", outTypeInfo, new CoStreamFlatMap<IN1, IN2, OUT>(
return transform("Co-Flat Map", outTypeInfo, new CoStreamFlatMap<IN1, IN2, OUT>(
clean(coFlatMapper)));
}

Expand All @@ -297,10 +297,10 @@ CoFlatMapFunction.class, false, true, getInputType1(), getInputType2(),
public <OUT> SingleOutputStreamOperator<OUT, ?> reduce(CoReduceFunction<IN1, IN2, OUT> coReducer) {

TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coReducer,
CoReduceFunction.class, false, true, getInputType1(), getInputType2(),
CoReduceFunction.class, false, true, getType1(), getType2(),
Utils.getCallLocationName(), true);

return addCoFunction("Co-Reduce", outTypeInfo, getReduceOperator(clean(coReducer)));
return transform("Co-Reduce", outTypeInfo, getReduceOperator(clean(coReducer)));

}

Expand Down Expand Up @@ -365,10 +365,10 @@ CoReduceFunction.class, false, true, getInputType1(), getInputType2(),
}

TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coWindowFunction,
CoWindowFunction.class, false, true, getInputType1(), getInputType2(),
CoWindowFunction.class, false, true, getType1(), getType2(),
Utils.getCallLocationName(), true);

return addCoFunction("Co-Window", outTypeInfo, new CoStreamWindow<IN1, IN2, OUT>(
return transform("Co-Window", outTypeInfo, new CoStreamWindow<IN1, IN2, OUT>(
clean(coWindowFunction), windowSize, slideInterval, timestamp1, timestamp2));

}
Expand Down Expand Up @@ -397,20 +397,20 @@ protected <OUT> CoStreamOperator<IN1, IN2, OUT> getReduceOperator(
throw new IllegalArgumentException("Slide interval must be positive");
}

return addCoFunction("Co-Window", outTypeInfo, new CoStreamWindow<IN1, IN2, OUT>(
return transform("Co-Window", outTypeInfo, new CoStreamWindow<IN1, IN2, OUT>(
clean(coWindowFunction), windowSize, slideInterval, timestamp1, timestamp2));

}

public <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(String functionName,
public <OUT> SingleOutputStreamOperator<OUT, ?> transform(String functionName,
TypeInformation<OUT> outTypeInfo, CoStreamOperator<IN1, IN2, OUT> operator) {

@SuppressWarnings({ "unchecked", "rawtypes" })
SingleOutputStreamOperator<OUT, ?> returnStream = new SingleOutputStreamOperator(
environment, functionName, outTypeInfo, operator);

dataStream1.streamGraph.addCoOperator(returnStream.getId(), operator, getInputType1(),
getInputType2(), outTypeInfo, functionName);
dataStream1.streamGraph.addCoOperator(returnStream.getId(), operator, getType1(),
getType2(), outTypeInfo, functionName);

dataStream1.connectGraph(dataStream1, returnStream.getId(), 1);
dataStream1.connectGraph(dataStream2, returnStream.getId(), 2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@
* <ul>
* <li>{@link DataStream#map},</li>
* <li>{@link DataStream#filter}, or</li>
* <li>{@link DataStream#aggregate}.</li>
* <li>{@link DataStream#sum}.</li>
* </ul>
*
* @param <OUT>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ protected DiscretizedStream<OUT> timeReduce(ReduceFunction<OUT> reduceFunction)
return reduced.discretizedStream
.groupBy(new WindowKey<OUT>())
.connect(numOfParts.groupBy(0))
.addCoFunction(
.transform(
"CoFlatMap",
reduced.discretizedStream.getType(),
new CoStreamFlatMap<StreamWindow<OUT>, Tuple2<Integer, Integer>, StreamWindow<OUT>>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,18 @@
import java.util.Collection;
import java.util.List;

import com.esotericsoftware.kryo.Serializer;

import org.apache.commons.lang3.Validate;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.MissingTypeInfo;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.client.program.Client;
Expand All @@ -44,6 +44,7 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction;
import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType;
import org.apache.flink.streaming.api.functions.source.FileReadFunction;
import org.apache.flink.streaming.api.functions.source.FileSourceFunction;
import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
Expand All @@ -53,11 +54,12 @@
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;

import com.esotericsoftware.kryo.Serializer;

/**
* {@link ExecutionEnvironment} for streaming jobs. An instance of it is
* necessary to construct streaming topologies.
Expand Down Expand Up @@ -420,7 +422,7 @@ public DataStreamSource<String> readTextFile(String filePath, String charsetName
public DataStream<String> readFileStream(String filePath, long intervalMillis,
WatchType watchType) {
DataStream<Tuple3<String, Long, Long>> source = addSource(new FileMonitoringFunction(
filePath, intervalMillis, watchType), null, "File Stream");
filePath, intervalMillis, watchType), "File Stream");

return source.flatMap(new FileReadFunction());
}
Expand Down Expand Up @@ -448,7 +450,7 @@ public <OUT extends Serializable> DataStreamSource<OUT> fromElements(OUT... data

SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);

return addSource(function, outTypeInfo, "Elements source");
return addSource(function, "Elements source").returns(outTypeInfo);
}

/**
Expand All @@ -475,7 +477,7 @@ public <OUT extends Serializable> DataStreamSource<OUT> fromCollection(Collectio
TypeInformation<OUT> outTypeInfo = TypeExtractor.getForObject(data.iterator().next());
SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);

return addSource(function, outTypeInfo, "Collection Source");
return addSource(function, "Collection Source").returns(outTypeInfo);
}

/**
Expand Down Expand Up @@ -508,7 +510,7 @@ public <OUT extends Serializable> DataStreamSource<OUT> fromCollection(Collectio
*/
public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter,
long maxRetry) {
return addSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry), null,
return addSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry),
"Socket Stream");
}

Expand Down Expand Up @@ -560,13 +562,13 @@ public DataStreamSource<Long> generateSequence(long from, long to) {
if (from > to) {
throw new IllegalArgumentException("Start of sequence must not be greater than the end");
}
return addSource(new GenSequenceFunction(from, to), null, "Sequence Source");
return addSource(new GenSequenceFunction(from, to), "Sequence Source");
}

private DataStreamSource<String> addFileSource(InputFormat<String, ?> inputFormat,
TypeInformation<String> typeInfo) {
FileSourceFunction function = new FileSourceFunction(inputFormat, typeInfo);
DataStreamSource<String> returnStream = addSource(function, null, "File Source");
DataStreamSource<String> returnStream = addSource(function, "File Source");
streamGraph.setInputFormat(returnStream.getId(), inputFormat);
return returnStream;
}
Expand All @@ -588,31 +590,7 @@ private DataStreamSource<String> addFileSource(InputFormat<String, ?> inputForma
* @return the data stream constructed
*/
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function) {
return addSource(function, null);
}

/**
* Ads a data source with a custom type information thus opening a
* {@link DataStream}. Only in very special cases does the user need to
* support type information. Otherwise use
* {@link #addSource(SourceFunction)} </p> By default sources have a
* parallelism of 1. To enable parallel execution, the user defined source
* should implement {@link ParallelSourceFunction} or extend
* {@link RichParallelSourceFunction}. In these cases the resulting source
* will have the parallelism of the environment. To change this afterwards
* call {@link DataStreamSource#setParallelism(int)}
*
* @param function
* the user defined function
* @param outTypeInfo
* the user defined type information for the stream
* @param <OUT>
* type of the returned stream
* @return the data stream constructed
*/
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function,
TypeInformation<OUT> outTypeInfo) {
return addSource(function, outTypeInfo, "Custom Source");
return addSource(function, "Custom source");
}

/**
Expand All @@ -623,24 +601,25 @@ public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function,
*
* @param function
* the user defined function
* @param outTypeInfo
* the user defined type information for the stream
* @param sourceName
* Name of the data source
* @param <OUT>
* type of the returned stream
* @return the data stream constructed
*/
@SuppressWarnings("unchecked")
private <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function,
TypeInformation<OUT> outTypeInfo, String sourceName) {
private <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName) {

TypeInformation<OUT> outTypeInfo;

if (outTypeInfo == null) {
if (function instanceof GenericSourceFunction) {
outTypeInfo = ((GenericSourceFunction<OUT>) function).getType();
} else {
if (function instanceof GenericSourceFunction) {
outTypeInfo = ((GenericSourceFunction<OUT>) function).getType();
} else {
try {
outTypeInfo = TypeExtractor.createTypeInfo(SourceFunction.class,
function.getClass(), 0, null, null);
} catch (InvalidTypesException e) {
outTypeInfo = (TypeInformation<OUT>) new MissingTypeInfo("Custom source", e);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you change this?
I suspect the exception will now be thrown somewhere else?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the same mechanism that all flink operators use to allow missing types that can be filled with .returns(..) afterwards.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, I see

}
}

Expand All @@ -649,8 +628,8 @@ private <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function,
ClosureCleaner.clean(function, true);
StreamOperator<OUT, OUT> sourceOperator = new StreamSource<OUT>(function);

return new DataStreamSource<OUT>(this, sourceName, outTypeInfo, sourceOperator,
isParallel, sourceName);
return new DataStreamSource<OUT>(this, sourceName, outTypeInfo, sourceOperator, isParallel,
sourceName);
}

// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.co.CoReduceFunction;
import org.apache.flink.streaming.api.functions.co.CoWindowFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.junit.Test;

Expand All @@ -42,6 +43,12 @@ public class TypeFillTest {
public void test() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

try {
env.addSource(new TestSource<Integer>()).print();
fail();
} catch (Exception e) {
}

DataStream<Long> source = env.generateSequence(1, 10);

try {
Expand Down Expand Up @@ -76,6 +83,7 @@ public void test() {
} catch (Exception e) {
}

env.addSource(new TestSource<Integer>()).returns("Integer");
source.map(new TestMap<Long, Long>()).returns(Long.class).print();
source.flatMap(new TestFlatMap<Long, Long>()).returns("Long").print();
source.connect(source).map(new TestCoMap<Long, Long, Integer>()).returns("Integer").print();
Expand Down Expand Up @@ -106,6 +114,19 @@ public String map(Long value) throws Exception {

}

private class TestSource<T> implements SourceFunction<T> {

@Override
public void run(Collector<T> collector) throws Exception {

}

@Override
public void cancel() {
}

}

private class TestMap<T, O> implements MapFunction<T, O> {
@Override
public O map(T value) throws Exception {
Expand Down
Loading