From e42599a094770921165993210aa4790c7fb972fc Mon Sep 17 00:00:00 2001 From: Stefan Miklosovic Date: Tue, 2 Sep 2025 13:44:52 +0200 Subject: [PATCH] Expose uncaught exceptions in system_views.uncaught_exceptions table patch by Stefan Miklosovic; reviewed by Dmitry Konstantinov for CASSANDRA-20858 --- CHANGES.txt | 1 + .../cassandra/db/virtual/ExceptionsTable.java | 322 ++++++++++++++++++ .../db/virtual/SystemViewsKeyspace.java | 1 + .../db/virtual/VirtualKeyspaceRegistry.java | 10 + .../cassandra/service/CassandraDaemon.java | 7 + .../utils/JVMStabilityInspector.java | 2 + .../logging/AbstractVirtualTableAppender.java | 28 +- .../utils/logging/SlowQueriesAppender.java | 5 +- .../utils/logging/VirtualTableAppender.java | 5 +- .../db/virtual/ExceptionsTableTest.java | 309 +++++++++++++++++ 10 files changed, 674 insertions(+), 16 deletions(-) create mode 100644 src/java/org/apache/cassandra/db/virtual/ExceptionsTable.java create mode 100644 test/unit/org/apache/cassandra/db/virtual/ExceptionsTableTest.java diff --git a/CHANGES.txt b/CHANGES.txt index 9a295402074c..18ee98958013 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 5.1 + * Expose uncaught exceptions in system_views.uncaught_exceptions table (CASSANDRA-20858) * Improved observability in AutoRepair to report both expected vs. actual repair bytes and expected vs. actual keyspaces (CASSANDRA-20581) * Execution of CreateTriggerStatement should not rely on external state (CASSANDRA-20287) * Support LIKE expressions in filtering queries (CASSANDRA-17198) diff --git a/src/java/org/apache/cassandra/db/virtual/ExceptionsTable.java b/src/java/org/apache/cassandra/db/virtual/ExceptionsTable.java new file mode 100644 index 000000000000..445aa49da6a2 --- /dev/null +++ b/src/java/org/apache/cassandra/db/virtual/ExceptionsTable.java @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.virtual; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.db.marshal.ListType; +import org.apache.cassandra.db.marshal.TimestampType; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.dht.LocalPartitioner; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.utils.Clock; +import org.apache.cassandra.utils.logging.AbstractVirtualTableAppender; + +public class ExceptionsTable extends AbstractMutableVirtualTable +{ + public static final String EXCEPTIONS_TABLE_NAME = "uncaught_exceptions"; + public static final String EXCEPTION_CLASS_COLUMN_NAME = "exception_class"; + public static final String EXCEPTION_LOCATION_COLUMN_NAME = "exception_location"; + public static final String COUNT_COLUMN_NAME = "count"; + public static final String LAST_MESSAGE_COLUMN_NAME = "last_message"; + public static final String LAST_STACKTRACE_COLUMN_NAME = "last_stacktrace"; + public static final String LAST_OCCURRENCE_COLUMN_NAME = "last_occurrence"; + + /** + * Buffer of uncaught exceptions which happened while virtual table was not initialized. + */ + static final List preInitialisationBuffer = Collections.synchronizedList(new ArrayList<>()); + + @VisibleForTesting + static volatile ExceptionsTable INSTANCE; + + // please be sure operations on this structure are thread-safe + @VisibleForTesting + final BoundedMap buffer; + + ExceptionsTable(String keyspace) + { + // for starters capped to 1k, I do not think we need to make this configurable (yet). + this(keyspace, 1000); + } + + ExceptionsTable(String keyspace, int maxSize) + { + super(TableMetadata.builder(keyspace, EXCEPTIONS_TABLE_NAME) + .comment("View into uncaught exceptions") + .kind(TableMetadata.Kind.VIRTUAL) + .partitioner(new LocalPartitioner(UTF8Type.instance)) + .addPartitionKeyColumn(EXCEPTION_CLASS_COLUMN_NAME, UTF8Type.instance) + .addClusteringColumn(EXCEPTION_LOCATION_COLUMN_NAME, UTF8Type.instance) + .addRegularColumn(COUNT_COLUMN_NAME, Int32Type.instance) + .addRegularColumn(LAST_MESSAGE_COLUMN_NAME, UTF8Type.instance) + .addRegularColumn(LAST_STACKTRACE_COLUMN_NAME, ListType.getInstance(UTF8Type.instance, false)) + .addRegularColumn(LAST_OCCURRENCE_COLUMN_NAME, TimestampType.instance) + .build()); + + this.buffer = new BoundedMap(maxSize); + } + + public void flush() + { + for (ExceptionRow row : preInitialisationBuffer) + add(row.exceptionClass, row.exceptionLocation, row.message, row.stackTrace, row.occurrence.getTime()); + + preInitialisationBuffer.clear(); + } + + @Override + public DataSet data() + { + SimpleDataSet result = new SimpleDataSet(metadata()); + + synchronized (buffer) + { + for (Map.Entry> partition : buffer.entrySet()) + { + for (Map.Entry entry : partition.getValue().entrySet()) + populateRow(result, partition.getKey(), entry.getKey(), entry.getValue()); + } + } + + return result; + } + + @Override + public DataSet data(DecoratedKey partitionKey) + { + SimpleDataSet result = new SimpleDataSet(metadata()); + + synchronized (buffer) + { + String exceptionClass = UTF8Type.instance.getSerializer().deserialize(partitionKey.getKey()); + LinkedHashMap partition = buffer.get(exceptionClass); + + if (partition != null) + { + for (Map.Entry row : partition.entrySet()) + populateRow(result, exceptionClass, row.getKey(), row.getValue()); + } + } + + return result; + } + + private void populateRow(SimpleDataSet result, String exceptionClass, String exceptionLocation, ExceptionRow row) + { + result.row(exceptionClass, exceptionLocation) + .column(COUNT_COLUMN_NAME, row.count) + .column(LAST_MESSAGE_COLUMN_NAME, row.message) + .column(LAST_STACKTRACE_COLUMN_NAME, row.stackTrace) + .column(LAST_OCCURRENCE_COLUMN_NAME, row.occurrence); + } + + @Override + public void truncate() + { + synchronized (buffer) + { + buffer.clear(); + } + } + + static List extractStacktrace(StackTraceElement[] stackTraceArray) + { + List result = new ArrayList<>(stackTraceArray.length); + + for (StackTraceElement element : stackTraceArray) + result.add(element.toString()); + + return result; + } + + public static void persist(Throwable t) + { + if (INSTANCE == null) + INSTANCE = AbstractVirtualTableAppender.getVirtualTable(ExceptionsTable.class, EXCEPTIONS_TABLE_NAME); + + Throwable toPersist = t; + + while (toPersist.getCause() != null) + toPersist = toPersist.getCause(); + + List stackTrace = extractStacktrace(toPersist.getStackTrace()); + long now = Clock.Global.currentTimeMillis(); + + if (INSTANCE != null) + { + INSTANCE.add(toPersist.getClass().getName(), + stackTrace.get(0), + toPersist.getMessage(), + stackTrace, + now); + } + else + { + preInitialisationBuffer.add(new ExceptionRow(toPersist.getClass().getName(), + stackTrace.get(0), + 0, + toPersist.getMessage(), + stackTrace, + now)); + } + } + + /** + * Adds entry to internal buffer. + * + * @param exceptionClass exception class of uncaught exception + * @param exceptionLocation location where that exception was thrown + * @param message message of given exception + * @param stackTrace whole stacktrace of given exception + * @param occurrenceTime time when given exception ocurred + */ + private void add(String exceptionClass, + String exceptionLocation, + String message, + List stackTrace, + long occurrenceTime) + { + synchronized (buffer) + { + Map exceptionRowWithLocation = buffer.computeIfAbsent(exceptionClass, (classToAdd) -> new LinkedHashMap<>()); + ExceptionRow exceptionRow = exceptionRowWithLocation.get(exceptionLocation); + if (exceptionRow == null) + { + // exception class and location can be null for value as we have it as part of keys already + exceptionRow = new ExceptionRow(null, null, 1, message, stackTrace, occurrenceTime); + exceptionRowWithLocation.put(exceptionLocation, exceptionRow); + // not important, can be null + // we need to do this, because if we add into a map which is + // a value of some buffer key, we might exceed the number + // of overall entries in the buffer + buffer.removeEldestEntry(null); + } + else + { + exceptionRow.count += 1; + exceptionRow.message = message; + exceptionRow.stackTrace = stackTrace; + exceptionRow.occurrence = new Date(occurrenceTime); + } + } + } + + static final class ExceptionRow + { + final String exceptionClass; + final String exceptionLocation; + int count; + String message; + List stackTrace; + Date occurrence; + + /** + * @param exceptionClass exception class of uncaught exception + * @param exceptionLocation location where that exception was thrown + * @param message message of given exception + * @param stackTrace whole stacktrace of given exception + * @param occurrenceTime time when given exception ocurred, in milliseconds from epoch + */ + ExceptionRow(String exceptionClass, + String exceptionLocation, + int count, + String message, + List stackTrace, + long occurrenceTime) + { + this.exceptionClass = exceptionClass; + this.exceptionLocation = exceptionLocation; + this.count = count; + this.stackTrace = stackTrace; + this.message = message; + this.occurrence = new Date(occurrenceTime); + } + } + + @VisibleForTesting + static class BoundedMap extends LinkedHashMap> + { + private final int maxSize; + + public BoundedMap(int maxSize) + { + if (maxSize <= 0) + throw new IllegalArgumentException("maxSize has to be bigger than 0"); + + this.maxSize = maxSize; + } + + @Override + protected boolean removeEldestEntry(Map.Entry> eldest) + { + if (computeSize() > maxSize) + { + String oldestExceptionClass = null; + String oldestExceptionLocation = null; + long oldestLastOccurrence = Long.MAX_VALUE; + for (Map.Entry> entry : entrySet()) + { + for (Map.Entry entryInEntry : entry.getValue().entrySet()) + { + long currentLastOccurrence = entryInEntry.getValue().occurrence.getTime(); + if (currentLastOccurrence < oldestLastOccurrence) + { + oldestExceptionLocation = entryInEntry.getKey(); + oldestExceptionClass = entry.getKey(); + oldestLastOccurrence = currentLastOccurrence; + } + } + } + + if (oldestLastOccurrence < Long.MAX_VALUE) + { + LinkedHashMap aMap = get(oldestExceptionClass); + if (aMap.size() == 1) + remove(oldestExceptionClass); + else + aMap.remove(oldestExceptionLocation); + } + } + + // always returning false as per method's contract saying that + // overrides might modify the map directly but in that case it must return false + return false; + } + + private int computeSize() + { + int size = 0; + + for (LinkedHashMap value : values()) + size += value.size(); + + return size; + } + } +} diff --git a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java index 28c6dc8fef40..7233bb4d53b0 100644 --- a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java +++ b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java @@ -71,6 +71,7 @@ private SystemViewsKeyspace() .addAll(CIDRFilteringMetricsTable.getAll(VIRTUAL_VIEWS)) .addAll(StorageAttachedIndexTables.getAll(VIRTUAL_VIEWS)) .addAll(AccordVirtualTables.getAll(VIRTUAL_VIEWS)) + .add(new ExceptionsTable(VIRTUAL_VIEWS)) .build()); } } diff --git a/src/java/org/apache/cassandra/db/virtual/VirtualKeyspaceRegistry.java b/src/java/org/apache/cassandra/db/virtual/VirtualKeyspaceRegistry.java index 23814cdf59a0..d48cf05349d3 100644 --- a/src/java/org/apache/cassandra/db/virtual/VirtualKeyspaceRegistry.java +++ b/src/java/org/apache/cassandra/db/virtual/VirtualKeyspaceRegistry.java @@ -47,6 +47,16 @@ public void register(VirtualKeyspace keyspace) keyspace.tables().forEach(t -> virtualTables.put(t.metadata().id, t)); } + public void unregister(VirtualKeyspace keyspace) + { + VirtualKeyspace virtualKeyspace = virtualKeyspaces.get(keyspace.name()); + if (virtualKeyspace == null) + return; + + keyspace.tables().forEach(t -> virtualTables.remove(t.metadata().id)); + virtualKeyspaces.remove(keyspace.name()); + } + @Nullable public VirtualKeyspace getKeyspaceNullable(String name) { diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index d8acdbc26666..f4ed67a44027 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -28,6 +28,7 @@ import java.util.Collection; import java.util.HashSet; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -59,6 +60,7 @@ import org.apache.cassandra.db.SystemKeyspaceMigrator41; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.virtual.AccordDebugKeyspace; +import org.apache.cassandra.db.virtual.ExceptionsTable; import org.apache.cassandra.db.virtual.LogMessagesTable; import org.apache.cassandra.db.virtual.SlowQueriesTable; import org.apache.cassandra.db.virtual.SystemViewsKeyspace; @@ -95,6 +97,7 @@ import org.apache.cassandra.utils.NativeLibrary; import org.apache.cassandra.utils.concurrent.Future; import org.apache.cassandra.utils.concurrent.FutureCombiner; +import org.apache.cassandra.utils.logging.AbstractVirtualTableAppender; import org.apache.cassandra.utils.logging.LoggingSupportFactory; import org.apache.cassandra.utils.logging.SlowQueriesAppender; import org.apache.cassandra.utils.logging.VirtualTableAppender; @@ -569,6 +572,10 @@ public void setupVirtualKeyspaces() LoggingSupportFactory.getLoggingSupport() .getAppender(SlowQueriesAppender.class, SlowQueriesAppender.APPENDER_NAME) .ifPresent(appender -> appender.flushBuffer(SlowQueriesTable.class, SlowQueriesTable.TABLE_NAME)); + + // populate exceptions table with entries while they were thrown but virtual tables were not registered yet + Optional.ofNullable(AbstractVirtualTableAppender.getVirtualTable(ExceptionsTable.class, ExceptionsTable.EXCEPTIONS_TABLE_NAME)) + .ifPresent(ExceptionsTable::flush); } public synchronized void initializeClientTransports() diff --git a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java index 2c840d4a7d82..22abb48a151c 100644 --- a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java +++ b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java @@ -37,6 +37,7 @@ import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.virtual.ExceptionsTable; import org.apache.cassandra.exceptions.UnrecoverableIllegalStateException; import org.apache.cassandra.io.FSError; import org.apache.cassandra.io.sstable.CorruptSSTableException; @@ -70,6 +71,7 @@ public static void uncaughtException(Thread thread, Throwable t) try { StorageMetrics.uncaughtExceptions.inc(); } catch (Throwable ignore) { /* might not be initialised */ } logger.error("Exception in thread {}", thread, t); Tracing.trace("Exception in thread {}", thread, t); + ExceptionsTable.persist(t); for (Throwable t2 = t; t2 != null; t2 = t2.getCause()) { // make sure error gets logged exactly once. diff --git a/src/java/org/apache/cassandra/utils/logging/AbstractVirtualTableAppender.java b/src/java/org/apache/cassandra/utils/logging/AbstractVirtualTableAppender.java index 7becbc13fcd5..eea7347de269 100644 --- a/src/java/org/apache/cassandra/utils/logging/AbstractVirtualTableAppender.java +++ b/src/java/org/apache/cassandra/utils/logging/AbstractVirtualTableAppender.java @@ -25,7 +25,6 @@ import ch.qos.logback.classic.spi.LoggingEvent; import ch.qos.logback.core.AppenderBase; import org.apache.cassandra.db.virtual.AbstractLoggerVirtualTable; -import org.apache.cassandra.db.virtual.SlowQueriesTable; import org.apache.cassandra.db.virtual.VirtualKeyspace; import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry; import org.apache.cassandra.db.virtual.VirtualTable; @@ -46,9 +45,9 @@ protected AbstractVirtualTableAppender(int defaultRows) // logged already protected final List messageBuffer = new LinkedList<>(); - protected T getVirtualTable(Class vtableClass, String tableName) + public static T getVirtualTable(Class vtableClass, String keyspaceName, String tableName) { - VirtualKeyspace keyspace = VirtualKeyspaceRegistry.instance.getKeyspaceNullable(VIRTUAL_VIEWS); + VirtualKeyspace keyspace = VirtualKeyspaceRegistry.instance.getKeyspaceNullable(keyspaceName); if (keyspace == null) return null; @@ -65,7 +64,7 @@ protected T getVirtualTable(Class vtableClass, String tableName) if (!vt.getClass().equals(vtableClass)) throw new IllegalStateException(String.format("Virtual table %s.%s is not backed by an instance of %s but by %s", - VIRTUAL_VIEWS, + keyspaceName, tableName, vtableClass.getName(), vt.getClass().getName())); @@ -73,6 +72,11 @@ protected T getVirtualTable(Class vtableClass, String tableName) return (T) vt; } + public static T getVirtualTable(Class vtableClass, String tableName) + { + return getVirtualTable(vtableClass, VIRTUAL_VIEWS, tableName); + } + /** * This method adds an event to virtual table, when present. * When vtable is null, we will attempt to find it among registered ones. Then not found, we add it to internal @@ -80,17 +84,21 @@ protected T getVirtualTable(Class vtableClass, String tableName) * were appended via logging framework sooner than registration of virtual tables was done so after they are registered, * they would miss logging events happened before being so. * - * @param vtable vtable to append to - * @param event event to append to - * @param tableName table name of virtual table to append to + * @param vtableClass class of vtable to append to + * @param vtable vtable to append to + * @param event event to append to + * @param tableName table name of virtual table to append to * @return vtable or when null, found vtable */ - protected AbstractLoggerVirtualTable appendToVirtualTable(AbstractLoggerVirtualTable vtable, LoggingEvent event, String tableName) + protected > T appendToVirtualTable(Class vtableClass, + T vtable, + LoggingEvent event, + String tableName) { - AbstractLoggerVirtualTable foundVtable; + T foundVtable; if (vtable == null) { - foundVtable = getVirtualTable(SlowQueriesTable.class, tableName); + foundVtable = getVirtualTable(vtableClass, tableName); if (foundVtable == null) addToBuffer(event); else diff --git a/src/java/org/apache/cassandra/utils/logging/SlowQueriesAppender.java b/src/java/org/apache/cassandra/utils/logging/SlowQueriesAppender.java index 4af2e383077b..7a13ca397d25 100644 --- a/src/java/org/apache/cassandra/utils/logging/SlowQueriesAppender.java +++ b/src/java/org/apache/cassandra/utils/logging/SlowQueriesAppender.java @@ -19,14 +19,13 @@ package org.apache.cassandra.utils.logging; import ch.qos.logback.classic.spi.LoggingEvent; -import org.apache.cassandra.db.virtual.AbstractLoggerVirtualTable; import org.apache.cassandra.db.virtual.SlowQueriesTable; public final class SlowQueriesAppender extends AbstractVirtualTableAppender { public static final String APPENDER_NAME = "SLOW_QUERIES_APPENDER"; - private AbstractLoggerVirtualTable slowQueries; + private SlowQueriesTable slowQueries; public SlowQueriesAppender() { @@ -40,6 +39,6 @@ protected void append(LoggingEvent eventObject) // are not registered, and we already try to put queries there. // As soon as vtable is registered (as part of node's startup / initialisation), // slow queries will never be null again - slowQueries = appendToVirtualTable(slowQueries, eventObject, SlowQueriesTable.TABLE_NAME); + slowQueries = appendToVirtualTable(SlowQueriesTable.class, slowQueries, eventObject, SlowQueriesTable.TABLE_NAME); } } diff --git a/src/java/org/apache/cassandra/utils/logging/VirtualTableAppender.java b/src/java/org/apache/cassandra/utils/logging/VirtualTableAppender.java index 03a142004afd..6c12cd414ebf 100644 --- a/src/java/org/apache/cassandra/utils/logging/VirtualTableAppender.java +++ b/src/java/org/apache/cassandra/utils/logging/VirtualTableAppender.java @@ -24,7 +24,6 @@ import ch.qos.logback.classic.spi.LoggingEvent; import org.apache.cassandra.audit.FileAuditLogger; -import org.apache.cassandra.db.virtual.AbstractLoggerVirtualTable; import org.apache.cassandra.db.virtual.LogMessagesTable; /** @@ -36,7 +35,7 @@ public final class VirtualTableAppender extends AbstractVirtualTableAppender private static final Set forbiddenLoggers = ImmutableSet.of(FileAuditLogger.class.getName()); - private AbstractLoggerVirtualTable logs; + private LogMessagesTable logs; public VirtualTableAppender() { @@ -47,6 +46,6 @@ public VirtualTableAppender() protected void append(LoggingEvent eventObject) { if (!forbiddenLoggers.contains(eventObject.getLoggerName())) - logs = appendToVirtualTable(logs, eventObject, LogMessagesTable.TABLE_NAME); + logs = appendToVirtualTable(LogMessagesTable.class, logs, eventObject, LogMessagesTable.TABLE_NAME); } } diff --git a/test/unit/org/apache/cassandra/db/virtual/ExceptionsTableTest.java b/test/unit/org/apache/cassandra/db/virtual/ExceptionsTableTest.java new file mode 100644 index 000000000000..3ce5c2f3a559 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/virtual/ExceptionsTableTest.java @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.virtual; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.Uninterruptibles; +import org.junit.Test; + +import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.cql3.UntypedResultSet; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.db.virtual.ExceptionsTable.ExceptionRow; +import org.apache.cassandra.utils.JVMStabilityInspector; + +import static java.lang.String.format; +import static java.lang.Thread.currentThread; +import static java.util.stream.Collectors.toList; +import static org.apache.cassandra.db.virtual.ExceptionsTable.EXCEPTIONS_TABLE_NAME; +import static org.apache.cassandra.utils.logging.AbstractVirtualTableAppender.getVirtualTable; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class ExceptionsTableTest extends CQLTester +{ + private static final String KS_NAME = "vts"; + + @Test + public void testExceptionInterceptionWhileVirtualTableIsNotRegistered() + { + doWithVTable(100, table -> + { + JVMStabilityInspector.uncaughtException(currentThread(), new MyUncaughtException("my exception")); + JVMStabilityInspector.uncaughtException(currentThread(), new MyUncaughtException2("my exception2")); + + // register after treating exception, so it goes to pre-initialisation buffer first + VirtualKeyspaceRegistry.instance.register(new VirtualKeyspace(KS_NAME, ImmutableList.of(table))); + + // we have buffer populated with exceptions from time vtable was not initialized yet + assertEquals(2, ExceptionsTable.preInitialisationBuffer.size()); + + // flush buffer and populate table internally + table.flush(); + + assertTrue(ExceptionsTable.preInitialisationBuffer.isEmpty()); + + ExceptionsTable.BoundedMap buffer = table.buffer; + assertEquals(2, buffer.size()); + }); + } + + @Test + public void testExceptionsTableWhileVirtualTableIsRegistered() + { + doWithVTable(100, table -> + { + // register before treating exception to avoid pre-initialisation buffer + VirtualKeyspaceRegistry.instance.register(new VirtualKeyspace(KS_NAME, ImmutableList.of(table))); + ExceptionsTable.INSTANCE = getVirtualTable(ExceptionsTable.class, KS_NAME, EXCEPTIONS_TABLE_NAME); + + JVMStabilityInspector.uncaughtException(currentThread(), new MyUncaughtException("my exception")); + JVMStabilityInspector.uncaughtException(currentThread(), new MyUncaughtException2("my exception2")); + + assertTrue(ExceptionsTable.preInitialisationBuffer.isEmpty()); + ExceptionsTable.BoundedMap buffer = table.buffer; + assertEquals(2, buffer.size()); + }); + } + + @Test + public void testWrappedException() + { + doWithVTable(100, table -> + { + // register before treating exception to avoid pre-initialisation buffer + VirtualKeyspaceRegistry.instance.register(new VirtualKeyspace(KS_NAME, ImmutableList.of(table))); + ExceptionsTable.INSTANCE = getVirtualTable(ExceptionsTable.class, KS_NAME, EXCEPTIONS_TABLE_NAME); + + MyUncaughtException2 inner = new MyUncaughtException2("inner"); + MyUncaughtException myUncaughtException = new MyUncaughtException("outer", inner); + + JVMStabilityInspector.uncaughtException(currentThread(), myUncaughtException); + + assertTrue(ExceptionsTable.preInitialisationBuffer.isEmpty()); + ExceptionsTable.BoundedMap buffer = table.buffer; + assertEquals(1, buffer.size()); + + LinkedHashMap entry = buffer.get(MyUncaughtException2.class.getName()); + assertNotNull(entry); + ExceptionRow exceptionRow = entry.get(inner.getStackTrace()[0].toString()); + assertNotNull(exceptionRow); + assertEquals("inner", exceptionRow.message); + assertEquals(1, exceptionRow.count); + }); + } + + @Test + public void testDeduplication() + { + doWithVTable(100, table -> + { + // register before treating exception to avoid pre-initialisation buffer + VirtualKeyspaceRegistry.instance.register(new VirtualKeyspace(KS_NAME, ImmutableList.of(table))); + ExceptionsTable.INSTANCE = getVirtualTable(ExceptionsTable.class, KS_NAME, EXCEPTIONS_TABLE_NAME); + + // same type of exception + + MyUncaughtException exception1 = new MyUncaughtException("my exception"); + MyUncaughtException exception2 = new MyUncaughtException("my exception"); + MyUncaughtException2 exception3 = new MyUncaughtException2("my second uncaught exception"); + + String firstLocation = ExceptionsTable.extractStacktrace(exception1.getStackTrace()).get(0); + String secondLocation = ExceptionsTable.extractStacktrace(exception2.getStackTrace()).get(0); + String thirdLocation = ExceptionsTable.extractStacktrace(exception3.getStackTrace()).get(0); + + JVMStabilityInspector.uncaughtException(currentThread(), exception1); + JVMStabilityInspector.uncaughtException(currentThread(), exception1); + JVMStabilityInspector.uncaughtException(currentThread(), exception2); + JVMStabilityInspector.uncaughtException(currentThread(), exception3); + + ExceptionsTable.BoundedMap buffer = table.buffer; + assertEquals(2, buffer.size()); + + LinkedHashMap exceptionRow = buffer.get(MyUncaughtException.class.getName()); + assertNotNull(exceptionRow); + ExceptionRow firstLocationRow = exceptionRow.get(firstLocation); + assertEquals("my exception", firstLocationRow.message); + assertEquals(2, firstLocationRow.count); + + ExceptionRow secondLocationRow = exceptionRow.get(secondLocation); + assertEquals("my exception", secondLocationRow.message); + assertEquals(1, secondLocationRow.count); + + LinkedHashMap exceptionRow2 = buffer.get(MyUncaughtException2.class.getName()); + + assertNotNull(exceptionRow2); + ExceptionRow firstLocationRow2 = exceptionRow2.get(thirdLocation); + assertEquals("my second uncaught exception", firstLocationRow2.message); + assertEquals(1, firstLocationRow2.count); + + List rows = execute(format("SELECT * FROM %s.%s", KS_NAME, EXCEPTIONS_TABLE_NAME)).stream().collect(toList()); + assertEquals(3, rows.size()); + + List lastStacktraceException1 = rows.get(0).getList(ExceptionsTable.LAST_STACKTRACE_COLUMN_NAME, UTF8Type.instance); + List lastStacktraceException2 = rows.get(1).getList(ExceptionsTable.LAST_STACKTRACE_COLUMN_NAME, UTF8Type.instance); + + assertTrue(lastStacktraceException1.get(0).contains(ExceptionsTableTest.class.getName())); + assertTrue(lastStacktraceException2.get(0).contains(ExceptionsTableTest.class.getName())); + }); + } + + @Test + public void testRemovalOfEntryWithOldestOccurence() + { + doWithVTable(4, table -> + { + // register before treating exception to avoid pre-initialisation buffer + VirtualKeyspaceRegistry.instance.register(new VirtualKeyspace(KS_NAME, ImmutableList.of(table))); + ExceptionsTable.INSTANCE = getVirtualTable(ExceptionsTable.class, KS_NAME, EXCEPTIONS_TABLE_NAME); + + MyUncaughtException2 myException2 = new MyUncaughtException2("my exception2"); + + JVMStabilityInspector.uncaughtException(currentThread(), new MyUncaughtException("my exception")); + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + JVMStabilityInspector.uncaughtException(currentThread(), myException2); + JVMStabilityInspector.uncaughtException(currentThread(), myException2); + + JVMStabilityInspector.uncaughtException(currentThread(), new RuntimeException("some exception")); + JVMStabilityInspector.uncaughtException(currentThread(), new IllegalStateException("some illegal state exception")); + + // we inserted 5th unique exception, so the oldest one, based on the occurence, will be removed + // the first one inserted was MyUncaughtException with "my exception" message so that one will not be there anymore + JVMStabilityInspector.uncaughtException(currentThread(), new IllegalArgumentException("some illegal state exception")); + + assertThat(table.buffer.size(), is(4)); + assertThat(table.buffer.get(MyUncaughtException.class.getName()), nullValue()); + + assertThat(rows(format("SELECT * FROM %s.%s", KS_NAME, EXCEPTIONS_TABLE_NAME)).size(), is(4)); + + JVMStabilityInspector.uncaughtException(currentThread(), new IllegalArgumentException("some illegal state exception")); + + assertThat(rows(format("SELECT * FROM %s.%s", KS_NAME, EXCEPTIONS_TABLE_NAME)).size(), is(4)); + assertNull(table.buffer.get(MyUncaughtException2.class.getName())); + }); + } + + @Test + public void testSelection() + { + doWithVTable(4, table -> + { + // register before treating exception to avoid pre-initialisation buffer + VirtualKeyspaceRegistry.instance.register(new VirtualKeyspace(KS_NAME, ImmutableList.of(table))); + ExceptionsTable.INSTANCE = getVirtualTable(ExceptionsTable.class, KS_NAME, EXCEPTIONS_TABLE_NAME); + + MyUncaughtException myUncaughtException = new MyUncaughtException("my exception"); + + JVMStabilityInspector.uncaughtException(currentThread(), myUncaughtException); + + assertThat(rows(format("SELECT * FROM %s.%s WHERE exception_class = '%s'", KS_NAME, EXCEPTIONS_TABLE_NAME, MyUncaughtException.class.getName())), not(empty())); + assertThat(rows(format("SELECT * FROM %s.%s WHERE exception_class = '%s'", KS_NAME, EXCEPTIONS_TABLE_NAME, IllegalArgumentException.class.getName())), empty()); + + assertThat(rows(format("SELECT * FROM %s.%s WHERE exception_class = '%s' and exception_location = '%s'", + KS_NAME, EXCEPTIONS_TABLE_NAME, + MyUncaughtException.class.getName(), + myUncaughtException.getStackTrace()[0])), + not(empty())); + }); + } + + @Test + public void testTruncation() + { + doWithVTable(4, table -> + { + // register before treating exception to avoid pre-initialisation buffer + VirtualKeyspaceRegistry.instance.register(new VirtualKeyspace(KS_NAME, ImmutableList.of(table))); + ExceptionsTable.INSTANCE = getVirtualTable(ExceptionsTable.class, + KS_NAME, + EXCEPTIONS_TABLE_NAME); + + JVMStabilityInspector.uncaughtException(currentThread(), new MyUncaughtException("my exception")); + + assertThat(rows(format("SELECT * FROM %s.%s WHERE exception_class = '%s'", + KS_NAME, EXCEPTIONS_TABLE_NAME, + MyUncaughtException.class.getName())), + not(empty())); + + assertThat(rows(format("SELECT * FROM %s.%s WHERE exception_class = '%s'", + KS_NAME, EXCEPTIONS_TABLE_NAME, + MyUncaughtException.class.getName())), + not(empty())); + + execute(format("TRUNCATE %s.%s", KS_NAME, EXCEPTIONS_TABLE_NAME)); + + assertThat(rows(format("SELECT * FROM %s.%s;", KS_NAME, EXCEPTIONS_TABLE_NAME)), empty()); + }); + } + + private List rows(String query) + { + return execute(query).stream().collect(toList()); + } + + private void doWithVTable(int maxSize, Consumer consumer) + { + ExceptionsTable table = new ExceptionsTable(KS_NAME, maxSize); + VirtualKeyspace keyspace = new VirtualKeyspace(KS_NAME, ImmutableList.of(table)); + + try + { + consumer.accept(table); + } + finally + { + ExceptionsTable.preInitialisationBuffer.clear(); + VirtualKeyspaceRegistry.instance.unregister(keyspace); + ExceptionsTable.INSTANCE = null; + } + } + + public static class MyUncaughtException extends Throwable + { + public MyUncaughtException(String message) + { + super(message); + } + + public MyUncaughtException(String message, Throwable cause) + { + super(message, cause); + } + } + + public static class MyUncaughtException2 extends Throwable + { + public MyUncaughtException2(String message) + { + super(message); + } + } +}