From a521c83eb31b653f0a4bfc9da58837a587a378c4 Mon Sep 17 00:00:00 2001 From: Yangjun Wang Date: Sat, 5 Dec 2015 03:16:49 +0200 Subject: [PATCH 1/6] [FLINK-3109]Join two streams with two different buffer time -- Java implementation --- .../api/datastream/JoinedStreams.java | 191 ++++++++++- .../api/operators/StreamJoinOperator.java | 305 ++++++++++++++++++ .../windowing/CoGroupJoinITCase.java | 103 ++++++ 3 files changed, 595 insertions(+), 4 deletions(-) create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamJoinOperator.java diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java index cff93553717d3..85dfc50e5fb22 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java @@ -24,9 +24,15 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.operators.translation.WrappingFunction; import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.CoGroupedStreams.TaggedUnion; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.TimestampExtractor; +import org.apache.flink.streaming.api.operators.StreamJoinOperator; +import org.apache.flink.streaming.api.transformations.TwoInputTransformation; import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; import org.apache.flink.streaming.api.windowing.evictors.Evictor; +import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import 
org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; @@ -58,6 +64,34 @@ * .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))) * .apply(new MyJoinFunction()); * } + * + * =========================== + * + * {@code JoinedStreams} supports join two {@link DataStream DataStreams} without one window limit. + * + *

+ * To finalize the join operation you also need to specify a {@link KeySelector} and a {@link Time} + * for both the first and second input. + * If timeCharacteristic is TimeCharacteristic.EventTime, you also need to specify a {@link TimestampExtractor} + * for both the first and second input. + * + *

+ * Example: + * + *

 {@code
+ * DataStream> one = ...;
+ * DataStream> twp = ...;
+ *
+ * DataStream result = one.join(two)
+ *     .where(new MyFirstKeySelector())
+ *     .assignTimestamps(timestampExtractor1)
+ *     .buffer(Time.of(20, TimeUnit.SECONDS))
+ *     .equalTo(new MyFirstKeySelector())
+ *     .assignTimestamps(timestampExtractor2)
+ *     .buffer(Time.of(5, TimeUnit.SECONDS))
+ *     .apply(new MyJoinFunction());
+ * } 
+ * */ public class JoinedStreams { @@ -67,6 +101,8 @@ public class JoinedStreams { /** The second input stream */ private final DataStream input2; + /** The parallelism of joined stream */ + private final int parallelism; /** * Creates new JoinedStreams data streams, which are the first step towards building a streaming co-group. * @@ -76,6 +112,7 @@ public class JoinedStreams { public JoinedStreams(DataStream input1, DataStream input2) { this.input1 = requireNonNull(input1); this.input2 = requireNonNull(input2); + this.parallelism = Math.max(input1.getParallelism(), input2.getParallelism()); } /** @@ -136,6 +173,63 @@ public WithWindow window(WindowAssigner(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null); } } + + public WithOneBuffer buffer(Time time){ + return new WithOneBuffer(time, keyType); + } + + // -------------------------------------------------------------------- + + /** + * A join operation that has {@link KeySelector KeySelectors} + * and {@link Time buffers} defined for both inputs. + */ + public class WithOneBuffer { + private final Time bufferSize1; + private final TypeInformation keyType; + + WithOneBuffer(Time time, TypeInformation keyType) { + this.bufferSize1 = time; + this.keyType = keyType; + } + + /** + * Specifies a {@link KeySelector} for elements from the second input. + */ + public EqualTo equalTo(KeySelector keySelector) { + TypeInformation otherKey = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType()); + if (!otherKey.equals(this.keyType)) { + throw new IllegalArgumentException("The keys for the two inputs are not equal: " + + "first key = " + this.keyType + " , second key = " + otherKey); + } + + return new EqualTo(input2.clean(keySelector)); + } + + // -------------------------------------------------------------------- + + /** + * A co-group operation that has {@link KeySelector KeySelectors} defined for both inputs. 
+ */ + public class EqualTo { + + private final KeySelector keySelector2; + + EqualTo(KeySelector keySelector2) { + this.keySelector2 = requireNonNull(keySelector2); + } + + /** + * Specifies the window1 on which the co-group operation works. + */ + public WithTwoBuffers buffer(Time time) { + return new WithTwoBuffers<>(input1, input2, + keySelector1, keySelector2, + bufferSize1, time); + } + } + } + } // ------------------------------------------------------------------------ @@ -278,11 +372,100 @@ public DataStream apply(JoinFunction function, TypeInformation } } - - // ------------------------------------------------------------------------ - // Implementation of the functions - // ------------------------------------------------------------------------ + + /** + * A join operation that has {@link KeySelector KeySelectors} defined for both inputs as + * well as a {@link WindowAssigner}. + * Doesn't support trigger and evictor + * + * @param Type of the elements from the first input + * @param Type of the elements from the second input + * @param Type of the key. 
This must be the same for both inputs + */ + public class WithTwoBuffers { + + private final DataStream input1; + private final DataStream input2; + + private final KeySelector keySelector1; + private final KeySelector keySelector2; + + private final Time time1; + private final Time time2; + + protected WithTwoBuffers(DataStream input1, + DataStream input2, + KeySelector keySelector1, + KeySelector keySelector2, + Time time1, + Time time2 ) { + + this.input1 = requireNonNull(input1); + this.input2 = requireNonNull(input2); + + this.keySelector1 = requireNonNull(keySelector1); + this.keySelector2 = requireNonNull(keySelector2); + + this.time1 = requireNonNull(time1); + this.time2 = requireNonNull(time2); + + } + + public StreamExecutionEnvironment getExecutionEnvironment() { + return input1.getExecutionEnvironment(); + } + + /** + * Completes the join operation with the user function that is executed + * for each combination of elements with the same key in a window1. + */ + public DataStream apply(JoinFunction function) { + StreamExecutionEnvironment env = getExecutionEnvironment(); + function = env.clean(function); + boolean enableSetProcessingTime = env.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime; + + TypeInformation resultType = TypeExtractor.getBinaryOperatorReturnType( + function, + JoinFunction.class, + true, + true, + input1.getType(), + input2.getType(), + "Join", + false); + + StreamJoinOperator joinOperator + = new StreamJoinOperator<>( + function, + keySelector1, + keySelector2, + time1.toMilliseconds(), + time2.toMilliseconds(), + input1.getType().createSerializer(getExecutionEnvironment().getConfig()), + input2.getType().createSerializer(getExecutionEnvironment().getConfig()) + ).enableSetProcessingTime(enableSetProcessingTime); + + TwoInputTransformation twoInputTransformation + = new TwoInputTransformation<>( + input1.keyBy(keySelector1).getTransformation(), + input2.keyBy(keySelector2).getTransformation(), + "Join", + 
joinOperator, + resultType, + parallelism + ); + return new DataStream<>(getExecutionEnvironment(), twoInputTransformation); + } + + } + + + // ------------------------------------------------------------------------ + // Implementation of the functions + // ------------------------------------------------------------------------ + + // ------------------------------------------------------------------------ /** * CoGroup function that does a nested-loop join to get the join result. */ diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamJoinOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamJoinOperator.java new file mode 100644 index 0000000000000..048d2b09a77e3 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamJoinOperator.java @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.functions.JoinFunction; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer; +import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTaskState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.util.Objects.requireNonNull; + + +public class StreamJoinOperator + extends AbstractUdfStreamOperator> + implements TwoInputStreamOperator { + + private static final long serialVersionUID = 8650694601687319011L; + private static final Logger LOG = LoggerFactory.getLogger(StreamJoinOperator.class); + + private HeapWindowBuffer stream1Buffer; + private HeapWindowBuffer stream2Buffer; + private final KeySelector keySelector1; + private final KeySelector keySelector2; + private long stream1WindowLength; + private long stream2WindowLength; + + protected transient long currentWatermark1 = -1L; + protected transient long currentWatermark2 = -1L; + protected transient long currentWatermark = -1L; + + private TypeSerializer inputSerializer1; + private TypeSerializer inputSerializer2; + /** + * If this is true. The current processing time is set as the timestamp of incoming elements. + * This for use with a {@link org.apache.flink.streaming.api.windowing.evictors.TimeEvictor} + * if eviction should happen based on processing time. 
+ */ + private boolean setProcessingTime = false; + + public StreamJoinOperator(JoinFunction userFunction, + KeySelector keySelector1, + KeySelector keySelector2, + long stream1WindowLength, + long stream2WindowLength, + TypeSerializer inputSerializer1, + TypeSerializer inputSerializer2) { + super(userFunction); + this.keySelector1 = requireNonNull(keySelector1); + this.keySelector2 = requireNonNull(keySelector2); + + this.stream1WindowLength = requireNonNull(stream1WindowLength); + this.stream2WindowLength = requireNonNull(stream2WindowLength); + + this.inputSerializer1 = requireNonNull(inputSerializer1); + this.inputSerializer2 = requireNonNull(inputSerializer2); + } + + @Override + public void open() throws Exception { + super.open(); + if (null == inputSerializer1 || null == inputSerializer2) { + throw new IllegalStateException("Input serializer was not set."); + } + + this.stream1Buffer = new HeapWindowBuffer.Factory().create(); + this.stream2Buffer = new HeapWindowBuffer.Factory().create(); + } + + /** + * @param element record of stream1 + * @throws Exception + */ + @Override + public void processElement1(StreamRecord element) throws Exception { + if (setProcessingTime) { + element.replace(element.getValue(), System.currentTimeMillis()); + } + stream1Buffer.storeElement(element); + + if (setProcessingTime) { + IN1 item1 = element.getValue(); + long time1 = element.getTimestamp(); + + int expiredDataNum = 0; + for (StreamRecord record2 : stream2Buffer.getElements()) { + IN2 item2 = record2.getValue(); + long time2 = record2.getTimestamp(); + if (time2 < time1 && time2 + this.stream2WindowLength >= time1) { + if (keySelector1.getKey(item1).equals(keySelector2.getKey(item2))) { + output.collect(new StreamRecord<>(userFunction.join(item1, item2))); + } + } else { + expiredDataNum++; + } + } + // clean data + stream2Buffer.removeElements(expiredDataNum); + } + } + + @Override + public void processElement2(StreamRecord element) throws Exception { + if 
(setProcessingTime) { + element.replace(element.getValue(), System.currentTimeMillis()); + } + stream2Buffer.storeElement(element); + + if (setProcessingTime) { + IN2 item2 = element.getValue(); + long time2 = element.getTimestamp(); + + int expiredDataNum = 0; + for (StreamRecord record1 : stream1Buffer.getElements()) { + IN1 item1 = record1.getValue(); + long time1 = record1.getTimestamp(); + if (time1 <= time2 && time1 + this.stream1WindowLength >= time2) { + if (keySelector1.getKey(item1).equals(keySelector2.getKey(item2))) { + output.collect(new StreamRecord<>(userFunction.join(item1, item2))); + } + } else { + expiredDataNum++; + } + } + // clean data + stream1Buffer.removeElements(expiredDataNum); + } + } + + /** + * Process join operator on element during [currentWaterMark, watermark) + * @param watermark + * @throws Exception + */ + private void processWatermark(long watermark) throws Exception{ + System.out.println("Watermark:" + String.valueOf(watermark)); + + if(setProcessingTime) { + return; + } + // process elements after current watermark1 and lower than mark + for (StreamRecord record1 : stream1Buffer.getElements()) { + if(record1.getTimestamp() >= this.currentWatermark + && record1.getTimestamp() < watermark){ + for (StreamRecord record2 : stream2Buffer.getElements()) { + if(keySelector1.getKey(record1.getValue()).equals(keySelector2.getKey(record2.getValue()))) { + if (record1.getTimestamp() >= record2.getTimestamp() + && record2.getTimestamp() + this.stream2WindowLength >= record1.getTimestamp()) { + output.collect(new StreamRecord<>(userFunction.join(record1.getValue(), record2.getValue()))); + } + } + } + } + } + + for (StreamRecord record2 : stream2Buffer.getElements()) { + if(record2.getTimestamp() >= this.currentWatermark + && record2.getTimestamp() < watermark){ + for (StreamRecord record1 : stream1Buffer.getElements()) { + if(keySelector1.getKey(record1.getValue()).equals(keySelector2.getKey(record2.getValue()))) { + if 
(record2.getTimestamp() > record1.getTimestamp() + && record1.getTimestamp() + this.stream1WindowLength >= record2.getTimestamp()) { + output.collect(new StreamRecord<>(userFunction.join(record1.getValue(), record2.getValue()))); + } + } + } + } + } + + // clean data + int stream1Expired = 0; + for (StreamRecord record1 : stream1Buffer.getElements()) { + if (record1.getTimestamp() + this.stream1WindowLength < watermark) { + stream1Expired++; + } else { + break; + } + } + stream1Buffer.removeElements(stream1Expired); + + int stream2Expired = 0; + for (StreamRecord record2 : stream2Buffer.getElements()) { + if (record2.getTimestamp() + this.stream2WindowLength < watermark) { + stream2Expired++; + } else { + break; + } + } + stream2Buffer.removeElements(stream2Expired); + } + + @Override + public void processWatermark1(Watermark mark) throws Exception { + long watermark = Math.min(mark.getTimestamp(), currentWatermark2); + // process elements [currentWatermark, watermark) + processWatermark(watermark); + + output.emitWatermark(mark); + this.currentWatermark = watermark; + this.currentWatermark1 = mark.getTimestamp(); + } + + @Override + public void processWatermark2(Watermark mark) throws Exception { + long watermark = Math.min(mark.getTimestamp(), currentWatermark1); + // process elements [currentWatermark, watermark) + processWatermark(watermark); + + output.emitWatermark(mark); + this.currentWatermark = watermark; + this.currentWatermark2 = mark.getTimestamp(); + } + + /** + * When this flag is enabled the current processing time is set as the timestamp of elements + * upon arrival. This must be used, for example, when using the + * {@link org.apache.flink.streaming.api.windowing.evictors.TimeEvictor} with processing + * time semantics. 
+ */ + public StreamJoinOperator enableSetProcessingTime(boolean setProcessingTime) { + this.setProcessingTime = setProcessingTime; + return this; + } + + // ------------------------------------------------------------------------ + // checkpointing and recovery + // ------------------------------------------------------------------------ + + @Override + public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception { + StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp); + + // we write the panes with the key/value maps into the stream + StateBackend.CheckpointStateOutputView out = getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp); + + out.writeLong(stream1WindowLength); + out.writeLong(stream2WindowLength); + + MultiplexingStreamRecordSerializer recordSerializer1 = new MultiplexingStreamRecordSerializer<>(inputSerializer1); + out.writeInt(stream1Buffer.size()); + for (StreamRecord element: stream1Buffer.getElements()) { + recordSerializer1.serialize(element, out); + } + + MultiplexingStreamRecordSerializer recordSerializer2 = new MultiplexingStreamRecordSerializer<>(inputSerializer2); + out.writeInt(stream2Buffer.size()); + for (StreamRecord element: stream2Buffer.getElements()) { + recordSerializer2.serialize(element, out); + } + + taskState.setOperatorState(out.closeAndGetHandle()); + return taskState; + } + + @Override + public void restoreState(StreamTaskState taskState, long recoveryTimestamp) throws Exception { + super.restoreState(taskState, recoveryTimestamp); + + final ClassLoader userClassloader = getUserCodeClassloader(); + + @SuppressWarnings("unchecked") + StateHandle inputState = (StateHandle) taskState.getOperatorState(); + DataInputView in = inputState.getState(userClassloader); + + stream1WindowLength = in.readLong(); + stream2WindowLength = in.readLong(); + + int numElements = in.readInt(); + + MultiplexingStreamRecordSerializer recordSerializer1 = new 
MultiplexingStreamRecordSerializer<>(inputSerializer1); + for (int i = 0; i < numElements; i++) { + stream1Buffer.storeElement(recordSerializer1.deserialize(in).asRecord()); + } + + int numElements2 = in.readInt(); + MultiplexingStreamRecordSerializer recordSerializer2 = new MultiplexingStreamRecordSerializer<>(inputSerializer2); + for (int i = 0; i < numElements2; i++) { + stream2Buffer.storeElement(recordSerializer2.deserialize(in).asRecord()); + } + } + +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java index cfae0263b9b44..bc62d08459467 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java @@ -234,6 +234,109 @@ public void invoke(String value) throws Exception { Assert.assertEquals(expectedResult, testResults); } + + // TODO: design buffer join test + @Test + public void testBufferJoin() throws Exception { + + testResults = Lists.newArrayList(); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + env.setParallelism(3); + + DataStream> source1 = env.addSource(new SourceFunction>() { + private static final long serialVersionUID = 1L; + + @Override + public void run(SourceContext> ctx) throws Exception { + ctx.collect(Tuple3.of("a", "x", 0)); + ctx.collect(Tuple3.of("b", "y", 1)); + ctx.collect(Tuple3.of("c", "z", 2)); + + ctx.collect(Tuple3.of("d", "u", 3)); + ctx.collect(Tuple3.of("e", "u", 4)); + ctx.collect(Tuple3.of("f", "w", 5)); + + ctx.collect(Tuple3.of("h", "j", 6)); + ctx.collect(Tuple3.of("g", "i", 7)); + ctx.collect(Tuple3.of("i", "k", 8)); + ctx.collect(Tuple3.of("j", 
"k", 9)); + ctx.collect(Tuple3.of("k", "k", 10)); + + } + + @Override + public void cancel() { + } + }).assignTimestamps(new Tuple3TimestampExtractor()); + + DataStream> source2 = env.addSource(new SourceFunction>() { + private static final long serialVersionUID = 1L; + + @Override + public void run(SourceContext> ctx) throws Exception { + ctx.collect(Tuple3.of("a", "u", 0)); + ctx.collect(Tuple3.of("e", "w", 1)); + + ctx.collect(Tuple3.of("g", "i", 3)); + ctx.collect(Tuple3.of("a", "i", 3)); + ctx.collect(Tuple3.of("d", "i", 4)); + ctx.collect(Tuple3.of("b", "k", 5)); + + ctx.collect(Tuple3.of("c", "x", 6)); + ctx.collect(Tuple3.of("f", "x", 6)); + ctx.collect(Tuple3.of("h", "x", 6)); + ctx.collect(Tuple3.of("k", "z", 8)); + ctx.collect(Tuple3.of("j", "z", 9)); + ctx.collect(Tuple3.of("i", "z", 10)); + + } + + @Override + public void cancel() { + } + }).assignTimestamps(new Tuple3TimestampExtractor()); + + + source1.join(source2) + .where(new Tuple3KeyExtractor()) + .buffer(Time.of(3, TimeUnit.MILLISECONDS)) + .equalTo(new Tuple3KeyExtractor()) + .buffer(Time.of(4, TimeUnit.MILLISECONDS)) + .apply(new JoinFunction, Tuple3, String>() { + @Override + public String join(Tuple3 first, Tuple3 second) throws Exception { + return first + ":" + second; + } + }) + .addSink(new SinkFunction() { + @Override + public void invoke(String value) throws Exception { + testResults.add(value); + } + }); + + env.execute("Join Test"); + + List expectedResult = Lists.newArrayList( + "(a,x,0):(a,i,3)", + "(a,x,0):(a,u,0)", + "(d,u,3):(d,i,4)", + "(e,u,4):(e,w,1)", + "(f,w,5):(f,x,6)", + "(g,i,7):(g,i,3)", + "(h,j,6):(h,x,6)", + "(i,k,8):(i,z,10)", + "(j,k,9):(j,z,9)", + "(k,k,10):(k,z,8)"); + + Collections.sort(expectedResult); + Collections.sort(testResults); + + Assert.assertEquals(expectedResult, testResults); + } + @Test public void testSelfJoin() throws Exception { From 32834d31a35daf9069a94833fb4db65ef974ec76 Mon Sep 17 00:00:00 2001 From: Yangjun Wang Date: Fri, 22 Jan 2016 
10:57:59 +0200 Subject: [PATCH 2/6] [FLINK-3109] Join two stream with two different buffer time, remove stdout print --- .../flink/streaming/api/operators/StreamJoinOperator.java | 5 ----- .../runtime/operators/windowing/CoGroupJoinITCase.java | 1 - 2 files changed, 6 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamJoinOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamJoinOperator.java index 048d2b09a77e3..c2f3ebc7466f4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamJoinOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamJoinOperator.java @@ -28,8 +28,6 @@ import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTaskState; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import static java.util.Objects.requireNonNull; @@ -39,7 +37,6 @@ public class StreamJoinOperator implements TwoInputStreamOperator { private static final long serialVersionUID = 8650694601687319011L; - private static final Logger LOG = LoggerFactory.getLogger(StreamJoinOperator.class); private HeapWindowBuffer stream1Buffer; private HeapWindowBuffer stream2Buffer; @@ -156,8 +153,6 @@ public void processElement2(StreamRecord element) throws Exception { * @throws Exception */ private void processWatermark(long watermark) throws Exception{ - System.out.println("Watermark:" + String.valueOf(watermark)); - if(setProcessingTime) { return; } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java index bc62d08459467..388c596f17c39 100644 --- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java @@ -235,7 +235,6 @@ public void invoke(String value) throws Exception { } - // TODO: design buffer join test @Test public void testBufferJoin() throws Exception { From 5f0786296813f7500b063eba5db45ea6390e3e32 Mon Sep 17 00:00:00 2001 From: Yangjun Wang Date: Thu, 18 Feb 2016 10:29:25 +0800 Subject: [PATCH 3/6] [FLINK-3109]Join two streams, add Flink's state backend supports --- .../api/datastream/CoGroupedStreams.java | 2 +- .../streaming/api/datastream/DataStream.java | 8 + .../api/datastream/JoinedStreams.java | 213 +------ .../api/datastream/TimeJoinedStreams.java | 396 ++++++++++++ .../api/operators/StreamJoinOperator.java | 576 ++++++++++++++---- .../windowing/CoGroupJoinITCase.java | 16 +- 6 files changed, 873 insertions(+), 338 deletions(-) create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/TimeJoinedStreams.java diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java index 9e2bc5d4f1d58..f3f7478d1c3f6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java @@ -318,7 +318,7 @@ public static TaggedUnion two(T2 two) { } } - private static class UnionTypeInfo extends TypeInformation> { + protected static class UnionTypeInfo extends TypeInformation> { private static final long serialVersionUID = 1L; TypeInformation oneType; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index 64d0821a50c4b..bb146492f2824 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -642,6 +642,14 @@ public JoinedStreams join(DataStream otherStream) { return new JoinedStreams<>(this, otherStream); } + /** + * Creates a join operation. See {@link TimeJoinedStreams} for an example of how the keys + * and buffer time can be specified. + */ + public TimeJoinedStreams timeJoin(DataStream otherStream) { + return new TimeJoinedStreams<>(this, otherStream); + } + /** * Windows this {@code DataStream} into tumbling time windows. * diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java index ce8eff9628442..0d788958ed1a0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java @@ -26,15 +26,9 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.operators.translation.WrappingFunction; import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.CoGroupedStreams.TaggedUnion; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.TimestampExtractor; -import org.apache.flink.streaming.api.operators.StreamJoinOperator; -import org.apache.flink.streaming.api.transformations.TwoInputTransformation; import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; import 
org.apache.flink.streaming.api.windowing.evictors.Evictor; -import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; @@ -66,34 +60,6 @@ * .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))) * .apply(new MyJoinFunction()); * } - * - * =========================== - * - * {@code JoinedStreams} supports join two {@link DataStream DataStreams} without one window limit. - * - *

