Skip to content

Commit

Permalink
fixup! [FLINK-18656][network,metrics] Fix startDelay metric for unali…
Browse files Browse the repository at this point in the history
…gned checkpoints
  • Loading branch information
pnowojski committed Jul 27, 2020
1 parent aeb4094 commit abec5ef
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ public long getAlignmentDurationNanos() {
return activeHandler.getAlignmentDurationNanos();
}

@Override
public long getCheckpointStartDelayNanos() {
return activeHandler.getCheckpointStartDelayNanos();
}

@Override
public boolean hasInflightData(long checkpointId, InputChannelInfo channelInfo) {
// should only be called for unaligned checkpoint
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@
import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT;
import static org.apache.flink.runtime.checkpoint.CheckpointType.SAVEPOINT;
import static org.apache.flink.runtime.io.network.api.serialization.EventSerializer.toBuffer;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
Expand Down Expand Up @@ -83,6 +86,43 @@ public void testAlternation() throws Exception {
assertEquals(barriers, target.triggeredCheckpoints);
}

@Test
public void testMetricsAlternation() throws Exception {
int numChannels = 2;
TestInvokable target = new TestInvokable();
CheckpointedInputGate gate = buildGate(target, numChannels);

long checkpoint1CreationTime = System.currentTimeMillis() - 10;
sendBarrier(1, checkpoint1CreationTime, CHECKPOINT, gate, 0);
Thread.sleep(10);
sendBarrier(1, checkpoint1CreationTime, CHECKPOINT, gate, 1);
assertMetrics(gate.getCheckpointBarrierHandler(), 1L, 0L, 10_000_000L);

long checkpoint2CreationTime = System.currentTimeMillis() - 5;
sendBarrier(2, checkpoint2CreationTime, SAVEPOINT, gate, 0);
assertMetrics(gate.getCheckpointBarrierHandler(), 2L, 0L, 5_000_000L);
Thread.sleep(5);
sendBarrier(2, checkpoint2CreationTime, SAVEPOINT, gate, 1);
assertMetrics(gate.getCheckpointBarrierHandler(), 2L, 5_000_000L, 5_000_000L);

long checkpoint3CreationTime = System.currentTimeMillis() - 7;
sendBarrier(3, checkpoint3CreationTime, CHECKPOINT, gate, 0);
assertMetrics(gate.getCheckpointBarrierHandler(), 3L, 0L, 7_000_000L);
Thread.sleep(7);
sendBarrier(3, checkpoint2CreationTime, CHECKPOINT, gate, 1);
assertMetrics(gate.getCheckpointBarrierHandler(), 3L, 0L, 7_000_000L);
}

private void assertMetrics(
CheckpointBarrierHandler checkpointBarrierHandler,
long latestCheckpointId,
long alignmentDurationNanos,
long startDelayNanos) {
assertThat(checkpointBarrierHandler.getLatestCheckpointId(), equalTo(latestCheckpointId));
assertThat(checkpointBarrierHandler.getAlignmentDurationNanos(), greaterThanOrEqualTo(alignmentDurationNanos));
assertThat(checkpointBarrierHandler.getCheckpointStartDelayNanos(), greaterThanOrEqualTo(startDelayNanos));
}

@Test
public void testPreviousHandlerReset() throws Exception {
SingleInputGate inputGate = new SingleInputGateBuilder().setNumberOfChannels(2).build();
Expand Down Expand Up @@ -197,8 +237,15 @@ private void testBarrierHandling(CheckpointType checkpointType) throws Exception
}
}

private void sendBarrier(long id, CheckpointType type, TestInputChannel channel, CheckpointedInputGate gate) throws Exception {
channel.read(barrier(id, type).retainBuffer());
private void sendBarrier(long barrierId, long barrierCreationTime, CheckpointType type, CheckpointedInputGate gate, int channelId) throws Exception {
TestInputChannel channel = (TestInputChannel) gate.getChannel(channelId);
channel.read(barrier(barrierId, type, barrierCreationTime).retainBuffer());
while (gate.pollNext().isPresent()) {
}
}

private void sendBarrier(long barrierId, CheckpointType type, TestInputChannel channel, CheckpointedInputGate gate) throws Exception {
channel.read(barrier(barrierId, type).retainBuffer());
while (gate.pollNext().isPresent()) {
}
}
Expand All @@ -213,10 +260,14 @@ private static AlternatingCheckpointBarrierHandler barrierHandler(SingleInputGat
target);
}

private Buffer barrier(long id, CheckpointType checkpointType) throws IOException {
private Buffer barrier(long barrierId, CheckpointType checkpointType) throws IOException {
return barrier(barrierId, checkpointType, System.currentTimeMillis());
}

private Buffer barrier(long barrierId, CheckpointType checkpointType, long barrierTimestamp) throws IOException {
return toBuffer(new CheckpointBarrier(
id,
System.currentTimeMillis(),
barrierId,
barrierTimestamp,
new CheckpointOptions(checkpointType, CheckpointStorageLocationReference.getDefault(), true, true)));
}

Expand Down

0 comments on commit abec5ef

Please sign in to comment.