From f8107684f00465c398caf7e6027621b16ba0fa34 Mon Sep 17 00:00:00 2001
From: Victoria Xia
Date: Thu, 14 Mar 2019 14:16:38 -0700
Subject: [PATCH] Fix bug in Avro schema cleanup during drop source (#2537)

---
 .../ksql/ddl/commands/DropSourceCommand.java  | 11 +++++----
 .../ddl/commands/DropSourceCommandTest.java   |  8 ++-----
 .../integration/EndToEndIntegrationTest.java  | 24 ++++++++++++++++++-
 3 files changed, 31 insertions(+), 12 deletions(-)

diff --git a/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/DropSourceCommand.java b/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/DropSourceCommand.java
index fd83fe173238..308354e8ed86 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/DropSourceCommand.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/DropSourceCommand.java
@@ -71,7 +71,7 @@ public DdlCommandResult run(final MutableMetaStore metaStore) {
       ));
     }
     final DropTopicCommand dropTopicCommand =
-        new DropTopicCommand(dataSource.getKsqlTopic().getTopicName());
+        new DropTopicCommand(dataSource.getTopicName());
     metaStore.deleteSource(sourceName);
     dropTopicCommand.run(metaStore);
 
@@ -81,7 +81,7 @@ public DdlCommandResult run(final MutableMetaStore metaStore) {
         true,
         "Source " + sourceName + " was dropped. "
             + (deleteTopic ? "Topic '"
-                + dataSource.getKsqlTopic().getKafkaTopicName()
+                + dataSource.getKafkaTopicName()
                 + "' was marked for deletion. Actual deletion "
                 + "and removal from brokers may take some time "
                 + "to complete." : ""));
@@ -94,18 +94,19 @@ private void deleteTopicIfNeeded(final StructuredDataSource dataSource) {
     try {
       final List<String> topic = Collections
-          .singletonList(dataSource.getKsqlTopic().getKafkaTopicName());
+          .singletonList(dataSource.getKafkaTopicName());
       ExecutorUtil.executeWithRetries(() -> kafkaTopicClient.deleteTopics(topic), ALWAYS);
     } catch (final Exception e) {
       throw new KsqlException("Could not delete the corresponding kafka topic: "
-          + dataSource.getKsqlTopic().getKafkaTopicName(), e);
+          + dataSource.getKafkaTopicName(), e);
     }
 
     if (dataSource.isSerdeFormat(DataSource.DataSourceSerDe.AVRO)) {
       try {
         SchemaRegistryUtil.deleteSubjectWithRetries(
-            schemaRegistryClient, sourceName + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX);
+            schemaRegistryClient,
+            dataSource.getKafkaTopicName() + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX);
       } catch (final Exception e) {
         throw new KsqlException("Could not clean up the schema registry for topic: "
             + sourceName, e);
       }
     }
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/DropSourceCommandTest.java b/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/DropSourceCommandTest.java
index ad4aa445f55b..5334e4b9fbfb 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/DropSourceCommandTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/DropSourceCommandTest.java
@@ -24,7 +24,6 @@
 import static org.mockito.Mockito.when;
 
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
-import io.confluent.ksql.metastore.KsqlTopic;
 import io.confluent.ksql.metastore.MutableMetaStore;
 import io.confluent.ksql.metastore.StructuredDataSource;
 import io.confluent.ksql.parser.tree.DropStream;
@@ -61,8 +60,6 @@ public class DropSourceCommandTest {
   private SchemaRegistryClient schemaRegistryClient;
   @Mock
   private StructuredDataSource dataSource;
-  @Mock
-  private KsqlTopic ksqlTopic;
 
   private DropSourceCommand dropSourceCommand;
 
@@ -72,9 +69,8 @@ public class DropSourceCommandTest {
   @Before
   public void setUp() {
     when(metaStore.getSource(STREAM_NAME)).thenReturn(dataSource);
-    when(dataSource.getKsqlTopic()).thenReturn(ksqlTopic);
     when(dataSource.getDataSourceType()).thenReturn(DataSourceType.KSTREAM);
-    when(ksqlTopic.getKafkaTopicName()).thenReturn(TOPIC_NAME);
+    when(dataSource.getKafkaTopicName()).thenReturn(TOPIC_NAME);
   }
 
   @Test
@@ -138,7 +134,7 @@ public void shouldCleanUpSchemaIfAvroTopic() throws Exception {
 
     // Then:
     verify(schemaRegistryClient)
-        .deleteSubject(STREAM_NAME + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX);
+        .deleteSubject(TOPIC_NAME + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX);
   }
 
   @Test
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/integration/EndToEndIntegrationTest.java b/ksql-engine/src/test/java/io/confluent/ksql/integration/EndToEndIntegrationTest.java
index d246f9a9b035..2392bc7b2e67 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/integration/EndToEndIntegrationTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/integration/EndToEndIntegrationTest.java
@@ -29,6 +29,7 @@
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import io.confluent.ksql.GenericRow;
 import io.confluent.ksql.query.QueryId;
+import io.confluent.ksql.util.KsqlConstants;
 import io.confluent.ksql.util.PageViewDataProvider;
 import io.confluent.ksql.util.PersistentQueryMetadata;
 import io.confluent.ksql.util.QueryMetadata;
@@ -302,6 +303,27 @@ public void shouldSupportDroppingAndRecreatingJoinQuery() throws Exception {
     assertThat(columns.get(3).toString(), either(is("FEMALE")).or(is("MALE")));
   }
 
+  @Test
+  public void shouldCleanUpAvroSchemaOnDropSource() throws Exception {
+    final String topicName = "avro_stream_topic";
+
+    executeStatement(format(
+        "create stream avro_stream with (kafka_topic='%s',value_format='avro') as select * from %s;",
+        topicName,
+        PAGE_VIEW_STREAM));
+
+    TEST_HARNESS.produceRows(
+        PAGE_VIEW_TOPIC, PAGE_VIEW_DATA_PROVIDER, JSON, System::currentTimeMillis);
+
+    TEST_HARNESS.waitForSubjectToBePresent(topicName + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX);
+
+    ksqlContext.terminateQuery(new QueryId("CSAS_AVRO_STREAM_0"));
+
+    executeStatement("DROP STREAM avro_stream DELETE TOPIC;");
+
+    TEST_HARNESS.waitForSubjectToBeAbsent(topicName + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX);
+  }
+
   private QueryMetadata executeStatement(final String statement,
                                          final String... args) {
     final String formatted = String.format(statement, (Object[])args);
@@ -338,7 +360,7 @@ private static List<GenericRow> verifyAvailableRows(
     TestUtils.waitForCondition(
         () -> rowQueue.size() >= expectedRows,
         30_000,
-        expectedRows + " rows where not available after 30 seconds");
+        expectedRows + " rows were not available after 30 seconds");
 
     final List<KeyValue<String, GenericRow>> rows = new ArrayList<>();
     rowQueue.drainTo(rows);
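
For context on the mechanics of the fix: under the Schema Registry's default TopicNameStrategy, the value schema for a topic is stored under the subject "<kafka topic>-value" (the suffix is KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX). Before this patch, cleanup built the subject from the KSQL source name instead, so a stream created WITH (kafka_topic='avro_stream_topic') registered its schema under "avro_stream_topic-value" while DROP ... DELETE TOPIC tried to delete "AVRO_STREAM-value", leaving the real subject behind. The sketch below restates the corrected naming rule outside of KSQL; the class and method names are illustrative inventions, and only SchemaRegistryClient.deleteSubject is a real API (it is the call verified by the unit test above).

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

// Illustrative sketch, not KSQL code: the subject to clean up is derived
// from the Kafka topic name, never from the KSQL source name.
final class AvroSubjectCleanupSketch {

  // Same literal as KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX.
  private static final String VALUE_SUFFIX = "-value";

  private AvroSubjectCleanupSketch() {
  }

  static void deleteValueSubject(
      final SchemaRegistryClient client,
      final String kafkaTopicName) throws Exception {
    // Deletes e.g. "avro_stream_topic-value". The pre-fix code would have
    // targeted "AVRO_STREAM-value" here, a subject that never existed.
    client.deleteSubject(kafkaTopicName + VALUE_SUFFIX);
  }
}

The new end-to-end test exercises exactly this split: the stream is named avro_stream while its backing topic is avro_stream_topic, so the DROP only passes if cleanup keys off the topic name.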