Skip to content

Commit

Permalink
DRILL-4654: Add new metrics to the MetricRegistry
Browse files Browse the repository at this point in the history
+ New metrics:
  - drill.queries.enqueued
    number of queries that have been submitted to the drillbit but have
    not started
  - drill.queries.running
    number of running queries for which this drillbit is the foreman
  - drill.queries.completed
    number of completed queries (or cancelled or failed) for which this
    drillbit was the foreman
  - drill.fragments.running
    number of query fragments that are running in the drillbit
  - drill.allocator.root.used
    amount of memory used in bytes by the internal memory allocator
  - drill.allocator.root.peak
    peak amount of memory used in bytes by the internal memory allocator
  - fd.usage
    ratio of used to total file descriptors (on *nix systems)
+ Rename "current" to "used" for RPC allocator current memory usage to
  follow convention
+ Borrow SystemPropertyUtil class from Netty
+ Configure DrillMetrics through system properties
+ Remove unused methods and imports

closes #495
  • Loading branch information
Sudheesh Katkam committed May 17, 2016
1 parent 09b2627 commit b075bf6
Show file tree
Hide file tree
Showing 14 changed files with 284 additions and 90 deletions.
Expand Up @@ -18,77 +18,86 @@
package org.apache.drill.exec.metrics;

import java.lang.management.ManagementFactory;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;

import org.apache.drill.common.config.DrillConfig;

import com.codahale.metrics.JmxReporter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.MetricSet;
import com.codahale.metrics.Slf4jReporter;
import com.codahale.metrics.jvm.BufferPoolMetricSet;
import com.codahale.metrics.jvm.FileDescriptorRatioGauge;
import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
import com.codahale.metrics.jvm.ThreadStatesGaugeSet;
import org.apache.drill.exec.util.SystemPropertyUtil;

public class DrillMetrics {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillMetrics.class);

public static final String METRICS_JMX_OUTPUT_ENABLED = "drill.metrics.jmx.enabled";
public static final String METRICS_LOG_OUTPUT_ENABLED = "drill.metrics.log.enabled";
public static final String METRICS_LOG_OUTPUT_INTERVAL = "drill.metrics.log.interval";

static final DrillConfig config = DrillConfig.create();
public final class DrillMetrics {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillMetrics.class);

private DrillMetrics() {
}
public static final boolean METRICS_JMX_OUTPUT_ENABLED =
SystemPropertyUtil.getBoolean("drill.metrics.jmx.enabled", true);
public static final boolean METRICS_LOG_OUTPUT_ENABLED =
SystemPropertyUtil.getBoolean("drill.metrics.log.enabled", false);
public static final int METRICS_LOG_OUTPUT_INTERVAL =
SystemPropertyUtil.getInt("drill.metrics.log.interval", 60);

