Skip to content

Commit

Permalink
DRILL-5195: Publish Operator and MajorFragment Stats in Profile page
Browse files Browse the repository at this point in the history
Improved UI
1. Introduction of Tooltips
2. Share of each operator as a percentages of the major fragment and of the query
  - This would help identify the most CPU intensive operators within a fragment and across the query
3. Rows emitted by each operator
4. For a running query, changes to 'last update' and 'last progress' now shows the elapsed time since.

closes #756
  • Loading branch information
Kunal Khatua authored and sudheeshkatkam committed Feb 25, 2017
1 parent 6892164 commit 8614bae
Show file tree
Hide file tree
Showing 5 changed files with 420 additions and 99 deletions.
Expand Up @@ -17,9 +17,12 @@
*/
package org.apache.drill.exec.server.rest.profile;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Locale;

import org.apache.drill.exec.proto.UserBitShared.MajorFragmentProfile;
import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
Expand Down Expand Up @@ -49,75 +52,174 @@ public String getId() {
return String.format("fragment-%s", major.getMajorFragmentId());
}

public static final String[] FRAGMENT_OVERVIEW_COLUMNS = {"Major Fragment", "Minor Fragments Reporting",
"First Start", "Last Start", "First End", "Last End", "Min Runtime", "Avg Runtime", "Max Runtime", "Last Update",
"Last Progress", "Max Peak Memory"};
public static final String[] ACTIVE_FRAGMENT_OVERVIEW_COLUMNS = {
OverviewTblTxt.MAJOR_FRAGMENT, OverviewTblTxt.MINOR_FRAGMENTS_REPORTING,
OverviewTblTxt.FIRST_START, OverviewTblTxt.LAST_START, OverviewTblTxt.FIRST_END, OverviewTblTxt.LAST_END,
OverviewTblTxt.MIN_RUNTIME, OverviewTblTxt.AVG_RUNTIME, OverviewTblTxt.MAX_RUNTIME,
OverviewTblTxt.PERCENT_BUSY,
OverviewTblTxt.LAST_UPDATE, OverviewTblTxt.LAST_PROGRESS,
OverviewTblTxt.MAX_PEAK_MEMORY
};

public static final String[] ACTIVE_FRAGMENT_OVERVIEW_COLUMNS_TOOLTIP = {
OverviewTblTooltip.MAJOR_FRAGMENT, OverviewTblTooltip.MINOR_FRAGMENTS_REPORTING,
OverviewTblTooltip.FIRST_START, OverviewTblTooltip.LAST_START, OverviewTblTooltip.FIRST_END, OverviewTblTooltip.LAST_END,
OverviewTblTooltip.MIN_RUNTIME, OverviewTblTooltip.AVG_RUNTIME, OverviewTblTooltip.MAX_RUNTIME,
OverviewTblTooltip.PERCENT_BUSY,
OverviewTblTooltip.LAST_UPDATE, OverviewTblTooltip.LAST_PROGRESS,
OverviewTblTooltip.MAX_PEAK_MEMORY
};

// Not including Major Fragment ID and Minor Fragments Reporting
public static final int NUM_NULLABLE_OVERVIEW_COLUMNS = FRAGMENT_OVERVIEW_COLUMNS.length - 2;
public static final int NUM_NULLABLE_ACTIVE_OVERVIEW_COLUMNS = ACTIVE_FRAGMENT_OVERVIEW_COLUMNS.length - 2;

public void addSummary(TableBuilder tb) {
// Use only minor fragments that have complete profiles
// Complete iff the fragment profile has at least one operator profile, and start and end times.
final List<MinorFragmentProfile> complete = new ArrayList<>(
Collections2.filter(major.getMinorFragmentProfileList(), Filters.hasOperatorsAndTimes));
Collections2.filter(major.getMinorFragmentProfileList(), Filters.hasOperatorsAndTimes));

tb.appendCell(new OperatorPathBuilder().setMajor(major).build(), null);
tb.appendCell(complete.size() + " / " + major.getMinorFragmentProfileCount(), null);
tb.appendCell(new OperatorPathBuilder().setMajor(major).build());
tb.appendCell(complete.size() + " / " + major.getMinorFragmentProfileCount());

// If there are no stats to aggregate, create an empty row
if (complete.size() < 1) {
tb.appendRepeated("", null, NUM_NULLABLE_OVERVIEW_COLUMNS);
tb.appendRepeated("", null, NUM_NULLABLE_ACTIVE_OVERVIEW_COLUMNS);
return;
}

