Skip to content

Commit

Permalink
Merge branch 'cassandra-3.11' into trunk
Browse files Browse the repository at this point in the history
  • Loading branch information
ekaterinadimitrova2 committed Jan 19, 2021
2 parents 7214794 + 11cb810 commit 1b4e1cc
Show file tree
Hide file tree
Showing 17 changed files with 709 additions and 268 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
Expand Up @@ -9,6 +9,7 @@
* Fix DecimalDeserializer#toString OOM (CASSANDRA-14925)
Merged from 3.11:
Merged from 3.0:
* Prevent unbounded number of pending flushing tasks (CASSANDRA-16261)
* Improve empty hint file handling during startup (CASSANDRA-16162)
* Fix skipping on pre-3.0 created compact storage sstables due to missing primary key liveness (CASSANDRA-16226)
* Allow empty string in collections with COPY FROM in cqlsh (CASSANDRA-16372)
Expand Down
Expand Up @@ -23,6 +23,8 @@

import java.util.concurrent.TimeUnit;

import com.google.common.annotations.VisibleForTesting;

public class InfiniteLoopExecutor
{
private static final Logger logger = LoggerFactory.getLogger(InfiniteLoopExecutor.class);
Expand Down Expand Up @@ -81,4 +83,10 @@ public boolean awaitTermination(long time, TimeUnit unit) throws InterruptedExce
thread.join(unit.toMillis(time));
return !thread.isAlive();
}

/**
 * Reports whether the thread backing this executor is still alive.
 *
 * @return the result of {@code isAlive()} on the executor's thread
 */
@VisibleForTesting
public boolean isAlive()
{
    return thread.isAlive();
}
}
14 changes: 7 additions & 7 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Expand Up @@ -295,8 +295,8 @@ public static Config loadConfig() throws ConfigurationException

String loaderClass = System.getProperty(Config.PROPERTY_PREFIX + "config.loader");
ConfigurationLoader loader = loaderClass == null
? new YamlConfigurationLoader()
: FBUtilities.<ConfigurationLoader>construct(loaderClass, "configuration loading");
? new YamlConfigurationLoader()
: FBUtilities.construct(loaderClass, "configuration loading");
Config config = loader.loadConfig();

if (!hasLoggedConfig)
Expand Down Expand Up @@ -906,7 +906,7 @@ else if (config.listen_address != null)
}
catch (UnknownHostException e)
{
throw new ConfigurationException("Unknown listen_address '" + config.listen_address + "'", false);
throw new ConfigurationException("Unknown listen_address '" + config.listen_address + '\'', false);
}

if (listenAddress.isAnyLocalAddress())
Expand All @@ -926,7 +926,7 @@ else if (config.listen_interface != null)
}
catch (UnknownHostException e)
{
throw new ConfigurationException("Unknown broadcast_address '" + config.broadcast_address + "'", false);
throw new ConfigurationException("Unknown broadcast_address '" + config.broadcast_address + '\'', false);
}

if (broadcastAddress.isAnyLocalAddress())
Expand Down Expand Up @@ -967,7 +967,7 @@ else if (config.rpc_interface != null)
}
catch (UnknownHostException e)
{
throw new ConfigurationException("Unknown broadcast_rpc_address '" + config.broadcast_rpc_address + "'", false);
throw new ConfigurationException("Unknown broadcast_rpc_address '" + config.broadcast_rpc_address + '\'', false);
}

if (broadcastRpcAddress.isAnyLocalAddress())
Expand Down Expand Up @@ -1509,7 +1509,7 @@ public static Integer getAllocateTokensForLocalRf()

