Skip to content
Permalink
Browse files
Add information info whether sstables are dropped or not to SchemaCha…
…ngeListener

patch by Jacek Lewandowski; reviewed by Alex Petrov for CASSANDRA-17582
  • Loading branch information
jacek-lewandowski authored and ekaterinadimitrova2 committed Apr 29, 2022
1 parent e4e19e3 commit 458bfd16c7ec759705f920e7ef9a8f2bb5a3f4b5
Showing 11 changed files with 150 additions and 28 deletions.
@@ -1,4 +1,5 @@
4.1
* Add information whether sstables are dropped to SchemaChangeListener (CASSANDRA-17582)
* Add a pluggable memtable API (CEP-11 / CASSANDRA-17034)
* Save sstable id as string in activity table (CASSANDRA-17585)
* Implement startup check to prevent Cassandra to potentially spread zombie data (CASSANDRA-17180)
@@ -30,15 +30,15 @@
public class AuthSchemaChangeListener implements SchemaChangeListener
{
@Override
public void onDropKeyspace(KeyspaceMetadata keyspace)
public void onDropKeyspace(KeyspaceMetadata keyspace, boolean dropData)
{
DatabaseDescriptor.getAuthorizer().revokeAllOn(DataResource.keyspace(keyspace.name));
DatabaseDescriptor.getAuthorizer().revokeAllOn(DataResource.allTables(keyspace.name));
DatabaseDescriptor.getAuthorizer().revokeAllOn(FunctionResource.keyspace(keyspace.name));
}

@Override
public void onDropTable(TableMetadata table)
public void onDropTable(TableMetadata table, boolean dropData)
{
DatabaseDescriptor.getAuthorizer().revokeAllOn(DataResource.table(table.keyspace, table.name));
}
@@ -1054,14 +1054,14 @@ public void onAlterAggregate(UDAggregate before, UDAggregate after)
}

@Override
public void onDropKeyspace(KeyspaceMetadata keyspace)
public void onDropKeyspace(KeyspaceMetadata keyspace, boolean dropData)
{
logger.trace("Keyspace {} was dropped, invalidating related prepared statements", keyspace.name);
removeInvalidPreparedStatements(keyspace.name, null);
}

@Override
public void onDropTable(TableMetadata table)
public void onDropTable(TableMetadata table, boolean dropData)
{
logger.trace("Table {}.{} was dropped, invalidating related prepared statements", table.keyspace, table.name);
removeInvalidPreparedStatements(table.keyspace, table.name);
@@ -178,7 +178,7 @@ private static long estimateMeanPartitionSize(Collection<SSTableReader> sstables
}

@Override
public void onDropTable(TableMetadata table)
public void onDropTable(TableMetadata table, boolean dropData)
{
SystemKeyspace.clearEstimates(table.keyspace, table.name);
}
@@ -672,7 +672,7 @@ private void alterKeyspace(KeyspaceDiff delta, boolean dropData)
Keyspace.open(delta.after.name, this, true).viewManager.reload(true);
}

schemaChangeNotifier.notifyKeyspaceAltered(delta);
schemaChangeNotifier.notifyKeyspaceAltered(delta, dropData);
SchemaDiagnostics.keyspaceAltered(this, delta);
}

@@ -721,7 +721,7 @@ private void dropKeyspace(KeyspaceMetadata keyspace, boolean dropData)
Keyspace.writeOrder.awaitNewBarrier();
}

schemaChangeNotifier.notifyKeyspaceDropped(keyspace);
schemaChangeNotifier.notifyKeyspaceDropped(keyspace, dropData);
SchemaDiagnostics.keyspaceDropped(this, keyspace);
}

@@ -83,17 +83,17 @@ default void onAlterAggregate(UDAggregate before, UDAggregate after)
{
}

default void onDropKeyspace(KeyspaceMetadata keyspace)
default void onDropKeyspace(KeyspaceMetadata keyspace, boolean dropData)
{
}

default void onDropTable(TableMetadata table)
default void onDropTable(TableMetadata table, boolean dropData)
{
}

