Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
Expand All @@ -49,6 +51,7 @@
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.runtime.io.DataInputStatus;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
Expand All @@ -57,11 +60,15 @@
import org.apache.flink.util.UserCodeClassLoader;
import org.apache.flink.util.function.FunctionWithException;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

/**
* Base source operator only used for integrating the source reader which is proposed by FLIP-27. It
Expand Down Expand Up @@ -149,6 +156,8 @@ private enum OperatingMode {

private InternalSourceReaderMetricGroup sourceMetricGroup;

private LatencyMarkerEmitter<OUT> latencyMarerEmitter;

public SourceOperator(
FunctionWithException<SourceReaderContext, SourceReader<OUT, SplitT>, Exception>
readerFactory,
Expand Down Expand Up @@ -282,6 +291,19 @@ public void open() throws Exception {
watermarkStrategy, sourceMetricGroup);
}

latencyMarerEmitter =
new LatencyMarkerEmitter<>(
getProcessingTimeService(),
getExecutionConfig().isLatencyTrackingConfigured()
? getExecutionConfig().getLatencyTrackingInterval()
: getContainingTask()
.getEnvironment()
.getTaskManagerInfo()
.getConfiguration()
.getLong(MetricOptions.LATENCY_INTERVAL),
getOperatorID(),
getRuntimeContext().getIndexOfThisSubtask());

// restore the state if necessary.
final List<SplitT> splits = CollectionUtil.iterableToList(readerState.get());
if (!splits.isEmpty()) {
Expand All @@ -296,13 +318,17 @@ public void open() throws Exception {
sourceReader.start();

eventTimeLogic.startPeriodicWatermarkEmits();
latencyMarerEmitter.startLatencyMarkerEmit();
}

@Override
public void finish() throws Exception {
if (eventTimeLogic != null) {
eventTimeLogic.stopPeriodicWatermarkEmits();
}
if (latencyMarerEmitter != null) {
latencyMarerEmitter.stopLatencyMarkerEmit();
}
super.finish();

finished.complete(null);
Expand Down Expand Up @@ -348,6 +374,7 @@ private DataInputStatus emitNextNotReading(DataOutput<OUT> output) throws Except
switch (operatingMode) {
case OUTPUT_NOT_INITIALIZED:
currentMainOutput = eventTimeLogic.createMainOutput(output);
latencyMarerEmitter.emitMainOutput(output);
lastInvokedOutput = output;
this.operatingMode = OperatingMode.READING;
return convertToInternalStatus(sourceReader.pollNext(currentMainOutput));
Expand Down Expand Up @@ -478,4 +505,79 @@ public void forceStop() {
this.forcedStopFuture.complete(null);
}
}

private static class LatencyMarkerEmitter<OUT> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we deduplicate this class with LatencyMarksEmitter?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pnowojski, this can't be deduplicated with the LatencyMarksEmitter in StreamSource, because this class emits through the DataOutput while LatencyMarksEmitter emits through the Output.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But why is that an issue? You could define a very simple helper interface inside the unified LatencyMarksEmitter/LatencyMarkerEmitter:

interface LatencyMarkerEmitter {
  void emitLatencyMarker(LatencyMarker latencyMarker);
}

Use it in the unified class, and in StreamSource/StreamOperator inject its implementation with a simple method reference such as output::emitLatencyMarker.


// Timer service used to schedule the periodic emit callback and to read the
// current processing time that is stamped into each marker.
private final ProcessingTimeService timeService;

// Delay between two latency marker emissions, passed to the timer service as the
// period; a value of zero means latency markers are not emitted.
// NOTE(review): presumably milliseconds — confirm against the configuration option.
private final long latencyTrackingInterval;

// Operator id written into every emitted LatencyMarker.
private final OperatorID operatorId;

// Subtask index written into every emitted LatencyMarker.
private final int subtaskIndex;

// Output the markers are emitted to; stays null until emitMainOutput(...) is
// called, and may be assigned only once.
@Nullable private DataOutput<OUT> currentMainOutput;

// Handle of the scheduled periodic emit; null while the emitter is not started
// (or was stopped).
@Nullable private ScheduledFuture<?> latencyMarkerTimer;

/**
 * Creates an emitter that is not yet started and has no output attached.
 *
 * @param timeService timer service used to schedule the periodic emission and to obtain
 *     the timestamp of each marker; must not be null
 * @param latencyTrackingInterval delay between two marker emissions
 * @param operatorId id of the operator stamped into each marker; must not be null
 * @param subtaskIndex index of the subtask stamped into each marker
 */
