Skip to content

Commit

Permalink
feat: Adding ksqlDB Query Status metric. (#9283)
Browse files Browse the repository at this point in the history
* feat: Adding ksqlDB Query Status metric.

The existing metric `query-status` reports the state of the Kafka Streams application.

This new metric `ksql-query-status` reports the state of the ksqlDB.

This is being added to show that the query can be paused.  (While a ksqlDB query is paused,
the underlying Kafka Stream application is still in the `RUNNING` state.)
  • Loading branch information
jnh5y committed Jul 18, 2022
1 parent 5413985 commit 5ad9cf8
Show file tree
Hide file tree
Showing 10 changed files with 128 additions and 7 deletions.
11 changes: 10 additions & 1 deletion docs/reference/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,20 @@ io.confluent.ksql.metrics:type=ksql-queries

### Attributes

**ksqlDB query status**

`ksql-query-status`

The current ksqlDB status of the given query.
The metric `query-status` shows the {{ site.kstreams }} application state.
The `PAUSE` / `RESUME` commands do not impact the {{ site.kstreams }} state, so this new metric shows when a query is paused.

**Query status**

`query-status`

The current status of the given query.
The current {{ site.kstreams }} status of the given query.
The `ksql-query-status` metric has been added to show the ksqlDB query status.

**Error status**

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@ default void onStateChange(
) {
}

/**
* Called when the ksqlDB query is paused or resumed.
* @param query the query whose state has changed
*/
default void onKsqlStateChange(
QueryMetadata query
) {
}

/**
* Called when a query hits a query execution error
* @param query the query that hit the error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,15 @@ public void onStateChange(final QueryMetadata query, final State before, final S
final PerQueryListener listener = perQuery.get(query.getQueryId());
if (listener != null) {
listener.onChange(before, after);
listener.setKsqlQueryState(query.getQueryStatus().toString());
}
}

@Override
public void onKsqlStateChange(final QueryMetadata query) {
final PerQueryListener listener = perQuery.get(query.getQueryId());
if (listener != null) {
listener.setKsqlQueryState(query.getQueryStatus().toString());
}
}

Expand All @@ -98,6 +107,7 @@ public void onError(final QueryMetadata query, final QueryError error) {
final PerQueryListener listener = perQuery.get(query.getQueryId());
if (listener != null) {
listener.onError(error);
listener.setKsqlQueryState(query.getQueryStatus().toString());
}
}

Expand All @@ -114,10 +124,12 @@ private static class PerQueryListener {
private final MetricName stateMetricName;
private final MetricName errorMetricName;
private final MetricName queryRestartMetricName;
private final MetricName ksqlQueryStatusMetricName;
private final CumulativeSum queryRestartSum;
private final Ticker ticker;

private volatile String state = "-";
private volatile String ksqlQueryState = "-";
private volatile String error = NO_ERROR;

PerQueryListener(
Expand Down Expand Up @@ -150,7 +162,7 @@ private static class PerQueryListener {
this.stateMetricName = metrics.metricName(
"query-status",
groupPrefix + "ksql-queries",
"The current status of the given query.",
"The current Kafka Streams status of the given query.",
tagsForStateAndError
);

Expand All @@ -169,10 +181,20 @@ private static class PerQueryListener {
QUERY_RESTART_METRIC_DESCRIPTION,
restartTags
);

ksqlQueryStatusMetricName = metrics.metricName(
"ksql-query-status",
groupPrefix + "ksql-queries",
"The current ksqlDB status of the given query.",
tagsForStateAndError
);

this.queryRestartSum = new CumulativeSum();
this.metrics.addMetric(stateMetricName, (Gauge<String>) (config, now) -> state);
this.metrics.addMetric(errorMetricName, (Gauge<String>) (config, now) -> error);
this.metrics.addMetric(queryRestartMetricName, queryRestartSum);
this.metrics.addMetric(ksqlQueryStatusMetricName,
(Gauge<String>) (config, now) -> ksqlQueryState);
}

public void onChange(final State newState, final State oldState) {
Expand All @@ -183,6 +205,10 @@ public void onChange(final State newState, final State oldState) {
}
}

public void setKsqlQueryState(final String ksqlQueryState) {
this.ksqlQueryState = ksqlQueryState;
}

public void onError(final QueryError observedError) {
error = observedError.getType().name();
queryRestartSum.record(new MetricConfig(), 1, System.currentTimeMillis());
Expand All @@ -192,6 +218,7 @@ public void onDeregister() {
metrics.removeMetric(stateMetricName);
metrics.removeMetric(errorMetricName);
metrics.removeMetric(queryRestartMetricName);
metrics.removeMetric(ksqlQueryStatusMetricName);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,16 @@ public void onStateChange(
eventListeners.forEach(l -> l.onStateChange(queryMetadata, before, after));
}

@Override
public void onPause(final QueryMetadata queryMetadata) {
eventListeners.forEach(l -> l.onKsqlStateChange(queryMetadata));
}

@Override
public void onResume(final QueryMetadata queryMetadata) {
eventListeners.forEach(l -> l.onKsqlStateChange(queryMetadata));
}

@Override
public void onClose(final QueryMetadata queryMetadata) {
unregisterQuery(queryMetadata);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,12 +431,14 @@ public KsqlQueryStatus getQueryStatus() {
public void pause() {
sharedKafkaStreamsRuntime.getKafkaStreams().pauseNamedTopology(topology.name());
isPaused = true;
listener.onPause(this);
}

@Override
public void resume() {
sharedKafkaStreamsRuntime.getKafkaStreams().resumeNamedTopology(topology.name());
isPaused = false;
listener.onPause(this);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,16 @@ public void onStateChange(final QueryMetadata query, final State old, final Stat
this.listener.onStateChange(query, old, newState);
}

@Override
public void onPause(final QueryMetadata queryMetadata) {
this.listener.onPause(queryMetadata);
}

@Override
public void onResume(final QueryMetadata queryMetadata) {
this.listener.onResume(queryMetadata);
}

@Override
public void onClose(final QueryMetadata queryMetadata) {
this.listener.onClose(queryMetadata);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ void onStateChange(
KafkaStreams.State before,
KafkaStreams.State after);

void onPause(QueryMetadata queryMetadata);

void onResume(QueryMetadata queryMetadata);

void onClose(QueryMetadata queryMetadata);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -436,12 +436,14 @@ public KsqlQueryStatus getQueryStatus() {
public void pause() {
kafkaStreams.pause();
isPaused.set(true);
listener.onPause(this);
}

@Override
public void resume() {
kafkaStreams.resume();
isPaused.set(false);
listener.onResume(this);
}

public static class RetryEvent implements QueryMetadata.RetryEvent {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.confluent.ksql.query.QueryError.Type;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.QueryMetadata;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Gauge;
Expand All @@ -59,6 +60,8 @@ public class QueryStateMetricsReportingListenerTest {
new MetricName("dylan", "g1", "d1", ImmutableMap.of());
private static final MetricName METRIC_NAME_3 =
new MetricName("steven", "g1", "d1", ImmutableMap.of());
private static final MetricName METRIC_NAME_4 =
new MetricName("pete", "g1", "d1", ImmutableMap.of());
private static final QueryId QUERY_ID = new QueryId("foo");
private static final String TAG = "_confluent-ksql-" + "some-prefix-" + "query_" + QUERY_ID.toString();

Expand All @@ -81,7 +84,8 @@ public void setUp() {
when(metrics.metricName(any(), any(), any(), anyMap()))
.thenReturn(METRIC_NAME_1)
.thenReturn(METRIC_NAME_2)
.thenReturn(METRIC_NAME_3);
.thenReturn(METRIC_NAME_3)
.thenReturn(METRIC_NAME_4);
when(query.getQueryId()).thenReturn(QUERY_ID);

listener = new QueryStateMetricsReportingListener(metrics, "some-prefix-", metricsTags);
Expand All @@ -101,11 +105,14 @@ public void shouldAddMetricOnCreation() {

// Then:
verify(metrics).metricName("query-status", "some-prefix-ksql-queries",
"The current status of the given query.",
"The current Kafka Streams status of the given query.",
tags);
verify(metrics).metricName("error-status", "some-prefix-ksql-queries",
"The current error status of the given query, if the state is in ERROR state",
tags);
verify(metrics).metricName("ksql-query-status", "some-prefix-ksql-queries",
"The current ksqlDB status of the given query.",
tags);
tags.put("query-id", QUERY_ID.toString());
verify(metrics).metricName(QueryStateMetricsReportingListener.QUERY_RESTART_METRIC_NAME, "some-prefix-ksql-queries",
QueryStateMetricsReportingListener.QUERY_RESTART_METRIC_DESCRIPTION,
Expand All @@ -114,6 +121,7 @@ public void shouldAddMetricOnCreation() {
verify(metrics).addMetric(eq(METRIC_NAME_1), isA(Gauge.class));
verify(metrics).addMetric(eq(METRIC_NAME_2), isA(Gauge.class));
verify(metrics).addMetric(eq(METRIC_NAME_3), isA(CumulativeSum.class));
verify(metrics).addMetric(eq(METRIC_NAME_4), isA(Gauge.class));
}

@Test
Expand Down Expand Up @@ -151,11 +159,14 @@ public void shouldAddMetricWithSuppliedPrefix() {

// Then:
verify(metrics).metricName("query-status", groupPrefix + "ksql-queries",
"The current status of the given query.",
"The current Kafka Streams status of the given query.",
tags);
verify(metrics).metricName("error-status", groupPrefix + "ksql-queries",
"The current error status of the given query, if the state is in ERROR state",
tags);
verify(metrics).metricName("ksql-query-status", groupPrefix + "ksql-queries",
"The current ksqlDB status of the given query.",
tags);
tags.put("query-id", QUERY_ID.toString());
verify(metrics).metricName(QueryStateMetricsReportingListener.QUERY_RESTART_METRIC_NAME, "some-prefix-ksql-queries",
QueryStateMetricsReportingListener.QUERY_RESTART_METRIC_DESCRIPTION,
Expand All @@ -175,6 +186,7 @@ public void shouldInitiallyHaveInitialState() {
@Test
public void shouldUpdateToNewState() {
// When:
when(query.getQueryStatus()).thenReturn(KsqlConstants.KsqlQueryStatus.RUNNING);
listener.onCreate(serviceContext, metaStore, query);
listener.onStateChange(query, State.REBALANCING, State.RUNNING);

Expand All @@ -185,13 +197,36 @@ public void shouldUpdateToNewState() {
@Test
public void shouldUpdateOnError() {
// When:
when(query.getQueryStatus()).thenReturn(KsqlConstants.KsqlQueryStatus.RUNNING);
listener.onCreate(serviceContext, metaStore, query);
listener.onError(query, new QueryError(1, "foo", Type.USER));

// Then:
assertThat(currentGaugeValue(METRIC_NAME_2), is("USER"));
}

@Test
public void shouldReportKsqlQueryStatus() {
// When:
listener.onCreate(serviceContext, metaStore, query);

when(query.getQueryStatus())
.thenReturn(KsqlConstants.KsqlQueryStatus.RUNNING)
.thenReturn(KsqlConstants.KsqlQueryStatus.PAUSED)
.thenReturn(KsqlConstants.KsqlQueryStatus.UNRESPONSIVE)
.thenReturn(KsqlConstants.KsqlQueryStatus.ERROR);

// Then:
listener.onKsqlStateChange(query);
assertThat(currentGaugeValue(METRIC_NAME_4), is("RUNNING"));
listener.onKsqlStateChange(query);
assertThat(currentGaugeValue(METRIC_NAME_4), is("PAUSED"));
listener.onKsqlStateChange(query);
assertThat(currentGaugeValue(METRIC_NAME_4), is("UNRESPONSIVE"));
listener.onKsqlStateChange(query);
assertThat(currentGaugeValue(METRIC_NAME_4), is("ERROR"));
}

@Test
public void shouldRemoveMetricOnClose() {
// When:
Expand All @@ -202,6 +237,7 @@ public void shouldRemoveMetricOnClose() {
verify(metrics).removeMetric(METRIC_NAME_1);
verify(metrics).removeMetric(METRIC_NAME_2);
verify(metrics).removeMetric(METRIC_NAME_3);
verify(metrics).removeMetric(METRIC_NAME_4);
}

private String currentGaugeValue(final MetricName name) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import io.confluent.ksql.util.PageViewDataProvider;
import io.confluent.ksql.util.PageViewDataProvider.Batch;
import kafka.zookeeper.ZooKeeperClientException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.jetbrains.annotations.NotNull;
import org.junit.*;
import org.junit.experimental.categories.Category;
Expand Down Expand Up @@ -125,6 +127,7 @@ public void shouldPauseAndResumeQuery() {
RestIntegrationTestUtil.makeKsqlRequest(REST_APP, "PAUSE " + queryId + ";");

assertThat(getPausedCount(), equalTo(1L));
assertThat(getFirstKsqlDbQueryState(), equalTo("PAUSED"));

// Produce more records
TEST_HARNESS.produceRows(PAGE_VIEW_TOPIC, PAGE_VIEWS_PROVIDER2, KAFKA, JSON,
Expand All @@ -134,12 +137,12 @@ public void shouldPauseAndResumeQuery() {
assertThatEventually(supplier, equalTo(9));

// Then:
RestIntegrationTestUtil.makeKsqlRequest(REST_APP, "RESUME "
+ queryId + ";");
RestIntegrationTestUtil.makeKsqlRequest(REST_APP, "RESUME " + queryId + ";");

// 5 more records have been produced
assertThatEventually(supplier, equalTo(14));
assertThat(getRunningCount(), equalTo(1L));
assertThat(getFirstKsqlDbQueryState(), equalTo("RUNNING"));
}

@Test
Expand Down Expand Up @@ -259,4 +262,13 @@ private long getCount(KsqlQueryStatus status) {
return -1;
}
}

private String getFirstKsqlDbQueryState() {
final MetricName key = REST_APP.getEngine().metricCollectors().getMetrics().metrics().keySet()
.stream()
.filter(metricName -> metricName.name().contains("ksql-query-status"))
.findFirst().get();
final KafkaMetric metric = REST_APP.getEngine().metricCollectors().getMetrics().metric(key);
return (String) metric.metricValue();
}
}

0 comments on commit 5ad9cf8

Please sign in to comment.