public static Collection<String> tokensFromString(String tokenString)
{
List<String> tokens = new ArrayList<String>();
List<String> tokens = new ArrayList<>();
if (tokenString != null)
for (String token : StringUtils.split(tokenString, ','))
tokens.add(token.trim());
Expand Down Expand Up @@ -2400,7 +2400,7 @@ public static File getHintsDirectory()
public static File getSerializedCachePath(CacheType cacheType, String version, String extension)
{
String name = cacheType.toString()
+ (version == null ? "" : "-" + version + "." + extension);
+ (version == null ? "" : '-' + version + '.' + extension);
return new File(conf.saved_caches_directory, name);
}

Expand Down
249 changes: 139 additions & 110 deletions src/java/org/apache/cassandra/db/ColumnFamilyStore.java

Large diffs are not rendered by default.

13 changes: 8 additions & 5 deletions src/java/org/apache/cassandra/db/Memtable.java
Expand Up @@ -70,6 +70,7 @@
import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.utils.memory.HeapPool;
import org.apache.cassandra.utils.memory.MemtableAllocator;
import org.apache.cassandra.utils.memory.MemtableCleaner;
import org.apache.cassandra.utils.memory.MemtablePool;
import org.apache.cassandra.utils.memory.NativePool;
import org.apache.cassandra.utils.memory.SlabPool;
Expand All @@ -84,16 +85,18 @@ private static MemtablePool createMemtableAllocatorPool()
{
long heapLimit = DatabaseDescriptor.getMemtableHeapSpaceInMb() << 20;
long offHeapLimit = DatabaseDescriptor.getMemtableOffheapSpaceInMb() << 20;
final float cleaningThreshold = DatabaseDescriptor.getMemtableCleanupThreshold();
final MemtableCleaner cleaner = ColumnFamilyStore::flushLargestMemtable;
switch (DatabaseDescriptor.getMemtableAllocationType())
{
case unslabbed_heap_buffers:
return new HeapPool(heapLimit, DatabaseDescriptor.getMemtableCleanupThreshold(), new ColumnFamilyStore.FlushLargestColumnFamily());
return new HeapPool(heapLimit, cleaningThreshold, cleaner);
case heap_buffers:
return new SlabPool(heapLimit, 0, DatabaseDescriptor.getMemtableCleanupThreshold(), new ColumnFamilyStore.FlushLargestColumnFamily());
return new SlabPool(heapLimit, 0, cleaningThreshold, cleaner);
case offheap_buffers:
return new SlabPool(heapLimit, offHeapLimit, DatabaseDescriptor.getMemtableCleanupThreshold(), new ColumnFamilyStore.FlushLargestColumnFamily());
return new SlabPool(heapLimit, offHeapLimit, cleaningThreshold, cleaner);
case offheap_objects:
return new NativePool(heapLimit, offHeapLimit, DatabaseDescriptor.getMemtableCleanupThreshold(), new ColumnFamilyStore.FlushLargestColumnFamily());
return new NativePool(heapLimit, offHeapLimit, cleaningThreshold, cleaner);
default:
throw new AssertionError();
}
Expand Down Expand Up @@ -405,7 +408,7 @@ public long getMinTimestamp()
@VisibleForTesting
public void makeUnflushable()
{
liveDataSize.addAndGet(1L * 1024 * 1024 * 1024 * 1024 * 1024);
liveDataSize.addAndGet((long) 1024 * 1024 * 1024 * 1024 * 1024);
}

class FlushRunnable implements Callable<SSTableMultiWriter>
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/db/ReadCommand.java
Expand Up @@ -618,7 +618,7 @@ public void onClose()
liveRows, tombstones,
(warnTombstones ? " (see tombstone_warn_threshold)" : ""));
}
};
}

return Transformation.apply(iter, new MetricRecording());
}
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/utils/memory/HeapPool.java
Expand Up @@ -24,7 +24,7 @@

