Skip to content

Commit

Permalink
DRILL-6879: Show warnings for potential performance issues
Browse files Browse the repository at this point in the history
1. Introduced a warning for non-progressing fragments. Based on a threshold (`drill.exec.http.profile.warning.progress.threshold`), a warning is issued if none of the fragments has made progress within that time. The default is 5 minutes (300 sec).

2. Introduced a warning if any of the buffered operators spill to disk.

3. Introduced a warning for operators where the longest running fragment runs beyond a minimum threshold (`drill.exec.http.profile.warning.time.skew.min`) and runs at least 2 times longer than the average (`drill.exec.http.profile.warning.time.skew.ratio.process`). The clock symbol with a tooltip indicates the extent of the skew. For wait times, the ratio is defined by `drill.exec.http.profile.warning.time.skew.ratio.wait`.

4. Introduced a warning for scan operators whose average wait time exceeds their average processing time, once the wait time is above a minimum threshold (`drill.exec.http.profile.warning.scan.wait.min`). The turtle symbol with a tooltip indicates which scan operator spent more time waiting than processing.

5. TableBuilder refactored:
 a. Using an attribute map instead of String arguments, e.g. for 'title'.
 b. Removed APIs that pass a hyperlink, since that is never used.
closes #1572
  • Loading branch information
