Skip to content

Commit

Permalink
[FLINK-2550] Rename SplitDataStream to SplitStream
Browse files Browse the repository at this point in the history
  • Loading branch information
aljoscha committed Oct 5, 2015
1 parent 7b6e762 commit 9513f0e
Show file tree
Hide file tree
Showing 16 changed files with 45 additions and 44 deletions.
@@ -1,3 +1,4 @@
/*
/* /*
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
Expand Down Expand Up @@ -44,7 +45,7 @@
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitDataStream; import org.apache.flink.streaming.api.datastream.SplitStream;


import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
Expand Down Expand Up @@ -112,7 +113,7 @@ public FlinkTopology createTopology() {
} else { } else {
source = env.addSource(spoutWrapper, spoutId, source = env.addSource(spoutWrapper, spoutId,
TypeExtractor.getForClass(SplitStreamType.class)); TypeExtractor.getForClass(SplitStreamType.class));
SplitDataStream splitSource = source.split(new FlinkStormStreamSelector()); SplitStream splitSource = source.split(new FlinkStormStreamSelector());


for (String streamId : sourceStreams.keySet()) { for (String streamId : sourceStreams.keySet()) {
outputStreams.put(streamId, splitSource.select(streamId)); outputStreams.put(streamId, splitSource.select(streamId));
Expand Down Expand Up @@ -246,7 +247,7 @@ public FlinkTopology createTopology() {
new StormBoltWrapper(userBolt, this.outputStreams.get( new StormBoltWrapper(userBolt, this.outputStreams.get(
producerId).get(inputStreamId))); producerId).get(inputStreamId)));


SplitDataStream splitStreams = outputStream SplitStream splitStreams = outputStream
.split(new FlinkStormStreamSelector()); .split(new FlinkStormStreamSelector());


HashMap<String, DataStream> op = new HashMap<String, DataStream>(); HashMap<String, DataStream> op = new HashMap<String, DataStream>();
Expand Down
Expand Up @@ -18,13 +18,13 @@


import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitDataStream; import org.apache.flink.streaming.api.datastream.SplitStream;


/** /**
* Strips {@link SplitStreamType}{@code <T>} away, ie, extracts the wrapped record of type {@code T}. Can be used to get * Strips {@link SplitStreamType}{@code <T>} away, ie, extracts the wrapped record of type {@code T}. Can be used to get
* a "clean" stream from a Spout/Bolt that declared multiple output streams (after the streams got separated using * a "clean" stream from a Spout/Bolt that declared multiple output streams (after the streams got separated using
* {@link DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector) .split(...)} and * {@link DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector) .split(...)} and
* {@link SplitDataStream#select(String...) .select(...)}). * {@link SplitStream#select(String...) .select(...)}).
* *
* @param <T> * @param <T>
*/ */
Expand Down
Expand Up @@ -28,7 +28,7 @@
import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper; import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper;
import org.apache.flink.stormcompatibility.wrappers.StormSpoutWrapper; import org.apache.flink.stormcompatibility.wrappers.StormSpoutWrapper;
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitDataStream; import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


/** /**
Expand Down Expand Up @@ -60,7 +60,7 @@ public static void main(final String[] args) throws Exception {
new StormSpoutWrapper<SplitStreamType<Integer>>(new RandomSpout(true, 0), new StormSpoutWrapper<SplitStreamType<Integer>>(new RandomSpout(true, 0),
rawOutputs), TypeExtractor.getForObject(new SplitStreamType<Integer>())); rawOutputs), TypeExtractor.getForObject(new SplitStreamType<Integer>()));


SplitDataStream<SplitStreamType<Integer>> splitStream = numbers SplitStream<SplitStreamType<Integer>> splitStream = numbers
.split(new FlinkStormStreamSelector<Integer>()); .split(new FlinkStormStreamSelector<Integer>());


DataStream<SplitStreamType<Integer>> evenStream = splitStream.select(RandomSpout.EVEN_STREAM); DataStream<SplitStreamType<Integer>> evenStream = splitStream.select(RandomSpout.EVEN_STREAM);
Expand Down
Expand Up @@ -20,12 +20,12 @@
import java.io.Serializable; import java.io.Serializable;


import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitDataStream; import org.apache.flink.streaming.api.datastream.SplitStream;


/** /**
* Interface for defining an OutputSelector for a {@link SplitDataStream} using * Interface for defining an OutputSelector for a {@link SplitStream} using
* the {@link SingleOutputStreamOperator#split} call. Every output object of a * the {@link SingleOutputStreamOperator#split} call. Every output object of a
* {@link SplitDataStream} will run through this operator to select outputs. * {@link SplitStream} will run through this operator to select outputs.
* *
* @param <OUT> * @param <OUT>
* Type parameter of the split values. * Type parameter of the split values.
Expand Down
Expand Up @@ -204,16 +204,16 @@ public DataStream<T> union(DataStream<T>... streams) {
/** /**
* Operator used for directing tuples to specific named outputs using an * Operator used for directing tuples to specific named outputs using an
* {@link org.apache.flink.streaming.api.collector.selector.OutputSelector}. * {@link org.apache.flink.streaming.api.collector.selector.OutputSelector}.
* Calling this method on an operator creates a new {@link SplitDataStream}. * Calling this method on an operator creates a new {@link SplitStream}.
* *
* @param outputSelector * @param outputSelector
* The user defined * The user defined
* {@link org.apache.flink.streaming.api.collector.selector.OutputSelector} * {@link org.apache.flink.streaming.api.collector.selector.OutputSelector}
* for directing the tuples. * for directing the tuples.
* @return The {@link SplitDataStream} * @return The {@link SplitStream}
*/ */
public SplitDataStream<T> split(OutputSelector<T> outputSelector) { public SplitStream<T> split(OutputSelector<T> outputSelector) {
return new SplitDataStream<T>(this, clean(outputSelector)); return new SplitStream<T>(this, clean(outputSelector));
} }


