MAPREDUCE-6284. Add Task Attempt State API to MapReduce Application Master REST API. Contributed by Ryu Kobayashi.
oza committed May 8, 2015
1 parent c5afe44 commit d18f10a
Showing 10 changed files with 733 additions and 23 deletions.
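For orientation before the diffs: the commit adds GET and PUT handlers at /ws/v1/mapreduce/jobs/{jobid}/tasks/{taskid}/attempts/{attemptid}/state on the MapReduce Application Master. The sketch below shows how a client might exercise them; the RM proxy address and every application/job/task/attempt ID are hypothetical placeholders, and per the PUT handler below, only "KILLED" is accepted as a target state (anything else returns 400 Bad Request; an unauthorized kill returns 403 Forbidden).

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class TaskAttemptStateClient {
  public static void main(String[] args) throws Exception {
    // Hypothetical RM proxy address and IDs; substitute values from a real cluster.
    String base = "http://rm-host:8088/proxy/application_1431000000000_0001"
        + "/ws/v1/mapreduce/jobs/job_1431000000000_0001"
        + "/tasks/task_1431000000000_0001_m_000000"
        + "/attempts/attempt_1431000000000_0001_m_000000_0/state";

    // GET the current state; with the root-unwrapped JSON context added in
    // this commit, the body looks like {"state":"RUNNING"}.
    HttpURLConnection get = (HttpURLConnection) new URL(base).openConnection();
    get.setRequestProperty("Accept", "application/json");
    BufferedReader in = new BufferedReader(
        new InputStreamReader(get.getInputStream(), StandardCharsets.UTF_8));
    System.out.println("GET -> " + get.getResponseCode() + " " + in.readLine());
    in.close();

    // PUT {"state":"KILLED"} to kill the attempt.
    HttpURLConnection put = (HttpURLConnection) new URL(base).openConnection();
    put.setRequestMethod("PUT");
    put.setDoOutput(true);
    put.setRequestProperty("Content-Type", "application/json");
    OutputStream out = put.getOutputStream();
    out.write("{\"state\":\"KILLED\"}".getBytes(StandardCharsets.UTF_8));
    out.close();
    System.out.println("PUT -> " + put.getResponseCode());
  }
}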
3 changes: 3 additions & 0 deletions hadoop-mapreduce-project/CHANGES.txt
@@ -261,6 +261,9 @@ Release 2.8.0 - UNRELEASED

    NEW FEATURES

+    MAPREDUCE-6284. Add Task Attempt State API to MapReduce Application
+    Master REST API. (Ryu Kobayashi via ozawa)
+
    IMPROVEMENTS

    MAPREDUCE-6291. Correct mapred queue usage command.
org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java
@@ -84,6 +84,7 @@
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
@@ -424,6 +425,11 @@ public CancelDelegationTokenResponse cancelDelegationToken(
     }
   }

+  public KillTaskAttemptResponse forceKillTaskAttempt(
+      KillTaskAttemptRequest request) throws YarnException, IOException {
+    return protocolHandler.killTaskAttempt(request);
+  }
+
   public WebApp getWebApp() {
     return webApp;
   }
org/apache/hadoop/mapreduce/v2/app/webapp/AMWebServices.java
@@ -19,26 +19,37 @@
 package org.apache.hadoop.mapreduce.v2.app.webapp;

 import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.security.AccessControlException;
+import java.security.PrivilegedExceptionAction;

 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.Consumes;
 import javax.ws.rs.GET;
+import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;

 import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.KillTaskAttemptRequestPBImpl;
 import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
@@ -50,6 +61,7 @@
 import org.apache.hadoop.mapreduce.v2.app.webapp.dao.JobCounterInfo;
 import org.apache.hadoop.mapreduce.v2.app.webapp.dao.JobInfo;
 import org.apache.hadoop.mapreduce.v2.app.webapp.dao.JobTaskAttemptCounterInfo;
+import org.apache.hadoop.mapreduce.v2.app.webapp.dao.JobTaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.app.webapp.dao.JobTaskCounterInfo;
 import org.apache.hadoop.mapreduce.v2.app.webapp.dao.JobsInfo;
 import org.apache.hadoop.mapreduce.v2.app.webapp.dao.ReduceTaskAttemptInfo;
@@ -59,23 +71,27 @@
 import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TasksInfo;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.webapp.BadRequestException;
 import org.apache.hadoop.yarn.webapp.NotFoundException;

+import com.google.common.base.Preconditions;
 import com.google.inject.Inject;

@Path("/ws/v1/mapreduce") @Path("/ws/v1/mapreduce")
public class AMWebServices { public class AMWebServices {
private final AppContext appCtx; private final AppContext appCtx;
private final App app; private final App app;
private final MRClientService service;


private @Context HttpServletResponse response; private @Context HttpServletResponse response;


@Inject @Inject
public AMWebServices(final App app, final AppContext context) { public AMWebServices(final App app, final AppContext context) {
this.appCtx = context; this.appCtx = context;
this.app = app; this.app = app;
this.service = new MRClientService(context);
} }


Boolean hasAccess(Job job, HttpServletRequest request) { Boolean hasAccess(Job job, HttpServletRequest request) {
@@ -395,6 +411,59 @@ public TaskAttemptInfo getJobTaskAttemptId(@Context HttpServletRequest hsr,
     }
   }

+  @GET
+  @Path("/jobs/{jobid}/tasks/{taskid}/attempts/{attemptid}/state")
+  @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  public JobTaskAttemptState getJobTaskAttemptState(
+      @Context HttpServletRequest hsr,
+      @PathParam("jobid") String jid, @PathParam("taskid") String tid,
+      @PathParam("attemptid") String attId)
+      throws IOException, InterruptedException {
+    init();
+    Job job = getJobFromJobIdString(jid, appCtx);
+    checkAccess(job, hsr);
+    Task task = getTaskFromTaskIdString(tid, job);
+    TaskAttempt ta = getTaskAttemptFromTaskAttemptString(attId, task);
+    return new JobTaskAttemptState(ta.getState().toString());
+  }
+
+  @PUT
+  @Path("/jobs/{jobid}/tasks/{taskid}/attempts/{attemptid}/state")
+  @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  public Response updateJobTaskAttemptState(JobTaskAttemptState targetState,
+      @Context HttpServletRequest hsr, @PathParam("jobid") String jid,
+      @PathParam("taskid") String tid, @PathParam("attemptid") String attId)
+      throws IOException, InterruptedException {
+    init();
+    Job job = getJobFromJobIdString(jid, appCtx);
+    checkAccess(job, hsr);
+
+    String remoteUser = hsr.getRemoteUser();
+    UserGroupInformation callerUGI = null;
+    if (remoteUser != null) {
+      callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
+    }
+
+    Task task = getTaskFromTaskIdString(tid, job);
+    TaskAttempt ta = getTaskAttemptFromTaskAttemptString(attId, task);
+    if (!ta.getState().toString().equals(targetState.getState())) {
+      // user is attempting to change state. right now we only
+      // allow users to kill the task attempt
+      if (targetState.getState().equals(TaskAttemptState.KILLED.toString())) {
+        return killJobTaskAttempt(ta, callerUGI, hsr);
+      }
+      throw new BadRequestException("Only '"
+          + TaskAttemptState.KILLED.toString()
+          + "' is allowed as a target state.");
+    }
+
+    JobTaskAttemptState ret = new JobTaskAttemptState();
+    ret.setState(ta.getState().toString());
+
+    return Response.status(Status.OK).entity(ret).build();
+  }
+
   @GET
   @Path("/jobs/{jobid}/tasks/{taskid}/attempts/{attemptid}/counters")
   @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@@ -409,4 +478,47 @@ public JobTaskAttemptCounterInfo getJobTaskAttemptIdCounters(
     TaskAttempt ta = getTaskAttemptFromTaskAttemptString(attId, task);
     return new JobTaskAttemptCounterInfo(ta);
   }
+
+  protected Response killJobTaskAttempt(TaskAttempt ta,
+      UserGroupInformation callerUGI, HttpServletRequest hsr)
+      throws IOException, InterruptedException {
+    Preconditions.checkNotNull(ta, "ta cannot be null");
+
+    String userName = callerUGI.getUserName();
+    final TaskAttemptId attemptId = ta.getID();
+    try {
+      callerUGI
+          .doAs(new PrivilegedExceptionAction<KillTaskAttemptResponse>() {
+            @Override
+            public KillTaskAttemptResponse run()
+                throws IOException, YarnException {
+              KillTaskAttemptRequest req = new KillTaskAttemptRequestPBImpl();
+              req.setTaskAttemptId(attemptId);
+              return service.forceKillTaskAttempt(req);
+            }
+          });
+    } catch (UndeclaredThrowableException ue) {
+      // if the root cause is a permissions issue
+      // bubble that up to the user
+      if (ue.getCause() instanceof YarnException) {
+        YarnException ye = (YarnException) ue.getCause();
+        if (ye.getCause() instanceof AccessControlException) {
+          String taId = attemptId.toString();
+          String msg =
+              "Unauthorized attempt to kill task attempt " + taId
+                  + " by remote user " + userName;
+          return Response.status(Status.FORBIDDEN).entity(msg).build();
+        } else {
+          throw ue;
+        }
+      } else {
+        throw ue;
+      }
+    }
+
+    JobTaskAttemptState ret = new JobTaskAttemptState();
+    ret.setState(TaskAttemptState.KILLED.toString());
+
+    return Response.status(Status.OK).entity(ret).build();
+  }
 }
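A note on the catch clause above: UserGroupInformation.doAs declares only IOException and InterruptedException, so a checked YarnException raised inside run() surfaces wrapped in UndeclaredThrowableException, which is why killJobTaskAttempt unwraps it to look for an AccessControlException. A minimal sketch of that wrapping (the user name is an arbitrary placeholder):

import java.lang.reflect.UndeclaredThrowableException;
import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.exceptions.YarnException;

public class DoAsWrappingDemo {
  public static void main(String[] args) throws Exception {
    UserGroupInformation ugi = UserGroupInformation.createRemoteUser("alice");
    try {
      ugi.doAs(new PrivilegedExceptionAction<Void>() {
        @Override
        public Void run() throws YarnException {
          // Simulate the service rejecting a kill for a caller
          // without the required job ACL.
          throw new YarnException("simulated ACL failure");
        }
      });
    } catch (UndeclaredThrowableException ue) {
      // doAs cannot rethrow the checked YarnException directly,
      // so it arrives as the cause of UndeclaredThrowableException.
      System.out.println("wrapped cause: " + ue.getCause());
    }
  }
}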
org/apache/hadoop/mapreduce/v2/app/webapp/JAXBContextResolver.java
@@ -18,9 +18,8 @@

 package org.apache.hadoop.mapreduce.v2.app.webapp;

-import java.util.Set;
-import java.util.HashSet;
-import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;

 import com.sun.jersey.api.json.JSONConfiguration;
 import com.sun.jersey.api.json.JSONJAXBContext;
@@ -39,6 +38,7 @@
 import org.apache.hadoop.mapreduce.v2.app.webapp.dao.CounterInfo;
 import org.apache.hadoop.mapreduce.v2.app.webapp.dao.JobCounterInfo;
 import org.apache.hadoop.mapreduce.v2.app.webapp.dao.JobInfo;
+import org.apache.hadoop.mapreduce.v2.app.webapp.dao.JobTaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.app.webapp.dao.JobsInfo;
 import org.apache.hadoop.mapreduce.v2.app.webapp.dao.JobTaskAttemptCounterInfo;
 import org.apache.hadoop.mapreduce.v2.app.webapp.dao.JobTaskCounterInfo;
@@ -55,8 +55,7 @@
 @Provider
 public class JAXBContextResolver implements ContextResolver<JAXBContext> {

-  private JAXBContext context;
-  private final Set<Class> types;
+  private final Map<Class, JAXBContext> typesContextMap;

   // you have to specify all the dao classes here
   private final Class[] cTypes = {AMAttemptInfo.class, AMAttemptsInfo.class,
@@ -67,14 +66,30 @@ public class JAXBContextResolver implements ContextResolver<JAXBContext> {
     TaskAttemptInfo.class, TaskInfo.class, TasksInfo.class,
     TaskAttemptsInfo.class, ConfEntryInfo.class, RemoteExceptionData.class};

+  // these dao classes need root unwrapping
+  private final Class[] rootUnwrappedTypes = {JobTaskAttemptState.class};
+
   public JAXBContextResolver() throws Exception {
-    this.types = new HashSet<Class>(Arrays.asList(cTypes));
-    this.context = new JSONJAXBContext(JSONConfiguration.natural().
-        rootUnwrapping(false).build(), cTypes);
+    JAXBContext context;
+    JAXBContext unWrappedRootContext;
+
+    this.typesContextMap = new HashMap<Class, JAXBContext>();
+    context =
+        new JSONJAXBContext(JSONConfiguration.natural().rootUnwrapping(false)
+            .build(), cTypes);
+    unWrappedRootContext =
+        new JSONJAXBContext(JSONConfiguration.natural().rootUnwrapping(true)
+            .build(), rootUnwrappedTypes);
+    for (Class type : cTypes) {
+      typesContextMap.put(type, context);
+    }
+    for (Class type : rootUnwrappedTypes) {
+      typesContextMap.put(type, unWrappedRootContext);
+    }
   }

   @Override
   public JAXBContext getContext(Class<?> objectType) {
-    return (types.contains(objectType)) ? context : null;
+    return typesContextMap.get(objectType);
   }
 }
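The resolver change exists because the PUT handler must bind a bare {"state":"KILLED"} body to JobTaskAttemptState, while the other DAOs keep their wrapping root element. A standalone sketch of the difference, using the same Jersey 1.x calls as the diff above (the printed shapes are approximate):

import java.io.StringWriter;
import com.sun.jersey.api.json.JSONConfiguration;
import com.sun.jersey.api.json.JSONJAXBContext;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.JobTaskAttemptState;

public class RootUnwrappingDemo {
  public static void main(String[] args) throws Exception {
    JobTaskAttemptState state = new JobTaskAttemptState("RUNNING");

    // rootUnwrapping(false): the root element name wraps the payload,
    // e.g. {"jobTaskAttemptState":{"state":"RUNNING"}}.
    JSONJAXBContext wrapped = new JSONJAXBContext(
        JSONConfiguration.natural().rootUnwrapping(false).build(),
        JobTaskAttemptState.class);
    StringWriter w1 = new StringWriter();
    wrapped.createJSONMarshaller().marshallToJSON(state, w1);
    System.out.println(w1);

    // rootUnwrapping(true): the root element is dropped,
    // e.g. {"state":"RUNNING"}, matching the PUT request body.
    JSONJAXBContext unwrapped = new JSONJAXBContext(
        JSONConfiguration.natural().rootUnwrapping(true).build(),
        JobTaskAttemptState.class);
    StringWriter w2 = new StringWriter();
    unwrapped.createJSONMarshaller().marshallToJSON(state, w2);
    System.out.println(w2);
  }
}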
org/apache/hadoop/mapreduce/v2/app/webapp/dao/JobTaskAttemptState.java (new file)
@@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.mapreduce.v2.app.webapp.dao;

import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;

/**
* Job task attempt state.
*/
@XmlRootElement(name = "jobTaskAttemptState")
@XmlAccessorType(XmlAccessType.FIELD)
public class JobTaskAttemptState {

private String state;

public JobTaskAttemptState() {
}

public JobTaskAttemptState(String state) {
this.state = state;
}

public void setState(String state) {
this.state = state;
}

public String getState() {
return this.state;
}
}
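For the XML media type the endpoints also advertise, the annotations above imply a payload like <jobTaskAttemptState><state>KILLED</state></jobTaskAttemptState>. A quick check with plain JAXB, assuming the dao class is on the classpath:

import java.io.StringWriter;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.Marshaller;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.JobTaskAttemptState;

public class JobTaskAttemptStateXmlCheck {
  public static void main(String[] args) throws Exception {
    JAXBContext ctx = JAXBContext.newInstance(JobTaskAttemptState.class);
    Marshaller m = ctx.createMarshaller();
    m.setProperty(Marshaller.JAXB_FRAGMENT, Boolean.TRUE); // omit XML declaration
    StringWriter w = new StringWriter();
    m.marshal(new JobTaskAttemptState("KILLED"), w);
    System.out.println(w); // <jobTaskAttemptState><state>KILLED</state></jobTaskAttemptState>
  }
}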
org/apache/hadoop/mapreduce/v2/app/MockAppContext.java (test)
@@ -95,7 +95,7 @@ public Map<JobId, Job> getAllJobs() {
   @SuppressWarnings("rawtypes")
   @Override
   public EventHandler getEventHandler() {
-    return null;
+    return new MockEventHandler();
   }

   @Override
org/apache/hadoop/mapreduce/v2/app/MockEventHandler.java (new file)
@@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.mapreduce.v2.app;

import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.yarn.event.EventHandler;

public class MockEventHandler implements EventHandler<TaskAttemptEvent> {
@Override
public void handle(TaskAttemptEvent event) {
}
}
