[FLINK-33202][runtime] Support switching from batch to stream mode to improve throughput when processing backlog data
Sxnan committed Nov 7, 2023
1 parent cd95b56 commit a4013fc
Showing 73 changed files with 2,647 additions and 130 deletions.
@@ -46,6 +46,7 @@
import org.apache.flink.streaming.api.operators.SourceOperator;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;

@@ -561,5 +562,8 @@ public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {}

        @Override
        public void emitLatencyMarker(LatencyMarker latencyMarker) {}

        @Override
        public void emitRecordAttributes(RecordAttributes recordAttributes) throws Exception {}
    }
}
@@ -33,6 +33,7 @@ Licensed to the Apache Software Foundation (ASF) under one
import org.apache.flink.runtime.operators.coordination.CoordinatorStore;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.source.event.BacklogEvent;
import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
import org.apache.flink.runtime.source.event.ReportedWatermarkEvent;
import org.apache.flink.runtime.source.event.RequestSplitEvent;
@@ -607,6 +608,14 @@ private void handleReaderRegistrationEvent(
        context.registerSourceReader(subtask, attemptNumber, event.location());
        if (!subtaskReaderExisted) {
            enumerator.addReader(event.subtaskId());

            if (context.isBacklog() != null) {
                context.runInCoordinatorThread(
                        () -> {
                            context.sendEventToSourceOperatorIfTaskReady(
                                    subtask, new BacklogEvent(context.isBacklog()));
                        });
            }
        }
    }

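The backlog status replayed to late-registering readers above originates from a SplitEnumerator calling setIsProcessingBacklog on its context (the FLIP-309 API; see the SourceCoordinatorContext hunk below). A minimal, hypothetical enumerator that declares backlog while it hands out historical splits might look like the following sketch — the class and its no-op methods are illustrative, not part of this commit:

import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.List;

/** Hypothetical enumerator; SplitT stands in for a concrete SourceSplit type. */
class BacklogAwareEnumerator<SplitT extends SourceSplit>
        implements SplitEnumerator<SplitT, Void> {

    private final SplitEnumeratorContext<SplitT> context;

    BacklogAwareEnumerator(SplitEnumeratorContext<SplitT> context) {
        this.context = context;
    }

    @Override
    public void start() {
        // Declare backlog while reading historical splits; the coordinator
        // context forwards this to every registered source reader as a
        // BacklogEvent (see the hunks above and below).
        context.setIsProcessingBacklog(true);
    }

    @Override
    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {}

    @Override
    public void addSplitsBack(List<SplitT> splits, int subtaskId) {}

    @Override
    public void addReader(int subtaskId) {}

    @Override
    public Void snapshotState(long checkpointId) {
        return null;
    }

    @Override
    public void close() throws IOException {}
}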
@@ -35,6 +35,7 @@ Licensed to the Apache Software Foundation (ASF) under one
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.runtime.source.event.BacklogEvent;
import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
import org.apache.flink.runtime.source.event.SourceEventWrapper;
import org.apache.flink.util.ExceptionUtils;
@@ -112,6 +113,7 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
    private final boolean supportsConcurrentExecutionAttempts;
    private final boolean[] subtaskHasNoMoreSplits;
    private volatile boolean closed;
    private volatile Boolean backlog = null;

    public SourceCoordinatorContext(
            SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory,
@@ -370,6 +372,16 @@ public void setIsProcessingBacklog(boolean isProcessingBacklog) {
        if (checkpointCoordinator != null) {
            checkpointCoordinator.setIsProcessingBacklog(operatorID, isProcessingBacklog);
        }
        backlog = isProcessingBacklog;
        callInCoordinatorThread(
                () -> {
                    final BacklogEvent backlogEvent = new BacklogEvent(isProcessingBacklog);
                    for (int i = 0; i < getCoordinatorContext().currentParallelism(); i++) {
                        sendEventToSourceOperatorIfTaskReady(i, backlogEvent);
                    }
                    return null;
                },
                "Failed to send BacklogEvent to reader.");
    }

    // --------- Package private additional methods for the SourceCoordinator ------------
@@ -629,6 +641,10 @@ private void sendCachedSplitsToNewlyRegisteredReader(int subtaskIndex, int attem
        }
    }

    public Boolean isBacklog() {
        return backlog;
    }

    /** Maintains the subtask gateways for different execution attempts of different subtasks. */
    private static class SubtaskGateways {
        private final Map<Integer, OperatorCoordinator.SubtaskGateway>[] gateways;
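Taken together, the two additions give this contract: the flag is cached so that isBacklog() can serve readers that register later, and all currently ready subtasks are notified eagerly. A hedged, test-style sketch (the surrounding harness is hypothetical):

// Assuming a coordinator context wired to test subtask gateways:
context.setIsProcessingBacklog(true);
assert Boolean.TRUE.equals(context.isBacklog());
// Every ready subtask gateway has now been sent a BacklogEvent(true);
// a reader that registers afterwards receives the cached value via
// handleReaderRegistrationEvent shown above.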
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.source.event;

import org.apache.flink.runtime.operators.coordination.OperatorEvent;

/** A source event that notifies the source operator of the backlog status. */
public class BacklogEvent implements OperatorEvent {

    private final boolean backlog;

    public BacklogEvent(boolean backlog) {
        this.backlog = backlog;
    }

    public boolean isBacklog() {
        return backlog;
    }
}
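On the receiving side, a source operator can turn the event into a RecordAttributes stream element for downstream operators. A sketch of that handling, assuming the operator's output and the RecordAttributesBuilder#setBacklog setter — the exact SourceOperator wiring is not part of this excerpt:

public void handleOperatorEvent(OperatorEvent event) {
    if (event instanceof BacklogEvent) {
        // Propagate the backlog status downstream as a stream element.
        output.emitRecordAttributes(
                new RecordAttributesBuilder(Collections.emptyList())
                        .setBacklog(((BacklogEvent) event).isBacklog())
                        .build());
    }
}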
@@ -800,7 +800,13 @@ public enum InputRequirement {
         * records from {@link #SORTED} inputs. There are no guarantees on ordering between and
         * within the different {@link #PASS_THROUGH} inputs.
         */
        PASS_THROUGH;
        PASS_THROUGH,

        /**
         * Backlog records are grouped (sorted) by key and then fed to the operator one group at a
         * time. Non-backlog records are passed to the operator directly.
         */
        SORTED_DURING_BACKLOG;
    }

/** Interface representing chained inputs. */
@@ -879,4 +885,10 @@ public static boolean requiresSorting(StreamConfig.InputConfig inputConfig) {
                && ((StreamConfig.NetworkInputConfig) inputConfig).getInputRequirement()
                        == StreamConfig.InputRequirement.SORTED;
    }

    public static boolean requiresSortingDuringBacklog(StreamConfig.InputConfig inputConfig) {
        return inputConfig instanceof StreamConfig.NetworkInputConfig
                && ((StreamConfig.NetworkInputConfig) inputConfig).getInputRequirement()
                        == InputRequirement.SORTED_DURING_BACKLOG;
    }
}
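A task assembling its input processors could branch on the new requirement the same way the existing requiresSorting check is used today. A hedged fragment, with streamConfig and userClassLoader assumed to be in scope:

for (StreamConfig.InputConfig input : streamConfig.getInputs(userClassLoader)) {
    if (StreamConfig.requiresSortingDuringBacklog(input)) {
        // Sort this input by key while the backlog status is active and
        // fall back to pass-through once the backlog is cleared.
    }
}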
@@ -47,6 +47,7 @@
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.operators.InternalBacklogAwareTimerServiceManagerImpl;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionCheckpointStorage;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionStateBackend;
@@ -93,6 +94,7 @@

import javax.annotation.Nullable;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -415,6 +417,12 @@ private void configureStreamGraphStreaming(final StreamGraph graph) {
        graph.setCheckpointStorage(checkpointStorage);
        graph.setSavepointDirectory(savepointDir);
        graph.setGlobalStreamExchangeMode(deriveGlobalStreamExchangeModeStreaming());

        if (Duration.ZERO.equals(
                configuration.get(
                        ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL_DURING_BACKLOG))) {
            graph.setTimerServiceProvider(InternalBacklogAwareTimerServiceManagerImpl::create);
        }
    }

    private String deriveJobName(String defaultJobName) {
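The backlog-aware timer service is therefore only installed when checkpointing during backlog is effectively disabled. A minimal sketch of a job opting in through configuration (option per FLIP-309; names taken from the imports in the hunk above):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;

public class BacklogJobSetup {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Duration.ZERO disables checkpointing during backlog, which (per the
        // hunk above) also switches on the backlog-aware timer service.
        conf.set(
                ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL_DURING_BACKLOG,
                Duration.ZERO);
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.fromElements(1, 2, 3).print(); // placeholder pipeline
        env.execute("backlog-aware job");
    }
}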
@@ -18,6 +18,7 @@

package org.apache.flink.streaming.api.operators;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
@@ -49,7 +50,10 @@
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.CheckpointedStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.InternalRecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributesBuilder;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
@@ -61,6 +65,9 @@
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Optional;

@@ -148,6 +155,9 @@ public abstract class AbstractStreamOperator<OUT>

protected transient ProcessingTimeService processingTimeService;

protected transient RecordAttributes lastRecordAttributes1;
protected transient RecordAttributes lastRecordAttributes2;

// ------------------------------------------------------------------------
// Life Cycle
// ------------------------------------------------------------------------
@@ -649,4 +659,52 @@ public OperatorID getOperatorID() {
    protected Optional<InternalTimeServiceManager<?>> getTimeServiceManager() {
        return Optional.ofNullable(timeServiceManager);
    }

    @Experimental
    public void processRecordAttributes(RecordAttributes recordAttributes) throws Exception {
        lastRecordAttributes1 = recordAttributes;
        if (timeServiceManager != null
                && timeServiceManager instanceof InternalBacklogAwareTimerServiceManagerImpl) {
            final InternalBacklogAwareTimerServiceManagerImpl<?> backlogAwareTimerServiceManager =
                    (InternalBacklogAwareTimerServiceManagerImpl<?>) timeServiceManager;
            if (recordAttributes instanceof InternalRecordAttributes) {
                backlogAwareTimerServiceManager.setMaxWatermarkDuringBacklog(
                        ((InternalRecordAttributes) recordAttributes)
                                .getMaxWatermarkDuringBacklog());
            }
            backlogAwareTimerServiceManager.setBacklog(recordAttributes.isBacklog());
        }
        output.emitRecordAttributes(
                new RecordAttributesBuilder(Collections.singletonList(recordAttributes)).build());
    }

    @Experimental
    public void processRecordAttributes1(RecordAttributes recordAttributes) throws Exception {
        lastRecordAttributes1 = recordAttributes;
        List<RecordAttributes> lastRecordAttributes = getTwoInputsLastRecordAttributes();
        output.emitRecordAttributes(new RecordAttributesBuilder(lastRecordAttributes).build());
    }

    @Experimental
    public void processRecordAttributes2(RecordAttributes recordAttributes) throws Exception {
        lastRecordAttributes2 = recordAttributes;
        List<RecordAttributes> lastRecordAttributes = getTwoInputsLastRecordAttributes();
        output.emitRecordAttributes(new RecordAttributesBuilder(lastRecordAttributes).build());
    }

    private List<RecordAttributes> getTwoInputsLastRecordAttributes() {
        Preconditions.checkState(
                lastRecordAttributes1 != null || lastRecordAttributes2 != null,
                "lastRecordAttributes1 and lastRecordAttributes2 cannot be both null.");

        List<RecordAttributes> lastRecordAttributes;
        if (lastRecordAttributes1 != null && lastRecordAttributes2 != null) {
            lastRecordAttributes = Arrays.asList(lastRecordAttributes1, lastRecordAttributes2);
        } else if (lastRecordAttributes1 != null) {
            lastRecordAttributes = Collections.singletonList(lastRecordAttributes1);
        } else {
            lastRecordAttributes = Collections.singletonList(lastRecordAttributes2);
        }
        return lastRecordAttributes;
    }
}
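The two-input variants above remember the latest attributes per input and forward a combined element. A hedged illustration of that merge, assuming RecordAttributesBuilder#setBacklog and that the builder derives the combined backlog flag from the attributes it is given:

RecordAttributes in1 =
        new RecordAttributesBuilder(Collections.emptyList()).setBacklog(true).build();
RecordAttributes in2 =
        new RecordAttributesBuilder(Collections.emptyList()).setBacklog(false).build();
// Mirrors getTwoInputsLastRecordAttributes(): with both inputs known, both
// are handed to the builder, which decides the combined backlog flag.
RecordAttributes merged = new RecordAttributesBuilder(Arrays.asList(in1, in2)).build();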
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.operators;

import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeService;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;

import java.util.LinkedList;
import java.util.List;

/**
 * An implementation of {@link InternalTimerService} that manages timers with a single active key
 * at a time. This is used by {@link
 * org.apache.flink.streaming.api.operators.InternalBacklogAwareTimerServiceImpl} during backlog
 * processing.
 */
class BacklogTimeService<K, N> extends BatchExecutionInternalTimeService<K, N> {
    private long maxWatermarkDuringBacklog;

    public BacklogTimeService(
            ProcessingTimeService processingTimeService,
            Triggerable<K, N> triggerTarget,
            KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue) {
        super(processingTimeService, triggerTarget, eventTimeTimersQueue, null);
    }

    @Override
    public void registerProcessingTimeTimer(N namespace, long time) {
        throw new UnsupportedOperationException(
                "BacklogTimeService does not support registering processing timer.");
    }

    @Override
    public void deleteProcessingTimeTimer(N namespace, long time) {
        throw new UnsupportedOperationException(
                "BacklogTimeService does not support deleting processing timer.");
    }

    /**
     * Sets the current key of the time service. If the new key is different from the last key,
     * all the event time timers of the last key whose timestamp is less than or equal to the max
     * watermark during backlog are triggered.
     */
    public void setCurrentKey(K newKey) throws Exception {
        if (newKey != null && newKey.equals(currentKey)) {
            return;
        }

        TimerHeapInternalTimer<K, N> timer;
        List<TimerHeapInternalTimer<K, N>> skippedTimers = new LinkedList<>();
        if (currentKey != null) {
            while ((timer = eventTimeTimersQueue.peek()) != null
                    && timer.getTimestamp() <= maxWatermarkDuringBacklog) {
                eventTimeTimersQueue.poll();

                if (timer.getKey() != currentKey) {
                    skippedTimers.add(timer);
                } else {
                    triggerTarget.onEventTime(timer);
                }
            }
            eventTimeTimersQueue.addAll(skippedTimers);
        }

        if (newKey == null) {
            currentWatermark = maxWatermarkDuringBacklog;
        }

        currentKey = newKey;
    }

    public void setMaxWatermarkDuringBacklog(long watermark) {
        maxWatermarkDuringBacklog = watermark;
    }
}
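During backlog, timers are thus not fired by watermark progress at all; they fire in bulk whenever processing moves past a key of the sorted input. A hedged usage sequence (the driver code around these calls is assumed):

// timeService is the BacklogTimeService owned by the backlog-aware manager;
// keys arrive grouped because the input is sorted during backlog.
timeService.setMaxWatermarkDuringBacklog(maxWatermarkDuringBacklog);
timeService.setCurrentKey("keyA");
// ... process all backlog records for "keyA", registering event-time timers ...
timeService.setCurrentKey("keyB"); // fires "keyA" timers up to the max watermark
timeService.setCurrentKey(null);   // backlog over: fires the remaining "keyB" timers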
@@ -22,6 +22,7 @@
import org.apache.flink.metrics.Gauge;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.WatermarkGaugeExposingOutput;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
@@ -74,4 +75,9 @@ public void close() {
    public Gauge<Long> getWatermarkGauge() {
        return output.getWatermarkGauge();
    }

    @Override
    public void emitRecordAttributes(RecordAttributes recordAttributes) {
        output.emitRecordAttributes(recordAttributes);
    }
}
@@ -18,9 +18,11 @@

package org.apache.flink.streaming.api.operators;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;

@@ -69,4 +71,11 @@ public interface Input<IN> {
     * guaranteed to not be called concurrently with other methods of the operator.
     */
    void setKeyContextElement(StreamRecord<IN> record) throws Exception;

    /**
     * Processes a {@link RecordAttributes} that arrived at this input. This method is guaranteed
     * to not be called concurrently with other methods of the operator.
     */
    @Experimental
    default void processRecordAttributes(RecordAttributes recordAttributes) throws Exception {}
}
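An implementation can override the default no-op to react to the change. A hedged sketch of an input that toggles an internal processing mode — the two helper methods are hypothetical:

@Override
public void processRecordAttributes(RecordAttributes recordAttributes) throws Exception {
    if (recordAttributes.isBacklog()) {
        // Backlog: trade latency for throughput, e.g. buffer and batch state accesses.
        switchToBatchProcessing(); // hypothetical helper
    } else {
        // Caught up to real-time data: flush buffers, process record by record.
        switchToStreamProcessing(); // hypothetical helper
    }
}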
(The remaining changed files in this commit are not shown.)
