From 35f2c1820f747c8624f811dcd7a1dd6fdc56376b Mon Sep 17 00:00:00 2001 From: pwalczak Date: Sun, 19 Apr 2026 21:54:16 +0200 Subject: [PATCH] CASSANDRA-21321: Add RowsRead and RowsMutated counters to TableMetrics for accurate per-table row throughput tracking --- .../org/apache/cassandra/db/Keyspace.java | 2 + .../org/apache/cassandra/db/ReadCommand.java | 1 + .../cassandra/metrics/TableMetrics.java | 6 ++ .../cassandra/metrics/TableMetricsTest.java | 59 +++++++++++++++++++ 4 files changed, 68 insertions(+) diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java index e0cdf8b5c26c..7b90df79b91c 100644 --- a/src/java/org/apache/cassandra/db/Keyspace.java +++ b/src/java/org/apache/cassandra/db/Keyspace.java @@ -577,6 +577,8 @@ else if (isDeferrable) } cfs.getWriteHandler().write(upd, ctx, updateIndexes); + int rowCount = upd.affectedRowCount(); + cfs.metric.rowsMutated.inc(rowCount); if (requiresViewUpdate) baseComplete.set(currentTimeMillis()); diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index c81f9ab57f2f..4e0c307e2d91 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -718,6 +718,7 @@ public void onClose() metric.tombstoneScannedHistogram.update(tombstones); metric.liveScannedHistogram.update(liveRows); + metric.rowsRead.inc(liveRows); boolean warnTombstones = tombstones > warningThreshold && respectTombstoneThresholds; if (warnTombstones) diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java index 9e00988da76f..d0f22d6c0695 100644 --- a/src/java/org/apache/cassandra/metrics/TableMetrics.java +++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java @@ -166,6 +166,10 @@ public class TableMetrics public final TableHistogram purgeableTombstoneScannedHistogram; /** Live rows scanned in queries on this CF */ public final TableHistogram liveScannedHistogram; + /** Total number of live rows read from this CF (cumulative counter, suitable for rate/windowed calculations) */ + public final Counter rowsRead; + /** Total number of rows mutated in writes to this CF (cumulative counter, suitable for rate/windowed calculations) */ + public final Counter rowsMutated; /** Column update time delta on this CF */ public final TableHistogram colUpdateTimeDeltaHistogram; /** time taken acquiring the partition lock for materialized view updates for this table */ @@ -812,6 +816,8 @@ public Long getValue() tombstoneScannedHistogram = createTableHistogram("TombstoneScannedHistogram", cfs.keyspace.metric.tombstoneScannedHistogram, false); purgeableTombstoneScannedHistogram = createTableHistogram("PurgeableTombstoneScannedHistogram", cfs.keyspace.metric.purgeableTombstoneScannedHistogram, true); liveScannedHistogram = createTableHistogram("LiveScannedHistogram", cfs.keyspace.metric.liveScannedHistogram, false); + rowsRead = createTableCounter("RowsRead"); + rowsMutated = createTableCounter("RowsMutated"); colUpdateTimeDeltaHistogram = createTableHistogram("ColUpdateTimeDeltaHistogram", cfs.keyspace.metric.colUpdateTimeDeltaHistogram, false); coordinatorReadLatency = createTableTimer("CoordinatorReadLatency"); coordinatorScanLatency = createTableTimer("CoordinatorScanLatency"); diff --git a/test/unit/org/apache/cassandra/metrics/TableMetricsTest.java b/test/unit/org/apache/cassandra/metrics/TableMetricsTest.java index 5cb934af7248..08a84fb58e92 100644 --- a/test/unit/org/apache/cassandra/metrics/TableMetricsTest.java +++ b/test/unit/org/apache/cassandra/metrics/TableMetricsTest.java @@ -213,6 +213,65 @@ public void testPreparedStatementsExecuted() assertGreaterThan(cfs.metric.coordinatorWriteLatency.getMeanRate(), 0); } + @Test + public void testRowsMutatedCounter() + { + ColumnFamilyStore cfs = recreateTable(); + assertEquals(0, cfs.metric.rowsMutated.getCount()); + + // Each INSERT touches exactly one row + session.execute(String.format("INSERT INTO %s.%s (id, val1, val2) VALUES (1, 'a', 'b')", KEYSPACE, TABLE)); + assertEquals(1, cfs.metric.rowsMutated.getCount()); + + session.execute(String.format("INSERT INTO %s.%s (id, val1, val2) VALUES (2, 'c', 'd')", KEYSPACE, TABLE)); + assertEquals(2, cfs.metric.rowsMutated.getCount()); + + // Batch of 3 rows — counter should jump by 3 + executeBatch(false, 3, 1); + assertEquals(5, cfs.metric.rowsMutated.getCount()); + + assertRowsContains(cluster, session.execute("SELECT * FROM system_metrics.table_group"), + row("org.apache.cassandra.metrics.Table.RowsMutated.junit.tablemetricstest", + "junit.tablemetricstest", + "counter", + String.valueOf(cfs.metric.rowsMutated.getCount()))); + assertRowsContains(cluster, session.execute("SELECT * FROM system_metrics.column_family_group"), + row("org.apache.cassandra.metrics.ColumnFamily.RowsMutated.junit.tablemetricstest", + "junit.tablemetricstest", + "counter", + String.valueOf(cfs.metric.rowsMutated.getCount()))); + } + + @Test + public void testRowsReadCounter() + { + ColumnFamilyStore cfs = recreateTable(); + assertEquals(0, cfs.metric.rowsRead.getCount()); + + // Seed some rows + for (int i = 0; i < 5; i++) + session.execute(String.format("INSERT INTO %s.%s (id, val1, val2) VALUES (%d, 'v%d', 'x')", KEYSPACE, TABLE, i, i)); + + // Full-table scan should touch all 5 rows + session.execute(String.format("SELECT * FROM %s.%s", KEYSPACE, TABLE)); + assertEquals(5, cfs.metric.rowsRead.getCount()); + + // Single-partition read touches 1 row + session.execute(String.format("SELECT * FROM %s.%s WHERE id = 0", KEYSPACE, TABLE)); + assertEquals(6, cfs.metric.rowsRead.getCount()); + + assertRowsContains(cluster, session.execute("SELECT * FROM system_metrics.table_group"), + row("org.apache.cassandra.metrics.Table.RowsRead.junit.tablemetricstest", + "junit.tablemetricstest", + "counter", + String.valueOf(cfs.metric.rowsRead.getCount()))); + assertRowsContains(cluster, session.execute("SELECT * FROM system_metrics.column_family_group"), + row("org.apache.cassandra.metrics.ColumnFamily.RowsRead.junit.tablemetricstest", + "junit.tablemetricstest", + "counter", + String.valueOf(cfs.metric.rowsRead.getCount()))); + } + @Test public void testLoggedPartitionsPerBatch() {