From 7a42205a665ccab08d9378c45b6e47b154c5150b Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Mon, 20 Jun 2016 13:51:11 -0700 Subject: [PATCH] Clone the Checkpoint Mark before resuming The documentation for CheckpointMark#finalizeCheckpoint specifies that a checkpoint instance that the reader is restarted from will have been deserialized from the serialized form of the earlier checkpoint with the CheckpointMarkCoder. This brings the direct runner in line with this documentation. --- .../beam/runners/direct/UnboundedReadEvaluatorFactory.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java index 3fb773ed0217..cb66322af389 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java @@ -27,6 +27,7 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; @@ -201,6 +202,7 @@ private boolean startReader() throws IOException { if (currentReader == null) { if (checkpointMark != null) { checkpointMark.finalizeCheckpoint(); + checkpointMark = CoderUtils.clone(source.getCheckpointMarkCoder(), checkpointMark); } currentReader = source.createReader(evaluationContext.getPipelineOptions(), checkpointMark); checkpointMark = null;