From 8016ee3b4f9850c1ede8bdc06b2365710a59b195 Mon Sep 17 00:00:00 2001 From: Sudheesh Katkam Date: Wed, 22 Jul 2015 17:16:29 -0700 Subject: [PATCH] DRILL-3455: If fragments on unregistered Drillbits finished successfully, do not fail the query + DRILL-3448: Flipped the atLeastOneFailure condition in QueryManager + fixes in DrillbitStatusListener interface + logs from implementations of DrillbitStatusListener --- .../work/foreman/DrillbitStatusListener.java | 6 +-- .../drill/exec/work/foreman/QueryManager.java | 42 ++++++++++++------- 2 files changed, 28 insertions(+), 20 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/DrillbitStatusListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/DrillbitStatusListener.java index 80f2ca11369..2a6d4128d16 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/DrillbitStatusListener.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/DrillbitStatusListener.java @@ -26,19 +26,17 @@ * Interface to define the listener to take actions when the set of active drillbits is changed. */ public interface DrillbitStatusListener { - // TODO this doesn't belong here - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillbitStatusListener.class); /** * The action to taken when a set of drillbits are unregistered from the cluster. * @param unregisteredDrillbits the set of newly unregistered drillbits. */ - public void drillbitUnregistered(Set unregisteredDrillbits); + void drillbitUnregistered(Set unregisteredDrillbits); /** * The action to taken when a set of new drillbits are registered to the cluster. * @param registeredDrillbits the set of newly registered drillbits. Note: the complete set of currently registered bits could be different. */ - public void drillbitRegistered(Set registeredDrillbits); + void drillbitRegistered(Set registeredDrillbits); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java index 554a27960c8..60173e234ee 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java @@ -420,11 +420,18 @@ public void fragmentComplete() { * the internal fragmentComplete() method so whether we have failure or success, the nodeComplete event will only * occur once. (Two threads could be decrementing the fragment at the same time since this will likely come from an * external event). + * + * @return true if the node has fragments that are pending (non-terminal state); false if all fragments running on + * this node have already terminated. */ - public void nodeDead() { + public boolean nodeDead() { + if (completedFragments.get() == totalFragments.get()) { + return false; + } while (completedFragments.get() < totalFragments.get()) { fragmentComplete(); } + return true; } } @@ -492,26 +499,29 @@ public void drillbitUnregistered(final Set unregisteredDrillbi final StringBuilder failedNodeList = new StringBuilder(); boolean atLeastOneFailure = false; - for(final DrillbitEndpoint ep : unregisteredDrillbits) { + for (final DrillbitEndpoint ep : unregisteredDrillbits) { final NodeTracker tracker = nodeMap.get(ep); - if (tracker != null) { - // mark node as dead. - tracker.nodeDead(); - - // capture node name for exception or logging message - if (atLeastOneFailure) { - failedNodeList.append(", "); - }else{ - atLeastOneFailure = true; - } - failedNodeList.append(ep.getAddress()); - failedNodeList.append(":"); - failedNodeList.append(ep.getUserPort()); + if (tracker == null) { + continue; // fragments were not assigned to this Drillbit + } + + // mark node as dead. + if (!tracker.nodeDead()) { + continue; // fragments assigned to this Drillbit completed + } + // fragments were running on the Drillbit, capture node name for exception or logging message + if (atLeastOneFailure) { + failedNodeList.append(", "); + } else { + atLeastOneFailure = true; } + failedNodeList.append(ep.getAddress()); + failedNodeList.append(":"); + failedNodeList.append(ep.getUserPort()); } - if (!atLeastOneFailure) { + if (atLeastOneFailure) { logger.warn("Drillbits [{}] no longer registered in cluster. Canceling query {}", failedNodeList, QueryIdHelper.getQueryId(queryId)); stateListener.moveToState(QueryState.FAILED,