Skip to content

Commit

Permalink
feat[rest server]: add warnings when listing fails with kafka error (#…
Browse files Browse the repository at this point in the history
…3069)

This patch adds a mechanism for returning warnings in KsqlEntity for
successful api responses. This patch also adds some warnings for source
listings. Now, when a client tries to list or describe a source, and ksql
is unable to get the details of the backing topic from kafka, a result is
returned that includes 0-values for the underlying topic info (partitions,
replicas), and includes response warnings about the issue with Kafka.
  • Loading branch information
rodesai committed Jul 23, 2019
1 parent 7345ed6 commit 7647e05
Show file tree
Hide file tree
Showing 15 changed files with 447 additions and 97 deletions.
15 changes: 8 additions & 7 deletions docs/developer-guide/api.rst
Expand Up @@ -117,47 +117,49 @@ statements use the ``/query`` endpoint.

The response JSON is an array of result objects. The result object contents depend on the statement that it is returning results for. The following sections detail the contents of the result objects by statement.

**CREATE, DROP, TERMINATE**
**Common Fields**
The following fields are common to all responses.

:>json string statementText: The KSQL statement whose result is being returned.
:>json array warnings: A list of warnings about conditions that may be unexpected by the user, b
ut don't result in failure to execute the statement.
:>json string warnings[i].message: A message detailing the condition being warned on.

**CREATE, DROP, TERMINATE**

:>json string commandId: A string that identifies the requested operation. You can use this ID to poll the result of the operation using the status endpoint.
:>json string commandStatus.status: One of QUEUED, PARSING, EXECUTING, TERMINATED, SUCCESS, or ERROR.
:>json string commandStatus.message: Detailed message regarding the status of the execution statement.
:>json long commandSequenceNumber: The sequence number of the requested operation in the command queue, or -1 if the operation was unsuccessful.

**LIST STREAMS, SHOW STREAMS**

:>json string statementText: The KSQL statement whose result is being returned.
:>json array streams: List of streams.
:>json string streams[i].name: The name of the stream.
:>json string streams[i].topic: The topic backing the stream.
:>json string streams[i].format: The serialization format of the data in the stream. One of JSON, AVRO, or DELIMITED.

**LIST TABLES, SHOW TABLES**

:>json string statementText: The KSQL statement whose result is being returned.
:>json array tables: List of tables.
:>json string tables[i].name: The name of the table.
:>json string tables[i].topic: The topic backing the table.
:>json string tables[i].format: The serialization format of the data in the table. One of JSON, AVRO, or DELIMITED.

**LIST QUERIES, SHOW QUERIES**

:>json string statementText: The KSQL statement whose result is being returned.
:>json array queries: List of queries.
:>json string queries[i].queryString: The text of the statement that started the query.
:>json string queries[i].sinks: The streams and tables being written to by the query.
:>json string queries[i].id: The query ID.

**LIST PROPERTIES, SHOW PROPERTIES**

:>json string statementText: The KSQL statement whose result is being returned.
:>json map properties: The KSQL server query properties.
:>json string properties[``property-name``]: The value of the property named by ``property-name``.

**DESCRIBE**

:>json string statementText: The KSQL statement whose result is being returned.
:>json string sourceDescription.name: The name of the stream or table.
:>json array sourceDescription.readQueries: The queries reading from the stream or table.
:>json array sourceDescription.writeQueries: The queries writing into the stream or table
Expand All @@ -180,7 +182,6 @@ statements use the ``/query`` endpoint.

**EXPLAIN**

:>json string statementText: The KSQL statement whose result is being returned.
:>json string queryDescription.statementText: The KSQL statement for which the query being explained is running.
:>json array queryDescription.fields: A list of field objects that describes each field in the query output.
:>json string queryDescription.fields[i].name: The name of the field.
Expand Down
Expand Up @@ -51,6 +51,7 @@
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
import io.confluent.ksql.rest.entity.KsqlStatementErrorMessage;
import io.confluent.ksql.rest.entity.KsqlTopicsList;
import io.confluent.ksql.rest.entity.KsqlWarning;
import io.confluent.ksql.rest.entity.PropertiesList;
import io.confluent.ksql.rest.entity.Queries;
import io.confluent.ksql.rest.entity.QueryDescription;
Expand Down Expand Up @@ -383,6 +384,14 @@ private void printAsTable(final KsqlEntity entity) {
}

