Skip to content

Commit

Permalink
[FLINK-6014] [checkpoint] Additional review changes
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanRRichter committed Apr 22, 2017
1 parent 218bed8 commit aa21f85
Show file tree
Hide file tree
Showing 24 changed files with 292 additions and 329 deletions.
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.checkpoint;

import org.apache.flink.runtime.state.SharedStateRegistry;

/**
 * Common base for {@link CompletedCheckpointStore} implementations. It holds the parts
 * that are shared by all stores, which at present is the {@link SharedStateRegistry}.
 */
public abstract class AbstractCompletedCheckpointStore implements CompletedCheckpointStore {

	/** Registry for shared states; a fresh instance is created together with each store. */
	protected final SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();

	/**
	 * Creates the store. The shared state registry is already set up by the field
	 * initializer, so no further work is required here.
	 */
	public AbstractCompletedCheckpointStore() {
	}
}
Expand Up @@ -37,9 +37,7 @@
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.TaskStateHandles;

import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
Expand Down Expand Up @@ -110,9 +108,6 @@ public class CheckpointCoordinator {
/** Completed checkpoints. Implementations can be blocking. Make sure calls to methods
* accessing this don't block the job manager actor and run asynchronously. */
private final CompletedCheckpointStore completedCheckpointStore;

/** Registry for shared states */
private final SharedStateRegistry sharedStateRegistry;

/** Default directory for persistent checkpoints; <code>null</code> if none configured.
* THIS WILL BE REPLACED BY PROPER STATE-BACKEND METADATA WRITING */
Expand Down Expand Up @@ -223,7 +218,6 @@ public CheckpointCoordinator(
this.completedCheckpointStore = checkNotNull(completedCheckpointStore);
this.checkpointDirectory = checkpointDirectory;
this.executor = checkNotNull(executor);
this.sharedStateRegistry = new SharedStateRegistry();

this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);

Expand Down Expand Up @@ -288,7 +282,7 @@ public void shutdown(JobStatus jobStatus) throws Exception {
}
pendingCheckpoints.clear();

completedCheckpointStore.shutdown(jobStatus, sharedStateRegistry);
completedCheckpointStore.shutdown(jobStatus);
checkpointIdCounter.shutdown(jobStatus);
}
}
Expand Down Expand Up @@ -732,7 +726,7 @@ public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws C
"the state handle to avoid lingering state.", message.getCheckpointId(),
message.getTaskExecutionId(), message.getJob());

discardState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());

break;
case DISCARDED:
Expand All @@ -741,7 +735,7 @@ public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws C
"state handle tp avoid lingering state.",
message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());

discardState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
}

return true;
Expand All @@ -767,7 +761,7 @@ else if (checkpoint != null) {
}

// try to discard the state so that we don't have lingering state lying around
discardState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());

return wasPendingCheckpoint;
}
Expand Down Expand Up @@ -805,16 +799,16 @@ private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) thro

// the pending checkpoint must be discarded after the finalization
Preconditions.checkState(pendingCheckpoint.isDiscarded() && completedCheckpoint != null);

try {
completedCheckpointStore.addCheckpoint(completedCheckpoint, sharedStateRegistry);
completedCheckpointStore.addCheckpoint(completedCheckpoint);
} catch (Exception exception) {
// we failed to store the completed checkpoint. Let's clean up
executor.execute(new Runnable() {
@Override
public void run() {
try {
completedCheckpoint.discardOnFail();
completedCheckpoint.discardOnFailedStoring();
} catch (Throwable t) {
LOG.warn("Could not properly discard completed checkpoint {}.", completedCheckpoint.getCheckpointID(), t);
}
Expand Down Expand Up @@ -953,7 +947,7 @@ public boolean restoreLatestCheckpointedState(
}

// Recover the checkpoints
completedCheckpointStore.recover(sharedStateRegistry);
completedCheckpointStore.recover();

// restore from the latest checkpoint
CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint();
Expand Down Expand Up @@ -1017,7 +1011,7 @@ public boolean restoreSavepoint(
CompletedCheckpoint savepoint = SavepointLoader.loadAndValidateSavepoint(
job, tasks, savepointPath, userClassLoader, allowNonRestored);

completedCheckpointStore.addCheckpoint(savepoint, sharedStateRegistry);
completedCheckpointStore.addCheckpoint(savepoint);

// Reset the checkpoint ID counter
long nextCheckpointId = savepoint.getCheckpointID() + 1;
Expand Down Expand Up @@ -1057,10 +1051,11 @@ public List<CompletedCheckpoint> getSuccessfulCheckpoints() throws Exception {
public CompletedCheckpointStore getCheckpointStore() {
return completedCheckpointStore;
}

public SharedStateRegistry getSharedStateRegistry() {
return sharedStateRegistry;
}

// @VisibleForTesting
// SharedStateRegistry getSharedStateRegistry() {
// return sharedStateRegistry;
// }

public CheckpointIDCounter getCheckpointIdCounter() {
return checkpointIdCounter;
Expand Down Expand Up @@ -1151,7 +1146,7 @@ public void run() {
* @param checkpointId of the state object
* @param subtaskState to discard asynchronously
*/
private void discardState(
private void discardSubtaskState(
final JobID jobId,
final ExecutionAttemptID executionAttemptID,
final long checkpointId,
Expand All @@ -1161,12 +1156,6 @@ private void discardState(
executor.execute(new Runnable() {
@Override
public void run() {
try {
subtaskState.discardSharedStatesOnFail();
} catch (Throwable t1) {
LOG.warn("Could not properly discard shared states of checkpoint {} " +
"belonging to task {} of job {}.", checkpointId, executionAttemptID, jobId, t1);
}

try {
subtaskState.discardState();
Expand Down

0 comments on commit aa21f85

Please sign in to comment.