Skip to content

Commit

Permalink
[hotfix] Improved logging for task local recovery
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanRRichter committed Feb 27, 2018
1 parent 8d180d5 commit 56c7560
Show file tree
Hide file tree
Showing 9 changed files with 193 additions and 103 deletions.
Expand Up @@ -27,7 +27,6 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;

Expand Down Expand Up @@ -79,39 +78,39 @@ public class PrioritizedOperatorSubtaskState {
// -----------------------------------------------------------------------------------------------------------------

/**
* Returns an iterator over all alternative snapshots to restore the managed operator state, in the order in which
* we should attempt to restore.
* Returns an immutable list with all alternative snapshots to restore the managed operator state, in the order in
* which we should attempt to restore.
*/
@Nonnull
public Iterator<StateObjectCollection<OperatorStateHandle>> getPrioritizedManagedOperatorState() {
return prioritizedManagedOperatorState.iterator();
public List<StateObjectCollection<OperatorStateHandle>> getPrioritizedManagedOperatorState() {
return prioritizedManagedOperatorState;
}

/**
* Returns an iterator over all alternative snapshots to restore the raw operator state, in the order in which we
* should attempt to restore.
* Returns an immutable list with all alternative snapshots to restore the raw operator state, in the order in
* which we should attempt to restore.
*/
@Nonnull
public Iterator<StateObjectCollection<OperatorStateHandle>> getPrioritizedRawOperatorState() {
return prioritizedRawOperatorState.iterator();
public List<StateObjectCollection<OperatorStateHandle>> getPrioritizedRawOperatorState() {
return prioritizedRawOperatorState;
}

/**
* Returns an iterator over all alternative snapshots to restore the managed keyed state, in the order in which we
* should attempt to restore.
* Returns an immutable list with all alternative snapshots to restore the managed keyed state, in the order in
* which we should attempt to restore.
*/
@Nonnull
public Iterator<StateObjectCollection<KeyedStateHandle>> getPrioritizedManagedKeyedState() {
return prioritizedManagedKeyedState.iterator();
public List<StateObjectCollection<KeyedStateHandle>> getPrioritizedManagedKeyedState() {
return prioritizedManagedKeyedState;
}

/**
* Returns an iterator over all alternative snapshots to restore the raw keyed state, in the order in which we
* Returns an immutable list with all alternative snapshots to restore the raw keyed state, in the order in which we
* should attempt to restore.
*/
@Nonnull
public Iterator<StateObjectCollection<KeyedStateHandle>> getPrioritizedRawKeyedState() {
return prioritizedRawKeyedState.iterator();
public List<StateObjectCollection<KeyedStateHandle>> getPrioritizedRawKeyedState() {
return prioritizedRawKeyedState;
}

// -----------------------------------------------------------------------------------------------------------------
Expand Down
Expand Up @@ -110,38 +110,68 @@ public TaskLocalStateStore localStateStoreForSubtask(
"register a new TaskLocalStateStore.");
}

final Map<JobVertexSubtaskKey, TaskLocalStateStoreImpl> taskStateManagers =
this.taskStateStoresByAllocationID.computeIfAbsent(allocationID, k -> new HashMap<>());
Map<JobVertexSubtaskKey, TaskLocalStateStoreImpl> taskStateManagers =
this.taskStateStoresByAllocationID.get(allocationID);

if (taskStateManagers == null) {
taskStateManagers = new HashMap<>();
this.taskStateStoresByAllocationID.put(allocationID, taskStateManagers);

if (LOG.isDebugEnabled()) {
LOG.debug("Registered new allocation id {} for local state stores for job {}.",
allocationID, jobId);
}
}

final JobVertexSubtaskKey taskKey = new JobVertexSubtaskKey(jobVertexID, subtaskIndex);

// create the allocation base dirs, one inside each root dir.
File[] allocationBaseDirectories = allocationBaseDirectories(allocationID);
TaskLocalStateStoreImpl taskLocalStateStore = taskStateManagers.get(taskKey);

LocalRecoveryDirectoryProviderImpl directoryProvider = new LocalRecoveryDirectoryProviderImpl(
allocationBaseDirectories,
jobId,
jobVertexID,
subtaskIndex);
if (taskLocalStateStore == null) {

// create the allocation base dirs, one inside each root dir.
File[] allocationBaseDirectories = allocationBaseDirectories(allocationID);

LocalRecoveryDirectoryProviderImpl directoryProvider = new LocalRecoveryDirectoryProviderImpl(
allocationBaseDirectories,
jobId,
jobVertexID,
subtaskIndex);

LocalRecoveryConfig localRecoveryConfig = new LocalRecoveryConfig(
localRecoveryMode,
directoryProvider);
LocalRecoveryConfig localRecoveryConfig =
new LocalRecoveryConfig(localRecoveryMode, directoryProvider);

return taskStateManagers.computeIfAbsent(
taskKey,
k -> new TaskLocalStateStoreImpl(
taskLocalStateStore = new TaskLocalStateStoreImpl(
jobId,
allocationID,
jobVertexID,
subtaskIndex,
localRecoveryConfig,
discardExecutor));
discardExecutor);

taskStateManagers.put(taskKey, taskLocalStateStore);

if (LOG.isTraceEnabled()) {
LOG.trace("Registered new local state store with configuration {} for {} - {} - {} under allocation id {}.",
localRecoveryConfig, jobId, jobVertexID, subtaskIndex, allocationID);
}
} else {
if (LOG.isTraceEnabled()) {
LOG.trace("Found existing local state store for {} - {} - {} under allocation id {}.",
jobId, jobVertexID, subtaskIndex, allocationID);
}
}

return taskLocalStateStore;
}
}

public void releaseLocalStateForAllocationId(@Nonnull AllocationID allocationID) {

if (LOG.isDebugEnabled()) {
LOG.debug("Releasing local state under allocation id {}.", allocationID);
}

Map<JobVertexSubtaskKey, TaskLocalStateStoreImpl> cleanupLocalStores;

synchronized (lock) {
Expand Down Expand Up @@ -175,7 +205,7 @@ public void shutdown() {

ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);

LOG.debug("Shutting down TaskExecutorLocalStateStoresManager.");
LOG.info("Shutting down TaskExecutorLocalStateStoresManager.");

for (Map.Entry<AllocationID, Map<JobVertexSubtaskKey, TaskLocalStateStoreImpl>> entry :
toRelease.entrySet()) {
Expand Down Expand Up @@ -217,7 +247,7 @@ private void doRelease(Iterable<TaskLocalStateStoreImpl> toRelease) {
try {
stateStore.dispose();
} catch (Exception disposeEx) {
LOG.warn("Exception while disposing local state store " + stateStore, disposeEx);
LOG.warn("Exception while disposing local state store {}.", stateStore, disposeEx);
}
}
}
Expand All @@ -233,7 +263,7 @@ private void cleanupAllocationBaseDirs(AllocationID allocationID) {
try {
FileUtils.deleteFileOrDirectory(directory);
} catch (IOException e) {
LOG.warn("Exception while deleting local state directory for allocation " + allocationID, e);
LOG.warn("Exception while deleting local state directory for allocation id {}.", allocationID, e);
}
}
}
Expand Down
Expand Up @@ -103,15 +103,15 @@ public TaskLocalStateStoreImpl(
@Nonnull LocalRecoveryConfig localRecoveryConfig,
@Nonnull Executor discardExecutor) {

this.lock = new Object();
this.storedTaskStateByCheckpointID = new TreeMap<>();
this.jobID = jobID;
this.allocationID = allocationID;
this.jobVertexID = jobVertexID;
this.subtaskIndex = subtaskIndex;
this.discardExecutor = discardExecutor;
this.lock = new Object();
this.storedTaskStateByCheckpointID = new TreeMap<>();
this.disposed = false;
this.localRecoveryConfig = localRecoveryConfig;
this.disposed = false;
}

@Override
Expand All @@ -123,8 +123,15 @@ public void storeLocalState(
localState = NULL_DUMMY;
}

LOG.info("Storing local state for checkpoint {}.", checkpointId);
LOG.debug("Local state for checkpoint {} is {}.", checkpointId, localState);
if (LOG.isTraceEnabled()) {
LOG.debug(
"Stored local state for checkpoint {} in subtask ({} - {} - {}) : {}.",
checkpointId, jobID, jobVertexID, subtaskIndex, localState);
} else if (LOG.isDebugEnabled()) {
LOG.debug(
"Stored local state for checkpoint {} in subtask ({} - {} - {})",
checkpointId, jobID, jobVertexID, subtaskIndex);
}

Map<Long, TaskStateSnapshot> toDiscard = new HashMap<>(16);

Expand All @@ -148,10 +155,21 @@ public void storeLocalState(
@Override
@Nullable
public TaskStateSnapshot retrieveLocalState(long checkpointID) {

TaskStateSnapshot snapshot;
synchronized (lock) {
TaskStateSnapshot snapshot = storedTaskStateByCheckpointID.get(checkpointID);
return snapshot != NULL_DUMMY ? snapshot : null;
snapshot = storedTaskStateByCheckpointID.get(checkpointID);
}

if (LOG.isTraceEnabled()) {
LOG.trace("Found entry for local state for checkpoint {} in subtask ({} - {} - {}) : {}",
checkpointID, jobID, jobVertexID, subtaskIndex, snapshot);
} else if (LOG.isDebugEnabled()) {
LOG.debug("Found entry for local state for checkpoint {} in subtask ({} - {} - {})",
checkpointID, jobID, jobVertexID, subtaskIndex);
}

return snapshot != NULL_DUMMY ? snapshot : null;
}

@Override
Expand All @@ -163,7 +181,8 @@ public LocalRecoveryConfig getLocalRecoveryConfig() {
@Override
public void confirmCheckpoint(long confirmedCheckpointId) {

LOG.debug("Received confirmation for checkpoint {}. Starting to prune history.", confirmedCheckpointId);
LOG.debug("Received confirmation for checkpoint {} in subtask ({} - {} - {}). Starting to prune history.",
confirmedCheckpointId, jobID, jobVertexID, subtaskIndex);

final List<Map.Entry<Long, TaskStateSnapshot>> toRemove = new ArrayList<>();

Expand Down Expand Up @@ -216,7 +235,8 @@ public CompletableFuture<Void> dispose() {
try {
deleteDirectory(subtaskBaseDirectory);
} catch (IOException e) {
LOG.warn("Exception when deleting local recovery subtask base dir: " + subtaskBaseDirectory, e);
LOG.warn("Exception when deleting local recovery subtask base directory {} in subtask ({} - {} - {})",
subtaskBaseDirectory, jobID, jobVertexID, subtaskIndex, e);
}
}
},
Expand All @@ -240,27 +260,32 @@ private void syncDiscardLocalStateForCollection(Collection<Map.Entry<Long, TaskS
*/
private void discardLocalStateForCheckpoint(long checkpointID, TaskStateSnapshot o) {

if (LOG.isTraceEnabled()) {
LOG.trace("Discarding local task state snapshot of checkpoint {} for subtask ({} - {} - {}).",
checkpointID, jobID, jobVertexID, subtaskIndex);
} else {
LOG.debug("Discarding local task state snapshot {} of checkpoint {} for subtask ({} - {} - {}).",
o, checkpointID, jobID, jobVertexID, subtaskIndex);
}

try {
if (LOG.isTraceEnabled()) {
LOG.trace("Discarding local task state snapshot of checkpoint {} for {}/{}/{}.",
checkpointID, jobID, jobVertexID, subtaskIndex);
} else {
LOG.debug("Discarding local task state snapshot {} of checkpoint {} for {}/{}/{}.",
o, checkpointID, jobID, jobVertexID, subtaskIndex);
}
o.discardState();
} catch (Exception discardEx) {
LOG.warn("Exception while discarding local task state snapshot of checkpoint " + checkpointID + ".", discardEx);
LOG.warn("Exception while discarding local task state snapshot of checkpoint {} in subtask ({} - {} - {}).",
checkpointID, jobID, jobVertexID, subtaskIndex, discardEx);
}

LocalRecoveryDirectoryProvider directoryProvider = localRecoveryConfig.getLocalStateDirectoryProvider();
File checkpointDir = directoryProvider.subtaskSpecificCheckpointDirectory(checkpointID);
LOG.debug("Deleting local state directory {} of checkpoint {} for {}/{}/{}/{}.",

LOG.debug("Deleting local state directory {} of checkpoint {} for subtask ({} - {} - {}).",
checkpointDir, checkpointID, jobID, jobVertexID, subtaskIndex);

try {
deleteDirectory(checkpointDir);
} catch (IOException ex) {
LOG.warn("Exception while deleting local state directory of checkpoint " + checkpointID + ".", ex);
LOG.warn("Exception while deleting local state directory of checkpoint {} in subtask ({} - {} - {}).",
checkpointID, jobID, jobVertexID, subtaskIndex, ex);
}
}

Expand Down
Expand Up @@ -29,24 +29,29 @@
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.util.Collections;
import java.util.List;

/**
* This class is the default implementation of {@link TaskStateManager} and collaborates with the job manager
* through {@link CheckpointResponder}) as well as a task-manager-local state store. Like this, client code does
* not have to deal with the differences between remote or local state on recovery because this class handles both
* cases transparently.
*
* Reported state is tagged by clients so that this class can properly forward to the right receiver for the
* <p>Reported state is tagged by clients so that this class can properly forward to the right receiver for the
* checkpointed state.
*
* TODO: all interaction with local state store must still be implemented! It is currently just a placeholder.
*/
public class TaskStateManagerImpl implements TaskStateManager {

/** The logger for this class. */
private static final Logger LOG = LoggerFactory.getLogger(TaskStateManagerImpl.class);

/** The id of the job for which this manager was created, can report, and recover. */
private final JobID jobId;

Expand Down Expand Up @@ -117,21 +122,27 @@ public PrioritizedOperatorSubtaskState prioritizedOperatorState(OperatorID opera
TaskStateSnapshot localStateSnapshot =
localStateStore.retrieveLocalState(jobManagerTaskRestore.getRestoreCheckpointId());

List<OperatorSubtaskState> alternativesByPriority = Collections.emptyList();

if (localStateSnapshot != null) {
OperatorSubtaskState localSubtaskState = localStateSnapshot.getSubtaskStateByOperatorID(operatorID);

if (localSubtaskState != null) {
PrioritizedOperatorSubtaskState.Builder builder = new PrioritizedOperatorSubtaskState.Builder(
jobManagerSubtaskState,
Collections.singletonList(localSubtaskState));
return builder.build();
alternativesByPriority = Collections.singletonList(localSubtaskState);
}
}

if (LOG.isTraceEnabled()) {
LOG.trace("Operator {} has remote state {} from job manager and local state alternatives {} from local " +
"state store {}.",
operatorID, jobManagerSubtaskState, alternativesByPriority, localStateStore);
}

PrioritizedOperatorSubtaskState.Builder builder = new PrioritizedOperatorSubtaskState.Builder(
jobManagerSubtaskState,
Collections.emptyList(),
alternativesByPriority,
true);

return builder.build();
}

Expand Down
Expand Up @@ -216,7 +216,7 @@ private OperatorSubtaskState createAlternativeSubtaskState(OperatorSubtaskState

private <T extends StateObject> boolean checkResultAsExpected(
Function<OperatorSubtaskState, StateObjectCollection<T>> extractor,
Function<PrioritizedOperatorSubtaskState, Iterator<StateObjectCollection<T>>> extractor2,
Function<PrioritizedOperatorSubtaskState, List<StateObjectCollection<T>>> extractor2,
PrioritizedOperatorSubtaskState prioritizedResult,
OperatorSubtaskState... expectedOrdered) {

Expand All @@ -226,7 +226,7 @@ private <T extends StateObject> boolean checkResultAsExpected(
}

return checkRepresentSameOrder(
extractor2.apply(prioritizedResult),
extractor2.apply(prioritizedResult).iterator(),
collector.toArray(new StateObjectCollection[collector.size()]));
}

Expand Down

0 comments on commit 56c7560

Please sign in to comment.