Skip to content

Commit

Permalink
Add row, tombstone, and sstable count to profileload
Browse files Browse the repository at this point in the history
Patch by Jordan West; Reviewed by David Capwell for CASSANDRA-18022
  • Loading branch information
jrwest committed Feb 6, 2023
1 parent a07c15d commit 2d323cb
Show file tree
Hide file tree
Showing 6 changed files with 180 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
@@ -1,4 +1,5 @@
4.2
* Add row, tombstone, and sstable count to nodetool profileload (CASSANDRA-18022)
* Coordinator level metrics for read response and mutation row and column counts (CASSANDRA-18155)
* Add CQL functions for dynamic data masking (CASSANDRA-17941)
* Print friendly error when nodetool attempts to connect to uninitialized server (CASSANDRA-11537)
Expand Down
18 changes: 18 additions & 0 deletions src/java/org/apache/cassandra/db/ReadCommand.java
Expand Up @@ -501,7 +501,9 @@ class MetricRecording extends Transformation<UnfilteredRowIterator>
private final boolean enforceStrictLiveness = metadata().enforceStrictLiveness();

private int liveRows = 0;
private int lastReportedLiveRows = 0;
private int tombstones = 0;
private int lastReportedTombstones = 0;

private DecoratedKey currentKey;

Expand Down Expand Up @@ -568,6 +570,22 @@ private void countTombstone(ClusteringPrefix<?> clustering)
}
}

@Override
protected void onPartitionClose()
{
    // Sample only the rows/tombstones accumulated since the previous partition
    // closed: the counters are cumulative across the whole iteration, so the
    // per-partition contribution is the delta from the last report.
    int newLiveRows = liveRows - lastReportedLiveRows;
    if (newLiveRows > 0)
        metric.topReadPartitionRowCount.addSample(currentKey.getKey(), newLiveRows);
    lastReportedLiveRows = liveRows;

    int newTombstones = tombstones - lastReportedTombstones;
    if (newTombstones > 0)
        metric.topReadPartitionTombstoneCount.addSample(currentKey.getKey(), newTombstones);
    lastReportedTombstones = tombstones;
}

@Override
public void onClose()
{
Expand Down
Expand Up @@ -820,6 +820,7 @@ private UnfilteredRowIterator withSSTablesIterated(List<UnfilteredRowIterator> i
{
DecoratedKey key = merged.partitionKey();
metrics.topReadPartitionFrequency.addSample(key.getKey(), 1);
metrics.topReadPartitionSSTableCount.addSample(key.getKey(), metricsCollector.getMergedSSTables());
}

class UpdateSstablesIterated extends Transformation<UnfilteredRowIterator>
Expand Down Expand Up @@ -963,6 +964,7 @@ private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFam

DecoratedKey key = result.partitionKey();
cfs.metric.topReadPartitionFrequency.addSample(key.getKey(), 1);
cfs.metric.topReadPartitionSSTableCount.addSample(key.getKey(), metricsCollector.getMergedSSTables());
StorageHook.instance.reportRead(cfs.metadata.id, partitionKey());

return result.unfilteredIterator(columnFilter(), Slices.ALL, clusteringIndexFilter().isReversed());
Expand Down
17 changes: 17 additions & 0 deletions src/java/org/apache/cassandra/metrics/Sampler.java
Expand Up @@ -54,6 +54,23 @@ public enum SamplerType
resultBuilder.forType(samplerType, samplerType.description)
.addColumn("Query", "value")
.addColumn("Microseconds", "count"))),
READ_ROW_COUNT("Partitions read with the most rows", ((samplerType, resultBuilder) ->
resultBuilder.forType(samplerType, samplerType.description)
.addColumn("Table", "table")
.addColumn("Partition", "value")
.addColumn("Rows", "count"))),

READ_TOMBSTONE_COUNT("Partitions read with the most tombstones", ((samplerType, resultBuilder) ->
resultBuilder.forType(samplerType, samplerType.description)
.addColumn("Table", "table")
.addColumn("Partition", "value")
.addColumn("Tombstones", "count"))),

