Skip to content

Commit

Permalink
[FLINK-20389][tests] Fix UnalignedCheckpointITCase to work with unass…
Browse files Browse the repository at this point in the history
…igned splits.

The test currently assumed that when induced failures happen, splits have been assigned to readers, which works fine for the planned snapshot-failure-recovery sequence.
However, when unexpected failures happen, this assumption does not necessarily hold resulting in failures that may impede investigations.
The test would still fail as the number of failures would be different from the expected numbers of failures, but investigation can focus on the unexpected failure then.
  • Loading branch information
AHeise committed Nov 27, 2020
1 parent 7bf76c0 commit 12672b4
Showing 1 changed file with 6 additions and 2 deletions.
Expand Up @@ -224,8 +224,8 @@ public List<LongSplit> snapshotState(long checkpointId) {

@Override
public void notifyCheckpointComplete(long checkpointId) {
LOG.info("notifyCheckpointComplete {} @ {} subtask (? attempt)", split.numCompletedCheckpoints, split.nextNumber % split.increment);
if (split != null) {
LOG.info("notifyCheckpointComplete {} @ {} subtask (? attempt)", split.numCompletedCheckpoints, split.nextNumber % split.increment);
split.numCompletedCheckpoints++;
}
}
Expand Down Expand Up @@ -330,6 +330,11 @@ public void addReader(int subtaskId) {
}
}

@Override
public void notifyCheckpointComplete(long checkpointId) {
unassignedSplits.forEach(s -> s.numCompletedCheckpoints++);
}

@Override
public List<LongSplit> snapshotState() throws Exception {
LOG.info("snapshotState {}", unassignedSplits);
Expand Down Expand Up @@ -673,6 +678,5 @@ public void close() throws Exception {
LOG.info("Last state {} @ {} subtask ({} attempt)", state, getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getAttemptNumber());
super.close();
}

}
}

0 comments on commit 12672b4

Please sign in to comment.