Skip to content

Commit

Permalink
DRILL-3190: Check for transitions from CANCELLATION_REQUESTED state
Browse files Browse the repository at this point in the history
+ Moved state transition checks to QueryManager
  • Loading branch information
Sudheesh Katkam authored and vkorukanti committed Jun 5, 2015
1 parent 1de6aed commit d9452d9
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 26 deletions.
Expand Up @@ -23,10 +23,9 @@
import org.apache.drill.exec.proto.UserBitShared.FragmentState;
import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
import org.apache.drill.exec.proto.UserBitShared.OperatorProfile;
import org.apache.drill.exec.proto.helper.QueryIdHelper;

public class FragmentData {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentData.class);
// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentData.class);

private final boolean isLocal;
private volatile FragmentStatus status;
Expand All @@ -49,31 +48,16 @@ public FragmentData(final FragmentHandle handle, final DrillbitEndpoint endpoint
}

/**
* Update the status for this fragment. Also records last update and last progress time.
* @param status Updated status
* @return Whether or not the status update resulted in a FragmentState change.
* Update the status for this fragment. Also records last update and last progress time.
* @param newStatus Updated status
*/
public boolean setStatus(final FragmentStatus newStatus) {
public void setStatus(final FragmentStatus newStatus) {
final long time = System.currentTimeMillis();
final FragmentState oldState = status.getProfile().getState();
final boolean inTerminalState = oldState == FragmentState.FAILED || oldState == FragmentState.FINISHED || oldState == FragmentState.CANCELLED;
final FragmentState currentState = newStatus.getProfile().getState();
final boolean stateChanged = currentState != oldState;

if (inTerminalState) {
// already in a terminal state. This shouldn't happen.
logger.warn(String.format("Received status message for fragment %s after fragment was in state %s. New state was %s",
QueryIdHelper.getQueryIdentifier(getHandle()), oldState, currentState));
return false;
}

this.lastStatusUpdate = time;
lastStatusUpdate = time;
if (madeProgress(status, newStatus)) {
this.lastProgress = time;
lastProgress = time;
}
status = newStatus;

return stateChanged;
}

public FragmentState getState() {
Expand Down
Expand Up @@ -35,6 +35,7 @@
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.SchemaUserBitShared;
import org.apache.drill.exec.proto.UserBitShared.FragmentState;
import org.apache.drill.exec.proto.UserBitShared.MajorFragmentProfile;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryInfo;
Expand All @@ -50,10 +51,7 @@
import org.apache.drill.exec.store.sys.PStoreConfig;
import org.apache.drill.exec.store.sys.PStoreProvider;
import org.apache.drill.exec.work.EndpointListener;
import org.apache.drill.exec.work.WorkManager;
import org.apache.drill.exec.work.foreman.Foreman.StateListener;
import org.apache.drill.exec.work.fragment.AbstractStatusReporter;
import org.apache.drill.exec.work.fragment.FragmentExecutor;
import org.apache.drill.exec.work.fragment.NonRootStatusReporter;
import org.apache.drill.exec.work.fragment.StatusReporter;

Expand Down Expand Up @@ -126,12 +124,31 @@ public QueryManager(final QueryId queryId, final RunQuery runQuery, final PStore
}
}

private static boolean isTerminal(final FragmentState state) {
return state == FragmentState.FAILED
|| state == FragmentState.FINISHED
|| state == FragmentState.CANCELLED;
}

private boolean updateFragmentStatus(final FragmentStatus fragmentStatus) {
final FragmentHandle fragmentHandle = fragmentStatus.getHandle();
final int majorFragmentId = fragmentHandle.getMajorFragmentId();
final int minorFragmentId = fragmentHandle.getMinorFragmentId();
final FragmentData data = fragmentDataMap.get(majorFragmentId).get(minorFragmentId);
return data.setStatus(fragmentStatus);

final FragmentState oldState = data.getState();
final boolean inTerminalState = isTerminal(oldState);
final FragmentState currentState = fragmentStatus.getProfile().getState();

if (inTerminalState || (oldState == FragmentState.CANCELLATION_REQUESTED && !isTerminal(currentState))) {
// Already in a terminal state, or invalid state transition from CANCELLATION_REQUESTED. This shouldn't happen.
logger.warn(String.format("Received status message for fragment %s after fragment was in state %s. New state was %s",
QueryIdHelper.getQueryIdentifier(fragmentHandle), oldState, currentState));
return false;
}

data.setStatus(fragmentStatus);
return oldState != currentState;
}

private void fragmentDone(final FragmentStatus status) {
Expand Down

0 comments on commit d9452d9

Please sign in to comment.