From 3618093d3ed7359710c9f73ced3031ff2ab8def8 Mon Sep 17 00:00:00 2001 From: Roman Khachatryan Date: Fri, 22 Jan 2021 17:25:44 +0100 Subject: [PATCH] fixup! fixup for e9eb61: don't account aborted as acked subtasks --- .../checkpoint/CheckpointCoordinatorTest.java | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index dc17acb71d3f9..fb68fc07da8f0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -208,17 +208,28 @@ private void testReportStatsAfterFailure( assertStatsEqual( checkpointId, + lateReportVertex.getJobvertexId(), + 0, lateReportedMetrics, statsTracker.createSnapshot().getHistory().getCheckpointById(checkpointId)); } private void assertStatsEqual( - long checkpointId, CheckpointMetrics expected, AbstractCheckpointStats actual) { + long checkpointId, + JobVertexID jobVertexID, + int subtasIdx, + CheckpointMetrics expected, + AbstractCheckpointStats actual) { assertEquals(checkpointId, actual.getCheckpointId()); assertEquals(CheckpointStatsStatus.FAILED, actual.getStatus()); - assertEquals(1, actual.getNumberOfAcknowledgedSubtasks()); assertEquals(expected.getTotalBytesPersisted(), actual.getStateSize()); - SubtaskStateStats taskStats = actual.getLatestAcknowledgedSubtaskStats(); + assertEquals(0, actual.getNumberOfAcknowledgedSubtasks()); + SubtaskStateStats taskStats = + actual.getAllTaskStateStats().stream() + .filter(s -> s.getJobVertexId().equals(jobVertexID)) + .findAny() + .get() + .getSubtaskStats()[subtasIdx]; assertEquals( expected.getAlignmentDurationNanos() / 1_000_000, taskStats.getAlignmentDuration()); assertEquals(expected.getUnalignedCheckpoint(), taskStats.getUnalignedCheckpoint());