Skip to content
This repository has been archived by the owner on Oct 30, 2023. It is now read-only.

Commit

Permalink
GIRAPH-980 Way to disable checkpoints for particular job and on parti…
Browse files Browse the repository at this point in the history
…cular supersteps

Summary:
Added CheckpointSupportedChecker, using this interface one can specify whether the job is checkpointable or not.
By default all jobs that don't do output during the computation are checkpointable.

Test Plan: Run several jobs with output during the computation enabled and disabled. Also wrote custom CheckpointSupportedChecker to test if I can disable checkpoints on particular superstep

Reviewers: majakabiljo, pavanka, pavanka.26, maja.kabiljo

Reviewed By: maja.kabiljo

Differential Revision: https://reviews.facebook.net/D31113
  • Loading branch information
Sergey Edunov committed Jan 14, 2015
1 parent 5a58a77 commit 8af7670
Show file tree
Hide file tree
Showing 13 changed files with 216 additions and 10 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG
@@ -1,7 +1,9 @@
Giraph Change Log

Release 1.2.0 - unreleased


GIRAPH-980 Way to disable checkpoints for particular job and on particular supersteps (edunov)

GIRAPH-983 Remove checkpoint related error messages from console (edunov)

GIRAPH-978 Giraph-Debugger Test Graphs not working (nishantgandhi99 via edunov)
Expand Down
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.giraph.bsp;
package org.apache.giraph.bsp.checkpoints;

/**
* Enum represents possible checkpoint state.
Expand Down
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.bsp.checkpoints;

import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.master.MasterCompute;

/**
* To prevent accidental checkpointing of non-checkpointable app
* you may provide implementation of this interface. Most apps are
* checkpointable by default, however some apps are not checkpointable,
* e.g. apps that use static variables to pass data around between supersteps
* or start new threads or use external resources. This interface allows
* you to specify if and when your app is checkpointable.
*/
public interface CheckpointSupportedChecker {

/**
* Does the job support checkpoints?
* It is true by default, set it to false if your job uses some
* non-checkpointable features:
* - static variables for storing data between supersteps.
* - starts new threads or uses Timers
* - writes output before job is complete, etc
* This method is called on master and has access to
* job configuration and master compute.
*
* @param conf giraph configuration
* @param masterCompute instance of master compute
* @return true if checkpointing on current superstep is supported
* by this application.
*/
boolean isCheckpointSupported(GiraphConfiguration conf,
MasterCompute masterCompute);

}
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.bsp.checkpoints;

import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.master.MasterCompute;

/**
* Default checkpoint supported checker.
* Most tasks will support checkpointing by default unless they do output
* during the computation. Some tasks however can use non-checkpointable
* features, e.g.: static variables to pass data around between supersteps
* or new threads started from MasterCompute or external resources. In that
* case to prevent accidental checkpointing you should provide checker
* implementation.
*/
public class DefaultCheckpointSupportedChecker
implements CheckpointSupportedChecker {
@Override
public boolean isCheckpointSupported(GiraphConfiguration conf,
MasterCompute masterCompute) {
return !conf.doOutputDuringComputation();
}
}
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.bsp.checkpoints;

import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.master.MasterCompute;

/**
* Disable all checkpoints.
*/
public class DisabledCheckpointSupportedChecker
implements CheckpointSupportedChecker {

@Override
public boolean isCheckpointSupported(GiraphConfiguration conf,
MasterCompute masterCompute) {
return false;
}
}
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Package of generic bulk synchronous processing objects.
*/
package org.apache.giraph.bsp.checkpoints;
Expand Up @@ -19,6 +19,7 @@
package org.apache.giraph.conf;

