Skip to content

Commit

Permalink
CEP-15 (Accord) Expected reply message with verb ACCORD_INFORM_OF_TXN…
Browse files Browse the repository at this point in the history
…ID_RSP but got ACCORD_SIMPLE_RSP

patch by David Capwell; reviewed by Caleb Rackliffe for CASSANDRA-18375
  • Loading branch information
dcapwell committed Mar 31, 2023
1 parent 4f84f9b commit 9b90c90
Show file tree
Hide file tree
Showing 6 changed files with 180 additions and 42 deletions.
27 changes: 27 additions & 0 deletions src/java/org/apache/cassandra/net/Messaging.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.net;

import org.apache.cassandra.locator.InetAddressAndPort;

public interface Messaging
{
void send(Message message, InetAddressAndPort to);
void sendWithCallback(Message message, InetAddressAndPort to, RequestCallback cb);
}
4 changes: 3 additions & 1 deletion src/java/org/apache/cassandra/net/MessagingService.java
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@
* implemented in {@link org.apache.cassandra.db.virtual.InternodeInboundTable} and
* {@link org.apache.cassandra.db.virtual.InternodeOutboundTable} respectively.
*/
public class MessagingService extends MessagingServiceMBeanImpl
public class MessagingService extends MessagingServiceMBeanImpl implements Messaging
{
private static final Logger logger = LoggerFactory.getLogger(MessagingService.class);

Expand Down Expand Up @@ -352,6 +352,7 @@ public RequestFailureReason failureReason()
* @param cb callback interface which is used to pass the responses or
* suggest that a timeout occurred to the invoker of the send().
*/
@Override
public void sendWithCallback(Message message, InetAddressAndPort to, RequestCallback cb)
{
sendWithCallback(message, to, cb, null);
Expand Down Expand Up @@ -390,6 +391,7 @@ public void sendWriteWithCallback(Message message, Replica to, AbstractWriteResp
* @param message messages to be sent.
* @param to endpoint to which the message needs to be sent
*/
@Override
public void send(Message message, InetAddressAndPort to)
{
send(message, to, null);
Expand Down
99 changes: 63 additions & 36 deletions src/java/org/apache/cassandra/net/Verb.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,21 @@

import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.ToLongFunction;
import java.util.stream.IntStream;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;

import org.agrona.collections.IntHashSet;
import org.apache.cassandra.batchlog.Batch;
import org.apache.cassandra.batchlog.BatchRemoveVerbHandler;
import org.apache.cassandra.batchlog.BatchStoreVerbHandler;
Expand Down Expand Up @@ -222,42 +228,31 @@ public enum Verb
PAXOS2_CLEANUP_COMPLETE_REQ (48, P2, repairTimeout, PAXOS_REPAIR, () -> PaxosCleanupComplete.serializer, () -> PaxosCleanupComplete.verbHandler, PAXOS2_CLEANUP_COMPLETE_RSP ),

// accord
ACCORD_SIMPLE_RSP (119, P2, writeTimeout, REQUEST_RESPONSE, () -> EnumSerializer.simpleReply, RESPONSE_HANDLER),

ACCORD_PREACCEPT_RSP (121, P2, writeTimeout, REQUEST_RESPONSE, () -> PreacceptSerializers.reply, RESPONSE_HANDLER),
ACCORD_PREACCEPT_REQ (120, P2, writeTimeout, ACCORD, () -> PreacceptSerializers.request, () -> AccordService.instance().verbHandler(), ACCORD_PREACCEPT_RSP),

ACCORD_ACCEPT_RSP (124, P2, writeTimeout, REQUEST_RESPONSE, () -> AcceptSerializers.reply, RESPONSE_HANDLER),
ACCORD_ACCEPT_REQ (122, P2, writeTimeout, ACCORD, () -> AcceptSerializers.request, () -> AccordService.instance().verbHandler(), ACCORD_ACCEPT_RSP ),
ACCORD_ACCEPT_INVALIDATE_REQ (123, P2, writeTimeout, ACCORD, () -> AcceptSerializers.invalidate, () -> AccordService.instance().verbHandler(), ACCORD_ACCEPT_RSP ),

ACCORD_READ_RSP (128, P2, writeTimeout, REQUEST_RESPONSE, () -> ReadDataSerializers.reply, RESPONSE_HANDLER),
ACCORD_READ_REQ (127, P2, writeTimeout, ACCORD, () -> ReadDataSerializers.request, () -> AccordService.instance().verbHandler(), ACCORD_READ_RSP ),
ACCORD_COMMIT_REQ (125, P2, writeTimeout, ACCORD, () -> CommitSerializers.request, () -> AccordService.instance().verbHandler(), ACCORD_READ_RSP ),
ACCORD_COMMIT_INVALIDATE_REQ (126, P2, writeTimeout, ACCORD, () -> CommitSerializers.invalidate, () -> AccordService.instance().verbHandler()),

ACCORD_APPLY_RSP (130, P2, writeTimeout, REQUEST_RESPONSE, () -> ApplySerializers.reply, RESPONSE_HANDLER),
ACCORD_APPLY_REQ (129, P2, writeTimeout, ACCORD, () -> ApplySerializers.request, () -> AccordService.instance().verbHandler(), ACCORD_APPLY_RSP),

ACCORD_RECOVER_RSP (134, P2, writeTimeout, REQUEST_RESPONSE, () -> RecoverySerializers.reply, RESPONSE_HANDLER),
ACCORD_RECOVER_REQ (133, P2, writeTimeout, ACCORD, () -> RecoverySerializers.request, () -> AccordService.instance().verbHandler(), ACCORD_RECOVER_RSP ),
ACCORD_BEGIN_INVALIDATE_RSP (136, P2, writeTimeout, REQUEST_RESPONSE, () -> BeginInvalidationSerializers.reply, RESPONSE_HANDLER),
ACCORD_BEGIN_INVALIDATE_REQ (135, P2, writeTimeout, ACCORD, () -> BeginInvalidationSerializers.request, () -> AccordService.instance().verbHandler(), ACCORD_BEGIN_INVALIDATE_RSP),
ACCORD_WAIT_COMMIT_RSP (138, P2, writeTimeout, REQUEST_RESPONSE, () -> WaitOnCommitSerializer.reply, RESPONSE_HANDLER),
ACCORD_WAIT_COMMIT_REQ (137, P2, writeTimeout, ACCORD, () -> WaitOnCommitSerializer.request, () -> AccordService.instance().verbHandler(), ACCORD_WAIT_COMMIT_RSP),

ACCORD_INFORM_OF_TXNID_RSP(140, P2, writeTimeout, REQUEST_RESPONSE, () -> EnumSerializer.simpleReply, RESPONSE_HANDLER),
ACCORD_INFORM_OF_TXNID_REQ(139, P2, writeTimeout, ACCORD, () -> InformOfTxnIdSerializers.request, () -> AccordService.instance().verbHandler(), ACCORD_INFORM_OF_TXNID_RSP),

ACCORD_INFORM_HOME_DURABLE_REQ(141, P2, writeTimeout, ACCORD, () -> InformHomeDurableSerializers.request, () -> AccordService.instance().verbHandler(), ACCORD_SIMPLE_RSP),

ACCORD_INFORM_DURABLE_REQ(143, P2, writeTimeout, ACCORD, () -> InformDurableSerializers.request, () -> AccordService.instance().verbHandler(), ACCORD_SIMPLE_RSP),

ACCORD_CHECK_STATUS_RSP (146, P2, writeTimeout, REQUEST_RESPONSE, () -> CheckStatusSerializers.reply, RESPONSE_HANDLER),
ACCORD_CHECK_STATUS_REQ (145, P2, writeTimeout, ACCORD, () -> CheckStatusSerializers.request, () -> AccordService.instance().verbHandler(), ACCORD_CHECK_STATUS_RSP),

ACCORD_GET_DEPS_RSP (148, P2, writeTimeout, REQUEST_RESPONSE, () -> GetDepsSerializers.reply, RESPONSE_HANDLER),
ACCORD_GET_DEPS_REQ (147, P2, writeTimeout, ACCORD, () -> GetDepsSerializers.request, () -> AccordService.instance().verbHandler(), ACCORD_GET_DEPS_RSP),
ACCORD_SIMPLE_RSP (119, P2, writeTimeout, REQUEST_RESPONSE, () -> EnumSerializer.simpleReply, RESPONSE_HANDLER ),
ACCORD_PREACCEPT_RSP (121, P2, writeTimeout, REQUEST_RESPONSE, () -> PreacceptSerializers.reply, RESPONSE_HANDLER ),
ACCORD_PREACCEPT_REQ (120, P2, writeTimeout, ACCORD, () -> PreacceptSerializers.request, () -> AccordService.instance().verbHandler(), ACCORD_PREACCEPT_RSP ),
ACCORD_ACCEPT_RSP (124, P2, writeTimeout, REQUEST_RESPONSE, () -> AcceptSerializers.reply, RESPONSE_HANDLER ),
ACCORD_ACCEPT_REQ (122, P2, writeTimeout, ACCORD, () -> AcceptSerializers.request, () -> AccordService.instance().verbHandler(), ACCORD_ACCEPT_RSP ),
ACCORD_ACCEPT_INVALIDATE_REQ (123, P2, writeTimeout, ACCORD, () -> AcceptSerializers.invalidate, () -> AccordService.instance().verbHandler(), ACCORD_ACCEPT_RSP ),
ACCORD_READ_RSP (128, P2, writeTimeout, REQUEST_RESPONSE, () -> ReadDataSerializers.reply, RESPONSE_HANDLER ),
ACCORD_READ_REQ (127, P2, writeTimeout, ACCORD, () -> ReadDataSerializers.request, () -> AccordService.instance().verbHandler(), ACCORD_READ_RSP ),
ACCORD_COMMIT_REQ (125, P2, writeTimeout, ACCORD, () -> CommitSerializers.request, () -> AccordService.instance().verbHandler(), ACCORD_READ_RSP ),
ACCORD_COMMIT_INVALIDATE_REQ (126, P2, writeTimeout, ACCORD, () -> CommitSerializers.invalidate, () -> AccordService.instance().verbHandler() ),
ACCORD_APPLY_RSP (130, P2, writeTimeout, REQUEST_RESPONSE, () -> ApplySerializers.reply, RESPONSE_HANDLER ),
ACCORD_APPLY_REQ (129, P2, writeTimeout, ACCORD, () -> ApplySerializers.request, () -> AccordService.instance().verbHandler(), ACCORD_APPLY_RSP ),
ACCORD_RECOVER_RSP (132, P2, writeTimeout, REQUEST_RESPONSE, () -> RecoverySerializers.reply, RESPONSE_HANDLER ),
ACCORD_RECOVER_REQ (131, P2, writeTimeout, ACCORD, () -> RecoverySerializers.request, () -> AccordService.instance().verbHandler(), ACCORD_RECOVER_RSP ),
ACCORD_BEGIN_INVALIDATE_RSP (134, P2, writeTimeout, REQUEST_RESPONSE, () -> BeginInvalidationSerializers.reply, RESPONSE_HANDLER ),
ACCORD_BEGIN_INVALIDATE_REQ (133, P2, writeTimeout, ACCORD, () -> BeginInvalidationSerializers.request, () -> AccordService.instance().verbHandler(), ACCORD_BEGIN_INVALIDATE_RSP ),
ACCORD_WAIT_COMMIT_RSP (136, P2, writeTimeout, REQUEST_RESPONSE, () -> WaitOnCommitSerializer.reply, RESPONSE_HANDLER ),
ACCORD_WAIT_COMMIT_REQ (135, P2, writeTimeout, ACCORD, () -> WaitOnCommitSerializer.request, () -> AccordService.instance().verbHandler(), ACCORD_WAIT_COMMIT_RSP ),
ACCORD_INFORM_OF_TXNID_REQ (137, P2, writeTimeout, ACCORD, () -> InformOfTxnIdSerializers.request, () -> AccordService.instance().verbHandler(), ACCORD_SIMPLE_RSP ),
ACCORD_INFORM_HOME_DURABLE_REQ (138, P2, writeTimeout, ACCORD, () -> InformHomeDurableSerializers.request, () -> AccordService.instance().verbHandler(), ACCORD_SIMPLE_RSP ),
ACCORD_INFORM_DURABLE_REQ (139, P2, writeTimeout, ACCORD, () -> InformDurableSerializers.request, () -> AccordService.instance().verbHandler(), ACCORD_SIMPLE_RSP ),
ACCORD_CHECK_STATUS_RSP (141, P2, writeTimeout, REQUEST_RESPONSE, () -> CheckStatusSerializers.reply, RESPONSE_HANDLER ),
ACCORD_CHECK_STATUS_REQ (140, P2, writeTimeout, ACCORD, () -> CheckStatusSerializers.request, () -> AccordService.instance().verbHandler(), ACCORD_CHECK_STATUS_RSP ),
ACCORD_GET_DEPS_RSP (143, P2, writeTimeout, REQUEST_RESPONSE, () -> GetDepsSerializers.reply, RESPONSE_HANDLER ),
ACCORD_GET_DEPS_REQ (142, P2, writeTimeout, ACCORD, () -> GetDepsSerializers.request, () -> AccordService.instance().verbHandler(), ACCORD_GET_DEPS_RSP ),


// generic failure response
Expand Down Expand Up @@ -467,6 +462,7 @@ ToLongFunction<TimeUnit> unsafeSetExpiration(ToLongFunction<TimeUnit> expiration
static
{
Verb[] verbs = values();
checkForGaps(verbs);
int max = -1;
int minCustom = Integer.MAX_VALUE;
for (Verb v : verbs)
Expand Down Expand Up @@ -515,6 +511,37 @@ ToLongFunction<TimeUnit> unsafeSetExpiration(ToLongFunction<TimeUnit> expiration
idToCustomVerbMap = customIdMap;
}

private static void checkForGaps(Verb[] array)
{
// If a verb is removed please add the id to this list, so this logic to detect gaps won't complain
IntHashSet allowedMissing = new IntHashSet();
for (int i : new int[]{ 7, 8, 12, 13, 17, 21, 25, 26, 32, 36, 64, 67, 68, 70, 71, 72, 73, 74, 75, 76, 77, 78, 81, 83, 85, 86, 89, 90, 92, 96,
/* gap for accord, should fix when merging to trunk */ 116, 117, 118})
allowedMissing.add(i);
List<Verb> verbs = new ArrayList<>(Arrays.asList(array));
Collections.sort(verbs, Comparator.comparingInt(a -> a.id));
Verb previous = null;
List<String> errors = new ArrayList<>();
int minCustomVerb = Verb.UNUSED_CUSTOM_VERB.id;
for (Verb v : verbs)
{
if (v.id >= minCustomVerb)
continue; // ignore custom ids
if (allowedMissing.contains(v.id))
throw new AssertionError("Verb " + v + " used id " + v.id + " which is in the list of allowed missing; please remove from that list");
if (previous != null)
{
Verb finalPrevious = previous;
int[] missing = IntStream.range(previous.id + 1, v.id).filter(i -> !allowedMissing.contains(i)).toArray();
if (missing.length > 0)
errors.add("Gap detected between verbs " + Arrays.asList(finalPrevious, v) + "; " + Arrays.asList(finalPrevious.id, v.id) + "; missing ids are " + Arrays.toString(missing));
}
previous = v;
}
if (!errors.isEmpty())
throw new AssertionError(String.join("\n", errors));
}

public static Verb fromId(int id)
{
Verb[] verbs = idToVerbMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import accord.messages.Request;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.Messaging;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.Verb;

Expand Down Expand Up @@ -92,6 +93,18 @@ private static Verb getVerb(MessageType type)
return VerbMapping.instance.mapping.get(type);
}

private final Messaging messaging;

public AccordMessageSink(Messaging messaging)
{
this.messaging = messaging;
}

public AccordMessageSink()
{
this(MessagingService.instance());
}

@Override
public void send(Node.Id to, Request request)
{
Expand All @@ -100,7 +113,7 @@ public void send(Node.Id to, Request request)
Message<Request> message = Message.out(verb, request);
InetAddressAndPort endpoint = getEndpoint(to);
logger.debug("Sending {} {} to {}", verb, message.payload, endpoint);
MessagingService.instance().send(message, endpoint);
messaging.send(message, endpoint);
}

@Override
Expand All @@ -111,7 +124,7 @@ public void send(Node.Id to, Request request, Callback callback)
Message<Request> message = Message.out(verb, request);
InetAddressAndPort endpoint = getEndpoint(to);
logger.debug("Sending {} {} to {}", verb, message.payload, endpoint);
MessagingService.instance().sendWithCallback(message, endpoint, new AccordCallback<>((Callback<Reply>) callback));
messaging.sendWithCallback(message, endpoint, new AccordCallback<>((Callback<Reply>) callback));
}

@Override
Expand All @@ -122,6 +135,6 @@ public void reply(Node.Id replyingToNode, ReplyContext replyContext, Reply reply
Preconditions.checkArgument(replyMsg.verb() == getVerb(reply.type()));
InetAddressAndPort endpoint = getEndpoint(replyingToNode);
logger.debug("Replying {} {} to {}", replyMsg.verb(), replyMsg.payload, endpoint);
MessagingService.instance().send(replyMsg, endpoint);
messaging.send(replyMsg, endpoint);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,19 @@ enum State
AWAITING_SAVE, // wait for writes to complete
COMPLETING,
FINISHED,
FAILED
FAILED;

boolean isComplete()
{
switch (this)
{
case FAILED:
case FINISHED:
return true;
default:
return false;
}
}
}

private State state = State.INITIALIZED;
Expand Down Expand Up @@ -179,7 +191,8 @@ private void finish(R result)
private void fail(Throwable throwable)
{
Invariants.nonNull(throwable);
Invariants.checkArgument(state != State.FINISHED && state != State.FAILED, "Unexpected state %s", state);
if (state.isComplete())
throw new IllegalStateException("Unexpected state " + state, throwable);
try
{
switch (state)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.service.accord;

import org.junit.BeforeClass;
import org.junit.Test;

import accord.local.Node;
import accord.messages.InformOfTxnId;
import accord.messages.SimpleReply;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.Messaging;
import org.apache.cassandra.net.Verb;
import org.mockito.Mockito;

public class AccordMessageSinkTest
{
@BeforeClass
public static void setup()
{
DatabaseDescriptor.clientInitialization();
}

@Test
public void informOfTxn()
{
// There was an issue where the reply was the wrong verb
// see CASSANDRA-18375
InformOfTxnId info = Mockito.mock(InformOfTxnId.class);
Message<InformOfTxnId> req = Message.builder(Verb.ACCORD_INFORM_OF_TXNID_REQ, info).build();
SimpleReply reply = SimpleReply.Ok;

Messaging messaging = Mockito.mock(Messaging.class);
AccordMessageSink sink = new AccordMessageSink(messaging);
sink.reply(new Node.Id(1), req, reply);

Mockito.verify(messaging).send(Mockito.any(), Mockito.any());
}
}

0 comments on commit 9b90c90

Please sign in to comment.