default void onDropView(ViewMetadata view)
default void onDropView(ViewMetadata view, boolean dropData)
{
onDropTable(view.metadata);
onDropTable(view.metadata, dropData);
}

default void onDropType(UserType type)
@@ -56,13 +56,13 @@ public void notifyKeyspaceCreated(KeyspaceMetadata keyspace)
keyspace.functions.udas().forEach(this::notifyCreateAggregate);
}

public void notifyKeyspaceAltered(KeyspaceMetadata.KeyspaceDiff delta)
public void notifyKeyspaceAltered(KeyspaceMetadata.KeyspaceDiff delta, boolean dropData)
{
// notify on everything dropped
delta.udas.dropped.forEach(uda -> notifyDropAggregate((UDAggregate) uda));
delta.udfs.dropped.forEach(udf -> notifyDropFunction((UDFunction) udf));
delta.views.dropped.forEach(this::notifyDropView);
delta.tables.dropped.forEach(this::notifyDropTable);
delta.views.dropped.forEach(view -> notifyDropView(view, dropData));
delta.tables.dropped.forEach(metadata -> notifyDropTable(metadata, dropData));
delta.types.dropped.forEach(this::notifyDropType);

// notify on everything created
@@ -82,14 +82,14 @@ public void notifyKeyspaceAltered(KeyspaceMetadata.KeyspaceDiff delta)
delta.udas.altered.forEach(diff -> notifyAlterAggregate(diff.before, diff.after));
}

public void notifyKeyspaceDropped(KeyspaceMetadata keyspace)
public void notifyKeyspaceDropped(KeyspaceMetadata keyspace, boolean dropData)
{
keyspace.functions.udas().forEach(this::notifyDropAggregate);
keyspace.functions.udfs().forEach(this::notifyDropFunction);
keyspace.views.forEach(this::notifyDropView);
keyspace.tables.forEach(this::notifyDropTable);
keyspace.views.forEach(view -> notifyDropView(view, dropData));
keyspace.tables.forEach(metadata -> notifyDropTable(metadata, dropData));
keyspace.types.forEach(this::notifyDropType);
notifyDropKeyspace(keyspace);
notifyDropKeyspace(keyspace, dropData);
}

public void notifyPreChanges(SchemaTransformationResult transformationResult)
@@ -175,19 +175,19 @@ private void notifyAlterAggregate(UDAggregate before, UDAggregate after)
changeListeners.forEach(l -> l.onAlterAggregate(before, after));
}

private void notifyDropKeyspace(KeyspaceMetadata ksm)
private void notifyDropKeyspace(KeyspaceMetadata ksm, boolean dropData)
{
changeListeners.forEach(l -> l.onDropKeyspace(ksm));
changeListeners.forEach(l -> l.onDropKeyspace(ksm, dropData));
}

private void notifyDropTable(TableMetadata metadata)
private void notifyDropTable(TableMetadata metadata, boolean dropData)
{
changeListeners.forEach(l -> l.onDropTable(metadata));
changeListeners.forEach(l -> l.onDropTable(metadata, dropData));
}

private void notifyDropView(ViewMetadata view)
private void notifyDropView(ViewMetadata view, boolean dropData)
{
changeListeners.forEach(l -> l.onDropView(view));
changeListeners.forEach(l -> l.onDropView(view, dropData));
}

private void notifyDropType(UserType ut)
@@ -31,7 +31,7 @@
*/
public class SchemaUpdateHandlerFactoryProvider implements Provider<SchemaUpdateHandlerFactory>
{
private static final String SUH_FACTORY_CLASS_PROPERTY = "cassandra.schema.update_handler_factory.class";
public static final String SUH_FACTORY_CLASS_PROPERTY = "cassandra.schema.update_handler_factory.class";

public final static SchemaUpdateHandlerFactoryProvider instance = new SchemaUpdateHandlerFactoryProvider();

@@ -541,13 +541,13 @@ public void onAlterAggregate(UDAggregate before, UDAggregate after)
}

@Override
public void onDropKeyspace(KeyspaceMetadata keyspace)
public void onDropKeyspace(KeyspaceMetadata keyspace, boolean dropData)
{
send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, keyspace.name));
}

