Skip to content

Commit

Permalink
CEP-15: Convert AccordStateCache cache from write-through to write-back
Browse files Browse the repository at this point in the history
patch by Aleksey Yeschenko; reviewed by Blake Eggleston for
CASSANDRA-18563
  • Loading branch information
iamaleksey committed Jun 16, 2023
1 parent 2230fb5 commit bd33015
Show file tree
Hide file tree
Showing 19 changed files with 1,651 additions and 1,501 deletions.
640 changes: 640 additions & 0 deletions src/java/org/apache/cassandra/service/accord/AccordCachingState.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import java.util.function.Consumer;
import java.util.function.Function;

import javax.annotation.Nullable;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;

Expand Down Expand Up @@ -66,7 +68,11 @@
import accord.utils.async.AsyncChain;
import accord.utils.async.AsyncChains;
import accord.utils.async.Observable;
import org.apache.cassandra.concurrent.ExecutorPlus;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.service.accord.api.PartitionKey;
import org.apache.cassandra.service.accord.async.AsyncOperation;
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.concurrent.AsyncPromise;
Expand Down Expand Up @@ -111,14 +117,39 @@ public AccordCommandStore(int id,
DataStore dataStore,
ProgressLog.Factory progressLogFactory,
RangesForEpochHolder rangesForEpoch)
{
this(id, time, agent, dataStore, progressLogFactory, rangesForEpoch, Stage.READ.executor(), Stage.MUTATION.executor());
}

@VisibleForTesting
public AccordCommandStore(int id,
NodeTimeService time,
Agent agent,
DataStore dataStore,
ProgressLog.Factory progressLogFactory,
RangesForEpochHolder rangesForEpoch,
ExecutorPlus loadExecutor,
ExecutorPlus saveExecutor)
{
super(id, time, agent, dataStore, progressLogFactory, rangesForEpoch);
this.loggingId = String.format("[%s]", id);
this.executor = executorFactory().sequential(CommandStore.class.getSimpleName() + '[' + id + ']');
this.threadId = getThreadId(this.executor);
this.stateCache = new AccordStateCache(8<<20);
this.commandCache = stateCache.instance(TxnId.class, accord.local.Command.class, AccordSafeCommand::new, AccordObjectSizes::command);
this.commandsForKeyCache = stateCache.instance(RoutableKey.class, CommandsForKey.class, AccordSafeCommandsForKey::new, AccordObjectSizes::commandsForKey);
loggingId = String.format("[%s]", id);
executor = executorFactory().sequential(CommandStore.class.getSimpleName() + '[' + id + ']');
threadId = getThreadId(this.executor);
stateCache = new AccordStateCache(loadExecutor, saveExecutor, 8 << 20);
commandCache =
stateCache.instance(TxnId.class,
TxnId.class,
AccordSafeCommand::new,
this::loadCommand,
this::saveCommand,
AccordObjectSizes::command);
commandsForKeyCache =
stateCache.instance(RoutableKey.class,
PartitionKey.class,
AccordSafeCommandsForKey::new,
this::loadCommandsForKey,
this::saveCommandsForKey,
AccordObjectSizes::commandsForKey);
executor.execute(() -> CommandStore.register(this));
executor.execute(this::loadRangesToCommands);
}
Expand All @@ -139,7 +170,7 @@ public void onNext(UntypedResultSet.Row row) throws Exception
PartialTxn txn = AccordKeyspace.deserializeTxn(row);
Seekables<?, ?> keys = txn.keys();
if (keys.domain() != Routable.Domain.Range)
throw new AssertionError(String.format("Txn keys are not range", txn));
throw new AssertionError(String.format("Txn keys are not range for %s", txn));
Ranges ranges = (Ranges) keys;

PartialDeps deps = AccordKeyspace.deserializeDependencies(row);
Expand Down Expand Up @@ -219,16 +250,40 @@ public AccordStateCache.Instance<RoutableKey, CommandsForKey, AccordSafeCommands
return commandsForKeyCache;
}

/**
 * Loads the {@link Command} identified by {@code txnId} for this command store
 * by delegating to {@link AccordKeyspace#loadCommand}.
 *
 * @param txnId id of the transaction whose command should be loaded
 * @return whatever {@code AccordKeyspace.loadCommand} returns for this store and id
 */
Command loadCommand(TxnId txnId)
{
    Command loaded = AccordKeyspace.loadCommand(this, txnId);
    return loaded;
}

/**
 * Loads the {@link CommandsForKey} for {@code key} in this command store
 * by delegating to {@link AccordKeyspace#loadCommandsForKey}.
 *
 * @param key the key to load; it is cast to {@link PartitionKey}, so any other
 *            {@link RoutableKey} implementation fails with a {@link ClassCastException}
 * @return whatever {@code AccordKeyspace.loadCommandsForKey} returns for this store and key
 */