/** /**
Expand Down
Expand Up @@ -23,16 +23,16 @@
import org.apache.flink.streaming.api.transformations.SplitTransformation; import org.apache.flink.streaming.api.transformations.SplitTransformation;


/** /**
* The SplitDataStream represents an operator that has been split using an * The SplitStream represents an operator that has been split using an
* {@link OutputSelector}. Named outputs can be selected using the * {@link OutputSelector}. Named outputs can be selected using the
* {@link #select} function. To apply transformation on the whole output simply * {@link #select} function. To apply transformation on the whole output simply
* call the transformation on the SplitDataStream * call the transformation on the SplitStream
* *
* @param <OUT> The type of the elements in the Stream * @param <OUT> The type of the elements in the Stream
*/ */
public class SplitDataStream<OUT> extends DataStream<OUT> { public class SplitStream<OUT> extends DataStream<OUT> {


protected SplitDataStream(DataStream<OUT> dataStream, OutputSelector<OUT> outputSelector) { protected SplitStream(DataStream<OUT> dataStream, OutputSelector<OUT> outputSelector) {
super(dataStream.getExecutionEnvironment(), new SplitTransformation<OUT>(dataStream.getTransformation(), outputSelector)); super(dataStream.getExecutionEnvironment(), new SplitTransformation<OUT>(dataStream.getTransformation(), outputSelector));
} }


Expand Down
Expand Up @@ -40,7 +40,7 @@
import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitDataStream; import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.datastream.WindowedDataStream; import org.apache.flink.streaming.api.datastream.WindowedDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.WindowMapFunction; import org.apache.flink.streaming.api.functions.WindowMapFunction;
Expand Down Expand Up @@ -457,7 +457,7 @@ public Iterable<String> select(Integer value) {
} }
}; };


