From 0a1e900a0a042f78d7d5d6625bc98b84eb463e69 Mon Sep 17 00:00:00 2001 From: Ekaterina Dimitrova Date: Fri, 6 Nov 2020 18:46:14 -0500 Subject: [PATCH] Prevent unbounded number of pending flushing tasks; Add PendingFlushTasks metric (CASSANDRA-16261) Authored by Ekaterina Dimitrova; reviewed by Caleb Rackliffe and Andres de la Pena for CASSANDRA-16261 --- CHANGES.txt | 1 + .../concurrent/InfiniteLoopExecutor.java | 8 + .../cassandra/config/DatabaseDescriptor.java | 39 +- .../cassandra/db/ColumnFamilyStore.java | 373 +++++++++--------- .../org/apache/cassandra/db/Memtable.java | 4 +- .../org/apache/cassandra/db/ReadCommand.java | 3 +- .../apache/cassandra/utils/FBUtilities.java | 12 + .../cassandra/utils/memory/HeapPool.java | 2 +- .../utils/memory/MemtableAllocator.java | 124 ++++-- .../utils/memory/MemtableCleaner.java | 40 ++ .../utils/memory/MemtableCleanerThread.java | 67 +++- .../cassandra/utils/memory/MemtablePool.java | 35 +- .../cassandra/utils/memory/NativePool.java | 2 +- .../cassandra/utils/memory/SlabPool.java | 2 +- .../org/apache/cassandra/cql3/CQLTester.java | 14 +- .../memory/MemtableCleanerThreadTest.java | 187 +++++++++ .../utils/memory/NativeAllocatorTest.java | 204 ++++++---- 17 files changed, 772 insertions(+), 345 deletions(-) create mode 100644 src/java/org/apache/cassandra/utils/memory/MemtableCleaner.java create mode 100644 test/unit/org/apache/cassandra/utils/memory/MemtableCleanerThreadTest.java diff --git a/CHANGES.txt b/CHANGES.txt index ee4cf6e44c8c..4f3ab1f17be6 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.24: + * Prevent unbounded number of pending flushing tasks; Add PendingFlushTasks metric (CASSANDRA-16261) * Improve empty hint file handling during startup (CASSANDRA-16162) * Allow empty string in collections with COPY FROM in cqlsh (CASSANDRA-16372) * Fix skipping on pre-3.0 created compact storage sstables due to missing primary key liveness (CASSANDRA-16226) diff --git a/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java b/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java index b54fa3fca51f..8e72d91083b1 100644 --- a/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java +++ b/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java @@ -23,6 +23,8 @@ import java.util.concurrent.TimeUnit; +import com.google.common.annotations.VisibleForTesting; + public class InfiniteLoopExecutor { private static final Logger logger = LoggerFactory.getLogger(InfiniteLoopExecutor.class); @@ -81,4 +83,10 @@ public boolean awaitTermination(long time, TimeUnit unit) throws InterruptedExce thread.join(unit.toMillis(time)); return !thread.isAlive(); } + + @VisibleForTesting + public boolean isAlive() + { + return this.thread.isAlive(); + } } diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 3f9aa963c004..52f01b6be749 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -151,7 +151,7 @@ public static Config loadConfig() throws ConfigurationException String loaderClass = System.getProperty("cassandra.config.loader"); ConfigurationLoader loader = loaderClass == null ? 
new YamlConfigurationLoader() - : FBUtilities.construct(loaderClass, "configuration loading"); + : FBUtilities.construct(loaderClass, "configuration loading"); Config config = loader.loadConfig(); if (!hasLoggedConfig) @@ -214,7 +214,7 @@ else if (config.listen_address != null) } catch (UnknownHostException e) { - throw new ConfigurationException("Unknown listen_address '" + config.listen_address + "'", false); + throw new ConfigurationException("Unknown listen_address '" + config.listen_address + '\'', false); } if (listenAddress.isAnyLocalAddress()) @@ -234,7 +234,7 @@ else if (config.listen_interface != null) } catch (UnknownHostException e) { - throw new ConfigurationException("Unknown broadcast_address '" + config.broadcast_address + "'", false); + throw new ConfigurationException("Unknown broadcast_address '" + config.broadcast_address + '\'', false); } if (broadcastAddress.isAnyLocalAddress()) @@ -275,7 +275,7 @@ else if (config.rpc_interface != null) } catch (UnknownHostException e) { - throw new ConfigurationException("Unknown broadcast_rpc_address '" + config.broadcast_rpc_address + "'", false); + throw new ConfigurationException("Unknown broadcast_rpc_address '" + config.broadcast_rpc_address + '\'', false); } if (broadcastRpcAddress.isAnyLocalAddress()) @@ -520,18 +520,14 @@ else if (conf.native_transport_max_frame_size_in_mb >= 2048) EndpointSnitchInfo.create(); localDC = snitch.getDatacenter(FBUtilities.getBroadcastAddress()); - localComparator = new Comparator() - { - public int compare(InetAddress endpoint1, InetAddress endpoint2) - { - boolean local1 = localDC.equals(snitch.getDatacenter(endpoint1)); - boolean local2 = localDC.equals(snitch.getDatacenter(endpoint2)); - if (local1 && !local2) - return -1; - if (local2 && !local1) - return 1; - return 0; - } + localComparator = (endpoint1, endpoint2) -> { + boolean local1 = localDC.equals(snitch.getDatacenter(endpoint1)); + boolean local2 = localDC.equals(snitch.getDatacenter(endpoint2)); + if (local1 && !local2) + return -1; + if (local2 && !local1) + return 1; + return 0; }; /* Request Scheduler setup */ @@ -592,7 +588,7 @@ public int compare(InetAddress endpoint1, InetAddress endpoint2) if (conf.commitlog_total_space_in_mb == null) { int preferredSize = 8192; - int minSize = 0; + int minSize; try { // use 1/4 of available space. See discussion on #10013 and #10199 @@ -1061,7 +1057,7 @@ public static String getAllocateTokensForKeyspace() public static Collection tokensFromString(String tokenString) { - List tokens = new ArrayList(); + List tokens = new ArrayList<>(); if (tokenString != null) for (String token : tokenString.split(",")) tokens.add(token.replaceAll("^\\s+", "").replaceAll("\\s+$", "")); @@ -1747,7 +1743,7 @@ public static File getHintsDirectory() public static File getSerializedCachePath(CacheService.CacheType cacheType, String version, String extension) { String name = cacheType.toString() - + (version == null ? "" : "-" + version + "." + extension); + + (version == null ? "" : '-' + version + '.' 
+ extension); return new File(conf.saved_caches_directory, name); } @@ -2026,12 +2022,13 @@ public static MemtablePool getMemtableAllocatorPool() { long heapLimit = ((long) conf.memtable_heap_space_in_mb) << 20; long offHeapLimit = ((long) conf.memtable_offheap_space_in_mb) << 20; + final MemtableCleaner cleaner = ColumnFamilyStore::flushLargestMemtable; switch (conf.memtable_allocation_type) { case unslabbed_heap_buffers: - return new HeapPool(heapLimit, conf.memtable_cleanup_threshold, new ColumnFamilyStore.FlushLargestColumnFamily()); + return new HeapPool(heapLimit, conf.memtable_cleanup_threshold, cleaner); case heap_buffers: - return new SlabPool(heapLimit, 0, conf.memtable_cleanup_threshold, new ColumnFamilyStore.FlushLargestColumnFamily()); + return new SlabPool(heapLimit, 0, conf.memtable_cleanup_threshold, cleaner); case offheap_buffers: throw new ConfigurationException("offheap_buffers are not available in 3.0. They will be re-introduced in a future release, see https://issues.apache.org/jira/browse/CASSANDRA-9472 for details"); diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index a9c087eaf923..61d60b1ecff5 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -23,6 +23,7 @@ import java.nio.ByteBuffer; import java.nio.file.Files; import java.util.*; +import java.util.Objects; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -126,41 +127,41 @@ public static Directories.DataDirectory[] getInitialDirectories() private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class); - private static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(), - StageManager.KEEPALIVE, - TimeUnit.SECONDS, - new LinkedBlockingQueue(), - new NamedThreadFactory("MemtableFlushWriter"), - "internal"); + private static final ThreadPoolExecutor flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(), + StageManager.KEEPALIVE, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + new NamedThreadFactory("MemtableFlushWriter"), + "internal"); // post-flush executor is single threaded to provide guarantee that any flush Future on a CF will never return until prior flushes have completed - private static final ExecutorService postFlushExecutor = new JMXEnabledThreadPoolExecutor(1, - StageManager.KEEPALIVE, - TimeUnit.SECONDS, - new LinkedBlockingQueue(), - new NamedThreadFactory("MemtablePostFlush"), - "internal"); - - private static final ExecutorService reclaimExecutor = new JMXEnabledThreadPoolExecutor(1, - StageManager.KEEPALIVE, - TimeUnit.SECONDS, - new LinkedBlockingQueue(), - new NamedThreadFactory("MemtableReclaimMemory"), - "internal"); + private static final ThreadPoolExecutor postFlushExecutor = new JMXEnabledThreadPoolExecutor(1, + StageManager.KEEPALIVE, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + new NamedThreadFactory("MemtablePostFlush"), + "internal"); + + private static final ThreadPoolExecutor reclaimExecutor = new JMXEnabledThreadPoolExecutor(1, + StageManager.KEEPALIVE, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + new NamedThreadFactory("MemtableReclaimMemory"), + "internal"); private static final String[] COUNTER_NAMES = new String[]{"raw", "count", "error", "string"}; private static final String[] COUNTER_DESCS = new 
String[] - { "partition key in raw hex bytes", - "value of this partition for given sampler", - "value is within the error bounds plus or minus of this", - "the partition key turned into a human readable format" }; + { "partition key in raw hex bytes", + "value of this partition for given sampler", + "value is within the error bounds plus or minus of this", + "the partition key turned into a human readable format" }; private static final CompositeType COUNTER_COMPOSITE_TYPE; private static final TabularType COUNTER_TYPE; private static final String[] SAMPLER_NAMES = new String[]{"cardinality", "partitions"}; private static final String[] SAMPLER_DESCS = new String[] - { "cardinality of partitions", - "list of counter results" }; + { "cardinality of partitions", + "list of counter results" }; private static final String SAMPLING_RESULTS_NAME = "SAMPLING_RESULTS"; private static final CompositeType SAMPLING_RESULT; @@ -234,7 +235,6 @@ public static void shutdownExecutorsAndWait(long timeout, TimeUnit unit) throws ExecutorUtils.shutdownAndWait(timeout, unit, reclaimExecutor, postFlushExecutor, flushExecutor); } - public void reload() { // metadata object has been mutated directly. make all the members jibe with new settings. @@ -271,7 +271,7 @@ void scheduleFlush() logger.trace("scheduling flush in {} ms", period); WrappedRunnable runnable = new WrappedRunnable() { - protected void runMayThrow() throws Exception + protected void runMayThrow() { synchronized (data) { @@ -299,14 +299,10 @@ protected void runMayThrow() throws Exception public static Runnable getBackgroundCompactionTaskSubmitter() { - return new Runnable() - { - public void run() - { - for (Keyspace keyspace : Keyspace.all()) - for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores()) - CompactionManager.instance.submitBackground(cfs); - } + return () -> { + for (Keyspace keyspace : Keyspace.all()) + for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores()) + CompactionManager.instance.submitBackground(cfs); }; } @@ -360,11 +356,11 @@ public void setCompressionParameters(Map opts) } private ColumnFamilyStore(Keyspace keyspace, - String columnFamilyName, - int generation, - CFMetaData metadata, - Directories directories, - boolean loadSSTables) + String columnFamilyName, + int generation, + CFMetaData metadata, + Directories directories, + boolean loadSSTables) { this(keyspace, columnFamilyName, generation, metadata, directories, loadSSTables, true); } @@ -372,15 +368,15 @@ private ColumnFamilyStore(Keyspace keyspace, @VisibleForTesting public ColumnFamilyStore(Keyspace keyspace, - String columnFamilyName, - int generation, - CFMetaData metadata, - Directories directories, - boolean loadSSTables, - boolean registerBookkeeping) + String columnFamilyName, + int generation, + CFMetaData metadata, + Directories directories, + boolean loadSSTables, + boolean registerBookkeeping) { assert directories != null; - assert metadata != null : "null metadata for " + keyspace + ":" + columnFamilyName; + assert metadata != null : "null metadata for " + keyspace + ':' + columnFamilyName; this.keyspace = keyspace; this.metadata = metadata; @@ -435,8 +431,8 @@ public ColumnFamilyStore(Keyspace keyspace, { // register the mbean mbeanName = String.format("org.apache.cassandra.db:type=%s,keyspace=%s,table=%s", - isIndex() ? "IndexTables" : "Tables", - keyspace.getName(), name); + isIndex() ? 
"IndexTables" : "Tables", + keyspace.getName(), name); oldMBeanName = String.format("org.apache.cassandra.db:type=%s,keyspace=%s,columnfamily=%s", isIndex() ? "IndexColumnFamilies" : "ColumnFamilies", keyspace.getName(), name); @@ -453,24 +449,20 @@ public ColumnFamilyStore(Keyspace keyspace, throw new RuntimeException(e); } logger.trace("retryPolicy for {} is {}", name, this.metadata.params.speculativeRetry); - latencyCalculator = ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(new Runnable() - { - public void run() + latencyCalculator = ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(() -> { + SpeculativeRetryParam retryPolicy = ColumnFamilyStore.this.metadata.params.speculativeRetry; + switch (retryPolicy.kind()) { - SpeculativeRetryParam retryPolicy = ColumnFamilyStore.this.metadata.params.speculativeRetry; - switch (retryPolicy.kind()) - { - case PERCENTILE: - // get percentile in nanos - sampleLatencyNanos = (long) (metric.coordinatorReadLatency.getSnapshot().getValue(retryPolicy.threshold())); - break; - case CUSTOM: - sampleLatencyNanos = (long) retryPolicy.threshold(); - break; - default: - sampleLatencyNanos = Long.MAX_VALUE; - break; - } + case PERCENTILE: + // get percentile in nanos + sampleLatencyNanos = (long) (metric.coordinatorReadLatency.getSnapshot().getValue(retryPolicy.threshold())); + break; + case CUSTOM: + sampleLatencyNanos = (long) retryPolicy.threshold(); + break; + default: + sampleLatencyNanos = Long.MAX_VALUE; + break; } }, DatabaseDescriptor.getReadRpcTimeout(), DatabaseDescriptor.getReadRpcTimeout(), TimeUnit.MILLISECONDS); } @@ -575,14 +567,14 @@ public static synchronized ColumnFamilyStore createColumnFamilyStore(Keyspace ke // get the max generation number, to prevent generation conflicts Directories directories = new Directories(metadata, initialDirectories); Directories.SSTableLister lister = directories.sstableLister(Directories.OnTxnErr.IGNORE).includeBackups(true); - List generations = new ArrayList(); + List generations = new ArrayList<>(); for (Map.Entry> entry : lister.list().entrySet()) { Descriptor desc = entry.getKey(); generations.add(desc.generation); if (!desc.isCompatible()) throw new RuntimeException(String.format("Incompatible SSTable found. Current version %s is unable to read file: %s. Please run upgradesstables.", - desc.getFormat().getLatestVersion(), desc)); + desc.getFormat().getLatestVersion(), desc)); } Collections.sort(generations); int value = (generations.size() > 0) ? 
(generations.get(generations.size() - 1)) : 0; @@ -599,7 +591,7 @@ public static void scrubDataDirectories(CFMetaData metadata) Directories directories = new Directories(metadata, initialDirectories); Set cleanedDirectories = new HashSet<>(); - // clear ephemeral snapshots that were not properly cleared last session (CASSANDRA-7357) + // clear ephemeral snapshots that were not properly cleared last session (CASSANDRA-7357) clearEphemeralSnapshots(directories); directories.removeTemporaryDirectories(); @@ -637,13 +629,13 @@ public static void scrubDataDirectories(CFMetaData metadata) } // cleanup incomplete saved caches - Pattern tmpCacheFilePattern = Pattern.compile(metadata.ksName + "-" + metadata.cfName + "-(Key|Row)Cache.*\\.tmp$"); + Pattern tmpCacheFilePattern = Pattern.compile(metadata.ksName + '-' + metadata.cfName + "-(Key|Row)Cache.*\\.tmp$"); File dir = new File(DatabaseDescriptor.getSavedCachesLocation()); if (dir.exists()) { assert dir.isDirectory(); - for (File file : dir.listFiles()) + for (File file : Objects.requireNonNull(dir.listFiles())) if (tmpCacheFilePattern.matcher(file.getName()).matches()) if (!file.delete()) logger.warn("could not delete {}", file.getAbsolutePath()); @@ -666,7 +658,7 @@ public static void scrubDataDirectories(CFMetaData metadata) */ public static void loadNewSSTables(String ksName, String cfName) { - /** ks/cf existence checks will be done by open and getCFS methods for us */ + /* ks/cf existence checks will be done by open and getCFS methods for us */ Keyspace keyspace = Keyspace.open(ksName); keyspace.getColumnFamilyStore(cfName).loadNewSSTables(); } @@ -693,8 +685,8 @@ public synchronized void loadNewSSTables() if (!descriptor.isCompatible()) throw new RuntimeException(String.format("Can't open incompatible SSTable! Current version %s, found file: %s", - descriptor.getFormat().getLatestVersion(), - descriptor)); + descriptor.getFormat().getLatestVersion(), + descriptor)); // force foreign sstables to level 0 try @@ -779,7 +771,7 @@ public static void rebuildSecondaryIndex(String ksName, String cfName, String... 
{ ColumnFamilyStore cfs = Keyspace.open(ksName).getColumnFamilyStore(cfName); - Set indexes = new HashSet(Arrays.asList(idxNames)); + Set indexes = new HashSet<>(Arrays.asList(idxNames)); Iterable sstables = cfs.getSSTables(SSTableSet.CANONICAL); try (Refs refs = Refs.ref(sstables)) @@ -834,6 +826,7 @@ public ListenableFuture switchMemtableIfCurrent(Memtable memtabl if (data.getView().getCurrentMemtable() == memtable) return switchMemtable(); } + logger.debug("Memtable is no longer current, returning future that completes when current flushing operation completes"); return waitForFlushes(); } @@ -879,7 +872,7 @@ private void logFlush() } logger.debug("Enqueuing flush of {}: {}", name, String.format("%d (%.0f%%) on-heap, %d (%.0f%%) off-heap", - onHeapTotal, onHeapRatio * 100, offHeapTotal, offHeapRatio * 100)); + onHeapTotal, onHeapRatio * 100, offHeapTotal, offHeapRatio * 100)); } @@ -927,13 +920,9 @@ private ListenableFuture waitForFlushes() // we grab the current memtable; once any preceding memtables have flushed, we know its // commitLogLowerBound has been set (as this it is set with the upper bound of the preceding memtable) final Memtable current = data.getView().getCurrentMemtable(); - ListenableFutureTask task = ListenableFutureTask.create(new Callable() - { - public ReplayPosition call() - { - logger.debug("forceFlush requested but everything is clean in {}", name); - return current.getCommitLogLowerBound(); - } + ListenableFutureTask task = ListenableFutureTask.create(() -> { + logger.debug("forceFlush requested but everything is clean in {}", name); + return current.getCommitLogLowerBound(); }); postFlushExecutor.execute(task); return task; @@ -1007,11 +996,13 @@ private final class Flush implements Runnable private Flush(boolean truncate) { + if (logger.isTraceEnabled()) + logger.trace("Creating flush task {}@{}", hashCode(), name); // if true, we won't flush, we'll just wait for any outstanding writes, switch the memtable, and discard this.truncate = truncate; metric.pendingFlushes.inc(); - /** + /* * To ensure correctness of switch without blocking writes, run() needs to wait for all write operations * started prior to the switch to complete. We do this by creating a Barrier on the writeOrdering * that all write operations register themselves with, and assigning this barrier to the memtables, @@ -1020,7 +1011,7 @@ private Flush(boolean truncate) * In doing so it also tells the write operations to update the commitLogUpperBound of the memtable, so * that we know the CL position we are dirty to, which can be marked clean when we complete. */ - writeBarrier = keyspace.writeOrder.newBarrier(); + writeBarrier = Keyspace.writeOrder.newBarrier(); // submit flushes for the memtable for any indexed sub-cfses, and our own AtomicReference commitLogUpperBound = new AtomicReference<>(); @@ -1044,15 +1035,26 @@ private Flush(boolean truncate) // replay positions have also completed, i.e. 
the memtables are done and ready to flush writeBarrier.issue(); postFlush = new PostFlush(memtables); + + if (logger.isTraceEnabled()) + logger.trace("Created flush task {}@{}", hashCode(), name); } public void run() { + if (logger.isTraceEnabled()) + logger.trace("Flush task {}@{} starts executing, waiting on barrier", hashCode(), name); + + long start = System.nanoTime(); + // mark writes older than the barrier as blocking progress, permitting them to exceed our memory limit // if they are stuck waiting on it, then wait for them all to complete writeBarrier.markBlocking(); writeBarrier.await(); + if (logger.isTraceEnabled()) + logger.trace("Flush task for task {}@{} waited {} ms at the barrier", hashCode(), name, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)); + // mark all memtables as flushing, removing them from the live memtable list for (Memtable memtable : memtables) memtable.cfs.data.markFlushing(memtable); @@ -1088,8 +1090,14 @@ public void run() } finally { + if (logger.isTraceEnabled()) + logger.trace("Flush task {}@{} signaling post flush task", hashCode(), name); + // signal the post-flush we've done our work postFlush.latch.countDown(); + + if (logger.isTraceEnabled()) + logger.trace("Flush task task {}@{} finished", hashCode(), name); } } @@ -1100,7 +1108,7 @@ private void reclaim(final Memtable memtable) readBarrier.issue(); reclaimExecutor.execute(new WrappedRunnable() { - public void runMayThrow() throws InterruptedException, ExecutionException + public void runMayThrow() { readBarrier.await(); memtable.setDiscarded(); @@ -1130,58 +1138,77 @@ private static void setCommitLogUpperBound(AtomicReference commi * Finds the largest memtable, as a percentage of *either* on- or off-heap memory limits, and immediately * queues it for flushing. If the memtable selected is flushed before this completes, no work is done. */ - public static class FlushLargestColumnFamily implements Runnable + public static CompletableFuture flushLargestMemtable() { - public void run() + float largestRatio = 0f; + Memtable largest = null; + float liveOnHeap = 0, liveOffHeap = 0; + for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) { - float largestRatio = 0f; - Memtable largest = null; - float liveOnHeap = 0, liveOffHeap = 0; - for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) - { - // we take a reference to the current main memtable for the CF prior to snapping its ownership ratios - // to ensure we have some ordering guarantee for performing the switchMemtableIf(), i.e. we will only - // swap if the memtables we are measuring here haven't already been swapped by the time we try to swap them - Memtable current = cfs.getTracker().getView().getCurrentMemtable(); - - // find the total ownership ratio for the memtable and all SecondaryIndexes owned by this CF, - // both on- and off-heap, and select the largest of the two ratios to weight this CF - float onHeap = 0f, offHeap = 0f; - onHeap += current.getAllocator().onHeap().ownershipRatio(); - offHeap += current.getAllocator().offHeap().ownershipRatio(); - - for (ColumnFamilyStore indexCfs : cfs.indexManager.getAllIndexColumnFamilyStores()) - { - MemtableAllocator allocator = indexCfs.getTracker().getView().getCurrentMemtable().getAllocator(); - onHeap += allocator.onHeap().ownershipRatio(); - offHeap += allocator.offHeap().ownershipRatio(); - } + // we take a reference to the current main memtable for the CF prior to snapping its ownership ratios + // to ensure we have some ordering guarantee for performing the switchMemtableIf(), i.e. 
we will only + // swap if the memtables we are measuring here haven't already been swapped by the time we try to swap them + Memtable current = cfs.getTracker().getView().getCurrentMemtable(); - float ratio = Math.max(onHeap, offHeap); - if (ratio > largestRatio) - { - largest = current; - largestRatio = ratio; - } + // find the total ownership ratio for the memtable and all SecondaryIndexes owned by this CF, + // both on- and off-heap, and select the largest of the two ratios to weight this CF + float onHeap = 0f, offHeap = 0f; + onHeap += current.getAllocator().onHeap().ownershipRatio(); + offHeap += current.getAllocator().offHeap().ownershipRatio(); - liveOnHeap += onHeap; - liveOffHeap += offHeap; + for (ColumnFamilyStore indexCfs : cfs.indexManager.getAllIndexColumnFamilyStores()) + { + MemtableAllocator allocator = indexCfs.getTracker().getView().getCurrentMemtable().getAllocator(); + onHeap += allocator.onHeap().ownershipRatio(); + offHeap += allocator.offHeap().ownershipRatio(); } - if (largest != null) + float ratio = Math.max(onHeap, offHeap); + if (ratio > largestRatio) { - float usedOnHeap = Memtable.MEMORY_POOL.onHeap.usedRatio(); - float usedOffHeap = Memtable.MEMORY_POOL.offHeap.usedRatio(); - float flushingOnHeap = Memtable.MEMORY_POOL.onHeap.reclaimingRatio(); - float flushingOffHeap = Memtable.MEMORY_POOL.offHeap.reclaimingRatio(); - float thisOnHeap = largest.getAllocator().onHeap().ownershipRatio(); - float thisOffHeap = largest.getAllocator().offHeap().ownershipRatio(); - logger.debug("Flushing largest {} to free up room. Used total: {}, live: {}, flushing: {}, this: {}", - largest.cfs, ratio(usedOnHeap, usedOffHeap), ratio(liveOnHeap, liveOffHeap), - ratio(flushingOnHeap, flushingOffHeap), ratio(thisOnHeap, thisOffHeap)); - largest.cfs.switchMemtableIfCurrent(largest); + largest = current; + largestRatio = ratio; } + + liveOnHeap += onHeap; + liveOffHeap += offHeap; + } + + CompletableFuture returnFuture = new CompletableFuture<>(); + + if (largest != null) + { + float usedOnHeap = Memtable.MEMORY_POOL.onHeap.usedRatio(); + float usedOffHeap = Memtable.MEMORY_POOL.offHeap.usedRatio(); + float flushingOnHeap = Memtable.MEMORY_POOL.onHeap.reclaimingRatio(); + float flushingOffHeap = Memtable.MEMORY_POOL.offHeap.reclaimingRatio(); + float thisOnHeap = largest.getAllocator().onHeap().ownershipRatio(); + float thisOffHeap = largest.getAllocator().offHeap().ownershipRatio(); + logger.debug("Flushing largest {} to free up room. 
Used total: {}, live: {}, flushing: {}, this: {}", + largest.cfs, ratio(usedOnHeap, usedOffHeap), ratio(liveOnHeap, liveOffHeap), + ratio(flushingOnHeap, flushingOffHeap), ratio(thisOnHeap, thisOffHeap)); + + ListenableFuture flushFuture = largest.cfs.switchMemtableIfCurrent(largest); + flushFuture.addListener(() -> { + try + { + flushFuture.get(); + returnFuture.complete(true); + } + catch (Throwable t) + { + returnFuture.completeExceptionally(t); + } + }, MoreExecutors.directExecutor()); + } + else + { + logger.debug("Flushing of largest memtable, not done, no memtable found"); + + returnFuture.complete(false); } + + return returnFuture; } private static String ratio(float onHeap, float offHeap) @@ -1228,8 +1255,8 @@ public void apply(PartitionUpdate update, UpdateTransaction indexer, OpOrder.Gro catch (RuntimeException e) { throw new RuntimeException(e.getMessage() - + " for ks: " - + keyspace.getName() + ", table: " + name, e); + + " for ks: " + + keyspace.getName() + ", table: " + name, e); } } @@ -1510,7 +1537,7 @@ public boolean isFilterFullyCoveredBy(ClusteringIndexFilter filter, DataLimits l nowInSec, filter.selectsAllPartition(), metadata.enforceStrictLiveness())) - || filter.isFullyCoveredBy(cached); + || filter.isFullyCoveredBy(cached); } public int gcBefore(int nowInSec) @@ -1548,7 +1575,7 @@ else if (System.nanoTime() - failingSince > TimeUnit.MILLISECONDS.toNanos(100)) public ViewFragment select(Function> filter) { View view = data.getView(); - List sstables = Lists.newArrayList(filter.apply(view)); + List sstables = Lists.newArrayList(Objects.requireNonNull(filter.apply(view))); return new ViewFragment(sstables, view.getAllMemtables()); } @@ -1578,19 +1605,19 @@ public void beginLocalSampling(String sampler, int capacity) public CompositeData finishLocalSampling(String sampler, int count) throws OpenDataException { SamplerResult samplerResults = metric.samplers.get(Sampler.valueOf(sampler)) - .finishSampling(count); + .finishSampling(count); TabularDataSupport result = new TabularDataSupport(COUNTER_TYPE); for (Counter counter : samplerResults.topK) { byte[] key = counter.getItem().array(); result.put(new CompositeDataSupport(COUNTER_COMPOSITE_TYPE, COUNTER_NAMES, new Object[] { - Hex.bytesToHex(key), // raw - counter.getCount(), // count - counter.getError(), // error - metadata.getKeyValidator().getString(ByteBuffer.wrap(key)) })); // string + Hex.bytesToHex(key), // raw + counter.getCount(), // count + counter.getError(), // error + metadata.getKeyValidator().getString(ByteBuffer.wrap(key)) })); // string } return new CompositeDataSupport(SAMPLING_RESULT, SAMPLER_NAMES, new Object[]{ - samplerResults.cardinality, result}); + samplerResults.cardinality, result}); } public boolean isCompactionDiskSpaceCheckEnabled() @@ -1929,20 +1956,20 @@ public void putCachedCounter(ByteBuffer partitionKey, Clustering clustering, Col CacheService.instance.counterCache.put(CounterCacheKey.create(metadata.ksAndCFName, partitionKey, clustering, column, path), clockAndCount); } - public void forceMajorCompaction() throws InterruptedException, ExecutionException + public void forceMajorCompaction() { forceMajorCompaction(false); } - public void forceMajorCompaction(boolean splitOutput) throws InterruptedException, ExecutionException + public void forceMajorCompaction(boolean splitOutput) { CompactionManager.instance.performMaximal(this, splitOutput); } public static Iterable all() { - List> stores = new ArrayList>(Schema.instance.getKeyspaces().size()); + List> stores = new 
ArrayList<>(Schema.instance.getKeyspaces().size()); for (Keyspace keyspace : Keyspace.all()) { stores.add(keyspace.getColumnFamilyStores()); @@ -1985,13 +2012,9 @@ public void clearUnsafe() { for (final ColumnFamilyStore cfs : concatWithIndexes()) { - cfs.runWithCompactionsDisabled(new Callable() - { - public Void call() - { - cfs.data.reset(new Memtable(new AtomicReference<>(ReplayPosition.NONE), cfs)); - return null; - } + cfs.runWithCompactionsDisabled((Callable) () -> { + cfs.data.reset(new Memtable(new AtomicReference<>(ReplayPosition.NONE), cfs)); + return null; }, true, false); } } @@ -2044,25 +2067,21 @@ public void truncateBlocking() now = Math.max(now, sstable.maxDataAge); truncatedAt = now; - Runnable truncateRunnable = new Runnable() - { - public void run() - { - logger.debug("Discarding sstable data for truncated CF + indexes"); - data.notifyTruncated(truncatedAt); + Runnable truncateRunnable = () -> { + logger.debug("Discarding sstable data for truncated CF + indexes"); + data.notifyTruncated(truncatedAt); - if (DatabaseDescriptor.isAutoSnapshot()) - snapshot(Keyspace.getTimestampedSnapshotName(name)); + if (DatabaseDescriptor.isAutoSnapshot()) + snapshot(Keyspace.getTimestampedSnapshotName(name)); - discardSSTables(truncatedAt); + discardSSTables(truncatedAt); - indexManager.truncateAllIndexesBlocking(truncatedAt); - viewManager.truncateBlocking(replayAfter, truncatedAt); + indexManager.truncateAllIndexesBlocking(truncatedAt); + viewManager.truncateBlocking(replayAfter, truncatedAt); - SystemKeyspace.saveTruncationRecord(ColumnFamilyStore.this, truncatedAt, replayAfter); - logger.trace("cleaning out row cache"); - invalidateCaches(); - } + SystemKeyspace.saveTruncationRecord(ColumnFamilyStore.this, truncatedAt, replayAfter); + logger.trace("cleaning out row cache"); + invalidateCaches(); }; runWithCompactionsDisabled(Executors.callable(truncateRunnable), true, true); @@ -2163,17 +2182,13 @@ private static Throwable resumeAll(Throwable accumulate, Iterable callable = new Callable() - { - public LifecycleTransaction call() throws Exception - { - assert data.getCompacting().isEmpty() : data.getCompacting(); - Iterable sstables = getLiveSSTables(); - sstables = AbstractCompactionStrategy.filterSuspectSSTables(sstables); - LifecycleTransaction modifier = data.tryModify(sstables, operationType); - assert modifier != null: "something marked things compacting while compactions are disabled"; - return modifier; - } + Callable callable = () -> { + assert data.getCompacting().isEmpty() : data.getCompacting(); + Iterable sstables = getLiveSSTables(); + sstables = AbstractCompactionStrategy.filterSuspectSSTables(sstables); + LifecycleTransaction modifier = data.tryModify(sstables, operationType); + assert modifier != null: "something marked things compacting while compactions are disabled"; + return modifier; }; return runWithCompactionsDisabled(callable, false, false); @@ -2296,7 +2311,7 @@ private void validateCompactionThresholds(int minThreshold, int maxThreshold) if (maxThreshold == 0 || minThreshold == 0) throw new RuntimeException("Disabling compaction by setting min_compaction_threshold or max_compaction_threshold to 0 " + - "is deprecated, set the compaction strategy option 'enabled' to 'false' instead or use the nodetool command 'disableautocompaction'."); + "is deprecated, set the compaction strategy option 'enabled' to 'false' instead or use the nodetool command 'disableautocompaction'."); } // End JMX get/set. 
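
The flushLargestMemtable() rewrite above hinges on bridging the Guava ListenableFuture returned by switchMemtableIfCurrent() into the CompletableFuture<Boolean> that MemtableCleaner expects. A minimal standalone sketch of that bridge, assuming only a plain Guava/JDK classpath (the FutureBridge name is illustrative, not part of this patch):

import java.util.concurrent.CompletableFuture;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;

final class FutureBridge
{
    static <T> CompletableFuture<T> toCompletable(ListenableFuture<T> source)
    {
        CompletableFuture<T> result = new CompletableFuture<>();
        // the listener only runs once 'source' is complete, so get() cannot block here
        source.addListener(() -> {
            try
            {
                result.complete(source.get());
            }
            catch (Throwable t)
            {
                result.completeExceptionally(t);
            }
        }, MoreExecutors.directExecutor());
        return result;
    }
}

flushLargestMemtable() inlines this pattern and completes with false when no flushable memtable is found, letting the cleaner loop tell "nothing to do" apart from "flush scheduled".
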
@@ -2360,7 +2375,7 @@ public Iterable concatWithIndexes() public List getBuiltIndexes() { - return indexManager.getBuiltIndexNames(); + return indexManager.getBuiltIndexNames(); } public int getUnleveledSSTables() @@ -2517,6 +2532,6 @@ public static ColumnFamilyStore getIfExists(String ksName, String cfName) public static TableMetrics metricsFor(UUID tableId) { - return getIfExists(tableId).metric; + return Objects.requireNonNull(getIfExists(tableId)).metric; } -} +} \ No newline at end of file diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java index 041ac2e4357a..139663e3f02c 100644 --- a/src/java/org/apache/cassandra/db/Memtable.java +++ b/src/java/org/apache/cassandra/db/Memtable.java @@ -339,7 +339,7 @@ public long getMinTimestamp() @VisibleForTesting public void makeUnflushable() { - liveDataSize.addAndGet(1L * 1024 * 1024 * 1024 * 1024 * 1024); + liveDataSize.addAndGet((long) 1024 * 1024 * 1024 * 1024 * 1024); } private long estimatedSize() @@ -433,7 +433,7 @@ public SSTableTxnWriter createFlushWriter(String filename, return new SSTableTxnWriter(txn, cfs.createSSTableMultiWriter(Descriptor.fromFilename(filename), - (long) partitions.size(), + partitions.size(), ActiveRepairService.UNREPAIRED_SSTABLE, sstableMetadataCollector, new SerializationHeader(true, cfs.metadata, columns, stats), diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index e135ebb22a65..925708efc5ab 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -63,7 +63,6 @@ public abstract class ReadCommand implements ReadQuery { protected static final Logger logger = LoggerFactory.getLogger(ReadCommand.class); - public static final IVersionedSerializer serializer = new Serializer(); // For READ verb: will either dispatch on 'serializer' for 3.0 or 'legacyReadCommandSerializer' for earlier version. @@ -546,7 +545,7 @@ public void onClose() Tracing.trace("Read {} live and {} tombstone cells{}", liveRows, tombstones, (warnTombstones ? 
" (see tombstone_warn_threshold)" : "")); } - }; + } return Transformation.apply(iter, new MetricRecording()); } diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java index d3633fd38cb2..b4418bee310b 100644 --- a/src/java/org/apache/cassandra/utils/FBUtilities.java +++ b/src/java/org/apache/cassandra/utils/FBUtilities.java @@ -938,6 +938,18 @@ public static byte[] toWriteUTFBytes(String s) } } + public static void sleepQuietly(long millis) + { + try + { + Thread.sleep(millis); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } + @VisibleForTesting protected static void reset() { diff --git a/src/java/org/apache/cassandra/utils/memory/HeapPool.java b/src/java/org/apache/cassandra/utils/memory/HeapPool.java index 57242c4b5624..1b698c926708 100644 --- a/src/java/org/apache/cassandra/utils/memory/HeapPool.java +++ b/src/java/org/apache/cassandra/utils/memory/HeapPool.java @@ -24,7 +24,7 @@ public class HeapPool extends MemtablePool { - public HeapPool(long maxOnHeapMemory, float cleanupThreshold, Runnable cleaner) + public HeapPool(long maxOnHeapMemory, float cleanupThreshold, MemtableCleaner cleaner) { super(maxOnHeapMemory, 0, cleanupThreshold, cleaner); } diff --git a/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java b/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java index 8383ddcd6b84..e5a97c65a6e3 100644 --- a/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java +++ b/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java @@ -18,21 +18,27 @@ */ package org.apache.cassandra.utils.memory; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLongFieldUpdater; -import org.apache.cassandra.config.CFMetaData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.utils.NoSpamLogger; import org.apache.cassandra.utils.concurrent.OpOrder; import org.apache.cassandra.utils.concurrent.WaitQueue; public abstract class MemtableAllocator { + private static final Logger logger = LoggerFactory.getLogger(MemtableAllocator.class); + private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 5, TimeUnit.SECONDS); + private final SubAllocator onHeap; private final SubAllocator offHeap; - volatile LifeCycle state = LifeCycle.LIVE; - static enum LifeCycle + enum LifeCycle { LIVE, DISCARDING, DISCARDED; LifeCycle transition(LifeCycle targetState) @@ -78,10 +84,8 @@ public SubAllocator offHeap() */ public void setDiscarding() { - state = state.transition(LifeCycle.DISCARDING); - // mark the memory owned by this allocator as reclaiming - onHeap.markAllReclaiming(); - offHeap.markAllReclaiming(); + onHeap.setDiscarding(); + offHeap.setDiscarding(); } /** @@ -90,15 +94,13 @@ public void setDiscarding() */ public void setDiscarded() { - state = state.transition(LifeCycle.DISCARDED); - // release any memory owned by this allocator; automatically signals waiters - onHeap.releaseAll(); - offHeap.releaseAll(); + onHeap.setDiscarded(); + offHeap.setDiscarded(); } public boolean isLive() { - return state == LifeCycle.LIVE; + return onHeap.state == LifeCycle.LIVE || offHeap.state == LifeCycle.LIVE; } /** Mark the BB as unused, permitting it to be reclaimed */ @@ -107,6 +109,9 @@ public static final class SubAllocator // the tracker we are owning memory from private final MemtablePool.SubPool parent; + // the state of 
the memtable + private volatile LifeCycle state; + // the amount of memory/resource owned by this object private volatile long owns; // the amount of memory we are reporting to collect; this may be inaccurate, but is close @@ -116,17 +121,44 @@ public static final class SubAllocator SubAllocator(MemtablePool.SubPool parent) { this.parent = parent; + this.state = LifeCycle.LIVE; + } + + /** + * Mark this allocator reclaiming; this will permit any outstanding allocations to temporarily + * overshoot the maximum memory limit so that flushing can begin immediately + */ + void setDiscarding() + { + state = state.transition(LifeCycle.DISCARDING); + // mark the memory owned by this allocator as reclaiming + updateReclaiming(); } - // should only be called once we know we will never allocate to the object again. - // currently no corroboration/enforcement of this is performed. + /** + * Indicate the memory and resources owned by this allocator are no longer referenced, + * and can be reclaimed/reused. + */ + void setDiscarded() + { + state = state.transition(LifeCycle.DISCARDED); + // release any memory owned by this allocator; automatically signals waiters + releaseAll(); + } + + /** + * Should only be called once we know we will never allocate to the object again. + * currently no corroboration/enforcement of this is performed. + */ void releaseAll() { parent.released(ownsUpdater.getAndSet(this, 0)); parent.reclaimed(reclaimingUpdater.getAndSet(this, 0)); } - // like allocate, but permits allocations to be negative + /** + * Like allocate, but permits allocations to be negative. + */ public void adjust(long size, OpOrder.Group opGroup) { if (size <= 0) @@ -168,28 +200,71 @@ public void allocate(long size, OpOrder.Group opGroup) } } - // retroactively mark an amount allocated and acquired in the tracker, and owned by us + /** + * Retroactively mark an amount allocated and acquired in the tracker, and owned by us. If the state is discarding, + * then also update reclaiming since the flush operation is waiting at the barrier for in-flight writes, + * and it will flush this memory too. + */ private void allocated(long size) { parent.allocated(size); ownsUpdater.addAndGet(this, size); + + if (state == LifeCycle.DISCARDING) + { + noSpamLogger.info("Allocated {} bytes whilst discarding", size); + updateReclaiming(); + } } - // retroactively mark an amount acquired in the tracker, and owned by us + /** + * Retroactively mark an amount acquired in the tracker, and owned by us. If the state is discarding, + * then also update reclaiming since the flush operation is waiting at the barrier for in-flight writes, + * and it will flush this memory too. + */ private void acquired(long size) { - parent.acquired(size); + parent.acquired(); ownsUpdater.addAndGet(this, size); + + if (state == LifeCycle.DISCARDING) + { + noSpamLogger.info("Acquired {} bytes whilst discarding", size); + updateReclaiming(); + } } + /** + * If the state is still live, then we update the memory we own here and in the parent. + * + * However, if the state is not live, we do not update it because we would have to update + * reclaiming too, and it could cause problems to the memtable cleaner algorithm if reclaiming + * decreased. If the memtable is flushing, soon enough {@link this#releaseAll()} will be called. 
+ * + * @param size the size that was released + */ void released(long size) { - parent.released(size); - ownsUpdater.addAndGet(this, -size); + if (state == LifeCycle.LIVE) + { + parent.released(size); + ownsUpdater.addAndGet(this, -size); + } + else + { + noSpamLogger.info("Tried to release {} bytes whilst discarding", size); + } } - // mark everything we currently own as reclaiming, both here and in our parent - void markAllReclaiming() + /** + * Mark what we currently own as reclaiming, both here and in our parent. + * This method is called for the first time when the memtable is scheduled for flushing, + * in which case reclaiming will be zero and we mark everything that we own as reclaiming. + * Afterwards, if there are in flight writes that have not completed yet, we also mark any + * more memory that is allocated by these writes as reclaiming, since the memtable is waiting + * on the barrier for these writes to complete, before it can actually start flushing data. + */ + void updateReclaiming() { while (true) { @@ -208,6 +283,11 @@ public long owns() return owns; } + public long getReclaiming() + { + return reclaiming; + } + public float ownershipRatio() { float r = owns / (float) parent.limit; diff --git a/src/java/org/apache/cassandra/utils/memory/MemtableCleaner.java b/src/java/org/apache/cassandra/utils/memory/MemtableCleaner.java new file mode 100644 index 000000000000..d2cb9c552a42 --- /dev/null +++ b/src/java/org/apache/cassandra/utils/memory/MemtableCleaner.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.utils.memory; + +import java.util.concurrent.CompletableFuture; + +/** + * The cleaner is used by {@link MemtableCleanerThread} in order to reclaim space from memtables, normally + * by flushing the largest memtable. + */ +public interface MemtableCleaner +{ + /** + * This is a function that schedules a cleaning task, normally flushing of the largest sstable. + * The future will complete once the operation has completed and it will have a value set to true if + * the cleaner was able to execute the cleaning operation or if another thread concurrently executed + * the same clean operation. If no operation was even attempted, for example because no memtable was + * found, then the value will be false. + * + * The future will complete with an error if the cleaning operation encounters an error. 
+ * + */ + CompletableFuture clean(); +} \ No newline at end of file diff --git a/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java b/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java index b905d2cb0d3f..f6fccc66faf7 100644 --- a/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java +++ b/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java @@ -18,6 +18,12 @@ */ package org.apache.cassandra.utils.memory; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.cassandra.concurrent.InfiniteLoopExecutor; import org.apache.cassandra.utils.concurrent.WaitQueue; @@ -27,54 +33,84 @@ */ public class MemtableCleanerThread
<P extends MemtablePool>
extends InfiniteLoopExecutor { - private static class Clean
<P extends MemtablePool>
implements InterruptibleRunnable + private static final Logger logger = LoggerFactory.getLogger(MemtableCleanerThread.class); + + public static class Clean
<P extends MemtablePool>
implements InterruptibleRunnable { + /** This is incremented when a cleaner is invoked and decremented when a cleaner has completed */ + final AtomicInteger numPendingTasks = new AtomicInteger(0); + /** The pool we're cleaning */ final P pool; /** should ensure that at least some memory has been marked reclaiming after completion */ - final Runnable cleaner; + final MemtableCleaner cleaner; /** signalled whenever needsCleaning() may return true */ final WaitQueue wait = new WaitQueue(); - private Clean(P pool, Runnable cleaner) + private Clean(P pool, MemtableCleaner cleaner) { this.pool = pool; this.cleaner = cleaner; } - boolean needsCleaning() + /** Return the number of pending tasks */ + public int numPendingTasks() { - return pool.onHeap.needsCleaning() || pool.offHeap.needsCleaning(); + return numPendingTasks.get(); } @Override public void run() throws InterruptedException { - if (needsCleaning()) - { - cleaner.run(); - } - else + if (!pool.needsCleaning()) { final WaitQueue.Signal signal = wait.register(); - if (!needsCleaning()) + if (!pool.needsCleaning()) signal.await(); else signal.cancel(); } + else + { + int numPendingTasks = this.numPendingTasks.incrementAndGet(); + + if (logger.isTraceEnabled()) + logger.trace("Invoking cleaner with {} tasks pending", numPendingTasks); + + cleaner.clean().handle(this::apply); + } + } + + private Boolean apply(Boolean res, Throwable err) + { + final int tasks = numPendingTasks.decrementAndGet(); + + // if the cleaning job was scheduled (res == true) or had an error, trigger again after decrementing the tasks + if ((res || err != null) && pool.needsCleaning()) + wait.signal(); + + if (err != null) + logger.error("Memtable cleaning tasks failed with an exception and {} pending tasks ", tasks, err); + else if (logger.isTraceEnabled()) + logger.trace("Memtable cleaning task completed ({}), currently pending: {}", res, tasks); + + return res; } } private final Runnable trigger; + private final Clean
<P>
clean; + private MemtableCleanerThread(Clean
<P>
clean) { super(clean.pool.getClass().getSimpleName() + "Cleaner", clean); this.trigger = clean.wait::signal; + this.clean = clean; } - MemtableCleanerThread(P pool, Runnable cleaner) + public MemtableCleanerThread(P pool, MemtableCleaner cleaner) { this(new Clean<>(pool, cleaner)); } @@ -84,4 +120,11 @@ public void trigger() { trigger.run(); } + + /** Return the number of pending tasks */ + @VisibleForTesting + public int numPendingTasks() + { + return clean.numPendingTasks(); + } } diff --git a/src/java/org/apache/cassandra/utils/memory/MemtablePool.java b/src/java/org/apache/cassandra/utils/memory/MemtablePool.java index 8061566757a5..cd434c5aab18 100644 --- a/src/java/org/apache/cassandra/utils/memory/MemtablePool.java +++ b/src/java/org/apache/cassandra/utils/memory/MemtablePool.java @@ -23,7 +23,9 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.codahale.metrics.Gauge; import com.codahale.metrics.Timer; import org.apache.cassandra.metrics.CassandraMetricsRegistry; import org.apache.cassandra.metrics.DefaultNameFactory; @@ -44,18 +46,22 @@ public abstract class MemtablePool public final SubPool offHeap; public final Timer blockedOnAllocating; + public final Gauge numPendingTasks; final WaitQueue hasRoom = new WaitQueue(); - MemtablePool(long maxOnHeapMemory, long maxOffHeapMemory, float cleanThreshold, Runnable cleaner) + MemtablePool(long maxOnHeapMemory, long maxOffHeapMemory, float cleanThreshold, MemtableCleaner cleaner) { + Preconditions.checkArgument(cleaner != null, "Cleaner should not be null"); + this.onHeap = getSubPool(maxOnHeapMemory, cleanThreshold); this.offHeap = getSubPool(maxOffHeapMemory, cleanThreshold); this.cleaner = getCleaner(cleaner); - blockedOnAllocating = CassandraMetricsRegistry.Metrics.timer(new DefaultNameFactory("MemtablePool") - .createMetricName("BlockedOnAllocation")); - if (this.cleaner != null) - this.cleaner.start(); + this.cleaner.start(); + DefaultNameFactory nameFactory = new DefaultNameFactory("MemtablePool"); + blockedOnAllocating = CassandraMetricsRegistry.Metrics.timer(nameFactory.createMetricName("BlockedOnAllocation")); + numPendingTasks = CassandraMetricsRegistry.Metrics.register(nameFactory.createMetricName("PendingFlushTasks"), + () -> (long) this.cleaner.numPendingTasks()); } SubPool getSubPool(long limit, float cleanThreshold) @@ -63,7 +69,7 @@ SubPool getSubPool(long limit, float cleanThreshold) return new SubPool(limit, cleanThreshold); } - MemtableCleanerThread getCleaner(Runnable cleaner) + MemtableCleanerThread getCleaner(MemtableCleaner cleaner) { return cleaner == null ? null : new MemtableCleanerThread<>(this, cleaner); } @@ -78,6 +84,16 @@ public void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedExcep public abstract MemtableAllocator newAllocator(); + public boolean needsCleaning() + { + return onHeap.needsCleaning() || offHeap.needsCleaning(); + } + + public Long getNumPendingtasks() + { + return numPendingTasks.getValue(); + } + /** * Note the difference between acquire() and allocate(); allocate() makes more resources available to all owners, * and acquire() makes shared resources unavailable but still recorded. 
An Owner must always acquire resources, @@ -169,7 +185,7 @@ void allocated(long size) maybeClean(); } - void acquired(long size) + void acquired() { maybeClean(); } @@ -203,6 +219,11 @@ public long used() return allocated; } + public long getReclaiming() + { + return reclaiming; + } + public float reclaimingRatio() { float r = reclaiming / (float) limit; diff --git a/src/java/org/apache/cassandra/utils/memory/NativePool.java b/src/java/org/apache/cassandra/utils/memory/NativePool.java index 012867aa6e5b..29ea8fb5bda7 100644 --- a/src/java/org/apache/cassandra/utils/memory/NativePool.java +++ b/src/java/org/apache/cassandra/utils/memory/NativePool.java @@ -20,7 +20,7 @@ public class NativePool extends MemtablePool { - public NativePool(long maxOnHeapMemory, long maxOffHeapMemory, float cleanThreshold, Runnable cleaner) + public NativePool(long maxOnHeapMemory, long maxOffHeapMemory, float cleanThreshold, MemtableCleaner cleaner) { super(maxOnHeapMemory, maxOffHeapMemory, cleanThreshold, cleaner); } diff --git a/src/java/org/apache/cassandra/utils/memory/SlabPool.java b/src/java/org/apache/cassandra/utils/memory/SlabPool.java index c5c44e1868b0..a779432c1fa9 100644 --- a/src/java/org/apache/cassandra/utils/memory/SlabPool.java +++ b/src/java/org/apache/cassandra/utils/memory/SlabPool.java @@ -22,7 +22,7 @@ public class SlabPool extends MemtablePool { final boolean allocateOnHeap; - public SlabPool(long maxOnHeapMemory, long maxOffHeapMemory, float cleanupThreshold, Runnable cleaner) + public SlabPool(long maxOnHeapMemory, long maxOffHeapMemory, float cleanupThreshold, MemtableCleaner cleaner) { super(maxOnHeapMemory, maxOffHeapMemory, cleanupThreshold, cleaner); this.allocateOnHeap = maxOffHeapMemory == 0; diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java index e7216b261182..ddeb9dae5dd4 100644 --- a/test/unit/org/apache/cassandra/cql3/CQLTester.java +++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java @@ -26,7 +26,6 @@ import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -471,16 +470,9 @@ public void beforeAndAfterFlush(CheckedFunction runnable) throws Throwable public void compact() { - try - { - ColumnFamilyStore store = getCurrentColumnFamilyStore(); - if (store != null) - store.forceMajorCompaction(); - } - catch (InterruptedException | ExecutionException e) - { - throw new RuntimeException(e); - } + ColumnFamilyStore store = getCurrentColumnFamilyStore(); + if (store != null) + store.forceMajorCompaction(); } public void disableCompaction() diff --git a/test/unit/org/apache/cassandra/utils/memory/MemtableCleanerThreadTest.java b/test/unit/org/apache/cassandra/utils/memory/MemtableCleanerThreadTest.java new file mode 100644 index 000000000000..7100a2ac59c1 --- /dev/null +++ b/test/unit/org/apache/cassandra/utils/memory/MemtableCleanerThreadTest.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.utils.memory; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Before; +import org.junit.Test; + +import org.apache.cassandra.utils.FBUtilities; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.when; + +public class MemtableCleanerThreadTest +{ + private static final long TIMEOUT_SECONDS = 5; + private static final long TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(TIMEOUT_SECONDS); + + @Mock + private MemtablePool pool; + + @Mock + private MemtableCleaner cleaner; + + private MemtableCleanerThread cleanerThread; + + @Before + public void setup() + { + MockitoAnnotations.initMocks(this); + } + + private void startThread() + { + cleanerThread = new MemtableCleanerThread<>(pool, cleaner); + assertNotNull(cleanerThread); + cleanerThread.start(); + + for (int i = 0; i < TIMEOUT_MILLIS && !cleanerThread.isAlive(); i++) + FBUtilities.sleepQuietly(1); + } + + private void stopThread() throws InterruptedException + { + cleanerThread.shutdownNow(); + + assertTrue(cleanerThread.awaitTermination(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)); + } + + private void waitForPendingTasks() + { + // wait for a bit because the cleaner latch completes before the pending tasks are decremented + FBUtilities.sleepQuietly(TIMEOUT_MILLIS); + + assertEquals(0, cleanerThread.numPendingTasks()); + } + + @Test + public void testCleanerInvoked() throws Exception + { + CountDownLatch cleanerExecutedLatch = new CountDownLatch(1); + CompletableFuture fut = new CompletableFuture<>(); + AtomicBoolean needsCleaning = new AtomicBoolean(false); + + when(pool.needsCleaning()).thenAnswer(invocation -> needsCleaning.get()); + + when(cleaner.clean()).thenAnswer(invocation -> { + needsCleaning.set(false); + cleanerExecutedLatch.countDown(); + return fut; + }); + + // start the thread with needsCleaning returning false, the cleaner should not be invoked + needsCleaning.set(false); + startThread(); + assertFalse(cleanerExecutedLatch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS)); + assertEquals(1, cleanerExecutedLatch.getCount()); + assertEquals(0, cleanerThread.numPendingTasks()); + + // now invoke the cleaner + needsCleaning.set(true); + cleanerThread.trigger(); + assertTrue(cleanerExecutedLatch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS)); + assertEquals(0, cleanerExecutedLatch.getCount()); + assertEquals(1, cleanerThread.numPendingTasks()); + + // now complete the cleaning task + needsCleaning.set(false); + fut.complete(true); + waitForPendingTasks(); + + stopThread(); + } + + @Test + public void testCleanerError() throws Exception + { + AtomicReference cleanerLatch = new AtomicReference<>(new 
+    @Test
+    public void testCleanerError() throws Exception
+    {
+        AtomicReference<CountDownLatch> cleanerLatch = new AtomicReference<>(new CountDownLatch(1));
+        AtomicReference<CompletableFuture<Boolean>> fut = new AtomicReference<>(new CompletableFuture<>());
+        AtomicBoolean needsCleaning = new AtomicBoolean(false);
+        AtomicInteger numTimeCleanerInvoked = new AtomicInteger(0);
+
+        when(pool.needsCleaning()).thenAnswer(invocation -> needsCleaning.get());
+
+        when(cleaner.clean()).thenAnswer(invocation -> {
+            needsCleaning.set(false);
+            numTimeCleanerInvoked.incrementAndGet();
+            cleanerLatch.get().countDown();
+            return fut.get();
+        });
+
+        // start the thread with needsCleaning returning true, the cleaner should be invoked
+        needsCleaning.set(true);
+        startThread();
+        assertTrue(cleanerLatch.get().await(TIMEOUT_SECONDS, TimeUnit.SECONDS));
+        assertEquals(0, cleanerLatch.get().getCount());
+        assertEquals(1, cleanerThread.numPendingTasks());
+        assertEquals(1, numTimeCleanerInvoked.get());
+
+        // complete the cleaning task with an error, no other cleaning task should be invoked
+        cleanerLatch.set(new CountDownLatch(1));
+        CompletableFuture<Boolean> oldFut = fut.get();
+        fut.set(new CompletableFuture<>());
+        needsCleaning.set(false);
+        oldFut.completeExceptionally(new RuntimeException("Test"));
+        assertFalse(cleanerLatch.get().await(TIMEOUT_SECONDS, TimeUnit.SECONDS));
+        assertEquals(1, cleanerLatch.get().getCount());
+        assertEquals(1, numTimeCleanerInvoked.get());
+
+        // now trigger cleaning again and verify that a new task is invoked
+        cleanerLatch.set(new CountDownLatch(1));
+        fut.set(new CompletableFuture<>());
+        needsCleaning.set(true);
+        cleanerThread.trigger();
+        assertTrue(cleanerLatch.get().await(TIMEOUT_SECONDS, TimeUnit.SECONDS));
+        assertEquals(0, cleanerLatch.get().getCount());
+        assertEquals(2, numTimeCleanerInvoked.get());
+
+        // complete the cleaning task with false (nothing should be scheduled)
+        cleanerLatch.set(new CountDownLatch(1));
+        oldFut = fut.get();
+        fut.set(new CompletableFuture<>());
+        needsCleaning.set(false);
+        oldFut.complete(false);
+        assertFalse(cleanerLatch.get().await(TIMEOUT_SECONDS, TimeUnit.SECONDS));
+        assertEquals(1, cleanerLatch.get().getCount());
+        assertEquals(2, numTimeCleanerInvoked.get());
+
+        // now trigger cleaning again and verify that a new task is invoked
+        cleanerLatch.set(new CountDownLatch(1));
+        fut.set(new CompletableFuture<>());
+        needsCleaning.set(true);
+        cleanerThread.trigger();
+        assertTrue(cleanerLatch.get().await(TIMEOUT_SECONDS, TimeUnit.SECONDS));
+        assertEquals(0, cleanerLatch.get().getCount());
+        assertEquals(3, numTimeCleanerInvoked.get());
+
+        stopThread();
+    }
+}
\ No newline at end of file
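testCleanerError() pins down the retry semantics: with needsCleaning() returning false, neither an exceptional completion nor a false result schedules another cleaning task on its own; a new task runs only once trigger() is called again with needsCleaning() returning true. The NativeAllocatorTest rewrite below then exercises the used/reclaiming accounting exposed by the new getReclaiming() methods. Restating the arithmetic the test asserts, with the numbers taken directly from the test body:

    allocator.allocate(30, group);   // used = 30, reclaiming = 0
    allocator.setDiscarding();       // used = 30, reclaiming = 30: all owned memory becomes reclaimable
    allocator.allocate(50, group);   // used = 80, reclaiming = 80: allocations made while discarding also count as reclaiming
    allocator.setDiscarded();        // used = 0,  reclaiming = 0:  discarding releases everything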
diff --git a/test/unit/org/apache/cassandra/utils/memory/NativeAllocatorTest.java b/test/unit/org/apache/cassandra/utils/memory/NativeAllocatorTest.java
index b636bf7527ef..a7b112c8e6db 100644
--- a/test/unit/org/apache/cassandra/utils/memory/NativeAllocatorTest.java
+++ b/test/unit/org/apache/cassandra/utils/memory/NativeAllocatorTest.java
@@ -22,105 +22,137 @@
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
-import junit.framework.Assert;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 
 public class NativeAllocatorTest
 {
+    private ScheduledExecutorService exec;
+    private OpOrder order;
+    private OpOrder.Group group;
+    private CountDownLatch canClean;
+    private CountDownLatch isClean;
+    private AtomicReference<NativeAllocator> allocatorRef;
+    private AtomicReference<OpOrder.Barrier> barrier;
+    private NativePool pool;
+    private NativeAllocator allocator;
+    private Runnable markBlocking;
+
+    @Before
+    public void setUp()
+    {
+        exec = Executors.newScheduledThreadPool(2);
+        order = new OpOrder();
+        group = order.start();
+        canClean = new CountDownLatch(1);
+        isClean = new CountDownLatch(1);
+        allocatorRef = new AtomicReference<>();
+        barrier = new AtomicReference<>();
+        pool = new NativePool(1, 100, 0.75f, () -> {
+            try
+            {
+                canClean.await();
+            }
+            catch (InterruptedException e)
+            {
+                throw new AssertionError();
+            }
+            if (isClean.getCount() > 0)
+            {
+                allocatorRef.get().offHeap().released(80);
+                isClean.countDown();
+            }
+            return CompletableFuture.completedFuture(true);
+        });
+        allocator = new NativeAllocator(pool);
+        allocatorRef.set(allocator);
+        markBlocking = () -> {
+            barrier.set(order.newBarrier());
+            barrier.get().issue();
+            barrier.get().markBlocking();
+        };
+    }
+
+    private void verifyUsedReclaiming(long used, long reclaiming)
+    {
+        Assert.assertEquals(used, allocator.offHeap().owns());
+        Assert.assertEquals(used, pool.offHeap.used());
+        Assert.assertEquals(reclaiming, allocator.offHeap().getReclaiming());
+        Assert.assertEquals(reclaiming, pool.offHeap.getReclaiming());
+    }
 
     @Test
     public void testBookKeeping() throws ExecutionException, InterruptedException
     {
-        {
-            final ScheduledExecutorService exec = Executors.newScheduledThreadPool(2);
-            final OpOrder order = new OpOrder();
-            final OpOrder.Group group = order.start();
-            final CountDownLatch canClean = new CountDownLatch(1);
-            final CountDownLatch isClean = new CountDownLatch(1);
-            final AtomicReference<NativeAllocator> allocatorRef = new AtomicReference<>();
-            final AtomicReference<OpOrder.Barrier> barrier = new AtomicReference<>();
-            final NativeAllocator allocator = new NativeAllocator(new NativePool(1, 100, 0.75f, new Runnable()
+        final Runnable test = () -> {
+            // allocate normal, check accounted and not cleaned
+            allocator.allocate(10, group);
+            verifyUsedReclaiming(10, 0);
+
+            // confirm adjustment works
+            allocator.offHeap().adjust(-10, group);
+            verifyUsedReclaiming(0, 0);
+
+            allocator.offHeap().adjust(10, group);
+            verifyUsedReclaiming(10, 0);
+
+            // confirm we cannot allocate negative
+            boolean success = false;
+            try
             {
-                public void run()
-                {
-                    try
-                    {
-                        canClean.await();
-                    }
-                    catch (InterruptedException e)
-                    {
-                        throw new AssertionError();
-                    }
-                    if (isClean.getCount() > 0)
-                    {
-                        allocatorRef.get().offHeap().released(80);
-                        isClean.countDown();
-                    }
-                }
-            }));
-            allocatorRef.set(allocator);
-            final Runnable markBlocking = new Runnable()
+                allocator.offHeap().allocate(-10, group);
+            }
+            catch (AssertionError e)
             {
+                success = true;
+            }
+
+            Assert.assertTrue(success);
+            Uninterruptibles.sleepUninterruptibly(10L, TimeUnit.MILLISECONDS);
+            Assert.assertEquals(1, isClean.getCount());
+
+            // allocate above watermark
+            allocator.allocate(70, group);
+            verifyUsedReclaiming(80, 0);
 
-                public void run()
-                {
-                    barrier.set(order.newBarrier());
-                    barrier.get().issue();
-                    barrier.get().markBlocking();
-                }
-            };
-            final Runnable run = new Runnable()
+            // let the cleaner run, it will release 80 bytes
+            canClean.countDown();
+            try
             {
-                public void run()
-                {
-                    // allocate normal, check accounted and not cleaned
-                    allocator.allocate(10, group);
-                    Assert.assertEquals(10, allocator.offHeap().owns());
-                    // confirm adjustment works
-                    allocator.offHeap().adjust(-10, group);
-                    Assert.assertEquals(0, allocator.offHeap().owns());
-                    allocator.offHeap().adjust(10, group);
-                    Assert.assertEquals(10, allocator.offHeap().owns());
-                    // confirm we cannot allocate negative
-                    boolean success = false;
-                    try
-                    {
-                        allocator.offHeap().allocate(-10, group);
-                    }
-                    catch (AssertionError e)
-                    {
-                        success = true;
-                    }
-                    Assert.assertTrue(success);
-                    Uninterruptibles.sleepUninterruptibly(10L, TimeUnit.MILLISECONDS);
-                    Assert.assertEquals(1, isClean.getCount());
-
-                    // allocate above watermark, check cleaned
-                    allocator.allocate(70, group);
-                    Assert.assertEquals(80, allocator.offHeap().owns());
-                    canClean.countDown();
-                    try
-                    {
-                        isClean.await(10L, TimeUnit.MILLISECONDS);
-                    }
-                    catch (InterruptedException e)
-                    {
-                        throw new AssertionError();
-                    }
-                    Assert.assertEquals(0, isClean.getCount());
-                    Assert.assertEquals(0, allocator.offHeap().owns());
-
-                    // allocate above limit, check we block until "marked blocking"
-                    exec.schedule(markBlocking, 10L, TimeUnit.MILLISECONDS);
-                    allocator.allocate(110, group);
-                    Assert.assertNotNull(barrier.get());
-                    Assert.assertEquals(110, allocator.offHeap().owns());
-                }
-            };
-            exec.submit(run).get();
-        }
-    }
+                isClean.await(10L, TimeUnit.MILLISECONDS);
+            }
+            catch (InterruptedException e)
+            {
+                throw new AssertionError();
+            }
+            Assert.assertEquals(0, isClean.getCount());
+            verifyUsedReclaiming(0, 0);
 
+            // allocate, then set discarding, then allocate some more
+            allocator.allocate(30, group);
+            verifyUsedReclaiming(30, 0);
+            allocator.setDiscarding();
+            Assert.assertFalse(allocator.isLive());
+            verifyUsedReclaiming(30, 30);
+            allocator.allocate(50, group);
+            verifyUsedReclaiming(80, 80);
+
+            // allocate above limit, check we block until "marked blocking"
+            exec.schedule(markBlocking, 10L, TimeUnit.MILLISECONDS);
+            allocator.allocate(30, group);
+            Assert.assertNotNull(barrier.get());
+            verifyUsedReclaiming(110, 110);
+
+            // release everything
+            allocator.setDiscarded();
+            Assert.assertFalse(allocator.isLive());
+            verifyUsedReclaiming(0, 0);
+        };
+        exec.submit(test).get();
+    }
 }
+
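Taken together, the constructor changes mean every pool construction site now supplies a MemtableCleaner instead of a fire-and-forget Runnable, so the pool can bound and track outstanding cleaning tasks. A hedged example of what a call site could look like after this patch; flushLargestMemtable() is assumed here purely for illustration and is not shown in this excerpt, any supplier of CompletableFuture<Boolean> satisfies the parameter:

    // Hypothetical call site: a slab pool whose cleaner flushes the largest
    // memtable and reports asynchronously whether anything was flushed.
    MemtablePool pool = new SlabPool(1L << 30,  // max on-heap bytes (1 GiB)
                                     0,         // no off-heap memory, so slabs allocate on heap
                                     0.33f,     // start cleaning at 33% usage
                                     () -> ColumnFamilyStore.flushLargestMemtable());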