Skip to content

Commit

Permalink
YARN-8659. RMWebServices returns only RUNNING apps when filtered with…
Browse files Browse the repository at this point in the history
… queue. (Contributed by Szilard Nemeth)
  • Loading branch information
haibchen committed Oct 8, 2018
1 parent d7c7f68 commit 7c13872
Show file tree
Hide file tree
Showing 2 changed files with 161 additions and 97 deletions.
Expand Up @@ -831,46 +831,7 @@ public GetApplicationsResponse getApplications(GetApplicationsRequest request)
ApplicationsRequestScope scope = request.getScope();

final Map<ApplicationId, RMApp> apps = rmContext.getRMApps();
Iterator<RMApp> appsIter;
// If the query filters by queues, we can avoid considering apps outside
// of those queues by asking the scheduler for the apps in those queues.
if (queues != null && !queues.isEmpty()) {
// Construct an iterator over apps in given queues
// Collect list of lists to avoid copying all apps
final List<List<ApplicationAttemptId>> queueAppLists =
new ArrayList<List<ApplicationAttemptId>>();
for (String queue : queues) {
List<ApplicationAttemptId> appsInQueue = scheduler.getAppsInQueue(queue);
if (appsInQueue != null && !appsInQueue.isEmpty()) {
queueAppLists.add(appsInQueue);
}
}
appsIter = new Iterator<RMApp>() {
Iterator<List<ApplicationAttemptId>> appListIter = queueAppLists.iterator();
Iterator<ApplicationAttemptId> schedAppsIter;

@Override
public boolean hasNext() {
// Because queueAppLists has no empty lists, hasNext is whether the
// current list hasNext or whether there are any remaining lists
return (schedAppsIter != null && schedAppsIter.hasNext())
|| appListIter.hasNext();
}
@Override
public RMApp next() {
if (schedAppsIter == null || !schedAppsIter.hasNext()) {
schedAppsIter = appListIter.next().iterator();
}
return apps.get(schedAppsIter.next().getApplicationId());
}
@Override
public void remove() {
throw new UnsupportedOperationException("Remove not supported");
}
};
} else {
appsIter = apps.values().iterator();
}
Iterator<RMApp> appsIter = apps.values().iterator();

