From 239619d8a1ccb08dae3c3262f380b87759a574f6 Mon Sep 17 00:00:00 2001 From: Terence Yim Date: Mon, 3 Apr 2017 16:49:52 -0700 Subject: [PATCH 1/2] (TWILL-230) Get resource report based on the caller user - Also by default get the resource report from the tracking url, then fall back to the original tracking url. --- .../twill/yarn/ResourceReportClient.java | 26 +++++---- .../twill/yarn/YarnTwillController.java | 57 ++++++++++++++----- .../twill/yarn/ResourceReportTestRun.java | 47 +++++++++++++-- 3 files changed, 100 insertions(+), 30 deletions(-) diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java b/twill-yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java index 3d5bcf36..4bebc9f7 100644 --- a/twill-yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java +++ b/twill-yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java @@ -28,6 +28,7 @@ import java.io.InputStreamReader; import java.io.Reader; import java.net.URL; +import java.util.List; /** * Package private class to get {@link ResourceReport} from the application master. @@ -36,10 +37,10 @@ final class ResourceReportClient { private static final Logger LOG = LoggerFactory.getLogger(ResourceReportClient.class); private final ResourceReportAdapter reportAdapter; - private final URL resourceUrl; + private final List resourceUrls; - ResourceReportClient(URL resourceUrl) { - this.resourceUrl = resourceUrl; + ResourceReportClient(List resourceUrls) { + this.resourceUrls = resourceUrls; this.reportAdapter = ResourceReportAdapter.create(); } @@ -48,16 +49,19 @@ final class ResourceReportClient { * @return A {@link ResourceReport} or {@code null} if failed to fetch the report. 
*/ public ResourceReport get() { - try { - Reader reader = new BufferedReader(new InputStreamReader(resourceUrl.openStream(), Charsets.UTF_8)); + for (URL url : resourceUrls) { try { - return reportAdapter.fromJson(reader); - } finally { - Closeables.closeQuietly(reader); + Reader reader = new BufferedReader(new InputStreamReader(url.openStream(), Charsets.UTF_8)); + try { + LOG.debug("Report returned by {}", url); + return reportAdapter.fromJson(reader); + } finally { + Closeables.closeQuietly(reader); + } + } catch (Exception e) { + LOG.debug("Exception raised when getting resource report from {}.", url, e); } - } catch (Exception e) { - LOG.error("Exception getting resource report from {}.", resourceUrl, e); - return null; } + return null; } } diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java index 1945731a..669d7390 100644 --- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java +++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java @@ -43,10 +43,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.net.URI; +import java.net.MalformedURLException; import java.net.URL; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -66,7 +68,6 @@ final class YarnTwillController extends AbstractTwillController implements Twill private final TimeUnit startTimeoutUnit; private volatile ApplicationMasterLiveNodeData amLiveNodeData; private ProcessController processController; - private ResourceReportClient resourcesClient; // Thread for polling yarn for application status if application got ZK session expire. 
// Only used by the instanceUpdate/Delete method, which is from serialized call from ZK callback. @@ -101,7 +102,6 @@ public ProcessController call() throws Exception { this.startTimeoutUnit = startTimeoutUnit; } - /** * Sends a message to application to notify the secure store has be updated. */ @@ -140,14 +140,6 @@ protected void doStartUp() { if (state != YarnApplicationState.RUNNING) { LOG.info("Yarn application {} {} is not in running state. Shutting down controller.", appName, appId); forceShutDown(); - } else { - try { - URL resourceUrl = URI.create(String.format("http://%s:%d", report.getHost(), report.getRpcPort())) - .resolve(TrackerService.PATH).toURL(); - resourcesClient = new ResourceReportClient(resourceUrl); - } catch (IOException e) { - resourcesClient = null; - } } } catch (Exception e) { throw Throwables.propagate(e); @@ -322,7 +314,46 @@ private boolean hasRun(YarnApplicationState state) { @Override public ResourceReport getResourceReport() { - // in case the user calls this before starting, return null + // Only has resource report if the app is running. + if (state() != State.RUNNING) { + return null; + } + ResourceReportClient resourcesClient = getResourcesClient(); return (resourcesClient == null) ? null : resourcesClient.get(); } + + /** + * Returns the {@link ResourceReportClient} for fetching resource report from the AM. + * It first consults the RM for the tracking URL and gets the resource report from there. + */ + @Nullable + private ResourceReportClient getResourcesClient() { + YarnApplicationReport report = processController.getReport(); + List urls = new ArrayList<>(2); + + // Try getting the report from the proxy tracking URL as well as the original tracking URL directly + // This is mainly a workaround for unit tests, where the proxy tracking URL doesn't work well with a local address. 
+ for (String url : Arrays.asList(report.getTrackingUrl(), report.getOriginalTrackingUrl())) { + if (url != null && !url.equals("N/A")) { + try { + URL trackingUrl = new URL(url); + String path = trackingUrl.getPath(); + if (path.endsWith("/")) { + path = path.substring(0, path.length() - 1); + } + urls.add(new URL(trackingUrl.getProtocol(), trackingUrl.getHost(), + trackingUrl.getPort(), path + TrackerService.PATH)); + } catch (MalformedURLException e) { + LOG.info("Invalid tracking URL {} from YARN application report for {}:{}", url, appName, getRunId()); + } + } + } + + if (urls.isEmpty()) { + LOG.debug("No valid tracking URL for YARN application {}:{}", appName, getRunId()); + return null; + } + + return new ResourceReportClient(urls); + } } diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java index 32e1fd61..a61880fe 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java @@ -18,15 +18,18 @@ package org.apache.twill.yarn; import com.google.common.base.Charsets; +import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; import com.google.common.io.LineReader; +import com.google.common.util.concurrent.Uninterruptibles; import org.apache.twill.api.ResourceReport; import org.apache.twill.api.ResourceSpecification; import org.apache.twill.api.TwillApplication; import org.apache.twill.api.TwillController; import org.apache.twill.api.TwillRunResources; import org.apache.twill.api.TwillRunner; +import org.apache.twill.api.TwillRunnerService; import org.apache.twill.api.TwillSpecification; import org.apache.twill.api.logging.PrinterLogHandler; import org.apache.twill.common.Threads; @@ -159,7 +162,7 @@ public void run() { Iterable echoServices = controller.discoverService("echo"); 
Assert.assertTrue(waitForSize(echoServices, 2, 120)); // check that we have 2 runnables. - ResourceReport report = controller.getResourceReport(); + ResourceReport report = getResourceReport(controller, 10000); Assert.assertEquals(2, report.getRunnableResources("BuggyServer").size()); // cause a divide by 0 in one server @@ -175,7 +178,7 @@ public void run() { // takes some time for app master to find out the container completed... int count = 0; while (count < 100) { - report = controller.getResourceReport(); + report = getResourceReport(controller, 10000); // check that we have 1 runnable, not 2. if (report.getRunnableResources("BuggyServer").size() == 1) { break; @@ -216,7 +219,7 @@ public void run() { // wait for 3 echo servers to come up Iterable echoServices = controller.discoverService("echo"); Assert.assertTrue(waitForSize(echoServices, 3, 120)); - ResourceReport report = controller.getResourceReport(); + ResourceReport report = getResourceReport(controller, 10000); // make sure resources for echo1 and echo2 are there Map> usedResources = report.getResources(); Assert.assertEquals(2, usedResources.keySet().size()); @@ -226,10 +229,10 @@ public void run() { waitForSize(new Iterable() { @Override public Iterator iterator() { - return controller.getResourceReport().getServices().iterator(); + return getResourceReport(controller, 10000).getServices().iterator(); } }, 3, 120); - report = controller.getResourceReport(); + report = getResourceReport(controller, 10000); Assert.assertEquals(ImmutableSet.of("echo", "echo1", "echo2"), ImmutableSet.copyOf(report.getServices())); Collection echo1Resources = usedResources.get("echo1"); @@ -252,7 +255,7 @@ public Iterator iterator() { controller.changeInstances("echo1", 1).get(60, TimeUnit.SECONDS); echoServices = controller.discoverService("echo1"); Assert.assertTrue(waitForSize(echoServices, 1, 60)); - report = controller.getResourceReport(); + report = getResourceReport(controller, 10000); // make sure resources for 
echo1 and echo2 are there usedResources = report.getResources(); @@ -276,8 +279,40 @@ public Iterator iterator() { Assert.assertEquals(512, resources.getMemoryMB()); } + // Create a new TwillRunner, it should be able to get the same resource report + TwillRunnerService newRunnerService = TWILL_TESTER.createTwillRunnerService(); + newRunnerService.start(); + try { + TwillController newController = newRunnerService.lookup("ResourceApplication", controller.getRunId()); + // Get the controller of the application + int trials = 60; + while (newController == null && trials-- > 0) { + TimeUnit.SECONDS.sleep(1); + newController = newRunnerService.lookup("ResourceApplication", controller.getRunId()); + } + Assert.assertNotNull(newController); + + ResourceReport newReport = getResourceReport(newController, 10000); + Assert.assertEquals(report.getResources(), newReport.getResources()); + + } finally { + newRunnerService.stop(); + } + controller.terminate().get(120, TimeUnit.SECONDS); // Sleep a bit before exiting. 
TimeUnit.SECONDS.sleep(2); } + + private ResourceReport getResourceReport(TwillController controller, long timeoutMillis) { + ResourceReport report = controller.getResourceReport(); + Stopwatch stopwatch = new Stopwatch(); + while (report == null && stopwatch.elapsedMillis() < timeoutMillis) { + Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); + report = controller.getResourceReport(); + } + + Assert.assertNotNull(report); + return report; + } } From 6a37ad9d44dd2c1a031d24e5df6683082cc7e046 Mon Sep 17 00:00:00 2001 From: Terence Yim Date: Mon, 3 Apr 2017 18:04:24 -0700 Subject: [PATCH 2/2] Fix unit-test for Hadoop 2.0 --- .../internal/yarn/Hadoop20YarnApplicationReport.java | 4 ++-- .../java/org/apache/twill/yarn/ResourceReportClient.java | 8 +++++--- .../java/org/apache/twill/yarn/YarnTwillController.java | 3 +-- twill-yarn/src/test/resources/logback-test.xml | 1 + 4 files changed, 9 insertions(+), 7 deletions(-) diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnApplicationReport.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnApplicationReport.java index 6c1b764f..8d6e2df2 100644 --- a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnApplicationReport.java +++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnApplicationReport.java @@ -77,12 +77,12 @@ public String getDiagnostics() { @Override public String getTrackingUrl() { - return report.getTrackingUrl(); + return "http://" + report.getTrackingUrl(); } @Override public String getOriginalTrackingUrl() { - return report.getOriginalTrackingUrl(); + return "http://" + report.getOriginalTrackingUrl(); } @Override diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java b/twill-yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java index 4bebc9f7..fb8b7e8c 100644 --- a/twill-yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java +++ 
b/twill-yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java @@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory; import java.io.BufferedReader; +import java.io.IOException; import java.io.InputStreamReader; import java.io.Reader; import java.net.URL; @@ -53,13 +54,14 @@ public ResourceReport get() { try { Reader reader = new BufferedReader(new InputStreamReader(url.openStream(), Charsets.UTF_8)); try { - LOG.debug("Report returned by {}", url); + LOG.trace("Report returned by {}", url); return reportAdapter.fromJson(reader); } finally { Closeables.closeQuietly(reader); } - } catch (Exception e) { - LOG.debug("Exception raised when getting resource report from {}.", url, e); + } catch (IOException e) { + // Just log a trace as it's ok to not be able to fetch the resource report + LOG.trace("Exception raised when getting resource report from {}.", url, e); } } return null; diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java index 669d7390..6ea7d8f1 100644 --- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java +++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java @@ -344,13 +344,12 @@ private ResourceReportClient getResourcesClient() { urls.add(new URL(trackingUrl.getProtocol(), trackingUrl.getHost(), trackingUrl.getPort(), path + TrackerService.PATH)); } catch (MalformedURLException e) { - LOG.info("Invalid tracking URL {} from YARN application report for {}:{}", url, appName, getRunId()); + LOG.debug("Invalid tracking URL {} from YARN application report for {}:{}", url, appName, getRunId()); } } } if (urls.isEmpty()) { - LOG.debug("No valid tracking URL for YARN application {}:{}", appName, getRunId()); return null; } diff --git a/twill-yarn/src/test/resources/logback-test.xml b/twill-yarn/src/test/resources/logback-test.xml index 4bcdb42f..2b210cbf 100644 --- a/twill-yarn/src/test/resources/logback-test.xml 
+++ b/twill-yarn/src/test/resources/logback-test.xml @@ -24,6 +24,7 @@ limitations under the License. +