final MinorFragmentProfile firstStart = Collections.min(complete, Comparators.startTime);
final MinorFragmentProfile lastStart = Collections.max(complete, Comparators.startTime);
tb.appendMillis(firstStart.getStartTime() - start, null);
tb.appendMillis(lastStart.getStartTime() - start, null);
tb.appendMillis(firstStart.getStartTime() - start);
tb.appendMillis(lastStart.getStartTime() - start);

final MinorFragmentProfile firstEnd = Collections.min(complete, Comparators.endTime);
final MinorFragmentProfile lastEnd = Collections.max(complete, Comparators.endTime);
tb.appendMillis(firstEnd.getEndTime() - start, null);
tb.appendMillis(lastEnd.getEndTime() - start, null);
tb.appendMillis(firstEnd.getEndTime() - start);
tb.appendMillis(lastEnd.getEndTime() - start);

long total = 0;
long cumulativeFragmentDurationInMillis = 0L;
long cumulativeProcessInNanos = 0L;
long cumulativeWaitInNanos = 0L;
for (final MinorFragmentProfile p : complete) {
total += p.getEndTime() - p.getStartTime();
cumulativeFragmentDurationInMillis += p.getEndTime() - p.getStartTime();
//Capture Busy & Wait Time
List<OperatorProfile> opProfileList = p.getOperatorProfileList();
for (OperatorProfile operatorProfile : opProfileList) {
cumulativeProcessInNanos += operatorProfile.getProcessNanos();
cumulativeWaitInNanos += operatorProfile.getWaitNanos();
}
}
double totalProcessInMillis = Math.round(cumulativeProcessInNanos/1E6);
double totalWaitInMillis = Math.round(cumulativeWaitInNanos/1E6);

final MinorFragmentProfile shortRun = Collections.min(complete, Comparators.runTime);
final MinorFragmentProfile longRun = Collections.max(complete, Comparators.runTime);
tb.appendMillis(shortRun.getEndTime() - shortRun.getStartTime(), null);
tb.appendMillis(total / complete.size(), null);
tb.appendMillis(longRun.getEndTime() - longRun.getStartTime(), null);
tb.appendMillis(shortRun.getEndTime() - shortRun.getStartTime());
tb.appendMillis(cumulativeFragmentDurationInMillis / complete.size());
tb.appendMillis(longRun.getEndTime() - longRun.getStartTime());

tb.appendPercent(totalProcessInMillis / (totalProcessInMillis + totalWaitInMillis), null,
//#8721 is the summation sign: sum(Busy): ## + sum(Wait): ##
String.format("&#8721;Busy: %,.2fs + &#8721;Wait: %,.2fs", totalProcessInMillis/1E3, totalWaitInMillis/1E3));

final MinorFragmentProfile lastUpdate = Collections.max(complete, Comparators.lastUpdate);
tb.appendTime(lastUpdate.getLastUpdate(), null);
tb.appendMillis(System.currentTimeMillis()-lastUpdate.getLastUpdate());

final MinorFragmentProfile lastProgress = Collections.max(complete, Comparators.lastProgress);
tb.appendTime(lastProgress.getLastProgress(), null);
tb.appendMillis(System.currentTimeMillis()-lastProgress.getLastProgress());

// TODO(DRILL-3494): Names (maxMem, getMaxMemoryUsed) are misleading; the value is peak memory allocated to fragment
final MinorFragmentProfile maxMem = Collections.max(complete, Comparators.fragmentPeakMemory);
tb.appendBytes(maxMem.getMaxMemoryUsed(), null);
tb.appendBytes(maxMem.getMaxMemoryUsed());
}

public static final String[] COMPLETED_FRAGMENT_OVERVIEW_COLUMNS = {
OverviewTblTxt.MAJOR_FRAGMENT, OverviewTblTxt.MINOR_FRAGMENTS_REPORTING,
OverviewTblTxt.FIRST_START, OverviewTblTxt.LAST_START, OverviewTblTxt.FIRST_END, OverviewTblTxt.LAST_END,
OverviewTblTxt.MIN_RUNTIME, OverviewTblTxt.AVG_RUNTIME, OverviewTblTxt.MAX_RUNTIME,
OverviewTblTxt.PERCENT_BUSY, OverviewTblTxt.MAX_PEAK_MEMORY
};

