diff --git a/processing/src/main/java/io/druid/query/CPUTimeMetricQueryRunner.java b/processing/src/main/java/io/druid/query/CPUTimeMetricQueryRunner.java index d4e043ec3843..624f09669bec 100644 --- a/processing/src/main/java/io/druid/query/CPUTimeMetricQueryRunner.java +++ b/processing/src/main/java/io/druid/query/CPUTimeMetricQueryRunner.java @@ -144,7 +144,6 @@ public void run() final long cpuTime = cpuTimeAccumulator.get(); if (cpuTime > 0) { final ServiceMetricEvent.Builder builder = Preconditions.checkNotNull(builderFn.apply(query)); - builder.setDimension(DruidMetrics.ID, Strings.nullToEmpty(query.getId())); emitter.emit(builder.build("query/cpu/time", cpuTimeAccumulator.get() / 1000)); } } diff --git a/processing/src/main/java/io/druid/query/DruidMetrics.java b/processing/src/main/java/io/druid/query/DruidMetrics.java index e3814ab29e51..a62f5c2203cc 100644 --- a/processing/src/main/java/io/druid/query/DruidMetrics.java +++ b/processing/src/main/java/io/druid/query/DruidMetrics.java @@ -83,14 +83,22 @@ public String apply(Interval input) ).toArray(new String[query.getIntervals().size()]) ) .setDimension("hasFilters", String.valueOf(query.hasFilters())) - .setDimension("duration", query.getDuration().toString()); + .setDimension("duration", query.getDuration().toString()) + .setDimension(ID, Strings.nullToEmpty(query.getId())); } public static ServiceMetricEvent.Builder makeQueryTimeMetric( - final ObjectMapper jsonMapper, final Query query, final String remoteAddr + final QueryToolChest> toolChest, + final ObjectMapper jsonMapper, + final Query query, + final String remoteAddr ) throws JsonProcessingException { - return makePartialQueryTimeMetric(query) + final ServiceMetricEvent.Builder baseMetric = toolChest == null + ? makePartialQueryTimeMetric(query) + : toolChest.makeMetricBuilder(query); + + return baseMetric .setDimension( "context", jsonMapper.writeValueAsString( @@ -99,7 +107,6 @@ public static ServiceMetricEvent.Builder makeQueryTimeMetric( : query.getContext() ) ) - .setDimension("remoteAddress", remoteAddr) - .setDimension(ID, query.getId()); + .setDimension("remoteAddress", remoteAddr); } } diff --git a/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java b/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java index b630748d58c9..b1211e0d7e56 100644 --- a/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java +++ b/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java @@ -104,8 +104,6 @@ public Sequence run(final Query query, final Map responseC builder.setDimension(userDimension.getKey(), userDimension.getValue()); } - builder.setDimension(DruidMetrics.ID, Strings.nullToEmpty(query.getId())); - return new Sequence() { @Override diff --git a/processing/src/main/java/io/druid/query/datasourcemetadata/DataSourceQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/datasourcemetadata/DataSourceQueryQueryToolChest.java index 28d8e18a2517..09e110005dd5 100644 --- a/processing/src/main/java/io/druid/query/datasourcemetadata/DataSourceQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/datasourcemetadata/DataSourceQueryQueryToolChest.java @@ -31,6 +31,7 @@ import io.druid.query.BySegmentSkippingQueryRunner; import io.druid.query.CacheStrategy; import io.druid.query.DataSourceUtil; +import io.druid.query.DruidMetrics; import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; @@ -104,7 +105,7 @@ protected Sequence> doRun( @Override public ServiceMetricEvent.Builder makeMetricBuilder(DataSourceMetadataQuery query) { - return new ServiceMetricEvent.Builder() + return DruidMetrics.makePartialQueryTimeMetric(query) .setDimension("dataSource", DataSourceUtil.getMetricName(query.getDataSource())) .setDimension("type", query.getType()); } diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java index 3c74eabf24cd..b38431814896 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java @@ -109,7 +109,7 @@ protected Sequence> doRun( @Override public ServiceMetricEvent.Builder makeMetricBuilder(TimeBoundaryQuery query) { - return new ServiceMetricEvent.Builder() + return DruidMetrics.makePartialQueryTimeMetric(query) .setDimension(DruidMetrics.DATASOURCE, DataSourceUtil.getMetricName(query.getDataSource())) .setDimension(DruidMetrics.TYPE, query.getType()); } diff --git a/server/src/main/java/io/druid/client/DirectDruidClient.java b/server/src/main/java/io/druid/client/DirectDruidClient.java index 198171e899d1..2cfc17d91252 100644 --- a/server/src/main/java/io/druid/client/DirectDruidClient.java +++ b/server/src/main/java/io/druid/client/DirectDruidClient.java @@ -166,8 +166,6 @@ public Sequence run(final Query query, final Map context) final ServiceMetricEvent.Builder builder = toolChest.makeMetricBuilder(query); builder.setDimension("server", host); - builder.setDimension(DruidMetrics.ID, Strings.nullToEmpty(query.getId())); - final HttpResponseHandler responseHandler = new HttpResponseHandler() { diff --git a/server/src/main/java/io/druid/guice/QueryToolChestModule.java b/server/src/main/java/io/druid/guice/QueryToolChestModule.java index 5394efc79106..4c07cdbf0f8d 100644 --- a/server/src/main/java/io/druid/guice/QueryToolChestModule.java +++ b/server/src/main/java/io/druid/guice/QueryToolChestModule.java @@ -23,8 +23,10 @@ import com.google.inject.Binder; import com.google.inject.Module; import com.google.inject.multibindings.MapBinder; +import io.druid.query.MapQueryToolChestWarehouse; import io.druid.query.Query; import io.druid.query.QueryToolChest; +import io.druid.query.QueryToolChestWarehouse; import io.druid.query.datasourcemetadata.DataSourceMetadataQuery; import io.druid.query.datasourcemetadata.DataSourceQueryQueryToolChest; import io.druid.query.groupby.GroupByQuery; @@ -74,6 +76,8 @@ public void configure(Binder binder) binder.bind(entry.getValue()).in(LazySingleton.class); } + binder.bind(QueryToolChestWarehouse.class).to(MapQueryToolChestWarehouse.class); + JsonConfigProvider.bind(binder, "druid.query.groupBy", GroupByQueryConfig.class); JsonConfigProvider.bind(binder, "druid.query.search", SearchQueryConfig.class); JsonConfigProvider.bind(binder, "druid.query.topN", TopNQueryConfig.class); diff --git a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java index bb09aaf3f00d..c8d00f7b7069 100644 --- a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java +++ b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java @@ -32,6 +32,7 @@ import io.druid.guice.http.DruidHttpClientConfig; import io.druid.query.DruidMetrics; import io.druid.query.Query; +import io.druid.query.QueryToolChestWarehouse; import io.druid.server.log.RequestLogger; import io.druid.server.router.QueryHostFinder; import io.druid.server.router.Router; @@ -88,6 +89,7 @@ private static void handleException(HttpServletResponse response, ObjectMapper o response.flushBuffer(); } + private final QueryToolChestWarehouse warehouse; private final ObjectMapper jsonMapper; private final ObjectMapper smileMapper; private final QueryHostFinder hostFinder; @@ -99,6 +101,7 @@ private static void handleException(HttpServletResponse response, ObjectMapper o private HttpClient broadcastClient; public AsyncQueryForwardingServlet( + QueryToolChestWarehouse warehouse, @Json ObjectMapper jsonMapper, @Smile ObjectMapper smileMapper, QueryHostFinder hostFinder, @@ -108,6 +111,7 @@ public AsyncQueryForwardingServlet( RequestLogger requestLogger ) { + this.warehouse = warehouse; this.jsonMapper = jsonMapper; this.smileMapper = smileMapper; this.hostFinder = hostFinder; @@ -342,7 +346,7 @@ public void onComplete(Result result) final long requestTime = System.currentTimeMillis() - start; try { emitter.emit( - DruidMetrics.makeQueryTimeMetric(jsonMapper, query, req.getRemoteAddr()) + DruidMetrics.makeQueryTimeMetric(warehouse.getToolChest(query), jsonMapper, query, req.getRemoteAddr()) .build("query/time", requestTime) ); diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java index 63e37e338da3..5267ea62f037 100644 --- a/server/src/main/java/io/druid/server/QueryResource.java +++ b/server/src/main/java/io/druid/server/QueryResource.java @@ -42,6 +42,8 @@ import io.druid.query.QueryContextKeys; import io.druid.query.QueryInterruptedException; import io.druid.query.QuerySegmentWalker; +import io.druid.query.QueryToolChest; +import io.druid.query.QueryToolChestWarehouse; import io.druid.server.initialization.ServerConfig; import io.druid.server.log.RequestLogger; import io.druid.server.security.Access; @@ -83,6 +85,7 @@ public class QueryResource private static final int RESPONSE_CTX_HEADER_LEN_LIMIT = 7*1024; + private final QueryToolChestWarehouse warehouse; private final ServerConfig config; private final ObjectMapper jsonMapper; private final ObjectMapper smileMapper; @@ -94,6 +97,7 @@ public class QueryResource @Inject public QueryResource( + QueryToolChestWarehouse warehouse, ServerConfig config, @Json ObjectMapper jsonMapper, @Smile ObjectMapper smileMapper, @@ -104,6 +108,7 @@ public QueryResource( AuthConfig authConfig ) { + this.warehouse = warehouse; this.config = config; this.jsonMapper = jsonMapper; this.smileMapper = smileMapper; @@ -159,6 +164,7 @@ public Response doPost( { final long start = System.currentTimeMillis(); Query query = null; + QueryToolChest toolChest = null; String queryId = null; final String reqContentType = req.getContentType(); @@ -187,6 +193,7 @@ public Response doPost( ) ); } + toolChest = warehouse.getToolChest(query); Thread.currentThread() .setName(String.format("%s[%s_%s_%s]", currThreadName, query.getType(), query.getDataSource(), queryId)); @@ -236,6 +243,7 @@ public Object accumulate(Object accumulated, Object in) try { final Query theQuery = query; + final QueryToolChest theToolChest = toolChest; Response.ResponseBuilder builder = Response .ok( new StreamingOutput() @@ -252,12 +260,12 @@ public void write(OutputStream outputStream) throws IOException, WebApplicationE final long queryTime = System.currentTimeMillis() - start; emitter.emit( - DruidMetrics.makeQueryTimeMetric(jsonMapper, theQuery, req.getRemoteAddr()) + DruidMetrics.makeQueryTimeMetric(theToolChest, jsonMapper, theQuery, req.getRemoteAddr()) .setDimension("success", "true") .build("query/time", queryTime) ); emitter.emit( - DruidMetrics.makeQueryTimeMetric(jsonMapper, theQuery, req.getRemoteAddr()) + DruidMetrics.makeQueryTimeMetric(theToolChest, jsonMapper, theQuery, req.getRemoteAddr()) .build("query/bytes", os.getCount()) ); @@ -309,7 +317,7 @@ public void write(OutputStream outputStream) throws IOException, WebApplicationE log.info("%s [%s]", e.getMessage(), queryId); final long queryTime = System.currentTimeMillis() - start; emitter.emit( - DruidMetrics.makeQueryTimeMetric(jsonMapper, query, req.getRemoteAddr()) + DruidMetrics.makeQueryTimeMetric(toolChest, jsonMapper, query, req.getRemoteAddr()) .setDimension("success", "false") .build("query/time", queryTime) ); @@ -356,7 +364,7 @@ public void write(OutputStream outputStream) throws IOException, WebApplicationE try { final long queryTime = System.currentTimeMillis() - start; emitter.emit( - DruidMetrics.makeQueryTimeMetric(jsonMapper, query, req.getRemoteAddr()) + DruidMetrics.makeQueryTimeMetric(toolChest, jsonMapper, query, req.getRemoteAddr()) .setDimension("success", "false") .build("query/time", queryTime) ); diff --git a/server/src/test/java/io/druid/server/AsyncQueryForwardingServletTest.java b/server/src/test/java/io/druid/server/AsyncQueryForwardingServletTest.java index e2bac72ab8bb..0ad9d29353c1 100644 --- a/server/src/test/java/io/druid/server/AsyncQueryForwardingServletTest.java +++ b/server/src/test/java/io/druid/server/AsyncQueryForwardingServletTest.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.net.HostAndPort; import com.google.inject.Binder; import com.google.inject.Inject; @@ -39,7 +40,9 @@ import io.druid.guice.annotations.Smile; import io.druid.guice.http.DruidHttpClientConfig; import io.druid.initialization.Initialization; +import io.druid.query.MapQueryToolChestWarehouse; import io.druid.query.Query; +import io.druid.query.QueryToolChest; import io.druid.server.initialization.BaseJettyTest; import io.druid.server.initialization.jetty.JettyServerInitUtils; import io.druid.server.initialization.jetty.JettyServerInitializer; @@ -213,6 +216,7 @@ public Collection getAllHosts() ServletHolder holder = new ServletHolder( new AsyncQueryForwardingServlet( + new MapQueryToolChestWarehouse(ImmutableMap., QueryToolChest>of()), injector.getInstance(ObjectMapper.class), injector.getInstance(Key.get(ObjectMapper.class, Smile.class)), hostFinder, diff --git a/server/src/test/java/io/druid/server/QueryResourceTest.java b/server/src/test/java/io/druid/server/QueryResourceTest.java index dabd6b575e8c..7518debddcfc 100644 --- a/server/src/test/java/io/druid/server/QueryResourceTest.java +++ b/server/src/test/java/io/druid/server/QueryResourceTest.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.metamx.common.guava.Sequence; @@ -28,9 +29,12 @@ import com.metamx.emitter.service.ServiceEmitter; import io.druid.concurrent.Execs; import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.MapQueryToolChestWarehouse; import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QuerySegmentWalker; +import io.druid.query.QueryToolChest; +import io.druid.query.QueryToolChestWarehouse; import io.druid.query.SegmentDescriptor; import io.druid.server.initialization.ServerConfig; import io.druid.server.log.NoopRequestLogger; @@ -63,6 +67,7 @@ */ public class QueryResourceTest { + private static final QueryToolChestWarehouse warehouse = new MapQueryToolChestWarehouse(ImmutableMap., QueryToolChest>of()); private static final ObjectMapper jsonMapper = new DefaultObjectMapper(); public static final ServerConfig serverConfig = new ServerConfig() { @@ -125,6 +130,7 @@ public void setup() EasyMock.expect(testServletRequest.getRemoteAddr()).andReturn("localhost").anyTimes(); queryManager = new QueryManager(); queryResource = new QueryResource( + warehouse, serverConfig, jsonMapper, jsonMapper, @@ -198,6 +204,7 @@ public Access isAuthorized( EasyMock.replay(testServletRequest); queryResource = new QueryResource( + warehouse, serverConfig, jsonMapper, jsonMapper, @@ -263,6 +270,7 @@ public Access isAuthorized( EasyMock.replay(testServletRequest); queryResource = new QueryResource( + warehouse, serverConfig, jsonMapper, jsonMapper, @@ -359,6 +367,7 @@ public Access isAuthorized( EasyMock.replay(testServletRequest); queryResource = new QueryResource( + warehouse, serverConfig, jsonMapper, jsonMapper, diff --git a/services/src/main/java/io/druid/cli/CliBroker.java b/services/src/main/java/io/druid/cli/CliBroker.java index 7994bd541e5a..3748c832bdaa 100644 --- a/services/src/main/java/io/druid/cli/CliBroker.java +++ b/services/src/main/java/io/druid/cli/CliBroker.java @@ -85,8 +85,6 @@ public void configure(Binder binder) ); binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8082); - binder.bind(QueryToolChestWarehouse.class).to(MapQueryToolChestWarehouse.class); - binder.bind(CachingClusteredClient.class).in(LazySingleton.class); binder.bind(BrokerServerView.class).in(LazySingleton.class); binder.bind(TimelineServerView.class).to(BrokerServerView.class).in(LazySingleton.class); diff --git a/services/src/main/java/io/druid/cli/RouterJettyServerInitializer.java b/services/src/main/java/io/druid/cli/RouterJettyServerInitializer.java index 7cdb3ad24eba..a179a440a3d4 100644 --- a/services/src/main/java/io/druid/cli/RouterJettyServerInitializer.java +++ b/services/src/main/java/io/druid/cli/RouterJettyServerInitializer.java @@ -28,6 +28,7 @@ import io.druid.guice.annotations.Json; import io.druid.guice.annotations.Smile; import io.druid.guice.http.DruidHttpClientConfig; +import io.druid.query.QueryToolChestWarehouse; import io.druid.server.AsyncQueryForwardingServlet; import io.druid.server.initialization.jetty.JettyServerInitUtils; import io.druid.server.initialization.jetty.JettyServerInitializer; @@ -46,6 +47,7 @@ */ public class RouterJettyServerInitializer implements JettyServerInitializer { + private final QueryToolChestWarehouse warehouse; private final ObjectMapper jsonMapper; private final ObjectMapper smileMapper; private final QueryHostFinder hostFinder; @@ -56,6 +58,7 @@ public class RouterJettyServerInitializer implements JettyServerInitializer @Inject public RouterJettyServerInitializer( + QueryToolChestWarehouse warehouse, @Json ObjectMapper jsonMapper, @Smile ObjectMapper smileMapper, QueryHostFinder hostFinder, @@ -65,6 +68,7 @@ public RouterJettyServerInitializer( RequestLogger requestLogger ) { + this.warehouse = warehouse; this.jsonMapper = jsonMapper; this.smileMapper = smileMapper; this.hostFinder = hostFinder; @@ -82,6 +86,7 @@ public void initialize(Server server, Injector injector) root.addServlet(new ServletHolder(new DefaultServlet()), "/*"); final AsyncQueryForwardingServlet asyncQueryForwardingServlet = new AsyncQueryForwardingServlet( + warehouse, jsonMapper, smileMapper, hostFinder,