Skip to content
Permalink
Browse files
Fix materialized view schema backup as table
patch by Zhao Yang, Ekaterina Dimitrova; reviewed by Benjamin Lerer, Ekaterina Dimitrova for CASSANDRA-12734

Co-authored-by: Zhao Yang <zhao.yang@datastax.com>
Co-authored-by: Ekaterina Dimitrova <ekaterina.dimitrova@datastax.com>
  • Loading branch information
ekaterinadimitrova2 and zhaoyang1991 committed Sep 3, 2021
1 parent e4b37c3 commit 67eb22ec9d588c9f984d13c0ffd703a14181f775
Show file tree
Hide file tree
Showing 4 changed files with 195 additions and 31 deletions.
@@ -1,4 +1,5 @@
3.0.26:
* Fix materialized view schema backup as table (CASSANDRA-12734)
* Avoid signaling DigestResolver until the minimum number of responses are guaranteed to be visible (CASSANDRA-16883)
* Fix secondary indexes on primary key columns skipping some writes (CASSANDRA-16868)
* Fix incorrect error message in LegacyLayout (CASSANDRA-15136)
@@ -758,17 +758,18 @@ public void dropView(String ksName, String viewName)
// make sure all the indexes are dropped, or else.
cfs.indexManager.markAllIndexesRemoved();

CompactionManager.instance.interruptCompactionFor(Collections.singleton(cfs.metadata), true);

if (DatabaseDescriptor.isAutoSnapshot())
cfs.snapshot(Keyspace.getTimestampedSnapshotName(cfs.name));

// reinitialize the keyspace.
ViewDefinition view = oldKsm.views.get(viewName).get();
KeyspaceMetadata newKsm = oldKsm.withSwapped(oldKsm.views.without(viewName));

unload(view);
setKeyspaceMetadata(newKsm);

CompactionManager.instance.interruptCompactionFor(Collections.singleton(view.metadata), true);

if (DatabaseDescriptor.isAutoSnapshot())
cfs.snapshot(Keyspace.getTimestampedSnapshotName(cfs.name));
Keyspace.open(ksName).dropCf(view.metadata.cfId);
Keyspace.open(ksName).viewManager.reload();
MigrationManager.instance.notifyDropView(view);
@@ -103,6 +103,115 @@ public static String getCFMetadataAsCQL(CFMetaData metadata, boolean includeDrop
sb.append("\n(this should not be used to reproduce this schema)\n\n");
}

List<ColumnDefinition> clusteringColumns = getClusteringColumns(metadata);

if (metadata.isView())
{
sb.append(getViewMetadataAsCQL(metadata, includeDroppedColumns));
}
else
{
sb.append(getBaseTableMetadataAsCQL(metadata, includeDroppedColumns));
}

sb.append("WITH ");

sb.append("ID = ").append(metadata.cfId).append("\n\tAND ");

if (metadata.isCompactTable())
sb.append("COMPACT STORAGE\n\tAND ");

if (clusteringColumns.size() > 0)
{
sb.append("CLUSTERING ORDER BY (");

Consumer<StringBuilder> cOrderCommaAppender = commaAppender(" ");
for (ColumnDefinition cd : clusteringColumns)
{
cOrderCommaAppender.accept(sb);
sb.append(cd.name.toCQLString()).append(' ').append(cd.clusteringOrder().toString());
}
sb.append(")\n\tAND ");
}

sb.append(toCQL(metadata.params));
sb.append(";");

if (!isCqlCompatible(metadata))
{
sb.append("\n*/");
}
return sb.toString();
}

