Skip to content

Commit

Permalink
Log queries that fail on timeout or unavailable errors up to once per…
Browse files Browse the repository at this point in the history
… minute by default

patch by Caleb Rackliffe and Marcus Eriksson; reviewed by David Capwell and Yifan Cai for CASSANDRA-17159

Co-authored-by: Caleb Rackliffe <calebrackliffe@gmail.com>
Co-authored-by: Marcus Eriksson <marcuse@apache.org>
  • Loading branch information
maedhroz and krummas committed Nov 18, 2021
1 parent 5d78123 commit 33fd2dc
Show file tree
Hide file tree
Showing 7 changed files with 431 additions and 176 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
@@ -1,4 +1,5 @@
4.1
* Log queries that fail on timeout or unavailable errors up to once per minute by default (CASSANDRA-17159)
* Refactor normal/preview/IR repair to standardize repair cleanup and error handling of failed RepairJobs (CASSANDRA-17069)
* Log missing peers in StartupClusterConnectivityChecker (CASSANDRA-17130)
* Introduce separate rate limiting settings for entire SSTable streaming (CASSANDRA-17065)
Expand Down
Expand Up @@ -233,6 +233,7 @@ public enum CassandraRelevantProperties
DETERMINISM_SSTABLE_COMPRESSION_DEFAULT("cassandra.sstable_compression_default", "true"),
DETERMINISM_CONSISTENT_DIRECTORY_LISTINGS("cassandra.consistent_directory_listings", "false"),
DETERMINISM_UNSAFE_UUID_NODE("cassandra.unsafe.deterministicuuidnode", "false"),
FAILURE_LOGGING_INTERVAL_SECONDS("cassandra.request_failure_log_interval_seconds", "60"),

// properties to disable certain behaviours for testing
DISABLE_GOSSIP_ENDPOINT_REMOVAL("cassandra.gossip.disable_endpoint_removal"),
Expand Down
24 changes: 23 additions & 1 deletion src/java/org/apache/cassandra/service/StorageProxy.java
Expand Up @@ -37,6 +37,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import com.google.common.base.Preconditions;
import com.google.common.cache.CacheLoader;
Expand All @@ -49,6 +50,7 @@
import org.apache.cassandra.batchlog.Batch;
import org.apache.cassandra.batchlog.BatchlogManager;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ConsistencyLevel;
Expand Down Expand Up @@ -130,6 +132,7 @@
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MBeanWrapper;
import org.apache.cassandra.utils.MonotonicClock;
import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.concurrent.CountDownLatch;
Expand Down Expand Up @@ -163,6 +166,8 @@ public class StorageProxy implements StorageProxyMBean

public static final String UNREACHABLE = "UNREACHABLE";

private static final int FAILURE_LOGGING_INTERVAL_SECONDS = CassandraRelevantProperties.FAILURE_LOGGING_INTERVAL_SECONDS.getInt();

