Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-30533][runtime] SourceOperator#emitNext() should push records to DataOutput in a while loop #21576

Merged
merged 1 commit into from Jan 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -76,6 +76,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

import static org.apache.flink.configuration.PipelineOptions.ALLOW_UNALIGNED_SOURCE_SPLITS;
import static org.apache.flink.util.Preconditions.checkNotNull;
Expand Down Expand Up @@ -169,6 +170,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
private int numSplits;
private final Map<String, Long> splitCurrentWatermarks = new HashMap<>();
private final Set<String> currentlyPausedSplits = new HashSet<>();
private boolean isEmitNextLoopDisabled = false;

private enum OperatingMode {
READING,
Expand All @@ -190,6 +192,8 @@ private enum OperatingMode {

private final boolean allowUnalignedSourceSplits;

private Supplier<Boolean> mailboxHasMail;

public SourceOperator(
FunctionWithException<SourceReaderContext, SourceReader<OUT, SplitT>, Exception>
readerFactory,
Expand All @@ -199,7 +203,8 @@ public SourceOperator(
ProcessingTimeService timeService,
Configuration configuration,
String localHostname,
boolean emitProgressiveWatermarks) {
boolean emitProgressiveWatermarks,
Supplier<Boolean> mailboxHasMail) {

this.readerFactory = checkNotNull(readerFactory);
this.operatorEventGateway = checkNotNull(operatorEventGateway);
Expand All @@ -212,6 +217,7 @@ public SourceOperator(
this.operatingMode = OperatingMode.OUTPUT_NOT_INITIALIZED;
this.watermarkAlignmentParams = watermarkStrategy.getAlignmentParameters();
this.allowUnalignedSourceSplits = configuration.get(ALLOW_UNALIGNED_SOURCE_SPLITS);
this.mailboxHasMail = checkNotNull(mailboxHasMail);
}

@Override
Expand Down Expand Up @@ -403,10 +409,20 @@ public DataInputStatus emitNext(DataOutput<OUT> output) throws Exception {

// short circuit the hot path. Without this short circuit (READING handled in the
// switch/case) InputBenchmark.mapSink was showing a performance regression.
if (operatingMode == OperatingMode.READING) {
if (operatingMode != OperatingMode.READING) {
return emitNextNotReading(output);
}
if (isEmitNextLoopDisabled) {
return convertToInternalStatus(sourceReader.pollNext(currentMainOutput));
}
return emitNextNotReading(output);

InputStatus status;
do {
status = sourceReader.pollNext(currentMainOutput);
} while (status == InputStatus.MORE_AVAILABLE
&& !mailboxHasMail.get()
&& !shouldWaitForAlignment());
return convertToInternalStatus(status);
}

private DataInputStatus emitNextNotReading(DataOutput<OUT> output) throws Exception {
Expand Down Expand Up @@ -556,6 +572,11 @@ public void handleOperatorEvent(OperatorEvent event) {
}
}

// Configure SourceOperator#emitNext to emit at most one record to the given DataOutput.
public void disableEmitNextLoop() {
isEmitNextLoopDisabled = true;
}

private void handleAddSplitsEvent(AddSplitEvent<SplitT> event) {
try {
List<SplitT> newSplits = event.splits(splitSerializer);
Expand Down
Expand Up @@ -36,6 +36,8 @@ Licensed to the Apache Software Foundation (ASF) under one

import javax.annotation.Nullable;

import java.util.function.Supplier;

import static org.apache.flink.util.Preconditions.checkNotNull;

/** The Factory class for {@link SourceOperator}. */
Expand Down Expand Up @@ -113,7 +115,8 @@ public <T extends StreamOperator<OUT>> T createStreamOperator(
.getEnvironment()
.getTaskManagerInfo()
.getTaskManagerExternalAddress(),
emitProgressiveWatermarks);
emitProgressiveWatermarks,
parameters.getContainingTask().getMailboxHasMail());

sourceOperator.setup(
parameters.getContainingTask(),
Expand Down Expand Up @@ -168,7 +171,8 @@ SourceOperator<T, SplitT> instantiateSourceOperator(
ProcessingTimeService timeService,
Configuration config,
String localHostName,
boolean emitProgressiveWatermarks) {
boolean emitProgressiveWatermarks,
Supplier<Boolean> mailboxHasMail) {

// jumping through generics hoops: cast the generics away to then cast them back more
// strictly typed
Expand All @@ -189,6 +193,7 @@ SourceOperator<T, SplitT> instantiateSourceOperator(
timeService,
config,
localHostName,
emitProgressiveWatermarks);
emitProgressiveWatermarks,
mailboxHasMail);
}
}
Expand Up @@ -34,8 +34,8 @@
public interface PushingAsyncDataInput<T> extends AvailabilityProvider {

/**
* Pushes the next element to the output from current data input, and returns the input status
* to indicate whether there are more available data in current input.
* Pushes elements to the output from current data input, and returns the input status to
* indicate whether there are more available data in current input.
*
* <p>This method should be non blocking.
*/
Expand Down
Expand Up @@ -157,4 +157,9 @@ public OperatorID getOperatorID() {
public SourceOperator<T, ?> getOperator() {
return operator;
}

// Configure StreamTaskSourceInput#emitNext to emit at most one record to the given DataOutput.
public void disableEmitNextLoop() {
operator.disableEmitNextLoop();
}
}
Expand Up @@ -82,6 +82,14 @@ public void init() throws Exception {
StreamConfig configuration = getConfiguration();
ClassLoader userClassLoader = getUserCodeClassLoader();

// This is needed for StreamMultipleInputProcessor#processInput to preserve the existing
// behavior of choosing an input every time a record is emitted. This behavior is good for
// fairness between input consumption. But it can reduce throughput due to added control
// flow cost on the per-record code path.
for (StreamTaskSourceInput<?> input : operatorChain.getSourceTaskInputs()) {
input.disableEmitNextLoop();
}

InputConfig[] inputs = configuration.getInputs(userClassLoader);

WatermarkGauge[] watermarkGauges = new WatermarkGauge[inputs.length];
Expand Down
Expand Up @@ -137,6 +137,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.function.Supplier;

import static org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_PERIOD;
import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed;
Expand Down Expand Up @@ -992,6 +993,10 @@ public MailboxExecutorFactory getMailboxExecutorFactory() {
return this.mailboxProcessor::getMailboxExecutor;
}

public Supplier<Boolean> getMailboxHasMail() {
return this.mailboxProcessor::hasMail;
lindong28 marked this conversation as resolved.
Show resolved Hide resolved
}

public final boolean isRunning() {
return isRunning;
}
Expand Down
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.io.StreamTaskSourceInput;
import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate;
Expand Down Expand Up @@ -61,6 +62,14 @@ protected void createInputProcessor(
List<IndexedInputGate> inputGates2,
Function<Integer, StreamPartitioner<?>> gatePartitioners) {

// This is needed for StreamMultipleInputProcessor#processInput to preserve the existing
// behavior of choosing an input every time a record is emitted. This behavior is good for
// fairness between input consumption. But it can reduce throughput due to added control
// flow cost on the per-record code path.
for (StreamTaskSourceInput<?> input : operatorChain.getSourceTaskInputs()) {
input.disableEmitNextLoop();
}

// create an input instance for each input
checkpointBarrierHandler =
InputProcessorUtil.createCheckpointBarrierHandler(
Expand Down
Expand Up @@ -453,7 +453,6 @@ public boolean isMailboxLoopRunning() {
return mailboxLoopRunning;
}

@VisibleForTesting
public boolean hasMail() {
return mailbox.hasMail();
}
Expand Down
Expand Up @@ -89,7 +89,8 @@ public TestingSourceOperator(
timeService,
new Configuration(),
"localhost",
emitProgressiveWatermarks);
emitProgressiveWatermarks,
() -> true);

this.subtaskIndex = subtaskIndex;
this.parallelism = parallelism;
Expand Down