Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.operators.InternalTimerServiceAsyncImpl;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
Expand Down Expand Up @@ -90,9 +90,7 @@ public abstract class AbstractAsyncStateStreamOperator<OUT> extends AbstractStre

/** Initialize necessary state components for {@link AbstractStreamOperator}. */
@Override
public void initializeState(StreamTaskStateInitializer streamTaskStateManager)
throws Exception {
super.initializeState(streamTaskStateManager);
public final void beforeInitializeStateHandler() {
KeyedStateStore stateStore = stateHandler.getKeyedStateStore().orElse(null);
if (stateStore instanceof DefaultKeyedStateStore) {
((DefaultKeyedStateStore) stateStore).setSupportKeyedStateApiSetV2();
Expand Down Expand Up @@ -293,12 +291,11 @@ public <K, N> InternalTimerService<N> getInternalTimerService(
checkState(keySerializer != null, "Timers can only be used on keyed operators.");
// A {@link RecordContext} will be set as the current processing context to preserve record
// order when the given {@link Triggerable} is invoked.
return keyedTimeServiceHandler.getAsyncInternalTimerService(
name,
keySerializer,
namespaceSerializer,
triggerable,
(AsyncExecutionController<K>) asyncExecutionController);
InternalTimerService<N> service =
keyedTimeServiceHandler.getInternalTimerService(
name, keySerializer, namespaceSerializer, triggerable);
((InternalTimerServiceAsyncImpl<K, N>) service).setup(asyncExecutionController);
return service;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.InternalTimerServiceAsyncImpl;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessing;
Expand Down Expand Up @@ -92,9 +92,7 @@ public AbstractAsyncStateStreamOperatorV2(

/** Initialize necessary state components for {@link AbstractStreamOperatorV2}. */
@Override
public final void initializeState(StreamTaskStateInitializer streamTaskStateManager)
throws Exception {
super.initializeState(streamTaskStateManager);
public final void beforeInitializeStateHandler() {
KeyedStateStore stateStore = stateHandler.getKeyedStateStore().orElse(null);
if (stateStore instanceof DefaultKeyedStateStore) {
((DefaultKeyedStateStore) stateStore).setSupportKeyedStateApiSetV2();
Expand Down Expand Up @@ -258,12 +256,11 @@ public <K, N> InternalTimerService<N> getInternalTimerService(
checkState(keySerializer != null, "Timers can only be used on keyed operators.");
// A {@link RecordContext} will be set as the current processing context to preserve record
// order when the given {@link Triggerable} is invoked.
return keyedTimeServiceHandler.getAsyncInternalTimerService(
name,
keySerializer,
namespaceSerializer,
triggerable,
(AsyncExecutionController<K>) asyncExecutionController);
InternalTimerService<N> service =
keyedTimeServiceHandler.getInternalTimerService(
name, keySerializer, namespaceSerializer, triggerable);
((InternalTimerServiceAsyncImpl<K, N>) service).setup(asyncExecutionController);
return service;
}

// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,11 @@ public OperatorMetricGroup getMetricGroup() {
return metrics;
}

/** Initialize necessary state components before initializing state components. */
protected void beforeInitializeStateHandler() {}

@Override
public void initializeState(StreamTaskStateInitializer streamTaskStateManager)
public final void initializeState(StreamTaskStateInitializer streamTaskStateManager)
throws Exception {

final TypeSerializer<?> keySerializer =
Expand Down Expand Up @@ -296,6 +299,8 @@ public void initializeState(StreamTaskStateInitializer streamTaskStateManager)
isAsyncStateProcessingEnabled()
? context.asyncInternalTimerServiceManager()
: context.internalTimerServiceManager();

beforeInitializeStateHandler();
stateHandler.initializeOperatorState(this);
runtimeContext.setKeyedStateStore(stateHandler.getKeyedStateStore().orElse(null));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,11 @@ public OperatorMetricGroup getMetricGroup() {
return metrics;
}

/** Initialize necessary state components before initializing state components. */
protected void beforeInitializeStateHandler() {}

@Override
public void initializeState(StreamTaskStateInitializer streamTaskStateManager)
public final void initializeState(StreamTaskStateInitializer streamTaskStateManager)
throws Exception {
final TypeSerializer<?> keySerializer =
config.getStateKeySerializer(getUserCodeClassloader());
Expand All @@ -225,6 +228,8 @@ public void initializeState(StreamTaskStateInitializer streamTaskStateManager)
isAsyncStateProcessingEnabled()
? context.asyncInternalTimerServiceManager()
: context.internalTimerServiceManager();

beforeInitializeStateHandler();
stateHandler.initializeOperatorState(this);

if (useSplittableTimers()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
Expand Down Expand Up @@ -65,21 +64,6 @@ <N> InternalTimerService<N> getInternalTimerService(
TypeSerializer<N> namespaceSerializer,
Triggerable<K, N> triggerable);

/**
* Creates an {@link InternalTimerServiceAsyncImpl} for handling a group of timers identified by
* the given {@code name}. The timers are scoped to a key and namespace. Mainly used by async
* operators.
*
* <p>Some essential order preservation will be added when the given {@link Triggerable} is
* invoked.
*/
<N> InternalTimerService<N> getAsyncInternalTimerService(
String name,
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
Triggerable<K, N> triggerable,
AsyncExecutionController<K> asyncExecutionController);

/**
* Advances the Watermark of all managed {@link InternalTimerService timer services},
* potentially firing event time timers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
Expand All @@ -39,6 +40,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
Expand Down Expand Up @@ -76,6 +79,8 @@ public class InternalTimeServiceManagerImpl<K> implements InternalTimeServiceMan

private final Map<String, InternalTimerServiceImpl<K, ?>> timerServices;

@Nullable AsyncExecutionController<K> asyncExecutionController;

private InternalTimeServiceManagerImpl(
TaskIOMetricGroup taskIOMetricGroup,
KeyGroupRange localKeyGroupRange,
Expand Down Expand Up @@ -162,65 +167,31 @@ <N> InternalTimerServiceImpl<K, N> registerOrGetTimerService(
InternalTimerServiceImpl<K, N> timerService =
(InternalTimerServiceImpl<K, N>) timerServices.get(name);
if (timerService == null) {

timerService =
new InternalTimerServiceImpl<>(
taskIOMetricGroup,
localKeyGroupRange,
keyContext,
processingTimeService,
createTimerPriorityQueue(
PROCESSING_TIMER_PREFIX + name, timerSerializer),
createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer),
cancellationContext);

timerServices.put(name, timerService);
}
return timerService;
}

@Override
public <N> InternalTimerService<N> getAsyncInternalTimerService(
String name,
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
Triggerable<K, N> triggerable,
AsyncExecutionController<K> asyncExecutionController) {
checkNotNull(keySerializer, "Timers can only be used on keyed operators.");

// the following casting is to overcome type restrictions.
TimerSerializer<K, N> timerSerializer =
new TimerSerializer<>(keySerializer, namespaceSerializer);

InternalTimerServiceAsyncImpl<K, N> timerService =
registerOrGetAsyncTimerService(name, timerSerializer, asyncExecutionController);

timerService.startTimerService(
timerSerializer.getKeySerializer(),
timerSerializer.getNamespaceSerializer(),
triggerable);

return timerService;
}

<N> InternalTimerServiceAsyncImpl<K, N> registerOrGetAsyncTimerService(
String name,
TimerSerializer<K, N> timerSerializer,
AsyncExecutionController<K> asyncExecutionController) {
InternalTimerServiceAsyncImpl<K, N> timerService =
(InternalTimerServiceAsyncImpl<K, N>) timerServices.get(name);
if (timerService == null) {
timerService =
new InternalTimerServiceAsyncImpl<>(
taskIOMetricGroup,
localKeyGroupRange,
keyContext,
processingTimeService,
createTimerPriorityQueue(
PROCESSING_TIMER_PREFIX + name, timerSerializer),
createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer),
cancellationContext,
asyncExecutionController);
if (priorityQueueSetFactory instanceof AsyncKeyedStateBackend) {
timerService =
new InternalTimerServiceAsyncImpl<>(
taskIOMetricGroup,
localKeyGroupRange,
keyContext,
processingTimeService,
createTimerPriorityQueue(
PROCESSING_TIMER_PREFIX + name, timerSerializer),
createTimerPriorityQueue(
EVENT_TIMER_PREFIX + name, timerSerializer),
cancellationContext);
} else {
timerService =
new InternalTimerServiceImpl<>(
taskIOMetricGroup,
localKeyGroupRange,
keyContext,
processingTimeService,
createTimerPriorityQueue(
PROCESSING_TIMER_PREFIX + name, timerSerializer),
createTimerPriorityQueue(
EVENT_TIMER_PREFIX + name, timerSerializer),
cancellationContext);
}

timerServices.put(name, timerService);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,15 @@
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.BiConsumerWithException;
import org.apache.flink.util.function.ThrowingRunnable;

/**
* An implementation of {@link InternalTimerService} that is used by {@link
* org.apache.flink.streaming.runtime.operators.asyncprocessing.AbstractAsyncStateStreamOperator}.
* The timer service will set {@link RecordContext} for the timers before invoking action to
* preserve the execution order between timer firing and records processing.
* org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateStreamOperator}. The timer
* service will set {@link RecordContext} for the timers before invoking action to preserve the
* execution order between timer firing and records processing.
*
* @see <a
* href=https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-Timers>FLIP-425
Expand All @@ -54,8 +55,7 @@ public class InternalTimerServiceAsyncImpl<K, N> extends InternalTimerServiceImp
ProcessingTimeService processingTimeService,
KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> processingTimeTimersQueue,
KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue,
StreamTaskCancellationContext cancellationContext,
AsyncExecutionController<K> asyncExecutionController) {
StreamTaskCancellationContext cancellationContext) {
super(
taskIOMetricGroup,
localKeyGroupRange,
Expand All @@ -64,11 +64,17 @@ public class InternalTimerServiceAsyncImpl<K, N> extends InternalTimerServiceImp
processingTimeTimersQueue,
eventTimeTimersQueue,
cancellationContext);
this.asyncExecutionController = asyncExecutionController;
}

public void setup(AsyncExecutionController<K> asyncExecutionController) {
if (asyncExecutionController != null) {
this.asyncExecutionController = asyncExecutionController;
}
}

@Override
void onProcessingTime(long time) throws Exception {
Preconditions.checkNotNull(asyncExecutionController);
// null out the timer in case the Triggerable calls registerProcessingTimeTimer()
// inside the callback.
nextTimer = null;
Expand Down Expand Up @@ -99,6 +105,7 @@ void onProcessingTime(long time) throws Exception {
*/
@Override
public void advanceWatermark(long time) throws Exception {
Preconditions.checkNotNull(asyncExecutionController);
currentWatermark = time;
InternalTimer<K, N> timer;
while ((timer = eventTimeTimersQueue.peek()) != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.streaming.api.operators.sorted.state;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
Expand Down Expand Up @@ -74,17 +73,6 @@ public <N> InternalTimerService<N> getInternalTimerService(
return timerService;
}

@Override
public <N> InternalTimerService<N> getAsyncInternalTimerService(
String name,
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
Triggerable<K, N> triggerable,
AsyncExecutionController<K> asyncExecutionController) {
throw new UnsupportedOperationException(
"Async timer service is not supported in BATCH execution mode.");
}

@Override
public void advanceWatermark(Watermark watermark) {
if (watermark.getTimestamp() == Long.MAX_VALUE) {
Expand Down
Loading