Skip to content

Commit

Permalink
Better harmonized dimensions for query metrics. (#3245)
Browse files Browse the repository at this point in the history
All query metrics now start with toolChest.makeMetricBuilder, and all of
*those* now start with DruidMetrics.makePartialQueryTimeMetric. Also, "id"
moved to common code, since all query metrics added it anyway.

In particular this will add query-type specific dimensions like "threshold"
and "numDimensions" to servlet-originated metrics like query/time.
  • Loading branch information
gianm authored and fjy committed Jul 14, 2016
1 parent 55e7a52 commit 6cd1f53
Show file tree
Hide file tree
Showing 13 changed files with 54 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ public void run()
final long cpuTime = cpuTimeAccumulator.get();
if (cpuTime > 0) {
final ServiceMetricEvent.Builder builder = Preconditions.checkNotNull(builderFn.apply(query));
builder.setDimension(DruidMetrics.ID, Strings.nullToEmpty(query.getId()));
emitter.emit(builder.build("query/cpu/time", cpuTimeAccumulator.get() / 1000));
}
}
Expand Down
17 changes: 12 additions & 5 deletions processing/src/main/java/io/druid/query/DruidMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,22 @@ public String apply(Interval input)
).toArray(new String[query.getIntervals().size()])
)
.setDimension("hasFilters", String.valueOf(query.hasFilters()))
.setDimension("duration", query.getDuration().toString());
.setDimension("duration", query.getDuration().toString())
.setDimension(ID, Strings.nullToEmpty(query.getId()));
}

