Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.utils.OperatorLatch;
import org.apache.flink.state.api.utils.WaitingFunction;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
Expand All @@ -57,6 +55,8 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.apache.flink.state.api.utils.SavepointTestBase.waitForAllRunningOrSomeTerminal;

/** IT case for reading state. */
public abstract class SavepointReaderITTestBase extends AbstractTestBase {
static final String UID = "stateful-operator";
Expand Down Expand Up @@ -98,7 +98,7 @@ public void testOperatorStateInputFormat() throws Exception {

JobGraph jobGraph = streamEnv.getStreamGraph().getJobGraph();

String savepoint = takeSavepoint(jobGraph, statefulOperator);
String savepoint = takeSavepoint(jobGraph);

ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();

Expand Down Expand Up @@ -168,8 +168,7 @@ private void verifyBroadcastState(String path, ExecutionEnvironment batchEnv) th
broadcastStateValues);
}

private String takeSavepoint(JobGraph jobGraph, WaitingFunction waitingFunction)
throws Exception {
private String takeSavepoint(JobGraph jobGraph) throws Exception {
SavepointSource.initializeForTest();

ClusterClient<?> client = miniClusterResource.getClusterClient();
Expand All @@ -182,7 +181,7 @@ private String takeSavepoint(JobGraph jobGraph, WaitingFunction waitingFunction)
try {
JobID jobID = client.submitJob(jobGraph).get();

waitingFunction.await();
waitForAllRunningOrSomeTerminal(jobID, miniClusterResource);
boolean finished = false;
while (deadline.hasTimeLeft()) {
if (SavepointSource.isFinished()) {
Expand Down Expand Up @@ -254,8 +253,7 @@ private static List<Integer> getElements() {
}

private static class StatefulOperator extends BroadcastProcessFunction<Integer, Integer, Void>
implements CheckpointedFunction, WaitingFunction {
private final OperatorLatch startLatch = new OperatorLatch();
implements CheckpointedFunction {
private final ListStateDescriptor<Integer> list;
private final ListStateDescriptor<Integer> union;
private final MapStateDescriptor<Integer, String> broadcast;
Expand Down Expand Up @@ -283,8 +281,6 @@ public void open(Configuration parameters) {

@Override
public void processElement(Integer value, ReadOnlyContext ctx, Collector<Void> out) {
startLatch.trigger();

elements.add(value);
}

Expand All @@ -311,10 +307,5 @@ public void initializeState(FunctionInitializationContext context) throws Except

unionState = context.getOperatorStateStore().getUnionListState(union);
}

@Override
public void await() throws RuntimeException {
startLatch.await();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.state.api.utils.OperatorLatch;
import org.apache.flink.state.api.utils.SavepointTestBase;
import org.apache.flink.state.api.utils.WaitingFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
Expand Down Expand Up @@ -58,24 +56,19 @@ public abstract class SavepointReaderKeyedStateITCase<B extends StateBackend>

@Test
public void testUserKeyedStateReader() throws Exception {
String savepointPath =
takeSavepoint(
new KeyedStatefulOperator(),
process -> {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(getStateBackend());
env.setParallelism(4);

env.addSource(createSource(elements))
.rebalance()
.keyBy(id -> id.key)
.process(process)
.uid(uid)
.addSink(new DiscardingSink<>());

return env;
});
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(getStateBackend());
env.setParallelism(4);

env.addSource(createSource(elements))
.returns(Pojo.class)
.rebalance()
.keyBy(id -> id.key)
.process(new KeyedStatefulOperator())
.uid(uid)
.addSink(new DiscardingSink<>());

String savepointPath = takeSavepoint(env);

ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
ExistingSavepoint savepoint = Savepoint.load(batchEnv, savepointPath, getStateBackend());
Expand All @@ -88,9 +81,7 @@ public void testUserKeyedStateReader() throws Exception {
"Unexpected results from keyed state", expected, new HashSet<>(results));
}

private static class KeyedStatefulOperator extends KeyedProcessFunction<Integer, Pojo, Void>
implements WaitingFunction {
private final OperatorLatch startLatch = new OperatorLatch();
private static class KeyedStatefulOperator extends KeyedProcessFunction<Integer, Pojo, Void> {
private transient ValueState<Integer> state;

@Override
Expand All @@ -100,19 +91,12 @@ public void open(Configuration parameters) {

@Override
public void processElement(Pojo value, Context ctx, Collector<Void> out) throws Exception {
startLatch.trigger();

state.update(value.state);

value.eventTimeTimer.forEach(timer -> ctx.timerService().registerEventTimeTimer(timer));
value.processingTimeTimer.forEach(
timer -> ctx.timerService().registerProcessingTimeTimer(timer));
}

@Override
public void await() throws RuntimeException {
startLatch.await();
}
}

private static class Reader extends KeyedStateReaderFunction<Integer, Pojo> {
Expand Down
Loading