From 49dfb805e9045c856181d6c2ac3b586b98d1a82a Mon Sep 17 00:00:00 2001 From: maxwellguo Date: Mon, 16 Jan 2023 19:49:38 +0100 Subject: [PATCH] Add compaction_properties column to system.compaction_history table and nodetool compactionhistory command patch by Maxwell Guo; reviewed by Stefan Miklosovic and Jacek Lewandowski for CASSANDRA-18061 --- CHANGES.txt | 1 + NEWS.txt | 1 + .../apache/cassandra/db/SystemKeyspace.java | 9 +- .../db/SystemKeyspaceMigrator41.java | 38 +++++- .../CompactionHistoryTabularData.java | 16 ++- .../db/compaction/CompactionTask.java | 9 +- .../stats/CompactionHistoryHolder.java | 8 +- .../stats/CompactionHistoryPrinter.java | 3 +- ...mpactionHistorySystemTableUpgradeTest.java | 93 +++++++++++++ .../db/SystemKeyspaceMigrator41Test.java | 56 ++++++++ .../tools/nodetool/CompactionHistoryTest.java | 128 ++++++++++++++++++ 11 files changed, 344 insertions(+), 18 deletions(-) create mode 100644 test/distributed/org/apache/cassandra/distributed/upgrade/CompactionHistorySystemTableUpgradeTest.java create mode 100644 test/unit/org/apache/cassandra/tools/nodetool/CompactionHistoryTest.java diff --git a/CHANGES.txt b/CHANGES.txt index f7dfb6f9ae21..19ba7ab37da9 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.2 + * Add compaction_properties column to system.compaction_history table and nodetool compactionhistory command (CASSANDRA-18061) * Remove ProtocolVersion entirely from the CollectionSerializer ecosystem (CASSANDRA-18114) * Fix serialization error in new getsstables --show-levels option (CASSANDRA-18140) * Use checked casts when reading vints as ints (CASSANDRA-18099) diff --git a/NEWS.txt b/NEWS.txt index f8eefc986b29..50a4d206f28e 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -110,6 +110,7 @@ New features - Added new CQL native scalar functions for collections. The new functions are mostly analogous to the existing aggregation functions, but they operate on the elements of collection columns. The new functions are `map_keys`, `map_values`, `collection_count`, `collection_min`, `collection_max`, `collection_sum` and `collection_avg`. + - Added compaction_properties column to system.compaction_history table and nodetool compactionhistory command Upgrading --------- diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index bca5a18d0869..319c01d8d04c 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -310,6 +310,7 @@ private SystemKeyspace() + "compacted_at timestamp," + "keyspace_name text," + "rows_merged map," + + "compaction_properties frozen>," + "PRIMARY KEY ((id)))") .defaultTimeToLive((int) TimeUnit.DAYS.toSeconds(7)) .build(); @@ -592,12 +593,13 @@ public static void updateCompactionHistory(String ksname, long compactedAt, long bytesIn, long bytesOut, - Map rowsMerged) + Map rowsMerged, + Map compactionProperties) { // don't write anything when the history table itself is compacted, since that would in turn cause new compactions if (ksname.equals("system") && cfname.equals(COMPACTION_HISTORY)) return; - String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, compacted_at, bytes_in, bytes_out, rows_merged) VALUES (?, ?, ?, ?, ?, ?, ?)"; + String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, compacted_at, bytes_in, bytes_out, rows_merged, compaction_properties) VALUES (?, ?, ?, ?, ?, ?, ?, ?)"; executeInternal(format(req, COMPACTION_HISTORY), nextTimeUUID(), ksname, @@ -605,7 +607,8 @@ public static void updateCompactionHistory(String ksname, ByteBufferUtil.bytes(compactedAt), bytesIn, bytesOut, - rowsMerged); + rowsMerged, + compactionProperties); } public static TabularData getCompactionHistory() throws OpenDataException diff --git a/src/java/org/apache/cassandra/db/SystemKeyspaceMigrator41.java b/src/java/org/apache/cassandra/db/SystemKeyspaceMigrator41.java index bfce78019d6e..ab9f01f94500 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspaceMigrator41.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspaceMigrator41.java @@ -24,6 +24,7 @@ import java.util.function.Function; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,6 +34,7 @@ import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.db.marshal.LongType; import org.apache.cassandra.db.marshal.TimeUUIDType; import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.io.sstable.SequenceBasedSSTableId; @@ -62,6 +64,7 @@ public static void migrate() migrateTransferredRanges(); migrateAvailableRanges(); migrateSSTableActivity(); + migrateCompactionHistory(); } @VisibleForTesting @@ -159,12 +162,38 @@ static void migrateSSTableActivity() }) ); } + + @VisibleForTesting + static void migrateCompactionHistory() + { + migrateTable(false, + SystemKeyspace.COMPACTION_HISTORY, + SystemKeyspace.COMPACTION_HISTORY, + new String[]{ "id", + "bytes_in", + "bytes_out", + "columnfamily_name", + "compacted_at", + "keyspace_name", + "rows_merged", + "compaction_properties" }, + row -> Collections.singletonList(new Object[]{ row.getTimeUUID("id") , + row.has("bytes_in") ? row.getLong("bytes_in") : null, + row.has("bytes_out") ? row.getLong("bytes_out") : null, + row.has("columnfamily_name") ? row.getString("columnfamily_name") : null, + row.has("compacted_at") ? row.getTimestamp("compacted_at") : null, + row.has("keyspace_name") ? row.getString("keyspace_name") : null, + row.has("rows_merged") ? row.getMap("rows_merged", Int32Type.instance, LongType.instance) : null, + row.has("compaction_properties") ? row.getMap("compaction_properties", UTF8Type.instance, UTF8Type.instance) : ImmutableMap.of() }) + ); + } /** * Perform table migration by reading data from the old table, converting it, and adding to the new table. - * + * If oldName and newName are same, it means data in the table will be refreshed. + * * @param truncateIfExists truncate the existing table if it exists before migration; if it is disabled - * and the new table is not empty, no migration is performed + * and the new table is not empty and oldName is not equal to newName, no migration is performed * @param oldName old table name * @param newName new table name * @param columns columns to fill in the new table in the same order as returned by the transformation @@ -175,7 +204,7 @@ static void migrateTable(boolean truncateIfExists, String oldName, String newNam { ColumnFamilyStore newTable = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(newName); - if (!newTable.isEmpty() && !truncateIfExists) + if (!newTable.isEmpty() && !truncateIfExists && !oldName.equals(newName)) return; if (truncateIfExists) @@ -189,6 +218,9 @@ static void migrateTable(boolean truncateIfExists, String oldName, String newNam StringUtils.join(columns, ", "), StringUtils.repeat("?", ", ", columns.length)); UntypedResultSet rows = QueryProcessor.executeInternal(query); + + assert rows != null : String.format("Migrating rows from legacy %s to %s was not done as returned rows from %s are null!", oldName, newName, oldName); + int transferred = 0; logger.info("Migrating rows from legacy {} to {}", oldName, newName); for (UntypedResultSet.Row row : rows) diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionHistoryTabularData.java b/src/java/org/apache/cassandra/db/compaction/CompactionHistoryTabularData.java index bc4ac5a58062..182f34896a04 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionHistoryTabularData.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionHistoryTabularData.java @@ -24,16 +24,17 @@ import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.db.marshal.Int32Type; import org.apache.cassandra.db.marshal.LongType; +import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.utils.FBUtilities; public class CompactionHistoryTabularData { private static final String[] ITEM_NAMES = new String[]{ "id", "keyspace_name", "columnfamily_name", "compacted_at", - "bytes_in", "bytes_out", "rows_merged" }; + "bytes_in", "bytes_out", "rows_merged", "compaction_properties" }; private static final String[] ITEM_DESCS = new String[]{ "time uuid", "keyspace name", "column family name", "compaction finished at", - "total bytes in", "total bytes out", "total rows merged" }; + "total bytes in", "total bytes out", "total rows merged", "compaction properties" }; private static final String TYPE_NAME = "CompactionHistory"; @@ -44,13 +45,15 @@ public class CompactionHistoryTabularData private static final CompositeType COMPOSITE_TYPE; private static final TabularType TABULAR_TYPE; - + + public static final String COMPACTION_TYPE_PROPERTY = "compaction_type"; + static { try { ITEM_TYPES = new OpenType[]{ SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.LONG, - SimpleType.LONG, SimpleType.LONG, SimpleType.STRING }; + SimpleType.LONG, SimpleType.LONG, SimpleType.STRING, SimpleType.STRING }; COMPOSITE_TYPE = new CompositeType(TYPE_NAME, ROW_DESC, ITEM_NAMES, ITEM_DESCS, ITEM_TYPES); @@ -74,10 +77,11 @@ public static TabularData from(UntypedResultSet resultSet) throws OpenDataExcept long bytesIn = row.getLong(ITEM_NAMES[4]); long bytesOut = row.getLong(ITEM_NAMES[5]); Map rowMerged = row.getMap(ITEM_NAMES[6], Int32Type.instance, LongType.instance); - + Map compactionProperties = row.getMap(ITEM_NAMES[7], UTF8Type.instance, UTF8Type.instance); result.put(new CompositeDataSupport(COMPOSITE_TYPE, ITEM_NAMES, new Object[]{ id.toString(), ksName, cfName, compactedAt, bytesIn, bytesOut, - "{" + FBUtilities.toString(rowMerged) + "}" })); + '{' + FBUtilities.toString(rowMerged) + '}', + '{' + FBUtilities.toString(compactionProperties) + '}' })); } return result; } diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index 176c1f2a2bd9..81ea5e5b9db2 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit; import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import com.google.common.util.concurrent.RateLimiter; @@ -49,6 +50,7 @@ import org.apache.cassandra.utils.TimeUUID; import org.apache.cassandra.utils.concurrent.Refs; +import static org.apache.cassandra.db.compaction.CompactionHistoryTabularData.COMPACTION_TYPE_PROPERTY; import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis; import static org.apache.cassandra.utils.Clock.Global.nanoTime; import static org.apache.cassandra.utils.FBUtilities.now; @@ -244,7 +246,8 @@ public boolean apply(SSTableReader sstable) for (int i = 0; i < mergedRowCounts.length; i++) totalSourceRows += mergedRowCounts[i] * (i + 1); - String mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getTableName(), mergedRowCounts, startsize, endsize); + String mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getTableName(), mergedRowCounts, startsize, endsize, + ImmutableMap.of(COMPACTION_TYPE_PROPERTY, compactionType.type)); logger.info(String.format("Compacted (%s) %d sstables to [%s] to level=%d. %s to %s (~%d%% of original) in %,dms. Read Throughput = %s, Write Throughput = %s, Row Throughput = ~%,d/s. %,d total partitions merged to %,d. Partition merge counts were {%s}. Time spent writing keys = %,dms", taskId, @@ -283,7 +286,7 @@ public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, return new DefaultCompactionWriter(cfs, directories, transaction, nonExpiredSSTables, keepOriginals, getLevel()); } - public static String updateCompactionHistory(String keyspaceName, String columnFamilyName, long[] mergedRowCounts, long startSize, long endSize) + public static String updateCompactionHistory(String keyspaceName, String columnFamilyName, long[] mergedRowCounts, long startSize, long endSize, Map compactionProperties) { StringBuilder mergeSummary = new StringBuilder(mergedRowCounts.length * 10); Map mergedRows = new HashMap<>(); @@ -297,7 +300,7 @@ public static String updateCompactionHistory(String keyspaceName, String columnF mergeSummary.append(String.format("%d:%d, ", rows, count)); mergedRows.put(rows, count); } - SystemKeyspace.updateCompactionHistory(keyspaceName, columnFamilyName, currentTimeMillis(), startSize, endSize, mergedRows); + SystemKeyspace.updateCompactionHistory(keyspaceName, columnFamilyName, currentTimeMillis(), startSize, endSize, mergedRows, compactionProperties); return mergeSummary.toString(); } diff --git a/src/java/org/apache/cassandra/tools/nodetool/stats/CompactionHistoryHolder.java b/src/java/org/apache/cassandra/tools/nodetool/stats/CompactionHistoryHolder.java index 8799ffbef847..362bc67c3443 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/stats/CompactionHistoryHolder.java +++ b/src/java/org/apache/cassandra/tools/nodetool/stats/CompactionHistoryHolder.java @@ -54,8 +54,9 @@ private static class CompactionHistoryRow implements Comparable getAllAsMap() compaction.put("bytes_in", this.bytesIn); compaction.put("bytes_out", this.bytesOut); compaction.put("rows_merged", this.rowMerged); + compaction.put("compaction_properties", this.compactionProperties); return compaction; } } @@ -110,7 +113,8 @@ public Map convert2Map() (Long)value.get(3), (Long)value.get(4), (Long)value.get(5), - (String)value.get(6) + (String)value.get(6), + (String)value.get(7) ); chrList.add(chr); } diff --git a/src/java/org/apache/cassandra/tools/nodetool/stats/CompactionHistoryPrinter.java b/src/java/org/apache/cassandra/tools/nodetool/stats/CompactionHistoryPrinter.java index 97ed1c445387..c2b62ee030d5 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/stats/CompactionHistoryPrinter.java +++ b/src/java/org/apache/cassandra/tools/nodetool/stats/CompactionHistoryPrinter.java @@ -65,7 +65,7 @@ public void print(CompactionHistoryHolder data, PrintStream out) for (Object chr : compactionHistories) { Map value = chr instanceof Map ? (Map)chr : Collections.emptyMap(); - String[] obj = new String[7]; + String[] obj = new String[8]; obj[0] = (String)value.get("id"); obj[1] = (String)value.get("keyspace_name"); obj[2] = (String)value.get("columnfamily_name"); @@ -73,6 +73,7 @@ public void print(CompactionHistoryHolder data, PrintStream out) obj[4] = value.get("bytes_in").toString(); obj[5] = value.get("bytes_out").toString(); obj[6] = (String)value.get("rows_merged"); + obj[7] = (String)value.get("compaction_properties"); table.add(obj); } table.printTo(out); diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/CompactionHistorySystemTableUpgradeTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/CompactionHistorySystemTableUpgradeTest.java new file mode 100644 index 000000000000..73e92dd42307 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/upgrade/CompactionHistorySystemTableUpgradeTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.upgrade; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; + +import com.vdurmont.semver4j.Semver; +import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.tools.ToolRunner; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +import java.util.ArrayList; + +import static org.apache.cassandra.db.compaction.CompactionHistoryTabularData.COMPACTION_TYPE_PROPERTY; +import static org.apache.cassandra.tools.ToolRunner.invokeNodetoolJvmDtest; +import static org.apache.cassandra.tools.nodetool.CompactionHistoryTest.assertCompactionHistoryOutPut; + +@RunWith(Parameterized.class) +public class CompactionHistorySystemTableUpgradeTest extends UpgradeTestBase +{ + @Parameter + public Semver version; + + @Parameters() + public static ArrayList versions() + { + return Lists.newArrayList(v30, v3X, v40, v41); + } + + @Test + public void compactionHistorySystemTableTest() throws Throwable + { + new TestCase() + .nodes(1) + .nodesToUpgrade(1) + .upgradesToCurrentFrom(version) + .setup((cluster) -> { + //create table + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tb (" + + "pk text PRIMARY KEY," + + "c1 text," + + "c2 int," + + "c3 int)"); + // disable auto compaction + cluster.stream().forEach(node -> node.nodetool("disableautocompaction")); + // generate sstables + for (int i = 0; i != 10; ++i) + { + cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tb (pk, c1, c2, c3) VALUES ('pk" + i + "', 'c1" + i + "', " + i + ',' + i + ')', ConsistencyLevel.ALL); + cluster.stream().forEach(node -> node.flush(KEYSPACE)); + } + // force compact + cluster.stream().forEach(node -> node.forceCompact(KEYSPACE, "tb")); + }).runAfterClusterUpgrade((cluster) -> { + // disable auto compaction at start up + cluster.stream().forEach(node -> node.nodetool("disableautocompaction")); + ToolRunner.ToolResult toolHistory = invokeNodetoolJvmDtest(cluster.get(1), "compactionhistory"); + toolHistory.assertOnCleanExit(); + // upgraded system.compaction_history data verify + assertCompactionHistoryOutPut(toolHistory, KEYSPACE, "tb", ImmutableMap.of()); + + // force compact + cluster.stream().forEach(node -> node.nodetool("compact")); + toolHistory = invokeNodetoolJvmDtest(cluster.get(1), "compactionhistory"); + toolHistory.assertOnCleanExit(); + assertCompactionHistoryOutPut(toolHistory, KEYSPACE, "tb", ImmutableMap.of(COMPACTION_TYPE_PROPERTY, OperationType.MAJOR_COMPACTION.type)); + }) + .run(); + } +} diff --git a/test/unit/org/apache/cassandra/db/SystemKeyspaceMigrator41Test.java b/test/unit/org/apache/cassandra/db/SystemKeyspaceMigrator41Test.java index b54367920f72..2a1561b8a1a5 100644 --- a/test/unit/org/apache/cassandra/db/SystemKeyspaceMigrator41Test.java +++ b/test/unit/org/apache/cassandra/db/SystemKeyspaceMigrator41Test.java @@ -20,6 +20,8 @@ import java.net.InetAddress; import java.nio.ByteBuffer; +import java.util.Date; +import java.util.Map; import java.util.UUID; import com.google.common.collect.ImmutableMap; @@ -32,6 +34,7 @@ import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.db.marshal.LongType; import org.apache.cassandra.db.marshal.TimeUUIDType; import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.dht.Range; @@ -42,6 +45,8 @@ import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.TimeUUID; +import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis; +import static org.apache.cassandra.utils.FBUtilities.now; import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID; import static org.junit.Assert.assertEquals; @@ -264,4 +269,55 @@ public void testMigrateSSTableActivity() throws Throwable } assertEquals(1, rowCount); } + + @Test + public void testMigrateCompactionHistory() throws Throwable + { + String table = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.COMPACTION_HISTORY); + String insert = String.format("INSERT INTO %s (" + + "id, " + + "bytes_in, " + + "bytes_out, " + + "columnfamily_name, " + + "compacted_at, " + + "keyspace_name, " + + "rows_merged) " + + " values ( ?, ?, ?, ?, ?, ?, ? )", + table); + TimeUUID compactionId = TimeUUID.Generator.atUnixMillis(currentTimeMillis()); + Date compactAt = Date.from(now()); + Map rowsMerged = ImmutableMap.of(6, 1L); + execute(insert, + compactionId, + 10L, + 5L, + "table", + compactAt, + "keyspace", + rowsMerged); + SystemKeyspaceMigrator41.migrateCompactionHistory(); + + int rowCount = 0; + for (UntypedResultSet.Row row : execute(String.format("SELECT * FROM %s where keyspace_name = 'keyspace' and columnfamily_name = 'table' allow filtering", table))) + { + rowCount++; + assertEquals(compactionId, row.getTimeUUID("id")); + assertEquals(10L, row.getLong("bytes_in")); + assertEquals(5L, row.getLong("bytes_out")); + assertEquals("table", row.getString("columnfamily_name")); + assertEquals(compactAt, row.getTimestamp("compacted_at")); + assertEquals("keyspace", row.getString("keyspace_name")); + assertEquals(rowsMerged, row.getMap("rows_merged", Int32Type.instance, LongType.instance)); + assertEquals(ImmutableMap.of(), row.getMap("compaction_properties", UTF8Type.instance, UTF8Type.instance)); + } + assertEquals(1, rowCount); + + //Test nulls/missing don't prevent the row from propagating + execute(String.format("TRUNCATE %s", table)); + + execute(String.format("INSERT INTO %s (id) VALUES (?)", table), compactionId); + SystemKeyspaceMigrator41.migrateCompactionHistory(); + + assertEquals(1, execute(String.format("SELECT * FROM %s", table)).size()); + } } diff --git a/test/unit/org/apache/cassandra/tools/nodetool/CompactionHistoryTest.java b/test/unit/org/apache/cassandra/tools/nodetool/CompactionHistoryTest.java new file mode 100644 index 000000000000..804b75aab268 --- /dev/null +++ b/test/unit/org/apache/cassandra/tools/nodetool/CompactionHistoryTest.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tools.nodetool; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.tools.ToolRunner.ToolResult; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.db.compaction.CompactionHistoryTabularData.COMPACTION_TYPE_PROPERTY; +import static org.apache.cassandra.tools.ToolRunner.invokeNodetool; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertTrue; + +@RunWith(Parameterized.class) +public class CompactionHistoryTest extends CQLTester +{ + @Parameter + public List cmd; + + @Parameter(1) + public String compactionType; + + @Parameter(2) + public int systemTableRecord; + + @Parameters(name = "{index}: cmd={0} compactionType={1} systemTableRecord={2}") + public static Collection data() + { + List result = new ArrayList<>(); + result.add(new Object[]{ Lists.newArrayList("compact"), OperationType.MAJOR_COMPACTION.type, 1 }); + result.add(new Object[]{ Lists.newArrayList("garbagecollect"), OperationType.GARBAGE_COLLECT.type, 10 }); + result.add(new Object[]{ Lists.newArrayList("upgradesstables", "-a"), OperationType.UPGRADE_SSTABLES.type, 10 }); + return result; + } + + @BeforeClass + public static void setup() throws Exception + { + requireNetwork(); + startJMXServer(); + } + + @Test + public void testCompactionProperties() throws Throwable + { + createTable("CREATE TABLE %s (id text, value text, PRIMARY KEY ((id)))"); + ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(currentTable()); + cfs.disableAutoCompaction(); + // write SSTables for the specific key + for (int i = 0; i < 10; i++) + { + execute("INSERT INTO %s (id, value) VALUES (?, ?)", "key" + i, "value" + i); + flush(keyspace()); + } + + int expectedSSTablesCount = 10; + assertThat(cfs.getTracker().getView().liveSSTables()).hasSize(expectedSSTablesCount); + + ImmutableList.Builder builder = ImmutableList.builder(); + List cmds = builder.addAll(cmd).add(keyspace()).add(currentTable()).build(); + compactionHistoryResultVerify(keyspace(), currentTable(), ImmutableMap.of(COMPACTION_TYPE_PROPERTY, compactionType), cmds); + + String cql = "select keyspace_name,columnfamily_name,compaction_properties from system." + SystemKeyspace.COMPACTION_HISTORY + + " where keyspace_name = '" + keyspace() + "' AND columnfamily_name = '" + currentTable() + "' ALLOW FILTERING"; + Object[][] objects = new Object[systemTableRecord][]; + for (int i = 0; i != systemTableRecord; ++i) + { + objects[i] = row(keyspace(), currentTable(), ImmutableMap.of(COMPACTION_TYPE_PROPERTY, compactionType)); + } + assertRows(execute(cql), objects); + } + + private void compactionHistoryResultVerify(String keyspace, String table, Map properties, List cmds) + { + ToolResult toolCompact = invokeNodetool(cmds); + toolCompact.assertOnCleanExit(); + + ToolResult toolHistory = invokeNodetool("compactionhistory"); + toolHistory.assertOnCleanExit(); + assertCompactionHistoryOutPut(toolHistory, keyspace, table, properties); + } + + public static void assertCompactionHistoryOutPut(ToolResult toolHistory, String keyspace, String table, Map properties) + { + String stdout = toolHistory.getStdout(); + String[] resultArray = stdout.split(System.lineSeparator()); + assertTrue(Arrays.stream(resultArray) + .anyMatch(result -> result.contains('{' + FBUtilities.toString(properties) + '}') + && result.contains(keyspace) + && result.contains(table))); + } +}