diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java index 66601a7e9c67a8..9fe3a4c6cc190e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java @@ -29,9 +29,10 @@ import org.apache.flink.streaming.api.functions.co.CoMapFunction; import org.apache.flink.streaming.api.functions.co.CoProcessFunction; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.operators.co.CoProcessOperator; import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap; import org.apache.flink.streaming.api.operators.co.CoStreamMap; -import org.apache.flink.streaming.api.operators.co.CoProcessOperator; +import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator; import org.apache.flink.streaming.api.transformations.TwoInputTransformation; import static java.util.Objects.requireNonNull; @@ -281,13 +282,13 @@ public SingleOutputStreamOperator process( CoProcessFunction coProcessFunction, TypeInformation outputType) { - if (!(inputStream1 instanceof KeyedStream) || !(inputStream2 instanceof KeyedStream)) { - throw new UnsupportedOperationException("A CoProcessFunction can only be applied" + - "when both input streams are keyed."); - } + TwoInputStreamOperator operator; - CoProcessOperator operator = new CoProcessOperator<>( - inputStream1.clean(coProcessFunction)); + if ((inputStream1 instanceof KeyedStream) && (inputStream2 instanceof KeyedStream)) { + operator = new KeyedCoProcessOperator<>(inputStream1.clean(coProcessFunction)); + } else { + operator = new CoProcessOperator<>(inputStream1.clean(coProcessFunction)); + } return transform("Co-Process", outputType, operator); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java index ed99815f8095d0..4133e7b1e911ed 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java @@ -18,35 +18,32 @@ package org.apache.flink.streaming.api.operators.co; import org.apache.flink.annotation.Internal; -import org.apache.flink.runtime.state.VoidNamespace; -import org.apache.flink.runtime.state.VoidNamespaceSerializer; -import org.apache.flink.streaming.api.SimpleTimerService; -import org.apache.flink.streaming.api.TimeDomain; import org.apache.flink.streaming.api.TimerService; import org.apache.flink.streaming.api.functions.co.CoProcessFunction; import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; -import org.apache.flink.streaming.api.operators.InternalTimer; import org.apache.flink.streaming.api.operators.InternalTimerService; import org.apache.flink.streaming.api.operators.TimestampedCollector; -import org.apache.flink.streaming.api.operators.Triggerable; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @Internal -public class CoProcessOperator +public class CoProcessOperator extends AbstractUdfStreamOperator> - implements TwoInputStreamOperator, Triggerable { + implements TwoInputStreamOperator { private static final long serialVersionUID = 1L; private transient TimestampedCollector collector; - private transient ContextImpl context; + private transient ContextImpl context; - private transient OnTimerContextImpl onTimerContext; + /** We listen to this ourselves because we don't have an {@link InternalTimerService}. */ + private long currentWatermark = Long.MIN_VALUE; public CoProcessOperator(CoProcessFunction flatMapper) { super(flatMapper); @@ -57,13 +54,7 @@ public void open() throws Exception { super.open(); collector = new TimestampedCollector<>(output); - InternalTimerService internalTimerService = - getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this); - - TimerService timerService = new SimpleTimerService(internalTimerService); - - context = new ContextImpl<>(userFunction, timerService); - onTimerContext = new OnTimerContextImpl<>(userFunction, timerService); + context = new ContextImpl(userFunction, getProcessingTimeService()); } @Override @@ -83,36 +74,20 @@ public void processElement2(StreamRecord element) throws Exception { } @Override - public void onEventTime(InternalTimer timer) throws Exception { - collector.setAbsoluteTimestamp(timer.getTimestamp()); - onTimerContext.timeDomain = TimeDomain.EVENT_TIME; - onTimerContext.timer = timer; - userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector); - onTimerContext.timeDomain = null; - onTimerContext.timer = null; + public void processWatermark(Watermark mark) throws Exception { + super.processWatermark(mark); + currentWatermark = mark.getTimestamp(); } - @Override - public void onProcessingTime(InternalTimer timer) throws Exception { - collector.setAbsoluteTimestamp(timer.getTimestamp()); - onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME; - onTimerContext.timer = timer; - userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector); - onTimerContext.timeDomain = null; - onTimerContext.timer = null; - } + private class ContextImpl + extends CoProcessFunction.Context + implements TimerService { - protected TimestampedCollector getCollector() { - return collector; - } - - private static class ContextImpl extends CoProcessFunction.Context { - - private final TimerService timerService; + private final ProcessingTimeService timerService; private StreamRecord element; - ContextImpl(CoProcessFunction function, TimerService timerService) { + ContextImpl(CoProcessFunction function, ProcessingTimeService timerService) { function.super(); this.timerService = checkNotNull(timerService); } @@ -129,40 +104,28 @@ public Long timestamp() { } @Override - public TimerService timerService() { - return timerService; + public long currentProcessingTime() { + return timerService.getCurrentProcessingTime(); } - } - private static class OnTimerContextImpl - extends CoProcessFunction.OnTimerContext { - - private final TimerService timerService; - - private TimeDomain timeDomain; - - private InternalTimer timer; - - OnTimerContextImpl(CoProcessFunction function, TimerService timerService) { - function.super(); - this.timerService = checkNotNull(timerService); + @Override + public long currentWatermark() { + return currentWatermark; } @Override - public TimeDomain timeDomain() { - checkState(timeDomain != null); - return timeDomain; + public void registerProcessingTimeTimer(long time) { + throw new UnsupportedOperationException("Setting timers is only supported on a keyed streams."); } @Override - public Long timestamp() { - checkState(timer != null); - return timer.getTimestamp(); + public void registerEventTimeTimer(long time) { + throw new UnsupportedOperationException("Setting timers is only supported on a keyed streams."); } @Override public TimerService timerService() { - return timerService; + return this; } } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java new file mode 100644 index 00000000000000..e721ab885bc1c4 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.co; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.SimpleTimerService; +import org.apache.flink.streaming.api.TimeDomain; +import org.apache.flink.streaming.api.TimerService; +import org.apache.flink.streaming.api.functions.co.CoProcessFunction; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +@Internal +public class KeyedCoProcessOperator + extends AbstractUdfStreamOperator> + implements TwoInputStreamOperator, Triggerable { + + private static final long serialVersionUID = 1L; + + private transient TimestampedCollector collector; + + private transient ContextImpl context; + + private transient OnTimerContextImpl onTimerContext; + + public KeyedCoProcessOperator(CoProcessFunction flatMapper) { + super(flatMapper); + } + + @Override + public void open() throws Exception { + super.open(); + collector = new TimestampedCollector<>(output); + + InternalTimerService internalTimerService = + getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this); + + TimerService timerService = new SimpleTimerService(internalTimerService); + + context = new ContextImpl<>(userFunction, timerService); + onTimerContext = new OnTimerContextImpl<>(userFunction, timerService); + } + + @Override + public void processElement1(StreamRecord element) throws Exception { + collector.setTimestamp(element); + context.element = element; + userFunction.processElement1(element.getValue(), context, collector); + context.element = null; + } + + @Override + public void processElement2(StreamRecord element) throws Exception { + collector.setTimestamp(element); + context.element = element; + userFunction.processElement2(element.getValue(), context, collector); + context.element = null; + } + + @Override + public void onEventTime(InternalTimer timer) throws Exception { + collector.setAbsoluteTimestamp(timer.getTimestamp()); + onTimerContext.timeDomain = TimeDomain.EVENT_TIME; + onTimerContext.timer = timer; + userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector); + onTimerContext.timeDomain = null; + onTimerContext.timer = null; + } + + @Override + public void onProcessingTime(InternalTimer timer) throws Exception { + collector.setAbsoluteTimestamp(timer.getTimestamp()); + onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME; + onTimerContext.timer = timer; + userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector); + onTimerContext.timeDomain = null; + onTimerContext.timer = null; + } + + protected TimestampedCollector getCollector() { + return collector; + } + + private static class ContextImpl extends CoProcessFunction.Context { + + private final TimerService timerService; + + private StreamRecord element; + + ContextImpl(CoProcessFunction function, TimerService timerService) { + function.super(); + this.timerService = checkNotNull(timerService); + } + + @Override + public Long timestamp() { + checkState(element != null); + + if (element.hasTimestamp()) { + return element.getTimestamp(); + } else { + return null; + } + } + + @Override + public TimerService timerService() { + return timerService; + } + } + + private static class OnTimerContextImpl + extends CoProcessFunction.OnTimerContext { + + private final TimerService timerService; + + private TimeDomain timeDomain; + + private InternalTimer timer; + + OnTimerContextImpl(CoProcessFunction function, TimerService timerService) { + function.super(); + this.timerService = checkNotNull(timerService); + } + + @Override + public TimeDomain timeDomain() { + checkState(timeDomain != null); + return timeDomain; + } + + @Override + public Long timestamp() { + checkState(timer != null); + return timer.getTimestamp(); + } + + @Override + public TimerService timerService() { + return timerService; + } + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java index 9d7c444a5adc10..c19eb379118961 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java @@ -18,18 +18,12 @@ package org.apache.flink.streaming.api.operators.co; -import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeutils.base.StringSerializer; -import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.TimeDomain; import org.apache.flink.streaming.api.functions.co.CoProcessFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; -import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness; -import org.apache.flink.streaming.util.TestHarnessUtil; import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.TestHarnessUtil; import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -46,15 +40,11 @@ public class CoProcessOperatorTest extends TestLogger { @Test public void testTimestampAndWatermarkQuerying() throws Exception { - CoProcessOperator operator = + CoProcessOperator operator = new CoProcessOperator<>(new WatermarkQueryingProcessFunction()); TwoInputStreamOperatorTestHarness testHarness = - new KeyedTwoInputStreamOperatorTestHarness<>( - operator, - new IntToStringKeySelector<>(), - new IdentityKeySelector(), - BasicTypeInfo.STRING_TYPE_INFO); + new TwoInputStreamOperatorTestHarness<>(operator); testHarness.setup(); testHarness.open(); @@ -82,15 +72,11 @@ public void testTimestampAndWatermarkQuerying() throws Exception { @Test public void testTimestampAndProcessingTimeQuerying() throws Exception { - CoProcessOperator operator = + CoProcessOperator operator = new CoProcessOperator<>(new ProcessingTimeQueryingProcessFunction()); TwoInputStreamOperatorTestHarness testHarness = - new KeyedTwoInputStreamOperatorTestHarness<>( - operator, - new IntToStringKeySelector<>(), - new IdentityKeySelector(), - BasicTypeInfo.STRING_TYPE_INFO); + new TwoInputStreamOperatorTestHarness<>(operator); testHarness.setup(); testHarness.open(); @@ -111,237 +97,6 @@ public void testTimestampAndProcessingTimeQuerying() throws Exception { testHarness.close(); } - @Test - public void testEventTimeTimers() throws Exception { - - CoProcessOperator operator = - new CoProcessOperator<>(new EventTimeTriggeringProcessFunction()); - - TwoInputStreamOperatorTestHarness testHarness = - new KeyedTwoInputStreamOperatorTestHarness<>( - operator, - new IntToStringKeySelector<>(), - new IdentityKeySelector(), - BasicTypeInfo.STRING_TYPE_INFO); - - testHarness.setup(); - testHarness.open(); - - testHarness.processElement1(new StreamRecord<>(17, 42L)); - testHarness.processElement2(new StreamRecord<>("18", 42L)); - - testHarness.processWatermark1(new Watermark(5)); - testHarness.processWatermark2(new Watermark(5)); - - testHarness.processWatermark1(new Watermark(6)); - testHarness.processWatermark2(new Watermark(6)); - - ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); - - expectedOutput.add(new StreamRecord<>("INPUT1:17", 42L)); - expectedOutput.add(new StreamRecord<>("INPUT2:18", 42L)); - expectedOutput.add(new StreamRecord<>("1777", 5L)); - expectedOutput.add(new Watermark(5L)); - expectedOutput.add(new StreamRecord<>("1777", 6L)); - expectedOutput.add(new Watermark(6L)); - - TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); - - testHarness.close(); - } - - @Test - public void testProcessingTimeTimers() throws Exception { - - CoProcessOperator operator = - new CoProcessOperator<>(new ProcessingTimeTriggeringProcessFunction()); - - TwoInputStreamOperatorTestHarness testHarness = - new KeyedTwoInputStreamOperatorTestHarness<>( - operator, - new IntToStringKeySelector<>(), - new IdentityKeySelector(), - BasicTypeInfo.STRING_TYPE_INFO); - - testHarness.setup(); - testHarness.open(); - - testHarness.processElement1(new StreamRecord<>(17)); - testHarness.processElement2(new StreamRecord<>("18")); - - testHarness.setProcessingTime(5); - testHarness.setProcessingTime(6); - - ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); - - expectedOutput.add(new StreamRecord<>("INPUT1:17")); - expectedOutput.add(new StreamRecord<>("INPUT2:18")); - expectedOutput.add(new StreamRecord<>("1777", 5L)); - expectedOutput.add(new StreamRecord<>("1777", 6L)); - - TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); - - testHarness.close(); - } - - /** - * Verifies that we don't have leakage between different keys. - */ - @Test - public void testEventTimeTimerWithState() throws Exception { - - CoProcessOperator operator = - new CoProcessOperator<>(new EventTimeTriggeringStatefulProcessFunction()); - - TwoInputStreamOperatorTestHarness testHarness = - new KeyedTwoInputStreamOperatorTestHarness<>( - operator, - new IntToStringKeySelector<>(), - new IdentityKeySelector(), - BasicTypeInfo.STRING_TYPE_INFO); - - testHarness.setup(); - testHarness.open(); - - testHarness.processWatermark1(new Watermark(1)); - testHarness.processWatermark2(new Watermark(1)); - testHarness.processElement1(new StreamRecord<>(17, 0L)); // should set timer for 6 - - testHarness.processWatermark1(new Watermark(2)); - testHarness.processWatermark2(new Watermark(2)); - testHarness.processElement2(new StreamRecord<>("42", 1L)); // should set timer for 7 - - testHarness.processWatermark1(new Watermark(6)); - testHarness.processWatermark2(new Watermark(6)); - - testHarness.processWatermark1(new Watermark(7)); - testHarness.processWatermark2(new Watermark(7)); - - ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); - - expectedOutput.add(new Watermark(1L)); - expectedOutput.add(new StreamRecord<>("INPUT1:17", 0L)); - expectedOutput.add(new Watermark(2L)); - expectedOutput.add(new StreamRecord<>("INPUT2:42", 1L)); - expectedOutput.add(new StreamRecord<>("STATE:17", 6L)); - expectedOutput.add(new Watermark(6L)); - expectedOutput.add(new StreamRecord<>("STATE:42", 7L)); - expectedOutput.add(new Watermark(7L)); - - TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); - - testHarness.close(); - } - - /** - * Verifies that we don't have leakage between different keys. - */ - @Test - public void testProcessingTimeTimerWithState() throws Exception { - - CoProcessOperator operator = - new CoProcessOperator<>(new ProcessingTimeTriggeringStatefulProcessFunction()); - - TwoInputStreamOperatorTestHarness testHarness = - new KeyedTwoInputStreamOperatorTestHarness<>( - operator, - new IntToStringKeySelector<>(), - new IdentityKeySelector(), - BasicTypeInfo.STRING_TYPE_INFO); - - testHarness.setup(); - testHarness.open(); - - testHarness.setProcessingTime(1); - testHarness.processElement1(new StreamRecord<>(17)); // should set timer for 6 - - testHarness.setProcessingTime(2); - testHarness.processElement2(new StreamRecord<>("42")); // should set timer for 7 - - testHarness.setProcessingTime(6); - testHarness.setProcessingTime(7); - - ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); - - expectedOutput.add(new StreamRecord<>("INPUT1:17")); - expectedOutput.add(new StreamRecord<>("INPUT2:42")); - expectedOutput.add(new StreamRecord<>("STATE:17", 6L)); - expectedOutput.add(new StreamRecord<>("STATE:42", 7L)); - - TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); - - testHarness.close(); - } - - @Test - public void testSnapshotAndRestore() throws Exception { - - CoProcessOperator operator = - new CoProcessOperator<>(new BothTriggeringProcessFunction()); - - TwoInputStreamOperatorTestHarness testHarness = - new KeyedTwoInputStreamOperatorTestHarness<>( - operator, - new IntToStringKeySelector<>(), - new IdentityKeySelector(), - BasicTypeInfo.STRING_TYPE_INFO); - - testHarness.setup(); - testHarness.open(); - - testHarness.processElement1(new StreamRecord<>(5, 12L)); - testHarness.processElement2(new StreamRecord<>("5", 12L)); - - // snapshot and restore from scratch - OperatorStateHandles snapshot = testHarness.snapshot(0, 0); - - testHarness.close(); - - operator = new CoProcessOperator<>(new BothTriggeringProcessFunction()); - - testHarness = new KeyedTwoInputStreamOperatorTestHarness<>( - operator, - new IntToStringKeySelector<>(), - new IdentityKeySelector(), - BasicTypeInfo.STRING_TYPE_INFO); - - testHarness.setup(); - testHarness.initializeState(snapshot); - testHarness.open(); - - testHarness.setProcessingTime(5); - testHarness.processWatermark1(new Watermark(6)); - testHarness.processWatermark2(new Watermark(6)); - - ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); - - expectedOutput.add(new StreamRecord<>("PROC:1777", 5L)); - expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L)); - expectedOutput.add(new Watermark(6)); - - TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); - - testHarness.close(); - } - - - private static class IntToStringKeySelector implements KeySelector { - private static final long serialVersionUID = 1L; - - @Override - public String getKey(Integer value) throws Exception { - return "" + value; - } - } - - private static class IdentityKeySelector implements KeySelector { - private static final long serialVersionUID = 1L; - - @Override - public T getKey(T value) throws Exception { - return value; - } - } private static class WatermarkQueryingProcessFunction extends CoProcessFunction { @@ -365,92 +120,6 @@ public void onTimer( } } - private static class EventTimeTriggeringProcessFunction extends CoProcessFunction { - - private static final long serialVersionUID = 1L; - - @Override - public void processElement1(Integer value, Context ctx, Collector out) throws Exception { - out.collect("INPUT1:" + value); - ctx.timerService().registerEventTimeTimer(5); - } - - @Override - public void processElement2(String value, Context ctx, Collector out) throws Exception { - out.collect("INPUT2:" + value); - ctx.timerService().registerEventTimeTimer(6); - } - - @Override - public void onTimer( - long timestamp, - OnTimerContext ctx, - Collector out) throws Exception { - - assertEquals(TimeDomain.EVENT_TIME, ctx.timeDomain()); - out.collect("" + 1777); - } - } - - private static class EventTimeTriggeringStatefulProcessFunction extends CoProcessFunction { - - private static final long serialVersionUID = 1L; - - private final ValueStateDescriptor state = - new ValueStateDescriptor<>("seen-element", StringSerializer.INSTANCE); - - @Override - public void processElement1(Integer value, Context ctx, Collector out) throws Exception { - out.collect("INPUT1:" + value); - getRuntimeContext().getState(state).update("" + value); - ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5); - } - - @Override - public void processElement2(String value, Context ctx, Collector out) throws Exception { - out.collect("INPUT2:" + value); - getRuntimeContext().getState(state).update(value); - ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5); - } - - - @Override - public void onTimer( - long timestamp, - OnTimerContext ctx, - Collector out) throws Exception { - assertEquals(TimeDomain.EVENT_TIME, ctx.timeDomain()); - out.collect("STATE:" + getRuntimeContext().getState(state).value()); - } - } - - private static class ProcessingTimeTriggeringProcessFunction extends CoProcessFunction { - - private static final long serialVersionUID = 1L; - - @Override - public void processElement1(Integer value, Context ctx, Collector out) throws Exception { - out.collect("INPUT1:" + value); - ctx.timerService().registerProcessingTimeTimer(5); - } - - @Override - public void processElement2(String value, Context ctx, Collector out) throws Exception { - out.collect("INPUT2:" + value); - ctx.timerService().registerProcessingTimeTimer(6); - } - - @Override - public void onTimer( - long timestamp, - OnTimerContext ctx, - Collector out) throws Exception { - - assertEquals(TimeDomain.PROCESSING_TIME, ctx.timeDomain()); - out.collect("" + 1777); - } - } - private static class ProcessingTimeQueryingProcessFunction extends CoProcessFunction { private static final long serialVersionUID = 1L; @@ -472,64 +141,4 @@ public void onTimer( Collector out) throws Exception { } } - - private static class ProcessingTimeTriggeringStatefulProcessFunction extends CoProcessFunction { - - private static final long serialVersionUID = 1L; - - private final ValueStateDescriptor state = - new ValueStateDescriptor<>("seen-element", StringSerializer.INSTANCE); - - @Override - public void processElement1(Integer value, Context ctx, Collector out) throws Exception { - out.collect("INPUT1:" + value); - getRuntimeContext().getState(state).update("" + value); - ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5); - } - - @Override - public void processElement2(String value, Context ctx, Collector out) throws Exception { - out.collect("INPUT2:" + value); - getRuntimeContext().getState(state).update(value); - ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5); - } - - - @Override - public void onTimer( - long timestamp, - OnTimerContext ctx, - Collector out) throws Exception { - assertEquals(TimeDomain.PROCESSING_TIME, ctx.timeDomain()); - out.collect("STATE:" + getRuntimeContext().getState(state).value()); - } - } - - private static class BothTriggeringProcessFunction extends CoProcessFunction { - - private static final long serialVersionUID = 1L; - - @Override - public void processElement1(Integer value, Context ctx, Collector out) throws Exception { - ctx.timerService().registerEventTimeTimer(6); - } - - @Override - public void processElement2(String value, Context ctx, Collector out) throws Exception { - ctx.timerService().registerProcessingTimeTimer(5); - } - - - @Override - public void onTimer( - long timestamp, - OnTimerContext ctx, - Collector out) throws Exception { - if (TimeDomain.EVENT_TIME.equals(ctx.timeDomain())) { - out.collect("EVENT:1777"); - } else { - out.collect("PROC:1777"); - } - } - } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest.java new file mode 100644 index 00000000000000..d8c9a61c88c5d1 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest.java @@ -0,0 +1,535 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.operators.co; + + +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.api.TimeDomain; +import org.apache.flink.streaming.api.functions.co.CoProcessFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; +import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.TestHarnessUtil; +import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness; +import org.apache.flink.util.Collector; +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import java.util.concurrent.ConcurrentLinkedQueue; + +import static org.junit.Assert.assertEquals; + +/** + * Tests {@link KeyedCoProcessOperator}. + */ +public class KeyedCoProcessOperatorTest extends TestLogger { + + @Test + public void testTimestampAndWatermarkQuerying() throws Exception { + + KeyedCoProcessOperator operator = + new KeyedCoProcessOperator<>(new WatermarkQueryingProcessFunction()); + + TwoInputStreamOperatorTestHarness testHarness = + new KeyedTwoInputStreamOperatorTestHarness<>( + operator, + new IntToStringKeySelector<>(), + new IdentityKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.processWatermark1(new Watermark(17)); + testHarness.processWatermark2(new Watermark(17)); + testHarness.processElement1(new StreamRecord<>(5, 12L)); + + testHarness.processWatermark1(new Watermark(42)); + testHarness.processWatermark2(new Watermark(42)); + testHarness.processElement2(new StreamRecord<>("6", 13L)); + + ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new Watermark(17L)); + expectedOutput.add(new StreamRecord<>("5WM:17 TS:12", 12L)); + expectedOutput.add(new Watermark(42L)); + expectedOutput.add(new StreamRecord<>("6WM:42 TS:13", 13L)); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + @Test + public void testTimestampAndProcessingTimeQuerying() throws Exception { + + KeyedCoProcessOperator operator = + new KeyedCoProcessOperator<>(new ProcessingTimeQueryingProcessFunction()); + + TwoInputStreamOperatorTestHarness testHarness = + new KeyedTwoInputStreamOperatorTestHarness<>( + operator, + new IntToStringKeySelector<>(), + new IdentityKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.setProcessingTime(17); + testHarness.processElement1(new StreamRecord<>(5)); + + testHarness.setProcessingTime(42); + testHarness.processElement2(new StreamRecord<>("6")); + + ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new StreamRecord<>("5PT:17 TS:null")); + expectedOutput.add(new StreamRecord<>("6PT:42 TS:null")); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + @Test + public void testEventTimeTimers() throws Exception { + + KeyedCoProcessOperator operator = + new KeyedCoProcessOperator<>(new EventTimeTriggeringProcessFunction()); + + TwoInputStreamOperatorTestHarness testHarness = + new KeyedTwoInputStreamOperatorTestHarness<>( + operator, + new IntToStringKeySelector<>(), + new IdentityKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.processElement1(new StreamRecord<>(17, 42L)); + testHarness.processElement2(new StreamRecord<>("18", 42L)); + + testHarness.processWatermark1(new Watermark(5)); + testHarness.processWatermark2(new Watermark(5)); + + testHarness.processWatermark1(new Watermark(6)); + testHarness.processWatermark2(new Watermark(6)); + + ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new StreamRecord<>("INPUT1:17", 42L)); + expectedOutput.add(new StreamRecord<>("INPUT2:18", 42L)); + expectedOutput.add(new StreamRecord<>("1777", 5L)); + expectedOutput.add(new Watermark(5L)); + expectedOutput.add(new StreamRecord<>("1777", 6L)); + expectedOutput.add(new Watermark(6L)); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + @Test + public void testProcessingTimeTimers() throws Exception { + + KeyedCoProcessOperator operator = + new KeyedCoProcessOperator<>(new ProcessingTimeTriggeringProcessFunction()); + + TwoInputStreamOperatorTestHarness testHarness = + new KeyedTwoInputStreamOperatorTestHarness<>( + operator, + new IntToStringKeySelector<>(), + new IdentityKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.processElement1(new StreamRecord<>(17)); + testHarness.processElement2(new StreamRecord<>("18")); + + testHarness.setProcessingTime(5); + testHarness.setProcessingTime(6); + + ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new StreamRecord<>("INPUT1:17")); + expectedOutput.add(new StreamRecord<>("INPUT2:18")); + expectedOutput.add(new StreamRecord<>("1777", 5L)); + expectedOutput.add(new StreamRecord<>("1777", 6L)); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + /** + * Verifies that we don't have leakage between different keys. + */ + @Test + public void testEventTimeTimerWithState() throws Exception { + + KeyedCoProcessOperator operator = + new KeyedCoProcessOperator<>(new EventTimeTriggeringStatefulProcessFunction()); + + TwoInputStreamOperatorTestHarness testHarness = + new KeyedTwoInputStreamOperatorTestHarness<>( + operator, + new IntToStringKeySelector<>(), + new IdentityKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.processWatermark1(new Watermark(1)); + testHarness.processWatermark2(new Watermark(1)); + testHarness.processElement1(new StreamRecord<>(17, 0L)); // should set timer for 6 + + testHarness.processWatermark1(new Watermark(2)); + testHarness.processWatermark2(new Watermark(2)); + testHarness.processElement2(new StreamRecord<>("42", 1L)); // should set timer for 7 + + testHarness.processWatermark1(new Watermark(6)); + testHarness.processWatermark2(new Watermark(6)); + + testHarness.processWatermark1(new Watermark(7)); + testHarness.processWatermark2(new Watermark(7)); + + ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new Watermark(1L)); + expectedOutput.add(new StreamRecord<>("INPUT1:17", 0L)); + expectedOutput.add(new Watermark(2L)); + expectedOutput.add(new StreamRecord<>("INPUT2:42", 1L)); + expectedOutput.add(new StreamRecord<>("STATE:17", 6L)); + expectedOutput.add(new Watermark(6L)); + expectedOutput.add(new StreamRecord<>("STATE:42", 7L)); + expectedOutput.add(new Watermark(7L)); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + /** + * Verifies that we don't have leakage between different keys. + */ + @Test + public void testProcessingTimeTimerWithState() throws Exception { + + KeyedCoProcessOperator operator = + new KeyedCoProcessOperator<>(new ProcessingTimeTriggeringStatefulProcessFunction()); + + TwoInputStreamOperatorTestHarness testHarness = + new KeyedTwoInputStreamOperatorTestHarness<>( + operator, + new IntToStringKeySelector<>(), + new IdentityKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.setProcessingTime(1); + testHarness.processElement1(new StreamRecord<>(17)); // should set timer for 6 + + testHarness.setProcessingTime(2); + testHarness.processElement2(new StreamRecord<>("42")); // should set timer for 7 + + testHarness.setProcessingTime(6); + testHarness.setProcessingTime(7); + + ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new StreamRecord<>("INPUT1:17")); + expectedOutput.add(new StreamRecord<>("INPUT2:42")); + expectedOutput.add(new StreamRecord<>("STATE:17", 6L)); + expectedOutput.add(new StreamRecord<>("STATE:42", 7L)); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + @Test + public void testSnapshotAndRestore() throws Exception { + + KeyedCoProcessOperator operator = + new KeyedCoProcessOperator<>(new BothTriggeringProcessFunction()); + + TwoInputStreamOperatorTestHarness testHarness = + new KeyedTwoInputStreamOperatorTestHarness<>( + operator, + new IntToStringKeySelector<>(), + new IdentityKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.processElement1(new StreamRecord<>(5, 12L)); + testHarness.processElement2(new StreamRecord<>("5", 12L)); + + // snapshot and restore from scratch + OperatorStateHandles snapshot = testHarness.snapshot(0, 0); + + testHarness.close(); + + operator = new KeyedCoProcessOperator<>(new BothTriggeringProcessFunction()); + + testHarness = new KeyedTwoInputStreamOperatorTestHarness<>( + operator, + new IntToStringKeySelector<>(), + new IdentityKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.initializeState(snapshot); + testHarness.open(); + + testHarness.setProcessingTime(5); + testHarness.processWatermark1(new Watermark(6)); + testHarness.processWatermark2(new Watermark(6)); + + ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new StreamRecord<>("PROC:1777", 5L)); + expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L)); + expectedOutput.add(new Watermark(6)); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + + private static class IntToStringKeySelector implements KeySelector { + private static final long serialVersionUID = 1L; + + @Override + public String getKey(Integer value) throws Exception { + return "" + value; + } + } + + private static class IdentityKeySelector implements KeySelector { + private static final long serialVersionUID = 1L; + + @Override + public T getKey(T value) throws Exception { + return value; + } + } + + private static class WatermarkQueryingProcessFunction extends CoProcessFunction { + + private static final long serialVersionUID = 1L; + + @Override + public void processElement1(Integer value, Context ctx, Collector out) throws Exception { + out.collect(value + "WM:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp()); + } + + @Override + public void processElement2(String value, Context ctx, Collector out) throws Exception { + out.collect(value + "WM:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp()); + } + + @Override + public void onTimer( + long timestamp, + OnTimerContext ctx, + Collector out) throws Exception { + } + } + + private static class EventTimeTriggeringProcessFunction extends CoProcessFunction { + + private static final long serialVersionUID = 1L; + + @Override + public void processElement1(Integer value, Context ctx, Collector out) throws Exception { + out.collect("INPUT1:" + value); + ctx.timerService().registerEventTimeTimer(5); + } + + @Override + public void processElement2(String value, Context ctx, Collector out) throws Exception { + out.collect("INPUT2:" + value); + ctx.timerService().registerEventTimeTimer(6); + } + + @Override + public void onTimer( + long timestamp, + OnTimerContext ctx, + Collector out) throws Exception { + + assertEquals(TimeDomain.EVENT_TIME, ctx.timeDomain()); + out.collect("" + 1777); + } + } + + private static class EventTimeTriggeringStatefulProcessFunction extends CoProcessFunction { + + private static final long serialVersionUID = 1L; + + private final ValueStateDescriptor state = + new ValueStateDescriptor<>("seen-element", StringSerializer.INSTANCE); + + @Override + public void processElement1(Integer value, Context ctx, Collector out) throws Exception { + out.collect("INPUT1:" + value); + getRuntimeContext().getState(state).update("" + value); + ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5); + } + + @Override + public void processElement2(String value, Context ctx, Collector out) throws Exception { + out.collect("INPUT2:" + value); + getRuntimeContext().getState(state).update(value); + ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5); + } + + + @Override + public void onTimer( + long timestamp, + OnTimerContext ctx, + Collector out) throws Exception { + assertEquals(TimeDomain.EVENT_TIME, ctx.timeDomain()); + out.collect("STATE:" + getRuntimeContext().getState(state).value()); + } + } + + private static class ProcessingTimeTriggeringProcessFunction extends CoProcessFunction { + + private static final long serialVersionUID = 1L; + + @Override + public void processElement1(Integer value, Context ctx, Collector out) throws Exception { + out.collect("INPUT1:" + value); + ctx.timerService().registerProcessingTimeTimer(5); + } + + @Override + public void processElement2(String value, Context ctx, Collector out) throws Exception { + out.collect("INPUT2:" + value); + ctx.timerService().registerProcessingTimeTimer(6); + } + + @Override + public void onTimer( + long timestamp, + OnTimerContext ctx, + Collector out) throws Exception { + + assertEquals(TimeDomain.PROCESSING_TIME, ctx.timeDomain()); + out.collect("" + 1777); + } + } + + private static class ProcessingTimeQueryingProcessFunction extends CoProcessFunction { + + private static final long serialVersionUID = 1L; + + @Override + public void processElement1(Integer value, Context ctx, Collector out) throws Exception { + out.collect(value + "PT:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp()); + } + + @Override + public void processElement2(String value, Context ctx, Collector out) throws Exception { + out.collect(value + "PT:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp()); + } + + @Override + public void onTimer( + long timestamp, + OnTimerContext ctx, + Collector out) throws Exception { + } + } + + private static class ProcessingTimeTriggeringStatefulProcessFunction extends CoProcessFunction { + + private static final long serialVersionUID = 1L; + + private final ValueStateDescriptor state = + new ValueStateDescriptor<>("seen-element", StringSerializer.INSTANCE); + + @Override + public void processElement1(Integer value, Context ctx, Collector out) throws Exception { + out.collect("INPUT1:" + value); + getRuntimeContext().getState(state).update("" + value); + ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5); + } + + @Override + public void processElement2(String value, Context ctx, Collector out) throws Exception { + out.collect("INPUT2:" + value); + getRuntimeContext().getState(state).update(value); + ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5); + } + + + @Override + public void onTimer( + long timestamp, + OnTimerContext ctx, + Collector out) throws Exception { + assertEquals(TimeDomain.PROCESSING_TIME, ctx.timeDomain()); + out.collect("STATE:" + getRuntimeContext().getState(state).value()); + } + } + + private static class BothTriggeringProcessFunction extends CoProcessFunction { + + private static final long serialVersionUID = 1L; + + @Override + public void processElement1(Integer value, Context ctx, Collector out) throws Exception { + ctx.timerService().registerEventTimeTimer(6); + } + + @Override + public void processElement2(String value, Context ctx, Collector out) throws Exception { + ctx.timerService().registerProcessingTimeTimer(5); + } + + + @Override + public void onTimer( + long timestamp, + OnTimerContext ctx, + Collector out) throws Exception { + if (TimeDomain.EVENT_TIME.equals(ctx.timeDomain())) { + out.collect("EVENT:1777"); + } else { + out.collect("PROC:1777"); + } + } + } +}