Skip to content

Commit

Permalink
feat: transient queries added to show queries output (#5105)
Browse files Browse the repository at this point in the history
* feat: transient queries added to show queries output

* ListQueriesExecutor and KsqlExecutionContext changes

* comments

* rename query types
  • Loading branch information
stevenpyzhang committed Apr 23, 2020
1 parent 2500726 commit e8a2a63
Show file tree
Hide file tree
Showing 26 changed files with 266 additions and 90 deletions.
11 changes: 9 additions & 2 deletions docs/developer-guide/ksqldb-reference/show-queries.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,21 @@ Query Status
* `ERROR`: the query has entered an error state.
* `UNRESPONSIVE`: the host running the query returned an error when requesting the query status.

Query Type
-----------

* `PERSISTENT`: these queries run on every node and materialize new state.
* `PUSH`: these queries are owned by the client and are terminated when the session ends.

Example
-------

```sql
ksql> show queries;

Query ID | Status | Sink Name | Sink Kafka Topic | Query String
Query ID | Query Type | Status | Sink Name | Sink Kafka Topic | Query String
------------------------------------------------------------------------------------------------------------
CSAS_TEST_0 | RUNNING:2 | TEST | TEST | CREATE STREAM TEST WITH (KAFKA_TOPIC='TEST', PARTITIONS=1, REPLICAS=1) AS SELECT *FROM KSQL_PROCESSING_LOG KSQL_PROCESSING_LOG EMIT CHANGES;
CSAS_TEST_0 | PERSISTENT | RUNNING:2 | TEST | TEST | CREATE STREAM TEST WITH (KAFKA_TOPIC='TEST', PARTITIONS=1, REPLICAS=1) AS SELECT *FROM KSQL_PROCESSING_LOG KSQL_PROCESSING_LOG EMIT CHANGES;
------------------------------------------------------------------------------------------------------------
For detailed information on a Query run: EXPLAIN <Query ID>;
```
Expand All @@ -48,6 +54,7 @@ For detailed information on a Query run: EXPLAIN <Query ID>;
ksql> show queries extended;

ID : CSAS_TEST_0
Query Type : PERSISTENT
SQL : CREATE STREAM TEST WITH (KAFKA_TOPIC='TEST', PARTITIONS=1, REPLICAS=1) AS SELECT *
FROM KSQL_PROCESSING_LOG KSQL_PROCESSING_LOG
EMIT CHANGES;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,7 @@ private void printQuerySinks(final QueryDescription query) {

private void printQueryDescription(final QueryDescription query) {
writer().println(String.format("%-20s : %s", "ID", query.getId()));
writer().println(String.format("%-20s : %s", "Query Type", query.getQueryType()));
if (query.getStatementText().length() > 0) {
writer().println(String.format("%-20s : %s", "SQL", query.getStatementText()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,20 @@
public class QueriesTableBuilder implements TableBuilder<Queries> {

private static final List<String> HEADERS =
ImmutableList.of("Query ID", "Status", "Sink Name", "Sink Kafka Topic", "Query String");
ImmutableList.of(
"Query ID",
"Query Type",
"Status",
"Sink Name",
"Sink Kafka Topic",
"Query String");

@Override
public Table buildTable(final Queries entity) {
final Stream<List<String>> rows = entity.getQueries().stream()
.map(r -> ImmutableList.of(
r.getId().toString(),
r.getQueryType().toString(),
r.getStatusCount().toString(),
String.join(",", r.getSinks()),
String.join(",", r.getSinkKafkaTopics()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import io.confluent.ksql.schema.ksql.SqlBaseType;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlConstants.KsqlQueryStatus;
import io.confluent.ksql.util.SchemaUtil;
import java.io.IOException;
Expand Down Expand Up @@ -311,7 +312,7 @@ public void testPrintQueries() {
final List<RunningQuery> queries = new ArrayList<>();
queries.add(
new RunningQuery(
"select * from t1", Collections.singleton("Test"), Collections.singleton("Test topic"), new QueryId("0"), queryStatusCount));
"select * from t1 emit changes", Collections.singleton("Test"), Collections.singleton("Test topic"), new QueryId("0"), queryStatusCount, KsqlConstants.KsqlQueryType.PUSH));

final KsqlEntityList entityList = new KsqlEntityList(ImmutableList.of(
new Queries("e", queries)
Expand All @@ -327,24 +328,25 @@ public void testPrintQueries() {
+ " \"@type\" : \"queries\"," + NEWLINE
+ " \"statementText\" : \"e\"," + NEWLINE
+ " \"queries\" : [ {" + NEWLINE
+ " \"queryString\" : \"select * from t1\"," + NEWLINE
+ " \"queryString\" : \"select * from t1 emit changes\"," + NEWLINE
+ " \"sinks\" : [ \"Test\" ]," + NEWLINE
+ " \"sinkKafkaTopics\" : [ \"Test topic\" ]," + NEWLINE
+ " \"id\" : \"0\"," + NEWLINE
+ " \"statusCount\" : {" + NEWLINE
+ " \"RUNNING\" : 1," + NEWLINE
+ " \"ERROR\" : 2" + NEWLINE
+ " }," + NEWLINE
+ " \"queryType\" : \"PUSH\"," + NEWLINE
+ " \"state\" : \"" + AGGREGATE_STATUS +"\"" + NEWLINE
+ " } ]," + NEWLINE
+ " \"warnings\" : [ ]" + NEWLINE
+ "} ]" + NEWLINE));
} else {
assertThat(output, is("" + NEWLINE
+ " Query ID | Status | Sink Name | Sink Kafka Topic | Query String " + NEWLINE
+ "--------------------------------------------------------------------------------" + NEWLINE
+ " 0 | " + STATUS_COUNT_STRING + " | Test | Test topic | select * from t1 " + NEWLINE
+ "--------------------------------------------------------------------------------" + NEWLINE
+ " Query ID | Query Type | Status | Sink Name | Sink Kafka Topic | Query String " + NEWLINE
+ "----------------------------------------------------------------------------------------------------" + NEWLINE
+ " 0 | PUSH | " + STATUS_COUNT_STRING + " | Test | Test topic | select * from t1 emit changes " + NEWLINE
+ "----------------------------------------------------------------------------------------------------" + NEWLINE
+ "For detailed information on a Query run: EXPLAIN <Query ID>;" + NEWLINE));
}
}
Expand All @@ -366,10 +368,10 @@ public void testPrintSourceDescription() {
);

final List<RunningQuery> readQueries = ImmutableList.of(
new RunningQuery("read query", ImmutableSet.of("sink1"), ImmutableSet.of("sink1 topic"), new QueryId("readId"), queryStatusCount)
new RunningQuery("read query", ImmutableSet.of("sink1"), ImmutableSet.of("sink1 topic"), new QueryId("readId"), queryStatusCount, KsqlConstants.KsqlQueryType.PERSISTENT)
);
final List<RunningQuery> writeQueries = ImmutableList.of(
new RunningQuery("write query", ImmutableSet.of("sink2"), ImmutableSet.of("sink2 topic"), new QueryId("writeId"), queryStatusCount)
new RunningQuery("write query", ImmutableSet.of("sink2"), ImmutableSet.of("sink2 topic"), new QueryId("writeId"), queryStatusCount, KsqlConstants.KsqlQueryType.PERSISTENT)
);

final KsqlEntityList entityList = new KsqlEntityList(ImmutableList.of(
Expand Down Expand Up @@ -419,6 +421,7 @@ public void testPrintSourceDescription() {
+ " \"RUNNING\" : 1," + NEWLINE
+ " \"ERROR\" : 2" + NEWLINE
+ " }," + NEWLINE
+ " \"queryType\" : \"PERSISTENT\"," + NEWLINE
+ " \"state\" : \"" + AGGREGATE_STATUS +"\"" + NEWLINE
+ " } ]," + NEWLINE
+ " \"writeQueries\" : [ {" + NEWLINE
Expand All @@ -430,6 +433,7 @@ public void testPrintSourceDescription() {
+ " \"RUNNING\" : 1," + NEWLINE
+ " \"ERROR\" : 2" + NEWLINE
+ " }," + NEWLINE
+ " \"queryType\" : \"PERSISTENT\"," + NEWLINE
+ " \"state\" : \"" + AGGREGATE_STATUS +"\"" + NEWLINE
+ " } ]," + NEWLINE
+ " \"fields\" : [ {" + NEWLINE
Expand Down Expand Up @@ -1034,10 +1038,10 @@ public void testPrintExecuptionPlan() {
public void shouldPrintTopicDescribeExtended() {
// Given:
final List<RunningQuery> readQueries = ImmutableList.of(
new RunningQuery("read query", ImmutableSet.of("sink1"), ImmutableSet.of("sink1 topic"), new QueryId("readId"), queryStatusCount)
new RunningQuery("read query", ImmutableSet.of("sink1"), ImmutableSet.of("sink1 topic"), new QueryId("readId"), queryStatusCount, KsqlConstants.KsqlQueryType.PERSISTENT)
);
final List<RunningQuery> writeQueries = ImmutableList.of(
new RunningQuery("write query", ImmutableSet.of("sink2"), ImmutableSet.of("sink2 topic"), new QueryId("writeId"), queryStatusCount)
new RunningQuery("write query", ImmutableSet.of("sink2"), ImmutableSet.of("sink2 topic"), new QueryId("writeId"), queryStatusCount, KsqlConstants.KsqlQueryType.PERSISTENT)
);

final KsqlEntityList entityList = new KsqlEntityList(ImmutableList.of(
Expand Down Expand Up @@ -1086,6 +1090,7 @@ public void shouldPrintTopicDescribeExtended() {
+ " \"RUNNING\" : 1," + NEWLINE
+ " \"ERROR\" : 2" + NEWLINE
+ " }," + NEWLINE
+ " \"queryType\" : \"PERSISTENT\"," + NEWLINE
+ " \"state\" : \"" + AGGREGATE_STATUS +"\"" + NEWLINE
+ " } ]," + NEWLINE
+ " \"writeQueries\" : [ {" + NEWLINE
Expand All @@ -1097,6 +1102,7 @@ public void shouldPrintTopicDescribeExtended() {
+ " \"RUNNING\" : 1," + NEWLINE
+ " \"ERROR\" : 2" + NEWLINE
+ " }," + NEWLINE
+ " \"queryType\" : \"PERSISTENT\"," + NEWLINE
+ " \"state\" : \"" + AGGREGATE_STATUS +"\"" + NEWLINE
+ " } ]," + NEWLINE
+ " \"fields\" : [ {" + NEWLINE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.ArrayList;
import java.util.List;

import io.confluent.ksql.util.KsqlConstants;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -35,21 +36,22 @@ public void setup() {
@Test
public void shouldBuildQueriesTable() {
// Given:
final String exampleQuery = "select * from test_stream emit changes";
final RunningQuery query = new RunningQuery(
"EXAMPLE QUERY;",
exampleQuery,
ImmutableSet.of("SINK"),
ImmutableSet.of("SINK"),
new QueryId("0"),
queryStatusCount
);
queryStatusCount,
KsqlConstants.KsqlQueryType.PUSH);

// When:
final Table table = buildTableWithSingleQuery(query);

// Then:
assertThat(table.headers(), contains("Query ID", "Status", "Sink Name", "Sink Kafka Topic", "Query String"));
assertThat(table.headers(), contains("Query ID", "Query Type", "Status", "Sink Name", "Sink Kafka Topic", "Query String"));
assertThat(table.rows(), hasSize(1));
assertThat(table.rows().get(0), contains("0", STATUS, "SINK", "SINK", "EXAMPLE QUERY;"));
assertThat(table.rows().get(0), contains("0", KsqlConstants.KsqlQueryType.PUSH.toString(), STATUS, "SINK", "SINK", exampleQuery));
}

@Test
Expand All @@ -60,17 +62,17 @@ public void shouldBuildQueriesTableWithNewlines() {
ImmutableSet.of("S2"),
ImmutableSet.of("S2"),
new QueryId("CSAS_S2_0"),
queryStatusCount
);
queryStatusCount,
KsqlConstants.KsqlQueryType.PERSISTENT);


// When:
final Table table = buildTableWithSingleQuery(query);

// Then:
assertThat(table.headers(), contains("Query ID", "Status", "Sink Name", "Sink Kafka Topic", "Query String"));
assertThat(table.headers(), contains("Query ID", "Query Type", "Status", "Sink Name", "Sink Kafka Topic", "Query String"));
assertThat(table.rows(), hasSize(1));
assertThat(table.rows().get(0), contains("CSAS_S2_0", STATUS, "S2", "S2", "CREATE STREAM S2 AS SELECT * FROM S1 EMIT CHANGES;"));
assertThat(table.rows().get(0), contains("CSAS_S2_0", KsqlConstants.KsqlQueryType.PERSISTENT.toString(), STATUS, "S2", "S2", "CREATE STREAM S2 AS SELECT * FROM S1 EMIT CHANGES;"));
}

private Table buildTableWithSingleQuery(RunningQuery query) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ private KsqlConstants() {
public static final String DEFAULT_AVRO_SCHEMA_FULL_NAME =
AVRO_SCHEMA_NAMESPACE + "." + AVRO_SCHEMA_NAME;

public enum KsqlQueryType {
PERSISTENT,
PUSH
}

public enum KsqlQueryStatus {
RUNNING,
ERROR,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,13 @@ public interface KsqlExecutionContext {
*/
List<PersistentQueryMetadata> getPersistentQueries();

/**
* Retrieves the list of all running queries.
*
* @return the list of all queries
*/
List<QueryMetadata> getAllLiveQueries();

/**
* Parse the statement(s) in supplied {@code sql}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -128,10 +129,16 @@ public Optional<PersistentQueryMetadata> getPersistentQuery(final QueryId queryI
return primaryContext.getPersistentQuery(queryId);
}

@Override
public List<PersistentQueryMetadata> getPersistentQueries() {
return ImmutableList.copyOf(primaryContext.getPersistentQueries().values());
}

@Override
public List<QueryMetadata> getAllLiveQueries() {
return ImmutableList.copyOf(allLiveQueries);
}

public boolean hasActiveQueries() {
return !primaryContext.getPersistentQueries().isEmpty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ public List<PersistentQueryMetadata> getPersistentQueries() {
return ImmutableList.copyOf(engineContext.getPersistentQueries().values());
}

@Override
public List<QueryMetadata> getAllLiveQueries() {
return ImmutableList.of();
}

@Override
public List<ParsedStatement> parse(final String sql) {
return engineContext.parse(sql);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,8 @@ public QueryMetadata buildTransientQuery(
streamsProperties,
overrides,
queryCloseCallback,
ksqlConfig.getLong(KSQL_SHUTDOWN_TIMEOUT_MS_CONFIG)
ksqlConfig.getLong(KSQL_SHUTDOWN_TIMEOUT_MS_CONFIG),
new QueryId(applicationId)
) {
@Override
public void stop() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
*/
public class PersistentQueryMetadata extends QueryMetadata {

private final QueryId id;
private final KsqlTopic resultTopic;
private final SourceName sinkName;
private final QuerySchemas schemas;
Expand Down Expand Up @@ -77,9 +76,9 @@ public PersistentQueryMetadata(
streamsProperties,
overriddenProperties,
closeCallback,
closeTimeout);
closeTimeout,
id);

this.id = requireNonNull(id, "id");
this.resultTopic = requireNonNull(resultTopic, "resultTopic");
this.sinkName = Objects.requireNonNull(sinkName, "sinkName");
this.schemas = requireNonNull(schemas, "schemas");
Expand All @@ -94,7 +93,6 @@ private PersistentQueryMetadata(
final Consumer<QueryMetadata> closeCallback
) {
super(other, closeCallback);
this.id = other.id;
this.resultTopic = other.resultTopic;
this.sinkName = other.sinkName;
this.schemas = other.schemas;
Expand All @@ -111,10 +109,6 @@ public DataSourceType getDataSourceType() {
return dataSourceType;
}

public QueryId getQueryId() {
return id;
}

public KsqlTopic getResultTopic() {
return resultTopic;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.internal.QueryStateListener;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.util.KsqlConstants.KsqlQueryType;
import java.lang.Thread.UncaughtExceptionHandler;
import java.time.Duration;
import java.util.Collection;
Expand Down Expand Up @@ -51,6 +53,7 @@ public abstract class QueryMetadata {
private final Set<SourceName> sourceNames;
private final LogicalSchema logicalSchema;
private final Long closeTimeout;
private final QueryId queryId;

private Optional<QueryStateListener> queryStateListener = Optional.empty();
private boolean everStarted = false;
Expand All @@ -67,7 +70,8 @@ public QueryMetadata(
final Map<String, Object> streamsProperties,
final Map<String, Object> overriddenProperties,
final Consumer<QueryMetadata> closeCallback,
final long closeTimeout
final long closeTimeout,
final QueryId queryId
) {
// CHECKSTYLE_RULES.ON: ParameterNumberCheck
this.statementString = Objects.requireNonNull(statementString, "statementString");
Expand All @@ -85,6 +89,7 @@ public QueryMetadata(
this.sourceNames = Objects.requireNonNull(sourceNames, "sourceNames");
this.logicalSchema = Objects.requireNonNull(logicalSchema, "logicalSchema");
this.closeTimeout = closeTimeout;
this.queryId = Objects.requireNonNull(queryId, "queryId");
}

protected QueryMetadata(final QueryMetadata other, final Consumer<QueryMetadata> closeCallback) {
Expand All @@ -99,6 +104,7 @@ protected QueryMetadata(final QueryMetadata other, final Consumer<QueryMetadata>
this.logicalSchema = other.logicalSchema;
this.closeCallback = Objects.requireNonNull(closeCallback, "closeCallback");
this.closeTimeout = other.closeTimeout;
this.queryId = other.queryId;
}

public void registerQueryStateListener(final QueryStateListener queryStateListener) {
Expand Down Expand Up @@ -168,6 +174,13 @@ public boolean hasEverBeenStarted() {
return everStarted;
}

public QueryId getQueryId() {
return queryId;
}

public KsqlQueryType getQueryType() {
return KsqlQueryType.PERSISTENT;
}

/**
* Stops the query without cleaning up the external resources
Expand Down
Loading

0 comments on commit e8a2a63

Please sign in to comment.