From e8fae3cf3e52d833fa57a09c254b1cee8843164e Mon Sep 17 00:00:00 2001 From: Piyush Narang Date: Thu, 21 Jul 2016 18:05:31 -0700 Subject: [PATCH] Tweak DAGClient inheritance to work for Cascading --- .../tez/dag/api/client/DAGClientImpl.java | 38 ++++++++------ .../tez/dag/api/client/DAGClientInternal.java | 11 ++-- .../dag/api/client/DAGClientTimelineImpl.java | 1 - .../dag/api/client/rpc/DAGClientRPCImpl.java | 52 +++++++++---------- 4 files changed, 49 insertions(+), 53 deletions(-) diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java index af67ee84c8..c4cfe4d571 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java @@ -242,11 +242,11 @@ private DAGStatus getDAGStatusInternal(@Nullable Set statusOption if (dagStatus.isCompleted()) { return dagStatus; } - } catch (ApplicationNotFoundException e) { - LOG.info("Failed to fetch DAG data for completed DAG from YARN Timeline" - + " - Application not found by YARN", e); } catch (TezException e) { - if (LOG.isDebugEnabled()) { + if (e.getCause() instanceof ApplicationNotFoundException) { + LOG.info("Failed to fetch DAG data for completed DAG from YARN Timeline" + + " - Application not found by YARN", e); + } else if (LOG.isDebugEnabled()) { LOG.info("DAGStatus fetch failed." + e.getMessage()); } } @@ -296,12 +296,12 @@ public VertexStatus getVertexStatus(String vertexName, Set status if (vertexCompletionStates.contains(vertexStatus.getState())) { return vertexStatus; } - } catch (ApplicationNotFoundException e) { - LOG.info("Failed to fetch Vertex data for completed DAG from YARN Timeline" - + " - Application not found by YARN", e); - return null; } catch (TezException e) { - if (LOG.isDebugEnabled()) { + if (e.getCause() instanceof ApplicationNotFoundException) { + LOG.info("Failed to fetch Vertex data for completed DAG from YARN Timeline" + + " - Application not found by YARN", e); + return null; + } else if (LOG.isDebugEnabled()) { LOG.debug("ERROR fetching vertex data from Yarn Timeline. " + e.getMessage()); } } @@ -361,11 +361,13 @@ private DAGStatus getDAGStatusViaAM(@Nullable Set statusOptions, } catch (DAGNotRunningException e) { LOG.info("DAG is no longer running", e); dagCompleted = true; - } catch (ApplicationNotFoundException e) { - LOG.info("DAG is no longer running - application not found by YARN", e); - dagCompleted = true; } catch (TezException e) { - // can be either due to a n/w issue of due to AM completed. + // can be either due to ApplicationNotFound / n/w issue or due to AM completed. + // if its due to application found found we handle it + if (e.getCause() instanceof ApplicationNotFoundException) { + LOG.info("DAG is no longer running - application not found by YARN", e); + dagCompleted = true; + } } catch (IOException e) { // can be either due to a n/w issue of due to AM completed. } @@ -385,11 +387,13 @@ private VertexStatus getVertexStatusViaAM(String vertexName, Set } catch (DAGNotRunningException e) { LOG.info("DAG is no longer running", e); dagCompleted = true; - } catch (ApplicationNotFoundException e) { - LOG.info("DAG is no longer running - application not found by YARN", e); - dagCompleted = true; } catch (TezException e) { - // can be either due to a n/w issue of due to AM completed. + // can be either due to ApplicationNotFound / n/w issue or due to AM completed. + // if its due to application found found we handle it + if (e.getCause() instanceof ApplicationNotFoundException) { + LOG.info("DAG is no longer running - application not found by YARN", e); + dagCompleted = true; + } } catch (IOException e) { // can be either due to a n/w issue of due to AM completed. } diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientInternal.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientInternal.java index bb236a3049..f438b8e036 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientInternal.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientInternal.java @@ -18,16 +18,13 @@ package org.apache.tez.dag.api.client; -import java.io.Closeable; import java.io.IOException; import java.util.Set; import javax.annotation.Nullable; import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.yarn.api.records.ApplicationReport; -import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.tez.dag.api.TezException; /** @@ -35,7 +32,7 @@ * Application Master. */ @Private -public abstract class DAGClientInternal implements Closeable { +public abstract class DAGClientInternal extends DAGClient { /** * Gets DAG execution context for use with logging @@ -58,7 +55,7 @@ public abstract class DAGClientInternal implements Closeable { * specified options. To retrieve basic information, this can be null */ public abstract DAGStatus getDAGStatus(@Nullable Set statusOptions) - throws IOException, TezException, ApplicationNotFoundException; + throws IOException, TezException; /** * Get the status of the specified DAG when it reaches a final state, or the timeout expires. @@ -73,7 +70,7 @@ public abstract DAGStatus getDAGStatus(@Nullable Set statusOption */ public abstract DAGStatus getDAGStatus(@Nullable Set statusOptions, long timeout) - throws IOException, TezException, ApplicationNotFoundException; + throws IOException, TezException; /** * Get the status of a Vertex of a DAG @@ -82,7 +79,7 @@ public abstract DAGStatus getDAGStatus(@Nullable Set statusOption */ public abstract VertexStatus getVertexStatus(String vertexName, Set statusOptions) - throws IOException, TezException, ApplicationNotFoundException; + throws IOException, TezException; /** * Kill a running DAG diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java index ffd91b7932..471cb01c03 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java @@ -38,7 +38,6 @@ import com.sun.jersey.api.client.UniformInterfaceException; import com.sun.jersey.api.client.WebResource; -import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java index ff48755ba0..cf6d845854 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java @@ -41,7 +41,6 @@ import org.apache.tez.dag.api.DagTypeConverters; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezException; -import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.DAGStatus; import org.apache.tez.dag.api.client.DagStatusSource; import org.apache.tez.dag.api.client.StatusGetOpts; @@ -85,53 +84,50 @@ public String getExecutionContext() { @Override public DAGStatus getDAGStatus(Set statusOptions) - throws IOException, TezException, ApplicationNotFoundException { + throws IOException, TezException { return getDAGStatus(statusOptions, 0); } @Override public DAGStatus getDAGStatus(@Nullable Set statusOptions, - long timeout) throws IOException, TezException, ApplicationNotFoundException { - if (createAMProxyIfNeeded()) { + long timeout) throws IOException, TezException { try { - DAGStatus dagStatus = getDAGStatusViaAM(statusOptions, timeout); - return dagStatus; - } catch (TezException e) { - resetProxy(e); // create proxy again - throw e; - } catch (IOException e) { + if (createAMProxyIfNeeded()) { + DAGStatus dagStatus = getDAGStatusViaAM(statusOptions, timeout); + return dagStatus; + } + + // the dag is not running + return null; + } catch (TezException | IOException e) { resetProxy(e); // create proxy again throw e; + } catch(ApplicationNotFoundException e) { + // propagate as TezException + throw new TezException(e); } - } - - // either the dag is not running or some exception happened - return null; } @Override public VertexStatus getVertexStatus(String vertexName, Set statusOptions) - throws IOException, TezException, ApplicationNotFoundException { - - if(createAMProxyIfNeeded()) { - try { + throws IOException, TezException { + try { + if (createAMProxyIfNeeded()) { return getVertexStatusViaAM(vertexName, statusOptions); - } catch (TezException e) { - resetProxy(e); // create proxy again - throw e; - } catch (IOException e) { - resetProxy(e); // create proxy again - throw e; } - } - return null; + return null; + } catch (TezException | IOException e) { + resetProxy(e); // create proxy again + throw e; + } catch (ApplicationNotFoundException e) { + // propagate as TezException + throw new TezException(e); + } } - - @Override public void tryKillDAG() throws TezException, IOException { if(LOG.isDebugEnabled()) {