Skip to content
Permalink
Browse files
[FLINK-27676][python] Fix on_timer output behind triggering watermark
This closes #19765.
  • Loading branch information
Vancior authored and HuangXingBo committed May 19, 2022
1 parent cc63db7 commit 124e4adb04196e5d56974aead02e61a9bd5bf2cf
Showing 5 changed files with 115 additions and 17 deletions.
@@ -183,18 +183,21 @@ public void processWatermark(Watermark mark) throws Exception {
if (mark.getTimestamp() == Long.MAX_VALUE) {
invokeFinishBundle();
processElementsOfCurrentKeyIfNeeded(null);
super.processWatermark(mark);
preEmitWatermark(mark);
output.emitWatermark(mark);
} else if (isBundleFinished()) {
// forward the watermark immediately if the bundle is already finished.
super.processWatermark(mark);
preEmitWatermark(mark);
output.emitWatermark(mark);
} else {
// It is not safe to advance the output watermark yet, so add a hold on the current
// output watermark.
bundleFinishedCallback =
() -> {
try {
// at this point the bundle is finished, allow the watermark to pass
super.processWatermark(mark);
preEmitWatermark(mark);
output.emitWatermark(mark);
} catch (Exception e) {
throw new RuntimeException(
"Failed to process watermark after finished bundle.", e);
@@ -260,6 +263,13 @@ public Configuration getConfiguration() {

protected abstract PythonEnvironmentManager createPythonEnvironmentManager();

/** Called before emitting watermark to downstream. */
protected void preEmitWatermark(Watermark mark) throws Exception {
if (getTimeServiceManager().isPresent()) {
getTimeServiceManager().get().advanceWatermark(mark);
}
}

/** Checks whether to invoke finishBundle by elements count. Called in processElement. */
protected void checkInvokeFinishBundleByCount() throws Exception {
if (elementCount >= maxBundleSize) {
@@ -29,6 +29,7 @@
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.Triggerable;
@@ -37,6 +38,7 @@
import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamPythonFunctionRunner;
import org.apache.flink.streaming.api.utils.ProtoUtils;
import org.apache.flink.streaming.api.utils.PythonTypeUtils;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.types.Row;

@@ -61,6 +63,9 @@
/** TimerService for current operator to register or fire timer. */
private transient InternalTimerService<VoidNamespace> internalTimerService;

/** TimerRegistration for handling timer registering. */
private transient TimerRegistration timerRegistration;

/** The TypeInformation of the key. */
private transient TypeInformation<Row> keyTypeInfo;

@@ -102,6 +107,13 @@ public void open() throws Exception {
timerDataTypeInfo);

timerHandler = new TimerHandler();
timerRegistration =
new TimerRegistration(
getKeyedStateBackend(),
internalTimerService,
this,
VoidNamespaceSerializer.INSTANCE,
timerDataSerializer);

super.open();
}
@@ -129,13 +141,7 @@ public PythonFunctionRunner createPythonFunctionRunner() throws Exception {
getOperatorStateBackend(),
keyTypeSerializer,
null,
new TimerRegistration(
getKeyedStateBackend(),
internalTimerService,
this,
VoidNamespaceSerializer.INSTANCE,
PythonTypeUtils.TypeInfoToSerializerConverter.typeInfoSerializerConverter(
timerDataTypeInfo)),
timerRegistration,
getContainingTask().getEnvironment().getMemoryManager(),
getOperatorConfig()
.getManagedMemoryFractionOperatorUseCaseOfSlot(
@@ -208,6 +214,20 @@ private void processTimer(TimeDomain timeDomain, InternalTimer<Row, VoidNamespac
emitResults();
}

@SuppressWarnings("rawtypes")
@Override
protected void preEmitWatermark(Watermark mark) throws Exception {
if (!getTimeServiceManager().isPresent()) {
return;
}
InternalTimeServiceManager timeServiceManager = getTimeServiceManager().get();
long timestamp = mark.getTimestamp();
do {
timeServiceManager.advanceWatermark(mark);
invokeFinishBundle();
} while (!timerRegistration.hasEventTimeTimerBeforeTimestamp(timestamp));
}

/**
* As the beam state gRPC service will access the KeyedStateBackend in parallel with this
* operator, we must override this method to prevent changing the current key of the
@@ -29,6 +29,7 @@
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.Triggerable;
@@ -37,6 +38,7 @@
import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamPythonFunctionRunner;
import org.apache.flink.streaming.api.utils.ProtoUtils;
import org.apache.flink.streaming.api.utils.PythonTypeUtils;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.types.Row;

@@ -68,6 +70,9 @@
/** TimerService for current operator to register or fire timer. */
private transient InternalTimerService internalTimerService;

/** TimerRegistration for handling timer registering. */
private transient TimerRegistration timerRegistration;

/** The TypeInformation of the key. */
private transient TypeInformation<Row> keyTypeInfo;

@@ -122,6 +127,13 @@ public void open() throws Exception {
timerDataTypeInfo);

timerHandler = new TimerHandler();
timerRegistration =
new TimerRegistration(
getKeyedStateBackend(),
internalTimerService,
this,
namespaceSerializer,
timerDataSerializer);

super.open();
}
@@ -159,12 +171,7 @@ public PythonFunctionRunner createPythonFunctionRunner() throws Exception {
getOperatorStateBackend(),
keyTypeSerializer,
namespaceSerializer,
new TimerRegistration(
getKeyedStateBackend(),
internalTimerService,
this,
namespaceSerializer,
timerDataSerializer),
timerRegistration,
getContainingTask().getEnvironment().getMemoryManager(),
getOperatorConfig()
.getManagedMemoryFractionOperatorUseCaseOfSlot(
@@ -226,6 +233,20 @@ private void processTimer(TimeDomain timeDomain, InternalTimer<Row, Object> time
emitResults();
}

@SuppressWarnings("rawtypes")
@Override
protected void preEmitWatermark(Watermark mark) throws Exception {
if (!getTimeServiceManager().isPresent()) {
return;
}
InternalTimeServiceManager timeServiceManager = getTimeServiceManager().get();
long timestamp = mark.getTimestamp();
do {
timeServiceManager.advanceWatermark(mark);
invokeFinishBundle();
} while (!timerRegistration.hasEventTimeTimerBeforeTimestamp(timestamp));
}

/**
* As the beam state gRPC service will access the KeyedStateBackend in parallel with this
* operator, we must override this method to prevent changing the current key of the
@@ -22,11 +22,13 @@
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.runtime.state.InternalPriorityQueue;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.KeyContext;
import org.apache.flink.streaming.api.operators.TimerHeapInternalTimer;
import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
import org.apache.flink.types.Row;

@@ -39,6 +41,7 @@ public final class TimerRegistration {

private final KeyedStateBackend<Row> keyedStateBackend;
private final InternalTimerService internalTimerService;
private final InternalPriorityQueue<TimerHeapInternalTimer<?, ?>> internalEventTimeTimersQueue;
private final KeyContext keyContext;
private final TypeSerializer namespaceSerializer;
private final TypeSerializer<Row> timerDataSerializer;
@@ -50,9 +53,12 @@ public TimerRegistration(
InternalTimerService internalTimerService,
KeyContext keyContext,
TypeSerializer namespaceSerializer,
TypeSerializer<Row> timerDataSerializer) {
TypeSerializer<Row> timerDataSerializer)
throws Exception {
this.keyedStateBackend = keyedStateBackend;
this.internalTimerService = internalTimerService;
this.internalEventTimeTimersQueue =
TimerUtils.getInternalEventTimeTimersQueue(internalTimerService);
this.keyContext = keyContext;
this.namespaceSerializer = namespaceSerializer;
this.timerDataSerializer = timerDataSerializer;
@@ -105,6 +111,17 @@ private void setTimer(TimerOperandType operandType, long timestamp, Row key, Obj
}
}

/**
* Returns if there's any event-time timer in the queue, that should be triggered because
* watermark advance.
*/
public boolean hasEventTimeTimerBeforeTimestamp(long timestamp) throws Exception {
return TimerUtils.hasEventTimeTimerBeforeTimestamp(
internalEventTimeTimersQueue,
timestamp,
PythonOperatorUtils.inBatchExecutionMode(keyedStateBackend));
}

/** The flag for indicating the timer operation type. */
private enum TimerOperandType {
REGISTER_EVENT_TIMER((byte) 0),
@@ -22,8 +22,14 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.runtime.state.InternalPriorityQueue;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.TimerHeapInternalTimer;
import org.apache.flink.streaming.api.utils.ProtoUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

import java.lang.reflect.Field;

/** Utilities for timer. */
@Internal
@@ -42,4 +48,28 @@ public static FlinkFnApi.CoderInfoDescriptor createTimerDataCoderInfoDescriptorP
return ProtoUtils.createRawTypeCoderInfoDescriptorProto(
timerDataType, FlinkFnApi.CoderInfoDescriptor.Mode.SINGLE, false);
}

@SuppressWarnings("unchecked")
public static InternalPriorityQueue<TimerHeapInternalTimer<?, ?>>
getInternalEventTimeTimersQueue(InternalTimerService<?> internalTimerService)
throws Exception {
Field queueField = internalTimerService.getClass().getDeclaredField("eventTimeTimersQueue");
queueField.setAccessible(true);
return (InternalPriorityQueue<TimerHeapInternalTimer<?, ?>>)
queueField.get(internalTimerService);
}

public static boolean hasEventTimeTimerBeforeTimestamp(
InternalPriorityQueue<TimerHeapInternalTimer<?, ?>> timerQueue,
long timestamp,
boolean isBatchMode)
throws Exception {
if (isBatchMode) {
Preconditions.checkArgument(timestamp == Long.MAX_VALUE);
return timerQueue.size() == 0;
}

TimerHeapInternalTimer<?, ?> minTimer = timerQueue.peek();
return minTimer == null || minTimer.getTimestamp() > timestamp;
}
}

0 comments on commit 124e4ad

Please sign in to comment.