public static final String[] COMPLETED_FRAGMENT_OVERVIEW_COLUMNS_TOOLTIP = {
OverviewTblTooltip.MAJOR_FRAGMENT, OverviewTblTooltip.MINOR_FRAGMENTS_REPORTING,
OverviewTblTooltip.FIRST_START, OverviewTblTooltip.LAST_START, OverviewTblTooltip.FIRST_END, OverviewTblTooltip.LAST_END,
OverviewTblTooltip.MIN_RUNTIME, OverviewTblTooltip.AVG_RUNTIME, OverviewTblTooltip.MAX_RUNTIME,
OverviewTblTooltip.PERCENT_BUSY, OverviewTblTooltip.MAX_PEAK_MEMORY
};

//Not including Major Fragment ID and Minor Fragments Reporting
public static final int NUM_NULLABLE_COMPLETED_OVERVIEW_COLUMNS = COMPLETED_FRAGMENT_OVERVIEW_COLUMNS.length - 2;

public void addFinalSummary(TableBuilder tb) {

// Use only minor fragments that have complete profiles
// Complete iff the fragment profile has at least one operator profile, and start and end times.
final List<MinorFragmentProfile> complete = new ArrayList<>(
Collections2.filter(major.getMinorFragmentProfileList(), Filters.hasOperatorsAndTimes));

tb.appendCell(new OperatorPathBuilder().setMajor(major).build());
tb.appendCell(complete.size() + " / " + major.getMinorFragmentProfileCount());

// If there are no stats to aggregate, create an empty row
if (complete.size() < 1) {
tb.appendRepeated("", null, NUM_NULLABLE_COMPLETED_OVERVIEW_COLUMNS);
return;
}

final MinorFragmentProfile firstStart = Collections.min(complete, Comparators.startTime);
final MinorFragmentProfile lastStart = Collections.max(complete, Comparators.startTime);
tb.appendMillis(firstStart.getStartTime() - start);
tb.appendMillis(lastStart.getStartTime() - start);

final MinorFragmentProfile firstEnd = Collections.min(complete, Comparators.endTime);
final MinorFragmentProfile lastEnd = Collections.max(complete, Comparators.endTime);
tb.appendMillis(firstEnd.getEndTime() - start);
tb.appendMillis(lastEnd.getEndTime() - start);

long totalDuration = 0L;
double totalProcessInMillis = 0.0d;
double totalWaitInMillis = 0.0d;
for (final MinorFragmentProfile p : complete) {
totalDuration += p.getEndTime() - p.getStartTime();
//Capture Busy & Wait Time
List<OperatorProfile> opProfileList = p.getOperatorProfileList();
for (OperatorProfile operatorProfile : opProfileList) {
totalProcessInMillis += operatorProfile.getProcessNanos()/1E6;
totalWaitInMillis += operatorProfile.getWaitNanos()/1E6;
}
}

final MinorFragmentProfile shortRun = Collections.min(complete, Comparators.runTime);
final MinorFragmentProfile longRun = Collections.max(complete, Comparators.runTime);
tb.appendMillis(shortRun.getEndTime() - shortRun.getStartTime());
tb.appendMillis(totalDuration / complete.size());
tb.appendMillis(longRun.getEndTime() - longRun.getStartTime());

tb.appendPercent(totalProcessInMillis / (totalProcessInMillis + totalWaitInMillis), null,
//#8721 is the summation sign: sum(Busy): ## + sum(Wait): ##
String.format("&#8721;Busy: %,.2fs + &#8721;Wait: %,.2fs", totalProcessInMillis/1E3, totalWaitInMillis/1E3));

// TODO(DRILL-3494): Names (maxMem, getMaxMemoryUsed) are misleading; the value is peak memory allocated to fragment
final MinorFragmentProfile maxMem = Collections.max(complete, Comparators.fragmentPeakMemory);
tb.appendBytes(maxMem.getMaxMemoryUsed());
}

public static final String[] FRAGMENT_COLUMNS = {"Minor Fragment ID", "Host Name", "Start", "End",
"Runtime", "Max Records", "Max Batches", "Last Update", "Last Progress", "Peak Memory", "State"};
"Runtime", "Max Records", "Max Batches", "Last Update", "Last Progress", "Peak Memory", "State"};

// Not including minor fragment ID
private static final int NUM_NULLABLE_FRAGMENTS_COLUMNS = FRAGMENT_COLUMNS.length - 1;

