Skip to content

Commit

Permalink
Fix default file system error handler for disk_failure_policy die
Browse files Browse the repository at this point in the history
Patch by Runtian Liu; reviewed by brandonwilliams and smiklosovic for
CASSANDRA-18294
  • Loading branch information
driftx committed Mar 9, 2023
1 parent 5374f56 commit be574fc
Show file tree
Hide file tree
Showing 6 changed files with 321 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
3.0.29
* Fix default file system error handler for disk_failure_policy die (CASSANDRA-18294)
* Introduce check for names of test classes (CASSANDRA-17964)
* Suppress CVE-2022-41915 (CASSANDRA-18147)
* Suppress CVE-2021-1471, CVE-2021-3064, CVE-2021-4235 (CASSANDRA-18149)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public void handleCorruptSSTable(CorruptSSTableException e)

switch (DatabaseDescriptor.getDiskFailurePolicy())
{
case die:
case stop_paranoid:
// exception not logged here on purpose as it is already logged
logger.error("Stopping transports as disk_failure_policy is " + DatabaseDescriptor.getDiskFailurePolicy());
Expand All @@ -60,6 +61,7 @@ public void handleFSError(FSError e)

switch (DatabaseDescriptor.getDiskFailurePolicy())
{
case die:
case stop_paranoid:
case stop:
// exception not logged here on purpose as it is already logged
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@
package org.apache.cassandra.distributed.test;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import org.apache.cassandra.config.Config.DiskFailurePolicy;
import org.apache.cassandra.db.ColumnFamilyStore;
Expand All @@ -39,6 +43,8 @@
import org.apache.cassandra.distributed.shared.AbstractBuilder;
import org.apache.cassandra.distributed.shared.NetworkTopology;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.io.FSError;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.sstable.format.ForwardingSSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableReader;
Expand All @@ -51,21 +57,46 @@
import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;

public class JVMStabilityInspectorCorruptSSTableExceptionTest extends TestBaseImpl
@RunWith(Parameterized.class)
public class JVMStabilityInspectorThrowableTest extends TestBaseImpl
{
@Test
public void testAbstractLocalAwareExecutorServiceOnIgnoredDiskFailurePolicy() throws Exception
private DiskFailurePolicy testPolicy;
private boolean testCorrupted;
private boolean expectNativeTransportRunning;;
private boolean expectGossiperEnabled;

public JVMStabilityInspectorThrowableTest(DiskFailurePolicy policy, boolean testCorrupted,
boolean expectNativeTransportRunning, boolean expectGossiperEnabled)
{
this.testPolicy = policy;
this.testCorrupted = testCorrupted;
this.expectNativeTransportRunning = expectNativeTransportRunning;
this.expectGossiperEnabled = expectGossiperEnabled;
}

@Parameterized.Parameters
public static Collection<Object[]> generateData()
{
test(DiskFailurePolicy.ignore, true, true);
return Arrays.asList(new Object[][]{
{ DiskFailurePolicy.ignore, true, true, true},
{ DiskFailurePolicy.stop, true, true, true},
{ DiskFailurePolicy.stop_paranoid, true, false, false},
{ DiskFailurePolicy.best_effort, true, true, true},
{ DiskFailurePolicy.ignore, false, true, true},
{ DiskFailurePolicy.stop, false, false, false},
{ DiskFailurePolicy.stop_paranoid, false, false, false},
{ DiskFailurePolicy.best_effort, false, true, true}
}
);
}

@Test
public void testAbstractLocalAwareExecutorServiceOnStopParanoidDiskFailurePolicy() throws Exception
public void testAbstractLocalAwareExecutorServiceOnPolicies() throws Exception
{
test(DiskFailurePolicy.stop_paranoid, false, false);
test(testPolicy, testCorrupted, expectNativeTransportRunning, expectGossiperEnabled);
}

private static void test(DiskFailurePolicy policy, boolean expectNativeTransportRunning, boolean expectGossiperEnabled) throws Exception
private static void test(DiskFailurePolicy policy, boolean shouldTestCorrupted, boolean expectNativeTransportRunning, boolean expectGossiperEnabled) throws Exception
{
String table = policy.name();
try (final Cluster cluster = init(getCluster(policy).start()))
Expand All @@ -84,16 +115,16 @@ private static void test(DiskFailurePolicy policy, boolean expectNativeTransport

cluster.schemaChange("CREATE TABLE " + KEYSPACE + "." + table + " (id bigint PRIMARY KEY)");
node.executeInternal("INSERT INTO " + KEYSPACE + "." + table + " (id) VALUES (?)", 0L);
corruptTable(node, KEYSPACE, table);
throwThrowable(node, KEYSPACE, table, shouldTestCorrupted);

try
{
cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + '.' + table + " WHERE id=?", ConsistencyLevel.ONE, 0L);
Assert.fail("Select should fail as we corrupted SSTable on purpose.");
Assert.fail("Select should fail as we expect corrupted sstable or FS error.");
}
catch (final Exception ex)
{
// we expect that above query fails as we corrupted an sstable
// we expect that above query fails as we corrupted an sstable or throw FS error when read
}

waitForStop(!expectGossiperEnabled, node, new SerializableCallable<Boolean>()
Expand Down Expand Up @@ -154,7 +185,7 @@ private static void waitForStop(boolean shouldWaitForStop,
}
}

private static void corruptTable(IInvokableInstance node, String keyspace, String table)
private static void throwThrowable(IInvokableInstance node, String keyspace, String table, boolean shouldTestCorrupted)
{
node.runOnInstance(() -> {
ColumnFamilyStore cf = Keyspace.open(keyspace).getColumnFamilyStore(table);
Expand All @@ -163,7 +194,7 @@ private static void corruptTable(IInvokableInstance node, String keyspace, Strin
Set<SSTableReader> remove = cf.getLiveSSTables();
Set<SSTableReader> replace = new HashSet<>();
for (SSTableReader r : remove)
replace.add(new CorruptedSSTableReader(r));
replace.add(new CorruptedSSTableReader(r, shouldTestCorrupted));

cf.getTracker().removeUnsafe(remove);
cf.addSSTables(replace);
Expand All @@ -180,26 +211,37 @@ private static AbstractBuilder<IInvokableInstance, Cluster, Cluster.Builder> get

private static final class CorruptedSSTableReader extends ForwardingSSTableReader
{
public CorruptedSSTableReader(SSTableReader delegate)
private boolean shouldThrowCorrupted;
public CorruptedSSTableReader(SSTableReader delegate, boolean shouldThrowCorrupted)
{
super(delegate);
this.shouldThrowCorrupted = shouldThrowCorrupted;
}

@Override
public SliceableUnfilteredRowIterator iterator(DecoratedKey key, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift, SSTableReadsListener listener)
{
throw throwCorrupted();
if (shouldThrowCorrupted)
throw throwCorrupted();
throw throwFSError();
}

@Override
public SliceableUnfilteredRowIterator iterator(FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift)
{
throw throwCorrupted();
if (shouldThrowCorrupted)
throw throwCorrupted();
throw throwFSError();
}

private CorruptSSTableException throwCorrupted()
{
throw new CorruptSSTableException(new IOException("failed to get position"), descriptor.baseFilename());
}

private FSError throwFSError()
{
throw new FSReadError(new IOException("failed to get position"), descriptor.baseFilename());
}
}
}
121 changes: 121 additions & 0 deletions test/unit/org/apache/cassandra/service/DefaultFSErrorHandlerTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.service;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;

import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.io.FSErrorHandler;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.sstable.CorruptSSTableException;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

@RunWith(Parameterized.class)
public class DefaultFSErrorHandlerTest
{
private FSErrorHandler handler = new DefaultFSErrorHandler();
Config.DiskFailurePolicy oldDiskPolicy;
Config.DiskFailurePolicy testDiskPolicy;
private boolean gossipRunningFSError;
private boolean gossipRunningCorruptedSStableException;

@BeforeClass
public static void defineSchema() throws ConfigurationException
{
SchemaLoader.prepareServer();
CassandraDaemon daemon = new CassandraDaemon();
daemon.completeSetup(); //startup must be completed, otherwise FS error will kill JVM regardless of failure policy
StorageService.instance.registerDaemon(daemon);
StorageService.instance.initServer();
}

@AfterClass
public static void shutdown()
{
StorageService.instance.stopClient();
}

@Before
public void setup()
{
StorageService.instance.startGossiping();
assertTrue(Gossiper.instance.isEnabled());
oldDiskPolicy = DatabaseDescriptor.getDiskFailurePolicy();
}

public DefaultFSErrorHandlerTest(Config.DiskFailurePolicy policy,
boolean gossipRunningFSError,
boolean gossipRunningCorruptedSStableException)
{
this.testDiskPolicy = policy;
this.gossipRunningFSError = gossipRunningFSError;
this.gossipRunningCorruptedSStableException = gossipRunningCorruptedSStableException;
}

@Parameterized.Parameters
public static Collection<Object[]> generateData()
{
return Arrays.asList(new Object[][]{
{ Config.DiskFailurePolicy.die, false, false},
{ Config.DiskFailurePolicy.ignore, true, true},
{ Config.DiskFailurePolicy.stop, false, true},
{ Config.DiskFailurePolicy.stop_paranoid, false, false},
{ Config.DiskFailurePolicy.best_effort, true, true}
}
);
}

@After
public void teardown()
{
DatabaseDescriptor.setDiskFailurePolicy(oldDiskPolicy);
}

@Test
public void testFSErrors()
{
DatabaseDescriptor.setDiskFailurePolicy(testDiskPolicy);
handler.handleFSError(new FSReadError(new IOException(), "blah"));
assertEquals(gossipRunningFSError, Gossiper.instance.isEnabled());
}

@Test
public void testCorruptSSTableException()
{
DatabaseDescriptor.setDiskFailurePolicy(testDiskPolicy);
handler.handleCorruptSSTable(new CorruptSSTableException(new IOException(), "blah"));
assertEquals(gossipRunningCorruptedSStableException, Gossiper.instance.isEnabled());
}
}
Loading

0 comments on commit be574fc

Please sign in to comment.