Skip to content

Commit

Permalink
DRILL-2004: Foreman should account for fragment cancellations or quer…
Browse files Browse the repository at this point in the history
…y hangs
  • Loading branch information
Hanifi Gunes authored and parthchandra committed Jan 15, 2015
1 parent 69db15e commit 9378028
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 44 deletions.
Expand Up @@ -54,7 +54,7 @@ public WorkEventBus(WorkerBee bee) {
} }


public void removeFragmentStatusListener(QueryId queryId) { public void removeFragmentStatusListener(QueryId queryId) {
logger.debug("Removing framgent status listener for queryId {}.", queryId); logger.debug("Removing fragment status listener for queryId {}.", queryId);
listeners.remove(queryId); listeners.remove(queryId);
} }


Expand All @@ -70,10 +70,7 @@ public void setFragmentStatusListener(QueryId queryId, FragmentStatusListener li
public void status(FragmentStatus status) { public void status(FragmentStatus status) {
FragmentStatusListener l = listeners.get(status.getHandle().getQueryId()); FragmentStatusListener l = listeners.get(status.getHandle().getQueryId());
if (l == null) { if (l == null) {

logger.warn("A fragment message arrived but there was no registered listener for that message: {}.", status);
logger.error("A fragment message arrived but there was no registered listener for that message for handle {}.",
status.getHandle());
return;
} else { } else {
l.statusUpdate(status); l.statusUpdate(status);
} }
Expand Down Expand Up @@ -102,17 +99,12 @@ public FragmentManager getFragmentManager(FragmentHandle handle) throws Fragment


// since non-leaf fragments are sent first, it is an error condition if the manager is unavailable. // since non-leaf fragments are sent first, it is an error condition if the manager is unavailable.
FragmentManager m = managers.get(handle); FragmentManager m = managers.get(handle);
if(m != null){ if(m != null) {
return m; return m;
} }
throw new FragmentSetupException("Failed to receive plan fragment that was required for id: " + QueryIdHelper.getQueryIdentifier(handle)); throw new FragmentSetupException("Failed to receive plan fragment that was required for id: " + QueryIdHelper.getQueryIdentifier(handle));
} }


/**
 * Cancels the fragment identified by {@code handle}: logs the cancellation at debug level and then
 * passes the handle on to {@link #removeFragmentManager(FragmentHandle)}.
 *
 * @param handle handle of the fragment being cancelled
 */
public void cancelFragment(FragmentHandle handle) {
  final String fragmentId = QueryIdHelper.getQueryIdentifier(handle);
  logger.debug("Fragment canceled: {}", fragmentId);
  removeFragmentManager(handle);
}

public void removeFragmentManager(FragmentHandle handle) { public void removeFragmentManager(FragmentHandle handle) {
logger.debug("Removing fragment manager: {}", QueryIdHelper.getQueryIdentifier(handle)); logger.debug("Removing fragment manager: {}", QueryIdHelper.getQueryIdentifier(handle));
recentlyFinishedFragments.put(handle, 1); recentlyFinishedFragments.put(handle, 1);
Expand Down
Expand Up @@ -155,6 +155,8 @@ public void cancel() {
} }


private void cleanup(QueryResult result) { private void cleanup(QueryResult result) {
logger.info("foreman cleaning up - status: {}", queryManager.getStatus());

bee.retireForeman(this); bee.retireForeman(this);
context.getWorkBus().removeFragmentStatusListener(queryId); context.getWorkBus().removeFragmentStatusListener(queryId);
context.getClusterCoordinator().removeDrillbitStatusListener(queryManager); context.getClusterCoordinator().removeDrillbitStatusListener(queryManager);
Expand Down Expand Up @@ -384,7 +386,7 @@ private QueryWorkUnit getQueryWorkUnit(PhysicalPlan plan) throws ExecutionSetupE
* @return * @return
*/ */
private synchronized boolean moveToState(QueryState newState, Exception exception){ private synchronized boolean moveToState(QueryState newState, Exception exception){
logger.debug("State change requested. {} --> {}", state, newState); logger.info("State change requested. {} --> {}", state, newState, exception);
outside: switch(state) { outside: switch(state) {


case PENDING: case PENDING:
Expand Down Expand Up @@ -413,7 +415,9 @@ private synchronized boolean moveToState(QueryState newState, Exception exceptio
.setIsLastChunk(true) // .setIsLastChunk(true) //
.build(); .build();


cleanup(result); // immediately notify the client that cancellation is taking place; final clean-up happens when the foreman reaches
// a terminal state (completed, failed)
initiatingClient.sendResult(responseListener, new QueryWritableBatch(result), true);
return true; return true;
} }


Expand Down Expand Up @@ -454,7 +458,7 @@ private synchronized boolean moveToState(QueryState newState, Exception exceptio
case COMPLETED: case COMPLETED:
case FAILED: { case FAILED: {
// no op. // no op.
logger.info("Dropping request to move to {} state as query is already at {} state (which is terminal).", newState, state, exception); logger.warn("Dropping request to move to {} state as query is already at {} state (which is terminal).", newState, state, exception);
return false; return false;
} }


Expand Down Expand Up @@ -635,9 +639,9 @@ public void failed(RpcException ex) {


public class StateListener { public class StateListener {
public boolean moveToState(QueryState newState, Exception ex){ public boolean moveToState(QueryState newState, Exception ex){
try{ try {
acceptExternalEvents.await(); acceptExternalEvents.await();
}catch(InterruptedException e){ } catch(InterruptedException e){
logger.warn("Interrupted while waiting to move state.", e); logger.warn("Interrupted while waiting to move state.", e);
return false; return false;
} }
Expand Down
Expand Up @@ -21,11 +21,13 @@
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;


import com.google.common.base.Preconditions;
import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.proto.BitControl.FragmentStatus; import org.apache.drill.exec.proto.BitControl.FragmentStatus;
import org.apache.drill.exec.proto.BitControl.PlanFragment; import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState; import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
import org.apache.drill.exec.proto.UserProtos.RunQuery; import org.apache.drill.exec.proto.UserProtos.RunQuery;
Expand All @@ -51,7 +53,6 @@ public class QueryManager implements FragmentStatusListener, DrillbitStatusListe
private final QueryId queryId; private final QueryId queryId;


public QueryManager(QueryId id, RunQuery query, PStoreProvider pStoreProvider, StateListener stateListener, Foreman foreman) { public QueryManager(QueryId id, RunQuery query, PStoreProvider pStoreProvider, StateListener stateListener, Foreman foreman) {
super();
this.stateListener = stateListener; this.stateListener = stateListener;
this.queryId = id; this.queryId = id;
this.remainingFragmentCount = new AtomicInteger(0); this.remainingFragmentCount = new AtomicInteger(0);
Expand Down Expand Up @@ -87,13 +88,14 @@ public void statusUpdate(FragmentStatus status) {
updateFragmentStatus(status); updateFragmentStatus(status);
break; break;
case CANCELLED: case CANCELLED:
// we don't care about cancellation messages since we're the only entity that should drive cancellations. //TODO: define a new query state to distinguish the state of early termination from cancellation
fragmentDone(status);
break; break;
case FAILED: case FAILED:
stateListener.moveToState(QueryState.FAILED, new RemoteRpcException(status.getProfile().getError())); stateListener.moveToState(QueryState.FAILED, new RemoteRpcException(status.getProfile().getError()));
break; break;
case FINISHED: case FINISHED:
finished(status); fragmentDone(status);
break; break;
case RUNNING: case RUNNING:
updateFragmentStatus(status); updateFragmentStatus(status);
Expand All @@ -107,18 +109,19 @@ private void updateFragmentStatus(FragmentStatus status){
this.status.updateFragmentStatus(status); this.status.updateFragmentStatus(status);
} }


private void finished(FragmentStatus status){ private void fragmentDone(FragmentStatus status){
this.status.incrementFinishedFragments(); this.status.incrementFinishedFragments();
int remaining = remainingFragmentCount.decrementAndGet(); int remaining = remainingFragmentCount.decrementAndGet();
updateFragmentStatus(status); updateFragmentStatus(status);

logger.debug("waiting for {} fragments", remaining);
if(remaining == 0){ if(remaining == 0){
stateListener.moveToState(QueryState.COMPLETED, null); stateListener.moveToState(QueryState.COMPLETED, null);
} }
} }


public void setup(FragmentHandle rootFragmentHandle, DrillbitEndpoint localIdentity, int countOfNonRootFragments){ public void setup(FragmentHandle rootFragmentHandle, DrillbitEndpoint localIdentity, int countOfNonRootFragments){
remainingFragmentCount.set(countOfNonRootFragments + 1); remainingFragmentCount.set(countOfNonRootFragments + 1);
logger.debug("foreman is waiting for {} fragments to finish", countOfNonRootFragments + 1);
status.add(new FragmentData(rootFragmentHandle, localIdentity, true)); status.add(new FragmentData(rootFragmentHandle, localIdentity, true));
this.status.setTotalFragments(countOfNonRootFragments + 1); this.status.setTotalFragments(countOfNonRootFragments + 1);


Expand Down
Expand Up @@ -25,7 +25,6 @@
import org.apache.drill.exec.physical.base.FragmentRoot; import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.impl.ImplCreator; import org.apache.drill.exec.physical.impl.ImplCreator;
import org.apache.drill.exec.physical.impl.RootExec; import org.apache.drill.exec.physical.impl.RootExec;
import org.apache.drill.exec.planner.fragment.Fragment;
import org.apache.drill.exec.proto.BitControl.FragmentStatus; import org.apache.drill.exec.proto.BitControl.FragmentStatus;
import org.apache.drill.exec.proto.CoordinationProtos; import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
Expand Down Expand Up @@ -71,16 +70,13 @@ public void cancel() {
updateState(FragmentState.CANCELLED); updateState(FragmentState.CANCELLED);
logger.debug("Cancelled Fragment {}", context.getHandle()); logger.debug("Cancelled Fragment {}", context.getHandle());
context.cancel(); context.cancel();

if (executionThread != null) {
executionThread.interrupt();
}
} }


public void receivingFragmentFinished(FragmentHandle handle) { public void receivingFragmentFinished(FragmentHandle handle) {
updateState(FragmentState.CANCELLED); cancel();
context.cancel(); if (root != null) {
root.receivingFragmentFinished(handle); root.receivingFragmentFinished(handle);
}
} }


public UserClientConnection getClient() { public UserClientConnection getClient() {
Expand All @@ -104,8 +100,8 @@ public void run() {
context.getDrillbitContext().getClusterCoordinator().addDrillbitStatusListener(drillbitStatusListener); context.getDrillbitContext().getClusterCoordinator().addDrillbitStatusListener(drillbitStatusListener);


logger.debug("Starting fragment runner. {}:{}", context.getHandle().getMajorFragmentId(), context.getHandle().getMinorFragmentId()); logger.debug("Starting fragment runner. {}:{}", context.getHandle().getMajorFragmentId(), context.getHandle().getMinorFragmentId());
if (!updateState(FragmentState.AWAITING_ALLOCATION, FragmentState.RUNNING, false)) { if (!updateStateOrFail(FragmentState.AWAITING_ALLOCATION, FragmentState.RUNNING)) {
internalFail(new RuntimeException(String.format("Run was called when fragment was in %s state. FragmentRunnables should only be started when they are currently in awaiting allocation state.", FragmentState.valueOf(state.get())))); logger.warn("Unable to set fragment state to RUNNING. Cancelled or failed?");
return; return;
} }


Expand All @@ -117,7 +113,7 @@ public void run() {
closeOutResources(false); closeOutResources(false);
} else { } else {
closeOutResources(true); // make sure to close out resources before we report success. closeOutResources(true); // make sure to close out resources before we report success.
updateState(FragmentState.RUNNING, FragmentState.FINISHED, false); updateStateOrFail(FragmentState.RUNNING, FragmentState.FINISHED);
} }


break; break;
Expand Down Expand Up @@ -166,23 +162,60 @@ private void internalFail(Throwable excep) {
listener.fail(context.getHandle(), "Failure while running fragment.", excep); listener.fail(context.getHandle(), "Failure while running fragment.", excep);
} }


private void updateState(FragmentState update) { /**
state.set(update.getNumber()); * Updates the fragment state with the given state
listener.stateChanged(context.getHandle(), update); * @param to target state
*/
protected void updateState(FragmentState to) {;
state.set(to.getNumber());
listener.stateChanged(context.getHandle(), to);
} }


private boolean updateState(FragmentState current, FragmentState update, boolean exceptionOnFailure) { /**
boolean success = state.compareAndSet(current.getNumber(), update.getNumber()); * Updates the fragment state only if the current state matches the expected.
if (!success && exceptionOnFailure) { *
internalFail(new RuntimeException(String.format( * @param expected expected current state
"State was different than expected. Attempting to update state from %s to %s however current state was %s.", * @param to target state
current.name(), update.name(), FragmentState.valueOf(state.get())))); * @return true only if update succeeds
return false; */
protected boolean checkAndUpdateState(FragmentState expected, FragmentState to) {
boolean success = state.compareAndSet(expected.getNumber(), to.getNumber());
if (success) {
listener.stateChanged(context.getHandle(), to);
} else {
logger.debug("State change failed. Expected state: {} -- target state: {} -- current state: {}.",
expected.name(), to.name(), FragmentState.valueOf(state.get()));
} }
listener.stateChanged(context.getHandle(), update); return success;
return true;
} }


/**
 * Returns true if the fragment is in a terminal state (cancelled, failed or finished).
 *
 * <p>The state is read exactly once so that all three comparisons are made against the same value. With
 * separate reads, a concurrent change from one terminal state to another (for example FINISHED to
 * CANCELLED) could make every comparison miss and wrongly report the fragment as still live.
 *
 * @return true only if the state observed by this call is CANCELLED, FAILED or FINISHED
 */
protected boolean isCompleted() {
  // Single snapshot: never compare against a value that may have moved between checks.
  final int current = state.get();
  return current == FragmentState.CANCELLED_VALUE
      || current == FragmentState.FAILED_VALUE
      || current == FragmentState.FINISHED_VALUE;
}

/**
 * Attempts the {@code expected} to {@code to} transition through {@link #checkAndUpdateState}. When the
 * transition is rejected and the fragment is not already in a terminal state, the fragment is failed with
 * a {@link StateTransitionException}; a rejection while in a terminal state is left alone.
 *
 * @param expected state the fragment is expected to be in right now
 * @param to state to move to
 * @return true only if the state was actually updated
 */
protected boolean updateStateOrFail(FragmentState expected, FragmentState to) {
  if (checkAndUpdateState(expected, to)) {
    return true;
  }
  if (!isCompleted()) {
    // Rejected while still live: this is an unexpected transition, so fail the fragment.
    final String template = "State was different than expected while attempting to update state from %s to %s however current state was %s.";
    final String detail = String.format(template, expected.name(), to.name(), FragmentState.valueOf(state.get()));
    internalFail(new StateTransitionException(detail));
  }
  return false;
}


@Override @Override
public int compareTo(Object o) { public int compareTo(Object o) {
return o.hashCode() - this.hashCode(); return o.hashCode() - this.hashCode();
Expand Down
@@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.work.fragment;

import org.apache.drill.common.exceptions.DrillException;

/**
 * Exception describing a state transition that did not happen as expected.
 *
 * <p>Every constructor forwards its arguments unchanged to the matching {@link DrillException}
 * constructor; this class adds no state or behavior of its own.
 */
public class StateTransitionException extends DrillException {

  /** Creates an exception with neither a message nor a cause. */
  public StateTransitionException() {
    // The superclass no-argument constructor is invoked implicitly.
  }

  /**
   * Creates an exception carrying only a message.
   *
   * @param message description of the failed transition
   */
  public StateTransitionException(String message) {
    super(message);
  }

  /**
   * Creates an exception carrying only a cause.
   *
   * @param cause underlying failure
   */
  public StateTransitionException(Throwable cause) {
    super(cause);
  }

  /**
   * Creates an exception carrying both a message and a cause.
   *
   * @param message description of the failed transition
   * @param cause underlying failure
   */
  public StateTransitionException(String message, Throwable cause) {
    super(message, cause);
  }

  /**
   * Creates an exception with full control over suppression and stack trace capture.
   *
   * @param message description of the failed transition
   * @param cause underlying failure
   * @param enableSuppression passed through to the superclass constructor
   * @param writableStackTrace passed through to the superclass constructor
   */
  public StateTransitionException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
    super(message, cause, enableSuppression, writableStackTrace);
  }
}

0 comments on commit 9378028

Please sign in to comment.