Skip to content

Commit

Permalink
Fix bug in Avro schema cleanup during drop source (#2537)
Browse files Browse the repository at this point in the history
  • Loading branch information
vcrfxia committed Mar 14, 2019
1 parent ce8b781 commit f810768
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 12 deletions.
Expand Up @@ -71,7 +71,7 @@ public DdlCommandResult run(final MutableMetaStore metaStore) {
));
}
final DropTopicCommand dropTopicCommand =
new DropTopicCommand(dataSource.getKsqlTopic().getTopicName());
new DropTopicCommand(dataSource.getTopicName());
metaStore.deleteSource(sourceName);
dropTopicCommand.run(metaStore);

Expand All @@ -81,7 +81,7 @@ public DdlCommandResult run(final MutableMetaStore metaStore) {
true,
"Source " + sourceName + " was dropped. "
+ (deleteTopic ? "Topic '"
+ dataSource.getKsqlTopic().getKafkaTopicName()
+ dataSource.getKafkaTopicName()
+ "' was marked for deletion. Actual deletion "
+ "and removal from brokers may take some time "
+ "to complete." : ""));
Expand All @@ -94,18 +94,19 @@ private void deleteTopicIfNeeded(final StructuredDataSource dataSource) {

try {
final List<String> topic = Collections
.singletonList(dataSource.getKsqlTopic().getKafkaTopicName());
.singletonList(dataSource.getKafkaTopicName());

ExecutorUtil.executeWithRetries(() -> kafkaTopicClient.deleteTopics(topic), ALWAYS);
} catch (final Exception e) {
throw new KsqlException("Could not delete the corresponding kafka topic: "
+ dataSource.getKsqlTopic().getKafkaTopicName(), e);
+ dataSource.getKafkaTopicName(), e);
}

if (dataSource.isSerdeFormat(DataSource.DataSourceSerDe.AVRO)) {
try {
SchemaRegistryUtil.deleteSubjectWithRetries(
schemaRegistryClient, sourceName + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX);
schemaRegistryClient,
dataSource.getKafkaTopicName() + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX);
} catch (final Exception e) {
throw new KsqlException("Could not clean up the schema registry for topic: "
+ sourceName, e);
Expand Down
Expand Up @@ -24,7 +24,6 @@
import static org.mockito.Mockito.when;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.metastore.KsqlTopic;
import io.confluent.ksql.metastore.MutableMetaStore;
import io.confluent.ksql.metastore.StructuredDataSource;
import io.confluent.ksql.parser.tree.DropStream;
Expand Down Expand Up @@ -61,8 +60,6 @@ public class DropSourceCommandTest {
private SchemaRegistryClient schemaRegistryClient;
@Mock
private StructuredDataSource dataSource;
@Mock
private KsqlTopic ksqlTopic;

private DropSourceCommand dropSourceCommand;

Expand All @@ -72,9 +69,8 @@ public class DropSourceCommandTest {
@Before
// Configures the common mock behavior shared by every test in this class.
// NOTE(review): this span is a diff view — the getKsqlTopic()/ksqlTopic stubs
// are the removed (pre-commit) lines and the getKafkaTopicName() stub on the
// data source is their replacement; only one set exists in the real file.
public void setUp() {
// Looking up the stream by name yields the mocked data source.
when(metaStore.getSource(STREAM_NAME)).thenReturn(dataSource);
when(dataSource.getKsqlTopic()).thenReturn(ksqlTopic);
// The source under test is a KSTREAM (not a KTABLE).
when(dataSource.getDataSourceType()).thenReturn(DataSourceType.KSTREAM);
when(ksqlTopic.getKafkaTopicName()).thenReturn(TOPIC_NAME);
// Post-fix stub: the Kafka topic name is read directly off the data source.
when(dataSource.getKafkaTopicName()).thenReturn(TOPIC_NAME);
}

@Test
Expand Down Expand Up @@ -138,7 +134,7 @@ public void shouldCleanUpSchemaIfAvroTopic() throws Exception {

// Then:
verify(schemaRegistryClient)
.deleteSubject(STREAM_NAME + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX);
.deleteSubject(TOPIC_NAME + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX);
}

@Test
Expand Down
Expand Up @@ -29,6 +29,7 @@
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.PageViewDataProvider;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
Expand Down Expand Up @@ -302,6 +303,27 @@ public void shouldSupportDroppingAndRecreatingJoinQuery() throws Exception {
assertThat(columns.get(3).toString(), either(is("FEMALE")).or(is("MALE")));
}

@Test
// Integration test for the bug fixed in this commit: dropping an Avro source
// must delete the schema-registry subject derived from the KAFKA TOPIC name
// (TopicNameStrategy: "<topic>-value"), not from the source/stream name.
public void shouldCleanUpAvroSchemaOnDropSource() throws Exception {
final String topicName = "avro_stream_topic";

// Given: a CSAS creates an Avro-formatted stream whose sink topic name
// ("avro_stream_topic") differs from the stream name ("AVRO_STREAM").
executeStatement(format(
"create stream avro_stream with (kafka_topic='%s',value_format='avro') as select * from %s;",
topicName,
PAGE_VIEW_STREAM));

// Feed JSON rows into the upstream source so the CSAS query produces
// Avro output and registers a subject for the sink topic.
TEST_HARNESS.produceRows(
PAGE_VIEW_TOPIC, PAGE_VIEW_DATA_PROVIDER, JSON, System::currentTimeMillis);

// Wait until "<topic>-value" is registered before attempting the drop.
TEST_HARNESS.waitForSubjectToBePresent(topicName + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX);

// The persistent query must be terminated before the stream can be dropped.
// NOTE(review): assumes the CSAS is assigned query id "CSAS_AVRO_STREAM_0" —
// brittle if query-id generation changes; confirm against the engine.
ksqlContext.terminateQuery(new QueryId("CSAS_AVRO_STREAM_0"));

// When: dropping the stream with DELETE TOPIC triggers schema cleanup.
executeStatement("DROP STREAM avro_stream DELETE TOPIC;");

// Then: the topic-derived subject is removed (pre-fix, cleanup wrongly
// targeted the stream-name-derived subject and left this one behind).
TEST_HARNESS.waitForSubjectToBeAbsent(topicName + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX);
}

private QueryMetadata executeStatement(final String statement,
final String... args) {
final String formatted = String.format(statement, (Object[])args);
Expand Down Expand Up @@ -338,7 +360,7 @@ private static List<GenericRow> verifyAvailableRows(
TestUtils.waitForCondition(
() -> rowQueue.size() >= expectedRows,
30_000,
expectedRows + " rows where not available after 30 seconds");
expectedRows + " rows were not available after 30 seconds");

final List<KeyValue<String, GenericRow>> rows = new ArrayList<>();
rowQueue.drainTo(rows);
Expand Down

0 comments on commit f810768

Please sign in to comment.