import org.apache.giraph.aggregators.AggregatorWriter;
import org.apache.giraph.bsp.checkpoints.CheckpointSupportedChecker;
import org.apache.giraph.combiner.MessageCombiner;
import org.apache.giraph.edge.OutEdges;
import org.apache.giraph.edge.ReuseObjectsOutEdges;
Expand Down Expand Up @@ -1002,6 +1003,18 @@ public boolean useCheckpointing() {
return getCheckpointFrequency() != 0;
}

/**
* Set runtime checkpoint support checker.
* The instance of this class will have to decide whether
* checkpointing is allowed on current superstep.
*
* @param clazz checkpoint supported checker class
*/
public void setCheckpointSupportedChecker(
Class<? extends CheckpointSupportedChecker> clazz) {
GiraphConstants.CHECKPOINT_SUPPORTED_CHECKER.set(this, clazz);
}

/**
* Set the max task attempts
*
Expand Down
Expand Up @@ -19,6 +19,8 @@

import org.apache.giraph.aggregators.AggregatorWriter;
import org.apache.giraph.aggregators.TextAggregatorWriter;
import org.apache.giraph.bsp.checkpoints.CheckpointSupportedChecker;
import org.apache.giraph.bsp.checkpoints.DefaultCheckpointSupportedChecker;
import org.apache.giraph.combiner.MessageCombiner;
import org.apache.giraph.comm.messages.InMemoryMessageStoreFactory;
import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
Expand Down Expand Up @@ -1159,14 +1161,27 @@ public interface GiraphConstants {
/**
* Compression algorithm to be used for checkpointing.
* Defined by extension for hadoop compatibility reasons.
*/
*/
StrConfOption CHECKPOINT_COMPRESSION_CODEC =
new StrConfOption("giraph.checkpoint.compression.codec",
".deflate",
"Defines compression algorithm we will be using for " +
"storing checkpoint. Available options include but " +
"not restricted to: .deflate, .gz, .bz2, .lzo");

/**
* Defines if and when checkpointing is supported by this job.
* By default checkpointing is always supported unless output during the
* computation is enabled.
*/
ClassConfOption<CheckpointSupportedChecker> CHECKPOINT_SUPPORTED_CHECKER =
ClassConfOption.create("giraph.checkpoint.supported.checker",
DefaultCheckpointSupportedChecker.class,
CheckpointSupportedChecker.class,
"This is the way to specify if checkpointing is " +
"supported by the job");


/** Number of threads to use in async message store, 0 means
* we should not use async message processing */
IntConfOption ASYNC_MESSAGE_STORE_THREADS_COUNT =
Expand Down
Expand Up @@ -17,7 +17,7 @@
*/
package org.apache.giraph.graph;

import org.apache.giraph.bsp.CheckpointStatus;
import org.apache.giraph.bsp.checkpoints.CheckpointStatus;

/**
* Immutable graph stats after the completion of a superstep
Expand Down
Expand Up @@ -22,7 +22,7 @@
import java.io.DataOutput;
import java.io.IOException;

import org.apache.giraph.bsp.CheckpointStatus;
import org.apache.giraph.bsp.checkpoints.CheckpointStatus;
import org.apache.giraph.partition.PartitionStats;
import org.apache.hadoop.io.Writable;

Expand Down
Expand Up @@ -34,7 +34,7 @@
import org.apache.giraph.bsp.BspService;
import org.apache.giraph.bsp.CentralizedServiceMaster;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.bsp.CheckpointStatus;
import org.apache.giraph.bsp.checkpoints.CheckpointStatus;
import org.apache.giraph.comm.messages.MessageStore;
import org.apache.giraph.conf.ClassConfOption;
import org.apache.giraph.conf.GiraphConstants;
Expand Down
Expand Up @@ -50,12 +50,14 @@
import org.apache.giraph.bsp.BspInputFormat;
import org.apache.giraph.bsp.BspService;
import org.apache.giraph.bsp.CentralizedServiceMaster;
import org.apache.giraph.bsp.CheckpointStatus;
import org.apache.giraph.bsp.checkpoints.CheckpointStatus;
import org.apache.giraph.bsp.SuperstepState;
import org.apache.giraph.bsp.checkpoints.CheckpointSupportedChecker;
import org.apache.giraph.comm.MasterClient;
import org.apache.giraph.comm.MasterServer;
import org.apache.giraph.comm.netty.NettyMasterClient;
import org.apache.giraph.comm.netty.NettyMasterServer;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.counters.GiraphStats;
Expand Down Expand Up @@ -89,6 +91,7 @@
import org.apache.giraph.utils.LogStacktraceCallable;
import org.apache.giraph.utils.ProgressableUtils;
import org.apache.giraph.utils.ReactiveJMapHistoDumper;
import org.apache.giraph.utils.ReflectionUtils;
import org.apache.giraph.utils.WritableUtils;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.giraph.zk.BspEvent;
Expand Down Expand Up @@ -193,6 +196,8 @@ public class BspServiceMaster<I extends WritableComparable,
private final int checkpointFrequency;
/** Current checkpoint status */
private CheckpointStatus checkpointStatus;
/** Checks if checkpointing supported */
private CheckpointSupportedChecker checkpointSupportedChecker;

