Skip to content

Commit

Permalink
[FLINK-6640] Ensure registration of shared state happens before exter…
Browse files Browse the repository at this point in the history
…nalizing a checkpoint
  • Loading branch information
StefanRRichter committed May 20, 2017
1 parent b93396c commit 0403563
Show file tree
Hide file tree
Showing 16 changed files with 156 additions and 154 deletions.

This file was deleted.

Expand Up @@ -40,11 +40,11 @@
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.TaskStateHandles; import org.apache.flink.runtime.state.TaskStateHandles;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory; import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
import org.apache.flink.util.Preconditions; import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils; import org.apache.flink.util.StringUtils;

import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


Expand Down Expand Up @@ -173,6 +173,9 @@ public class CheckpointCoordinator {
@Nullable @Nullable
private CheckpointStatsTracker statsTracker; private CheckpointStatsTracker statsTracker;


/** Registry that tracks state which is shared across (incremental) checkpoints. */
private final SharedStateRegistry sharedStateRegistry;

// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------


public CheckpointCoordinator( public CheckpointCoordinator(
Expand Down Expand Up @@ -226,6 +229,7 @@ public CheckpointCoordinator(
this.completedCheckpointStore = checkNotNull(completedCheckpointStore); this.completedCheckpointStore = checkNotNull(completedCheckpointStore);
this.checkpointDirectory = checkpointDirectory; this.checkpointDirectory = checkpointDirectory;
this.executor = checkNotNull(executor); this.executor = checkNotNull(executor);
this.sharedStateRegistry = new SharedStateRegistry(executor);


this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS); this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
this.masterHooks = new HashMap<>(); this.masterHooks = new HashMap<>();
Expand Down Expand Up @@ -836,6 +840,10 @@ private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) thro
final long checkpointId = pendingCheckpoint.getCheckpointId(); final long checkpointId = pendingCheckpoint.getCheckpointId();
final CompletedCheckpoint completedCheckpoint; final CompletedCheckpoint completedCheckpoint;


// As a first step to complete the checkpoint, we register its state with the registry
Map<OperatorID, OperatorState> operatorStates = pendingCheckpoint.getOperatorStates();
sharedStateRegistry.registerAll(operatorStates.values());

try { try {
try { try {
// externalize the checkpoint if required // externalize the checkpoint if required
Expand Down Expand Up @@ -1003,7 +1011,7 @@ public boolean restoreLatestCheckpointedState(
} }


// Recover the checkpoints // Recover the checkpoints
completedCheckpointStore.recover(); completedCheckpointStore.recover(sharedStateRegistry);


// restore from the latest checkpoint // restore from the latest checkpoint
CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint(); CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint();
Expand Down Expand Up @@ -1120,11 +1128,6 @@ public CompletedCheckpointStore getCheckpointStore() {
return completedCheckpointStore; return completedCheckpointStore;
} }


// @VisibleForTesting
// SharedStateRegistry getSharedStateRegistry() {
// return sharedStateRegistry;
// }

public CheckpointIDCounter getCheckpointIdCounter() { public CheckpointIDCounter getCheckpointIdCounter() {
return checkpointIdCounter; return checkpointIdCounter;
} }
Expand Down
Expand Up @@ -178,7 +178,7 @@ public void discardOnFailedStoring() throws Exception {
doDiscard(); doDiscard();
} }


public boolean discardOnSubsume(SharedStateRegistry sharedStateRegistry) throws Exception { public boolean discardOnSubsume() throws Exception {


if (props.discardOnSubsumed()) { if (props.discardOnSubsumed()) {
doDiscard(); doDiscard();
Expand All @@ -188,7 +188,7 @@ public boolean discardOnSubsume(SharedStateRegistry sharedStateRegistry) throws
return false; return false;
} }


public boolean discardOnShutdown(JobStatus jobStatus, SharedStateRegistry sharedStateRegistry) throws Exception { public boolean discardOnShutdown(JobStatus jobStatus) throws Exception {


if (jobStatus == JobStatus.FINISHED && props.discardOnJobFinished() || if (jobStatus == JobStatus.FINISHED && props.discardOnJobFinished() ||
jobStatus == JobStatus.CANCELED && props.discardOnJobCancelled() || jobStatus == JobStatus.CANCELED && props.discardOnJobCancelled() ||
Expand Down Expand Up @@ -290,7 +290,7 @@ void setDiscardCallback(@Nullable CompletedCheckpointStats.DiscardCallback disca
* *
* @param sharedStateRegistry The registry where shared states are registered * @param sharedStateRegistry The registry where shared states are registered
*/ */
public void registerSharedStates(SharedStateRegistry sharedStateRegistry) { public void registerSharedStatesAfterRestored(SharedStateRegistry sharedStateRegistry) {
sharedStateRegistry.registerAll(operatorStates.values()); sharedStateRegistry.registerAll(operatorStates.values());
} }


Expand Down
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.checkpoint; package org.apache.flink.runtime.checkpoint;


import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.state.SharedStateRegistry;


import java.util.List; import java.util.List;


Expand All @@ -32,8 +33,10 @@ public interface CompletedCheckpointStore {
* *
* <p>After a call to this method, {@link #getLatestCheckpoint()} returns the latest * <p>After a call to this method, {@link #getLatestCheckpoint()} returns the latest
* available checkpoint. * available checkpoint.
*
* @param sharedStateRegistry the shared state registry to register recovered states.
*/ */
void recover() throws Exception; void recover(SharedStateRegistry sharedStateRegistry) throws Exception;


/** /**
* Adds a {@link CompletedCheckpoint} instance to the list of completed checkpoints. * Adds a {@link CompletedCheckpoint} instance to the list of completed checkpoints.
Expand Down
Expand Up @@ -35,7 +35,6 @@
import org.apache.flink.runtime.state.filesystem.FileStateHandle; import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions; import org.apache.flink.util.Preconditions;

import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


Expand Down
Expand Up @@ -20,6 +20,7 @@


import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


Expand All @@ -32,7 +33,7 @@
/** /**
* {@link CompletedCheckpointStore} for JobManagers running in {@link HighAvailabilityMode#NONE}. * {@link CompletedCheckpointStore} for JobManagers running in {@link HighAvailabilityMode#NONE}.
*/ */
public class StandaloneCompletedCheckpointStore extends AbstractCompletedCheckpointStore { public class StandaloneCompletedCheckpointStore implements CompletedCheckpointStore {


private static final Logger LOG = LoggerFactory.getLogger(StandaloneCompletedCheckpointStore.class); private static final Logger LOG = LoggerFactory.getLogger(StandaloneCompletedCheckpointStore.class);


Expand All @@ -56,21 +57,19 @@ public StandaloneCompletedCheckpointStore(int maxNumberOfCheckpointsToRetain) {
} }


@Override @Override
public void recover() throws Exception { public void recover(SharedStateRegistry sharedStateRegistry) throws Exception {
// Nothing to do // Nothing to do
} }


@Override @Override
public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception { public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception {

checkpoint.registerSharedStates(sharedStateRegistry);


checkpoints.addLast(checkpoint); checkpoints.addLast(checkpoint);


if (checkpoints.size() > maxNumberOfCheckpointsToRetain) { if (checkpoints.size() > maxNumberOfCheckpointsToRetain) {
try { try {
CompletedCheckpoint checkpointToSubsume = checkpoints.removeFirst(); CompletedCheckpoint checkpointToSubsume = checkpoints.removeFirst();
checkpointToSubsume.discardOnSubsume(sharedStateRegistry); checkpointToSubsume.discardOnSubsume();
} catch (Exception e) { } catch (Exception e) {
LOG.warn("Fail to subsume the old checkpoint.", e); LOG.warn("Fail to subsume the old checkpoint.", e);
} }
Expand Down Expand Up @@ -103,7 +102,7 @@ public void shutdown(JobStatus jobStatus) throws Exception {
LOG.info("Shutting down"); LOG.info("Shutting down");


for (CompletedCheckpoint checkpoint : checkpoints) { for (CompletedCheckpoint checkpoint : checkpoints) {
checkpoint.discardOnShutdown(jobStatus, sharedStateRegistry); checkpoint.discardOnShutdown(jobStatus);
} }
} finally { } finally {
checkpoints.clear(); checkpoints.clear();
Expand Down
Expand Up @@ -64,7 +64,7 @@
* checkpoints is consistent. Currently, after recovery we start out with only a single * checkpoints is consistent. Currently, after recovery we start out with only a single
* checkpoint to circumvent those situations. * checkpoint to circumvent those situations.
*/ */
public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpointStore { public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointStore {


private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperCompletedCheckpointStore.class); private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperCompletedCheckpointStore.class);


Expand Down Expand Up @@ -102,8 +102,6 @@ public ZooKeeperCompletedCheckpointStore(
RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage, RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage,
Executor executor) throws Exception { Executor executor) throws Exception {


super(executor);

checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at least one checkpoint."); checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at least one checkpoint.");
checkNotNull(stateStorage, "State storage"); checkNotNull(stateStorage, "State storage");


Expand Down Expand Up @@ -139,13 +137,14 @@ public boolean requiresExternalizedCheckpoints() {
* that the history of checkpoints is consistent. * that the history of checkpoints is consistent.
*/ */
@Override @Override
public void recover() throws Exception { public void recover(SharedStateRegistry sharedStateRegistry) throws Exception {
LOG.info("Recovering checkpoints from ZooKeeper."); LOG.info("Recovering checkpoints from ZooKeeper.");


// Clear local handles in order to prevent duplicates on // Clear local handles in order to prevent duplicates on
// recovery. The local handles should reflect the state // recovery. The local handles should reflect the state
// of ZooKeeper. // of ZooKeeper.
completedCheckpoints.clear(); completedCheckpoints.clear();
sharedStateRegistry.clear();


// Get all there is first // Get all there is first
List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> initialCheckpoints; List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> initialCheckpoints;
Expand All @@ -171,7 +170,7 @@ public void recover() throws Exception {
completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle); completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
if (completedCheckpoint != null) { if (completedCheckpoint != null) {
// Re-register all shared states in the checkpoint. // Re-register all shared states in the checkpoint.
completedCheckpoint.registerSharedStates(sharedStateRegistry); completedCheckpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
completedCheckpoints.add(completedCheckpoint); completedCheckpoints.add(completedCheckpoint);
} }
} catch (Exception e) { } catch (Exception e) {
Expand All @@ -195,9 +194,6 @@ public void addCheckpoint(final CompletedCheckpoint checkpoint) throws Exception


final String path = checkpointIdToPath(checkpoint.getCheckpointID()); final String path = checkpointIdToPath(checkpoint.getCheckpointID());


// First, register all shared states in the checkpoint to consolidate placeholders.
checkpoint.registerSharedStates(sharedStateRegistry);

// Now add the new one. If it fails, we don't want to lose existing data. // Now add the new one. If it fails, we don't want to lose existing data.
checkpointsInZooKeeper.addAndLock(path, checkpoint); checkpointsInZooKeeper.addAndLock(path, checkpoint);


Expand All @@ -206,7 +202,7 @@ public void addCheckpoint(final CompletedCheckpoint checkpoint) throws Exception
// Everything worked, let's remove a previous checkpoint if necessary. // Everything worked, let's remove a previous checkpoint if necessary.
while (completedCheckpoints.size() > maxNumberOfCheckpointsToRetain) { while (completedCheckpoints.size() > maxNumberOfCheckpointsToRetain) {
try { try {
removeSubsumed(completedCheckpoints.removeFirst(), sharedStateRegistry); removeSubsumed(completedCheckpoints.removeFirst());
} catch (Exception e) { } catch (Exception e) {
LOG.warn("Failed to subsume the old checkpoint", e); LOG.warn("Failed to subsume the old checkpoint", e);
} }
Expand Down Expand Up @@ -248,7 +244,7 @@ public void shutdown(JobStatus jobStatus) throws Exception {


for (CompletedCheckpoint checkpoint : completedCheckpoints) { for (CompletedCheckpoint checkpoint : completedCheckpoints) {
try { try {
removeShutdown(checkpoint, jobStatus, sharedStateRegistry); removeShutdown(checkpoint, jobStatus);
} catch (Exception e) { } catch (Exception e) {
LOG.error("Failed to discard checkpoint.", e); LOG.error("Failed to discard checkpoint.", e);
} }
Expand All @@ -274,8 +270,7 @@ public void shutdown(JobStatus jobStatus) throws Exception {
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------


private void removeSubsumed( private void removeSubsumed(
final CompletedCheckpoint completedCheckpoint, final CompletedCheckpoint completedCheckpoint) throws Exception {
final SharedStateRegistry sharedStateRegistry) throws Exception {


if(completedCheckpoint == null) { if(completedCheckpoint == null) {
return; return;
Expand All @@ -287,7 +282,7 @@ private void removeSubsumed(
public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException { public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
if (value != null) { if (value != null) {
try { try {
completedCheckpoint.discardOnSubsume(sharedStateRegistry); completedCheckpoint.discardOnSubsume();
} catch (Exception e) { } catch (Exception e) {
throw new FlinkException("Could not discard the completed checkpoint on subsume.", e); throw new FlinkException("Could not discard the completed checkpoint on subsume.", e);
} }
Expand All @@ -302,8 +297,7 @@ public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) t


private void removeShutdown( private void removeShutdown(
final CompletedCheckpoint completedCheckpoint, final CompletedCheckpoint completedCheckpoint,
final JobStatus jobStatus, final JobStatus jobStatus) throws Exception {
final SharedStateRegistry sharedStateRegistry) throws Exception {


if(completedCheckpoint == null) { if(completedCheckpoint == null) {
return; return;
Expand All @@ -313,7 +307,7 @@ private void removeShutdown(
@Override @Override
public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException { public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
try { try {
completedCheckpoint.discardOnShutdown(jobStatus, sharedStateRegistry); completedCheckpoint.discardOnShutdown(jobStatus);
} catch (Exception e) { } catch (Exception e) {
throw new FlinkException("Could not discard the completed checkpoint on subsume.", e); throw new FlinkException("Could not discard the completed checkpoint on subsume.", e);
} }
Expand Down
Expand Up @@ -29,10 +29,13 @@
import java.util.concurrent.Executor; import java.util.concurrent.Executor;


/** /**
* This registry manages state that is shared across (incremental) checkpoints, and is responsible
* for deleting shared state that is no longer used in any valid checkpoint.
*
* A {@code SharedStateRegistry} will be deployed in the * A {@code SharedStateRegistry} will be deployed in the
* {@link org.apache.flink.runtime.checkpoint.CompletedCheckpointStore} to * {@link org.apache.flink.runtime.checkpoint.CheckpointCoordinator} to
* maintain the reference count of {@link StreamStateHandle}s which are shared * maintain the reference count of {@link StreamStateHandle}s by a key that (logically) identifies
* among different incremental checkpoints. * them.
*/ */
public class SharedStateRegistry { public class SharedStateRegistry {


Expand Down Expand Up @@ -247,4 +250,11 @@ public void run() {
} }
} }
} }

/**
* Clears the registry.
*/
public void clear() {
registeredStates.clear();
}
} }
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.runtime.state.ChainedStateHandle; import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.TestLogger; import org.apache.flink.util.TestLogger;
import org.junit.Test; import org.junit.Test;
Expand Down Expand Up @@ -135,7 +136,7 @@ public void testFailingCompletedCheckpointStoreAdd() throws Exception {
private static final class FailingCompletedCheckpointStore implements CompletedCheckpointStore { private static final class FailingCompletedCheckpointStore implements CompletedCheckpointStore {


@Override @Override
public void recover() throws Exception { public void recover(SharedStateRegistry sharedStateRegistry) throws Exception {
throw new UnsupportedOperationException("Not implemented."); throw new UnsupportedOperationException("Not implemented.");
} }


Expand Down

0 comments on commit 0403563

Please sign in to comment.