diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/DrillFuncHolderExpr.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/DrillFuncHolderExpr.java index 90368c41339..1b0a6eb1722 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/DrillFuncHolderExpr.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/DrillFuncHolderExpr.java @@ -27,18 +27,21 @@ import org.apache.drill.exec.expr.fn.DrillFuncHolder; public class DrillFuncHolderExpr extends FunctionHolderExpression implements Iterable{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillFuncHolderExpr.class); - private DrillFuncHolder holder; + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillFuncHolderExpr.class); + private final DrillFuncHolder holder; + private final MajorType majorType; private DrillSimpleFunc interpreter; public DrillFuncHolderExpr(String nameUsed, DrillFuncHolder holder, List args, ExpressionPosition pos) { super(nameUsed, pos, args); this.holder = holder; + // since function return type can not be changed, cache it for better performance + this.majorType = holder.getReturnType(args); } @Override public MajorType getMajorType() { - return holder.getReturnType(args); + return majorType; } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileUtil.java index cfc7977c930..a0c0ea70f98 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileUtil.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileUtil.java @@ -19,30 +19,35 @@ import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState; +import java.util.HashMap; +import java.util.Map; + public class ProfileUtil { - // Display names for QueryState enum in UserBitShared.proto - private static final String[] queryStateDisplayNames = { - "Starting", // STARTING = 0 - "Running", // RUNNING = 1 - "Succeeded", // COMPLETED = 2 - "Canceled", // CANCELED = 3 - "Failed", // FAILED = 4 - "CancellationRequested", // CANCELLATION_REQUESTED = 5 - "Enqueued" // ENQUEUED = 6 - }; + private static final Map queryStateDisplayMap = new HashMap<>(QueryState.values().length); + + static { + queryStateDisplayMap.put(QueryState.PREPARING, "Preparing"); + queryStateDisplayMap.put(QueryState.PLANNING, "Planning"); + queryStateDisplayMap.put(QueryState.ENQUEUED, "Enqueued"); + queryStateDisplayMap.put(QueryState.STARTING, "Starting"); + queryStateDisplayMap.put(QueryState.RUNNING, "Running"); + queryStateDisplayMap.put(QueryState.COMPLETED, "Succeeded"); + queryStateDisplayMap.put(QueryState.FAILED, "Failed"); + queryStateDisplayMap.put(QueryState.CANCELLATION_REQUESTED, "Cancellation Requested"); + queryStateDisplayMap.put(QueryState.CANCELED, "Canceled"); + } /** - * Utility to return display name for query state - * @param queryState + * Utility method to return display name for query state + * @param queryState query state * @return display string for query state */ - public final static String getQueryStateDisplayName(QueryState queryState) { - int queryStateOrdinal = queryState.getNumber(); - if (queryStateOrdinal >= queryStateDisplayNames.length) { - return queryState.name(); - } else { - return queryStateDisplayNames[queryStateOrdinal]; + public static String getQueryStateDisplayName(QueryState queryState) { + String displayName = 
queryStateDisplayMap.get(queryState); + if (displayName == null) { + displayName = queryState.name(); } + return displayName; } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java index 9c2b438f370..20cc0cad44c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java @@ -202,8 +202,10 @@ public String getQueuedDuration() { } public String getExecutionDuration() { - //Check if State is STARTING or RUNNING - if (profile.getState() == QueryState.STARTING || + //Check if State is PREPARING, PLANNING, STARTING, ENQUEUED or RUNNING + if (profile.getState() == QueryState.PREPARING || + profile.getState() == QueryState.PLANNING || + profile.getState() == QueryState.STARTING || profile.getState() == QueryState.ENQUEUED || profile.getState() == QueryState.RUNNING) { return NOT_AVAILABLE_LABEL; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java index a06d46c5a2b..2fa757651c1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java @@ -23,11 +23,14 @@ import org.apache.drill.exec.physical.base.FragmentRoot; import org.apache.drill.exec.planner.PhysicalPlanReader; import org.apache.drill.exec.proto.BitControl.PlanFragment; +import org.apache.drill.exec.proto.CoordinationProtos; +import org.apache.drill.exec.proto.ExecProtos; import org.apache.drill.exec.server.options.OptionList; import org.apache.drill.exec.work.foreman.ForemanSetupException; import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.base.Preconditions; +import org.codehaus.jackson.map.ObjectMapper; public class QueryWorkUnit { @@ -112,4 +115,46 @@ public void applyPlan(PhysicalPlanReader reader) throws ForemanSetupException { fragments.add(defn.applyPlan(reader)); } } + + /** + * Converts list of stored fragments into their string representation, + * in case of exception returns text indicating that string was malformed. + * Is used for debugging purposes. 
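+   * <p>A sketch of the expected output, with illustrative values:
+   * <pre>
+   * PlanFragment(1/2) major_fragment_id 0 minor_fragment_id 0
+   *  DrillbitEndpoint address localhost
+   *  fragment_json: { ... pretty-printed fragment JSON ... }
+   * </pre>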
+ * + * @return fragments information + */ + public String stringifyFragments() { + StringBuilder stringBuilder = new StringBuilder(); + final int fragmentCount = fragments.size(); + int fragmentIndex = 0; + for (final PlanFragment planFragment : fragments) { + final ExecProtos.FragmentHandle fragmentHandle = planFragment.getHandle(); + stringBuilder.append("PlanFragment("); + stringBuilder.append(++fragmentIndex); + stringBuilder.append('/'); + stringBuilder.append(fragmentCount); + stringBuilder.append(") major_fragment_id "); + stringBuilder.append(fragmentHandle.getMajorFragmentId()); + stringBuilder.append(" minor_fragment_id "); + stringBuilder.append(fragmentHandle.getMinorFragmentId()); + stringBuilder.append('\n'); + + final CoordinationProtos.DrillbitEndpoint endpointAssignment = planFragment.getAssignment(); + stringBuilder.append(" DrillbitEndpoint address "); + stringBuilder.append(endpointAssignment.getAddress()); + stringBuilder.append('\n'); + + String jsonString = "<>"; + stringBuilder.append(" fragment_json: "); + final ObjectMapper objectMapper = new ObjectMapper(); + try { + final Object json = objectMapper.readValue(planFragment.getFragmentJson(), Object.class); + jsonString = objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(json); + } catch (final Exception e) { + // we've already set jsonString to a fallback value + } + stringBuilder.append(jsonString); + } + return stringBuilder.toString(); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java index 8ce8639b458..391f1006db7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java @@ -17,18 +17,13 @@ */ package org.apache.drill.exec.work.foreman; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Date; -import java.util.LinkedList; -import java.util.List; -import java.util.Set; -import java.util.concurrent.CountDownLatch; - +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.protobuf.InvalidProtocolBufferException; +import io.netty.channel.ChannelFuture; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; import org.apache.drill.common.CatastrophicFailure; -import org.apache.drill.common.EventProcessor; -import org.apache.drill.common.concurrent.ExtendedLatch; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.logical.LogicalPlan; @@ -36,8 +31,6 @@ import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.exception.OptimizerException; import org.apache.drill.exec.exception.OutOfMemoryException; -import org.apache.drill.exec.metrics.DrillMetrics; -import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.QueryContext; import org.apache.drill.exec.opt.BasicOptimizer; import org.apache.drill.exec.physical.PhysicalPlan; @@ -48,9 +41,7 @@ import org.apache.drill.exec.planner.fragment.SimpleParallelizer; import org.apache.drill.exec.planner.sql.DirectPlan; import org.apache.drill.exec.planner.sql.DrillSqlWorker; -import org.apache.drill.exec.proto.BitControl.InitializeFragments; import org.apache.drill.exec.proto.BitControl.PlanFragment; -import 
org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.ExecProtos.ServerPreparedStatementState; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; @@ -63,36 +54,21 @@ import org.apache.drill.exec.rpc.BaseRpcOutcomeListener; import org.apache.drill.exec.rpc.RpcException; import org.apache.drill.exec.rpc.UserClientConnection; -import org.apache.drill.exec.rpc.control.Controller; import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.testing.ControlsInjector; import org.apache.drill.exec.testing.ControlsInjectorFactory; import org.apache.drill.exec.util.Pointer; -import org.apache.drill.exec.work.EndpointListener; import org.apache.drill.exec.work.QueryWorkUnit; import org.apache.drill.exec.work.WorkManager.WorkerBee; -import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueryQueueException; import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueTimeoutException; +import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueryQueueException; import org.apache.drill.exec.work.foreman.rm.QueryResourceManager; -import org.apache.drill.exec.work.fragment.FragmentExecutor; -import org.apache.drill.exec.work.fragment.FragmentStatusReporter; -import org.apache.drill.exec.work.fragment.NonRootFragmentManager; -import org.apache.drill.exec.work.fragment.RootFragmentManager; import org.codehaus.jackson.map.ObjectMapper; -import com.codahale.metrics.Counter; -import com.google.common.base.Preconditions; -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.Lists; -import com.google.common.collect.Multimap; -import com.google.common.collect.Sets; -import com.google.protobuf.InvalidProtocolBufferException; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelFuture; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.GenericFutureListener; +import java.io.IOException; +import java.util.Date; +import java.util.List; /** * Foreman manages all the fragments (local and remote) for a single query where this @@ -100,16 +76,17 @@ * * The flow is as follows: *
 * <ul>
+ * <li>While Foreman is being initialized, the query is in PREPARING state.</li>
 * <li>Foreman is submitted as a runnable.</li>
 * <li>Runnable does query planning.</li>
- * <li>state changes from PENDING to RUNNING</li>
- * <li>Runnable sends out starting fragments</li>
+ * <li>Runnable submits the query to be enqueued.</li>
+ * <li>The Runnable's run() completes, but the Foreman stays around to listen to state changes.</li>
+ * <li>Once the query is enqueued, starting fragments are sent out.</li>
 * <li>Status listener are activated</li>
- * <li>The Runnable's run() completes, but the Foreman stays around</li>
 * <li>Foreman listens for state change messages.</li>
- * <li>state change messages can drive the state to FAILED or CANCELED, in which case
- * messages are sent to running fragments to terminate</li>
- * <li>when all fragments complete, state change messages drive the state to COMPLETED</li>
+ * <li>State change messages can drive the state to FAILED or CANCELED, in which case
+ * messages are sent to running fragments to terminate.</li>
+ * <li>When all fragments are completed, state change messages drive the state to COMPLETED.</li>
 * </ul>
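+ * <p>In terms of QueryState, the expected happy path is:
+ * PREPARING -> PLANNING -> ENQUEUED -> STARTING -> RUNNING -> COMPLETED.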
*/ @@ -118,90 +95,81 @@ public class Foreman implements Runnable { private static final org.slf4j.Logger queryLogger = org.slf4j.LoggerFactory.getLogger("query.logger"); private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(Foreman.class); - public enum ProfileOption { SYNC, ASYNC, NONE }; + public enum ProfileOption { SYNC, ASYNC, NONE } private static final ObjectMapper MAPPER = new ObjectMapper(); - private static final long RPC_WAIT_IN_MSECS_PER_FRAGMENT = 5000; - - private static final Counter enqueuedQueries = DrillMetrics.getRegistry().counter("drill.queries.enqueued"); - private static final Counter runningQueries = DrillMetrics.getRegistry().counter("drill.queries.running"); - private static final Counter completedQueries = DrillMetrics.getRegistry().counter("drill.queries.completed"); - private static final Counter succeededQueries = DrillMetrics.getRegistry().counter("drill.queries.succeeded"); - private static final Counter failedQueries = DrillMetrics.getRegistry().counter("drill.queries.failed"); - private static final Counter canceledQueries = DrillMetrics.getRegistry().counter("drill.queries.canceled"); private final QueryId queryId; private final String queryIdString; private final RunQuery queryRequest; private final QueryContext queryContext; private final QueryManager queryManager; // handles lower-level details of query execution - private final WorkerBee bee; // provides an interface to submit tasks private final DrillbitContext drillbitContext; private final UserClientConnection initiatingClient; // used to send responses - private volatile QueryState state; private boolean resume = false; private final ProfileOption profileOption; private final QueryResourceManager queryRM; private final ResponseSendListener responseListener = new ResponseSendListener(); - private final StateSwitch stateSwitch = new StateSwitch(); - private final ForemanResult foremanResult = new ForemanResult(); private final ConnectionClosedListener closeListener = new ConnectionClosedListener(); private final ChannelFuture closeFuture; + private final FragmentsRunner fragmentsRunner; + private final QueryStateProcessor queryStateProcessor; private String queryText; /** * Constructor. Sets up the Foreman, but does not initiate any execution. 
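+   * <p>Construction also wires up the FragmentsRunner and QueryStateProcessor
+   * collaborators; nothing runs until run() is invoked.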
* - * @param bee used to submit additional work - * @param drillbitContext - * @param connection + * @param bee work manager (runs fragments) + * @param drillbitContext drillbit context + * @param connection connection * @param queryId the id for the query * @param queryRequest the query to execute */ public Foreman(final WorkerBee bee, final DrillbitContext drillbitContext, final UserClientConnection connection, final QueryId queryId, final RunQuery queryRequest) { - this.bee = bee; this.queryId = queryId; - queryIdString = QueryIdHelper.getQueryId(queryId); + this.queryIdString = QueryIdHelper.getQueryId(queryId); this.queryRequest = queryRequest; this.drillbitContext = drillbitContext; - - initiatingClient = connection; - closeFuture = initiatingClient.getChannelClosureFuture(); + this.initiatingClient = connection; + this.closeFuture = initiatingClient.getChannelClosureFuture(); closeFuture.addListener(closeListener); - queryContext = new QueryContext(connection.getSession(), drillbitContext, queryId); - queryManager = new QueryManager(queryId, queryRequest, drillbitContext.getStoreProvider(), + this.queryContext = new QueryContext(connection.getSession(), drillbitContext, queryId); + this.queryManager = new QueryManager(queryId, queryRequest, drillbitContext.getStoreProvider(), drillbitContext.getClusterCoordinator(), this); + this.queryRM = drillbitContext.getResourceManager().newQueryRM(this); + this.fragmentsRunner = new FragmentsRunner(bee, initiatingClient, drillbitContext, this); + this.queryStateProcessor = new QueryStateProcessor(queryIdString, queryManager, drillbitContext, new ForemanResult()); + this.profileOption = setProfileOption(queryContext.getOptions()); + } - recordNewState(QueryState.ENQUEUED); - enqueuedQueries.inc(); - queryRM = drillbitContext.getResourceManager().newQueryRM(this); - profileOption = setProfileOption(queryContext.getOptions()); + /** + * @return query id + */ + public QueryId getQueryId() { + return queryId; } - private ProfileOption setProfileOption(OptionManager options) { - if (! options.getOption(ExecConstants.ENABLE_QUERY_PROFILE_VALIDATOR)) { - return ProfileOption.NONE; - } - if (options.getOption(ExecConstants.QUERY_PROFILE_DEBUG_VALIDATOR)) { - return ProfileOption.SYNC; - } else { - return ProfileOption.ASYNC; - } + /** + * @return current query state + */ + public QueryState getState() { + return queryStateProcessor.getState(); } - private class ConnectionClosedListener implements GenericFutureListener> { - @Override - public void operationComplete(Future future) throws Exception { - cancel(); - } + /** + * @return sql query text of the query request + */ + public String getQueryText() { + return queryText; } + /** * Get the QueryContext created for the query. * @@ -221,12 +189,21 @@ public QueryManager getQueryManager() { } /** - * Cancel the query. Asynchronous -- it may take some time for all remote fragments to be - * terminated. + * Cancel the query (move query in cancellation requested state). + * Query execution will be canceled once possible. */ public void cancel() { - // Note this can be called from outside of run() on another thread, or after run() completes - addToEventQueue(QueryState.CANCELLATION_REQUESTED, null); + queryStateProcessor.cancel(); + } + + /** + * Adds query status in the event queue to process it when foreman is ready. 
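+   * <p>For example, a failed remote fragment submission is reported here as
+   * addToEventQueue(QueryState.FAILED, exception) and is processed only after
+   * startProcessingEvents() has been called.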
+ * + * @param state new query state + * @param exception exception if failure has occurred + */ + public void addToEventQueue(QueryState state, Exception exception) { + queryStateProcessor.addToEventQueue(state, exception); } /** @@ -255,23 +232,21 @@ public void run() { currentThread.setName(queryIdString + ":foreman"); try { /* - Check if the foreman is ONLINE. If not dont accept any new queries. + Check if the foreman is ONLINE. If not don't accept any new queries. */ if (!drillbitContext.isForemanOnline()) { throw new ForemanException("Query submission failed since Foreman is shutting down."); } } catch (ForemanException e) { logger.debug("Failure while submitting query", e); - addToEventQueue(QueryState.FAILED, e); + queryStateProcessor.addToEventQueue(QueryState.FAILED, e); } - // track how long the query takes - queryManager.markStartTime(); - enqueuedQueries.dec(); - runningQueries.inc(); + + queryText = queryRequest.getPlan(); + queryStateProcessor.moveToState(QueryState.PLANNING, null); try { injector.injectChecked(queryContext.getExecutionControls(), "run-try-beginning", ForemanException.class); - queryText = queryRequest.getPlan(); // convert a run query request into action switch (queryRequest.getType()) { @@ -299,18 +274,15 @@ public void run() { } injector.injectChecked(queryContext.getExecutionControls(), "run-try-end", ForemanException.class); } catch (final OutOfMemoryException e) { - moveToState(QueryState.FAILED, UserException.memoryError(e).build(logger)); + queryStateProcessor.moveToState(QueryState.FAILED, UserException.memoryError(e).build(logger)); } catch (final ForemanException e) { - moveToState(QueryState.FAILED, e); + queryStateProcessor.moveToState(QueryState.FAILED, e); } catch (AssertionError | Exception ex) { - moveToState(QueryState.FAILED, + queryStateProcessor.moveToState(QueryState.FAILED, new ForemanException("Unexpected exception during fragment initialization: " + ex.getMessage(), ex)); } catch (final OutOfMemoryError e) { if ("Direct buffer memory".equals(e.getMessage())) { - moveToState(QueryState.FAILED, - UserException.resourceError(e) - .message("One or more nodes ran out of memory while executing the query.") - .build(logger)); + queryStateProcessor.moveToState(QueryState.FAILED, UserException.resourceError(e).message("One or more nodes ran out of memory while executing the query.").build(logger)); } else { /* * FragmentExecutors use a DrillbitStatusListener to watch out for the death of their query's Foreman. So, if we @@ -321,35 +293,6 @@ public void run() { } } finally { - /* - * Begin accepting external events. - * - * Doing this here in the finally clause will guarantee that it occurs. Otherwise, if there - * is an exception anywhere during setup, it wouldn't occur, and any events that are generated - * as a result of any partial setup that was done (such as the FragmentSubmitListener, - * the ResponseSendListener, or an external call to cancel()), will hang the thread that makes the - * event delivery call. - * - * If we do throw an exception during setup, and have already moved to QueryState.FAILED, we just need to - * make sure that we can't make things any worse as those events are delivered, but allow - * any necessary remaining cleanup to proceed. - * - * Note that cancellations cannot be simulated before this point, i.e. pauses can be injected, because Foreman - * would wait on the cancelling thread to signal a resume and the cancelling thread would wait on the Foreman - * to accept events. 
- */ - try { - stateSwitch.start(); - } catch (Exception ex) { - moveToState(QueryState.FAILED, ex); - } - - // If we received the resume signal before fragments are setup, the first call does not actually resume the - // fragments. Since setup is done, all fragments must have been delivered to remote nodes. Now we can resume. - if(resume) { - resume(); - } - // restore the thread's original name currentThread.setName(originalName); } @@ -361,6 +304,31 @@ public void run() { */ } + /** + * While one fragments where sanding out, other might have been completed. We don't want to process completed / failed + * events until all fragments are sent out. This method triggers events processing when all fragments were sent out. + */ + public void startProcessingEvents() { + queryStateProcessor.startProcessingEvents(); + + // If we received the resume signal before fragments are setup, the first call does not actually resume the + // fragments. Since setup is done, all fragments must have been delivered to remote nodes. Now we can resume. + if (resume) { + resume(); + } + } + + private ProfileOption setProfileOption(OptionManager options) { + if (! options.getOption(ExecConstants.ENABLE_QUERY_PROFILE_VALIDATOR)) { + return ProfileOption.NONE; + } + if (options.getOption(ExecConstants.QUERY_PROFILE_DEBUG_VALIDATOR)) { + return ProfileOption.SYNC; + } else { + return ProfileOption.ASYNC; + } + } + private void parseAndRunLogicalPlan(final String json) throws ExecutionSetupException { LogicalPlan logicalPlan; try { @@ -436,48 +404,16 @@ private void runPhysicalPlan(final PhysicalPlan plan) throws ExecutionSetupExcep queryManager.setTotalCost(plan.totalCost()); work.applyPlan(drillbitContext.getPlanReader()); logWorkUnit(work); - admit(work); - queryManager.setQueueName(queryRM.queueName()); - - final List planFragments = work.getFragments(); - final PlanFragment rootPlanFragment = work.getRootFragment(); - assert queryId == rootPlanFragment.getHandle().getQueryId(); - drillbitContext.getWorkBus().addFragmentStatusListener(queryId, queryManager.getFragmentStatusListener()); - drillbitContext.getClusterCoordinator().addDrillbitStatusListener(queryManager.getDrillbitStatusListener()); + fragmentsRunner.setFragmentsInfo(work.getFragments(), work.getRootFragment(), work.getRootOperator()); - logger.debug("Submitting fragments to run."); - - // set up the root fragment first so we'll have incoming buffers available. 
- setupRootFragment(rootPlanFragment, work.getRootOperator()); - - setupNonRootFragments(planFragments); - - moveToState(QueryState.RUNNING, null); - logger.debug("Fragments running."); - } - - private void admit(QueryWorkUnit work) throws ForemanSetupException { - queryManager.markPlanningEndTime(); - try { - queryRM.admit(); - } catch (QueueTimeoutException e) { - throw UserException - .resourceError() - .message(e.getMessage()) - .build(logger); - } catch (QueryQueueException e) { - throw new ForemanSetupException(e.getMessage(), e); - } finally { - queryManager.markQueueWaitEndTime(); - } - moveToState(QueryState.STARTING, null); + startQueryProcessing(); } /** * This is a helper method to run query based on the list of PlanFragment that were planned * at some point of time - * @param fragmentsList + * @param fragmentsList fragment list * @throws ExecutionSetupException */ private void runFragment(List fragmentsList) throws ExecutionSetupException { @@ -502,6 +438,8 @@ private void runFragment(List fragmentsList) throws ExecutionSetup } } + assert rootFragment != null; + final FragmentRoot rootOperator; try { rootOperator = drillbitContext.getPlanReader().readFragmentRoot(rootFragment.getFragmentJson()); @@ -509,26 +447,73 @@ private void runFragment(List fragmentsList) throws ExecutionSetup throw new ExecutionSetupException(String.format("Unable to parse FragmentRoot from fragment: %s", rootFragment.getFragmentJson())); } queryRM.setCost(rootOperator.getCost()); - admit(null); - drillbitContext.getWorkBus().addFragmentStatusListener(queryId, queryManager.getFragmentStatusListener()); - drillbitContext.getClusterCoordinator().addDrillbitStatusListener(queryManager.getDrillbitStatusListener()); - logger.debug("Submitting fragments to run."); + fragmentsRunner.setFragmentsInfo(planFragments, rootFragment, rootOperator); - // set up the root fragment first so we'll have incoming buffers available. - setupRootFragment(rootFragment, rootOperator); + startQueryProcessing(); + } - setupNonRootFragments(planFragments); + /** + * Enqueues the query and once enqueued, starts sending out query fragments for further execution. + * Moves query to RUNNING state. + */ + private void startQueryProcessing() { + enqueue(); + runFragments(); + queryStateProcessor.moveToState(QueryState.RUNNING, null); + } - moveToState(QueryState.RUNNING, null); - logger.debug("Fragments running."); + /** + * Move query to ENQUEUED state. Enqueues query if queueing is enabled. + * Foreman run will be blocked until query is enqueued. + * In case of failures (ex: queue timeout exception) will move query to FAILED state. + */ + private void enqueue() { + queryStateProcessor.moveToState(QueryState.ENQUEUED, null); + + try { + queryRM.admit(); + queryStateProcessor.moveToState(QueryState.STARTING, null); + } catch (QueueTimeoutException | QueryQueueException e) { + queryStateProcessor.moveToState(QueryState.FAILED, e); + } finally { + String queueName = queryRM.queueName(); + queryManager.setQueueName(queueName == null ? "Unknown" : queueName); + } + } + + private void runFragments() { + try { + fragmentsRunner.submit(); + } catch (Exception e) { + queryStateProcessor.moveToState(QueryState.FAILED, e); + } finally { + /* + * Begin accepting external events. + * + * Doing this here in the finally clause will guarantee that it occurs. 
Otherwise, if there + * is an exception anywhere during setup, it wouldn't occur, and any events that are generated + * as a result of any partial setup that was done (such as the FragmentSubmitListener, + * the ResponseSendListener, or an external call to cancel()), will hang the thread that makes the + * event delivery call. + * + * If we do throw an exception during setup, and have already moved to QueryState.FAILED, we just need to + * make sure that we can't make things any worse as those events are delivered, but allow + * any necessary remaining cleanup to proceed. + * + * Note that cancellations cannot be simulated before this point, i.e. pauses can be injected, because Foreman + * would wait on the cancelling thread to signal a resume and the cancelling thread would wait on the Foreman + * to accept events. + */ + startProcessingEvents(); + } } /** * Helper method to execute the query in prepared statement. Current implementation takes the query from opaque * object of the preparedStatement and submits as a new query. * - * @param preparedStatementHandle + * @param preparedStatementHandle prepared statement handle * @throws ExecutionSetupException */ private void runPreparedStatement(final PreparedStatementHandle preparedStatementHandle) @@ -559,10 +544,6 @@ private static void validatePlan(final PhysicalPlan plan) throws ForemanSetupExc } } - Exception getCurrentException() { - return foremanResult.getException(); - } - private QueryWorkUnit getQueryWorkUnit(final PhysicalPlan plan) throws ExecutionSetupException { final PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next(); final Fragment rootFragment = rootOperator.accept(MakeFragmentsVisitor.INSTANCE, null); @@ -577,47 +558,26 @@ private void logWorkUnit(QueryWorkUnit queryWorkUnit) { if (! 
logger.isTraceEnabled()) { return; } - final StringBuilder sb = new StringBuilder(); - sb.append("PlanFragments for query "); - sb.append(queryId); - sb.append('\n'); - - final List planFragments = queryWorkUnit.getFragments(); - final int fragmentCount = planFragments.size(); - int fragmentIndex = 0; - for(final PlanFragment planFragment : planFragments) { - final FragmentHandle fragmentHandle = planFragment.getHandle(); - sb.append("PlanFragment("); - sb.append(++fragmentIndex); - sb.append('/'); - sb.append(fragmentCount); - sb.append(") major_fragment_id "); - sb.append(fragmentHandle.getMajorFragmentId()); - sb.append(" minor_fragment_id "); - sb.append(fragmentHandle.getMinorFragmentId()); - sb.append('\n'); - - final DrillbitEndpoint endpointAssignment = planFragment.getAssignment(); - sb.append(" DrillbitEndpoint address "); - sb.append(endpointAssignment.getAddress()); - sb.append('\n'); - - String jsonString = "<>"; - sb.append(" fragment_json: "); - final ObjectMapper objectMapper = new ObjectMapper(); - try - { - final Object json = objectMapper.readValue(planFragment.getFragmentJson(), Object.class); - jsonString = objectMapper.defaultPrettyPrintingWriter().writeValueAsString(json); - } catch(final Exception e) { - // we've already set jsonString to a fallback value - } - sb.append(jsonString); + logger.trace(String.format("PlanFragments for query %s \n%s", + queryId, queryWorkUnit.stringifyFragments())); + } + + private void runSQL(final String sql) throws ExecutionSetupException { + final Pointer textPlan = new Pointer<>(); + final PhysicalPlan plan = DrillSqlWorker.getPlan(queryContext, sql, textPlan); + queryManager.setPlanText(textPlan.value); + runPhysicalPlan(plan); + } - logger.trace(sb.toString()); + private PhysicalPlan convert(final LogicalPlan plan) throws OptimizerException { + if (logger.isDebugEnabled()) { + logger.debug("Converting logical plan {}.", plan.toJsonStringSafe(queryContext.getLpPersistence())); } + return new BasicOptimizer(queryContext, initiatingClient).optimize( + new BasicOptimizer.BasicOptimizationContext(queryContext), plan); } + /** * Manages the end-state processing for Foreman. * @@ -631,7 +591,7 @@ private void logWorkUnit(QueryWorkUnit queryWorkUnit) { * The idea here is to make close()ing the ForemanResult do the final cleanup and * sending. Closing the result must be the last thing that is done by Foreman. 
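+   * <p>End-state processing is driven by QueryStateProcessor, which closes this
+   * result once the query reaches a terminal state.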
*/ - private class ForemanResult implements AutoCloseable { + public class ForemanResult implements AutoCloseable { private QueryState resultState = null; private volatile Exception resultException = null; private boolean isClosed = false; @@ -688,7 +648,7 @@ public void setForceFailure(final Exception exception) { * @param exception the exception to add */ private void addException(final Exception exception) { - Preconditions.checkNotNull(exception); + assert exception != null; if (resultException == null) { resultException = exception; @@ -741,7 +701,7 @@ private void logQuerySummary() { queryText, new Date(queryContext.getQueryContextInfo().getQueryStartTime()), new Date(System.currentTimeMillis()), - state, + queryStateProcessor.getState(), queryContext.getSession().getCredentials().getUserName(), initiatingClient.getRemoteAddress()); queryLogger.info(MAPPER.writeValueAsString(q)); @@ -756,9 +716,6 @@ public void close() { Preconditions.checkState(!isClosed); Preconditions.checkState(resultState != null); - // to track how long the query takes - queryManager.markEndTime(); - logger.debug(queryIdString + ": cleaning up."); injector.injectPause(queryContext.getExecutionControls(), "foreman-cleanup", logger); @@ -780,11 +737,11 @@ public void close() { * * We only need to do this if the resultState differs from the last recorded state */ - if (resultState != state) { + if (resultState != queryStateProcessor.getState()) { suppressingClose(new AutoCloseable() { @Override public void close() throws Exception { - recordNewState(resultState); + queryStateProcessor.recordNewState(resultState); } }); } @@ -842,7 +799,7 @@ public void close() throws Exception { } // Remove the Foreman from the running query list. - bee.retireForeman(Foreman.this); + fragmentsRunner.getBee().retireForeman(Foreman.this); try { queryManager.close(); @@ -850,21 +807,9 @@ public void close() throws Exception { logger.warn("unable to close query manager", e); } - // Incrementing QueryState counters - switch (state) { - case FAILED: - failedQueries.inc(); - break; - case CANCELED: - canceledQueries.inc(); - break; - case COMPLETED: - succeededQueries.inc(); - break; - } - runningQueries.dec(); - completedQueries.inc(); + queryStateProcessor.close(); + try { queryRM.exit(); } finally { @@ -873,472 +818,10 @@ public void close() throws Exception { } } - private static class StateEvent { - final QueryState newState; - final Exception exception; - - StateEvent(final QueryState newState, final Exception exception) { - this.newState = newState; - this.exception = exception; - } - } - - private void moveToState(final QueryState newState, final Exception exception) { - logger.debug(queryIdString + ": State change requested {} --> {}", state, newState, - exception); - switch (state) { - case ENQUEUED: - switch (newState) { - case FAILED: - Preconditions.checkNotNull(exception, "exception cannot be null when new state is failed"); - recordNewState(newState); - foremanResult.setFailed(exception); - foremanResult.close(); - return; - case STARTING: - recordNewState(newState); - return; - } - break; - case STARTING: - if (newState == QueryState.RUNNING) { - recordNewState(QueryState.RUNNING); - return; - } - - //$FALL-THROUGH$ - - case RUNNING: { - /* - * For cases that cancel executing fragments, we have to record the new - * state first, because the cancellation of the local root fragment will - * cause this to be called recursively. 
- */ - switch (newState) { - case CANCELLATION_REQUESTED: { - assert exception == null; - recordNewState(QueryState.CANCELLATION_REQUESTED); - queryManager.cancelExecutingFragments(drillbitContext); - foremanResult.setCompleted(QueryState.CANCELED); - /* - * We don't close the foremanResult until we've gotten - * acknowledgments, which happens below in the case for current state - * == CANCELLATION_REQUESTED. - */ - return; - } - - case COMPLETED: { - assert exception == null; - recordNewState(QueryState.COMPLETED); - foremanResult.setCompleted(QueryState.COMPLETED); - foremanResult.close(); - return; - } - - case FAILED: { - assert exception != null; - recordNewState(QueryState.FAILED); - queryManager.cancelExecutingFragments(drillbitContext); - foremanResult.setFailed(exception); - foremanResult.close(); - return; - } - - } - break; - } - - case CANCELLATION_REQUESTED: - if ((newState == QueryState.CANCELED) - || (newState == QueryState.COMPLETED) - || (newState == QueryState.FAILED)) { - - if (drillbitContext.getConfig().getBoolean(ExecConstants.RETURN_ERROR_FOR_FAILURE_IN_CANCELLED_FRAGMENTS)) { - if (newState == QueryState.FAILED) { - assert exception != null; - recordNewState(QueryState.FAILED); - foremanResult.setForceFailure(exception); - } - } - /* - * These amount to a completion of the cancellation requests' cleanup; - * now we can clean up and send the result. - */ - foremanResult.close(); - } - return; - - case CANCELED: - case COMPLETED: - case FAILED: - logger - .warn( - "Dropping request to move to {} state as query is already at {} state (which is terminal).", - newState, state); - return; - } - - throw new IllegalStateException(String.format( - "Failure trying to change states: %s --> %s", state.name(), - newState.name())); - } - - private class StateSwitch extends EventProcessor { - public void addEvent(final QueryState newState, final Exception exception) { - sendEvent(new StateEvent(newState, exception)); - } - - @Override - protected void processEvent(final StateEvent event) { - moveToState(event.newState, event.exception); - } - } - - /** - * Tells the foreman to move to a new state.
- * This will be added to the end of the event queue and will be processed once the foreman is ready - * to accept external events. - * - * @param newState the state to move to - * @param exception if not null, the exception that drove this state transition (usually a failure) - */ - public void addToEventQueue(final QueryState newState, final Exception exception) { - stateSwitch.addEvent(newState, exception); - } - - private void recordNewState(final QueryState newState) { - state = newState; - queryManager.updateEphemeralState(newState); - } - - private void runSQL(final String sql) throws ExecutionSetupException { - final Pointer textPlan = new Pointer<>(); - final PhysicalPlan plan = DrillSqlWorker.getPlan(queryContext, sql, textPlan); - queryManager.setPlanText(textPlan.value); - runPhysicalPlan(plan); - } - - private PhysicalPlan convert(final LogicalPlan plan) throws OptimizerException { - if (logger.isDebugEnabled()) { - logger.debug("Converting logical plan {}.", plan.toJsonStringSafe(queryContext.getLpPersistence())); - } - return new BasicOptimizer(queryContext, initiatingClient).optimize( - new BasicOptimizer.BasicOptimizationContext(queryContext), plan); - } - - public QueryId getQueryId() { - return queryId; - } - - /** - * Set up the root fragment (which will run locally), and submit it for execution. - * - * @param rootFragment - * @param rootOperator - * @throws ExecutionSetupException - */ - private void setupRootFragment(final PlanFragment rootFragment, final FragmentRoot rootOperator) - throws ExecutionSetupException { - final FragmentContext rootContext = new FragmentContext(drillbitContext, rootFragment, queryContext, - initiatingClient, drillbitContext.getFunctionImplementationRegistry()); - final FragmentStatusReporter statusReporter = new FragmentStatusReporter(rootContext); - final FragmentExecutor rootRunner = new FragmentExecutor(rootContext, rootFragment, statusReporter, rootOperator); - final RootFragmentManager fragmentManager = new RootFragmentManager(rootFragment, rootRunner, statusReporter); - - queryManager.addFragmentStatusTracker(rootFragment, true); - - // FragmentManager is setting buffer for FragmentContext - if (rootContext.isBuffersDone()) { - // if we don't have to wait for any incoming data, start the fragment runner. - bee.addFragmentRunner(rootRunner); - } else { - // if we do, record the fragment manager in the workBus. - drillbitContext.getWorkBus().addFragmentManager(fragmentManager); - } - } - - /** - * Add planFragment into either of local fragment list or remote fragment map based on assigned Drillbit Endpoint node - * and the local Drillbit Endpoint. - * @param planFragment - * @param localEndPoint - * @param localFragmentList - * @param remoteFragmentMap - */ - private void updateFragmentCollection(final PlanFragment planFragment, final DrillbitEndpoint localEndPoint, - final List localFragmentList, - final Multimap remoteFragmentMap) { - final DrillbitEndpoint assignedDrillbit = planFragment.getAssignment(); - - if (assignedDrillbit.equals(localEndPoint)) { - localFragmentList.add(planFragment); - } else { - remoteFragmentMap.put(assignedDrillbit, planFragment); - } - } - - /** - * Send remote intermediate fragment to the assigned Drillbit node. Throw exception in case of failure to send the - * fragment. 
- * @param remoteFragmentMap - Map of Drillbit Endpoint to list of PlanFragment's - */ - private void scheduleRemoteIntermediateFragments(final Multimap remoteFragmentMap) { - - final int numIntFragments = remoteFragmentMap.keySet().size(); - final ExtendedLatch endpointLatch = new ExtendedLatch(numIntFragments); - final FragmentSubmitFailures fragmentSubmitFailures = new FragmentSubmitFailures(); - - // send remote intermediate fragments - for (final DrillbitEndpoint ep : remoteFragmentMap.keySet()) { - sendRemoteFragments(ep, remoteFragmentMap.get(ep), endpointLatch, fragmentSubmitFailures); - } - - final long timeout = RPC_WAIT_IN_MSECS_PER_FRAGMENT * numIntFragments; - if (numIntFragments > 0 && !endpointLatch.awaitUninterruptibly(timeout)) { - long numberRemaining = endpointLatch.getCount(); - throw UserException.connectionError() - .message("Exceeded timeout (%d) while waiting send intermediate work fragments to remote nodes. " + - "Sent %d and only heard response back from %d nodes.", - timeout, numIntFragments, numIntFragments - numberRemaining).build(logger); - } - - // if any of the intermediate fragment submissions failed, fail the query - final List submissionExceptions = - fragmentSubmitFailures.submissionExceptions; - - if (submissionExceptions.size() > 0) { - Set endpoints = Sets.newHashSet(); - StringBuilder sb = new StringBuilder(); - boolean first = true; - - for (FragmentSubmitFailures.SubmissionException e : fragmentSubmitFailures.submissionExceptions) { - DrillbitEndpoint endpoint = e.drillbitEndpoint; - if (endpoints.add(endpoint)) { - if (first) { - first = false; - } else { - sb.append(", "); - } - sb.append(endpoint.getAddress()); - } - } - throw UserException.connectionError(submissionExceptions.get(0).rpcException) - .message("Error setting up remote intermediate fragment execution") - .addContext("Nodes with failures", sb.toString()).build(logger); - } - } - - - /** - * Start the locally assigned leaf or intermediate fragment - * @param fragment - * @throws ForemanException - */ - private void startLocalFragment(final PlanFragment fragment) throws ForemanException { - - logger.debug("Received local fragment start instruction", fragment); - - try { - final FragmentContext fragmentContext = new FragmentContext(drillbitContext, fragment, - drillbitContext.getFunctionImplementationRegistry()); - final FragmentStatusReporter statusReporter = new FragmentStatusReporter(fragmentContext); - final FragmentExecutor fragmentExecutor = new FragmentExecutor(fragmentContext, fragment, statusReporter); - - // we either need to start the fragment if it is a leaf fragment, or set up a fragment manager if it is non leaf. - if (fragment.getLeafFragment()) { - bee.addFragmentRunner(fragmentExecutor); - } else { - // isIntermediate, store for incoming data. - final NonRootFragmentManager manager = new NonRootFragmentManager(fragment, fragmentExecutor, statusReporter); - drillbitContext.getWorkBus().addFragmentManager(manager); - } - - } catch (final ExecutionSetupException ex) { - throw new ForemanException("Failed to create fragment context", ex); - } catch (final Exception ex) { - throw new ForemanException("Failed while trying to start local fragment", ex); - } - } - - /** - * Set up the non-root fragments for execution. Some may be local, and some may be remote. - * Messages are sent immediately, so they may start returning data even before we complete this. 
- * - * @param fragments the fragments - * @throws ForemanException - */ - private void setupNonRootFragments(final Collection fragments) throws ForemanException { - if (fragments.isEmpty()) { - // nothing to do here - return; - } - /* - * We will send a single message to each endpoint, regardless of how many fragments will be - * executed there. We need to start up the intermediate fragments first so that they will be - * ready once the leaf fragments start producing data. To satisfy both of these, we will - * make a pass through the fragments and put them into the remote maps according to their - * leaf/intermediate state, as well as their target drillbit. Also filter the leaf/intermediate - * fragments which are assigned to run on local Drillbit node (or Foreman node) into separate lists. - * - * This will help to schedule local - */ - final Multimap remoteLeafFragmentMap = ArrayListMultimap.create(); - final List localLeafFragmentList = new ArrayList<>(); - final Multimap remoteIntFragmentMap = ArrayListMultimap.create(); - final List localIntFragmentList = new ArrayList<>(); - - final DrillbitEndpoint localDrillbitEndpoint = drillbitContext.getEndpoint(); - // record all fragments for status purposes. - for (final PlanFragment planFragment : fragments) { - - if (logger.isTraceEnabled()) { - logger.trace("Tracking intermediate remote node {} with data {}", planFragment.getAssignment(), - planFragment.getFragmentJson()); - } - - queryManager.addFragmentStatusTracker(planFragment, false); - - if (planFragment.getLeafFragment()) { - updateFragmentCollection(planFragment, localDrillbitEndpoint, localLeafFragmentList, remoteLeafFragmentMap); - } else { - updateFragmentCollection(planFragment, localDrillbitEndpoint, localIntFragmentList, remoteIntFragmentMap); - } - } - - /* - * We need to wait for the intermediates to be sent so that they'll be set up by the time - * the leaves start producing data. We'll use this latch to wait for the responses. - * - * However, in order not to hang the process if any of the RPC requests fails, we always - * count down (see FragmentSubmitFailures), but we count the number of failures so that we'll - * know if any submissions did fail. - */ - scheduleRemoteIntermediateFragments(remoteIntFragmentMap); - - // Setup local intermediate fragments - for (final PlanFragment fragment : localIntFragmentList) { - startLocalFragment(fragment); - } - - injector.injectChecked(queryContext.getExecutionControls(), "send-fragments", ForemanException.class); - /* - * Send the remote (leaf) fragments; we don't wait for these. Any problems will come in through - * the regular sendListener event delivery. - */ - for (final DrillbitEndpoint ep : remoteLeafFragmentMap.keySet()) { - sendRemoteFragments(ep, remoteLeafFragmentMap.get(ep), null, null); - } - - // Setup local leaf fragments - for (final PlanFragment fragment : localLeafFragmentList) { - startLocalFragment(fragment); - } - } - - /** - * Send all the remote fragments belonging to a single target drillbit in one request. 
- * - * @param assignment the drillbit assigned to these fragments - * @param fragments the set of fragments - * @param latch the countdown latch used to track the requests to all endpoints - * @param fragmentSubmitFailures the submission failure counter used to track the requests to all endpoints - */ - private void sendRemoteFragments(final DrillbitEndpoint assignment, final Collection fragments, - final CountDownLatch latch, final FragmentSubmitFailures fragmentSubmitFailures) { - @SuppressWarnings("resource") - final Controller controller = drillbitContext.getController(); - final InitializeFragments.Builder fb = InitializeFragments.newBuilder(); - for(final PlanFragment planFragment : fragments) { - fb.addFragment(planFragment); - } - final InitializeFragments initFrags = fb.build(); - - logger.debug("Sending remote fragments to \nNode:\n{} \n\nData:\n{}", assignment, initFrags); - final FragmentSubmitListener listener = - new FragmentSubmitListener(assignment, initFrags, latch, fragmentSubmitFailures); - controller.getTunnel(assignment).sendFragments(listener, initFrags); - } - - public QueryState getState() { - return state; - } - - /** - * @return sql query text of the query request - */ - public String getQueryText() { - return queryText; - } - - /** - * Used by {@link FragmentSubmitListener} to track the number of submission failures. - */ - private static class FragmentSubmitFailures { - static class SubmissionException { - final DrillbitEndpoint drillbitEndpoint; - final RpcException rpcException; - - SubmissionException(final DrillbitEndpoint drillbitEndpoint, - final RpcException rpcException) { - this.drillbitEndpoint = drillbitEndpoint; - this.rpcException = rpcException; - } - } - - final List submissionExceptions = new LinkedList<>(); - - void addFailure(final DrillbitEndpoint drillbitEndpoint, final RpcException rpcException) { - submissionExceptions.add(new SubmissionException(drillbitEndpoint, rpcException)); - } - } - - private class FragmentSubmitListener extends EndpointListener { - private final CountDownLatch latch; - private final FragmentSubmitFailures fragmentSubmitFailures; - - /** - * Constructor. - * - * @param endpoint the endpoint for the submission - * @param value the initialize fragments message - * @param latch the latch to count down when the status is known; may be null - * @param fragmentSubmitFailures the counter to use for failures; must be non-null iff latch is non-null - */ - public FragmentSubmitListener(final DrillbitEndpoint endpoint, final InitializeFragments value, - final CountDownLatch latch, final FragmentSubmitFailures fragmentSubmitFailures) { - super(endpoint, value); - Preconditions.checkState((latch == null) == (fragmentSubmitFailures == null)); - this.latch = latch; - this.fragmentSubmitFailures = fragmentSubmitFailures; - } - - @Override - public void success(final Ack ack, final ByteBuf byteBuf) { - if (latch != null) { - latch.countDown(); - } - } - - @Override - public void failed(final RpcException ex) { - if (latch != null) { // this block only applies to intermediate fragments - fragmentSubmitFailures.addFailure(endpoint, ex); - latch.countDown(); - } else { // this block only applies to leaf fragments - // since this won't be waited on, we can wait to deliver this event once the Foreman is ready - logger.debug("Failure while sending fragment. 
Stopping query.", ex); - addToEventQueue(QueryState.FAILED, ex); - } - } - + private class ConnectionClosedListener implements GenericFutureListener> { @Override - public void interrupted(final InterruptedException e) { - // Foreman shouldn't get interrupted while waiting for the RPC outcome of fragment submission. - // Consider the interrupt as failure. - final String errMsg = "Interrupted while waiting for the RPC outcome of fragment submission."; - logger.error(errMsg, e); - failed(new RpcException(errMsg, e)); + public void operationComplete(Future future) throws Exception { + cancel(); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java new file mode 100644 index 00000000000..ce048486271 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.foreman; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; +import io.netty.buffer.ByteBuf; +import org.apache.drill.common.concurrent.ExtendedLatch; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.base.FragmentRoot; +import org.apache.drill.exec.proto.BitControl.InitializeFragments; +import org.apache.drill.exec.proto.BitControl.PlanFragment; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.proto.GeneralRPCProtos; +import org.apache.drill.exec.proto.UserBitShared.QueryId; +import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.UserClientConnection; +import org.apache.drill.exec.rpc.control.Controller; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.testing.ControlsInjector; +import org.apache.drill.exec.testing.ControlsInjectorFactory; +import org.apache.drill.exec.work.EndpointListener; +import org.apache.drill.exec.work.WorkManager.WorkerBee; +import org.apache.drill.exec.work.fragment.FragmentExecutor; +import org.apache.drill.exec.work.fragment.FragmentStatusReporter; +import org.apache.drill.exec.work.fragment.NonRootFragmentManager; +import org.apache.drill.exec.work.fragment.RootFragmentManager; + + +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import 
java.util.Set; +import java.util.concurrent.CountDownLatch; + +/** + * Is responsible for submitting query fragments for running (locally and remotely). + */ +public class FragmentsRunner { + + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentsRunner.class); + private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(FragmentsRunner.class); + + private static final long RPC_WAIT_IN_MSECS_PER_FRAGMENT = 5000; + + private final WorkerBee bee; + private final UserClientConnection initiatingClient; + private final DrillbitContext drillbitContext; + private final Foreman foreman; + + private List planFragments; + private PlanFragment rootPlanFragment; + private FragmentRoot rootOperator; + + public FragmentsRunner(WorkerBee bee, UserClientConnection initiatingClient, DrillbitContext drillbitContext, Foreman foreman) { + this.bee = bee; + this.initiatingClient = initiatingClient; + this.drillbitContext = drillbitContext; + this.foreman = foreman; + } + + public WorkerBee getBee() { + return bee; + } + + public void setFragmentsInfo(List planFragments, + PlanFragment rootPlanFragment, + FragmentRoot rootOperator) { + this.planFragments = planFragments; + this.rootPlanFragment = rootPlanFragment; + this.rootOperator = rootOperator; + } + + /** + * Submits root and non-root fragments fragments for running. + * In case of success move query to the running state. + */ + public void submit() throws ExecutionSetupException { + assert planFragments != null; + assert rootPlanFragment != null; + assert rootOperator != null; + + QueryId queryId = foreman.getQueryId(); + assert queryId == rootPlanFragment.getHandle().getQueryId(); + + QueryManager queryManager = foreman.getQueryManager(); + drillbitContext.getWorkBus().addFragmentStatusListener(queryId, queryManager.getFragmentStatusListener()); + drillbitContext.getClusterCoordinator().addDrillbitStatusListener(queryManager.getDrillbitStatusListener()); + + logger.debug("Submitting fragments to run."); + // set up the root fragment first so we'll have incoming buffers available. + setupRootFragment(rootPlanFragment, rootOperator); + setupNonRootFragments(planFragments); + logger.debug("Fragments running."); + } + + /** + * Set up the root fragment (which will run locally), and submit it for execution. + * + * @param rootFragment root fragment + * @param rootOperator root operator + * @throws ExecutionSetupException + */ + private void setupRootFragment(final PlanFragment rootFragment, final FragmentRoot rootOperator) throws ExecutionSetupException { + QueryManager queryManager = foreman.getQueryManager(); + final FragmentContext rootContext = new FragmentContext(drillbitContext, rootFragment, foreman.getQueryContext(), + initiatingClient, drillbitContext.getFunctionImplementationRegistry()); + final FragmentStatusReporter statusReporter = new FragmentStatusReporter(rootContext); + final FragmentExecutor rootRunner = new FragmentExecutor(rootContext, rootFragment, statusReporter, rootOperator); + final RootFragmentManager fragmentManager = new RootFragmentManager(rootFragment, rootRunner, statusReporter); + + queryManager.addFragmentStatusTracker(rootFragment, true); + + // FragmentManager is setting buffer for FragmentContext + if (rootContext.isBuffersDone()) { + // if we don't have to wait for any incoming data, start the fragment runner. + bee.addFragmentRunner(rootRunner); + } else { + // if we do, record the fragment manager in the workBus. 
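+      // The work bus routes incoming data to this manager; the root fragment
+      // is started once all of its incoming buffers are available.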
+
+  /**
+   * Sets up the non-root fragments for execution. Some may be local, and some may be remote.
+   * Messages are sent immediately, so they may start returning data even before we complete this.
+   *
+   * @param fragments the non-root fragments
+   * @throws ExecutionSetupException if a local fragment cannot be set up
+   */
+  private void setupNonRootFragments(final Collection<PlanFragment> fragments) throws ExecutionSetupException {
+    if (fragments.isEmpty()) {
+      // nothing to do here
+      return;
+    }
+    /*
+     * We will send a single message to each endpoint, regardless of how many fragments will be
+     * executed there. We need to start up the intermediate fragments first so that they will be
+     * ready once the leaf fragments start producing data. To satisfy both of these, we will
+     * make a pass through the fragments and put them into the remote maps according to their
+     * leaf/intermediate state, as well as their target drillbit. Also filter the leaf/intermediate
+     * fragments which are assigned to run on the local Drillbit node (the Foreman node) into separate
+     * lists, so that the local fragments can be scheduled directly, without any RPC round trip.
+     */
+    final Multimap<DrillbitEndpoint, PlanFragment> remoteLeafFragmentMap = ArrayListMultimap.create();
+    final List<PlanFragment> localLeafFragmentList = new ArrayList<>();
+    final Multimap<DrillbitEndpoint, PlanFragment> remoteIntFragmentMap = ArrayListMultimap.create();
+    final List<PlanFragment> localIntFragmentList = new ArrayList<>();
+
+    final DrillbitEndpoint localDrillbitEndpoint = drillbitContext.getEndpoint();
+    // record all fragments for status purposes.
+    for (final PlanFragment planFragment : fragments) {
+
+      if (logger.isTraceEnabled()) {
+        logger.trace("Tracking intermediate remote node {} with data {}", planFragment.getAssignment(),
+            planFragment.getFragmentJson());
+      }
+
+      foreman.getQueryManager().addFragmentStatusTracker(planFragment, false);
+
+      if (planFragment.getLeafFragment()) {
+        updateFragmentCollection(planFragment, localDrillbitEndpoint, localLeafFragmentList, remoteLeafFragmentMap);
+      } else {
+        updateFragmentCollection(planFragment, localDrillbitEndpoint, localIntFragmentList, remoteIntFragmentMap);
+      }
+    }
+
+    /*
+     * We need to wait for the intermediates to be sent so that they'll be set up by the time
+     * the leaves start producing data. We'll use this latch to wait for the responses.
+     *
+     * However, in order not to hang the process if any of the RPC requests fails, we always
+     * count down (see FragmentSubmitFailures), but we count the number of failures so that we'll
+     * know if any submissions did fail.
+     */
+    scheduleRemoteIntermediateFragments(remoteIntFragmentMap);
+
+    // Setup local intermediate fragments
+    for (final PlanFragment fragment : localIntFragmentList) {
+      startLocalFragment(fragment);
+    }
+
+    injector.injectChecked(foreman.getQueryContext().getExecutionControls(), "send-fragments", ForemanException.class);
+    /*
+     * Send the remote (leaf) fragments; we don't wait for these. Any problems will come in through
+     * the regular sendListener event delivery.
+     */
+    for (final DrillbitEndpoint ep : remoteLeafFragmentMap.keySet()) {
+      sendRemoteFragments(ep, remoteLeafFragmentMap.get(ep), null, null);
+    }
+
+    // Setup local leaf fragments
+    for (final PlanFragment fragment : localLeafFragmentList) {
+      startLocalFragment(fragment);
+    }
+  }
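The comment block above describes a count-down-no-matter-what protocol for the intermediate fragments. Reduced to plain JDK primitives it looks roughly like this (illustrative names, not Drill API):

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class SubmitBarrierSketch {
  // One latch count per endpoint. Success and failure BOTH count down, so a failed
  // submission can never hang the await; failures are inspected afterwards.
  static List<Exception> submitAll(int endpoints, long timeoutMs) throws InterruptedException {
    final CountDownLatch latch = new CountDownLatch(endpoints);
    final List<Exception> failures = new CopyOnWriteArrayList<>();
    for (int i = 0; i < endpoints; i++) {
      new Thread(() -> {
        try {
          // a real implementation would send the fragments for this endpoint here
        } catch (Exception e) {
          failures.add(e);    // record the failure ...
        } finally {
          latch.countDown();  // ... but always count down
        }
      }).start();
    }
    if (!latch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
      throw new IllegalStateException(latch.getCount() + " endpoints did not respond in time");
    }
    return failures;          // non-empty means: fail the query with the collected context
  }
}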
+
+  /**
+   * Sends all the remote fragments belonging to a single target drillbit in one request.
+   *
+   * @param assignment the drillbit assigned to these fragments
+   * @param fragments the set of fragments
+   * @param latch the countdown latch used to track the requests to all endpoints
+   * @param fragmentSubmitFailures the submission failure counter used to track the requests to all endpoints
+   */
+  private void sendRemoteFragments(final DrillbitEndpoint assignment, final Collection<PlanFragment> fragments,
+                                   final CountDownLatch latch, final FragmentSubmitFailures fragmentSubmitFailures) {
+    @SuppressWarnings("resource")
+    final Controller controller = drillbitContext.getController();
+    final InitializeFragments.Builder fb = InitializeFragments.newBuilder();
+    for (final PlanFragment planFragment : fragments) {
+      fb.addFragment(planFragment);
+    }
+    final InitializeFragments initFrags = fb.build();
+
+    logger.debug("Sending remote fragments to node: {}\nData: {}", assignment, initFrags);
+    final FragmentSubmitListener listener =
+        new FragmentSubmitListener(assignment, initFrags, latch, fragmentSubmitFailures);
+    controller.getTunnel(assignment).sendFragments(listener, initFrags);
+  }
+
+  /**
+   * Adds the plan fragment either to the local fragment list or to the remote fragment map,
+   * depending on whether its assigned Drillbit endpoint is the local (Foreman) endpoint.
+   *
+   * @param planFragment plan fragment
+   * @param localEndPoint local endpoint
+   * @param localFragmentList local fragment list
+   * @param remoteFragmentMap remote fragment map
+   */
+  private void updateFragmentCollection(final PlanFragment planFragment, final DrillbitEndpoint localEndPoint,
+                                        final List<PlanFragment> localFragmentList,
+                                        final Multimap<DrillbitEndpoint, PlanFragment> remoteFragmentMap) {
+    final DrillbitEndpoint assignedDrillbit = planFragment.getAssignment();
+
+    if (assignedDrillbit.equals(localEndPoint)) {
+      localFragmentList.add(planFragment);
+    } else {
+      remoteFragmentMap.put(assignedDrillbit, planFragment);
+    }
+  }
" + + "Sent %d and only heard response back from %d nodes.", + timeout, numIntFragments, numIntFragments - numberRemaining).build(logger); + } + + // if any of the intermediate fragment submissions failed, fail the query + final List submissionExceptions = + fragmentSubmitFailures.submissionExceptions; + + if (submissionExceptions.size() > 0) { + Set endpoints = Sets.newHashSet(); + StringBuilder sb = new StringBuilder(); + boolean first = true; + + for (FragmentSubmitFailures.SubmissionException e : fragmentSubmitFailures.submissionExceptions) { + DrillbitEndpoint endpoint = e.drillbitEndpoint; + if (endpoints.add(endpoint)) { + if (first) { + first = false; + } else { + sb.append(", "); + } + sb.append(endpoint.getAddress()); + } + } + throw UserException.connectionError(submissionExceptions.get(0).rpcException) + .message("Error setting up remote intermediate fragment execution") + .addContext("Nodes with failures", sb.toString()).build(logger); + } + } + + + /** + * Start the locally assigned leaf or intermediate fragment + * + * @param fragment fragment + */ + private void startLocalFragment(final PlanFragment fragment) throws ExecutionSetupException { + logger.debug("Received local fragment start instruction", fragment); + + final FragmentContext fragmentContext = new FragmentContext(drillbitContext, fragment, drillbitContext.getFunctionImplementationRegistry()); + final FragmentStatusReporter statusReporter = new FragmentStatusReporter(fragmentContext); + final FragmentExecutor fragmentExecutor = new FragmentExecutor(fragmentContext, fragment, statusReporter); + + // we either need to start the fragment if it is a leaf fragment, or set up a fragment manager if it is non leaf. + if (fragment.getLeafFragment()) { + bee.addFragmentRunner(fragmentExecutor); + } else { + // isIntermediate, store for incoming data. + final NonRootFragmentManager manager = new NonRootFragmentManager(fragment, fragmentExecutor, statusReporter); + drillbitContext.getWorkBus().addFragmentManager(manager); + } + } + + /** + * Used by {@link FragmentSubmitListener} to track the number of submission failures. + */ + private static class FragmentSubmitFailures { + static class SubmissionException { + final DrillbitEndpoint drillbitEndpoint; + final RpcException rpcException; + + SubmissionException(final DrillbitEndpoint drillbitEndpoint, + final RpcException rpcException) { + this.drillbitEndpoint = drillbitEndpoint; + this.rpcException = rpcException; + } + } + + final List submissionExceptions = new LinkedList<>(); + + void addFailure(final DrillbitEndpoint drillbitEndpoint, final RpcException rpcException) { + submissionExceptions.add(new SubmissionException(drillbitEndpoint, rpcException)); + } + } + + private class FragmentSubmitListener extends EndpointListener { + private final CountDownLatch latch; + private final FragmentSubmitFailures fragmentSubmitFailures; + + /** + * Constructor. 
+     *
+     * @param endpoint the endpoint for the submission
+     * @param value the initialize fragments message
+     * @param latch the latch to count down when the status is known; may be null
+     * @param fragmentSubmitFailures the counter to use for failures; must be non-null iff latch is non-null
+     */
+    public FragmentSubmitListener(final DrillbitEndpoint endpoint, final InitializeFragments value,
+                                  final CountDownLatch latch, final FragmentSubmitFailures fragmentSubmitFailures) {
+      super(endpoint, value);
+      Preconditions.checkState((latch == null) == (fragmentSubmitFailures == null));
+      this.latch = latch;
+      this.fragmentSubmitFailures = fragmentSubmitFailures;
+    }
+
+    @Override
+    public void success(final GeneralRPCProtos.Ack ack, final ByteBuf byteBuf) {
+      if (latch != null) {
+        latch.countDown();
+      }
+    }
+
+    @Override
+    public void failed(final RpcException ex) {
+      if (latch != null) { // this block only applies to intermediate fragments
+        fragmentSubmitFailures.addFailure(endpoint, ex);
+        latch.countDown();
+      } else { // this block only applies to leaf fragments
+        // since this won't be waited on, we can wait to deliver this event once the Foreman is ready
+        logger.debug("Failure while sending fragment. Stopping query.", ex);
+        foreman.addToEventQueue(QueryState.FAILED, ex);
+      }
+    }
+
+    @Override
+    public void interrupted(final InterruptedException e) {
+      // Foreman shouldn't get interrupted while waiting for the RPC outcome of fragment submission.
+      // Consider the interrupt as a failure.
+      final String errMsg = "Interrupted while waiting for the RPC outcome of fragment submission.";
+      logger.error(errMsg, e);
+      failed(new RpcException(errMsg, e));
+    }
+  }
+}
+
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index 216a80df498..addd8fb8ee1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -280,14 +280,15 @@ public void interrupted(final InterruptedException ex) {
   }
 
   void updateEphemeralState(final QueryState queryState) {
-    // If query is already in zk transient store, ignore the transient state update option.
-    // Else, they will not be removed from transient store upon completion.
-    if (!inTransientStore &&
-        !foreman.getQueryContext().getOptions().getOption(ExecConstants.QUERY_TRANSIENT_STATE_UPDATE)) {
-      return;
-    }
+      // If the query is already in the zk transient store, ignore the transient state update option.
+      // Else, the profiles will not be removed from the transient store upon completion.
+      if (!inTransientStore && !foreman.getQueryContext().getOptions().getOption(ExecConstants.QUERY_TRANSIENT_STATE_UPDATE)) {
+        return;
+      }
 
-    switch (queryState) {
+      switch (queryState) {
+      case PREPARING:
+      case PLANNING:
       case ENQUEUED:
       case STARTING:
       case RUNNING:
@@ -295,15 +296,14 @@ void updateEphemeralState(final QueryState queryState) {
         runningProfileStore.put(stringQueryId, getQueryInfo());  // store as ephemeral query profile.
         inTransientStore = true;
         break;
-      case COMPLETED:
       case CANCELED:
       case FAILED:
         try {
           runningProfileStore.remove(stringQueryId);
           inTransientStore = false;
-        } catch(final Exception e) {
-          logger.warn("Failure while trying to delete the estore profile for this query.", e);
+        } catch (final Exception e) {
+          logger.warn("Failure while trying to delete the stored profile for the query [{}]", stringQueryId, e);
         }
         break;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStateProcessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStateProcessor.java
new file mode 100644
index 00000000000..24431394550
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStateProcessor.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman;
+
+import com.codahale.metrics.Counter;
+import org.apache.drill.common.EventProcessor;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.metrics.DrillMetrics;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.work.foreman.Foreman.ForemanResult;
+
+/**
+ * Responsible for transitioning a query from one state to another and for
+ * incrementing / decrementing the query status counters.
+ */
+public class QueryStateProcessor implements AutoCloseable {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryStateProcessor.class);
+
+  private static final Counter planningQueries = DrillMetrics.getRegistry().counter("drill.queries.planning");
+  private static final Counter enqueuedQueries = DrillMetrics.getRegistry().counter("drill.queries.enqueued");
+  private static final Counter runningQueries = DrillMetrics.getRegistry().counter("drill.queries.running");
+  private static final Counter completedQueries = DrillMetrics.getRegistry().counter("drill.queries.completed");
+  private static final Counter succeededQueries = DrillMetrics.getRegistry().counter("drill.queries.succeeded");
+  private static final Counter failedQueries = DrillMetrics.getRegistry().counter("drill.queries.failed");
+  private static final Counter canceledQueries = DrillMetrics.getRegistry().counter("drill.queries.canceled");
+
+  private final StateSwitch stateSwitch = new StateSwitch();
+
+  private final String queryIdString;
+  private final QueryManager queryManager;
+  private final DrillbitContext drillbitContext;
+  private final ForemanResult foremanResult;
+
+  private volatile QueryState state;
+
+  public QueryStateProcessor(String queryIdString, QueryManager queryManager, DrillbitContext drillbitContext, ForemanResult foremanResult) {
+    this.queryIdString = queryIdString;
+    this.queryManager = queryManager;
+    this.drillbitContext = drillbitContext;
+    this.foremanResult = foremanResult;
+    // the initial query state is PREPARING
+    this.state = QueryState.PREPARING;
+  }
+
+  /**
+   * @return current query state
+   */
+  public QueryState getState() {
+    return state;
+  }
+
+  /**
+   * Moves the query from its current state to the given one;
+   * fails when the requested state transition is not allowed.
+   *
+   * @param newState new query state
+   * @param exception exception if failure occurred
+   */
+  public synchronized void moveToState(QueryState newState, Exception exception) {
+    logger.debug("{}: State change requested {} --> {}", queryIdString, state, newState);
+
+    switch (state) {
+      case PREPARING:
+        preparing(newState, exception);
+        return;
+      case PLANNING:
+        planning(newState, exception);
+        return;
+      case ENQUEUED:
+        enqueued(newState, exception);
+        return;
+      case STARTING:
+        starting(newState, exception);
+        return;
+      case RUNNING:
+        running(newState, exception);
+        return;
+      case CANCELLATION_REQUESTED:
+        cancellationRequested(newState, exception);
+        return;
+      case CANCELED:
+      case COMPLETED:
+      case FAILED:
+        logger.warn("Dropping request to move to {} state as query is already at {} state (which is terminal).", newState, state);
+        return;
+    }
+
+    throw new IllegalStateException(String.format("Failure trying to change states: %s --> %s", state.name(), newState.name()));
+  }
+
+  /**
+   * Directly moves the query from one state to another and updates the ephemeral query store.
+   *
+   * @param newState new query state
+   */
+  public void recordNewState(final QueryState newState) {
+    state = newState;
+    queryManager.updateEphemeralState(newState);
+  }
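Together with the per-state handlers further down, moveToState() encodes the legal transitions. The happy path of a successful query, spelled out (qsp stands for an instance of this class; a sketch, not patch code):

qsp.moveToState(QueryState.PLANNING, null);   // PREPARING -> PLANNING: start time recorded, counters bumped
qsp.moveToState(QueryState.ENQUEUED, null);   // PLANNING -> ENQUEUED: planning end time recorded
qsp.moveToState(QueryState.STARTING, null);   // ENQUEUED -> STARTING: queue wait end time recorded
qsp.moveToState(QueryState.RUNNING, null);    // STARTING -> RUNNING
qsp.moveToState(QueryState.COMPLETED, null);  // RUNNING -> COMPLETED: ForemanResult closed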
+
+  /**
+   * Cancels the query. Asynchronous -- it may take some time for all remote fragments to be terminated.
+   * For the preparing, planning and enqueued states we cancel immediately, since these states are handled locally.
+   *
+   * Note this can be called from outside of run() on another thread, or after run() completes.
+   */
+  public void cancel() {
+    switch (state) {
+      case PREPARING:
+      case PLANNING:
+      case ENQUEUED:
+        moveToState(QueryState.CANCELLATION_REQUESTED, null);
+        return;
+
+      case STARTING:
+      case RUNNING:
+        addToEventQueue(QueryState.CANCELLATION_REQUESTED, null);
+        return;
+
+      case CANCELLATION_REQUESTED:
+      case CANCELED:
+      case COMPLETED:
+      case FAILED:
+        // nothing to do
+        return;
+
+      default:
+        throw new IllegalStateException("Unable to cancel the query. Unexpected query state -> " + state);
+    }
+  }
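Note the split inside cancel(): for the locally handled states it calls moveToState() directly, while for STARTING and RUNNING it defers through the event queue, because fragments may still be in flight. The deferral, sketched (qsp is again a hypothetical instance):

qsp.addToEventQueue(QueryState.CANCELLATION_REQUESTED, null); // buffered by the StateSwitch below
// ... FragmentsRunner.submit() finishes sending out all fragments ...
qsp.startProcessingEvents(); // drains the queue; moveToState(CANCELLATION_REQUESTED, null) runs now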
+
+  /**
+   * Tells the foreman to move to a new state.
+   * The event will be added to the end of the event queue and will be processed once the foreman is ready
+   * to accept external events.
+   *
+   * @param newState the state to move to
+   * @param exception if not null, the exception that drove this state transition (usually a failure)
+   */
+  public void addToEventQueue(final QueryState newState, final Exception exception) {
+    stateSwitch.addEvent(newState, exception);
+  }
+
+  /**
+   * Starts processing all events that were enqueued while the fragments were being sent out.
+   */
+  public void startProcessingEvents() {
+    try {
+      stateSwitch.start();
+    } catch (Exception ex) {
+      moveToState(QueryState.FAILED, ex);
+    }
+  }
+
+  /**
+   * On close, increments / decrements the proper counters depending on the final query state.
+   */
+  @Override
+  public void close() {
+    queryManager.markEndTime();
+
+    switch (state) {
+      case FAILED:
+        failedQueries.inc();
+        break;
+      case CANCELED:
+        canceledQueries.inc();
+        break;
+      case COMPLETED:
+        succeededQueries.inc();
+        break;
+    }
+
+    runningQueries.dec();
+    completedQueries.inc();
+  }
+
+
+  private void preparing(final QueryState newState, final Exception exception) {
+    switch (newState) {
+      case PLANNING:
+        queryManager.markStartTime();
+        runningQueries.inc();
+
+        recordNewState(newState);
+        planningQueries.inc();
+        return;
+      case CANCELLATION_REQUESTED:
+        wrapUpCancellation();
+        return;
+    }
+    checkCommonStates(newState, exception);
+  }
+
+  private void planning(final QueryState newState, final Exception exception) {
+    planningQueries.dec();
+    queryManager.markPlanningEndTime();
+    switch (newState) {
+      case ENQUEUED:
+        recordNewState(newState);
+        enqueuedQueries.inc();
+        return;
+      case CANCELLATION_REQUESTED:
+        wrapUpCancellation();
+        return;
+    }
+    checkCommonStates(newState, exception);
+  }
+
+  private void enqueued(final QueryState newState, final Exception exception) {
+    enqueuedQueries.dec();
+    queryManager.markQueueWaitEndTime();
+    switch (newState) {
+      case STARTING:
+        recordNewState(newState);
+        return;
+      case CANCELLATION_REQUESTED:
+        wrapUpCancellation();
+        return;
+    }
+    checkCommonStates(newState, exception);
+  }
+
+  private void starting(final QueryState newState, final Exception exception) {
+    switch (newState) {
+      case RUNNING:
+        recordNewState(QueryState.RUNNING);
+        return;
+      case COMPLETED:
+        wrapUpCompletion();
+        return; // without this return, a completed query would also enqueue a cancellation event
+      case CANCELLATION_REQUESTED:
+        // since during starting state fragments are sent to the remote nodes,
+        // we don't want to cancel until they all are sent out
+        addToEventQueue(QueryState.CANCELLATION_REQUESTED, null);
+        return;
+    }
+
+    checkCommonStates(newState, exception);
+  }
+
+  private void running(final QueryState newState, final Exception exception) {
+    /*
+     * For cases that cancel executing fragments, we have to record the new
+     * state first, because the cancellation of the local root fragment will
+     * cause this to be called recursively.
+     */
+    switch (newState) {
+      case CANCELLATION_REQUESTED: {
+        assert exception == null;
+        recordNewState(QueryState.CANCELLATION_REQUESTED);
+        queryManager.cancelExecutingFragments(drillbitContext);
+        foremanResult.setCompleted(QueryState.CANCELED);
+        /*
+         * We don't close the foremanResult until we've gotten
+         * acknowledgments, which happens below in the case for current state
+         * == CANCELLATION_REQUESTED.
+ */ + return; + } + + case COMPLETED: { + wrapUpCompletion(); + return; + } + } + checkCommonStates(newState, exception); + } + + private void cancellationRequested(final QueryState newState, final Exception exception) { + switch (newState) { + case FAILED: + if (drillbitContext.getConfig().getBoolean(ExecConstants.RETURN_ERROR_FOR_FAILURE_IN_CANCELLED_FRAGMENTS)) { + assert exception != null; + recordNewState(QueryState.FAILED); + foremanResult.setForceFailure(exception); + } + + // proceed + + case CANCELED: + case COMPLETED: + /* + * These amount to a completion of the cancellation requests' cleanup; + * now we can clean up and send the result. + */ + foremanResult.close(); + return; + } + + throw new IllegalStateException(String.format("Failure trying to change states: %s --> %s", state.name(), newState.name())); + } + + private void wrapUpCancellation() { + recordNewState(QueryState.CANCELLATION_REQUESTED); + foremanResult.setCompleted(QueryState.CANCELED); + } + + private void wrapUpCompletion() { + recordNewState(QueryState.COMPLETED); + foremanResult.setCompleted(QueryState.COMPLETED); + foremanResult.close(); + } + + private void checkCommonStates(final QueryState newState, final Exception exception) { + switch (newState) { + case FAILED: + assert exception != null; + recordNewState(QueryState.FAILED); + queryManager.cancelExecutingFragments(drillbitContext); + foremanResult.setFailed(exception); + foremanResult.close(); + return; + } + + throw new IllegalStateException(String.format("Failure trying to change states: %s --> %s", state.name(), newState.name())); + } + + private class StateEvent { + final QueryState newState; + final Exception exception; + + StateEvent(final QueryState newState, final Exception exception) { + this.newState = newState; + this.exception = exception; + } + } + + private class StateSwitch extends EventProcessor { + public void addEvent(final QueryState newState, final Exception exception) { + sendEvent(new StateEvent(newState, exception)); + } + + @Override + protected void processEvent(final StateEvent event) { + moveToState(event.newState, event.exception); + } + } + +} diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index a0cf643baac..cb66ca35ed0 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -306,7 +306,7 @@ drill.exec: { size: 2, // Maximum wait time in the queue before the query times out and // fails. 
-      timeout: 5000 // 5 seconds
+      timeout_ms: 5000 // 5 seconds
     }
   }
   memory: {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
index 956cfc48747..ec101d89ee0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
@@ -31,6 +31,7 @@ import java.util.Map;
 
 import org.apache.commons.math3.util.Pair;
+import org.apache.drill.exec.work.foreman.FragmentsRunner;
 import org.apache.drill.test.BaseTestQuery;
 import org.apache.drill.test.QueryTestUtil;
 import org.apache.drill.SingleRowListener;
@@ -757,7 +758,7 @@ public void failsWhenSendingFragments() {
     final String exceptionDesc = "send-fragments";
     final Class<? extends Throwable> exceptionClass = ForemanException.class;
     final String controls = Controls.newBuilder()
-      .addException(Foreman.class, exceptionDesc, exceptionClass)
+      .addException(FragmentsRunner.class, exceptionDesc, exceptionClass)
       .build();
 
     assertFailsWithException(controls, exceptionClass, exceptionDesc);
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index 51cdab7d38d..edc401c649a 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -10377,6 +10377,22 @@ public enum QueryState
        * </pre>
       */
      ENQUEUED(6, 6),
+      /**
+       * <code>PREPARING = 7;</code>
+       *
+       * <pre>
+       * query is at preparation stage, foreman is initializing
+       * </pre>
+       */
+      PREPARING(7, 7),
+      /**
+       * <code>PLANNING = 8;</code>
+       *
+       * <pre>
+       * query is at planning stage (includes logical and / or physical planning)
+       * </pre>
+       */
+      PLANNING(8, 8),
       ;
 
       /**
@@ -10427,6 +10443,22 @@ public enum QueryState
        * </pre>
       */
      public static final int ENQUEUED_VALUE = 6;
+      /**
+       * <code>PREPARING = 7;</code>
+       *
+       * <pre>
+       * query is at preparation stage, foreman is initializing
+       * </pre>
+       */
+      public static final int PREPARING_VALUE = 7;
+      /**
+       * <code>PLANNING = 8;</code>
+       *
+       * <pre>
+       * query is at planning stage (includes logical and / or physical planning)
+       * </pre>
+ */ + public static final int PLANNING_VALUE = 8; public final int getNumber() { return value; } @@ -10440,6 +10472,8 @@ public static QueryState valueOf(int value) { case 4: return FAILED; case 5: return CANCELLATION_REQUESTED; case 6: return ENQUEUED; + case 7: return PREPARING; + case 8: return PLANNING; default: return null; } } @@ -23942,92 +23976,93 @@ public Builder clearStatus() { "ield\022\023\n\013value_count\030\004 \001(\005\022\027\n\017var_byte_le" + "ngth\030\005 \001(\005\022\025\n\rbuffer_length\030\007 \001(\005\"7\n\nNod" + "eStatus\022\017\n\007node_id\030\001 \001(\005\022\030\n\020memory_footp" + - "rint\030\002 \001(\003\"\225\002\n\013QueryResult\0228\n\013query_stat" + + "rint\030\002 \001(\003\"\263\002\n\013QueryResult\0228\n\013query_stat" + "e\030\001 \001(\0162#.exec.shared.QueryResult.QueryS", "tate\022&\n\010query_id\030\002 \001(\0132\024.exec.shared.Que" + "ryId\022(\n\005error\030\003 \003(\0132\031.exec.shared.DrillP" + - "BError\"z\n\nQueryState\022\014\n\010STARTING\020\000\022\013\n\007RU" + - "NNING\020\001\022\r\n\tCOMPLETED\020\002\022\014\n\010CANCELED\020\003\022\n\n\006" + - "FAILED\020\004\022\032\n\026CANCELLATION_REQUESTED\020\005\022\014\n\010" + - "ENQUEUED\020\006\"p\n\tQueryData\022&\n\010query_id\030\001 \001(" + - "\0132\024.exec.shared.QueryId\022\021\n\trow_count\030\002 \001" + - "(\005\022(\n\003def\030\003 \001(\0132\033.exec.shared.RecordBatc" + - "hDef\"\330\001\n\tQueryInfo\022\r\n\005query\030\001 \001(\t\022\r\n\005sta" + - "rt\030\002 \001(\003\0222\n\005state\030\003 \001(\0162#.exec.shared.Qu", - "eryResult.QueryState\022\017\n\004user\030\004 \001(\t:\001-\022\'\n" + - "\007foreman\030\005 \001(\0132\026.exec.DrillbitEndpoint\022\024" + - "\n\014options_json\030\006 \001(\t\022\022\n\ntotal_cost\030\007 \001(\001" + - "\022\025\n\nqueue_name\030\010 \001(\t:\001-\"\242\004\n\014QueryProfile" + - "\022 \n\002id\030\001 \001(\0132\024.exec.shared.QueryId\022$\n\004ty" + - "pe\030\002 \001(\0162\026.exec.shared.QueryType\022\r\n\005star" + - "t\030\003 \001(\003\022\013\n\003end\030\004 \001(\003\022\r\n\005query\030\005 \001(\t\022\014\n\004p" + - "lan\030\006 \001(\t\022\'\n\007foreman\030\007 \001(\0132\026.exec.Drillb" + - "itEndpoint\0222\n\005state\030\010 \001(\0162#.exec.shared." 
+ - "QueryResult.QueryState\022\027\n\017total_fragment", - "s\030\t \001(\005\022\032\n\022finished_fragments\030\n \001(\005\022;\n\020f" + - "ragment_profile\030\013 \003(\0132!.exec.shared.Majo" + - "rFragmentProfile\022\017\n\004user\030\014 \001(\t:\001-\022\r\n\005err" + - "or\030\r \001(\t\022\024\n\014verboseError\030\016 \001(\t\022\020\n\010error_" + - "id\030\017 \001(\t\022\022\n\nerror_node\030\020 \001(\t\022\024\n\014options_" + - "json\030\021 \001(\t\022\017\n\007planEnd\030\022 \001(\003\022\024\n\014queueWait" + - "End\030\023 \001(\003\022\022\n\ntotal_cost\030\024 \001(\001\022\025\n\nqueue_n" + - "ame\030\025 \001(\t:\001-\"t\n\024MajorFragmentProfile\022\031\n\021" + - "major_fragment_id\030\001 \001(\005\022A\n\026minor_fragmen" + - "t_profile\030\002 \003(\0132!.exec.shared.MinorFragm", - "entProfile\"\350\002\n\024MinorFragmentProfile\022)\n\005s" + - "tate\030\001 \001(\0162\032.exec.shared.FragmentState\022(" + - "\n\005error\030\002 \001(\0132\031.exec.shared.DrillPBError" + - "\022\031\n\021minor_fragment_id\030\003 \001(\005\0226\n\020operator_" + - "profile\030\004 \003(\0132\034.exec.shared.OperatorProf" + - "ile\022\022\n\nstart_time\030\005 \001(\003\022\020\n\010end_time\030\006 \001(" + - "\003\022\023\n\013memory_used\030\007 \001(\003\022\027\n\017max_memory_use" + - "d\030\010 \001(\003\022(\n\010endpoint\030\t \001(\0132\026.exec.Drillbi" + - "tEndpoint\022\023\n\013last_update\030\n \001(\003\022\025\n\rlast_p" + - "rogress\030\013 \001(\003\"\377\001\n\017OperatorProfile\0221\n\rinp", - "ut_profile\030\001 \003(\0132\032.exec.shared.StreamPro" + - "file\022\023\n\013operator_id\030\003 \001(\005\022\025\n\roperator_ty" + - "pe\030\004 \001(\005\022\023\n\013setup_nanos\030\005 \001(\003\022\025\n\rprocess" + - "_nanos\030\006 \001(\003\022#\n\033peak_local_memory_alloca" + - "ted\030\007 \001(\003\022(\n\006metric\030\010 \003(\0132\030.exec.shared." 
+ - "MetricValue\022\022\n\nwait_nanos\030\t \001(\003\"B\n\rStrea" + - "mProfile\022\017\n\007records\030\001 \001(\003\022\017\n\007batches\030\002 \001" + - "(\003\022\017\n\007schemas\030\003 \001(\003\"J\n\013MetricValue\022\021\n\tme" + - "tric_id\030\001 \001(\005\022\022\n\nlong_value\030\002 \001(\003\022\024\n\014dou" + - "ble_value\030\003 \001(\001\")\n\010Registry\022\035\n\003jar\030\001 \003(\013", - "2\020.exec.shared.Jar\"/\n\003Jar\022\014\n\004name\030\001 \001(\t\022" + - "\032\n\022function_signature\030\002 \003(\t\"W\n\013SaslMessa" + - "ge\022\021\n\tmechanism\030\001 \001(\t\022\014\n\004data\030\002 \001(\014\022\'\n\006s" + - "tatus\030\003 \001(\0162\027.exec.shared.SaslStatus*5\n\n" + - "RpcChannel\022\017\n\013BIT_CONTROL\020\000\022\014\n\010BIT_DATA\020" + - "\001\022\010\n\004USER\020\002*V\n\tQueryType\022\007\n\003SQL\020\001\022\013\n\007LOG" + - "ICAL\020\002\022\014\n\010PHYSICAL\020\003\022\r\n\tEXECUTION\020\004\022\026\n\022P" + - "REPARED_STATEMENT\020\005*\207\001\n\rFragmentState\022\013\n" + - "\007SENDING\020\000\022\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007R" + - "UNNING\020\002\022\014\n\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n", - "\006FAILED\020\005\022\032\n\026CANCELLATION_REQUESTED\020\006*\360\005" + - "\n\020CoreOperatorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n" + - "\020BROADCAST_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_" + - "AGGREGATE\020\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN" + - "\020\005\022\031\n\025HASH_PARTITION_SENDER\020\006\022\t\n\005LIMIT\020\007" + - "\022\024\n\020MERGING_RECEIVER\020\010\022\034\n\030ORDERED_PARTIT" + - "ION_SENDER\020\t\022\013\n\007PROJECT\020\n\022\026\n\022UNORDERED_R" + - "ECEIVER\020\013\022\020\n\014RANGE_SENDER\020\014\022\n\n\006SCREEN\020\r\022" + - "\034\n\030SELECTION_VECTOR_REMOVER\020\016\022\027\n\023STREAMI" + - "NG_AGGREGATE\020\017\022\016\n\nTOP_N_SORT\020\020\022\021\n\rEXTERN", - "AL_SORT\020\021\022\t\n\005TRACE\020\022\022\t\n\005UNION\020\023\022\014\n\010OLD_S" + - "ORT\020\024\022\032\n\026PARQUET_ROW_GROUP_SCAN\020\025\022\021\n\rHIV" + - "E_SUB_SCAN\020\026\022\025\n\021SYSTEM_TABLE_SCAN\020\027\022\021\n\rM" + - "OCK_SUB_SCAN\020\030\022\022\n\016PARQUET_WRITER\020\031\022\023\n\017DI" + - "RECT_SUB_SCAN\020\032\022\017\n\013TEXT_WRITER\020\033\022\021\n\rTEXT" + - "_SUB_SCAN\020\034\022\021\n\rJSON_SUB_SCAN\020\035\022\030\n\024INFO_S" + - "CHEMA_SUB_SCAN\020\036\022\023\n\017COMPLEX_TO_JSON\020\037\022\025\n" + - "\021PRODUCER_CONSUMER\020 \022\022\n\016HBASE_SUB_SCAN\020!" 
+ - "\022\n\n\006WINDOW\020\"\022\024\n\020NESTED_LOOP_JOIN\020#\022\021\n\rAV" + - "RO_SUB_SCAN\020$\022\021\n\rPCAP_SUB_SCAN\020%*g\n\nSasl", - "Status\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\nSASL_START\020\001" + - "\022\024\n\020SASL_IN_PROGRESS\020\002\022\020\n\014SASL_SUCCESS\020\003" + - "\022\017\n\013SASL_FAILED\020\004B.\n\033org.apache.drill.ex" + - "ec.protoB\rUserBitSharedH\001" + "BError\"\227\001\n\nQueryState\022\014\n\010STARTING\020\000\022\013\n\007R" + + "UNNING\020\001\022\r\n\tCOMPLETED\020\002\022\014\n\010CANCELED\020\003\022\n\n" + + "\006FAILED\020\004\022\032\n\026CANCELLATION_REQUESTED\020\005\022\014\n" + + "\010ENQUEUED\020\006\022\r\n\tPREPARING\020\007\022\014\n\010PLANNING\020\010" + + "\"p\n\tQueryData\022&\n\010query_id\030\001 \001(\0132\024.exec.s" + + "hared.QueryId\022\021\n\trow_count\030\002 \001(\005\022(\n\003def\030" + + "\003 \001(\0132\033.exec.shared.RecordBatchDef\"\330\001\n\tQ" + + "ueryInfo\022\r\n\005query\030\001 \001(\t\022\r\n\005start\030\002 \001(\003\0222", + "\n\005state\030\003 \001(\0162#.exec.shared.QueryResult." + + "QueryState\022\017\n\004user\030\004 \001(\t:\001-\022\'\n\007foreman\030\005" + + " \001(\0132\026.exec.DrillbitEndpoint\022\024\n\014options_" + + "json\030\006 \001(\t\022\022\n\ntotal_cost\030\007 \001(\001\022\025\n\nqueue_" + + "name\030\010 \001(\t:\001-\"\242\004\n\014QueryProfile\022 \n\002id\030\001 \001" + + "(\0132\024.exec.shared.QueryId\022$\n\004type\030\002 \001(\0162\026" + + ".exec.shared.QueryType\022\r\n\005start\030\003 \001(\003\022\013\n" + + "\003end\030\004 \001(\003\022\r\n\005query\030\005 \001(\t\022\014\n\004plan\030\006 \001(\t\022" + + "\'\n\007foreman\030\007 \001(\0132\026.exec.DrillbitEndpoint" + + "\0222\n\005state\030\010 \001(\0162#.exec.shared.QueryResul", + "t.QueryState\022\027\n\017total_fragments\030\t \001(\005\022\032\n" + + "\022finished_fragments\030\n \001(\005\022;\n\020fragment_pr" + + "ofile\030\013 \003(\0132!.exec.shared.MajorFragmentP" + + "rofile\022\017\n\004user\030\014 \001(\t:\001-\022\r\n\005error\030\r \001(\t\022\024" + + "\n\014verboseError\030\016 \001(\t\022\020\n\010error_id\030\017 \001(\t\022\022" + + "\n\nerror_node\030\020 \001(\t\022\024\n\014options_json\030\021 \001(\t" + + "\022\017\n\007planEnd\030\022 \001(\003\022\024\n\014queueWaitEnd\030\023 \001(\003\022" + + "\022\n\ntotal_cost\030\024 \001(\001\022\025\n\nqueue_name\030\025 \001(\t:" + + "\001-\"t\n\024MajorFragmentProfile\022\031\n\021major_frag" + + "ment_id\030\001 \001(\005\022A\n\026minor_fragment_profile\030", + "\002 \003(\0132!.exec.shared.MinorFragmentProfile" + + "\"\350\002\n\024MinorFragmentProfile\022)\n\005state\030\001 \001(\016" + + "2\032.exec.shared.FragmentState\022(\n\005error\030\002 " + + "\001(\0132\031.exec.shared.DrillPBError\022\031\n\021minor_" + + "fragment_id\030\003 \001(\005\0226\n\020operator_profile\030\004 " + + "\003(\0132\034.exec.shared.OperatorProfile\022\022\n\nsta" + + "rt_time\030\005 \001(\003\022\020\n\010end_time\030\006 \001(\003\022\023\n\013memor" + + "y_used\030\007 \001(\003\022\027\n\017max_memory_used\030\010 \001(\003\022(\n" + + "\010endpoint\030\t \001(\0132\026.exec.DrillbitEndpoint\022" + + "\023\n\013last_update\030\n \001(\003\022\025\n\rlast_progress\030\013 ", + "\001(\003\"\377\001\n\017OperatorProfile\0221\n\rinput_profile" + + "\030\001 \003(\0132\032.exec.shared.StreamProfile\022\023\n\013op" + + "erator_id\030\003 \001(\005\022\025\n\roperator_type\030\004 \001(\005\022\023" + + "\n\013setup_nanos\030\005 
\001(\003\022\025\n\rprocess_nanos\030\006 \001" + + "(\003\022#\n\033peak_local_memory_allocated\030\007 \001(\003\022" + + "(\n\006metric\030\010 \003(\0132\030.exec.shared.MetricValu" + + "e\022\022\n\nwait_nanos\030\t \001(\003\"B\n\rStreamProfile\022\017" + + "\n\007records\030\001 \001(\003\022\017\n\007batches\030\002 \001(\003\022\017\n\007sche" + + "mas\030\003 \001(\003\"J\n\013MetricValue\022\021\n\tmetric_id\030\001 " + + "\001(\005\022\022\n\nlong_value\030\002 \001(\003\022\024\n\014double_value\030", + "\003 \001(\001\")\n\010Registry\022\035\n\003jar\030\001 \003(\0132\020.exec.sh" + + "ared.Jar\"/\n\003Jar\022\014\n\004name\030\001 \001(\t\022\032\n\022functio" + + "n_signature\030\002 \003(\t\"W\n\013SaslMessage\022\021\n\tmech" + + "anism\030\001 \001(\t\022\014\n\004data\030\002 \001(\014\022\'\n\006status\030\003 \001(" + + "\0162\027.exec.shared.SaslStatus*5\n\nRpcChannel" + + "\022\017\n\013BIT_CONTROL\020\000\022\014\n\010BIT_DATA\020\001\022\010\n\004USER\020" + + "\002*V\n\tQueryType\022\007\n\003SQL\020\001\022\013\n\007LOGICAL\020\002\022\014\n\010" + + "PHYSICAL\020\003\022\r\n\tEXECUTION\020\004\022\026\n\022PREPARED_ST" + + "ATEMENT\020\005*\207\001\n\rFragmentState\022\013\n\007SENDING\020\000" + + "\022\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022\014", + "\n\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005\022" + + "\032\n\026CANCELLATION_REQUESTED\020\006*\360\005\n\020CoreOper" + + "atorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAST" + + "_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE\020" + + "\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HASH" + + "_PARTITION_SENDER\020\006\022\t\n\005LIMIT\020\007\022\024\n\020MERGIN" + + "G_RECEIVER\020\010\022\034\n\030ORDERED_PARTITION_SENDER" + + "\020\t\022\013\n\007PROJECT\020\n\022\026\n\022UNORDERED_RECEIVER\020\013\022" + + "\020\n\014RANGE_SENDER\020\014\022\n\n\006SCREEN\020\r\022\034\n\030SELECTI" + + "ON_VECTOR_REMOVER\020\016\022\027\n\023STREAMING_AGGREGA", + "TE\020\017\022\016\n\nTOP_N_SORT\020\020\022\021\n\rEXTERNAL_SORT\020\021\022" + + "\t\n\005TRACE\020\022\022\t\n\005UNION\020\023\022\014\n\010OLD_SORT\020\024\022\032\n\026P" + + "ARQUET_ROW_GROUP_SCAN\020\025\022\021\n\rHIVE_SUB_SCAN" + + "\020\026\022\025\n\021SYSTEM_TABLE_SCAN\020\027\022\021\n\rMOCK_SUB_SC" + + "AN\020\030\022\022\n\016PARQUET_WRITER\020\031\022\023\n\017DIRECT_SUB_S" + + "CAN\020\032\022\017\n\013TEXT_WRITER\020\033\022\021\n\rTEXT_SUB_SCAN\020" + + "\034\022\021\n\rJSON_SUB_SCAN\020\035\022\030\n\024INFO_SCHEMA_SUB_" + + "SCAN\020\036\022\023\n\017COMPLEX_TO_JSON\020\037\022\025\n\021PRODUCER_" + + "CONSUMER\020 \022\022\n\016HBASE_SUB_SCAN\020!\022\n\n\006WINDOW" + + "\020\"\022\024\n\020NESTED_LOOP_JOIN\020#\022\021\n\rAVRO_SUB_SCA", + "N\020$\022\021\n\rPCAP_SUB_SCAN\020%*g\n\nSaslStatus\022\020\n\014" + + "SASL_UNKNOWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SASL_I" + + "N_PROGRESS\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_F" + + "AILED\020\004B.\n\033org.apache.drill.exec.protoB\r" + + "UserBitSharedH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/QueryResult.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/QueryResult.java index 7b2a27373e4..a53dc420b9f 100644 --- 
a/protocol/src/main/java/org/apache/drill/exec/proto/beans/QueryResult.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/QueryResult.java
@@ -43,7 +43,9 @@ public enum QueryState implements com.dyuproject.protostuff.EnumLite<QueryState>
     CANCELED(3),
     FAILED(4),
     CANCELLATION_REQUESTED(5),
-    ENQUEUED(6);
+    ENQUEUED(6),
+    PREPARING(7),
+    PLANNING(8);
 
     public final int number;
@@ -68,6 +70,8 @@ public static QueryState valueOf(int number)
         case 4: return FAILED;
         case 5: return CANCELLATION_REQUESTED;
         case 6: return ENQUEUED;
+        case 7: return PREPARING;
+        case 8: return PLANNING;
         default: return null;
     }
 }
diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto
index 086b98a0754..205611b26b8 100644
--- a/protocol/src/main/protobuf/UserBitShared.proto
+++ b/protocol/src/main/protobuf/UserBitShared.proto
@@ -167,6 +167,8 @@ message QueryResult {
     FAILED = 4;
     CANCELLATION_REQUESTED = 5; // cancellation has been requested, and is being processed
     ENQUEUED = 6; // query has been enqueued. this is pre-starting.
+    PREPARING = 7; // query is at preparation stage, foreman is initializing
+    PLANNING = 8; // query is at planning stage (includes logical and / or physical planning)
   }
   optional QueryState query_state = 1;
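A quick sanity check that the two new values line up across the protobuf-generated enum and the protostuff bean enum (a hypothetical snippet, not part of the patch; run with -ea to enable assertions):

import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;

public class NewStateNumbers {
  public static void main(String[] args) {
    // protobuf enum: tags 7 and 8 round-trip to the new constants
    assert QueryState.valueOf(7) == QueryState.PREPARING;
    assert QueryState.valueOf(8) == QueryState.PLANNING;
    // protostuff bean enum (same numbers, per the beans/QueryResult.java hunk above)
    assert org.apache.drill.exec.proto.beans.QueryResult.QueryState.valueOf(8).number == 8;
  }
}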