feat: add DESCRIBE functionality for connectors (#3206)
agavra committed Aug 16, 2019
1 parent cd411ac commit a79adb4
Showing 21 changed files with 808 additions and 15 deletions.
40 changes: 40 additions & 0 deletions ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java
@@ -42,6 +42,7 @@
import io.confluent.ksql.json.JsonMapper;
import io.confluent.ksql.rest.entity.ArgumentInfo;
import io.confluent.ksql.rest.entity.CommandStatusEntity;
import io.confluent.ksql.rest.entity.ConnectorDescription;
import io.confluent.ksql.rest.entity.ConnectorList;
import io.confluent.ksql.rest.entity.CreateConnectorEntity;
import io.confluent.ksql.rest.entity.ErrorEntity;
@@ -98,7 +99,9 @@
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.jline.terminal.Terminal.Signal;
import org.jline.terminal.Terminal.SignalHandler;
import org.slf4j.Logger;
@@ -148,6 +151,8 @@ public class Console implements Closeable {
tablePrinter(CreateConnectorEntity.class, ConnectorInfoTableBuilder::new))
.put(ConnectorList.class,
tablePrinter(ConnectorList.class, ConnectorListTableBuilder::new))
.put(ConnectorDescription.class,
Console::printConnectorDescription)
.put(ErrorEntity.class,
tablePrinter(ErrorEntity.class, ErrorEntityTableBuilder::new))
.build();
@@ -651,6 +656,41 @@ private void printQueryDescription(final QueryDescription query) {
printOverriddenProperties(query);
}

private void printConnectorDescription(final ConnectorDescription description) {
final ConnectorStateInfo status = description.getStatus();
writer().println(String.format("%-20s : %s", "Name", status.name()));
writer().println(String.format("%-20s : %s", "Class", description.getConnectorClass()));
writer().println(String.format("%-20s : %s", "Type", description.getStatus().type()));
writer().println(String.format("%-20s : %s", "State", status.connector().state()));
writer().println(String.format("%-20s : %s", "WorkerId", status.connector().workerId()));
writer().println();

if (!status.tasks().isEmpty()) {
final Table taskTable = new Table.Builder()
.withColumnHeaders(ImmutableList.of("Task ID", "State", "Error Trace"))
.withRows(status.tasks()
.stream()
.map(task -> ImmutableList.of(
String.valueOf(task.id()),
task.state(),
ObjectUtils.defaultIfNull(task.trace(), ""))))
.build();
taskTable.print(this);
writer().println();
}

if (!description.getSources().isEmpty()) {
final Table sourceTable = new Table.Builder()
.withColumnHeaders("KSQL Source Name", "Kafka Topic", "Type")
.withRows(description.getSources()
.stream()
.map(source -> ImmutableList
.of(source.getName(), source.getTopic(), source.getType())))
.build();
sourceTable.print(this);
}
}

private void printQueryDescriptionList(final QueryDescriptionList queryDescriptionList) {
queryDescriptionList.getQueryDescriptions().forEach(
queryDescription -> {
@@ -56,7 +56,7 @@ public void execute(final List<String> args, final PrintWriter terminal) {
"\tThe KSQL CLI provides a terminal-based interactive shell for running queries. "
+ "Each command should be on a separate line. "
+ "For KSQL command syntax, see the documentation at "
+ "https://github.com/confluentinc/ksql/docs/."
+ "https://docs.confluent.io/current/ksql/docs/developer-guide/syntax-reference.html"
);
terminal.println();
for (final CliSpecificCommand cliSpecificCommand : cmds.get()) {
113 changes: 113 additions & 0 deletions ksql-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java
@@ -36,6 +36,7 @@
import io.confluent.ksql.rest.entity.ArgumentInfo;
import io.confluent.ksql.rest.entity.CommandStatus;
import io.confluent.ksql.rest.entity.CommandStatusEntity;
import io.confluent.ksql.rest.entity.ConnectorDescription;
import io.confluent.ksql.rest.entity.ConnectorList;
import io.confluent.ksql.rest.entity.EntityQueryId;
import io.confluent.ksql.rest.entity.ExecutionPlan;
@@ -72,6 +73,9 @@
import java.util.function.Supplier;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo.ConnectorState;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo.TaskState;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
import org.junit.After;
import org.junit.Test;
@@ -513,6 +517,115 @@ public void testPrintTopicDescription() throws IOException {
}
}

@Test
public void testPrintConnectorDescription() throws IOException {
// Given:
final KsqlEntityList entityList = new KsqlEntityList(ImmutableList.of(
new ConnectorDescription(
"STATEMENT",
"io.confluent.Connector",
new ConnectorStateInfo(
"name",
new ConnectorState("state", "worker", "msg"),
ImmutableList.of(
new TaskState(0, "task", "worker", "task_msg")
),
ConnectorType.SOURCE),
ImmutableList.of(sourceDescription)
)
));

// When:
console.printKsqlEntityList(entityList);

// Then:
final String output = terminal.getOutputString();
if (console.getOutputFormat() == OutputFormat.JSON) {
assertThat(output, is("[ {\n"
+ " \"@type\" : \"connector_description\",\n"
+ " \"statementText\" : \"STATEMENT\",\n"
+ " \"connectorClass\" : \"io.confluent.Connector\",\n"
+ " \"status\" : {\n"
+ " \"name\" : \"name\",\n"
+ " \"connector\" : {\n"
+ " \"state\" : \"state\",\n"
+ " \"worker_id\" : \"worker\",\n"
+ " \"trace\" : \"msg\"\n"
+ " },\n"
+ " \"tasks\" : [ {\n"
+ " \"id\" : 0,\n"
+ " \"state\" : \"task\",\n"
+ " \"worker_id\" : \"worker\",\n"
+ " \"trace\" : \"task_msg\"\n"
+ " } ],\n"
+ " \"type\" : \"source\"\n"
+ " },\n"
+ " \"sources\" : [ {\n"
+ " \"name\" : \"TestSource\",\n"
+ " \"readQueries\" : [ ],\n"
+ " \"writeQueries\" : [ ],\n"
+ " \"fields\" : [ {\n"
+ " \"name\" : \"ROWTIME\",\n"
+ " \"schema\" : {\n"
+ " \"type\" : \"BIGINT\",\n"
+ " \"fields\" : null,\n"
+ " \"memberSchema\" : null\n"
+ " }\n"
+ " }, {\n"
+ " \"name\" : \"ROWKEY\",\n"
+ " \"schema\" : {\n"
+ " \"type\" : \"STRING\",\n"
+ " \"fields\" : null,\n"
+ " \"memberSchema\" : null\n"
+ " }\n"
+ " }, {\n"
+ " \"name\" : \"f_0\",\n"
+ " \"schema\" : {\n"
+ " \"type\" : \"INTEGER\",\n"
+ " \"fields\" : null,\n"
+ " \"memberSchema\" : null\n"
+ " }\n"
+ " }, {\n"
+ " \"name\" : \"f_1\",\n"
+ " \"schema\" : {\n"
+ " \"type\" : \"STRING\",\n"
+ " \"fields\" : null,\n"
+ " \"memberSchema\" : null\n"
+ " }\n"
+ " } ],\n"
+ " \"type\" : \"TABLE\",\n"
+ " \"key\" : \"key\",\n"
+ " \"timestamp\" : \"2000-01-01\",\n"
+ " \"statistics\" : \"stats\",\n"
+ " \"errorStats\" : \"errors\",\n"
+ " \"extended\" : true,\n"
+ " \"format\" : \"avro\",\n"
+ " \"topic\" : \"kadka-topic\",\n"
+ " \"partitions\" : 2,\n"
+ " \"replication\" : 1\n"
+ " } ],\n"
+ " \"warnings\" : [ ]\n"
+ "} ]\n"));
} else {
assertThat(output, is("\n"
+ "Name : name\n"
+ "Class : io.confluent.Connector\n"
+ "Type : source\n"
+ "State : state\n"
+ "WorkerId : worker\n"
+ "\n"
+ " Task ID | State | Error Trace \n"
+ "-------------------------------\n"
+ " 0 | task | task_msg \n"
+ "-------------------------------\n"
+ "\n"
+ " KSQL Source Name | Kafka Topic | Type \n"
+ "----------------------------------------\n"
+ " TestSource | kadka-topic | TABLE \n"
+ "----------------------------------------\n"));
}
}

@Test
public void testPrintStreamsList() throws IOException {
// Given:
@@ -27,7 +27,7 @@
* helps map topics to KSQL sources.
*/
@Immutable
class Connector {
public class Connector {

private final String name;
private final Predicate<String> isTopicMatch;
@@ -48,19 +48,19 @@ class Connector {
this.keyField = Optional.ofNullable(keyField);
}

String getName() {
public String getName() {
return name;
}

boolean matches(final String topic) {
public boolean matches(final String topic) {
return isTopicMatch.test(topic);
}

String mapToSource(final String topic) {
public String mapToSource(final String topic) {
return getSourceName.apply(topic);
}

DataSourceType getSourceType() {
public DataSourceType getSourceType() {
return sourceType;
}

@@ -22,15 +22,15 @@
import java.util.Optional;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;

final class Connectors {
public final class Connectors {

static final String CONNECTOR_CLASS = "connector.class";
static final String JDBC_SOURCE_CLASS = "io.confluent.connect.jdbc.JdbcSourceConnector";

private Connectors() {
}

static Optional<Connector> fromConnectInfo(final ConnectorInfo connectorInfo) {
public static Optional<Connector> fromConnectInfo(final ConnectorInfo connectorInfo) {
return fromConnectInfo(connectorInfo.config());
}

@@ -20,6 +20,7 @@
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;

/**
* An interface defining the common operations to communicate with
@@ -50,6 +51,13 @@ public interface ConnectClient {
*/
ConnectResponse<ConnectorInfo> create(String connector, Map<String, String> config);

/**
* Get the status of {@code connector}.
*
* @param connector the name of the connector
*/
ConnectResponse<ConnectorStateInfo> status(String connector);

/**
* An optionally successful response. Either contains a value of type
* {@code <T>} or an error, which is the string representation of the
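
The hunk above only introduces the interface method; the statement handler that consumes it lives elsewhere in this commit. As a rough sketch of how a DESCRIBE executor might combine the existing describe() call with the new status() call to build the ConnectorDescription that the CLI prints, assuming ConnectResponse exposes its payload via a datum() accessor (only error() is visible in this diff) and reusing the ConnectorDescription constructor shape shown in ConsoleTest above:

// Sketch only -- buildDescription is a hypothetical helper, not code from this commit.
// It reuses the types already imported in the files shown above.
private static Optional<ConnectorDescription> buildDescription(
    final ConnectClient client,
    final String statementText,
    final String connectorName,
    final List<SourceDescription> sources
) {
  final ConnectResponse<ConnectorInfo> info = client.describe(connectorName);
  final ConnectResponse<ConnectorStateInfo> status = client.status(connectorName);

  if (info.error().isPresent() || status.error().isPresent()) {
    // Let the caller report the Connect error instead of a description.
    return Optional.empty();
  }

  return Optional.of(new ConnectorDescription(
      statementText,
      info.datum().get().config().get("connector.class"), // connector class from the config
      status.datum().get(),                               // ConnectorStateInfo from /status
      sources));
}
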
@@ -32,6 +32,7 @@
import org.apache.http.entity.ContentType;
import org.apache.http.util.EntityUtils;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -46,6 +47,7 @@ public class DefaultConnectClient implements ConnectClient {
private static final ObjectMapper MAPPER = JsonMapper.INSTANCE.mapper;

private static final String CONNECTORS = "/connectors";
private static final String STATUS = "/status";
private static final int DEFAULT_TIMEOUT_MS = 5_000;

private final URI connectUri;
@@ -67,7 +69,7 @@ public ConnectResponse<ConnectorInfo> create(
final Map<String, String> config
) {
try {
LOG.debug("Issuing request to Kafka Connect at URI {} with name {} and config {}",
LOG.debug("Issuing create request to Kafka Connect at URI {} with name {} and config {}",
connectUri,
connector,
config);
@@ -119,6 +121,31 @@ public ConnectResponse<List<String>> connectors() {
}
}

@Override
public ConnectResponse<ConnectorStateInfo> status(final String connector) {
try {
LOG.debug("Issuing status request to Kafka Connect at URI {} with name {}",
connectUri,
connector);

final ConnectResponse<ConnectorStateInfo> connectResponse = Request
.Get(connectUri.resolve(CONNECTORS + "/" + connector + STATUS))
.socketTimeout(DEFAULT_TIMEOUT_MS)
.connectTimeout(DEFAULT_TIMEOUT_MS)
.execute()
.handleResponse(
createHandler(HttpStatus.SC_OK, ConnectorStateInfo.class, Function.identity()));

connectResponse.error()
.ifPresent(error ->
LOG.warn("Could not query status of connector {}: {}", connector, error));

return connectResponse;
} catch (final Exception e) {
throw new KsqlServerException(e);
}
}

@Override
public ConnectResponse<ConnectorInfo> describe(final String connector) {
try {
@@ -33,8 +33,9 @@ private SandboxConnectClient() { }
public static ConnectClient createProxy() {
return LimitedProxyBuilder.forClass(ConnectClient.class)
.swallow("create", methodParams(String.class, Map.class), ConnectResponse.of("sandbox"))
.swallow("describe", methodParams(String.class), ConnectResponse.of(ImmutableList.of()))
.swallow("describe", methodParams(String.class), ConnectResponse.of("sandbox"))
.swallow("connectors", methodParams(), ConnectResponse.of(ImmutableList.of()))
.swallow("status", methodParams(String.class), ConnectResponse.of("sandbox"))
.build();
}
}
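
LimitedProxyBuilder itself is not part of this diff. Purely to illustrate the pattern the sandbox relies on -- a proxy that returns a canned response for a whitelisted set of methods and rejects everything else -- here is a minimal, self-contained JDK dynamic-proxy sketch; the class name and API below are illustrative assumptions, not the real LimitedProxyBuilder:

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Map;

final class SwallowingProxy {

  private SwallowingProxy() {
  }

  // Builds a proxy of `type` whose listed methods return a canned value instead of
  // performing any side effect; any other call fails fast, which keeps sandboxed
  // execution from ever reaching the real Connect cluster.
  @SuppressWarnings("unchecked")
  static <T> T forClass(final Class<T> type, final Map<String, Object> cannedResults) {
    final InvocationHandler handler = (proxy, method, args) -> {
      if (cannedResults.containsKey(method.getName())) {
        return cannedResults.get(method.getName());
      }
      throw new UnsupportedOperationException(method.getName() + " is not sandbox-safe");
    };
    return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[]{type}, handler);
  }
}

A sandbox ConnectClient built this way would map "create", "describe", and the new "status" to a fixed ConnectResponse, mirroring the swallow(...) calls above.
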
@@ -101,13 +101,15 @@ public void shouldCreateConnectorFromConfig() throws InterruptedException {
}

@Test
public void shouldWakeupConsumerBeforeShuttingDown() {
public void shouldWakeupConsumerBeforeShuttingDown() throws InterruptedException {
// Given:
final CountDownLatch noMoreLatch = new CountDownLatch(1);
setupConfigService();
givenNoMoreRecords(when(consumer.poll(any())));
configService.startAsync().awaitRunning();
givenNoMoreRecords(when(consumer.poll(any())), noMoreLatch);

// When:
configService.startAsync().awaitRunning();
noMoreLatch.await();
configService.stopAsync().awaitTerminated();

// Then:
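
The real givenNoMoreRecords helper sits further down in this test class and is not shown in the diff. A plausible shape for the latch-aware overload, whose only job is to signal the test that the consumer loop has actually reached poll() before stopAsync() is called, might look like this (an assumption, not code from the commit):

// Assumed sketch of the helper used above; it would live inside the same test class.
// Relies on org.mockito.stubbing.OngoingStubbing, org.apache.kafka.clients.consumer.ConsumerRecords,
// and java.util.concurrent.CountDownLatch, all already on the test classpath.
private static <K, V> void givenNoMoreRecords(
    final OngoingStubbing<ConsumerRecords<K, V>> stubbing,
    final CountDownLatch noMoreLatch
) {
  stubbing.thenAnswer(invocation -> {
    // Signal the test that polling has started, then hand back an empty batch.
    noMoreLatch.countDown();
    return ConsumerRecords.<K, V>empty();
  });
}
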