public String getContent() {
final TableBuilder builder = new TableBuilder(FRAGMENT_COLUMNS);
final TableBuilder builder = new TableBuilder(FRAGMENT_COLUMNS, null);

// Use only minor fragments that have complete profiles
// Complete iff the fragment profile has at least one operator profile, and start and end times.
final List<MinorFragmentProfile> complete = new ArrayList<>(
Collections2.filter(major.getMinorFragmentProfileList(), Filters.hasOperatorsAndTimes));
Collections2.filter(major.getMinorFragmentProfileList(), Filters.hasOperatorsAndTimes));
final List<MinorFragmentProfile> incomplete = new ArrayList<>(
Collections2.filter(major.getMinorFragmentProfileList(), Filters.missingOperatorsOrTimes));
Collections2.filter(major.getMinorFragmentProfileList(), Filters.missingOperatorsOrTimes));

Collections.sort(complete, Comparators.minorId);
for (final MinorFragmentProfile minor : complete) {
Expand All @@ -136,26 +238,60 @@ public String getContent() {
biggestBatches = Math.max(biggestBatches, batches);
}

builder.appendCell(new OperatorPathBuilder().setMajor(major).setMinor(minor).build(), null);
builder.appendCell(minor.getEndpoint().getAddress(), null);
builder.appendMillis(minor.getStartTime() - start, null);
builder.appendMillis(minor.getEndTime() - start, null);
builder.appendMillis(minor.getEndTime() - minor.getStartTime(), null);
builder.appendCell(new OperatorPathBuilder().setMajor(major).setMinor(minor).build());
builder.appendCell(minor.getEndpoint().getAddress());
builder.appendMillis(minor.getStartTime() - start);
builder.appendMillis(minor.getEndTime() - start);
builder.appendMillis(minor.getEndTime() - minor.getStartTime());

builder.appendFormattedInteger(biggestIncomingRecords, null);
builder.appendFormattedInteger(biggestBatches, null);
builder.appendFormattedInteger(biggestIncomingRecords);
builder.appendFormattedInteger(biggestBatches);

builder.appendTime(minor.getLastUpdate(), null);
builder.appendTime(minor.getLastProgress(), null);
builder.appendTime(minor.getLastUpdate());
builder.appendTime(minor.getLastProgress());

builder.appendBytes(minor.getMaxMemoryUsed(), null);
builder.appendCell(minor.getState().name(), null);
builder.appendBytes(minor.getMaxMemoryUsed());
builder.appendCell(minor.getState().name());
}

for (final MinorFragmentProfile m : incomplete) {
builder.appendCell(major.getMajorFragmentId() + "-" + m.getMinorFragmentId(), null);
builder.appendCell(major.getMajorFragmentId() + "-" + m.getMinorFragmentId());
builder.appendRepeated(m.getState().toString(), null, NUM_NULLABLE_FRAGMENTS_COLUMNS);
}
return builder.build();
}

private class OverviewTblTxt {
static final String MAJOR_FRAGMENT = "Major Fragment";
static final String MINOR_FRAGMENTS_REPORTING = "Minor Fragments Reporting";
static final String FIRST_START = "First Start";
static final String LAST_START = "Last Start";
static final String FIRST_END = "First End";
static final String LAST_END = "Last End";
static final String MIN_RUNTIME = "Min Runtime";
static final String AVG_RUNTIME = "Avg Runtime";
static final String MAX_RUNTIME = "Max Runtime";
static final String PERCENT_BUSY = "% Busy";
static final String LAST_UPDATE = "Last Update";
static final String LAST_PROGRESS = "Last Progress";
static final String MAX_PEAK_MEMORY = "Max Peak Memory";
}

private class OverviewTblTooltip {
static final String MAJOR_FRAGMENT = "Major fragment ID seen in the visual plan";
static final String MINOR_FRAGMENTS_REPORTING = "Number of minor fragments started";
static final String FIRST_START = "Time at which the first fragment started";
static final String LAST_START = "Time at which the last fragment started";
static final String FIRST_END = "Time at which the first fragment completed";
static final String LAST_END = "Time at which the last fragment completed";
static final String MIN_RUNTIME = "Shortest fragment runtime";
static final String AVG_RUNTIME = "Average fragment runtime";
static final String MAX_RUNTIME = "Longest fragment runtime";
static final String PERCENT_BUSY = "Percentage of run time that fragments were busy doing work";
static final String LAST_UPDATE = "Time since most recent heartbeat from a fragment";
static final String LAST_PROGRESS = "Time since most recent update from a fragment";
static final String MAX_PEAK_MEMORY = "Highest memory consumption by a fragment";
}
}


0 comments on commit 8614bae

Please sign in to comment.