fix: Uses pull query metrics for all paths, not just /query (#5983)
* fix: Uses pull query metrics for all paths, not just /query
AlanConfluent committed Aug 21, 2020
1 parent 6b15d7d commit 143849c
Showing 11 changed files with 107 additions and 79 deletions.
QueryEndpoint.java

@@ -43,6 +43,7 @@
 import java.util.Objects;
 import java.util.Optional;
 import java.util.stream.Collectors;
+import org.apache.kafka.common.utils.Time;

 public class QueryEndpoint {

@@ -62,14 +63,14 @@ public QueryPublisher createQueryPublisher(
       final Context context,
       final WorkerExecutor workerExecutor,
       final ServiceContext serviceContext) {
-
+    final long startTimeNanos = Time.SYSTEM.nanoseconds();
     // Must be run on worker as all this stuff is slow
     VertxUtils.checkIsWorker();

     final ConfiguredStatement<Query> statement = createStatement(sql, properties.getMap());

     if (statement.getStatement().isPullQuery()) {
-      return createPullQueryPublisher(context, serviceContext, statement);
+      return createPullQueryPublisher(context, serviceContext, statement, startTimeNanos);
     } else {
       return createPushQueryPublisher(context, serviceContext, statement, workerExecutor);
     }
@@ -93,10 +94,11 @@ private QueryPublisher createPushQueryPublisher(
   private QueryPublisher createPullQueryPublisher(
       final Context context,
       final ServiceContext serviceContext,
-      final ConfiguredStatement<Query> statement
+      final ConfiguredStatement<Query> statement,
+      final long startTimeNanos
   ) {
     final PullQueryResult result = pullQueryExecutor.execute(
-        statement, serviceContext, Optional.empty(), Optional.of(false));
+        statement, serviceContext, Optional.of(false), startTimeNanos);
     final TableRows tableRows = result.getTableRows();

     return new PullQueryPublisher(
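Note: the QueryEndpoint change follows a capture-early, record-late pattern: the start timestamp is taken with Time.SYSTEM.nanoseconds() before any parsing or routing and is threaded down to whoever finishes the request. Below is a minimal, self-contained sketch of that pattern; it only assumes the kafka-clients Time utility that this file already imports, and the class and method names are illustrative, not ksqlDB API.

    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.common.utils.Time;

    // Sketch: capture the start time at the entry point, pass it down as a plain long,
    // and convert nanoseconds to microseconds only when the latency is recorded.
    public final class CaptureEarlyRecordLate {

      public static void main(final String[] args) throws Exception {
        final long startTimeNanos = Time.SYSTEM.nanoseconds(); // at the endpoint, before any work
        servePullQuery(startTimeNanos);
      }

      private static void servePullQuery(final long startTimeNanos) throws Exception {
        Thread.sleep(3); // stand-in for parse, route and execute

        // Same conversion the new PullQueryExecutorMetrics.recordLatency uses.
        final long elapsedNanos = Time.SYSTEM.nanoseconds() - startTimeNanos;
        final double latencyMicros = TimeUnit.NANOSECONDS.toMicros(elapsedNanos);
        System.out.printf("pull query latency: %.0f us%n", latencyMicros);
      }
    }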
KsqlRestApplication.java

@@ -139,6 +139,7 @@
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.log4j.LogManager;
 import org.slf4j.Logger;
@@ -468,7 +469,7 @@ public void notifyTerminated() {
   public void shutdown() {
     log.info("ksqlDB shutdown called");
     try {
-      streamedQueryResource.closeMetrics();
+      pullQueryExecutor.closeMetrics();
     } catch (final Exception e) {
       log.error("Exception while waiting for pull query metrics to close", e);
     }
@@ -701,7 +702,8 @@ static KsqlRestApplication buildApplication(
         heartbeatAgent, lagReportingAgent);

     final PullQueryExecutor pullQueryExecutor = new PullQueryExecutor(
-        ksqlEngine, routingFilterFactory, ksqlConfig);
+        ksqlEngine, routingFilterFactory, ksqlConfig, ksqlEngine.getServiceId(),
+        Time.SYSTEM);

     final DenyListPropertyValidator denyListPropertyValidator = new DenyListPropertyValidator(
         ksqlConfig.getList(KsqlConfig.KSQL_PROPERTIES_OVERRIDES_DENYLIST));
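Note: ownership of the metrics lifecycle moves to PullQueryExecutor here: the server builds it with the service id and a Time source, and shutdown now calls pullQueryExecutor.closeMetrics() instead of going through the REST resource. A hedged sketch of that ownership pattern follows; apart from closeMetrics(), the types and names are stand-ins, not ksqlDB classes.

    import java.util.Optional;

    // Sketch of the lifecycle change: the component that conditionally holds the metrics
    // is also the one that closes them on shutdown.
    public final class MetricsLifecycleSketch {

      /** Stand-in for PullQueryExecutorMetrics. */
      interface PullMetrics {
        void close();
      }

      /** Stand-in for PullQueryExecutor. */
      static final class Executor {
        private final Optional<PullMetrics> pullQueryMetrics;

        Executor(final Optional<PullMetrics> pullQueryMetrics) {
          this.pullQueryMetrics = pullQueryMetrics;
        }

        void closeMetrics() {
          pullQueryMetrics.ifPresent(PullMetrics::close);
        }
      }

      public static void main(final String[] args) {
        final Executor executor =
            new Executor(Optional.of(() -> System.out.println("metrics closed")));

        // Shutdown path, mirroring KsqlRestApplication.shutdown() in this diff.
        try {
          executor.closeMetrics();
        } catch (final Exception e) {
          System.err.println("Exception while waiting for pull query metrics to close: " + e);
        }
      }
    }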
PullQueryExecutor.java

@@ -109,6 +109,7 @@
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Struct;
 import org.slf4j.Logger;
@@ -138,17 +139,24 @@ public final class PullQueryExecutor {
   private final KsqlExecutionContext executionContext;
   private final RoutingFilterFactory routingFilterFactory;
   private final RateLimiter rateLimiter;
+  private final Optional<PullQueryExecutorMetrics> pullQueryMetrics;

   public PullQueryExecutor(
       final KsqlExecutionContext executionContext,
       final RoutingFilterFactory routingFilterFactory,
-      final KsqlConfig ksqlConfig
+      final KsqlConfig ksqlConfig,
+      final String serviceId,
+      final Time time
   ) {
     this.executionContext = Objects.requireNonNull(executionContext, "executionContext");
     this.routingFilterFactory =
         Objects.requireNonNull(routingFilterFactory, "routingFilterFactory");
     this.rateLimiter = RateLimiter.create(ksqlConfig.getInt(
         KsqlConfig.KSQL_QUERY_PULL_MAX_QPS_CONFIG));
+    this.pullQueryMetrics = ksqlConfig.getBoolean(KsqlConfig.KSQL_QUERY_PULL_METRICS_ENABLED)
+        ? Optional.of(new PullQueryExecutorMetrics(serviceId,
+            ksqlConfig.getStringAsMap(KsqlConfig.KSQL_CUSTOM_METRICS_TAGS), time))
+        : Optional.empty();
   }

   @SuppressWarnings("unused") // Needs to match validator API.
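Note: the executor now decides up front whether metrics exist at all, gated on the KSQL_QUERY_PULL_METRICS_ENABLED flag, instead of StreamedQueryResource building them later in configure(). A minimal sketch of that config-gated Optional construction is below; the Config interface and the property keys are placeholders for this sketch, not the real KsqlConfig API.

    import java.util.Collections;
    import java.util.Map;
    import java.util.Optional;

    // Sketch: metrics are built once, in the constructor, iff the flag is enabled.
    // Call sites can then stay unconditional and just use pullQueryMetrics.ifPresent(...).
    public final class ConfigGatedMetrics {

      /** Stand-in for the subset of KsqlConfig used here. */
      interface Config {
        boolean getBoolean(String key);
        Map<String, String> getStringAsMap(String key);
      }

      /** Stand-in for PullQueryExecutorMetrics. */
      static final class PullMetrics {
        PullMetrics(final String serviceId, final Map<String, String> customTags) {
          // the real class registers latency/rate/error sensors here
        }
      }

      private final Optional<PullMetrics> pullQueryMetrics;

      ConfigGatedMetrics(final Config config, final String serviceId) {
        // "pull.metrics.enabled" and "custom.metrics.tags" are placeholder keys.
        this.pullQueryMetrics = config.getBoolean("pull.metrics.enabled")
            ? Optional.of(new PullMetrics(serviceId, config.getStringAsMap("custom.metrics.tags")))
            : Optional.empty();
      }

      public static void main(final String[] args) {
        final Config config = new Config() {
          @Override public boolean getBoolean(final String key) { return true; }
          @Override public Map<String, String> getStringAsMap(final String key) {
            return Collections.emptyMap();
          }
        };
        final ConfigGatedMetrics executor = new ConfigGatedMetrics(config, "default_");
        System.out.println("metrics enabled: " + executor.pullQueryMetrics.isPresent());
      }
    }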
@@ -164,8 +172,8 @@ public static void validate(
   public PullQueryResult execute(
       final ConfiguredStatement<Query> statement,
       final ServiceContext serviceContext,
-      final Optional<PullQueryExecutorMetrics> pullQueryMetrics,
-      final Optional<Boolean> isInternalRequest
+      final Optional<Boolean> isInternalRequest,
+      final long startTimeNanos
   ) {
     if (!statement.getStatement().isPullQuery()) {
       throw new IllegalArgumentException("Executor can only handle pull queries");
@@ -222,13 +230,17 @@ public PullQueryResult execute(
           contextStacker,
           pullQueryMetrics);

-      return handlePullQuery(
+      final PullQueryResult result = handlePullQuery(
           statement,
           executionContext,
           serviceContext,
           pullQueryContext,
           routingOptions
       );
+
+      pullQueryMetrics.ifPresent(metrics ->
+          metrics.recordLatency(startTimeNanos));
+      return result;
     } catch (final Exception e) {
       pullQueryMetrics.ifPresent(metrics -> metrics.recordErrorRate(1));
       throw new KsqlStatementException(
@@ -247,6 +259,10 @@ void checkRateLimit() {
     }
   }

+  public void closeMetrics() {
+    pullQueryMetrics.ifPresent(PullQueryExecutorMetrics::close);
+  }
+
   private PullQueryResult handlePullQuery(
       final ConfiguredStatement<Query> statement,
       final KsqlExecutionContext executionContext,
@@ -465,8 +481,7 @@ private PullQueryContext(
       this.whereInfo = Objects.requireNonNull(whereInfo, "whereInfo");
       this.queryId = Objects.requireNonNull(queryId, "queryId");
       this.contextStacker = Objects.requireNonNull(contextStacker, "contextStacker");
-      this.pullQueryMetrics = Objects.requireNonNull(
-          pullQueryMetrics, "pullQueryExecutorMetrics");
+      this.pullQueryMetrics = Objects.requireNonNull(pullQueryMetrics, "pullQueryMetrics");
     }
   }

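Note: with the metrics held as a field, execute() can now record the outcome on every path that reaches the executor: latency (which also bumps the request rate) on success, and the error rate in the catch block. A compact sketch of that success/failure recording shape, using stand-in types rather than the ksqlDB classes:

    import java.util.Optional;
    import java.util.function.Supplier;

    // Sketch of the recording shape added to PullQueryExecutor.execute(): time every request,
    // record latency on success and an error count on failure.
    public final class RecordOnEveryPath {

      interface PullMetrics {
        void recordLatency(long startTimeNanos);
        void recordErrorRate(double value);
      }

      private final Optional<PullMetrics> pullQueryMetrics;

      RecordOnEveryPath(final Optional<PullMetrics> pullQueryMetrics) {
        this.pullQueryMetrics = pullQueryMetrics;
      }

      <T> T execute(final Supplier<T> query, final long startTimeNanos) {
        try {
          final T result = query.get();
          pullQueryMetrics.ifPresent(metrics -> metrics.recordLatency(startTimeNanos));
          return result;
        } catch (final RuntimeException e) {
          pullQueryMetrics.ifPresent(metrics -> metrics.recordErrorRate(1));
          throw e;
        }
      }

      public static void main(final String[] args) {
        final RecordOnEveryPath executor = new RecordOnEveryPath(Optional.of(new PullMetrics() {
          @Override public void recordLatency(final long startTimeNanos) {
            System.out.println("latency recorded");
          }
          @Override public void recordErrorRate(final double value) {
            System.out.println("error recorded");
          }
        }));
        executor.execute(() -> "rows", System.nanoTime());
      }
    }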
PullQueryExecutorMetrics.java

@@ -22,6 +22,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.TimeUnit;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
@@ -32,6 +33,7 @@
 import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing;
 import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.metrics.stats.WindowedCount;
+import org.apache.kafka.common.utils.Time;

 public class PullQueryExecutorMetrics implements Closeable {

@@ -46,14 +48,17 @@ public class PullQueryExecutorMetrics implements Closeable {
   private final Sensor errorRateSensor;
   private final Metrics metrics;
   private final Map<String, String> customMetricsTags;
+  private final Time time;
   private final String ksqlServiceId;

   public PullQueryExecutorMetrics(
       final String ksqlServiceId,
-      final Map<String, String> customMetricsTags
+      final Map<String, String> customMetricsTags,
+      final Time time
   ) {

     this.customMetricsTags = Objects.requireNonNull(customMetricsTags, "customMetricsTags");
+    this.time = Objects.requireNonNull(time, "time");
     this.metrics = MetricCollectors.getMetrics();
     this.ksqlServiceId = ReservedInternalTopics.KSQL_INTERNAL_TOPIC_PREFIX
         + ksqlServiceId;
@@ -78,12 +83,12 @@ public void recordRemoteRequests(final double value) {
     this.remoteRequestsSensor.record(value);
   }

-  public void recordRate(final double value) {
-    this.requestRateSensor.record(value);
-  }
-
-  public void recordLatency(final double value) {
-    this.latencySensor.record(value);
+  public void recordLatency(final long startTimeNanos) {
+    // Record latency at microsecond scale
+    final long nowNanos = time.nanoseconds();
+    final double latency = TimeUnit.NANOSECONDS.toMicros(nowNanos - startTimeNanos);
+    this.latencySensor.record(latency);
+    this.requestRateSensor.record(1);
   }

   public void recordErrorRate(final double value) {
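Note: recordLatency() now does the elapsed-time math itself and also records into the request-rate sensor, so callers only hand it the start timestamp. For readers unfamiliar with the kafka-clients metrics library this class builds on, here is a small self-contained example of registering a latency sensor and recording into it; the sensor, group, and metric names are made up for the example.

    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.metrics.stats.Avg;
    import org.apache.kafka.common.metrics.stats.Max;
    import org.apache.kafka.common.utils.Time;

    // Standalone example of the kafka-clients Metrics/Sensor API that
    // PullQueryExecutorMetrics is built on.
    public final class LatencySensorExample {

      public static void main(final String[] args) throws Exception {
        try (Metrics metrics = new Metrics()) {
          final Sensor latencySensor = metrics.sensor("pull-query-latency-example");
          latencySensor.add(
              metrics.metricName("latency-avg", "example-group", "average latency in us"),
              new Avg());
          latencySensor.add(
              metrics.metricName("latency-max", "example-group", "max latency in us"),
              new Max());

          final long startTimeNanos = Time.SYSTEM.nanoseconds();
          Thread.sleep(2); // stand-in for executing the pull query

          // Same conversion as the new recordLatency(startTimeNanos): nanos -> micros.
          final double latencyMicros =
              TimeUnit.NANOSECONDS.toMicros(Time.SYSTEM.nanoseconds() - startTimeNanos);
          latencySensor.record(latencyMicros);

          metrics.metrics().forEach((name, metric) ->
              System.out.println(name.name() + " = " + metric.metricValue()));
        }
      }
    }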
PullQueryPublisher.java

@@ -41,23 +41,26 @@ class PullQueryPublisher implements Flow.Publisher<Collection<StreamedRow>> {
   private final ServiceContext serviceContext;
   private final ConfiguredStatement<Query> query;
   private final PullQueryExecutor pullQueryExecutor;
+  private final long startTimeNanos;

   @VisibleForTesting
   PullQueryPublisher(
       final ServiceContext serviceContext,
       final ConfiguredStatement<Query> query,
-      final PullQueryExecutor pullQueryExecutor
+      final PullQueryExecutor pullQueryExecutor,
+      final long startTimeNanos
   ) {
     this.serviceContext = requireNonNull(serviceContext, "serviceContext");
     this.query = requireNonNull(query, "query");
     this.pullQueryExecutor = requireNonNull(pullQueryExecutor, "pullQueryExecutor");
+    this.startTimeNanos = startTimeNanos;
   }

   @Override
   public synchronized void subscribe(final Subscriber<Collection<StreamedRow>> subscriber) {
     final PullQuerySubscription subscription = new PullQuerySubscription(
         subscriber,
-        () -> pullQueryExecutor.execute(query, serviceContext, Optional.empty(), Optional.of(false))
+        () -> pullQueryExecutor.execute(query, serviceContext, Optional.of(false), startTimeNanos)
     );

     subscriber.onSubscribe(subscription);
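Note: one subtlety here is that the publisher stores startTimeNanos at construction but only runs the query when a subscriber arrives, so the recorded latency also covers the gap between creating the publisher and subscription. A small sketch of that deferred-execution capture, using only JDK types:

    import java.util.function.LongSupplier;
    import java.util.function.Supplier;

    // Sketch: the start timestamp is captured eagerly, but the work that eventually
    // measures against it runs lazily, when the supplier is invoked.
    public final class DeferredExecutionSketch {

      static Supplier<String> deferredPullQuery(final long startTimeNanos, final LongSupplier clock) {
        // Mirrors PullQueryPublisher: the lambda closes over startTimeNanos and is only
        // evaluated later, inside subscribe().
        return () -> {
          final long elapsedNanos = clock.getAsLong() - startTimeNanos;
          return "executed after " + elapsedNanos + " ns";
        };
      }

      public static void main(final String[] args) throws InterruptedException {
        final Supplier<String> query = deferredPullQuery(System.nanoTime(), System::nanoTime);
        Thread.sleep(2); // time spent before anyone subscribes is included in the measurement
        System.out.println(query.get());
      }
    }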
StreamedQueryResource.java

@@ -15,8 +15,6 @@

 package io.confluent.ksql.rest.server.resources.streaming;

-import static java.util.Optional.empty;
-
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
@@ -37,7 +35,6 @@
 import io.confluent.ksql.rest.server.StatementParser;
 import io.confluent.ksql.rest.server.computation.CommandQueue;
 import io.confluent.ksql.rest.server.execution.PullQueryExecutor;
-import io.confluent.ksql.rest.server.execution.PullQueryExecutorMetrics;
 import io.confluent.ksql.rest.server.execution.PullQueryResult;
 import io.confluent.ksql.rest.server.resources.KsqlConfigurable;
 import io.confluent.ksql.rest.server.resources.KsqlRestException;
@@ -82,8 +79,6 @@ public class StreamedQueryResource implements KsqlConfigurable {
   private final Errors errorHandler;
   private KsqlConfig ksqlConfig;
   private final PullQueryExecutor pullQueryExecutor;
-  private Optional<PullQueryExecutorMetrics> pullQueryMetrics;
-  private final Time time;
   private final DenyListPropertyValidator denyListPropertyValidator;

   public StreamedQueryResource(
@@ -140,7 +135,6 @@ public StreamedQueryResource(
     this.pullQueryExecutor = Objects.requireNonNull(pullQueryExecutor, "pullQueryExecutor");
     this.denyListPropertyValidator =
         Objects.requireNonNull(denyListPropertyValidator, "denyListPropertyValidator");
-    this.time = Time.SYSTEM;
   }

   @Override
@@ -150,13 +144,6 @@ public void configure(final KsqlConfig config) {
     }

     ksqlConfig = config;
-    final Boolean collectMetrics = ksqlConfig.getBoolean(
-        KsqlConfig.KSQL_QUERY_PULL_METRICS_ENABLED);
-    this.pullQueryMetrics = collectMetrics
-        ? Optional.of(new PullQueryExecutorMetrics(
-            ksqlEngine.getServiceId(),
-            ksqlConfig.getStringAsMap(KsqlConfig.KSQL_CUSTOM_METRICS_TAGS)))
-        : empty();
   }

   public EndpointResponse streamQuery(
@@ -165,7 +152,7 @@ public EndpointResponse streamQuery(
       final CompletableFuture<Void> connectionClosedFuture,
       final Optional<Boolean> isInternalRequest
   ) {
-    final long startTime = time.nanoseconds();
+    final long startTimeNanos = Time.SYSTEM.nanoseconds();
     throwIfNotConfigured();

     activenessRegistrar.updateLastRequestTime();
@@ -175,14 +162,8 @@ public EndpointResponse streamQuery(
     CommandStoreUtil.httpWaitForCommandSequenceNumber(
         commandQueue, request, commandQueueCatchupTimeout);

-    return handleStatement(securityContext, request, statement, startTime, connectionClosedFuture,
-        isInternalRequest);
-  }
-
-  public void closeMetrics() {
-    if (pullQueryMetrics != null) {
-      pullQueryMetrics.ifPresent(PullQueryExecutorMetrics::close);
-    }
+    return handleStatement(securityContext, request, statement, connectionClosedFuture,
+        isInternalRequest, startTimeNanos);
   }

   private void throwIfNotConfigured() {
@@ -209,9 +190,9 @@ private EndpointResponse handleStatement(
       final KsqlSecurityContext securityContext,
       final KsqlRequest request,
       final PreparedStatement<?> statement,
-      final long startTime,
       final CompletableFuture<Void> connectionClosedFuture,
-      final Optional<Boolean> isInternalRequest
+      final Optional<Boolean> isInternalRequest,
+      final long startTimeNanos
   ) {
     try {
       authorizationValidator.ifPresent(validator ->
@@ -233,14 +214,9 @@ private EndpointResponse handleStatement(
             queryStmt,
             configProperties,
             request.getRequestProperties(),
-            isInternalRequest
+            isInternalRequest,
+            startTimeNanos
         );
-        if (pullQueryMetrics.isPresent()) {
-          //Record latency at microsecond scale
-          final double latency = (time.nanoseconds() - startTime) / 1000f;
-          pullQueryMetrics.get().recordLatency(latency);
-          pullQueryMetrics.get().recordRate(1);
-        }
         return response;
       }

@@ -277,13 +253,14 @@ private EndpointResponse handlePullQuery(
       final PreparedStatement<Query> statement,
       final Map<String, Object> configOverrides,
       final Map<String, Object> requestProperties,
-      final Optional<Boolean> isInternalRequest
+      final Optional<Boolean> isInternalRequest,
+      final long startTimeNanos
   ) {
     final ConfiguredStatement<Query> configured =
         ConfiguredStatement.of(statement, configOverrides, requestProperties, ksqlConfig);

     final PullQueryResult result = pullQueryExecutor
-        .execute(configured, serviceContext, pullQueryMetrics, isInternalRequest);
+        .execute(configured, serviceContext, isInternalRequest, startTimeNanos);
     final TableRows tableRows = result.getTableRows();
     final Optional<KsqlHostInfoEntity> host = result.getSourceNode()
         .map(KsqlNode::location)
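Note: the net effect of the resource changes is that StreamedQueryResource no longer owns any metrics; it only timestamps the request and forwards startTimeNanos, so the /query resource and the Vert.x-based query endpoint both funnel through the same recording logic in PullQueryExecutor. A last sketch of that shared-funnel shape, with stand-in types:

    // Sketch of the end state after this commit: every entry point captures a start time
    // and delegates to one executor, which is the only place metrics are recorded.
    public final class SharedFunnelSketch {

      interface Executor {
        String execute(String sql, long startTimeNanos);
      }

      static final class RecordingExecutor implements Executor {
        @Override
        public String execute(final String sql, final long startTimeNanos) {
          final String rows = "rows for " + sql;                        // run the pull query
          final long elapsedNanos = System.nanoTime() - startTimeNanos;
          System.out.println("recorded latency: " + elapsedNanos + " ns"); // single recording point
          return rows;
        }
      }

      // Both entry points now do the same thing: stamp the request, then delegate.
      static String streamedQueryResource(final Executor executor, final String sql) {
        return executor.execute(sql, System.nanoTime());
      }

      static String queryEndpoint(final Executor executor, final String sql) {
        return executor.execute(sql, System.nanoTime());
      }

      public static void main(final String[] args) {
        final Executor executor = new RecordingExecutor();
        System.out.println(streamedQueryResource(executor, "SELECT * FROM t WHERE k = 1;"));
        System.out.println(queryEndpoint(executor, "SELECT * FROM t WHERE k = 2;"));
      }
    }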
