diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java index b3b376fc7084..d2a5a76ccfc4 100644 --- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java +++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java @@ -335,7 +335,12 @@ public enum CassandraRelevantProperties // in OSS, when UUID based SSTable generation identifiers are enabled, they use TimeUUID // though, for CNDB we want to use ULID - this property allows for that // valid values for this property are: uuid, ulid - SSTABLE_UUID_IMPL("cassandra.sstable.id.uuid_impl", "uuid"); + SSTABLE_UUID_IMPL("cassandra.sstable.id.uuid_impl", "uuid"), + + /** + * Name of a custom implementation of {@link org.apache.cassandra.service.Mutator}. + */ + CUSTOM_MUTATOR_CLASS("cassandra.custom_mutator_class"); CassandraRelevantProperties(String key, String defaultVal) { diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java index 7a71448d6844..523ba2d483ab 100644 --- a/src/java/org/apache/cassandra/db/CounterMutation.java +++ b/src/java/org/apache/cassandra/db/CounterMutation.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.util.*; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; @@ -168,11 +169,15 @@ public Mutation applyCounterMutation() throws WriteTimeoutException List locks = new ArrayList<>(); Tracing.trace("Acquiring counter locks"); + + long clock = FBUtilities.timestampMicros(); + CounterId counterId = CounterId.getLocalId(); + try { grabCounterLocks(keyspace, locks); for (PartitionUpdate upd : getPartitionUpdates()) - resultBuilder.add(processModifications(upd)); + resultBuilder.add(processModifications(upd, clock, counterId)); Mutation result = resultBuilder.build(); result.apply(); @@ -185,6 +190,26 @@ public Mutation applyCounterMutation() throws WriteTimeoutException } } + /** + * Applies the counter mutation with the provided time and {@link CounterId}. As opposed to + * {@link #applyCounterMutation()} this method doesn't acquire cell-level locks. + *

+ * This method is used in CDC counter write path (CNDB). + *