List<ApplicationReport> reports = new ArrayList<ApplicationReport>();
while (appsIter.hasNext() && reports.size() < limit) {
Expand All @@ -882,6 +843,12 @@ public void remove() {
continue;
}

if (queues != null && !queues.isEmpty()) {
if (!queues.contains(application.getQueue())) {
continue;
}
}

if (applicationTypes != null && !applicationTypes.isEmpty()) {
String appTypeToMatch =
StringUtils.toLowerCase(application.getApplicationType());
Expand Down
Expand Up @@ -18,19 +18,16 @@

package org.apache.hadoop.yarn.server.resourcemanager.webapp;

import static org.apache.hadoop.yarn.webapp.WebServicesTestUtils.assertResponseStatusCode;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.StringReader;
import java.util.ArrayList;
import java.util.Collection;

import javax.ws.rs.core.MediaType;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;

import com.google.common.collect.Sets;
import com.google.inject.Guice;
import com.google.inject.servlet.ServletModule;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.ClientResponse.Status;
import com.sun.jersey.api.client.UniformInterfaceException;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.core.util.MultivaluedMapImpl;
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
import com.sun.jersey.test.framework.WebAppDescriptor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.JettyUtils;
import org.apache.hadoop.security.UserGroupInformation;
Expand All @@ -45,9 +42,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;

import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
Expand All @@ -66,15 +60,18 @@
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

import com.google.inject.Guice;
import com.google.inject.servlet.ServletModule;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.ClientResponse.Status;
import com.sun.jersey.api.client.UniformInterfaceException;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.core.util.MultivaluedMapImpl;
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
import com.sun.jersey.test.framework.WebAppDescriptor;
import javax.ws.rs.core.MediaType;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Set;

import static org.apache.hadoop.yarn.webapp.WebServicesTestUtils.assertResponseStatusCode;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public class TestRMWebServicesApps extends JerseyTestBase {

Expand Down Expand Up @@ -104,6 +101,16 @@ protected void configureServlets() {
Guice.createInjector(new WebServletModule()));
}

private Set<String> getApplicationIds(JSONArray array) throws JSONException {
Set<String> ids = Sets.newHashSet();
for (int i = 0; i < array.length(); i++) {
JSONObject app = array.getJSONObject(i);
String appId = (String) app.get("id");
ids.add(appId);
}
return ids;
}

@Before
@Override
public void setUp() throws Exception {
Expand Down Expand Up @@ -639,6 +646,113 @@ public void testAppsQueryQueue() throws JSONException, Exception {
rm.stop();
}

@Test
public void testAppsQueryQueueAndStateTwoFinishedApps() throws Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
RMApp app1 = rm.submitApp(CONTAINER_MB);
RMApp app2 = rm.submitApp(CONTAINER_MB);
amNodeManager.nodeHeartbeat(true);

finishApp(amNodeManager, app1);
finishApp(amNodeManager, app2);

WebResource r = resource();

ClientResponse response = r.path("ws").path("v1").path("cluster")
.path("apps")
.queryParam("queue", "default")
.queryParam("state", YarnApplicationState.FINISHED.toString())
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
response.getType().toString());
JSONObject json = response.getEntity(JSONObject.class);
assertEquals("incorrect number of elements", 1, json.length());
JSONObject apps = json.getJSONObject("apps");
assertEquals("incorrect number of elements", 1, apps.length());
JSONArray array = apps.getJSONArray("app");

Set<String> appIds = getApplicationIds(array);
assertTrue("Finished app1 should be in the result list!",
appIds.contains(app1.getApplicationId().toString()));
assertTrue("Finished app2 should be in the result list!",
appIds.contains(app2.getApplicationId().toString()));
assertEquals("incorrect number of elements", 2, array.length());

rm.stop();
}

@Test
public void testAppsQueryQueueAndStateOneFinishedApp() throws Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
RMApp finishedApp = rm.submitApp(CONTAINER_MB);
RMApp runningApp = rm.submitApp(CONTAINER_MB);
amNodeManager.nodeHeartbeat(true);

finishApp(amNodeManager, finishedApp);

WebResource r = resource();

ClientResponse response = r.path("ws").path("v1").path("cluster")
.path("apps")
.queryParam("queue", "default")
.queryParam("state", YarnApplicationState.FINISHED.toString())
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
response.getType().toString());
JSONObject json = response.getEntity(JSONObject.class);
assertEquals("incorrect number of elements", 1, json.length());
JSONObject apps = json.getJSONObject("apps");
assertEquals("incorrect number of elements", 1, apps.length());

JSONArray array = apps.getJSONArray("app");

Set<String> appIds = getApplicationIds(array);
assertFalse("Running app should not be in the result list!",
appIds.contains(runningApp.getApplicationId().toString()));
assertTrue("Finished app should be in the result list!",
appIds.contains(finishedApp.getApplicationId().toString()));
assertEquals("incorrect number of elements", 1, array.length());

rm.stop();
}

@Test
public void testAppsQueryQueueOneFinishedApp() throws Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
RMApp finishedApp = rm.submitApp(CONTAINER_MB);
RMApp runningApp = rm.submitApp(CONTAINER_MB);
amNodeManager.nodeHeartbeat(true);

finishApp(amNodeManager, finishedApp);

WebResource r = resource();

ClientResponse response = r.path("ws").path("v1").path("cluster")
.path("apps")
.queryParam("queue", "default")
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
response.getType().toString());
JSONObject json = response.getEntity(JSONObject.class);
assertEquals("incorrect number of elements", 1, json.length());
JSONObject apps = json.getJSONObject("apps");
assertEquals("incorrect number of elements", 1, apps.length());

JSONArray array = apps.getJSONArray("app");

Set<String> appIds = getApplicationIds(array);
assertTrue("Running app should be in the result list!",
appIds.contains(runningApp.getApplicationId().toString()));
assertTrue("Finished app should be in the result list!",
appIds.contains(finishedApp.getApplicationId().toString()));
assertEquals("incorrect number of elements", 2, array.length());

rm.stop();
}

