Commit 97b47c3

Merge branch 'cassandra-4.0' into trunk

jolynch committed Dec 9, 2021
2 parents 507c6f7 + e73d05b
Showing 9 changed files with 332 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
@@ -89,6 +89,7 @@ Merged from 3.11:
* Update Jackson from 2.9.10 to 2.12.5 (CASSANDRA-16851)
* Make assassinate more resilient to missing tokens (CASSANDRA-16847)
Merged from 3.0:
* Fix slow keycache load which blocks startup for tables with many sstables (CASSANDRA-14898)
* Fix rare NPE caused by batchlog replay / node decommission races (CASSANDRA-17049)
* Allow users to view permissions of the roles they created (CASSANDRA-16902)
* Fix failure handling in inter-node communication (CASSANDRA-16334)
5 changes: 5 additions & 0 deletions conf/cassandra.yaml
@@ -440,6 +440,11 @@ counter_cache_save_period: 7200
# If not set, the default directory is $CASSANDRA_HOME/data/saved_caches.
# saved_caches_directory: /var/lib/cassandra/saved_caches

# Number of seconds the server will wait for each cache (row, key, etc.) to load while starting
# the Cassandra process. Setting this to a negative value disables all cache loading on startup
# while still keeping the cache available at runtime.
# cache_load_timeout_seconds: 30

# commitlog_sync may be either "periodic", "group", or "batch."
#
# When in batch mode, Cassandra won't ack writes until the commit log
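A usage note: per the comment above, any negative value skips loading saved caches at startup entirely while leaving the caches active at runtime. For example, to trade slower cold reads for the fastest possible startup:

    cache_load_timeout_seconds: -1
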
8 changes: 6 additions & 2 deletions src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -200,8 +200,9 @@ public int loadSaved()
+ " does not match current schema version "
+ Schema.instance.getVersion());

-            ArrayDeque<Future<Pair<K, V>>> futures = new ArrayDeque<Future<Pair<K, V>>>();
-            while (in.available() > 0)
+            ArrayDeque<Future<Pair<K, V>>> futures = new ArrayDeque<>();
+            long loadByNanos = start + TimeUnit.SECONDS.toNanos(DatabaseDescriptor.getCacheLoadTimeout());
+            while (nanoTime() < loadByNanos && in.available() > 0)
{
//tableId and indexName are serialized by the serializers in CacheService
//That is delegated there because there are serializer specific conditions
@@ -263,6 +264,7 @@ public int loadSaved()
finally
{
FileUtils.closeQuietly(in);
cacheLoader.cleanupAfterDeserialize();
}
}
if (logger.isTraceEnabled())
@@ -441,5 +443,7 @@ public interface CacheSerializer<K extends CacheKey, V>
void serialize(K key, DataOutputPlus out, ColumnFamilyStore cfs) throws IOException;

Future<Pair<K, V>> deserialize(DataInputPlus in, ColumnFamilyStore cfs) throws IOException;

default void cleanupAfterDeserialize() { }
}
}
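Taken together, the AutoSavingCache changes follow a simple pattern: a monotonic deadline bounds the load loop, and a default no-op interface hook gives serializers a place to release state, invoked from the finally block so it runs on success, timeout, or error alike. A minimal self-contained sketch of that pattern (names like Loader and loadOne are illustrative stand-ins, not Cassandra's API):

    import java.util.concurrent.TimeUnit;

    interface Loader
    {
        void loadOne() throws Exception;

        // Default no-op hook, mirroring CacheSerializer.cleanupAfterDeserialize()
        default void cleanupAfterDeserialize() { }
    }

    public class DeadlineBoundedLoad
    {
        // Load entries until input is exhausted or the deadline passes.
        public static int load(Loader loader, int entries, int timeoutSeconds) throws Exception
        {
            int loaded = 0;
            // A negative timeout puts the deadline in the past, so the loop never runs,
            // matching the documented cache_load_timeout_seconds semantics.
            long loadByNanos = System.nanoTime() + TimeUnit.SECONDS.toNanos(timeoutSeconds);
            try
            {
                while (System.nanoTime() < loadByNanos && loaded < entries)
                {
                    loader.loadOne();
                    loaded++;
                }
            }
            finally
            {
                loader.cleanupAfterDeserialize(); // runs even if we stopped early
            }
            return loaded;
        }
    }
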
2 changes: 2 additions & 0 deletions src/java/org/apache/cassandra/config/Config.java
@@ -332,6 +332,8 @@ public class Config
public volatile int counter_cache_save_period = 7200;
public volatile int counter_cache_keys_to_save = Integer.MAX_VALUE;

public int cache_load_timeout_seconds = 30;

public Long paxos_cache_size_in_mb = null;

private static boolean isClientMode = false;
11 changes: 11 additions & 0 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -2903,6 +2903,17 @@ public static void setCounterCacheSavePeriod(int counterCacheSavePeriod)
conf.counter_cache_save_period = counterCacheSavePeriod;
}

public static int getCacheLoadTimeout()
{
return conf.cache_load_timeout_seconds;
}

@VisibleForTesting
public static void setCacheLoadTimeout(int seconds)
{
conf.cache_load_timeout_seconds = seconds;
}

public static int getCounterCacheKeysToSave()
{
return conf.counter_cache_keys_to_save;
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/db/lifecycle/View.java
@@ -136,7 +136,7 @@ public Iterable<SSTableReader> select(SSTableSet sstableSet)
case NONCOMPACTING:
return filter(sstables, (s) -> !compacting.contains(s));
case CANONICAL:
-                Set<SSTableReader> canonicalSSTables = new HashSet<>();
+                Set<SSTableReader> canonicalSSTables = new HashSet<>(sstables.size() + compacting.size());
for (SSTableReader sstable : compacting)
if (sstable.openReason != SSTableReader.OpenReason.EARLY)
canonicalSSTables.add(sstable);
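This one-line View.java change presizes the result set with the number of elements about to be copied into it, so the HashSet does not repeatedly grow and rehash while being filled; before the CacheService change below, this CANONICAL select ran once per deserialized key cache entry, so the copy cost was magnified. A trivial illustration of the presizing idiom (the sizes are made up):

    import java.util.HashSet;
    import java.util.Set;

    public class PresizeDemo
    {
        public static void main(String[] args)
        {
            int expected = 2000; // hypothetical: live + compacting sstables
            // Presized: one allocation up front, no intermediate rehashing while filling.
            Set<Integer> canonical = new HashSet<>(expected);
            for (int i = 0; i < expected; i++)
                canonical.add(i);
            System.out.println(canonical.size());
        }
    }
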
41 changes: 32 additions & 9 deletions src/java/org/apache/cassandra/service/CacheService.java
@@ -20,9 +20,12 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;

import org.slf4j.Logger;
@@ -412,6 +415,11 @@ public Pair<RowCacheKey, IRowCacheEntry> call() throws Exception

public static class KeyCacheSerializer implements CacheSerializer<KeyCacheKey, RowIndexEntry>
{
// For column families with many SSTables, the linear scan in getSSTables slowed down KeyCache loading
// by orders of magnitude. So we cache the sstables once per table and rely on cleanupAfterDeserialize
// to clean up any cached state accumulated during the load.
Map<Pair<String, String>, Map<Integer, SSTableReader>> cachedSSTableReaders = new ConcurrentHashMap<>();

public void serialize(KeyCacheKey key, DataOutputPlus out, ColumnFamilyStore cfs) throws IOException
{
RowIndexEntry entry = CacheService.instance.keyCache.getInternal(key);
@@ -431,6 +439,8 @@ public void serialize(KeyCacheKey key, DataOutputPlus out, ColumnFamilyStore cfs

public Future<Pair<KeyCacheKey, RowIndexEntry>> deserialize(DataInputPlus input, ColumnFamilyStore cfs) throws IOException
{
boolean skipEntry = cfs == null || !cfs.isKeyCacheEnabled();

//Keyspace and CF name are deserialized by AutoSaving cache and used to fetch the CFS provided as a
//parameter so they aren't deserialized here, even though they are serialized by this serializer
int keyLength = input.readInt();
@@ -442,8 +452,25 @@ public Future<Pair<KeyCacheKey, RowIndexEntry>> deserialize(DataInputPlus input,
ByteBuffer key = ByteBufferUtil.read(input, keyLength);
int generation = input.readInt();
input.readBoolean(); // backwards compatibility for "promoted indexes" boolean
-            SSTableReader reader;
-            if (cfs == null || !cfs.isKeyCacheEnabled() || (reader = findDesc(generation, cfs.getSSTables(SSTableSet.CANONICAL))) == null)
+            SSTableReader reader = null;
+            if (!skipEntry)
+            {
+                Pair<String, String> qualifiedName = Pair.create(cfs.metadata.keyspace, cfs.metadata.name);
+                Map<Integer, SSTableReader> generationToSSTableReader = cachedSSTableReaders.get(qualifiedName);
+                if (generationToSSTableReader == null)
+                {
+                    generationToSSTableReader = new HashMap<>(cfs.getLiveSSTables().size());
+                    for (SSTableReader ssTableReader : cfs.getSSTables(SSTableSet.CANONICAL))
+                    {
+                        generationToSSTableReader.put(ssTableReader.descriptor.generation, ssTableReader);
+                    }
+
+                    cachedSSTableReaders.putIfAbsent(qualifiedName, generationToSSTableReader);
+                }
+                reader = generationToSSTableReader.get(generation);
+            }
+
+            if (skipEntry || reader == null)
{
// The sstable doesn't exist anymore, so we can't be sure of the exact version and assume it's the current version. The only case where we'll be
// wrong is during upgrade, in which case we fail at deserialization. This is not a huge deal however since 1) this is unlikely enough that
@@ -452,21 +479,17 @@ public Future<Pair<KeyCacheKey, RowIndexEntry>> deserialize(DataInputPlus input,
RowIndexEntry.Serializer.skipForCache(input);
return null;
}

RowIndexEntry.IndexSerializer<?> indexSerializer = reader.descriptor.getFormat().getIndexSerializer(reader.metadata(),
reader.descriptor.version,
reader.header);
RowIndexEntry<?> entry = indexSerializer.deserializeForCache(input);
return ImmediateFuture.success(Pair.create(new KeyCacheKey(cfs.metadata(), reader.descriptor, key), entry));
}

-        private SSTableReader findDesc(int generation, Iterable<SSTableReader> collection)
+        public void cleanupAfterDeserialize()
         {
-            for (SSTableReader sstable : collection)
-            {
-                if (sstable.descriptor.generation == generation)
-                    return sstable;
-            }
-            return null;
+            cachedSSTableReaders.clear();
}
}
}
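The KeyCacheSerializer change above is the heart of the fix: rather than scanning every canonical sstable for each deserialized key cache entry (O(entries x sstables) comparisons), the loader builds a generation-to-reader map once per table, reuses it for every entry, and drops it in cleanupAfterDeserialize. A self-contained sketch of that memoization pattern, using simplified stand-in types rather than Cassandra's:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.function.Function;

    final class GenerationIndex<R>
    {
        // One cached generation->reader map per table, cleared after the load.
        private final Map<String, Map<Integer, R>> cached = new ConcurrentHashMap<>();

        // Look up a reader by generation, building the per-table index on first use.
        R lookup(String table, int generation, List<R> readers, Function<R, Integer> generationOf)
        {
            Map<Integer, R> byGeneration = cached.get(table);
            if (byGeneration == null)
            {
                byGeneration = new HashMap<>(readers.size());
                for (R reader : readers)
                    byGeneration.put(generationOf.apply(reader), reader);
                cached.putIfAbsent(table, byGeneration); // racing loaders build equivalent maps
            }
            return byGeneration.get(generation);         // O(1) instead of a linear scan
        }

        void cleanupAfterDeserialize()
        {
            cached.clear(); // the load is done; don't pin readers in memory
        }
    }

With N entries and M sstables this replaces N linear scans with one O(M) index build plus N O(1) lookups, which is where the "orders of magnitude" in the comment comes from.
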
137 changes: 137 additions & 0 deletions test/microbench/org/apache/cassandra/test/microbench/CacheLoaderBench.java
@@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.test.microbench;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import org.junit.Assert;

import org.apache.cassandra.Util;
import org.apache.cassandra.cache.AutoSavingCache;
import org.apache.cassandra.cache.KeyCacheKey;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.RowUpdateBuilder;
import org.apache.cassandra.db.marshal.AsciiType;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;

@SuppressWarnings("unused")
@BenchmarkMode(Mode.SampleTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Warmup(iterations = 1, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 2, time = 2, timeUnit = TimeUnit.SECONDS)
@Fork(value = 1)
@Threads(1)
@State(Scope.Benchmark)
public class CacheLoaderBench extends CQLTester
{
private static final int numSSTables = 1000;
private final Random random = new Random();

@Setup(Level.Trial)
public void setup() throws Throwable
{
CQLTester.prepareServer();
String keyspace = createKeyspace("CREATE KEYSPACE %s with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 } and durable_writes = false");
String table1 = createTable(keyspace, "CREATE TABLE %s (key text PRIMARY KEY, val text)");
String table2 = createTable(keyspace, "CREATE TABLE %s (key text PRIMARY KEY, val text)");


Keyspace.system().forEach(k -> k.getColumnFamilyStores().forEach(ColumnFamilyStore::disableAutoCompaction));

ColumnFamilyStore cfs1 = Keyspace.open(keyspace).getColumnFamilyStore(table1);
ColumnFamilyStore cfs2 = Keyspace.open(keyspace).getColumnFamilyStore(table2);
cfs1.disableAutoCompaction();
cfs2.disableAutoCompaction();

// Write a bunch of sstables to both cfs1 and cfs2

List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>(2);
columnFamilyStores.add(cfs1);
columnFamilyStores.add(cfs2);

logger.info("Creating {} sstables", numSSTables);
for (ColumnFamilyStore cfs: columnFamilyStores)
{
cfs.truncateBlocking();
for (int i = 0; i < numSSTables ; i++)
{
ColumnMetadata colDef = ColumnMetadata.regularColumn(cfs.metadata(), ByteBufferUtil.bytes("val"), AsciiType.instance);
RowUpdateBuilder rowBuilder = new RowUpdateBuilder(cfs.metadata(), System.currentTimeMillis() + random.nextInt(), "key");
rowBuilder.add(colDef, "val1");
rowBuilder.build().apply();
cfs.forceBlockingFlush();
}

Assert.assertEquals(numSSTables, cfs.getLiveSSTables().size());

// preheat key cache
for (SSTableReader sstable : cfs.getLiveSSTables())
sstable.getPosition(Util.dk("key"), SSTableReader.Operator.EQ);
}

AutoSavingCache<KeyCacheKey, RowIndexEntry> keyCache = CacheService.instance.keyCache;

// serialize to file
keyCache.submitWrite(keyCache.size()).get();
}

@Setup(Level.Invocation)
public void setupKeyCache()
{
AutoSavingCache<KeyCacheKey, RowIndexEntry> keyCache = CacheService.instance.keyCache;
keyCache.clear();
}

@TearDown(Level.Trial)
public void teardown()
{
CQLTester.cleanup();
CQLTester.tearDownClass();
}

@Benchmark
public void keyCacheLoadTest() throws Throwable
{
AutoSavingCache<KeyCacheKey, RowIndexEntry> keyCache = CacheService.instance.keyCache;

keyCache.loadSavedAsync().get();
}
}
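CacheLoaderBench drives exactly this path end to end: it flushes 1000 small sstables per table across two tables, warms and persists the key cache, then times loadSavedAsync() against a cache cleared before each invocation. To try it locally, Cassandra's JMH microbenchmarks are normally run through the ant target (the exact invocation may differ by branch):

    ant microbench -Dbenchmark.name=CacheLoaderBench
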