public class HeapPool extends MemtablePool
{
public HeapPool(long maxOnHeapMemory, float cleanupThreshold, Runnable cleaner)
public HeapPool(long maxOnHeapMemory, float cleanupThreshold, MemtableCleaner cleaner)
{
super(maxOnHeapMemory, 0, cleanupThreshold, cleaner);
}
Expand Down
121 changes: 101 additions & 20 deletions src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java
Expand Up @@ -18,18 +18,25 @@
*/
package org.apache.cassandra.utils.memory;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.utils.concurrent.WaitQueue;

public abstract class MemtableAllocator
{
private static final Logger logger = LoggerFactory.getLogger(MemtableAllocator.class);
private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 5, TimeUnit.SECONDS);

private final SubAllocator onHeap;
private final SubAllocator offHeap;
volatile LifeCycle state = LifeCycle.LIVE;

enum LifeCycle
{
Expand Down Expand Up @@ -78,10 +85,8 @@ public SubAllocator offHeap()
*/
public void setDiscarding()
{
state = state.transition(LifeCycle.DISCARDING);
// mark the memory owned by this allocator as reclaiming
onHeap.markAllReclaiming();
offHeap.markAllReclaiming();
onHeap.setDiscarding();
offHeap.setDiscarding();
}

/**
Expand All @@ -90,15 +95,13 @@ public void setDiscarding()
*/
public void setDiscarded()
{
state = state.transition(LifeCycle.DISCARDED);
// release any memory owned by this allocator; automatically signals waiters
onHeap.releaseAll();
offHeap.releaseAll();
onHeap.setDiscarded();
offHeap.setDiscarded();
}

public boolean isLive()
{
return state == LifeCycle.LIVE;
return onHeap.state == LifeCycle.LIVE || offHeap.state == LifeCycle.LIVE;
}

/** Mark the BB as unused, permitting it to be reclaimed */
Expand All @@ -107,6 +110,9 @@ public static final class SubAllocator
// the tracker we are owning memory from
private final MemtablePool.SubPool parent;

// the state of the memtable
private volatile LifeCycle state;

// the amount of memory/resource owned by this object
private volatile long owns;
// the amount of memory we are reporting to collect; this may be inaccurate, but is close
Expand All @@ -116,17 +122,44 @@ public static final class SubAllocator
/**
 * Creates a sub-allocator that starts in the {@link LifeCycle#LIVE} state.
 *
 * @param parent the sub-pool this allocator takes its memory from
 */
SubAllocator(MemtablePool.SubPool parent)
{
    this.state = LifeCycle.LIVE;
    this.parent = parent;
}

/**
* Mark this allocator reclaiming; this will permit any outstanding allocations to temporarily
* overshoot the maximum memory limit so that flushing can begin immediately.
*
* The state is first moved to {@link LifeCycle#DISCARDING} through {@code LifeCycle.transition},
* and only then is the owned memory marked as reclaiming.
*/
void setDiscarding()
{
// transition first, so that code checking for DISCARDING (e.g. allocated()) sees the new state
state = state.transition(LifeCycle.DISCARDING);
// mark the memory owned by this allocator as reclaiming
updateReclaiming();
}

// should only be called once we know we will never allocate to the object again.
// currently no corroboration/enforcement of this is performed.
/**
* Indicate the memory and resources owned by this allocator are no longer referenced,
* and can be reclaimed/reused.
*
* The state is moved to {@link LifeCycle#DISCARDED} through {@code LifeCycle.transition}
* before everything still accounted to this allocator is released via {@link #releaseAll()}.
*/
void setDiscarded()
{
state = state.transition(LifeCycle.DISCARDED);
// release any memory owned by this allocator; automatically signals waiters
releaseAll();
}

/**
* Hands everything this allocator accounts for back to the parent sub-pool.
*
* Should only be called once we know we will never allocate to the object again.
* Currently no corroboration/enforcement of this is performed.
*/
void releaseAll()
{
// each counter is swapped to zero and its previous value is reported to the parent
// NOTE(review): ownsUpdater/reclaimingUpdater are presumably AtomicLongFieldUpdaters over
// the owns/reclaiming fields -- their declarations are not visible here, confirm
parent.released(ownsUpdater.getAndSet(this, 0));
parent.reclaimed(reclaimingUpdater.getAndSet(this, 0));
}

// like allocate, but permits allocations to be negative
/**
* Like allocate, but permits allocations to be negative.
*/
public void adjust(long size, OpOrder.Group opGroup)
{
if (size <= 0)
Expand Down Expand Up @@ -168,28 +201,71 @@ public void allocate(long size, OpOrder.Group opGroup)
}
}

// retroactively mark an amount allocated and acquired in the tracker, and owned by us
/**
* Retroactively mark an amount allocated and acquired in the tracker, and owned by us. If the state is discarding,
* then also update reclaiming since the flush operation is waiting at the barrier for in-flight writes,
* and it will flush this memory too.
*
* @param size the amount passed to {@code parent.allocated} and added to this allocator's owned total
*/
private void allocated(long size)
{
// report to the parent sub-pool, then record the same amount as owned by this allocator
parent.allocated(size);
ownsUpdater.addAndGet(this, size);

if (state == LifeCycle.DISCARDING)
{
// logged through noSpamLogger (created with a 5 second interval) rather than the plain logger
noSpamLogger.info("Allocated {} bytes whilst discarding", size);
// owns has just grown, so bring reclaiming back in line with it
updateReclaiming();
}
}

// retroactively mark an amount acquired in the tracker, and owned by us
/**
* Retroactively mark an amount acquired in the tracker, and owned by us. If the state is discarding,
* then also update reclaiming since the flush operation is waiting at the barrier for in-flight writes,
* and it will flush this memory too.
*/
private void acquired(long size)
{
parent.acquired(size);
parent.acquired();
ownsUpdater.addAndGet(this, size);

if (state == LifeCycle.DISCARDING)
{
noSpamLogger.info("Acquired {} bytes whilst discarding", size);
updateReclaiming();
}
}

/**
* If the state is still live, then we update the memory we own here and in the parent.
*
* However, if the state is not live, we do not update it because we would have to update
* reclaiming too, and it could cause problems to the memtable cleaner algorithm if reclaiming
* decreased. If the memtable is flushing, soon enough {@link #releaseAll()} will be called.
*
* @param size the size that was released
*/
void released(long size)
{
parent.released(size);
ownsUpdater.addAndGet(this, -size);
if (state == LifeCycle.LIVE)
{
parent.released(size);
ownsUpdater.addAndGet(this, -size);
}
else
{
noSpamLogger.info("Tried to release {} bytes whilst discarding", size);
}
}

// mark everything we currently own as reclaiming, both here and in our parent
void markAllReclaiming()
/**
* Mark what we currently own as reclaiming, both here and in our parent.
* This method is called for the first time when the memtable is scheduled for flushing,
* in which case reclaiming will be zero and we mark everything that we own as reclaiming.
* Afterwards, if there are in flight writes that have not completed yet, we also mark any
* more memory that is allocated by these writes as reclaiming, since the memtable is waiting
* on the barrier for these writes to complete, before it can actually start flushing data.
*/
void updateReclaiming()
{
while (true)
{
Expand All @@ -208,6 +284,11 @@ public long owns()
return owns;
}

/**
 * Returns the current value of the reclaiming counter of this allocator.
 *
 * @return the value of the {@code reclaiming} field at the time of the call
 */
public long getReclaiming()
{
    return this.reclaiming;
}

public float ownershipRatio()
{
float r = owns / (float) parent.limit;
Expand Down
40 changes: 40 additions & 0 deletions src/java/org/apache/cassandra/utils/memory/MemtableCleaner.java
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.utils.memory;

import java.util.concurrent.CompletableFuture;

/**
 * The cleaner is used by {@link MemtableCleanerThread} in order to reclaim space from memtables, normally
 * by flushing the largest memtable.
 * <p>
 * The interface declares a single abstract method, so an implementation may be supplied as a lambda
 * or as a method reference.
 */
@FunctionalInterface
public interface MemtableCleaner
{
    /**
     * Schedules a cleaning task, normally flushing of the largest memtable.
     * <p>
     * The future will complete once the operation has completed and it will have a value set to true if
     * the cleaner was able to execute the cleaning operation or if another thread concurrently executed
     * the same clean operation. If no operation was even attempted, for example because no memtable was
     * found, then the value will be false.
     * <p>
     * The future will complete with an error if the cleaning operation encounters an error.
     *
     * @return a future that completes with {@code true} if a cleaning operation was executed (by this
     *         call or concurrently by another thread), with {@code false} if none was attempted, or
     *         exceptionally if the cleaning operation failed
     */
    CompletableFuture<Boolean> clean();
}

0 comments on commit 1b4e1cc

Please sign in to comment.