Skip to content

Commit

Permalink
[FLINK-30533][runtime] SourceOperator#emitNext() should push records …
Browse files Browse the repository at this point in the history
…to DataOutput in a while loop
  • Loading branch information
lindong28 committed Jan 4, 2023
1 parent 9bec3b7 commit b4d1ef0
Show file tree
Hide file tree
Showing 9 changed files with 63 additions and 10 deletions.
Expand Up @@ -76,6 +76,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

import static org.apache.flink.configuration.PipelineOptions.ALLOW_UNALIGNED_SOURCE_SPLITS;
import static org.apache.flink.util.Preconditions.checkNotNull;
Expand Down Expand Up @@ -169,6 +170,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
private int numSplits;
private final Map<String, Long> splitCurrentWatermarks = new HashMap<>();
private final Set<String> currentlyPausedSplits = new HashSet<>();
private boolean isEmitNextLoopDisabled = true;

private enum OperatingMode {
READING,
Expand All @@ -190,6 +192,8 @@ private enum OperatingMode {

private final boolean allowUnalignedSourceSplits;

private Supplier<Boolean> mailboxHasMail;

public SourceOperator(
FunctionWithException<SourceReaderContext, SourceReader<OUT, SplitT>, Exception>
readerFactory,
Expand All @@ -199,7 +203,8 @@ public SourceOperator(
ProcessingTimeService timeService,
Configuration configuration,
String localHostname,
boolean emitProgressiveWatermarks) {
boolean emitProgressiveWatermarks,
Supplier<Boolean> mailboxHasMail) {

this.readerFactory = checkNotNull(readerFactory);
this.operatorEventGateway = checkNotNull(operatorEventGateway);
Expand All @@ -212,6 +217,7 @@ public SourceOperator(
this.operatingMode = OperatingMode.OUTPUT_NOT_INITIALIZED;
this.watermarkAlignmentParams = watermarkStrategy.getAlignmentParameters();
this.allowUnalignedSourceSplits = configuration.get(ALLOW_UNALIGNED_SOURCE_SPLITS);
this.mailboxHasMail = mailboxHasMail;
}

@Override
Expand Down Expand Up @@ -403,10 +409,20 @@ public DataInputStatus emitNext(DataOutput<OUT> output) throws Exception {

// short circuit the hot path. Without this short circuit (READING handled in the
// switch/case) InputBenchmark.mapSink was showing a performance regression.
if (operatingMode == OperatingMode.READING) {
if (operatingMode != OperatingMode.READING) {
return emitNextNotReading(output);
}
if (isEmitNextLoopDisabled) {
return convertToInternalStatus(sourceReader.pollNext(currentMainOutput));
}
return emitNextNotReading(output);

InputStatus status;
do {
status = sourceReader.pollNext(currentMainOutput);
} while (status == InputStatus.MORE_AVAILABLE
&& !mailboxHasMail.get()
&& !shouldWaitForAlignment());
return convertToInternalStatus(status);
}

private DataInputStatus emitNextNotReading(DataOutput<OUT> output) throws Exception {
Expand Down Expand Up @@ -556,6 +572,11 @@ public void handleOperatorEvent(OperatorEvent event) {
}
}

// Configure SourceOperator#emitNext to emit at most one record to the given DataOutput.
public void disableEmitNextLoop() {
isEmitNextLoopDisabled = true;
}

private void handleAddSplitsEvent(AddSplitEvent<SplitT> event) {
try {
List<SplitT> newSplits = event.splits(splitSerializer);
Expand Down
Expand Up @@ -36,6 +36,8 @@ Licensed to the Apache Software Foundation (ASF) under one

import javax.annotation.Nullable;

import java.util.function.Supplier;

import static org.apache.flink.util.Preconditions.checkNotNull;

/** The Factory class for {@link SourceOperator}. */
Expand Down Expand Up @@ -113,7 +115,8 @@ public <T extends StreamOperator<OUT>> T createStreamOperator(
.getEnvironment()
.getTaskManagerInfo()
.getTaskManagerExternalAddress(),
emitProgressiveWatermarks);
emitProgressiveWatermarks,
parameters.getContainingTask().getMailboxHasMail());

sourceOperator.setup(
parameters.getContainingTask(),
Expand Down Expand Up @@ -168,7 +171,8 @@ SourceOperator<T, SplitT> instantiateSourceOperator(
ProcessingTimeService timeService,
Configuration config,
String localHostName,
boolean emitProgressiveWatermarks) {
boolean emitProgressiveWatermarks,
Supplier<Boolean> mailboxHasMail) {

// jumping through generics hoops: cast the generics away to then cast them back more
// strictly typed
Expand All @@ -189,6 +193,7 @@ SourceOperator<T, SplitT> instantiateSourceOperator(
timeService,
config,
localHostName,
emitProgressiveWatermarks);
emitProgressiveWatermarks,
mailboxHasMail);
}
}
Expand Up @@ -34,8 +34,8 @@
public interface PushingAsyncDataInput<T> extends AvailabilityProvider {

/**
* Pushes the next element to the output from current data input, and returns the input status
* to indicate whether there are more available data in current input.
* Pushes elements to the output from current data input, and returns the input status to
* indicate whether there are more available data in current input.
*
* <p>This method should be non blocking.
*/
Expand Down
Expand Up @@ -157,4 +157,9 @@ public OperatorID getOperatorID() {
public SourceOperator<T, ?> getOperator() {
return operator;
}

// Configure StreamTaskSourceInput#emitNext to emit at most one record to the given DataOutput.
public void disableEmitNextLoop() {
operator.disableEmitNextLoop();
}
}
Expand Up @@ -82,6 +82,14 @@ public void init() throws Exception {
StreamConfig configuration = getConfiguration();
ClassLoader userClassLoader = getUserCodeClassLoader();

// This is needed for StreamMultipleInputProcessor#processInput to preserve the existing
// behavior of choosing an input every time a record is emitted. This behavior is good for
// fairness between input consumption. But it can reduce throughput due to added control
// flow cost on the per-record code path.
for (StreamTaskSourceInput<?> input : operatorChain.getSourceTaskInputs()) {
input.disableEmitNextLoop();
}

InputConfig[] inputs = configuration.getInputs(userClassLoader);

WatermarkGauge[] watermarkGauges = new WatermarkGauge[inputs.length];
Expand Down
Expand Up @@ -137,6 +137,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.function.Supplier;

import static org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_PERIOD;
import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed;
Expand Down Expand Up @@ -992,6 +993,10 @@ public MailboxExecutorFactory getMailboxExecutorFactory() {
return this.mailboxProcessor::getMailboxExecutor;
}

public Supplier<Boolean> getMailboxHasMail() {
return this.mailboxProcessor::hasMail;
}

public final boolean isRunning() {
return isRunning;
}
Expand Down
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.io.StreamTaskSourceInput;
import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate;
Expand Down Expand Up @@ -61,6 +62,14 @@ protected void createInputProcessor(
List<IndexedInputGate> inputGates2,
Function<Integer, StreamPartitioner<?>> gatePartitioners) {

// This is needed for StreamMultipleInputProcessor#processInput to preserve the existing
// behavior of choosing an input every time a record is emitted. This behavior is good for
// fairness between input consumption. But it can reduce throughput due to added control
// flow cost on the per-record code path.
for (StreamTaskSourceInput<?> input : operatorChain.getSourceTaskInputs()) {
input.disableEmitNextLoop();
}

// create an input instance for each input
checkpointBarrierHandler =
InputProcessorUtil.createCheckpointBarrierHandler(
Expand Down
Expand Up @@ -453,7 +453,6 @@ public boolean isMailboxLoopRunning() {
return mailboxLoopRunning;
}

@VisibleForTesting
public boolean hasMail() {
return mailbox.hasMail();
}
Expand Down
Expand Up @@ -89,7 +89,8 @@ public TestingSourceOperator(
timeService,
new Configuration(),
"localhost",
emitProgressiveWatermarks);
emitProgressiveWatermarks,
() -> true);

this.subtaskIndex = subtaskIndex;
this.parallelism = parallelism;
Expand Down

0 comments on commit b4d1ef0

Please sign in to comment.