From 7069d4935357549788eefcd897142628d2dc4cae Mon Sep 17 00:00:00 2001 From: Stefan Miklosovic Date: Mon, 10 May 2021 15:43:45 +0200 Subject: [PATCH] DBZ-3417 fix on-the-fly schema changes --- .../connector/cassandra/CassandraClient.java | 4 + .../cassandra/CommitLogReadHandlerImpl.java | 25 +- .../cassandra/NoOpSchemaChangeListener.java | 117 +++++++++ .../connector/cassandra/SchemaHolder.java | 102 +++----- .../connector/cassandra/SchemaProcessor.java | 229 +++++++++++++++++- .../cassandra/SnapshotProcessor.java | 2 +- .../cassandra/CommitLogProcessorTest.java | 5 +- .../cassandra/SchemaProcessorTest.java | 21 +- .../cassandra/SnapshotProcessorTest.java | 9 +- 9 files changed, 421 insertions(+), 93 deletions(-) create mode 100644 src/main/java/io/debezium/connector/cassandra/NoOpSchemaChangeListener.java diff --git a/src/main/java/io/debezium/connector/cassandra/CassandraClient.java b/src/main/java/io/debezium/connector/cassandra/CassandraClient.java index b1999248..aa398b8f 100644 --- a/src/main/java/io/debezium/connector/cassandra/CassandraClient.java +++ b/src/main/java/io/debezium/connector/cassandra/CassandraClient.java @@ -95,6 +95,10 @@ public String getClusterName() { return cluster.getMetadata().getClusterName(); } + public Cluster getCluster() { + return cluster; + } + public boolean isQueryable() { return !cluster.isClosed() && !session.isClosed(); } diff --git a/src/main/java/io/debezium/connector/cassandra/CommitLogReadHandlerImpl.java b/src/main/java/io/debezium/connector/cassandra/CommitLogReadHandlerImpl.java index c9584f68..bd7ebdd4 100644 --- a/src/main/java/io/debezium/connector/cassandra/CommitLogReadHandlerImpl.java +++ b/src/main/java/io/debezium/connector/cassandra/CommitLogReadHandlerImpl.java @@ -77,7 +77,7 @@ public class CommitLogReadHandlerImpl implements CommitLogReadHandler { } /** - * A PartitionType represents the type of a PartitionUpdate. + * A PartitionType represents the type of a PartitionUpdate. */ enum PartitionType { /** @@ -86,7 +86,7 @@ enum PartitionType { PARTITION_KEY_ROW_DELETION, /** - * a partition-level deletion where partition key + clustering key = primary key + * a partition-level deletion where partition key + clustering key = primary key */ PARTITION_AND_CLUSTERING_KEY_ROW_DELETION, @@ -147,7 +147,7 @@ public static boolean isPartitionDeletion(PartitionUpdate pu) { } /** - * A RowType represents different types of {@link Row}-level modifications in a Cassandra table. + * A RowType represents different types of {@link Row}-level modifications in a Cassandra table. */ enum RowType { /** @@ -314,7 +314,12 @@ private void process(PartitionUpdate pu, OffsetPosition offsetPosition, Keyspace */ private void handlePartitionDeletion(PartitionUpdate pu, OffsetPosition offsetPosition, KeyspaceTable keyspaceTable) { - SchemaHolder.KeyValueSchema keyValueSchema = schemaHolder.getOrUpdateKeyValueSchema(keyspaceTable); + SchemaHolder.KeyValueSchema keyValueSchema = schemaHolder.getKeyValueSchema(keyspaceTable); + if (keyValueSchema == null) { + LOGGER.warn("Unable to get KeyValueSchema for table {}. 
It might have been deleted or CDC disabled.", keyspaceTable.toString()); + return; + } + Schema keySchema = keyValueSchema.keySchema(); Schema valueSchema = keyValueSchema.valueSchema(); @@ -362,14 +367,18 @@ private void handlePartitionDeletion(PartitionUpdate pu, OffsetPosition offsetPo */ private void handleRowModifications(Row row, RowType rowType, PartitionUpdate pu, OffsetPosition offsetPosition, KeyspaceTable keyspaceTable) { - SchemaHolder.KeyValueSchema schema = schemaHolder.getOrUpdateKeyValueSchema(keyspaceTable); - Schema keySchema = schema.keySchema(); - Schema valueSchema = schema.valueSchema(); + SchemaHolder.KeyValueSchema keyValueSchema = schemaHolder.getKeyValueSchema(keyspaceTable); + if (keyValueSchema == null) { + LOGGER.warn("Unable to get KeyValueSchema for table {}. It might have been deleted or CDC disabled.", keyspaceTable.toString()); + return; + } + Schema keySchema = keyValueSchema.keySchema(); + Schema valueSchema = keyValueSchema.valueSchema(); RowData after = new RowData(); populatePartitionColumns(after, pu); populateClusteringColumns(after, row, pu); - populateRegularColumns(after, row, rowType, schema); + populateRegularColumns(after, row, rowType, keyValueSchema); long ts = rowType == DELETE ? row.deletion().time().markedForDeleteAt() : pu.maxTimestamp(); diff --git a/src/main/java/io/debezium/connector/cassandra/NoOpSchemaChangeListener.java b/src/main/java/io/debezium/connector/cassandra/NoOpSchemaChangeListener.java new file mode 100644 index 00000000..1b45db42 --- /dev/null +++ b/src/main/java/io/debezium/connector/cassandra/NoOpSchemaChangeListener.java @@ -0,0 +1,117 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.cassandra; + +import com.datastax.driver.core.AggregateMetadata; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.FunctionMetadata; +import com.datastax.driver.core.KeyspaceMetadata; +import com.datastax.driver.core.MaterializedViewMetadata; +import com.datastax.driver.core.SchemaChangeListener; +import com.datastax.driver.core.TableMetadata; +import com.datastax.driver.core.UserType; + +public class NoOpSchemaChangeListener implements SchemaChangeListener { + @Override + public void onKeyspaceAdded(final KeyspaceMetadata keyspace) { + + } + + @Override + public void onKeyspaceRemoved(final KeyspaceMetadata keyspace) { + + } + + @Override + public void onKeyspaceChanged(final KeyspaceMetadata current, final KeyspaceMetadata previous) { + + } + + @Override + public void onTableAdded(final TableMetadata table) { + + } + + @Override + public void onTableRemoved(final TableMetadata table) { + + } + + @Override + public void onTableChanged(final TableMetadata current, final TableMetadata previous) { + + } + + @Override + public void onUserTypeAdded(final UserType type) { + + } + + @Override + public void onUserTypeRemoved(final UserType type) { + + } + + @Override + public void onUserTypeChanged(final UserType current, final UserType previous) { + + } + + @Override + public void onFunctionAdded(final FunctionMetadata function) { + + } + + @Override + public void onFunctionRemoved(final FunctionMetadata function) { + + } + + @Override + public void onFunctionChanged(final FunctionMetadata current, final FunctionMetadata previous) { + + } + + @Override + public void onAggregateAdded(final AggregateMetadata aggregate) { + + } + + @Override + public void 
onAggregateRemoved(final AggregateMetadata aggregate) { + + } + + @Override + public void onAggregateChanged(final AggregateMetadata current, final AggregateMetadata previous) { + + } + + @Override + public void onMaterializedViewAdded(final MaterializedViewMetadata view) { + + } + + @Override + public void onMaterializedViewRemoved(final MaterializedViewMetadata view) { + + } + + @Override + public void onMaterializedViewChanged(final MaterializedViewMetadata current, final MaterializedViewMetadata previous) { + + } + + @Override + public void onRegister(final Cluster cluster) { + + } + + @Override + public void onUnregister(final Cluster cluster) { + + } +} diff --git a/src/main/java/io/debezium/connector/cassandra/SchemaHolder.java b/src/main/java/io/debezium/connector/cassandra/SchemaHolder.java index 839e6dbf..da7f858b 100644 --- a/src/main/java/io/debezium/connector/cassandra/SchemaHolder.java +++ b/src/main/java/io/debezium/connector/cassandra/SchemaHolder.java @@ -5,8 +5,10 @@ */ package io.debezium.connector.cassandra; -import java.util.HashMap; -import java.util.HashSet; +import static java.util.stream.Collectors.toList; + +import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -19,6 +21,7 @@ import org.slf4j.LoggerFactory; import com.datastax.driver.core.ColumnMetadata; +import com.datastax.driver.core.KeyspaceMetadata; import com.datastax.driver.core.TableMetadata; import io.debezium.connector.SourceInfoStructMaker; @@ -31,49 +34,60 @@ * by {@link SchemaProcessor} periodically. */ public class SchemaHolder { - private static final String NAMESPACE = "io.debezium.connector.cassandra"; private static final Logger LOGGER = LoggerFactory.getLogger(SchemaHolder.class); - private final Map<KeyspaceTable, KeyValueSchema> tableToKVSchemaMap = new ConcurrentHashMap<>(); + public final Map<KeyspaceTable, KeyValueSchema> tableToKVSchemaMap = new ConcurrentHashMap<>(); + public final String kafkaTopicPrefix; + public final SourceInfoStructMaker sourceInfoStructMaker; private final CassandraClient cassandraClient; - private final String kafkaTopicPrefix; - private final SourceInfoStructMaker sourceInfoStructMaker; public SchemaHolder(CassandraClient cassandraClient, String kafkaTopicPrefix, SourceInfoStructMaker sourceInfoStructMaker) { this.cassandraClient = cassandraClient; this.kafkaTopicPrefix = kafkaTopicPrefix; this.sourceInfoStructMaker = sourceInfoStructMaker; - refreshSchemas(); - } - - public void refreshSchemas() { - LOGGER.info("Refreshing schemas..."); - Map<KeyspaceTable, TableMetadata> latest = getLatestTableMetadatas(); - removeDeletedTableSchemas(latest); - createOrUpdateNewTableSchemas(latest); - LOGGER.info("Schemas are refreshed"); } - public KeyValueSchema getOrUpdateKeyValueSchema(KeyspaceTable kt) { - if (!tableToKVSchemaMap.containsKey(kt)) { - refreshSchema(kt); - } + public synchronized KeyValueSchema getKeyValueSchema(KeyspaceTable kt) { return tableToKVSchemaMap.getOrDefault(kt, null); } public Set<TableMetadata> getCdcEnabledTableMetadataSet() { - return tableToKVSchemaMap.values().stream() - .map(KeyValueSchema::tableMetadata) - .filter(tm -> tm.getOptions().isCDC()) + return cassandraClient + .getCluster() + .getMetadata() + .getKeyspaces() + .stream() + .map(KeyspaceMetadata::getTables) + .flatMap(Collection::stream) + .filter(t -> t.getOptions().isCDC()) .collect(Collectors.toSet()); } + public synchronized void removeSchemasOfAllTablesInKeyspace(String keyspace) { + final List<KeyspaceTable> collect = tableToKVSchemaMap.keySet() + .stream() + .filter(keyspaceTable -> 
keyspaceTable.keyspace.equals(keyspace)) + .collect(toList()); + + collect.forEach(tableToKVSchemaMap::remove); + } + + public synchronized void removeTableSchema(KeyspaceTable kst) { + tableToKVSchemaMap.remove(kst); + } + + // there is deliberately no "addKeyspace"; it is not necessary, + // because we only ever add concrete tables (together with their keyspace), and we drop all tables of a keyspace when that keyspace is dropped + public synchronized void addOrUpdateTableSchema(KeyspaceTable kst, KeyValueSchema kvs) { + tableToKVSchemaMap.put(kst, kvs); + } + /** * Get the schema of an inner field based on the field name * @param fieldName the name of the field in the schema - * @param schema the schema where the field resides in + * @param schema the schema where the field resides in * @return Schema */ public static Schema getFieldSchema(String fieldName, Schema schema) { @@ -83,48 +97,6 @@ public static Schema getFieldSchema(String fieldName, Schema schema) { throw new CassandraConnectorSchemaException("Only STRUCT type is supported for this method, but encountered " + schema.type()); } - private void refreshSchema(KeyspaceTable keyspaceTable) { - LOGGER.debug("Refreshing schema for {}", keyspaceTable); - TableMetadata existing = tableToKVSchemaMap.containsKey(keyspaceTable) ? tableToKVSchemaMap.get(keyspaceTable).tableMetadata() : null; - TableMetadata latest = cassandraClient.getCdcEnabledTableMetadata(keyspaceTable.keyspace, keyspaceTable.table); - if (existing != latest) { - if (existing == null) { - tableToKVSchemaMap.put(keyspaceTable, new KeyValueSchema(kafkaTopicPrefix, latest, sourceInfoStructMaker)); - LOGGER.debug("Updated schema for {}", keyspaceTable); - } - if (latest == null) { - tableToKVSchemaMap.remove(keyspaceTable); - LOGGER.debug("Removed schema for {}", keyspaceTable); - } - } - } - - private Map<KeyspaceTable, TableMetadata> getLatestTableMetadatas() { - Map<KeyspaceTable, TableMetadata> latest = new HashMap<>(); - for (TableMetadata tm : cassandraClient.getCdcEnabledTableMetadataList()) { - latest.put(new KeyspaceTable(tm), tm); - } - return latest; - } - - private void removeDeletedTableSchemas(Map<KeyspaceTable, TableMetadata> latestTableMetadataMap) { - Set<KeyspaceTable> existingTables = new HashSet<>(tableToKVSchemaMap.keySet()); - Set<KeyspaceTable> latestTables = latestTableMetadataMap.keySet(); - existingTables.removeAll(latestTables); - tableToKVSchemaMap.keySet().removeAll(existingTables); - } - - private void createOrUpdateNewTableSchemas(Map<KeyspaceTable, TableMetadata> latestTableMetadataMap) { - latestTableMetadataMap.forEach((table, metadata) -> { - TableMetadata existingTableMetadata = tableToKVSchemaMap.containsKey(table) ? 
tableToKVSchemaMap.get(table).tableMetadata() : null; - if (existingTableMetadata == null || !existingTableMetadata.equals(metadata)) { - KeyValueSchema keyValueSchema = new KeyValueSchema(kafkaTopicPrefix, metadata, sourceInfoStructMaker); - tableToKVSchemaMap.put(table, keyValueSchema); - LOGGER.info("Updated schema for {}.", table); - } - }); - } - public static class KeyValueSchema { private final TableMetadata tableMetadata; private final Schema keySchema; diff --git a/src/main/java/io/debezium/connector/cassandra/SchemaProcessor.java b/src/main/java/io/debezium/connector/cassandra/SchemaProcessor.java index 260e73e4..1fc7ef53 100644 --- a/src/main/java/io/debezium/connector/cassandra/SchemaProcessor.java +++ b/src/main/java/io/debezium/connector/cassandra/SchemaProcessor.java @@ -5,24 +5,243 @@ */ package io.debezium.connector.cassandra; +import static java.util.stream.Collectors.toMap; + +import java.util.Map; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.schema.KeyspaceParams; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datastax.driver.core.KeyspaceMetadata; +import com.datastax.driver.core.SchemaChangeListener; +import com.datastax.driver.core.TableMetadata; + +import io.debezium.connector.SourceInfoStructMaker; + /** - * The schema processor is responsible for periodically - * refreshing the table schemas in Cassandra. Cassandra - * CommitLog does not provide schema change as events, - * so we pull the schema regularly for updates. + * The schema processor is responsible for handling schema changes occurring in + * Cassandra, delivered to it by a schema change listener registered with the driver. 
*/ public class SchemaProcessor extends AbstractProcessor { + private static final Logger LOGGER = LoggerFactory.getLogger(SchemaProcessor.class); + private static final String NAME = "Schema Processor"; private final SchemaHolder schemaHolder; + private final CassandraClient cassandraClient; + private final String kafkaTopicPrefix; + private final SourceInfoStructMaker sourceInfoStructMaker; + private final SchemaChangeListener schemaChangeListener; public SchemaProcessor(CassandraConnectorContext context) { super(NAME, context.getCassandraConnectorConfig().schemaPollInterval()); schemaHolder = context.getSchemaHolder(); + this.cassandraClient = context.getCassandraClient(); + this.kafkaTopicPrefix = schemaHolder.kafkaTopicPrefix; + this.sourceInfoStructMaker = schemaHolder.sourceInfoStructMaker; + + schemaChangeListener = new NoOpSchemaChangeListener() { + @Override + public void onKeyspaceAdded(final KeyspaceMetadata keyspace) { + try { + Schema.instance.setKeyspaceMetadata(org.apache.cassandra.schema.KeyspaceMetadata.create( + keyspace.getName(), + KeyspaceParams.create(keyspace.isDurableWrites(), + keyspace.getReplication()))); + Keyspace.openWithoutSSTables(keyspace.getName()); + LOGGER.info("Added keyspace {}", keyspace.asCQLQuery()); + } + catch (Throwable t) { + LOGGER.error("Error happened while adding the keyspace {}", keyspace.getName(), t); + } + } + + @Override + public void onKeyspaceChanged(final KeyspaceMetadata current, final KeyspaceMetadata previous) { + try { + Schema.instance.updateKeyspace(current.getName(), KeyspaceParams.create(current.isDurableWrites(), current.getReplication())); + LOGGER.info("Updated keyspace {}", current.asCQLQuery()); + } + catch (Throwable t) { + LOGGER.error("Error happened while updating the keyspace {}", current.getName(), t); + } + } + + @Override + public void onKeyspaceRemoved(final KeyspaceMetadata keyspace) { + try { + schemaHolder.removeSchemasOfAllTablesInKeyspace(keyspace.getName()); + Schema.instance.clearKeyspaceMetadata(org.apache.cassandra.schema.KeyspaceMetadata.create( + keyspace.getName(), + KeyspaceParams.create(keyspace.isDurableWrites(), + keyspace.getReplication()))); + LOGGER.info("Removed keyspace {}", keyspace.asCQLQuery()); + } + catch (Throwable t) { + LOGGER.error("Error happened while removing the keyspace {}", keyspace.getName(), t); + } + } + + @Override + public void onTableAdded(final TableMetadata tableMetadata) { + try { + LOGGER.debug("Table {}.{} detected to be added!", tableMetadata.getKeyspace().getName(), tableMetadata.getName()); + if (tableMetadata.getOptions().isCDC()) { + schemaHolder.addOrUpdateTableSchema(new KeyspaceTable(tableMetadata), + new SchemaHolder.KeyValueSchema(kafkaTopicPrefix, tableMetadata, sourceInfoStructMaker)); + } + + final CFMetaData rawCFMetaData = CFMetaData.compile(tableMetadata.asCQLQuery(), tableMetadata.getKeyspace().getName()); + // we need to copy because CFMetaData.compile will generate new cfId which would not match id of old metadata + final CFMetaData newCFMetaData = rawCFMetaData.copy(tableMetadata.getId()); + + Keyspace.open(newCFMetaData.ksName).initCf(newCFMetaData, false); + + final org.apache.cassandra.schema.KeyspaceMetadata current = Schema.instance.getKSMetaData(newCFMetaData.ksName); + if (current == null) { + LOGGER.warn("Keyspace {} doesn't exist", newCFMetaData.ksName); + return; + } + + if (current.tables.get(tableMetadata.getName()).isPresent()) { + LOGGER.debug("Table {}.{} is already added!", tableMetadata.getKeyspace(), tableMetadata.getName()); + 
return; + } + + final java.util.function.Function<org.apache.cassandra.schema.KeyspaceMetadata, org.apache.cassandra.schema.KeyspaceMetadata> transformationFunction = ks -> ks + .withSwapped(ks.tables.with(newCFMetaData)); + + org.apache.cassandra.schema.KeyspaceMetadata transformed = transformationFunction.apply(current); + + Schema.instance.setKeyspaceMetadata(transformed); + Schema.instance.load(newCFMetaData); + + LOGGER.info("Added schema for table {}", tableMetadata.asCQLQuery()); + } + catch (Throwable t) { + LOGGER.error(String.format("Error happened while adding table %s.%s", tableMetadata.getKeyspace(), tableMetadata.getName()), t); + } + } + + @Override + public void onTableRemoved(final TableMetadata table) { + try { + LOGGER.info(String.format("Table %s.%s detected to be removed!", table.getKeyspace().getName(), table.getName())); + if (table.getOptions().isCDC()) { + schemaHolder.removeTableSchema(new KeyspaceTable(table)); + } + + final String ksName = table.getKeyspace().getName(); + final String tableName = table.getName(); + + final org.apache.cassandra.schema.KeyspaceMetadata oldKsm = Schema.instance.getKSMetaData(table.getKeyspace().getName()); + + if (oldKsm == null) { + LOGGER.warn("KeyspaceMetadata for keyspace {} is not found!", table.getKeyspace().getName()); + return; + } + + final ColumnFamilyStore cfs = Keyspace.openWithoutSSTables(ksName).getColumnFamilyStore(tableName); + + if (cfs == null) { + LOGGER.warn("ColumnFamilyStore for {}.{} is not found!", table.getKeyspace(), table.getName()); + return; + } + + // make sure all the indexes are dropped, or else. + cfs.indexManager.markAllIndexesRemoved(); + + // reinitialize the keyspace. + final CFMetaData cfm = oldKsm.tables.get(tableName).get(); + final org.apache.cassandra.schema.KeyspaceMetadata newKsm = oldKsm.withSwapped(oldKsm.tables.without(tableName)); + + Schema.instance.unload(cfm); + Schema.instance.setKeyspaceMetadata(newKsm); + + LOGGER.info("Removed schema for table {}", table.asCQLQuery()); + } + catch (Throwable t) { + LOGGER.error(String.format("Error happened while removing table %s.%s", table.getKeyspace(), table.getName()), t); + } + } + + @Override + public void onTableChanged(final TableMetadata newTableMetadata, final TableMetadata oldTableMetaData) { + try { + LOGGER.debug("Detected alteration in schema of {}.{} (previous cdc = {}, current cdc = {})", + newTableMetadata.getKeyspace().getName(), + newTableMetadata.getName(), + oldTableMetaData.getOptions().isCDC(), + newTableMetadata.getOptions().isCDC()); + + if (newTableMetadata.getOptions().isCDC()) { + // if the table was CDC-enabled before and still is, re-add it, because its schema might have changed; + // if it is CDC-enabled now but was not before, add it as well, because its cdc flag has changed; + // in short: we add / update whenever the new metadata has cdc = true + schemaHolder.addOrUpdateTableSchema(new KeyspaceTable(newTableMetadata), + new SchemaHolder.KeyValueSchema(kafkaTopicPrefix, newTableMetadata, sourceInfoStructMaker)); + } + else if (oldTableMetaData.getOptions().isCDC()) { + // if the table is no longer CDC-enabled but the old one was, remove it + schemaHolder.removeTableSchema(new KeyspaceTable(newTableMetadata)); + } + + // if it was not CDC-enabled before and is not now, there is nothing to do for the schema holder, + // but Cassandra's own metadata is updated in every case for the subsequent deserialization path + + final CFMetaData rawNewMetadata = CFMetaData.compile(newTableMetadata.asCQLQuery(), + newTableMetadata.getKeyspace().getName()); + + final CFMetaData oldMetadata = 
Schema.instance.getCFMetaData(oldTableMetaData.getKeyspace().getName(), oldTableMetaData.getName()); + + // we need to copy because CFMetaData.compile will generate new cfId which would not match id of old metadata + final CFMetaData newMetadata = rawNewMetadata.copy(oldMetadata.cfId); + oldMetadata.apply(newMetadata); + + LOGGER.info("Updated schema for table {}", newTableMetadata.asCQLQuery()); + } + catch (Throwable t) { + LOGGER.error(String.format("Error happened while reacting to changed table %s.%s", newTableMetadata.getKeyspace(), newTableMetadata.getName()), t); + } + } + }; + } + + @Override + public void initialize() { + // populate the schema holder when Debezium first starts, + // because it would not be notified otherwise: + // the listener is triggered on changes, not when the driver connects for the first time, + // so the holder map would otherwise stay empty + final Map<KeyspaceTable, SchemaHolder.KeyValueSchema> tables = schemaHolder.getCdcEnabledTableMetadataSet() + .stream() + .collect(toMap(KeyspaceTable::new, + tableMetadata -> new SchemaHolder.KeyValueSchema(kafkaTopicPrefix, tableMetadata, sourceInfoStructMaker))); + + tables.forEach(schemaHolder::addOrUpdateTableSchema); + + LOGGER.info("Registering schema change listener ..."); + cassandraClient.getCluster().register(schemaChangeListener); } @Override public void process() { - schemaHolder.refreshSchemas(); + // intentionally empty: this processor is reactive; + // it does not poll Cassandra for its tables every n seconds + // but is notified by the driver about schema changes as they happen, + // so there is nothing to do "periodically" as in the other processors. + } + + @Override + public void destroy() { + LOGGER.info("Unregistering schema change listener ..."); + cassandraClient.getCluster().unregister(schemaChangeListener); + LOGGER.info("Clearing cdc keyspace / table map ... 
"); + schemaHolder.tableToKVSchemaMap.clear(); } } diff --git a/src/main/java/io/debezium/connector/cassandra/SnapshotProcessor.java b/src/main/java/io/debezium/connector/cassandra/SnapshotProcessor.java index 6d5f00d7..0a108422 100644 --- a/src/main/java/io/debezium/connector/cassandra/SnapshotProcessor.java +++ b/src/main/java/io/debezium/connector/cassandra/SnapshotProcessor.java @@ -198,7 +198,7 @@ private static BuiltStatement generateSnapshotStatement(TableMetadata tableMetad private void processResultSet(TableMetadata tableMetadata, ResultSet resultSet) throws IOException { String tableName = tableName(tableMetadata); KeyspaceTable keyspaceTable = new KeyspaceTable(tableMetadata); - SchemaHolder.KeyValueSchema keyValueSchema = schemaHolder.getOrUpdateKeyValueSchema(keyspaceTable); + SchemaHolder.KeyValueSchema keyValueSchema = schemaHolder.getKeyValueSchema(keyspaceTable); Schema keySchema = keyValueSchema.keySchema(); Schema valueSchema = keyValueSchema.valueSchema(); diff --git a/src/test/java/io/debezium/connector/cassandra/CommitLogProcessorTest.java b/src/test/java/io/debezium/connector/cassandra/CommitLogProcessorTest.java index 2388b0e2..061b7c90 100644 --- a/src/test/java/io/debezium/connector/cassandra/CommitLogProcessorTest.java +++ b/src/test/java/io/debezium/connector/cassandra/CommitLogProcessorTest.java @@ -29,11 +29,14 @@ public class CommitLogProcessorTest extends EmbeddedCassandraConnectorTestBase { private CassandraConnectorContext context; private CommitLogProcessor commitLogProcessor; + private SchemaProcessor schemaProcessor; @Before public void setUp() throws Exception { context = generateTaskContext(); commitLogProcessor = new CommitLogProcessor(context); + schemaProcessor = new SchemaProcessor(context); + schemaProcessor.initialize(); commitLogProcessor.initialize(); } @@ -41,6 +44,7 @@ public void setUp() throws Exception { public void tearDown() throws Exception { deleteTestOffsets(context); commitLogProcessor.destroy(); + schemaProcessor.destroy(); context.cleanUp(); } @@ -48,7 +52,6 @@ public void tearDown() throws Exception { public void testProcessCommitLogs() throws Exception { int commitLogRowSize = 10; context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS " + keyspaceTable("cdc_table") + " (a int, b int, PRIMARY KEY(a)) WITH cdc = true;"); - context.getSchemaHolder().refreshSchemas(); // programmatically add insertion and deletion events into commit log, this is because running an 'INSERT' or 'DELETE' // cql against the embedded Cassandra does not modify the commit log file on disk. diff --git a/src/test/java/io/debezium/connector/cassandra/SchemaProcessorTest.java b/src/test/java/io/debezium/connector/cassandra/SchemaProcessorTest.java index 0d61a385..b61deb4f 100644 --- a/src/test/java/io/debezium/connector/cassandra/SchemaProcessorTest.java +++ b/src/test/java/io/debezium/connector/cassandra/SchemaProcessorTest.java @@ -19,6 +19,7 @@ public class SchemaProcessorTest extends EmbeddedCassandraConnectorTestBase { public void testProcess() throws Exception { CassandraConnectorContext context = generateTaskContext(); SchemaProcessor schemaProcessor = new SchemaProcessor(context); + schemaProcessor.initialize(); SchemaHolder.KeyValueSchema keyValueSchema; String namespacePrefix = "io.debezium.connector.cassandra" + "." + EmbeddedCassandraConnectorTestBase.TEST_KAFKA_TOPIC_PREFIX + "." 
@@ -29,48 +30,48 @@ public void testProcess() throws Exception { assertEquals(0, context.getSchemaHolder().getCdcEnabledTableMetadataSet().size()); context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS " + keyspaceTable("table1") + " (a int, b text, PRIMARY KEY(a)) WITH cdc = false;"); + + Thread.sleep(5000); + schemaProcessor.process(); assertEquals(0, context.getSchemaHolder().getCdcEnabledTableMetadataSet().size()); - assertNull(context.getSchemaHolder().getOrUpdateKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table1"))); + assertNull(context.getSchemaHolder().getKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table1"))); context.getCassandraClient().execute("ALTER TABLE " + keyspaceTable("table1") + " WITH cdc = true;"); schemaProcessor.process(); assertEquals(1, context.getSchemaHolder().getCdcEnabledTableMetadataSet().size()); - keyValueSchema = context.getSchemaHolder().getOrUpdateKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table1")); + keyValueSchema = context.getSchemaHolder().getKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table1")); assertNotNull(keyValueSchema); expectedKeySchemaName = namespacePrefix + "." + "table1" + "." + "Key"; assertEquals(expectedKeySchemaName, keyValueSchema.keySchema().name()); expectedValueSchemaName = namespacePrefix + "." + "table1" + "." + "Value"; assertEquals(expectedValueSchemaName, keyValueSchema.valueSchema().name()); - context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS " + keyspaceTable("table2") + " (a int, b text, PRIMARY KEY(a)) WITH cdc = true;"); schemaProcessor.process(); - assertEquals(2, context.getSchemaHolder().getCdcEnabledTableMetadataSet().size()); - keyValueSchema = context.getSchemaHolder().getOrUpdateKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table1")); + assertEquals(2, context.getSchemaHolder().getCdcEnabledTableMetadataSet().size()); + keyValueSchema = context.getSchemaHolder().getKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table1")); assertNotNull(keyValueSchema); expectedKeySchemaName = namespacePrefix + "." + "table1" + "." + "Key"; assertEquals(expectedKeySchemaName, keyValueSchema.keySchema().name()); expectedValueSchemaName = namespacePrefix + "." + "table1" + "." + "Value"; assertEquals(expectedValueSchemaName, keyValueSchema.valueSchema().name()); - keyValueSchema = context.getSchemaHolder().getOrUpdateKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table2")); + keyValueSchema = context.getSchemaHolder().getKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table2")); assertNotNull(keyValueSchema); expectedKeySchemaName = namespacePrefix + "." + "table2" + "." + "Key"; assertEquals(expectedKeySchemaName, keyValueSchema.keySchema().name()); expectedValueSchemaName = namespacePrefix + "." + "table2" + "." 
+ "Value"; assertEquals(expectedValueSchemaName, keyValueSchema.valueSchema().name()); - context.getCassandraClient().execute("ALTER TABLE " + keyspaceTable("table2") + " ADD c text"); schemaProcessor.process(); assertEquals(2, context.getSchemaHolder().getCdcEnabledTableMetadataSet().size()); TableMetadata expectedTm1 = context.getCassandraClient().getCdcEnabledTableMetadata(TEST_KEYSPACE, "table1"); TableMetadata expectedTm2 = context.getCassandraClient().getCdcEnabledTableMetadata(TEST_KEYSPACE, "table2"); - TableMetadata tm1 = context.getSchemaHolder().getOrUpdateKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table1")).tableMetadata(); - TableMetadata tm2 = context.getSchemaHolder().getOrUpdateKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table2")).tableMetadata(); + TableMetadata tm1 = context.getSchemaHolder().getKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table1")).tableMetadata(); + TableMetadata tm2 = context.getSchemaHolder().getKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table2")).tableMetadata(); assertEquals(expectedTm1, tm1); assertEquals(expectedTm2, tm2); - deleteTestKeyspaceTables(); context.cleanUp(); } diff --git a/src/test/java/io/debezium/connector/cassandra/SnapshotProcessorTest.java b/src/test/java/io/debezium/connector/cassandra/SnapshotProcessorTest.java index f71d997e..92420f52 100644 --- a/src/test/java/io/debezium/connector/cassandra/SnapshotProcessorTest.java +++ b/src/test/java/io/debezium/connector/cassandra/SnapshotProcessorTest.java @@ -31,12 +31,13 @@ public class SnapshotProcessorTest extends EmbeddedCassandraConnectorTestBase { public void testSnapshotTable() throws Exception { CassandraConnectorContext context = generateTaskContext(); SnapshotProcessor snapshotProcessor = Mockito.spy(new SnapshotProcessor(context)); + SchemaProcessor schemaProcessor = new SchemaProcessor(context); + schemaProcessor.initialize(); when(snapshotProcessor.isRunning()).thenReturn(true); int tableSize = 5; context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS " + keyspaceTable("cdc_table") + " (a int, b text, PRIMARY KEY(a)) WITH cdc = true;"); context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS " + keyspaceTable("cdc_table2") + " (a int, b text, PRIMARY KEY(a)) WITH cdc = true;"); - context.getSchemaHolder().refreshSchemas(); for (int i = 0; i < tableSize; i++) { context.getCassandraClient().execute("INSERT INTO " + keyspaceTable("cdc_table") + "(a, b) VALUES (?, ?)", i, String.valueOf(i)); @@ -75,11 +76,12 @@ public void testSnapshotTable() throws Exception { public void testSnapshotSkipsNonCdcEnabledTable() throws Exception { CassandraConnectorContext context = generateTaskContext(); SnapshotProcessor snapshotProcessor = Mockito.spy(new SnapshotProcessor(context)); + SchemaProcessor schemaProcessor = new SchemaProcessor(context); + schemaProcessor.initialize(); when(snapshotProcessor.isRunning()).thenReturn(true); int tableSize = 5; context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS " + keyspaceTable("non_cdc_table") + " (a int, b text, PRIMARY KEY(a)) WITH cdc = false;"); - context.getSchemaHolder().refreshSchemas(); for (int i = 0; i < tableSize; i++) { context.getCassandraClient().execute("INSERT INTO " + keyspaceTable("non_cdc_table") + "(a, b) VALUES (?, ?)", i, String.valueOf(i)); } @@ -99,10 +101,11 @@ public void testSnapshotEmptyTable() throws Exception { CassandraConnectorContext context = generateTaskContext(); AtomicBoolean globalTaskState = new AtomicBoolean(true); SnapshotProcessor snapshotProcessor = 
Mockito.spy(new SnapshotProcessor(context)); + SchemaProcessor schemaProcessor = new SchemaProcessor(context); + schemaProcessor.initialize(); when(snapshotProcessor.isRunning()).thenReturn(true); context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS " + keyspaceTable("cdc_table") + " (a int, b text, PRIMARY KEY(a)) WITH cdc = true;"); - context.getSchemaHolder().refreshSchemas(); ChangeEventQueue<Event> queue = context.getQueues().get(0); assertEquals(queue.totalCapacity(), queue.remainingCapacity());
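
Reviewer note: a minimal, self-contained sketch of the listener wiring this patch introduces. Only Cluster.register()/unregister() and NoOpSchemaChangeListener come from the patch; the contact point, class name, and printed output are illustrative assumptions.

    import com.datastax.driver.core.Cluster;
    import com.datastax.driver.core.TableMetadata;

    public class ListenerWiringSketch {
        public static void main(String[] args) {
            // address is an example value; any reachable Cassandra 3.x node works
            try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build()) {
                // subclass the new no-op listener and override only the callbacks of
                // interest, exactly as the anonymous listener in SchemaProcessor does
                cluster.register(new NoOpSchemaChangeListener() {
                    @Override
                    public void onTableAdded(final TableMetadata table) {
                        // SchemaProcessor only tracks CDC-enabled tables
                        if (table.getOptions().isCDC()) {
                            System.out.println("CDC table added: " + table.asCQLQuery());
                        }
                    }
                });
                // from here on the driver pushes schema events and no polling is needed;
                // on shutdown, unregister the listener as SchemaProcessor.destroy() does
            }
        }
    }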
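The copy-with-old-id step in onTableAdded and onTableChanged is easy to miss, so here is just that trick in isolation (variable names are illustrative; the calls are the same ones the patch uses):

    // CFMetaData.compile() mints a fresh cfId for the compiled definition, so the
    // result is re-keyed onto the id Cassandra already knows; otherwise the updated
    // metadata would not match the table id referenced by mutations in the commit log
    CFMetaData raw = CFMetaData.compile(table.asCQLQuery(), table.getKeyspace().getName());
    CFMetaData rekeyed = raw.copy(table.getId());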
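Since getOrUpdateKeyValueSchema() gives way to the lookup-only getKeyValueSchema(), a null result is now part of the contract. The guard below is the pattern CommitLogReadHandlerImpl adopts in this patch; note that SnapshotProcessor.processResultSet() still dereferences the result unchecked, so the same guard may be worth adding there:

    SchemaHolder.KeyValueSchema keyValueSchema = schemaHolder.getKeyValueSchema(keyspaceTable);
    if (keyValueSchema == null) {
        // the table may have been dropped, or CDC disabled on it, since the event was written
        return;
    }
    Schema keySchema = keyValueSchema.keySchema();
    Schema valueSchema = keyValueSchema.valueSchema();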