Skip to content
Browse files

Update Kitten to run against Hadoop 2.2.0.

  • Loading branch information...
1 parent 9faaf1e commit 11527be23af2a50d83c29141465fa1aa70c5a7bf @jwills jwills committed
Showing with 347 additions and 648 deletions.
  1. +2 −0 .gitignore
  2. +2 −6 README.md
  3. +1 −1 java/client/pom.xml
  4. +1 −1 java/client/src/main/java/com/cloudera/kitten/client/params/lua/LuaYarnClientParameters.java
  5. +10 −19 ...oudera/kitten/client/service/{ApplicationsManagerConnectionFactory.java → YarnClientFactory.java}
  6. +44 −44 java/client/src/main/java/com/cloudera/kitten/client/service/YarnClientServiceImpl.java
  7. +8 −4 java/client/src/test/java/com/cloudera/kitten/TestKittenDistributedShell.java
  8. +2 −8 java/client/src/test/java/com/cloudera/kitten/client/params/lua/BasicLuaConfigTest.java
  9. +3 −2 java/client/src/test/resources/log4j.properties
  10. +5 −0 java/client/src/test/resources/lua/distshell.lua
  11. +1 −1 java/common/pom.xml
  12. +4 −13 java/common/src/main/java/com/cloudera/kitten/ContainerLaunchContextFactory.java
  13. +1 −11 java/common/src/main/java/com/cloudera/kitten/ContainerLaunchParameters.java
  14. +5 −24 java/common/src/main/java/com/cloudera/kitten/lua/LuaContainerLaunchParameters.java
  15. +1 −0 java/common/src/main/java/com/cloudera/kitten/lua/LuaFields.java
  16. +1 −1 java/common/src/main/resources/lua/kitten.lua
  17. +3 −2 java/{master/src/main → common/src/test}/resources/log4j.properties
  18. +2 −2 java/examples/distshell/distshell.lua
  19. +1 −1 java/master/pom.xml
  20. +1 −2 java/master/src/main/java/com/cloudera/kitten/appmaster/ApplicationMaster.java
  21. +9 −21 java/master/src/main/java/com/cloudera/kitten/appmaster/ApplicationMasterParameters.java
  22. +1 −8 java/master/src/main/java/com/cloudera/kitten/appmaster/ApplicationMasterService.java
  23. +0 −26 java/master/src/main/java/com/cloudera/kitten/appmaster/ContainerManagerConnectionFactory.java
  24. +9 −47 ...master/src/main/java/com/cloudera/kitten/appmaster/params/lua/LuaApplicationMasterParameters.java
  25. +216 −280 java/master/src/main/java/com/cloudera/kitten/appmaster/service/ApplicationMasterServiceImpl.java
  26. +0 −62 ...er/src/main/java/com/cloudera/kitten/appmaster/service/ContainerManagerConnectionFactoryImpl.java
  27. +0 −51 .../master/src/main/java/com/cloudera/kitten/appmaster/service/ResourceManagerConnectionFactory.java
  28. +1 −1 java/master/src/test/java/com/cloudera/kitten/appmaster/util/HDFSFileFinderTest.java
  29. +10 −0 java/master/src/test/resources/log4j.properties
  30. +1 −1 java/pom.xml
  31. +2 −9 pom.xml