READ_SSTABLE_COUNT("Partitions read with the most sstables", ((samplerType, resultBuilder) ->
resultBuilder.forType(samplerType, samplerType.description)
.addColumn("Table", "table")
.addColumn("Partition", "value")
.addColumn("SSTables", "count"))),
WRITE_SIZE("Max mutation size by partition", ((samplerType, resultBuilder) ->
resultBuilder.forType(samplerType, samplerType.description)
.addColumn("Table", "table")
Expand Down
34 changes: 34 additions & 0 deletions src/java/org/apache/cassandra/metrics/TableMetrics.java
Expand Up @@ -262,6 +262,12 @@ public class TableMetrics
public final Sampler<ByteBuffer> topCasPartitionContention;
/** When sampler activated, will track the slowest local reads **/
public final Sampler<String> topLocalReadQueryTime;
/** When sampler activated, will track partitions read with the most rows **/
public final Sampler<ByteBuffer> topReadPartitionRowCount;
/** When sampler activated, will track partitions read with the most tombstones **/
public final Sampler<ByteBuffer> topReadPartitionTombstoneCount;
/** When sampler activated, will track partitions read with the most merged sstables **/
public final Sampler<ByteBuffer> topReadPartitionSSTableCount;

public final TableMeter clientTombstoneWarnings;
public final TableMeter clientTombstoneAborts;
Expand Down Expand Up @@ -442,11 +448,39 @@ public String toString(String value)
}
};

// Samplers backing `nodetool profileload` row/tombstone/sstable tracking
// (CASSANDRA-18022). Each one renders the sampled partition key with the
// table's partition key type so operators see the logical key, not raw bytes.
// Note: @Override added on each toString for consistency with the rest of
// the anonymous MaxSampler implementations.
topReadPartitionRowCount = new MaxSampler<ByteBuffer>()
{
    @Override
    public String toString(ByteBuffer value)
    {
        return cfs.metadata().partitionKeyType.getString(value);
    }
};

topReadPartitionTombstoneCount = new MaxSampler<ByteBuffer>()
{
    @Override
    public String toString(ByteBuffer value)
    {
        return cfs.metadata().partitionKeyType.getString(value);
    }
};

topReadPartitionSSTableCount = new MaxSampler<ByteBuffer>()
{
    @Override
    public String toString(ByteBuffer value)
    {
        return cfs.metadata().partitionKeyType.getString(value);
    }
};

// Register every sampler under its SamplerType so begin/finishLocalSampling
// can look it up by name.
samplers.put(SamplerType.READS, topReadPartitionFrequency);
samplers.put(SamplerType.WRITES, topWritePartitionFrequency);
samplers.put(SamplerType.WRITE_SIZE, topWritePartitionSize);
samplers.put(SamplerType.CAS_CONTENTIONS, topCasPartitionContention);
samplers.put(SamplerType.LOCAL_READ_TIME, topLocalReadQueryTime);
samplers.put(SamplerType.READ_ROW_COUNT, topReadPartitionRowCount);
samplers.put(SamplerType.READ_TOMBSTONE_COUNT, topReadPartitionTombstoneCount);
samplers.put(SamplerType.READ_SSTABLE_COUNT, topReadPartitionSSTableCount);

memtableColumnsCount = createTableGauge("MemtableColumnsCount",
() -> cfs.getTracker().getView().getCurrentMemtable().operationCount());
Expand Down
108 changes: 108 additions & 0 deletions test/unit/org/apache/cassandra/tools/TopPartitionsTest.java
Expand Up @@ -41,6 +41,7 @@
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.metrics.Sampler;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.service.StorageService;

import static java.lang.String.format;
Expand All @@ -54,10 +55,15 @@
*/
public class TopPartitionsTest
{
public static String KEYSPACE = TopPartitionsTest.class.getSimpleName().toLowerCase();
public static String TABLE = "test";

@BeforeClass
public static void loadSchema() throws ConfigurationException
{
SchemaLoader.prepareServer();
SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1));
executeInternal(format("CREATE TABLE %s.%s (k text, c text, v text, PRIMARY KEY (k, c))", KEYSPACE, TABLE));
}

@Test
Expand Down Expand Up @@ -93,6 +99,108 @@ public void testServiceTopPartitionsSingleTable() throws Exception
assertEquals("If this failed you probably have to raise the beginLocalSampling duration", 1, result.size());
}

