Skip to content

Commit

Permalink
fix(ksql): add ifExists/ifNotExist parameters to java client connecto…
Browse files Browse the repository at this point in the history
…r functions (#8851)

* fix(ksql): add ifExists/ifNotExist parameters to java client connector functions

* checkstyle

* null checks
  • Loading branch information
Zara Lim committed Mar 7, 2022
1 parent b250de3 commit eaf2b1f
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 4 deletions.
4 changes: 2 additions & 2 deletions docs/developer-guide/ksqldb-clients/java-client.md
Original file line number Diff line number Diff line change
Expand Up @@ -731,12 +731,12 @@ Map<String, String> connectorProperties = ImmutableMap.of(
"table.whitelist", "users",
"key", "username"
);
client.createConnector("jdbc-connector", true, connectorProperties).get();
client.createConnector("jdbc-connector", true, connectorProperties, false).get();
```

Drop a connector:
```java
client.dropConnector("jdbc-connector").get();
client.dropConnector("jdbc-connector", true).get();
```

List connectors:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,22 @@ public interface Client extends Closeable {
CompletableFuture<Void> createConnector(
String connectorName, boolean isSource, Map<String, Object> properties);

/**
* Creates a connector.
*
* <p>If a non-200 response is received from the server, the {@code CompletableFuture} will be
* failed.
*
* @param connectorName name of the connector
* @param isSource true if the connector is a source connector, false if it is a sink connector
* @param properties connector properties
* @param ifNotExists is ifNotExists is set to true, then the command won't fail if a connector
* with the same name already exists
* @return result of connector creation
*/
CompletableFuture<Void> createConnector(
String connectorName, boolean isSource, Map<String, Object> properties, boolean ifNotExists);

/**
* Drops a connector.
*
Expand All @@ -261,6 +277,19 @@ CompletableFuture<Void> createConnector(
*/
CompletableFuture<Void> dropConnector(String connectorName);

/**
* Drops a connector.
*
* <p>If a non-200 response is received from the server, the {@code CompletableFuture} will be
* failed.
*
* @param connectorName name of the connector to drop
* @param ifExists ifExists is set to true, then the statement won't fail if the connector
* does not exist
* @return a future that completes once the server response is received
*/
CompletableFuture<Void> dropConnector(String connectorName, boolean ifExists);

/**
* Returns a list of connectors.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,15 @@ static boolean isDescribeConnectorResponse(final JsonObject ksqlEntity) {
}

static boolean isCreateConnectorResponse(final JsonObject ksqlEntity) {
return ksqlEntity.getJsonObject("info") != null;
return ksqlEntity.getJsonObject("info") != null
|| (ksqlEntity.getString("message") != null
&& ksqlEntity.getString("message").contains("already exists"));
}

static boolean isDropConnectorResponse(final JsonObject ksqlEntity) {
return ksqlEntity.getString("connectorName") != null;
return ksqlEntity.getString("connectorName") != null
|| (ksqlEntity.getString("message") != null
&& ksqlEntity.getString("message").contains("not exist"));
}

static boolean isConnectErrorResponse(final JsonObject ksqlEntity) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,36 @@ public CompletableFuture<Void> createConnector(
return cf;
}

@Override
public CompletableFuture<Void> createConnector(
final String name,
final boolean isSource,
final Map<String, Object> properties,
final boolean ifNotExists
) {
final CompletableFuture<Void> cf = new CompletableFuture<>();
final String connectorConfigs = properties.entrySet()
.stream()
.map(e -> String.format("'%s'='%s'", e.getKey(), e.getValue()))
.collect(Collectors.joining(","));
final String type = isSource ? "SOURCE" : "SINK";
final String ifNotExistsClause = ifNotExists ? "IF NOT EXISTS" : "";

makePostRequest(
KSQL_ENDPOINT,
new JsonObject()
.put("ksql",
String.format("CREATE %s CONNECTOR %s %s WITH (%s);",
type, ifNotExistsClause, name, connectorConfigs))
.put("sessionVariables", sessionVariables),
cf,
response -> handleSingleEntityResponse(
response, cf, ConnectorCommandResponseHandler::handleCreateConnectorResponse)
);

return cf;
}

@Override
public CompletableFuture<Void> dropConnector(final String name) {
final CompletableFuture<Void> cf = new CompletableFuture<>();
Expand All @@ -392,6 +422,24 @@ public CompletableFuture<Void> dropConnector(final String name) {
return cf;
}

@Override
public CompletableFuture<Void> dropConnector(final String name, final boolean ifExists) {
final CompletableFuture<Void> cf = new CompletableFuture<>();
final String ifExistsClause = ifExists ? "if exists " : "";

makePostRequest(
KSQL_ENDPOINT,
new JsonObject()
.put("ksql", "drop connector " + ifExistsClause + name + ";")
.put("sessionVariables", sessionVariables),
cf,
response -> handleSingleEntityResponse(
response, cf, ConnectorCommandResponseHandler::handleDropConnectorResponse)
);

return cf;
}

@Override
public CompletableFuture<List<ConnectorInfo>> listConnectors() {
final CompletableFuture<List<ConnectorInfo>> cf = new CompletableFuture<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1572,6 +1572,20 @@ public void shouldCreateConnector() throws Exception {
assertThat(testEndpoints.getLastSql(), is("CREATE SOURCE CONNECTOR name WITH ();"));
}

@Test
public void shouldCreateConnectorIfNotExist() throws Exception {
// Given
final CreateConnectorEntity entity = new CreateConnectorEntity("create connector;",
new ConnectorInfo("name", Collections.emptyMap(), Collections.emptyList(), SOURCE_TYPE));
testEndpoints.setKsqlEndpointResponse(Collections.singletonList(entity));

// When:
javaClient.createConnector("name", true, Collections.emptyMap(), true).get();

// Then:
assertThat(testEndpoints.getLastSql(), is("CREATE SOURCE CONNECTOR IF NOT EXISTS name WITH ();"));
}

@Test
public void shouldDropConnector() throws Exception {
// Given
Expand All @@ -1585,6 +1599,19 @@ public void shouldDropConnector() throws Exception {
assertThat(testEndpoints.getLastSql(), is("drop connector name;"));
}

@Test
public void shouldDropConnectorIfExists() throws Exception {
// Given
final DropConnectorEntity entity = new DropConnectorEntity("drop connector;", "name");
testEndpoints.setKsqlEndpointResponse(Collections.singletonList(entity));

// When:
javaClient.dropConnector("name", true).get();

// Then:
assertThat(testEndpoints.getLastSql(), is("drop connector if exists name;"));
}

@Test
public void shouldStoreVariables() {
// When:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1073,6 +1073,13 @@ public void shouldDropConnector() throws Exception {
}, is(0));
}

@Test
public void shouldNotFailToDropNonExistantConnector() throws Exception {
// When/Then:
client.dropConnector("nonExistentConnector", true).get();
}


@Test
public void shouldCreateConnector() throws Exception {
// When:
Expand All @@ -1091,6 +1098,15 @@ public void shouldCreateConnector() throws Exception {
);
}

@Test
public void shouldNotFailToCreateConnectorThatExists() throws Exception {
// Given:
givenConnectorExists();

// When/Then:
client.createConnector(TEST_CONNECTOR, true, ImmutableMap.of("connector.class", MOCK_SOURCE_CLASS), true).get();
}

@Test
public void shouldCreateConnectorWithVariables() throws Exception {
// When:
Expand Down

0 comments on commit eaf2b1f

Please sign in to comment.