@Test
public void testAppsQueryLimit() throws JSONException, Exception {
rm.start();
Expand Down Expand Up @@ -766,13 +880,7 @@ public void testAppsQueryFinishBegin() throws JSONException, Exception {
Thread.sleep(1);
RMApp app1 = rm.submitApp(CONTAINER_MB);
amNodeManager.nodeHeartbeat(true);
// finish App
MockAM am = rm
.sendAMLaunched(app1.getCurrentAppAttempt().getAppAttemptId());
am.registerAppAttempt();
am.unregisterAppAttempt();
amNodeManager.nodeHeartbeat(app1.getCurrentAppAttempt().getAppAttemptId(),
1, ContainerState.COMPLETE);
finishApp(amNodeManager, app1);
rm.submitApp(CONTAINER_MB);
rm.submitApp(CONTAINER_MB);

Expand All @@ -791,19 +899,24 @@ public void testAppsQueryFinishBegin() throws JSONException, Exception {
rm.stop();
}

private void finishApp(MockNM amNodeManager, RMApp app) throws Exception {
MockAM am = rm
.sendAMLaunched(app.getCurrentAppAttempt().getAppAttemptId());
am.registerAppAttempt();
am.unregisterAppAttempt();
amNodeManager.nodeHeartbeat(app.getCurrentAppAttempt().getAppAttemptId(),
1, ContainerState.COMPLETE);
rm.waitForState(app.getApplicationId(), RMAppState.FINISHED);
}

@Test
public void testAppsQueryFinishEnd() throws JSONException, Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
RMApp app1 = rm.submitApp(CONTAINER_MB);
amNodeManager.nodeHeartbeat(true);
// finish App
MockAM am = rm
.sendAMLaunched(app1.getCurrentAppAttempt().getAppAttemptId());
am.registerAppAttempt();
am.unregisterAppAttempt();
amNodeManager.nodeHeartbeat(app1.getCurrentAppAttempt().getAppAttemptId(),
1, ContainerState.COMPLETE);
finishApp(amNodeManager, app1);

rm.submitApp(CONTAINER_MB);
rm.submitApp(CONTAINER_MB);
Expand Down Expand Up @@ -833,12 +946,7 @@ public void testAppsQueryFinishBeginEnd() throws JSONException, Exception {
RMApp app1 = rm.submitApp(CONTAINER_MB);
amNodeManager.nodeHeartbeat(true);
// finish App
MockAM am = rm
.sendAMLaunched(app1.getCurrentAppAttempt().getAppAttemptId());
am.registerAppAttempt();
am.unregisterAppAttempt();
amNodeManager.nodeHeartbeat(app1.getCurrentAppAttempt().getAppAttemptId(),
1, ContainerState.COMPLETE);
finishApp(amNodeManager, app1);

rm.submitApp(CONTAINER_MB);
rm.submitApp(CONTAINER_MB);
Expand Down Expand Up @@ -868,12 +976,7 @@ public void testAppsQueryAppTypes() throws JSONException, Exception {
RMApp app1 = rm.submitApp(CONTAINER_MB);
amNodeManager.nodeHeartbeat(true);
// finish App
MockAM am = rm
.sendAMLaunched(app1.getCurrentAppAttempt().getAppAttemptId());
am.registerAppAttempt();
am.unregisterAppAttempt();
amNodeManager.nodeHeartbeat(app1.getCurrentAppAttempt().getAppAttemptId(),
1, ContainerState.COMPLETE);
finishApp(amNodeManager, app1);

rm.submitApp(CONTAINER_MB, "", UserGroupInformation.getCurrentUser()
.getShortUserName(), null, false, null, 2, null, "MAPREDUCE");
Expand Down Expand Up @@ -1203,13 +1306,7 @@ public void testAppStatistics() throws JSONException, Exception {
RMApp app1 = rm.submitApp(CONTAINER_MB, "", UserGroupInformation.getCurrentUser()
.getShortUserName(), null, false, null, 2, null, "MAPREDUCE");
amNodeManager.nodeHeartbeat(true);
// finish App
MockAM am = rm
.sendAMLaunched(app1.getCurrentAppAttempt().getAppAttemptId());
am.registerAppAttempt();
am.unregisterAppAttempt();
amNodeManager.nodeHeartbeat(app1.getCurrentAppAttempt().getAppAttemptId(),
1, ContainerState.COMPLETE);
finishApp(amNodeManager, app1);

rm.submitApp(CONTAINER_MB, "", UserGroupInformation.getCurrentUser()
.getShortUserName(), null, false, null, 2, null, "MAPREDUCE");
Expand Down

0 comments on commit 7c13872

Please sign in to comment.