From b4d1ef0e3581fdc876bc5c31ac310c2b882cbceb Mon Sep 17 00:00:00 2001 From: Dong Lin Date: Fri, 30 Dec 2022 15:15:53 +0800 Subject: [PATCH] [FLINK-30533][runtime] SourceOperator#emitNext() should push records to DataOutput in a while loop --- .../api/operators/SourceOperator.java | 27 ++++++++++++++++--- .../api/operators/SourceOperatorFactory.java | 11 +++++--- .../runtime/io/PushingAsyncDataInput.java | 4 +-- .../runtime/io/StreamTaskSourceInput.java | 5 ++++ .../tasks/MultipleInputStreamTask.java | 8 ++++++ .../streaming/runtime/tasks/StreamTask.java | 5 ++++ .../runtime/tasks/TwoInputStreamTask.java | 9 +++++++ .../tasks/mailbox/MailboxProcessor.java | 1 - .../source/TestingSourceOperator.java | 3 ++- 9 files changed, 63 insertions(+), 10 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java index 3e962596e66d5a..23c100f9c31899 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java @@ -76,6 +76,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; import static org.apache.flink.configuration.PipelineOptions.ALLOW_UNALIGNED_SOURCE_SPLITS; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -169,6 +170,7 @@ public class SourceOperator extends AbstractStr private int numSplits; private final Map splitCurrentWatermarks = new HashMap<>(); private final Set currentlyPausedSplits = new HashSet<>(); + private boolean isEmitNextLoopDisabled = false; private enum OperatingMode { READING, @@ -190,6 +192,8 @@ private enum OperatingMode { private final boolean allowUnalignedSourceSplits; + private final Supplier mailboxHasMail; + public SourceOperator(
FunctionWithException, Exception> readerFactory, @@ -199,7 +203,8 @@ public SourceOperator( ProcessingTimeService timeService, Configuration configuration, String localHostname, - boolean emitProgressiveWatermarks) { + boolean emitProgressiveWatermarks, + Supplier mailboxHasMail) { this.readerFactory = checkNotNull(readerFactory); this.operatorEventGateway = checkNotNull(operatorEventGateway); @@ -212,6 +217,7 @@ public SourceOperator( this.operatingMode = OperatingMode.OUTPUT_NOT_INITIALIZED; this.watermarkAlignmentParams = watermarkStrategy.getAlignmentParameters(); this.allowUnalignedSourceSplits = configuration.get(ALLOW_UNALIGNED_SOURCE_SPLITS); + this.mailboxHasMail = checkNotNull(mailboxHasMail); } @Override @@ -403,10 +409,20 @@ public DataInputStatus emitNext(DataOutput output) throws Exception { // short circuit the hot path. Without this short circuit (READING handled in the // switch/case) InputBenchmark.mapSink was showing a performance regression. - if (operatingMode == OperatingMode.READING) { + if (operatingMode != OperatingMode.READING) { + return emitNextNotReading(output); + } + if (isEmitNextLoopDisabled) { return convertToInternalStatus(sourceReader.pollNext(currentMainOutput)); } - return emitNextNotReading(output); + + InputStatus status; + do { + status = sourceReader.pollNext(currentMainOutput); + } while (status == InputStatus.MORE_AVAILABLE + && !mailboxHasMail.get() + && !shouldWaitForAlignment()); + return convertToInternalStatus(status); } private DataInputStatus emitNextNotReading(DataOutput output) throws Exception { @@ -556,6 +572,11 @@ public void handleOperatorEvent(OperatorEvent event) { } } + // Configure SourceOperator#emitNext to emit at most one record to the given DataOutput.
+ public void disableEmitNextLoop() { + isEmitNextLoopDisabled = true; + } + private void handleAddSplitsEvent(AddSplitEvent event) { try { List newSplits = event.splits(splitSerializer); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java index 4c364beb2b69ea..4846a1ebbc9b13 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java @@ -36,6 +36,8 @@ Licensed to the Apache Software Foundation (ASF) under one import javax.annotation.Nullable; +import java.util.function.Supplier; + import static org.apache.flink.util.Preconditions.checkNotNull; /** The Factory class for {@link SourceOperator}. */ @@ -113,7 +115,8 @@ public > T createStreamOperator( .getEnvironment() .getTaskManagerInfo() .getTaskManagerExternalAddress(), - emitProgressiveWatermarks); + emitProgressiveWatermarks, + parameters.getContainingTask().getMailboxHasMail()); sourceOperator.setup( parameters.getContainingTask(), @@ -168,7 +171,8 @@ SourceOperator instantiateSourceOperator( ProcessingTimeService timeService, Configuration config, String localHostName, - boolean emitProgressiveWatermarks) { + boolean emitProgressiveWatermarks, + Supplier mailboxHasMail) { // jumping through generics hoops: cast the generics away to then cast them back more // strictly typed @@ -189,6 +193,7 @@ SourceOperator instantiateSourceOperator( timeService, config, localHostName, - emitProgressiveWatermarks); + emitProgressiveWatermarks, + mailboxHasMail); } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/PushingAsyncDataInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/PushingAsyncDataInput.java index 
4b58982b080215..619c2d00954a4f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/PushingAsyncDataInput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/PushingAsyncDataInput.java @@ -34,8 +34,8 @@ public interface PushingAsyncDataInput extends AvailabilityProvider { /** - * Pushes the next element to the output from current data input, and returns the input status - * to indicate whether there are more available data in current input. + * Pushes elements to the output from current data input, and returns the input status to + * indicate whether there are more available data in current input. * * <p>This method should be non blocking. */ diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java index 54988606b15ef4..55863fee47e061 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java @@ -157,4 +157,9 @@ public OperatorID getOperatorID() { public SourceOperator getOperator() { return operator; } + + // Configure StreamTaskSourceInput#emitNext to emit at most one record to the given DataOutput. + public void disableEmitNextLoop() { + operator.disableEmitNextLoop(); + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java index aac586f2163716..6f1ed44e8a8ba2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java @@ -82,6 +82,14 @@ public void init() throws Exception { StreamConfig configuration = getConfiguration(); ClassLoader userClassLoader = getUserCodeClassLoader(); + // This is needed for StreamMultipleInputProcessor#processInput to preserve the existing + // behavior of choosing an input every time a record is emitted. This behavior is good for + // fairness between input consumption. But it can reduce throughput due to added control + // flow cost on the per-record code path.
+ for (StreamTaskSourceInput input : operatorChain.getSourceTaskInputs()) { + input.disableEmitNextLoop(); + } + InputConfig[] inputs = configuration.getInputs(userClassLoader); WatermarkGauge[] watermarkGauges = new WatermarkGauge[inputs.length]; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 5130861b844af7..097be6d07f2bf2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -137,6 +137,7 @@ import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadFactory; +import java.util.function.Supplier; import static org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_PERIOD; import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed; @@ -992,6 +993,10 @@ public MailboxExecutorFactory getMailboxExecutorFactory() { return this.mailboxProcessor::getMailboxExecutor; } + public Supplier getMailboxHasMail() { + return this.mailboxProcessor::hasMail; + } + public final boolean isRunning() { return isRunning; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java index 5f9bfcef0db810..e62ed44a99c0f3 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java @@ -21,6 +21,7 @@ import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate; import 
org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.runtime.io.StreamTaskSourceInput; import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory; import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler; import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate; @@ -61,6 +62,14 @@ protected void createInputProcessor( List inputGates2, Function> gatePartitioners) { + // This is needed for StreamMultipleInputProcessor#processInput to preserve the existing + // behavior of choosing an input every time a record is emitted. This behavior is good for + // fairness between input consumption. But it can reduce throughput due to added control + // flow cost on the per-record code path. + for (StreamTaskSourceInput input : operatorChain.getSourceTaskInputs()) { + input.disableEmitNextLoop(); + } + // create an input instance for each input checkpointBarrierHandler = InputProcessorUtil.createCheckpointBarrierHandler( diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java index b100152e3dae48..3dae1cec3dfd82 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java @@ -453,7 +453,6 @@ public boolean isMailboxLoopRunning() { return mailboxLoopRunning; } - @VisibleForTesting public boolean hasMail() { return mailbox.hasMail(); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java index d25226e9bdd2fd..8aca0222eae24d 100644 --- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java @@ -89,7 +89,8 @@ public TestingSourceOperator( timeService, new Configuration(), "localhost", - emitProgressiveWatermarks); + emitProgressiveWatermarks, + () -> true); this.subtaskIndex = subtaskIndex; this.parallelism = parallelism;