Skip to content

Commit

Permalink
[FLINK-4660] Allow CoProcessFunction on non-keyed ConnectedStreams
Browse files Browse the repository at this point in the history
Introduce new CoProcessOperator for this. Rename the pre-existing
CoProcessOperator to KeyedCoProcessOperator.
  • Loading branch information
aljoscha committed Mar 1, 2017
1 parent 29ca9b4 commit a26accf
Show file tree
Hide file tree
Showing 5 changed files with 742 additions and 466 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.operators.co.CoProcessOperator;
import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap;
import org.apache.flink.streaming.api.operators.co.CoStreamMap;
import org.apache.flink.streaming.api.operators.co.CoProcessOperator;
import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;

import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -281,13 +282,13 @@ public <R> SingleOutputStreamOperator<R> process(
CoProcessFunction<IN1, IN2, R> coProcessFunction,
TypeInformation<R> outputType) {

if (!(inputStream1 instanceof KeyedStream) || !(inputStream2 instanceof KeyedStream)) {
throw new UnsupportedOperationException("A CoProcessFunction can only be applied" +
"when both input streams are keyed.");
}
TwoInputStreamOperator<IN1, IN2, R> operator;

CoProcessOperator<Object, IN1, IN2, R> operator = new CoProcessOperator<>(
inputStream1.clean(coProcessFunction));
if ((inputStream1 instanceof KeyedStream) && (inputStream2 instanceof KeyedStream)) {
operator = new KeyedCoProcessOperator<>(inputStream1.clean(coProcessFunction));
} else {
operator = new CoProcessOperator<>(inputStream1.clean(coProcessFunction));
}

return transform("Co-Process", outputType, operator);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,35 +18,32 @@
package org.apache.flink.streaming.api.operators.co;

import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.SimpleTimerService;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

@Internal
public class CoProcessOperator<K, IN1, IN2, OUT>
public class CoProcessOperator<IN1, IN2, OUT>
extends AbstractUdfStreamOperator<OUT, CoProcessFunction<IN1, IN2, OUT>>
implements TwoInputStreamOperator<IN1, IN2, OUT>, Triggerable<K, VoidNamespace> {
implements TwoInputStreamOperator<IN1, IN2, OUT> {

private static final long serialVersionUID = 1L;

private transient TimestampedCollector<OUT> collector;

private transient ContextImpl<IN1, IN2, OUT> context;
private transient ContextImpl context;

private transient OnTimerContextImpl<IN1, IN2, OUT> onTimerContext;
/** We listen to this ourselves because we don't have an {@link InternalTimerService}. */
private long currentWatermark = Long.MIN_VALUE;

public CoProcessOperator(CoProcessFunction<IN1, IN2, OUT> flatMapper) {
super(flatMapper);
Expand All @@ -57,13 +54,7 @@ public void open() throws Exception {
super.open();
collector = new TimestampedCollector<>(output);

InternalTimerService<VoidNamespace> internalTimerService =
getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);

TimerService timerService = new SimpleTimerService(internalTimerService);

context = new ContextImpl<>(userFunction, timerService);
onTimerContext = new OnTimerContextImpl<>(userFunction, timerService);
context = new ContextImpl(userFunction, getProcessingTimeService());
}

@Override
Expand All @@ -83,36 +74,20 @@ public void processElement2(StreamRecord<IN2> element) throws Exception {
}

@Override
public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
collector.setAbsoluteTimestamp(timer.getTimestamp());
onTimerContext.timeDomain = TimeDomain.EVENT_TIME;
onTimerContext.timer = timer;
userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
onTimerContext.timeDomain = null;
onTimerContext.timer = null;
public void processWatermark(Watermark mark) throws Exception {
super.processWatermark(mark);
currentWatermark = mark.getTimestamp();
}

@Override
public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
collector.setAbsoluteTimestamp(timer.getTimestamp());
onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME;
onTimerContext.timer = timer;
userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
onTimerContext.timeDomain = null;
onTimerContext.timer = null;
}
private class ContextImpl
extends CoProcessFunction<IN1, IN2, OUT>.Context
implements TimerService {

protected TimestampedCollector<OUT> getCollector() {
return collector;
}

private static class ContextImpl<IN1, IN2, OUT> extends CoProcessFunction<IN1, IN2, OUT>.Context {

private final TimerService timerService;
private final ProcessingTimeService timerService;

private StreamRecord<?> element;

ContextImpl(CoProcessFunction<IN1, IN2, OUT> function, TimerService timerService) {
ContextImpl(CoProcessFunction<IN1, IN2, OUT> function, ProcessingTimeService timerService) {
function.super();
this.timerService = checkNotNull(timerService);
}
Expand All @@ -129,40 +104,28 @@ public Long timestamp() {
}

@Override
public TimerService timerService() {
return timerService;
public long currentProcessingTime() {
return timerService.getCurrentProcessingTime();
}
}

private static class OnTimerContextImpl<IN1, IN2, OUT>
extends CoProcessFunction<IN1, IN2, OUT>.OnTimerContext {

private final TimerService timerService;

private TimeDomain timeDomain;

private InternalTimer<?, VoidNamespace> timer;

OnTimerContextImpl(CoProcessFunction<IN1, IN2, OUT> function, TimerService timerService) {
function.super();
this.timerService = checkNotNull(timerService);
@Override
public long currentWatermark() {
return currentWatermark;
}

@Override
public TimeDomain timeDomain() {
checkState(timeDomain != null);
return timeDomain;
public void registerProcessingTimeTimer(long time) {
throw new UnsupportedOperationException("Setting timers is only supported on a keyed streams.");
}

@Override
public Long timestamp() {
checkState(timer != null);
return timer.getTimestamp();
public void registerEventTimeTimer(long time) {
throw new UnsupportedOperationException("Setting timers is only supported on a keyed streams.");
}

@Override
public TimerService timerService() {
return timerService;
return this;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.operators.co;

import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.SimpleTimerService;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

/**
 * A {@link TwoInputStreamOperator} that runs a {@link CoProcessFunction} and hands it a
 * {@link TimerService} backed by the operator's {@link InternalTimerService} registered under
 * the name {@code "user-timers"}. Fired timers are delivered back to this operator through
 * {@link Triggerable} and forwarded to {@link CoProcessFunction#onTimer}.
 *
 * @param <K> key type of the timers delivered to this operator
 * @param <IN1> element type of the first input
 * @param <IN2> element type of the second input
 * @param <OUT> element type emitted by the user function
 */
@Internal
public class KeyedCoProcessOperator<K, IN1, IN2, OUT>
		extends AbstractUdfStreamOperator<OUT, CoProcessFunction<IN1, IN2, OUT>>
		implements TwoInputStreamOperator<IN1, IN2, OUT>, Triggerable<K, VoidNamespace> {

	private static final long serialVersionUID = 1L;

	// The three fields below are transient and are (re-)created in open().

	/** Collector passed to the user function; wraps the operator output. */
	private transient TimestampedCollector<OUT> collector;

	/** Context passed to processElement1/processElement2 of the user function. */
	private transient ContextImpl<IN1, IN2, OUT> context;

	/** Context passed to onTimer of the user function. */
	private transient OnTimerContextImpl<IN1, IN2, OUT> onTimerContext;

	/**
	 * Creates the operator around the given user function.
	 *
	 * @param flatMapper the {@link CoProcessFunction} to execute
	 */
	public KeyedCoProcessOperator(CoProcessFunction<IN1, IN2, OUT> flatMapper) {
		super(flatMapper);
	}

	/**
	 * Sets up the collector, obtains the internal timer service and builds the two contexts
	 * that are reused for every element and every timer.
	 */
	@Override
	public void open() throws Exception {
		super.open();

		this.collector = new TimestampedCollector<>(output);

		// This operator registers itself as the Triggerable, so fired timers arrive in
		// onEventTime/onProcessingTime below.
		final InternalTimerService<VoidNamespace> internalTimers =
				getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
		final TimerService userTimers = new SimpleTimerService(internalTimers);

		this.context = new ContextImpl<>(userFunction, userTimers);
		this.onTimerContext = new OnTimerContextImpl<>(userFunction, userTimers);
	}

	/** Forwards an element of the first input to the user function. */
	@Override
	public void processElement1(StreamRecord<IN1> element) throws Exception {
		collector.setTimestamp(element);

		// The element is only exposed to the context for the duration of the user call.
		context.element = element;
		userFunction.processElement1(element.getValue(), context, collector);
		context.element = null;
	}

	/** Forwards an element of the second input to the user function. */
	@Override
	public void processElement2(StreamRecord<IN2> element) throws Exception {
		collector.setTimestamp(element);

		// The element is only exposed to the context for the duration of the user call.
		context.element = element;
		userFunction.processElement2(element.getValue(), context, collector);
		context.element = null;
	}

	/** Invokes the user function's {@code onTimer} for a fired event-time timer. */
	@Override
	public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
		fireUserTimer(TimeDomain.EVENT_TIME, timer);
	}

	/** Invokes the user function's {@code onTimer} for a fired processing-time timer. */
	@Override
	public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
		fireUserTimer(TimeDomain.PROCESSING_TIME, timer);
	}

	/**
	 * Shared timer path: stamps the collector with the timer's timestamp, exposes the timer and
	 * its time domain through the on-timer context, calls the user function and then clears the
	 * context again.
	 */
	private void fireUserTimer(TimeDomain domain, InternalTimer<K, VoidNamespace> timer) throws Exception {
		collector.setAbsoluteTimestamp(timer.getTimestamp());

		onTimerContext.timeDomain = domain;
		onTimerContext.timer = timer;

		userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);

		onTimerContext.timeDomain = null;
		onTimerContext.timer = null;
	}

	/** Returns the collector handed to the user function. */
	protected TimestampedCollector<OUT> getCollector() {
		return collector;
	}

	/**
	 * Per-element context. {@link #element} is set by the operator right before the user
	 * function is called and cleared right after.
	 */
	private static class ContextImpl<IN1, IN2, OUT> extends CoProcessFunction<IN1, IN2, OUT>.Context {

		private final TimerService timerService;

		private StreamRecord<?> element;

		ContextImpl(CoProcessFunction<IN1, IN2, OUT> function, TimerService timerService) {
			// Context is an inner class of CoProcessFunction, hence the qualified super call.
			function.super();
			this.timerService = checkNotNull(timerService);
		}

		/**
		 * Returns the timestamp of the element currently being processed, or {@code null} when
		 * that element carries no timestamp. The state check fails if no element is set.
		 */
		@Override
		public Long timestamp() {
			checkState(element != null);

			if (!element.hasTimestamp()) {
				return null;
			}
			return element.getTimestamp();
		}

		@Override
		public TimerService timerService() {
			return timerService;
		}
	}

	/**
	 * Per-timer context. {@link #timeDomain} and {@link #timer} are set by the operator right
	 * before {@code onTimer} is called and cleared right after.
	 */
	private static class OnTimerContextImpl<IN1, IN2, OUT>
			extends CoProcessFunction<IN1, IN2, OUT>.OnTimerContext {

		private final TimerService timerService;

		private TimeDomain timeDomain;

		private InternalTimer<?, VoidNamespace> timer;

		OnTimerContextImpl(CoProcessFunction<IN1, IN2, OUT> function, TimerService timerService) {
			// OnTimerContext is an inner class of CoProcessFunction, hence the qualified super call.
			function.super();
			this.timerService = checkNotNull(timerService);
		}

		/** Returns the time domain of the firing timer; the state check fails if none is set. */
		@Override
		public TimeDomain timeDomain() {
			checkState(timeDomain != null);
			return timeDomain;
		}

		/** Returns the timestamp of the firing timer; the state check fails if none is set. */
		@Override
		public Long timestamp() {
			checkState(timer != null);
			return timer.getTimestamp();
		}

		@Override
		public TimerService timerService() {
			return timerService;
		}
	}
}
Loading

0 comments on commit a26accf

Please sign in to comment.