private static final WritePerformer standardWritePerformer;
private static final WritePerformer counterWritePerformer;
private static final WritePerformer counterWriteOnCoordinatorPerformer;
Expand Down Expand Up @@ -1758,7 +1763,9 @@ public static PartitionIterator read(SinglePartitionReadCommand.Group group, Con
{
readMetrics.unavailables.mark();
readMetricsForLevel(consistencyLevel).unavailables.mark();
throw new IsBootstrappingException();
IsBootstrappingException exception = new IsBootstrappingException();
logRequestException(exception, group.queries);
throw exception;
}

if (DatabaseDescriptor.getEnablePartitionDenylist() && DatabaseDescriptor.getEnableDenylistReads())
Expand Down Expand Up @@ -1838,13 +1845,15 @@ private static PartitionIterator readWithPaxos(SinglePartitionReadCommand.Group
readMetrics.unavailables.mark();
casReadMetrics.unavailables.mark();
readMetricsForLevel(consistencyLevel).unavailables.mark();
logRequestException(e, group.queries);
throw e;
}
catch (ReadTimeoutException e)
{
readMetrics.timeouts.mark();
casReadMetrics.timeouts.mark();
readMetricsForLevel(consistencyLevel).timeouts.mark();
logRequestException(e, group.queries);
throw e;
}
catch (ReadAbortException e)
Expand Down Expand Up @@ -1894,12 +1903,14 @@ private static PartitionIterator readRegular(SinglePartitionReadCommand.Group gr
{
readMetrics.unavailables.mark();
readMetricsForLevel(consistencyLevel).unavailables.mark();
logRequestException(e, group.queries);
throw e;
}
catch (ReadTimeoutException e)
{
readMetrics.timeouts.mark();
readMetricsForLevel(consistencyLevel).timeouts.mark();
logRequestException(e, group.queries);
throw e;
}
catch (ReadAbortException e)
Expand Down Expand Up @@ -2455,6 +2466,17 @@ protected void runMayThrow() throws Exception
abstract protected void runMayThrow() throws Exception;
}

/**
 * Writes a rate-limited INFO log line describing a failed read request.
 * <p>
 * The line has the form {@code "<exception message>" while executing <queries>}, where the
 * queries are the CQL renderings of {@code commands} joined with {@code "; "}. Logging goes
 * through {@link NoSpamLogger} using {@code FAILURE_LOGGING_INTERVAL_SECONDS} as the minimum
 * interval, and the arguments are handed over as a supplier so that the CQL strings are only
 * built when the statement is actually going to be logged.
 *
 * @param exception the failure whose message is reported
 * @param commands  the read commands that were being executed when the failure occurred
 */
public static void logRequestException(Exception exception, Collection<? extends ReadCommand> commands)
{
    // Deferred on purpose: rendering every command to CQL is wasted work for suppressed log calls.
    Supplier<Object[]> lazyArguments = () -> {
        String failedQueries = commands.stream()
                                       .map(ReadCommand::toCQLString)
                                       .collect(Collectors.joining("; "));
        return new Object[]{ exception.getMessage(), failedQueries };
    };

    NoSpamLogger.log(logger,
                     NoSpamLogger.Level.INFO,
                     FAILURE_LOGGING_INTERVAL_SECONDS,
                     TimeUnit.SECONDS,
                     "\"{}\" while executing {}",
                     lazyArguments);
}

/**
* HintRunnable will decrease totalHintsInProgress and targetHints when finished.
* It is the caller's responsibility to increment them initially.
Expand Down
Expand Up @@ -19,6 +19,7 @@
package org.apache.cassandra.service.reads.range;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -54,7 +55,8 @@

import static org.apache.cassandra.utils.Clock.Global.nanoTime;

class RangeCommandIterator extends AbstractIterator<RowIterator> implements PartitionIterator
@VisibleForTesting
public class RangeCommandIterator extends AbstractIterator<RowIterator> implements PartitionIterator
{
private static final Logger logger = LoggerFactory.getLogger(RangeCommandIterator.class);

Expand Down Expand Up @@ -125,11 +127,13 @@ protected RowIterator computeNext()
catch (UnavailableException e)
{
rangeMetrics.unavailables.mark();
StorageProxy.logRequestException(e, Collections.singleton(command));
throw e;
}
catch (ReadTimeoutException e)
{
rangeMetrics.timeouts.mark();
StorageProxy.logRequestException(e, Collections.singleton(command));
throw e;
}
catch (ReadAbortException e)
Expand Down
48 changes: 38 additions & 10 deletions src/java/org/apache/cassandra/utils/NoSpamLogger.java
Expand Up @@ -19,6 +19,7 @@

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.slf4j.Logger;
Expand Down Expand Up @@ -46,7 +47,7 @@ public class NoSpamLogger
*/
public enum Level
{
INFO, WARN, ERROR;
INFO, WARN, ERROR
}

@VisibleForTesting
Expand Down Expand Up @@ -84,21 +85,31 @@ private boolean shouldLog(long nowNanos)
return nowNanos >= expected && compareAndSet(expected, nowNanos + minIntervalNanos);
}

/**
 * Logs this statement at the given level if {@code shouldLog(nowNanos)} allows it.
 * <p>
 * The supplier is only invoked once the check has passed, so callers can defer
 * expensive argument construction to the cases where a line is really emitted.
 *
 * @param l        the level to log at
 * @param nowNanos the current time in nanoseconds, as used by the interval check
 * @param objects  supplies the format arguments for the statement
 * @return {@code false} when the call was suppressed, otherwise the result of the underlying log call
 */
public boolean log(Level l, long nowNanos, Supplier<Object[]> objects)
{
    // Short-circuit keeps objects.get() from running for suppressed calls.
    return shouldLog(nowNanos) && logNoCheck(l, objects.get());
}

/**
 * Logs this statement at the given level if {@code shouldLog(nowNanos)} allows it.
 *
 * @param l        the level to log at
 * @param nowNanos the current time in nanoseconds, as used by the interval check
 * @param objects  the format arguments for the statement
 * @return {@code false} when the call was suppressed, otherwise the result of the underlying log call
 */
