diff --git a/CHANGES.txt b/CHANGES.txt index c732513b5a63..29723d46b5c4 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -3,6 +3,7 @@ * Rate limit validation compactions using compaction_throughput_mb_per_sec (CASSANDRA-16161) * SASI's `max_compaction_flush_memory_in_mb` settings over 100GB revert to default of 1GB (CASSANDRA-16071) Merged from 3.0: + * Prevent unbounded number of pending flushing tasks (CASSANDRA-16261) * Improve empty hint file handling during startup (CASSANDRA-16162) * Allow empty string in collections with COPY FROM in cqlsh (CASSANDRA-16372) * Fix skipping on pre-3.0 created compact storage sstables due to missing primary key liveness (CASSANDRA-16226) diff --git a/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java b/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java index b54fa3fca51f..8e72d91083b1 100644 --- a/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java +++ b/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java @@ -23,6 +23,8 @@ import java.util.concurrent.TimeUnit; +import com.google.common.annotations.VisibleForTesting; + public class InfiniteLoopExecutor { private static final Logger logger = LoggerFactory.getLogger(InfiniteLoopExecutor.class); @@ -81,4 +83,10 @@ public boolean awaitTermination(long time, TimeUnit unit) throws InterruptedExce thread.join(unit.toMillis(time)); return !thread.isAlive(); } + + @VisibleForTesting + public boolean isAlive() + { + return this.thread.isAlive(); + } } diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index c88a0e7c7a12..aa9a4ccec6a5 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -271,8 +271,8 @@ public static Config loadConfig() throws ConfigurationException String loaderClass = System.getProperty(Config.PROPERTY_PREFIX + "config.loader"); ConfigurationLoader loader = loaderClass == null - ? new YamlConfigurationLoader() - : FBUtilities.construct(loaderClass, "configuration loading"); + ? new YamlConfigurationLoader() + : FBUtilities.construct(loaderClass, "configuration loading"); Config config = loader.loadConfig(); if (!hasLoggedConfig) @@ -513,7 +513,7 @@ else if (conf.native_transport_max_frame_size_in_mb >= 2048) if (conf.cdc_total_space_in_mb == 0) { int preferredSize = 4096; - int minSize = 0; + int minSize; try { // use 1/8th of available space. 
See discussion on #10013 and #10199 on the CL, taking half that for CDC @@ -827,7 +827,7 @@ else if (config.listen_address != null) } catch (UnknownHostException e) { - throw new ConfigurationException("Unknown listen_address '" + config.listen_address + "'", false); + throw new ConfigurationException("Unknown listen_address '" + config.listen_address + '\'', false); } if (listenAddress.isAnyLocalAddress()) @@ -847,7 +847,7 @@ else if (config.listen_interface != null) } catch (UnknownHostException e) { - throw new ConfigurationException("Unknown broadcast_address '" + config.broadcast_address + "'", false); + throw new ConfigurationException("Unknown broadcast_address '" + config.broadcast_address + '\'', false); } if (broadcastAddress.isAnyLocalAddress()) @@ -888,7 +888,7 @@ else if (config.rpc_interface != null) } catch (UnknownHostException e) { - throw new ConfigurationException("Unknown broadcast_rpc_address '" + config.broadcast_rpc_address + "'", false); + throw new ConfigurationException("Unknown broadcast_rpc_address '" + config.broadcast_rpc_address + '\'', false); } if (broadcastRpcAddress.isAnyLocalAddress()) @@ -1030,18 +1030,14 @@ public static void applySnitch() EndpointSnitchInfo.create(); localDC = snitch.getDatacenter(FBUtilities.getBroadcastAddress()); - localComparator = new Comparator() - { - public int compare(InetAddress endpoint1, InetAddress endpoint2) - { - boolean local1 = localDC.equals(snitch.getDatacenter(endpoint1)); - boolean local2 = localDC.equals(snitch.getDatacenter(endpoint2)); - if (local1 && !local2) - return -1; - if (local2 && !local1) - return 1; - return 0; - } + localComparator = (endpoint1, endpoint2) -> { + boolean local1 = localDC.equals(snitch.getDatacenter(endpoint1)); + boolean local2 = localDC.equals(snitch.getDatacenter(endpoint2)); + if (local1 && !local2) + return -1; + if (local2 && !local1) + return 1; + return 0; }; } @@ -1395,7 +1391,7 @@ public static String getAllocateTokensForKeyspace() public static Collection tokensFromString(String tokenString) { - List tokens = new ArrayList(); + List tokens = new ArrayList<>(); if (tokenString != null) for (String token : StringUtils.split(tokenString, ',')) tokens.add(token.trim()); @@ -2076,7 +2072,7 @@ public static File getHintsDirectory() public static File getSerializedCachePath(CacheType cacheType, String version, String extension) { String name = cacheType.toString() - + (version == null ? "" : "-" + version + "." + extension); + + (version == null ? "" : '-' + version + '.' + extension); return new File(conf.saved_caches_directory, name); } diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index abdc3f81c3b8..83241e557b99 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -23,6 +23,7 @@ import java.nio.ByteBuffer; import java.nio.file.Files; import java.util.*; +import java.util.Objects; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -136,12 +137,12 @@ public static Directories.DataDirectory[] getInitialDirectories() are finished. By having flushExecutor size the same size as each of the perDiskflushExecutors we make sure we can have that many flushes going at the same time. 
*/ - private static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(), - StageManager.KEEPALIVE, - TimeUnit.SECONDS, - new LinkedBlockingQueue(), - new NamedThreadFactory("MemtableFlushWriter"), - "internal"); + private static final ThreadPoolExecutor flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(), + StageManager.KEEPALIVE, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + new NamedThreadFactory("MemtableFlushWriter"), + "internal"); private static final ExecutorService [] perDiskflushExecutors = new ExecutorService[DatabaseDescriptor.getAllDataFileLocations().length]; static @@ -158,33 +159,33 @@ public static Directories.DataDirectory[] getInitialDirectories() } // post-flush executor is single threaded to provide guarantee that any flush Future on a CF will never return until prior flushes have completed - private static final ExecutorService postFlushExecutor = new JMXEnabledThreadPoolExecutor(1, - StageManager.KEEPALIVE, - TimeUnit.SECONDS, - new LinkedBlockingQueue(), - new NamedThreadFactory("MemtablePostFlush"), - "internal"); - - private static final ExecutorService reclaimExecutor = new JMXEnabledThreadPoolExecutor(1, - StageManager.KEEPALIVE, - TimeUnit.SECONDS, - new LinkedBlockingQueue(), - new NamedThreadFactory("MemtableReclaimMemory"), - "internal"); + private static final ThreadPoolExecutor postFlushExecutor = new JMXEnabledThreadPoolExecutor(1, + StageManager.KEEPALIVE, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + new NamedThreadFactory("MemtablePostFlush"), + "internal"); + + private static final ThreadPoolExecutor reclaimExecutor = new JMXEnabledThreadPoolExecutor(1, + StageManager.KEEPALIVE, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + new NamedThreadFactory("MemtableReclaimMemory"), + "internal"); private static final String[] COUNTER_NAMES = new String[]{"raw", "count", "error", "string"}; private static final String[] COUNTER_DESCS = new String[] - { "partition key in raw hex bytes", - "value of this partition for given sampler", - "value is within the error bounds plus or minus of this", - "the partition key turned into a human readable format" }; + { "partition key in raw hex bytes", + "value of this partition for given sampler", + "value is within the error bounds plus or minus of this", + "the partition key turned into a human readable format" }; private static final CompositeType COUNTER_COMPOSITE_TYPE; private static final TabularType COUNTER_TYPE; private static final String[] SAMPLER_NAMES = new String[]{"cardinality", "partitions"}; private static final String[] SAMPLER_DESCS = new String[] - { "cardinality of partitions", - "list of counter results" }; + { "cardinality of partitions", + "list of counter results" }; private static final String SAMPLING_RESULTS_NAME = "SAMPLING_RESULTS"; private static final CompositeType SAMPLING_RESULT; @@ -337,14 +338,10 @@ protected void runMayThrow() public static Runnable getBackgroundCompactionTaskSubmitter() { - return new Runnable() - { - public void run() - { - for (Keyspace keyspace : Keyspace.all()) - for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores()) - CompactionManager.instance.submitBackground(cfs); - } + return () -> { + for (Keyspace keyspace : Keyspace.all()) + for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores()) + CompactionManager.instance.submitBackground(cfs); }; } @@ -399,16 +396,16 @@ public void setCompressionParameters(Map opts) @VisibleForTesting public 
ColumnFamilyStore(Keyspace keyspace, - String columnFamilyName, - int generation, - CFMetaData metadata, - Directories directories, - boolean loadSSTables, - boolean registerBookeeping, - boolean offline) + String columnFamilyName, + int generation, + CFMetaData metadata, + Directories directories, + boolean loadSSTables, + boolean registerBookeeping, + boolean offline) { assert directories != null; - assert metadata != null : "null metadata for " + keyspace + ":" + columnFamilyName; + assert metadata != null : "null metadata for " + keyspace + ':' + columnFamilyName; this.keyspace = keyspace; this.metadata = metadata; @@ -475,8 +472,8 @@ public ColumnFamilyStore(Keyspace keyspace, { // register the mbean mbeanName = String.format("org.apache.cassandra.db:type=%s,keyspace=%s,table=%s", - isIndex() ? "IndexTables" : "Tables", - keyspace.getName(), name); + isIndex() ? "IndexTables" : "Tables", + keyspace.getName(), name); oldMBeanName = String.format("org.apache.cassandra.db:type=%s,keyspace=%s,columnfamily=%s", isIndex() ? "IndexColumnFamilies" : "ColumnFamilies", keyspace.getName(), name); @@ -493,24 +490,20 @@ public ColumnFamilyStore(Keyspace keyspace, throw new RuntimeException(e); } logger.trace("retryPolicy for {} is {}", name, this.metadata.params.speculativeRetry); - latencyCalculator = ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(new Runnable() - { - public void run() + latencyCalculator = ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(() -> { + SpeculativeRetryParam retryPolicy = ColumnFamilyStore.this.metadata.params.speculativeRetry; + switch (retryPolicy.kind()) { - SpeculativeRetryParam retryPolicy = ColumnFamilyStore.this.metadata.params.speculativeRetry; - switch (retryPolicy.kind()) - { - case PERCENTILE: - // get percentile in nanos - sampleLatencyNanos = (long) (metric.coordinatorReadLatency.getSnapshot().getValue(retryPolicy.threshold())); - break; - case CUSTOM: - sampleLatencyNanos = (long) retryPolicy.threshold(); - break; - default: - sampleLatencyNanos = Long.MAX_VALUE; - break; - } + case PERCENTILE: + // get percentile in nanos + sampleLatencyNanos = (long) (metric.coordinatorReadLatency.getSnapshot().getValue(retryPolicy.threshold())); + break; + case CUSTOM: + sampleLatencyNanos = (long) retryPolicy.threshold(); + break; + default: + sampleLatencyNanos = Long.MAX_VALUE; + break; } }, DatabaseDescriptor.getReadRpcTimeout(), DatabaseDescriptor.getReadRpcTimeout(), TimeUnit.MILLISECONDS); } @@ -627,7 +620,7 @@ public static synchronized ColumnFamilyStore createColumnFamilyStore(Keyspace ke { // get the max generation number, to prevent generation conflicts Directories.SSTableLister lister = directories.sstableLister(Directories.OnTxnErr.IGNORE).includeBackups(true); - List generations = new ArrayList(); + List generations = new ArrayList<>(); for (Map.Entry> entry : lister.list().entrySet()) { Descriptor desc = entry.getKey(); @@ -651,7 +644,7 @@ public static void scrubDataDirectories(CFMetaData metadata) throws StartupExce Directories directories = new Directories(metadata, initialDirectories); Set cleanedDirectories = new HashSet<>(); - // clear ephemeral snapshots that were not properly cleared last session (CASSANDRA-7357) + // clear ephemeral snapshots that were not properly cleared last session (CASSANDRA-7357) clearEphemeralSnapshots(directories); directories.removeTemporaryDirectories(); @@ -694,13 +687,13 @@ public static void scrubDataDirectories(CFMetaData metadata) throws StartupExce } // cleanup incomplete saved caches - Pattern 
tmpCacheFilePattern = Pattern.compile(metadata.ksName + "-" + metadata.cfName + "-(Key|Row)Cache.*\\.tmp$"); + Pattern tmpCacheFilePattern = Pattern.compile(metadata.ksName + '-' + metadata.cfName + "-(Key|Row)Cache.*\\.tmp$"); File dir = new File(DatabaseDescriptor.getSavedCachesLocation()); if (dir.exists()) { assert dir.isDirectory(); - for (File file : dir.listFiles()) + for (File file : Objects.requireNonNull(dir.listFiles())) if (tmpCacheFilePattern.matcher(file.getName()).matches()) if (!file.delete()) logger.warn("could not delete {}", file.getAbsolutePath()); @@ -723,7 +716,7 @@ public static void scrubDataDirectories(CFMetaData metadata) throws StartupExce */ public static void loadNewSSTables(String ksName, String cfName) { - /** ks/cf existence checks will be done by open and getCFS methods for us */ + /* ks/cf existence checks will be done by open and getCFS methods for us */ Keyspace keyspace = Keyspace.open(ksName); keyspace.getColumnFamilyStore(cfName).loadNewSSTables(); } @@ -750,8 +743,8 @@ public synchronized void loadNewSSTables() if (!descriptor.isCompatible()) throw new RuntimeException(String.format("Can't open incompatible SSTable! Current version %s, found file: %s", - descriptor.getFormat().getLatestVersion(), - descriptor)); + descriptor.getFormat().getLatestVersion(), + descriptor)); // force foreign sstables to level 0 try @@ -836,7 +829,7 @@ public static void rebuildSecondaryIndex(String ksName, String cfName, String... { ColumnFamilyStore cfs = Keyspace.open(ksName).getColumnFamilyStore(cfName); - Set indexes = new HashSet(Arrays.asList(idxNames)); + Set indexes = new HashSet<>(Arrays.asList(idxNames)); Iterable sstables = cfs.getSSTables(SSTableSet.CANONICAL); try (Refs refs = Refs.ref(sstables)) @@ -891,6 +884,7 @@ public ListenableFuture switchMemtableIfCurrent(Memtable memt if (data.getView().getCurrentMemtable() == memtable) return switchMemtable(); } + logger.debug("Memtable is no longer current, returning future that completes when current flushing operation completes"); return waitForFlushes(); } @@ -1065,11 +1059,13 @@ private final class Flush implements Runnable private Flush(boolean truncate) { + if (logger.isTraceEnabled()) + logger.trace("Creating flush task {}@{}", hashCode(), name); // if true, we won't flush, we'll just wait for any outstanding writes, switch the memtable, and discard this.truncate = truncate; metric.pendingFlushes.inc(); - /** + /* * To ensure correctness of switch without blocking writes, run() needs to wait for all write operations * started prior to the switch to complete. We do this by creating a Barrier on the writeOrdering * that all write operations register themselves with, and assigning this barrier to the memtables, @@ -1078,7 +1074,7 @@ private Flush(boolean truncate) * In doing so it also tells the write operations to update the commitLogUpperBound of the memtable, so * that we know the CL position we are dirty to, which can be marked clean when we complete. 
*/ - writeBarrier = keyspace.writeOrder.newBarrier(); + writeBarrier = Keyspace.writeOrder.newBarrier(); // submit flushes for the memtable for any indexed sub-cfses, and our own AtomicReference<CommitLogPosition> commitLogUpperBound = new AtomicReference<>(); @@ -1107,11 +1103,19 @@ private Flush(boolean truncate) public void run() { + if (logger.isTraceEnabled()) + logger.trace("Flush task {}@{} starts executing, waiting on barrier", hashCode(), name); + + long start = System.nanoTime(); + // mark writes older than the barrier as blocking progress, permitting them to exceed our memory limit // if they are stuck waiting on it, then wait for them all to complete writeBarrier.markBlocking(); writeBarrier.await(); + if (logger.isTraceEnabled()) + logger.trace("Flush task {}@{} waited {} ms at the barrier", hashCode(), name, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)); + // mark all memtables as flushing, removing them from the live memtable list for (Memtable memtable : memtables) memtable.cfs.data.markFlushing(memtable); @@ -1130,12 +1134,22 @@ public void run() JVMStabilityInspector.inspectThrowable(t); postFlush.flushFailure = t; } + + if (logger.isTraceEnabled()) + logger.trace("Flush task {}@{} signaling post flush task", hashCode(), name); + // signal the post-flush we've done our work postFlush.latch.countDown(); + + if (logger.isTraceEnabled()) + logger.trace("Flush task {}@{} finished", hashCode(), name); } public Collection<SSTableReader> flushMemtable(Memtable memtable, boolean flushNonCf2i) { + if (logger.isTraceEnabled()) + logger.trace("Flush task {}@{} flushing memtable {}", hashCode(), name, memtable); + if (memtable.isClean() || truncate) { memtable.cfs.replaceFlushed(memtable, Collections.emptyList());
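The barrier handshake described in the comment above is easier to follow in isolation. Below is a minimal sketch of the protocol, assuming only the behaviour of `org.apache.cassandra.utils.concurrent.OpOrder` (`start()`, `newBarrier()`, `issue()`, `markBlocking()`, `await()`); the class and method names are illustrative, not part of the patch:

```java
import org.apache.cassandra.utils.concurrent.OpOrder;

public class WriteBarrierSketch
{
    private static final OpOrder writeOrder = new OpOrder();

    // A write registers itself with the ordering for the duration of the mutation.
    static void write(Runnable mutation)
    {
        try (OpOrder.Group opGroup = writeOrder.start())
        {
            mutation.run(); // this write is "before" any barrier issued after start()
        }
    }

    // The flush path: issue a barrier, then wait for all pre-barrier writes.
    static void awaitInFlightWrites()
    {
        OpOrder.Barrier barrier = writeOrder.newBarrier();
        barrier.issue();        // writes started after this point are not waited on
        barrier.markBlocking(); // lets stuck pre-barrier writes overshoot memory limits
        barrier.await();        // returns once every pre-barrier write has completed
    }
}
```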
@@ -1278,58 +1292,77 @@ private static void setCommitLogUpperBound(AtomicReference<CommitLogPosition> co * Finds the largest memtable, as a percentage of *either* on- or off-heap memory limits, and immediately * queues it for flushing. If the memtable selected is flushed before this completes, no work is done. */ - public static class FlushLargestColumnFamily implements Runnable + public static CompletableFuture<Boolean> flushLargestMemtable() { - public void run() + float largestRatio = 0f; + Memtable largest = null; + float liveOnHeap = 0, liveOffHeap = 0; + for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) { - float largestRatio = 0f; - Memtable largest = null; - float liveOnHeap = 0, liveOffHeap = 0; - for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) - { - // we take a reference to the current main memtable for the CF prior to snapping its ownership ratios - // to ensure we have some ordering guarantee for performing the switchMemtableIf(), i.e. we will only - // swap if the memtables we are measuring here haven't already been swapped by the time we try to swap them - Memtable current = cfs.getTracker().getView().getCurrentMemtable(); - - // find the total ownership ratio for the memtable and all SecondaryIndexes owned by this CF, - // both on- and off-heap, and select the largest of the two ratios to weight this CF - float onHeap = 0f, offHeap = 0f; - onHeap += current.getAllocator().onHeap().ownershipRatio(); - offHeap += current.getAllocator().offHeap().ownershipRatio(); - - for (ColumnFamilyStore indexCfs : cfs.indexManager.getAllIndexColumnFamilyStores()) - { - MemtableAllocator allocator = indexCfs.getTracker().getView().getCurrentMemtable().getAllocator(); - onHeap += allocator.onHeap().ownershipRatio(); - offHeap += allocator.offHeap().ownershipRatio(); - } + // we take a reference to the current main memtable for the CF prior to snapping its ownership ratios + // to ensure we have some ordering guarantee for performing the switchMemtableIf(), i.e. we will only + // swap if the memtables we are measuring here haven't already been swapped by the time we try to swap them + Memtable current = cfs.getTracker().getView().getCurrentMemtable(); - float ratio = Math.max(onHeap, offHeap); - if (ratio > largestRatio) - { - largest = current; - largestRatio = ratio; - } + // find the total ownership ratio for the memtable and all SecondaryIndexes owned by this CF, + // both on- and off-heap, and select the largest of the two ratios to weight this CF + float onHeap = 0f, offHeap = 0f; + onHeap += current.getAllocator().onHeap().ownershipRatio(); + offHeap += current.getAllocator().offHeap().ownershipRatio(); - liveOnHeap += onHeap; - liveOffHeap += offHeap; + for (ColumnFamilyStore indexCfs : cfs.indexManager.getAllIndexColumnFamilyStores()) + { + MemtableAllocator allocator = indexCfs.getTracker().getView().getCurrentMemtable().getAllocator(); + onHeap += allocator.onHeap().ownershipRatio(); + offHeap += allocator.offHeap().ownershipRatio(); } - if (largest != null) + float ratio = Math.max(onHeap, offHeap); + if (ratio > largestRatio) { - float usedOnHeap = Memtable.MEMORY_POOL.onHeap.usedRatio(); - float usedOffHeap = Memtable.MEMORY_POOL.offHeap.usedRatio(); - float flushingOnHeap = Memtable.MEMORY_POOL.onHeap.reclaimingRatio(); - float flushingOffHeap = Memtable.MEMORY_POOL.offHeap.reclaimingRatio(); - float thisOnHeap = largest.getAllocator().onHeap().ownershipRatio(); - float thisOffHeap = largest.getAllocator().offHeap().ownershipRatio(); - logger.debug("Flushing largest {} to free up room. Used total: {}, live: {}, flushing: {}, this: {}", - largest.cfs, ratio(usedOnHeap, usedOffHeap), ratio(liveOnHeap, liveOffHeap), - ratio(flushingOnHeap, flushingOffHeap), ratio(thisOnHeap, thisOffHeap)); - largest.cfs.switchMemtableIfCurrent(largest); + largest = current; + largestRatio = ratio; } + + liveOnHeap += onHeap; + liveOffHeap += offHeap; + } + + CompletableFuture<Boolean> returnFuture = new CompletableFuture<>(); + + if (largest != null) + { + float usedOnHeap = Memtable.MEMORY_POOL.onHeap.usedRatio(); + float usedOffHeap = Memtable.MEMORY_POOL.offHeap.usedRatio(); + float flushingOnHeap = Memtable.MEMORY_POOL.onHeap.reclaimingRatio(); + float flushingOffHeap = Memtable.MEMORY_POOL.offHeap.reclaimingRatio(); + float thisOnHeap = largest.getAllocator().onHeap().ownershipRatio(); + float thisOffHeap = largest.getAllocator().offHeap().ownershipRatio(); + logger.debug("Flushing largest {} to free up room.
Used total: {}, live: {}, flushing: {}, this: {}", + largest.cfs, ratio(usedOnHeap, usedOffHeap), ratio(liveOnHeap, liveOffHeap), + ratio(flushingOnHeap, flushingOffHeap), ratio(thisOnHeap, thisOffHeap)); + + ListenableFuture flushFuture = largest.cfs.switchMemtableIfCurrent(largest); + flushFuture.addListener(() -> { + try + { + flushFuture.get(); + returnFuture.complete(true); + } + catch (Throwable t) + { + returnFuture.completeExceptionally(t); + } + }, MoreExecutors.directExecutor()); + } + else + { + logger.debug("Flushing of largest memtable, not done, no memtable found"); + + returnFuture.complete(false); } + + return returnFuture; } private static String ratio(float onHeap, float offHeap) @@ -1659,7 +1692,7 @@ public boolean isFilterFullyCoveredBy(ClusteringIndexFilter filter, DataLimits l nowInSec, filter.selectsAllPartition(), metadata.enforceStrictLiveness())) - || filter.isFullyCoveredBy(cached); + || filter.isFullyCoveredBy(cached); } public int gcBefore(int nowInSec) @@ -1697,7 +1730,7 @@ else if (System.nanoTime() - failingSince > TimeUnit.MILLISECONDS.toNanos(100)) public ViewFragment select(Function> filter) { View view = data.getView(); - List sstables = Lists.newArrayList(filter.apply(view)); + List sstables = Lists.newArrayList(Objects.requireNonNull(filter.apply(view))); return new ViewFragment(sstables, view.getAllMemtables()); } @@ -1733,7 +1766,7 @@ public void beginLocalSampling(String sampler, int capacity) public CompositeData finishLocalSampling(String sampler, int count) throws OpenDataException { SamplerResult samplerResults = metric.samplers.get(Sampler.valueOf(sampler)) - .finishSampling(count); + .finishSampling(count); TabularDataSupport result = new TabularDataSupport(COUNTER_TYPE); for (Counter counter : samplerResults.topK) { @@ -1747,7 +1780,7 @@ public CompositeData finishLocalSampling(String sampler, int count) throws OpenD metadata.getKeyValidator().getString(key) })); // string } return new CompositeDataSupport(SAMPLING_RESULT, SAMPLER_NAMES, new Object[]{ - samplerResults.cardinality, result}); + samplerResults.cardinality, result}); } public boolean isCompactionDiskSpaceCheckEnabled() @@ -2101,13 +2134,13 @@ public void putCachedCounter(ByteBuffer partitionKey, Clustering clustering, Col CacheService.instance.counterCache.put(CounterCacheKey.create(metadata.ksAndCFName, partitionKey, clustering, column, path), clockAndCount); } - public void forceMajorCompaction() throws InterruptedException, ExecutionException + public void forceMajorCompaction() { forceMajorCompaction(false); } - public void forceMajorCompaction(boolean splitOutput) throws InterruptedException, ExecutionException - { + public void forceMajorCompaction(boolean splitOutput) + { CompactionManager.instance.performMaximal(this, splitOutput); } @@ -2118,7 +2151,7 @@ public void forceCompactionForTokenRange(Collection> tokenRanges) t public static Iterable all() { - List> stores = new ArrayList>(Schema.instance.getKeyspaces().size()); + List> stores = new ArrayList<>(Schema.instance.getKeyspaces().size()); for (Keyspace keyspace : Keyspace.all()) { stores.add(keyspace.getColumnFamilyStores()); @@ -2161,13 +2194,9 @@ public void clearUnsafe() { for (final ColumnFamilyStore cfs : concatWithIndexes()) { - cfs.runWithCompactionsDisabled(new Callable() - { - public Void call() - { - cfs.data.reset(new Memtable(new AtomicReference<>(CommitLogPosition.NONE), cfs)); - return null; - } + cfs.runWithCompactionsDisabled((Callable) () -> { + cfs.data.reset(new Memtable(new 
AtomicReference<>(CommitLogPosition.NONE), cfs)); + return null; }, true, false); } } @@ -2220,25 +2249,21 @@ public void truncateBlocking() now = Math.max(now, sstable.maxDataAge); truncatedAt = now; - Runnable truncateRunnable = new Runnable() - { - public void run() - { - logger.debug("Discarding sstable data for truncated CF + indexes"); - data.notifyTruncated(truncatedAt); + Runnable truncateRunnable = () -> { + logger.debug("Discarding sstable data for truncated CF + indexes"); + data.notifyTruncated(truncatedAt); - if (DatabaseDescriptor.isAutoSnapshot()) - snapshot(Keyspace.getTimestampedSnapshotNameWithPrefix(name, SNAPSHOT_TRUNCATE_PREFIX)); + if (DatabaseDescriptor.isAutoSnapshot()) + snapshot(Keyspace.getTimestampedSnapshotNameWithPrefix(name, SNAPSHOT_TRUNCATE_PREFIX)); - discardSSTables(truncatedAt); + discardSSTables(truncatedAt); - indexManager.truncateAllIndexesBlocking(truncatedAt); - viewManager.truncateBlocking(replayAfter, truncatedAt); + indexManager.truncateAllIndexesBlocking(truncatedAt); + viewManager.truncateBlocking(replayAfter, truncatedAt); - SystemKeyspace.saveTruncationRecord(ColumnFamilyStore.this, truncatedAt, replayAfter); - logger.trace("cleaning out row cache"); - invalidateCaches(); - } + SystemKeyspace.saveTruncationRecord(ColumnFamilyStore.this, truncatedAt, replayAfter); + logger.trace("cleaning out row cache"); + invalidateCaches(); }; runWithCompactionsDisabled(Executors.callable(truncateRunnable), true, true); @@ -2340,17 +2365,13 @@ private static Throwable resumeAll(Throwable accumulate, Iterable callable = new Callable() - { - public LifecycleTransaction call() - { - assert data.getCompacting().isEmpty() : data.getCompacting(); - Iterable sstables = getLiveSSTables(); - sstables = AbstractCompactionStrategy.filterSuspectSSTables(sstables); - LifecycleTransaction modifier = data.tryModify(sstables, operationType); - assert modifier != null: "something marked things compacting while compactions are disabled"; - return modifier; - } + Callable callable = () -> { + assert data.getCompacting().isEmpty() : data.getCompacting(); + Iterable sstables = getLiveSSTables(); + sstables = AbstractCompactionStrategy.filterSuspectSSTables(sstables); + LifecycleTransaction modifier = data.tryModify(sstables, operationType); + assert modifier != null: "something marked things compacting while compactions are disabled"; + return modifier; }; return runWithCompactionsDisabled(callable, false, false); @@ -2473,7 +2494,7 @@ private void validateCompactionThresholds(int minThreshold, int maxThreshold) if (maxThreshold == 0 || minThreshold == 0) throw new RuntimeException("Disabling compaction by setting min_compaction_threshold or max_compaction_threshold to 0 " + - "is deprecated, set the compaction strategy option 'enabled' to 'false' instead or use the nodetool command 'disableautocompaction'."); + "is deprecated, set the compaction strategy option 'enabled' to 'false' instead or use the nodetool command 'disableautocompaction'."); } // End JMX get/set. 
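The `ListenableFuture` to `CompletableFuture` bridge used by `flushLargestMemtable()` above is a self-contained pattern; here is a sketch that assumes only the documented Guava API (`addListener`, `MoreExecutors.directExecutor()`) — the `FutureBridge` name is hypothetical:

```java
import java.util.concurrent.CompletableFuture;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;

public final class FutureBridge
{
    public static <T> CompletableFuture<T> toCompletable(ListenableFuture<T> future)
    {
        CompletableFuture<T> result = new CompletableFuture<>();
        future.addListener(() -> {
            try
            {
                // get() does not block here: the listener only runs after completion
                result.complete(future.get());
            }
            catch (Throwable t)
            {
                result.completeExceptionally(t);
            }
        }, MoreExecutors.directExecutor());
        return result;
    }
}
```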
@@ -2537,7 +2558,7 @@ public Iterable concatWithIndexes() public List getBuiltIndexes() { - return indexManager.getBuiltIndexNames(); + return indexManager.getBuiltIndexNames(); } public int getUnleveledSSTables() @@ -2699,7 +2720,7 @@ public static ColumnFamilyStore getIfExists(String ksName, String cfName) public static TableMetrics metricsFor(UUID tableId) { - return getIfExists(tableId).metric; + return Objects.requireNonNull(getIfExists(tableId)).metric; } public DiskBoundaries getDiskBoundaries() @@ -2711,4 +2732,4 @@ public void invalidateDiskBoundaries() { diskBoundaryManager.invalidate(); } -} +} \ No newline at end of file diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java index 5af789e5135e..ae8b8d3a74a6 100644 --- a/src/java/org/apache/cassandra/db/Memtable.java +++ b/src/java/org/apache/cassandra/db/Memtable.java @@ -56,6 +56,7 @@ import org.apache.cassandra.utils.concurrent.OpOrder; import org.apache.cassandra.utils.memory.HeapPool; import org.apache.cassandra.utils.memory.MemtableAllocator; +import org.apache.cassandra.utils.memory.MemtableCleaner; import org.apache.cassandra.utils.memory.MemtablePool; import org.apache.cassandra.utils.memory.NativePool; import org.apache.cassandra.utils.memory.SlabPool; @@ -70,20 +71,22 @@ private static MemtablePool createMemtableAllocatorPool() { long heapLimit = DatabaseDescriptor.getMemtableHeapSpaceInMb() << 20; long offHeapLimit = DatabaseDescriptor.getMemtableOffheapSpaceInMb() << 20; + final float cleaningThreshold = DatabaseDescriptor.getMemtableCleanupThreshold(); + final MemtableCleaner cleaner = ColumnFamilyStore::flushLargestMemtable; switch (DatabaseDescriptor.getMemtableAllocationType()) { case unslabbed_heap_buffers: - return new HeapPool(heapLimit, DatabaseDescriptor.getMemtableCleanupThreshold(), new ColumnFamilyStore.FlushLargestColumnFamily()); + return new HeapPool(heapLimit, cleaningThreshold, cleaner); case heap_buffers: - return new SlabPool(heapLimit, 0, DatabaseDescriptor.getMemtableCleanupThreshold(), new ColumnFamilyStore.FlushLargestColumnFamily()); + return new SlabPool(heapLimit, 0, cleaningThreshold, cleaner); case offheap_buffers: if (!FileUtils.isCleanerAvailable) { throw new IllegalStateException("Could not free direct byte buffer: offheap_buffers is not a safe memtable_allocation_type without this ability, please adjust your config. This feature is only guaranteed to work on an Oracle JVM. 
Refusing to start."); } - return new SlabPool(heapLimit, offHeapLimit, DatabaseDescriptor.getMemtableCleanupThreshold(), new ColumnFamilyStore.FlushLargestColumnFamily()); + return new SlabPool(heapLimit, offHeapLimit, cleaningThreshold, cleaner); case offheap_objects: - return new NativePool(heapLimit, offHeapLimit, DatabaseDescriptor.getMemtableCleanupThreshold(), new ColumnFamilyStore.FlushLargestColumnFamily()); + return new NativePool(heapLimit, offHeapLimit, cleaningThreshold, cleaner); default: throw new AssertionError(); } @@ -395,7 +398,7 @@ public long getMinTimestamp() @VisibleForTesting public void makeUnflushable() { - liveDataSize.addAndGet(1L * 1024 * 1024 * 1024 * 1024 * 1024); + liveDataSize.addAndGet((long) 1024 * 1024 * 1024 * 1024 * 1024); } class FlushRunnable implements Callable diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index efe082052d9f..0543173cf955 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -580,7 +580,7 @@ public void onClose() liveRows, tombstones, (warnTombstones ? " (see tombstone_warn_threshold)" : "")); } - }; + } return Transformation.apply(iter, new MetricRecording()); } diff --git a/src/java/org/apache/cassandra/utils/memory/HeapPool.java b/src/java/org/apache/cassandra/utils/memory/HeapPool.java index abcc2414fb8e..6371bdaf7dac 100644 --- a/src/java/org/apache/cassandra/utils/memory/HeapPool.java +++ b/src/java/org/apache/cassandra/utils/memory/HeapPool.java @@ -24,7 +24,7 @@ public class HeapPool extends MemtablePool { - public HeapPool(long maxOnHeapMemory, float cleanupThreshold, Runnable cleaner) + public HeapPool(long maxOnHeapMemory, float cleanupThreshold, MemtableCleaner cleaner) { super(maxOnHeapMemory, 0, cleanupThreshold, cleaner); } diff --git a/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java b/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java index fc5aec30b579..62ebe65fed7d 100644 --- a/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java +++ b/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java @@ -18,18 +18,25 @@ */ package org.apache.cassandra.utils.memory; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.utils.NoSpamLogger; import org.apache.cassandra.utils.concurrent.OpOrder; import org.apache.cassandra.utils.concurrent.WaitQueue; public abstract class MemtableAllocator { + private static final Logger logger = LoggerFactory.getLogger(MemtableAllocator.class); + private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 5, TimeUnit.SECONDS); + private final SubAllocator onHeap; private final SubAllocator offHeap; - volatile LifeCycle state = LifeCycle.LIVE; enum LifeCycle { @@ -78,10 +85,8 @@ public SubAllocator offHeap() */ public void setDiscarding() { - state = state.transition(LifeCycle.DISCARDING); - // mark the memory owned by this allocator as reclaiming - onHeap.markAllReclaiming(); - offHeap.markAllReclaiming(); + onHeap.setDiscarding(); + offHeap.setDiscarding(); } /** @@ -90,15 +95,13 @@ public void setDiscarding() */ public void setDiscarded() { - state = state.transition(LifeCycle.DISCARDED); - // release any memory owned by this allocator; automatically signals waiters - 
onHeap.releaseAll(); - offHeap.releaseAll(); + onHeap.setDiscarded(); + offHeap.setDiscarded(); } public boolean isLive() { - return state == LifeCycle.LIVE; + return onHeap.state == LifeCycle.LIVE || offHeap.state == LifeCycle.LIVE; } /** Mark the BB as unused, permitting it to be reclaimed */ @@ -107,6 +110,9 @@ public static final class SubAllocator // the tracker we are owning memory from private final MemtablePool.SubPool parent; + // the state of the memtable + private volatile LifeCycle state; + // the amount of memory/resource owned by this object private volatile long owns; // the amount of memory we are reporting to collect; this may be inaccurate, but is close @@ -116,17 +122,44 @@ public static final class SubAllocator SubAllocator(MemtablePool.SubPool parent) { this.parent = parent; + this.state = LifeCycle.LIVE; + } + + /** + * Mark this allocator reclaiming; this will permit any outstanding allocations to temporarily + * overshoot the maximum memory limit so that flushing can begin immediately + */ + void setDiscarding() + { + state = state.transition(LifeCycle.DISCARDING); + // mark the memory owned by this allocator as reclaiming + updateReclaiming(); } - // should only be called once we know we will never allocate to the object again. - // currently no corroboration/enforcement of this is performed. + /** + * Indicate the memory and resources owned by this allocator are no longer referenced, + * and can be reclaimed/reused. + */ + void setDiscarded() + { + state = state.transition(LifeCycle.DISCARDED); + // release any memory owned by this allocator; automatically signals waiters + releaseAll(); + } + + /** + * Should only be called once we know we will never allocate to the object again. + * currently no corroboration/enforcement of this is performed. + */ void releaseAll() { parent.released(ownsUpdater.getAndSet(this, 0)); parent.reclaimed(reclaimingUpdater.getAndSet(this, 0)); } - // like allocate, but permits allocations to be negative + /** + * Like allocate, but permits allocations to be negative. + */ public void adjust(long size, OpOrder.Group opGroup) { if (size <= 0) @@ -168,28 +201,71 @@ public void allocate(long size, OpOrder.Group opGroup) } } - // retroactively mark an amount allocated and acquired in the tracker, and owned by us + /** + * Retroactively mark an amount allocated and acquired in the tracker, and owned by us. If the state is discarding, + * then also update reclaiming since the flush operation is waiting at the barrier for in-flight writes, + * and it will flush this memory too. + */ private void allocated(long size) { parent.allocated(size); ownsUpdater.addAndGet(this, size); + + if (state == LifeCycle.DISCARDING) + { + noSpamLogger.info("Allocated {} bytes whilst discarding", size); + updateReclaiming(); + } } - // retroactively mark an amount acquired in the tracker, and owned by us + /** + * Retroactively mark an amount acquired in the tracker, and owned by us. If the state is discarding, + * then also update reclaiming since the flush operation is waiting at the barrier for in-flight writes, + * and it will flush this memory too. + */ private void acquired(long size) { - parent.acquired(size); + parent.acquired(); ownsUpdater.addAndGet(this, size); + + if (state == LifeCycle.DISCARDING) + { + noSpamLogger.info("Acquired {} bytes whilst discarding", size); + updateReclaiming(); + } } + /** + * If the state is still live, then we update the memory we own here and in the parent. 
+ * + * However, if the state is not live, we do not update it because we would have to update + * reclaiming too, and it could cause problems for the memtable cleaner algorithm if reclaiming + * decreased. If the memtable is flushing, soon enough {@link #releaseAll()} will be called. + * + * @param size the size that was released + */ void released(long size) { - parent.released(size); - ownsUpdater.addAndGet(this, -size); + if (state == LifeCycle.LIVE) + { + parent.released(size); + ownsUpdater.addAndGet(this, -size); + } + else + { + noSpamLogger.info("Tried to release {} bytes whilst discarding", size); + } } - // mark everything we currently own as reclaiming, both here and in our parent - void markAllReclaiming() + /** + * Mark what we currently own as reclaiming, both here and in our parent. + * This method is called for the first time when the memtable is scheduled for flushing, + * in which case reclaiming will be zero and we mark everything that we own as reclaiming. + * Afterwards, if there are in-flight writes that have not completed yet, we also mark any + * more memory that is allocated by these writes as reclaiming, since the memtable is waiting + * on the barrier for these writes to complete before it can actually start flushing data. + */ void updateReclaiming() { while (true) { @@ -208,6 +284,11 @@ public long owns() return owns; } + public long getReclaiming() + { + return reclaiming; + } + public float ownershipRatio() { float r = owns / (float) parent.limit; diff --git a/src/java/org/apache/cassandra/utils/memory/MemtableCleaner.java b/src/java/org/apache/cassandra/utils/memory/MemtableCleaner.java new file mode 100644 index 000000000000..d2cb9c552a42 --- /dev/null +++ b/src/java/org/apache/cassandra/utils/memory/MemtableCleaner.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.utils.memory; + +import java.util.concurrent.CompletableFuture; + +/** + * The cleaner is used by {@link MemtableCleanerThread} in order to reclaim space from memtables, normally + * by flushing the largest memtable. + */ +public interface MemtableCleaner +{ + /** + * This is a function that schedules a cleaning task, normally flushing of the largest memtable. + * The future will complete once the operation has completed and it will have a value set to true if + * the cleaner was able to execute the cleaning operation or if another thread concurrently executed + * the same clean operation. If no operation was even attempted, for example because no memtable was + * found, then the value will be false. + * + * The future will complete with an error if the cleaning operation encounters an error. + * + */ + CompletableFuture<Boolean> clean(); +} \ No newline at end of file
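To make the contract concrete, here is a hypothetical in-memory `MemtableCleaner` (not part of the patch) that completes with `true` when it performed work, `false` when nothing was attempted, and exceptionally on failure:

```java
package org.apache.cassandra.utils.memory;

import java.util.concurrent.CompletableFuture;

// Illustrative only: a trivial cleaner honouring the MemtableCleaner contract.
public class ImmediateCleaner implements MemtableCleaner
{
    private volatile boolean hasWork = true;

    @Override
    public CompletableFuture<Boolean> clean()
    {
        if (!hasWork)
            return CompletableFuture.completedFuture(false); // nothing was attempted

        hasWork = false; // pretend the single pending flush completed synchronously
        return CompletableFuture.completedFuture(true);
    }
}
```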
diff --git a/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java b/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java index b905d2cb0d3f..f6fccc66faf7 100644 --- a/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java +++ b/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java @@ -18,6 +18,12 @@ */ package org.apache.cassandra.utils.memory; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.cassandra.concurrent.InfiniteLoopExecutor; import org.apache.cassandra.utils.concurrent.WaitQueue; @@ -27,54 +33,84 @@ */ public class MemtableCleanerThread<P extends MemtablePool> extends InfiniteLoopExecutor { - private static class Clean<P extends MemtablePool>
implements InterruptibleRunnable + private static final Logger logger = LoggerFactory.getLogger(MemtableCleanerThread.class); + + public static class Clean<P extends MemtablePool>
implements InterruptibleRunnable { + /** This is incremented when a cleaner is invoked and decremented when a cleaner has completed */ + final AtomicInteger numPendingTasks = new AtomicInteger(0); + /** The pool we're cleaning */ final P pool; /** should ensure that at least some memory has been marked reclaiming after completion */ - final Runnable cleaner; + final MemtableCleaner cleaner; /** signalled whenever needsCleaning() may return true */ final WaitQueue wait = new WaitQueue(); - private Clean(P pool, Runnable cleaner) + private Clean(P pool, MemtableCleaner cleaner) { this.pool = pool; this.cleaner = cleaner; } - boolean needsCleaning() + /** Return the number of pending tasks */ + public int numPendingTasks() { - return pool.onHeap.needsCleaning() || pool.offHeap.needsCleaning(); + return numPendingTasks.get(); } @Override public void run() throws InterruptedException { - if (needsCleaning()) - { - cleaner.run(); - } - else + if (!pool.needsCleaning()) { final WaitQueue.Signal signal = wait.register(); - if (!needsCleaning()) + if (!pool.needsCleaning()) signal.await(); else signal.cancel(); } + else + { + int numPendingTasks = this.numPendingTasks.incrementAndGet(); + + if (logger.isTraceEnabled()) + logger.trace("Invoking cleaner with {} tasks pending", numPendingTasks); + + cleaner.clean().handle(this::apply); + } + } + + private Boolean apply(Boolean res, Throwable err) + { + final int tasks = numPendingTasks.decrementAndGet(); + + // if the cleaning job was scheduled (res == true) or had an error, trigger again after decrementing the tasks + if ((res || err != null) && pool.needsCleaning()) + wait.signal(); + + if (err != null) + logger.error("Memtable cleaning task failed with an exception, {} tasks pending", tasks, err); + else if (logger.isTraceEnabled()) + logger.trace("Memtable cleaning task completed ({}), currently pending: {}", res, tasks); + + return res; } } private final Runnable trigger; + private final Clean<P>
clean; + private MemtableCleanerThread(Clean<P> clean) { super(clean.pool.getClass().getSimpleName() + "Cleaner", clean); this.trigger = clean.wait::signal; + this.clean = clean; } - MemtableCleanerThread(P pool, Runnable cleaner) + public MemtableCleanerThread(P pool, MemtableCleaner cleaner) { this(new Clean<>(pool, cleaner)); } @@ -84,4 +120,11 @@ public void trigger() { trigger.run(); } + + /** Return the number of pending tasks */ + @VisibleForTesting + public int numPendingTasks() + { + return clean.numPendingTasks(); + } }
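The pending-task accounting in `Clean` above hinges on `CompletableFuture.handle` being invoked exactly once, for both normal and exceptional completion. The same pattern in isolation (class and method names illustrative):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class PendingTaskCounter
{
    private final AtomicInteger pending = new AtomicInteger();

    // Wraps a task so the pending count reflects it until it settles.
    public CompletableFuture<Boolean> track(CompletableFuture<Boolean> task)
    {
        pending.incrementAndGet();
        return task.handle((res, err) -> {
            pending.decrementAndGet(); // runs once, whether res or err is set
            return err == null && Boolean.TRUE.equals(res);
        });
    }

    public int pendingTasks()
    {
        return pending.get();
    }
}
```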
diff --git a/src/java/org/apache/cassandra/utils/memory/MemtablePool.java b/src/java/org/apache/cassandra/utils/memory/MemtablePool.java index bd17f7812480..89d5e3702353 100644 --- a/src/java/org/apache/cassandra/utils/memory/MemtablePool.java +++ b/src/java/org/apache/cassandra/utils/memory/MemtablePool.java @@ -23,7 +23,9 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +import com.codahale.metrics.Gauge; import com.codahale.metrics.Timer; import org.apache.cassandra.metrics.CassandraMetricsRegistry; import org.apache.cassandra.metrics.DefaultNameFactory; @@ -44,18 +46,22 @@ public abstract class MemtablePool public final SubPool offHeap; public final Timer blockedOnAllocating; + public final Gauge<Long> numPendingTasks; final WaitQueue hasRoom = new WaitQueue(); - MemtablePool(long maxOnHeapMemory, long maxOffHeapMemory, float cleanThreshold, Runnable cleaner) + MemtablePool(long maxOnHeapMemory, long maxOffHeapMemory, float cleanThreshold, MemtableCleaner cleaner) { + Preconditions.checkArgument(cleaner != null, "Cleaner should not be null"); + this.onHeap = getSubPool(maxOnHeapMemory, cleanThreshold); this.offHeap = getSubPool(maxOffHeapMemory, cleanThreshold); this.cleaner = getCleaner(cleaner); - blockedOnAllocating = CassandraMetricsRegistry.Metrics.timer(new DefaultNameFactory("MemtablePool") - .createMetricName("BlockedOnAllocation")); - if (this.cleaner != null) - this.cleaner.start(); + this.cleaner.start(); + DefaultNameFactory nameFactory = new DefaultNameFactory("MemtablePool"); + blockedOnAllocating = CassandraMetricsRegistry.Metrics.timer(nameFactory.createMetricName("BlockedOnAllocation")); + numPendingTasks = CassandraMetricsRegistry.Metrics.register(nameFactory.createMetricName("PendingFlushTasks"), + () -> (long) this.cleaner.numPendingTasks()); } SubPool getSubPool(long limit, float cleanThreshold) @@ -63,7 +69,7 @@ SubPool getSubPool(long limit, float cleanThreshold) return new SubPool(limit, cleanThreshold); } - MemtableCleanerThread<?> getCleaner(Runnable cleaner) + MemtableCleanerThread<?> getCleaner(MemtableCleaner cleaner) { return cleaner == null ? null : new MemtableCleanerThread<>(this, cleaner); } @@ -77,6 +83,16 @@ public void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedExcep public abstract MemtableAllocator newAllocator(); + public boolean needsCleaning() + { + return onHeap.needsCleaning() || offHeap.needsCleaning(); + } + + public Long getNumPendingTasks() + { + return numPendingTasks.getValue(); + } + /** * Note the difference between acquire() and allocate(); allocate() makes more resources available to all owners, * and acquire() makes shared resources unavailable but still recorded.
An Owner must always acquire resources, @@ -168,7 +184,7 @@ void allocated(long size) maybeClean(); } - void acquired(long size) + void acquired() { maybeClean(); } @@ -202,6 +218,11 @@ public long used() return allocated; } + public long getReclaiming() + { + return reclaiming; + } + public float reclaimingRatio() { float r = reclaiming / (float) limit; diff --git a/src/java/org/apache/cassandra/utils/memory/NativePool.java b/src/java/org/apache/cassandra/utils/memory/NativePool.java index 800c777748dc..e88b4d791078 100644 --- a/src/java/org/apache/cassandra/utils/memory/NativePool.java +++ b/src/java/org/apache/cassandra/utils/memory/NativePool.java @@ -20,7 +20,7 @@ public class NativePool extends MemtablePool { - public NativePool(long maxOnHeapMemory, long maxOffHeapMemory, float cleanThreshold, Runnable cleaner) + public NativePool(long maxOnHeapMemory, long maxOffHeapMemory, float cleanThreshold, MemtableCleaner cleaner) { super(maxOnHeapMemory, maxOffHeapMemory, cleanThreshold, cleaner); } diff --git a/src/java/org/apache/cassandra/utils/memory/SlabPool.java b/src/java/org/apache/cassandra/utils/memory/SlabPool.java index bd7ec1f7b791..416d1dd3d758 100644 --- a/src/java/org/apache/cassandra/utils/memory/SlabPool.java +++ b/src/java/org/apache/cassandra/utils/memory/SlabPool.java @@ -22,7 +22,7 @@ public class SlabPool extends MemtablePool { final boolean allocateOnHeap; - public SlabPool(long maxOnHeapMemory, long maxOffHeapMemory, float cleanupThreshold, Runnable cleaner) + public SlabPool(long maxOnHeapMemory, long maxOffHeapMemory, float cleanupThreshold, MemtableCleaner cleaner) { super(maxOnHeapMemory, maxOffHeapMemory, cleanupThreshold, cleaner); this.allocateOnHeap = maxOffHeapMemory == 0; diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java index 4e320ef6b6a7..e09c4dfa7b12 100644 --- a/test/unit/org/apache/cassandra/cql3/CQLTester.java +++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java @@ -26,7 +26,6 @@ import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -477,16 +476,9 @@ public void disableCompaction(String keyspace) public void compact() { - try - { - ColumnFamilyStore store = getCurrentColumnFamilyStore(); - if (store != null) - store.forceMajorCompaction(); - } - catch (InterruptedException | ExecutionException e) - { - throw new RuntimeException(e); - } + ColumnFamilyStore store = getCurrentColumnFamilyStore(); + if (store != null) + store.forceMajorCompaction(); } public void disableCompaction() diff --git a/test/unit/org/apache/cassandra/db/NativeCellTest.java b/test/unit/org/apache/cassandra/db/NativeCellTest.java index 69e615b13477..9b18f656a18e 100644 --- a/test/unit/org/apache/cassandra/db/NativeCellTest.java +++ b/test/unit/org/apache/cassandra/db/NativeCellTest.java @@ -17,10 +17,10 @@ */ package org.apache.cassandra.db; -import java.io.IOException; import java.nio.ByteBuffer; import java.util.Random; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import org.junit.Assert; import org.junit.BeforeClass; @@ -44,7 +44,11 @@ public class NativeCellTest { private static final Logger logger = LoggerFactory.getLogger(NativeCellTest.class); - private static final NativeAllocator nativeAllocator = new NativePool(Integer.MAX_VALUE, Integer.MAX_VALUE, 1f, 
null).newAllocator(); + private static final NativeAllocator nativeAllocator = new NativePool(Integer.MAX_VALUE, + Integer.MAX_VALUE, + 1f, + () -> CompletableFuture.completedFuture(true)).newAllocator(); + @SuppressWarnings("resource") private static final OpOrder.Group group = new OpOrder().start(); private static Random rand; @@ -57,7 +61,7 @@ public static void setUp() } @Test - public void testCells() throws IOException + public void testCells() { for (int run = 0 ; run < 1000 ; run++) { @@ -158,9 +162,9 @@ private static void test(Row row) Assert.assertEquals(nrow.clustering(), brow.clustering()); ClusteringComparator comparator = new ClusteringComparator(UTF8Type.instance); - Assert.assertTrue(comparator.compare(row.clustering(), nrow.clustering()) == 0); - Assert.assertTrue(comparator.compare(row.clustering(), brow.clustering()) == 0); - Assert.assertTrue(comparator.compare(nrow.clustering(), brow.clustering()) == 0); + Assert.assertEquals(0, comparator.compare(row.clustering(), nrow.clustering())); + Assert.assertEquals(0, comparator.compare(row.clustering(), brow.clustering())); + Assert.assertEquals(0, comparator.compare(nrow.clustering(), brow.clustering())); } private static Row clone(Row row, Row.Builder builder) diff --git a/test/unit/org/apache/cassandra/utils/memory/MemtableCleanerThreadTest.java b/test/unit/org/apache/cassandra/utils/memory/MemtableCleanerThreadTest.java new file mode 100644 index 000000000000..7100a2ac59c1 --- /dev/null +++ b/test/unit/org/apache/cassandra/utils/memory/MemtableCleanerThreadTest.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.utils.memory; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Before; +import org.junit.Test; + +import org.apache.cassandra.utils.FBUtilities; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.when; + +public class MemtableCleanerThreadTest +{ + private static final long TIMEOUT_SECONDS = 5; + private static final long TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(TIMEOUT_SECONDS); + + @Mock + private MemtablePool pool; + + @Mock + private MemtableCleaner cleaner; + + private MemtableCleanerThread<MemtablePool> cleanerThread; + + @Before + public void setup() + { + MockitoAnnotations.initMocks(this); + } + + private void startThread() + { + cleanerThread = new MemtableCleanerThread<>(pool, cleaner); + assertNotNull(cleanerThread); + cleanerThread.start(); + + for (int i = 0; i < TIMEOUT_MILLIS && !cleanerThread.isAlive(); i++) + FBUtilities.sleepQuietly(1); + } + + private void stopThread() throws InterruptedException + { + cleanerThread.shutdownNow(); + + assertTrue(cleanerThread.awaitTermination(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)); + } + + private void waitForPendingTasks() + { + // wait for a bit because the cleaner latch completes before the pending tasks are decremented + FBUtilities.sleepQuietly(TIMEOUT_MILLIS); + + assertEquals(0, cleanerThread.numPendingTasks()); + } + + @Test + public void testCleanerInvoked() throws Exception + { + CountDownLatch cleanerExecutedLatch = new CountDownLatch(1); + CompletableFuture<Boolean> fut = new CompletableFuture<>(); + AtomicBoolean needsCleaning = new AtomicBoolean(false); + + when(pool.needsCleaning()).thenAnswer(invocation -> needsCleaning.get()); + + when(cleaner.clean()).thenAnswer(invocation -> { + needsCleaning.set(false); + cleanerExecutedLatch.countDown(); + return fut; + }); + + // start the thread with needsCleaning returning false, the cleaner should not be invoked + needsCleaning.set(false); + startThread(); + assertFalse(cleanerExecutedLatch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS)); + assertEquals(1, cleanerExecutedLatch.getCount()); + assertEquals(0, cleanerThread.numPendingTasks()); + + // now invoke the cleaner + needsCleaning.set(true); + cleanerThread.trigger(); + assertTrue(cleanerExecutedLatch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS)); + assertEquals(0, cleanerExecutedLatch.getCount()); + assertEquals(1, cleanerThread.numPendingTasks()); + + // now complete the cleaning task + needsCleaning.set(false); + fut.complete(true); + waitForPendingTasks(); + + stopThread(); + } + + @Test + public void testCleanerError() throws Exception + { + AtomicReference<CountDownLatch> cleanerLatch = new AtomicReference<>(new CountDownLatch(1)); + AtomicReference<CompletableFuture<Boolean>> fut = new AtomicReference<>(new CompletableFuture<>()); + AtomicBoolean needsCleaning = new AtomicBoolean(false); + AtomicInteger numTimeCleanerInvoked = new AtomicInteger(0); + + when(pool.needsCleaning()).thenAnswer(invocation -> needsCleaning.get()); + + when(cleaner.clean()).thenAnswer(invocation -> { + needsCleaning.set(false); + numTimeCleanerInvoked.incrementAndGet(); +
+            cleanerLatch.get().countDown();
+            return fut.get();
+        });
+
+        // start the thread with needsCleaning returning true, the cleaner should be invoked
+        needsCleaning.set(true);
+        startThread();
+        assertTrue(cleanerLatch.get().await(TIMEOUT_SECONDS, TimeUnit.SECONDS));
+        assertEquals(0, cleanerLatch.get().getCount());
+        assertEquals(1, cleanerThread.numPendingTasks());
+        assertEquals(1, numTimeCleanerInvoked.get());
+
+        // complete the cleaning task with an error, no other cleaning task should be invoked
+        cleanerLatch.set(new CountDownLatch(1));
+        CompletableFuture<Boolean> oldFut = fut.get();
+        fut.set(new CompletableFuture<>());
+        needsCleaning.set(false);
+        oldFut.completeExceptionally(new RuntimeException("Test"));
+        assertFalse(cleanerLatch.get().await(TIMEOUT_SECONDS, TimeUnit.SECONDS));
+        assertEquals(1, cleanerLatch.get().getCount());
+        assertEquals(1, numTimeCleanerInvoked.get());
+
+        // now trigger cleaning again and verify that a new task is invoked
+        cleanerLatch.set(new CountDownLatch(1));
+        fut.set(new CompletableFuture<>());
+        needsCleaning.set(true);
+        cleanerThread.trigger();
+        assertTrue(cleanerLatch.get().await(TIMEOUT_SECONDS, TimeUnit.SECONDS));
+        assertEquals(0, cleanerLatch.get().getCount());
+        assertEquals(2, numTimeCleanerInvoked.get());
+
+        // complete the cleaning task with false (nothing should be scheduled)
+        cleanerLatch.set(new CountDownLatch(1));
+        oldFut = fut.get();
+        fut.set(new CompletableFuture<>());
+        needsCleaning.set(false);
+        oldFut.complete(false);
+        assertFalse(cleanerLatch.get().await(TIMEOUT_SECONDS, TimeUnit.SECONDS));
+        assertEquals(1, cleanerLatch.get().getCount());
+        assertEquals(2, numTimeCleanerInvoked.get());
+
+        // now trigger cleaning again and verify that a new task is invoked
+        cleanerLatch.set(new CountDownLatch(1));
+        fut.set(new CompletableFuture<>());
+        needsCleaning.set(true);
+        cleanerThread.trigger();
+        assertTrue(cleanerLatch.get().await(TIMEOUT_SECONDS, TimeUnit.SECONDS));
+        assertEquals(0, cleanerLatch.get().getCount());
+        assertEquals(3, numTimeCleanerInvoked.get());
+
+        stopThread();
+    }
+}
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/utils/memory/NativeAllocatorTest.java b/test/unit/org/apache/cassandra/utils/memory/NativeAllocatorTest.java
index b636bf7527ef..a7b112c8e6db 100644
--- a/test/unit/org/apache/cassandra/utils/memory/NativeAllocatorTest.java
+++ b/test/unit/org/apache/cassandra/utils/memory/NativeAllocatorTest.java
@@ -22,105 +22,137 @@
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
-import junit.framework.Assert;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 
 public class NativeAllocatorTest
 {
+    private ScheduledExecutorService exec;
+    private OpOrder order;
+    private OpOrder.Group group;
+    private CountDownLatch canClean;
+    private CountDownLatch isClean;
+    private AtomicReference<NativeAllocator> allocatorRef;
+    private AtomicReference<OpOrder.Barrier> barrier;
+    private NativePool pool;
+    private NativeAllocator allocator;
+    private Runnable markBlocking;
+
+    @Before
+    public void setUp()
+    {
+        exec = Executors.newScheduledThreadPool(2);
+        order = new OpOrder();
+        group = order.start();
+        canClean = new CountDownLatch(1);
+        isClean = new CountDownLatch(1);
+        allocatorRef = new AtomicReference<>();
+        barrier = new AtomicReference<>();
+        pool = new NativePool(1, 100, 0.75f, () -> {
+            try
+            {
+                canClean.await();
+            }
+            catch (InterruptedException e)
+            {
+                throw new AssertionError();
+            }
+            if (isClean.getCount() > 0)
+            {
+                allocatorRef.get().offHeap().released(80);
+                isClean.countDown();
+            }
+            return CompletableFuture.completedFuture(true);
+        });
+        allocator = new NativeAllocator(pool);
+        allocatorRef.set(allocator);
+        markBlocking = () -> {
+            barrier.set(order.newBarrier());
+            barrier.get().issue();
+            barrier.get().markBlocking();
+        };
+    }
+
+    private void verifyUsedReclaiming(long used, long reclaiming)
+    {
+        Assert.assertEquals(used, allocator.offHeap().owns());
+        Assert.assertEquals(used, pool.offHeap.used());
+        Assert.assertEquals(reclaiming, allocator.offHeap().getReclaiming());
+        Assert.assertEquals(reclaiming, pool.offHeap.getReclaiming());
+    }
 
     @Test
     public void testBookKeeping() throws ExecutionException, InterruptedException
     {
-        {
-            final ScheduledExecutorService exec = Executors.newScheduledThreadPool(2);
-            final OpOrder order = new OpOrder();
-            final OpOrder.Group group = order.start();
-            final CountDownLatch canClean = new CountDownLatch(1);
-            final CountDownLatch isClean = new CountDownLatch(1);
-            final AtomicReference<NativeAllocator> allocatorRef = new AtomicReference<>();
-            final AtomicReference<OpOrder.Barrier> barrier = new AtomicReference<>();
-            final NativeAllocator allocator = new NativeAllocator(new NativePool(1, 100, 0.75f, new Runnable()
+        final Runnable test = () -> {
+            // allocate normal, check accounted and not cleaned
+            allocator.allocate(10, group);
+            verifyUsedReclaiming(10, 0);
+
+            // confirm adjustment works
+            allocator.offHeap().adjust(-10, group);
+            verifyUsedReclaiming(0, 0);
+
+            allocator.offHeap().adjust(10, group);
+            verifyUsedReclaiming(10, 0);
+
+            // confirm we cannot allocate negative
+            boolean success = false;
+            try
             {
-                public void run()
-                {
-                    try
-                    {
-                        canClean.await();
-                    }
-                    catch (InterruptedException e)
-                    {
-                        throw new AssertionError();
-                    }
-                    if (isClean.getCount() > 0)
-                    {
-                        allocatorRef.get().offHeap().released(80);
-                        isClean.countDown();
-                    }
-                }
-            }));
-            allocatorRef.set(allocator);
-            final Runnable markBlocking = new Runnable()
+                allocator.offHeap().allocate(-10, group);
+            }
+            catch (AssertionError e)
             {
+                success = true;
+            }
+
+            Assert.assertTrue(success);
+            Uninterruptibles.sleepUninterruptibly(10L, TimeUnit.MILLISECONDS);
+            Assert.assertEquals(1, isClean.getCount());
+
+            // allocate above watermark
+            allocator.allocate(70, group);
+            verifyUsedReclaiming(80, 0);
 
-                public void run()
-                {
-                    barrier.set(order.newBarrier());
-                    barrier.get().issue();
-                    barrier.get().markBlocking();
-                }
-            };
-            final Runnable run = new Runnable()
+            // let the cleaner run, it will release 80 bytes
+            canClean.countDown();
+            try
             {
-                public void run()
-                {
-                    // allocate normal, check accounted and not cleaned
-                    allocator.allocate(10, group);
-                    Assert.assertEquals(10, allocator.offHeap().owns());
-                    // confirm adjustment works
-                    allocator.offHeap().adjust(-10, group);
-                    Assert.assertEquals(0, allocator.offHeap().owns());
-                    allocator.offHeap().adjust(10, group);
-                    Assert.assertEquals(10, allocator.offHeap().owns());
-                    // confirm we cannot allocate negative
-                    boolean success = false;
-                    try
-                    {
-                        allocator.offHeap().allocate(-10, group);
-                    }
-                    catch (AssertionError e)
-                    {
-                        success = true;
-                    }
-                    Assert.assertTrue(success);
-                    Uninterruptibles.sleepUninterruptibly(10L, TimeUnit.MILLISECONDS);
-                    Assert.assertEquals(1, isClean.getCount());
-
-                    // allocate above watermark, check cleaned
-                    allocator.allocate(70, group);
-                    Assert.assertEquals(80, allocator.offHeap().owns());
-                    canClean.countDown();
-                    try
-                    {
-                        isClean.await(10L, TimeUnit.MILLISECONDS);
-                    }
-                    catch (InterruptedException e)
-                    {
-                        throw new AssertionError();
-                    }
-                    Assert.assertEquals(0, isClean.getCount());
-                    Assert.assertEquals(0, allocator.offHeap().owns());
-
-                    // allocate above limit, check we block until "marked blocking"
-                    exec.schedule(markBlocking, 10L, TimeUnit.MILLISECONDS);
-                    allocator.allocate(110, group);
-                    Assert.assertNotNull(barrier.get());
-                    Assert.assertEquals(110, allocator.offHeap().owns());
-                }
-            };
-            exec.submit(run).get();
-        }
-    }
+                isClean.await(10L, TimeUnit.MILLISECONDS);
+            }
+            catch (InterruptedException e)
+            {
+                throw new AssertionError();
+            }
+            Assert.assertEquals(0, isClean.getCount());
+            verifyUsedReclaiming(0, 0);
+
+            // allocate, then set discarding, then allocate some more
+            allocator.allocate(30, group);
+            verifyUsedReclaiming(30, 0);
+            allocator.setDiscarding();
+            Assert.assertFalse(allocator.isLive());
+            verifyUsedReclaiming(30, 30);
+            allocator.allocate(50, group);
+            verifyUsedReclaiming(80, 80);
+
+            // allocate above limit, check we block until "marked blocking"
+            exec.schedule(markBlocking, 10L, TimeUnit.MILLISECONDS);
+            allocator.allocate(30, group);
+            Assert.assertNotNull(barrier.get());
+            verifyUsedReclaiming(110, 110);
+
+            // release everything
+            allocator.setDiscarded();
+            Assert.assertFalse(allocator.isLive());
+            verifyUsedReclaiming(0, 0);
+        };
+        exec.submit(test).get();
+    }
 }
+
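Note on the new test above: MemtableCleanerThreadTest pins down a contract rather than an implementation (the MemtableCleanerThread class itself is not part of this hunk). The mocks require that cleaner.clean() is issued only while pool.needsCleaning() returns true, that each in-flight clean() counts as one pending task, and that the count drops again whenever the returned future completes, whether with true, false, or an error, so a failed cleaning cannot leave tasks counted forever, consistent with the ticket's goal of preventing an unbounded number of pending flushing tasks. A minimal sketch of that contract, assuming only what the mocks demand (CleanerLoopSketch and runOnce are illustrative names, not Cassandra API):

    // Illustrative model only, not the MemtableCleanerThread implementation.
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.atomic.AtomicInteger;

    final class CleanerLoopSketch
    {
        // Stand-in for MemtableCleaner.clean(): completes with true if memory was
        // reclaimed, false if there was nothing to do, or exceptionally on failure.
        interface Cleaner
        {
            CompletableFuture<Boolean> clean();
        }

        private final AtomicInteger numPendingTasks = new AtomicInteger();

        int numPendingTasks()
        {
            return numPendingTasks.get();
        }

        // One iteration of the loop body; the real thread parks until trigger()
        // wakes it, then re-checks whether the pool still needs cleaning.
        void runOnce(boolean needsCleaning, Cleaner cleaner)
        {
            if (!needsCleaning)
                return; // nothing to do, no task is issued

            numPendingTasks.incrementAndGet();
            // the pending count drops on any completion, including failure,
            // so errors cannot pin the count above zero indefinitely
            cleaner.clean().whenComplete((reclaimed, error) -> numPendingTasks.decrementAndGet());
        }
    }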
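Similarly, the reworked NativeAllocatorTest now asserts a used/reclaiming pair at every step via verifyUsedReclaiming(): allocations add to the allocator's (and pool's) used bytes; once setDiscarding() is called the allocator's whole footprint, including allocations made while discarding, also counts as reclaiming; and setDiscarded() drops both to zero. A toy model of that accounting, derived only from the assertions in the test (BookKeepingSketch is an illustrative stand-in, not Cassandra code):

    // Toy model of the used/reclaiming bookkeeping the test verifies.
    final class BookKeepingSketch
    {
        private long used;
        private long reclaiming;
        private boolean discarding;

        void allocate(long size)
        {
            used += size;
            if (discarding)         // a discarding allocator's new allocations
                reclaiming += size; // are immediately counted as reclaimable
        }

        void setDiscarding()
        {
            discarding = true;
            reclaiming = used;      // the whole current footprint becomes reclaimable
        }

        void setDiscarded()
        {
            used = 0;               // everything is released in one step
            reclaiming = 0;
        }
    }
    // Trace mirroring testBookKeeping, as (used, reclaiming):
    // allocate(30) -> (30, 0); setDiscarding() -> (30, 30); allocate(50) -> (80, 80);
    // allocate(30) -> (110, 110); setDiscarded() -> (0, 0).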