Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better harmonized dimensions for query metrics. #3245

Merged
merged 1 commit into from
Jul 14, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ public void run()
final long cpuTime = cpuTimeAccumulator.get();
if (cpuTime > 0) {
final ServiceMetricEvent.Builder builder = Preconditions.checkNotNull(builderFn.apply(query));
builder.setDimension(DruidMetrics.ID, Strings.nullToEmpty(query.getId()));
emitter.emit(builder.build("query/cpu/time", cpuTimeAccumulator.get() / 1000));
}
}
Expand Down
17 changes: 12 additions & 5 deletions processing/src/main/java/io/druid/query/DruidMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,22 @@ public String apply(Interval input)
).toArray(new String[query.getIntervals().size()])
)
.setDimension("hasFilters", String.valueOf(query.hasFilters()))
.setDimension("duration", query.getDuration().toString());
.setDimension("duration", query.getDuration().toString())
.setDimension(ID, Strings.nullToEmpty(query.getId()));
}

public static <T> ServiceMetricEvent.Builder makeQueryTimeMetric(
final ObjectMapper jsonMapper, final Query<T> query, final String remoteAddr
final QueryToolChest<T, Query<T>> toolChest,
final ObjectMapper jsonMapper,
final Query<T> query,
final String remoteAddr
) throws JsonProcessingException
{
return makePartialQueryTimeMetric(query)
final ServiceMetricEvent.Builder baseMetric = toolChest == null
? makePartialQueryTimeMetric(query)
: toolChest.makeMetricBuilder(query);

return baseMetric
.setDimension(
"context",
jsonMapper.writeValueAsString(
Expand All @@ -99,7 +107,6 @@ public static <T> ServiceMetricEvent.Builder makeQueryTimeMetric(
: query.getContext()
)
)
.setDimension("remoteAddress", remoteAddr)
.setDimension(ID, query.getId());
.setDimension("remoteAddress", remoteAddr);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,6 @@ public Sequence<T> run(final Query<T> query, final Map<String, Object> responseC
builder.setDimension(userDimension.getKey(), userDimension.getValue());
}

builder.setDimension(DruidMetrics.ID, Strings.nullToEmpty(query.getId()));

return new Sequence<T>()
{
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.druid.query.BySegmentSkippingQueryRunner;
import io.druid.query.CacheStrategy;
import io.druid.query.DataSourceUtil;
import io.druid.query.DruidMetrics;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
Expand Down Expand Up @@ -104,7 +105,7 @@ protected Sequence<Result<DataSourceMetadataResultValue>> doRun(
@Override
public ServiceMetricEvent.Builder makeMetricBuilder(DataSourceMetadataQuery query)
{
return new ServiceMetricEvent.Builder()
return DruidMetrics.makePartialQueryTimeMetric(query)
.setDimension("dataSource", DataSourceUtil.getMetricName(query.getDataSource()))
.setDimension("type", query.getType());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ protected Sequence<Result<TimeBoundaryResultValue>> doRun(
@Override
public ServiceMetricEvent.Builder makeMetricBuilder(TimeBoundaryQuery query)
{
return new ServiceMetricEvent.Builder()
return DruidMetrics.makePartialQueryTimeMetric(query)
.setDimension(DruidMetrics.DATASOURCE, DataSourceUtil.getMetricName(query.getDataSource()))
.setDimension(DruidMetrics.TYPE, query.getType());
}
Expand Down
2 changes: 0 additions & 2 deletions server/src/main/java/io/druid/client/DirectDruidClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,6 @@ public Sequence<T> run(final Query<T> query, final Map<String, Object> context)

final ServiceMetricEvent.Builder builder = toolChest.makeMetricBuilder(query);
builder.setDimension("server", host);
builder.setDimension(DruidMetrics.ID, Strings.nullToEmpty(query.getId()));


final HttpResponseHandler<InputStream, InputStream> responseHandler = new HttpResponseHandler<InputStream, InputStream>()
{
Expand Down
4 changes: 4 additions & 0 deletions server/src/main/java/io/druid/guice/QueryToolChestModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.multibindings.MapBinder;
import io.druid.query.MapQueryToolChestWarehouse;
import io.druid.query.Query;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.query.datasourcemetadata.DataSourceMetadataQuery;
import io.druid.query.datasourcemetadata.DataSourceQueryQueryToolChest;
import io.druid.query.groupby.GroupByQuery;
Expand Down Expand Up @@ -74,6 +76,8 @@ public void configure(Binder binder)
binder.bind(entry.getValue()).in(LazySingleton.class);
}

binder.bind(QueryToolChestWarehouse.class).to(MapQueryToolChestWarehouse.class);

JsonConfigProvider.bind(binder, "druid.query.groupBy", GroupByQueryConfig.class);
JsonConfigProvider.bind(binder, "druid.query.search", SearchQueryConfig.class);
JsonConfigProvider.bind(binder, "druid.query.topN", TopNQueryConfig.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.druid.guice.http.DruidHttpClientConfig;
import io.druid.query.DruidMetrics;
import io.druid.query.Query;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.server.log.RequestLogger;
import io.druid.server.router.QueryHostFinder;
import io.druid.server.router.Router;
Expand Down Expand Up @@ -88,6 +89,7 @@ private static void handleException(HttpServletResponse response, ObjectMapper o
response.flushBuffer();
}

private final QueryToolChestWarehouse warehouse;
private final ObjectMapper jsonMapper;
private final ObjectMapper smileMapper;
private final QueryHostFinder hostFinder;
Expand All @@ -99,6 +101,7 @@ private static void handleException(HttpServletResponse response, ObjectMapper o
private HttpClient broadcastClient;

public AsyncQueryForwardingServlet(
QueryToolChestWarehouse warehouse,
@Json ObjectMapper jsonMapper,
@Smile ObjectMapper smileMapper,
QueryHostFinder hostFinder,
Expand All @@ -108,6 +111,7 @@ public AsyncQueryForwardingServlet(
RequestLogger requestLogger
)
{
this.warehouse = warehouse;
this.jsonMapper = jsonMapper;
this.smileMapper = smileMapper;
this.hostFinder = hostFinder;
Expand Down Expand Up @@ -342,7 +346,7 @@ public void onComplete(Result result)
final long requestTime = System.currentTimeMillis() - start;
try {
emitter.emit(
DruidMetrics.makeQueryTimeMetric(jsonMapper, query, req.getRemoteAddr())
DruidMetrics.makeQueryTimeMetric(warehouse.getToolChest(query), jsonMapper, query, req.getRemoteAddr())
.build("query/time", requestTime)
);

Expand Down
16 changes: 12 additions & 4 deletions server/src/main/java/io/druid/server/QueryResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import io.druid.query.QueryContextKeys;
import io.druid.query.QueryInterruptedException;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.server.initialization.ServerConfig;
import io.druid.server.log.RequestLogger;
import io.druid.server.security.Access;
Expand Down Expand Up @@ -83,6 +85,7 @@ public class QueryResource

private static final int RESPONSE_CTX_HEADER_LEN_LIMIT = 7*1024;

private final QueryToolChestWarehouse warehouse;
private final ServerConfig config;
private final ObjectMapper jsonMapper;
private final ObjectMapper smileMapper;
Expand All @@ -94,6 +97,7 @@ public class QueryResource

@Inject
public QueryResource(
QueryToolChestWarehouse warehouse,
ServerConfig config,
@Json ObjectMapper jsonMapper,
@Smile ObjectMapper smileMapper,
Expand All @@ -104,6 +108,7 @@ public QueryResource(
AuthConfig authConfig
)
{
this.warehouse = warehouse;
this.config = config;
this.jsonMapper = jsonMapper;
this.smileMapper = smileMapper;
Expand Down Expand Up @@ -159,6 +164,7 @@ public Response doPost(
{
final long start = System.currentTimeMillis();
Query query = null;
QueryToolChest toolChest = null;
String queryId = null;

final String reqContentType = req.getContentType();
Expand Down Expand Up @@ -187,6 +193,7 @@ public Response doPost(
)
);
}
toolChest = warehouse.getToolChest(query);

Thread.currentThread()
.setName(String.format("%s[%s_%s_%s]", currThreadName, query.getType(), query.getDataSource(), queryId));
Expand Down Expand Up @@ -236,6 +243,7 @@ public Object accumulate(Object accumulated, Object in)

try {
final Query theQuery = query;
final QueryToolChest theToolChest = toolChest;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I'm reading this correct, the only reason you need to have a 2nd variable name here is because the two code paths of A) query successfully deserializing and B) query throws exception while deserializing... are all muddled together in the exception handling.

Would it be worth it in this PR to try and clean that up a bit?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is here because the query and toolChest are passed to anonymous classes at some point, and so the references need to be final. But the original references aren't final due to how the exception handling is done.

I didn't really want to mess with any of that in this PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I figured it would probably be outside the scope of this PR, but should be refactored before too many work-arounds are added.

Response.ResponseBuilder builder = Response
.ok(
new StreamingOutput()
Expand All @@ -252,12 +260,12 @@ public void write(OutputStream outputStream) throws IOException, WebApplicationE

final long queryTime = System.currentTimeMillis() - start;
emitter.emit(
DruidMetrics.makeQueryTimeMetric(jsonMapper, theQuery, req.getRemoteAddr())
DruidMetrics.makeQueryTimeMetric(theToolChest, jsonMapper, theQuery, req.getRemoteAddr())
.setDimension("success", "true")
.build("query/time", queryTime)
);
emitter.emit(
DruidMetrics.makeQueryTimeMetric(jsonMapper, theQuery, req.getRemoteAddr())
DruidMetrics.makeQueryTimeMetric(theToolChest, jsonMapper, theQuery, req.getRemoteAddr())
.build("query/bytes", os.getCount())
);

Expand Down Expand Up @@ -309,7 +317,7 @@ public void write(OutputStream outputStream) throws IOException, WebApplicationE
log.info("%s [%s]", e.getMessage(), queryId);
final long queryTime = System.currentTimeMillis() - start;
emitter.emit(
DruidMetrics.makeQueryTimeMetric(jsonMapper, query, req.getRemoteAddr())
DruidMetrics.makeQueryTimeMetric(toolChest, jsonMapper, query, req.getRemoteAddr())
.setDimension("success", "false")
.build("query/time", queryTime)
);
Expand Down Expand Up @@ -356,7 +364,7 @@ public void write(OutputStream outputStream) throws IOException, WebApplicationE
try {
final long queryTime = System.currentTimeMillis() - start;
emitter.emit(
DruidMetrics.makeQueryTimeMetric(jsonMapper, query, req.getRemoteAddr())
DruidMetrics.makeQueryTimeMetric(toolChest, jsonMapper, query, req.getRemoteAddr())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

toolchest can possibly be null here, do we need to handle that case ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DruidMetrics.makeQueryTimeMetric does handle this case.

.setDimension("success", "false")
.build("query/time", queryTime)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.net.HostAndPort;
import com.google.inject.Binder;
import com.google.inject.Inject;
Expand All @@ -39,7 +40,9 @@
import io.druid.guice.annotations.Smile;
import io.druid.guice.http.DruidHttpClientConfig;
import io.druid.initialization.Initialization;
import io.druid.query.MapQueryToolChestWarehouse;
import io.druid.query.Query;
import io.druid.query.QueryToolChest;
import io.druid.server.initialization.BaseJettyTest;
import io.druid.server.initialization.jetty.JettyServerInitUtils;
import io.druid.server.initialization.jetty.JettyServerInitializer;
Expand Down Expand Up @@ -213,6 +216,7 @@ public Collection<String> getAllHosts()

ServletHolder holder = new ServletHolder(
new AsyncQueryForwardingServlet(
new MapQueryToolChestWarehouse(ImmutableMap.<Class<? extends Query>, QueryToolChest>of()),
injector.getInstance(ObjectMapper.class),
injector.getInstance(Key.get(ObjectMapper.class, Smile.class)),
hostFinder,
Expand Down
9 changes: 9 additions & 0 deletions server/src/test/java/io/druid/server/QueryResourceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,20 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.concurrent.Execs;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.MapQueryToolChestWarehouse;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.query.SegmentDescriptor;
import io.druid.server.initialization.ServerConfig;
import io.druid.server.log.NoopRequestLogger;
Expand Down Expand Up @@ -63,6 +67,7 @@
*/
public class QueryResourceTest
{
private static final QueryToolChestWarehouse warehouse = new MapQueryToolChestWarehouse(ImmutableMap.<Class<? extends Query>, QueryToolChest>of());
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
public static final ServerConfig serverConfig = new ServerConfig()
{
Expand Down Expand Up @@ -125,6 +130,7 @@ public void setup()
EasyMock.expect(testServletRequest.getRemoteAddr()).andReturn("localhost").anyTimes();
queryManager = new QueryManager();
queryResource = new QueryResource(
warehouse,
serverConfig,
jsonMapper,
jsonMapper,
Expand Down Expand Up @@ -198,6 +204,7 @@ public Access isAuthorized(
EasyMock.replay(testServletRequest);

queryResource = new QueryResource(
warehouse,
serverConfig,
jsonMapper,
jsonMapper,
Expand Down Expand Up @@ -263,6 +270,7 @@ public Access isAuthorized(
EasyMock.replay(testServletRequest);

queryResource = new QueryResource(
warehouse,
serverConfig,
jsonMapper,
jsonMapper,
Expand Down Expand Up @@ -359,6 +367,7 @@ public Access isAuthorized(
EasyMock.replay(testServletRequest);

queryResource = new QueryResource(
warehouse,
serverConfig,
jsonMapper,
jsonMapper,
Expand Down
2 changes: 0 additions & 2 deletions services/src/main/java/io/druid/cli/CliBroker.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,6 @@ public void configure(Binder binder)
);
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8082);

binder.bind(QueryToolChestWarehouse.class).to(MapQueryToolChestWarehouse.class);

binder.bind(CachingClusteredClient.class).in(LazySingleton.class);
binder.bind(BrokerServerView.class).in(LazySingleton.class);
binder.bind(TimelineServerView.class).to(BrokerServerView.class).in(LazySingleton.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import io.druid.guice.http.DruidHttpClientConfig;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.server.AsyncQueryForwardingServlet;
import io.druid.server.initialization.jetty.JettyServerInitUtils;
import io.druid.server.initialization.jetty.JettyServerInitializer;
Expand All @@ -46,6 +47,7 @@
*/
public class RouterJettyServerInitializer implements JettyServerInitializer
{
private final QueryToolChestWarehouse warehouse;
private final ObjectMapper jsonMapper;
private final ObjectMapper smileMapper;
private final QueryHostFinder hostFinder;
Expand All @@ -56,6 +58,7 @@ public class RouterJettyServerInitializer implements JettyServerInitializer

@Inject
public RouterJettyServerInitializer(
QueryToolChestWarehouse warehouse,
@Json ObjectMapper jsonMapper,
@Smile ObjectMapper smileMapper,
QueryHostFinder hostFinder,
Expand All @@ -65,6 +68,7 @@ public RouterJettyServerInitializer(
RequestLogger requestLogger
)
{
this.warehouse = warehouse;
this.jsonMapper = jsonMapper;
this.smileMapper = smileMapper;
this.hostFinder = hostFinder;
Expand All @@ -82,6 +86,7 @@ public void initialize(Server server, Injector injector)
root.addServlet(new ServletHolder(new DefaultServlet()), "/*");

final AsyncQueryForwardingServlet asyncQueryForwardingServlet = new AsyncQueryForwardingServlet(
warehouse,
jsonMapper,
smileMapper,
hostFinder,
Expand Down