handler.handle(this, entity);

printWarnings(entity);
}

private void printWarnings(final KsqlEntity entity) {
for (final KsqlWarning warning : entity.getWarnings()) {
writer().println("WARNING: " + warning.getMessage());
}
}

@SuppressWarnings("ConstantConditions")
Expand Down
Expand Up @@ -43,9 +43,11 @@
import io.confluent.ksql.rest.entity.FunctionType;
import io.confluent.ksql.rest.entity.KafkaTopicInfo;
import io.confluent.ksql.rest.entity.KafkaTopicsList;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.KsqlTopicInfo;
import io.confluent.ksql.rest.entity.KsqlTopicsList;
import io.confluent.ksql.rest.entity.KsqlWarning;
import io.confluent.ksql.rest.entity.PropertiesList;
import io.confluent.ksql.rest.entity.Queries;
import io.confluent.ksql.rest.entity.RunningQuery;
Expand Down Expand Up @@ -85,6 +87,22 @@ public class ConsoleTest {
private final Console console;
private final Supplier<String> lineSupplier;
private final CliSpecificCommand cliCommand;
private final SourceDescription sourceDescription = new SourceDescription(
"TestSource",
Collections.emptyList(),
Collections.emptyList(),
buildTestSchema(2),
DataSourceType.KTABLE.getKsqlType(),
"key",
"2000-01-01",
"stats",
"errors",
true,
"avro",
"kadka-topic",
2,
1
);

@Parameterized.Parameters(name = "{0}")
public static Collection<OutputFormat> data() {
Expand Down Expand Up @@ -154,7 +172,8 @@ public void testPrintKSqlEntityList() throws IOException {
new SourceDescription(
"TestSource", Collections.emptyList(), Collections.emptyList(), buildTestSchema(i),
DataSourceType.KTABLE.getKsqlType(), "key", "2000-01-01", "stats",
"errors", false, "avro", "kadka-topic", 1, 1)),
"errors", false, "avro", "kadka-topic", 1, 1),
Collections.emptyList()),
new TopicDescription("e", "TestTopic", "TestKafkaTopic", "AVRO", "schemaString"),
new StreamsList("e", ImmutableList.of(new SourceInfo.Stream("TestStream", "TestTopic", "AVRO"))),
new TablesList("e", ImmutableList.of(new SourceInfo.Table("TestTable", "TestTopic", "JSON", false))),
Expand All @@ -171,11 +190,9 @@ public void shouldPrintTopicDescribeExtended() throws IOException {
final KsqlEntityList entityList = new KsqlEntityList(ImmutableList.of(
new SourceDescriptionEntity(
"e",
new SourceDescription(
"TestSource", Collections.emptyList(), Collections.emptyList(),
buildTestSchema(2), DataSourceType.KTABLE.getKsqlType(),
"key", "2000-01-01", "stats", "errors", true, "avro", "kadka-topic",
2, 1))));
sourceDescription,
Collections.emptyList()))
);

console.printKsqlEntityList(entityList);

Expand All @@ -187,6 +204,31 @@ public void shouldPrintTopicDescribeExtended() throws IOException {
}
}

@Test
public void shouldPrintWarnings() throws IOException {
// Given:
final KsqlEntity entity = new SourceDescriptionEntity(
"e",
sourceDescription,
ImmutableList.of(new KsqlWarning("oops"), new KsqlWarning("doh!"))
);

// When:
console.printKsqlEntityList(ImmutableList.of(entity));

// Then:
final String output = terminal.getOutputString();
if (console.getOutputFormat() == OutputFormat.TABULAR) {
assertThat(
output,
containsString("WARNING: oops\nWARNING: doh")
);
} else {
assertThat(output, containsString("\"message\" : \"oops\""));
assertThat(output, containsString("\"message\" : \"doh!\""));
}
}

