Skip to content

Commit

Permalink
fixup! fixup for e9eb61: don't account aborted as acked subtasks
Browse files Browse the repository at this point in the history
  • Loading branch information
rkhachatryan committed Jan 22, 2021
1 parent 8900426 commit 3618093
Showing 1 changed file with 14 additions and 3 deletions.
Expand Up @@ -208,17 +208,28 @@ private void testReportStatsAfterFailure(

assertStatsEqual(
checkpointId,
lateReportVertex.getJobvertexId(),
0,
lateReportedMetrics,
statsTracker.createSnapshot().getHistory().getCheckpointById(checkpointId));
}

private void assertStatsEqual(
long checkpointId, CheckpointMetrics expected, AbstractCheckpointStats actual) {
long checkpointId,
JobVertexID jobVertexID,
int subtasIdx,
CheckpointMetrics expected,
AbstractCheckpointStats actual) {
assertEquals(checkpointId, actual.getCheckpointId());
assertEquals(CheckpointStatsStatus.FAILED, actual.getStatus());
assertEquals(1, actual.getNumberOfAcknowledgedSubtasks());
assertEquals(expected.getTotalBytesPersisted(), actual.getStateSize());
SubtaskStateStats taskStats = actual.getLatestAcknowledgedSubtaskStats();
assertEquals(0, actual.getNumberOfAcknowledgedSubtasks());
SubtaskStateStats taskStats =
actual.getAllTaskStateStats().stream()
.filter(s -> s.getJobVertexId().equals(jobVertexID))
.findAny()
.get()
.getSubtaskStats()[subtasIdx];
assertEquals(
expected.getAlignmentDurationNanos() / 1_000_000, taskStats.getAlignmentDuration());
assertEquals(expected.getUnalignedCheckpoint(), taskStats.getUnalignedCheckpoint());
Expand Down

0 comments on commit 3618093

Please sign in to comment.