Skip to content

Commit

Permalink
[FLINK-19487][task] Fix calculation of checkpoinStartDelay for single…
Browse files Browse the repository at this point in the history
… channel CheckpointBarrierAligner

Previously for one single channel this metric was always zero
  • Loading branch information
pnowojski committed Oct 13, 2020
1 parent fc5f49e commit 1d43462
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 0 deletions.
Expand Up @@ -121,6 +121,7 @@ public void processBarrier(CheckpointBarrier receivedBarrier, InputChannelInfo c
if (barrierId > currentCheckpointId) {
// new checkpoint
currentCheckpointId = barrierId;
markAlignmentStartAndEnd(receivedBarrier.getTimestamp());
notifyCheckpoint(receivedBarrier);
}
return;
Expand Down
Expand Up @@ -157,6 +157,16 @@ public void testMetricsSingleChannel() throws Exception {
0L,
0L,
10_000_000L);

long checkpoint2CreationTime = System.currentTimeMillis() - 5;
sendBarrier(2, checkpoint2CreationTime, SAVEPOINT, gate, 0);
Thread.sleep(5);
assertMetrics(
gate.getCheckpointBarrierHandler(),
2L,
0L,
0L,
5_000_000L);
}

private void assertMetrics(
Expand Down

0 comments on commit 1d43462

Please sign in to comment.