@Test
public void testTopPartitionsRowTombstoneAndSSTableCount() throws Exception
{
    int count = 10;
    ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(KEYSPACE, TABLE);
    // Keep each flush as a distinct sstable so the sstables-per-read sampler
    // can observe reads that merge more than one sstable.
    cfs.disableAutoCompaction();

    // First sstable: partition 'a' with rows 'a' and 'b'.
    executeInternal(format("INSERT INTO %s.%s(k,c,v) VALUES ('a', 'a', 'a')", KEYSPACE, TABLE));
    executeInternal(format("INSERT INTO %s.%s(k,c,v) VALUES ('a', 'b', 'a')", KEYSPACE, TABLE));
    cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);

    // Second sstable: more rows plus a row tombstone in partition 'a'.
    // Live rows after this: a -> {b, c}, b -> {b}, c -> {c, d, e}.
    executeInternal(format("INSERT INTO %s.%s(k,c,v) VALUES ('a', 'c', 'a')", KEYSPACE, TABLE));
    executeInternal(format("INSERT INTO %s.%s(k,c,v) VALUES ('b', 'b', 'b')", KEYSPACE, TABLE));
    executeInternal(format("INSERT INTO %s.%s(k,c,v) VALUES ('c', 'c', 'c')", KEYSPACE, TABLE));
    executeInternal(format("INSERT INTO %s.%s(k,c,v) VALUES ('c', 'd', 'a')", KEYSPACE, TABLE));
    executeInternal(format("INSERT INTO %s.%s(k,c,v) VALUES ('c', 'e', 'a')", KEYSPACE, TABLE));
    executeInternal(format("DELETE FROM %s.%s WHERE k='a' AND c='a'", KEYSPACE, TABLE));
    cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);

    // test multi-partition (range) read
    cfs.beginLocalSampling("READ_ROW_COUNT", count, 240000);
    cfs.beginLocalSampling("READ_TOMBSTONE_COUNT", count, 240000);
    cfs.beginLocalSampling("READ_SSTABLE_COUNT", count, 240000);

    executeInternal(format("SELECT * FROM %s.%s", KEYSPACE, TABLE));
    Thread.sleep(2000); // simulate waiting before finishing sampling

    List<CompositeData> rowCounts = cfs.finishLocalSampling("READ_ROW_COUNT", count);
    List<CompositeData> tsCounts = cfs.finishLocalSampling("READ_TOMBSTONE_COUNT", count);
    List<CompositeData> sstCounts = cfs.finishLocalSampling("READ_SSTABLE_COUNT", count);

    assertEquals(0, sstCounts.size()); // sstables-per-read is not tracked on range reads
    assertEquals(3, rowCounts.size()); // 3 partitions read (a, b, c)
    assertEquals(1, tsCounts.size());  // 1 partition with tombstones (a)

    assertRowCounts(rowCounts);

    assertEquals("a", tsCounts.get(0).get("value"));
    assertEquals(1, (long) tsCounts.get(0).get("count"));

    // test single-partition reads
    cfs.beginLocalSampling("READ_ROW_COUNT", count, 240000);
    cfs.beginLocalSampling("READ_TOMBSTONE_COUNT", count, 240000);
    cfs.beginLocalSampling("READ_SSTABLE_COUNT", count, 240000);

    executeInternal(format("SELECT * FROM %s.%s WHERE k='a'", KEYSPACE, TABLE));
    executeInternal(format("SELECT * FROM %s.%s WHERE k='b'", KEYSPACE, TABLE));
    executeInternal(format("SELECT * FROM %s.%s WHERE k='c'", KEYSPACE, TABLE));
    Thread.sleep(2000); // simulate waiting before finishing sampling

    rowCounts = cfs.finishLocalSampling("READ_ROW_COUNT", count);
    tsCounts = cfs.finishLocalSampling("READ_TOMBSTONE_COUNT", count);
    sstCounts = cfs.finishLocalSampling("READ_SSTABLE_COUNT", count);

    assertEquals(3, sstCounts.size()); // 3 partitions read
    assertEquals(3, rowCounts.size()); // 3 partitions read
    assertEquals(1, tsCounts.size());  // 3 partitions read, only one containing tombstones

    // Partition 'a' spans both sstables; 'b' and 'c' live only in the second.
    for (CompositeData data : sstCounts)
    {
        String partitionKey = (String) data.get("value");
        long numSSTables = (long) data.get("count");
        if (partitionKey.equalsIgnoreCase("a"))
            assertEquals(2, numSSTables);
        else if (partitionKey.equalsIgnoreCase("b"))
            assertEquals(1, numSSTables);
        else if (partitionKey.equalsIgnoreCase("c"))
            assertEquals(1, numSSTables);
    }

    assertRowCounts(rowCounts);

    assertEquals("a", tsCounts.get(0).get("value"));
    assertEquals(1, (long) tsCounts.get(0).get("count"));
}

/**
 * Asserts the expected live-row count per partition: a -> 2, b -> 1, c -> 3.
 * Shared by the range-read and single-partition-read phases of
 * {@link #testTopPartitionsRowTombstoneAndSSTableCount()}.
 */
private static void assertRowCounts(List<CompositeData> rowCounts)
{
    for (CompositeData data : rowCounts)
    {
        String partitionKey = (String) data.get("value");
        long numRows = (long) data.get("count");
        if (partitionKey.equalsIgnoreCase("a"))
            assertEquals(2, numRows);
        else if (partitionKey.equalsIgnoreCase("b"))
            assertEquals(1, numRows);
        else if (partitionKey.equalsIgnoreCase("c"))
            assertEquals(3, numRows);
    }
}

@Test
public void testStartAndStopScheduledSampling()
{
Expand Down

0 comments on commit 2d323cb

Please sign in to comment.