- * To finalize the join operation you also need to specify a {@link KeySelector} and a {@link Time} - * for both the first and second input. - * If timeCharacteristic is TimeCharacteristic.EventTime, you also need to specify a {@link TimestampExtractor} - * for both the first and second input. - * - *

- * Example: - * - *

 {@code
- * DataStream> one = ...;
- * DataStream> twp = ...;
- *
- * DataStream result = one.join(two)
- *     .where(new MyFirstKeySelector())
- *     .assignTimestamps(timestampExtractor1)
- *     .buffer(Time.of(20, TimeUnit.SECONDS))
- *     .equalTo(new MyFirstKeySelector())
- *     .assignTimestamps(timestampExtractor2)
- *     .buffer(Time.of(5, TimeUnit.SECONDS))
- *     .apply(new MyJoinFunction());
- * } 
- * */ @Public public class JoinedStreams { @@ -104,8 +70,6 @@ public class JoinedStreams { /** The second input stream */ private final DataStream input2; - /** The parallelism of joined stream */ - private final int parallelism; /** * Creates new JoinedStreams data streams, which are the first step towards building a streaming co-group. * @@ -115,7 +79,6 @@ public class JoinedStreams { public JoinedStreams(DataStream input1, DataStream input2) { this.input1 = requireNonNull(input1); this.input2 = requireNonNull(input2); - this.parallelism = Math.max(input1.getParallelism(), input2.getParallelism()); } /** @@ -179,65 +142,8 @@ public WithWindow window(WindowAssigner(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null); } } - - public WithOneBuffer buffer(Time time){ - return new WithOneBuffer(time, keyType); - } - - // -------------------------------------------------------------------- - - /** - * A join operation that has {@link KeySelector KeySelectors} - * and {@link Time buffers} defined for both inputs. - */ - public class WithOneBuffer { - private final Time bufferSize1; - private final TypeInformation keyType; - - WithOneBuffer(Time time, TypeInformation keyType) { - this.bufferSize1 = time; - this.keyType = keyType; - } - - /** - * Specifies a {@link KeySelector} for elements from the second input. - */ - public EqualTo equalTo(KeySelector keySelector) { - TypeInformation otherKey = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType()); - if (!otherKey.equals(this.keyType)) { - throw new IllegalArgumentException("The keys for the two inputs are not equal: " + - "first key = " + this.keyType + " , second key = " + otherKey); - } - - return new EqualTo(input2.clean(keySelector)); - } - - // -------------------------------------------------------------------- - - /** - * A co-group operation that has {@link KeySelector KeySelectors} defined for both inputs. 
- */ - public class EqualTo { - - private final KeySelector keySelector2; - - EqualTo(KeySelector keySelector2) { - this.keySelector2 = requireNonNull(keySelector2); - } - - /** - * Specifies the window1 on which the co-group operation works. - */ - public WithTwoBuffers buffer(Time time) { - return new WithTwoBuffers<>(input1, input2, - keySelector1, keySelector2, - bufferSize1, time); - } - } - } - } - + // ------------------------------------------------------------------------ /** @@ -251,7 +157,7 @@ public WithTwoBuffers buffer(Time time) { */ @Public public static class WithWindow { - + private final DataStream input1; private final DataStream input2; @@ -267,23 +173,23 @@ public static class WithWindow { @PublicEvolving protected WithWindow(DataStream input1, - DataStream input2, - KeySelector keySelector1, - KeySelector keySelector2, - TypeInformation keyType, - WindowAssigner, W> windowAssigner, - Trigger, ? super W> trigger, - Evictor, ? super W> evictor) { - + DataStream input2, + KeySelector keySelector1, + KeySelector keySelector2, + TypeInformation keyType, + WindowAssigner, W> windowAssigner, + Trigger, ? super W> trigger, + Evictor, ? super W> evictor) { + this.input1 = requireNonNull(input1); this.input2 = requireNonNull(input2); this.keySelector1 = requireNonNull(keySelector1); this.keySelector2 = requireNonNull(keySelector2); this.keyType = requireNonNull(keyType); - + this.windowAssigner = requireNonNull(windowAssigner); - + this.trigger = trigger; this.evictor = evictor; } @@ -383,99 +289,10 @@ public DataStream apply(JoinFunction function, TypeInformation } } + // ------------------------------------------------------------------------ + // Implementation of the functions + // ------------------------------------------------------------------------ - /** - * A join operation that has {@link KeySelector KeySelectors} defined for both inputs as - * well as a {@link WindowAssigner}. 
- * Doesn't support trigger and evictor - * - * @param Type of the elements from the first input - * @param Type of the elements from the second input - * @param Type of the key. This must be the same for both inputs - */ - public class WithTwoBuffers { - - private final DataStream input1; - private final DataStream input2; - - private final KeySelector keySelector1; - private final KeySelector keySelector2; - - private final Time time1; - private final Time time2; - - protected WithTwoBuffers(DataStream input1, - DataStream input2, - KeySelector keySelector1, - KeySelector keySelector2, - Time time1, - Time time2 ) { - - this.input1 = requireNonNull(input1); - this.input2 = requireNonNull(input2); - - this.keySelector1 = requireNonNull(keySelector1); - this.keySelector2 = requireNonNull(keySelector2); - - this.time1 = requireNonNull(time1); - this.time2 = requireNonNull(time2); - - } - - public StreamExecutionEnvironment getExecutionEnvironment() { - return input1.getExecutionEnvironment(); - } - - /** - * Completes the join operation with the user function that is executed - * for each combination of elements with the same key in a window1. 
- */ - public DataStream apply(JoinFunction function) { - StreamExecutionEnvironment env = getExecutionEnvironment(); - function = env.clean(function); - boolean enableSetProcessingTime = env.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime; - - TypeInformation resultType = TypeExtractor.getBinaryOperatorReturnType( - function, - JoinFunction.class, - true, - true, - input1.getType(), - input2.getType(), - "Join", - false); - - StreamJoinOperator joinOperator - = new StreamJoinOperator<>( - function, - keySelector1, - keySelector2, - time1.toMilliseconds(), - time2.toMilliseconds(), - input1.getType().createSerializer(getExecutionEnvironment().getConfig()), - input2.getType().createSerializer(getExecutionEnvironment().getConfig()) - ).enableSetProcessingTime(enableSetProcessingTime); - - TwoInputTransformation twoInputTransformation - = new TwoInputTransformation<>( - input1.keyBy(keySelector1).getTransformation(), - input2.keyBy(keySelector2).getTransformation(), - "Join", - joinOperator, - resultType, - parallelism - ); - return new DataStream<>(getExecutionEnvironment(), twoInputTransformation); - } - - } - - - // ------------------------------------------------------------------------ - // Implementation of the functions - // ------------------------------------------------------------------------ - - // ------------------------------------------------------------------------ /** * CoGroup function that does a nested-loop join to get the join result. 
*/ diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/TimeJoinedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/TimeJoinedStreams.java new file mode 100644 index 0000000000000..45ba482e3a387 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/TimeJoinedStreams.java @@ -0,0 +1,396 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.datastream; + + +import org.apache.flink.api.common.functions.CoGroupFunction; +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.functions.JoinFunction; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.operators.translation.WrappingFunction; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.TimestampExtractor; +import org.apache.flink.streaming.api.functions.windowing.WindowFunction; +import org.apache.flink.streaming.api.operators.StreamJoinOperator; +import org.apache.flink.streaming.api.transformations.TwoInputTransformation; +import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.triggers.Trigger; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.util.Collector; + +import java.util.ArrayList; +import java.util.List; + +import static java.util.Objects.requireNonNull; + +/** + * + * {@code JoinedStreams} supports join two {@link DataStream DataStreams} on two + * different buffer time without one window time limit. + * + *

+ * To finalize the join operation you also need to specify a {@link KeySelector} and a {@link Time} + * for both the first and second input. + * If timeCharacteristic is TimeCharacteristic.EventTime, you also need to specify a {@link TimestampExtractor} + * for both the first and second input. + * + *

+ * Example: + * + *

 {@code
+ * DataStream> one = ...;
+ * DataStream> two = ...;
+ *
+ *** Join based on processing time ***
+ * DataStream result = one.join(two)
+ *     .where(new MyFirstKeySelector())
+ *     .window(Time.of(20, TimeUnit.SECONDS))
+ *     .equalTo(new MySecondKeySelector())
+ *     .window(Time.of(5, TimeUnit.SECONDS))
+ *     .apply(new MyJoinFunction());
+ *
+ *** Join based on event time ***
+ * DataStream result = one.join(two)
+ *     .where(new MyFirstKeySelector())
+ *     .assignTimestamps(timestampExtractor1)
+ *     .window(Time.of(20, TimeUnit.SECONDS))
+ *     .equalTo(new MySecondKeySelector())
+ *     .assignTimestamps(timestampExtractor2)
+ *     .window(Time.of(5, TimeUnit.SECONDS))
+ *     .apply(new MyJoinFunction());
+ * } 
+ * + */ +public class TimeJoinedStreams { + + /** The first input stream */ + private final DataStream input1; + + /** The second input stream */ + private final DataStream input2; + + + /** The parallelism of joined stream */ + private final int parallelism; + /** + * Creates new JoinedStreams data streams, which are the first step towards building a streaming co-group. + * + * @param input1 The first data stream. + * @param input2 The second data stream. + */ + public TimeJoinedStreams(DataStream input1, DataStream input2) { + this.input1 = requireNonNull(input1); + this.input2 = requireNonNull(input2); + this.parallelism = Math.max(input1.getParallelism(), input2.getParallelism()); + } + + /** + * Specifies a {@link KeySelector} for elements from the first input. + */ + public Where where(KeySelector keySelector) { + TypeInformation keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType()); + return new Where<>(input1.clean(keySelector), keyType); + } + + // ------------------------------------------------------------------------ + + /** + * CoGrouped streams that have the key for one side defined. + * + * @param The type of the key. + */ + public class Where { + + private final KeySelector keySelector1; + private final TypeInformation keyType; + + Where(KeySelector keySelector1, TypeInformation keyType) { + this.keySelector1 = keySelector1; + this.keyType = keyType; + } + + + public WithOneWindow window(WindowAssigner assigner) throws Exception { + return new WithOneWindow(assigner, keyType); + } + + // -------------------------------------------------------------------- + + /** + * A join operation that has {@link KeySelector KeySelectors} + * and {@link Time buffers} defined for both inputs. 
+ */ + public class WithOneWindow { + private final TypeInformation keyType; + private final WindowAssigner windowAssigner; + private final long slideSize; + private final long windowSize; + + long getSlideSize(WindowAssigner assigner) throws Exception { + if(assigner instanceof TumblingTimeWindows){ + return ((TumblingTimeWindows) assigner).getSize(); + } else if(assigner instanceof SlidingTimeWindows) { + return ((SlidingTimeWindows) assigner).getSlide(); + } else { + throw new Exception("TimeJoin only supports time window"); + } + } + + long getWindowSize(WindowAssigner assigner) throws Exception { + if(assigner instanceof TumblingTimeWindows){ + return ((TumblingTimeWindows) assigner).getSize(); + } else if(assigner instanceof SlidingTimeWindows) { + return ((SlidingTimeWindows) assigner).getSize(); + } else { + throw new Exception("TimeJoin only supports time window"); + } + } + + WithOneWindow(WindowAssigner assigner, TypeInformation keyType) throws Exception { + this.windowAssigner = assigner; + this.keyType = keyType; + this.slideSize = getSlideSize(assigner); + this.windowSize = getWindowSize(assigner); + } + + /** + * Specifies a {@link KeySelector} for elements from the second input. + */ + public EqualTo equalTo(KeySelector keySelector) { + TypeInformation otherKey = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType()); + if (!otherKey.equals(this.keyType)) { + throw new IllegalArgumentException("The keys for the two inputs are not equal: " + + "first key = " + this.keyType + " , second key = " + otherKey); + } + + return new EqualTo(input2.clean(keySelector)); + } + + // -------------------------------------------------------------------- + + /** + * A co-group operation that has {@link KeySelector KeySelectors} defined for both inputs. 
+ */ + public class EqualTo { + + private final KeySelector keySelector2; + + EqualTo(KeySelector keySelector2) { + this.keySelector2 = requireNonNull(keySelector2); + } + + /** + * Specifies the window1 on which the co-group operation works. + */ + public WithTwoWindows window(WindowAssigner assigner) throws Exception { + // Check slide size + long slideSize2 = getSlideSize(assigner); + long windowSize2 = getWindowSize(assigner); + if(slideSize != slideSize2){ + throw new ExceptionInInitializerError("Slide size of two windows should be equal"); + } + if(windowSize < windowSize2) { + throw new ExceptionInInitializerError("Window size of stream1 shouldn't be less than stream2"); + } + return new WithTwoWindows<>(input1, input2, + keySelector1, keySelector2, keyType, + windowAssigner, windowSize, assigner, windowSize2); + } + } + } + + } + + /** + * A join operation that has {@link KeySelector KeySelectors} defined for both inputs as + * well as a {@link WindowAssigner}. + * Doesn't support trigger and evictor + * + * @param Type of the key. 
This must be the same for both inputs + */ + public class WithTwoWindows { + + private final TypeInformation keyType; + + private final DataStream input1; + private final DataStream input2; + + private final KeySelector keySelector1; + private final KeySelector keySelector2; + private final long windowSize1; + private final long windowSize2; + + private final WindowAssigner windowAssigner1; + private final WindowAssigner windowAssigner2; + protected final Trigger trigger; + + protected WithTwoWindows(DataStream input1, + DataStream input2, + KeySelector keySelector1, + KeySelector keySelector2, + TypeInformation keyType, + WindowAssigner windowAssigner1, + long windowSize1, + WindowAssigner windowAssigner2, + long windowSize2) { + + this.input1 = requireNonNull(input1); + this.input2 = requireNonNull(input2); + + this.keySelector1 = requireNonNull(keySelector1); + this.keySelector2 = requireNonNull(keySelector2); + this.keyType = requireNonNull(keyType); + + this.windowAssigner1 = requireNonNull(windowAssigner1); + this.windowAssigner2 = requireNonNull(windowAssigner2); + + this.windowSize1 = windowSize1; + this.windowSize2 = windowSize2; + + trigger = windowAssigner1.getDefaultTrigger(input1.getExecutionEnvironment()); + } + + public StreamExecutionEnvironment getExecutionEnvironment() { + return input1.getExecutionEnvironment(); + } + + /** + * Completes the join operation with the user function that is executed + * for each combination of elements with the same key in a window1. 
+ */ + public DataStream apply(JoinFunction function) { + StreamExecutionEnvironment env = getExecutionEnvironment(); + function = env.clean(function); + boolean enableSetProcessingTime = env.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime; + + TypeInformation resultType = TypeExtractor.getBinaryOperatorReturnType( + function, + JoinFunction.class, + true, + true, + input1.getType(), + input2.getType(), + "Join", + false); + + CoGroupedStreams.UnionTypeInfo unionType = new CoGroupedStreams.UnionTypeInfo<>(input1.getType(), input2.getType()); + + ListStateDescriptor> stateDesc = new ListStateDescriptor<>("window-contents", + unionType.createSerializer(getExecutionEnvironment().getConfig())); + + StreamJoinOperator joinOperator + = new StreamJoinOperator<>( + function, + keySelector1, + keySelector2, + keyType.createSerializer(getExecutionEnvironment().getConfig()), + windowAssigner1, + windowAssigner1.getWindowSerializer(getExecutionEnvironment().getConfig()), + windowSize1, + windowAssigner2, + windowAssigner2.getWindowSerializer(getExecutionEnvironment().getConfig()), + windowSize2, + stateDesc, + input1.getType().createSerializer(getExecutionEnvironment().getConfig()), + input2.getType().createSerializer(getExecutionEnvironment().getConfig()), + trigger + ).enableSetProcessingTime(enableSetProcessingTime); + + TwoInputTransformation twoInputTransformation + = new TwoInputTransformation<>( + input1.keyBy(keySelector1).getTransformation(), + input2.keyBy(keySelector2).getTransformation(), + "Join", + joinOperator, + resultType, + parallelism + ); + twoInputTransformation.setStateKeySelectors(keySelector1, keySelector2); + twoInputTransformation.setStateKeyType(keyType); + + return new DataStream<>(getExecutionEnvironment(), twoInputTransformation); + } + + } + + + // ------------------------------------------------------------------------ + // Implementation of the functions + // 
------------------------------------------------------------------------ + + // ------------------------------------------------------------------------ + /** + * CoGroup function that does a nested-loop join to get the join result. + */ + private static class JoinCoGroupFunction + extends WrappingFunction> + implements CoGroupFunction { + private static final long serialVersionUID = 1L; + + public JoinCoGroupFunction(JoinFunction wrappedFunction) { + super(wrappedFunction); + } + + @Override + public void coGroup(Iterable first, Iterable second, Collector out) throws Exception { + for (T1 val1: first) { + for (T2 val2: second) { + out.collect(wrappedFunction.join(val1, val2)); + } + } + } + } + + private static class CoGroupWindowFunction + extends WrappingFunction> + implements WindowFunction>, T, KEY, W> { + + private static final long serialVersionUID = 1L; + + public CoGroupWindowFunction(CoGroupFunction userFunction) { + super(userFunction); + } + + @Override + public void apply(KEY key, + W window, + Iterable> values, + Collector out) throws Exception { + + List oneValues = new ArrayList<>(); + List twoValues = new ArrayList<>(); + + for (CoGroupedStreams.TaggedUnion val: values) { + if (val.isOne()) { + oneValues.add(val.getOne()); + } else { + twoValues.add(val.getTwo()); + } + } + wrappedFunction.coGroup(oneValues, twoValues, out); + } + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamJoinOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamJoinOperator.java index c2f3ebc7466f4..02a32069b3bb2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamJoinOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamJoinOperator.java @@ -18,73 +18,168 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.api.common.functions.JoinFunction; +import 
org.apache.flink.api.common.state.*; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.streaming.api.datastream.CoGroupedStreams; import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer; -import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer; +import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; +import org.apache.flink.streaming.api.windowing.triggers.Trigger; +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTaskState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.*; import static java.util.Objects.requireNonNull; public class StreamJoinOperator extends AbstractUdfStreamOperator> - implements TwoInputStreamOperator { + implements TwoInputStreamOperator , Triggerable { - private static final long serialVersionUID = 8650694601687319011L; + private static final long serialVersionUID = 1L; - private HeapWindowBuffer stream1Buffer; - private HeapWindowBuffer stream2Buffer; - private final KeySelector keySelector1; - private final KeySelector keySelector2; - private long stream1WindowLength; - private 
long stream2WindowLength; + private static final Logger LOG = LoggerFactory.getLogger(StreamJoinOperator.class); + + // ------------------------------------------------------------------------ + // Configuration values and user functions + // ------------------------------------------------------------------------ + + protected final WindowAssigner windowAssigner1; + protected final WindowAssigner windowAssigner2; + // windowSize1 >= windowSize2 + protected final long windowSize1; + protected final long windowSize2; + + protected final KeySelector keySelector1; + protected final KeySelector keySelector2; + + protected final StateDescriptor>, ?> windowStateDescriptor; + + protected final Trigger trigger; - protected transient long currentWatermark1 = -1L; - protected transient long currentWatermark2 = -1L; - protected transient long currentWatermark = -1L; - private TypeSerializer inputSerializer1; - private TypeSerializer inputSerializer2; /** * If this is true. The current processing time is set as the timestamp of incoming elements. * This for use with a {@link org.apache.flink.streaming.api.windowing.evictors.TimeEvictor} * if eviction should happen based on processing time. */ - private boolean setProcessingTime = false; + protected boolean setProcessingTime = false; + + /** + * This is used to copy the incoming element because it can be put into several window + * buffers. + */ + protected TypeSerializer inputSerializer1; + protected TypeSerializer inputSerializer2; + + /** + * For serializing the key in checkpoints. + */ + protected final TypeSerializer keySerializer; + + /** + * For serializing the window in checkpoints. 
+ */ + protected final TypeSerializer windowSerializer1; + protected final TypeSerializer windowSerializer2; + + // ------------------------------------------------------------------------ + // State that is not checkpointed + // ------------------------------------------------------------------------ + + /** + * This is given to the {@code WindowFunction} for emitting elements with a given timestamp. + */ + protected transient TimestampedCollector timestampedCollector; + + protected transient long currentWatermark1 = -1L; + protected transient long currentWatermark2 = -1L; + protected transient long currentWatermark = -1L; + protected transient Context context = new Context(null, null); + + // ------------------------------------------------------------------------ + // State that needs to be checkpointed + // ------------------------------------------------------------------------ + /** + * Processing time timers that are currently in-flight. + */ + protected transient Set> processingTimeTimers; + protected transient PriorityQueue> processingTimeTimersQueue; + + /** + * Current waiting watermark callbacks. 
+ */ + protected transient Set> watermarkTimers; + protected transient PriorityQueue> watermarkTimersQueue; + public StreamJoinOperator(JoinFunction userFunction, KeySelector keySelector1, KeySelector keySelector2, - long stream1WindowLength, - long stream2WindowLength, + TypeSerializer keySerializer, + WindowAssigner windowAssigner1, + TypeSerializer windowSerializer1, + long windowSize1, + WindowAssigner windowAssigner2, + TypeSerializer windowSerializer2, + long windowSize2, + StateDescriptor>, ?> windowStateDescriptor, TypeSerializer inputSerializer1, - TypeSerializer inputSerializer2) { + TypeSerializer inputSerializer2, + Trigger trigger) { super(userFunction); this.keySelector1 = requireNonNull(keySelector1); this.keySelector2 = requireNonNull(keySelector2); + this.keySerializer = requireNonNull(keySerializer); + + this.windowAssigner1 = requireNonNull(windowAssigner1); + this.windowSerializer1 = requireNonNull(windowSerializer1); + this.windowSize1 = windowSize1; + this.windowAssigner2 = requireNonNull(windowAssigner2); + this.windowSerializer2 = requireNonNull(windowSerializer2); + this.windowSize2 = windowSize2; - this.stream1WindowLength = requireNonNull(stream1WindowLength); - this.stream2WindowLength = requireNonNull(stream2WindowLength); + this.windowStateDescriptor = requireNonNull(windowStateDescriptor); this.inputSerializer1 = requireNonNull(inputSerializer1); this.inputSerializer2 = requireNonNull(inputSerializer2); + this.trigger = requireNonNull(trigger); } @Override public void open() throws Exception { super.open(); + timestampedCollector = new TimestampedCollector<>(output); + if (null == inputSerializer1 || null == inputSerializer2) { throw new IllegalStateException("Input serializer was not set."); } - this.stream1Buffer = new HeapWindowBuffer.Factory().create(); - this.stream2Buffer = new HeapWindowBuffer.Factory().create(); + // these could already be initialized from restoreState() + if (watermarkTimers == null) { + watermarkTimers = new 
HashSet<>(); + watermarkTimersQueue = new PriorityQueue<>(100); + } + + if (processingTimeTimers == null) { + processingTimeTimers = new HashSet<>(); + processingTimeTimersQueue = new PriorityQueue<>(100); + } + + context = new Context(null, null); } /** @@ -92,58 +187,97 @@ public void open() throws Exception { * @throws Exception */ @Override + @SuppressWarnings("unchecked") public void processElement1(StreamRecord element) throws Exception { if (setProcessingTime) { element.replace(element.getValue(), System.currentTimeMillis()); } - stream1Buffer.storeElement(element); - if (setProcessingTime) { - IN1 item1 = element.getValue(); - long time1 = element.getTimestamp(); - - int expiredDataNum = 0; - for (StreamRecord record2 : stream2Buffer.getElements()) { - IN2 item2 = record2.getValue(); - long time2 = record2.getTimestamp(); - if (time2 < time1 && time2 + this.stream2WindowLength >= time1) { - if (keySelector1.getKey(item1).equals(keySelector2.getKey(item2))) { - output.collect(new StreamRecord<>(userFunction.join(item1, item2))); - } - } else { - expiredDataNum++; - } - } - // clean data - stream2Buffer.removeElements(expiredDataNum); + Collection elementWindows = windowAssigner1.assignWindows(element.getValue(), element.getTimestamp()); + + K key = (K) getStateBackend().getCurrentKey(); + + for (TimeWindow window: elementWindows) { + + ListState> windowState = getPartitionedState(window, windowSerializer1, + windowStateDescriptor); + context.key = key; + context.window = window; + CoGroupedStreams.TaggedUnion unionElement = CoGroupedStreams.TaggedUnion.one(element.getValue()); + windowState.add(unionElement); + TriggerResult triggerResult = context.onElement(unionElement, element.getTimestamp()); + processTriggerResult(triggerResult, key, window); + } } @Override + @SuppressWarnings("unchecked") public void processElement2(StreamRecord element) throws Exception { if (setProcessingTime) { element.replace(element.getValue(), System.currentTimeMillis()); } - 
stream2Buffer.storeElement(element); - if (setProcessingTime) { - IN2 item2 = element.getValue(); - long time2 = element.getTimestamp(); - - int expiredDataNum = 0; - for (StreamRecord record1 : stream1Buffer.getElements()) { - IN1 item1 = record1.getValue(); - long time1 = record1.getTimestamp(); - if (time1 <= time2 && time1 + this.stream1WindowLength >= time2) { - if (keySelector1.getKey(item1).equals(keySelector2.getKey(item2))) { - output.collect(new StreamRecord<>(userFunction.join(item1, item2))); - } + Collection elementWindows2 = windowAssigner2.assignWindows(element.getValue(), element.getTimestamp()); + Collection elementWindows1 = new ArrayList<>(elementWindows2.size()); + // Convert windows of stream2 to corresponding windows of stream1 + for(TimeWindow window : elementWindows2){ + elementWindows1.add(new TimeWindow(window.getEnd() - windowSize1, window.getEnd())); + } + K key = (K) getStateBackend().getCurrentKey(); + + for (TimeWindow window: elementWindows1) { + + ListState> windowState = getPartitionedState(window, windowSerializer1, + windowStateDescriptor); + context.key = key; + context.window = window; + CoGroupedStreams.TaggedUnion unionElement = CoGroupedStreams.TaggedUnion.two(element.getValue()); + windowState.add(unionElement); + + TriggerResult triggerResult = context.onElement(unionElement, element.getTimestamp()); + processTriggerResult(triggerResult, key, window); + } + } + + protected void processTriggerResult(TriggerResult triggerResult, K key, TimeWindow window) throws Exception { + if (!triggerResult.isFire() && !triggerResult.isPurge()) { + // do nothing + return; + } + + if (triggerResult.isFire()) { + timestampedCollector.setTimestamp(window.maxTimestamp()); + + ListState> windowState = getPartitionedState(window, windowSerializer1, + windowStateDescriptor); + + Iterable> contents = windowState.get(); + + List oneValues = new ArrayList<>(); + List twoValues = new ArrayList<>(); + + for (CoGroupedStreams.TaggedUnion val: 
contents) { + if (val.isOne()) { + oneValues.add(val.getOne()); } else { - expiredDataNum++; + twoValues.add(val.getTwo()); + } + } + for (IN1 val1: oneValues) { + for (IN2 val2: twoValues) { + timestampedCollector.collect(userFunction.join(val1, val2)); } } - // clean data - stream1Buffer.removeElements(expiredDataNum); + if (triggerResult.isPurge()) { + windowState.clear(); + context.clear(); + } + } else if (triggerResult.isPurge()) { + ListState> windowState = getPartitionedState(window, windowSerializer1, + windowStateDescriptor); + windowState.clear(); + context.clear(); } } @@ -153,58 +287,27 @@ public void processElement2(StreamRecord element) throws Exception { * @throws Exception */ private void processWatermark(long watermark) throws Exception{ - if(setProcessingTime) { - return; - } - // process elements after current watermark1 and lower than mark - for (StreamRecord record1 : stream1Buffer.getElements()) { - if(record1.getTimestamp() >= this.currentWatermark - && record1.getTimestamp() < watermark){ - for (StreamRecord record2 : stream2Buffer.getElements()) { - if(keySelector1.getKey(record1.getValue()).equals(keySelector2.getKey(record2.getValue()))) { - if (record1.getTimestamp() >= record2.getTimestamp() - && record2.getTimestamp() + this.stream2WindowLength >= record1.getTimestamp()) { - output.collect(new StreamRecord<>(userFunction.join(record1.getValue(), record2.getValue()))); - } - } - } - } - } + boolean fire; - for (StreamRecord record2 : stream2Buffer.getElements()) { - if(record2.getTimestamp() >= this.currentWatermark - && record2.getTimestamp() < watermark){ - for (StreamRecord record1 : stream1Buffer.getElements()) { - if(keySelector1.getKey(record1.getValue()).equals(keySelector2.getKey(record2.getValue()))) { - if (record2.getTimestamp() > record1.getTimestamp() - && record1.getTimestamp() + this.stream1WindowLength >= record2.getTimestamp()) { - output.collect(new StreamRecord<>(userFunction.join(record1.getValue(), 
record2.getValue()))); - } - } - } - } - } + do { + Timer timer = watermarkTimersQueue.peek(); + if (timer != null && timer.timestamp <= watermark) { + fire = true; - // clean data - int stream1Expired = 0; - for (StreamRecord record1 : stream1Buffer.getElements()) { - if (record1.getTimestamp() + this.stream1WindowLength < watermark) { - stream1Expired++; - } else { - break; - } - } - stream1Buffer.removeElements(stream1Expired); + watermarkTimers.remove(timer); + watermarkTimersQueue.remove(); - int stream2Expired = 0; - for (StreamRecord record2 : stream2Buffer.getElements()) { - if (record2.getTimestamp() + this.stream2WindowLength < watermark) { - stream2Expired++; + context.key = timer.key; + context.window = timer.window; + setKeyContext(timer.key); + TriggerResult triggerResult = context.onEventTime(timer.timestamp); + processTriggerResult(triggerResult, context.key, context.window); } else { - break; + fire = false; } - } - stream2Buffer.removeElements(stream2Expired); + } while (fire); + + this.currentWatermark = watermark; } @Override @@ -229,6 +332,35 @@ public void processWatermark2(Watermark mark) throws Exception { this.currentWatermark2 = mark.getTimestamp(); } + + @Override + public final void trigger(long time) throws Exception { + boolean fire; + + do { + Timer timer = processingTimeTimersQueue.peek(); + if (timer != null && timer.timestamp <= time) { + fire = true; + + processingTimeTimers.remove(timer); + processingTimeTimersQueue.remove(); + + context.key = timer.key; + context.window = timer.window; + setKeyContext(timer.key); + TriggerResult triggerResult = context.onProcessingTime(timer.timestamp); + processTriggerResult(triggerResult, context.key, context.window); + } else { + fire = false; + } + } while (fire); + + // Also check any watermark timers. We might have some in here since + // Context.registerEventTimeTimer sets a trigger if an event-time trigger is registered + // that is already behind the watermark. 
+ processWatermark(currentWatermark); + } + /** * When this flag is enabled the current processing time is set as the timestamp of elements * upon arrival. This must be used, for example, when using the @@ -240,6 +372,180 @@ public StreamJoinOperator enableSetProcessingTime(boolean setP return this; } + + /** + * {@code Context} is a utility for handling {@code Trigger} invocations. It can be reused + * by setting the {@code key} and {@code window} fields. No internal state must be kept in + * the {@code Context} + */ + protected class Context implements Trigger.TriggerContext { + protected K key; + protected TimeWindow window; + + public Context(K key, TimeWindow window) { + this.key = key; + this.window = window; + } + + @Override + public ValueState getKeyValueState(String name, + Class stateType, + S defaultState) { + requireNonNull(stateType, "The state type class must not be null"); + + TypeInformation typeInfo; + try { + typeInfo = TypeExtractor.getForClass(stateType); + } + catch (Exception e) { + throw new RuntimeException("Cannot analyze type '" + stateType.getName() + + "' from the class alone, due to generic type parameters. 
" + + "Please specify the TypeInformation directly.", e); + } + + return getKeyValueState(name, typeInfo, defaultState); + } + + @Override + public ValueState getKeyValueState(String name, + TypeInformation stateType, + S defaultState) { + + requireNonNull(name, "The name of the state must not be null"); + requireNonNull(stateType, "The state type information must not be null"); + + ValueStateDescriptor stateDesc = new ValueStateDescriptor<>(name, stateType.createSerializer(getExecutionConfig()), defaultState); + return getPartitionedState(stateDesc); + } + + @SuppressWarnings("unchecked") + public S getPartitionedState(StateDescriptor stateDescriptor) { + try { + return StreamJoinOperator.this.getPartitionedState(window, windowSerializer1, stateDescriptor); + } catch (Exception e) { + throw new RuntimeException("Could not retrieve state", e); + } + } + + @Override + public void registerProcessingTimeTimer(long time) { + Timer timer = new Timer<>(time, key, window); + if (processingTimeTimers.add(timer)) { + processingTimeTimersQueue.add(timer); + getRuntimeContext().registerTimer(time, StreamJoinOperator.this); + } + } + + @Override + public void registerEventTimeTimer(long time) { + Timer timer = new Timer<>(time, key, window); + if (watermarkTimers.add(timer)) { + watermarkTimersQueue.add(timer); + } + + if (time <= currentWatermark) { + // immediately schedule a trigger, so that we don't wait for the next + // watermark update to fire the watermark trigger + getRuntimeContext().registerTimer(time, StreamJoinOperator.this); + } + } + + @Override + public void deleteProcessingTimeTimer(long time) { + Timer timer = new Timer<>(time, key, window); + if (processingTimeTimers.remove(timer)) { + processingTimeTimersQueue.remove(timer); + } + } + + @Override + public void deleteEventTimeTimer(long time) { + Timer timer = new Timer<>(time, key, window); + if (watermarkTimers.remove(timer)) { + watermarkTimersQueue.remove(timer); + } + + } + + public TriggerResult 
onElement(CoGroupedStreams.TaggedUnion element, long timestamp) throws Exception { + return trigger.onElement(element, timestamp, window, this); + } + + public TriggerResult onProcessingTime(long time) throws Exception { + return trigger.onProcessingTime(time, window, this); + } + + public TriggerResult onEventTime(long time) throws Exception { + return trigger.onEventTime(time, window, this); + } + + public void clear() throws Exception { + trigger.clear(window, this); + } + + @Override + public String toString() { + return "Context{" + + "key=" + key + + ", window=" + window + + '}'; + } + } + + /** + * Internal class for keeping track of in-flight timers. + */ + protected static class Timer implements Comparable> { + protected long timestamp; + protected K key; + protected W window; + + public Timer(long timestamp, K key, W window) { + this.timestamp = timestamp; + this.key = key; + this.window = window; + } + + @Override + public int compareTo(Timer o) { + return Long.compare(this.timestamp, o.timestamp); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()){ + return false; + } + + Timer timer = (Timer) o; + + return timestamp == timer.timestamp + && key.equals(timer.key) + && window.equals(timer.window); + + } + + @Override + public int hashCode() { + int result = (int) (timestamp ^ (timestamp >>> 32)); + result = 31 * result + key.hashCode(); + result = 31 * result + window.hashCode(); + return result; + } + + @Override + public String toString() { + return "Timer{" + + "timestamp=" + timestamp + + ", key=" + key + + ", window=" + window + + '}'; + } + } + // ------------------------------------------------------------------------ // checkpointing and recovery // ------------------------------------------------------------------------ @@ -248,25 +554,25 @@ public StreamJoinOperator enableSetProcessingTime(boolean setP public StreamTaskState snapshotOperatorState(long 
checkpointId, long timestamp) throws Exception { StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp); - // we write the panes with the key/value maps into the stream - StateBackend.CheckpointStateOutputView out = getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp); - - out.writeLong(stream1WindowLength); - out.writeLong(stream2WindowLength); + AbstractStateBackend.CheckpointStateOutputView out = + getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp); - MultiplexingStreamRecordSerializer recordSerializer1 = new MultiplexingStreamRecordSerializer<>(inputSerializer1); - out.writeInt(stream1Buffer.size()); - for (StreamRecord element: stream1Buffer.getElements()) { - recordSerializer1.serialize(element, out); + out.writeInt(watermarkTimersQueue.size()); + for (Timer timer : watermarkTimersQueue) { + keySerializer.serialize(timer.key, out); + windowSerializer1.serialize(timer.window, out); + out.writeLong(timer.timestamp); } - MultiplexingStreamRecordSerializer recordSerializer2 = new MultiplexingStreamRecordSerializer<>(inputSerializer2); - out.writeInt(stream2Buffer.size()); - for (StreamRecord element: stream2Buffer.getElements()) { - recordSerializer2.serialize(element, out); + out.writeInt(processingTimeTimers.size()); + for (Timer timer : processingTimeTimersQueue) { + keySerializer.serialize(timer.key, out); + windowSerializer1.serialize(timer.window, out); + out.writeLong(timer.timestamp); } taskState.setOperatorState(out.closeAndGetHandle()); + return taskState; } @@ -280,20 +586,28 @@ public void restoreState(StreamTaskState taskState, long recoveryTimestamp) thro StateHandle inputState = (StateHandle) taskState.getOperatorState(); DataInputView in = inputState.getState(userClassloader); - stream1WindowLength = in.readLong(); - stream2WindowLength = in.readLong(); - - int numElements = in.readInt(); - - MultiplexingStreamRecordSerializer recordSerializer1 = new 
MultiplexingStreamRecordSerializer<>(inputSerializer1); - for (int i = 0; i < numElements; i++) { - stream1Buffer.storeElement(recordSerializer1.deserialize(in).asRecord()); + int numWatermarkTimers = in.readInt(); + watermarkTimers = new HashSet<>(numWatermarkTimers); + watermarkTimersQueue = new PriorityQueue<>(Math.max(numWatermarkTimers, 1)); + for (int i = 0; i < numWatermarkTimers; i++) { + K key = keySerializer.deserialize(in); + TimeWindow window = windowSerializer1.deserialize(in); + long timestamp = in.readLong(); + Timer timer = new Timer<>(timestamp, key, window); + watermarkTimers.add(timer); + watermarkTimersQueue.add(timer); } - int numElements2 = in.readInt(); - MultiplexingStreamRecordSerializer recordSerializer2 = new MultiplexingStreamRecordSerializer<>(inputSerializer2); - for (int i = 0; i < numElements2; i++) { - stream2Buffer.storeElement(recordSerializer2.deserialize(in).asRecord()); + int numProcessingTimeTimers = in.readInt(); + processingTimeTimers = new HashSet<>(numProcessingTimeTimers); + processingTimeTimersQueue = new PriorityQueue<>(Math.max(numProcessingTimeTimers, 1)); + for (int i = 0; i < numProcessingTimeTimers; i++) { + K key = keySerializer.deserialize(in); + TimeWindow window = windowSerializer1.deserialize(in); + long timestamp = in.readLong(); + Timer timer = new Timer<>(timestamp, key, window); + processingTimeTimers.add(timer); + processingTimeTimersQueue.add(timer); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java index 388c596f17c39..2246362ad4ded 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java @@ -29,6 +29,7 @@ import 
org.apache.flink.streaming.api.functions.TimestampExtractor; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; @@ -236,7 +237,7 @@ public void invoke(String value) throws Exception { @Test - public void testBufferJoin() throws Exception { + public void test2WindowsJoin() throws Exception { testResults = Lists.newArrayList(); @@ -298,11 +299,11 @@ public void cancel() { }).assignTimestamps(new Tuple3TimestampExtractor()); - source1.join(source2) + source1.timeJoin(source2) .where(new Tuple3KeyExtractor()) - .buffer(Time.of(3, TimeUnit.MILLISECONDS)) + .window(SlidingTimeWindows.of(Time.of(6, TimeUnit.MILLISECONDS), Time.of(2, TimeUnit.MILLISECONDS))) .equalTo(new Tuple3KeyExtractor()) - .buffer(Time.of(4, TimeUnit.MILLISECONDS)) + .window(TumblingTimeWindows.of(Time.of(2, TimeUnit.MILLISECONDS))) .apply(new JoinFunction, Tuple3, String>() { @Override public String join(Tuple3 first, Tuple3 second) throws Exception { @@ -321,14 +322,13 @@ public void invoke(String value) throws Exception { List expectedResult = Lists.newArrayList( "(a,x,0):(a,i,3)", "(a,x,0):(a,u,0)", + "(b,y,1):(b,k,5)", + "(c,z,2):(c,x,6)", "(d,u,3):(d,i,4)", - "(e,u,4):(e,w,1)", "(f,w,5):(f,x,6)", - "(g,i,7):(g,i,3)", "(h,j,6):(h,x,6)", "(i,k,8):(i,z,10)", - "(j,k,9):(j,z,9)", - "(k,k,10):(k,z,8)"); + "(j,k,9):(j,z,9)"); Collections.sort(expectedResult); Collections.sort(testResults); From 3a265df9f3894839dd15e9dff524a5b06fa75046 Mon Sep 17 00:00:00 2001 From: Yangjun Wang Date: Thu, 18 Feb 2016 12:04:15 +0800 Subject: [PATCH 4/6] [FLINK-3109]Join two streams, convert space to tabs --- .../api/datastream/JoinedStreams.java | 
26 +- .../api/datastream/TimeJoinedStreams.java | 615 +++++++++--------- .../api/operators/StreamJoinOperator.java | 39 +- 3 files changed, 342 insertions(+), 338 deletions(-) mode change 100644 => 100755 flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java old mode 100644 new mode 100755 index 0d788958ed1a0..f131b6ec9487f --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java @@ -143,7 +143,7 @@ public WithWindow window(WindowAssigner WithWindow window(WindowAssigner { - + private final DataStream input1; private final DataStream input2; @@ -173,23 +173,23 @@ public static class WithWindow { @PublicEvolving protected WithWindow(DataStream input1, - DataStream input2, - KeySelector keySelector1, - KeySelector keySelector2, - TypeInformation keyType, - WindowAssigner, W> windowAssigner, - Trigger, ? super W> trigger, - Evictor, ? super W> evictor) { - + DataStream input2, + KeySelector keySelector1, + KeySelector keySelector2, + TypeInformation keyType, + WindowAssigner, W> windowAssigner, + Trigger, ? super W> trigger, + Evictor, ? 
super W> evictor) { + this.input1 = requireNonNull(input1); this.input2 = requireNonNull(input2); this.keySelector1 = requireNonNull(keySelector1); this.keySelector2 = requireNonNull(keySelector2); this.keyType = requireNonNull(keyType); - + this.windowAssigner = requireNonNull(windowAssigner); - + this.trigger = trigger; this.evictor = evictor; } @@ -288,7 +288,7 @@ public DataStream apply(JoinFunction function, TypeInformation } } - + // ------------------------------------------------------------------------ // Implementation of the functions // ------------------------------------------------------------------------ diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/TimeJoinedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/TimeJoinedStreams.java index 45ba482e3a387..048cde83c2639 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/TimeJoinedStreams.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/TimeJoinedStreams.java @@ -19,7 +19,6 @@ import org.apache.flink.api.common.functions.CoGroupFunction; -import org.apache.flink.api.common.functions.FlatJoinFunction; import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -86,311 +85,311 @@ */ public class TimeJoinedStreams { - /** The first input stream */ - private final DataStream input1; - - /** The second input stream */ - private final DataStream input2; - - - /** The parallelism of joined stream */ - private final int parallelism; - /** - * Creates new JoinedStreams data streams, which are the first step towards building a streaming co-group. - * - * @param input1 The first data stream. - * @param input2 The second data stream. 
- */ - public TimeJoinedStreams(DataStream input1, DataStream input2) { - this.input1 = requireNonNull(input1); - this.input2 = requireNonNull(input2); - this.parallelism = Math.max(input1.getParallelism(), input2.getParallelism()); - } - - /** - * Specifies a {@link KeySelector} for elements from the first input. - */ - public Where where(KeySelector keySelector) { - TypeInformation keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType()); - return new Where<>(input1.clean(keySelector), keyType); - } - - // ------------------------------------------------------------------------ - - /** - * CoGrouped streams that have the key for one side defined. - * - * @param The type of the key. - */ - public class Where { - - private final KeySelector keySelector1; - private final TypeInformation keyType; - - Where(KeySelector keySelector1, TypeInformation keyType) { - this.keySelector1 = keySelector1; - this.keyType = keyType; - } - - - public WithOneWindow window(WindowAssigner assigner) throws Exception { - return new WithOneWindow(assigner, keyType); - } - - // -------------------------------------------------------------------- - - /** - * A join operation that has {@link KeySelector KeySelectors} - * and {@link Time buffers} defined for both inputs. 
- */ - public class WithOneWindow { - private final TypeInformation keyType; - private final WindowAssigner windowAssigner; - private final long slideSize; - private final long windowSize; - - long getSlideSize(WindowAssigner assigner) throws Exception { - if(assigner instanceof TumblingTimeWindows){ - return ((TumblingTimeWindows) assigner).getSize(); - } else if(assigner instanceof SlidingTimeWindows) { - return ((SlidingTimeWindows) assigner).getSlide(); - } else { - throw new Exception("TimeJoin only supports time window"); - } - } - - long getWindowSize(WindowAssigner assigner) throws Exception { - if(assigner instanceof TumblingTimeWindows){ - return ((TumblingTimeWindows) assigner).getSize(); - } else if(assigner instanceof SlidingTimeWindows) { - return ((SlidingTimeWindows) assigner).getSize(); - } else { - throw new Exception("TimeJoin only supports time window"); - } - } - - WithOneWindow(WindowAssigner assigner, TypeInformation keyType) throws Exception { - this.windowAssigner = assigner; - this.keyType = keyType; - this.slideSize = getSlideSize(assigner); - this.windowSize = getWindowSize(assigner); - } - - /** - * Specifies a {@link KeySelector} for elements from the second input. - */ - public EqualTo equalTo(KeySelector keySelector) { - TypeInformation otherKey = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType()); - if (!otherKey.equals(this.keyType)) { - throw new IllegalArgumentException("The keys for the two inputs are not equal: " + - "first key = " + this.keyType + " , second key = " + otherKey); - } - - return new EqualTo(input2.clean(keySelector)); - } - - // -------------------------------------------------------------------- - - /** - * A co-group operation that has {@link KeySelector KeySelectors} defined for both inputs. 
- */ - public class EqualTo { - - private final KeySelector keySelector2; - - EqualTo(KeySelector keySelector2) { - this.keySelector2 = requireNonNull(keySelector2); - } - - /** - * Specifies the window1 on which the co-group operation works. - */ - public WithTwoWindows window(WindowAssigner assigner) throws Exception { - // Check slide size - long slideSize2 = getSlideSize(assigner); - long windowSize2 = getWindowSize(assigner); - if(slideSize != slideSize2){ - throw new ExceptionInInitializerError("Slide size of two windows should be equal"); - } - if(windowSize < windowSize2) { - throw new ExceptionInInitializerError("Window size of stream1 shouldn't be less than stream2"); - } - return new WithTwoWindows<>(input1, input2, - keySelector1, keySelector2, keyType, - windowAssigner, windowSize, assigner, windowSize2); - } - } - } - - } - - /** - * A join operation that has {@link KeySelector KeySelectors} defined for both inputs as - * well as a {@link WindowAssigner}. - * Doesn't support trigger and evictor - * - * @param Type of the key. 
This must be the same for both inputs - */ - public class WithTwoWindows { - - private final TypeInformation keyType; - - private final DataStream input1; - private final DataStream input2; - - private final KeySelector keySelector1; - private final KeySelector keySelector2; - private final long windowSize1; - private final long windowSize2; - - private final WindowAssigner windowAssigner1; - private final WindowAssigner windowAssigner2; - protected final Trigger trigger; - - protected WithTwoWindows(DataStream input1, - DataStream input2, - KeySelector keySelector1, - KeySelector keySelector2, - TypeInformation keyType, - WindowAssigner windowAssigner1, - long windowSize1, - WindowAssigner windowAssigner2, - long windowSize2) { - - this.input1 = requireNonNull(input1); - this.input2 = requireNonNull(input2); - - this.keySelector1 = requireNonNull(keySelector1); - this.keySelector2 = requireNonNull(keySelector2); - this.keyType = requireNonNull(keyType); - - this.windowAssigner1 = requireNonNull(windowAssigner1); - this.windowAssigner2 = requireNonNull(windowAssigner2); - - this.windowSize1 = windowSize1; - this.windowSize2 = windowSize2; - - trigger = windowAssigner1.getDefaultTrigger(input1.getExecutionEnvironment()); - } - - public StreamExecutionEnvironment getExecutionEnvironment() { - return input1.getExecutionEnvironment(); - } - - /** - * Completes the join operation with the user function that is executed - * for each combination of elements with the same key in a window1. 
- */ - public DataStream apply(JoinFunction function) { - StreamExecutionEnvironment env = getExecutionEnvironment(); - function = env.clean(function); - boolean enableSetProcessingTime = env.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime; - - TypeInformation resultType = TypeExtractor.getBinaryOperatorReturnType( - function, - JoinFunction.class, - true, - true, - input1.getType(), - input2.getType(), - "Join", - false); - - CoGroupedStreams.UnionTypeInfo unionType = new CoGroupedStreams.UnionTypeInfo<>(input1.getType(), input2.getType()); - - ListStateDescriptor> stateDesc = new ListStateDescriptor<>("window-contents", - unionType.createSerializer(getExecutionEnvironment().getConfig())); - - StreamJoinOperator joinOperator - = new StreamJoinOperator<>( - function, - keySelector1, - keySelector2, - keyType.createSerializer(getExecutionEnvironment().getConfig()), - windowAssigner1, - windowAssigner1.getWindowSerializer(getExecutionEnvironment().getConfig()), - windowSize1, - windowAssigner2, - windowAssigner2.getWindowSerializer(getExecutionEnvironment().getConfig()), - windowSize2, - stateDesc, - input1.getType().createSerializer(getExecutionEnvironment().getConfig()), - input2.getType().createSerializer(getExecutionEnvironment().getConfig()), - trigger - ).enableSetProcessingTime(enableSetProcessingTime); - - TwoInputTransformation twoInputTransformation - = new TwoInputTransformation<>( - input1.keyBy(keySelector1).getTransformation(), - input2.keyBy(keySelector2).getTransformation(), - "Join", - joinOperator, - resultType, - parallelism - ); - twoInputTransformation.setStateKeySelectors(keySelector1, keySelector2); - twoInputTransformation.setStateKeyType(keyType); - - return new DataStream<>(getExecutionEnvironment(), twoInputTransformation); - } - - } - - - // ------------------------------------------------------------------------ - // Implementation of the functions - // 
------------------------------------------------------------------------ - - // ------------------------------------------------------------------------ - /** - * CoGroup function that does a nested-loop join to get the join result. - */ - private static class JoinCoGroupFunction - extends WrappingFunction> - implements CoGroupFunction { - private static final long serialVersionUID = 1L; - - public JoinCoGroupFunction(JoinFunction wrappedFunction) { - super(wrappedFunction); - } - - @Override - public void coGroup(Iterable first, Iterable second, Collector out) throws Exception { - for (T1 val1: first) { - for (T2 val2: second) { - out.collect(wrappedFunction.join(val1, val2)); - } - } - } - } - - private static class CoGroupWindowFunction - extends WrappingFunction> - implements WindowFunction>, T, KEY, W> { - - private static final long serialVersionUID = 1L; - - public CoGroupWindowFunction(CoGroupFunction userFunction) { - super(userFunction); - } - - @Override - public void apply(KEY key, - W window, - Iterable> values, - Collector out) throws Exception { - - List oneValues = new ArrayList<>(); - List twoValues = new ArrayList<>(); - - for (CoGroupedStreams.TaggedUnion val: values) { - if (val.isOne()) { - oneValues.add(val.getOne()); - } else { - twoValues.add(val.getTwo()); - } - } - wrappedFunction.coGroup(oneValues, twoValues, out); - } - } + /** The first input stream */ + private final DataStream input1; + + /** The second input stream */ + private final DataStream input2; + + + /** The parallelism of joined stream */ + private final int parallelism; + /** + * Creates new JoinedStreams data streams, which are the first step towards building a streaming co-group. + * + * @param input1 The first data stream. + * @param input2 The second data stream. 
+ */ + public TimeJoinedStreams(DataStream input1, DataStream input2) { + this.input1 = requireNonNull(input1); + this.input2 = requireNonNull(input2); + this.parallelism = Math.max(input1.getParallelism(), input2.getParallelism()); + } + + /** + * Specifies a {@link KeySelector} for elements from the first input. + */ + public Where where(KeySelector keySelector) { + TypeInformation keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType()); + return new Where<>(input1.clean(keySelector), keyType); + } + + // ------------------------------------------------------------------------ + + /** + * CoGrouped streams that have the key for one side defined. + * + * @param The type of the key. + */ + public class Where { + + private final KeySelector keySelector1; + private final TypeInformation keyType; + + Where(KeySelector keySelector1, TypeInformation keyType) { + this.keySelector1 = keySelector1; + this.keyType = keyType; + } + + + public WithOneWindow window(WindowAssigner assigner) throws Exception { + return new WithOneWindow(assigner, keyType); + } + + // -------------------------------------------------------------------- + + /** + * A join operation that has {@link KeySelector KeySelectors} + * and {@link Time buffers} defined for both inputs. 
+ */ + public class WithOneWindow { + private final TypeInformation keyType; + private final WindowAssigner windowAssigner; + private final long slideSize; + private final long windowSize; + + long getSlideSize(WindowAssigner assigner) throws Exception { + if(assigner instanceof TumblingTimeWindows){ + return ((TumblingTimeWindows) assigner).getSize(); + } else if(assigner instanceof SlidingTimeWindows) { + return ((SlidingTimeWindows) assigner).getSlide(); + } else { + throw new Exception("TimeJoin only supports time window"); + } + } + + long getWindowSize(WindowAssigner assigner) throws Exception { + if(assigner instanceof TumblingTimeWindows){ + return ((TumblingTimeWindows) assigner).getSize(); + } else if(assigner instanceof SlidingTimeWindows) { + return ((SlidingTimeWindows) assigner).getSize(); + } else { + throw new Exception("TimeJoin only supports time window"); + } + } + + WithOneWindow(WindowAssigner assigner, TypeInformation keyType) throws Exception { + this.windowAssigner = assigner; + this.keyType = keyType; + this.slideSize = getSlideSize(assigner); + this.windowSize = getWindowSize(assigner); + } + + /** + * Specifies a {@link KeySelector} for elements from the second input. + */ + public EqualTo equalTo(KeySelector keySelector) { + TypeInformation otherKey = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType()); + if (!otherKey.equals(this.keyType)) { + throw new IllegalArgumentException("The keys for the two inputs are not equal: " + + "first key = " + this.keyType + " , second key = " + otherKey); + } + + return new EqualTo(input2.clean(keySelector)); + } + + // -------------------------------------------------------------------- + + /** + * A co-group operation that has {@link KeySelector KeySelectors} defined for both inputs. 
+ */ + public class EqualTo { + + private final KeySelector keySelector2; + + EqualTo(KeySelector keySelector2) { + this.keySelector2 = requireNonNull(keySelector2); + } + + /** + * Specifies the window1 on which the co-group operation works. + */ + public WithTwoWindows window(WindowAssigner assigner) throws Exception { + // Check slide size + long slideSize2 = getSlideSize(assigner); + long windowSize2 = getWindowSize(assigner); + if(slideSize != slideSize2){ + throw new ExceptionInInitializerError("Slide size of two windows should be equal"); + } + if(windowSize < windowSize2) { + throw new ExceptionInInitializerError("Window size of stream1 shouldn't be less than stream2"); + } + return new WithTwoWindows<>(input1, input2, + keySelector1, keySelector2, keyType, + windowAssigner, windowSize, assigner, windowSize2); + } + } + } + + } + + /** + * A join operation that has {@link KeySelector KeySelectors} defined for both inputs as + * well as a {@link WindowAssigner}. + * Doesn't support trigger and evictor + * + * @param Type of the key. 
This must be the same for both inputs + */ + public class WithTwoWindows { + + private final TypeInformation keyType; + + private final DataStream input1; + private final DataStream input2; + + private final KeySelector keySelector1; + private final KeySelector keySelector2; + private final long windowSize1; + private final long windowSize2; + + private final WindowAssigner windowAssigner1; + private final WindowAssigner windowAssigner2; + protected final Trigger trigger; + + protected WithTwoWindows(DataStream input1, + DataStream input2, + KeySelector keySelector1, + KeySelector keySelector2, + TypeInformation keyType, + WindowAssigner windowAssigner1, + long windowSize1, + WindowAssigner windowAssigner2, + long windowSize2) { + + this.input1 = requireNonNull(input1); + this.input2 = requireNonNull(input2); + + this.keySelector1 = requireNonNull(keySelector1); + this.keySelector2 = requireNonNull(keySelector2); + this.keyType = requireNonNull(keyType); + + this.windowAssigner1 = requireNonNull(windowAssigner1); + this.windowAssigner2 = requireNonNull(windowAssigner2); + + this.windowSize1 = windowSize1; + this.windowSize2 = windowSize2; + + trigger = windowAssigner1.getDefaultTrigger(input1.getExecutionEnvironment()); + } + + public StreamExecutionEnvironment getExecutionEnvironment() { + return input1.getExecutionEnvironment(); + } + + /** + * Completes the join operation with the user function that is executed + * for each combination of elements with the same key in a window1. 
+ */ + public DataStream apply(JoinFunction function) { + StreamExecutionEnvironment env = getExecutionEnvironment(); + function = env.clean(function); + boolean enableSetProcessingTime = env.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime; + + TypeInformation resultType = TypeExtractor.getBinaryOperatorReturnType( + function, + JoinFunction.class, + true, + true, + input1.getType(), + input2.getType(), + "Join", + false); + + CoGroupedStreams.UnionTypeInfo unionType = new CoGroupedStreams.UnionTypeInfo<>(input1.getType(), input2.getType()); + + ListStateDescriptor> stateDesc = new ListStateDescriptor<>("window-contents", + unionType.createSerializer(getExecutionEnvironment().getConfig())); + + StreamJoinOperator joinOperator + = new StreamJoinOperator<>( + function, + keySelector1, + keySelector2, + keyType.createSerializer(getExecutionEnvironment().getConfig()), + windowAssigner1, + windowAssigner1.getWindowSerializer(getExecutionEnvironment().getConfig()), + windowSize1, + windowAssigner2, + windowAssigner2.getWindowSerializer(getExecutionEnvironment().getConfig()), + windowSize2, + stateDesc, + input1.getType().createSerializer(getExecutionEnvironment().getConfig()), + input2.getType().createSerializer(getExecutionEnvironment().getConfig()), + trigger + ).enableSetProcessingTime(enableSetProcessingTime); + + TwoInputTransformation twoInputTransformation + = new TwoInputTransformation<>( + input1.keyBy(keySelector1).getTransformation(), + input2.keyBy(keySelector2).getTransformation(), + "Join", + joinOperator, + resultType, + parallelism + ); + twoInputTransformation.setStateKeySelectors(keySelector1, keySelector2); + twoInputTransformation.setStateKeyType(keyType); + + return new DataStream<>(getExecutionEnvironment(), twoInputTransformation); + } + + } + + + // ------------------------------------------------------------------------ + // Implementation of the functions + // 
------------------------------------------------------------------------ + + // ------------------------------------------------------------------------ + /** + * CoGroup function that does a nested-loop join to get the join result. + */ + private static class JoinCoGroupFunction + extends WrappingFunction> + implements CoGroupFunction { + private static final long serialVersionUID = 1L; + + public JoinCoGroupFunction(JoinFunction wrappedFunction) { + super(wrappedFunction); + } + + @Override + public void coGroup(Iterable first, Iterable second, Collector out) throws Exception { + for (T1 val1: first) { + for (T2 val2: second) { + out.collect(wrappedFunction.join(val1, val2)); + } + } + } + } + + private static class CoGroupWindowFunction + extends WrappingFunction> + implements WindowFunction>, T, KEY, W> { + + private static final long serialVersionUID = 1L; + + public CoGroupWindowFunction(CoGroupFunction userFunction) { + super(userFunction); + } + + @Override + public void apply(KEY key, + W window, + Iterable> values, + Collector out) throws Exception { + + List oneValues = new ArrayList<>(); + List twoValues = new ArrayList<>(); + + for (CoGroupedStreams.TaggedUnion val: values) { + if (val.isOne()) { + oneValues.add(val.getOne()); + } else { + twoValues.add(val.getTwo()); + } + } + wrappedFunction.coGroup(oneValues, twoValues, out); + } + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamJoinOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamJoinOperator.java index 02a32069b3bb2..6e83a82b01635 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamJoinOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamJoinOperator.java @@ -18,7 +18,11 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.api.common.functions.JoinFunction; -import 
org.apache.flink.api.common.state.*; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; @@ -40,7 +44,12 @@ import org.slf4j.LoggerFactory; import java.io.Serializable; -import java.util.*; +import java.util.Collection; +import java.util.HashSet; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.List; +import java.util.ArrayList; import static java.util.Objects.requireNonNull; @@ -129,17 +138,17 @@ public class StreamJoinOperator public StreamJoinOperator(JoinFunction userFunction, KeySelector keySelector1, KeySelector keySelector2, - TypeSerializer keySerializer, - WindowAssigner windowAssigner1, - TypeSerializer windowSerializer1, - long windowSize1, - WindowAssigner windowAssigner2, - TypeSerializer windowSerializer2, - long windowSize2, - StateDescriptor>, ?> windowStateDescriptor, + TypeSerializer keySerializer, + WindowAssigner windowAssigner1, + TypeSerializer windowSerializer1, + long windowSize1, + WindowAssigner windowAssigner2, + TypeSerializer windowSerializer2, + long windowSize2, + StateDescriptor>, ?> windowStateDescriptor, TypeSerializer inputSerializer1, TypeSerializer inputSerializer2, - Trigger trigger) { + Trigger trigger) { super(userFunction); this.keySelector1 = requireNonNull(keySelector1); this.keySelector2 = requireNonNull(keySelector2); @@ -388,9 +397,7 @@ public Context(K key, TimeWindow window) { } @Override - public ValueState getKeyValueState(String name, - Class stateType, - S defaultState) { + public ValueState getKeyValueState(String name, Class stateType, S defaultState) { requireNonNull(stateType, "The state 
type class must not be null"); TypeInformation typeInfo; @@ -407,9 +414,7 @@ public ValueState getKeyValueState(String name, } @Override - public ValueState getKeyValueState(String name, - TypeInformation stateType, - S defaultState) { + public ValueState getKeyValueState(String name, TypeInformation stateType, S defaultState) { requireNonNull(name, "The name of the state must not be null"); requireNonNull(stateType, "The state type information must not be null"); From 82af7c6a6425792277386073e474b869349eb96a Mon Sep 17 00:00:00 2001 From: Yangjun Wang Date: Thu, 18 Feb 2016 17:24:56 +0800 Subject: [PATCH 5/6] [FLINK-3109]Join two streams, add scala api --- .../api/datastream/TimeJoinedStreams.java | 125 +++++-- .../api/operators/StreamJoinOperator.java | 12 +- .../windowing/CoGroupJoinITCase.java | 8 +- .../streaming/api/scala/DataStream.scala | 8 + .../api/scala/TimeJoinedStreams.scala | 315 ++++++++++++++++++ .../api/scala/CoGroupJoinITCase.scala | 83 ++++- 6 files changed, 506 insertions(+), 45 deletions(-) create mode 100644 flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TimeJoinedStreams.scala diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/TimeJoinedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/TimeJoinedStreams.java index 048cde83c2639..6953fc016a149 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/TimeJoinedStreams.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/TimeJoinedStreams.java @@ -19,6 +19,7 @@ import org.apache.flink.api.common.functions.CoGroupFunction; +import org.apache.flink.api.common.functions.FlatJoinFunction; import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -28,7 +29,6 @@ import 
org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.TimestampExtractor; -import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.operators.StreamJoinOperator; import org.apache.flink.streaming.api.transformations.TwoInputTransformation; import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows; @@ -37,12 +37,8 @@ import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; -import java.util.ArrayList; -import java.util.List; - import static java.util.Objects.requireNonNull; /** @@ -66,19 +62,19 @@ *** Join base on processing time *** * DataStream result = one.join(two) * .where(new MyFirstKeySelector()) - * .window(Time.of(20, TimeUnit.SECONDS)) + * .window(SlidingTimeWindows.of(Time.of(6, TimeUnit.MILLISECONDS), Time.of(2, TimeUnit.MILLISECONDS)) * .equalTo(new MyFirstKeySelector()) - * .window(Time.of(5, TimeUnit.SECONDS)) + * .window(TumblingTimeWindows.of(Time.of(2, TimeUnit.MILLISECONDS))) * .apply(new MyJoinFunction()); * *** Join base on event time *** * DataStream result = one.join(two) * .where(new MyFirstKeySelector()) * .assignTimestamps(timestampExtractor1) - * .window(Time.of(20, TimeUnit.SECONDS)) + * .window(SlidingTimeWindows.of(Time.of(6, TimeUnit.MILLISECONDS), Time.of(2, TimeUnit.MILLISECONDS)) * .equalTo(new MyFirstKeySelector()) * .assignTimestamps(timestampExtractor2) - * .window(Time.of(5, TimeUnit.SECONDS)) + * .window(TumblingTimeWindows.of(Time.of(2, TimeUnit.MILLISECONDS))) * .apply(new MyJoinFunction()); * } * @@ -278,13 +274,75 @@ public StreamExecutionEnvironment getExecutionEnvironment() { /** * Completes 
the join operation with the user function that is executed - * for each combination of elements with the same key in a window1. + * for each combination of elements with the same key in a window. */ public DataStream apply(JoinFunction function) { + TypeInformation resultType = TypeExtractor.getBinaryOperatorReturnType( + function, + JoinFunction.class, + true, + true, + input1.getType(), + input2.getType(), + "Join", + false); + + return apply(function, resultType); + } + + /** + * Completes the join operation with the user function that is executed + * for each combination of elements with the same key in a window1. + */ + public DataStream apply(JoinFunction function, TypeInformation resultType) { StreamExecutionEnvironment env = getExecutionEnvironment(); function = env.clean(function); boolean enableSetProcessingTime = env.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime; + CoGroupedStreams.UnionTypeInfo unionType = new CoGroupedStreams.UnionTypeInfo<>(input1.getType(), input2.getType()); + + ListStateDescriptor> stateDesc = new ListStateDescriptor<>("window-contents", + unionType.createSerializer(getExecutionEnvironment().getConfig())); + + StreamJoinOperator joinOperator + = new StreamJoinOperator<>( + new JoinCoGroupFunction<>(function), + keySelector1, + keySelector2, + keyType.createSerializer(getExecutionEnvironment().getConfig()), + windowAssigner1, + windowAssigner1.getWindowSerializer(getExecutionEnvironment().getConfig()), + windowSize1, + windowAssigner2, + windowAssigner2.getWindowSerializer(getExecutionEnvironment().getConfig()), + windowSize2, + stateDesc, + input1.getType().createSerializer(getExecutionEnvironment().getConfig()), + input2.getType().createSerializer(getExecutionEnvironment().getConfig()), + trigger + ).enableSetProcessingTime(enableSetProcessingTime); + + TwoInputTransformation twoInputTransformation + = new TwoInputTransformation<>( + input1.keyBy(keySelector1).getTransformation(), + 
input2.keyBy(keySelector2).getTransformation(), + "Join", + joinOperator, + resultType, + parallelism + ); + twoInputTransformation.setStateKeySelectors(keySelector1, keySelector2); + twoInputTransformation.setStateKeyType(keyType); + + return new DataStream<>(getExecutionEnvironment(), twoInputTransformation); + } + + + /** + * Completes the join operation with the user function that is executed + * for each combination of elements with the same key in a window. + */ + public DataStream apply(FlatJoinFunction function) { TypeInformation resultType = TypeExtractor.getBinaryOperatorReturnType( function, JoinFunction.class, @@ -295,6 +353,18 @@ public DataStream apply(JoinFunction function) { "Join", false); + return apply(function, resultType); + } + + /** + * Completes the join operation with the user function that is executed + * for each combination of elements with the same key in a window1. + */ + public DataStream apply(FlatJoinFunction function, TypeInformation resultType) { + StreamExecutionEnvironment env = getExecutionEnvironment(); + function = env.clean(function); + boolean enableSetProcessingTime = env.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime; + CoGroupedStreams.UnionTypeInfo unionType = new CoGroupedStreams.UnionTypeInfo<>(input1.getType(), input2.getType()); ListStateDescriptor> stateDesc = new ListStateDescriptor<>("window-contents", @@ -302,7 +372,7 @@ public DataStream apply(JoinFunction function) { StreamJoinOperator joinOperator = new StreamJoinOperator<>( - function, + new FlatJoinCoGroupFunction<>(function), keySelector1, keySelector2, keyType.createSerializer(getExecutionEnvironment().getConfig()), @@ -332,7 +402,6 @@ public DataStream apply(JoinFunction function) { return new DataStream<>(getExecutionEnvironment(), twoInputTransformation); } - } @@ -363,33 +432,25 @@ public void coGroup(Iterable first, Iterable second, Collector out) t } } - private static class CoGroupWindowFunction - extends WrappingFunction> - 
implements WindowFunction>, T, KEY, W> { - + /** + * CoGroup function that does a nested-loop join to get the join result. (FlatJoin version) + */ + private static class FlatJoinCoGroupFunction + extends WrappingFunction> + implements CoGroupFunction { private static final long serialVersionUID = 1L; - public CoGroupWindowFunction(CoGroupFunction userFunction) { - super(userFunction); + public FlatJoinCoGroupFunction(FlatJoinFunction wrappedFunction) { + super(wrappedFunction); } @Override - public void apply(KEY key, - W window, - Iterable> values, - Collector out) throws Exception { - - List oneValues = new ArrayList<>(); - List twoValues = new ArrayList<>(); - - for (CoGroupedStreams.TaggedUnion val: values) { - if (val.isOne()) { - oneValues.add(val.getOne()); - } else { - twoValues.add(val.getTwo()); + public void coGroup(Iterable first, Iterable second, Collector out) throws Exception { + for (T1 val1: first) { + for (T2 val2: second) { + wrappedFunction.join(val1, val2, out); } } - wrappedFunction.coGroup(oneValues, twoValues, out); } } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamJoinOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamJoinOperator.java index 6e83a82b01635..08b666df3263b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamJoinOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamJoinOperator.java @@ -17,7 +17,7 @@ package org.apache.flink.streaming.api.operators; -import org.apache.flink.api.common.functions.JoinFunction; +import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.StateDescriptor; @@ -55,7 +55,7 @@ public class StreamJoinOperator - extends AbstractUdfStreamOperator> + extends 
AbstractUdfStreamOperator> implements TwoInputStreamOperator , Triggerable { private static final long serialVersionUID = 1L; @@ -135,7 +135,7 @@ public class StreamJoinOperator protected transient PriorityQueue> watermarkTimersQueue; - public StreamJoinOperator(JoinFunction userFunction, + public StreamJoinOperator(CoGroupFunction userFunction, KeySelector keySelector1, KeySelector keySelector2, TypeSerializer keySerializer, @@ -273,11 +273,7 @@ protected void processTriggerResult(TriggerResult triggerResult, K key, TimeWind twoValues.add(val.getTwo()); } } - for (IN1 val1: oneValues) { - for (IN2 val2: twoValues) { - timestampedCollector.collect(userFunction.join(val1, val2)); - } - } + userFunction.coGroup(oneValues, twoValues, timestampedCollector); if (triggerResult.isPurge()) { windowState.clear(); context.clear(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java index 1f3eff9b42b0f..e3e2e61b9020b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java @@ -240,7 +240,7 @@ public void invoke(String value) throws Exception { @Test public void test2WindowsJoin() throws Exception { - testResults = Lists.newArrayList(); + testResults = new ArrayList<>(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); @@ -270,7 +270,7 @@ public void run(SourceContext> ctx) throws Excep @Override public void cancel() { } - }).assignTimestamps(new Tuple3TimestampExtractor()); + }).assignTimestampsAndWatermarks(new Tuple3TimestampExtractor()); DataStream> source2 = env.addSource(new SourceFunction>() { private 
static final long serialVersionUID = 1L; @@ -297,7 +297,7 @@ public void run(SourceContext> ctx) throws Excep @Override public void cancel() { } - }).assignTimestamps(new Tuple3TimestampExtractor()); + }).assignTimestampsAndWatermarks(new Tuple3TimestampExtractor()); source1.timeJoin(source2) @@ -320,7 +320,7 @@ public void invoke(String value) throws Exception { env.execute("Join Test"); - List expectedResult = Lists.newArrayList( + List expectedResult = Arrays.asList( "(a,x,0):(a,i,3)", "(a,x,0):(a,u,0)", "(b,y,1):(b,k,5)", diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala index c35541c3d47b1..2646373d24f6e 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -750,6 +750,14 @@ class DataStream[T](stream: JavaStream[T]) { JoinedStreams.createJoin(this, otherStream) } + /** + * Creates a join operation. See [[JoinedStreams]] for an example of how the keys + * and window can be specified. + */ + def timeJoin[T2](otherStream: DataStream[T2]): TimeJoinedStreams.Unspecified[T, T2] = { + TimeJoinedStreams.createJoin(this, otherStream) + } + /** * Writes a DataStream to the standard output stream (stdout). For each * element of the DataStream the result of .toString is diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TimeJoinedStreams.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TimeJoinedStreams.scala new file mode 100644 index 0000000000000..ecef155b80024 --- /dev/null +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TimeJoinedStreams.scala @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.scala + +import org.apache.flink.annotation.{PublicEvolving, Public} +import org.apache.flink.api.common.functions.{FlatJoinFunction, JoinFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.typeutils.ResultTypeQueryable +import org.apache.flink.streaming.api.datastream.{TimeJoinedStreams => JavaTimeJoinedStreams} +import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner +import org.apache.flink.streaming.api.windowing.windows.{TimeWindow} +import org.apache.flink.util.Collector + +/** + * `JoinedStreams` represents two [[DataStream]]s that have been joined. + * A streaming join operation is evaluated over elements in a window. + * + * To finalize the join operation you also need to specify a [[KeySelector]] for + * both the first and second input and a [[WindowAssigner]] + * + * Note: Right now, the groups are being built in memory so you need to ensure that they don't + * get too big. Otherwise the JVM might crash. + * + * Example: + * + * {{{ + * val one: DataStream[(String, Int)] = ... + * val two: DataStream[(String, Int)] = ... + * + * val result = one.join(two) + * .where {t => ... 
} + * .window(SlidingTimeWindows.of(Time.of(6, TimeUnit.MILLISECONDS), Time.of(2, TimeUnit.MILLISECONDS)) + * .equalTo {t => ... } + * .window(TumblingTimeWindows.of(Time.of(2, TimeUnit.SECONDS))) + * .apply(new MyJoinFunction()) + * } }}} + * + */ +@Public +object TimeJoinedStreams { + + /** + * A join operation that does not yet have its [[KeySelector]]s defined. + * + * @tparam T1 Type of the elements from the first input + * @tparam T2 Type of the elements from the second input + */ + class Unspecified[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) { + + /** + * Specifies a [[KeySelector]] for elements from the first input. + */ + def where[KEY: TypeInformation](keySelector: T1 => KEY): WithKey[T1, T2, KEY] = { + val cleanFun = clean(keySelector) + val keyType = implicitly[TypeInformation[KEY]] + val javaSelector = new KeySelector[T1, KEY] with ResultTypeQueryable[KEY] { + def getKey(in: T1) = cleanFun(in) + override def getProducedType: TypeInformation[KEY] = keyType + } + new WithKey[T1, T2, KEY](input1, input2, javaSelector, null, keyType) + } + + /** + * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning + * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]]. + */ + private[flink] def clean[F <: AnyRef](f: F): F = { + new StreamExecutionEnvironment(input1.javaStream.getExecutionEnvironment).scalaClean(f) + } + } + + /** + * Specify a [[WindowAssigner]] for input1 using [[window()]]. + * + * @tparam T1 Type of the elements from the first input + * @tparam T2 Type of the elements from the second input + * @tparam KEY Type of the key. This must be the same for both inputs + */ + class WithKey[T1, T2, KEY]( + input1: DataStream[T1], + input2: DataStream[T2], + keySelector1: KeySelector[T1, KEY], + keySelector2: KeySelector[T2, KEY], + keyType: TypeInformation[KEY]) { + + + /** + * Specifies the window on which the join operation works. 
+ */ + @PublicEvolving + def window(assigner: WindowAssigner[AnyRef, TimeWindow]) + : TimeJoinedStreams.WithOneWindow[T1, T2, KEY] = { + if (keySelector1 == null ) { + throw new UnsupportedOperationException("You first need to specify KeySelectors for input1 first") + } + new TimeJoinedStreams.WithOneWindow[T1, T2, KEY]( + input1, + input2, + keySelector1, + keySelector2, + keyType, + clean(assigner)) + } + + /** + * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning + * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]]. + */ + private[flink] def clean[F <: AnyRef](f: F): F = { + new StreamExecutionEnvironment(input1.javaStream.getExecutionEnvironment).scalaClean(f) + } + } + + /** + * A join operation that has [[KeySelector]]s defined for either both or + * one input. + * + * You need to specify a [[KeySelector]] for the second input using [[equalTo()]] + * + * @tparam T1 Type of the elements from the first input + * @tparam T2 Type of the elements from the second input + * @tparam KEY Type of the key. This must be the same for both inputs + */ + class WithOneWindow[T1, T2, KEY](input1: DataStream[T1], + input2: DataStream[T2], + keySelector1: KeySelector[T1, KEY], + keySelector2: KeySelector[T2, KEY], + keyType: TypeInformation[KEY], + windowAssigner1: WindowAssigner[AnyRef, TimeWindow]) { + + /** + * Specifies a [[KeySelector]] for elements from the second input. + */ + def equalTo(keySelector: T2 => KEY): TimeJoinedStreams.WithKeysAndOneWindow[T1, T2, KEY] = { + val cleanFun = clean(keySelector) + val localKeyType = keyType + val javaSelector = new KeySelector[T2, KEY] with ResultTypeQueryable[KEY] { + def getKey(in: T2) = cleanFun(in) + override def getProducedType: TypeInformation[KEY] = localKeyType + } + new WithKeysAndOneWindow[T1, T2, KEY](input1, input2, keySelector1, javaSelector, localKeyType, windowAssigner1) + } + + /** + * Returns a "closure-cleaned" version of the given function. 
Cleans only if closure cleaning + * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]]. + */ + private[flink] def clean[F <: AnyRef](f: F): F = { + new StreamExecutionEnvironment(input1.javaStream.getExecutionEnvironment).scalaClean(f) + } + } + + /** + * Specify a [[WindowAssigner]] for input2 using [[window()]]. + * + * @tparam T1 Type of the elements from the first input + * @tparam T2 Type of the elements from the second input + * @tparam KEY Type of the key. This must be the same for both inputs + */ + class WithKeysAndOneWindow[T1, T2, KEY]( + input1: DataStream[T1], + input2: DataStream[T2], + keySelector1: KeySelector[T1, KEY], + keySelector2: KeySelector[T2, KEY], + keyType: TypeInformation[KEY], + windowAssigner1: WindowAssigner[AnyRef, TimeWindow]) { + + + /** + * Specifies the window on which the join operation works. + */ + @PublicEvolving + def window(assigner: WindowAssigner[AnyRef, TimeWindow]) + : TimeJoinedStreams.WithTwoWindows[T1, T2, KEY] = { + if (keySelector1 == null || keySelector2 == null) { + throw new UnsupportedOperationException("You first need to specify KeySelectors for both" + + " inputs using where() and equalTo().") + } + + new TimeJoinedStreams.WithTwoWindows[T1, T2, KEY]( + input1, + input2, + keySelector1, + keySelector2, + keyType, + windowAssigner1, + clean(assigner)) + } + + /** + * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning + * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]]. + */ + private[flink] def clean[F <: AnyRef](f: F): F = { + new StreamExecutionEnvironment(input1.javaStream.getExecutionEnvironment).scalaClean(f) + } + } + + /** + * A join operation that has [[KeySelector]]s defined for both inputs as + * well as a [[WindowAssigner]]. + * + * @tparam T1 Type of the elements from the first input + * @tparam T2 Type of the elements from the second input + * @tparam KEY Type of the key.
This must be the same for both inputs + */ + class WithTwoWindows[T1, T2, KEY](input1: DataStream[T1], + input2: DataStream[T2], + keySelector1: KeySelector[T1, KEY], + keySelector2: KeySelector[T2, KEY], + keyType: TypeInformation[KEY], + windowAssigner1: WindowAssigner[AnyRef, TimeWindow], + windowAssigner2: WindowAssigner[AnyRef, TimeWindow]) { + + /** + * Completes the join operation with the user function that is executed + * for windowed groups. + */ + def apply[O: TypeInformation](fun: (T1, T2) => O): DataStream[O] = { + require(fun != null, "Join function must not be null.") + + val joiner = new FlatJoinFunction[T1, T2, O] { + val cleanFun = clean(fun) + def join(left: T1, right: T2, out: Collector[O]) = { + out.collect(cleanFun(left, right)) + } + } + apply(joiner) + } + + /** + * Completes the join operation with the user function that is executed + * for windowed groups. + */ + def apply[O: TypeInformation](fun: (T1, T2, Collector[O]) => Unit): DataStream[O] = { + require(fun != null, "Join function must not be null.") + + val joiner = new FlatJoinFunction[T1, T2, O] { + val cleanFun = clean(fun) + def join(left: T1, right: T2, out: Collector[O]) = { + cleanFun(left, right, out) + } + } + apply(joiner) + } + + /** + * Completes the join operation with the user function that is executed + * for windowed groups. + */ + def apply[T: TypeInformation](function: JoinFunction[T1, T2, T]): DataStream[T] = { + + val join = new JavaTimeJoinedStreams[T1, T2](input1.javaStream, input2.javaStream) + asScalaStream(join + .where(keySelector1) + .window(windowAssigner1) + .equalTo(keySelector2) + .window(windowAssigner2) + .apply(clean(function), implicitly[TypeInformation[T]])) + } + + /** + * Completes the join operation with the user function that is executed + * for windowed groups. 
+ */ + def apply[T: TypeInformation](function: FlatJoinFunction[T1, T2, T]): DataStream[T] = { + + val join = new JavaTimeJoinedStreams[T1, T2](input1.javaStream, input2.javaStream) + + asScalaStream(join + .where(keySelector1) + .window(windowAssigner1) + .equalTo(keySelector2) + .window(windowAssigner2) + .apply(clean(function), implicitly[TypeInformation[T]])) + + } + + /** + * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning + * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]]. + */ + private[flink] def clean[F <: AnyRef](f: F): F = { + new StreamExecutionEnvironment(input1.javaStream.getExecutionEnvironment).scalaClean(f) + } + } + + + /** + * Creates a new join operation from the two given inputs. + */ + def createJoin[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) + : TimeJoinedStreams.Unspecified[T1, T2] = { + new TimeJoinedStreams.Unspecified[T1, T2](input1, input2) + } + +} + diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala index 5f10eacb534a6..675f6347a2428 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala @@ -24,7 +24,7 @@ import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks import org.apache.flink.streaming.api.functions.sink.SinkFunction import org.apache.flink.streaming.api.functions.source.SourceFunction -import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows +import org.apache.flink.streaming.api.windowing.assigners.{SlidingTimeWindows, TumblingTimeWindows} import org.apache.flink.streaming.api.windowing.time.Time import 
org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase import org.junit.Test @@ -173,6 +173,87 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase { assertEquals(expectedResult.sorted, CoGroupJoinITCase.testResults.sorted) } + @Test + def test2WindowsJoin(): Unit = { + CoGroupJoinITCase.testResults = mutable.MutableList() + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + env.setParallelism(1) + + val source1 = env.addSource(new SourceFunction[(String, String, Int)]() { + def run(ctx: SourceFunction.SourceContext[(String, String, Int)]) { + ctx.collect(("a", "x", 0)) + ctx.collect(("b", "y", 1)) + ctx.collect(("c", "z", 2)) + + ctx.collect(("d", "u", 3)) + ctx.collect(("e", "u", 4)) + ctx.collect(("f", "w", 5)) + + ctx.collect(("h", "j", 6)) + ctx.collect(("g", "i", 7)) + ctx.collect(("i", "k", 8)) + ctx.collect(("j", "k", 9)) + ctx.collect(("k", "k", 10)) + + } + + def cancel() {} + + }).assignTimestampsAndWatermarks(new CoGroupJoinITCase.Tuple3TimestampExtractor) + + val source2 = env.addSource(new SourceFunction[(String, String, Int)]() { + def run(ctx: SourceFunction.SourceContext[(String, String, Int)]) { + ctx.collect(("a", "u", 0)) + ctx.collect(("e", "w", 1)) + + ctx.collect(("g", "i", 3)) + ctx.collect(("a", "i", 3)) + ctx.collect(("d", "i", 4)) + ctx.collect(("b", "k", 5)) + + ctx.collect(("c", "x", 6)) + ctx.collect(("f", "x", 6)) + ctx.collect(("h", "x", 6)) + ctx.collect(("k", "z", 8)) + ctx.collect(("j", "z", 9)) + ctx.collect(("i", "z", 10)) + + } + + def cancel() {} + + }).assignTimestampsAndWatermarks(new CoGroupJoinITCase.Tuple3TimestampExtractor) + + source1.timeJoin(source2) + .where(_._1) + .window(SlidingTimeWindows.of(Time.of(6, TimeUnit.MILLISECONDS), Time.of(2, TimeUnit.MILLISECONDS))) + .equalTo(_._1) + .window(TumblingTimeWindows.of(Time.of(2, TimeUnit.MILLISECONDS))) + .apply((l, r) => l.toString + ":" + r.toString) + .addSink(new 
SinkFunction[String]() { + def invoke(value: String) { + CoGroupJoinITCase.testResults += value + } + }) + + env.execute("Join Test") + + val expectedResult = mutable.MutableList( + "(a,x,0):(a,i,3)", + "(a,x,0):(a,u,0)", + "(b,y,1):(b,k,5)", + "(c,z,2):(c,x,6)", + "(d,u,3):(d,i,4)", + "(f,w,5):(f,x,6)", + "(h,j,6):(h,x,6)", + "(i,k,8):(i,z,10)", + "(j,k,9):(j,z,9)") + + assertEquals(expectedResult.sorted, CoGroupJoinITCase.testResults.sorted) + } + @Test def testSelfJoin(): Unit = { CoGroupJoinITCase.testResults = mutable.MutableList() From 55ff43ab964813a535143c1dc2ac0021a64de9d4 Mon Sep 17 00:00:00 2001 From: Yangjun Wang Date: Thu, 18 Feb 2016 18:47:43 +0800 Subject: [PATCH 6/6] [FLINK-3109]Join two streams, fix scalastyle-maven-plugin check issues --- .../streaming/api/scala/TimeJoinedStreams.scala | 13 ++++++++++--- .../streaming/api/scala/CoGroupJoinITCase.scala | 3 ++- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TimeJoinedStreams.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TimeJoinedStreams.scala index ecef155b80024..a42613ede3faf 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TimeJoinedStreams.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TimeJoinedStreams.scala @@ -46,7 +46,8 @@ import org.apache.flink.util.Collector * * val result = one.join(two) * .where {t => ... } - * .window(SlidingTimeWindows.of(Time.of(6, TimeUnit.MILLISECONDS), Time.of(2, TimeUnit.MILLISECONDS)) + * .window(SlidingTimeWindows.of(Time.of(6, TimeUnit.MILLISECONDS), + * Time.of(2, TimeUnit.MILLISECONDS)) * .equalTo {t => ... 
} * .window(TumblingTimeWindows.of(Time.of(2, TimeUnit.SECONDS))) * .apply(new MyJoinFunction()) @@ -108,7 +109,8 @@ object TimeJoinedStreams { def window(assigner: WindowAssigner[AnyRef, TimeWindow]) : TimeJoinedStreams.WithOneWindow[T1, T2, KEY] = { if (keySelector1 == null ) { - throw new UnsupportedOperationException("You first need to specify KeySelectors for input1 first") + throw new UnsupportedOperationException( + "You first need to specify KeySelectors for input1 first") } new TimeJoinedStreams.WithOneWindow[T1, T2, KEY]( input1, @@ -155,7 +157,12 @@ object TimeJoinedStreams { def getKey(in: T2) = cleanFun(in) override def getProducedType: TypeInformation[KEY] = localKeyType } - new WithKeysAndOneWindow[T1, T2, KEY](input1, input2, keySelector1, javaSelector, localKeyType, windowAssigner1) + new WithKeysAndOneWindow[T1, T2, KEY](input1, + input2, + keySelector1, + javaSelector, + localKeyType, + windowAssigner1) } /** diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala index 675f6347a2428..0288c3fd8fab7 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala @@ -228,7 +228,8 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase { source1.timeJoin(source2) .where(_._1) - .window(SlidingTimeWindows.of(Time.of(6, TimeUnit.MILLISECONDS), Time.of(2, TimeUnit.MILLISECONDS))) + .window(SlidingTimeWindows.of( + Time.of(6, TimeUnit.MILLISECONDS), Time.of(2, TimeUnit.MILLISECONDS))) .equalTo(_._1) .window(TumblingTimeWindows.of(Time.of(2, TimeUnit.MILLISECONDS))) .apply((l, r) => l.toString + ":" + r.toString)