Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-19462][checkpointing] Update failed checkpoint stats #14635

Merged
merged 11 commits into from Jan 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/ops/metrics.md
Expand Up @@ -1107,6 +1107,7 @@ Metrics related to data exchange between task executors using netty network comm
</table>

### Checkpointing
Note that for failed checkpoints, metrics are updated on a best-effort basis and may not be accurate.
<table class="table table-bordered">
<thead>
<tr>
Expand Down
1 change: 1 addition & 0 deletions docs/ops/metrics.zh.md
Expand Up @@ -1107,6 +1107,7 @@ Metrics related to data exchange between task executors using netty network comm
</table>

### Checkpointing
Note that for failed checkpoints, metrics are updated on a best-effort basis and may not be accurate.
<table class="table table-bordered">
<thead>
<tr>
Expand Down
2 changes: 2 additions & 0 deletions docs/ops/monitoring/checkpoint_monitoring.md
Expand Up @@ -52,6 +52,8 @@ The overview tabs lists the following statistics. Note that these statistics don

The checkpoint history keeps statistics about recently triggered checkpoints, including those that are currently in progress.

Note that for failed checkpoints, metrics are updated on a best-effort basis and may not be accurate.

<center>
<img src="{% link /fig/checkpoint_monitoring-history.png %}" width="700px" alt="Checkpoint Monitoring: History">
</center>
Expand Down
Expand Up @@ -54,6 +54,10 @@ public void reportTaskStateSnapshots(
@Nullable TaskStateSnapshot acknowledgedState,
@Nullable TaskStateSnapshot localState) {}

/** No-op in this implementation: the reported meta data and metrics are ignored. */
@Override
public void reportIncompleteTaskStateSnapshots(
CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) {}