View
2 .gitignore
@@ -3,3 +3,5 @@
.settings
target
build
+*.iml
+.idea
View
8 README.md
@@ -22,15 +22,11 @@ To build Kitten, run:
from this directory. That will build the common, master, and client subprojects.
-Kitten is developed against CDH4, which ships with an experimental YARN
-module. [Cloudera Manager](https://ccp.cloudera.com/display/SUPPORT/Cloudera+Manager+Downloads)
-is the easiest way to get a Hadoop cluster with YARN up and running.
-
The `java/examples/distshell` directory contains an example configuration file
that can be used to run the Kitten version of the Distributed Shell example
-application that ships with Hadoop 2.0.0. To run the example, execute:
+application that ships with Hadoop 2.2.0. To run the example, execute:
- hadoop jar kitten-client-0.1.0-jar-with-dependencies.jar distshell.lua distshell
+ hadoop jar kitten-client-0.2.0-jar-with-dependencies.jar distshell.lua distshell
where the jar file is in the `java/client/target` directory. You should also copy the
application master jar file from `java/master/target` to a directory where it can be
View
2 java/client/pom.xml
@@ -5,7 +5,7 @@
<parent>
<artifactId>kitten-java</artifactId>
<groupId>com.cloudera.kitten</groupId>
- <version>0.1.0</version>
+ <version>0.2.0</version>
<relativePath>../</relativePath>
</parent>
View
2 java/client/src/main/java/com/cloudera/kitten/client/params/lua/LuaYarnClientParameters.java
@@ -100,7 +100,7 @@ public String getApplicationName() {
@Override
public String getQueue() {
- return env.isNil(LuaFields.QUEUE) ? "" : env.getString(LuaFields.QUEUE);
+ return env.isNil(LuaFields.QUEUE) ? "default" : env.getString(LuaFields.QUEUE);
}
@Override
View
29 ...ApplicationsManagerConnectionFactory.java → ...ten/client/service/YarnClientFactory.java
@@ -14,39 +14,30 @@
*/
package com.cloudera.kitten.client.service;
-import java.net.InetSocketAddress;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.yarn.api.ClientRMProtocol;
+import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
import com.cloudera.kitten.MasterConnectionFactory;
import com.google.common.base.Preconditions;
-public class ApplicationsManagerConnectionFactory implements
- MasterConnectionFactory<ClientRMProtocol> {
+public class YarnClientFactory implements MasterConnectionFactory<YarnClient> {
- private static final Log LOG = LogFactory.getLog(ApplicationsManagerConnectionFactory.class);
+ private static final Log LOG = LogFactory.getLog(YarnClientFactory.class);
private final Configuration conf;
- private final YarnRPC rpc;
-
- public ApplicationsManagerConnectionFactory(Configuration conf) {
+
+ public YarnClientFactory(Configuration conf) {
this.conf = Preconditions.checkNotNull(conf);
- this.rpc = YarnRPC.create(conf);
}
@Override
- public ClientRMProtocol connect() {
- YarnConfiguration yarnConf = new YarnConfiguration(conf);
- InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
- YarnConfiguration.RM_ADDRESS,
- YarnConfiguration.DEFAULT_RM_ADDRESS));
- LOG.info("Connecting to ResourceManager at: " + rmAddress);
- return ((ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class, rmAddress, conf));
+ public YarnClient connect() {
+ YarnClient client = YarnClient.createYarnClient();
+ client.init(new YarnConfiguration(conf));
+ client.start();
+ return client;
}
}
View
88 java/client/src/main/java/com/cloudera/kitten/client/service/YarnClientServiceImpl.java
@@ -14,27 +14,23 @@
*/
package com.cloudera.kitten.client.service;
+import java.io.IOException;
import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.api.ClientRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import com.cloudera.kitten.ContainerLaunchContextFactory;
import com.cloudera.kitten.ContainerLaunchParameters;
@@ -58,48 +54,45 @@
private static final Log LOG = LogFactory.getLog(YarnClientServiceImpl.class);
private final YarnClientParameters parameters;
- private final MasterConnectionFactory<ClientRMProtocol> applicationsManagerFactory;
+ private final MasterConnectionFactory<YarnClient> yarnClientFactory;
private final Stopwatch stopwatch;
- private ClientRMProtocol applicationsManager;
+ private YarnClient yarnClient;
private ApplicationId applicationId;
private ApplicationReport finalReport;
private boolean timeout = false;
public YarnClientServiceImpl(YarnClientParameters params) {
- this(params, new ApplicationsManagerConnectionFactory(params.getConfiguration()),
+ this(params, new YarnClientFactory(params.getConfiguration()),
new Stopwatch());
}
public YarnClientServiceImpl(YarnClientParameters parameters,
- MasterConnectionFactory<ClientRMProtocol> applicationsManagerFactory,
+ MasterConnectionFactory<YarnClient> yarnClientFactory,
Stopwatch stopwatch) {
this.parameters = Preconditions.checkNotNull(parameters);
- this.applicationsManagerFactory = applicationsManagerFactory;
+ this.yarnClientFactory = yarnClientFactory;
this.stopwatch = stopwatch;
}
@Override
protected void startUp() {
- this.applicationsManager = applicationsManagerFactory.connect();
+ this.yarnClient = yarnClientFactory.connect();
+ YarnClientApplication clientApp = getNewApplication();
+ GetNewApplicationResponse newApp = clientApp.getNewApplicationResponse();
+ ContainerLaunchContextFactory clcFactory = new ContainerLaunchContextFactory(newApp.getMaximumResourceCapability());
- GetNewApplicationResponse newApp = getNewApplication();
- this.applicationId = newApp.getApplicationId();
- ContainerLaunchContextFactory clcFactory = new ContainerLaunchContextFactory(
- newApp.getMinimumResourceCapability(), newApp.getMaximumResourceCapability());
-
- LOG.info("Setting up application submission context for the application master");
- ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
- appContext.setApplicationId(applicationId);
+ ApplicationSubmissionContext appContext = clientApp.getApplicationSubmissionContext();
+ this.applicationId = appContext.getApplicationId();
appContext.setApplicationName(parameters.getApplicationName());
// Setup the container for the application master.
ContainerLaunchParameters appMasterParams = parameters.getApplicationMasterParameters(applicationId);
ContainerLaunchContext clc = clcFactory.create(appMasterParams);
+ appContext.setResource(clcFactory.createResource(appMasterParams));
appContext.setAMContainerSpec(clc);
- appContext.setUser(appMasterParams.getUser());
appContext.setQueue(parameters.getQueue());
- appContext.setPriority(ContainerLaunchContextFactory.createPriority(appMasterParams.getPriority()));
+ appContext.setPriority(clcFactory.createPriority(appMasterParams.getPriority()));
submitApplication(appContext);
// Make sure we stop the application in the case that it isn't done already.
@@ -116,25 +109,28 @@ public void run() {
}
private void submitApplication(ApplicationSubmissionContext appContext) {
- SubmitApplicationRequest appRequest = Records.newRecord(SubmitApplicationRequest.class);
- appRequest.setApplicationSubmissionContext(appContext);
-
LOG.info("Submitting application to the applications manager");
try {
- applicationsManager.submitApplication(appRequest);
- } catch (YarnRemoteException e) {
+ yarnClient.submitApplication(appContext);
+ } catch (YarnException e) {
LOG.error("Exception thrown submitting application", e);
stop();
- }
+ } catch (IOException e) {
+ LOG.error("IOException thrown submitting application", e);
+ stop();
+ }
}
- private GetNewApplicationResponse getNewApplication() {
+ private YarnClientApplication getNewApplication() {
try {
- return applicationsManager.getNewApplication(Records.newRecord(GetNewApplicationRequest.class));
- } catch (YarnRemoteException e) {
+ return yarnClient.createApplication();
+ } catch (YarnException e) {
LOG.error("Exception thrown getting new application", e);
stop();
return null;
+ } catch (IOException e) {
+ stop();
+ return null;
}
}
@@ -143,30 +139,34 @@ protected void shutDown() {
if (finalReport != null) {
YarnApplicationState state = finalReport.getYarnApplicationState();
FinalApplicationStatus status = finalReport.getFinalApplicationStatus();
+ String diagnostics = finalReport.getDiagnostics();
if (YarnApplicationState.FINISHED == state) {
if (FinalApplicationStatus.SUCCEEDED == status) {
LOG.info("Application completed successfully.");
}
else {
LOG.info("Application finished unsuccessfully."
- + " State=" + state.toString() + ", FinalStatus=" + status.toString());
+ + " State = " + state.toString() + ", FinalStatus = " + status.toString());
}
}
else if (YarnApplicationState.KILLED == state
|| YarnApplicationState.FAILED == state) {
LOG.info("Application did not complete successfully."
- + " State=" + state.toString() + ", FinalStatus=" + status.toString());
+ + " State = " + state.toString() + ", FinalStatus = " + status.toString());
+ if (diagnostics != null) {
+ LOG.info("Diagnostics = " + diagnostics);
+ }
}
} else {
// Otherwise, we need to kill the application, if it was created.
if (applicationId != null) {
LOG.info("Killing application id = " + applicationId);
- KillApplicationRequest request = Records.newRecord(KillApplicationRequest.class);
- request.setApplicationId(applicationId);
try {
- applicationsManager.forceKillApplication(request);
- } catch (YarnRemoteException e) {
+ yarnClient.killApplication(applicationId);
+ } catch (YarnException e) {
LOG.error("Exception thrown killing application", e);
+ } catch (IOException e) {
+ LOG.error("IOException thrown killing application", e);
}
LOG.info("Application was killed.");
}
@@ -198,14 +198,14 @@ public ApplicationReport getFinalReport() {
@Override
public ApplicationReport getApplicationReport() {
- GetApplicationReportRequest reportRequest = Records.newRecord(GetApplicationReportRequest.class);
- reportRequest.setApplicationId(applicationId);
try {
- GetApplicationReportResponse response = applicationsManager.getApplicationReport(reportRequest);
- return response.getApplicationReport();
- } catch (YarnRemoteException e) {
+ return yarnClient.getApplicationReport(applicationId);
+ } catch (YarnException e) {
LOG.error("Exception occurred requesting application report", e);
return null;
+ } catch (IOException e) {
+ LOG.error("IOException occurred requesting application report", e);
+ return null;
}
}
View
12 java/client/src/test/java/com/cloudera/kitten/TestKittenDistributedShell.java
@@ -50,6 +50,7 @@ public static void setup() throws InterruptedException, IOException {
1, 1, 1);
yarnCluster.init(conf);
yarnCluster.start();
+ conf = yarnCluster.getConfig();
}
try {
Thread.sleep(2000);
@@ -71,8 +72,11 @@ public void testKittenShell() throws Exception {
String config = "/lua/distshell.lua";
// For the outputs
- File tmpFile = File.createTempFile("distshell", ".txt");
- tmpFile.deleteOnExit();
+ File tmpFile = new File("/tmp/distshell.out");
+ if (tmpFile.exists()) {
+ tmpFile.delete();
+ }
+ //tmpFile.deleteOnExit();
KittenClient client = new KittenClient(
ImmutableMap.<String, Object>of(
@@ -80,8 +84,8 @@ public void testKittenShell() throws Exception {
"PWD", (new File(".")).getAbsolutePath()));
conf.set(LocalDataHelper.APP_BASE_DIR, "file:///tmp/");
client.setConf(conf);
-
- assertEquals(0, client.run(new String[] { config, "distshell" }));
+ System.out.println("Running...");
+ assertEquals(0, client.run(new String[]{config, "distshell"}));
assertEquals(12, Files.readLines(tmpFile, Charsets.UTF_8).size());
}
}
View
10 java/client/src/test/java/com/cloudera/kitten/client/params/lua/BasicLuaConfigTest.java
@@ -29,21 +29,17 @@
import com.cloudera.kitten.ContainerLaunchParameters;
import com.cloudera.kitten.client.YarnClientParameters;
-import com.cloudera.kitten.client.params.lua.LuaYarnClientParameters;
import com.cloudera.kitten.util.LocalDataHelper;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Files;
public class BasicLuaConfigTest {
- Resource clusterMin;
Resource clusterMax;
Configuration conf;
@Before
public void setUp() throws Exception {
- clusterMin = Records.newRecord(Resource.class);
- clusterMin.setMemory(50);
clusterMax = Records.newRecord(Resource.class);
clusterMax.setMemory(90);
conf = new Configuration();
@@ -59,14 +55,12 @@ public void testBasicClient() throws Exception {
conf);
assertEquals("Distributed Shell", params.getApplicationName());
assertEquals(86400L, params.getClientTimeoutMillis());
- assertEquals("", params.getQueue());
+ assertEquals("default", params.getQueue());
ContainerLaunchParameters clp = params.getApplicationMasterParameters(null);
- assertEquals("josh", clp.getUser());
assertEquals(1, clp.getPriority());
- assertEquals(100, clp.getMemory());
// clusterMax = 90 < 100
- assertEquals(clusterMax, clp.getContainerResource(clusterMin, clusterMax));
+ assertEquals(clusterMax, clp.getContainerResource(clusterMax));
Map<String, String> expEnv = ImmutableMap.of(
"zs", "10", "a", "b", "fiz", "faz", "foo", "foo", "biz", "baz");
Map<String, String> actEnv = clp.getEnvironment();
View
5 java/client/src/test/resources/log4j.properties
@@ -1,6 +1,7 @@
-# ***** Set root logger level to INFO and its only appender to A.
+# ***** Set root logger level to WARN and its only appender to A.
+log4j.rootLogger=warn, A
log4j.logger.com.cloudera.kitten=info, A
-log4j.logger.org.apache.hadoop=error, A
+log4j.additivity.com.cloudera.kitten=false
# ***** A is set to be a ConsoleAppender.
log4j.appender.A=org.apache.log4j.ConsoleAppender
View
5 java/client/src/test/resources/lua/distshell.lua
@@ -5,18 +5,23 @@ CONTAINER_INSTANCES = 2
base_env = {
-- PWD is specified by the test case.
CLASSPATH = table.concat({"${CLASSPATH}", PWD .. "/target/classes/", PWD .. "/target/lib/*"}, ":"),
+
}
distshell = yarn {
name = "Distributed Shell",
timeout = 60000,
memory = 256,
+ cores = 1,
master = {
env = base_env,
command = {
base = "java -Xmx128m com.cloudera.kitten.appmaster.ApplicationMaster",
args = { "-conf job.xml", "1> <LOG_DIR>/stdout 2> <LOG_DIR>/stderr" }, -- job.xml contains the client configuration info.
+ },
+ resources = {
+ ["log4j.properties"] = {file = PWD .. "/src/test/resources/log4j.properties"}
}
},
View
2 java/common/pom.xml
@@ -5,7 +5,7 @@
<parent>
<artifactId>kitten-java</artifactId>
<groupId>com.cloudera.kitten</groupId>
- <version>0.1.0</version>
+ <version>0.2.0</version>
<relativePath>../</relativePath>
</parent>
View
17 java/common/src/main/java/com/cloudera/kitten/ContainerLaunchContextFactory.java
@@ -25,11 +25,9 @@
*/
public class ContainerLaunchContextFactory {
- private final Resource clusterMin;
private final Resource clusterMax;
- public ContainerLaunchContextFactory(Resource clusterMin, Resource clusterMax) {
- this.clusterMin = clusterMin;
+ public ContainerLaunchContextFactory(Resource clusterMax) {
this.clusterMax = clusterMax;
}
@@ -38,21 +36,14 @@ public ContainerLaunchContext create(ContainerLaunchParameters parameters) {
clc.setCommands(parameters.getCommands());
clc.setEnvironment(parameters.getEnvironment());
clc.setLocalResources(parameters.getLocalResources());
- clc.setResource(parameters.getContainerResource(clusterMin, clusterMax));
- clc.setUser(parameters.getUser());
return clc;
}
- public ResourceRequest createResourceRequest(ContainerLaunchParameters parameters) {
- ResourceRequest req = Records.newRecord(ResourceRequest.class);
- req.setCapability(parameters.getContainerResource(clusterMin, clusterMax));
- req.setPriority(createPriority(parameters.getPriority()));
- req.setNumContainers(parameters.getNumInstances());
- req.setHostName("*"); // TODO: get smarter about this.
- return req;
+ public Resource createResource(ContainerLaunchParameters parameters) {
+ return parameters.getContainerResource(clusterMax);
}
- public static Priority createPriority(int priority) {
+ public Priority createPriority(int priority) {
Priority p = Records.newRecord(Priority.class);
p.setPriority(priority);
return p;
View
12 java/common/src/main/java/com/cloudera/kitten/ContainerLaunchParameters.java
@@ -26,19 +26,9 @@
*/
public interface ContainerLaunchParameters {
/**
- * The user to run the container as.
- */
- String getUser();
-
- /**
- * The requested memory for this container in megabytes.
- */
- int getMemory();
-
- /**
* Returns the resources needed for this job, using the cluster min and max as bounds.
*/
- Resource getContainerResource(Resource clusterMin, Resource clusterMax);
+ Resource getContainerResource(Resource clusterMax);
/**
* The requested priority for the container.
View
29 java/common/src/main/java/com/cloudera/kitten/lua/LuaContainerLaunchParameters.java
@@ -58,11 +58,6 @@ public LuaContainerLaunchParameters(LuaWrapper lv, Configuration conf, Map<Strin
this(lv, conf, localFileUris, new Extras());
}
- public LuaContainerLaunchParameters(LuaValue lv, Configuration conf,
- Map<String, URI> localFileUris, Extras extras) {
- this(new LuaWrapper(lv.checktable()), conf, localFileUris, extras);
- }
-
public LuaContainerLaunchParameters(LuaWrapper lv, Configuration conf,
Map<String, URI> localFileUris, Extras extras) {
this.lv = lv;
@@ -71,33 +66,19 @@ public LuaContainerLaunchParameters(LuaWrapper lv, Configuration conf,
this.extras = extras;
}
-
- @Override
- public String getUser() {
- if (!lv.isNil(LuaFields.USER)) {
- String user = lv.getString(LuaFields.USER);
- if (!user.isEmpty()) {
- return user;
- }
- }
-
- try {
- return UserGroupInformation.getCurrentUser().getShortUserName();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
+ public int getCores() {
+ return lv.getInteger(LuaFields.CORES);
}
- @Override
public int getMemory() {
return lv.getInteger(LuaFields.MEMORY);
}
@Override
- public Resource getContainerResource(Resource clusterMin, Resource clusterMax) {
+ public Resource getContainerResource(Resource clusterMax) {
Resource rsrc = Records.newRecord(Resource.class);
- rsrc.setMemory(Math.min(clusterMax.getMemory(),
- Math.max(clusterMin.getMemory(), getMemory())));
+ rsrc.setMemory(Math.min(clusterMax.getMemory(), getMemory()));
+ rsrc.setVirtualCores(Math.min(clusterMax.getVirtualCores(), getCores()));
return rsrc;
}
View
1 java/common/src/main/java/com/cloudera/kitten/lua/LuaFields.java
@@ -34,6 +34,7 @@
public static final String COMMANDS = "commands";
public static final String COMMAND = "command";
public static final String INSTANCES = "instances";
+ public static final String CORES = "cores";
public static final String MEMORY = "memory";
public static final String PRIORITY = "priority";
View
2 java/common/src/main/resources/lua/kitten.lua
@@ -33,7 +33,7 @@ function yarn(t)
end
t.name = t_check(t.name, "name", "string")
- t.queue = t_check(t.queue or "", "queue", "string")
+ t.queue = t_check(t.queue or "default", "queue", "string")
t.user = t_check(t.user or "", "user", "string")
t.timeout = t_check(t.timeout or -1, "timeout", "number")
View
5 ...aster/src/main/resources/log4j.properties → ...ommon/src/test/resources/log4j.properties
@@ -1,6 +1,7 @@
-# ***** Set root logger level to INFO and its only appender to A.
+# ***** Set root logger level to WARN and its only appender to A.
+log4j.rootLogger=warn, A
log4j.logger.com.cloudera.kitten=info, A
-log4j.logger.org.apache.hadoop=error, A
+log4j.additivity.com.cloudera.kitten=false
# ***** A is set to be a ConsoleAppender.
log4j.appender.A=org.apache.log4j.ConsoleAppender
View
4 java/examples/distshell/distshell.lua
@@ -3,7 +3,7 @@
--
-- To execute, run:
--
--- hadoop jar kitten-client-0.1.0-jar-with-dependencies.jar distshell.lua distshell
+-- hadoop jar kitten-client-0.2.0-jar-with-dependencies.jar distshell.lua distshell
--
-- from a directory that contains the client jar, the master jar, and distshell.lua.
--
@@ -16,7 +16,7 @@ SHELL_COMMAND = "ls -ltr >> /tmp/kitten_dir_contents"
CONTAINER_INSTANCES = 2
-- The location of the jar file containing kitten's default ApplicationMaster
-- implementation.
-MASTER_JAR_LOCATION = "kitten-master-0.1.0-jar-with-dependencies.jar"
+MASTER_JAR_LOCATION = "kitten-master-0.2.0-jar-with-dependencies.jar"
-- CLASSPATH setup.
H_SHARE = "/usr/lib/hadoop"
View
2 java/master/pom.xml
@@ -5,7 +5,7 @@
<parent>
<artifactId>kitten-java</artifactId>
<groupId>com.cloudera.kitten</groupId>
- <version>0.1.0</version>
+ <version>0.2.0</version>
<relativePath>../</relativePath>
</parent>
View
3 java/master/src/main/java/com/cloudera/kitten/appmaster/ApplicationMaster.java
@@ -30,8 +30,7 @@
@Override
public int run(String[] args) throws Exception {
ApplicationMasterParameters params = new LuaApplicationMasterParameters(getConf());
- ApplicationMasterService service = new ApplicationMasterServiceImpl(params);
-
+ ApplicationMasterService service = new ApplicationMasterServiceImpl(params, getConf());
service.startAndWait();
while (service.hasRunningContainers()) {
Thread.sleep(1000);
View
30 java/master/src/main/java/com/cloudera/kitten/appmaster/ApplicationMasterParameters.java
@@ -17,7 +17,6 @@
import java.util.List;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import com.cloudera.kitten.ContainerLaunchParameters;
@@ -33,11 +32,6 @@
Configuration getConfiguration();
/**
- * Returns the attempt ID for this application.
- */
- ApplicationAttemptId getApplicationAttemptId();
-
- /**
* Returns the parameters that will be used to launch the child containers for
* this application.
*/
@@ -50,32 +44,26 @@
int getAllowedFailures();
/**
- * Sets the hostname the master is running on. This information is communicated to the
- * resource manager and is then passed along to the client by YARN.
- */
- ApplicationMasterParameters setHostname(String hostname);
-
- /**
* Returns the hostname that was set for this application master.
*/
String getHostname();
-
+
/**
- * Sets the port the master is listening on for client requests. This information is
- * communicated to the resource manager and is then passed along to the client by YARN.
+ * TODO
+ * @param clientPort
*/
- ApplicationMasterParameters setClientPort(int port);
-
+ void setClientPort(int clientPort);
+
/**
* Returns the client port that was set for this application master.
*/
int getClientPort();
-
+
/**
- * Sets a tracking URL for the client. If it is not specified, the combination of the
- * hostname and the client port will be used.
+ * TODO
+ * @param trackingUrl
*/
- ApplicationMasterParameters setTrackingUrl(String url);
+ void setTrackingUrl(String trackingUrl);
/**
* Returns the tracking URL that was set for this application master.
View
9 java/master/src/main/java/com/cloudera/kitten/appmaster/ApplicationMasterService.java
@@ -14,8 +14,6 @@
*/
package com.cloudera.kitten.appmaster;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-
import com.google.common.util.concurrent.Service;
/**
@@ -28,12 +26,7 @@
* Returns the parameters used to configure this service.
*/
ApplicationMasterParameters getParameters();
-
- /**
- * Returns the application attempt ID.
- */
- ApplicationAttemptId getApplicationAttemptId();
-
+
/**
* Returns true if there are containers that this application master service is
* monitoring.
View
26 ...master/src/main/java/com/cloudera/kitten/appmaster/ContainerManagerConnectionFactory.java
@@ -1,26 +0,0 @@
-/**
- * Copyright (c) 2012, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.kitten.appmaster;
-
-import org.apache.hadoop.yarn.api.ContainerManager;
-import org.apache.hadoop.yarn.api.records.Container;
-
-/**
- * Handles connecting to the {@code ContainerManager} that is responsible for a specific
- * {@code Container} instance.
- */
-public interface ContainerManagerConnectionFactory {
- ContainerManager connect(Container container);
-}
View
56 ...rc/main/java/com/cloudera/kitten/appmaster/params/lua/LuaApplicationMasterParameters.java
@@ -20,10 +20,6 @@
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.util.ConverterUtils;
import com.cloudera.kitten.ContainerLaunchParameters;
import com.cloudera.kitten.appmaster.ApplicationMasterParameters;
@@ -36,56 +32,39 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import org.apache.hadoop.net.NetUtils;
-public class LuaApplicationMasterParameters implements
- ApplicationMasterParameters {
+public class LuaApplicationMasterParameters implements ApplicationMasterParameters {
private final LuaWrapper env;
- private final ApplicationAttemptId applicationAttemptId;
private final Configuration conf;
private final Map<String, URI> localToUris;
-
- private String hostname = "";
+ private final String hostname;
+
private int clientPort = 0;
private String trackingUrl = "";
-
+
public LuaApplicationMasterParameters(Configuration conf) {
this(LuaFields.KITTEN_LUA_CONFIG_FILE, System.getenv(LuaFields.KITTEN_JOB_NAME), conf);
}
- public LuaApplicationMasterParameters(Configuration conf, Map<String, Object> extras) {
- this(LuaFields.KITTEN_LUA_CONFIG_FILE, System.getenv(LuaFields.KITTEN_JOB_NAME), conf, extras);
- }
-
public LuaApplicationMasterParameters(String script, String jobName, Configuration conf) {
this(script, jobName, conf, ImmutableMap.<String, Object>of());
}
public LuaApplicationMasterParameters(String script, String jobName,
Configuration conf, Map<String, Object> extras) {
- this(script, jobName, conf, extras, loadApplicationAttemptId(), loadLocalToUris());
+ this(script, jobName, conf, extras, loadLocalToUris());
}
public LuaApplicationMasterParameters(String script, String jobName,
Configuration conf,
Map<String, Object> extras,
- ApplicationAttemptId applicationAttemptId,
Map<String, URI> localToUris) {
this.env = new LuaWrapper(script, loadExtras(extras)).getTable(jobName);
this.conf = conf;
- this.applicationAttemptId = applicationAttemptId;
this.localToUris = localToUris;
- }
-
- private static ApplicationAttemptId loadApplicationAttemptId() {
- Map<String, String> e = System.getenv();
- if (e.containsKey(ApplicationConstants.AM_CONTAINER_ID_ENV)) {
- ContainerId containerId = ConverterUtils.toContainerId(
- e.get(ApplicationConstants.AM_CONTAINER_ID_ENV));
- return containerId.getApplicationAttemptId();
- }
- throw new IllegalStateException(
- "Could not find application attempt ID in environment variables");
+ this.hostname = NetUtils.getHostname();
}
private static Map<String, URI> loadLocalToUris() {
@@ -113,20 +92,13 @@ public Configuration getConfiguration() {
}
@Override
- public ApplicationMasterParameters setHostname(String hostname) {
- this.hostname = hostname;
- return this;
- }
-
- @Override
public String getHostname() {
return hostname;
}
@Override
- public ApplicationMasterParameters setClientPort(int clientPort) {
+ public void setClientPort(int clientPort) {
this.clientPort = clientPort;
- return this;
}
@Override
@@ -135,26 +107,16 @@ public int getClientPort() {
}
@Override
- public ApplicationMasterParameters setTrackingUrl(String trackingUrl) {
+ public void setTrackingUrl(String trackingUrl) {
this.trackingUrl = trackingUrl;
- return this;
}
@Override
public String getTrackingUrl() {
- if (trackingUrl.isEmpty() && !hostname.isEmpty()) {
- return String.format("%s:%d", hostname, clientPort);
- }
return trackingUrl;
}
@Override
- public ApplicationAttemptId getApplicationAttemptId() {
- return applicationAttemptId;
- }
-
-
- @Override
public int getAllowedFailures() {
if (env.isNil(LuaFields.TOLERATED_FAILURES)) {
return 4; // TODO: kind of arbitrary, no? :)
View
496 ...ter/src/main/java/com/cloudera/kitten/appmaster/service/ApplicationMasterServiceImpl.java
@@ -14,90 +14,69 @@
*/
package com.cloudera.kitten.appmaster.service;
+import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import com.cloudera.kitten.ContainerLaunchContextFactory;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.api.AMRMProtocol;
-import org.apache.hadoop.yarn.api.ContainerManager;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
-import org.apache.hadoop.yarn.api.records.AMResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import com.cloudera.kitten.ContainerLaunchContextFactory;
import com.cloudera.kitten.ContainerLaunchParameters;
-import com.cloudera.kitten.MasterConnectionFactory;
import com.cloudera.kitten.appmaster.ApplicationMasterParameters;
import com.cloudera.kitten.appmaster.ApplicationMasterService;
-import com.cloudera.kitten.appmaster.ContainerManagerConnectionFactory;
import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AbstractScheduledService;
public class ApplicationMasterServiceImpl extends
- AbstractScheduledService implements ApplicationMasterService {
+ AbstractScheduledService implements ApplicationMasterService,
+ AMRMClientAsync.CallbackHandler {
private static final Log LOG = LogFactory.getLog(ApplicationMasterServiceImpl.class);
private final ApplicationMasterParameters parameters;
- private final MasterConnectionFactory<AMRMProtocol> resourceManagerFactory;
- private final ContainerManagerConnectionFactory containerManagerFactory;
- private final List<ContainerTracker> containerTrackers;
- private final AtomicInteger newRequestId = new AtomicInteger();
+ private final YarnConfiguration conf;
+ private AtomicInteger totalRequested = new AtomicInteger();
+ private AtomicInteger totalCompleted = new AtomicInteger();
private final AtomicInteger totalFailures = new AtomicInteger();
-
- private AMRMProtocol resourceManager;
- private ContainerLaunchContextFactory containerLaunchContextFactory;
+ private final List<ContainerTracker> trackers = Lists.newArrayList();
+
+ private AMRMClientAsync resourceManager;
private boolean hasRunningContainers = false;
-
- public ApplicationMasterServiceImpl(ApplicationMasterParameters params) {
- this(params, new ResourceManagerConnectionFactory(params.getConfiguration()),
- new ContainerManagerConnectionFactoryImpl(params.getConfiguration()));
- }
-
- public ApplicationMasterServiceImpl(ApplicationMasterParameters parameters,
- MasterConnectionFactory<AMRMProtocol> resourceManagerFactory,
- ContainerManagerConnectionFactory containerManagerFactory) {
+ private Throwable throwable;
+
+ public ApplicationMasterServiceImpl(ApplicationMasterParameters parameters, Configuration conf) {
this.parameters = Preconditions.checkNotNull(parameters);
- this.resourceManagerFactory = resourceManagerFactory;
- this.containerManagerFactory = containerManagerFactory;
- this.containerTrackers = Lists.newArrayList();
+ this.conf = new YarnConfiguration(conf);
}
@Override
public ApplicationMasterParameters getParameters() {
return parameters;
}
-
- @Override
- public ApplicationAttemptId getApplicationAttemptId() {
- return parameters.getApplicationAttemptId();
- }
-
+
@Override
public boolean hasRunningContainers() {
return hasRunningContainers;
@@ -105,80 +84,58 @@ public boolean hasRunningContainers() {
@Override
protected void startUp() {
- this.resourceManager = resourceManagerFactory.connect();
-
- RegisterApplicationMasterResponse registration = null;
+ this.resourceManager = AMRMClientAsync.createAMRMClientAsync(1000, this);
+ this.resourceManager.init(conf);
+ this.resourceManager.start();
+
+ RegisterApplicationMasterResponse registration;
try {
registration = resourceManager.registerApplicationMaster(
- createRegistrationRequest());
- } catch (YarnRemoteException e) {
+ parameters.getHostname(),
+ parameters.getClientPort(),
+ parameters.getTrackingUrl());
+ } catch (Exception e) {
LOG.error("Exception thrown registering application master", e);
stop();
return;
}
-
- this.containerLaunchContextFactory = new ContainerLaunchContextFactory(
- registration.getMinimumResourceCapability(), registration.getMaximumResourceCapability());
-
- List<ContainerLaunchParameters> containerParameters = parameters.getContainerLaunchParameters();
- if (containerParameters.isEmpty()) {
- LOG.warn("No container configurations specified");
- stop();
- return;
- }
- for (int i = 0; i < containerParameters.size(); i++) {
- ContainerLaunchParameters clp = containerParameters.get(i);
- containerTrackers.add(new ContainerTracker(clp));
+
+ ContainerLaunchContextFactory factory = new ContainerLaunchContextFactory(
+ registration.getMaximumResourceCapability());
+ for (ContainerLaunchParameters clp : parameters.getContainerLaunchParameters()) {
+ ContainerTracker tracker = new ContainerTracker(clp);
+ tracker.init(factory);
+ trackers.add(tracker);
}
this.hasRunningContainers = true;
}
- private AMResponse allocate(int id, ResourceRequest request, List<ContainerId> releases) {
- AllocateRequest req = Records.newRecord(AllocateRequest.class);
- req.setResponseId(id);
- req.setApplicationAttemptId(parameters.getApplicationAttemptId());
- req.addAsk(request);
- req.addAllReleases(releases);
- try {
- return resourceManager.allocate(req).getAMResponse();
- } catch (YarnRemoteException e) {
- LOG.warn("Exception thrown during resource request", e);
- return Records.newRecord(AllocateResponse.class).getAMResponse();
- }
- }
-
- private RegisterApplicationMasterRequest createRegistrationRequest() {
- RegisterApplicationMasterRequest req = Records.newRecord(
- RegisterApplicationMasterRequest.class);
- req.setApplicationAttemptId(parameters.getApplicationAttemptId());
- req.setHost(parameters.getHostname());
- req.setRpcPort(parameters.getClientPort());
- req.setTrackingUrl(parameters.getTrackingUrl());
- return req;
- }
-
@Override
protected void shutDown() {
// Stop the containers in the case that we're finishing because of a timeout.
LOG.info("Stopping trackers");
- for (ContainerTracker tracker : containerTrackers) {
- tracker.stopServices();
- }
this.hasRunningContainers = false;
-
- FinishApplicationMasterRequest finishReq = Records.newRecord(
- FinishApplicationMasterRequest.class);
- finishReq.setAppAttemptId(getApplicationAttemptId());
+
+ for (ContainerTracker tracker : trackers) {
+ if (tracker.hasRunningContainers()) {
+ tracker.kill();
+ }
+ }
+ FinalApplicationStatus status;
+ String message = null;
if (state() == State.FAILED || totalFailures.get() > parameters.getAllowedFailures()) {
//TODO: diagnostics
- finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED);
+ status = FinalApplicationStatus.FAILED;
+ if (throwable != null) {
+ message = throwable.getLocalizedMessage();
+ }
} else {
- finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+ status = FinalApplicationStatus.SUCCEEDED;
}
- LOG.info("Sending finish request with status = " + finishReq.getFinalApplicationStatus());
+ LOG.info("Sending finish request with status = " + status);
try {
- resourceManager.finishApplicationMaster(finishReq);
- } catch (YarnRemoteException e) {
+ resourceManager.unregisterApplicationMaster(status, message, null);
+ } catch (Exception e) {
LOG.error("Error finishing application master", e);
}
}
@@ -190,215 +147,194 @@ protected Scheduler scheduler() {
@Override
protected void runOneIteration() throws Exception {
- if (totalFailures.get() > parameters.getAllowedFailures()) {
+ if (totalFailures.get() > parameters.getAllowedFailures() ||
+ totalCompleted.get() == totalRequested.get()) {
stop();
- return;
- }
-
- if (hasRunningContainers) {
- boolean moreWork = false;
- for (ContainerTracker tracker : containerTrackers) {
- moreWork |= tracker.doWork();
- }
- this.hasRunningContainers = moreWork;
}
}
-
- private class ContainerTracker {
- private final ContainerLaunchParameters parameters;
- private final int needed;
- private final Map<ContainerId, ContainerService> services;
- private final Map<ContainerId, ContainerStatus> amStatus;
-
- private int requested = 0;
- private int completed = 0;
- private boolean stopping = false;
-
- public ContainerTracker(ContainerLaunchParameters parameters) {
- this.parameters = parameters;
- this.needed = parameters.getNumInstances();
- this.services = Maps.newHashMapWithExpectedSize(needed);
- this.amStatus = Maps.newHashMapWithExpectedSize(needed);
- }
-
- public boolean doWork() {
- if (shouldWork()) {
- AMResponse resp = allocate(newRequestId.incrementAndGet(), createRequest(), getReleases());
- handleAllocation(resp);
- completed = checkContainerStatuses(resp);
- }
- LOG.info(String.format("Total completed Containers = %d", completed));
- return shouldWork();
- }
-
- private boolean shouldWork() {
- return !stopping && completed < needed;
- }
-
- private ResourceRequest createRequest() {
- ResourceRequest req = containerLaunchContextFactory.createResourceRequest(parameters);
- req.setNumContainers(needed - requested);
- if (requested < needed) {
- requested = needed;
- }
- return req;
- }
-
- private List<ContainerId> getReleases() {
- // TODO: empty for now, but perhaps something for the future.
- return ImmutableList.of();
- }
-
- private void handleAllocation(AMResponse resp) {
- List<Container> newContainers = resp.getAllocatedContainers();
- for (Container container : newContainers) {
- if (!services.containsKey(container.getId())) {
- ContainerService cs = new ContainerService(container, parameters);
- services.put(container.getId(), cs);
- cs.start();
+
+ // AMRMClientHandler methods
+ @Override
+ public void onContainersCompleted(List<ContainerStatus> containerStatuses) {
+ LOG.info(containerStatuses.size() + " containers have completed");
+ for (ContainerStatus status : containerStatuses) {
+ int exitStatus = status.getExitStatus();
+ if (0 != exitStatus) {
+ // container failed
+ if (ContainerExitStatus.ABORTED != exitStatus) {
+ totalCompleted.incrementAndGet();
+ totalFailures.incrementAndGet();
} else {
- LOG.warn("Already have running service for container: " + container.getId().getId());
+ // container was killed by framework, possibly preempted
+ // we should re-try as the container was lost for some reason
}
+ } else {
+ // nothing to do
+ // container completed successfully
+ totalCompleted.incrementAndGet();
+ LOG.info("Container id = " + status.getContainerId() + " completed successfully");
}
}
+ }
- private int checkContainerStatuses(AMResponse resp) {
- for (ContainerStatus status : resp.getCompletedContainersStatuses()) {
- amStatus.put(status.getContainerId(), status);
- }
-
- int completeFromStatus = 0;
- int completeFromService = 0;
- Set<ContainerId> failed = Sets.newHashSet();
- for (ContainerId containerId : services.keySet()) {
- ContainerService service = services.get(containerId);
- if (service != null) {
- if (amStatus.containsKey(containerId)) {
- int exitStatus = amStatus.get(containerId).getExitStatus();
- if (exitStatus == 0) {
- LOG.debug(containerId + " completed cleanly");
- completeFromStatus++;
- } else {
- LOG.info(containerId + " failed with exit code = " + exitStatus);
- failed.add(containerId);
- }
- service.markCompleted();
- } else if (service.state() == State.TERMINATED) {
- LOG.info("Marking " + containerId + " as completed");
- completeFromService++;
+ @Override
+ public void onContainersAllocated(List<Container> allocatedContainers) {
+ LOG.info("Allocating " + allocatedContainers.size() + " container(s)");
+ Set<Container> assigned = Sets.newHashSet();
+ for (ContainerTracker tracker : trackers) {
+ if (tracker.needsContainers()) {
+ for (Container allocated : allocatedContainers) {
+ if (!assigned.contains(allocated) && tracker.matches(allocated)) {
+ tracker.launchContainer(allocated);
+ assigned.add(allocated);
}
}
}
-
- if (!failed.isEmpty()) {
- totalFailures.addAndGet(failed.size());
- requested -= failed.size();
- for (ContainerId failedId : failed) {
- // Placeholder value so we don't attempt to duplicate a new job in
- // this container as part of a response from the AM.
- services.put(failedId, null);
- }
- }
-
- LOG.info(String.format("Completed %d from status check and %d from service check",
- completeFromStatus, completeFromService));
- return completeFromStatus + completeFromService;
}
-
- public void stopServices() {
- this.stopping = true;
- for (ContainerService service : services.values()) {
- if (service != null) {
- service.stop();
- }
- }
+ if (assigned.size() < allocatedContainers.size()) {
+ LOG.error(String.format("Not all containers were allocated (%d out of %d)", assigned.size(),
+ allocatedContainers.size()));
+ stop();
}
}
-
- private class ContainerService extends AbstractScheduledService {
-
- private final Container container;
- private final ContainerLaunchParameters params;
-
- private ContainerManager containerManager;
- private ContainerState state;
-
- public ContainerService(Container container, ContainerLaunchParameters params) {
- this.container = container;
- this.params = params;
+
+ @Override
+ public void onShutdownRequest() {
+ stop();
+ }
+
+ @Override
+ public void onNodesUpdated(List<NodeReport> nodeReports) {
+ //TODO
+ }
+
+ @Override
+ public float getProgress() {
+ int num = 0, den = 0;
+ for (ContainerTracker tracker : trackers) {
+ num += tracker.completed.get();
+ den += tracker.parameters.getNumInstances();
}
-
- public void markCompleted() {
- this.state = ContainerState.COMPLETE;
- stop();
+ if (den == 0) {
+ return 0.0f;
+ }
+ return ((float) num) / den;
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ this.throwable = throwable;
+ stop();
+ }
+
+ private class ContainerTracker implements NMClientAsync.CallbackHandler {
+ private final ContainerLaunchParameters parameters;
+ private final ConcurrentMap<ContainerId, Container> containers = Maps.newConcurrentMap();
+
+ private AtomicInteger needed = new AtomicInteger();
+ private AtomicInteger started = new AtomicInteger();
+ private AtomicInteger completed = new AtomicInteger();
+ private AtomicInteger failed = new AtomicInteger();
+ private NMClientAsync nodeManager;
+ private Resource resource;
+ private Priority priority;
+ private ContainerLaunchContext ctxt;
+
+ public ContainerTracker(ContainerLaunchParameters parameters) {
+ this.parameters = parameters;
+ }
+
+ public void init(ContainerLaunchContextFactory factory) {
+ this.nodeManager = NMClientAsync.createNMClientAsync(this);
+ nodeManager.init(conf);
+ nodeManager.start();
+
+ this.ctxt = factory.create(parameters);
+ this.resource = factory.createResource(parameters);
+ this.priority = factory.createPriority(parameters.getPriority());
+ AMRMClient.ContainerRequest containerRequest = new AMRMClient.ContainerRequest(
+ resource,
+ null, // nodes
+ null, // racks
+ priority);
+ int numInstances = parameters.getNumInstances();
+ for (int j = 0; j < numInstances; j++) {
+ resourceManager.addContainerRequest(containerRequest);
+ }
+ needed.set(numInstances);
+ totalRequested.addAndGet(numInstances);
}
@Override
- protected void startUp() throws Exception {
- ContainerLaunchContext ctxt = containerLaunchContextFactory.create(params);
- ctxt.setContainerId(container.getId());
- ctxt.setResource(container.getResource());
-
- this.containerManager = containerManagerFactory.connect(container);
- if (containerManager == null) {
- LOG.error("Could not connect to manager for : " + container.getId());
- stop();
+ public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> allServiceResponse) {
+ Container container = containers.get(containerId);
+ if (container != null) {
+ LOG.info("Starting container id = " + containerId);
+ started.incrementAndGet();
+ nodeManager.getContainerStatusAsync(containerId, container.getNodeId());
}
-
- StartContainerRequest startReq = Records.newRecord(StartContainerRequest.class);
- startReq.setContainerLaunchContext(ctxt);
- LOG.info("Starting container: " + container.getId());
- try {
- containerManager.startContainer(startReq);
- } catch (YarnRemoteException e) {
- LOG.error("Exception starting " + container.getId(), e);
- stop();
+ }
+
+ @Override
+ public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received status for container: " + containerId + " = " + containerStatus);
}
}
@Override
- protected Scheduler scheduler() {
- return Scheduler.newFixedRateSchedule(10, 10, TimeUnit.SECONDS);
+ public void onContainerStopped(ContainerId containerId) {
+ LOG.info("Stopping container id = " + containerId);
+ containers.remove(containerId);
+ completed.incrementAndGet();
}
@Override
- protected void runOneIteration() throws Exception {
- GetContainerStatusRequest req = Records.newRecord(GetContainerStatusRequest.class);
- req.setContainerId(container.getId());
- try {
- GetContainerStatusResponse resp = containerManager.getContainerStatus(req);
- this.state = resp.getStatus().getState();
- } catch (YarnRemoteException e) {
- LOG.info("Exception checking status of " + container.getId() + ", stopping");
- if (LOG.isDebugEnabled()) {
- LOG.debug("Exception details for " + container.getId(), e);
- }
- this.state = ContainerState.COMPLETE;
- stop();
- return;
- }
+ public void onStartContainerError(ContainerId containerId, Throwable throwable) {
+ LOG.warn("Start container error for container id = " + containerId, throwable);
+ containers.remove(containerId);
+ completed.incrementAndGet();
+ failed.incrementAndGet();
+ }
- if (state != null) {
- LOG.info("Current status for " + container.getId() + ": " + state);
- }
- if (state != null && state == ContainerState.COMPLETE) {
- stop();
- }
+ @Override
+ public void onGetContainerStatusError(ContainerId containerId, Throwable throwable) {
+ LOG.error("Could not get status for container: " + containerId, throwable);
}
@Override
- protected void shutDown() throws Exception {
- if (state != null && state != ContainerState.COMPLETE) {
- // We need to explicitly release the container.
- LOG.info("Stopping " + container.getId());
- StopContainerRequest req = Records.newRecord(StopContainerRequest.class);
- req.setContainerId(container.getId());
- try {
- containerManager.stopContainer(req);
- } catch (YarnRemoteException e) {
- LOG.warn("Exception thrown stopping container: " + container, e);
- }
+ public void onStopContainerError(ContainerId containerId, Throwable throwable) {
+ LOG.error("Failed to stop container: " + containerId, throwable);
+ completed.incrementAndGet();
+ }
+
+ public boolean needsContainers() {
+ return needed.get() > 0;
+ }
+
+ public boolean matches(Container c) {
+ return true; // TODO
+ }
+
+ public void launchContainer(Container c) {
+ LOG.info("Launching container id = " + c.getId() + " on node = " + c.getNodeId());
+ needed.decrementAndGet();
+ containers.put(c.getId(), c);
+ nodeManager.startContainerAsync(c, ctxt);
+ }
+
+ public boolean hasRunningContainers() {
+ return !containers.isEmpty();
+ }
+
+ public void kill() {
+ for (Container c : containers.values()) {
+ nodeManager.stopContainerAsync(c.getId(), c.getNodeId());
}
}
+
+ public boolean hasMoreContainers() {
+ return needsContainers() || hasRunningContainers();
+ }
}
}
View
62 ...ain/java/com/cloudera/kitten/appmaster/service/ContainerManagerConnectionFactoryImpl.java
@@ -1,62 +0,0 @@
-/**
- * Copyright (c) 2012, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.kitten.appmaster.service;
-
-import java.net.InetSocketAddress;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.yarn.api.ContainerManager;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-
-import com.cloudera.kitten.appmaster.ContainerManagerConnectionFactory;
-import com.google.common.collect.Maps;
-
-public class ContainerManagerConnectionFactoryImpl implements
- ContainerManagerConnectionFactory {
-
- private static final Log LOG = LogFactory.getLog(ContainerManagerConnectionFactoryImpl.class);
-
- private final Configuration conf;
- private final YarnRPC rpc;
- private final Map<String, ContainerManager> containerManagers;
-
- public ContainerManagerConnectionFactoryImpl(Configuration conf) {
- this.conf = conf;
- this.rpc = YarnRPC.create(conf);
- this.containerManagers = Maps.newHashMap();
- }
-
- @Override
- public synchronized ContainerManager connect(Container container) {
- NodeId nodeId = container.getNodeId();
- String containerIpPort = String.format("%s:%d", nodeId.getHost(), nodeId.getPort());
- if (!containerManagers.containsKey(containerIpPort)) {
- LOG.info("Connecting to ContainerManager at: " + containerIpPort);
- InetSocketAddress addr = NetUtils.createSocketAddr(containerIpPort);
- ContainerManager cm = (ContainerManager) rpc.getProxy(ContainerManager.class,
- addr, conf);
- containerManagers.put(containerIpPort, cm);
- return cm;
- }
- return containerManagers.get(containerIpPort);
- }
-
-}
View
51 ...src/main/java/com/cloudera/kitten/appmaster/service/ResourceManagerConnectionFactory.java
@@ -1,51 +0,0 @@
-/**
- * Copyright (c) 2012, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.kitten.appmaster.service;
-
-import java.net.InetSocketAddress;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.yarn.api.AMRMProtocol;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-
-import com.cloudera.kitten.MasterConnectionFactory;
-
-public class ResourceManagerConnectionFactory implements
- MasterConnectionFactory<AMRMProtocol> {
-
- private static final Log LOG = LogFactory.getLog(ResourceManagerConnectionFactory.class);
-
- private final Configuration conf;
- private final YarnRPC rpc;
-
- public ResourceManagerConnectionFactory(Configuration conf) {
- this.conf = conf;
- this.rpc = YarnRPC.create(conf);
- }
-
- @Override
- public AMRMProtocol connect() {
- YarnConfiguration yarnConf = new YarnConfiguration(conf);
- InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
- YarnConfiguration.RM_SCHEDULER_ADDRESS,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));
- LOG.info("Connecting to ResourceManager at " + rmAddress);
- return ((AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress, conf));
- }
-}
View
2 java/master/src/test/java/com/cloudera/kitten/appmaster/util/HDFSFileFinderTest.java
@@ -26,7 +26,7 @@
protected static Configuration conf = new Configuration();
protected static int numDataNodes = 5;
protected static int replicationFactor = 3;
- protected static long blockSize = 8; // should be power of 2
+ protected static long blockSize = (long) Math.pow(2, 20); // should be power of 2
@BeforeClass
public static void setup() throws InterruptedException, IOException {
View
10 java/master/src/test/resources/log4j.properties
@@ -0,0 +1,10 @@
+# ***** Set root logger level to WARN and its only appender to A.
+log4j.rootLogger=warn, A
+log4j.logger.com.cloudera.kitten=info, A
+log4j.additivity.com.cloudera.kitten=false
+
+# ***** A is set to be a ConsoleAppender.
+log4j.appender.A=org.apache.log4j.ConsoleAppender
+# ***** A uses PatternLayout.
+log4j.appender.A.layout=org.apache.log4j.PatternLayout
+log4j.appender.A.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
View
2 java/pom.xml
@@ -5,7 +5,7 @@
<parent>
<artifactId>kitten-project</artifactId>
<groupId>com.cloudera.kitten</groupId>
- <version>0.1.0</version>
+ <version>0.2.0</version>
<relativePath>../</relativePath>
</parent>
View
11 pom.xml
@@ -4,7 +4,7 @@
<groupId>com.cloudera.kitten</groupId>
<artifactId>kitten-project</artifactId>
- <version>0.1.0</version>
+ <version>0.2.0</version>
<packaging>pom</packaging>
<name>Kitten Project</name>
@@ -19,7 +19,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!-- Versions for dependencies -->
- <hadoop.version>2.0.0-cdh4.1.3</hadoop.version>
+ <hadoop.version>2.2.0</hadoop.version>
<guava.version>11.0.2</guava.version>
<luaj.version>2.0.2</luaj.version>
<junit.version>4.8.2</junit.version>
@@ -28,11 +28,4 @@
<compiler.plugin.version>2.3.2</compiler.plugin.version>
<assembly.plugin.version>2.2.1</assembly.plugin.version>
</properties>
-
- <distributionManagement>
- <repository>
- <id>cloudera-local</id>
- <url>https://repository.cloudera.com/artifactory/libs-release-local</url>
- </repository>
- </distributionManagement>
</project>

0 comments on commit 11527be

Please sign in to comment.
Something went wrong with that request. Please try again.