From 2d323cb56572e867b13b6d102a61aaff8bd66c86 Mon Sep 17 00:00:00 2001
From: Jordan West
Date: Wed, 21 Dec 2022 12:10:42 -0800
Subject: [PATCH] Add row, tombstone, and sstable count to profileload

Patch by Jordan West; Reviewed by David Capwell for CASSANDRA-18022
---
 CHANGES.txt                                       |   1 +
 .../org/apache/cassandra/db/ReadCommand.java      |  18 +++
 .../db/SinglePartitionReadCommand.java            |   2 +
 .../org/apache/cassandra/metrics/Sampler.java     |  17 +++
 .../cassandra/metrics/TableMetrics.java           |  34 ++++++
 .../cassandra/tools/TopPartitionsTest.java        | 108 ++++++++++++++++++
 6 files changed, 180 insertions(+)

diff --git a/CHANGES.txt b/CHANGES.txt
index bf760aa8dcd1..84136b255de6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.2
+ * Add row, tombstone, and sstable count to nodetool profileload (CASSANDRA-18022)
  * Coordinator level metrics for read response and mutation row and column counts (CASSANDRA-18155)
  * Add CQL functions for dynamic data masking (CASSANDRA-17941)
  * Print friendly error when nodetool attempts to connect to uninitialized server (CASSANDRA-11537)
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index d03650ff364d..95fa95da910a 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -501,7 +501,9 @@ class MetricRecording extends Transformation
         private final boolean enforceStrictLiveness = metadata().enforceStrictLiveness();
 
         private int liveRows = 0;
+        private int lastReportedLiveRows = 0;
         private int tombstones = 0;
+        private int lastReportedTombstones = 0;
 
         private DecoratedKey currentKey;
 
@@ -568,6 +570,22 @@ private void countTombstone(ClusteringPrefix clustering)
             }
         }
 
+        @Override
+        protected void onPartitionClose()
+        {
+            int lr = liveRows - lastReportedLiveRows;
+            int ts = tombstones - lastReportedTombstones;
+
+            if (lr > 0)
+                metric.topReadPartitionRowCount.addSample(currentKey.getKey(), lr);
+
+            if (ts > 0)
+                metric.topReadPartitionTombstoneCount.addSample(currentKey.getKey(), ts);
+
+            lastReportedLiveRows = liveRows;
+            lastReportedTombstones = tombstones;
+        }
+
         @Override
         public void onClose()
         {
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 963b9fee1c8d..34414bbea608 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -820,6 +820,7 @@ private UnfilteredRowIterator withSSTablesIterated(List i
             {
                 DecoratedKey key = merged.partitionKey();
                 metrics.topReadPartitionFrequency.addSample(key.getKey(), 1);
+                metrics.topReadPartitionSSTableCount.addSample(key.getKey(), metricsCollector.getMergedSSTables());
             }
 
             class UpdateSstablesIterated extends Transformation
@@ -963,6 +964,7 @@ private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFam
 
         DecoratedKey key = result.partitionKey();
         cfs.metric.topReadPartitionFrequency.addSample(key.getKey(), 1);
+        cfs.metric.topReadPartitionSSTableCount.addSample(key.getKey(), metricsCollector.getMergedSSTables());
         StorageHook.instance.reportRead(cfs.metadata.id, partitionKey());
 
         return result.unfilteredIterator(columnFilter(), Slices.ALL, clusteringIndexFilter().isReversed());
diff --git a/src/java/org/apache/cassandra/metrics/Sampler.java b/src/java/org/apache/cassandra/metrics/Sampler.java
index de7f0b2e1a70..6eb8508b9fa1 100644
--- a/src/java/org/apache/cassandra/metrics/Sampler.java
+++ b/src/java/org/apache/cassandra/metrics/Sampler.java
@@ -54,6 +54,23 @@ public enum SamplerType
                     resultBuilder.forType(samplerType, samplerType.description)
                                  .addColumn("Query", "value")
                                  .addColumn("Microseconds", "count"))),
+    READ_ROW_COUNT("Partitions read with the most rows", ((samplerType, resultBuilder) ->
+                    resultBuilder.forType(samplerType, samplerType.description)
+                                 .addColumn("Table", "table")
+                                 .addColumn("Partition", "value")
+                                 .addColumn("Rows", "count"))),
+
+    READ_TOMBSTONE_COUNT("Partitions read with the most tombstones", ((samplerType, resultBuilder) ->
+                    resultBuilder.forType(samplerType, samplerType.description)
+                                 .addColumn("Table", "table")
+                                 .addColumn("Partition", "value")
+                                 .addColumn("Tombstones", "count"))),
+
+    READ_SSTABLE_COUNT("Partitions read with the most sstables", ((samplerType, resultBuilder) ->
+                    resultBuilder.forType(samplerType, samplerType.description)
+                                 .addColumn("Table", "table")
+                                 .addColumn("Partition", "value")
+                                 .addColumn("SSTables", "count"))),
     WRITE_SIZE("Max mutation size by partition", ((samplerType, resultBuilder) ->
                     resultBuilder.forType(samplerType, samplerType.description)
                                  .addColumn("Table", "table")
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index 8f3645dd1e63..04102f7995b7 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -262,6 +262,12 @@ public class TableMetrics
     public final Sampler<ByteBuffer> topCasPartitionContention;
     /** When sampler activated, will track the slowest local reads **/
     public final Sampler<String> topLocalReadQueryTime;
+    /** When sampler activated, will track partitions read with the most rows **/
+    public final Sampler<ByteBuffer> topReadPartitionRowCount;
+    /** When sampler activated, will track partitions read with the most tombstones **/
+    public final Sampler<ByteBuffer> topReadPartitionTombstoneCount;
+    /** When sampler activated, will track partitions read with the most merged sstables **/
+    public final Sampler<ByteBuffer> topReadPartitionSSTableCount;
 
     public final TableMeter clientTombstoneWarnings;
     public final TableMeter clientTombstoneAborts;
@@ -442,11 +448,39 @@ public String toString(String value)
             }
         };
 
+        topReadPartitionRowCount = new MaxSampler<ByteBuffer>()
+        {
+            public String toString(ByteBuffer value)
+            {
+                return cfs.metadata().partitionKeyType.getString(value);
+            }
+        };
+
+        topReadPartitionTombstoneCount = new MaxSampler<ByteBuffer>()
+        {
+            public String toString(ByteBuffer value)
+            {
+                return cfs.metadata().partitionKeyType.getString(value);
+            }
+        };
+
+        topReadPartitionSSTableCount = new MaxSampler<ByteBuffer>()
+        {
+            @Override
+            public String toString(ByteBuffer value)
+            {
+                return cfs.metadata().partitionKeyType.getString(value);
+            }
+        };
+
         samplers.put(SamplerType.READS, topReadPartitionFrequency);
         samplers.put(SamplerType.WRITES, topWritePartitionFrequency);
         samplers.put(SamplerType.WRITE_SIZE, topWritePartitionSize);
         samplers.put(SamplerType.CAS_CONTENTIONS, topCasPartitionContention);
         samplers.put(SamplerType.LOCAL_READ_TIME, topLocalReadQueryTime);
+        samplers.put(SamplerType.READ_ROW_COUNT, topReadPartitionRowCount);
+        samplers.put(SamplerType.READ_TOMBSTONE_COUNT, topReadPartitionTombstoneCount);
+        samplers.put(SamplerType.READ_SSTABLE_COUNT, topReadPartitionSSTableCount);
 
         memtableColumnsCount = createTableGauge("MemtableColumnsCount", () -> cfs.getTracker().getView().getCurrentMemtable().operationCount());
diff --git a/test/unit/org/apache/cassandra/tools/TopPartitionsTest.java b/test/unit/org/apache/cassandra/tools/TopPartitionsTest.java
index 934d4d814c7b..d9cfdeecef91 100644
--- a/test/unit/org/apache/cassandra/tools/TopPartitionsTest.java
+++ b/test/unit/org/apache/cassandra/tools/TopPartitionsTest.java
@@ -41,6 +41,7 @@
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.metrics.Sampler;
+import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.service.StorageService;
 
 import static java.lang.String.format;
@@ -54,10 +55,15 @@
  */
 public class TopPartitionsTest
 {
+    public static String KEYSPACE = TopPartitionsTest.class.getSimpleName().toLowerCase();
+    public static String TABLE = "test";
+
     @BeforeClass
     public static void loadSchema() throws ConfigurationException
     {
         SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1));
+        executeInternal(format("CREATE TABLE %s.%s (k text, c text, v text, PRIMARY KEY (k, c))", KEYSPACE, TABLE));
     }
 
     @Test
@@ -93,6 +99,108 @@ public void testServiceTopPartitionsSingleTable() throws Exception
         assertEquals("If this failed you probably have to raise the beginLocalSampling duration", 1, result.size());
     }
 
+    @Test
+    public void testTopPartitionsRowTombstoneAndSSTableCount() throws Exception
+    {
+        int count = 10;
+        ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(KEYSPACE, TABLE);
+        cfs.disableAutoCompaction();
+
+        executeInternal(format("INSERT INTO %s.%s(k,c,v) VALUES ('a', 'a', 'a')", KEYSPACE, TABLE));
+        executeInternal(format("INSERT INTO %s.%s(k,c,v) VALUES ('a', 'b', 'a')", KEYSPACE, TABLE));
+        cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);
+
+        executeInternal(format("INSERT INTO %s.%s(k,c,v) VALUES ('a', 'c', 'a')", KEYSPACE, TABLE));
+        executeInternal(format("INSERT INTO %s.%s(k,c,v) VALUES ('b', 'b', 'b')", KEYSPACE, TABLE));
+        executeInternal(format("INSERT INTO %s.%s(k,c,v) VALUES ('c', 'c', 'c')", KEYSPACE, TABLE));
+        executeInternal(format("INSERT INTO %s.%s(k,c,v) VALUES ('c', 'd', 'a')", KEYSPACE, TABLE));
+        executeInternal(format("INSERT INTO %s.%s(k,c,v) VALUES ('c', 'e', 'a')", KEYSPACE, TABLE));
+        executeInternal(format("DELETE FROM %s.%s WHERE k='a' AND c='a'", KEYSPACE, TABLE));
+        cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);
+
+        // test multi-partition read
+        cfs.beginLocalSampling("READ_ROW_COUNT", count, 240000);
+        cfs.beginLocalSampling("READ_TOMBSTONE_COUNT", count, 240000);
+        cfs.beginLocalSampling("READ_SSTABLE_COUNT", count, 240000);
+
+        executeInternal(format("SELECT * FROM %s.%s", KEYSPACE, TABLE));
+        Thread.sleep(2000); // simulate waiting before finishing sampling
+
+        List<CompositeData> rowCounts = cfs.finishLocalSampling("READ_ROW_COUNT", count);
+        List<CompositeData> tsCounts = cfs.finishLocalSampling("READ_TOMBSTONE_COUNT", count);
+        List<CompositeData> sstCounts = cfs.finishLocalSampling("READ_SSTABLE_COUNT", count);
+
+        assertEquals(0, sstCounts.size()); // not tracked on range reads
+        assertEquals(3, rowCounts.size()); // 3 partitions read (a, b, c)
+        assertEquals(1, tsCounts.size()); // 1 partition with tombstones (a)
+
+        for (CompositeData data : rowCounts)
+        {
+            String partitionKey = (String) data.get("value");
+            long numRows = (long) data.get("count");
+            if (partitionKey.equalsIgnoreCase("a"))
+            {
+                assertEquals(2, numRows);
+            }
+            else if (partitionKey.equalsIgnoreCase("b"))
+                assertEquals(1, numRows);
+            else if (partitionKey.equalsIgnoreCase("c"))
+                assertEquals(3, numRows);
+        }
+
+        assertEquals("a", tsCounts.get(0).get("value"));
+        assertEquals(1, (long) tsCounts.get(0).get("count"));
+
+        // test single partition read
+        cfs.beginLocalSampling("READ_ROW_COUNT", count, 240000);
+        cfs.beginLocalSampling("READ_TOMBSTONE_COUNT", count, 240000);
+        cfs.beginLocalSampling("READ_SSTABLE_COUNT", count, 240000);
+
+        executeInternal(format("SELECT * FROM %s.%s WHERE k='a'", KEYSPACE, TABLE));
+        executeInternal(format("SELECT * FROM %s.%s WHERE k='b'", KEYSPACE, TABLE));
+        executeInternal(format("SELECT * FROM %s.%s WHERE k='c'", KEYSPACE, TABLE));
+        Thread.sleep(2000); // simulate waiting before finishing sampling
+
+        rowCounts = cfs.finishLocalSampling("READ_ROW_COUNT", count);
+        tsCounts = cfs.finishLocalSampling("READ_TOMBSTONE_COUNT", count);
+        sstCounts = cfs.finishLocalSampling("READ_SSTABLE_COUNT", count);
+
+        assertEquals(3, sstCounts.size()); // 3 partitions read
+        assertEquals(3, rowCounts.size()); // 3 partitions read
+        assertEquals(1, tsCounts.size()); // 3 partitions read, only one containing tombstones
+
+        for (CompositeData data : sstCounts)
+        {
+            String partitionKey = (String) data.get("value");
+            long numRows = (long) data.get("count");
+            if (partitionKey.equalsIgnoreCase("a"))
+            {
+                assertEquals(2, numRows);
+            }
+            else if (partitionKey.equalsIgnoreCase("b"))
+                assertEquals(1, numRows);
+            else if (partitionKey.equalsIgnoreCase("c"))
+                assertEquals(1, numRows);
+        }
+
+        for (CompositeData data : rowCounts)
+        {
+            String partitionKey = (String) data.get("value");
+            long numRows = (long) data.get("count");
+            if (partitionKey.equalsIgnoreCase("a"))
+            {
+                assertEquals(2, numRows);
+            }
+            else if (partitionKey.equalsIgnoreCase("b"))
+                assertEquals(1, numRows);
+            else if (partitionKey.equalsIgnoreCase("c"))
+                assertEquals(3, numRows);
+        }
+
+        assertEquals("a", tsCounts.get(0).get("value"));
+        assertEquals(1, (long) tsCounts.get(0).get("count"));
+    }
+
     @Test
     public void testStartAndStopScheduledSampling()
     {
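
Usage sketch: with this patch applied, the three new samplers should be reachable through the existing nodetool profileload plumbing alongside READS, WRITES and the other SamplerType values. The invocation below is illustrative only, assuming profileload's -a comma-separated sampler list and its positional keyspace/table/duration arguments; the keyspace and table names are placeholders, so verify the exact options with nodetool help profileload on your build:

    nodetool profileload -a READ_ROW_COUNT,READ_TOMBSTONE_COUNT,READ_SSTABLE_COUNT my_keyspace my_table 10000

Each sampler reports the Table and Partition columns plus its own count column (Rows, Tombstones, SSTables) as defined in SamplerType above; as the unit test asserts, READ_SSTABLE_COUNT is only populated by single-partition reads, not by range scans.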