@Nonnull
@Override
public PrioritizedOperatorSubtaskState prioritizedOperatorState(OperatorID operatorID) {
Expand Down
Expand Up @@ -99,7 +99,7 @@
<td>{{ subTask['index'] }}</td>
<ng-container *ngIf="subTask['status'] == 'completed'">
<td >{{ subTask['ack_timestamp'] | date:'yyyy-MM-dd HH:mm:ss' }}</td>
<td>{{ subTask['end_to_end_duration'] | humanizeDuration}}</td>
<td>{{ subTask['end_to_end_duration'] | humanizeDuration}} <span *ngIf="subTask['aborted']">(aborted)</span></td>
<td>{{ subTask['state_size'] | humanizeBytes }}</td>
<td>{{ subTask['checkpoint']['sync'] | humanizeDuration}}</td>
<td>{{ subTask['checkpoint']['async'] | humanizeDuration}}</td>
Expand Down
Expand Up @@ -24,9 +24,9 @@ import { isNil } from 'utils';
})
export class HumanizeBytesPipe implements PipeTransform {
transform(value: number): any {
if (isNil(value)) {
if (isNil(value) || value < 0) {
return '-';
}
}
const units = ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB'];
const converter = (v: number, p: number): string => {
const base = Math.pow(1024, p);
Expand Down
Expand Up @@ -79,6 +79,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

import static java.util.stream.Collectors.toMap;
import static org.apache.flink.util.ExceptionUtils.findThrowable;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
Expand Down Expand Up @@ -222,6 +223,7 @@ public class CheckpointCoordinator {
private boolean isTriggering = false;

private final CheckpointRequestDecider requestDecider;
private final LinkedHashMap<ExecutionAttemptID, ExecutionVertex> cachedTasksById;

// --------------------------------------------------------------------------------------------

Expand Down Expand Up @@ -348,6 +350,15 @@ public CheckpointCoordinator(
this.minPauseBetweenCheckpoints,
this.pendingCheckpoints::size,
this.checkpointsCleaner::getNumberOfCheckpointsToClean);
this.cachedTasksById =
new LinkedHashMap<ExecutionAttemptID, ExecutionVertex>(tasksToWaitFor.length) {

@Override
protected boolean removeEldestEntry(
Map.Entry<ExecutionAttemptID, ExecutionVertex> eldest) {
return size() > CheckpointCoordinator.this.tasksToWaitFor.length;
}
};
}

// --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -728,12 +739,7 @@ private PendingCheckpoint createPendingCheckpoint(
checkpointStorageLocation,
onCompletionPromise);

if (statsTracker != null) {
PendingCheckpointStats callback =
statsTracker.reportPendingCheckpoint(checkpointID, timestamp, props);

checkpoint.setStatsCallback(callback);
}
reportToStatsTracker(checkpoint, ackTasks);

synchronized (lock) {
pendingCheckpoints.put(checkpointID, checkpoint);
Expand Down Expand Up @@ -1137,6 +1143,10 @@ public boolean receiveAcknowledgeMessage(
"Received message for discarded but non-removed checkpoint "
+ checkpointId);
} else {
reportStats(
message.getCheckpointId(),
message.getTaskExecutionId(),
message.getCheckpointMetrics());
boolean wasPendingCheckpoint;

// message is for an unknown checkpoint, or comes too late (checkpoint disposed)
Expand Down Expand Up @@ -1831,6 +1841,14 @@ int getNumQueuedRequests() {
}
}

/**
 * Forwards the metrics reported by a task for the given checkpoint to the stats tracker.
 *
 * <p>Does nothing when no stats tracker is configured, or when the execution attempt cannot be
 * resolved to an {@link ExecutionVertex}.
 *
 * @param id ID of the checkpoint the metrics belong to
 * @param attemptId execution attempt that reported the metrics
 * @param metrics the reported metrics
 * @throws CheckpointException if resolving the vertex for the attempt fails
 */
public void reportStats(long id, ExecutionAttemptID attemptId, CheckpointMetrics metrics)
        throws CheckpointException {
    if (statsTracker == null) {
        return;
    }
    getVertex(attemptId)
            .ifPresent(vertex -> statsTracker.reportIncompleteStats(id, vertex, metrics));
}

// ------------------------------------------------------------------------

private final class ScheduledTrigger implements Runnable {
Expand Down Expand Up @@ -2101,4 +2119,36 @@ private enum OperatorCoordinatorRestoreBehavior {
/** Coordinators are not restored during this checkpoint restore. */
SKIP;
}

/**
 * Looks up the {@link ExecutionVertex} for an execution attempt via {@code cachedTasksById}.
 *
 * <p>On a cache miss the cache is refreshed from {@code getAckTasks()}. If the attempt is still
 * unknown afterwards, a {@code null} mapping is stored for it so that the key counts as cached
 * on later lookups.
 *
 * @param id the execution attempt to resolve
 * @return the cached vertex, or an empty {@link Optional} if none is known for the attempt
 * @throws CheckpointException if {@code getAckTasks()} fails
 */
private Optional<ExecutionVertex> getVertex(ExecutionAttemptID id) throws CheckpointException {
    if (cachedTasksById.containsKey(id)) {
        return Optional.ofNullable(cachedTasksById.get(id));
    }

    // cache miss: refresh from the tasks that currently have to acknowledge
    cachedTasksById.putAll(getAckTasks());

    final boolean knownAfterRefresh = cachedTasksById.containsKey(id);
    if (!knownAfterRefresh) {
        // the task probably gone after a restart
        cachedTasksById.put(id, null);
    }
    return Optional.ofNullable(cachedTasksById.get(id));
}

/**
 * Registers a pending checkpoint with the stats tracker and attaches the returned callback to
 * the checkpoint. Does nothing when no stats tracker is configured.
 *
 * @param checkpoint the pending checkpoint to report
 * @param tasks the tasks by execution attempt; their distinct job vertices are reported
 *     together with each job vertex's parallelism
 */
private void reportToStatsTracker(
        PendingCheckpoint checkpoint, Map<ExecutionAttemptID, ExecutionVertex> tasks) {
    if (statsTracker != null) {
        // one entry per distinct job vertex: job vertex id -> parallelism
        final Map<JobVertexID, Integer> parallelismByJobVertex =
                tasks.values().stream()
                        .map(task -> task.getJobVertex())
                        .distinct()
                        .collect(
                                toMap(
                                        ExecutionJobVertex::getJobVertexId,
                                        ExecutionJobVertex::getParallelism));

        checkpoint.setStatsCallback(
                statsTracker.reportPendingCheckpoint(
                        checkpoint.getCheckpointID(),
                        checkpoint.getCheckpointTimestamp(),
                        checkpoint.getProps(),
                        parallelismByJobVertex));
    }
}
}
Expand Up @@ -34,4 +34,10 @@ void acknowledgeCheckpoint(
final TaskStateSnapshot subtaskState);

void declineCheckpoint(DeclineCheckpoint declineCheckpoint);

/**
 * Reports checkpoint metrics for a checkpoint of a given execution attempt.
 *
 * @param jobID the job the metrics are reported for
 * @param executionAttemptID the execution attempt the metrics are reported for
 * @param checkpointId the checkpoint the metrics are reported for
 * @param checkpointMetrics the metrics to report
 */
void reportCheckpointMetrics(
JobID jobID,
ExecutionAttemptID executionAttemptID,
long checkpointId,
CheckpointMetrics checkpointMetrics);
}
Expand Up @@ -18,6 +18,8 @@

package org.apache.flink.runtime.checkpoint;

import org.apache.flink.annotation.VisibleForTesting;

import java.io.Serializable;
import java.util.Objects;

Expand Down Expand Up @@ -46,8 +48,11 @@ public class CheckpointMetrics implements Serializable {
/** Is the checkpoint completed as an unaligned checkpoint. */
private final boolean unalignedCheckpoint;

private final long totalBytesPersisted;

@VisibleForTesting
public CheckpointMetrics() {
this(-1L, -1L, -1L, -1L, -1L, -1L, false);
this(-1L, -1L, -1L, -1L, -1L, -1L, false, 0L);
rkhachatryan marked this conversation as resolved.
Show resolved Hide resolved
}

public CheckpointMetrics(
Expand All @@ -57,7 +62,8 @@ public CheckpointMetrics(
long syncDurationMillis,
long asyncDurationMillis,
long checkpointStartDelayNanos,
boolean unalignedCheckpoint) {
boolean unalignedCheckpoint,
long totalBytesPersisted) {

// these may be "-1", in case the values are unknown or not set
checkArgument(bytesProcessedDuringAlignment >= -1);
Expand All @@ -66,6 +72,7 @@ public CheckpointMetrics(
checkArgument(asyncDurationMillis >= -1);
checkArgument(alignmentDurationNanos >= -1);
checkArgument(checkpointStartDelayNanos >= -1);
checkArgument(totalBytesPersisted >= 0);

this.bytesProcessedDuringAlignment = bytesProcessedDuringAlignment;
this.bytesPersistedDuringAlignment = bytesPersistedDuringAlignment;
Expand All @@ -74,6 +81,7 @@ public CheckpointMetrics(
this.asyncDurationMillis = asyncDurationMillis;
this.checkpointStartDelayNanos = checkpointStartDelayNanos;
this.unalignedCheckpoint = unalignedCheckpoint;
this.totalBytesPersisted = totalBytesPersisted;
}

public long getBytesProcessedDuringAlignment() {
Expand Down Expand Up @@ -104,6 +112,10 @@ public boolean getUnalignedCheckpoint() {
return unalignedCheckpoint;
}

/**
 * Returns the {@code totalBytesPersisted} value this metrics object was constructed with.
 *
 * @return the total bytes persisted
 */
public long getTotalBytesPersisted() {
return totalBytesPersisted;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -121,7 +133,8 @@ public boolean equals(Object o) {
&& syncDurationMillis == that.syncDurationMillis
&& asyncDurationMillis == that.asyncDurationMillis
&& checkpointStartDelayNanos == that.checkpointStartDelayNanos
&& unalignedCheckpoint == that.unalignedCheckpoint;
&& unalignedCheckpoint == that.unalignedCheckpoint
&& totalBytesPersisted == that.totalBytesPersisted;
}

@Override
Expand All @@ -133,7 +146,8 @@ public int hashCode() {
syncDurationMillis,
asyncDurationMillis,
checkpointStartDelayNanos,
unalignedCheckpoint);
unalignedCheckpoint,
totalBytesPersisted);
}

@Override
Expand All @@ -153,6 +167,8 @@ public String toString() {
+ checkpointStartDelayNanos
+ ", unalignedCheckpoint="
+ unalignedCheckpoint
+ ", totalBytesPersisted="
+ totalBytesPersisted
+ '}';
}
}
Expand Up @@ -41,6 +41,7 @@ public class CheckpointMetricsBuilder {
private long asyncDurationMillis = -1L;
private long checkpointStartDelayNanos = -1L;
private boolean unalignedCheckpoint = false;
private long totalBytesPersisted = -1L;

public CheckpointMetricsBuilder setBytesProcessedDuringAlignment(
long bytesProcessedDuringAlignment) {
Expand Down Expand Up @@ -122,6 +123,11 @@ public CheckpointMetricsBuilder setUnalignedCheckpoint(boolean unalignedCheckpoi
return this;
}

/**
 * Sets the total bytes persisted to use when building the metrics.
 *
 * <p>NOTE(review): this builder's default is {@code -1}, while the {@code CheckpointMetrics}
 * constructor checks {@code totalBytesPersisted >= 0}; {@code build()} presumably fails unless
 * a non-negative value is set here first — confirm.
 *
 * @param totalBytesPersisted the value to store
 * @return this builder
 */
public CheckpointMetricsBuilder setTotalBytesPersisted(long totalBytesPersisted) {
this.totalBytesPersisted = totalBytesPersisted;
return this;
}

public CheckpointMetrics build() {
return new CheckpointMetrics(
checkStateAndGet(bytesProcessedDuringAlignment),
Expand All @@ -130,6 +136,7 @@ public CheckpointMetrics build() {
syncDurationMillis,
asyncDurationMillis,
checkpointStartDelayNanos,
unalignedCheckpoint);
unalignedCheckpoint,
totalBytesPersisted);
}
}
Expand Up @@ -24,6 +24,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -51,8 +52,8 @@ public class CheckpointStatsHistory implements Serializable {
/** List over all available stats. Only updated on {@link #createSnapshot()}. */
private final List<AbstractCheckpointStats> checkpointsHistory;

/** Map of all available stats keyed by their ID. Only updated on {@link #createSnapshot()}. */
private final Map<Long, AbstractCheckpointStats> checkpointsById;
/** Map of all available stats keyed by their ID. */
private final LinkedHashMap<Long, AbstractCheckpointStats> recentCheckpoints;

/** Maximum array size. */
private final int maxSize;
Expand Down Expand Up @@ -121,18 +122,24 @@ private CheckpointStatsHistory(
this.maxSize = maxSize;
this.checkpointsArray = checkpointArray;
this.checkpointsHistory = checkNotNull(checkpointsHistory);
this.checkpointsById = checkNotNull(checkpointsById);
this.latestCompletedCheckpoint = latestCompletedCheckpoint;
this.latestFailedCheckpoint = latestFailedCheckpoint;
this.latestSavepoint = latestSavepoint;
this.recentCheckpoints =
new LinkedHashMap<Long, AbstractCheckpointStats>(checkpointsById) {
@Override
protected boolean removeEldestEntry(Map.Entry eldest) {
return size() > maxSize;
}
};
}

/**
 * Returns the checkpoint stats list held by this history.
 *
 * @return the {@code checkpointsHistory} list itself (no copy is made)
 */
public List<AbstractCheckpointStats> getCheckpoints() {
return checkpointsHistory;
}

public AbstractCheckpointStats getCheckpointById(long checkpointId) {
return checkpointsById.get(checkpointId);
return recentCheckpoints.get(checkpointId);
}

@Nullable
Expand Down Expand Up @@ -250,6 +257,7 @@ void addInProgressCheckpoint(PendingCheckpointStats pending) {
}

checkpointsArray[nextPos++] = pending;
recentCheckpoints.put(pending.checkpointId, pending);
}

/**
Expand Down Expand Up @@ -300,6 +308,8 @@ boolean replacePendingCheckpointById(AbstractCheckpointStats completedOrFailed)
}

long checkpointId = completedOrFailed.getCheckpointId();
recentCheckpoints.computeIfPresent(
checkpointId, (unusedKey, unusedValue) -> completedOrFailed);

// We start searching from the last inserted position. Since the entries
// wrap around the array we search until we are at index 0 and then from
Expand Down