public static <T> ServiceMetricEvent.Builder makeQueryTimeMetric(
final ObjectMapper jsonMapper, final Query<T> query, final String remoteAddr
final QueryToolChest<T, Query<T>> toolChest,
final ObjectMapper jsonMapper,
final Query<T> query,
final String remoteAddr
) throws JsonProcessingException
{
return makePartialQueryTimeMetric(query)
final ServiceMetricEvent.Builder baseMetric = toolChest == null
? makePartialQueryTimeMetric(query)
: toolChest.makeMetricBuilder(query);

return baseMetric
.setDimension(
"context",
jsonMapper.writeValueAsString(
Expand All @@ -99,7 +107,6 @@ public static <T> ServiceMetricEvent.Builder makeQueryTimeMetric(
: query.getContext()
)
)
.setDimension("remoteAddress", remoteAddr)
.setDimension(ID, query.getId());
.setDimension("remoteAddress", remoteAddr);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,6 @@ public Sequence<T> run(final Query<T> query, final Map<String, Object> responseC
builder.setDimension(userDimension.getKey(), userDimension.getValue());
}

builder.setDimension(DruidMetrics.ID, Strings.nullToEmpty(query.getId()));

return new Sequence<T>()
{
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.druid.query.BySegmentSkippingQueryRunner;
import io.druid.query.CacheStrategy;
import io.druid.query.DataSourceUtil;
import io.druid.query.DruidMetrics;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
Expand Down Expand Up @@ -104,7 +105,7 @@ protected Sequence<Result<DataSourceMetadataResultValue>> doRun(
@Override
public ServiceMetricEvent.Builder makeMetricBuilder(DataSourceMetadataQuery query)
{
return new ServiceMetricEvent.Builder()
return DruidMetrics.makePartialQueryTimeMetric(query)
.setDimension("dataSource", DataSourceUtil.getMetricName(query.getDataSource()))
.setDimension("type", query.getType());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ protected Sequence<Result<TimeBoundaryResultValue>> doRun(
@Override
public ServiceMetricEvent.Builder makeMetricBuilder(TimeBoundaryQuery query)
{
return new ServiceMetricEvent.Builder()
return DruidMetrics.makePartialQueryTimeMetric(query)
.setDimension(DruidMetrics.DATASOURCE, DataSourceUtil.getMetricName(query.getDataSource()))
.setDimension(DruidMetrics.TYPE, query.getType());
}
Expand Down
2 changes: 0 additions & 2 deletions server/src/main/java/io/druid/client/DirectDruidClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,6 @@ public Sequence<T> run(final Query<T> query, final Map<String, Object> context)

final ServiceMetricEvent.Builder builder = toolChest.makeMetricBuilder(query);
builder.setDimension("server", host);
builder.setDimension(DruidMetrics.ID, Strings.nullToEmpty(query.getId()));


final HttpResponseHandler<InputStream, InputStream> responseHandler = new HttpResponseHandler<InputStream, InputStream>()
{
Expand Down
4 changes: 4 additions & 0 deletions server/src/main/java/io/druid/guice/QueryToolChestModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.multibindings.MapBinder;
import io.druid.query.MapQueryToolChestWarehouse;
import io.druid.query.Query;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.query.datasourcemetadata.DataSourceMetadataQuery;
import io.druid.query.datasourcemetadata.DataSourceQueryQueryToolChest;
import io.druid.query.groupby.GroupByQuery;
Expand Down Expand Up @@ -74,6 +76,8 @@ public void configure(Binder binder)
binder.bind(entry.getValue()).in(LazySingleton.class);
}

binder.bind(QueryToolChestWarehouse.class).to(MapQueryToolChestWarehouse.class);

JsonConfigProvider.bind(binder, "druid.query.groupBy", GroupByQueryConfig.class);
JsonConfigProvider.bind(binder, "druid.query.search", SearchQueryConfig.class);
JsonConfigProvider.bind(binder, "druid.query.topN", TopNQueryConfig.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.druid.guice.http.DruidHttpClientConfig;
import io.druid.query.DruidMetrics;
import io.druid.query.Query;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.server.log.RequestLogger;
import io.druid.server.router.QueryHostFinder;
import io.druid.server.router.Router;
Expand Down Expand Up @@ -88,6 +89,7 @@ private static void handleException(HttpServletResponse response, ObjectMapper o
response.flushBuffer();
}

private final QueryToolChestWarehouse warehouse;
private final ObjectMapper jsonMapper;
private final ObjectMapper smileMapper;
private final QueryHostFinder hostFinder;
Expand All @@ -99,6 +101,7 @@ private static void handleException(HttpServletResponse response, ObjectMapper o
private HttpClient broadcastClient;

public AsyncQueryForwardingServlet(
QueryToolChestWarehouse warehouse,
@Json ObjectMapper jsonMapper,
@Smile ObjectMapper smileMapper,
QueryHostFinder hostFinder,
Expand All @@ -108,6 +111,7 @@ public AsyncQueryForwardingServlet(
RequestLogger requestLogger
)
{
this.warehouse = warehouse;
this.jsonMapper = jsonMapper;
this.smileMapper = smileMapper;
this.hostFinder = hostFinder;
Expand Down Expand Up @@ -342,7 +346,7 @@ public void onComplete(Result result)
final long requestTime = System.currentTimeMillis() - start;
try {
emitter.emit(
DruidMetrics.makeQueryTimeMetric(jsonMapper, query, req.getRemoteAddr())
DruidMetrics.makeQueryTimeMetric(warehouse.getToolChest(query), jsonMapper, query, req.getRemoteAddr())
.build("query/time", requestTime)
);

Expand Down
16 changes: 12 additions & 4 deletions server/src/main/java/io/druid/server/QueryResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import io.druid.query.QueryContextKeys;
import io.druid.query.QueryInterruptedException;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.server.initialization.ServerConfig;
import io.druid.server.log.RequestLogger;
import io.druid.server.security.Access;
Expand Down Expand Up @@ -83,6 +85,7 @@ public class QueryResource

private static final int RESPONSE_CTX_HEADER_LEN_LIMIT = 7*1024;

private final QueryToolChestWarehouse warehouse;
private final ServerConfig config;
private final ObjectMapper jsonMapper;
private final ObjectMapper smileMapper;
Expand All @@ -94,6 +97,7 @@ public class QueryResource

@Inject
public QueryResource(
QueryToolChestWarehouse warehouse,
ServerConfig config,
@Json ObjectMapper jsonMapper,
@Smile ObjectMapper smileMapper,
Expand All @@ -104,6 +108,7 @@ public QueryResource(
AuthConfig authConfig
)
{
this.warehouse = warehouse;
this.config = config;
this.jsonMapper = jsonMapper;
this.smileMapper = smileMapper;
Expand Down Expand Up @@ -159,6 +164,7 @@ public Response doPost(
{
final long start = System.currentTimeMillis();
Query query = null;
QueryToolChest toolChest = null;
String queryId = null;

final String reqContentType = req.getContentType();
Expand Down Expand Up @@ -187,6 +193,7 @@ public Response doPost(
)
);
}
toolChest = warehouse.getToolChest(query);

Thread.currentThread()
.setName(String.format("%s[%s_%s_%s]", currThreadName, query.getType(), query.getDataSource(), queryId));
Expand Down Expand Up @@ -236,6 +243,7 @@ public Object accumulate(Object accumulated, Object in)

try {
final Query theQuery = query;
final QueryToolChest theToolChest = toolChest;
Response.ResponseBuilder builder = Response
.ok(
new StreamingOutput()
Expand All @@ -252,12 +260,12 @@ public void write(OutputStream outputStream) throws IOException, WebApplicationE

final long queryTime = System.currentTimeMillis() - start;
emitter.emit(
DruidMetrics.makeQueryTimeMetric(jsonMapper, theQuery, req.getRemoteAddr())
DruidMetrics.makeQueryTimeMetric(theToolChest, jsonMapper, theQuery, req.getRemoteAddr())
.setDimension("success", "true")
.build("query/time", queryTime)
);
emitter.emit(
DruidMetrics.makeQueryTimeMetric(jsonMapper, theQuery, req.getRemoteAddr())
DruidMetrics.makeQueryTimeMetric(theToolChest, jsonMapper, theQuery, req.getRemoteAddr())
.build("query/bytes", os.getCount())
);

Expand Down Expand Up @@ -309,7 +317,7 @@ public void write(OutputStream outputStream) throws IOException, WebApplicationE
log.info("%s [%s]", e.getMessage(), queryId);
final long queryTime = System.currentTimeMillis() - start;
emitter.emit(
DruidMetrics.makeQueryTimeMetric(jsonMapper, query, req.getRemoteAddr())
DruidMetrics.makeQueryTimeMetric(toolChest, jsonMapper, query, req.getRemoteAddr())
.setDimension("success", "false")
.build("query/time", queryTime)
);
Expand Down Expand Up @@ -356,7 +364,7 @@ public void write(OutputStream outputStream) throws IOException, WebApplicationE
try {
final long queryTime = System.currentTimeMillis() - start;
emitter.emit(
DruidMetrics.makeQueryTimeMetric(jsonMapper, query, req.getRemoteAddr())
DruidMetrics.makeQueryTimeMetric(toolChest, jsonMapper, query, req.getRemoteAddr())
.setDimension("success", "false")
.build("query/time", queryTime)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.net.HostAndPort;
import com.google.inject.Binder;
import com.google.inject.Inject;
Expand All @@ -39,7 +40,9 @@
import io.druid.guice.annotations.Smile;
import io.druid.guice.http.DruidHttpClientConfig;
import io.druid.initialization.Initialization;
import io.druid.query.MapQueryToolChestWarehouse;
import io.druid.query.Query;
import io.druid.query.QueryToolChest;
import io.druid.server.initialization.BaseJettyTest;
import io.druid.server.initialization.jetty.JettyServerInitUtils;
import io.druid.server.initialization.jetty.JettyServerInitializer;
Expand Down Expand Up @@ -213,6 +216,7 @@ public Collection<String> getAllHosts()

ServletHolder holder = new ServletHolder(
new AsyncQueryForwardingServlet(
new MapQueryToolChestWarehouse(ImmutableMap.<Class<? extends Query>, QueryToolChest>of()),
injector.getInstance(ObjectMapper.class),
injector.getInstance(Key.get(ObjectMapper.class, Smile.class)),
hostFinder,
Expand Down
9 changes: 9 additions & 0 deletions server/src/test/java/io/druid/server/QueryResourceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,20 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.concurrent.Execs;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.MapQueryToolChestWarehouse;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.query.SegmentDescriptor;
import io.druid.server.initialization.ServerConfig;
import io.druid.server.log.NoopRequestLogger;
Expand Down Expand Up @@ -63,6 +67,7 @@
*/
public class QueryResourceTest
{
private static final QueryToolChestWarehouse warehouse = new MapQueryToolChestWarehouse(ImmutableMap.<Class<? extends Query>, QueryToolChest>of());
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
public static final ServerConfig serverConfig = new ServerConfig()
{
Expand Down Expand Up @@ -125,6 +130,7 @@ public void setup()
EasyMock.expect(testServletRequest.getRemoteAddr()).andReturn("localhost").anyTimes();
queryManager = new QueryManager();
queryResource = new QueryResource(
warehouse,
serverConfig,
jsonMapper,
jsonMapper,
Expand Down Expand Up @@ -198,6 +204,7 @@ public Access isAuthorized(
EasyMock.replay(testServletRequest);

queryResource = new QueryResource(
warehouse,
serverConfig,
jsonMapper,
jsonMapper,
Expand Down Expand Up @@ -263,6 +270,7 @@ public Access isAuthorized(
EasyMock.replay(testServletRequest);

queryResource = new QueryResource(
warehouse,
serverConfig,
jsonMapper,
jsonMapper,
Expand Down Expand Up @@ -359,6 +367,7 @@ public Access isAuthorized(
EasyMock.replay(testServletRequest);

queryResource = new QueryResource(
warehouse,
serverConfig,
jsonMapper,
jsonMapper,
Expand Down
2 changes: 0 additions & 2 deletions services/src/main/java/io/druid/cli/CliBroker.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,6 @@ public void configure(Binder binder)
);
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8082);

binder.bind(QueryToolChestWarehouse.class).to(MapQueryToolChestWarehouse.class);

binder.bind(CachingClusteredClient.class).in(LazySingleton.class);
binder.bind(BrokerServerView.class).in(LazySingleton.class);
binder.bind(TimelineServerView.class).to(BrokerServerView.class).in(LazySingleton.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import io.druid.guice.http.DruidHttpClientConfig;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.server.AsyncQueryForwardingServlet;
import io.druid.server.initialization.jetty.JettyServerInitUtils;
import io.druid.server.initialization.jetty.JettyServerInitializer;
Expand All @@ -46,6 +47,7 @@
*/
public class RouterJettyServerInitializer implements JettyServerInitializer
{
private final QueryToolChestWarehouse warehouse;
private final ObjectMapper jsonMapper;
private final ObjectMapper smileMapper;
private final QueryHostFinder hostFinder;
Expand All @@ -56,6 +58,7 @@ public class RouterJettyServerInitializer implements JettyServerInitializer

@Inject
public RouterJettyServerInitializer(
QueryToolChestWarehouse warehouse,
@Json ObjectMapper jsonMapper,
@Smile ObjectMapper smileMapper,
QueryHostFinder hostFinder,
Expand All @@ -65,6 +68,7 @@ public RouterJettyServerInitializer(
RequestLogger requestLogger
)
{
this.warehouse = warehouse;
this.jsonMapper = jsonMapper;
this.smileMapper = smileMapper;
this.hostFinder = hostFinder;
Expand All @@ -82,6 +86,7 @@ public void initialize(Server server, Injector injector)
root.addServlet(new ServletHolder(new DefaultServlet()), "/*");

final AsyncQueryForwardingServlet asyncQueryForwardingServlet = new AsyncQueryForwardingServlet(
warehouse,
jsonMapper,
smileMapper,
hostFinder,
Expand Down

0 comments on commit 6cd1f53

Please sign in to comment.