public LatencyMarkerEmitter(
        final ProcessingTimeService timeService,
        long latencyTrackingInterval,
        final OperatorID operatorId,
        final int subtaskIndex) {
    // Fail fast: a null dependency would otherwise only surface later inside the timer
    // callback, where every Throwable is caught and merely logged.
    this.timeService = checkNotNull(timeService, "timeService must not be null");
    this.latencyTrackingInterval = latencyTrackingInterval;
    this.operatorId = checkNotNull(operatorId, "operatorId must not be null");
    this.subtaskIndex = subtaskIndex;
}

// ------------------------------------------------------------------------

public void emitMainOutput(PushingAsyncDataInput.DataOutput<OUT> output) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename to initializeOutput?

// At the moment, we assume only one output is ever created!
// This assumption is strict, currently, because many of the classes in this
// implementation do not support re-assigning the underlying output.
checkState(currentMainOutput == null, "Main output has already been set.");
currentMainOutput = output;
}

/**
 * Schedules the periodic emission of latency markers on the processing time service.
 *
 * <p>Does nothing when latency tracking is disabled, i.e. when the configured interval is
 * zero or negative.
 *
 * @throws IllegalStateException if the emitter has already been started
 */
public void startLatencyMarkerEmit() {
    checkState(
            latencyMarkerTimer == null, "Latency marker emitter has already been started");
    if (latencyTrackingInterval <= 0) {
        // Zero or a negative value means not activated; never hand a non-positive
        // period to the timer service.
        return;
    }
    latencyMarkerTimer =
            timeService.scheduleWithFixedDelay(
                    this::triggerLatencyMarkerEmit, 0L, latencyTrackingInterval);
}

public void stopLatencyMarkerEmit() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename to close() and implement Closeable?

if (latencyMarkerTimer != null) {
latencyMarkerTimer.cancel(false);
latencyMarkerTimer = null;
}
}

void triggerLatencyMarkerEmit(@SuppressWarnings("unused") long wallClockTimestamp) {
if (currentMainOutput != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can get rid of this if check (or replace it with checkState(currentMainOutput != null)) if you just move the code from startLatencyMarkerEmit() into the initializeOutput(...)/emitMainOutput method.

try {
// ProcessingTimeService callbacks are executed under the
// checkpointing lock
currentMainOutput.emitLatencyMarker(
new LatencyMarker(
timeService.getCurrentProcessingTime(),
operatorId,
subtaskIndex));
} catch (Throwable t) {
// we catch the Throwable here so that we don't trigger the
// processing timer service's async exception handler
LOG.warn("Error while emitting latency marker.", t);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,30 @@

package org.apache.flink.streaming.api.operators.source;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateInitializationContextImpl;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.operators.SourceOperator;
import org.apache.flink.streaming.runtime.io.DataInputStatus;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.util.MockOutput;
import org.apache.flink.streaming.util.MockStreamConfig;

import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;

Expand Down Expand Up @@ -243,6 +251,19 @@ private static <T> SourceOperator<T, MockSourceSplit> createTestOperator(
final SourceOperator<T, MockSourceSplit> sourceOperator =
new TestingSourceOperator<>(
reader, watermarkStrategy, timeService, emitProgressiveWatermarks);

sourceOperator.setup(
new SourceOperatorStreamTask<Integer>(
new StreamMockEnvironment(
new Configuration(),
new Configuration(),
new ExecutionConfig(),
1L,
new MockInputSplitProvider(),
1,
new TestTaskStateManager())),
new MockStreamConfig(new Configuration(), 1),
new MockOutput<>(new ArrayList<>()));
sourceOperator.initializeState(stateContext);
sourceOperator.open();

Expand Down
Loading