Skip to content

Commit

Permalink
DRILL-3455: If fragments on unregistered Drillbits finished successfu…
Browse files Browse the repository at this point in the history
…lly, do not fail the query

+ DRILL-3448: Flipped the atLeastOneFailure condition in QueryManager
+ fixes in DrillbitStatusListener interface
+ logs from implementations of DrillbitStatusListener
  • Loading branch information
Sudheesh Katkam authored and adeneche committed Sep 4, 2015
1 parent cc7bc95 commit 57c5d15
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 20 deletions.
Expand Up @@ -26,19 +26,17 @@
* Interface to define the listener to take actions when the set of active drillbits is changed. * Interface to define the listener to take actions when the set of active drillbits is changed.
*/ */
public interface DrillbitStatusListener { public interface DrillbitStatusListener {
// TODO this doesn't belong here
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillbitStatusListener.class);


/** /**
* The action to taken when a set of drillbits are unregistered from the cluster. * The action to taken when a set of drillbits are unregistered from the cluster.
* @param unregisteredDrillbits the set of newly unregistered drillbits. * @param unregisteredDrillbits the set of newly unregistered drillbits.
*/ */
public void drillbitUnregistered(Set<CoordinationProtos.DrillbitEndpoint> unregisteredDrillbits); void drillbitUnregistered(Set<CoordinationProtos.DrillbitEndpoint> unregisteredDrillbits);


/** /**
* The action to taken when a set of new drillbits are registered to the cluster. * The action to taken when a set of new drillbits are registered to the cluster.
* @param registeredDrillbits the set of newly registered drillbits. Note: the complete set of currently registered bits could be different. * @param registeredDrillbits the set of newly registered drillbits. Note: the complete set of currently registered bits could be different.
*/ */
public void drillbitRegistered(Set<CoordinationProtos.DrillbitEndpoint> registeredDrillbits); void drillbitRegistered(Set<CoordinationProtos.DrillbitEndpoint> registeredDrillbits);


} }
Expand Up @@ -420,11 +420,18 @@ public void fragmentComplete() {
* the internal fragmentComplete() method so whether we have failure or success, the nodeComplete event will only * the internal fragmentComplete() method so whether we have failure or success, the nodeComplete event will only
* occur once. (Two threads could be decrementing the fragment at the same time since this will likely come from an * occur once. (Two threads could be decrementing the fragment at the same time since this will likely come from an
* external event). * external event).
*
* @return true if the node has fragments that are pending (non-terminal state); false if all fragments running on
* this node have already terminated.
*/ */
public void nodeDead() { public boolean nodeDead() {
if (completedFragments.get() == totalFragments.get()) {
return false;
}
while (completedFragments.get() < totalFragments.get()) { while (completedFragments.get() < totalFragments.get()) {
fragmentComplete(); fragmentComplete();
} }
return true;
} }


} }
Expand Down Expand Up @@ -492,26 +499,29 @@ public void drillbitUnregistered(final Set<DrillbitEndpoint> unregisteredDrillbi
final StringBuilder failedNodeList = new StringBuilder(); final StringBuilder failedNodeList = new StringBuilder();
boolean atLeastOneFailure = false; boolean atLeastOneFailure = false;


for(final DrillbitEndpoint ep : unregisteredDrillbits) { for (final DrillbitEndpoint ep : unregisteredDrillbits) {
final NodeTracker tracker = nodeMap.get(ep); final NodeTracker tracker = nodeMap.get(ep);
if (tracker != null) { if (tracker == null) {
// mark node as dead. continue; // fragments were not assigned to this Drillbit
tracker.nodeDead(); }


// capture node name for exception or logging message // mark node as dead.
if (atLeastOneFailure) { if (!tracker.nodeDead()) {
failedNodeList.append(", "); continue; // fragments assigned to this Drillbit completed
}else{ }
atLeastOneFailure = true;
}
failedNodeList.append(ep.getAddress());
failedNodeList.append(":");
failedNodeList.append(ep.getUserPort());


// fragments were running on the Drillbit, capture node name for exception or logging message
if (atLeastOneFailure) {
failedNodeList.append(", ");
} else {
atLeastOneFailure = true;
} }
failedNodeList.append(ep.getAddress());
failedNodeList.append(":");
failedNodeList.append(ep.getUserPort());
} }


if (!atLeastOneFailure) { if (atLeastOneFailure) {
logger.warn("Drillbits [{}] no longer registered in cluster. Canceling query {}", logger.warn("Drillbits [{}] no longer registered in cluster. Canceling query {}",
failedNodeList, QueryIdHelper.getQueryId(queryId)); failedNodeList, QueryIdHelper.getQueryId(queryId));
stateListener.moveToState(QueryState.FAILED, stateListener.moveToState(QueryState.FAILED,
Expand Down

0 comments on commit 57c5d15

Please sign in to comment.