public boolean log(Level l, long nowNanos, Object... objects)
{
    return shouldLog(nowNanos) ? logNoCheck(l, objects) : false;
}

private boolean logNoCheck(Level l, Object... objects)
{
switch (l)
{
case INFO:
wrapped.info(statement, objects);
break;
case WARN:
wrapped.warn(statement, objects);
break;
case ERROR:
wrapped.error(statement, objects);
break;
case INFO:
wrapped.info(statement, objects);
break;
case WARN:
wrapped.warn(statement, objects);
break;
case ERROR:
wrapped.error(statement, objects);
break;
default:
throw new AssertionError();
}
Expand Down Expand Up @@ -174,6 +185,23 @@ public static boolean log(Logger logger, Level level, String key, long minInterv
return statement.log(level, nowNanos, objects);
}

/**
 * Rate-limited logging with lazily supplied arguments, where the message itself serves as
 * the statement key and the current time is read from {@code CLOCK}.
 *
 * @param logger      the logger to write to
 * @param level       the level to log at
 * @param minInterval the minimum interval between emitted lines, in {@code unit}
 * @param unit        the unit of {@code minInterval}
 * @param message     the format string, also used as the key identifying the statement
 * @param objects     supplies the format arguments
 * @return the result of the delegated log call
 */
public static boolean log(Logger logger, Level level, long minInterval, TimeUnit unit, String message, Supplier<Object[]> objects)
{
    long nowNanos = CLOCK.nanoTime();
    return log(logger, level, message, minInterval, unit, nowNanos, message, objects);
}

/**
 * Rate-limited logging with lazily supplied arguments under an explicit statement key,
 * with the current time read from {@code CLOCK}.
 *
 * @param logger      the logger to write to
 * @param level       the level to log at
 * @param key         the key identifying the statement
 * @param minInterval the minimum interval between emitted lines, in {@code unit}
 * @param unit        the unit of {@code minInterval}
 * @param message     the format string
 * @param objects     supplies the format arguments
 * @return the result of the delegated log call
 */
public static boolean log(Logger logger, Level level, String key, long minInterval, TimeUnit unit, String message, Supplier<Object[]> objects)
{
    long nowNanos = CLOCK.nanoTime();
    return log(logger, level, key, minInterval, unit, nowNanos, message, objects);
}

/**
 * Rate-limited logging with lazily supplied arguments, an explicit statement key and a
 * caller-provided timestamp.
 * <p>
 * Looks up the {@code NoSpamLogger} for {@code logger} and the given interval, obtains the
 * statement for {@code key}/{@code message} from it, and asks that statement to log.
 *
 * @param logger      the logger to write to
 * @param level       the level to log at
 * @param key         the key identifying the statement
 * @param minInterval the minimum interval between emitted lines, in {@code unit}
 * @param unit        the unit of {@code minInterval}
 * @param nowNanos    the current time in nanoseconds
 * @param message     the format string
 * @param objects     supplies the format arguments
 * @return the result of the statement's log call
 */
public static boolean log(Logger logger, Level level, String key, long minInterval, TimeUnit unit, long nowNanos, String message, Supplier<Object[]> objects)
{
    return getLogger(logger, minInterval, unit).getStatement(key, message)
                                               .log(level, nowNanos, objects);
}

public static NoSpamLogStatement getStatement(Logger logger, String message, long minInterval, TimeUnit unit)
{
NoSpamLogger wrapped = getLogger(logger, minInterval, unit);
Expand Down
@@ -0,0 +1,177 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.distributed.test;

import java.io.IOException;
import java.util.List;

import net.bytebuddy.ByteBuddy;
import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
import net.bytebuddy.implementation.MethodDelegation;

import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.db.SinglePartitionReadCommand;
import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.LogResult;
import org.apache.cassandra.exceptions.UnavailableException;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.reads.range.RangeCommandIterator;

import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
 * Checks that read requests failing on the coordinator leave a
 * {@code "<message>" while executing SELECT ...} line in the node's log.
 * <p>
 * A single-node cluster is started with {@link BBRequestFailures} installed, which redefines
 * {@code StorageService.isBootstrapMode}, {@code RangeCommandIterator.sendNextRequests} and
 * {@code StorageProxy.fetchRows} so that reads fail deterministically. The failure logging
 * interval is set to zero before the cluster starts so that every failure is logged.
 */
public class FailureLoggingTest extends TestBaseImpl
{
    private static Cluster cluster;

    @BeforeClass
    public static void setUpCluster() throws IOException
    {
        // Interval of 0 so that consecutive failures in different tests are never suppressed.
        CassandraRelevantProperties.FAILURE_LOGGING_INTERVAL_SECONDS.setInt(0);
        cluster = init(Cluster.build(1).withInstanceInitializer(BBRequestFailures::install).start());
        cluster.schemaChange("create table "+KEYSPACE+".tbl (id int primary key, i int)");
    }

    @AfterClass
    public static void tearDownCluster()
    {
        if (cluster != null)
            cluster.close();
    }

    @Before
    public void resetBootstrappingState()
    {
        cluster.get(1).callOnInstance(() -> BBRequestFailures.bootstrapping = false);
    }

    @Test
    public void testRequestBootstrapFail() throws Throwable
    {
        // Flip the flag before the log mark is taken, then run a single-partition read.
        cluster.get(1).callOnInstance(() -> BBRequestFailures.bootstrapping = true);
        assertFailureLogged("select * from " + KEYSPACE + ".tbl where id = 55",
                            "Cannot read from a bootstrapping node");
    }

    @Test
    public void testRangeRequestFail() throws Throwable
    {
        assertFailureLogged("select * from " + KEYSPACE + ".tbl",
                            "Cannot achieve consistency level");
    }

    @Test
    public void testReadRequestFail() throws Throwable
    {
        assertFailureLogged("select * from " + KEYSPACE + ".tbl where id = 55",
                            "Cannot achieve consistency level");
    }

    /**
     * Runs {@code query} at {@code ConsistencyLevel.ALL} through node 1, expecting it to fail,
     * and verifies that exactly one matching failure line was logged since the call began and
     * that the line contains {@code expectedFragment}.
     *
     * @param query            the CQL query that is expected to fail
     * @param expectedFragment text that the single logged failure line must contain
     */
    private static void assertFailureLogged(String query, String expectedFragment)
    {
        long mark = cluster.get(1).logs().mark();

        try
        {
            cluster.coordinator(1).execute(query, ConsistencyLevel.ALL);
            fail("Query should fail");
        }
        catch (RuntimeException e)
        {
            LogResult<List<String>> result = cluster.get(1).logs().grep(mark, "while executing SELECT");
            List<String> lines = result.getResult();
            assertEquals(1, lines.size());
            assertTrue(lines.get(0).contains(expectedFragment));
        }
    }

    /**
     * ByteBuddy delegation target used to force read failures on the instance.
     */
    public static class BBRequestFailures
    {
        // Value returned by the redefined isBootstrapMode; toggled by the tests.
        static volatile boolean bootstrapping = false;

        /**
         * Redefines the three intercepted methods in the given class loader so that they
         * delegate to the static methods of this class.
         */
        static void install(ClassLoader cl, int nodeNumber)
        {
            ByteBuddy bb = new ByteBuddy();

            bb.redefine(StorageService.class)
              .method(named("isBootstrapMode"))
              .intercept(MethodDelegation.to(BBRequestFailures.class))
              .make()
              .load(cl, ClassLoadingStrategy.Default.INJECTION);

            bb.redefine(RangeCommandIterator.class)
              .method(named("sendNextRequests"))
              .intercept(MethodDelegation.to(BBRequestFailures.class))
              .make()
              .load(cl, ClassLoadingStrategy.Default.INJECTION);

            bb.redefine(StorageProxy.class)
              .method(named("fetchRows"))
              .intercept(MethodDelegation.to(BBRequestFailures.class))
              .make()
              .load(cl, ClassLoadingStrategy.Default.INJECTION);
        }

        /** Replacement for {@code StorageService.isBootstrapMode}: reports the test-controlled flag. */
        @SuppressWarnings("unused")
        public static boolean isBootstrapMode()
        {
            return bootstrapping;
        }

        /** Replacement for {@code RangeCommandIterator.sendNextRequests}: always fails as unavailable. */
        @SuppressWarnings("unused")
        public static PartitionIterator sendNextRequests()
        {
            throw UnavailableException.create(org.apache.cassandra.db.ConsistencyLevel.ALL, 1, 0);
        }

        /** Replacement for {@code StorageProxy.fetchRows}: always fails as unavailable. */
        @SuppressWarnings("unused")
        public static PartitionIterator fetchRows(List<SinglePartitionReadCommand> commands,
                                                  org.apache.cassandra.db.ConsistencyLevel consistencyLevel,
                                                  long queryStartNanoTime)
        {
            throw UnavailableException.create(org.apache.cassandra.db.ConsistencyLevel.ALL, 1, 0);
        }
    }
}

0 comments on commit 33fd2dc

Please sign in to comment.