SplitDataStream<Integer> split = unionFilter.split(outputSelector); SplitStream<Integer> split = unionFilter.split(outputSelector);
split.select("dummy").addSink(new NoOpSink<Integer>()); split.select("dummy").addSink(new NoOpSink<Integer>());
List<OutputSelector<?>> outputSelectors = env.getStreamGraph().getStreamNode(unionFilter.getId()).getOutputSelectors(); List<OutputSelector<?>> outputSelectors = env.getStreamGraph().getStreamNode(unionFilter.getId()).getOutputSelectors();
assertEquals(1, outputSelectors.size()); assertEquals(1, outputSelectors.size());
Expand Down
Expand Up @@ -34,7 +34,7 @@
import org.apache.flink.streaming.api.datastream.IterativeStream; import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.datastream.IterativeStream.ConnectedIterativeStreams; import org.apache.flink.streaming.api.datastream.IterativeStream.ConnectedIterativeStreams;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitDataStream; import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction; import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction; import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
Expand Down Expand Up @@ -212,7 +212,7 @@ public void testmultipleHeadsTailsSimple() {
DataStreamSink<Integer> head3 = iter1.map(NoOpIntMap).setParallelism(DEFAULT_PARALLELISM / 2).addSink(new ReceiveCheckNoOpSink<Integer>()); DataStreamSink<Integer> head3 = iter1.map(NoOpIntMap).setParallelism(DEFAULT_PARALLELISM / 2).addSink(new ReceiveCheckNoOpSink<Integer>());
DataStreamSink<Integer> head4 = iter1.map(NoOpIntMap).addSink(new ReceiveCheckNoOpSink<Integer>()); DataStreamSink<Integer> head4 = iter1.map(NoOpIntMap).addSink(new ReceiveCheckNoOpSink<Integer>());


SplitDataStream<Integer> source3 = env.fromElements(1, 2, 3, 4, 5) SplitStream<Integer> source3 = env.fromElements(1, 2, 3, 4, 5)
.map(NoOpIntMap).name("EvenOddSourceMap") .map(NoOpIntMap).name("EvenOddSourceMap")
.split(new EvenOddOutputSelector()); .split(new EvenOddOutputSelector());


Expand Down Expand Up @@ -295,7 +295,7 @@ public void testmultipleHeadsTailsWithTailPartitioning() {
.addSink(new ReceiveCheckNoOpSink<Integer>()); .addSink(new ReceiveCheckNoOpSink<Integer>());
DataStreamSink<Integer> head4 = iter1.map(NoOpIntMap).addSink(new ReceiveCheckNoOpSink<Integer>()); DataStreamSink<Integer> head4 = iter1.map(NoOpIntMap).addSink(new ReceiveCheckNoOpSink<Integer>());


SplitDataStream<Integer> source3 = env.fromElements(1, 2, 3, 4, 5) SplitStream<Integer> source3 = env.fromElements(1, 2, 3, 4, 5)
.map(NoOpIntMap) .map(NoOpIntMap)
.name("split") .name("split")
.split(new EvenOddOutputSelector()); .split(new EvenOddOutputSelector());
Expand Down
Expand Up @@ -25,7 +25,7 @@
import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitDataStream; import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
Expand Down Expand Up @@ -77,7 +77,7 @@ public void testFoldOperation() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<Integer, Integer>> sourceStream = env.addSource(new TupleSource(numElements, numKeys)); DataStream<Tuple2<Integer, Integer>> sourceStream = env.addSource(new TupleSource(numElements, numKeys));


SplitDataStream<Tuple2<Integer, Integer>> splittedResult = sourceStream SplitStream<Tuple2<Integer, Integer>> splittedResult = sourceStream
.keyBy(0) .keyBy(0)
.fold(0, new FoldFunction<Tuple2<Integer, Integer>, Integer>() { .fold(0, new FoldFunction<Tuple2<Integer, Integer>, Integer>() {
@Override @Override
Expand Down
Expand Up @@ -27,7 +27,7 @@
import java.util.Map; import java.util.Map;


import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.SplitDataStream; import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
Expand Down Expand Up @@ -102,7 +102,7 @@ public void outputSelectorTest() throws Exception {
TestListResultSink<Long> evenAndOddSink = new TestListResultSink<Long>(); TestListResultSink<Long> evenAndOddSink = new TestListResultSink<Long>();
TestListResultSink<Long> allSink = new TestListResultSink<Long>(); TestListResultSink<Long> allSink = new TestListResultSink<Long>();


SplitDataStream<Long> source = env.generateSequence(1, 11).split(new MyOutputSelector()); SplitStream<Long> source = env.generateSequence(1, 11).split(new MyOutputSelector());
source.select(EVEN).addSink(evenSink); source.select(EVEN).addSink(evenSink);
source.select(ODD, TEN).addSink(oddAndTenSink); source.select(ODD, TEN).addSink(oddAndTenSink);
source.select(EVEN, ODD).addSink(evenAndOddSink); source.select(EVEN, ODD).addSink(evenAndOddSink);
Expand Down
Expand Up @@ -31,7 +31,7 @@
import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.IterativeStream; import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitDataStream; import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.WindowMapFunction; import org.apache.flink.streaming.api.functions.WindowMapFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction; import org.apache.flink.streaming.api.functions.co.CoMapFunction;
Expand Down Expand Up @@ -138,7 +138,7 @@ public boolean filter(Tuple2<Long, Tuple2<String, Long>> value) throws Exception
} }
}).iterate(5000); }).iterate(5000);


SplitDataStream<Tuple2<Long, Tuple2<String, Long>>> step = it.map(new IncrementMap()).split(new SplitStream<Tuple2<Long, Tuple2<String, Long>>> step = it.map(new IncrementMap()).split(new
MyOutputSelector()); MyOutputSelector());
it.closeWith(step.select("iterate")); it.closeWith(step.select("iterate"));


Expand Down
Expand Up @@ -23,7 +23,7 @@
import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream; import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.datastream.SplitDataStream; import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction;


Expand Down Expand Up @@ -74,7 +74,7 @@ public static void main(String[] args) throws Exception {


// apply the step function to get the next Fibonacci number // apply the step function to get the next Fibonacci number
// increment the counter and split the output with the output selector // increment the counter and split the output with the output selector
SplitDataStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> step = it.map(new Step()) SplitStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> step = it.map(new Step())
.split(new MySelector()); .split(new MySelector());


// close the iteration by selecting the tuples that were directed to the // close the iteration by selecting the tuples that were directed to the
Expand Down
Expand Up @@ -729,15 +729,15 @@ class DataStream[T](javaStream: JavaStream[T]) {
* *
* Operator used for directing tuples to specific named outputs using an * Operator used for directing tuples to specific named outputs using an
* OutputSelector. Calling this method on an operator creates a new * OutputSelector. Calling this method on an operator creates a new
* SplitDataStream. * [[SplitStream]].
*/ */
def split(selector: OutputSelector[T]): SplitDataStream[T] = javaStream.split(selector) def split(selector: OutputSelector[T]): SplitStream[T] = javaStream.split(selector)


/** /**
* Creates a new SplitDataStream that contains only the elements satisfying the * Creates a new [[SplitStream]] that contains only the elements satisfying the
* given output selector predicate. * given output selector predicate.
*/ */
def split(fun: T => TraversableOnce[String]): SplitDataStream[T] = { def split(fun: T => TraversableOnce[String]): SplitStream[T] = {
if (fun == null) { if (fun == null) {
throw new NullPointerException("OutputSelector must not be null.") throw new NullPointerException("OutputSelector must not be null.")
} }
Expand Down
Expand Up @@ -18,16 +18,16 @@


package org.apache.flink.streaming.api.scala package org.apache.flink.streaming.api.scala


import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream } import org.apache.flink.streaming.api.datastream.{ SplitStream => SplitJavaStream }


/** /**
* The SplitDataStream represents an operator that has been split using an * The SplitStream represents an operator that has been split using an
* {@link OutputSelector}. Named outputs can be selected using the * {@link OutputSelector}. Named outputs can be selected using the
* {@link #select} function. To apply a transformation on the whole output simply call * {@link #select} function. To apply a transformation on the whole output simply call
* the appropriate method on this stream. * the appropriate method on this stream.
* *
*/ */
class SplitDataStream[T](javaStream: SplitJavaStream[T]) extends DataStream[T](javaStream){ class SplitStream[T](javaStream: SplitJavaStream[T]) extends DataStream[T](javaStream){


/** /**
* Sets the output names for which the next operator will receive values. * Sets the output names for which the next operator will receive values.
Expand Down
Expand Up @@ -25,7 +25,7 @@ import org.apache.flink.api.scala.{createTuple2TypeInformation => apiTupleCreato
import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo, TypeUtils} import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo, TypeUtils}
import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream } import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
import org.apache.flink.streaming.api.datastream.{ WindowedDataStream => JavaWStream } import org.apache.flink.streaming.api.datastream.{ WindowedDataStream => JavaWStream }
import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream } import org.apache.flink.streaming.api.datastream.{ SplitStream => SplitJavaStream }
import org.apache.flink.streaming.api.datastream.{ ConnectedStreams => ConnectedJavaStreams } import org.apache.flink.streaming.api.datastream.{ ConnectedStreams => ConnectedJavaStreams }
import org.apache.flink.streaming.api.datastream.{ KeyedStream => KeyedJavaStream } import org.apache.flink.streaming.api.datastream.{ KeyedStream => KeyedJavaStream }
import language.implicitConversions import language.implicitConversions
Expand All @@ -44,8 +44,8 @@ package object scala {
implicit def javaToScalaWindowedStream[R](javaWStream: JavaWStream[R]): WindowedDataStream[R] = implicit def javaToScalaWindowedStream[R](javaWStream: JavaWStream[R]): WindowedDataStream[R] =
new WindowedDataStream[R](javaWStream) new WindowedDataStream[R](javaWStream)


implicit def javaToScalaSplitStream[R](javaStream: SplitJavaStream[R]): SplitDataStream[R] = implicit def javaToScalaSplitStream[R](javaStream: SplitJavaStream[R]): SplitStream[R] =
new SplitDataStream[R](javaStream) new SplitStream[R](javaStream)


implicit def javaToScalaConnectedStream[IN1, IN2](javaStream: ConnectedJavaStreams[IN1, IN2]): implicit def javaToScalaConnectedStream[IN1, IN2](javaStream: ConnectedJavaStreams[IN1, IN2]):
ConnectedStreams[IN1, IN2] = new ConnectedStreams[IN1, IN2](javaStream) ConnectedStreams[IN1, IN2] = new ConnectedStreams[IN1, IN2](javaStream)
Expand Down
Expand Up @@ -118,9 +118,9 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
classOf[ConnectedStreams[_,_]]) classOf[ConnectedStreams[_,_]])


checkMethods( checkMethods(
"SplitDataStream", "SplitDataStream", "SplitStream", "SplitStream",
classOf[org.apache.flink.streaming.api.datastream.SplitDataStream[_]], classOf[org.apache.flink.streaming.api.datastream.SplitStream[_]],
classOf[SplitDataStream[_]]) classOf[SplitStream[_]])


checkMethods( checkMethods(
"WindowedStream", "WindowedStream", "WindowedStream", "WindowedStream",
Expand Down

0 comments on commit 9513f0e

Please sign in to comment.