From 1f19a07bd6e0f3875d654a48f47ebd16ec7c8785 Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Fri, 16 Jan 2015 18:00:29 -0800 Subject: [PATCH 1/4] Add metrics for Jetty connections using overridden SelectChannelConnector. --- .../java/io/confluent/rest/Application.java | 63 ++++++++++----- .../java/io/confluent/rest/RestConfig.java | 48 ++++++++++- .../MetricsSelectChannelConnector.java | 81 +++++++++++++++++++ .../helloworld/HelloWorldApplication.java | 11 +++ .../rest/EmbeddedServerTestHarness.java | 1 - 5 files changed, 180 insertions(+), 24 deletions(-) create mode 100644 core/src/main/java/io/confluent/rest/metrics/MetricsSelectChannelConnector.java diff --git a/core/src/main/java/io/confluent/rest/Application.java b/core/src/main/java/io/confluent/rest/Application.java index f82b9fdb7b..3276b15387 100644 --- a/core/src/main/java/io/confluent/rest/Application.java +++ b/core/src/main/java/io/confluent/rest/Application.java @@ -17,6 +17,7 @@ import com.fasterxml.jackson.jaxrs.base.JsonParseExceptionMapper; +import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.handler.DefaultHandler; @@ -29,42 +30,47 @@ import org.glassfish.jersey.server.validation.ValidationFeature; import org.glassfish.jersey.servlet.ServletContainer; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import javax.ws.rs.core.Configurable; +import io.confluent.common.metrics.JmxReporter; +import io.confluent.common.metrics.MetricConfig; +import io.confluent.common.metrics.Metrics; +import io.confluent.common.metrics.MetricsReporter; import io.confluent.rest.exceptions.ConstraintViolationExceptionMapper; import io.confluent.rest.exceptions.GenericExceptionMapper; import io.confluent.rest.exceptions.WebApplicationExceptionMapper; import io.confluent.rest.logging.Slf4jRequestLog; +import io.confluent.rest.metrics.MetricsSelectChannelConnector; import io.confluent.rest.validation.JacksonMessageBodyProvider; /** - * A REST application. Extend this class and implement the configure() method to generate your - * application-specific configuration class and setupResources() to register REST resources with the - * JAX-RS server. Use createServer() to get a fully-configured, ready to run Jetty server. + * A REST application. Extend this class and implement setupResources() to register REST + * resources with the JAX-RS server. Use createServer() to get a fully-configured, ready to run + * Jetty server. */ public abstract class Application { protected T config; protected Server server = null; protected CountDownLatch shutdownLatch = new CountDownLatch(1); - - public Application() {} + protected Metrics metrics; public Application(T config) { this.config = config; - } - - /** - * Parse, load, or generate the Configuration for this application. - */ - public T configure() throws RestConfigException { - // Allow this implementation as a nop if they provide - if (this.config == null) - throw new RestConfigException( - "Application.configure() was not overridden for " + getClass().getName() + - " but the configuration was not passed to the Application class's constructor."); - return this.config; + MetricConfig metricConfig = new MetricConfig() + .samples(config.getInt(RestConfig.METRICS_NUM_SAMPLES_CONFIG)) + .timeWindow(config.getLong(RestConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), + TimeUnit.MILLISECONDS); + List reporters = + config.getConfiguredInstances(RestConfig.METRICS_REPORTER_CLASSES_CONFIG, + MetricsReporter.class); + reporters.add(new JmxReporter(config.getString(RestConfig.METRICS_JMX_PREFIX_CONFIG))); + this.metrics = new Metrics(metricConfig, reporters, config.getTime()); } /** @@ -74,14 +80,19 @@ public T configure() throws RestConfigException { */ public abstract void setupResources(Configurable config, T appConfig); + /** + * Returns a map of tag names to tag values to apply to metrics for this application. + * + * @return a Map of tags and values + */ + public Map getMetricsTags() { + return new LinkedHashMap(); + } + /** * Configure and create the server. */ public Server createServer() throws RestConfigException { - if (config == null) { - configure(); - } - // The configuration for the JAX-RS REST service ResourceConfig resourceConfig = new ResourceConfig(); @@ -91,15 +102,23 @@ public Server createServer() throws RestConfigException { // Configure the servlet container ServletContainer servletContainer = new ServletContainer(resourceConfig); ServletHolder servletHolder = new ServletHolder(servletContainer); - server = new Server(getConfiguration().getInt(RestConfig.PORT_CONFIG)) { + server = new Server() { @Override protected void doStop() throws Exception { super.doStop(); + Application.this.metrics.close(); Application.this.onShutdown(); Application.this.shutdownLatch.countDown(); } }; + int port = getConfiguration().getInt(RestConfig.PORT_CONFIG); + Map metricTags = getMetricsTags(); + String connectorMetricGrpName = "jetty"; + MetricsSelectChannelConnector connector = new MetricsSelectChannelConnector( + port, metrics, connectorMetricGrpName, metricTags); + server.setConnectors(new Connector[]{connector}); + ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); context.setContextPath("/"); context.addServlet(servletHolder, "/*"); diff --git a/core/src/main/java/io/confluent/rest/RestConfig.java b/core/src/main/java/io/confluent/rest/RestConfig.java index ef4d9a8503..b9fcae6aeb 100644 --- a/core/src/main/java/io/confluent/rest/RestConfig.java +++ b/core/src/main/java/io/confluent/rest/RestConfig.java @@ -19,6 +19,8 @@ import io.confluent.common.config.ConfigDef; import io.confluent.common.config.ConfigDef.Type; import io.confluent.common.config.ConfigDef.Importance; +import io.confluent.common.utils.SystemTime; +import io.confluent.common.utils.Time; import java.util.Map; import java.util.TreeMap; @@ -58,6 +60,31 @@ public class RestConfig extends AbstractConfig { "Name of the SLF4J logger to write the NCSA Common Log Format request log."; protected static final String REQUEST_LOGGER_NAME_DEFAULT = "io.confluent.rest-utils.requests"; + public static final String METRICS_JMX_PREFIX_CONFIG = "metrics.jmx.prefix"; + protected static final String METRICS_JMX_PREFIX_DOC = + "Prefix to apply to metric names for the default JMX reporter."; + protected static final String METRICS_JMX_PREFIX_DEFAULT = "rest-utils"; + + public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms"; + protected static final String METRICS_SAMPLE_WINDOW_MS_DOC = + "The metrics system maintains a configurable number of samples over a fixed window size. " + + "This configuration controls the size of the window. For example we might maintain two " + + "samples each measured over a 30 second period. When a window expires we erase and " + + "overwrite the oldest window."; + protected static final long METRICS_SAMPLE_WINDOW_MS_DEFAULT = 30000; + + public static final String METRICS_NUM_SAMPLES_CONFIG = "metrics.num.samples"; + protected static final String METRICS_NUM_SAMPLES_DOC = + "The number of samples maintained to compute metrics."; + protected static final int METRICS_NUM_SAMPLES_DEFAULT = 2; + + public static final String METRICS_REPORTER_CLASSES_CONFIG = "metric.reporters"; + protected static final String METRICS_REPORTER_CLASSES_DOC = + "A list of classes to use as metrics reporters. Implementing the " + + "MetricReporter interface allows plugging in classes that will be notified " + + "of new metric creation. The JmxReporter is always included to register JMX statistics."; + protected static final String METRICS_REPORTER_CLASSES_DEFAULT = ""; + static { config = new ConfigDef() .define(DEBUG_CONFIG, Type.BOOLEAN, @@ -75,9 +102,24 @@ public class RestConfig extends AbstractConfig { SHUTDOWN_GRACEFUL_MS_DOC) .define(REQUEST_LOGGER_NAME_CONFIG, Type.STRING, REQUEST_LOGGER_NAME_DEFAULT, Importance.LOW, - REQUEST_LOGGER_NAME_DOC); + REQUEST_LOGGER_NAME_DOC) + .define(METRICS_JMX_PREFIX_CONFIG, Type.STRING, + METRICS_JMX_PREFIX_DEFAULT, Importance.LOW, METRICS_JMX_PREFIX_DOC) + .define(METRICS_REPORTER_CLASSES_CONFIG, Type.LIST, + METRICS_REPORTER_CLASSES_DEFAULT, Importance.LOW, METRICS_REPORTER_CLASSES_DOC) + .define(METRICS_SAMPLE_WINDOW_MS_CONFIG, + Type.LONG, + METRICS_SAMPLE_WINDOW_MS_DEFAULT, + ConfigDef.Range.atLeast(0), + Importance.LOW, + METRICS_SAMPLE_WINDOW_MS_DOC) + .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, + METRICS_NUM_SAMPLES_DEFAULT, ConfigDef.Range.atLeast(1), + Importance.LOW, METRICS_NUM_SAMPLES_DOC); } + private static Time defaultTime = new SystemTime(); + public RestConfig() { super(config, new TreeMap()); } @@ -86,6 +128,10 @@ public RestConfig(Map props) { super(config, props); } + public Time getTime() { + return defaultTime; + } + public static void main(String[] args) { System.out.println(config.toHtmlTable()); } diff --git a/core/src/main/java/io/confluent/rest/metrics/MetricsSelectChannelConnector.java b/core/src/main/java/io/confluent/rest/metrics/MetricsSelectChannelConnector.java new file mode 100644 index 0000000000..3bf91a64cd --- /dev/null +++ b/core/src/main/java/io/confluent/rest/metrics/MetricsSelectChannelConnector.java @@ -0,0 +1,81 @@ +/** + * Copyright 2015 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package io.confluent.rest.metrics; + +import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.server.nio.SelectChannelConnector; + +import java.io.IOException; +import java.util.Map; + +import io.confluent.common.metrics.MetricName; +import io.confluent.common.metrics.Metrics; +import io.confluent.common.metrics.Sensor; +import io.confluent.common.metrics.stats.Rate; +import io.confluent.common.metrics.stats.Total; + +public class MetricsSelectChannelConnector extends SelectChannelConnector { + private Sensor accepts, connects, disconnects, connections; + + public MetricsSelectChannelConnector(int port, Metrics metrics, String metricGrpPrefix, + Map metricTags) { + super(); + this.setPort(port); + + String metricGrpName = metricGrpPrefix + "-metrics"; + this.accepts = metrics.sensor("connections-accepted"); + MetricName metricName = new MetricName("connections-accepted-rate", metricGrpName, + "Rate at which Jetty TCP connections are being accepted", + metricTags); + this.accepts.add(metricName, new Rate()); + this.connects = metrics.sensor("connections-opened"); + metricName = new MetricName("connections-opened-rate", metricGrpName, + "Rate at which new Jetty TCP connections are being opened", + metricTags); + this.connects.add(metricName, new Rate()); + this.disconnects = metrics.sensor("connections-closed"); + metricName = new MetricName("connections-closed-rate", metricGrpName, + "Rate at which Jetty TCP connections are being closed", + metricTags); + this.disconnects.add(metricName, new Rate()); + this.connections = metrics.sensor("connections"); + metricName = new MetricName("connections-active", metricGrpName, + "Total number of active TCP connections", + metricTags); + this.connections.add(metricName, new Total()); + } + + @Override + public void accept(int acceptorID) throws IOException { + super.accept(acceptorID); + this.accepts.record(); + } + + @Override + protected void connectionOpened(Connection connection) { + super.connectionOpened(connection); + this.connects.record(); + this.connections.record(1); + } + + @Override + protected void connectionClosed(Connection connection) { + super.connectionClosed(connection); + this.disconnects.record(); + this.connections.record(-1); + } +} diff --git a/examples/src/main/java/io/confluent/rest/examples/helloworld/HelloWorldApplication.java b/examples/src/main/java/io/confluent/rest/examples/helloworld/HelloWorldApplication.java index c93d1686ce..d52956e706 100644 --- a/examples/src/main/java/io/confluent/rest/examples/helloworld/HelloWorldApplication.java +++ b/examples/src/main/java/io/confluent/rest/examples/helloworld/HelloWorldApplication.java @@ -18,6 +18,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.LinkedHashMap; +import java.util.Map; import java.util.TreeMap; import javax.ws.rs.core.Configurable; @@ -47,6 +49,15 @@ public void setupResources(Configurable config, HelloWorldRestConfig appConfi config.register(new HelloWorldResource(appConfig)); } + @Override + public Map getMetricsTags() { + Map tags = new LinkedHashMap(); + // In a real app, you might have or generate a unique ID for this instance and add other + // tags like data center, app version, etc. + tags.put("instance-id", "1"); + return tags; + } + public static void main(String[] args) { try { // This simple configuration is driven by the command line. Run with an argument to specify diff --git a/test/src/main/java/io/confluent/rest/EmbeddedServerTestHarness.java b/test/src/main/java/io/confluent/rest/EmbeddedServerTestHarness.java index eb7a375f64..ebf54e2d66 100644 --- a/test/src/main/java/io/confluent/rest/EmbeddedServerTestHarness.java +++ b/test/src/main/java/io/confluent/rest/EmbeddedServerTestHarness.java @@ -107,7 +107,6 @@ public void setUp() throws Exception { ); } - app.configure(); getJerseyTest().setUp(); } From 4643bcce4bfa93986aeb103817f1922a27f5fdc6 Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Mon, 19 Jan 2015 16:23:04 -0800 Subject: [PATCH 2/4] Add metrics for Jersey requests, both globally and per Resource method. --- core/pom.xml | 4 - .../java/io/confluent/rest/Application.java | 24 ++- ...ricsResourceMethodApplicationListener.java | 192 ++++++++++++++++++ examples/pom.xml | 10 + pom.xml | 1 + .../rest/EmbeddedServerTestHarness.java | 5 +- 6 files changed, 222 insertions(+), 14 deletions(-) create mode 100644 core/src/main/java/io/confluent/rest/metrics/MetricsResourceMethodApplicationListener.java diff --git a/core/pom.xml b/core/pom.xml index 6ed89326ba..c6a27dc07f 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -15,10 +15,6 @@ jar rest-utils - - 4.11 - - diff --git a/core/src/main/java/io/confluent/rest/Application.java b/core/src/main/java/io/confluent/rest/Application.java index 3276b15387..2cea986066 100644 --- a/core/src/main/java/io/confluent/rest/Application.java +++ b/core/src/main/java/io/confluent/rest/Application.java @@ -46,6 +46,7 @@ import io.confluent.rest.exceptions.GenericExceptionMapper; import io.confluent.rest.exceptions.WebApplicationExceptionMapper; import io.confluent.rest.logging.Slf4jRequestLog; +import io.confluent.rest.metrics.MetricsResourceMethodApplicationListener; import io.confluent.rest.metrics.MetricsSelectChannelConnector; import io.confluent.rest.validation.JacksonMessageBodyProvider; @@ -96,7 +97,9 @@ public Server createServer() throws RestConfigException { // The configuration for the JAX-RS REST service ResourceConfig resourceConfig = new ResourceConfig(); - configureBaseApplication(resourceConfig); + Map metricTags = getMetricsTags(); + + configureBaseApplication(resourceConfig, metricTags); setupResources(resourceConfig, getConfiguration()); // Configure the servlet container @@ -113,10 +116,8 @@ protected void doStop() throws Exception { }; int port = getConfiguration().getInt(RestConfig.PORT_CONFIG); - Map metricTags = getMetricsTags(); - String connectorMetricGrpName = "jetty"; MetricsSelectChannelConnector connector = new MetricsSelectChannelConnector( - port, metrics, connectorMetricGrpName, metricTags); + port, metrics, "jetty", metricTags); server.setConnectors(new Connector[]{connector}); ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); @@ -142,21 +143,28 @@ protected void doStop() throws Exception { return server; } + public void configureBaseApplication(Configurable config) { + configureBaseApplication(config, null); + } + /** * Register standard components for a JSON REST application on the given JAX-RS configurable, * which can be either an ResourceConfig for a server or a ClientConfig for a Jersey-based REST * client. */ - public void configureBaseApplication(Configurable config) { - RestConfig restRestConfig = getConfiguration(); + public void configureBaseApplication(Configurable config, Map metricTags) { + RestConfig restConfig = getConfiguration(); config.register(JacksonMessageBodyProvider.class); config.register(JsonParseExceptionMapper.class); config.register(ValidationFeature.class); config.register(ConstraintViolationExceptionMapper.class); - config.register(new WebApplicationExceptionMapper(restRestConfig)); - config.register(new GenericExceptionMapper(restRestConfig)); + config.register(new WebApplicationExceptionMapper(restConfig)); + config.register(new GenericExceptionMapper(restConfig)); + + config.register(new MetricsResourceMethodApplicationListener(metrics, "jersey", + metricTags, restConfig.getTime())); config.property(ServerProperties.BV_SEND_ERROR_IN_RESPONSE, true); } diff --git a/core/src/main/java/io/confluent/rest/metrics/MetricsResourceMethodApplicationListener.java b/core/src/main/java/io/confluent/rest/metrics/MetricsResourceMethodApplicationListener.java new file mode 100644 index 0000000000..4fe21a247e --- /dev/null +++ b/core/src/main/java/io/confluent/rest/metrics/MetricsResourceMethodApplicationListener.java @@ -0,0 +1,192 @@ +/** + * Copyright 2015 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package io.confluent.rest.metrics; + +import org.glassfish.jersey.server.model.Resource; +import org.glassfish.jersey.server.model.ResourceMethod; +import org.glassfish.jersey.server.monitoring.ApplicationEvent; +import org.glassfish.jersey.server.monitoring.ApplicationEventListener; +import org.glassfish.jersey.server.monitoring.RequestEvent; +import org.glassfish.jersey.server.monitoring.RequestEventListener; + +import java.lang.reflect.Method; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import io.confluent.common.metrics.MetricName; +import io.confluent.common.metrics.Metrics; +import io.confluent.common.metrics.Sensor; +import io.confluent.common.metrics.stats.Avg; +import io.confluent.common.metrics.stats.Count; +import io.confluent.common.metrics.stats.Rate; +import io.confluent.common.utils.Time; + +/** + * Jersey ResourceMethodApplicationListener that records metrics for each endpoint by listening + * for method start and finish events. It reports some common metrics for each such as rate and + * latency (average, 90th, 99th, etc). + */ +public class MetricsResourceMethodApplicationListener implements ApplicationEventListener { + private final Metrics metrics; + private final String metricGrpPrefix; + private Map metricTags; + Time time; + // This is immutable after it's initial construction + private Map methodMetrics = new HashMap(); + + public MetricsResourceMethodApplicationListener(Metrics metrics, String metricGrpPrefix, + Map metricTags, Time time) { + super(); + this.metrics = metrics; + this.metricGrpPrefix = metricGrpPrefix; + this.metricTags = (metricTags != null) ? metricTags : Collections.emptyMap(); + this.time = time; + } + + @Override + public void onEvent(ApplicationEvent event) { + if (event.getType() == ApplicationEvent.Type.INITIALIZATION_FINISHED) { + // Special null key is used for global stats + methodMetrics.put(null, new MethodMetrics(null, this.metrics, metricGrpPrefix, metricTags)); + + for (final Resource resource : event.getResourceModel().getResources()) { + for (final ResourceMethod method : resource.getAllMethods()) { + register(method); + } + + for (final Resource childResource : resource.getChildResources()) { + for (final ResourceMethod method : childResource.getAllMethods()) { + register(method); + } + } + } + } + } + + private void register(ResourceMethod method) { + final Method definitionMethod = method.getInvocable().getDefinitionMethod(); + if (!definitionMethod.getDeclaringClass().getName().startsWith("org.glassfish.jersey")) { + methodMetrics.put( + definitionMethod, + new MethodMetrics(method, this.metrics, metricGrpPrefix, metricTags)); + } + } + + @Override + public RequestEventListener onRequest(final RequestEvent event) { + return new MetricsRequestEventListener(methodMetrics, time); + } + + private static class MethodMetrics { + // This records request latencies, but we can also get basic requests stats (like rate of + // HTTP requests) as well + private Sensor requests; + private Sensor exceptions; + + public MethodMetrics(ResourceMethod method, Metrics metrics, String metricGrpPrefix, + Map metricTags) { + String metricGrpName = metricGrpPrefix + "-metrics"; + + this.requests = metrics.sensor(getName(method, "requests")); + MetricName metricName = new MetricName(getName(method, "requests-rate"), metricGrpName, + "Rate of HTTP requests", + metricTags); + this.requests.add(metricName, new Rate(new Count())); + metricName = new MetricName(getName(method, "requests-latency-avg"), metricGrpName, + "Average latency of HTTP requests requests", + metricTags); + this.requests.add(metricName, new Avg()); + + this.exceptions = metrics.sensor(getName(method, "request-exceptions")); + metricName = new MetricName(getName(method, "request-exceptions-rate"), metricGrpName, + "Rate at which HTTP requests are being generated", + metricTags); + this.exceptions.add(metricName, new Rate()); + } + + /** + * Indicate that a request has finished successfully. + */ + public void finished(long latency) { + requests.record(); + } + + /** + * Indicate that a request has failed with an exception. + */ + public void exception() { + exceptions.record(); + } + + private static String getName(final ResourceMethod method, String metric) { + StringBuilder builder = new StringBuilder(); + if (method != null) { + String className = method.getInvocable().getDefinitionMethod() + .getDeclaringClass().getSimpleName(); + String methodName = method.getInvocable().getDefinitionMethod().getName(); + builder.append(className); + builder.append('.'); + builder.append(methodName); + builder.append('.'); + } + builder.append(metric); + return builder.toString(); + } + + } + + private static class MetricsRequestEventListener implements RequestEventListener { + private final Time time; + private final Map metrics; + private long started; + + public MetricsRequestEventListener(final Map metrics, Time time) { + this.metrics = metrics; + this.time = time; + } + + @Override + public void onEvent(RequestEvent event) { + if (event.getType() == RequestEvent.Type.RESOURCE_METHOD_START) { + started = time.milliseconds(); + } else if (event.getType() == RequestEvent.Type.RESOURCE_METHOD_FINISHED) { + final long elapsed = time.milliseconds() - started; + this.metrics.get(null).finished(elapsed); + final MethodMetrics metrics = getMethodMetrics(event); + if (metrics != null) { + metrics.finished(elapsed); + } + } else if (event.getType() == RequestEvent.Type.ON_EXCEPTION) { + this.metrics.get(null).exception(); + final MethodMetrics metrics = getMethodMetrics(event); + if (metrics != null) { + metrics.exception(); + } + } + } + + private MethodMetrics getMethodMetrics(RequestEvent event) { + ResourceMethod method = event.getUriInfo().getMatchedResourceMethod(); + if (method == null) { + return null; + } + return this.metrics.get(method.getInvocable().getDefinitionMethod()); + } + } +} + diff --git a/examples/pom.xml b/examples/pom.xml index d377b21b62..8a3f38b5f0 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -29,6 +29,16 @@ ${project.version} test + + junit + junit + ${junit.version} + + + org.glassfish.jersey.test-framework + jersey-test-framework-core + ${jersey.version} + \ No newline at end of file diff --git a/pom.xml b/pom.xml index 173d5355c3..ae662b7df4 100644 --- a/pom.xml +++ b/pom.xml @@ -22,6 +22,7 @@ 2.6 8.1.16.v20140903 2.4.3 + 4.11 UTF-8 diff --git a/test/src/main/java/io/confluent/rest/EmbeddedServerTestHarness.java b/test/src/main/java/io/confluent/rest/EmbeddedServerTestHarness.java index ebf54e2d66..915bc95bc8 100644 --- a/test/src/main/java/io/confluent/rest/EmbeddedServerTestHarness.java +++ b/test/src/main/java/io/confluent/rest/EmbeddedServerTestHarness.java @@ -113,6 +113,7 @@ public void setUp() throws Exception { @After public void tearDown() throws Exception { test.tearDown(); + test = null; } protected void addResource(Object resource) { @@ -153,7 +154,7 @@ public JettyJerseyTest() { protected Application configure() { ResourceConfig config = new ResourceConfig(); // Only configure the base application, resources are added manually with addResource - app.configureBaseApplication(config); + app.configureBaseApplication(config, null); for (Object resource : resources) config.register(resource); for (Class resource : resourceClasses) @@ -162,7 +163,7 @@ protected Application configure() { } @Override protected void configureClient(ClientConfig config) { - app.configureBaseApplication(config); + app.configureBaseApplication(config, null); } } } From 592cde25b4f86ed5373697d6728c89fa7ed47029 Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Mon, 26 Jan 2015 23:07:09 -0800 Subject: [PATCH 3/4] Add PerformanceMetric annotation that can override the metric name and only report metrics if they have the annotation. --- .../rest/annotations/PerformanceMetric.java | 43 +++++++++++++++++++ ...ricsResourceMethodApplicationListener.java | 41 +++++++++++------- .../helloworld/HelloWorldResource.java | 3 ++ 3 files changed, 71 insertions(+), 16 deletions(-) create mode 100644 core/src/main/java/io/confluent/rest/annotations/PerformanceMetric.java diff --git a/core/src/main/java/io/confluent/rest/annotations/PerformanceMetric.java b/core/src/main/java/io/confluent/rest/annotations/PerformanceMetric.java new file mode 100644 index 0000000000..2ee86cef1a --- /dev/null +++ b/core/src/main/java/io/confluent/rest/annotations/PerformanceMetric.java @@ -0,0 +1,43 @@ +/** + * Copyright 2015 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package io.confluent.rest.annotations; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Annotation that specifies that performance metrics should be collected for the annotated + * resource. + */ +@Inherited +@Documented +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE, ElementType.CONSTRUCTOR, + ElementType.METHOD, ElementType.ANNOTATION_TYPE}) +public @interface PerformanceMetric { + /** + * Placeholder that specifies the name should use a default value, such as the class/method + * name of the resource. + */ + public static String DEFAULT_NAME = "__DEFAULT_NAME__"; + + String value() default DEFAULT_NAME; +} diff --git a/core/src/main/java/io/confluent/rest/metrics/MetricsResourceMethodApplicationListener.java b/core/src/main/java/io/confluent/rest/metrics/MetricsResourceMethodApplicationListener.java index 4fe21a247e..c743f797bd 100644 --- a/core/src/main/java/io/confluent/rest/metrics/MetricsResourceMethodApplicationListener.java +++ b/core/src/main/java/io/confluent/rest/metrics/MetricsResourceMethodApplicationListener.java @@ -35,6 +35,7 @@ import io.confluent.common.metrics.stats.Count; import io.confluent.common.metrics.stats.Rate; import io.confluent.common.utils.Time; +import io.confluent.rest.annotations.PerformanceMetric; /** * Jersey ResourceMethodApplicationListener that records metrics for each endpoint by listening @@ -62,7 +63,8 @@ public MetricsResourceMethodApplicationListener(Metrics metrics, String metricGr public void onEvent(ApplicationEvent event) { if (event.getType() == ApplicationEvent.Type.INITIALIZATION_FINISHED) { // Special null key is used for global stats - methodMetrics.put(null, new MethodMetrics(null, this.metrics, metricGrpPrefix, metricTags)); + methodMetrics.put(null, + new MethodMetrics(null, null, this.metrics, metricGrpPrefix, metricTags)); for (final Resource resource : event.getResourceModel().getResources()) { for (final ResourceMethod method : resource.getAllMethods()) { @@ -80,10 +82,11 @@ public void onEvent(ApplicationEvent event) { private void register(ResourceMethod method) { final Method definitionMethod = method.getInvocable().getDefinitionMethod(); - if (!definitionMethod.getDeclaringClass().getName().startsWith("org.glassfish.jersey")) { + if (definitionMethod.isAnnotationPresent(PerformanceMetric.class)) { + PerformanceMetric annotation = definitionMethod.getAnnotation(PerformanceMetric.class); methodMetrics.put( definitionMethod, - new MethodMetrics(method, this.metrics, metricGrpPrefix, metricTags)); + new MethodMetrics(method, annotation, this.metrics, metricGrpPrefix, metricTags)); } } @@ -98,23 +101,22 @@ private static class MethodMetrics { private Sensor requests; private Sensor exceptions; - public MethodMetrics(ResourceMethod method, Metrics metrics, String metricGrpPrefix, - Map metricTags) { + public MethodMetrics(ResourceMethod method, PerformanceMetric annotation, Metrics metrics, + String metricGrpPrefix, Map metricTags) { String metricGrpName = metricGrpPrefix + "-metrics"; - this.requests = metrics.sensor(getName(method, "requests")); - MetricName metricName = new MetricName(getName(method, "requests-rate"), metricGrpName, - "Rate of HTTP requests", - metricTags); + this.requests = metrics.sensor(getName(method, annotation, "requests")); + MetricName metricName = new MetricName(getName(method, annotation, "requests-rate"), + metricGrpName, "Rate of HTTP requests", metricTags); this.requests.add(metricName, new Rate(new Count())); - metricName = new MetricName(getName(method, "requests-latency-avg"), metricGrpName, - "Average latency of HTTP requests requests", + metricName = new MetricName(getName(method, annotation, "requests-latency-avg"), + metricGrpName, "Average latency of HTTP requests requests", metricTags); this.requests.add(metricName, new Avg()); - this.exceptions = metrics.sensor(getName(method, "request-exceptions")); - metricName = new MetricName(getName(method, "request-exceptions-rate"), metricGrpName, - "Rate at which HTTP requests are being generated", + this.exceptions = metrics.sensor(getName(method, annotation, "request-exceptions")); + metricName = new MetricName(getName(method, annotation, "request-exceptions-rate"), + metricGrpName, "Rate at which HTTP requests are being generated", metricTags); this.exceptions.add(metricName, new Rate()); } @@ -133,9 +135,16 @@ public void exception() { exceptions.record(); } - private static String getName(final ResourceMethod method, String metric) { + private static String getName(final ResourceMethod method, + final PerformanceMetric annotation, String metric) { StringBuilder builder = new StringBuilder(); - if (method != null) { + boolean prefixed = false; + if (annotation != null && !annotation.value().equals(PerformanceMetric.DEFAULT_NAME)) { + builder.append(annotation.value()); + builder.append('.'); + prefixed = true; + } + if (!prefixed && method != null) { String className = method.getInvocable().getDefinitionMethod() .getDeclaringClass().getSimpleName(); String methodName = method.getInvocable().getDefinitionMethod().getName(); diff --git a/examples/src/main/java/io/confluent/rest/examples/helloworld/HelloWorldResource.java b/examples/src/main/java/io/confluent/rest/examples/helloworld/HelloWorldResource.java index 89dd9983ca..599b8b1785 100644 --- a/examples/src/main/java/io/confluent/rest/examples/helloworld/HelloWorldResource.java +++ b/examples/src/main/java/io/confluent/rest/examples/helloworld/HelloWorldResource.java @@ -24,6 +24,8 @@ import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; +import io.confluent.rest.annotations.PerformanceMetric; + @Path("/hello") @Produces("application/vnd.hello.v1+json") public class HelloWorldResource { @@ -54,6 +56,7 @@ public String getMessage() { } @GET + @PerformanceMetric("hello-with-name") public HelloResponse hello(@QueryParam("name") String name) { // Use a configuration setting to control the message that's written. The name is extracted from // the query parameter "name", or defaults to "World". You can test this API with curl: From 9c81dbeffe52274ee65f3de834438a0e5e02accf Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Tue, 27 Jan 2015 15:17:35 -0800 Subject: [PATCH 4/4] Rename sensors and metrics to better match Kafka's naming and add metrics for request and response size. --- ...ricsResourceMethodApplicationListener.java | 189 +++++++++++++++--- .../MetricsSelectChannelConnector.java | 24 +-- 2 files changed, 172 insertions(+), 41 deletions(-) diff --git a/core/src/main/java/io/confluent/rest/metrics/MetricsResourceMethodApplicationListener.java b/core/src/main/java/io/confluent/rest/metrics/MetricsResourceMethodApplicationListener.java index c743f797bd..8585547a24 100644 --- a/core/src/main/java/io/confluent/rest/metrics/MetricsResourceMethodApplicationListener.java +++ b/core/src/main/java/io/confluent/rest/metrics/MetricsResourceMethodApplicationListener.java @@ -16,6 +16,8 @@ package io.confluent.rest.metrics; +import org.glassfish.jersey.server.ContainerRequest; +import org.glassfish.jersey.server.ContainerResponse; import org.glassfish.jersey.server.model.Resource; import org.glassfish.jersey.server.model.ResourceMethod; import org.glassfish.jersey.server.monitoring.ApplicationEvent; @@ -23,6 +25,11 @@ import org.glassfish.jersey.server.monitoring.RequestEvent; import org.glassfish.jersey.server.monitoring.RequestEventListener; +import java.io.FilterInputStream; +import java.io.FilterOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.lang.reflect.Method; import java.util.Collections; import java.util.HashMap; @@ -33,6 +40,7 @@ import io.confluent.common.metrics.Sensor; import io.confluent.common.metrics.stats.Avg; import io.confluent.common.metrics.stats.Count; +import io.confluent.common.metrics.stats.Max; import io.confluent.common.metrics.stats.Rate; import io.confluent.common.utils.Time; import io.confluent.rest.annotations.PerformanceMetric; @@ -96,43 +104,83 @@ public RequestEventListener onRequest(final RequestEvent event) { } private static class MethodMetrics { - // This records request latencies, but we can also get basic requests stats (like rate of - // HTTP requests) as well - private Sensor requests; - private Sensor exceptions; + private Sensor requestSizeSensor; + private Sensor responseSizeSensor; + private Sensor requestLatencySensor; + private Sensor errorSensor; public MethodMetrics(ResourceMethod method, PerformanceMetric annotation, Metrics metrics, String metricGrpPrefix, Map metricTags) { String metricGrpName = metricGrpPrefix + "-metrics"; - this.requests = metrics.sensor(getName(method, annotation, "requests")); - MetricName metricName = new MetricName(getName(method, annotation, "requests-rate"), - metricGrpName, "Rate of HTTP requests", metricTags); - this.requests.add(metricName, new Rate(new Count())); - metricName = new MetricName(getName(method, annotation, "requests-latency-avg"), - metricGrpName, "Average latency of HTTP requests requests", - metricTags); - this.requests.add(metricName, new Avg()); - - this.exceptions = metrics.sensor(getName(method, annotation, "request-exceptions")); - metricName = new MetricName(getName(method, annotation, "request-exceptions-rate"), - metricGrpName, "Rate at which HTTP requests are being generated", - metricTags); - this.exceptions.add(metricName, new Rate()); + this.requestSizeSensor = metrics.sensor(getName(method, annotation, "request-size")); + MetricName metricName = new MetricName( + getName(method, annotation, "request-rate"), metricGrpName, + "The average number of HTTP requests per second.", metricTags); + this.requestSizeSensor.add(metricName, new Rate(new Count())); + metricName = new MetricName( + getName(method, annotation, "request-byte-rate"), metricGrpName, + "Bytes/second of incoming requests", metricTags); + this.requestSizeSensor.add(metricName, new Avg()); + metricName = new MetricName( + getName(method, annotation, "request-size-avg"), metricGrpName, + "The average request size in bytes", metricTags); + this.requestSizeSensor.add(metricName, new Avg()); + metricName = new MetricName( + getName(method, annotation, "request-size-max"), metricGrpName, + "The maximum request size in bytes", metricTags); + this.requestSizeSensor.add(metricName, new Max()); + + this.responseSizeSensor = metrics.sensor(getName(method, annotation, "response-size")); + metricName = new MetricName( + getName(method, annotation, "response-rate"), metricGrpName, + "The average number of HTTP responses per second.", metricTags); + this.responseSizeSensor.add(metricName, new Rate(new Count())); + metricName = new MetricName( + getName(method, annotation, "response-byte-rate"), metricGrpName, + "Bytes/second of outgoing responses", metricTags); + this.responseSizeSensor.add(metricName, new Avg()); + metricName = new MetricName( + getName(method, annotation, "response-size-avg"), metricGrpName, + "The average response size in bytes", metricTags); + this.responseSizeSensor.add(metricName, new Avg()); + metricName = new MetricName( + getName(method, annotation, "response-size-max"), metricGrpName, + "The maximum response size in bytes", metricTags); + this.responseSizeSensor.add(metricName, new Max()); + + this.requestLatencySensor = metrics.sensor(getName(method, annotation, "request-latency")); + metricName = new MetricName( + getName(method, annotation, "request-latency-avg"), metricGrpName, + "The average request latency in ms", metricTags); + this.requestLatencySensor.add(metricName, new Avg()); + metricName = new MetricName( + getName(method, annotation, "request-latency-max"), metricGrpName, + "The maximum request latency in ms", metricTags); + this.requestLatencySensor.add(metricName, new Max()); + + this.errorSensor = metrics.sensor(getName(method, annotation, "errors")); + metricName = new MetricName( + getName(method, annotation, "request-error-rate"), metricGrpName, + "The average number of requests per second that resulted in HTTP error responses", + metricTags); + this.errorSensor.add(metricName, new Rate()); } /** * Indicate that a request has finished successfully. */ - public void finished(long latency) { - requests.record(); + public void finished(long requestSize, long responseSize, long latencyMs) { + requestSizeSensor.record(requestSize); + responseSizeSensor.record(responseSize); + requestLatencySensor.record(latencyMs); } /** * Indicate that a request has failed with an exception. */ public void exception() { - exceptions.record(); + errorSensor.record(); } private static String getName(final ResourceMethod method, @@ -163,6 +211,8 @@ private static class MetricsRequestEventListener implements RequestEventListener private final Time time; private final Map metrics; private long started; + private CountingInputStream wrappedRequestStream; + private CountingOutputStream wrappedResponseStream; public MetricsRequestEventListener(final Map metrics, Time time) { this.metrics = metrics; @@ -171,21 +221,30 @@ public MetricsRequestEventListener(final Map metrics, Tim @Override public void onEvent(RequestEvent event) { - if (event.getType() == RequestEvent.Type.RESOURCE_METHOD_START) { + if (event.getType() == RequestEvent.Type.MATCHING_START) { started = time.milliseconds(); - } else if (event.getType() == RequestEvent.Type.RESOURCE_METHOD_FINISHED) { - final long elapsed = time.milliseconds() - started; - this.metrics.get(null).finished(elapsed); - final MethodMetrics metrics = getMethodMetrics(event); - if (metrics != null) { - metrics.finished(elapsed); - } + final ContainerRequest request = event.getContainerRequest(); + wrappedRequestStream = new CountingInputStream(request.getEntityStream()); + request.setEntityStream(wrappedRequestStream); + } else if (event.getType() == RequestEvent.Type.RESP_FILTERS_START) { + final ContainerResponse response = event.getContainerResponse(); + wrappedResponseStream = new CountingOutputStream(response.getEntityStream()); + response.setEntityStream(wrappedResponseStream); } else if (event.getType() == RequestEvent.Type.ON_EXCEPTION) { this.metrics.get(null).exception(); final MethodMetrics metrics = getMethodMetrics(event); if (metrics != null) { metrics.exception(); } + } else if (event.getType() == RequestEvent.Type.FINISHED) { + final long elapsed = time.milliseconds() - started; + final long requestSize = wrappedRequestStream.size(); + final long responseSize = wrappedResponseStream.size(); + this.metrics.get(null).finished(requestSize, responseSize, elapsed); + final MethodMetrics metrics = getMethodMetrics(event); + if (metrics != null) { + metrics.finished(requestSize, responseSize, elapsed); + } } } @@ -196,6 +255,78 @@ private MethodMetrics getMethodMetrics(RequestEvent event) { } return this.metrics.get(method.getInvocable().getDefinitionMethod()); } + + private static class CountingInputStream extends FilterInputStream { + private long count = 0; + private long mark = 0; + public CountingInputStream(InputStream is) { + super(is); + } + + public long size() { + return count; + } + + @Override + public int read() throws IOException { + int b = super.read(); + count++; + return b; + } + + // Note that read(byte[]) for FilterInputStream calls this.read(b,0,b.length), NOT + // underlying.read(b), so accounting for those calls is handled by read(byte[],int,int). + + @Override + public int read(byte[] bytes, int off, int len) throws IOException { + int nread = super.read(bytes, off, len); + if (nread > 0) { + count += nread; + } + return nread; + } + + @Override + public long skip(long l) throws IOException { + long skipped = super.skip(l); + count += skipped; + return skipped; + } + + @Override + public synchronized void mark(int i) { + super.mark(i); + mark = count; + } + + @Override + public synchronized void reset() throws IOException { + super.reset(); + count = mark; + } + } + + private static class CountingOutputStream extends FilterOutputStream { + private long count = 0; + + public CountingOutputStream(OutputStream os) { + super(os); + } + + public long size() { + return count; + } + + @Override + public void write(int b) throws IOException { + super.write(b); + count++; + } + + // The methods write(byte[]) and write(byte[],int,int) are guaranteed to call write(int), + // either directly or indirectly, for FilterOutputStream, so they do not need to be + // overridden -- all the accounting is handled by write(int) + } } } diff --git a/core/src/main/java/io/confluent/rest/metrics/MetricsSelectChannelConnector.java b/core/src/main/java/io/confluent/rest/metrics/MetricsSelectChannelConnector.java index 3bf91a64cd..14b4e16664 100644 --- a/core/src/main/java/io/confluent/rest/metrics/MetricsSelectChannelConnector.java +++ b/core/src/main/java/io/confluent/rest/metrics/MetricsSelectChannelConnector.java @@ -38,24 +38,24 @@ public MetricsSelectChannelConnector(int port, Metrics metrics, String metricGrp String metricGrpName = metricGrpPrefix + "-metrics"; this.accepts = metrics.sensor("connections-accepted"); - MetricName metricName = new MetricName("connections-accepted-rate", metricGrpName, - "Rate at which Jetty TCP connections are being accepted", - metricTags); + MetricName metricName = new MetricName( + "connections-accepted-rate", metricGrpName, + "The average rate per second of accepted Jetty TCP connections", metricTags); this.accepts.add(metricName, new Rate()); this.connects = metrics.sensor("connections-opened"); - metricName = new MetricName("connections-opened-rate", metricGrpName, - "Rate at which new Jetty TCP connections are being opened", - metricTags); + metricName = new MetricName + ("connections-opened-rate", metricGrpName, + "The average rate per second of opened Jetty TCP connections", metricTags); this.connects.add(metricName, new Rate()); this.disconnects = metrics.sensor("connections-closed"); - metricName = new MetricName("connections-closed-rate", metricGrpName, - "Rate at which Jetty TCP connections are being closed", - metricTags); + metricName = new MetricName( + "connections-closed-rate", metricGrpName, + "The average rate per second of closed Jetty TCP connections", metricTags); this.disconnects.add(metricName, new Rate()); this.connections = metrics.sensor("connections"); - metricName = new MetricName("connections-active", metricGrpName, - "Total number of active TCP connections", - metricTags); + metricName = new MetricName( + "connections-active", metricGrpName, + "Total number of active Jetty TCP connections", metricTags); this.connections.add(metricName, new Total()); }