Commit 389f43c
Replace the logic for deleting tables in tests with the DROP TABLE command (#12)
* Upgrading the version of the Confluent plugin and switching to use DROP TABLE now that it is supported.
1 parent aa3c094 commit 389f43c

File tree

18 files changed: +80 -552 lines changed

CHANGELOG.md
Lines changed: 6 additions & 1 deletion

@@ -29,4 +29,9 @@
 
 * Recent updates in Confluent Cloud have improved the reliability of queries.
   This negates the need for the retries that were previously used.
-  The retry logic has been removed as a consequence.
+  The retry logic has been removed as a consequence.
+
+## Version 0.4.0
+
+* Upgrade Confluent Plugin to version 1.20-50
+* The new version of the Confluent plugin now supports `DROP TABLE`. This allows us to use it to delete temporary tables during tests, instead of needing the Kafka admin client and the Schema Registry client. This is a much simpler flow and will also simplify the exercise instructions.
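
To make the changelog entry concrete: cleanup that previously required a Kafka AdminClient plus a SchemaRegistryClient now reduces to a single Flink SQL statement. A minimal sketch, assuming the same ConfluentSettings bootstrap the tests use; the catalog, database, and table names are illustrative placeholders:

// Minimal sketch of the simplified cleanup flow, not the exact test code.
// Assumes a cloud.properties resource as used by the test harness; the
// catalog, database, and table names below are illustrative placeholders.
import io.confluent.flink.plugin.ConfluentSettings;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DropTableSketch {
    public static void main(String[] args) {
        EnvironmentSettings settings = ConfluentSettings.fromResource("/cloud.properties");
        TableEnvironment env = TableEnvironment.create(settings);

        // One DROP TABLE statement replaces the old topic + schema deletion.
        env.executeSql("DROP TABLE `example-catalog`.`example-database`.`temp_table`");
    }
}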

solutions/01-connecting-to-confluent-cloud/pom.xml
Lines changed: 1 addition & 17 deletions

@@ -30,9 +30,7 @@ under the License.
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <flink.version>1.20.0</flink.version>
-    <confluent-plugin.version>1.20-42</confluent-plugin.version>
-    <kafka-clients.version>3.8.0</kafka-clients.version>
-    <schema-registry-client.version>7.7.0</schema-registry-client.version>
+    <confluent-plugin.version>1.20-50</confluent-plugin.version>
     <target.java.version>21</target.java.version>
     <maven.compiler.source>${target.java.version}</maven.compiler.source>
     <maven.compiler.target>${target.java.version}</maven.compiler.target>
@@ -73,20 +71,6 @@ under the License.
       <version>${confluent-plugin.version}</version>
     </dependency>
 
-    <!-- Apache Kafka dependencies -->
-    <dependency>
-      <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka-clients</artifactId>
-      <version>${kafka-clients.version}</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>io.confluent</groupId>
-      <artifactId>kafka-schema-registry-client</artifactId>
-      <version>${schema-registry-client.version}</version>
-      <scope>test</scope>
-    </dependency>
-
     <!-- Add logging framework, to produce console output when running in the IDE. -->
     <!-- These dependencies are excluded from the application JAR by default. -->
     <dependency>

solutions/01-connecting-to-confluent-cloud/src/main/resources/cloud-template.properties
Lines changed: 1 addition & 12 deletions

@@ -24,15 +24,4 @@ client.environment-id=<ENVIRONMENT ID>
 client.compute-pool-id=<COMPUTE POOL>
 
 # User or service account
-client.principal-id=<USER ID>
-
-# Kafka (used by tests)
-client.kafka.bootstrap.servers=<BOOTSTRAP SERVER>
-client.kafka.security.protocol=SASL_SSL
-client.kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<KAFKA KEY>' password='<KAFKA SECRET>';
-client.kafka.sasl.mechanism=PLAIN
-
-# Schema Registry (used by tests)
-client.registry.url=<SCHEMA REGISTRY URL>
-client.registry.key=<SCHEMA REGISTRY KEY>
-client.registry.secret=<SCHEMA REGISTRY SECRET>
+client.principal-id=<USER ID>

solutions/02-querying-flink-tables/pom.xml
Lines changed: 1 addition & 17 deletions

@@ -30,9 +30,7 @@ under the License.
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <flink.version>1.20.0</flink.version>
-    <confluent-plugin.version>1.20-42</confluent-plugin.version>
-    <kafka-clients.version>3.8.0</kafka-clients.version>
-    <schema-registry-client.version>7.7.0</schema-registry-client.version>
+    <confluent-plugin.version>1.20-50</confluent-plugin.version>
     <target.java.version>21</target.java.version>
     <maven.compiler.source>${target.java.version}</maven.compiler.source>
     <maven.compiler.target>${target.java.version}</maven.compiler.target>
@@ -73,20 +71,6 @@ under the License.
       <version>${confluent-plugin.version}</version>
     </dependency>
 
-    <!-- Apache Kafka dependencies -->
-    <dependency>
-      <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka-clients</artifactId>
-      <version>${kafka-clients.version}</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>io.confluent</groupId>
-      <artifactId>kafka-schema-registry-client</artifactId>
-      <version>${schema-registry-client.version}</version>
-      <scope>test</scope>
-    </dependency>
-
     <!-- Add logging framework, to produce console output when running in the IDE. -->
     <!-- These dependencies are excluded from the application JAR by default. -->
     <dependency>

solutions/02-querying-flink-tables/src/main/resources/cloud-template.properties
Lines changed: 1 addition & 12 deletions

@@ -24,15 +24,4 @@ client.environment-id=<ENVIRONMENT ID>
 client.compute-pool-id=<COMPUTE POOL>
 
 # User or service account
-client.principal-id=<USER ID>
-
-# Kafka (used by tests)
-client.kafka.bootstrap.servers=<BOOTSTRAP SERVER>
-client.kafka.security.protocol=SASL_SSL
-client.kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<KAFKA KEY>' password='<KAFKA SECRET>';
-client.kafka.sasl.mechanism=PLAIN
-
-# Schema Registry (used by tests)
-client.registry.url=<SCHEMA REGISTRY URL>
-client.registry.key=<SCHEMA REGISTRY KEY>
-client.registry.secret=<SCHEMA REGISTRY SECRET>
+client.principal-id=<USER ID>

solutions/02-querying-flink-tables/src/test/java/marketplace/FlinkIntegrationTest.java
Lines changed: 12 additions & 75 deletions

@@ -2,33 +2,25 @@
 
 import io.confluent.flink.plugin.ConfluentSettings;
 import org.apache.flink.table.api.EnvironmentSettings;
-import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
-import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.types.Row;
-import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.common.KafkaFuture;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.util.*;
-import java.util.function.Supplier;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
 public abstract class FlinkIntegrationTest {
     private List<TableResult> jobsToCancel;
-    private List<String> topicsToDelete;
+    private List<String> tablesToDelete;
     private Thread shutdownHook;
     private boolean isShuttingDown;
 
     protected TableEnvironment env;
-    protected AdminClient adminClient;
-    protected SchemaRegistryClient registryClient;
 
     protected Logger logger = LoggerFactory.getLogger(this.getClass());
 
@@ -38,30 +30,11 @@ public abstract class FlinkIntegrationTest {
     @BeforeEach
     public void mainSetup() throws Exception {
         jobsToCancel = new ArrayList<>();
-        topicsToDelete = new ArrayList<>();
+        tablesToDelete = new ArrayList<>();
 
         EnvironmentSettings settings = ConfluentSettings.fromResource("/cloud.properties");
         env = TableEnvironment.create(settings);
 
-        Properties properties = new Properties();
-        settings.getConfiguration().toMap().forEach((k,v) ->
-            properties.put(k.replace("client.kafka.", ""), v)
-        );
-        adminClient = AdminClient.create(properties);
-
-        Map<String, String> schemaConfig = new HashMap<>();
-
-        schemaConfig.put("basic.auth.credentials.source", "USER_INFO");
-        schemaConfig.put(
-            "basic.auth.user.info",
-            properties.get("client.registry.key") + ":" + properties.get("client.registry.secret"));
-
-        registryClient = new CachedSchemaRegistryClient(
-            properties.getProperty("client.registry.url"),
-            100,
-            schemaConfig
-        );
-
         isShuttingDown = false;
         shutdownHook = new Thread(() -> {
             logger.info("Shutdown Detected. Cleaning up resources.");
@@ -84,9 +57,7 @@ public void mainTeardown() {
                 .join()
         );
 
-        topicsToDelete.forEach(topic ->
-            deleteTopic(topic)
-        );
+        tablesToDelete.forEach(this::deleteTable);
 
         if(!isShuttingDown) {
            Runtime.getRuntime().removeShutdownHook(shutdownHook);
@@ -103,63 +74,29 @@ protected Stream<Row> fetchRows(TableResult result) {
         return StreamSupport.stream(iterable.spliterator(), false);
     }
 
-    protected String getShortTableName(String tableName) {
+    protected void deleteTable(String tableName) {
         String[] tablePath = tableName.split("\\.");
-        return tablePath[tablePath.length - 1].replace("`","");
-    }
-
-    protected void deleteTopic(String topicName) {
-        try {
-            String schemaName = topicName + "-value";
-            logger.info("Deleting Schema: "+schemaName);
-            if(registryClient.getAllSubjects().contains(schemaName)) {
-                registryClient.deleteSubject(schemaName, false);
-                registryClient.deleteSubject(schemaName, true);
-            }
-            logger.info("Deleted Schema: "+schemaName);
-        } catch (Exception e) {
-            logger.error("Error Deleting Schema", e);
-        }
-
-        try {
-            if(adminClient.listTopics().names().get().contains(topicName)) {
-                logger.info("Deleting Topic: " + topicName);
-                KafkaFuture<Void> result = adminClient.deleteTopics(List.of(topicName)).all();
 
-                while(!result.isDone()) {
-                    logger.info("Waiting for topic to be deleted: " + topicName);
-                    Thread.sleep(1000);
-                }
+        String catalog = tablePath[0].replace("`", "");
+        String database = tablePath[1].replace("`", "");
+        String table = tablePath[2].replace("`", "");
 
-                logger.info("Topic Deleted: " + topicName);
-            }
-        } catch (Exception e) {
-            logger.error("Error Deleting Topic", e);
+        if(Arrays.asList(env.listTables(catalog, database)).contains(table)) {
+            logger.info("Deleting table {}", tableName);
+            env.executeSql(String.format("DROP TABLE %s", tableName));
         }
     }
 
-    protected void deleteTable(String tableName) {
-        String topicName = getShortTableName(tableName);
-        deleteTopic(topicName);
-    }
-
-    protected void deleteTopicOnExit(String topicName) {
-        topicsToDelete.add(topicName);
-    }
-
     protected void deleteTableOnExit(String tableName) {
-        String topicName = getShortTableName(tableName);
-        deleteTopicOnExit(topicName);
+        tablesToDelete.add(tableName);
    }
 
     protected void createTemporaryTable(String fullyQualifiedTableName, String tableDefinition) {
-        String topicName = getShortTableName(fullyQualifiedTableName);
-
         logger.info("Creating temporary table: " + fullyQualifiedTableName);
 
         try {
             env.executeSql(tableDefinition).await();
-            deleteTopicOnExit(topicName);
+            deleteTableOnExit(fullyQualifiedTableName);
 
             logger.info("Created temporary table: " + fullyQualifiedTableName);
         } catch (Exception e) {
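
To see how a test exercises the reworked helpers, here is a hypothetical subclass; the table name and columns are invented for illustration. Cleanup is automatic: createTemporaryTable registers the table, and mainTeardown later drops it via deleteTable.

// Hypothetical example test, not part of the commit. The table name and
// column definitions are invented for illustration only.
package marketplace;

import org.junit.jupiter.api.Test;

public class ExampleQueryTest extends FlinkIntegrationTest {
    @Test
    public void shouldCreateAndCleanUpATemporaryTable() {
        String tableName = "`example-catalog`.`example-database`.`clicks_temp`";

        // Registers the table for cleanup; mainTeardown issues DROP TABLE
        // instead of deleting the backing topic and schema directly.
        createTemporaryTable(tableName, String.format(
                "CREATE TABLE %s (`user_id` STRING, `url` STRING)", tableName));
    }
}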

solutions/03-building-a-streaming-pipeline/pom.xml
Lines changed: 1 addition & 17 deletions

@@ -30,9 +30,7 @@ under the License.
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <flink.version>1.20.0</flink.version>
-    <confluent-plugin.version>1.20-42</confluent-plugin.version>
-    <kafka-clients.version>3.8.0</kafka-clients.version>
-    <schema-registry-client.version>7.7.0</schema-registry-client.version>
+    <confluent-plugin.version>1.20-50</confluent-plugin.version>
     <target.java.version>21</target.java.version>
     <maven.compiler.source>${target.java.version}</maven.compiler.source>
     <maven.compiler.target>${target.java.version}</maven.compiler.target>
@@ -73,20 +71,6 @@ under the License.
       <version>${confluent-plugin.version}</version>
     </dependency>
 
-    <!-- Apache Kafka dependencies -->
-    <dependency>
-      <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka-clients</artifactId>
-      <version>${kafka-clients.version}</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>io.confluent</groupId>
-      <artifactId>kafka-schema-registry-client</artifactId>
-      <version>${schema-registry-client.version}</version>
-      <scope>test</scope>
-    </dependency>
-
     <!-- Add logging framework, to produce console output when running in the IDE. -->
     <!-- These dependencies are excluded from the application JAR by default. -->
     <dependency>

solutions/03-building-a-streaming-pipeline/src/main/resources/cloud-template.properties
Lines changed: 1 addition & 12 deletions

@@ -24,15 +24,4 @@ client.environment-id=<ENVIRONMENT ID>
 client.compute-pool-id=<COMPUTE POOL>
 
 # User or service account
-client.principal-id=<USER ID>
-
-# Kafka (used by tests)
-client.kafka.bootstrap.servers=<BOOTSTRAP SERVER>
-client.kafka.security.protocol=SASL_SSL
-client.kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<KAFKA KEY>' password='<KAFKA SECRET>';
-client.kafka.sasl.mechanism=PLAIN
-
-# Schema Registry (used by tests)
-client.registry.url=<SCHEMA REGISTRY URL>
-client.registry.key=<SCHEMA REGISTRY KEY>
-client.registry.secret=<SCHEMA REGISTRY SECRET>
+client.principal-id=<USER ID>
