Skip to content

Commit

Permalink
DBZ-3417 fix on-the-fly schema changes
Browse files Browse the repository at this point in the history
  • Loading branch information
smiklosovic committed May 10, 2021
1 parent e8c89e8 commit 7069d49
Show file tree
Hide file tree
Showing 9 changed files with 421 additions and 93 deletions.
Expand Up @@ -95,6 +95,10 @@ public String getClusterName() {
return cluster.getMetadata().getClusterName();
}

public Cluster getCluster() {
return cluster;
}

public boolean isQueryable() {
return !cluster.isClosed() && !session.isClosed();
}
Expand Down
Expand Up @@ -77,7 +77,7 @@ public class CommitLogReadHandlerImpl implements CommitLogReadHandler {
}

/**
* A PartitionType represents the type of a PartitionUpdate.
* A PartitionType represents the type of a PartitionUpdate.
*/
enum PartitionType {
/**
Expand All @@ -86,7 +86,7 @@ enum PartitionType {
PARTITION_KEY_ROW_DELETION,

/**
* a partition-level deletion where partition key + clustering key = primary key
* a partition-level deletion where partition key + clustering key = primary key
*/
PARTITION_AND_CLUSTERING_KEY_ROW_DELETION,

Expand Down Expand Up @@ -147,7 +147,7 @@ public static boolean isPartitionDeletion(PartitionUpdate pu) {
}

/**
* A RowType represents different types of {@link Row}-level modifications in a Cassandra table.
* A RowType represents different types of {@link Row}-level modifications in a Cassandra table.
*/
enum RowType {
/**
Expand Down Expand Up @@ -314,7 +314,12 @@ private void process(PartitionUpdate pu, OffsetPosition offsetPosition, Keyspace
*/
private void handlePartitionDeletion(PartitionUpdate pu, OffsetPosition offsetPosition, KeyspaceTable keyspaceTable) {

SchemaHolder.KeyValueSchema keyValueSchema = schemaHolder.getOrUpdateKeyValueSchema(keyspaceTable);
SchemaHolder.KeyValueSchema keyValueSchema = schemaHolder.getKeyValueSchema(keyspaceTable);
if (keyValueSchema == null) {
LOGGER.warn("Unable to get KeyValueSchema for table {}. It might have been deleted or CDC disabled.", keyspaceTable.toString());
return;
}

Schema keySchema = keyValueSchema.keySchema();
Schema valueSchema = keyValueSchema.valueSchema();

Expand Down Expand Up @@ -362,14 +367,18 @@ private void handlePartitionDeletion(PartitionUpdate pu, OffsetPosition offsetPo
*/
private void handleRowModifications(Row row, RowType rowType, PartitionUpdate pu, OffsetPosition offsetPosition, KeyspaceTable keyspaceTable) {

SchemaHolder.KeyValueSchema schema = schemaHolder.getOrUpdateKeyValueSchema(keyspaceTable);
Schema keySchema = schema.keySchema();
Schema valueSchema = schema.valueSchema();
SchemaHolder.KeyValueSchema keyValueSchema = schemaHolder.getKeyValueSchema(keyspaceTable);
if (keyValueSchema == null) {
LOGGER.warn("Unable to get KeyValueSchema for table {}. It might have been deleted or CDC disabled.", keyspaceTable.toString());
return;
}
Schema keySchema = keyValueSchema.keySchema();
Schema valueSchema = keyValueSchema.valueSchema();

RowData after = new RowData();
populatePartitionColumns(after, pu);
populateClusteringColumns(after, row, pu);
populateRegularColumns(after, row, rowType, schema);
populateRegularColumns(after, row, rowType, keyValueSchema);

long ts = rowType == DELETE ? row.deletion().time().markedForDeleteAt() : pu.maxTimestamp();

Expand Down
@@ -0,0 +1,117 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.cassandra;

import com.datastax.driver.core.AggregateMetadata;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.FunctionMetadata;
import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.MaterializedViewMetadata;
import com.datastax.driver.core.SchemaChangeListener;
import com.datastax.driver.core.TableMetadata;
import com.datastax.driver.core.UserType;

public class NoOpSchemaChangeListener implements SchemaChangeListener {
@Override
public void onKeyspaceAdded(final KeyspaceMetadata keyspace) {

}

@Override
public void onKeyspaceRemoved(final KeyspaceMetadata keyspace) {

}

@Override
public void onKeyspaceChanged(final KeyspaceMetadata current, final KeyspaceMetadata previous) {

}

@Override
public void onTableAdded(final TableMetadata table) {

}

@Override
public void onTableRemoved(final TableMetadata table) {

}

@Override
public void onTableChanged(final TableMetadata current, final TableMetadata previous) {

}

@Override
public void onUserTypeAdded(final UserType type) {

}

@Override
public void onUserTypeRemoved(final UserType type) {

}

@Override
public void onUserTypeChanged(final UserType current, final UserType previous) {

}

@Override
public void onFunctionAdded(final FunctionMetadata function) {

}

@Override
public void onFunctionRemoved(final FunctionMetadata function) {

}

@Override
public void onFunctionChanged(final FunctionMetadata current, final FunctionMetadata previous) {

}

@Override
public void onAggregateAdded(final AggregateMetadata aggregate) {

}

@Override
public void onAggregateRemoved(final AggregateMetadata aggregate) {

}

@Override
public void onAggregateChanged(final AggregateMetadata current, final AggregateMetadata previous) {

}

@Override
public void onMaterializedViewAdded(final MaterializedViewMetadata view) {

}

@Override
public void onMaterializedViewRemoved(final MaterializedViewMetadata view) {

}

@Override
public void onMaterializedViewChanged(final MaterializedViewMetadata current, final MaterializedViewMetadata previous) {

}

@Override
public void onRegister(final Cluster cluster) {

}

@Override
public void onUnregister(final Cluster cluster) {

}
}
102 changes: 37 additions & 65 deletions src/main/java/io/debezium/connector/cassandra/SchemaHolder.java
Expand Up @@ -5,8 +5,10 @@
*/
package io.debezium.connector.cassandra;

import java.util.HashMap;
import java.util.HashSet;
import static java.util.stream.Collectors.toList;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -19,6 +21,7 @@
import org.slf4j.LoggerFactory;

import com.datastax.driver.core.ColumnMetadata;
import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.TableMetadata;

import io.debezium.connector.SourceInfoStructMaker;
Expand All @@ -31,49 +34,60 @@
* by {@link SchemaProcessor} periodically.
*/
public class SchemaHolder {

private static final String NAMESPACE = "io.debezium.connector.cassandra";
private static final Logger LOGGER = LoggerFactory.getLogger(SchemaHolder.class);

private final Map<KeyspaceTable, KeyValueSchema> tableToKVSchemaMap = new ConcurrentHashMap<>();
public final Map<KeyspaceTable, KeyValueSchema> tableToKVSchemaMap = new ConcurrentHashMap<>();

public final String kafkaTopicPrefix;
public final SourceInfoStructMaker sourceInfoStructMaker;
private final CassandraClient cassandraClient;
private final String kafkaTopicPrefix;
private final SourceInfoStructMaker sourceInfoStructMaker;

public SchemaHolder(CassandraClient cassandraClient, String kafkaTopicPrefix, SourceInfoStructMaker sourceInfoStructMaker) {
this.cassandraClient = cassandraClient;
this.kafkaTopicPrefix = kafkaTopicPrefix;
this.sourceInfoStructMaker = sourceInfoStructMaker;
refreshSchemas();
}

public void refreshSchemas() {
LOGGER.info("Refreshing schemas...");
Map<KeyspaceTable, TableMetadata> latest = getLatestTableMetadatas();
removeDeletedTableSchemas(latest);
createOrUpdateNewTableSchemas(latest);
LOGGER.info("Schemas are refreshed");
}

public KeyValueSchema getOrUpdateKeyValueSchema(KeyspaceTable kt) {
if (!tableToKVSchemaMap.containsKey(kt)) {
refreshSchema(kt);
}
public synchronized KeyValueSchema getKeyValueSchema(KeyspaceTable kt) {
return tableToKVSchemaMap.getOrDefault(kt, null);
}

public Set<TableMetadata> getCdcEnabledTableMetadataSet() {
return tableToKVSchemaMap.values().stream()
.map(KeyValueSchema::tableMetadata)
.filter(tm -> tm.getOptions().isCDC())
return cassandraClient
.getCluster()
.getMetadata()
.getKeyspaces()
.stream()
.map(KeyspaceMetadata::getTables)
.flatMap(Collection::stream)
.filter(t -> t.getOptions().isCDC())
.collect(Collectors.toSet());
}

public synchronized void removeSchemasOfAllTablesInKeyspace(String keyspace) {
final List<KeyspaceTable> collect = tableToKVSchemaMap.keySet()
.stream()
.filter(keyValueSchema -> keyValueSchema.keyspace.equals(keyspace))
.collect(toList());

collect.forEach(tableToKVSchemaMap::remove);
}

public synchronized void removeTableSchema(KeyspaceTable kst) {
tableToKVSchemaMap.remove(kst);
}

// there is not "addKeyspace", it is not necessary
// as we will ever add a concrete table (with keyspace) but we will also dropping all tables when keyspace is dropped
public synchronized void addOrUpdateTableSchema(KeyspaceTable kst, KeyValueSchema kvs) {
tableToKVSchemaMap.put(kst, kvs);
}

/**
* Get the schema of an inner field based on the field name
* @param fieldName the name of the field in the schema
* @param schema the schema where the field resides in
* @param schema the schema where the field resides in
* @return Schema
*/
public static Schema getFieldSchema(String fieldName, Schema schema) {
Expand All @@ -83,48 +97,6 @@ public static Schema getFieldSchema(String fieldName, Schema schema) {
throw new CassandraConnectorSchemaException("Only STRUCT type is supported for this method, but encountered " + schema.type());
}

private void refreshSchema(KeyspaceTable keyspaceTable) {
LOGGER.debug("Refreshing schema for {}", keyspaceTable);
TableMetadata existing = tableToKVSchemaMap.containsKey(keyspaceTable) ? tableToKVSchemaMap.get(keyspaceTable).tableMetadata() : null;
TableMetadata latest = cassandraClient.getCdcEnabledTableMetadata(keyspaceTable.keyspace, keyspaceTable.table);
if (existing != latest) {
if (existing == null) {
tableToKVSchemaMap.put(keyspaceTable, new KeyValueSchema(kafkaTopicPrefix, latest, sourceInfoStructMaker));
LOGGER.debug("Updated schema for {}", keyspaceTable);
}
if (latest == null) {
tableToKVSchemaMap.remove(keyspaceTable);
LOGGER.debug("Removed schema for {}", keyspaceTable);
}
}
}

private Map<KeyspaceTable, TableMetadata> getLatestTableMetadatas() {
Map<KeyspaceTable, TableMetadata> latest = new HashMap<>();
for (TableMetadata tm : cassandraClient.getCdcEnabledTableMetadataList()) {
latest.put(new KeyspaceTable(tm), tm);
}
return latest;
}

private void removeDeletedTableSchemas(Map<KeyspaceTable, TableMetadata> latestTableMetadataMap) {
Set<KeyspaceTable> existingTables = new HashSet<>(tableToKVSchemaMap.keySet());
Set<KeyspaceTable> latestTables = latestTableMetadataMap.keySet();
existingTables.removeAll(latestTables);
tableToKVSchemaMap.keySet().removeAll(existingTables);
}

private void createOrUpdateNewTableSchemas(Map<KeyspaceTable, TableMetadata> latestTableMetadataMap) {
latestTableMetadataMap.forEach((table, metadata) -> {
TableMetadata existingTableMetadata = tableToKVSchemaMap.containsKey(table) ? tableToKVSchemaMap.get(table).tableMetadata() : null;
if (existingTableMetadata == null || !existingTableMetadata.equals(metadata)) {
KeyValueSchema keyValueSchema = new KeyValueSchema(kafkaTopicPrefix, metadata, sourceInfoStructMaker);
tableToKVSchemaMap.put(table, keyValueSchema);
LOGGER.info("Updated schema for {}.", table);
}
});
}

public static class KeyValueSchema {
private final TableMetadata tableMetadata;
private final Schema keySchema;
Expand Down

0 comments on commit 7069d49

Please sign in to comment.