Permalink
Browse files

MAPREDUCE-4738. fix and re-enable disabled unit tests in the mr-app2 …

…module. (sseth)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/MR-3902@1400232 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information...
sidseth committed Oct 19, 2012
1 parent de0fe8a commit c524013ab66ed43ba3f41305e7d9081ea740104d
Showing with 744 additions and 365 deletions.
  1. +2 −0 hadoop-mapreduce-project/CHANGES.txt.MR-3902
  2. +149 −45 ...t/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java
  3. +0 −1 ...-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/TaskAttemptListener.java
  4. +2 −2 ...preduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/client/MRClientService.java
  5. +2 −2 ...app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptEventFailRequest.java
  6. +1 −0 ...app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptEventKillRequest.java
  7. +21 −4 ...nt-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptScheduleEvent.java
  8. +8 −4 ...educe-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskAttemptImpl.java
  9. +1 −2 ...op-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskImpl.java
  10. +17 −0 ...-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/local/LocalContainerRequestor.java
  11. +22 −18 ...apreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/ContainerRequestor.java
  12. +9 −7 ...reduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java
  13. +2 −1 ...reduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java
  14. +4 −1 ...lient-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerHelpers.java
  15. +1 −1 ...apreduce-client-app2/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo
  16. +6 −5 ...-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java
  17. +89 −36 ...adoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRAppBenchmark.java
  18. +3 −4 .../hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestRecovery.java
  19. +146 −229 ...educe-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestTaskAttempt.java
  20. +10 −3 ...ce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerAllocator.java
  21. +141 −0 ...t-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/TestAMContainerHelpers.java
  22. +18 −0 ...-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/webapp/TestAMWebApp.java
  23. +18 −0 ...educe-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/webapp/TestAMWebServices.java
  24. +18 −0 ...ient-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/webapp/TestAMWebServicesAttempts.java
  25. +18 −0 ...lient-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/webapp/TestAMWebServicesJobConf.java
  26. +18 −0 ...e-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/webapp/TestAMWebServicesJobs.java
  27. +18 −0 ...-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/webapp/TestAMWebServicesTasks.java
