[FLINK-5113] Port functions in tests to new CheckpointedFunction IF.
This closes #2939.
kl0u authored and zentol committed Jan 19, 2017
1 parent 570dbc8 commit 525edf1
Showing 26 changed files with 608 additions and 303 deletions.
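Every file below follows the same migration: the deprecated Checkpointed&lt;T&gt; (or CheckpointedAsynchronously&lt;T&gt;) interface, which snapshots a single value, is replaced by ListCheckpointed&lt;T&gt;, which snapshots a List&lt;T&gt; so that Flink can redistribute state across parallel subtasks on restore. A minimal before/after sketch of the pattern (CounterSource is an illustrative class, not one of the files in this commit):

    import java.util.Collections;
    import java.util.List;

    import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    // Before (deprecated Checkpointed<Long>):
    //     public Long snapshotState(long checkpointId, long checkpointTimestamp) { return counter; }
    //     public void restoreState(Long state) { this.counter = state; }

    // After: the snapshot is a list, and restore may receive several elements.
    public class CounterSource implements SourceFunction<Long>, ListCheckpointed<Long> {

        private static final long serialVersionUID = 1L;

        private volatile boolean isRunning = true;
        private long counter;

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            while (isRunning) {
                // Hold the checkpoint lock so emission and state stay consistent.
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(counter++);
                }
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }

        @Override
        public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
            return Collections.singletonList(counter);
        }

        @Override
        public void restoreState(List<Long> state) throws Exception {
            // Sum the elements in case state from several former subtasks
            // is assigned to this instance after a rescale.
            for (Long s : state) {
                counter += s;
            }
        }
    }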
@@ -21,12 +21,14 @@
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
 import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
 
+import java.util.Collections;
+import java.util.List;
 import java.util.UUID;
 
 /**
@@ -50,6 +52,9 @@ public static void main(String[] args) throws Exception {
             .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
             .enableWriteAheadLog()
             .setClusterBuilder(new ClusterBuilder() {
+
+                private static final long serialVersionUID = 2793938419775311824L;
+
                 @Override
                 public Cluster buildCluster(Cluster.Builder builder) {
                     return builder.addContactPoint("127.0.0.1").build();
@@ -62,7 +67,9 @@ public Cluster buildCluster(Cluster.Builder builder) {
         env.execute();
     }
 
-    public static class MySource implements SourceFunction<Tuple2<String, Integer>>, Checkpointed<Integer> {
+    public static class MySource implements SourceFunction<Tuple2<String, Integer>>, ListCheckpointed<Integer> {
 
+        private static final long serialVersionUID = 4022367939215095610L;
+
         private int counter = 0;
         private boolean stop = false;
@@ -84,13 +91,16 @@ public void cancel() {
         }
 
         @Override
-        public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-            return counter;
+        public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+            return Collections.singletonList(this.counter);
         }
 
         @Override
-        public void restoreState(Integer state) throws Exception {
-            this.counter = state;
+        public void restoreState(List<Integer> state) throws Exception {
+            if (state.isEmpty() || state.size() > 1) {
+                throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+            }
+            this.counter = state.get(0);
         }
     }
 }
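Note the restore pattern introduced above: each ported test function expects exactly one recovered element, because the stateful operator runs at parallelism 1 in these tests. The same inlined check recurs in every file below; factored out it would look like this (restoreSingle is a hypothetical helper, not part of this commit):

    // Hypothetical helper equivalent to the inlined checks in this commit.
    static <T> T restoreSingle(List<T> state) {
        if (state.isEmpty() || state.size() > 1) {
            throw new RuntimeException(
                    "Test failed due to unexpected recovered state size " + state.size());
        }
        return state.get(0);
    }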
@@ -19,7 +19,7 @@
 
 import com.google.common.collect.Sets;
 import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

@@ -42,7 +42,9 @@
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Random;
 import java.util.Set;
 import java.util.regex.Matcher;
@@ -236,7 +238,7 @@ public String map(String value) throws Exception {
     }
 
     private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String>
-            implements CheckpointedAsynchronously<Integer> {
+            implements ListCheckpointed<Integer> {
 
         private static final long serialVersionUID = 1L;
 
@@ -246,7 +248,6 @@ private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String>
 
         private volatile boolean isRunning = true;
 
-
         StringGeneratingSourceFunction(long numElements) {
             this.numElements = numElements;
         }
@@ -288,13 +289,16 @@ private static String randomString(StringBuilder bld, Random rnd) {
         }
 
         @Override
-        public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
-            return index;
+        public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+            return Collections.singletonList(this.index);
         }
 
         @Override
-        public void restoreState(Integer state) {
-            index = state;
+        public void restoreState(List<Integer> state) throws Exception {
+            if (state.isEmpty() || state.size() > 1) {
+                throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+            }
+            this.index = state.get(0);
         }
     }
 }
@@ -19,7 +19,7 @@
 
 import com.google.common.collect.Sets;
 import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

@@ -42,7 +42,9 @@
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.BufferedReader;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Random;
 import java.util.Set;
 import java.util.regex.Matcher;
@@ -233,7 +235,7 @@ public String map(String value) throws Exception {
     }
 
     private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String>
-            implements CheckpointedAsynchronously<Integer> {
+            implements ListCheckpointed<Integer> {
 
         private static final long serialVersionUID = 1L;
 
@@ -243,7 +245,6 @@ private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String>
 
         private volatile boolean isRunning = true;
 
-
         StringGeneratingSourceFunction(long numElements) {
             this.numElements = numElements;
         }
@@ -285,13 +286,16 @@ private static String randomString(StringBuilder bld, Random rnd) {
     }
 
         @Override
-        public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
-            return index;
+        public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+            return Collections.singletonList(index);
         }
 
         @Override
-        public void restoreState(Integer state) {
-            index = state;
+        public void restoreState(List<Integer> state) throws Exception {
+            if (state.isEmpty() || state.size() > 1) {
+                throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+            }
+            this.index = state.get(0);
         }
     }
 }
@@ -51,29 +51,23 @@
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators;
 import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
 import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils;
 import org.apache.flink.streaming.connectors.kafka.testutils.PartitionValidatingMapper;
 import org.apache.flink.streaming.connectors.kafka.testutils.ThrottledMapper;
 import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2Partitioner;
 import org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
@@ -1925,7 +1919,7 @@ private static void printTopic(String topicName, int elements,DeserializationSch
 
 
     public static class BrokerKillingMapper<T> extends RichMapFunction<T,T>
-            implements Checkpointed<Integer>, CheckpointListener {
+            implements ListCheckpointed<Integer>, CheckpointListener {
 
         private static final long serialVersionUID = 6334389850158707313L;
 
@@ -1939,7 +1933,6 @@ public static class BrokerKillingMapper<T> extends RichMapFunction<T,T>
         private boolean failer;
         private boolean hasBeenCheckpointed;
 
-
         public BrokerKillingMapper(int shutdownBrokerId, int failCount) {
             this.shutdownBrokerId = shutdownBrokerId;
             this.failCount = failCount;
@@ -1994,13 +1987,16 @@ public void notifyCheckpointComplete(long checkpointId) {
         }
 
         @Override
-        public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
-            return numElementsTotal;
+        public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+            return Collections.singletonList(this.numElementsTotal);
         }
 
         @Override
-        public void restoreState(Integer state) {
-            this.numElementsTotal = state;
+        public void restoreState(List<Integer> state) throws Exception {
+            if (state.isEmpty() || state.size() > 1) {
+                throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+            }
+            this.numElementsTotal = state.get(0);
         }
     }
 }
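BrokerKillingMapper above combines ListCheckpointed with CheckpointListener so that the artificial failure fires only after a checkpoint has completed and recovery has real state to restore. A simplified sketch of that combination (the class name, threshold, and failure message are made up; the real mapper also shuts down a Kafka broker):

    import java.util.Collections;
    import java.util.List;

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.runtime.state.CheckpointListener;
    import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;

    public class FailAfterCheckpointMapper<T> extends RichMapFunction<T, T>
            implements ListCheckpointed<Integer>, CheckpointListener {

        private static final long serialVersionUID = 1L;

        private int numElementsTotal;
        private volatile boolean hasBeenCheckpointed;

        @Override
        public T map(T value) throws Exception {
            numElementsTotal++;
            // Fail only once state has been checkpointed, so restoring
            // from that checkpoint is actually exercised.
            if (hasBeenCheckpointed && numElementsTotal == 1000) {
                throw new Exception("Artificial failure for the recovery test");
            }
            return value;
        }

        @Override
        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            hasBeenCheckpointed = true;
        }

        @Override
        public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
            return Collections.singletonList(numElementsTotal);
        }

        @Override
        public void restoreState(List<Integer> state) throws Exception {
            numElementsTotal = state.get(0);
        }
    }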
@@ -21,13 +21,16 @@
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
+import java.util.List;
+
 
 public class FailingIdentityMapper<T> extends RichMapFunction<T,T> implements
-        Checkpointed<Integer>, CheckpointListener, Runnable {
+        ListCheckpointed<Integer>, CheckpointListener, Runnable {
 
     private static final Logger LOG = LoggerFactory.getLogger(FailingIdentityMapper.class);
 
@@ -89,13 +92,16 @@ public void notifyCheckpointComplete(long checkpointId) {
     }
 
     @Override
-    public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
-        return numElementsTotal;
+    public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+        return Collections.singletonList(numElementsTotal);
     }
 
     @Override
-    public void restoreState(Integer state) {
-        numElementsTotal = state;
+    public void restoreState(List<Integer> state) throws Exception {
+        if (state.isEmpty() || state.size() > 1) {
+            throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+        }
+        this.numElementsTotal = state.get(0);
     }
 
     @Override
@@ -19,15 +19,17 @@
 package org.apache.flink.streaming.connectors.kafka.testutils;
 
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.test.util.SuccessException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.BitSet;
+import java.util.Collections;
+import java.util.List;
 
-public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer> implements Checkpointed<Tuple2<Integer, BitSet>> {
+public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer> implements ListCheckpointed<Tuple2<Integer, BitSet>> {
 
     private static final Logger LOG = LoggerFactory.getLogger(ValidatingExactlyOnceSink.class);
 
@@ -39,7 +41,6 @@ public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer> implements Checkpointed<Tuple2<Integer, BitSet>> {
 
     private int numElements; // this is checkpointed
 
-
     public ValidatingExactlyOnceSink(int numElementsTotal) {
         this.numElementsTotal = numElementsTotal;
     }
@@ -68,15 +69,20 @@ else if (duplicateChecker.nextClearBit(0) != numElementsTotal) {
     }
 
     @Override
-    public Tuple2<Integer, BitSet> snapshotState(long checkpointId, long checkpointTimestamp) {
-        LOG.info("Snapshot of counter "+numElements+" at checkpoint "+checkpointId);
-        return new Tuple2<>(numElements, duplicateChecker);
+    public List<Tuple2<Integer, BitSet>> snapshotState(long checkpointId, long timestamp) throws Exception {
+        LOG.info("Snapshot of counter " + numElements + " at checkpoint " + checkpointId);
+        return Collections.singletonList(new Tuple2<>(numElements, duplicateChecker));
     }
 
     @Override
-    public void restoreState(Tuple2<Integer, BitSet> state) {
-        LOG.info("restoring num elements to {}", state.f0);
-        this.numElements = state.f0;
-        this.duplicateChecker = state.f1;
+    public void restoreState(List<Tuple2<Integer, BitSet>> state) throws Exception {
+        if (state.isEmpty() || state.size() > 1) {
+            throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+        }
+
+        Tuple2<Integer, BitSet> s = state.get(0);
+        LOG.info("restoring num elements to {}", s.f0);
+        this.numElements = s.f0;
+        this.duplicateChecker = s.f1;
     }
 }
@@ -34,4 +34,4 @@ public static boolean isNullOrEmpty(Collection<?> collection) {
     public static boolean isNullOrEmpty(Map<?, ?> map) {
         return map == null || map.isEmpty();
     }
 }
@@ -31,8 +31,7 @@
  * state is written, the function is not called, so the function needs not return a
  * copy of its state, but may return a reference to its state. Functions that can
  * continue to work and mutate the state, even while the state snapshot is being accessed,
- * can implement the {@link org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously}
- * interface.</p>
+ * can implement the {@link CheckpointedAsynchronously} interface.</p>
  *
  * @param <T> The type of the operator state.
  */
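The javadoc above captures the contract that separates the two deprecated interfaces: a synchronous Checkpointed snapshot may return a reference to live state, because the function is not invoked while the state is written, whereas an asynchronous snapshot runs concurrently with processing and must therefore hand out a copy. An illustrative contrast in two method fragments (the field name duplicates is made up):

    // Checkpointed: the snapshot is taken synchronously, so a reference is safe.
    public BitSet snapshotState(long checkpointId, long checkpointTimestamp) {
        return duplicates;
    }

    // CheckpointedAsynchronously: the snapshot is written while processing
    // continues to mutate the field, so return a defensive copy.
    public BitSet snapshotState(long checkpointId, long checkpointTimestamp) {
        return (BitSet) duplicates.clone();
    }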