/**
* Constructor for setting up the master.
Expand Down Expand Up @@ -231,6 +236,9 @@ public BspServiceMaster(

this.checkpointFrequency = conf.getCheckpointFrequency();
this.checkpointStatus = CheckpointStatus.NONE;
this.checkpointSupportedChecker =
ReflectionUtils.newInstance(
GiraphConstants.CHECKPOINT_SUPPORTED_CHECKER.get(conf));

GiraphMetrics.get().addSuperstepResetObserver(this);
GiraphStats.init(context);
Expand Down Expand Up @@ -1759,7 +1767,12 @@ private CheckpointStatus getCheckpointStatus(long superstep) {
try {
if (getZkExt().
exists(basePath + FORCE_CHECKPOINT_USER_FLAG, false) != null) {
return CheckpointStatus.CHECKPOINT_AND_HALT;
if (isCheckpointingSupported(getConfiguration(), masterCompute)) {
return CheckpointStatus.CHECKPOINT_AND_HALT;
} else {
LOG.warn("Attempted to manually checkpoint the job that " +
"does not support checkpoints. Ignoring");
}
}
} catch (KeeperException e) {
throw new IllegalStateException(
Expand All @@ -1779,11 +1792,29 @@ private CheckpointStatus getCheckpointStatus(long superstep) {
return CheckpointStatus.NONE;
}
if (((superstep - firstCheckpoint) % checkpointFrequency) == 0) {
return CheckpointStatus.CHECKPOINT;
if (isCheckpointingSupported(getConfiguration(), masterCompute)) {
return CheckpointStatus.CHECKPOINT;
}
}
return CheckpointStatus.NONE;
}

/**
* Returns false if job doesn't support checkpoints.
* Job may not support checkpointing if it does output during
* computation, uses static variables to keep data between supersteps,
* starts new threads etc.
* @param conf Immutable configuration of the job
* @param masterCompute instance of master compute
* @return true if it is safe to checkpoint the job
*/
private boolean isCheckpointingSupported(
GiraphConfiguration conf, MasterCompute masterCompute) {
return checkpointSupportedChecker.isCheckpointSupported(
conf, masterCompute);
}


/**
* This doMasterCompute is only called
* after masterCompute is initialized
Expand Down
Expand Up @@ -42,7 +42,7 @@
import org.apache.giraph.bsp.ApplicationState;
import org.apache.giraph.bsp.BspService;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.bsp.CheckpointStatus;
import org.apache.giraph.bsp.checkpoints.CheckpointStatus;
import org.apache.giraph.comm.ServerData;
import org.apache.giraph.comm.WorkerClient;
import org.apache.giraph.comm.WorkerClientRequestProcessor;
Expand Down

0 comments on commit 8af7670

Please sign in to comment.