@Override
public void onDropTable(TableMetadata table)
public void onDropTable(TableMetadata table, boolean dropData)
{
send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.TABLE, table.keyspace, table.name));
}
@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.schema;

import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.schema.SchemaTransformation.SchemaTransformationResult;
import org.mockito.Mockito;

import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
import static org.assertj.core.api.Assertions.assertThat;

public class RemoveWithoutDroppingTest
{
static volatile boolean dropDataOverride = true;

static final SchemaChangeListener listener = Mockito.mock(SchemaChangeListener.class);

@BeforeClass
public static void beforeClass()
{
System.setProperty(SchemaUpdateHandlerFactoryProvider.SUH_FACTORY_CLASS_PROPERTY, TestSchemaUpdateHandlerFactory.class.getName());
CQLTester.prepareServer();
Schema.instance.registerListener(listener);
}

@Before
public void before()
{
Mockito.reset(listener);
}

public static void callbackOverride(BiConsumer<SchemaTransformationResult, Boolean> updateSchemaCallback, SchemaTransformationResult result, boolean dropData)
{
updateSchemaCallback.accept(result, dropDataOverride);
}

public static class TestSchemaUpdateHandlerFactory implements SchemaUpdateHandlerFactory
{
@Override
public SchemaUpdateHandler getSchemaUpdateHandler(boolean online, BiConsumer<SchemaTransformationResult, Boolean> updateSchemaCallback)
{
return online
? new DefaultSchemaUpdateHandler((result, dropData) -> callbackOverride(updateSchemaCallback, result, dropData))
: new OfflineSchemaUpdateHandler((result, dropData) -> callbackOverride(updateSchemaCallback, result, dropData));
}
}

private void testRemoveKeyspace(String ks, String tab, boolean expectDropped) throws Throwable
{
executeInternal(String.format("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}", ks));
executeInternal(String.format("CREATE TABLE %s.%s (id INT PRIMARY KEY, v INT)", ks, tab));
executeInternal(String.format("INSERT INTO %s.%s (id, v) VALUES (?, ?)", ks, tab), 1, 2);
executeInternal(String.format("INSERT INTO %s.%s (id, v) VALUES (?, ?)", ks, tab), 3, 4);
ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(ks, tab);
cfs.forceFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS).get();

KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(ks);
TableMetadata tm = Schema.instance.getTableMetadata(ks, tab);

List<File> directories = cfs.getDirectories().getCFDirectories();
Set<File> filesBefore = directories.stream().flatMap(d -> Arrays.stream(d.tryList(f -> !f.isDirectory()))).collect(Collectors.toSet());
assertThat(filesBefore).isNotEmpty();

executeInternal(String.format("DROP KEYSPACE %s", ks));

Set<File> filesAfter = directories.stream().flatMap(d -> Arrays.stream(d.tryList(f -> !f.isDirectory()))).collect(Collectors.toSet());
if (expectDropped)
assertThat(filesAfter).isEmpty();
else
assertThat(filesAfter).hasSameElementsAs(filesBefore);

Mockito.verify(listener).onDropTable(tm, expectDropped);
Mockito.verify(listener).onDropKeyspace(ksm, expectDropped);
}

@Test
public void testRemoveWithoutDropping() throws Throwable
{
dropDataOverride = false;
String ks = "test_remove_without_dropping";
String tab = "test_table";
testRemoveKeyspace(ks, tab, false);
}

@Test
public void testRemoveWithDropping() throws Throwable
{
dropDataOverride = true;
String ks = "test_remove_with_dropping";
String tab = "test_table";
testRemoveKeyspace(ks, tab, true);
}
}
@@ -88,7 +88,7 @@ public void testNotifications() throws Exception
notifier.onLeaveCluster(broadcastAddress);
notifier.onCreateKeyspace(ks);
notifier.onAlterKeyspace(ks, ks);
notifier.onDropKeyspace(ks);
notifier.onDropKeyspace(ks, true);

handler.assertNextEvent(Event.StatusChange.nodeUp(nativeAddress));
handler.assertNextEvent(Event.StatusChange.nodeDown(nativeAddress));

0 comments on commit 458bfd1

Please sign in to comment.