Kunal Khatua authored and gparai committed Jan 4, 2019
1 parent 10b1059 commit e65079a
Show file tree
Hide file tree
Showing 10 changed files with 313 additions and 152 deletions.
Expand Up @@ -785,6 +785,17 @@ private ExecConstants() {
public static final BooleanValidator DYNAMIC_UDF_SUPPORT_ENABLED_VALIDATOR = new BooleanValidator(DYNAMIC_UDF_SUPPORT_ENABLED,
new OptionDescription("Enables users to dynamically upload UDFs. Users must upload their UDF (source and binary) JAR files to a staging directory in the distributed file system before issuing the CREATE FUNCTION USING JAR command to register a UDF. Default is true. (Drill 1.9+)"));

//Config keys for the performance warnings shown in the web UI query profile.
//Progress threshold (in seconds): FragmentWrapper flags a major fragment's "last progress" cell
//when the time elapsed since its most recent progress exceeds this value.
public static final String PROFILE_WARNING_PROGRESS_THRESHOLD = "drill.exec.http.profile.warning.progress.threshold";
//Minimum time (in seconds) before a time-skew warning is considered. As used by OperatorWrapper, the operator's
//average process (or wait) time must exceed this value AND the max:avg ratio must exceed the matching ratio below.
public static final String PROFILE_WARNING_TIME_SKEW_MIN = "drill.exec.http.profile.warning.time.skew.min";
//Threshold Ratio for Processing (i.e. "maxProcessing : avgProcessing" ratio must exceed this defined threshold to show a skew warning)
public static final String PROFILE_WARNING_TIME_SKEW_RATIO_PROCESS = "drill.exec.http.profile.warning.time.skew.ratio.process";
//Minimum average wait time (in seconds) for the slow-scan warning. As used by OperatorWrapper, a SCAN operator is
//flagged when its average wait time exceeds this value AND exceeds its average processing time.
public static final String PROFILE_WARNING_SCAN_WAIT_MIN = "drill.exec.http.profile.warning.scan.wait.min";
//Threshold Ratio for Waiting (i.e. "maxWait : avgWait" ratio must exceed this defined threshold to show a skew warning)
public static final String PROFILE_WARNING_TIME_SKEW_RATIO_WAIT = "drill.exec.http.profile.warning.time.skew.ratio.wait";

/**
* Option to save query profiles. If false, no query profile will be saved
* for any query.
Expand Down
Expand Up @@ -22,12 +22,14 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.proto.UserBitShared.MajorFragmentProfile;
import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
import org.apache.drill.exec.proto.UserBitShared.OperatorProfile;
import org.apache.drill.exec.proto.UserBitShared.StreamProfile;

import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.Collections2;

Expand All @@ -37,10 +39,13 @@
public class FragmentWrapper {
private final MajorFragmentProfile major;
private final long start;
private final int runningProfileProgressThreshold;

public FragmentWrapper(final MajorFragmentProfile major, final long start) {
public FragmentWrapper(final MajorFragmentProfile major, final long start, DrillConfig config) {
this.major = Preconditions.checkNotNull(major);
this.start = start;
//Threshold to track if query made no progress in specified elapsed time
runningProfileProgressThreshold = config.getInt(ExecConstants.PROFILE_WARNING_PROGRESS_THRESHOLD);
}

public String getDisplayName() {
Expand Down Expand Up @@ -83,7 +88,7 @@ public void addSummary(TableBuilder tb) {

// If there are no stats to aggregate, create an empty row
if (complete.size() < 1) {
tb.appendRepeated("", null, NUM_NULLABLE_ACTIVE_OVERVIEW_COLUMNS);
tb.appendRepeated("", NUM_NULLABLE_ACTIVE_OVERVIEW_COLUMNS);
return;
}

Expand Down Expand Up @@ -118,15 +123,23 @@ public void addSummary(TableBuilder tb) {
tb.appendMillis(cumulativeFragmentDurationInMillis / complete.size());
tb.appendMillis(longRun.getEndTime() - longRun.getStartTime());

tb.appendPercent(totalProcessInMillis / (totalProcessInMillis + totalWaitInMillis), null,
//#8721 is the summation sign: sum(Busy): ## + sum(Wait): ##
Map<String, String> percBusyAttrMap = new HashMap<>();
//#8721 is the summation sign: sum(Busy): ## + sum(Wait): ##
percBusyAttrMap.put(HtmlAttribute.TITLE,
String.format("&#8721;Busy: %,.2fs + &#8721;Wait: %,.2fs", totalProcessInMillis/1E3, totalWaitInMillis/1E3));
tb.appendPercent(totalProcessInMillis / (totalProcessInMillis + totalWaitInMillis), percBusyAttrMap);

final MinorFragmentProfile lastUpdate = Collections.max(complete, Comparators.lastUpdate);
tb.appendMillis(System.currentTimeMillis()-lastUpdate.getLastUpdate());

final MinorFragmentProfile lastProgress = Collections.max(complete, Comparators.lastProgress);
tb.appendMillis(System.currentTimeMillis()-lastProgress.getLastProgress());
long elapsedSinceLastProgress = System.currentTimeMillis()-lastProgress.getLastProgress();
Map<String, String> lastProgressAttrMap = null;
if (elapsedSinceLastProgress > TimeUnit.SECONDS.toMillis(runningProfileProgressThreshold)) {
lastProgressAttrMap = new HashMap<>();
lastProgressAttrMap.put(HtmlAttribute.CLASS, HtmlAttribute.CLASS_VALUE_NO_PROGRESS_TAG);
}
tb.appendMillis(elapsedSinceLastProgress, lastProgressAttrMap);

// TODO(DRILL-3494): Names (maxMem, getMaxMemoryUsed) are misleading; the value is peak memory allocated to fragment
final MinorFragmentProfile maxMem = Collections.max(complete, Comparators.fragmentPeakMemory);
Expand Down Expand Up @@ -162,7 +175,7 @@ public void addFinalSummary(TableBuilder tb) {

// If there are no stats to aggregate, create an empty row
if (complete.size() < 1) {
tb.appendRepeated("", null, NUM_NULLABLE_COMPLETED_OVERVIEW_COLUMNS);
tb.appendRepeated("", NUM_NULLABLE_COMPLETED_OVERVIEW_COLUMNS);
return;
}

Expand Down Expand Up @@ -195,9 +208,11 @@ public void addFinalSummary(TableBuilder tb) {
tb.appendMillis(totalDuration / complete.size());
tb.appendMillis(longRun.getEndTime() - longRun.getStartTime());

tb.appendPercent(totalProcessInMillis / (totalProcessInMillis + totalWaitInMillis), null,
//#8721 is the summation sign: sum(Busy): ## + sum(Wait): ##
Map<String, String> percBusyAttrMap = new HashMap<>();
//#8721 is the summation sign: sum(Busy): ## + sum(Wait): ##
percBusyAttrMap.put(HtmlAttribute.TITLE,
String.format("&#8721;Busy: %,.2fs + &#8721;Wait: %,.2fs", totalProcessInMillis/1E3, totalWaitInMillis/1E3));
tb.appendPercent(totalProcessInMillis / (totalProcessInMillis + totalWaitInMillis), percBusyAttrMap);

// TODO(DRILL-3494): Names (maxMem, getMaxMemoryUsed) are misleading; the value is peak memory allocated to fragment
final MinorFragmentProfile maxMem = Collections.max(complete, Comparators.fragmentPeakMemory);
Expand Down Expand Up @@ -231,9 +246,9 @@ public String getContent() {

Collections.sort(complete, Comparators.minorId);

Map<String, String> attributeMap = new HashMap<String, String>(); //Reusing for different fragments
Map<String, String> attributeMap = new HashMap<>(); //Reusing for different fragments
for (final MinorFragmentProfile minor : complete) {
final ArrayList<OperatorProfile> ops = new ArrayList<>(minor.getOperatorProfileList());
final List<OperatorProfile> ops = new ArrayList<>(minor.getOperatorProfileList());

long biggestIncomingRecords = 0;
long biggestBatches = 0;
Expand Down Expand Up @@ -267,7 +282,7 @@ public String getContent() {

for (final MinorFragmentProfile m : incomplete) {
builder.appendCell(major.getMajorFragmentId() + "-" + m.getMinorFragmentId());
builder.appendRepeated(m.getState().toString(), null, NUM_NULLABLE_FRAGMENTS_COLUMNS);
builder.appendRepeated(m.getState().toString(), NUM_NULLABLE_FRAGMENTS_COLUMNS);
}
return builder.build();
}
Expand Down
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.server.rest.profile;

/**
 * Define all attributes and values that can be injected by various Wrapper classes in org.apache.drill.exec.server.rest.*
 * <p>
 * This is a pure constants holder: attribute names are used as keys, and the {@code *_VALUE_*} constants
 * as values, of the attribute maps handed to the profile table builder. The class is final and has a
 * private constructor because it carries no state and is not meant to be instantiated or extended.
 */
public final class HtmlAttribute {
  //Attributes
  public static final String CLASS = "class";
  public static final String DATA_ORDER = "data-order";
  public static final String TITLE = "title";
  //Custom (non-standard) attribute carrying a spill count for the page script to render
  public static final String SPILLS = "spills";
  public static final String STYLE = "style";

  //Values
  //CSS class values that tag a cell with a specific warning
  public static final String CLASS_VALUE_SPILL_TAG = "spill-tag";
  public static final String CLASS_VALUE_NO_PROGRESS_TAG = "no-progress-tag";
  public static final String CLASS_VALUE_TIME_SKEW_TAG = "time-skew-tag";
  public static final String CLASS_VALUE_SCAN_WAIT_TAG = "scan-wait-tag";
  //Inline style value used together with a TITLE tooltip
  public static final String STYLE_VALUE_CURSOR_HELP = "cursor:help;";

  /** Not instantiable: this class only groups constants. */
  private HtmlAttribute() {
  }
}
Expand Up @@ -25,14 +25,16 @@
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.OperatorMetricRegistry;
import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
import org.apache.drill.exec.proto.UserBitShared.MetricValue;
import org.apache.drill.exec.proto.UserBitShared.OperatorProfile;
import org.apache.drill.exec.proto.UserBitShared.StreamProfile;

import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;

/**
Expand All @@ -42,10 +44,6 @@ public class OperatorWrapper {
@SuppressWarnings("unused")
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorWrapper.class);

private static final String HTML_ATTRIB_SPILLS = "spills";
private static final String HTML_ATTRIB_CLASS = "class";
private static final String HTML_ATTRIB_STYLE = "style";
private static final String HTML_ATTRIB_TITLE = "title";
private static final DecimalFormat DECIMAL_FORMATTER = new DecimalFormat("#.##");
private static final String UNKNOWN_OPERATOR = "UNKNOWN_OPERATOR";
//Negative valued constant used for denoting invalid index to indicate absence of metric
Expand All @@ -56,8 +54,19 @@ public class OperatorWrapper {
private final CoreOperatorType operatorType;
private final String operatorName;
private final int size;
private final int timeSkewMin;
private final double timeSkewRatio;
private final int scanWaitMin;
private final double waitSkewRatio;

public OperatorWrapper(int major, List<ImmutablePair<ImmutablePair<OperatorProfile, Integer>, String>> opsAndHostsList, Map<String, String> phyOperMap, DrillConfig config) {
//Threshold to track if the slowest operator ran relatively slow
timeSkewMin = config.getInt(ExecConstants.PROFILE_WARNING_TIME_SKEW_MIN);
timeSkewRatio = config.getDouble(ExecConstants.PROFILE_WARNING_TIME_SKEW_RATIO_PROCESS);
//Threshold to track if the slowest SCAN operator spent more time in wait than processing
scanWaitMin = config.getInt(ExecConstants.PROFILE_WARNING_SCAN_WAIT_MIN);
waitSkewRatio = config.getDouble(ExecConstants.PROFILE_WARNING_TIME_SKEW_RATIO_WAIT);

public OperatorWrapper(int major, List<ImmutablePair<ImmutablePair<OperatorProfile, Integer>, String>> opsAndHostsList, Map<String, String> phyOperMap) {
Preconditions.checkArgument(opsAndHostsList.size() > 0);
this.major = major;
firstProfile = opsAndHostsList.get(0).getLeft().getLeft();
Expand Down Expand Up @@ -102,12 +111,12 @@ public String getId() {
public String getContent() {
TableBuilder builder = new TableBuilder(OPERATOR_COLUMNS, OPERATOR_COLUMNS_TOOLTIP, true);

Map<String, String> attributeMap = new HashMap<String, String>(); //Reusing for different fragments
Map<String, String> attributeMap = new HashMap<>(); //Reusing for different fragments
for (ImmutablePair<ImmutablePair<OperatorProfile, Integer>, String> ip : opsAndHosts) {
int minor = ip.getLeft().getRight();
OperatorProfile op = ip.getLeft().getLeft();

attributeMap.put("data-order", String.valueOf(minor)); //Overwrite values from previous fragments
attributeMap.put(HtmlAttribute.DATA_ORDER, String.valueOf(minor)); //Overwrite values from previous fragments
String path = new OperatorPathBuilder().setMajor(major).setMinor(minor).setOperator(op).build();
builder.appendCell(path, attributeMap);
builder.appendCell(ip.getRight());
Expand Down Expand Up @@ -150,17 +159,18 @@ public String getContent() {
//Palette to help shade operators sharing a common major fragment
private static final String[] OPERATOR_OVERVIEW_BGCOLOR_PALETTE = {"#ffffff","#f2f2f2"};

public void addSummary(TableBuilder tb, HashMap<String, Long> majorFragmentBusyTally, long majorFragmentBusyTallyTotal) {
public void addSummary(TableBuilder tb, Map<String, Long> majorFragmentBusyTally, long majorFragmentBusyTallyTotal) {
//Select background color from palette
String opTblBgColor = OPERATOR_OVERVIEW_BGCOLOR_PALETTE[major%OPERATOR_OVERVIEW_BGCOLOR_PALETTE.length];
String path = new OperatorPathBuilder().setMajor(major).setOperator(firstProfile).build();
tb.appendCell(path, null, null, opTblBgColor);
tb.appendCell(path, opTblBgColor, null);
tb.appendCell(operatorName);

//Check if spill information is available
int spillCycleMetricIndex = getSpillCycleMetricIndex(operatorType);
boolean isSpillableOp = (spillCycleMetricIndex != NO_SPILL_METRIC_INDEX);
boolean hasSpilledToDisk = false;
boolean isScanOp = operatorName.endsWith("SCAN");

//Get MajorFragment Busy+Wait Time Tally
long majorBusyNanos = majorFragmentBusyTally.get(new OperatorPathBuilder().setMajor(major).build());
Expand Down Expand Up @@ -208,15 +218,53 @@ public void addSummary(TableBuilder tb, HashMap<String, Long> majorFragmentBusyT
tb.appendNanos(Math.round(setupSum / size));
tb.appendNanos(longSetup.getLeft().getSetupNanos());

Map<String, String> timeSkewMap = null;
final ImmutablePair<OperatorProfile, Integer> longProcess = Collections.max(opList, Comparators.processTime);
tb.appendNanos(Math.round(processSum / size));
tb.appendNanos(longProcess.getLeft().getProcessNanos());
//Calculating average processing time
long avgProcTime = Math.round(processSum / size);
tb.appendNanos(avgProcTime);
long maxProcTime = longProcess.getLeft().getProcessNanos();
//Calculating skew of longest processing fragment w.r.t. average
double maxSkew = (avgProcTime > 0) ? maxProcTime/Double.valueOf(avgProcTime) : 0.0d;
//Marking skew if both thresholds are crossed
if (avgProcTime > TimeUnit.SECONDS.toNanos(timeSkewMin) && maxSkew > timeSkewRatio ) {
timeSkewMap = new HashMap<>();
timeSkewMap.put(HtmlAttribute.CLASS, HtmlAttribute.CLASS_VALUE_TIME_SKEW_TAG);
timeSkewMap.put(HtmlAttribute.TITLE, "One fragment took " + DECIMAL_FORMATTER.format(maxSkew) + " longer than average");
timeSkewMap.put(HtmlAttribute.STYLE, HtmlAttribute.STYLE_VALUE_CURSOR_HELP);
}
tb.appendNanos(maxProcTime, timeSkewMap);

final ImmutablePair<OperatorProfile, Integer> shortWait = Collections.min(opList, Comparators.waitTime);
final ImmutablePair<OperatorProfile, Integer> longWait = Collections.max(opList, Comparators.waitTime);
tb.appendNanos(shortWait.getLeft().getWaitNanos());
tb.appendNanos(Math.round(waitSum / size));
tb.appendNanos(longWait.getLeft().getWaitNanos());
//Calculating average wait time for fragment
long avgWaitTime = Math.round(waitSum / size);

//Slow Scan Warning
Map<String, String> slowScanMap = null;
//Marking slow scan if threshold is crossed and wait was longer than processing
if (isScanOp && (avgWaitTime > TimeUnit.SECONDS.toNanos(scanWaitMin)) && (avgWaitTime > avgProcTime)) {
slowScanMap = new HashMap<>();
slowScanMap.put(HtmlAttribute.CLASS, HtmlAttribute.CLASS_VALUE_SCAN_WAIT_TAG);
slowScanMap.put(HtmlAttribute.TITLE, "Avg Wait Time &gt; Avg Processing Time");
slowScanMap.put(HtmlAttribute.STYLE, HtmlAttribute.STYLE_VALUE_CURSOR_HELP);
}
tb.appendNanos(avgWaitTime, slowScanMap);

long maxWaitTime = longWait.getLeft().getWaitNanos();
//Skewed Wait Warning
timeSkewMap = null; //Resetting
//Calculating skew of longest waiting fragment w.r.t. average
maxSkew = (avgWaitTime > 0) ? maxWaitTime/Double.valueOf(avgWaitTime) : 0.0d;
//Marking skew if both thresholds are crossed
if (avgWaitTime > TimeUnit.SECONDS.toNanos(timeSkewMin) && maxSkew > waitSkewRatio) {
timeSkewMap = new HashMap<>();
timeSkewMap.put(HtmlAttribute.CLASS, HtmlAttribute.CLASS_VALUE_TIME_SKEW_TAG);
timeSkewMap.put(HtmlAttribute.TITLE, "One fragment waited " + DECIMAL_FORMATTER.format(maxSkew) + " longer than average");
timeSkewMap.put(HtmlAttribute.STYLE, HtmlAttribute.STYLE_VALUE_CURSOR_HELP);
}
tb.appendNanos(maxWaitTime, timeSkewMap);

tb.appendPercent(processSum / majorBusyNanos);
tb.appendPercent(processSum / majorFragmentBusyTallyTotal);
Expand All @@ -232,15 +280,15 @@ public void addSummary(TableBuilder tb, HashMap<String, Long> majorFragmentBusyT
avgSpillMap = new HashMap<>();
//Average SpillCycle
double avgSpillCycle = spillCycleSum/size;
avgSpillMap.put(HTML_ATTRIB_TITLE, DECIMAL_FORMATTER.format(avgSpillCycle) + " spills on average");
avgSpillMap.put(HTML_ATTRIB_STYLE, "cursor:help;" + spillCycleMax);
avgSpillMap.put(HTML_ATTRIB_CLASS, "spill-tag"); //JScript will inject Icon
avgSpillMap.put(HTML_ATTRIB_SPILLS, DECIMAL_FORMATTER.format(avgSpillCycle)); //JScript will inject Count
avgSpillMap.put(HtmlAttribute.TITLE, DECIMAL_FORMATTER.format(avgSpillCycle) + " spills on average");
avgSpillMap.put(HtmlAttribute.STYLE, HtmlAttribute.STYLE_VALUE_CURSOR_HELP);
avgSpillMap.put(HtmlAttribute.CLASS, HtmlAttribute.CLASS_VALUE_SPILL_TAG); //JScript will inject Icon
avgSpillMap.put(HtmlAttribute.SPILLS, DECIMAL_FORMATTER.format(avgSpillCycle)); //JScript will inject Count
maxSpillMap = new HashMap<>();
maxSpillMap.put(HTML_ATTRIB_TITLE, "Most # spills: " + spillCycleMax);
maxSpillMap.put(HTML_ATTRIB_STYLE, "cursor:help;" + spillCycleMax);
maxSpillMap.put(HTML_ATTRIB_CLASS, "spill-tag"); //JScript will inject Icon
maxSpillMap.put(HTML_ATTRIB_SPILLS, String.valueOf(spillCycleMax)); //JScript will inject Count
maxSpillMap.put(HtmlAttribute.TITLE, "Most # spills: " + spillCycleMax);
maxSpillMap.put(HtmlAttribute.STYLE, HtmlAttribute.STYLE_VALUE_CURSOR_HELP);
maxSpillMap.put(HtmlAttribute.CLASS, HtmlAttribute.CLASS_VALUE_SPILL_TAG); //JScript will inject Icon
maxSpillMap.put(HtmlAttribute.SPILLS, String.valueOf(spillCycleMax)); //JScript will inject Count
}

tb.appendBytes(Math.round(memSum / size), avgSpillMap);
Expand Down Expand Up @@ -312,7 +360,7 @@ public String getMetricsTable() {

final Number[] values = new Number[metricNames.length];
//Track new/Unknown Metrics
final Set<Integer> unknownMetrics = new TreeSet<Integer>();
final Set<Integer> unknownMetrics = new TreeSet<>();
for (final MetricValue metric : op.getMetricList()) {
if (metric.getMetricId() < metricNames.length) {
if (metric.hasLongValue()) {
Expand Down

0 comments on commit e65079a

Please sign in to comment.