CommandsForKey loadCommandsForKey(RoutableKey key)
{
    PartitionKey partitionKey = (PartitionKey) key;
    return AccordKeyspace.loadCommandsForKey(this, partitionKey);
}

/**
 * Builds the task that persists the transition of a command from {@code before} to {@code after}.
 * <p>
 * The mutation is computed eagerly here, stamped with {@code nextSystemTimestampMicros()};
 * only its application is deferred to the returned {@link Runnable}.
 *
 * @param before the previously known state of the command
 * @param after  the new state of the command
 * @return a task that applies the mutation when run, or {@code null} when
 *         {@code AccordKeyspace.getCommandMutation} produced no mutation
 */
@Nullable
Runnable saveCommand(Command before, Command after)
{
    Mutation update = AccordKeyspace.getCommandMutation(id, before, after, nextSystemTimestampMicros());
    if (update == null)
        return null;

    return update::apply;
}

/**
 * Builds the task that persists the transition of a {@link CommandsForKey}
 * from {@code before} to {@code after}.
 * <p>
 * The mutation is computed eagerly here, stamped with {@code nextSystemTimestampMicros()};
 * only its application is deferred to the returned {@link Runnable}.
 *
 * @param before the previously known state
 * @param after  the new state
 * @return a task that applies the mutation when run, or {@code null} when
 *         {@code AccordKeyspace.getCommandsForKeyMutation} produced no mutation
 */
@Nullable
private Runnable saveCommandsForKey(CommandsForKey before, CommandsForKey after)
{
    Mutation update = AccordKeyspace.getCommandsForKeyMutation(id, before, after, nextSystemTimestampMicros());
    if (update == null)
        return null;

    return update::apply;
}

/**
 * Exposes the {@link AccordStateCache} held by this command store.
 *
 * @return the {@code stateCache} field of this store
 */
@VisibleForTesting
public AccordStateCache cache()
{
    return this.stateCache;
}

@VisibleForTesting
public void clearCache()
public void unsafeClearCache()
{
stateCache.clear();
stateCache.unsafeClear();
}

public void setCurrentOperation(AsyncOperation<?> operation)
Expand Down Expand Up @@ -319,9 +374,7 @@ public AccordSafeCommandStore beginOperation(PreLoadContext preLoadContext,
return current;
}

