Skip to content
Permalink
Browse files
[FLINK-27733][python] Rework on_timer output behind watermark bug fix
This closes #19788.
  • Loading branch information
Vancior authored and HuangXingBo committed May 24, 2022
1 parent 99c74d5 commit a0ef9eb46ad3896d6d87595dbe364f69d583794c
Showing 9 changed files with 49 additions and 124 deletions.
@@ -25,6 +25,7 @@
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionKeyedStateBackend;
import org.apache.flink.streaming.api.watermark.Watermark;
@@ -180,23 +181,27 @@ public void processWatermark(Watermark mark) throws Exception {
// Approach 1) is the easiest and gives better latency, yet 2)
// gives better throughput due to the bundle not getting cut on
// every watermark. So we have implemented 2) below.

// advance the watermark and do not emit watermark to downstream operators
if (getTimeServiceManager().isPresent()) {
getTimeServiceManager().get().advanceWatermark(mark);
}

if (mark.getTimestamp() == Long.MAX_VALUE) {
invokeFinishBundle();
processElementsOfCurrentKeyIfNeeded(null);
preEmitWatermark(mark);
advanceWatermark(mark);
output.emitWatermark(mark);
} else if (isBundleFinished()) {
// forward the watermark immediately if the bundle is already finished.
preEmitWatermark(mark);
output.emitWatermark(mark);
} else {
// It is not safe to advance the output watermark yet, so add a hold on the current
// output watermark.
bundleFinishedCallback =
() -> {
try {
advanceWatermark(mark);
// at this point the bundle is finished, allow the watermark to pass
preEmitWatermark(mark);
output.emitWatermark(mark);
} catch (Exception e) {
throw new RuntimeException(
@@ -263,10 +268,19 @@ public Configuration getConfiguration() {

protected abstract PythonEnvironmentManager createPythonEnvironmentManager();

/** Called before emitting watermark to downstream. */
protected void preEmitWatermark(Watermark mark) throws Exception {
/**
* Advances the watermark of all managed timer services, potentially firing event time timers.
* It also ensures that the fired timers are processed in the Python user-defined functions.
*/
private void advanceWatermark(Watermark watermark) throws Exception {
if (getTimeServiceManager().isPresent()) {
getTimeServiceManager().get().advanceWatermark(mark);
InternalTimeServiceManager<?> timeServiceManager = getTimeServiceManager().get();
timeServiceManager.advanceWatermark(watermark);

while (!isBundleFinished()) {
invokeFinishBundle();
timeServiceManager.advanceWatermark(watermark);
}
}
}

@@ -29,7 +29,6 @@
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.Triggerable;
@@ -38,7 +37,6 @@
import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamPythonFunctionRunner;
import org.apache.flink.streaming.api.utils.ProtoUtils;
import org.apache.flink.streaming.api.utils.PythonTypeUtils;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.types.Row;

@@ -63,9 +61,6 @@
/** TimerService for current operator to register or fire timer. */
private transient InternalTimerService<VoidNamespace> internalTimerService;

/** TimerRegistration for handling timer registering. */
private transient TimerRegistration timerRegistration;

/** The TypeInformation of the key. */
private transient TypeInformation<Row> keyTypeInfo;

@@ -107,13 +102,6 @@ public void open() throws Exception {
timerDataTypeInfo);

timerHandler = new TimerHandler();
timerRegistration =
new TimerRegistration(
getKeyedStateBackend(),
internalTimerService,
this,
VoidNamespaceSerializer.INSTANCE,
timerDataSerializer);

super.open();
}
@@ -141,7 +129,12 @@ public PythonFunctionRunner createPythonFunctionRunner() throws Exception {
getOperatorStateBackend(),
keyTypeSerializer,
null,
timerRegistration,
new TimerRegistration(
getKeyedStateBackend(),
internalTimerService,
this,
VoidNamespaceSerializer.INSTANCE,
timerDataSerializer),
getContainingTask().getEnvironment().getMemoryManager(),
getOperatorConfig()
.getManagedMemoryFractionOperatorUseCaseOfSlot(
@@ -214,20 +207,6 @@ private void processTimer(TimeDomain timeDomain, InternalTimer<Row, VoidNamespac
emitResults();
}

@SuppressWarnings("rawtypes")
@Override
protected void preEmitWatermark(Watermark mark) throws Exception {
if (!getTimeServiceManager().isPresent()) {
return;
}
InternalTimeServiceManager timeServiceManager = getTimeServiceManager().get();
long timestamp = mark.getTimestamp();
do {
timeServiceManager.advanceWatermark(mark);
invokeFinishBundle();
} while (!timerRegistration.hasEventTimeTimerBeforeTimestamp(timestamp));
}

/**
* As the beam state gRPC service will access the KeyedStateBackend in parallel with this
* operator, we must override this method to prevent changing the current key of the
@@ -29,7 +29,6 @@
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.Triggerable;
@@ -38,7 +37,6 @@
import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamPythonFunctionRunner;
import org.apache.flink.streaming.api.utils.ProtoUtils;
import org.apache.flink.streaming.api.utils.PythonTypeUtils;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.types.Row;

@@ -70,9 +68,6 @@
/** TimerService for current operator to register or fire timer. */
private transient InternalTimerService internalTimerService;

/** TimerRegistration for handling timer registering. */
private transient TimerRegistration timerRegistration;

/** The TypeInformation of the key. */
private transient TypeInformation<Row> keyTypeInfo;

@@ -127,13 +122,6 @@ public void open() throws Exception {
timerDataTypeInfo);

timerHandler = new TimerHandler();
timerRegistration =
new TimerRegistration(
getKeyedStateBackend(),
internalTimerService,
this,
namespaceSerializer,
timerDataSerializer);

super.open();
}
@@ -171,7 +159,12 @@ public PythonFunctionRunner createPythonFunctionRunner() throws Exception {
getOperatorStateBackend(),
keyTypeSerializer,
namespaceSerializer,
timerRegistration,
new TimerRegistration(
getKeyedStateBackend(),
internalTimerService,
this,
namespaceSerializer,
timerDataSerializer),
getContainingTask().getEnvironment().getMemoryManager(),
getOperatorConfig()
.getManagedMemoryFractionOperatorUseCaseOfSlot(
@@ -233,20 +226,6 @@ private void processTimer(TimeDomain timeDomain, InternalTimer<Row, Object> time
emitResults();
}

@SuppressWarnings("rawtypes")
@Override
protected void preEmitWatermark(Watermark mark) throws Exception {
if (!getTimeServiceManager().isPresent()) {
return;
}
InternalTimeServiceManager timeServiceManager = getTimeServiceManager().get();
long timestamp = mark.getTimestamp();
do {
timeServiceManager.advanceWatermark(mark);
invokeFinishBundle();
} while (!timerRegistration.hasEventTimeTimerBeforeTimestamp(timestamp));
}

/**
* As the beam state gRPC service will access the KeyedStateBackend in parallel with this
* operator, we must override this method to prevent changing the current key of the
@@ -22,13 +22,11 @@
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.runtime.state.InternalPriorityQueue;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.KeyContext;
import org.apache.flink.streaming.api.operators.TimerHeapInternalTimer;
import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
import org.apache.flink.types.Row;

@@ -41,7 +39,6 @@ public final class TimerRegistration {

private final KeyedStateBackend<Row> keyedStateBackend;
private final InternalTimerService internalTimerService;
private final InternalPriorityQueue<TimerHeapInternalTimer<?, ?>> internalEventTimeTimersQueue;
private final KeyContext keyContext;
private final TypeSerializer namespaceSerializer;
private final TypeSerializer<Row> timerDataSerializer;
@@ -57,8 +54,6 @@ public TimerRegistration(
throws Exception {
this.keyedStateBackend = keyedStateBackend;
this.internalTimerService = internalTimerService;
this.internalEventTimeTimersQueue =
TimerUtils.getInternalEventTimeTimersQueue(internalTimerService);
this.keyContext = keyContext;
this.namespaceSerializer = namespaceSerializer;
this.timerDataSerializer = timerDataSerializer;
@@ -111,17 +106,6 @@ private void setTimer(TimerOperandType operandType, long timestamp, Row key, Obj
}
}

/**
* Returns if there's any event-time timer in the queue, that should be triggered because
* watermark advance.
*/
public boolean hasEventTimeTimerBeforeTimestamp(long timestamp) throws Exception {
return TimerUtils.hasEventTimeTimerBeforeTimestamp(
internalEventTimeTimersQueue,
timestamp,
PythonOperatorUtils.inBatchExecutionMode(keyedStateBackend));
}

/** The flag for indicating the timer operation type. */
private enum TimerOperandType {
REGISTER_EVENT_TIMER((byte) 0),
@@ -22,14 +22,8 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.runtime.state.InternalPriorityQueue;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.TimerHeapInternalTimer;
import org.apache.flink.streaming.api.utils.ProtoUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

import java.lang.reflect.Field;

/** Utilities for timer. */
@Internal
@@ -48,28 +42,4 @@ public static FlinkFnApi.CoderInfoDescriptor createTimerDataCoderInfoDescriptorP
return ProtoUtils.createRawTypeCoderInfoDescriptorProto(
timerDataType, FlinkFnApi.CoderInfoDescriptor.Mode.SINGLE, false);
}

@SuppressWarnings("unchecked")
public static InternalPriorityQueue<TimerHeapInternalTimer<?, ?>>
getInternalEventTimeTimersQueue(InternalTimerService<?> internalTimerService)
throws Exception {
Field queueField = internalTimerService.getClass().getDeclaredField("eventTimeTimersQueue");
queueField.setAccessible(true);
return (InternalPriorityQueue<TimerHeapInternalTimer<?, ?>>)
queueField.get(internalTimerService);
}

public static boolean hasEventTimeTimerBeforeTimestamp(
InternalPriorityQueue<TimerHeapInternalTimer<?, ?>> timerQueue,
long timestamp,
boolean isBatchMode)
throws Exception {
if (isBatchMode) {
Preconditions.checkArgument(timestamp == Long.MAX_VALUE);
return timerQueue.size() == 0;
}

TimerHeapInternalTimer<?, ?> minTimer = timerQueue.peek();
return minTimer == null || minTimer.getTimestamp() > timestamp;
}
}
@@ -207,8 +207,6 @@ void testFinishBundleTriggeredByTime() throws Exception {
testHarness.processElement(newRecord(true, initialTime + 3, "c1", "c6", 2L, 10000L));
testHarness.processElement(newRecord(true, initialTime + 4, "c2", "c8", 3L, 0L));
testHarness.processWatermark(new Watermark(20000L));
assertOutputEquals(
"FinishBundle should not be triggered.", expectedOutput, testHarness.getOutput());

testHarness.setProcessingTime(1000L);
expectedOutput.add(newWindowRecord(-5000L, 5000L, "c1", 0L));

0 comments on commit a0ef9eb

Please sign in to comment.