Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.operators.WatermarkOption;
import org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator;
import org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
Expand Down Expand Up @@ -60,6 +62,7 @@ public class BroadcastConnectedStream<IN1, IN2> {
private final DataStream<IN1> inputStream1;
private final BroadcastStream<IN2> inputStream2;
private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors;
private WatermarkOption watermarkOption;

protected BroadcastConnectedStream(
final StreamExecutionEnvironment env,
Expand Down Expand Up @@ -112,6 +115,16 @@ public TypeInformation<IN2> getType2() {
return inputStream2.getType();
}

/**
 * Stores the {@link WatermarkOption} that this connected stream passes on to the
 * operator it builds.
 *
 * <p>The option is optional: while it is {@code null}, {@code transform(...)} leaves the
 * operator's own setting untouched; otherwise it is set on the operator before the
 * transformation is created.
 *
 * @param watermarkOption the option to hand to the operator, or {@code null} to keep the
 *     operator's own setting
 * @return this stream, so that calls can be chained
 */
public BroadcastConnectedStream<IN1, IN2> watermarkOption(WatermarkOption watermarkOption) {
    this.watermarkOption = watermarkOption;
    return this;
}

/**
* Assumes as inputs a {@link BroadcastStream} and a {@link KeyedStream} and applies the given
* {@link KeyedBroadcastProcessFunction} on them, thereby creating a transformed output stream.
Expand Down Expand Up @@ -222,6 +235,10 @@ private <OUT> SingleOutputStreamOperator<OUT> transform(
inputStream1.getType();
inputStream2.getType();

if (watermarkOption != null){
((AbstractStreamOperator) operator).setWatermarkOption(watermarkOption);
}

TwoInputTransformation<IN1, IN2, OUT> transform = new TwoInputTransformation<>(
inputStream1.getTransformation(),
inputStream2.getTransformation(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.operators.WatermarkOption;
import org.apache.flink.streaming.api.operators.co.CoProcessOperator;
import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap;
import org.apache.flink.streaming.api.operators.co.CoStreamMap;
Expand Down Expand Up @@ -62,6 +64,7 @@ public class ConnectedStreams<IN1, IN2> {
protected final StreamExecutionEnvironment environment;
protected final DataStream<IN1> inputStream1;
protected final DataStream<IN2> inputStream2;
protected WatermarkOption watermarkOption;

protected ConnectedStreams(StreamExecutionEnvironment env, DataStream<IN1> input1, DataStream<IN2> input2) {
this.environment = requireNonNull(env);
Expand Down Expand Up @@ -109,6 +112,16 @@ public TypeInformation<IN2> getType2() {
return inputStream2.getType();
}

/**
 * Stores the {@link WatermarkOption} that this connected stream passes on to the
 * operator given to {@code transform(...)}.
 *
 * <p>The option is optional: while it is {@code null}, {@code transform(...)} leaves the
 * operator's own setting untouched; otherwise it is set on the operator at the start of
 * {@code transform(...)}.
 *
 * @param watermarkOption the option to hand to the operator, or {@code null} to keep the
 *     operator's own setting
 * @return this stream, so that calls can be chained
 */
public ConnectedStreams<IN1, IN2> watermarkOption(WatermarkOption watermarkOption) {
    this.watermarkOption = watermarkOption;
    return this;
}

/**
* KeyBy operation for connected data stream. Assigns keys to the elements of
* input1 and input2 according to keyPosition1 and keyPosition2.
Expand Down Expand Up @@ -447,6 +460,10 @@ public <R> SingleOutputStreamOperator<R> transform(String functionName,
TypeInformation<R> outTypeInfo,
TwoInputStreamOperator<IN1, IN2, R> operator) {

if (watermarkOption != null) {
((AbstractStreamOperator) operator).setWatermarkOption(watermarkOption);
}

// read the output type of the input Transforms to coax out errors about MissingTypeInfo
inputStream1.getType();
inputStream2.getType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ public abstract class AbstractStreamOperator<OUT>
private long input1Watermark = Long.MIN_VALUE;
private long input2Watermark = Long.MIN_VALUE;

private WatermarkOption watermarkOption = WatermarkOption.ALL;

// ------------------------------------------------------------------------
// Life Cycle
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -673,6 +675,13 @@ public final ChainingStrategy getChainingStrategy() {
return chainingStrategy;
}

// ------------------------------------------------------------------------
// Watermark Options
// ------------------------------------------------------------------------

/**
 * Selects which input's watermarks advance this operator's combined watermark.
 *
 * <p>The option is read by {@code processWatermark1} and {@code processWatermark2}:
 * with {@link WatermarkOption#ALL} the combined watermark is the minimum of both inputs,
 * with {@link WatermarkOption#STREAM1} only the first input advances it, and with
 * {@link WatermarkOption#STREAM2} only the second input advances it.
 *
 * @param watermarkOption the option to use; must not be {@code null}
 * @throws NullPointerException if {@code watermarkOption} is {@code null}
 */
public final void setWatermarkOption(WatermarkOption watermarkOption) {
    // Fail fast: a null option would otherwise only surface later, as a generic
    // RuntimeException thrown while the first watermark is being processed.
    this.watermarkOption =
        java.util.Objects.requireNonNull(watermarkOption, "watermarkOption must not be null");
}

// ------------------------------------------------------------------------
// Metrics
Expand Down Expand Up @@ -799,7 +808,17 @@ private void checkTimerServiceInitialization() {

public void processWatermark1(Watermark mark) throws Exception {
input1Watermark = mark.getTimestamp();
long newMin = Math.min(input1Watermark, input2Watermark);
if (watermarkOption == WatermarkOption.STREAM2){
return;
}
long newMin;
if (watermarkOption == WatermarkOption.ALL) {
newMin = Math.min(input1Watermark, input2Watermark);
} else if (watermarkOption == WatermarkOption.STREAM1){
newMin = input1Watermark;
} else {
throw new RuntimeException("WatermarkOption only support ALL,STREAM1,STREAM2");
}
if (newMin > combinedWatermark) {
combinedWatermark = newMin;
processWatermark(new Watermark(combinedWatermark));
Expand All @@ -808,7 +827,17 @@ public void processWatermark1(Watermark mark) throws Exception {

public void processWatermark2(Watermark mark) throws Exception {
input2Watermark = mark.getTimestamp();
long newMin = Math.min(input1Watermark, input2Watermark);
if (watermarkOption == WatermarkOption.STREAM1){
return;
}
long newMin;
if (watermarkOption == WatermarkOption.ALL) {
newMin = Math.min(input1Watermark, input2Watermark);
} else if (watermarkOption == WatermarkOption.STREAM2){
newMin = input2Watermark;
} else {
throw new RuntimeException("WatermarkOption only support ALL,STREAM1,STREAM2");
}
if (newMin > combinedWatermark) {
combinedWatermark = newMin;
processWatermark(new Watermark(combinedWatermark));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.operators;

/**
 * Selects which input's watermarks drive the combined watermark of a
 * {@code TwoInputStreamOperator}.
 *
 * <p>{@code AbstractStreamOperator} starts out with {@link #ALL}, meaning that the
 * watermarks of both inputs are taken into account. {@link #STREAM1} and {@link #STREAM2}
 * restrict the operator to the watermarks of a single input.
 */
public enum WatermarkOption {
/**
 * Only watermarks of the first input advance the combined watermark; watermarks of the
 * second input do not trigger any watermark processing.
 */
STREAM1,
/**
 * Only watermarks of the second input advance the combined watermark; watermarks of the
 * first input do not trigger any watermark processing.
 */
STREAM2,
/**
 * Both inputs are considered: the combined watermark is the minimum of the latest
 * watermarks seen on the two inputs.
 */
ALL
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,12 @@
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;

import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -657,11 +658,57 @@ public void testFailingBackendSnapshotMethod() throws Exception {
verify(keyedStateBackend).dispose();
}

/**
 * Verifies the watermark forwarded by a two-input operator under each
 * {@link WatermarkOption}, switching from ALL to STREAM1 to STREAM2 on the same operator.
 */
@Test
public void testWatermarkOption() throws Exception {
    final TestWatermarkOptionOperator operator = new TestWatermarkOptionOperator();
    final TwoInputStreamOperatorTestHarness<Object, Object, Long> harness =
        new TwoInputStreamOperatorTestHarness<>(operator);
    harness.open();

    // ALL: inputs report 0 and 1, so the minimum (0) is expected.
    operator.setWatermarkOption(WatermarkOption.ALL);
    harness.processWatermark1(new Watermark(0L));
    harness.processWatermark2(new Watermark(1L));
    final List<Long> emittedForAll = extractResult(harness);
    assertThat(emittedForAll, contains(0L));

    // STREAM1: only the watermark of input 1 (3) is expected; input 2 (2) is ignored.
    operator.setWatermarkOption(WatermarkOption.STREAM1);
    harness.processWatermark1(new Watermark(3L));
    harness.processWatermark2(new Watermark(2L));
    final List<Long> emittedForStream1 = extractResult(harness);
    assertThat(emittedForStream1, contains(3L));

    // STREAM2: only the watermark of input 2 (5) is expected; input 1 (4) is ignored.
    operator.setWatermarkOption(WatermarkOption.STREAM2);
    harness.processWatermark1(new Watermark(4L));
    harness.processWatermark2(new Watermark(5L));
    final List<Long> emittedForStream2 = extractResult(harness);
    assertThat(emittedForStream2, contains(5L));
}

/**
* Extracts the result values from the test harness and clears the output queue.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
private <T> List<T> extractResult(OneInputStreamOperatorTestHarness<?, T> testHarness) {
private <T> List<T> extractResult(AbstractStreamOperatorTestHarness<T> testHarness) {
List<StreamRecord<? extends T>> streamRecords = testHarness.extractOutputStreamRecords();
List<T> result = new ArrayList<>();
for (Object in : streamRecords) {
Expand Down Expand Up @@ -745,6 +792,29 @@ public void onProcessingTime(InternalTimer<Integer, VoidNamespace> timer) throws
}
}

/**
 * Two-input test operator for {@link WatermarkOption}.
 *
 * <p>It ignores all records and turns every watermark passed to
 * {@link #processWatermark(Watermark)} into an output record carrying the watermark's
 * timestamp, so that a test can read the forwarded watermarks from the operator output.
 */
private static class TestWatermarkOptionOperator
        extends AbstractStreamOperator<Long>
        implements TwoInputStreamOperator<Object, Object, Long> {

    @Override
    public void processElement1(StreamRecord<Object> element) throws Exception {
        // Intentionally empty: records are irrelevant, only watermarks are observed.
    }

    @Override
    public void processElement2(StreamRecord<Object> element) throws Exception {
        // Intentionally empty: records are irrelevant, only watermarks are observed.
    }

    /**
     * Emits the timestamp of the given watermark as a regular output record.
     *
     * @param mark the watermark handed to this operator
     */
    @Override
    public void processWatermark(Watermark mark) throws Exception {
        // Deliberately does not delegate to super: the watermark is captured as a record.
        output.collect(new StreamRecord<>(mark.getTimestamp()));
    }
}

private static int getKeyInKeyGroupRange(KeyGroupRange range, int maxParallelism) {
Random rand = new Random(System.currentTimeMillis());
int result = rand.nextInt();
Expand Down