Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Saving and restoring the NodeManager state

Summary: We are able to save and restore the NodeManager state now.

Test Plan:
I couldn't find a way to test the code completely as such. I tested it in two basic ways: (a) printing out what was being read, and (b) after recovering from safe mode and reconstructing the state, persisting the state again to a different file and comparing the two. The second method, however, requires turning off compression and turning on pretty printing.

Reviewers: dms, rvadali, aching

Reviewed By: dms

CC: security-diffs@lists

Task ID: 1112019
  • Loading branch information...
commit 80128bb1fc187f29a1d7dd9b5f6ee9318c8f5f7b 1 parent 603e1b6
gauravmenghani authored Alex Feinberg committed
2  ivy.xml
View
@@ -274,7 +274,7 @@
conf="common->master"/>
<dependency org="org.codehaus.jackson"
name="jackson-mapper-asl"
- rev="1.0.1"
+ rev="1.7.9"
conf="common->default"/>
</dependencies>
2  src/contrib/benchmark/ivy.xml
View
@@ -40,7 +40,7 @@
conf="common->default"/>
<dependency org="org.codehaus.jackson"
name="jackson-mapper-asl"
- rev="1.0.1"
+ rev="1.7.9"
conf="common->default"/>
</dependencies>
</ivy-module>
2  src/contrib/corona/ivy/libraries.properties
View
@@ -23,7 +23,7 @@ checkstyle.version=5.0
guava.version=r09
-jackson.version=1.0.1
+jackson.version=1.7.9
json.version=20090211
89 src/contrib/corona/src/java/org/apache/hadoop/corona/ClusterManager.java
View
@@ -17,7 +17,7 @@
*/
package org.apache.hadoop.corona;
-import java.io.*;
+import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.*;
@@ -27,9 +27,11 @@
import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.mapred.Clock;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.CoronaSerializer;
import org.apache.hadoop.util.HostsFileReader;
import org.apache.thrift.TApplicationException;
import org.apache.thrift.TException;
+import org.codehaus.jackson.JsonGenerator;
/**
* Manager of all the resources of the cluster.
@@ -87,20 +89,48 @@ public ClusterManager() { }
* Primary constructor.
*
* @param conf Configuration to be used
+ * @param recoverFromDisk True if we are restarting after going down while
+ * in Safe Mode
+ * @throws IOException
+ */
+ public ClusterManager(Configuration conf, boolean recoverFromDisk)
+ throws IOException {
+ this(new CoronaConf(conf), recoverFromDisk);
+ }
+
+ /**
+ * Constructor for ClusterManager, when it is not specified if we are
+ * restarting after persisting the state. In this case we assume the
+ * recoverFromDisk flag to be false.
+ *
+ * @param conf Configuration to be used
* @throws IOException
*/
public ClusterManager(Configuration conf) throws IOException {
- this(new CoronaConf(conf));
+ this(new CoronaConf(conf), false);
}
/**
* Construct ClusterManager given {@link CoronaConf}
*
* @param conf the configuration for the ClusterManager
+ * @param recoverFromDisk true if we are restarting after going down while
+ * in Safe Mode
* @throws IOException
*/
- public ClusterManager(CoronaConf conf) throws IOException {
+ public ClusterManager(CoronaConf conf, boolean recoverFromDisk)
+ throws IOException {
this.conf = conf;
+ HostsFileReader hostsReader =
+ new HostsFileReader(conf.getHostsFile(), conf.getExcludesFile());
+
+ if (recoverFromDisk) {
+ recoverClusterManagerFromDisk(hostsReader);
+ } else {
+ nodeManager = new NodeManager(this, hostsReader);
+ nodeManager.setConf(conf);
+ }
+
initLegalTypes();
metrics = new ClusterManagerMetrics(getTypes());
@@ -111,11 +141,6 @@ public ClusterManager(CoronaConf conf) throws IOException {
sessionHistoryManager = new SessionHistoryManager();
sessionHistoryManager.setConf(conf);
- HostsFileReader hostsReader =
- new HostsFileReader(conf.getHostsFile(), conf.getExcludesFile());
- nodeManager = new NodeManager(this, hostsReader);
- nodeManager.setConf(conf);
-
sessionNotifier = new SessionNotifier(sessionManager, this, metrics);
sessionNotifier.setConf(conf);
@@ -134,7 +159,34 @@ public ClusterManager(CoronaConf conf) throws IOException {
startTime = clock.getTime();
hostName = infoSocAddr.getHostName();
- safeMode = false;
+ setSafeMode(false);
+ }
+
+ /**
+ * This method starts the process to restore the CM state by reading back
+ * the serialized state from the CM state file.
+ * @param hostsReader The HostsReader instance
+ * @throws IOException
+ */
+ private void recoverClusterManagerFromDisk(HostsFileReader hostsReader)
+ throws IOException {
+ LOG.info("Recovering from Safe Mode");
+
+ // This will prevent the expireNodes thread from expiring the nodes
+ safeMode = true;
+
+ CoronaSerializer coronaSerializer = new CoronaSerializer(conf);
+
+ // Expecting the START_OBJECT token for ClusterManager
+ coronaSerializer.readStartObjectToken("ClusterManager");
+
+ coronaSerializer.readField("nodeManager");
+ nodeManager = new NodeManager(this, hostsReader, coronaSerializer);
+ nodeManager.setConf(conf);
+ nodeManager.restoreAfterSafeModeRestart();
+
+ // Expecting the END_OBJECT token for ClusterManager
+ coronaSerializer.readEndObjectToken("ClusterManager");
}
/**
@@ -452,6 +504,10 @@ public synchronized boolean setSafeMode(boolean safeMode) {
return true;
}
+ /**
+ * This function saves the state of the ClusterManager to disk.
+ * @return A boolean. True if saving the state succeeded, false otherwise.
+ */
@Override
public boolean persistState() {
if (!safeMode) {
@@ -460,6 +516,21 @@ public boolean persistState() {
return false;
}
+ try {
+ JsonGenerator jsonGenerator =
+ CoronaSerializer.createJsonGenerator(conf);
+ jsonGenerator.writeStartObject();
+
+ jsonGenerator.writeFieldName("nodeManager");
+ nodeManager.write(jsonGenerator);
+ // TODO Write the sessionManager and other objects
+
+ jsonGenerator.writeEndObject();
+ jsonGenerator.close();
+ } catch (IOException e) {
+ LOG.info("Could not persist the state: ", e);
+ return false;
+ }
return true;
}
26 src/contrib/corona/src/java/org/apache/hadoop/corona/ClusterManagerServer.java
View
@@ -5,6 +5,12 @@
import java.net.ServerSocket;
import java.net.Socket;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -16,7 +22,7 @@
public class ClusterManagerServer extends Thread {
public static final Log LOG = LogFactory.getLog(ClusterManagerServer.class);
- static{
+ static {
Configuration.addDefaultResource("mapred-default.xml");
Configuration.addDefaultResource("mapred-site.xml");
Utilities.makeProcessExitOnUncaughtException(LOG);
@@ -62,10 +68,24 @@ public void run() {
}
public static void main(String[] args)
- throws IOException, TTransportException {
+ throws IOException, TTransportException, ParseException {
StringUtils.startupShutdownMessage(ClusterManager.class, args, LOG);
Configuration conf = new Configuration();
- ClusterManager cm = new ClusterManager(conf);
+ boolean recoverFromDisk = false;
+ // Check if we want to start the ClusterManager to restore the persisted
+ // state
+ Option recoverFromDiskOption =
+ new Option("recoverFromDisk",
+ "Used to restart the CM from the state persisted on disk");
+ Options options = new Options();
+ options.addOption(recoverFromDiskOption);
+ CommandLineParser parser = new GnuParser();
+ CommandLine line = parser.parse(options, args);
+
+ if (line.hasOption("recoverFromDisk")) {
+ recoverFromDisk = true;
+ }
+ ClusterManager cm = new ClusterManager(conf, recoverFromDisk);
try {
ClusterManagerServer server = new ClusterManagerServer(conf, cm);
server.start();
232 src/contrib/corona/src/java/org/apache/hadoop/corona/ClusterNode.java
View
@@ -18,6 +18,7 @@
package org.apache.hadoop.corona;
+import java.io.IOException;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
@@ -26,8 +27,10 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.net.Node;
+import org.apache.hadoop.util.CoronaSerializer;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.JsonToken;
public class ClusterNode {
/** Class logger */
@@ -35,7 +38,9 @@
LogFactory.getLog(ClusterNode.class);
public long lastHeartbeatTime;
public boolean deleted = false;
- public final Node hostNode;
+ // This is no longer a final because when we restart after an upgrade, we will
+ // initialize the hostNode outside the constructor.
+ public Node hostNode;
private ClusterNodeInfo clusterNodeInfo;
private volatile ComputeSpecs freeSpecs;
private Map<ResourceType, Integer> resourceTypeToMaxCpu =
@@ -46,7 +51,7 @@
protected Map<GrantId, ResourceRequestInfo> grants =
new HashMap<GrantId, ResourceRequestInfo>();
- protected ComputeSpecs granted =
+ protected ComputeSpecs granted =
new ComputeSpecs(); // All integral fields get initialized to 0.
public static class Stats {
@@ -77,6 +82,27 @@ public GrantId(String sessionId, int requestId) {
this.unique = sessionId + requestId;
}
+ /**
+ * Constructor for GrantId, used when we are reading back the state from
+ * the disk
+ * @param coronaSerializer The CoronaSerializer instance being used to read
+ * the JSON from disk
+ * @throws IOException
+ */
+ public GrantId(CoronaSerializer coronaSerializer) throws IOException {
+ // Expecting the START_OBJECT token for GrantId
+ coronaSerializer.readStartObjectToken("GrantId");
+
+ coronaSerializer.readField("sessionId");
+ this.sessionId = coronaSerializer.readValueAs(String.class);
+ coronaSerializer.readField("requestId");
+ this.requestId = coronaSerializer.readValueAs(Integer.class);
+
+ // Expecting the END_OBJECT token for GrantId
+ coronaSerializer.readEndObjectToken("GrantId");
+ this.unique = this.sessionId + this.requestId;
+ }
+
public String getSessionId() {
return sessionId;
}
@@ -114,27 +140,191 @@ public boolean equals(GrantId that) {
return this.unique.equals(that.unique);
}
+
+ /**
+ * Used to write the state of the GrantId instance to disk, when we are
+ * persisting the state of the NodeManager
+ * @param jsonGenerator The JsonGenerator instance being used to write JSON
+ * to disk
+ * @throws IOException
+ */
+ public void write(JsonGenerator jsonGenerator) throws IOException {
+ jsonGenerator.writeStartObject();
+ jsonGenerator.writeObjectField("sessionId", sessionId);
+ jsonGenerator.writeObjectField("requestId", requestId);
+ jsonGenerator.writeEndObject();
+ }
}
+ public ClusterNode(
+ ClusterNodeInfo clusterNodeInfo, Node node,
+ Map<Integer, Map<ResourceType, Integer>> cpuToResourcePartitioning) {
+ clusterNodeInfo.address.host = clusterNodeInfo.address.host.intern();
+ this.clusterNodeInfo = clusterNodeInfo;
+ this.freeSpecs = clusterNodeInfo.getFree();
+ lastHeartbeatTime = ClusterManager.clock.getTime();
+ this.hostNode = node;
+ resetResourceTypeToStatsMap();
+ initResourceTypeToMaxCpuMap(cpuToResourcePartitioning);
+ }
/**
- * Based on the mapping of cpus to resources, find the appropriate resources
- * for a given number of cpus and initialize the resource type to allocated
- * cpu mapping.
- *
- * @param cpuToResourcePartitioning Mapping of cpus to resources to be used
+ * Constructor for ClusterNode, used when we are reading back the state from
+ * the disk
+ * @param coronaSerializer The CoronaSerializer instance being used to read
+ * the JSON from disk
+ * @throws IOException
*/
- private void initResourceTypeToCpu(
- Map<Integer, Map<ResourceType, Integer>> cpuToResourcePartitioning) {
- resourceTypeToMaxCpu =
- getResourceTypeToCountMap((int) clusterNodeInfo.total.numCpus,
- cpuToResourcePartitioning);
+ ClusterNode(CoronaSerializer coronaSerializer) throws IOException {
+ // Initialize the resourceTypeToStatsMap map
+ resetResourceTypeToStatsMap();
+
+ // Expecting the START_OBJECT token for ClusterNode
+ coronaSerializer.readStartObjectToken("ClusterNode");
+ readClusterNodeInfo(coronaSerializer);
+ coronaSerializer.readField("grants");
+ readGrants(coronaSerializer);
+
+ // Expecting the END_OBJECT token for ClusterManager
+ coronaSerializer.readEndObjectToken("ClusterNode");
+
+ // We will initialize the hostNode field later in the restoreClusterNode()
+ // method in NodeManager, which is the last stage of restoring the
+ // NodeManager state
+ hostNode = null;
+
+ // Done with reading the END_OBJECT token for ClusterNode
+ }
+
+ /**
+ * Reads the clusterNodeInfo object from the JSON stream
+ * @param coronaSerializer The CoronaSerializer instance being used to read
+ * the JSON from disk
+ * @throws IOException
+ */
+ private void readClusterNodeInfo(CoronaSerializer coronaSerializer)
+ throws IOException {
+ coronaSerializer.readField("clusterNodeInfo");
+ clusterNodeInfo = new ClusterNodeInfo();
+ // Expecting the START_OBJECT token for clusterNodeInfo
+ coronaSerializer.readStartObjectToken("clusterNodeInfo");
+
+ coronaSerializer.readField("name");
+ clusterNodeInfo.name = coronaSerializer.readValueAs(String.class);
+
+ coronaSerializer.readField("address");
+ clusterNodeInfo.address = coronaSerializer.readValueAs(InetAddress.class);
+
+ coronaSerializer.readField("total");
+ clusterNodeInfo.total = coronaSerializer.readValueAs(ComputeSpecs.class);
+
+ coronaSerializer.readField("free");
+ clusterNodeInfo.free = coronaSerializer.readValueAs(ComputeSpecs.class);
+
+ coronaSerializer.readField("resourceInfos");
+ clusterNodeInfo.resourceInfos = coronaSerializer.readValueAs(Map.class);
+
+ // Expecting the END_OBJECT token for clusterNodeInfo
+ coronaSerializer.readEndObjectToken("clusterNodeInfo");
+ }
+
+ /**
+ * Reads the list of grants from the JSON stream
+ * @param coronaSerializer The CoronaSerializer instance being used to read
+ * the JSON from disk
+ * @throws IOException
+ */
+ private void readGrants(CoronaSerializer coronaSerializer)
+ throws IOException {
+ // Expecting the START_OBJECT token for grants
+ coronaSerializer.readStartObjectToken("grants");
+ JsonToken current = coronaSerializer.nextToken();
+ while (current != JsonToken.END_OBJECT) {
+ // We can access the key for the grant, but it is not required
+ // Expecting the START_OBJECT token for the grant
+ coronaSerializer.readStartObjectToken("grant");
+
+ coronaSerializer.readField("grantId");
+ GrantId grantId = new GrantId(coronaSerializer);
+ coronaSerializer.readField("grant");
+ ResourceRequestInfo resourceRequestInfo =
+ new ResourceRequestInfo(coronaSerializer);
+
+ // Expecting the END_OBJECT token for the grant
+ coronaSerializer.readEndObjectToken("grant");
+
+ // This will update the grants map and the resourceTypeToStatsMap map
+ addGrant(grantId.getSessionId(), resourceRequestInfo);
+
+ current = coronaSerializer.nextToken();
+ }
+ }
+
+ /**
+ * Used to write the state of the ClusterNode instance to disk, when we are
+ * persisting the state of the NodeManager
+ * @param jsonGenerator The JsonGenerator instance being used to write JSON
+ * to disk
+ * @throws IOException
+ */
+ public void write(JsonGenerator jsonGenerator) throws IOException {
+ jsonGenerator.writeStartObject();
+
+ // clusterNodeInfo begins
+ jsonGenerator.writeFieldName("clusterNodeInfo");
+ jsonGenerator.writeStartObject();
+ jsonGenerator.writeStringField("name", clusterNodeInfo.name);
+ jsonGenerator.writeObjectField("address", clusterNodeInfo.address);
+ jsonGenerator.writeObjectField("total", clusterNodeInfo.total);
+ jsonGenerator.writeObjectField("free", clusterNodeInfo.free);
+ jsonGenerator.writeObjectField("resourceInfos",
+ clusterNodeInfo.resourceInfos);
+ jsonGenerator.writeEndObject();
+ // clusterNodeInfo ends
+
+ // grants begins
+ jsonGenerator.writeFieldName("grants");
+ jsonGenerator.writeStartObject();
+ for (Map.Entry<GrantId, ResourceRequestInfo> entry : grants.entrySet()) {
+ jsonGenerator.writeFieldName(entry.getKey().unique);
+ jsonGenerator.writeStartObject();
+ jsonGenerator.writeFieldName("grantId");
+ entry.getKey().write(jsonGenerator);
+ jsonGenerator.writeFieldName("grant");
+ entry.getValue().write(jsonGenerator);
+ jsonGenerator.writeEndObject();
+ }
+ jsonGenerator.writeEndObject();
+ // grants ends
+
+ jsonGenerator.writeEndObject();
+ // We skip the hostNode and lastHeartbeatTime as they need not be persisted.
+ // resourceTypeToMaxCpu and resourceTypeToStatsMap can be rebuilt using the
+ // conf and the grants respectively.
+ }
+
+ /**
+ * This method is used to reset the mapping of resource type to stats.
+ */
+ public void resetResourceTypeToStatsMap() {
for (ResourceType type : ResourceType.values()) {
resourceTypeToStatsMap.put(type, new Stats());
}
}
/**
+ * This method is used to initialize the resource type to max CPU mapping
+ * based upon the cpuToResourcePartitioning instance given
+ * @param cpuToResourcePartitioning Mapping of cpus to resources to be used
+ */
+ public void initResourceTypeToMaxCpuMap(Map<Integer, Map<ResourceType,
+ Integer>> cpuToResourcePartitioning) {
+ resourceTypeToMaxCpu =
+ getResourceTypeToCountMap((int) clusterNodeInfo.total.numCpus,
+ cpuToResourcePartitioning);
+ }
+
+ /**
* Get a mapping of the resource type to amount of resources for a given
* number of cpus.
*
@@ -163,18 +353,6 @@ private void initResourceTypeToCpu(
return ret;
}
-
- public ClusterNode(
- ClusterNodeInfo clusterNodeInfo, Node node,
- Map<Integer, Map<ResourceType, Integer>> cpuToResourcePartitioning) {
- clusterNodeInfo.address.host = clusterNodeInfo.address.host.intern();
- this.clusterNodeInfo = clusterNodeInfo;
- this.freeSpecs = clusterNodeInfo.getFree();
- lastHeartbeatTime = ClusterManager.clock.getTime();
- this.hostNode = node;
- initResourceTypeToCpu(cpuToResourcePartitioning);
- }
-
public void addGrant(String sessionId, ResourceRequestInfo req) {
if (deleted)
throw new RuntimeException ("Node " + getName() + " has been deleted");
@@ -236,6 +414,10 @@ public InetAddress getAddress() {
return clusterNodeInfo.address;
}
+ public ClusterNodeInfo getClusterNodeInfo() {
+ return clusterNodeInfo;
+ }
+
public ComputeSpecs getFree() {
return freeSpecs;
}
33 src/contrib/corona/src/java/org/apache/hadoop/corona/CoronaConf.java
View
@@ -43,8 +43,19 @@
public static final String HOSTS_FILE = "cm.hosts";
/** The excludes file. */
public static final String EXCLUDE_HOSTS_FILE = "cm.hosts.exclude";
+ /**
+ * The name of the file which will contain the CM's state when it goes for
+ * an upgrade.
+ */
+ public static final String CM_STATE_FILE = "cm.state";
/** The RPC address of the Cluster Manager. */
public static final String CM_ADDRESS = "cm.server.address";
+ /**
+ * This boolean property is used to fix whether compression would be used
+ * while saving the CM state or not. While debugging, it is preferable
+ * that this should be false.
+ */
+ public static final String CM_COMPRESS_STATE = "cm.compress.state";
/** The HTTP UI address for the Cluster Manager. */
public static final String CM_HTTP_ADDRESS = "cm.server.http.address";
/** The RPC address of the Proxy Job Tracker. */
@@ -412,6 +423,28 @@ public String getExcludesFile() {
return get(EXCLUDE_HOSTS_FILE, "");
}
+ /**
+ * Get the address of the file used to save the state of the ClusterManager
+ * when it goes down for an upgrade
+ *
+ * @return A String, containing the address of the file used to save the
+ * ClusterManager state.
+ */
+ public String getCMStateFile() {
+ return get(CM_STATE_FILE, "cm.state");
+ }
+
+ /**
+ * Return the flag which indicates if we will be using compression while
+ * saving the ClusterManager state.
+ *
+ * @return A boolean, which is true if we are going to use compression while
+ * saving the CM state.
+ */
+ public boolean getCMCompressStateFlag() {
+ return getBoolean(CM_COMPRESS_STATE, false);
+ }
+
public int getCMNotifierThreadCount() {
return getInt(CM_NOTIFIER_THREAD_COUNT, 17);
}
209 src/contrib/corona/src/java/org/apache/hadoop/corona/NodeManager.java
View
@@ -37,7 +37,10 @@
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.Node;
+import org.apache.hadoop.util.CoronaSerializer;
import org.apache.hadoop.util.HostsFileReader;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.JsonToken;
/**
* Manages all the nodes known in the cluster.
@@ -444,6 +447,110 @@ public NodeManager(
this.faultManager = new FaultManager(this);
}
+ /**
+ * Constructor for the NodeManager, used when reading back the state of
+ * NodeManager from disk.
+ * @param clusterManager The ClusterManager instance
+ * @param hostsReader The HostsReader instance
+ * @param coronaSerializer The CoronaSerializer instance, which will be used
+ * to read JSON from disk
+ * @throws IOException
+ */
+ public NodeManager(ClusterManager clusterManager,
+ HostsFileReader hostsReader,
+ CoronaSerializer coronaSerializer)
+ throws IOException {
+ this(clusterManager, hostsReader);
+
+ // Expecting the START_OBJECT token for nodeManager
+ coronaSerializer.readStartObjectToken("nodeManager");
+ readNameToNode(coronaSerializer);
+ readHostsToSessions(coronaSerializer);
+ readNameToApps(coronaSerializer);
+ // Expecting the END_OBJECT token for ClusterManager
+ coronaSerializer.readEndObjectToken("nodeManager");
+
+ // topologyCache need not be serialized, it will eventually be rebuilt.
+ // cpuToResourcePartitioning and resourceLimit need not be serialized,
+ // they can be read from the conf.
+ }
+
+ /**
+ * Reads the nameToNode map from the JSON stream
+ * @param coronaSerializer The CoronaSerializer instance to be used to
+ * read the JSON
+ * @throws IOException
+ */
+ private void readNameToNode(CoronaSerializer coronaSerializer)
+ throws IOException {
+ coronaSerializer.readField("nameToNode");
+ // Expecting the START_OBJECT token for nameToNode
+ coronaSerializer.readStartObjectToken("nameToNode");
+ JsonToken current = coronaSerializer.nextToken();
+ while (current != JsonToken.END_OBJECT) {
+ // nodeName is the key, and the ClusterNode is the value here
+ String nodeName = coronaSerializer.getFieldName();
+ ClusterNode clusterNode = new ClusterNode(coronaSerializer);
+ if (!nameToNode.containsKey(nodeName)) {
+ nameToNode.put(nodeName, clusterNode);
+ }
+ current = coronaSerializer.nextToken();
+ }
+ // Done with reading the END_OBJECT token for nameToNode
+ }
+
+ /**
+ * Reads the hostsToSessions map from the JSON stream
+ * @param coronaSerializer The CoronaSerializer instance to be used to
+ * read the JSON
+ * @throws java.io.IOException
+ */
+ private void readHostsToSessions(CoronaSerializer coronaSerializer)
+ throws IOException {
+ coronaSerializer.readField("hostsToSessions");
+ // Expecting the START_OBJECT token for hostsToSessions
+ coronaSerializer.readStartObjectToken("hostsToSessions");
+ JsonToken current = coronaSerializer.nextToken();
+
+ while (current != JsonToken.END_OBJECT) {
+ String host = coronaSerializer.getFieldName();
+ Set<String> sessionsSet = coronaSerializer.readValueAs(Set.class);
+ hostsToSessions.put(nameToNode.get(host), sessionsSet);
+ current = coronaSerializer.nextToken();
+ }
+ }
+
+ /**
+ * Reads the nameToApps map from the JSON stream
+ * @param coronaSerializer The CoronaSerializer instance to be used to
+ * read the JSON
+ * @throws IOException
+ */
+ private void readNameToApps(CoronaSerializer coronaSerializer)
+ throws IOException {
+ coronaSerializer.readField("nameToApps");
+ // Expecting the START_OBJECT token for nameToApps
+ coronaSerializer.readStartObjectToken("nameToApps");
+ JsonToken current = coronaSerializer.nextToken();
+
+ while (current != JsonToken.END_OBJECT) {
+ String nodeName = coronaSerializer.getFieldName();
+ // Expecting the START_OBJECT token for the Apps
+ coronaSerializer.readStartObjectToken(nodeName);
+ Map<String, String> appMap = coronaSerializer.readValueAs(Map.class);
+ Map<ResourceType, String> appsOnNode =
+ new HashMap<ResourceType, String>();
+
+ for (Map.Entry<String, String> entry : appMap.entrySet()) {
+ appsOnNode.put(ResourceType.valueOf(entry.getKey()),
+ entry.getValue());
+ }
+
+ nameToApps.put(nodeName, appsOnNode);
+ current = coronaSerializer.nextToken();
+ }
+ }
+
/**
* See if there are any runnable nodes of a given type
* @param type the type to look for
@@ -536,7 +643,8 @@ protected void addNode(ClusterNode node,
typeToIndices.entrySet()) {
ResourceType type = entry.getKey();
if (resourceInfos.containsKey(type)) {
- if (node.checkForGrant(Utilities.getUnitResourceRequest(type), resourceLimit)) {
+ if (node.checkForGrant(Utilities.getUnitResourceRequest(type),
+ resourceLimit)) {
RunnableIndices r = entry.getValue();
r.addRunnable(node);
}
@@ -588,7 +696,8 @@ protected void addAppToNode(
for (Map.Entry<ResourceType, RunnableIndices> entry :
typeToIndices.entrySet()) {
if (type.equals(entry.getKey())) {
- if (node.checkForGrant(Utilities.getUnitResourceRequest(type), resourceLimit)) {
+ if (node.checkForGrant(Utilities.getUnitResourceRequest(type),
+ resourceLimit)) {
RunnableIndices r = entry.getValue();
r.addRunnable(node);
}
@@ -795,6 +904,58 @@ public void setConf(Configuration newConf) {
faultManager.setConf(conf);
}
+ /**
+ * This method rebuilds members related to the NodeManager instance, which
+ * were not directly persisted themselves.
+ * @throws IOException
+ */
+ public void restoreAfterSafeModeRestart() throws IOException {
+ if (!clusterManager.safeMode) {
+ throw new IOException("restoreAfterSafeModeRestart() called while the " +
+ "Cluster Manager was not in Safe Mode");
+ }
+ // Restoring all the ClusterNode(s)
+ for (ClusterNode clusterNode : nameToNode.values()) {
+ restoreClusterNode(clusterNode);
+ }
+
+ // Restoring all the RequestedNodes(s)
+ for (ClusterNode clusterNode : nameToNode.values()) {
+ for (ResourceRequestInfo resourceRequestInfo :
+ clusterNode.grants.values()) {
+ // Fix the RequestedNode(s)
+ restoreResourceRequestInfo(resourceRequestInfo);
+ loadManager.incrementLoad(resourceRequestInfo.getType());
+ }
+ }
+ }
+
+ /**
+ * This method rebuilds members related to a ResourceRequestInfo instance,
+ * which were not directly persisted themselves.
+ * @param resourceRequestInfo The ResourceRequestInfo instance to be restored
+ */
+ public void restoreResourceRequestInfo(ResourceRequestInfo
+ resourceRequestInfo) {
+ List<RequestedNode> requestedNodes = null;
+ List<String> hosts = resourceRequestInfo.getHosts();
+ if (hosts != null && hosts.size() > 0) {
+ requestedNodes = new ArrayList<RequestedNode>(hosts.size());
+ for (String host : hosts) {
+ requestedNodes.add(resolve(host, resourceRequestInfo.getType()));
+ }
+ }
+ resourceRequestInfo.nodes = requestedNodes;
+ }
+
+ private void restoreClusterNode(ClusterNode clusterNode) {
+ clusterNode.hostNode = topologyCache.getNode(clusterNode.getHost());
+ // This will reset the lastHeartbeatTime
+ clusterNode.heartbeat(clusterNode.getClusterNodeInfo());
+ clusterNode.initResourceTypeToMaxCpuMap(cpuToResourcePartitioning);
+ updateRunnability(clusterNode);
+ }
+
@Override
public Configuration getConf() {
return conf;
@@ -1073,7 +1234,7 @@ public void nodeFeedback(
*/
void blacklistNode(String nodeName, ResourceType resourceType) {
LOG.info("Node " + nodeName + " has been blacklisted for resource " +
- resourceType);
+ resourceType);
clusterManager.getMetrics().setBlacklistedNodes(
faultManager.getBlacklistedNodeCount());
deleteAppFromNode(nodeName, resourceType);
@@ -1151,4 +1312,46 @@ public void resetNodesLastHeartbeatTime() {
node.lastHeartbeatTime = now;
}
}
+
+ /**
+ * This method writes the state of the NodeManager to disk
+ * @param jsonGenerator The instance of JsonGenerator, which will be used to
+ * write JSON to disk
+ * @throws IOException
+ */
+ public void write(JsonGenerator jsonGenerator) throws IOException {
+ jsonGenerator.writeStartObject();
+
+ // nameToNode begins
+ jsonGenerator.writeFieldName("nameToNode");
+ jsonGenerator.writeStartObject();
+ for (Map.Entry<String, ClusterNode> entry : nameToNode.entrySet()) {
+ jsonGenerator.writeFieldName(entry.getKey());
+ entry.getValue().write(jsonGenerator);
+ }
+ jsonGenerator.writeEndObject();
+ // nameToNode ends
+
+ // hostsToSessions begins
+ // We create a new Map of type <ClusterNode.name, Set<SessionIds>>.
+ // The original hostsToSessions map has the ClusterNode as its key, and
+ // we do not need to persist the entire ClusterNode again, since we have
+ // already done that with nameToNode.
+ Map<String, Set<String>> hostsToSessionsMap =
+ new HashMap<String, Set<String>>();
+ for (Map.Entry<ClusterNode, Set<String>> entry :
+ hostsToSessions.entrySet()) {
+ hostsToSessionsMap.put(entry.getKey().getName(),
+ entry.getValue());
+ }
+ jsonGenerator.writeObjectField("hostsToSessions", hostsToSessionsMap);
+ // hostsToSessions ends
+
+ jsonGenerator.writeObjectField("nameToApps", nameToApps);
+
+ // faultManager is not required
+
+ // We can rebuild the loadManager
+ jsonGenerator.writeEndObject();
+ }
}
49 src/contrib/corona/src/java/org/apache/hadoop/corona/ResourceRequestInfo.java
View
@@ -18,6 +18,11 @@
package org.apache.hadoop.corona;
+import org.apache.hadoop.util.CoronaSerializer;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.JsonParser;
+
+import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -27,7 +32,8 @@
List<RequestedNode> nodes;
Set<String> excludedHosts = new HashSet<String>();
- public ResourceRequestInfo(ResourceRequest request, List<RequestedNode> nodes) {
+ public ResourceRequestInfo(ResourceRequest request,
+ List<RequestedNode> nodes) {
this.request = request;
this.nodes = nodes;
if (request.getExcludeHosts() != null) {
@@ -35,6 +41,33 @@ public ResourceRequestInfo(ResourceRequest request, List<RequestedNode> nodes) {
}
}
+ /**
+ * Constructor to reconstruct the ResourceRequestInfo object from the
+ * file which stores the state on the disk.
+ * @param coronaSerializer The CoronaSerializer instance being used to read
+ * the JSON from disk
+ * @throws IOException
+ */
+ public ResourceRequestInfo(CoronaSerializer coronaSerializer)
+ throws IOException {
+ // Expecting the START_OBJECT token for ResourceRequestInfo
+ coronaSerializer.readStartObjectToken("ResourceRequestInfo");
+
+ coronaSerializer.readField("request");
+ this.request = coronaSerializer.readValueAs(ResourceRequest.class);
+
+ // Expecting the END_OBJECT token for ResourceRequestInfo
+ coronaSerializer.readEndObjectToken("ResourceRequestInfo");
+
+ // Restoring the excludedHosts
+ if (request.getExcludeHosts() != null) {
+ excludedHosts.addAll(request.getExcludeHosts());
+ }
+
+ // The list of RequestedNodes, nodes, will be restored later once the
+ // topologyCache object is created
+ }
+
public int getId() {
return request.id;
}
@@ -58,4 +91,18 @@ public ComputeSpecs getSpecs() {
public List<RequestedNode> getRequestedNodes() {
return nodes;
}
+
+ /**
+ * This method writes the ResourceRequestInfo instance to disk
+ * @param jsonGenerator The JsonGenerator instance being used to write the
+ * JSON to disk
+ * @throws IOException
+ */
+ public void write(JsonGenerator jsonGenerator) throws IOException {
+ // We neither need the list of RequestedNodes, nodes, nor excludedHosts,
+ // because we can reconstruct them from the request object
+ jsonGenerator.writeStartObject();
+ jsonGenerator.writeObjectField("request", request);
+ jsonGenerator.writeEndObject();
+ }
}
235 src/contrib/corona/src/java/org/apache/hadoop/util/CoronaSerializer.java
View
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import org.apache.hadoop.corona.CoronaConf;
+import org.codehaus.jackson.JsonEncoding;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.JsonToken;
+import org.codehaus.jackson.impl.DefaultPrettyPrinter;
+import org.codehaus.jackson.map.DeserializationConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+public class CoronaSerializer {
+ /**
+ * The JsonParser instance contained within a CoronaSerializer instance.
+ */
+ public JsonParser jsonParser;
+
+ /**
+ * Constructor for the CoronaSerializer class.
+ *
+ * @param conf The CoronaConf instance to be used
+ * @throws IOException
+ */
+ public CoronaSerializer(CoronaConf conf) throws IOException {
+ InputStream inputStream = new FileInputStream(conf.getCMStateFile());
+ if (conf.getCMCompressStateFlag()) {
+ inputStream = new GZIPInputStream(inputStream);
+ }
+ ObjectMapper mapper = new ObjectMapper();
+ mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES,
+ false);
+ JsonFactory jsonFactory = new JsonFactory();
+ jsonFactory.setCodec(mapper);
+ jsonParser = jsonFactory.createJsonParser(inputStream);
+ }
+
+ /**
+ * This is a helper method that reads a JSON token using a JsonParser
+ * instance, and throws an exception if the next token is not START_OBJECT.
+ *
+ * @param parentFieldName The name of the field
+ * @throws IOException
+ */
+ public void readStartObjectToken(String parentFieldName)
+ throws IOException {
+ readToken(parentFieldName, JsonToken.START_OBJECT);
+ }
+
+ /**
+ * This is a helper method that reads a JSON token using a JsonParser
+ * instance, and throws an exception if the next token is not START_ARRAY.
+ *
+ * @param parentFieldName The name of the field
+ * @throws IOException
+ */
+ public void readStartArrayToken(String parentFieldName)
+ throws IOException {
+ readToken(parentFieldName, JsonToken.START_ARRAY);
+ }
+
+ /**
+ * This is a helper method that reads a JSON token using a JsonParser
+ * instance, and throws an exception if the next token is not END_OBJECT.
+ *
+ * @param parentFieldName The name of the field
+ * @throws IOException
+ */
+ public void readEndObjectToken(String parentFieldName)
+ throws IOException {
+ readToken(parentFieldName, JsonToken.END_OBJECT);
+ }
+
+ /**
+ * This is a helper method that reads a JSON token using a JsonParser
+ * instance, and throws an exception if the next token is not END_ARRAY.
+ *
+ * @param parentFieldName The name of the field
+ * @throws IOException
+ */
+ public void readEndArrayToken(String parentFieldName)
+ throws IOException {
+ readToken(parentFieldName, JsonToken.END_ARRAY);
+ }
+
+ /**
+ * This is a helper method that reads a JSON token using a JsonParser
+ * instance, and throws an exception if the next token is not the same as
+ * the token we expect.
+ *
+ * @param parentFieldName The name of the field
+ * @param expectedToken The expected token
+ * @throws IOException
+ */
+ public void readToken(String parentFieldName, JsonToken expectedToken)
+ throws IOException {
+ JsonToken currentToken = jsonParser.nextToken();
+ if (currentToken != expectedToken) {
+ throw new IOException("Expected a " + expectedToken.toString() +
+ " token when reading the value of the field: " +
+ parentFieldName +
+ " but found a " +
+ currentToken.toString() + " token");
+ }
+ }
+
+ /**
+ * This is a helper method which creates a JsonGenerator instance, for writing
+ * the state of the ClusterManager to the state file. The JsonGenerator
+ * instance writes to a compressed file if we have the compression flag
+ * turned on.
+ *
+ * @param conf The CoronaConf instance to be used
+ * @return The JsonGenerator instance to be used
+ * @throws IOException
+ */
+ public static JsonGenerator createJsonGenerator(CoronaConf conf)
+ throws IOException {
+ OutputStream outputStream = new FileOutputStream(conf.getCMStateFile());
+ if (conf.getCMCompressStateFlag()) {
+ outputStream = new GZIPOutputStream(outputStream);
+ }
+ ObjectMapper mapper = new ObjectMapper();
+ JsonGenerator jsonGenerator =
+ new JsonFactory().createJsonGenerator(outputStream, JsonEncoding.UTF8);
+ jsonGenerator.setCodec(mapper);
+ if (!conf.getCMCompressStateFlag()) {
+ jsonGenerator.setPrettyPrinter(new DefaultPrettyPrinter());
+ }
+ return jsonGenerator;
+ }
+
+ /**
+ * This is a helper method, which is used to throw an exception when we
+ * encounter an unexpected field name.
+ *
+ * @param fieldName Name of the field
+ * @param expectedFieldName Name of the expected field
+ * @throws IOException
+ */
+ public void foundUnknownField(String fieldName,
+ String expectedFieldName)
+ throws IOException {
+ throw new IOException("Found an unexpected field: " + fieldName +
+ ", instead of field: " + expectedFieldName);
+ }
+
+ /**
+ * The method reads a field from the JSON stream, and checks if the
+ * field read is the same as the expect field.
+ *
+ * @param expectedFieldName The field name which is expected next
+ * @throws IOException
+ */
+ public void readField(String expectedFieldName) throws IOException {
+ readToken(expectedFieldName, JsonToken.FIELD_NAME);
+ String fieldName = jsonParser.getCurrentName();
+ if (!fieldName.equals(expectedFieldName)) {
+ foundUnknownField(fieldName, expectedFieldName);
+ }
+ }
+
+ /**
+ * Moves the JSON parser ahead by one token
+ *
+ * @return The current JSON token
+ * @throws IOException
+ */
+ public JsonToken nextToken() throws IOException {
+ return jsonParser.nextToken();
+ }
+
+ /**
+ * If the current token is a field name, this method returns the name of
+ * the field.
+ *
+ * @return A String object containing the name of the field
+ * @throws IOException
+ */
+ public String getFieldName() throws IOException {
+ if (jsonParser.getCurrentToken() != JsonToken.FIELD_NAME) {
+ throw new IOException("Expected a field of type " + JsonToken.FIELD_NAME +
+ ", but found a field of type " +
+ jsonParser.getCurrentToken());
+ }
+ return jsonParser.getCurrentName();
+ }
+
+ /**
+ * The JsonParser class exposes a number of methods for reading values, which
+ * are named confusingly. They also don't move the current token ahead, which
+ * makes us use statements like jsonParser.nextToken() everywhere in the code.
+ *
+ * This wrapper method abstracts all that.
+ *
+ * @param valueType The type of the value to be read
+ * @param <T>
+ * @return
+ * @throws IOException
+ */
+ public <T> T readValueAs(Class<T> valueType)
+ throws IOException {
+ jsonParser.nextToken();
+ if (valueType == String.class) {
+ return valueType.cast(jsonParser.getText());
+ }
+ return jsonParser.readValueAs(valueType);
+ }
+}
Please sign in to comment.
Something went wrong with that request. Please try again.