From b554063e3af9a51bed43d7790efde8213ba946cd Mon Sep 17 00:00:00 2001 From: Sudheesh Katkam Date: Fri, 23 Oct 2015 10:18:12 -0700 Subject: [PATCH] DRILL-3340: Add operator metrics registry for metric definitions + Display metrics as a table within an operator profile panel + Rename FragmentStats#getOperatorStats to newOperatorStats --- .../apache/drill/exec/ops/FragmentStats.java | 18 ++++- .../org/apache/drill/exec/ops/MetricDef.java | 3 + .../drill/exec/ops/OperatorContextImpl.java | 2 +- .../exec/ops/OperatorMetricRegistry.java | 75 +++++++++++++++++++ .../exec/physical/impl/ScreenCreator.java | 2 +- .../server/rest/profile/OperatorWrapper.java | 56 +++++++++++++- .../server/rest/profile/ProfileWrapper.java | 2 +- .../server/rest/profile/TableBuilder.java | 1 - .../main/resources/rest/profile/profile.ftl | 14 ++++ .../drill/exec/memory/TestAllocators.java | 6 +- 10 files changed, 165 insertions(+), 14 deletions(-) create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java index b6b0a8b2492..a5a334fb713 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java @@ -26,8 +26,11 @@ import com.codahale.metrics.MetricRegistry; import com.google.common.collect.Lists; +/** + * Holds statistics of a particular (minor) fragment. + */ public class FragmentStats { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentStats.class); +// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentStats.class); private List operators = Lists.newArrayList(); private final long startTime; @@ -50,9 +53,16 @@ public void addMetricsToStatus(MinorFragmentProfile.Builder prfB) { } } - public OperatorStats getOperatorStats(OpProfileDef profileDef, BufferAllocator allocator){ - OperatorStats stats = new OperatorStats(profileDef, allocator); - if(profileDef.operatorType != -1){ + /** + * Creates a new holder for operator statistics within this holder for fragment statistics. + * + * @param profileDef operator profile definition + * @param allocator the allocator being used + * @return a new operator statistics holder + */ + public OperatorStats newOperatorStats(final OpProfileDef profileDef, final BufferAllocator allocator) { + final OperatorStats stats = new OperatorStats(profileDef, allocator); + if(profileDef.operatorType != -1) { operators.add(stats); } return stats; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/MetricDef.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/MetricDef.java index b3ece419a23..a4a667c8b58 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/MetricDef.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/MetricDef.java @@ -17,6 +17,9 @@ */ package org.apache.drill.exec.ops; +/** + * Interface that defines a metric. For example, {@link org.apache.drill.exec.physical.impl.join.HashJoinBatch.Metric}. + */ public interface MetricDef { public String name(); public int metricId(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java index ce9f351fa1e..57c0c39a30e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java @@ -50,7 +50,7 @@ public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context, this.popConfig = popConfig; OpProfileDef def = new OpProfileDef(popConfig.getOperatorId(), popConfig.getOperatorType(), getChildCount(popConfig)); - this.stats = context.getStats().getOperatorStats(def, allocator); + stats = context.getStats().newOperatorStats(def, allocator); executionControls = context.getExecutionControls(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java new file mode 100644 index 00000000000..75426d52421 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.ops; + +import org.apache.drill.exec.physical.impl.ScreenCreator; +import org.apache.drill.exec.physical.impl.SingleSenderCreator; +import org.apache.drill.exec.physical.impl.aggregate.HashAggTemplate; +import org.apache.drill.exec.physical.impl.broadcastsender.BroadcastSenderRootExec; +import org.apache.drill.exec.physical.impl.join.HashJoinBatch; +import org.apache.drill.exec.physical.impl.mergereceiver.MergingRecordBatch; +import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec; +import org.apache.drill.exec.physical.impl.unorderedreceiver.UnorderedReceiverBatch; +import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType; + +/** + * Registry of operator metrics. + */ +public class OperatorMetricRegistry { +// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorMetricRegistry.class); + + // Mapping: operator type --> metric id --> metric name + private static final String[][] OPERATOR_METRICS = new String[CoreOperatorType.values().length][]; + + static { + register(CoreOperatorType.SCREEN_VALUE, ScreenCreator.ScreenRoot.Metric.class); + register(CoreOperatorType.SINGLE_SENDER_VALUE, SingleSenderCreator.SingleSenderRootExec.Metric.class); + register(CoreOperatorType.BROADCAST_SENDER_VALUE, BroadcastSenderRootExec.Metric.class); + register(CoreOperatorType.HASH_PARTITION_SENDER_VALUE, PartitionSenderRootExec.Metric.class); + register(CoreOperatorType.MERGING_RECEIVER_VALUE, MergingRecordBatch.Metric.class); + register(CoreOperatorType.UNORDERED_RECEIVER_VALUE, UnorderedReceiverBatch.Metric.class); + register(CoreOperatorType.HASH_AGGREGATE_VALUE, HashAggTemplate.Metric.class); + register(CoreOperatorType.HASH_JOIN_VALUE, HashJoinBatch.Metric.class); + } + + private static void register(final int operatorType, final Class metricDef) { + // Currently registers a metric def that has enum constants + final MetricDef[] enumConstants = metricDef.getEnumConstants(); + if (enumConstants != null) { + final String[] names = new String[enumConstants.length]; + for (int i = 0; i < enumConstants.length; i++) { + names[i] = enumConstants[i].name(); + } + OPERATOR_METRICS[operatorType] = names; + } + } + + /** + * Given an operator type, this method returns an array of metric names (indexable by metric id). + * + * @param operatorType the operator type + * @return metric names if operator was registered, null otherwise + */ + public static String[] getMetricNames(final int operatorType) { + return OPERATOR_METRICS[operatorType]; + } + + // to prevent instantiation + private OperatorMetricRegistry() { + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java index 97b8d97ea35..a46ea35f2ee 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java @@ -50,7 +50,7 @@ public RootExec getRoot(FragmentContext context, Screen config, List> ops; + private final List> ops; // operator profile --> minor fragment number private final OperatorProfile firstProfile; private final CoreOperatorType operatorType; + private final String operatorName; private final int size; public OperatorWrapper(int major, List> ops) { @@ -43,13 +46,14 @@ public OperatorWrapper(int major, List> this.major = major; firstProfile = ops.get(0).getLeft(); operatorType = CoreOperatorType.valueOf(firstProfile.getOperatorType()); + operatorName = operatorType == null ? "UNKNOWN_OPERATOR" : operatorType.toString(); this.ops = ops; size = ops.size(); } public String getDisplayName() { final String path = new OperatorPathBuilder().setMajor(major).setOperator(firstProfile).build(); - return String.format("%s - %s", path, operatorType == null ? "UNKNOWN_OPERATOR" : operatorType.toString()); + return String.format("%s - %s", path, operatorName); } public String getId() { @@ -94,7 +98,7 @@ public void addSummary(TableBuilder tb) { String path = new OperatorPathBuilder().setMajor(major).setOperator(firstProfile).build(); tb.appendCell(path, null); - tb.appendCell(operatorType == null ? "UNKNOWN_OPERATOR" : operatorType.toString(), null); + tb.appendCell(operatorName, null); double setupSum = 0.0; double processSum = 0.0; @@ -130,4 +134,50 @@ public void addSummary(TableBuilder tb) { tb.appendBytes(Math.round(memSum / size), null); tb.appendBytes(peakMem.getLeft().getPeakLocalMemoryAllocated(), null); } + + public String getMetricsTable() { + if (operatorType == null) { + return ""; + } + final String[] metricNames = OperatorMetricRegistry.getMetricNames(operatorType.getNumber()); + if (metricNames == null) { + return ""; + } + + final String[] metricsTableColumnNames = new String[metricNames.length + 1]; + metricsTableColumnNames[0] = "Minor Fragment"; + int i = 1; + for (final String metricName : metricNames) { + metricsTableColumnNames[i++] = metricName; + } + final TableBuilder builder = new TableBuilder(metricsTableColumnNames); + for (final ImmutablePair ip : ops) { + final OperatorProfile op = ip.getLeft(); + + builder.appendCell( + new OperatorPathBuilder() + .setMajor(major) + .setMinor(ip.getRight()) + .setOperator(op) + .build(), + null); + + final Number[] values = new Number[metricNames.length]; + for (final MetricValue metric : op.getMetricList()) { + if (metric.hasLongValue()) { + values[metric.getMetricId()] = metric.getLongValue(); + } else if (metric.hasDoubleValue()) { + values[metric.getMetricId()] = metric.getDoubleValue(); + } + } + for (final Number value : values) { + if (value != null) { + builder.appendFormattedNumber(value, null); + } else { + builder.appendCell("", null); + } + } + } + return builder.build(); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java index 7de05c10da3..f73ab90982e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java @@ -32,7 +32,7 @@ import org.apache.drill.exec.proto.helper.QueryIdHelper; /** - * This class contains information presented in the query profile web UI. + * Wrapper class for a {@link #profile query profile}, so it to be presented through web UI. */ public class ProfileWrapper { // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProfileWrapper.class); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/TableBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/TableBuilder.java index bcc67f28135..83557976312 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/TableBuilder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/TableBuilder.java @@ -48,7 +48,6 @@ public TableBuilder(final String[] columns) { width = columns.length; format.setMaximumFractionDigits(3); - format.setMinimumFractionDigits(3); sb.append("\n"); for (final String cn : columns) { diff --git a/exec/java-exec/src/main/resources/rest/profile/profile.ftl b/exec/java-exec/src/main/resources/rest/profile/profile.ftl index 47c7e06bfef..88dbe21e6b3 100644 --- a/exec/java-exec/src/main/resources/rest/profile/profile.ftl +++ b/exec/java-exec/src/main/resources/rest/profile/profile.ftl @@ -176,6 +176,20 @@
${op.getContent()}
+
+ +
+
+ ${op.getMetricsTable()} +
+
+
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java index f362257914f..18aae2cca4f 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java @@ -128,7 +128,7 @@ public void testAllocators() throws Exception { //Use some bogus operator type to create a new operator context. def = new OpProfileDef(physicalOperator1.getOperatorId(), UserBitShared.CoreOperatorType.MOCK_SUB_SCAN_VALUE, OperatorContext.getChildCount(physicalOperator1)); - stats = fragmentContext1.getStats().getOperatorStats(def, fragmentContext1.getAllocator()); + stats = fragmentContext1.getStats().newOperatorStats(def, fragmentContext1.getAllocator()); // Add a couple of Operator Contexts @@ -143,7 +143,7 @@ public void testAllocators() throws Exception { def = new OpProfileDef(physicalOperator4.getOperatorId(), UserBitShared.CoreOperatorType.TEXT_WRITER_VALUE, OperatorContext.getChildCount(physicalOperator4)); - stats = fragmentContext2.getStats().getOperatorStats(def, fragmentContext2.getAllocator()); + stats = fragmentContext2.getStats().newOperatorStats(def, fragmentContext2.getAllocator()); OperatorContext oContext22 = fragmentContext2.newOperatorContext(physicalOperator4, stats, true); DrillBuf b22=oContext22.getAllocator().buffer(2000000); @@ -157,7 +157,7 @@ public void testAllocators() throws Exception { // New fragment starts an operator that allocates an amount within the limit def = new OpProfileDef(physicalOperator5.getOperatorId(), UserBitShared.CoreOperatorType.UNION_VALUE, OperatorContext.getChildCount(physicalOperator5)); - stats = fragmentContext3.getStats().getOperatorStats(def, fragmentContext3.getAllocator()); + stats = fragmentContext3.getStats().newOperatorStats(def, fragmentContext3.getAllocator()); OperatorContext oContext31 = fragmentContext3.newOperatorContext(physicalOperator5, stats, true); DrillBuf b31a = oContext31.getAllocator().buffer(200000);