private static class RegistryHolder {
public static final MetricRegistry REGISTRY;

private static final MetricRegistry REGISTRY;
private static final JmxReporter JMX_REPORTER;
private static final Slf4jReporter LOG_REPORTER;

static {
REGISTRY = new MetricRegistry();
registerSysStats();
registerSystemMetrics();
JMX_REPORTER = getJmxReporter();
LOG_REPORTER = getLogReporter();
}

private static void registerSysStats(){
private static void registerSystemMetrics() {
REGISTRY.registerAll(new GarbageCollectorMetricSet());
REGISTRY.registerAll(new BufferPoolMetricSet(ManagementFactory.getPlatformMBeanServer()));
REGISTRY.registerAll(new MemoryUsageGaugeSet());
REGISTRY.registerAll(new ThreadStatesGaugeSet());
register("fd.usage", new FileDescriptorRatioGauge());
}

private static JmxReporter getJmxReporter() {
if (config.getBoolean(METRICS_JMX_OUTPUT_ENABLED)) {
JmxReporter reporter = JmxReporter.forRegistry(getInstance()).build();
if (METRICS_JMX_OUTPUT_ENABLED) {
JmxReporter reporter = JmxReporter.forRegistry(REGISTRY).build();
reporter.start();

return reporter;
} else {
return null;
}
return null;
}

private static Slf4jReporter getLogReporter() {
if (config.getBoolean(METRICS_LOG_OUTPUT_ENABLED)) {
Slf4jReporter reporter = Slf4jReporter.forRegistry(getInstance()).outputTo(logger)
.convertRatesTo(TimeUnit.SECONDS).convertDurationsTo(TimeUnit.MILLISECONDS).build();
reporter.start(config.getInt(METRICS_LOG_OUTPUT_INTERVAL), TimeUnit.SECONDS);
if (METRICS_LOG_OUTPUT_ENABLED) {
Slf4jReporter reporter = Slf4jReporter.forRegistry(REGISTRY)
.outputTo(logger)
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.build();
reporter.start(METRICS_LOG_OUTPUT_INTERVAL, TimeUnit.SECONDS);

return reporter;
} else {
return null;
}
return null;
}
}

/**
* Note: For counters, histograms, meters and timers, use get or create methods on {@link #getRegistry the
* registry} (e.g. {@link MetricRegistry#counter}). For {@link com.codahale.metrics.Gauge gauges} or custom
* metric implementations use this method. The registry does not allow registering multiple metrics with
* the same name, which is a problem when multiple drillbits are started in the same JVM (e.g. unit tests).
*
* @param name metric name
* @param metric metric instance
* @param <T> metric type
*/
public synchronized static <T extends Metric> void register(String name, T metric) {
boolean removed = RegistryHolder.REGISTRY.remove(name);
if (removed) {
Expand All @@ -97,27 +106,16 @@ public synchronized static <T extends Metric> void register(String name, T metri
RegistryHolder.REGISTRY.register(name, metric);
}

private static void registerAll(String prefix, MetricSet metricSet, MetricRegistry registry) {
for (Entry<String, Metric> entry : metricSet.getMetrics().entrySet()) {
if (entry.getValue() instanceof MetricSet) {
registerAll(prefix + "." + entry.getKey(), (MetricSet) entry.getValue(), registry);
} else {
registry.register(prefix + "." + entry.getKey(), entry.getValue());
}
}
}

public static MetricRegistry getInstance() {
public static MetricRegistry getRegistry() {
return RegistryHolder.REGISTRY;
}

public static void resetMetrics(){
RegistryHolder.REGISTRY.removeMatching(new MetricFilter(){
@Override
public boolean matches(String name, Metric metric) {
return true;
}});
RegistryHolder.registerSysStats();
public static void resetMetrics() {
RegistryHolder.REGISTRY.removeMatching(MetricFilter.ALL);
RegistryHolder.registerSystemMetrics();
}

// prevents instantiation
private DrillMetrics() {
}
}
@@ -0,0 +1,189 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.util;

import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.regex.Pattern;

/**
* A collection of utility methods to retrieve and parse the values of Java system properties.
*
* This is a modified version of Netty's internal system property utility class.
*/
public final class SystemPropertyUtil {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemPropertyUtil.class);

private static final Pattern INTEGER_PATTERN = Pattern.compile("-?[0-9]+");

/**
* Returns {@code true} if and only if the system property with the specified {@code key}
* exists.
*/
public static boolean contains(String key) {
return get(key) != null;
}

/**
* Returns the value of the Java system property with the specified
* {@code key}, while falling back to {@code null} if the property access fails.
*
* @return the property value or {@code null}
*/
public static String get(String key) {
return get(key, null);
}

/**
* Returns the value of the Java system property with the specified
* {@code key}, while falling back to the specified default value if
* the property access fails.
*
* @return the property value.
* {@code def} if there's no such property or if an access to the
* specified property is not allowed.
*/
public static String get(final String key, String def) {
if (key == null) {
throw new NullPointerException("key");
}
if (key.isEmpty()) {
throw new IllegalArgumentException("key must not be empty.");
}

String value = null;
try {
if (System.getSecurityManager() == null) {
value = System.getProperty(key);
} else {
value = AccessController.doPrivileged(new PrivilegedAction<String>() {
@Override
public String run() {
return System.getProperty(key);
}
});
}
} catch (Exception e) {
logger.warn("Unable to retrieve a system property '" + key + "'; default values will be used.", e);
}

if (value == null) {
return def;
}

return value;
}

/**
* Returns the value of the Java system property with the specified
* {@code key}, while falling back to the specified default value if
* the property access fails.
*
* @return the property value.
* {@code def} if there's no such property or if an access to the
* specified property is not allowed.
*/
public static boolean getBoolean(String key, boolean def) {
String value = get(key);
if (value == null) {
return def;
}

value = value.trim().toLowerCase();
if (value.isEmpty()) {
return true;
}

if ("true".equals(value) || "yes".equals(value) || "1".equals(value)) {
return true;
}

if ("false".equals(value) || "no".equals(value) || "0".equals(value)) {
return false;
}

logger.warn("Unable to parse the boolean system property '{}':{} - using the default value: {}",
key, value, def);

return def;
}

/**
* Returns the value of the Java system property with the specified
* {@code key}, while falling back to the specified default value if
* the property access fails.
*
* @return the property value.
* {@code def} if there's no such property or if an access to the
* specified property is not allowed.
*/
public static int getInt(String key, int def) {
String value = get(key);
if (value == null) {
return def;
}

value = value.trim().toLowerCase();
if (INTEGER_PATTERN.matcher(value).matches()) {
try {
return Integer.parseInt(value);
} catch (Exception e) {
// Ignore
}
}

logger.warn("Unable to parse the integer system property '{}':{} - using the default value: {}",
key, value, def);

return def;
}

/**
* Returns the value of the Java system property with the specified
* {@code key}, while falling back to the specified default value if
* the property access fails.
*
* @return the property value.
* {@code def} if there's no such property or if an access to the
* specified property is not allowed.
*/
public static long getLong(String key, long def) {
String value = get(key);
if (value == null) {
return def;
}

value = value.trim().toLowerCase();
if (INTEGER_PATTERN.matcher(value).matches()) {
try {
return Long.parseLong(value);
} catch (Exception e) {
// Ignore
}
}

logger.warn("Unable to parse the long integer system property '{}':{} - using the default value: {}",
key, value, def);

return def;
}

// prevent instantiation
private SystemPropertyUtil() {
}
}
14 changes: 1 addition & 13 deletions common/src/main/resources/drill-module.conf
Expand Up @@ -29,17 +29,5 @@ drill {
org.apache.drill.exec.store.mock,
org.apache.drill.common.logical
]
},

metrics : {
context: "drillbit",
jmx: {
enabled : true
},
log: {
enabled : false,
interval : 60
}
},

}
}
Expand Up @@ -47,7 +47,7 @@
*/
public class VectorAccessibleSerializable extends AbstractStreamSerializable {
// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorAccessibleSerializable.class);
static final MetricRegistry metrics = DrillMetrics.getInstance();
static final MetricRegistry metrics = DrillMetrics.getRegistry();
static final String WRITER_TIMER = MetricRegistry.name(VectorAccessibleSerializable.class, "writerTime");

private VectorContainer va;
Expand Down
Expand Up @@ -171,7 +171,7 @@ public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragm
throw new ExecutionSetupException("Failure while getting memory allocator for fragment.", e);
}

stats = new FragmentStats(allocator, dbContext.getMetrics(), fragment.getAssignment());
stats = new FragmentStats(allocator, fragment.getAssignment());
bufferManager = new BufferManagerImpl(this.allocator);
}

Expand Down
Expand Up @@ -23,7 +23,6 @@
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;

import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.Lists;

/**
Expand All @@ -37,7 +36,7 @@ public class FragmentStats {
private final DrillbitEndpoint endpoint;
private final BufferAllocator allocator;

public FragmentStats(BufferAllocator allocator, MetricRegistry metrics, DrillbitEndpoint endpoint) {
public FragmentStats(BufferAllocator allocator, DrillbitEndpoint endpoint) {
this.startTime = System.currentTimeMillis();
this.endpoint = endpoint;
this.allocator = allocator;
Expand Down

0 comments on commit b075bf6

Please sign in to comment.