@Test
public void shouldPrintFunctionDescription() throws IOException {
final KsqlEntityList entityList = new KsqlEntityList(ImmutableList.of(
Expand Down
Expand Up @@ -18,6 +18,9 @@
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.collect.ImmutableList;
import java.util.Collections;
import java.util.List;

@JsonIgnoreProperties(ignoreUnknown = true)
@JsonTypeInfo(
Expand All @@ -43,12 +46,22 @@
})
public abstract class KsqlEntity {
private final String statementText;
private final List<KsqlWarning> warnings;

public KsqlEntity(final String statementText) {
this(statementText, Collections.emptyList());
}

public KsqlEntity(final String statementText, final List<KsqlWarning> warnings) {
this.statementText = statementText;
this.warnings = warnings == null ? Collections.emptyList() : ImmutableList.copyOf(warnings);
}

public String getStatementText() {
return statementText;
}

public List<KsqlWarning> getWarnings() {
return warnings;
}
}
@@ -0,0 +1,59 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.entity;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.errorprone.annotations.Immutable;
import java.util.Objects;

@Immutable
@JsonIgnoreProperties(ignoreUnknown = true)
public class KsqlWarning {
private final String message;

@JsonCreator
public KsqlWarning(@JsonProperty("message") final String message) {
this.message = Objects.requireNonNull(message, "message");
}

public String getMessage() {
return message;
}

@Override
public String toString() {
return message;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (!(o instanceof KsqlWarning)) {
return false;
}
final KsqlWarning that = (KsqlWarning) o;
return Objects.equals(message, that.message);
}

@Override
public int hashCode() {
return Objects.hash(message);
}
}
Expand Up @@ -23,12 +23,12 @@
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.metrics.MetricCollectors;
import io.confluent.ksql.rest.util.EntityUtil;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.util.timestamp.TimestampExtractionPolicy;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.clients.admin.TopicDescription;

@JsonIgnoreProperties(ignoreUnknown = true)
@JsonTypeName("description")
Expand Down Expand Up @@ -91,7 +91,7 @@ public SourceDescription(
final String format,
final List<RunningQuery> readQueries,
final List<RunningQuery> writeQueries,
final KafkaTopicClient topicClient
final Optional<TopicDescription> topicDescription
) {
this(
dataSource.getName(),
Expand All @@ -111,46 +111,15 @@ public SourceDescription(
extended,
format,
dataSource.getKafkaTopicName(),
(
extended && topicClient != null ? getPartitions(
topicClient,
dataSource.getKafkaTopicName()
) : 0
),
(
extended && topicClient != null ? getReplication(
topicClient,
dataSource.getKafkaTopicName()
) : 0
)
topicDescription.map(td -> td.partitions().size()).orElse(0),
topicDescription.map(td -> td.partitions().get(0).replicas().size()).orElse(0)
);
}

private static int getPartitions(
final KafkaTopicClient topicClient,
final String kafkaTopicName
) {
return topicClient
.describeTopic(kafkaTopicName)
.partitions()
.size();
}

public int getPartitions() {
return partitions;
}

private static int getReplication(
final KafkaTopicClient topicClient,
final String kafkaTopicName
) {
return topicClient
.describeTopic(kafkaTopicName)
.partitions().iterator().next()
.replicas()
.size();
}

public int getReplication() {
return replication;
}
Expand Down
Expand Up @@ -18,6 +18,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import java.util.Objects;

@JsonIgnoreProperties(ignoreUnknown = true)
Expand All @@ -27,8 +28,9 @@ public class SourceDescriptionEntity extends KsqlEntity {
@JsonCreator
public SourceDescriptionEntity(
@JsonProperty("statementText") final String statementText,
@JsonProperty("sourceDescription") final SourceDescription sourceDescription) {
super(statementText);
@JsonProperty("sourceDescription") final SourceDescription sourceDescription,
@JsonProperty("warnings") final List<KsqlWarning> warnings) {
super(statementText, warnings);
this.sourceDescription = sourceDescription;
}

Expand Down
Expand Up @@ -29,9 +29,10 @@ public class SourceDescriptionList extends KsqlEntity {
@JsonCreator
public SourceDescriptionList(
@JsonProperty("statementText") final String statementText,
@JsonProperty("sourceDescriptions") final List<SourceDescription> sourceDescriptions
@JsonProperty("sourceDescriptions") final List<SourceDescription> sourceDescriptions,
@JsonProperty("warnings") final List<KsqlWarning> warnings
) {
super(statementText);
super(statementText, warnings);
this.sourceDescriptions = sourceDescriptions;
}

Expand Down

0 comments on commit 7647e05

Please sign in to comment.