diff --git a/docs/developer-guide/ksqldb-clients/java-client.md b/docs/developer-guide/ksqldb-clients/java-client.md index 9701d13427b8..9804c1da6669 100644 --- a/docs/developer-guide/ksqldb-clients/java-client.md +++ b/docs/developer-guide/ksqldb-clients/java-client.md @@ -731,12 +731,12 @@ Map connectorProperties = ImmutableMap.of( "table.whitelist", "users", "key", "username" ); -client.createConnector("jdbc-connector", true, connectorProperties).get(); +client.createConnector("jdbc-connector", true, connectorProperties, false).get(); ``` Drop a connector: ```java -client.dropConnector("jdbc-connector").get(); +client.dropConnector("jdbc-connector", true).get(); ``` List connectors: diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/Client.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/Client.java index 44f3912d223b..c95af8c3d1fb 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/Client.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/Client.java @@ -250,6 +250,22 @@ public interface Client extends Closeable { CompletableFuture createConnector( String connectorName, boolean isSource, Map properties); + /** + * Creates a connector. + * + *

If a non-200 response is received from the server, the {@code CompletableFuture} will be + * failed. + * + * @param connectorName name of the connector + * @param isSource true if the connector is a source connector, false if it is a sink connector + * @param properties connector properties + * @param ifNotExists is ifNotExists is set to true, then the command won't fail if a connector + * with the same name already exists + * @return result of connector creation + */ + CompletableFuture createConnector( + String connectorName, boolean isSource, Map properties, boolean ifNotExists); + /** * Drops a connector. * @@ -261,6 +277,19 @@ CompletableFuture createConnector( */ CompletableFuture dropConnector(String connectorName); + /** + * Drops a connector. + * + *

If a non-200 response is received from the server, the {@code CompletableFuture} will be + * failed. + * + * @param connectorName name of the connector to drop + * @param ifExists ifExists is set to true, then the statement won't fail if the connector + * does not exist + * @return a future that completes once the server response is received + */ + CompletableFuture dropConnector(String connectorName, boolean ifExists); + /** * Returns a list of connectors. * diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/AdminResponseHandlers.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/AdminResponseHandlers.java index 044410544fcc..d2c07617cae5 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/AdminResponseHandlers.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/AdminResponseHandlers.java @@ -165,11 +165,15 @@ static boolean isDescribeConnectorResponse(final JsonObject ksqlEntity) { } static boolean isCreateConnectorResponse(final JsonObject ksqlEntity) { - return ksqlEntity.getJsonObject("info") != null; + return ksqlEntity.getJsonObject("info") != null + || (ksqlEntity.getString("message") != null + && ksqlEntity.getString("message").contains("already exists")); } static boolean isDropConnectorResponse(final JsonObject ksqlEntity) { - return ksqlEntity.getString("connectorName") != null; + return ksqlEntity.getString("connectorName") != null + || (ksqlEntity.getString("message") != null + && ksqlEntity.getString("message").contains("not exist")); } static boolean isConnectErrorResponse(final JsonObject ksqlEntity) { diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java index 9f4a7f20f983..0efe2ec8ce3e 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java @@ -375,6 +375,36 @@ public CompletableFuture createConnector( return cf; } + @Override + public CompletableFuture createConnector( + final String name, + final boolean isSource, + final Map properties, + final boolean ifNotExists + ) { + final CompletableFuture cf = new CompletableFuture<>(); + final String connectorConfigs = properties.entrySet() + .stream() + .map(e -> String.format("'%s'='%s'", e.getKey(), e.getValue())) + .collect(Collectors.joining(",")); + final String type = isSource ? "SOURCE" : "SINK"; + final String ifNotExistsClause = ifNotExists ? "IF NOT EXISTS" : ""; + + makePostRequest( + KSQL_ENDPOINT, + new JsonObject() + .put("ksql", + String.format("CREATE %s CONNECTOR %s %s WITH (%s);", + type, ifNotExistsClause, name, connectorConfigs)) + .put("sessionVariables", sessionVariables), + cf, + response -> handleSingleEntityResponse( + response, cf, ConnectorCommandResponseHandler::handleCreateConnectorResponse) + ); + + return cf; + } + @Override public CompletableFuture dropConnector(final String name) { final CompletableFuture cf = new CompletableFuture<>(); @@ -392,6 +422,24 @@ public CompletableFuture dropConnector(final String name) { return cf; } + @Override + public CompletableFuture dropConnector(final String name, final boolean ifExists) { + final CompletableFuture cf = new CompletableFuture<>(); + final String ifExistsClause = ifExists ? "if exists " : ""; + + makePostRequest( + KSQL_ENDPOINT, + new JsonObject() + .put("ksql", "drop connector " + ifExistsClause + name + ";") + .put("sessionVariables", sessionVariables), + cf, + response -> handleSingleEntityResponse( + response, cf, ConnectorCommandResponseHandler::handleDropConnectorResponse) + ); + + return cf; + } + @Override public CompletableFuture> listConnectors() { final CompletableFuture> cf = new CompletableFuture<>(); diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java index 818e1676c3bf..4aef4f3619bf 100644 --- a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java @@ -1572,6 +1572,20 @@ public void shouldCreateConnector() throws Exception { assertThat(testEndpoints.getLastSql(), is("CREATE SOURCE CONNECTOR name WITH ();")); } + @Test + public void shouldCreateConnectorIfNotExist() throws Exception { + // Given + final CreateConnectorEntity entity = new CreateConnectorEntity("create connector;", + new ConnectorInfo("name", Collections.emptyMap(), Collections.emptyList(), SOURCE_TYPE)); + testEndpoints.setKsqlEndpointResponse(Collections.singletonList(entity)); + + // When: + javaClient.createConnector("name", true, Collections.emptyMap(), true).get(); + + // Then: + assertThat(testEndpoints.getLastSql(), is("CREATE SOURCE CONNECTOR IF NOT EXISTS name WITH ();")); + } + @Test public void shouldDropConnector() throws Exception { // Given @@ -1585,6 +1599,19 @@ public void shouldDropConnector() throws Exception { assertThat(testEndpoints.getLastSql(), is("drop connector name;")); } + @Test + public void shouldDropConnectorIfExists() throws Exception { + // Given + final DropConnectorEntity entity = new DropConnectorEntity("drop connector;", "name"); + testEndpoints.setKsqlEndpointResponse(Collections.singletonList(entity)); + + // When: + javaClient.dropConnector("name", true).get(); + + // Then: + assertThat(testEndpoints.getLastSql(), is("drop connector if exists name;")); + } + @Test public void shouldStoreVariables() { // When: diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java index b0cf7e1c4b81..6bd7beed3dd2 100644 --- a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java @@ -1073,6 +1073,13 @@ public void shouldDropConnector() throws Exception { }, is(0)); } + @Test + public void shouldNotFailToDropNonExistantConnector() throws Exception { + // When/Then: + client.dropConnector("nonExistentConnector", true).get(); + } + + @Test public void shouldCreateConnector() throws Exception { // When: @@ -1091,6 +1098,15 @@ public void shouldCreateConnector() throws Exception { ); } + @Test + public void shouldNotFailToCreateConnectorThatExists() throws Exception { + // Given: + givenConnectorExists(); + + // When/Then: + client.createConnector(TEST_CONNECTOR, true, ImmutableMap.of("connector.class", MOCK_SOURCE_CLASS), true).get(); + } + @Test public void shouldCreateConnectorWithVariables() throws Exception { // When: