Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,16 @@ upgrade_job_only: &upgrade_job_only
- build
# Set env_settings, env_vars, and workflows/build_and_run_tests based on environment
env_settings: &env_settings
<<: *default_env_settings
#<<: *high_capacity_env_settings
#<<: *default_env_settings
<<: *high_capacity_env_settings
env_vars: &env_vars
<<: *resource_constrained_env_vars
#<<: *high_capacity_env_vars
#<<: *resource_constrained_env_vars
<<: *high_capacity_env_vars
workflows:
version: 2
build_and_run_tests: *default_jobs
#build_and_run_tests: *default_jobs
#build_and_run_tests: *with_dtest_jobs_only
#build_and_run_tests: *with_dtest_jobs
build_and_run_tests: *with_dtest_jobs
#build_and_run_tests: *upgrade_job_only
docker_image: &docker_image aweisberg/cassandra-testing-ubuntu18-java11-w-dependencies
version: 2
Expand Down
2 changes: 1 addition & 1 deletion build.xml
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@
<dependency groupId="com.github.rholder" artifactId="snowball-stemmer" version="1.3.0.581.1" />
<dependency groupId="com.googlecode.concurrent-trees" artifactId="concurrent-trees" version="2.4.0" />
<dependency groupId="com.github.ben-manes.caffeine" artifactId="caffeine" version="2.3.5" />
<dependency groupId="org.jctools" artifactId="jctools-core" version="1.2.1"/>
<dependency groupId="org.jctools" artifactId="jctools-core" version="2.1.2"/>
<dependency groupId="org.ow2.asm" artifactId="asm" version="${asm.version}" />
<dependency groupId="org.ow2.asm" artifactId="asm-tree" version="${asm.version}" />
<dependency groupId="org.ow2.asm" artifactId="asm-commons" version="${asm.version}" />
Expand Down
Binary file removed lib/jctools-core-1.2.1.jar
Binary file not shown.
Binary file added lib/jctools-core-2.1.2.jar
Binary file not shown.
474 changes: 471 additions & 3 deletions src/java/org/apache/cassandra/net/MessageIn.java

Large diffs are not rendered by default.

124 changes: 99 additions & 25 deletions src/java/org/apache/cassandra/net/MessageOut.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,49 @@

package org.apache.cassandra.net;

import java.io.IOError;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Ints;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.async.OutboundConnectionIdentifier;
import org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType;
import org.apache.cassandra.tracing.TraceState;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.NanoTimeToCurrentTimeMillis;
import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.vint.VIntCoding;

import static org.apache.cassandra.tracing.Tracing.isTracing;

/**
* Each message contains a header with several fixed fields, an optional key-value parameters section, and then
* the message payload itself. Note: the legacy IP address (pre-4.0) in the header may be either IPv4 (4 bytes)
* or IPv6 (16 bytes). The diagram below shows the IPv4 address for brevity. In pre-4.0, the payloadSize was
* encoded as a 4-byte integer; in 4.0 and up it is an unsigned byte (255 parameters should be enough for anyone).
* the message payload itself. Below is a visualization of the layout.
*
* The parameters are prefixed by a length in bytes for the entire parameter section; this value is encoded as unsigned vint.
* An individual parameter has a String key (more specifically, a {@link ParameterType}), and a byte array value.
* The key is serialized with it's length, encoded as two bytes, followed by the UTF-8 byte encoding of the string
* (see {@link java.io.DataOutput#writeUTF(java.lang.String)}). The parameter value is prefixed with it's length,
* encoded as an unsigned vint, followed by by the value's bytes.
*
* Legacy Notes (see {@link #serializePre40(DataOutputPlus, int)} for complete details):
* - pre 4.0, the IP address was sent along in the header, before the verb. The IP address may be either IPv4 (4 bytes) or IPv6 (16 bytes).
* - In pre-4.0, the payloadSize was encoded as a 4-byte integer; in 4.0 and up it is an unsigned vint.
* - In pre-4.0, the length of a parameter values was encoded as a 4-byte integer; in 4.0 and up it is an unsigned vint.
*
* <pre>
* {@code
Expand All @@ -70,21 +86,24 @@
* }
* </pre>
*
* An individual parameter has a String key and a byte array value. The key is serialized with it's length,
* encoded as two bytes, followed by the UTF-8 byte encoding of the string (see {@link java.io.DataOutput#writeUTF(java.lang.String)}).
* The body is serialized with it's length, encoded as four bytes, followed by the bytes of the value.
*
* * @param <T> The type of the message payload.
*/
public class MessageOut<T>
{
private static final Logger logger = LoggerFactory.getLogger(MessageOut.class);

/**
* The amount of prefix data, in bytes, before the serialized message.
*/
public static final int MESSAGE_PREFIX_SIZE = 12;

private static final int SERIALIZED_SIZE_VERSION_UNDEFINED = -1;
//Parameters are stored in an object array as tuples of size two
public static final int PARAMETER_TUPLE_SIZE = 2;
private static final int PARAMETER_TUPLE_SIZE = 2;
//Offset in a parameter tuple containing the type of the parameter
public static final int PARAMETER_TUPLE_TYPE_OFFSET = 0;
private static final int PARAMETER_TUPLE_TYPE_OFFSET = 0;
//Offset in a parameter tuple containing the actual parameter represented as a POJO
public static final int PARAMETER_TUPLE_PARAMETER_OFFSET = 1;
private static final int PARAMETER_TUPLE_PARAMETER_OFFSET = 1;

public final InetAddressAndPort from;
public final MessagingService.Verb verb;
Expand Down Expand Up @@ -180,6 +199,73 @@ public String toString()
return sbuf.toString();
}

/**
* The main entry point for sending an internode message to a peer node in the cluster.
*/
public void serialize(DataOutputPlus out, int messagingVersion, OutboundConnectionIdentifier destinationId, int id, long timestampNanos) throws IOException
{
captureTracingInfo(destinationId);

out.writeInt(MessagingService.PROTOCOL_MAGIC);
out.writeInt(id);

// int cast cuts off the high-order half of the timestamp, which we can assume remains
// the same between now and when the recipient reconstructs it.
out.writeInt((int) NanoTimeToCurrentTimeMillis.convert(timestampNanos));
serialize(out, messagingVersion);
}

/**
* Record any tracing data, if enabled on this message.
*/
@VisibleForTesting
void captureTracingInfo(OutboundConnectionIdentifier destinationId)
{
try
{
UUID sessionId = (UUID)getParameter(ParameterType.TRACE_SESSION);
if (sessionId != null)
{
TraceState state = Tracing.instance.get(sessionId);
String logMessage = String.format("Sending %s message to %s", verb, destinationId.connectionAddress());
// session may have already finished; see CASSANDRA-5668
if (state == null)
{
Tracing.TraceType traceType = (Tracing.TraceType)getParameter(ParameterType.TRACE_TYPE);
traceType = traceType == null ? Tracing.TraceType.QUERY : traceType;
Tracing.instance.trace(ByteBuffer.wrap(UUIDGen.decompose(sessionId)), logMessage, traceType.getTTL());
}
else
{
state.trace(logMessage);
if (verb == MessagingService.Verb.REQUEST_RESPONSE)
Tracing.instance.doneWithNonLocalSession(state);
}
}
}
catch (Exception e)
{
logger.warn("failed to capture the tracing info for an outbound message to {}, ignoring", destinationId, e);
}
}

private Object getParameter(ParameterType type)
{
for (int ii = 0; ii < parameters.size(); ii += PARAMETER_TUPLE_SIZE)
{
if ((parameters.get(ii + PARAMETER_TUPLE_TYPE_OFFSET)).equals(type))
{
return parameters.get(ii + PARAMETER_TUPLE_PARAMETER_OFFSET);
}
}
return null;
}

/**
* Use {@link #serialize(DataOutputPlus, int, OutboundConnectionIdentifier, int, long)} as the main entry point
* for writing out a message.
*/
@VisibleForTesting
public void serialize(DataOutputPlus out, int version) throws IOException
{
if (version >= MessagingService.VERSION_40)
Expand Down Expand Up @@ -364,22 +450,10 @@ public int serializedSize(int version)
return Ints.checkedCast(sizes.messageSize);
}

public Object getParameter(ParameterType type)
{
for (int ii = 0; ii < parameters.size(); ii += PARAMETER_TUPLE_SIZE)
{
if (((ParameterType)parameters.get(ii + PARAMETER_TUPLE_TYPE_OFFSET)).equals(type))
{
return parameters.get(ii + PARAMETER_TUPLE_PARAMETER_OFFSET);
}
}
return null;
}

private static class MessageOutSizes
{
public final long messageSize;
public final long payloadSize;
final long messageSize;
final long payloadSize;

private MessageOutSizes(long messageSize, long payloadSize)
{
Expand Down
Loading