[FLINK-22379][runtime] CheckpointCoordinator checks the state of all … #15728

Merged
merged 1 commit on May 5, 2021
@@ -20,6 +20,9 @@

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -37,6 +40,7 @@
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
@@ -48,6 +52,7 @@
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.Serializable;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
@@ -142,7 +147,7 @@ private JobGraph createStreamingFileSinkJobGraph(String outputPath) {
env.addSource(new StatefulSource(true, latchId))
.uid(SOURCE_UID)
.setParallelism(NUM_SOURCES)
- .addSink(sink)
+ .addSink(new WaitingRunningSink<>(latchId, sink))
.setParallelism(NUM_SINKS)
.uid(SINK_UID);
return env.getStreamGraph().getJobGraph();
@@ -200,6 +205,68 @@ private void loadSavepointAndExecute(
}
}

private static class WaitingRunningSink<T>
implements RichFunction,
Serializable,
SinkFunction<T>,
CheckpointedFunction,
CheckpointListener {
private final String latchId;
private final StreamingFileSink<T> streamingFileSink;

/**
* Creates a new {@code WaitingRunningSink} that delegates all calls to the given
* {@code StreamingFileSink} and counts down the savepoint latch once records
* start arriving, signaling that the sink task is up and running.
*/
protected WaitingRunningSink(String latchId, StreamingFileSink<T> streamingFileSink) {
this.latchId = latchId;
this.streamingFileSink = streamingFileSink;
}

// All lifecycle, runtime-context, state, and checkpoint-notification calls are
// forwarded verbatim to the wrapped StreamingFileSink.
@Override
public void setRuntimeContext(RuntimeContext t) {
streamingFileSink.setRuntimeContext(t);
}

@Override
public RuntimeContext getRuntimeContext() {
return streamingFileSink.getRuntimeContext();
}

@Override
public IterationRuntimeContext getIterationRuntimeContext() {
return streamingFileSink.getIterationRuntimeContext();
}

@Override
public void open(Configuration parameters) throws Exception {
streamingFileSink.open(parameters);
}

@Override
public void close() throws Exception {
streamingFileSink.close();
}

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
streamingFileSink.initializeState(context);
}

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
streamingFileSink.notifyCheckpointComplete(checkpointId);
}

@Override
public void notifyCheckpointAborted(long checkpointId) {
streamingFileSink.notifyCheckpointAborted(checkpointId);
}

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
streamingFileSink.snapshotState(context);
}

@Override
public void invoke(T value, Context context) throws Exception {
// Signal the waiting test thread that this sink subtask is running and receiving data.
SAVEPOINT_LATCH_MAP.get(latchId).countDown();

streamingFileSink.invoke(value, context);
}
}

private static class StatefulSource extends RichParallelSourceFunction<Integer>
implements CheckpointedFunction, CheckpointListener {

@@ -237,10 +304,6 @@ public void initializeState(FunctionInitializationContext context) throws Exception {
public void run(SourceContext<Integer> ctx) throws Exception {
if (takingSavepointMode) {
sendRecordsUntil(NUM_RECORDS / 3, 0, ctx);

- CountDownLatch latch = SAVEPOINT_LATCH_MAP.get(latchId);
- latch.countDown();

sendRecordsUntil(NUM_RECORDS / 2, 100, ctx);

while (true) {
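The net effect of the changes in this first file is to move the latch countdown out of StatefulSource#run and into the sink path: the test now learns that the job is running only once records have flowed all the way through to the sink subtasks. The SAVEPOINT_LATCH_MAP that WaitingRunningSink counts down is declared outside the hunks shown here; the following is a minimal sketch of that registry pattern, assuming a static map keyed by a per-test id (the SavepointLatches holder and its register helper are hypothetical names, not part of the diff):

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

// Hypothetical sketch: the shared latch registry that WaitingRunningSink counts
// down and the test thread waits on. It must be static so the sink instances
// deserialized into task threads and the test thread see the same
// CountDownLatch; that is only sound because the MiniCluster used by these
// tests runs in the same JVM as the test code.
final class SavepointLatches {
    static final Map<String, CountDownLatch> SAVEPOINT_LATCH_MAP = new ConcurrentHashMap<>();

    /** Registers a fresh latch and returns its id; a UUID isolates concurrent test runs. */
    static String register(int count) {
        String latchId = UUID.randomUUID().toString();
        SAVEPOINT_LATCH_MAP.put(latchId, new CountDownLatch(count));
        return latchId;
    }
}

Before triggering the savepoint, the test thread would block on SAVEPOINT_LATCH_MAP.get(latchId).await(), the counterpart of the countDown() in WaitingRunningSink#invoke.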
@@ -32,6 +32,8 @@
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.utils.OperatorLatch;
import org.apache.flink.state.api.utils.WaitingFunction;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -88,14 +90,15 @@ public void testOperatorStateInputFormat() throws Exception {

DataStream<Integer> data = streamEnv.addSource(new SavepointSource()).rebalance();

StatefulOperator statefulOperator = new StatefulOperator(list, union, broadcast);
data.connect(data.broadcast(broadcast))
- .process(new StatefulOperator(list, union, broadcast))
+ .process(statefulOperator)
.uid(UID)
.addSink(new DiscardingSink<>());

JobGraph jobGraph = streamEnv.getStreamGraph().getJobGraph();

- String savepoint = takeSavepoint(jobGraph);
+ String savepoint = takeSavepoint(jobGraph, statefulOperator);

ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();

@@ -165,7 +168,8 @@ private void verifyBroadcastState(String path, ExecutionEnvironment batchEnv) throws Exception {
broadcastStateValues);
}

- private String takeSavepoint(JobGraph jobGraph) throws Exception {
+ private String takeSavepoint(JobGraph jobGraph, WaitingFunction waitingFunction) throws Exception {
SavepointSource.initializeForTest();

ClusterClient<?> client = miniClusterResource.getClusterClient();
@@ -178,6 +182,7 @@ private String takeSavepoint(JobGraph jobGraph) throws Exception {
try {
JobID jobID = client.submitJob(jobGraph).get();

waitingFunction.await();
boolean finished = false;
while (deadline.hasTimeLeft()) {
if (SavepointSource.isFinished()) {
@@ -249,8 +254,8 @@ private static List<Integer> getElements() {
}

private static class StatefulOperator extends BroadcastProcessFunction<Integer, Integer, Void>
- implements CheckpointedFunction {
+ implements CheckpointedFunction, WaitingFunction {
+ private final OperatorLatch startLatch = new OperatorLatch();
private final ListStateDescriptor<Integer> list;
private final ListStateDescriptor<Integer> union;
private final MapStateDescriptor<Integer, String> broadcast;
@@ -278,6 +283,8 @@ public void open(Configuration parameters) {

@Override
public void processElement(Integer value, ReadOnlyContext ctx, Collector<Void> out) {
// Signal the test thread that this operator has started processing records.
startLatch.trigger();

elements.add(value);
}

@@ -304,5 +311,10 @@ public void initializeState(FunctionInitializationContext context) throws Exception {

unionState = context.getOperatorStateStore().getUnionListState(union);
}

@Override
public void await() throws RuntimeException {
startLatch.await();
}
}
}
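The OperatorLatch and WaitingFunction utilities imported from org.apache.flink.state.api.utils are introduced alongside this change, and their sources are not part of the hunks shown here. From the way they are used, trigger() fires on every processed record and so must tolerate repeated calls, while await() blocks the test thread until the first record has been processed. A plausible minimal implementation, offered as a sketch only:

import java.io.Serializable;
import java.util.concurrent.CountDownLatch;

/** Sketch: implemented by test functions that a test thread can block on until they run. */
interface WaitingFunction extends Serializable {
    void await() throws RuntimeException;
}

/** Sketch: a one-shot latch that tolerates repeated trigger() calls. */
class OperatorLatch implements Serializable {
    // Static, so the operator copy deserialized into the task thread and the
    // original instance held by the test thread share the same latch; this is
    // only sound because MiniCluster tests run inside a single JVM.
    private static final CountDownLatch STARTED = new CountDownLatch(1);

    void trigger() {
        STARTED.countDown(); // counting down an exhausted latch is a no-op
    }

    void await() throws RuntimeException {
        try {
            STARTED.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for the operator to start", e);
        }
    }
}

The real utilities may well differ, for instance by scoping the latch per test rather than statically, as the latchId-keyed map in the first file does.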
@@ -25,7 +25,9 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.state.api.utils.OperatorLatch;
import org.apache.flink.state.api.utils.SavepointTestBase;
import org.apache.flink.state.api.utils.WaitingFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
@@ -58,17 +60,17 @@ public abstract class SavepointReaderKeyedStateITCase<B extends StateBackend>
public void testUserKeyedStateReader() throws Exception {
String savepointPath =
takeSavepoint(
- elements,
- source -> {
+ new KeyedStatefulOperator(),
+ process -> {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(getStateBackend());
env.setParallelism(4);

- env.addSource(source)
+ env.addSource(createSource(elements))
.rebalance()
.keyBy(id -> id.key)
- .process(new KeyedStatefulOperator())
+ .process(process)
.uid(uid)
.addSink(new DiscardingSink<>());

@@ -86,8 +88,9 @@ public void testUserKeyedStateReader() throws Exception {
"Unexpected results from keyed state", expected, new HashSet<>(results));
}

- private static class KeyedStatefulOperator extends KeyedProcessFunction<Integer, Pojo, Void> {
+ private static class KeyedStatefulOperator extends KeyedProcessFunction<Integer, Pojo, Void> implements WaitingFunction {
+ private final OperatorLatch startLatch = new OperatorLatch();
private transient ValueState<Integer> state;

@Override
Expand All @@ -97,12 +100,19 @@ public void open(Configuration parameters) {

@Override
public void processElement(Pojo value, Context ctx, Collector<Void> out) throws Exception {
startLatch.trigger();

state.update(value.state);

value.eventTimeTimer.forEach(timer -> ctx.timerService().registerEventTimeTimer(timer));
value.processingTimeTimer.forEach(
timer -> ctx.timerService().registerProcessingTimeTimer(timer));
}

@Override
public void await() throws RuntimeException {
startLatch.await();
}
}

private static class Reader extends KeyedStateReaderFunction<Integer, Pojo> {
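All three files apply the same pattern: rather than counting down from the source, which only proves that the source subtasks started, the latch now trips inside the operator or sink under test, so the test proceeds only after records have traversed the entire pipeline. This lines up with the runtime side of the PR, where the CheckpointCoordinator checks the state of all tasks (per the PR title) before triggering, and it removes the race in which a savepoint was requested while some tasks were still deploying. Condensed, the test flow assembled from the hunks above looks roughly like this (a sketch, not verbatim source; ClusterClient#triggerSavepoint takes the job id and an optional target directory):

// Condensed sketch of the test flow after this PR.
JobID jobID = client.submitJob(jobGraph).get();

// Block until the operator under test has processed at least one record,
// i.e. the whole pipeline has reached RUNNING.
waitingFunction.await();

// Only now is it safe to ask for a savepoint; previously this could race
// with tasks that were still being deployed.
String savepointPath = client.triggerSavepoint(jobID, null).get();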