@@ -30,3 +30,5 @@ Branch MR-3902
MAPREDUCE-4727. Handle successful NM stop requests. (sseth)
MAPREDUCE-4596. Split StateMachine state from states seen by MRClientProtocol for Job, Task and TaskAttempt. (sseth)
+
+ MAPREDUCE-4738. fix and re-enable disabled unit tests in the mr-app2 module. (sseth)
@@ -74,14 +74,17 @@
import org.apache.hadoop.mapreduce.v2.app2.metrics.MRAppMetrics;
import org.apache.hadoop.mapreduce.v2.app2.recover.Recovery;
import org.apache.hadoop.mapreduce.v2.app2.recover.RecoveryService;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEvent;
import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEventType;
import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerRequestor;
import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEventType;
import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicator;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicatorEvent;
import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicatorEventType;
import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerRequestor;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerRequestor.ContainerRequest;
import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainer;
import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventType;
import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerMap;
@@ -111,6 +114,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
@@ -178,7 +182,8 @@
private boolean newApiCommitter;
private OutputCommitter committer;
private JobEventDispatcher jobEventDispatcher;
- private JobHistoryEventHandler2 jobHistoryEventHandler;
+ private EventHandler<JobHistoryEvent> jobHistoryEventHandler;
+ private AbstractService stagingDirCleanerService;
private boolean inRecovery = false;
private SpeculatorEventDispatcher speculatorEventDispatcher;
private ContainerRequestor containerRequestor;
@@ -292,10 +297,9 @@ public void init(final Configuration conf) {
addIfService(clientService);
//service to log job history events
- EventHandler<JobHistoryEvent> historyService =
- createJobHistoryHandler(context);
+ jobHistoryEventHandler = createJobHistoryHandler(context);
dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
- historyService);
+ jobHistoryEventHandler);
this.jobEventDispatcher = new JobEventDispatcher();
@@ -324,18 +328,29 @@ public void init(final Configuration conf) {
addIfService(containerLauncher);
dispatcher.register(NMCommunicatorEventType.class, containerLauncher);
+ // service to allocate containers from RM (if non-uber) or to fake it (uber)
+ containerRequestor = createContainerRequestor(clientService, context);
+ addIfService(containerRequestor);
+ dispatcher.register(RMCommunicatorEventType.class, containerRequestor);
+
+ amScheduler = createAMScheduler(containerRequestor, context);
+ addIfService(amScheduler);
+ dispatcher.register(AMSchedulerEventType.class, amScheduler);
+
// Add the staging directory cleaner before the history server but after
// the container allocator so the staging directory is cleaned after
// the history has been flushed but before unregistering with the RM.
- addService(createStagingDirCleaningService());
+ this.stagingDirCleanerService = createStagingDirCleaningService();
+ addService(stagingDirCleanerService);
+
// Add the JobHistoryEventHandler last so that it is properly stopped first.
// This will guarantee that all history-events are flushed before AM goes
// ahead with shutdown.
// Note: Even though JobHistoryEventHandler is started last, if any
// component creates a JobHistoryEvent in the meanwhile, it will be just be
// queued inside the JobHistoryEventHandler
- addIfService(historyService);
+ addIfService(this.jobHistoryEventHandler);
super.init(conf);
} // end of init()
@@ -580,44 +595,33 @@ public void handle(JobFinishEvent event) {
protected Recovery createRecoveryService(AppContext appContext) {
return new RecoveryService(appContext, getCommitter());
}
-
+
/**
* Create the RMContainerRequestor.
- * @param clientService the MR Client Service.
- * @param appContext the application context.
+ *
+ * @param clientService
+ * the MR Client Service.
+ * @param appContext
+ * the application context.
* @return an instance of the RMContainerRequestor.
*/
protected ContainerRequestor createContainerRequestor(
ClientService clientService, AppContext appContext) {
- ContainerRequestor containerRequestor;
- if (job.isUber()) {
- containerRequestor = new LocalContainerRequestor(clientService,
- appContext);
- } else {
- containerRequestor = new RMContainerRequestor(clientService, appContext);
- }
- return containerRequestor;
+ return new ContainerRequestorRouter(clientService, appContext);
}
/**
* Create the AM Scheduler.
*
- * @param requestor The Container Requestor.
- * @param appContext the application context.
+ * @param requestor
+ * The Container Requestor.
+ * @param appContext
+ * the application context.
* @return an instance of the AMScheduler.
*/
protected ContainerAllocator createAMScheduler(ContainerRequestor requestor,
AppContext appContext) {
- if (job.isUber()) {
- return new LocalContainerAllocator(appContext, jobId, nmHost, nmPort,
- nmHttpPort, containerID, (TaskUmbilicalProtocol) taskAttemptListener,
- taskAttemptListener, (RMCommunicator)containerRequestor);
- } else {
- // TODO XXX: This is terrible. Assuming RMContainerRequestor is sent in
- // when non-uberized. Fix RMContainerRequestor to be a proper interface, etc.
- return new RMContainerAllocator((RMContainerRequestor) requestor,
- appContext);
- }
+ return new AMSchedulerRouter(requestor, appContext);
}
/** Create and initialize (but don't start) a single job. */
@@ -681,9 +685,7 @@ protected void addIfService(Object object) {
protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
AppContext context) {
- this.jobHistoryEventHandler = new JobHistoryEventHandler2(context,
- getStartCount());
- return this.jobHistoryEventHandler;
+ return new JobHistoryEventHandler2(context, getStartCount());
}
protected AbstractService createStagingDirCleaningService() {
@@ -810,7 +812,117 @@ public ContainerLauncher getContainerLauncher() {
public TaskAttemptListener getTaskAttemptListener() {
return taskAttemptListener;
}
-
+
+ /**
+ * By the time life-cycle of this router starts, job-init would have already
+ * happened.
+ */
+ private final class ContainerRequestorRouter extends AbstractService
+ implements ContainerRequestor {
+ private final ClientService clientService;
+ private final AppContext context;
+ private ContainerRequestor real;
+
+ public ContainerRequestorRouter(ClientService clientService,
+ AppContext appContext) {
+ super(ContainerRequestorRouter.class.getName());
+ this.clientService = clientService;
+ this.context = appContext;
+ }
+
+ @Override
+ public void start() {
+ if (job.isUber()) {
+ real = new LocalContainerRequestor(clientService,
+ context);
+ } else {
+ real = new RMContainerRequestor(clientService, context);
+ }
+ ((Service)this.real).init(getConfig());
+ ((Service)this.real).start();
+ super.start();
+ }
+
+ @Override
+ public void stop() {
+ if (real != null) {
+ ((Service) real).stop();
+ }
+ super.stop();
+ }
+
+ @Override
+ public void handle(RMCommunicatorEvent event) {
+ real.handle(event);
+ }
+
+ @Override
+ public Resource getAvailableResources() {
+ return real.getAvailableResources();
+ }
+
+ @Override
+ public void addContainerReq(ContainerRequest req) {
+ real.addContainerReq(req);
+ }
+
+ @Override
+ public void decContainerReq(ContainerRequest req) {
+ real.decContainerReq(req);
+ }
+
+ public void setSignalled(boolean isSignalled) {
+ ((RMCommunicator) real).setSignalled(isSignalled);
+ }
+ }
+
+ /**
+ * By the time life-cycle of this router starts, job-init would have already
+ * happened.
+ */
+ private final class AMSchedulerRouter extends AbstractService
+ implements ContainerAllocator {
+ private final ContainerRequestor requestor;
+ private final AppContext context;
+ private ContainerAllocator containerAllocator;
+
+ AMSchedulerRouter(ContainerRequestor requestor,
+ AppContext context) {
+ super(AMSchedulerRouter.class.getName());
+ this.requestor = requestor;
+ this.context = context;
+ }
+
+ @Override
+ public synchronized void start() {
+ if (job.isUber()) {
+ this.containerAllocator = new LocalContainerAllocator(this.context,
+ jobId, nmHost, nmPort, nmHttpPort, containerID,
+ (TaskUmbilicalProtocol) taskAttemptListener, taskAttemptListener,
+ (RMCommunicator) this.requestor);
+ } else {
+ this.containerAllocator = new RMContainerAllocator(this.requestor,
+ this.context);
+ }
+ ((Service)this.containerAllocator).init(getConfig());
+ ((Service)this.containerAllocator).start();
+ super.start();
+ }
+
+ @Override
+ public synchronized void stop() {
+ if (containerAllocator != null) {
+ ((Service) this.containerAllocator).stop();
+ super.stop();
+ }
+ }
+
+ @Override
+ public void handle(AMSchedulerEvent event) {
+ this.containerAllocator.handle(event);
+ }
+ }
+
public TaskHeartbeatHandler getTaskHeartbeatHandler() {
return taskHeartbeatHandler;
}
@@ -974,16 +1086,6 @@ public void start() {
LOG.info("MRAppMaster launching normal, non-uberized, multi-container "
+ "job " + job.getID() + ".");
}
- // service to allocate containers from RM (if non-uber) or to fake it (uber)
- containerRequestor = createContainerRequestor(clientService, context);
- addIfService(containerRequestor);
- ((Service)containerRequestor).init(getConfig());
- dispatcher.register(RMCommunicatorEventType.class, containerRequestor);
-
- amScheduler = createAMScheduler(containerRequestor, context);
- addIfService(amScheduler);
- ((Service)amScheduler).init(getConfig());
- dispatcher.register(AMSchedulerEventType.class, amScheduler);
//start all the components
super.start();
@@ -1155,10 +1257,12 @@ public void run() {
// that they don't take too long in shutting down
// Signal the RMCommunicator.
- ((RMCommunicator)appMaster.containerRequestor).setSignalled(true);
+ ((ContainerRequestorRouter) appMaster.containerRequestor)
+ .setSignalled(true);
if(appMaster.jobHistoryEventHandler != null) {
- appMaster.jobHistoryEventHandler.setSignalled(true);
+ ((JobHistoryEventHandler2) appMaster.jobHistoryEventHandler)
+ .setSignalled(true);
}
appMaster.stop();
}
@@ -20,7 +20,6 @@
import java.net.InetSocketAddress;
-import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.WrappedJvmID;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -68,6 +68,7 @@
import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventFailRequest;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEventType;
@@ -346,8 +347,7 @@ public FailTaskAttemptResponse failTaskAttempt(
appContext.getEventHandler().handle(
new TaskAttemptDiagnosticsUpdateEvent(taskAttemptId, message));
appContext.getEventHandler().handle(
- new TaskAttemptEvent(taskAttemptId,
- TaskAttemptEventType.TA_FAIL_REQUEST));
+ new TaskAttemptEventFailRequest(taskAttemptId, message));
FailTaskAttemptResponse response = recordFactory.
newRecordInstance(FailTaskAttemptResponse.class);
return response;
@@ -29,8 +29,8 @@ public TaskAttemptEventFailRequest(TaskAttemptId id, String message) {
this.message = message;
}
+ // TODO: This is not used at the moment.
public String getMessage() {
return this.message;
}
-
-}
+}
@@ -11,6 +11,7 @@ public TaskAttemptEventKillRequest(TaskAttemptId id, String message) {
this.message = message;
}
+ // TODO: This is not used at the moment.
public String getMessage() {
return this.message;
}
@@ -1,18 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
package org.apache.hadoop.mapreduce.v2.app2.job.event;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
public class TaskAttemptScheduleEvent extends TaskAttemptEvent {
private final boolean rescheduled;
-
- public TaskAttemptScheduleEvent(TaskAttemptId id, TaskAttemptEventType type, boolean rescheduled) {
- super(id, type);
+
+ public TaskAttemptScheduleEvent(TaskAttemptId id, boolean rescheduled) {
+ super(id, TaskAttemptEventType.TA_SCHEDULE);
this.rescheduled = rescheduled;
}
public boolean isRescheduled() {
return this.rescheduled;
}
-
+
}
Oops, something went wrong.

0 comments on commit c524013

Please sign in to comment.