public void completeOperation(AccordSafeCommandStore store,
Map<TxnId, AccordSafeCommand> commands,
Map<RoutableKey, AccordSafeCommandsForKey> commandsForKeys)
public void completeOperation(AccordSafeCommandStore store)
{
Invariants.checkState(current == store);
current.complete();
Expand Down
41 changes: 24 additions & 17 deletions src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
Original file line number Diff line number Diff line change
Expand Up @@ -528,11 +528,14 @@ private static <C, K, V> int estimateMapChanges(Function<C, Map<K, V>> get, C or
}

/**
 * Convenience overload that unpacks the store id and the original/current command
 * states from their holders and delegates to
 * {@code getCommandMutation(int, Command, Command, long)}.
 *
 * @param commandStore    store whose {@code id()} is passed on as the store id
 * @param liveCommand     holder supplying the {@code original()} and {@code current()} commands
 * @param timestampMicros timestamp forwarded unchanged to the delegate
 * @return the delegate's result
 */
public static Mutation getCommandMutation(AccordCommandStore commandStore, AccordSafeCommand liveCommand, long timestampMicros)
{
    int storeId = commandStore.id();
    Command before = liveCommand.original();
    Command after = liveCommand.current();
    return getCommandMutation(storeId, before, after, timestampMicros);
}

public static Mutation getCommandMutation(int storeId, Command original, Command command, long timestampMicros)
{
try
{
Command command = liveCommand.current();
Command original = liveCommand.original();
Invariants.checkArgument(original != command);

Row.Builder builder = BTreeRow.unsortedBuilder();
Expand Down Expand Up @@ -569,12 +572,13 @@ public static Mutation getCommandMutation(AccordCommandStore commandStore, Accor
}
}

ByteBuffer key = CommandsColumns.keyComparator.make(commandStore.id(),
command.txnId().domain().ordinal(),
serializeTimestamp(command.txnId())).serializeAsPartitionKey();
Row row = builder.build();
if (row.isEmpty())
return null;

ByteBuffer key = CommandsColumns.keyComparator.make(storeId,
command.txnId().domain().ordinal(),
serializeTimestamp(command.txnId())).serializeAsPartitionKey();
PartitionUpdate update = PartitionUpdate.singleRowUpdate(Commands, key, row);
return new Mutation(update);
}
Expand Down Expand Up @@ -1014,26 +1018,29 @@ private static void addSeriesMutations(CommandsForKey original,
addSeriesMutations(kind.getValues(original), kind.getValues(cfk), kind, partitionBuilder, rowBuilder, timestampMicros, nowInSeconds);
}

private static DecoratedKey makeKey(CommandStore commandStore, PartitionKey key)
private static DecoratedKey makeKey(int storeId, PartitionKey key)
{
Token token = key.token();
ByteBuffer pk = CommandsForKeyColumns.keyComparator.make(commandStore.id(),
serializeToken(token),
serializeKey(key)).serializeAsPartitionKey();
ByteBuffer pk = CommandsForKeyColumns.keyComparator.make(storeId,
serializeToken(token),
serializeKey(key)).serializeAsPartitionKey();
return CommandsForKeys.partitioner.decorateKey(pk);
}

private static DecoratedKey makeKey(CommandStore commandStore, CommandsForKey cfk)
private static DecoratedKey makeKey(int storeId, CommandsForKey cfk)
{
return makeKey(commandStore, (PartitionKey) cfk.key());
return makeKey(storeId, (PartitionKey) cfk.key());
}

/**
 * Convenience overload that unpacks the store id and the original/current
 * {@link CommandsForKey} states from their holders and delegates to
 * {@code getCommandsForKeyMutation(int, CommandsForKey, CommandsForKey, long)}.
 *
 * @param commandStore    store whose {@code id()} is passed on as the store id
 * @param liveCfk         holder supplying the {@code original()} and {@code current()} states
 * @param timestampMicros timestamp forwarded unchanged to the delegate
 * @return the delegate's result
 */
public static Mutation getCommandsForKeyMutation(AccordCommandStore commandStore, AccordSafeCommandsForKey liveCfk, long timestampMicros)
{
    int storeId = commandStore.id();
    CommandsForKey before = liveCfk.original();
    CommandsForKey after = liveCfk.current();
    return getCommandsForKeyMutation(storeId, before, after, timestampMicros);
}

public static Mutation getCommandsForKeyMutation(int storeId, CommandsForKey original, CommandsForKey cfk, long timestampMicros)
{
try
{
CommandsForKey cfk = liveCfk.current();
CommandsForKey original = liveCfk.original();
Invariants.checkArgument(original != cfk);
// TODO: convert to byte arrays
ValueAccessor<ByteBuffer> accessor = ByteBufferAccessor.instance;
Expand All @@ -1046,7 +1053,7 @@ public static Mutation getCommandsForKeyMutation(AccordCommandStore commandStore
+ estimateMapChanges(c -> c.byExecuteAt().commands, original, cfk);

PartitionUpdate.Builder partitionBuilder = new PartitionUpdate.Builder(CommandsForKeys,
makeKey(commandStore, cfk),
makeKey(storeId, cfk),
CommandsForKeyColumns.columnsFor(original, cfk),
expectedRows);

Expand Down Expand Up @@ -1095,13 +1102,13 @@ private static <T> ByteBuffer clusteringValue(Clustering<T> clustering, int idx)
return clustering.accessor().toBuffer(clustering.get(idx));
}

public static SinglePartitionReadCommand getCommandsForKeyRead(CommandStore commandStore, PartitionKey key, int nowInSeconds)
public static SinglePartitionReadCommand getCommandsForKeyRead(int storeId, PartitionKey key, int nowInSeconds)
{
return SinglePartitionReadCommand.create(CommandsForKeys, nowInSeconds,
CommandsForKeyColumns.allColumns,
RowFilter.NONE,
DataLimits.NONE,
makeKey(commandStore, key),
makeKey(storeId, key),
FULL_PARTITION);
}

Expand All @@ -1111,7 +1118,7 @@ public static CommandsForKey loadCommandsForKey(AccordCommandStore commandStore,
long timestampMicros = TimeUnit.MILLISECONDS.toMicros(Clock.Global.currentTimeMillis());
int nowInSeconds = (int) TimeUnit.MICROSECONDS.toSeconds(timestampMicros);

SinglePartitionReadCommand command = getCommandsForKeyRead(commandStore, key, nowInSeconds);
SinglePartitionReadCommand command = getCommandsForKeyRead(commandStore.id(), key, nowInSeconds);

EnumMap<SeriesKind, ImmutableSortedMap.Builder<Timestamp, ByteBuffer>> seriesMaps = new EnumMap<>(SeriesKind.class);
for (SeriesKind kind : SeriesKind.values())
Expand Down

0 comments on commit bd33015

Please sign in to comment.