private static String getViewMetadataAsCQL(CFMetaData metadata, boolean includeDroppedColumns)
{
assert metadata.isView();
KeyspaceMetadata keyspaceMetadata = Schema.instance.getKSMetaData(metadata.ksName);
assert keyspaceMetadata != null;
ViewDefinition viewMetadata = keyspaceMetadata.views.get(metadata.cfName).orElse(null);
assert viewMetadata != null;

List<ColumnDefinition> partitionKeyColumns = metadata.partitionKeyColumns();
List<ColumnDefinition> clusteringColumns = getClusteringColumns(metadata);

StringBuilder sb = new StringBuilder();
sb.append(String.format("CREATE MATERIALIZED VIEW IF NOT EXISTS %s.%s AS SELECT ",
ColumnIdentifier.maybeQuote(metadata.ksName),
ColumnIdentifier.maybeQuote(metadata.cfName)));

if (viewMetadata.includeAllColumns)
{
sb.append("*");
}
else
{
boolean isFirst = true;
List<ColumnDefinition> columns = new ArrayList<>(metadata.allColumns());
columns.sort(Comparator.comparing((c -> c.name.toString())));
for (ColumnDefinition column : columns) {
if (!isFirst)
sb.append(", ");
sb.append(column.name.toString());
isFirst = false;
}
}
sb.append(" FROM ").append(viewMetadata.ksName).append(".").append(viewMetadata.baseTableName).append("\n\t");
sb.append("WHERE ").append(viewMetadata.whereClause).append("\n\t");

if (clusteringColumns.size() > 0 || partitionKeyColumns.size() > 1)
{
sb.append("PRIMARY KEY (");
if (partitionKeyColumns.size() > 1)
{
sb.append("(");
Consumer<StringBuilder> pkCommaAppender = commaAppender(" ");
for (ColumnDefinition cfd : partitionKeyColumns)
{
pkCommaAppender.accept(sb);
sb.append(cfd.name.toCQLString());
}
sb.append(")");
}
else
{
sb.append(partitionKeyColumns.get(0).name.toCQLString());
}

for (ColumnDefinition cfd : metadata.clusteringColumns())
sb.append(", ").append(cfd.name.toCQLString());

sb.append(')');
}
sb.append("\n\t");
return sb.toString();
}

@VisibleForTesting
public static String getBaseTableMetadataAsCQL(CFMetaData metadata, boolean includeDroppedColumns)
{
StringBuilder sb = new StringBuilder();

sb.append("CREATE TABLE IF NOT EXISTS ");
sb.append(quoteIdentifier(metadata.ksName)).append('.').append(quoteIdentifier(metadata.cfName)).append(" (");

@@ -172,33 +281,7 @@ public static String getCFMetadataAsCQL(CFMetaData metadata, boolean includeDrop
sb.append(')');
}
sb.append(")\n\t");
sb.append("WITH ");

sb.append("ID = ").append(metadata.cfId).append("\n\tAND ");

if (metadata.isCompactTable())
sb.append("COMPACT STORAGE\n\tAND ");

if (clusteringColumns.size() > 0)
{
sb.append("CLUSTERING ORDER BY (");

Consumer<StringBuilder> cOrderCommaAppender = commaAppender(" ");
for (ColumnDefinition cd : clusteringColumns)
{
cOrderCommaAppender.accept(sb);
sb.append(quoteIdentifier(cd.name.toString())).append(' ').append(cd.clusteringOrder().toString());
}
sb.append(")\n\tAND ");
}

sb.append(toCQL(metadata.params));
sb.append(";");

if (!isCqlCompatible(metadata))
{
sb.append("\n*/");
}
return sb.toString();
}

@@ -38,6 +38,9 @@
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ColumnFamilyStoreCQLHelper;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.serializers.SimpleDateSerializer;
import org.apache.cassandra.serializers.TimeSerializer;
@@ -49,6 +52,8 @@

import com.datastax.driver.core.exceptions.InvalidQueryException;

import static org.junit.Assert.assertTrue;


public class ViewSchemaTest extends CQLTester
{
@@ -693,9 +698,83 @@ public void testViewTokenRestrictions() throws Throwable
execute("USE " + keyspace());
executeNet(protocolVersion, "USE " + keyspace());

execute("INSERT into %s (a,b,c,d) VALUES (?,?,?,?)", 1,2,3,4);
execute("INSERT into %s (a,b,c,d) VALUES (?,?,?,?)", 1, 2, 3, 4);

assertInvalidThrowMessage("Cannot use token relation when defining a materialized view", InvalidRequestException.class,
"CREATE MATERIALIZED VIEW mv_test AS SELECT a,b,c FROM %s WHERE a IS NOT NULL and b IS NOT NULL and token(a) = token(1) PRIMARY KEY(b,a)");
}

