-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
CASSANDRA-17034: Memtable API #1295
Conversation
e3e30f7
to
18909fa
Compare
18909fa
to
be837f0
Compare
for (AtomicBTreePartition partition : toFlush.values()) | ||
{ | ||
keySize += partition.partitionKey().getKey().remaining(); | ||
if (trackContention && partition.useLock()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think trackContention
is always true
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking a bit more closely at this, it seems like the basic idea of getFlushSet()
was separating writeSortedContents()
into general logic and the skiplist-specific contention tracking stuff. That makes sense, but my only worry is that it looks like it also entails iterating over all the partitions twice. Would we be able to avoid that if the FlushCollection
contract included something like a beforeAppend()
callback that could do the contention tracking and logging?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand why trackContention
would always be true (for one, running junit tests, e.g. SimpleQueryTest
enters the path below and never this one).
Also, we previously iterated twice as well: once to gather keySize
in the FlushRunnable
constructor, and once to write the partitions in writeSortedContents
. The new code moves the contended row count collection from the second loop (which now isn't memtable-specific) to the first (which is).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand why trackContention would always be true
I think what he means is that trackContention
isn't always true, but if it's true on line 264 (in the outer if
statement) then it's also true on line 272 (in the inner if
statement).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we previously iterated twice
My bad. I didn't see the keySize
bit in the FlushRunnable
constructor.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, I managed to miss that it's in that particular if
too... sorry, fixed now.
{ | ||
private final TableMetadata metadata; | ||
private final Iterator<Map.Entry<PartitionPosition, AtomicBTreePartition>> iter; | ||
private final Map<PartitionPosition, AtomicBTreePartition> source; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
source
is only used by the constructor to get the iterator, we don't need the attribute
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed
@@ -2382,7 +2500,7 @@ public void run() | |||
/** | |||
* Drops current memtable without flushing to disk. This should only be called when truncating a column family which is not durable. | |||
*/ | |||
public Future<CommitLogPosition> dumpMemtable() | |||
public Future<CommitLogPosition> dumpMemtable(FlushReason reason) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reason argument is not used
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably we should remove the argument if we are not going to use it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Checked pmem code, it's not used there either. Removed now.
* Memtable interface. This defines the operations the ColumnFamilyStore can perform with memtables. | ||
* They are of several types: | ||
* - construction factory interface | ||
* - write and read operations: put, getPartition and makePartitionIterator |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The mentioned read methods (getPartition
and makePartition
) don't exist on UnfilteredSource
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there's a compile error around this in org.apache.cassandra.simulator.paxos.Ballots
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed.
The error is fixed in a separate commit.
} | ||
|
||
/** | ||
* Interface for providing signals back to the owner. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to ease reading, I would add some details about what is the owner, like in the description of the owner
parameter of Memtable.Factory#create
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
* - SNAPSHOT will be followed by performSnapshot(). | ||
* - STREAMING/REPAIR will be followed by creating a FlushSet for the streamed/repaired ranges. This data will be | ||
* used to create sstables, which will be streamed and then deleted. | ||
* This will not be called if the sstable is switched because of truncation or drop. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed this isn't called from ColumnFamilyStore#dumpMemtable
, but I think it can be called with TRUNCATE
from ColumnFamilyStore#truncateBlocking
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed the comment somewhat, but I'm going to revisit this after looking at what Intel had to do for pmem.
* Called when the known ranges have been updated and owner.localRangeSplits() may return different values. | ||
* This will not be called if shouldSwitch(OWNED_RANGES_CHANGE) returns true, the memtable will be swapped out | ||
* instead. | ||
* TODO: Implement call. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is actually called from ColumnFamilyStore#invalidateLocalRanges
, what call this refers to? Is it the missed call to owner.localRangeSplits
in the implementation of the method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed
@@ -33,6 +33,7 @@ | |||
import io.netty.util.concurrent.Future; | |||
import org.apache.cassandra.concurrent.ScheduledExecutors; | |||
import org.apache.cassandra.config.DatabaseDescriptor; | |||
import org.apache.cassandra.io.sstable.SSTableMultiWriter; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: unused import
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed
@@ -178,6 +182,9 @@ public void validate() | |||
|
|||
if (memtableFlushPeriodInMs < 0) | |||
fail("%s must be greater than or equal to 0 (got %s)", Option.MEMTABLE_FLUSH_PERIOD_IN_MS, memtableFlushPeriodInMs); | |||
|
|||
if (cdc && memtable.factory.writesShouldSkipCommitLog()) | |||
fail("CDC cannot work if writes skip the commit log. Check your memtable configuration."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fail("CDC cannot work if writes skip the commit log. Check your memtable configuration."); | |
fail("CDC cannot work if memtable writes skip the commit log. Check your memtable configuration."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe the original version is better -- the qualification is unnecessary because all writes go to the memtable, and it could cause unnecessary confusion.
{ | ||
Map<String, String> copy = new HashMap<>(options); | ||
String className = copy.remove(Option.CLASS.toString()); | ||
if (className.isEmpty() || className == null) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should check first if className
is null
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
static String keyspace; | ||
String table; | ||
ColumnFamilyStore cfs; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These can be local variables
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
int partitions = 50_000; | ||
int rowsPerPartition = 4; | ||
|
||
int deletedPartitionsStart = 20_000; | ||
int deletedPartitionsEnd = deletedPartitionsStart + 10_000; | ||
|
||
int deletedRowsStart = 40_000; | ||
int deletedRowsEnd = deletedRowsStart + 5_000; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These can be private static final
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
CQLTester.setUpClass(); | ||
CQLTester.prepareServer(); | ||
CQLTester.disablePreparedReuseForTest(); | ||
System.err.println("setupClass done."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
System.err.println("setupClass done."); | |
System.out.println("setupClass done."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: double blank line
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
table = createTable(keyspace, "CREATE TABLE %s ( userid bigint, picid bigint, commentid bigint, PRIMARY KEY(userid, picid))" + | ||
" with compression = {'enabled': false}" + | ||
" and memtable = { 'class': '" + memtableClass + "'}"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: parameter alignement
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
assertRows(execute(format("SELECT memtable FROM %s.%s WHERE keyspace_name = ? and table_name = ?;", | ||
SchemaConstants.SCHEMA_KEYSPACE_NAME, | ||
SchemaKeyspaceTables.TABLES), | ||
KEYSPACE, | ||
currentTable()), | ||
row(map())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This block appears seven times across the test, only changing the map arguments. We could reduce duplication with an utility method like:
private void assertMemtableOptions(Object... options) throws Throwable
{
assertRows(execute(format("SELECT memtable FROM %s.%s WHERE keyspace_name = ? and table_name = ?",
SchemaConstants.SCHEMA_KEYSPACE_NAME,
SchemaKeyspaceTables.TABLES),
KEYSPACE,
currentTable()),
row(map(options)));
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
assertRows(execute(format("SELECT memtable FROM %s.%s WHERE keyspace_name = ? and table_name = ?;", | ||
SchemaConstants.SCHEMA_KEYSPACE_NAME, | ||
SchemaKeyspaceTables.TABLES), | ||
KEYSPACE, | ||
currentTable()), | ||
row(map())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This block appears six times across the test, only changing the map
arguments. We could reduce duplication with an utility method like:
private void assertMemtableOptions(Object... options) throws Throwable
{
assertRows(execute(format("SELECT memtable FROM %s.%s WHERE keyspace_name = ? and table_name = ?",
SchemaConstants.SCHEMA_KEYSPACE_NAME,
SchemaKeyspaceTables.TABLES),
KEYSPACE,
currentTable()),
row(map(options)));
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@@ -568,6 +567,105 @@ public void testDoubleWith() throws Throwable | |||
} | |||
} | |||
|
|||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: unneeded blank line
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@@ -23,6 +23,7 @@ | |||
import java.util.function.BiFunction; | |||
|
|||
import org.apache.cassandra.Util; | |||
import org.apache.cassandra.db.memtable.Memtable; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: unused import
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed
@@ -141,6 +141,7 @@ private SystemKeyspace() | |||
+ "version int," | |||
+ "PRIMARY KEY ((id)))") | |||
.partitioner(new LocalPartitioner(TimeUUIDType.instance)) | |||
.memtable(MemtableParams.DEFAULT) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need this? MemtableParams.DEFAULT
is already the default
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed
if (conf == null) | ||
return null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can conf
be null when this is called?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Apparently some tests can end up calling this without initializing the configuration.
|
||
public ShardBoundaries localRangeSplits(int shardCount) | ||
{ | ||
return null; // not implemented |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we should throw UnsupportedOperationException
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The only callsite actually had access to the CFS, this is not necessary any more.
* splitting the owned space evenly. It is up to the memtable to use this information. | ||
* Any changes in the ring structure (e.g. added or removed nodes) will invalidate the splits; in such a case | ||
* the memtable will be sent a shouldSwitch(OWNED_RANGES_CHANGE) and, should that return false, a | ||
* localRangesChanged() call. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* localRangesChanged() call. | |
* {@link #localRangesUpdated()} call. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
shardBoundaries = new ShardBoundaries(boundaries.subList(0, boundaries.size() - 1), | ||
versionedLocalRanges.ringVersion); | ||
cachedShardBoundaries = shardBoundaries; | ||
logger.info("Memtable shard boundaries for {}.{}: {}", keyspace.getName(), getTableName(), boundaries); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we use DEBUG
level? IIUC DiskBoundariesManager
uses DEBUG
level for similar messages.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
* @param reversed true if the content should be returned in reverse order | ||
* @param listener a listener used to handle internal read events | ||
*/ | ||
UnfilteredRowIterator iterator(DecoratedKey key, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: I think that the two UnfilteredSource#iterator
methods would be better named UnfilteredSource#rowIterator
, complementing UnfilteredSource#partitionIterator
. That would also apply to SSTableReader#iterator(FileDataInput, DecoratedKey, RowIndexEntry, Slices, ColumnFilter, boolean)
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Renamed
* In practice, each keyspace has its associated boundaries, see {@link Keyspace}. | ||
* <p> | ||
* Technically, if we use {@code n} shards, this is a list of {@code n-1} tokens and each token {@code tk} gets assigned | ||
* to the core ID corresponding to the slot of the smallest token in the list that is greater to {@code tk}, or {@code n} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* to the core ID corresponding to the slot of the smallest token in the list that is greater to {@code tk}, or {@code n} | |
* to the shard id corresponding to the slot of the smallest token in the list that is greater to {@code tk}, or {@code n} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
public long estimateSize(SSTableWriter.SSTableSizeParameters parameters) | ||
{ | ||
return (long) ((parameters.partitionCount() // index entries | ||
+ parameters.partitionCount() // keys in data file |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this be parameters.partitionKeySize()
, which isn't called anywhere? Also, I would call that method partitionKeysSize
, in plural, so one can't think that this somehow refers to the size of some single partition.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed both calls. Also changed partitionCount
to be collected during the key size collection pass to avoid potentially walking the flush set again to collect it.
test/conf/cassandra.yaml
Outdated
test_invalid_factory_method: | ||
class: org.apache.cassandra.cql3.validation.operations.CreateTest$InvalidMemtableFactoryMethod | ||
test_invalid_factory_field: | ||
class: org.apache.cassandra.cql3.validation.operations.CreateTest$InvalidMemtableFactoryField |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With CASSANDRA-17292 on the horizon, it might be a good time to think about using a structure for memtable configuration that would be compatible w/ the main proposal there: maedhroz@450b920
Given we're moving toward implementation-specific configuration, we could even replace the existing ungrouped memtable YAML items with a new top-level memtable
element. (There are a few ways to handle compatibility, including just using the items under the new memtable
element if one is actually specified, pulling values from the old top-level options if they aren't specified in the new format, etc.)
ex.
memtable:
configuration: skiplist
configurations:
skiplist:
class: SkipListMemtable
trie:
class: TrieMemtable
shards: 16
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wanted to avoid having an entry for selecting the default configuration, relying on modifying the "default" one instead (and if you want to copy a config for the default, you can do that by extending). Unless you have a strong feeling about this, I prefer to keep that for now.
The configuration is now changed to nested format, and I changed it to use ParameterizedClass
-like format. One of the reasons for this was the YAML type reinterpretation, as I could not find a way to make snakeyaml read Map<String, Map<String, String>>
correctly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will also refrain from moving the existing properties in this ticket, but that's something that does have to be done eventually.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, thanks!
Just a couple minor things to cleanup, like CLASS_OPTION
and EXTENDS_OPTION
become unused in MemtableParams
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed the two properties and added format documentation.
REPAIR, | ||
SCHEMA_CHANGE, | ||
OWNED_RANGES_CHANGE, | ||
UNIT_TESTS; // explicitly requested flush needed for a test |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
UNIT_TESTS; // explicitly requested flush needed for a test | |
UNIT_TESTS // explicitly requested flush needed for a test |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@@ -97,7 +97,7 @@ public void setup() throws Throwable | |||
RowUpdateBuilder rowBuilder = new RowUpdateBuilder(cfs.metadata(), System.currentTimeMillis() + random.nextInt(), "key"); | |||
rowBuilder.add(colDef, "val1"); | |||
rowBuilder.build().apply(); | |||
cfs.forceBlockingFlush(); | |||
cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not super important, but I notice we're using UNIT_TESTS
in some jmh tests and USER_FORCED
in others?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I generally chose to go with USER_FORCED
whenever it's not an actual unit test; this one is now corrected.
@@ -31,6 +31,7 @@ | |||
import org.apache.cassandra.cache.KeyCacheKey; | |||
import org.apache.cassandra.config.DatabaseDescriptor; | |||
import org.apache.cassandra.db.ColumnFamilyStore; | |||
import org.apache.cassandra.db.ColumnFamilyStore; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: duplicate import
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
@@ -31,6 +31,7 @@ | |||
import org.apache.cassandra.cache.KeyCacheKey; | |||
import org.apache.cassandra.config.DatabaseDescriptor; | |||
import org.apache.cassandra.db.ColumnFamilyStore; | |||
import org.apache.cassandra.db.ColumnFamilyStore; | |||
import org.apache.cassandra.db.Keyspace; | |||
import org.apache.cassandra.index.Index; | |||
import org.apache.cassandra.io.sstable.format.SSTableFormat; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: unused import
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
@@ -635,7 +635,7 @@ public void flush(String keyspace) | |||
{ | |||
ColumnFamilyStore store = getCurrentColumnFamilyStore(keyspace); | |||
if (store != null) | |||
store.forceBlockingFlush(); | |||
store.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess we're going to have to change a ton of tests either way, so what do you think about adding a couple utility flush methods to CQLTester
that hide the details of the flush reason?
ex.
List<Future<?>> flush(Keyspace keyspace)
{
return keyspace.flush(ColumnFamilyStore.FlushReason.UNIT_TESTS);
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
...probably also one for forceBlockingFlush()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added static methods in Util
and changed all test to use them.
{ | ||
try | ||
{ | ||
cfs.switchMemtableIfCurrent(current, ColumnFamilyStore.FlushReason.UNIT_TESTS).get(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch!
I wonder if this is connected to any known test failure?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The commit log test is flaky, this may be part of the reason.
catch (InterruptedException|ExecutionException e) | ||
{ | ||
throw Throwables.propagate(e); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it make sense to use FBUtilities.waitOnFuture()
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, changed.
|
||
logger.info("Enqueuing flush of {}: {}", | ||
logger.info("Enqueuing flush of {} ({}): {}", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
1.) Would it be helpful to make this a fully qualified table name (i.e. throw the keyspace prefix on there)?
2.) nit, ignore if you don't like: Suggestion for format string:
Enqueueing flush of {}, Reason: {}, Usage: {}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@@ -195,12 +195,19 @@ public CassandraValidationIterator(ColumnFamilyStore cfs, Collection<Range<Token | |||
if (!isIncremental) | |||
{ | |||
// flush first so everyone is validating data that is as similar as possible | |||
StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name); | |||
cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.REPAIR); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's hard for me to know whether it would actually be useful, but we could possibly make REPAIR
more granular (ex. VALIDATION
, ANTICOMPACTION
, etc.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might be helpful in the logs; done.
import org.apache.cassandra.db.partitions.PartitionUpdate; | ||
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; | ||
import org.apache.cassandra.db.memtable.Memtable; | ||
import org.apache.cassandra.db.partitions.*; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Was the switch to a wildcard import intended?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No idea why Intellij decided to do that... fixed.
@@ -4179,7 +4185,7 @@ public void forceKeyspaceFlush(String keyspaceName, String... tableNames) throws | |||
for (ColumnFamilyStore cfStore : getValidColumnFamilies(true, false, keyspaceName, tableNames)) | |||
{ | |||
logger.debug("Forcing flush on keyspace {}, CF {}", keyspaceName, cfStore.name); | |||
cfStore.forceBlockingFlush(); | |||
cfStore.forceBlockingFlush(ColumnFamilyStore.FlushReason.USER_FORCED); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there are also some cases, like in RepairTest
, where we call this but could make an argument the reason should be UNIT_TESTS
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a reason-specifying version and changed callsites.
INTERNALLY_FORCED, // explicitly requested flush, necessary for the operation of an internal table | ||
USER_FORCED, // flush explicitly requested by the user (e.g. nodetool flush) | ||
STARTUP, | ||
SHUTDOWN, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Would it make sense to use DRAIN
here, given we can do that via nodetool
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed.
return performReadSerial(readStatement, supplier); | ||
else | ||
return performReadThreads(readStatement, supplier); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we want to control read concurrency, would it be good enough to just delegate that to the @Threads
annotation on the top-level class here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wanted to measure the minimum time per batch, which is different from what JMH threads can be used to measure (more suitable for max throughput).
for (Future<Integer> f : futures) | ||
done += f.get(); | ||
assert count == done; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Were writes just taking too long and making it hard to iterate quickly on optimizations? (AFAICT, the benchmark only explicitly measures reads...)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: There are some common bits of write logic between WriteTest
and ReadTest
, to the extent your might be able to factor out a "writer" class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It was helpful to run just the read test and get a basic idea of the write performance too (and also data size).
Refactored the benchmarks to extract the shared code.
} | ||
|
||
@Benchmark | ||
public void writeTable() throws Throwable |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the idea here to do as many of these write/flush cycles as we can do in 1 second for each iteration? @Measurement
seems to have support for batching, so just curious. Would it make sense to benchmark the writes and flushes separately?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Repeatedly running the same write(+truncate+flush), for at least one second per measurement iteration (if the count is the default 1m, it takes more than a second).
Separating the two is not that easy, but if we measure both TRUNCATE and FLUSH we can look at the difference.
SchemaKeyspaceTables.TABLES), | ||
KEYSPACE, | ||
currentTable()), | ||
row("skiplist")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does the concept of a default exist mainly as a mechanism to have a new implementation picked up automatically on startup after simply changing the YAML at the node level, rather than across the whole cluster?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
According to the CEP-19 discussion, users want to try out new settings a subset of nodes at a time, and prefer to not have settings that are not overridable per node.
This design addresses these concerns and addresses all scenarios I could think of: we can gradually switch all tables to a different implementation by changing the node's default; we can assign a specific implementation to some tables and, to analyze or work around problems, switch only some nodes to a different one; we can create targeted per-node settings in heterogeneous deployments.
test/conf/cassandra.yaml
Outdated
skiplist: | ||
extends: default | ||
class: SkipListMemtable | ||
skiplist_remapped: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we use skiplist_remapped
anywhere?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added inCreateTest
.
import java.util.List; | ||
|
||
import com.google.common.collect.ImmutableList; | ||
import org.junit.Assert; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unused: import org.junit.Assert;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed
} | ||
|
||
public static Memtable.Factory FACTORY = | ||
(commitLogLowerBound, metadaRef, owner) -> new SkipListMemtable(commitLogLowerBound, metadaRef, owner); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could also just be...
public static Memtable.Factory FACTORY = SkipListMemtable::new;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
if (hasOption(Option.MEMTABLE)) | ||
builder.memtable(MemtableParams.get(getString(Option.MEMTABLE))); | ||
else | ||
builder.memtable(MemtableParams.DEFAULT); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could also be...
builder.memtable(MemtableParams.get(getSimple(Option.MEMTABLE.toString())));
...given how get()
handles nulls.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, the else
part does not follow the pattern of the code. In theory one could call this twice and resetting to default if the memtable is not specified the second time would be unexpected. Changed.
return memtable().metadata(); | ||
} | ||
|
||
long partitionCount(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Is this necessary, given we extend SSTableWriter.SSTableSizeParameters
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed
also make sure default memtable options are not stored in table config.
Table configuration can now only select a configuration defined in cassandra.yaml, to permit per-node configuration that cannot be overridden by table configuration. Also adds support for configurations to extend from others, to permit easy remapping of memtable configurations.
The move to ParameterizedClass is to avoid deviating from what is used elsewhere, but also to ensure parameters are correctly interpreted as strings.
This adds a "snapshot_commitlog_position" field to "commitlog_archiving.properties", which overrides the commit log intervals to be replayed. This should be used to specify the time that a persistent memtable snapshot was taken (or started) to correctly replay commit log segments. If the value is not specified, and a persistent memtable is in use, all present segments will be replayed to support a mode where older segments are deleted when a snapshot is created.
09f4e05
to
9627194
Compare
Provides a memtable API as described in CEP-11.