STAR-1341 abstract mutations used by CDC (apache#458)
* abstract mutations used by CDC
At first I tried to refactor the StorageProxy methods and the
WritePerformer interface, but since we periodically rebase on OSS C*,
I reverted those changes.
Instead, a new abstraction level over the various mutate methods is
introduced - Mutator. The default implementation proxies the calls to
the original (default) methods.
Users may provide a custom Mutator implementation to alter the
mutation behavior (a sketch of such an implementation follows the new
Mutator.java below).

This solution was chosen to minimize rebase conflicts; it is a
compromise between code structure and maintenance cost.
In an ideal world we would refactor the mutation methods into
separate classes.

* add ComplexColumnData.hasCells

* make currentTimeoutNanos overridable
CDCWriteResponseHandler doesn't use the default timeout
taken from DatabaseDescriptor

* port 'CNDB-3334 CDC counter support'
Add applyCounterMutationWithoutLocks method.

The method applies the counter mutation without locks (they are not
needed in CDC, as all writes to the same partition are serialized by
the CDC Service).
The mutation is applied with the given CounterId and clockMicros. The
values are precomputed and distributed by the CDC Service. Writers
receive the mutation with the values attached, so all writers may
apply the mutation as if they were leader replicas (a usage sketch
follows the CounterMutation.java diff below).
See the ticket description for details on the leader and follower
replica concepts used by the vanilla Cassandra counter write path.

* make RepairedDataInfo public

RepairedDataInfo.NO_OP_REPAIRED_DATA_INFO is used by CDC
to denote the absence of repaired data info.

(cherry picked from commit 5191be2)
(cherry picked from commit 75b8d12)
(cherry picked from commit ba2a0a6)
jtgrabowski authored and djatnieks committed Sep 12, 2023
1 parent 6b0c581 commit 22eefdc
Showing 13 changed files with 378 additions and 38 deletions.
src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
@@ -335,7 +335,12 @@ public enum CassandraRelevantProperties
// in OSS, when UUID based SSTable generation identifiers are enabled, they use TimeUUID
// though, for CNDB we want to use ULID - this property allows for that
// valid values for this property are: uuid, ulid
SSTABLE_UUID_IMPL("cassandra.sstable.id.uuid_impl", "uuid");
SSTABLE_UUID_IMPL("cassandra.sstable.id.uuid_impl", "uuid"),

/**
* Name of a custom implementation of {@link org.apache.cassandra.service.Mutator}.
*/
CUSTOM_MUTATOR_CLASS("cassandra.custom_mutator_class");

CassandraRelevantProperties(String key, String defaultVal)
{
72 changes: 57 additions & 15 deletions src/java/org/apache/cassandra/db/CounterMutation.java
@@ -19,6 +19,7 @@

import java.io.IOException;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

@@ -168,11 +169,15 @@ public Mutation applyCounterMutation() throws WriteTimeoutException

List<Lock> locks = new ArrayList<>();
Tracing.trace("Acquiring counter locks");

long clock = FBUtilities.timestampMicros();
CounterId counterId = CounterId.getLocalId();

try
{
grabCounterLocks(keyspace, locks);
for (PartitionUpdate upd : getPartitionUpdates())
resultBuilder.add(processModifications(upd));
resultBuilder.add(processModifications(upd, clock, counterId));

Mutation result = resultBuilder.build();
result.apply();
@@ -185,6 +190,26 @@ }
}
}

/**
* Applies the counter mutation with the provided time and {@link CounterId}. As opposed to
* {@link #applyCounterMutation()}, this method doesn't acquire cell-level locks.
* <p/>
* This method is used in the CDC counter write path (CNDB).
* <p/>
* The time and counter values are evaluated and propagated to all replicas by the CDC Service. The replicas
* use this method to apply the mutation locally without locks. The locks are not needed in the CDC
* path as all the writes to the same partition are serialized by the CDC Service.
*/
public CompletableFuture<Mutation> applyCounterMutationWithoutLocks(long systemClockMicros, CounterId counterId)
{
Mutation.PartitionUpdateCollector resultBuilder = new Mutation.PartitionUpdateCollector(getKeyspaceName(), key());
for (PartitionUpdate upd : getPartitionUpdates())
resultBuilder.add(processModifications(upd, systemClockMicros, counterId));

Mutation mutation = resultBuilder.build();
return mutation.applyFuture(WriteOptions.DEFAULT).thenApply(ignore -> mutation);
}

public void apply()
{
applyCounterMutation();
@@ -275,7 +300,9 @@ public Object apply(final ColumnData data)
}));
}

private PartitionUpdate processModifications(PartitionUpdate changes)
private PartitionUpdate processModifications(PartitionUpdate changes,
long systemClockMicros,
CounterId counterId)
{
ColumnFamilyStore cfs = Keyspace.open(getKeyspaceName()).getColumnFamilyStore(changes.metadata().id);

@@ -284,34 +311,41 @@ private PartitionUpdate processModifications(PartitionUpdate changes)
if (CacheService.instance.counterCache.getCapacity() != 0)
{
Tracing.trace("Fetching {} counter values from cache", marks.size());
updateWithCurrentValuesFromCache(marks, cfs);
updateWithCurrentValuesFromCache(marks, cfs, systemClockMicros, counterId);
if (marks.isEmpty())
return changes;
}

Tracing.trace("Reading {} counter values from the CF", marks.size());
updateWithCurrentValuesFromCFS(marks, cfs);
updateWithCurrentValuesFromCFS(marks, cfs, systemClockMicros, counterId);

// What remains are new counters
for (PartitionUpdate.CounterMark mark : marks)
updateWithCurrentValue(mark, ClockAndCount.BLANK, cfs);
updateWithCurrentValue(mark, ClockAndCount.BLANK, cfs, systemClockMicros, counterId);

return changes;
}

private void updateWithCurrentValue(PartitionUpdate.CounterMark mark, ClockAndCount currentValue, ColumnFamilyStore cfs)
private void updateWithCurrentValue(PartitionUpdate.CounterMark mark,
ClockAndCount currentValue,
ColumnFamilyStore cfs,
long systemClockMicros,
CounterId counterId)
{
long clock = Math.max(FBUtilities.timestampMicros(), currentValue.clock + 1L);
long clock = Math.max(systemClockMicros, currentValue.clock + 1L);
long count = currentValue.count + CounterContext.instance().total(mark.value(), ByteBufferAccessor.instance);

mark.setValue(CounterContext.instance().createGlobal(CounterId.getLocalId(), clock, count));
mark.setValue(CounterContext.instance().createGlobal(counterId, clock, count));

// Cache the newly updated value
cfs.putCachedCounter(key().getKey(), mark.clustering(), mark.column(), mark.path(), ClockAndCount.create(clock, count));
}

// Updates the marks with values from the counter cache; handled marks are removed from the list.
private void updateWithCurrentValuesFromCache(List<PartitionUpdate.CounterMark> marks, ColumnFamilyStore cfs)
private void updateWithCurrentValuesFromCache(List<PartitionUpdate.CounterMark> marks,
ColumnFamilyStore cfs,
long systemClockMicros,
CounterId counterId)
{
Iterator<PartitionUpdate.CounterMark> iter = marks.iterator();
while (iter.hasNext())
@@ -320,14 +354,17 @@ private void updateWithCurrentValuesFromCache(List<PartitionUpdate.CounterMark>
ClockAndCount cached = cfs.getCachedCounter(key().getKey(), mark.clustering(), mark.column(), mark.path());
if (cached != null)
{
updateWithCurrentValue(mark, cached, cfs);
updateWithCurrentValue(mark, cached, cfs, systemClockMicros, counterId);
iter.remove();
}
}
}

// Reads the missing current values from the CFS.
private void updateWithCurrentValuesFromCFS(List<PartitionUpdate.CounterMark> marks, ColumnFamilyStore cfs)
private void updateWithCurrentValuesFromCFS(List<PartitionUpdate.CounterMark> marks,
ColumnFamilyStore cfs,
long systemClockMicros,
CounterId counterId)
{
ColumnFilter.Builder builder = ColumnFilter.selectionBuilder();
BTreeSet.Builder<Clustering<?>> names = BTreeSet.builder(cfs.metadata().comparator);
@@ -348,14 +385,14 @@ private void updateWithCurrentValuesFromCFS(List<PartitionUpdate.CounterMark> ma
try (ReadExecutionController controller = cmd.executionController();
RowIterator partition = UnfilteredRowIterators.filter(cmd.queryMemtableAndDisk(cfs, controller), nowInSec))
{
updateForRow(markIter, partition.staticRow(), cfs);
updateForRow(markIter, partition.staticRow(), cfs, systemClockMicros, counterId);

while (partition.hasNext())
{
if (!markIter.hasNext())
return;

updateForRow(markIter, partition.next(), cfs);
updateForRow(markIter, partition.next(), cfs, systemClockMicros, counterId);
}
}
}
@@ -370,7 +407,11 @@ private int compare(Clustering<?> c1, Clustering<?> c2, ColumnFamilyStore cfs)
return cfs.getComparator().compare(c1, c2);
}

private void updateForRow(PeekingIterator<PartitionUpdate.CounterMark> markIter, Row row, ColumnFamilyStore cfs)
private void updateForRow(PeekingIterator<PartitionUpdate.CounterMark> markIter,
Row row,
ColumnFamilyStore cfs,
long systemClockMicros,
CounterId counterId)
{
int cmp = 0;
// If the mark is before the row, we have no value for this mark, just consume it
@@ -386,7 +427,8 @@ private void updateForRow(PeekingIterator<PartitionUpdate.CounterMark> markIter,
Cell<?> cell = mark.path() == null ? row.getCell(mark.column()) : row.getCell(mark.column(), mark.path());
if (cell != null)
{
updateWithCurrentValue(mark, CounterContext.instance().getLocalClockAndCount(cell.buffer()), cfs);
ClockAndCount localClockAndCount = CounterContext.instance().getLocalClockAndCount(cell.buffer());
updateWithCurrentValue(mark, localClockAndCount, cfs, systemClockMicros, counterId);
markIter.remove();
}
if (!markIter.hasNext())
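
A minimal usage sketch of the new method, as seen from the CDC/CNDB replica side. The helper class below is illustrative and not part of this commit; it assumes the caller already holds the clockMicros and CounterId that were precomputed and distributed by the CDC Service.

import java.util.concurrent.CompletableFuture;

import org.apache.cassandra.db.CounterMutation;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.utils.CounterId;

// Hypothetical helper, not part of this commit.
public final class CdcCounterApplier
{
    private CdcCounterApplier() {}

    // Applies a counter mutation whose clock and CounterId were precomputed by the CDC Service,
    // skipping the cell-level locks taken by the regular applyCounterMutation() path.
    public static CompletableFuture<Mutation> apply(CounterMutation mutation, long clockMicros, CounterId counterId)
    {
        return mutation.applyCounterMutationWithoutLocks(clockMicros, counterId);
    }
}
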
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/db/RepairedDataInfo.java
@@ -37,7 +37,7 @@
import org.apache.cassandra.utils.ByteBufferUtil;

@NotThreadSafe
class RepairedDataInfo
public class RepairedDataInfo
{
public static final RepairedDataInfo NO_OP_REPAIRED_DATA_INFO = new RepairedDataInfo(null)
{
5 changes: 5 additions & 0 deletions src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
@@ -67,6 +67,11 @@ public class ComplexColumnData extends ColumnData implements Iterable<Cell<?>>
this.complexDeletion = complexDeletion;
}

// Used by CNDB
public boolean hasCells()
{
return !BTree.isEmpty(this.cells);
}

public int cellsCount()
{
return BTree.size(cells);
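
A small illustrative sketch (not part of this commit) of how a caller such as CNDB might use the new hasCells() accessor to skip complex column data that carries only a complex deletion and no live cells:

import org.apache.cassandra.db.rows.ColumnData;
import org.apache.cassandra.db.rows.ComplexColumnData;
import org.apache.cassandra.db.rows.Row;

// Hypothetical helper, not part of this commit.
public final class ComplexCells
{
    private ComplexCells() {}

    // Counts the cells of complex (multi-cell) columns in a row, skipping column data
    // that holds only a complex deletion and therefore has no cells to process.
    public static int countComplexCells(Row row)
    {
        int count = 0;
        for (ColumnData cd : row)
        {
            if (cd instanceof ComplexColumnData && ((ComplexColumnData) cd).hasCells())
                count += ((ComplexColumnData) cd).cellsCount();
        }
        return count;
    }
}
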
src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -125,7 +125,7 @@ public void get() throws WriteTimeoutException, WriteFailureException
}
}

public final long currentTimeoutNanos()
public long currentTimeoutNanos()
{
long requestTimeout = writeType == WriteType.COUNTER
? DatabaseDescriptor.getCounterWriteRpcTimeout(NANOSECONDS)
61 changes: 61 additions & 0 deletions src/java/org/apache/cassandra/service/Mutator.java
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.service;

import javax.annotation.Nullable;

import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.CounterMutation;
import org.apache.cassandra.db.IMutation;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.WriteType;
import org.apache.cassandra.service.paxos.Commit;

/**
* Facilitates mutations for counters, simple inserts, unlogged batches and LWTs.
* Used on the coordinator.
* <br/>
* The implementations may choose how and where to send the mutations.
* <br/>
* An instance of the configured implementation must be obtained via {@link MutatorProvider#instance}.
*/
public interface Mutator
{
/**
* Used for handling counter mutations on the coordinator level.
*/
AbstractWriteResponseHandler<IMutation> mutateCounter(CounterMutation cm, String localDataCenter, long queryStartNanoTime);

/**
* Used for standard inserts and unlogged batches.
*/
AbstractWriteResponseHandler<IMutation> mutateStandard(Mutation mutation,
ConsistencyLevel consistencyLevel,
String localDataCenter,
StorageProxy.WritePerformer writePerformer,
Runnable callback,
WriteType writeType,
long queryStartNanoTime);

/**
* Used for LWT mutations at the last (COMMIT) phase of Paxos.
*/
@Nullable
AbstractWriteResponseHandler<Commit> mutatePaxos(Commit proposal, ConsistencyLevel consistencyLevel, boolean allowHints, long queryStartNanoTime);
}
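
For illustration, here is a minimal sketch of a custom implementation; the class and package names are hypothetical and not part of this commit. It simply delegates every call to the default coordinator behavior obtained from MutatorProvider.getDefaultMutator() (shown in the next file), which is where a CDC-specific mutator would add its own routing.

package com.example.cdc;

import javax.annotation.Nullable;

import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.CounterMutation;
import org.apache.cassandra.db.IMutation;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.WriteType;
import org.apache.cassandra.service.AbstractWriteResponseHandler;
import org.apache.cassandra.service.Mutator;
import org.apache.cassandra.service.MutatorProvider;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.paxos.Commit;

// Hypothetical custom Mutator that delegates everything to the default implementation.
public class CdcMutator implements Mutator
{
    private final Mutator fallback = MutatorProvider.getDefaultMutator();

    @Override
    public AbstractWriteResponseHandler<IMutation> mutateCounter(CounterMutation cm, String localDataCenter, long queryStartNanoTime)
    {
        return fallback.mutateCounter(cm, localDataCenter, queryStartNanoTime);
    }

    @Override
    public AbstractWriteResponseHandler<IMutation> mutateStandard(Mutation mutation,
                                                                  ConsistencyLevel consistencyLevel,
                                                                  String localDataCenter,
                                                                  StorageProxy.WritePerformer writePerformer,
                                                                  Runnable callback,
                                                                  WriteType writeType,
                                                                  long queryStartNanoTime)
    {
        return fallback.mutateStandard(mutation, consistencyLevel, localDataCenter, writePerformer, callback, writeType, queryStartNanoTime);
    }

    @Override
    @Nullable
    public AbstractWriteResponseHandler<Commit> mutatePaxos(Commit proposal, ConsistencyLevel consistencyLevel, boolean allowHints, long queryStartNanoTime)
    {
        return fallback.mutatePaxos(proposal, consistencyLevel, allowHints, queryStartNanoTime);
    }
}
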
56 changes: 56 additions & 0 deletions src/java/org/apache/cassandra/service/MutatorProvider.java
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.service;

import com.google.common.annotations.VisibleForTesting;

import org.apache.cassandra.utils.FBUtilities;

import static org.apache.cassandra.config.CassandraRelevantProperties.CUSTOM_MUTATOR_CLASS;

/**
* Provides an instance of {@link Mutator} that facilitates mutation writes for standard mutations, unlogged batches,
* counters and Paxos commits (LWTs).
* <br/>
* An implementation may choose to fall back to the default implementation ({@link StorageProxy.DefaultMutator})
* obtained via {@link #getDefaultMutator()}.
*/
public abstract class MutatorProvider
{
static final Mutator instance = getCustomOrDefault();

@VisibleForTesting
static Mutator getCustomOrDefault()
{
if (CUSTOM_MUTATOR_CLASS.isPresent())
{
return FBUtilities.construct(CUSTOM_MUTATOR_CLASS.getString(),
"custom mutator class (set with " + CUSTOM_MUTATOR_CLASS.getKey() + ")");
}
else
{
return getDefaultMutator();
}
}

public static Mutator getDefaultMutator()
{
return new StorageProxy.DefaultMutator();
}
}
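
A complementary sketch of selecting such a custom implementation, again using the hypothetical class name from the example above. Because MutatorProvider resolves the class in a static initializer, the property must be set before the provider class is first loaded, in practice via a JVM flag at node startup:

import org.apache.cassandra.config.CassandraRelevantProperties;

// Hypothetical helper, not part of this commit.
public final class EnableCdcMutator
{
    private EnableCdcMutator() {}

    // Equivalent JVM flag: -Dcassandra.custom_mutator_class=com.example.cdc.CdcMutator
    public static void selectCdcMutator()
    {
        System.setProperty(CassandraRelevantProperties.CUSTOM_MUTATOR_CLASS.getKey(),
                           "com.example.cdc.CdcMutator");
    }
}
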
28 changes: 12 additions & 16 deletions src/java/org/apache/cassandra/service/QueryInfoTracker.java
@@ -18,12 +18,9 @@

package org.apache.cassandra.service;

import java.net.InetAddress;
import java.util.Collection;
import java.util.List;

import com.google.common.base.Preconditions;

import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.IMutation;
@@ -35,21 +32,20 @@
import org.apache.cassandra.schema.TableMetadata;

/**
* A tracker objects that can be registered against {@link StorageProxy} to be called back with information on executed
* queries.
* A tracker notified about executed queries.
*
* <p>The goal of this interface is to provide to implementations enough information for it to accurately estimate how
* much "work" a query has performed. So while for write this mostly just mean passing the generated mutations, for
* reads this mean passing the unfiltered result of the query.
* <p>The goal of this interface is to provide enough information to accurately estimate how
* much "work" a query has performed. So while for writes this mostly just means passing the generated mutations, for
* reads this means passing the unfiltered result of the query.
*
* <p>The methods of this tracker are called from {@link StorageProxy} and are thus "coordinator level". As such, all
* user writes or reads will trigger the call of one of these methods, as will internal distributed system table
* queries, but internal local system table queries will not.
* <p>The tracker methods are called from {@link StorageProxy} and are thus "coordinator level". All
* user queries and internal distributed system table queries trigger a call to one of these methods.
* Internal local system table queries don't.
*
* <p>For writes, the {@link #onWrite} method is only called for the "user write", but if that write trigger either
* secondary index or materialized views updates, those additional update do not trigger additional calls.
* <p>For writes, the {@link #onWrite} method is only called for the "user write", but if that write triggers either
* secondary index or materialized views updates, those additional updates do not trigger additional calls.
*
* <p>The methods of this tracker are called on hot path, so they should be as lightweight as possible.
* <p>The tracker methods are called on write and read hot paths, so they should be as lightweight as possible.
*/
public interface QueryInfoTracker
{
@@ -197,10 +193,10 @@ public void onError(Throwable exception)
interface ReadTracker extends Tracker
{
/**
* Calls just before queries are sent with the contacts from the replica plan.
* Called just before the queries are sent to the replica plan contacts.
* Note that this callback method may be invoked more than once for a given read,
* e.g. range queries spanning multiple partitions are internally issued as a
* number of subranges requests to different replicas (with different
* number of subrange requests to different replicas (with different
* ReplicaPlans). This callback is called at least once for a given read.
*
* @param replicaPlan the queried nodes.
