From da43bb7fc1336e8f2f7e04d84be1a44271eafba9 Mon Sep 17 00:00:00 2001 From: Dikang Gu Date: Wed, 9 May 2018 15:14:39 -0700 Subject: [PATCH] Improve the performance of CAS operation Combine the paxos prepare and quorum read stage together --- .../cassandra/net/MessagingService.java | 17 +- .../cassandra/service/StorageProxy.java | 170 ++++++++++++++---- .../cassandra/service/StorageService.java | 2 + .../cassandra/service/paxos/PaxosState.java | 2 + .../service/paxos/PrepareAndReadCallback.java | 58 ++++++ .../service/paxos/PrepareAndReadCommand.java | 65 +++++++ .../service/paxos/PrepareAndReadResponse.java | 68 +++++++ .../paxos/PrepareAndReadVerbHandler.java | 54 ++++++ .../service/paxos/PrepareCallback.java | 7 +- .../cassandra/cql3/CASOperationTest.java | 91 ++++++++++ .../net/MessageDeliveryTaskTest.java | 8 +- 11 files changed, 500 insertions(+), 42 deletions(-) create mode 100644 src/java/org/apache/cassandra/service/paxos/PrepareAndReadCallback.java create mode 100644 src/java/org/apache/cassandra/service/paxos/PrepareAndReadCommand.java create mode 100644 src/java/org/apache/cassandra/service/paxos/PrepareAndReadResponse.java create mode 100644 src/java/org/apache/cassandra/service/paxos/PrepareAndReadVerbHandler.java create mode 100644 test/unit/org/apache/cassandra/cql3/CASOperationTest.java diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index f5051fbf57df..2ae368da9a2e 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -104,6 +104,8 @@ import org.apache.cassandra.service.AbstractWriteResponseHandler; import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.service.paxos.PrepareAndReadCommand; +import org.apache.cassandra.service.paxos.PrepareAndReadResponse; import org.apache.cassandra.service.paxos.Commit; import org.apache.cassandra.service.paxos.PrepareResponse; import org.apache.cassandra.tracing.TraceState; @@ -261,6 +263,13 @@ public long getTimeout() return DatabaseDescriptor.getPingTimeout(); } }, + PAXOS_PREPARE_AND_READ + { + public long getTimeout() + { + return DatabaseDescriptor.getReadRpcTimeout(); + } + }, // UNUSED verbs were used as padding for backward/forward compatability before 4.0, // but it wasn't quite as bullet/future proof as needed. We still need to keep these entries @@ -268,7 +277,6 @@ public long getTimeout() // For now, though, the UNUSED are legacy values (placeholders, basically) that should only be used // for correctly adding VERBs that need to be emergency additions to 3.0/3.11. // We can reclaim them (their id's, to be correct) in future versions, if desireed, though. - UNUSED_2, UNUSED_3, UNUSED_4, UNUSED_5, @@ -359,13 +367,12 @@ public static Verb fromId(int id) put(Verb.REPLICATION_FINISHED, Stage.MISC); put(Verb.SNAPSHOT, Stage.MISC); put(Verb.ECHO, Stage.GOSSIP); + put(Verb.PING, Stage.READ); + put(Verb.PAXOS_PREPARE_AND_READ, Stage.MUTATION); - put(Verb.UNUSED_2, Stage.INTERNAL_RESPONSE); put(Verb.UNUSED_3, Stage.INTERNAL_RESPONSE); put(Verb.UNUSED_4, Stage.INTERNAL_RESPONSE); put(Verb.UNUSED_5, Stage.INTERNAL_RESPONSE); - - put(Verb.PING, Stage.READ); }}; /** @@ -401,6 +408,7 @@ public static Verb fromId(int id) put(Verb.PAXOS_PREPARE, Commit.serializer); put(Verb.PAXOS_PROPOSE, Commit.serializer); put(Verb.PAXOS_COMMIT, Commit.serializer); + put(Verb.PAXOS_PREPARE_AND_READ, PrepareAndReadCommand.serializer); put(Verb.HINT, HintMessage.serializer); put(Verb.BATCH_STORE, Batch.serializer); put(Verb.BATCH_REMOVE, UUIDSerializer.serializer); @@ -429,6 +437,7 @@ public static Verb fromId(int id) put(Verb.PAXOS_PREPARE, PrepareResponse.serializer); put(Verb.PAXOS_PROPOSE, BooleanSerializer.serializer); + put(Verb.PAXOS_PREPARE_AND_READ, PrepareAndReadResponse.serializer); put(Verb.BATCH_STORE, WriteResponse.serializer); put(Verb.BATCH_REMOVE, WriteResponse.serializer); diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 7e9b0f96e52e..6c9f5b13e18a 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -48,6 +48,8 @@ import org.apache.cassandra.service.reads.AbstractReadExecutor; import org.apache.cassandra.service.reads.DataResolver; import org.apache.cassandra.service.reads.ReadCallback; +import org.apache.cassandra.service.reads.ResponseResolver; +import org.apache.cassandra.service.reads.repair.NoopReadRepair; import org.apache.cassandra.service.reads.repair.ReadRepair; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.config.DatabaseDescriptor; @@ -69,6 +71,8 @@ import org.apache.cassandra.locator.*; import org.apache.cassandra.metrics.*; import org.apache.cassandra.net.*; +import org.apache.cassandra.service.paxos.PrepareAndReadCallback; +import org.apache.cassandra.service.paxos.PrepareAndReadCommand; import org.apache.cassandra.service.paxos.Commit; import org.apache.cassandra.service.paxos.PaxosState; import org.apache.cassandra.service.paxos.PrepareCallback; @@ -216,6 +220,9 @@ public void apply(IMutation mutation, * values) between the prepare and accept phases. This gives us a slightly longer window for another * coordinator to come along and trump our own promise with a newer one but is otherwise safe. * + * In the case of USE_FASTPAXOS enabled, we piggyback the read results in the Prepare responses, so that + * we do not need to spend a separate round trip to do the quorum read. + * * @param keyspaceName the keyspace for the CAS * @param cfName the column family for the CAS * @param key the row key for the row to CAS @@ -252,18 +259,39 @@ public static RowIterator cas(String keyspaceName, PaxosParticipants p = getPaxosParticipants(metadata, key, consistencyForPaxos); List liveEndpoints = p.liveEndpoints; int requiredParticipants = p.participants; - - final PaxosBallotAndContention pair = beginAndRepairPaxos(queryStartNanoTime, key, metadata, liveEndpoints, requiredParticipants, consistencyForPaxos, consistencyForCommit, true, state); - final UUID ballot = pair.ballot; - contentions += pair.contentions; - + // read the current values and check they validate the conditions Tracing.trace("Reading existing values for CAS precondition"); SinglePartitionReadCommand readCommand = (SinglePartitionReadCommand) request.readCommand(FBUtilities.nowInSeconds()); ConsistencyLevel readConsistency = consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL ? ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM; - FilteredPartition current; - try (RowIterator rowIter = readOne(readCommand, readConsistency, queryStartNanoTime)) + + final BeginAndRepairPaxosResult result = beginAndRepairPaxos(queryStartNanoTime, + key, metadata, + liveEndpoints, + requiredParticipants, + consistencyForPaxos, + consistencyForCommit, + true, + state, + PaxosState.USE_FASTPAXOS, + readCommand); + UUID ballot = result.ballot; + contentions += result.contentions; + PartitionIterator iterator; + + if (PaxosState.USE_FASTPAXOS) + { + // we already piggyback the read response in the result + iterator = result.iterator; + } + else + { + iterator = read(SinglePartitionReadCommand.Group.one(readCommand), readConsistency, + state, queryStartNanoTime); + } + + try (RowIterator rowIter = PartitionIterators.getOnlyElement(iterator, readCommand)) { current = FilteredPartition.create(rowIter); } @@ -340,7 +368,11 @@ public static RowIterator cas(String keyspaceName, private static void recordCasContention(int contentions) { if(contentions > 0) + { casWriteMetrics.contention.update(contentions); + if (logger.isDebugEnabled()) + logger.debug("# of contentions: " + contentions); + } } private static Predicate sameDCPredicateFor(final String dc) @@ -392,20 +424,23 @@ private static PaxosParticipants getPaxosParticipants(TableMetadata metadata, De * @return the Paxos ballot promised by the replicas if no in-progress requests were seen and a quorum of * nodes have seen the mostRecentCommit. Otherwise, return null. */ - private static PaxosBallotAndContention beginAndRepairPaxos(long queryStartNanoTime, - DecoratedKey key, - TableMetadata metadata, - List liveEndpoints, - int requiredParticipants, - ConsistencyLevel consistencyForPaxos, - ConsistencyLevel consistencyForCommit, - final boolean isWrite, - ClientState state) + private static BeginAndRepairPaxosResult beginAndRepairPaxos(long queryStartNanoTime, + DecoratedKey key, + TableMetadata metadata, + List liveEndpoints, + int requiredParticipants, + ConsistencyLevel consistencyForPaxos, + ConsistencyLevel consistencyForCommit, + final boolean isWrite, + ClientState state, + boolean piggybackReadResponse, + SinglePartitionReadCommand readCommand) throws WriteTimeoutException, WriteFailureException { long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout()); PrepareCallback summary = null; + PartitionIterator iterator = null; int contentions = 0; while (System.nanoTime() - queryStartNanoTime < timeout) { @@ -422,7 +457,25 @@ private static PaxosBallotAndContention beginAndRepairPaxos(long queryStartNanoT // prepare Tracing.trace("Preparing {}", ballot); Commit toPrepare = Commit.newPrepare(key, metadata, ballot); - summary = preparePaxos(toPrepare, liveEndpoints, requiredParticipants, consistencyForPaxos, queryStartNanoTime); + + if(piggybackReadResponse) + { + PrepareAndReadCallback callback = preparePaxosAndRead(metadata.keyspace, + toPrepare, + liveEndpoints, + requiredParticipants, + consistencyForPaxos, + queryStartNanoTime, + readCommand); + summary = callback.prepareCallback; + iterator = callback.iterator; + } + else + { + summary = preparePaxos(toPrepare, liveEndpoints, requiredParticipants, + consistencyForPaxos, queryStartNanoTime); + } + if (!summary.promised) { Tracing.trace("Some replicas have already promised a higher ballot than ours; aborting"); @@ -485,7 +538,7 @@ private static PaxosBallotAndContention beginAndRepairPaxos(long queryStartNanoT continue; } - return new PaxosBallotAndContention(ballot, contentions); + return new BeginAndRepairPaxosResult(ballot, contentions, iterator); } recordCasContention(contentions); @@ -501,6 +554,40 @@ private static void sendCommit(Commit commit, Iterable repli for (InetAddressAndPort target : replicas) MessagingService.instance().sendOneWay(message, target); } + + private static PrepareAndReadCallback preparePaxosAndRead(String keyspaceName, + Commit toPrepare, + List liveEndpoints, + int requiredParticipants, + ConsistencyLevel consistencyForPaxos, + long queryStartNanoTime, + SinglePartitionReadCommand readCommand) + { + PrepareCallback prepareCallback = new PrepareCallback(toPrepare.update.partitionKey(), + toPrepare.update.metadata(), + requiredParticipants, + consistencyForPaxos, + queryStartNanoTime); + + ConsistencyLevel consistencyForRead = consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL ? ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM; + + Keyspace keyspace = Keyspace.open(keyspaceName); + ResponseResolver resolver = new DataResolver(keyspace, readCommand, consistencyForRead, liveEndpoints.size(), + queryStartNanoTime, NoopReadRepair.instance); + ReadCallback readCallback = new ReadCallback(resolver, consistencyForRead, readCommand, liveEndpoints, + queryStartNanoTime, NoopReadRepair.instance); + + PrepareAndReadCallback callback = new PrepareAndReadCallback(prepareCallback, readCallback); + PrepareAndReadCommand prepareCommand = new PrepareAndReadCommand(toPrepare, readCommand); + MessageOut message = new MessageOut<>(Verb.PAXOS_PREPARE_AND_READ, prepareCommand, PrepareAndReadCommand.serializer); + for (InetAddressAndPort target : liveEndpoints) + { + // Probably can optimize for local requests later + MessagingService.instance().sendRR(message, target, callback); + } + callback.await(); + return callback; + } private static PrepareCallback preparePaxos(Commit toPrepare, List endpoints, int requiredParticipants, ConsistencyLevel consistencyForPaxos, long queryStartNanoTime) throws WriteTimeoutException @@ -1657,7 +1744,7 @@ private static PartitionIterator readWithPaxos(SinglePartitionReadCommand.Group TableMetadata metadata = command.metadata(); DecoratedKey key = command.partitionKey(); - PartitionIterator result = null; + PartitionIterator partitionIterator = null; try { // make sure any in-progress paxos writes are done (i.e., committed to a majority of replicas), before performing a quorum read @@ -1672,20 +1759,32 @@ private static PartitionIterator readWithPaxos(SinglePartitionReadCommand.Group try { - final PaxosBallotAndContention pair = beginAndRepairPaxos(start, key, metadata, liveEndpoints, requiredParticipants, consistencyLevel, consistencyForCommitOrFetch, false, state); - if (pair.contentions > 0) - casReadMetrics.contention.update(pair.contentions); + final BeginAndRepairPaxosResult paxosResult = beginAndRepairPaxos(start, key, metadata, + liveEndpoints, requiredParticipants, + consistencyLevel, + consistencyForCommitOrFetch, + false, + state, + PaxosState.USE_FASTPAXOS, + command); + if (paxosResult.contentions > 0) + casReadMetrics.contention.update(paxosResult.contentions); + + if (PaxosState.USE_FASTPAXOS) + partitionIterator = paxosResult.iterator; + else + partitionIterator = fetchRows(group.queries, consistencyForCommitOrFetch, queryStartNanoTime); } catch (WriteTimeoutException e) { - throw new ReadTimeoutException(consistencyLevel, 0, consistencyLevel.blockFor(Keyspace.open(metadata.keyspace)), false); + throw new ReadTimeoutException(consistencyLevel, 0, + consistencyLevel.blockFor(Keyspace.open(metadata.keyspace)), + false); } catch (WriteFailureException e) { throw new ReadFailureException(consistencyLevel, e.received, e.blockFor, false, e.failureReasonByEndpoint); } - - result = fetchRows(group.queries, consistencyForCommitOrFetch, queryStartNanoTime); } catch (UnavailableException e) { @@ -1717,7 +1816,7 @@ private static PartitionIterator readWithPaxos(SinglePartitionReadCommand.Group Keyspace.open(metadata.keyspace).getColumnFamilyStore(metadata.name).metric.coordinatorReadLatency.update(latency, TimeUnit.NANOSECONDS); } - return result; + return partitionIterator; } @SuppressWarnings("resource") @@ -2829,7 +2928,6 @@ public void setOtcBacklogExpirationInterval(int intervalInMillis) { DatabaseDescriptor.setOtcBacklogExpirationInterval(intervalInMillis); } - static class PaxosParticipants { final List liveEndpoints; @@ -2859,32 +2957,38 @@ public final boolean equals(Object o) } } - static class PaxosBallotAndContention + static class BeginAndRepairPaxosResult { final UUID ballot; final int contentions; + final PartitionIterator iterator; - PaxosBallotAndContention(UUID ballot, int contentions) + BeginAndRepairPaxosResult(UUID ballot, int contentions, PartitionIterator iterator) { this.ballot = ballot; this.contentions = contentions; + this.iterator = iterator; } @Override public final int hashCode() { int hashCode = 31 + (ballot == null ? 0 : ballot.hashCode()); - return 31 * hashCode * this.contentions; + hashCode = 31 * hashCode + this.contentions; + hashCode = 31 * hashCode + (iterator == null ? 0 : iterator.hashCode()); + return hashCode; } @Override public final boolean equals(Object o) { - if(!(o instanceof PaxosBallotAndContention)) + if(!(o instanceof BeginAndRepairPaxosResult)) return false; - PaxosBallotAndContention that = (PaxosBallotAndContention)o; + BeginAndRepairPaxosResult that = (BeginAndRepairPaxosResult)o; // handles nulls properly - return Objects.equals(ballot, that.ballot) && contentions == that.contentions; + return Objects.equals(ballot, that.ballot) + && contentions == that.contentions + && Objects.equals(iterator, that.iterator); } } } diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 96fd63f08ba2..950e646c997f 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -99,6 +99,7 @@ import org.apache.cassandra.schema.TableMetadataRef; import org.apache.cassandra.schema.ViewMetadata; import org.apache.cassandra.repair.RepairMessageVerbHandler; +import org.apache.cassandra.service.paxos.PrepareAndReadVerbHandler; import org.apache.cassandra.service.paxos.CommitVerbHandler; import org.apache.cassandra.service.paxos.PrepareVerbHandler; import org.apache.cassandra.service.paxos.ProposeVerbHandler; @@ -288,6 +289,7 @@ public StorageService() MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAXOS_PREPARE, new PrepareVerbHandler()); MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAXOS_PROPOSE, new ProposeVerbHandler()); MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAXOS_COMMIT, new CommitVerbHandler()); + MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAXOS_PREPARE_AND_READ, new PrepareAndReadVerbHandler()); MessagingService.instance().registerVerbHandlers(MessagingService.Verb.HINT, new HintVerbHandler()); // see BootStrapper for a summary of how the bootstrap verbs interact diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosState.java b/src/java/org/apache/cassandra/service/paxos/PaxosState.java index 7d593741075b..3f3c5bc1234e 100644 --- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java +++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java @@ -32,6 +32,8 @@ public class PaxosState { + public static final boolean USE_FASTPAXOS = Boolean.getBoolean("cassandra.use_fastpaxos"); + private static final Striped LOCKS = Striped.lazyWeakLock(DatabaseDescriptor.getConcurrentWriters() * 1024); private final Commit promised; diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareAndReadCallback.java b/src/java/org/apache/cassandra/service/paxos/PrepareAndReadCallback.java new file mode 100644 index 000000000000..680acb694278 --- /dev/null +++ b/src/java/org/apache/cassandra/service/paxos/PrepareAndReadCallback.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.paxos; + +import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.net.IAsyncCallback; +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.service.reads.DataResolver; +import org.apache.cassandra.service.reads.ReadCallback; + +public class PrepareAndReadCallback implements IAsyncCallback +{ + public final PrepareCallback prepareCallback; + public final ReadCallback readCallback; + public PartitionIterator iterator = null; + + public PrepareAndReadCallback(PrepareCallback prepareCallback, ReadCallback readCallback) + { + this.prepareCallback = prepareCallback; + this.readCallback = readCallback; + } + + @Override + public void response(MessageIn msg) + { + PrepareAndReadResponse response = msg.payload; + prepareCallback.response(response.prepareResponse, msg.from); + readCallback.response(response.readResponse); + } + + @Override + public boolean isLatencyForSnitch() + { + return false; + } + + public void await() + { + readCallback.awaitResults(); + iterator = ((DataResolver)readCallback.resolver).getData(); + } +} diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareAndReadCommand.java b/src/java/org/apache/cassandra/service/paxos/PrepareAndReadCommand.java new file mode 100644 index 000000000000..72a813c2d7ac --- /dev/null +++ b/src/java/org/apache/cassandra/service/paxos/PrepareAndReadCommand.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.paxos; + +import java.io.IOException; + +import org.apache.cassandra.db.ReadCommand; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; + +public class PrepareAndReadCommand +{ + public static final CASPrepareCommandSerializer serializer = new CASPrepareCommandSerializer(); + + public final Commit commit; + public final ReadCommand readCommand; + + public PrepareAndReadCommand(Commit commit, ReadCommand readCommand) + { + this.commit = commit; + this.readCommand = readCommand; + } + + public static class CASPrepareCommandSerializer implements IVersionedSerializer + { + @Override + public void serialize(PrepareAndReadCommand prepareAndReadCommand, DataOutputPlus out, int version) throws IOException + { + Commit.serializer.serialize(prepareAndReadCommand.commit, out, version); + ReadCommand.serializer.serialize(prepareAndReadCommand.readCommand, out, version); + } + + @Override + public PrepareAndReadCommand deserialize(DataInputPlus in, int version) throws IOException + { + Commit commit = Commit.serializer.deserialize(in, version); + ReadCommand readCommand = ReadCommand.serializer.deserialize(in, version); + return new PrepareAndReadCommand(commit, readCommand); + } + + @Override + public long serializedSize(PrepareAndReadCommand prepareAndReadCommand, int version) + { + return Commit.serializer.serializedSize(prepareAndReadCommand.commit, version) + + ReadCommand.serializer.serializedSize(prepareAndReadCommand.readCommand, version); + } + } +} diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareAndReadResponse.java b/src/java/org/apache/cassandra/service/paxos/PrepareAndReadResponse.java new file mode 100644 index 000000000000..4fad14a593c0 --- /dev/null +++ b/src/java/org/apache/cassandra/service/paxos/PrepareAndReadResponse.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.paxos; + +import java.io.IOException; + +import org.apache.cassandra.db.ReadResponse; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; + +public class PrepareAndReadResponse +{ + public static final PrepareResponseSerializer serializer = new PrepareResponseSerializer(); + + public final PrepareResponse prepareResponse; + public final ReadResponse readResponse; + + public PrepareAndReadResponse(PrepareResponse prepareResponse, ReadResponse readResponse) + { + this.prepareResponse = prepareResponse; + this.readResponse = readResponse; + } + + @Override + public String toString() + { + return String.format("PrepareAndReadResponse(%s, %s)", prepareResponse, readResponse); + } + + public static class PrepareResponseSerializer implements IVersionedSerializer + { + public void serialize(PrepareAndReadResponse response, DataOutputPlus out, int version) throws IOException + { + ReadResponse.serializer.serialize(response.readResponse, out, version); + PrepareResponse.serializer.serialize(response.prepareResponse, out, version); + } + + public PrepareAndReadResponse deserialize(DataInputPlus in, int version) throws IOException + { + ReadResponse readResponse = ReadResponse.serializer.deserialize(in, version); + PrepareResponse prepareResponse = PrepareResponse.serializer.deserialize(in, version); + return new PrepareAndReadResponse(prepareResponse, readResponse); + } + + public long serializedSize(PrepareAndReadResponse response, int version) + { + return PrepareResponse.serializer.serializedSize(response.prepareResponse, version) + + ReadResponse.serializer.serializedSize(response.readResponse, version); + } + } +} diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareAndReadVerbHandler.java b/src/java/org/apache/cassandra/service/paxos/PrepareAndReadVerbHandler.java new file mode 100644 index 000000000000..ea85527e1997 --- /dev/null +++ b/src/java/org/apache/cassandra/service/paxos/PrepareAndReadVerbHandler.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.paxos; + +import java.io.IOException; + +import org.apache.cassandra.db.ReadCommand; +import org.apache.cassandra.db.ReadExecutionController; +import org.apache.cassandra.db.ReadResponse; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.net.IVerbHandler; +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.net.MessageOut; +import org.apache.cassandra.net.MessagingService; + +public class PrepareAndReadVerbHandler implements IVerbHandler +{ + @Override + public void doVerb(MessageIn message, int id) throws IOException + { + PrepareAndReadCommand prepareCommand = message.payload; + PrepareResponse paxosResponse = PaxosState.prepare(prepareCommand.commit); + ReadCommand command = prepareCommand.readCommand; + + ReadResponse readResponse; + + try (ReadExecutionController executionController = command.executionController(); + UnfilteredPartitionIterator iterator = command.executeLocally(executionController)) + { + readResponse = command.createResponse(iterator); + } + + PrepareAndReadResponse response = new PrepareAndReadResponse(paxosResponse, readResponse); + + MessageOut reply = new MessageOut<>(MessagingService.Verb.REQUEST_RESPONSE, response, PrepareAndReadResponse.serializer); + MessagingService.instance().sendReply(reply, id, message.from); + } +} diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java index ed70e964c03e..478039bf4bdf 100644 --- a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java +++ b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java @@ -64,6 +64,11 @@ public synchronized void response(MessageIn message) PrepareResponse response = message.payload; logger.trace("Prepare response {} from {}", response, message.from); + response(response, message.from); + } + + public synchronized void response(PrepareResponse response, InetAddressAndPort from) + { // In case of clock skew, another node could be proposing with ballot that are quite a bit // older than our own. In that case, we record the more recent commit we've received to make // sure we re-prepare on an older ballot. @@ -78,7 +83,7 @@ public synchronized void response(MessageIn message) return; } - commitsByReplica.put(message.from, response.mostRecentCommit); + commitsByReplica.put(from, response.mostRecentCommit); if (response.mostRecentCommit.isAfter(mostRecentCommit)) mostRecentCommit = response.mostRecentCommit; diff --git a/test/unit/org/apache/cassandra/cql3/CASOperationTest.java b/test/unit/org/apache/cassandra/cql3/CASOperationTest.java new file mode 100644 index 000000000000..0c0432092992 --- /dev/null +++ b/test/unit/org/apache/cassandra/cql3/CASOperationTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.cql3; + +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.service.paxos.PaxosState; +import org.apache.cassandra.transport.ProtocolVersion; + +import static org.junit.Assert.assertTrue; + +public class CASOperationTest extends CQLTester +{ + ProtocolVersion protocolVersion = ProtocolVersion.V4; + + @BeforeClass() + public static void setup() + { + System.setProperty("cassandra.use_fastpaxos", "true"); + } + + @Test + public void testNotExists() throws Throwable + { + assertTrue(PaxosState.USE_FASTPAXOS); + + createTable("CREATE TABLE %s (k text, v1 int, v2 text, PRIMARY KEY (k, v1))"); + + executeNet(protocolVersion, "INSERT INTO %s (k, v1, v2) values (?, ?, ?) IF NOT EXISTS", "first", 1, "value1"); + executeNet(protocolVersion, "INSERT INTO %s (k, v1, v2) values (?, ?, ?) IF NOT EXISTS", "second", 2, "value2"); + + assertRows(execute("SELECT * FROM %s WHERE k = ?", "first"), + row("first", 1, "value1") + ); + + assertRows(execute("SELECT v2 FROM %s WHERE k = ?", "second"), + row("value2") + ); + + executeNet(protocolVersion, "INSERT INTO %s (k, v1, v2) values (?, ?, ?) IF NOT EXISTS", "second", 2, "value3"); + + assertRows(execute("SELECT v2 FROM %s WHERE k = ?", "second"), + row("value2") + ); + } + + @Test + public void testCompareAndSet() throws Throwable + { + assertTrue(PaxosState.USE_FASTPAXOS); + + createTable("CREATE TABLE %s (k text, v1 int, v2 int, PRIMARY KEY (k, v1))"); + + executeNet(protocolVersion, "INSERT INTO %s (k, v1, v2) values (?, ?, ?) IF NOT EXISTS", "first", 1, 1); + executeNet(protocolVersion, "INSERT INTO %s (k, v1, v2) values (?, ?, ?) IF NOT EXISTS", "second", 2, 2); + + assertRows(execute("SELECT * FROM %s WHERE k = ?", "first"), + row("first", 1, 1)); + assertRows(execute("SELECT v2 FROM %s WHERE k = ?", "second"), + row(2)); + + executeNet(protocolVersion, "UPDATE %s SET v2 = ? WHERE k = ? AND v1 = ? if v2 = ? ", 3, "second", 2, 2); + assertRows(execute("SELECT v2 FROM %s WHERE k = ?", "second"), + row(3)); + + executeNet(protocolVersion, "UPDATE %s SET v2 = ? WHERE k = ? AND v1 = ? if v2 < ? ", 4, "second", 2, 3); + assertRows(execute("SELECT v2 FROM %s WHERE k = ?", "second"), + row(3)); + + executeNet(protocolVersion, "UPDATE %s SET v2 = ? WHERE k = ? AND v1 = ? if v2 < ? ", 4, "second", 2, 4); + assertRows(execute("SELECT v2 FROM %s WHERE k = ?", "second"), + row(4)); + } +} \ No newline at end of file diff --git a/test/unit/org/apache/cassandra/net/MessageDeliveryTaskTest.java b/test/unit/org/apache/cassandra/net/MessageDeliveryTaskTest.java index db38efb7234d..3a16ef3954f2 100644 --- a/test/unit/org/apache/cassandra/net/MessageDeliveryTaskTest.java +++ b/test/unit/org/apache/cassandra/net/MessageDeliveryTaskTest.java @@ -38,13 +38,13 @@ public class MessageDeliveryTaskTest public static void before() { DatabaseDescriptor.daemonInitialization(); - MessagingService.instance().registerVerbHandlers(MessagingService.Verb.UNUSED_2, VERB_HANDLER); + MessagingService.instance().registerVerbHandlers(MessagingService.Verb.UNUSED_3, VERB_HANDLER); } @AfterClass public static void after() { - MessagingService.instance().removeVerbHandler(MessagingService.Verb.UNUSED_2); + MessagingService.instance().removeVerbHandler(MessagingService.Verb.UNUSED_3); } @Before @@ -57,7 +57,7 @@ public void setUp() public void process_HappyPath() throws UnknownHostException { InetAddressAndPort addr = InetAddressAndPort.getByName("127.0.0.1"); - MessageIn msg = MessageIn.create(addr, null, Collections.emptyMap(), MessagingService.Verb.UNUSED_2, 1); + MessageIn msg = MessageIn.create(addr, null, Collections.emptyMap(), MessagingService.Verb.UNUSED_3, 1); MessageDeliveryTask task = new MessageDeliveryTask(msg, 42); Assert.assertTrue(task.process()); Assert.assertEquals(1, VERB_HANDLER.invocationCount); @@ -97,7 +97,7 @@ public void process_ExpiredDroppableMessage() throws UnknownHostException public void process_ExpiredMessage() throws UnknownHostException { InetAddressAndPort addr = InetAddressAndPort.getByName("127.0.0.1"); - MessageIn msg = MessageIn.create(addr, null, Collections.emptyMap(), MessagingService.Verb.UNUSED_2, 1, 0); + MessageIn msg = MessageIn.create(addr, null, Collections.emptyMap(), MessagingService.Verb.UNUSED_3, 1, 0); MessageDeliveryTask task = new MessageDeliveryTask(msg, 42); Assert.assertTrue(task.process()); Assert.assertEquals(1, VERB_HANDLER.invocationCount);