Skip to content

Commit

Permalink
Prevent unbounded number of pending flushing tasks; Add PendingFlushT…
Browse files Browse the repository at this point in the history
…asks metric (CASSANDRA-16261)

Authored by Ekaterina Dimitrova; reviewed by Caleb Rackliffe and Andres de la Pena for CASSANDRA-16261
  • Loading branch information
ekaterinadimitrova2 committed Jan 19, 2021
1 parent f42a4eb commit 0a1e900
Show file tree
Hide file tree
Showing 17 changed files with 772 additions and 345 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
@@ -1,4 +1,5 @@
3.0.24:
* Prevent unbounded number of pending flushing tasks; Add PendingFlushTasks metric (CASSANDRA-16261)
* Improve empty hint file handling during startup (CASSANDRA-16162)
* Allow empty string in collections with COPY FROM in cqlsh (CASSANDRA-16372)
* Fix skipping on pre-3.0 created compact storage sstables due to missing primary key liveness (CASSANDRA-16226)
Expand Down
Expand Up @@ -23,6 +23,8 @@

import java.util.concurrent.TimeUnit;

import com.google.common.annotations.VisibleForTesting;

public class InfiniteLoopExecutor
{
private static final Logger logger = LoggerFactory.getLogger(InfiniteLoopExecutor.class);
Expand Down Expand Up @@ -81,4 +83,10 @@ public boolean awaitTermination(long time, TimeUnit unit) throws InterruptedExce
thread.join(unit.toMillis(time));
return !thread.isAlive();
}

@VisibleForTesting
public boolean isAlive()
{
return this.thread.isAlive();
}
}
39 changes: 18 additions & 21 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Expand Up @@ -151,7 +151,7 @@ public static Config loadConfig() throws ConfigurationException
String loaderClass = System.getProperty("cassandra.config.loader");
ConfigurationLoader loader = loaderClass == null
? new YamlConfigurationLoader()
: FBUtilities.<ConfigurationLoader>construct(loaderClass, "configuration loading");
: FBUtilities.construct(loaderClass, "configuration loading");
Config config = loader.loadConfig();

if (!hasLoggedConfig)
Expand Down Expand Up @@ -214,7 +214,7 @@ else if (config.listen_address != null)
}
catch (UnknownHostException e)
{
throw new ConfigurationException("Unknown listen_address '" + config.listen_address + "'", false);
throw new ConfigurationException("Unknown listen_address '" + config.listen_address + '\'', false);
}

if (listenAddress.isAnyLocalAddress())
Expand All @@ -234,7 +234,7 @@ else if (config.listen_interface != null)
}
catch (UnknownHostException e)
{
throw new ConfigurationException("Unknown broadcast_address '" + config.broadcast_address + "'", false);
throw new ConfigurationException("Unknown broadcast_address '" + config.broadcast_address + '\'', false);
}

if (broadcastAddress.isAnyLocalAddress())
Expand Down Expand Up @@ -275,7 +275,7 @@ else if (config.rpc_interface != null)
}
catch (UnknownHostException e)
{
throw new ConfigurationException("Unknown broadcast_rpc_address '" + config.broadcast_rpc_address + "'", false);
throw new ConfigurationException("Unknown broadcast_rpc_address '" + config.broadcast_rpc_address + '\'', false);
}

if (broadcastRpcAddress.isAnyLocalAddress())
Expand Down Expand Up @@ -520,18 +520,14 @@ else if (conf.native_transport_max_frame_size_in_mb >= 2048)
EndpointSnitchInfo.create();

localDC = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
localComparator = new Comparator<InetAddress>()
{
public int compare(InetAddress endpoint1, InetAddress endpoint2)
{
boolean local1 = localDC.equals(snitch.getDatacenter(endpoint1));
boolean local2 = localDC.equals(snitch.getDatacenter(endpoint2));
if (local1 && !local2)
return -1;
if (local2 && !local1)
return 1;
return 0;
}
localComparator = (endpoint1, endpoint2) -> {
boolean local1 = localDC.equals(snitch.getDatacenter(endpoint1));
boolean local2 = localDC.equals(snitch.getDatacenter(endpoint2));
if (local1 && !local2)
return -1;
if (local2 && !local1)
return 1;
return 0;
};

/* Request Scheduler setup */
Expand Down Expand Up @@ -592,7 +588,7 @@ public int compare(InetAddress endpoint1, InetAddress endpoint2)
if (conf.commitlog_total_space_in_mb == null)
{
int preferredSize = 8192;
int minSize = 0;
int minSize;
try
{
// use 1/4 of available space. See discussion on #10013 and #10199
Expand Down Expand Up @@ -1061,7 +1057,7 @@ public static String getAllocateTokensForKeyspace()

public static Collection<String> tokensFromString(String tokenString)
{
List<String> tokens = new ArrayList<String>();
List<String> tokens = new ArrayList<>();
if (tokenString != null)
for (String token : tokenString.split(","))
tokens.add(token.replaceAll("^\\s+", "").replaceAll("\\s+$", ""));
Expand Down Expand Up @@ -1747,7 +1743,7 @@ public static File getHintsDirectory()
public static File getSerializedCachePath(CacheService.CacheType cacheType, String version, String extension)
{
String name = cacheType.toString()
+ (version == null ? "" : "-" + version + "." + extension);
+ (version == null ? "" : '-' + version + '.' + extension);
return new File(conf.saved_caches_directory, name);
}

Expand Down Expand Up @@ -2026,12 +2022,13 @@ public static MemtablePool getMemtableAllocatorPool()
{
long heapLimit = ((long) conf.memtable_heap_space_in_mb) << 20;
long offHeapLimit = ((long) conf.memtable_offheap_space_in_mb) << 20;
final MemtableCleaner cleaner = ColumnFamilyStore::flushLargestMemtable;
switch (conf.memtable_allocation_type)
{
case unslabbed_heap_buffers:
return new HeapPool(heapLimit, conf.memtable_cleanup_threshold, new ColumnFamilyStore.FlushLargestColumnFamily());
return new HeapPool(heapLimit, conf.memtable_cleanup_threshold, cleaner);
case heap_buffers:
return new SlabPool(heapLimit, 0, conf.memtable_cleanup_threshold, new ColumnFamilyStore.FlushLargestColumnFamily());
return new SlabPool(heapLimit, 0, conf.memtable_cleanup_threshold, cleaner);
case offheap_buffers:
throw new ConfigurationException("offheap_buffers are not available in 3.0. They will be re-introduced in a future release, see https://issues.apache.org/jira/browse/CASSANDRA-9472 for details");

Expand Down

0 comments on commit 0a1e900

Please sign in to comment.