Commit dff5c54

[FLINK-33202][runtime] Support switching from batch to stream mode to improve throughput when processing backlog data

Sxnan committed Oct 13, 2023
1 parent cd95b56 commit dff5c54
Showing 74 changed files with 3,303 additions and 79 deletions.
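Before the file-by-file diff, a note on how the new behavior is switched on: a minimal sketch, not part of this commit, that sets the checkpoint interval during backlog to zero, which is exactly the condition the StreamGraph generator diff below keys the backlog-aware timer service off. Only CHECKPOINTING_INTERVAL_DURING_BACKLOG is read by this commit; the job body is elided.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;

public class BacklogModeExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Zero means "no checkpoints while sources report backlog"; the
        // StreamGraph generator change in this commit installs the
        // backlog-aware timer service only for this setting.
        conf.set(
                ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL_DURING_BACKLOG,
                Duration.ZERO);
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... define sources and transformations, then env.execute() ...
    }
}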
822 changes: 822 additions & 0 deletions [RED][Runtime]增加提交job的接口.patch (filename translates to "[RED][Runtime] add a job submission interface.patch")

Large diffs are not rendered by default.

@@ -46,6 +46,7 @@
import org.apache.flink.streaming.api.operators.SourceOperator;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;

@@ -561,5 +562,8 @@ public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {}

@Override
public void emitLatencyMarker(LatencyMarker latencyMarker) {}

@Override
public void emitRecordAttributes(RecordAttributes recordAttributes) throws Exception {}
}
}
@@ -33,6 +33,7 @@ Licensed to the Apache Software Foundation (ASF) under one
import org.apache.flink.runtime.operators.coordination.CoordinatorStore;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.source.event.IsBacklogEvent;
import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
import org.apache.flink.runtime.source.event.ReportedWatermarkEvent;
import org.apache.flink.runtime.source.event.RequestSplitEvent;
@@ -607,6 +608,14 @@ private void handleReaderRegistrationEvent(
context.registerSourceReader(subtask, attemptNumber, event.location());
if (!subtaskReaderExisted) {
enumerator.addReader(event.subtaskId());

if (context.isBacklog() != null) {
context.runInCoordinatorThread(
() -> {
context.sendEventToSourceOperatorIfTaskReady(
subtask, new IsBacklogEvent(context.isBacklog()));
});
}
}
}

@@ -35,6 +35,7 @@ Licensed to the Apache Software Foundation (ASF) under one
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.runtime.source.event.IsBacklogEvent;
import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
import org.apache.flink.runtime.source.event.SourceEventWrapper;
import org.apache.flink.util.ExceptionUtils;
@@ -112,6 +113,7 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
private final boolean supportsConcurrentExecutionAttempts;
private final boolean[] subtaskHasNoMoreSplits;
private volatile boolean closed;
private volatile Boolean backlog = null;

public SourceCoordinatorContext(
SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory,
@@ -370,6 +372,16 @@ public void setIsProcessingBacklog(boolean isProcessingBacklog) {
if (checkpointCoordinator != null) {
checkpointCoordinator.setIsProcessingBacklog(operatorID, isProcessingBacklog);
}
backlog = isProcessingBacklog;
callInCoordinatorThread(
() -> {
final IsBacklogEvent isBacklogEvent = new IsBacklogEvent(isProcessingBacklog);
for (int i = 0; i < getCoordinatorContext().currentParallelism(); i++) {
sendEventToSourceOperatorIfTaskReady(i, isBacklogEvent);
}
return null;
},
"Failed to send IsBacklogEvent to reader.");
}

// --------- Package private additional methods for the SourceCoordinator ------------
@@ -629,6 +641,10 @@ private void sendCachedSplitsToNewlyRegisteredReader(int subtaskIndex, int attem
}
}

/** Returns the latest backlog status reported via setIsProcessingBacklog, or null if none yet. */
public Boolean isBacklog() {
return backlog;
}

/** Maintains the subtask gateways for different execution attempts of different subtasks. */
private static class SubtaskGateways {
private final Map<Integer, OperatorCoordinator.SubtaskGateway>[] gateways;
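For context on where the backlog signal originates: a minimal enumerator-side sketch, assuming the FLIP-309 SplitEnumeratorContext#setIsProcessingBacklog API (SourceCoordinatorContext above is the runtime implementation behind it). BacklogSignaler and its method names are hypothetical, not part of this commit.

import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;

// Hypothetical helper: an enumerator flips the backlog flag on its context;
// the coordinator then notifies the CheckpointCoordinator and, with this
// commit, also fans an IsBacklogEvent out to every registered source operator.
final class BacklogSignaler<SplitT extends SourceSplit> {
    private final SplitEnumeratorContext<SplitT> context;

    BacklogSignaler(SplitEnumeratorContext<SplitT> context) {
        this.context = context;
    }

    void startCatchUp() {
        context.setIsProcessingBacklog(true); // batch-style processing while behind
    }

    void caughtUp() {
        context.setIsProcessingBacklog(false); // switch back to stream mode
    }
}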
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.source.event;

import org.apache.flink.runtime.operators.coordination.OperatorEvent;

/** An operator event that notifies the source operator of the current backlog status. */
public class IsBacklogEvent implements OperatorEvent {

private final boolean backlog;

public IsBacklogEvent(boolean backlog) {
this.backlog = backlog;
}

public boolean isBacklog() {
return backlog;
}
}
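On the receiving side (not shown in this excerpt), a source operator can turn the event into a RecordAttributes element that travels downstream with the records. A hedged sketch, assuming the FLIP-325 RecordAttributesBuilder#setBacklog method; the handler class itself is hypothetical:

import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.source.event.IsBacklogEvent;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributesBuilder;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.util.Collections;

// Hypothetical handler: converts the coordinator's IsBacklogEvent into a
// RecordAttributes element so downstream operators learn the backlog status.
final class IsBacklogEventHandler {
    static <T> void onOperatorEvent(OperatorEvent event, Output<StreamRecord<T>> output) {
        if (event instanceof IsBacklogEvent) {
            output.emitRecordAttributes(
                    new RecordAttributesBuilder(Collections.emptyList())
                            .setBacklog(((IsBacklogEvent) event).isBacklog())
                            .build());
        }
    }
}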
@@ -795,6 +795,8 @@ public enum InputRequirement {
*/
SORTED,

/**
 * Records from inputs with this requirement are sorted only while the input is processing
 * backlog data; once the backlog has been processed, records pass through without sorting.
 */
SORTED_DURING_BACKLOG,

/**
* Records from {@link #PASS_THROUGH} inputs are passed to the operator before passing any
* records from {@link #SORTED} inputs. There are no guarantees on ordering between and
@@ -879,4 +881,10 @@ public static boolean requiresSorting(StreamConfig.InputConfig inputConfig) {
&& ((StreamConfig.NetworkInputConfig) inputConfig).getInputRequirement()
== StreamConfig.InputRequirement.SORTED;
}

public static boolean requiresSortingDuringBacklog(StreamConfig.InputConfig inputConfig) {
return inputConfig instanceof StreamConfig.NetworkInputConfig
&& ((StreamConfig.NetworkInputConfig) inputConfig).getInputRequirement()
== InputRequirement.SORTED_DURING_BACKLOG;
}
}
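A short illustration of how the two predicates above separate the three ordering cases; the describe helper is illustrative only, not part of this commit:

import org.apache.flink.streaming.api.graph.StreamConfig;

// Illustrative only: classify an input by its ordering requirement.
final class InputRequirements {
    static String describe(StreamConfig.InputConfig input) {
        if (StreamConfig.requiresSorting(input)) {
            return "always sorted (batch-style input)";
        } else if (StreamConfig.requiresSortingDuringBacklog(input)) {
            return "sorted only while upstream reports backlog";
        }
        return "pass-through (no sorting requirement)";
    }
}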
@@ -47,6 +47,7 @@
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.operators.InternalBacklogAwareTimerServiceManagerImpl;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionCheckpointStorage;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionStateBackend;
@@ -93,6 +94,7 @@

import javax.annotation.Nullable;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -415,6 +417,12 @@ private void configureStreamGraphStreaming(final StreamGraph graph) {
graph.setCheckpointStorage(checkpointStorage);
graph.setSavepointDirectory(savepointDir);
graph.setGlobalStreamExchangeMode(deriveGlobalStreamExchangeModeStreaming());

if (Duration.ZERO.equals(
configuration.get(
ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL_DURING_BACKLOG))) {
graph.setTimerServiceProvider(InternalBacklogAwareTimerServiceManagerImpl::create);
}
}

private String deriveJobName(String defaultJobName) {
@@ -18,6 +18,7 @@

package org.apache.flink.streaming.api.operators;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
@@ -49,7 +50,10 @@
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.CheckpointedStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.InternalRecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributesBuilder;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
@@ -61,6 +65,9 @@
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Optional;

@@ -148,6 +155,9 @@ public abstract class AbstractStreamOperator<OUT>

protected transient ProcessingTimeService processingTimeService;

protected transient RecordAttributes lastRecordAttributes1;
protected transient RecordAttributes lastRecordAttributes2;

// ------------------------------------------------------------------------
// Life Cycle
// ------------------------------------------------------------------------
@@ -649,4 +659,52 @@ public OperatorID getOperatorID() {
protected Optional<InternalTimeServiceManager<?>> getTimeServiceManager() {
return Optional.ofNullable(timeServiceManager);
}

@Experimental
public void processRecordAttributes(RecordAttributes recordAttributes) throws Exception {
lastRecordAttributes1 = recordAttributes;
// instanceof is null-safe, so no separate null check is needed.
if (timeServiceManager instanceof InternalBacklogAwareTimerServiceManagerImpl) {
final InternalBacklogAwareTimerServiceManagerImpl<?> backlogAwareTimerServiceManager =
(InternalBacklogAwareTimerServiceManagerImpl<?>) timeServiceManager;
if (recordAttributes instanceof InternalRecordAttributes) {
backlogAwareTimerServiceManager.setMaxWatermarkDuringBacklog(
((InternalRecordAttributes) recordAttributes)
.getMaxWatermarkDuringBacklog());
}
backlogAwareTimerServiceManager.setBacklog(recordAttributes.isBacklog());
}
output.emitRecordAttributes(
new RecordAttributesBuilder(Collections.singletonList(recordAttributes)).build());
}

@Experimental
public void processRecordAttributes1(RecordAttributes recordAttributes) throws Exception {
lastRecordAttributes1 = recordAttributes;
List<RecordAttributes> lastRecordAttributes = getTwoInputsLastRecordAttributes();
output.emitRecordAttributes(new RecordAttributesBuilder(lastRecordAttributes).build());
}

@Experimental
public void processRecordAttributes2(RecordAttributes recordAttributes) throws Exception {
lastRecordAttributes2 = recordAttributes;
List<RecordAttributes> lastRecordAttributes = getTwoInputsLastRecordAttributes();
output.emitRecordAttributes(new RecordAttributesBuilder(lastRecordAttributes).build());
}

private List<RecordAttributes> getTwoInputsLastRecordAttributes() {
List<RecordAttributes> lastRecordAttributes;
if (lastRecordAttributes1 == null && lastRecordAttributes2 == null) {
// Unreachable unless called before any RecordAttributes has been received.
throw new RuntimeException(
        "lastRecordAttributes1 and lastRecordAttributes2 cannot both be null.");
} else if (lastRecordAttributes1 != null && lastRecordAttributes2 != null) {
lastRecordAttributes = Arrays.asList(lastRecordAttributes1, lastRecordAttributes2);
} else if (lastRecordAttributes1 != null) {
lastRecordAttributes = Collections.singletonList(lastRecordAttributes1);
} else {
lastRecordAttributes = Collections.singletonList(lastRecordAttributes2);
}
return lastRecordAttributes;
}
}
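For operator authors, a minimal sketch of reacting to these attributes in a custom operator, assuming the standard OneInputStreamOperator interface and the isBacklog() accessor used above; MyBacklogAwareOperator and its buffering strategy are hypothetical:

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

// Hypothetical operator: tracks the backlog flag and forwards the attributes
// downstream via the default propagation in AbstractStreamOperator.
public class MyBacklogAwareOperator extends AbstractStreamOperator<String>
        implements OneInputStreamOperator<String, String> {

    private transient boolean backlog;

    @Override
    public void processRecordAttributes(RecordAttributes recordAttributes) throws Exception {
        backlog = recordAttributes.isBacklog(); // e.g. buffer-and-sort while true
        super.processRecordAttributes(recordAttributes); // keep the default downstream emit
    }

    @Override
    public void processElement(StreamRecord<String> element) throws Exception {
        output.collect(element);
    }
}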
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.operators;

import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeService;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;

import java.util.LinkedList;
import java.util.List;

/**
 * An implementation of an {@link InternalTimerService} that manages timers for a single active
 * key at a time. It is used by {@link
 * org.apache.flink.streaming.api.operators.InternalBacklogAwareTimerServiceImpl} during backlog
 * processing.
 */
@Internal
public class BacklogTimeService<K, N> extends BatchExecutionInternalTimeService<K, N> {
private long maxWatermarkDuringBacklog;

public BacklogTimeService(
ProcessingTimeService processingTimeService,
Triggerable<K, N> triggerTarget,
KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue,
KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>>
processingTimeTimersQueue) {
super(
processingTimeService,
triggerTarget,
eventTimeTimersQueue,
processingTimeTimersQueue);
}

@Override
public void registerProcessingTimeTimer(N namespace, long time) {
throw new UnsupportedOperationException(
"BacklogTimeService does not support registering processing timer.");
}

public void setCurrentKey(K newKey) throws Exception {
if (newKey != null && newKey.equals(currentKey)) {
return;
}

TimerHeapInternalTimer<K, N> timer;
List<TimerHeapInternalTimer<K, N>> skippedTimers = new LinkedList<>();
if (currentKey != null) {
while ((timer = eventTimeTimersQueue.peek()) != null
&& timer.getTimestamp() <= maxWatermarkDuringBacklog) {
eventTimeTimersQueue.poll();

if (!timer.getKey().equals(currentKey)) {
skippedTimers.add(timer);
} else {
triggerTarget.onEventTime(timer);
}
}
eventTimeTimersQueue.addAll(skippedTimers);
}

if (newKey == null) {
currentWatermark = maxWatermarkDuringBacklog;
}

currentKey = newKey;
}

public void setMaxWatermarkDuringBacklog(long watermark) {
maxWatermarkDuringBacklog = watermark;
}
}
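The setCurrentKey loop above fires every due timer of the key being finished and re-queues the timers it popped for other keys. A self-contained model of that pattern on a plain PriorityQueue (Timer and drainForKey are stand-ins, not Flink classes):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Stand-alone model of BacklogTimeService#setCurrentKey's draining loop.
final class BacklogTimerModel {
    record Timer(String key, long timestamp) {}

    // Fires every timer of currentKey due by maxWatermark; timers of other
    // keys are popped, remembered, and pushed back afterwards.
    static void drainForKey(PriorityQueue<Timer> queue, String currentKey, long maxWatermark) {
        List<Timer> skipped = new ArrayList<>();
        Timer timer;
        while ((timer = queue.peek()) != null && timer.timestamp() <= maxWatermark) {
            queue.poll();
            if (!timer.key().equals(currentKey)) {
                skipped.add(timer);
            } else {
                System.out.println("fire " + timer); // stand-in for triggerTarget.onEventTime
            }
        }
        queue.addAll(skipped);
    }

    public static void main(String[] args) {
        PriorityQueue<Timer> queue =
                new PriorityQueue<>(Comparator.comparingLong(Timer::timestamp));
        queue.add(new Timer("a", 5));
        queue.add(new Timer("b", 3));
        queue.add(new Timer("a", 10));
        drainForKey(queue, "a", 8); // fires ("a",5); keeps ("b",3) and ("a",10) queued
    }
}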
@@ -22,6 +22,7 @@
import org.apache.flink.metrics.Gauge;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.WatermarkGaugeExposingOutput;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
@@ -74,4 +75,9 @@ public void close() {
public Gauge<Long> getWatermarkGauge() {
return output.getWatermarkGauge();
}

@Override
public void emitRecordAttributes(RecordAttributes recordAttributes) {
output.emitRecordAttributes(recordAttributes);
}
}
@@ -18,9 +18,11 @@

package org.apache.flink.streaming.api.operators;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;

@@ -69,4 +71,11 @@ public interface Input<IN> {
* guaranteed to not be called concurrently with other methods of the operator.
*/
void setKeyContextElement(StreamRecord<IN> record) throws Exception;

/**
* Processes a {@link RecordAttributes} that arrived at this input. This method is guaranteed to
* not be called concurrently with other methods of the operator.
*/
@Experimental
default void processRecordAttributes(RecordAttributes recordAttributes) throws Exception {}
}