Add compaction_properties column to system.compaction_history table and nodetool compactionhistory command

patch by Maxwell Guo; reviewed by Stefan Miklosovic and Jacek Lewandowski for CASSANDRA-18061
Maxwell-Guo authored and smiklosovic committed Jan 17, 2023
1 parent 0b47c57 commit 49dfb80
Showing 11 changed files with 344 additions and 18 deletions.
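The feature at a glance: every finished compaction now persists a frozen<map<text, text>> of its properties alongside the existing history columns. Below is a minimal sketch of reading the new column through the same internal query API this patch uses elsewhere; it must run inside the server process, and the class name, LIMIT, and printing are illustrative assumptions, not part of the commit.

import java.util.Map;

import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.marshal.UTF8Type;

public final class CompactionHistoryInspector // hypothetical helper, for illustration only
{
    public static void printCompactionProperties()
    {
        UntypedResultSet rows = QueryProcessor.executeInternal(
            "SELECT id, compaction_properties FROM system.compaction_history LIMIT 10");
        for (UntypedResultSet.Row row : rows)
        {
            // compaction_properties is frozen<map<text, text>>, so key and value
            // both deserialize with UTF8Type
            Map<String, String> props = row.getMap("compaction_properties",
                                                   UTF8Type.instance, UTF8Type.instance);
            System.out.println(row.getTimeUUID("id") + " -> " + props);
        }
    }
}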
1 change: 1 addition & 0 deletions CHANGES.txt
@@ -1,4 +1,5 @@
4.2
* Add compaction_properties column to system.compaction_history table and nodetool compactionhistory command (CASSANDRA-18061)
* Remove ProtocolVersion entirely from the CollectionSerializer ecosystem (CASSANDRA-18114)
* Fix serialization error in new getsstables --show-levels option (CASSANDRA-18140)
* Use checked casts when reading vints as ints (CASSANDRA-18099)
1 change: 1 addition & 0 deletions NEWS.txt
@@ -110,6 +110,7 @@ New features
- Added new CQL native scalar functions for collections. The new functions are mostly analogous to the existing
aggregation functions, but they operate on the elements of collection columns. The new functions are `map_keys`,
`map_values`, `collection_count`, `collection_min`, `collection_max`, `collection_sum` and `collection_avg`.
- Added compaction_properties column to system.compaction_history table and nodetool compactionhistory command

Upgrading
---------
9 changes: 6 additions & 3 deletions src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -310,6 +310,7 @@ private SystemKeyspace()
+ "compacted_at timestamp,"
+ "keyspace_name text,"
+ "rows_merged map<int, bigint>,"
+ "compaction_properties frozen<map<text, text>>,"
+ "PRIMARY KEY ((id)))")
.defaultTimeToLive((int) TimeUnit.DAYS.toSeconds(7))
.build();
@@ -592,20 +593,22 @@ public static void updateCompactionHistory(String ksname,
long compactedAt,
long bytesIn,
long bytesOut,
Map<Integer, Long> rowsMerged)
Map<Integer, Long> rowsMerged,
Map<String, String> compactionProperties)
{
// don't write anything when the history table itself is compacted, since that would in turn cause new compactions
if (ksname.equals("system") && cfname.equals(COMPACTION_HISTORY))
return;
String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, compacted_at, bytes_in, bytes_out, rows_merged) VALUES (?, ?, ?, ?, ?, ?, ?)";
String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, compacted_at, bytes_in, bytes_out, rows_merged, compaction_properties) VALUES (?, ?, ?, ?, ?, ?, ?, ?)";
executeInternal(format(req, COMPACTION_HISTORY),
nextTimeUUID(),
ksname,
cfname,
ByteBufferUtil.bytes(compactedAt),
bytesIn,
bytesOut,
rowsMerged);
rowsMerged,
compactionProperties);
}

public static TabularData getCompactionHistory() throws OpenDataException
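For illustration, callers now pass the properties map as the final argument. A hedged sketch of the write path follows; the keyspace, table, byte counts, and class name are placeholders, while the compaction_type entry mirrors the one CompactionTask supplies later in this patch.

import java.util.Map;

import com.google.common.collect.ImmutableMap;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.compaction.OperationType;

import static org.apache.cassandra.db.compaction.CompactionHistoryTabularData.COMPACTION_TYPE_PROPERTY;
import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;

public final class CompactionHistoryWriteExample // hypothetical, for illustration only
{
    public static void record()
    {
        // 100 output partitions, each merged from a single source partition
        Map<Integer, Long> rowsMerged = ImmutableMap.of(1, 100L);
        SystemKeyspace.updateCompactionHistory("ks1",               // keyspace (placeholder)
                                               "tbl1",              // table (placeholder)
                                               currentTimeMillis(), // compacted_at
                                               1024L,               // bytes in (placeholder)
                                               512L,                // bytes out (placeholder)
                                               rowsMerged,
                                               ImmutableMap.of(COMPACTION_TYPE_PROPERTY, OperationType.COMPACTION.type));
    }
}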
38 changes: 35 additions & 3 deletions src/java/org/apache/cassandra/db/SystemKeyspaceMigrator41.java
@@ -24,6 +24,7 @@
import java.util.function.Function;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,6 +34,7 @@
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.LongType;
import org.apache.cassandra.db.marshal.TimeUUIDType;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.io.sstable.SequenceBasedSSTableId;
@@ -62,6 +64,7 @@ public static void migrate()
migrateTransferredRanges();
migrateAvailableRanges();
migrateSSTableActivity();
migrateCompactionHistory();
}

@VisibleForTesting
@@ -159,12 +162,38 @@ static void migrateSSTableActivity()
})
);
}

@VisibleForTesting
static void migrateCompactionHistory()
{
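// Source and target are the same table, so this is an in-place refresh:
// every existing row is re-read and re-written, and rows written before
// this change get an empty compaction_properties map (the ImmutableMap.of()
// default below). The relaxed guard in migrateTable() is what lets a
// non-empty table be migrated onto itself.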
migrateTable(false,
SystemKeyspace.COMPACTION_HISTORY,
SystemKeyspace.COMPACTION_HISTORY,
new String[]{ "id",
"bytes_in",
"bytes_out",
"columnfamily_name",
"compacted_at",
"keyspace_name",
"rows_merged",
"compaction_properties" },
row -> Collections.singletonList(new Object[]{ row.getTimeUUID("id"),
row.has("bytes_in") ? row.getLong("bytes_in") : null,
row.has("bytes_out") ? row.getLong("bytes_out") : null,
row.has("columnfamily_name") ? row.getString("columnfamily_name") : null,
row.has("compacted_at") ? row.getTimestamp("compacted_at") : null,
row.has("keyspace_name") ? row.getString("keyspace_name") : null,
row.has("rows_merged") ? row.getMap("rows_merged", Int32Type.instance, LongType.instance) : null,
row.has("compaction_properties") ? row.getMap("compaction_properties", UTF8Type.instance, UTF8Type.instance) : ImmutableMap.of() })
);
}

/**
* Perform table migration by reading data from the old table, converting it, and adding to the new table.
*
* If oldName and newName are same, it means data in the table will be refreshed.
*
* @param truncateIfExists truncate the existing table if it exists before migration; if it is disabled
* and the new table is not empty, no migration is performed
* and the new table is not empty and oldName is not equal to newName, no migration is performed
* @param oldName old table name
* @param newName new table name
* @param columns columns to fill in the new table in the same order as returned by the transformation
@@ -175,7 +204,7 @@ static void migrateTable(boolean truncateIfExists, String oldName, String newNam
{
ColumnFamilyStore newTable = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(newName);

if (!newTable.isEmpty() && !truncateIfExists)
if (!newTable.isEmpty() && !truncateIfExists && !oldName.equals(newName))
return;

if (truncateIfExists)
@@ -189,6 +218,9 @@ static void migrateTable(boolean truncateIfExists, String oldName, String newNam
StringUtils.join(columns, ", "), StringUtils.repeat("?", ", ", columns.length));

UntypedResultSet rows = QueryProcessor.executeInternal(query);

assert rows != null : String.format("Migrating rows from legacy %s to %s was not done as returned rows from %s are null!", oldName, newName, oldName);

int transferred = 0;
logger.info("Migrating rows from legacy {} to {}", oldName, newName);
for (UntypedResultSet.Row row : rows)
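For the compaction_history refresh above, the statement the migrator builds is, schematically, INSERT INTO system.compaction_history (id, bytes_in, bytes_out, columnfamily_name, compacted_at, keyspace_name, rows_merged, compaction_properties) VALUES (?, ?, ?, ?, ?, ?, ?, ?), with one placeholder per entry in the columns array.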
src/java/org/apache/cassandra/db/compaction/CompactionHistoryTabularData.java
@@ -24,16 +24,17 @@
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.LongType;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.utils.FBUtilities;

public class CompactionHistoryTabularData
{
private static final String[] ITEM_NAMES = new String[]{ "id", "keyspace_name", "columnfamily_name", "compacted_at",
"bytes_in", "bytes_out", "rows_merged" };
"bytes_in", "bytes_out", "rows_merged", "compaction_properties" };

private static final String[] ITEM_DESCS = new String[]{ "time uuid", "keyspace name",
"column family name", "compaction finished at",
"total bytes in", "total bytes out", "total rows merged" };
"total bytes in", "total bytes out", "total rows merged", "compaction properties" };

private static final String TYPE_NAME = "CompactionHistory";

@@ -44,13 +45,15 @@ public class CompactionHistoryTabularData
private static final CompositeType COMPOSITE_TYPE;

private static final TabularType TABULAR_TYPE;


public static final String COMPACTION_TYPE_PROPERTY = "compaction_type";

static
{
try
{
ITEM_TYPES = new OpenType[]{ SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.LONG,
SimpleType.LONG, SimpleType.LONG, SimpleType.STRING };
SimpleType.LONG, SimpleType.LONG, SimpleType.STRING, SimpleType.STRING };

COMPOSITE_TYPE = new CompositeType(TYPE_NAME, ROW_DESC, ITEM_NAMES, ITEM_DESCS, ITEM_TYPES);

@@ -74,10 +77,11 @@ public static TabularData from(UntypedResultSet resultSet) throws OpenDataExcept
long bytesIn = row.getLong(ITEM_NAMES[4]);
long bytesOut = row.getLong(ITEM_NAMES[5]);
Map<Integer, Long> rowMerged = row.getMap(ITEM_NAMES[6], Int32Type.instance, LongType.instance);

Map<String, String> compactionProperties = row.getMap(ITEM_NAMES[7], UTF8Type.instance, UTF8Type.instance);
result.put(new CompositeDataSupport(COMPOSITE_TYPE, ITEM_NAMES,
new Object[]{ id.toString(), ksName, cfName, compactedAt, bytesIn, bytesOut,
"{" + FBUtilities.toString(rowMerged) + "}" }));
'{' + FBUtilities.toString(rowMerged) + '}',
'{' + FBUtilities.toString(compactionProperties) + '}' }));
}
return result;
}
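On the JMX side, each row of the tabular data now carries an eighth item. Here is a minimal consumer sketch using the standard javax.management open-types API; how the TabularData is obtained (for example via nodetool's NodeProbe) is assumed rather than shown, and the class and method names are illustrative.

import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;

public final class CompactionHistoryJmxExample // hypothetical, for illustration only
{
    public static void dump(TabularData history)
    {
        for (Object value : history.values())
        {
            CompositeData row = (CompositeData) value;
            // from() renders both maps as brace-wrapped strings
            System.out.println(row.get("id") + " rows_merged=" + row.get("rows_merged")
                               + " compaction_properties=" + row.get("compaction_properties"));
        }
    }
}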
src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -27,6 +27,7 @@
import java.util.concurrent.TimeUnit;

import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.RateLimiter;
@@ -49,6 +50,7 @@
import org.apache.cassandra.utils.TimeUUID;
import org.apache.cassandra.utils.concurrent.Refs;

import static org.apache.cassandra.db.compaction.CompactionHistoryTabularData.COMPACTION_TYPE_PROPERTY;
import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
import static org.apache.cassandra.utils.Clock.Global.nanoTime;
import static org.apache.cassandra.utils.FBUtilities.now;
@@ -244,7 +246,8 @@ public boolean apply(SSTableReader sstable)
for (int i = 0; i < mergedRowCounts.length; i++)
totalSourceRows += mergedRowCounts[i] * (i + 1);

String mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getTableName(), mergedRowCounts, startsize, endsize);
String mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getTableName(), mergedRowCounts, startsize, endsize,
ImmutableMap.of(COMPACTION_TYPE_PROPERTY, compactionType.type));

logger.info(String.format("Compacted (%s) %d sstables to [%s] to level=%d. %s to %s (~%d%% of original) in %,dms. Read Throughput = %s, Write Throughput = %s, Row Throughput = ~%,d/s. %,d total partitions merged to %,d. Partition merge counts were {%s}. Time spent writing keys = %,dms",
taskId,
@@ -283,7 +286,7 @@ public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs,
return new DefaultCompactionWriter(cfs, directories, transaction, nonExpiredSSTables, keepOriginals, getLevel());
}

public static String updateCompactionHistory(String keyspaceName, String columnFamilyName, long[] mergedRowCounts, long startSize, long endSize)
public static String updateCompactionHistory(String keyspaceName, String columnFamilyName, long[] mergedRowCounts, long startSize, long endSize, Map<String, String> compactionProperties)
{
StringBuilder mergeSummary = new StringBuilder(mergedRowCounts.length * 10);
Map<Integer, Long> mergedRows = new HashMap<>();
@@ -297,7 +300,7 @@ public static String updateCompactionHistory(String keyspaceName, String columnF
mergeSummary.append(String.format("%d:%d, ", rows, count));
mergedRows.put(rows, count);
}
SystemKeyspace.updateCompactionHistory(keyspaceName, columnFamilyName, currentTimeMillis(), startSize, endSize, mergedRows);
SystemKeyspace.updateCompactionHistory(keyspaceName, columnFamilyName, currentTimeMillis(), startSize, endSize, mergedRows, compactionProperties);
return mergeSummary.toString();
}
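As a worked example with illustrative counts: for mergedRowCounts = {100, 20, 3}, the loop records 100 output partitions each merged from a single source partition, 20 merged from two, and 3 merged from three. mergedRows therefore becomes {1=100, 2=20, 3=3}, the returned mergeSummary reads "1:100, 2:20, 3:3, ", and that same map, together with the new compactionProperties argument, is what gets persisted to system.compaction_history.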

src/java/org/apache/cassandra/tools/nodetool/stats/CompactionHistoryHolder.java
@@ -54,8 +54,9 @@ private static class CompactionHistoryRow implements Comparable<CompactionHistor
private final long bytesIn;
private final long bytesOut;
private final String rowMerged;
private final String compactionProperties;

CompactionHistoryRow(String id, String ksName, String cfName, long compactedAt, long bytesIn, long bytesOut, String rowMerged)
CompactionHistoryRow(String id, String ksName, String cfName, long compactedAt, long bytesIn, long bytesOut, String rowMerged, String compactionProperties)
{
this.id = id;
this.ksName = ksName;
@@ -64,6 +65,7 @@ private static class CompactionHistoryRow implements Comparable<CompactionHistor
this.bytesIn = bytesIn;
this.bytesOut = bytesOut;
this.rowMerged = rowMerged;
this.compactionProperties = compactionProperties;
}

public int compareTo(CompactionHistoryHolder.CompactionHistoryRow chr)
@@ -83,6 +85,7 @@ private HashMap<String, Object> getAllAsMap()
compaction.put("bytes_in", this.bytesIn);
compaction.put("bytes_out", this.bytesOut);
compaction.put("rows_merged", this.rowMerged);
compaction.put("compaction_properties", this.compactionProperties);
return compaction;
}
}
@@ -110,7 +113,8 @@ public Map<String, Object> convert2Map()
(Long)value.get(3),
(Long)value.get(4),
(Long)value.get(5),
(String)value.get(6)
(String)value.get(6),
(String)value.get(7)
);
chrList.add(chr);
}
src/java/org/apache/cassandra/tools/nodetool/stats/CompactionHistoryPrinter.java
@@ -65,14 +65,15 @@ public void print(CompactionHistoryHolder data, PrintStream out)
for (Object chr : compactionHistories)
{
Map value = chr instanceof Map<?, ?> ? (Map)chr : Collections.emptyMap();
String[] obj = new String[7];
String[] obj = new String[8];
obj[0] = (String)value.get("id");
obj[1] = (String)value.get("keyspace_name");
obj[2] = (String)value.get("columnfamily_name");
obj[3] = (String)value.get("compacted_at");
obj[4] = value.get("bytes_in").toString();
obj[5] = value.get("bytes_out").toString();
obj[6] = (String)value.get("rows_merged");
obj[7] = (String)value.get("compaction_properties");
table.add(obj);
}
table.printTo(out);
93 changes: 93 additions & 0 deletions test/distributed/org/apache/cassandra/distributed/upgrade/CompactionHistorySystemTableUpgradeTest.java
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.distributed.upgrade;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;

import com.vdurmont.semver4j.Semver;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.tools.ToolRunner;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;

import java.util.ArrayList;

import static org.apache.cassandra.db.compaction.CompactionHistoryTabularData.COMPACTION_TYPE_PROPERTY;
import static org.apache.cassandra.tools.ToolRunner.invokeNodetoolJvmDtest;
import static org.apache.cassandra.tools.nodetool.CompactionHistoryTest.assertCompactionHistoryOutPut;

@RunWith(Parameterized.class)
public class CompactionHistorySystemTableUpgradeTest extends UpgradeTestBase
{
@Parameter
public Semver version;

@Parameters()
public static ArrayList<Semver> versions()
{
return Lists.newArrayList(v30, v3X, v40, v41);
}

@Test
public void compactionHistorySystemTableTest() throws Throwable
{
new TestCase()
.nodes(1)
.nodesToUpgrade(1)
.upgradesToCurrentFrom(version)
.setup((cluster) -> {
//create table
cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tb (" +
"pk text PRIMARY KEY," +
"c1 text," +
"c2 int," +
"c3 int)");
// disable auto compaction
cluster.stream().forEach(node -> node.nodetool("disableautocompaction"));
// generate sstables
for (int i = 0; i != 10; ++i)
{
cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tb (pk, c1, c2, c3) VALUES ('pk" + i + "', 'c1" + i + "', " + i + ',' + i + ')', ConsistencyLevel.ALL);
cluster.stream().forEach(node -> node.flush(KEYSPACE));
}
// force compact
cluster.stream().forEach(node -> node.forceCompact(KEYSPACE, "tb"));
}).runAfterClusterUpgrade((cluster) -> {
// disable auto compaction at start up
cluster.stream().forEach(node -> node.nodetool("disableautocompaction"));
ToolRunner.ToolResult toolHistory = invokeNodetoolJvmDtest(cluster.get(1), "compactionhistory");
toolHistory.assertOnCleanExit();
// verify the upgraded system.compaction_history data
assertCompactionHistoryOutPut(toolHistory, KEYSPACE, "tb", ImmutableMap.of());

// force compact
cluster.stream().forEach(node -> node.nodetool("compact"));
toolHistory = invokeNodetoolJvmDtest(cluster.get(1), "compactionhistory");
toolHistory.assertOnCleanExit();
assertCompactionHistoryOutPut(toolHistory, KEYSPACE, "tb", ImmutableMap.of(COMPACTION_TYPE_PROPERTY, OperationType.MAJOR_COMPACTION.type));
})
.run();
}
}
