From be574fc34ba9834929f1618ab63dd74446cd2683 Mon Sep 17 00:00:00 2001 From: Brandon Williams Date: Thu, 9 Mar 2023 09:53:58 -0600 Subject: [PATCH] Fix default file system error handler for disk_failure_policy die Patch by Runtian Liu; reviewed by brandonwilliams and smiklosovic for CASSANDRA-18294 --- CHANGES.txt | 1 + .../service/DefaultFSErrorHandler.java | 2 + ...> JVMStabilityInspectorThrowableTest.java} | 72 ++++++++-- .../service/DefaultFSErrorHandlerTest.java | 121 ++++++++++++++++ .../service/DiskFailurePolicyTest.java | 135 ++++++++++++++++++ .../cassandra/utils/KillerForTests.java | 5 + 6 files changed, 321 insertions(+), 15 deletions(-) rename test/distributed/org/apache/cassandra/distributed/test/{JVMStabilityInspectorCorruptSSTableExceptionTest.java => JVMStabilityInspectorThrowableTest.java} (70%) create mode 100644 test/unit/org/apache/cassandra/service/DefaultFSErrorHandlerTest.java create mode 100644 test/unit/org/apache/cassandra/service/DiskFailurePolicyTest.java diff --git a/CHANGES.txt b/CHANGES.txt index 7994824ad39e..aa1e60427a14 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.29 + * Fix default file system error handler for disk_failure_policy die (CASSANDRA-18294) * Introduce check for names of test classes (CASSANDRA-17964) * Suppress CVE-2022-41915 (CASSANDRA-18147) * Suppress CVE-2021-1471, CVE-2021-3064, CVE-2021-4235 (CASSANDRA-18149) diff --git a/src/java/org/apache/cassandra/service/DefaultFSErrorHandler.java b/src/java/org/apache/cassandra/service/DefaultFSErrorHandler.java index 1c81f65e44a1..00029e3f407f 100644 --- a/src/java/org/apache/cassandra/service/DefaultFSErrorHandler.java +++ b/src/java/org/apache/cassandra/service/DefaultFSErrorHandler.java @@ -44,6 +44,7 @@ public void handleCorruptSSTable(CorruptSSTableException e) switch (DatabaseDescriptor.getDiskFailurePolicy()) { + case die: case stop_paranoid: // exception not logged here on purpose as it is already logged logger.error("Stopping transports as 
disk_failure_policy is " + DatabaseDescriptor.getDiskFailurePolicy()); @@ -60,6 +61,7 @@ public void handleFSError(FSError e) switch (DatabaseDescriptor.getDiskFailurePolicy()) { + case die: case stop_paranoid: case stop: // exception not logged here on purpose as it is already logged diff --git a/test/distributed/org/apache/cassandra/distributed/test/JVMStabilityInspectorCorruptSSTableExceptionTest.java b/test/distributed/org/apache/cassandra/distributed/test/JVMStabilityInspectorThrowableTest.java similarity index 70% rename from test/distributed/org/apache/cassandra/distributed/test/JVMStabilityInspectorCorruptSSTableExceptionTest.java rename to test/distributed/org/apache/cassandra/distributed/test/JVMStabilityInspectorThrowableTest.java index 98ca496ffcd8..d7aeccaf6ffc 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/JVMStabilityInspectorCorruptSSTableExceptionTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/JVMStabilityInspectorThrowableTest.java @@ -19,11 +19,15 @@ package org.apache.cassandra.distributed.test; import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; import java.util.HashSet; import java.util.Set; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.apache.cassandra.config.Config.DiskFailurePolicy; import org.apache.cassandra.db.ColumnFamilyStore; @@ -39,6 +43,8 @@ import org.apache.cassandra.distributed.shared.AbstractBuilder; import org.apache.cassandra.distributed.shared.NetworkTopology; import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.io.FSError; +import org.apache.cassandra.io.FSReadError; import org.apache.cassandra.io.sstable.CorruptSSTableException; import org.apache.cassandra.io.sstable.format.ForwardingSSTableReader; import org.apache.cassandra.io.sstable.format.SSTableReader; @@ -51,21 +57,46 @@ import static 
org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL; import static org.apache.cassandra.distributed.api.Feature.NETWORK; -public class JVMStabilityInspectorCorruptSSTableExceptionTest extends TestBaseImpl +@RunWith(Parameterized.class) +public class JVMStabilityInspectorThrowableTest extends TestBaseImpl { - @Test - public void testAbstractLocalAwareExecutorServiceOnIgnoredDiskFailurePolicy() throws Exception + private DiskFailurePolicy testPolicy; + private boolean testCorrupted; + private boolean expectNativeTransportRunning;; + private boolean expectGossiperEnabled; + + public JVMStabilityInspectorThrowableTest(DiskFailurePolicy policy, boolean testCorrupted, + boolean expectNativeTransportRunning, boolean expectGossiperEnabled) + { + this.testPolicy = policy; + this.testCorrupted = testCorrupted; + this.expectNativeTransportRunning = expectNativeTransportRunning; + this.expectGossiperEnabled = expectGossiperEnabled; + } + + @Parameterized.Parameters + public static Collection generateData() { - test(DiskFailurePolicy.ignore, true, true); + return Arrays.asList(new Object[][]{ + { DiskFailurePolicy.ignore, true, true, true}, + { DiskFailurePolicy.stop, true, true, true}, + { DiskFailurePolicy.stop_paranoid, true, false, false}, + { DiskFailurePolicy.best_effort, true, true, true}, + { DiskFailurePolicy.ignore, false, true, true}, + { DiskFailurePolicy.stop, false, false, false}, + { DiskFailurePolicy.stop_paranoid, false, false, false}, + { DiskFailurePolicy.best_effort, false, true, true} + } + ); } @Test - public void testAbstractLocalAwareExecutorServiceOnStopParanoidDiskFailurePolicy() throws Exception + public void testAbstractLocalAwareExecutorServiceOnPolicies() throws Exception { - test(DiskFailurePolicy.stop_paranoid, false, false); + test(testPolicy, testCorrupted, expectNativeTransportRunning, expectGossiperEnabled); } - private static void test(DiskFailurePolicy policy, boolean expectNativeTransportRunning, boolean expectGossiperEnabled) 
throws Exception + private static void test(DiskFailurePolicy policy, boolean shouldTestCorrupted, boolean expectNativeTransportRunning, boolean expectGossiperEnabled) throws Exception { String table = policy.name(); try (final Cluster cluster = init(getCluster(policy).start())) @@ -84,16 +115,16 @@ private static void test(DiskFailurePolicy policy, boolean expectNativeTransport cluster.schemaChange("CREATE TABLE " + KEYSPACE + "." + table + " (id bigint PRIMARY KEY)"); node.executeInternal("INSERT INTO " + KEYSPACE + "." + table + " (id) VALUES (?)", 0L); - corruptTable(node, KEYSPACE, table); + throwThrowable(node, KEYSPACE, table, shouldTestCorrupted); try { cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + '.' + table + " WHERE id=?", ConsistencyLevel.ONE, 0L); - Assert.fail("Select should fail as we corrupted SSTable on purpose."); + Assert.fail("Select should fail as we expect corrupted sstable or FS error."); } catch (final Exception ex) { - // we expect that above query fails as we corrupted an sstable + // we expect that above query fails as we corrupted an sstable or throw FS error when read } waitForStop(!expectGossiperEnabled, node, new SerializableCallable() @@ -154,7 +185,7 @@ private static void waitForStop(boolean shouldWaitForStop, } } - private static void corruptTable(IInvokableInstance node, String keyspace, String table) + private static void throwThrowable(IInvokableInstance node, String keyspace, String table, boolean shouldTestCorrupted) { node.runOnInstance(() -> { ColumnFamilyStore cf = Keyspace.open(keyspace).getColumnFamilyStore(table); @@ -163,7 +194,7 @@ private static void corruptTable(IInvokableInstance node, String keyspace, Strin Set remove = cf.getLiveSSTables(); Set replace = new HashSet<>(); for (SSTableReader r : remove) - replace.add(new CorruptedSSTableReader(r)); + replace.add(new CorruptedSSTableReader(r, shouldTestCorrupted)); cf.getTracker().removeUnsafe(remove); cf.addSSTables(replace); @@ -180,26 +211,37 @@ 
private static AbstractBuilder get private static final class CorruptedSSTableReader extends ForwardingSSTableReader { - public CorruptedSSTableReader(SSTableReader delegate) + private boolean shouldThrowCorrupted; + public CorruptedSSTableReader(SSTableReader delegate, boolean shouldThrowCorrupted) { super(delegate); + this.shouldThrowCorrupted = shouldThrowCorrupted; } @Override public SliceableUnfilteredRowIterator iterator(DecoratedKey key, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift, SSTableReadsListener listener) { - throw throwCorrupted(); + if (shouldThrowCorrupted) + throw throwCorrupted(); + throw throwFSError(); } @Override public SliceableUnfilteredRowIterator iterator(FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift) { - throw throwCorrupted(); + if (shouldThrowCorrupted) + throw throwCorrupted(); + throw throwFSError(); } private CorruptSSTableException throwCorrupted() { throw new CorruptSSTableException(new IOException("failed to get position"), descriptor.baseFilename()); } + + private FSError throwFSError() + { + throw new FSReadError(new IOException("failed to get position"), descriptor.baseFilename()); + } } } \ No newline at end of file diff --git a/test/unit/org/apache/cassandra/service/DefaultFSErrorHandlerTest.java b/test/unit/org/apache/cassandra/service/DefaultFSErrorHandlerTest.java new file mode 100644 index 000000000000..f6ed9da7ed28 --- /dev/null +++ b/test/unit/org/apache/cassandra/service/DefaultFSErrorHandlerTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.io.FSErrorHandler; +import org.apache.cassandra.io.FSReadError; +import org.apache.cassandra.io.sstable.CorruptSSTableException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +@RunWith(Parameterized.class) +public class DefaultFSErrorHandlerTest +{ + private FSErrorHandler handler = new DefaultFSErrorHandler(); + Config.DiskFailurePolicy oldDiskPolicy; + Config.DiskFailurePolicy testDiskPolicy; + private boolean gossipRunningFSError; + private boolean gossipRunningCorruptedSStableException; + + @BeforeClass + public static void defineSchema() throws ConfigurationException + { + SchemaLoader.prepareServer(); + CassandraDaemon daemon = new CassandraDaemon(); + daemon.completeSetup(); //startup must be completed, otherwise FS error will kill JVM regardless of failure policy + StorageService.instance.registerDaemon(daemon); + StorageService.instance.initServer(); + } + + @AfterClass + public 
static void shutdown() + { + StorageService.instance.stopClient(); + } + + @Before + public void setup() + { + StorageService.instance.startGossiping(); + assertTrue(Gossiper.instance.isEnabled()); + oldDiskPolicy = DatabaseDescriptor.getDiskFailurePolicy(); + } + + public DefaultFSErrorHandlerTest(Config.DiskFailurePolicy policy, + boolean gossipRunningFSError, + boolean gossipRunningCorruptedSStableException) + { + this.testDiskPolicy = policy; + this.gossipRunningFSError = gossipRunningFSError; + this.gossipRunningCorruptedSStableException = gossipRunningCorruptedSStableException; + } + + @Parameterized.Parameters + public static Collection generateData() + { + return Arrays.asList(new Object[][]{ + { Config.DiskFailurePolicy.die, false, false}, + { Config.DiskFailurePolicy.ignore, true, true}, + { Config.DiskFailurePolicy.stop, false, true}, + { Config.DiskFailurePolicy.stop_paranoid, false, false}, + { Config.DiskFailurePolicy.best_effort, true, true} + } + ); + } + + @After + public void teardown() + { + DatabaseDescriptor.setDiskFailurePolicy(oldDiskPolicy); + } + + @Test + public void testFSErrors() + { + DatabaseDescriptor.setDiskFailurePolicy(testDiskPolicy); + handler.handleFSError(new FSReadError(new IOException(), "blah")); + assertEquals(gossipRunningFSError, Gossiper.instance.isEnabled()); + } + + @Test + public void testCorruptSSTableException() + { + DatabaseDescriptor.setDiskFailurePolicy(testDiskPolicy); + handler.handleCorruptSSTable(new CorruptSSTableException(new IOException(), "blah")); + assertEquals(gossipRunningCorruptedSStableException, Gossiper.instance.isEnabled()); + } +} diff --git a/test/unit/org/apache/cassandra/service/DiskFailurePolicyTest.java b/test/unit/org/apache/cassandra/service/DiskFailurePolicyTest.java new file mode 100644 index 000000000000..90e85e9cc009 --- /dev/null +++ b/test/unit/org/apache/cassandra/service/DiskFailurePolicyTest.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.Config.DiskFailurePolicy; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.io.FSReadError; +import org.apache.cassandra.io.sstable.CorruptSSTableException; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.utils.JVMStabilityInspector; +import org.apache.cassandra.utils.KillerForTests; + +@RunWith(Parameterized.class) +public class DiskFailurePolicyTest +{ + DiskFailurePolicy originalDiskFailurePolicy; + JVMStabilityInspector.Killer originalKiller; + KillerForTests killerForTests; + DiskFailurePolicy testPolicy; + boolean isStartUpInProgress; + Throwable t; + boolean expectGossipRunning; + boolean 
expectJVMKilled; + boolean expectJVMKilledQuiet; + + + @BeforeClass + public static void defineSchema() throws ConfigurationException + { + SchemaLoader.prepareServer(); + StorageService.instance.initServer(); + FileUtils.setFSErrorHandler(new DefaultFSErrorHandler()); + } + + public DiskFailurePolicyTest(DiskFailurePolicy testPolicy, boolean isStartUpInProgress, Throwable t, + boolean expectGossipRunning, boolean jvmKilled, boolean jvmKilledQuiet) + { + this.testPolicy = testPolicy; + this.isStartUpInProgress = isStartUpInProgress; + this.t = t; + this.expectGossipRunning = expectGossipRunning; + this.expectJVMKilled = jvmKilled; + this.expectJVMKilledQuiet = jvmKilledQuiet; + } + + @Parameterized.Parameters + public static Collection generateData() + { + return Arrays.asList(new Object[][]{ + { Config.DiskFailurePolicy.die, true, new FSReadError(new IOException(), "blah"), false, true, true}, + { DiskFailurePolicy.ignore, true, new FSReadError(new IOException(), "blah"), true, false, false}, + { DiskFailurePolicy.stop, true, new FSReadError(new IOException(), "blah"), false, true, true}, + { DiskFailurePolicy.stop_paranoid, true, new FSReadError(new IOException(), "blah"), false, true, true}, + { Config.DiskFailurePolicy.die, true, new CorruptSSTableException(new IOException(), "blah"), false, true, true}, + { DiskFailurePolicy.ignore, true, new CorruptSSTableException(new IOException(), "blah"), true, false, false}, + { DiskFailurePolicy.stop, true, new CorruptSSTableException(new IOException(), "blah"), false, true, true}, + { DiskFailurePolicy.stop_paranoid, true, new CorruptSSTableException(new IOException(), "blah"), false, true, true}, + { Config.DiskFailurePolicy.die, false, new FSReadError(new IOException(), "blah"), false, true, false}, + { DiskFailurePolicy.ignore, false, new FSReadError(new IOException(), "blah"), true, false, false}, + { DiskFailurePolicy.stop, false, new FSReadError(new IOException(), "blah"), false, false, false}, + { 
DiskFailurePolicy.stop_paranoid, false, new FSReadError(new IOException(), "blah"), false, false, false}, + { Config.DiskFailurePolicy.die, false, new CorruptSSTableException(new IOException(), "blah"), false, true, false}, + { DiskFailurePolicy.ignore, false, new CorruptSSTableException(new IOException(), "blah"), true, false, false}, + { DiskFailurePolicy.stop, false, new CorruptSSTableException(new IOException(), "blah"), true, false, false}, + { DiskFailurePolicy.stop_paranoid, false, new CorruptSSTableException(new IOException(), "blah"), false, false, false} + } + ); + } + + @Before + public void setup() + { + CassandraDaemon daemon = new CassandraDaemon(); + if (!isStartUpInProgress) + daemon.completeSetup(); //mark startup completed + StorageService.instance.registerDaemon(daemon); + killerForTests = new KillerForTests(); + originalKiller = JVMStabilityInspector.replaceKiller(killerForTests); + originalDiskFailurePolicy = DatabaseDescriptor.getDiskFailurePolicy(); + StorageService.instance.startGossiping(); + Assert.assertTrue(Gossiper.instance.isEnabled()); + } + + @After + public void teardown() + { + JVMStabilityInspector.replaceKiller(originalKiller); + DatabaseDescriptor.setDiskFailurePolicy(originalDiskFailurePolicy); + } + + @Test + public void testPolicies() + { + DatabaseDescriptor.setDiskFailurePolicy(testPolicy); + JVMStabilityInspector.inspectThrowable(t); + Assert.assertEquals(expectJVMKilled, killerForTests.wasKilled()); + Assert.assertEquals(expectJVMKilledQuiet, killerForTests.wasKilledQuietly()); + if (!expectJVMKilled) { + // only verify gossip if JVM is not killed + Assert.assertEquals(expectGossipRunning, Gossiper.instance.isEnabled()); + } + } +} diff --git a/test/unit/org/apache/cassandra/utils/KillerForTests.java b/test/unit/org/apache/cassandra/utils/KillerForTests.java index abc795232267..ad3a27436e8a 100644 --- a/test/unit/org/apache/cassandra/utils/KillerForTests.java +++ b/test/unit/org/apache/cassandra/utils/KillerForTests.java 
@@ -29,6 +29,11 @@ public class KillerForTests extends JVMStabilityInspector.Killer
     @Override
     protected void killCurrentJVM(Throwable t, boolean quiet)
     {
+        if (killed)
+        {
+            // Can only be killed once: the first call's killed/quiet state must not be overwritten
+            return;
+        }
         this.killed = true;
         this.quiet = quiet;
     }