@Test
public void testViewMetadataCQLNotIncludeAllColumn() throws Throwable
{
String createBase = "CREATE TABLE IF NOT EXISTS %s (" +
"pk1 int," +
"pk2 int," +
"ck1 int," +
"ck2 int," +
"reg1 int," +
"reg2 list<int>," +
"reg3 int," +
"PRIMARY KEY ((pk1, pk2), ck1, ck2)) WITH " +
"CLUSTERING ORDER BY (ck1 ASC, ck2 DESC);";

String createView = "CREATE MATERIALIZED VIEW IF NOT EXISTS %s AS SELECT pk1,pk2,ck1,ck2,reg1,reg2 FROM %%s "
+ "WHERE pk2 IS NOT NULL AND pk1 IS NOT NULL AND ck2 IS NOT NULL AND ck1 IS NOT NULL PRIMARY KEY((pk2, pk1), ck2, ck1)";

String expectedViewSnapshot = "CREATE MATERIALIZED VIEW IF NOT EXISTS %s.%s AS SELECT ck1, ck2, pk1, pk2, reg1, reg2 FROM %s.%s\n" +
"\tWHERE pk2 IS NOT NULL AND pk1 IS NOT NULL AND ck2 IS NOT NULL AND ck1 IS NOT NULL\n" +
"\tPRIMARY KEY ((pk2, pk1), ck2, ck1)\n" +
"\tWITH ID = %s\n" +
"\tAND CLUSTERING ORDER BY (ck2 DESC, ck1 ASC)";

testViewMetadataCQL(createBase,
createView,
expectedViewSnapshot);
}

@Test
public void testViewMetadataCQLIncludeAllColumn() throws Throwable
{
String createBase = "CREATE TABLE IF NOT EXISTS %s (" +
"pk1 int," +
"pk2 int," +
"ck1 int," +
"ck2 int," +
"reg1 int," +
"reg2 list<int>," +
"reg3 int," +
"PRIMARY KEY ((pk1, pk2), ck1, ck2)) WITH " +
"CLUSTERING ORDER BY (ck1 ASC, ck2 DESC);";

String createView = "CREATE MATERIALIZED VIEW IF NOT EXISTS %s AS SELECT * FROM %%s "
+ "WHERE pk2 IS NOT NULL AND pk1 IS NOT NULL AND ck2 IS NOT NULL AND ck1 IS NOT NULL PRIMARY KEY((pk2, pk1), ck2, ck1)";

String expectedViewSnapshot = "CREATE MATERIALIZED VIEW IF NOT EXISTS %s.%s AS SELECT * FROM %s.%s\n" +
"\tWHERE pk2 IS NOT NULL AND pk1 IS NOT NULL AND ck2 IS NOT NULL AND ck1 IS NOT NULL\n" +
"\tPRIMARY KEY ((pk2, pk1), ck2, ck1)\n" +
"\tWITH ID = %s\n" +
"\tAND CLUSTERING ORDER BY (ck2 DESC, ck1 ASC)";

testViewMetadataCQL(createBase,
createView,
expectedViewSnapshot);
}

private void testViewMetadataCQL(String createBase, String createView, String viewSnapshotSchema) throws Throwable
{
String base = createTable(createBase);

String view = "mv";
createView(view, createView);

ColumnFamilyStore mv = Keyspace.open(keyspace()).getColumnFamilyStore(view);

assertTrue(ColumnFamilyStoreCQLHelper.getCFMetadataAsCQL(mv.metadata, true)
.startsWith(String.format(viewSnapshotSchema,
keyspace(),
view,
keyspace(),
base,
mv.metadata.cfId)));
}
}

0 comments on commit 67eb22e

Please sign in to comment.