Skip to content

Commit

Permalink
Merge 7c7f96f into c80ef18
Browse files Browse the repository at this point in the history
  • Loading branch information
tgroh committed Mar 24, 2017
2 parents c80ef18 + 7c7f96f commit 37fc92f
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
Expand Down Expand Up @@ -187,6 +188,9 @@ private UnboundedReader<OutputT> getReader(UnboundedSourceShard<OutputT, Checkpo
UnboundedReader<OutputT> existing = shard.getExistingReader();
if (existing == null) {
CheckpointMarkT checkpoint = shard.getCheckpoint();
if (checkpoint != null) {
checkpoint = CoderUtils.clone(shard.getSource().getCheckpointMarkCoder(), checkpoint);
}
return shard
.getSource()
.createReader(evaluationContext.getPipelineOptions(), checkpoint);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,9 @@ public List<? extends UnboundedSource<T, TestCheckpointMark>> generateInitialSpl
@Override
public UnboundedSource.UnboundedReader<T> createReader(
PipelineOptions options, @Nullable TestCheckpointMark checkpointMark) {
checkState(
checkpointMark == null || checkpointMark.decoded,
"Cannot resume from a checkpoint that has not been decoded");
return new TestUnboundedReader(elems, checkpointMark == null ? -1 : checkpointMark.index);
}

Expand Down Expand Up @@ -564,6 +567,7 @@ public void close() throws IOException {
private static class TestCheckpointMark implements CheckpointMark {
final int index;
private boolean finalized = false;
private boolean decoded = false;

private TestCheckpointMark(int index) {
this.index = index;
Expand All @@ -573,6 +577,10 @@ private TestCheckpointMark(int index) {
public void finalizeCheckpoint() throws IOException {
checkState(
!finalized, "%s was finalized more than once", TestCheckpointMark.class.getSimpleName());
checkState(
!decoded,
"%s was finalized after being decoded",
TestCheckpointMark.class.getSimpleName());
finalized = true;
}

Expand All @@ -594,7 +602,9 @@ public void encode(
public TestCheckpointMark decode(
InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
throws IOException {
return new TestCheckpointMark(VarInt.decodeInt(inStream));
TestCheckpointMark decoded = new TestCheckpointMark(VarInt.decodeInt(inStream));
decoded.decoded = true;
return decoded;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,8 @@ public abstract UnboundedReader<OutputT> createReader(
PipelineOptions options, @Nullable CheckpointMarkT checkpointMark) throws IOException;

/**
* Returns a {@link Coder} for encoding and decoding the checkpoints for this source, or
* null if the checkpoints do not need to be durably committed.
* Returns a {@link Coder} for encoding and decoding the checkpoints for this source.
*/
@Nullable
public abstract Coder<CheckpointMarkT> getCheckpointMarkCoder();

/**
Expand Down

0 comments on commit 37fc92f

Please sign in to comment.