+ * The time and counter values are evaluated and propagated to all replicas by CDC Service. The replicas + * use this method to apply the mutation locally without locks. The locks are not needed in the CDC + * path as all the writes to the same partition are serialized by CDC Service. + */ + public CompletableFuture applyCounterMutationWithoutLocks(long systemClockMicros, CounterId counterId) + { + Mutation.PartitionUpdateCollector resultBuilder = new Mutation.PartitionUpdateCollector(getKeyspaceName(), key()); + for (PartitionUpdate upd : getPartitionUpdates()) + resultBuilder.add(processModifications(upd, systemClockMicros, counterId)); + + Mutation mutatation = resultBuilder.build(); + return mutatation.applyFuture(WriteOptions.DEFAULT).thenApply(ignore -> mutatation); + } + public void apply() { applyCounterMutation(); @@ -275,7 +300,9 @@ public Object apply(final ColumnData data) })); } - private PartitionUpdate processModifications(PartitionUpdate changes) + private PartitionUpdate processModifications(PartitionUpdate changes, + long systemClockMicros, + CounterId counterId) { ColumnFamilyStore cfs = Keyspace.open(getKeyspaceName()).getColumnFamilyStore(changes.metadata().id); @@ -284,34 +311,41 @@ private PartitionUpdate processModifications(PartitionUpdate changes) if (CacheService.instance.counterCache.getCapacity() != 0) { Tracing.trace("Fetching {} counter values from cache", marks.size()); - updateWithCurrentValuesFromCache(marks, cfs); + updateWithCurrentValuesFromCache(marks, cfs, systemClockMicros, counterId); if (marks.isEmpty()) return changes; } Tracing.trace("Reading {} counter values from the CF", marks.size()); - updateWithCurrentValuesFromCFS(marks, cfs); + updateWithCurrentValuesFromCFS(marks, cfs, systemClockMicros, counterId); // What's remain is new counters for (PartitionUpdate.CounterMark mark : marks) - updateWithCurrentValue(mark, ClockAndCount.BLANK, cfs); + updateWithCurrentValue(mark, ClockAndCount.BLANK, cfs, systemClockMicros, counterId); return changes; } - private void updateWithCurrentValue(PartitionUpdate.CounterMark mark, ClockAndCount currentValue, ColumnFamilyStore cfs) + private void updateWithCurrentValue(PartitionUpdate.CounterMark mark, + ClockAndCount currentValue, + ColumnFamilyStore cfs, + long systemClockMicros, + CounterId counterId) { - long clock = Math.max(FBUtilities.timestampMicros(), currentValue.clock + 1L); + long clock = Math.max(systemClockMicros, currentValue.clock + 1L); long count = currentValue.count + CounterContext.instance().total(mark.value(), ByteBufferAccessor.instance); - mark.setValue(CounterContext.instance().createGlobal(CounterId.getLocalId(), clock, count)); + mark.setValue(CounterContext.instance().createGlobal(counterId, clock, count)); // Cache the newly updated value cfs.putCachedCounter(key().getKey(), mark.clustering(), mark.column(), mark.path(), ClockAndCount.create(clock, count)); } // Returns the count of cache misses. - private void updateWithCurrentValuesFromCache(List marks, ColumnFamilyStore cfs) + private void updateWithCurrentValuesFromCache(List marks, + ColumnFamilyStore cfs, + long systemClockMicros, + CounterId counterId) { Iterator iter = marks.iterator(); while (iter.hasNext()) @@ -320,14 +354,17 @@ private void updateWithCurrentValuesFromCache(List ClockAndCount cached = cfs.getCachedCounter(key().getKey(), mark.clustering(), mark.column(), mark.path()); if (cached != null) { - updateWithCurrentValue(mark, cached, cfs); + updateWithCurrentValue(mark, cached, cfs, systemClockMicros, counterId); iter.remove(); } } } // Reads the missing current values from the CFS. - private void updateWithCurrentValuesFromCFS(List marks, ColumnFamilyStore cfs) + private void updateWithCurrentValuesFromCFS(List marks, + ColumnFamilyStore cfs, + long systemClockMicros, + CounterId counterId) { ColumnFilter.Builder builder = ColumnFilter.selectionBuilder(); BTreeSet.Builder> names = BTreeSet.builder(cfs.metadata().comparator); @@ -348,14 +385,14 @@ private void updateWithCurrentValuesFromCFS(List ma try (ReadExecutionController controller = cmd.executionController(); RowIterator partition = UnfilteredRowIterators.filter(cmd.queryMemtableAndDisk(cfs, controller), nowInSec)) { - updateForRow(markIter, partition.staticRow(), cfs); + updateForRow(markIter, partition.staticRow(), cfs, systemClockMicros, counterId); while (partition.hasNext()) { if (!markIter.hasNext()) return; - updateForRow(markIter, partition.next(), cfs); + updateForRow(markIter, partition.next(), cfs, systemClockMicros, counterId); } } } @@ -370,7 +407,11 @@ private int compare(Clustering c1, Clustering c2, ColumnFamilyStore cfs) return cfs.getComparator().compare(c1, c2); } - private void updateForRow(PeekingIterator markIter, Row row, ColumnFamilyStore cfs) + private void updateForRow(PeekingIterator markIter, + Row row, + ColumnFamilyStore cfs, + long systemClockMicros, + CounterId counterId) { int cmp = 0; // If the mark is before the row, we have no value for this mark, just consume it @@ -386,7 +427,8 @@ private void updateForRow(PeekingIterator markIter, Cell cell = mark.path() == null ? row.getCell(mark.column()) : row.getCell(mark.column(), mark.path()); if (cell != null) { - updateWithCurrentValue(mark, CounterContext.instance().getLocalClockAndCount(cell.buffer()), cfs); + ClockAndCount localClockAndCount = CounterContext.instance().getLocalClockAndCount(cell.buffer()); + updateWithCurrentValue(mark, localClockAndCount, cfs, systemClockMicros, counterId); markIter.remove(); } if (!markIter.hasNext()) diff --git a/src/java/org/apache/cassandra/db/RepairedDataInfo.java b/src/java/org/apache/cassandra/db/RepairedDataInfo.java index 896bad13804a..cd6821b70c3d 100644 --- a/src/java/org/apache/cassandra/db/RepairedDataInfo.java +++ b/src/java/org/apache/cassandra/db/RepairedDataInfo.java @@ -37,7 +37,7 @@ import org.apache.cassandra.utils.ByteBufferUtil; @NotThreadSafe -class RepairedDataInfo +public class RepairedDataInfo { public static final RepairedDataInfo NO_OP_REPAIRED_DATA_INFO = new RepairedDataInfo(null) { diff --git a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java index b6ea7fbe9eb7..b80b182d6d02 100644 --- a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java +++ b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java @@ -67,6 +67,11 @@ public class ComplexColumnData extends ColumnData implements Iterable> this.complexDeletion = complexDeletion; } + // Used by CNDB + public boolean hasCells() { + return !BTree.isEmpty(this.cells); + } + public int cellsCount() { return BTree.size(cells); diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java index 712827788396..cbd98512a039 100644 --- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java @@ -125,7 +125,7 @@ public void get() throws WriteTimeoutException, WriteFailureException } } - public final long currentTimeoutNanos() + public long currentTimeoutNanos() { long requestTimeout = writeType == WriteType.COUNTER ? DatabaseDescriptor.getCounterWriteRpcTimeout(NANOSECONDS) diff --git a/src/java/org/apache/cassandra/service/Mutator.java b/src/java/org/apache/cassandra/service/Mutator.java new file mode 100644 index 000000000000..954ead8593af --- /dev/null +++ b/src/java/org/apache/cassandra/service/Mutator.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service; + +import javax.annotation.Nullable; + +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.CounterMutation; +import org.apache.cassandra.db.IMutation; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.WriteType; +import org.apache.cassandra.service.paxos.Commit; + +/** + * Facilitates mutations for counters, simple inserts, unlogged batches and LWTs. + * Used on the coordinator. + *
+ * The implementations may choose how and where to send the mutations. + *
+ * An instance of this interface implementation must be obtained via {@link MutatorProvider#instance}. + */ +public interface Mutator +{ + /** + * Used for handling counter mutations on the coordinator level. + */ + AbstractWriteResponseHandler mutateCounter(CounterMutation cm, String localDataCenter, long queryStartNanoTime); + + /** + * Used for standard inserts and unlogged batchs. + */ + AbstractWriteResponseHandler mutateStandard(Mutation mutation, + ConsistencyLevel consistencyLevel, + String localDataCenter, + StorageProxy.WritePerformer writePerformer, + Runnable callback, + WriteType writeType, + long queryStartNanoTime); + + /** + * Used for LWT mutation at the last (COMMIT) phase of Paxos. + */ + @Nullable + AbstractWriteResponseHandler mutatePaxos(Commit proposal, ConsistencyLevel consistencyLevel, boolean allowHints, long queryStartNanoTime); +} \ No newline at end of file diff --git a/src/java/org/apache/cassandra/service/MutatorProvider.java b/src/java/org/apache/cassandra/service/MutatorProvider.java new file mode 100644 index 000000000000..dd5b37e200a5 --- /dev/null +++ b/src/java/org/apache/cassandra/service/MutatorProvider.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.config.CassandraRelevantProperties.CUSTOM_MUTATOR_CLASS; + +/** + * Provides an instance of {@link Mutator} that facilitates mutation writes for standard mutations, unlogged batches, + * counters and paxos commits (LWT)s. + *
+ * An implementation may choose to fallback to the default implementation ({@link StorageProxy.DefaultMutator}) + * obtained via {@link #getDefaultMutator()}. + */ +public abstract class MutatorProvider +{ + static final Mutator instance = getCustomOrDefault(); + + @VisibleForTesting + static Mutator getCustomOrDefault() + { + if (CUSTOM_MUTATOR_CLASS.isPresent()) + { + return FBUtilities.construct(CUSTOM_MUTATOR_CLASS.getString(), + "custom mutator class (set with " + CUSTOM_MUTATOR_CLASS.getKey() + ")"); + } + else + { + return getDefaultMutator(); + } + } + + public static Mutator getDefaultMutator() + { + return new StorageProxy.DefaultMutator(); + } +} diff --git a/src/java/org/apache/cassandra/service/QueryInfoTracker.java b/src/java/org/apache/cassandra/service/QueryInfoTracker.java index ed13b7619918..2dee8603dfc2 100644 --- a/src/java/org/apache/cassandra/service/QueryInfoTracker.java +++ b/src/java/org/apache/cassandra/service/QueryInfoTracker.java @@ -18,12 +18,9 @@ package org.apache.cassandra.service; -import java.net.InetAddress; import java.util.Collection; import java.util.List; -import com.google.common.base.Preconditions; - import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.IMutation; @@ -35,21 +32,20 @@ import org.apache.cassandra.schema.TableMetadata; /** - * A tracker objects that can be registered against {@link StorageProxy} to be called back with information on executed - * queries. + * A tracker notified about executed queries. * - *

The goal of this interface is to provide to implementations enough information for it to accurately estimate how - * much "work" a query has performed. So while for write this mostly just mean passing the generated mutations, for - * reads this mean passing the unfiltered result of the query. + *

The goal of this interface is to provide enough information to accurately estimate how + * much "work" a query has performed. So while for writes this mostly just means passing the generated mutations, for + * reads this means passing the unfiltered result of the query. * - *

The methods of this tracker are called from {@link StorageProxy} and are thus "coordinator level". As such, all - * user writes or reads will trigger the call of one of these methods, as will internal distributed system table - * queries, but internal local system table queries will not. + *

The tracker methods are called from {@link StorageProxy} and are thus "coordinator level". All + * user queries and internal distributed system table queries trigger a call to one of these methods. + * Internal local system table queries don't. * - *

For writes, the {@link #onWrite} method is only called for the "user write", but if that write trigger either - * secondary index or materialized views updates, those additional update do not trigger additional calls. + *

For writes, the {@link #onWrite} method is only called for the "user write", but if that write triggers either + * secondary index or materialized views updates, those additional updates do not trigger additional calls. * - *

The methods of this tracker are called on hot path, so they should be as lightweight as possible. + *

The tracker methods are called on write and read hot paths, so they should be as lightweight as possible. */ public interface QueryInfoTracker { @@ -197,10 +193,10 @@ public void onError(Throwable exception) interface ReadTracker extends Tracker { /** - * Calls just before queries are sent with the contacts from the replica plan. + * Called just before the queries are sent to the replica plan contacts. * Note that this callback method may be invoked more than once for a given read, * e.g. range quries spanning multiple partitions are internally issued as a - * number of subranges requests to different replicas (with different + * number of subrange requests to different replicas (with different * ReplicaPlans). This callback is called at least once for a given read. * * @param replicaPlan the queried nodes. diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 2b9f08e72a7c..1af2574b0d71 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; +import javax.annotation.Nullable; import com.google.common.base.Preconditions; import com.google.common.cache.CacheLoader; @@ -161,6 +162,29 @@ public class StorageProxy implements StorageProxyMBean public static final StorageProxy instance = new StorageProxy(); + private static final Mutator mutator = MutatorProvider.instance; + + static class DefaultMutator implements Mutator + { + @Override + public AbstractWriteResponseHandler mutateCounter(CounterMutation cm, String localDataCenter, long queryStartNanoTime) + { + return defaultMutateCounter(cm, localDataCenter, queryStartNanoTime); + } + + @Override + public AbstractWriteResponseHandler mutateStandard(Mutation mutation, ConsistencyLevel consistencyLevel, String localDataCenter, WritePerformer standardWritePerformer, Runnable callback, WriteType writeType, long queryStartNanoTime) + { + return performWrite(mutation, consistencyLevel, localDataCenter, standardWritePerformer, callback, writeType, queryStartNanoTime); + } + + @Override + public AbstractWriteResponseHandler mutatePaxos(Commit proposal, ConsistencyLevel consistencyLevel, boolean allowHints, long queryStartNanoTime) + { + return defaultCommitPaxos(proposal, consistencyLevel, allowHints, queryStartNanoTime); + } + } + private static volatile int maxHintsInProgress = 128 * FBUtilities.getAvailableProcessors(); private static final CacheLoader hintsInProgress = new CacheLoader() { @@ -744,7 +768,17 @@ private static boolean proposePaxos(Commit proposal, ReplicaPlan.ForPaxosWrite r return false; } + @Nullable private static void commitPaxos(Commit proposal, ConsistencyLevel consistencyLevel, boolean allowHints, long queryStartNanoTime) throws WriteTimeoutException + { + boolean shouldBlock = consistencyLevel != ConsistencyLevel.ANY; + AbstractWriteResponseHandler responseHandler = mutator.mutatePaxos(proposal, consistencyLevel, allowHints, queryStartNanoTime); + if (shouldBlock && responseHandler != null) + responseHandler.get(); + } + + @Nullable + private static AbstractWriteResponseHandler defaultCommitPaxos(Commit proposal, ConsistencyLevel consistencyLevel, boolean allowHints, long queryStartNanoTime) throws WriteTimeoutException { boolean shouldBlock = consistencyLevel != ConsistencyLevel.ANY; Keyspace keyspace = Keyspace.open(proposal.update.metadata().keyspace); @@ -793,8 +827,7 @@ private static void commitPaxos(Commit proposal, ConsistencyLevel consistencyLev } } - if (shouldBlock) - responseHandler.get(); + return responseHandler; } /** @@ -864,13 +897,13 @@ public static void mutate(List mutations, if (mutation instanceof CounterMutation) responseHandlers.add(mutateCounter((CounterMutation)mutation, localDataCenter, queryStartNanoTime)); else - responseHandlers.add(performWrite(mutation, consistencyLevel, localDataCenter, standardWritePerformer, null, plainWriteType, queryStartNanoTime)); + responseHandlers.add(mutator.mutateStandard((Mutation)mutation, consistencyLevel, localDataCenter, standardWritePerformer, null, plainWriteType, queryStartNanoTime)); } // upgrade to full quorum any failed cheap quorums for (int i = 0 ; i < mutations.size() ; ++i) { - if (!(mutations.get(i) instanceof CounterMutation)) // at the moment, only non-counter writes support cheap quorums + if (!(mutations.get(i) instanceof CounterMutation) && mutator instanceof DefaultMutator) // at the moment, only non-counter writes support cheap quorums responseHandlers.get(i).maybeTryAdditionalReplicas(mutations.get(i), standardWritePerformer, localDataCenter); } @@ -1654,6 +1687,11 @@ protected Verb verb() * the write latencies at the coordinator node to make gathering point similar to the case of standard writes. */ public static AbstractWriteResponseHandler mutateCounter(CounterMutation cm, String localDataCenter, long queryStartNanoTime) throws UnavailableException, OverloadedException + { + return mutator.mutateCounter(cm, localDataCenter, queryStartNanoTime); + } + + private static AbstractWriteResponseHandler defaultMutateCounter(CounterMutation cm, String localDataCenter, long queryStartNanoTime) throws UnavailableException, OverloadedException { Replica replica = findSuitableReplica(cm.getKeyspaceName(), cm.key(), localDataCenter, cm.consistency()); diff --git a/test/unit/org/apache/cassandra/db/CounterMutationTest.java b/test/unit/org/apache/cassandra/db/CounterMutationTest.java index 5f25c7218546..87bf65499449 100644 --- a/test/unit/org/apache/cassandra/db/CounterMutationTest.java +++ b/test/unit/org/apache/cassandra/db/CounterMutationTest.java @@ -29,6 +29,7 @@ import org.apache.cassandra.exceptions.WriteTimeoutException; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.CounterId; import static org.junit.Assert.assertEquals; @@ -215,4 +216,20 @@ public void testDeletes() throws WriteTimeoutException ConsistencyLevel.ONE).apply(); Util.assertEmpty(Util.cmd(cfs).includeRow("cc").columns("val", "val2").build()); } + + @Test + public void testAddingWithoutLocks() throws WriteTimeoutException + { + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF1); + cfs.truncateBlocking(); + ColumnMetadata cDef = cfs.metadata().getColumn(ByteBufferUtil.bytes("val")); + + // Do the initial update (+1) + long toAdd = 5; + Mutation m = new RowUpdateBuilder(cfs.metadata(), 5, "key1").clustering("cc").add("val", toAdd).build(); + new CounterMutation(m, ConsistencyLevel.ONE).applyCounterMutationWithoutLocks(1234567, CounterId.getLocalId()); + + Row row = Util.getOnlyRow(Util.cmd(cfs).includeRow("cc").columns("val").build()); + assertEquals(toAdd, CounterContext.instance().total(row.getCell(cDef))); + } } diff --git a/test/unit/org/apache/cassandra/db/rows/ComplexColumnDataTest.java b/test/unit/org/apache/cassandra/db/rows/ComplexColumnDataTest.java new file mode 100644 index 000000000000..2076a99f65ba --- /dev/null +++ b/test/unit/org/apache/cassandra/db/rows/ComplexColumnDataTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.rows; + +import org.junit.Assert; +import org.junit.Test; + +import junit.framework.TestCase; +import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.db.marshal.MapType; +import org.apache.cassandra.schema.ColumnMetadata; +import org.apache.cassandra.utils.btree.BTree; + +public class ComplexColumnDataTest extends TestCase +{ + private ColumnMetadata column = ColumnMetadata.regularColumn("ks", "tab", "col", + MapType.getInstance(Int32Type.instance, Int32Type.instance, true)); + + @Test + public void testEmptyComplexColumn() + { + ComplexColumnData data = new ComplexColumnData(column, + BTree.empty(), + DeletionTime.LIVE); + Assert.assertFalse(data.hasCells()); + } + + @Test + public void testNonEmptyComplexColumn() + { + + ComplexColumnData data = new ComplexColumnData(column, + BTree.singleton("ignored value"), + DeletionTime.LIVE); + Assert.assertTrue(data.hasCells()); + } +} \ No newline at end of file diff --git a/test/unit/org/apache/cassandra/service/MutatorProviderTest.java b/test/unit/org/apache/cassandra/service/MutatorProviderTest.java new file mode 100644 index 000000000000..defb16219d5a --- /dev/null +++ b/test/unit/org/apache/cassandra/service/MutatorProviderTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service; + +import javax.annotation.Nullable; + +import org.junit.Test; + +import junit.framework.TestCase; +import org.apache.cassandra.config.CassandraRelevantProperties; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.CounterMutation; +import org.apache.cassandra.db.IMutation; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.WriteType; +import org.apache.cassandra.service.paxos.Commit; + +public class MutatorProviderTest extends TestCase +{ + public static class TestMutator implements Mutator + { + @Override + public AbstractWriteResponseHandler mutateCounter(CounterMutation cm, String localDataCenter, long queryStartNanoTime) + { + return null; + } + + @Override + public AbstractWriteResponseHandler mutateStandard(Mutation mutation, ConsistencyLevel consistencyLevel, String localDataCenter, StorageProxy.WritePerformer writePerformer, Runnable callback, WriteType writeType, long queryStartNanoTime) + { + return null; + } + + @Nullable + @Override + public AbstractWriteResponseHandler mutatePaxos(Commit proposal, ConsistencyLevel consistencyLevel, boolean allowHints, long queryStartNanoTime) + { + return null; + } + } + + @Test + public void testInstantinatingCustomMutator() + { + CassandraRelevantProperties.CUSTOM_MUTATOR_CLASS.setString("org.apache.cassandra.service.MutatorProviderTest$TestMutator"); + Mutator mutator = MutatorProvider.getCustomOrDefault(); + assertSame(mutator.getClass(), TestMutator.class); + System.clearProperty(CassandraRelevantProperties.CUSTOM_MUTATOR_CLASS.getKey()); + } +} \ No newline at end of file diff --git a/tools/bin/analyzecompactionlog b/tools/bin/analyzecompactionlog old mode 100755 new mode 100644