Skip to content

Commit

Permalink
DRILL-5963: Query state process improvements
Browse files Browse the repository at this point in the history
1. Added two new query states: PREPARING (when foreman is initialized) and PLANNING (includes logical and / or physical planning).
2. Ability to cancel query during planning and enqueued states was added.
3. Logic for submitting fragments was moved from Foreman to new class FragmentsRunner.
4. Logic for moving query from to new state and incrementing / decrementing query counters was moved into QueryStateProcessor class.
5. Major type in DrillFuncHolderExpr was cached for better performance.

closes #1051
  • Loading branch information
arina-ielchiieva committed Jan 2, 2018
1 parent adee461 commit 0343518
Show file tree
Hide file tree
Showing 13 changed files with 1,166 additions and 821 deletions.
Expand Up @@ -27,18 +27,21 @@
import org.apache.drill.exec.expr.fn.DrillFuncHolder;

public class DrillFuncHolderExpr extends FunctionHolderExpression implements Iterable<LogicalExpression>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillFuncHolderExpr.class);
private DrillFuncHolder holder;
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillFuncHolderExpr.class);
private final DrillFuncHolder holder;
private final MajorType majorType;
private DrillSimpleFunc interpreter;

public DrillFuncHolderExpr(String nameUsed, DrillFuncHolder holder, List<LogicalExpression> args, ExpressionPosition pos) {
super(nameUsed, pos, args);
this.holder = holder;
// since function return type can not be changed, cache it for better performance
this.majorType = holder.getReturnType(args);
}

@Override
public MajorType getMajorType() {
return holder.getReturnType(args);
return majorType;
}

@Override
Expand Down
Expand Up @@ -19,30 +19,35 @@

import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;

import java.util.HashMap;
import java.util.Map;

public class ProfileUtil {
// Display names for QueryState enum in UserBitShared.proto
private static final String[] queryStateDisplayNames = {
"Starting", // STARTING = 0
"Running", // RUNNING = 1
"Succeeded", // COMPLETED = 2
"Canceled", // CANCELED = 3
"Failed", // FAILED = 4
"CancellationRequested", // CANCELLATION_REQUESTED = 5
"Enqueued" // ENQUEUED = 6
};

private static final Map<QueryState, String> queryStateDisplayMap = new HashMap<>(QueryState.values().length);

static {
queryStateDisplayMap.put(QueryState.PREPARING, "Preparing");
queryStateDisplayMap.put(QueryState.PLANNING, "Planning");
queryStateDisplayMap.put(QueryState.ENQUEUED, "Enqueued");
queryStateDisplayMap.put(QueryState.STARTING, "Starting");
queryStateDisplayMap.put(QueryState.RUNNING, "Running");
queryStateDisplayMap.put(QueryState.COMPLETED, "Succeeded");
queryStateDisplayMap.put(QueryState.FAILED, "Failed");
queryStateDisplayMap.put(QueryState.CANCELLATION_REQUESTED, "Cancellation Requested");
queryStateDisplayMap.put(QueryState.CANCELED, "Canceled");
}

/**
* Utility to return display name for query state
* @param queryState
* Utility method to return display name for query state
* @param queryState query state
* @return display string for query state
*/
public final static String getQueryStateDisplayName(QueryState queryState) {
int queryStateOrdinal = queryState.getNumber();
if (queryStateOrdinal >= queryStateDisplayNames.length) {
return queryState.name();
} else {
return queryStateDisplayNames[queryStateOrdinal];
public static String getQueryStateDisplayName(QueryState queryState) {
String displayName = queryStateDisplayMap.get(queryState);
if (displayName == null) {
displayName = queryState.name();
}
return displayName;
}
}
Expand Up @@ -202,8 +202,10 @@ public String getQueuedDuration() {
}

public String getExecutionDuration() {
//Check if State is STARTING or RUNNING
if (profile.getState() == QueryState.STARTING ||
//Check if State is PREPARING, PLANNING, STARTING, ENQUEUED or RUNNING
if (profile.getState() == QueryState.PREPARING ||
profile.getState() == QueryState.PLANNING ||
profile.getState() == QueryState.STARTING ||
profile.getState() == QueryState.ENQUEUED ||
profile.getState() == QueryState.RUNNING) {
return NOT_AVAILABLE_LABEL;
Expand Down
Expand Up @@ -23,11 +23,14 @@
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.planner.PhysicalPlanReader;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.proto.ExecProtos;
import org.apache.drill.exec.server.options.OptionList;
import org.apache.drill.exec.work.foreman.ForemanSetupException;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Preconditions;
import org.codehaus.jackson.map.ObjectMapper;

public class QueryWorkUnit {

Expand Down Expand Up @@ -112,4 +115,46 @@ public void applyPlan(PhysicalPlanReader reader) throws ForemanSetupException {
fragments.add(defn.applyPlan(reader));
}
}

/**
* Converts list of stored fragments into their string representation,
* in case of exception returns text indicating that string was malformed.
* Is used for debugging purposes.
*
* @return fragments information
*/
public String stringifyFragments() {
StringBuilder stringBuilder = new StringBuilder();
final int fragmentCount = fragments.size();
int fragmentIndex = 0;
for (final PlanFragment planFragment : fragments) {
final ExecProtos.FragmentHandle fragmentHandle = planFragment.getHandle();
stringBuilder.append("PlanFragment(");
stringBuilder.append(++fragmentIndex);
stringBuilder.append('/');
stringBuilder.append(fragmentCount);
stringBuilder.append(") major_fragment_id ");
stringBuilder.append(fragmentHandle.getMajorFragmentId());
stringBuilder.append(" minor_fragment_id ");
stringBuilder.append(fragmentHandle.getMinorFragmentId());
stringBuilder.append('\n');

final CoordinationProtos.DrillbitEndpoint endpointAssignment = planFragment.getAssignment();
stringBuilder.append(" DrillbitEndpoint address ");
stringBuilder.append(endpointAssignment.getAddress());
stringBuilder.append('\n');

String jsonString = "<<malformed JSON>>";
stringBuilder.append(" fragment_json: ");
final ObjectMapper objectMapper = new ObjectMapper();
try {
final Object json = objectMapper.readValue(planFragment.getFragmentJson(), Object.class);
jsonString = objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(json);
} catch (final Exception e) {
// we've already set jsonString to a fallback value
}
stringBuilder.append(jsonString);
}
return stringBuilder.toString();
}
}

0 comments on commit 0343518

Please sign in to comment.