Skip to content

Commit

Permalink
Track internode msg bytes (#1081)
Browse files Browse the repository at this point in the history
* Collect counter mutation WRITE_BYTES from replicas

* Rely on replica count to account for mutation replica sensor values

* Empty-Commit

* Track internode msg bytes and counts

* Track internode sensors at the outbound sink level

* Fix NPE for ks only sensors

* Fix more NPEs in unit tests

* Rebase on top of origin/cc-ucu

* Remove ks from headers & track internode messages in MessagingService

* Add internode sensors per ks and table

* Opinionated changes to internode tracking (#1121)

* Opinionated changes to internode tracking:
* Removed message count as bytes should really be the most important/accurate.
* Tracked both inbound and outbound bytes, as writes take up inbound bytes, while reads take up outbound bytes.
* Added internode sensors to custom params in the same place where we add the other sensors.
* Added internode sensors for request.

* Added SensorsInternodeTest + bug fixes.

* Use only the payload size as internode bytes.
* Cache the payload size where possible.

* Use payload size for inbound messages

---------

Co-authored-by: Sergio Bossa <sergio.bossa@gmail.com>
  • Loading branch information
aymkhalil and sbtourist committed May 30, 2024
1 parent b597688 commit ad66f41
Show file tree
Hide file tree
Showing 11 changed files with 447 additions and 55 deletions.
44 changes: 27 additions & 17 deletions src/java/org/apache/cassandra/db/CounterMutationCallback.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.cassandra.net.NoPayload;
import org.apache.cassandra.net.SensorsCustomParams;
import org.apache.cassandra.sensors.RequestSensors;
import org.apache.cassandra.sensors.RequestTracker;
import org.apache.cassandra.sensors.Sensor;
import org.apache.cassandra.sensors.SensorsRegistry;
import org.apache.cassandra.sensors.Type;
Expand All @@ -37,16 +38,14 @@
*/
public class CounterMutationCallback implements Runnable
{
private final Message<CounterMutation> respondTo;
private final Message<CounterMutation> requestMessage;
private final InetAddressAndPort respondToAddress;
private final RequestSensors requestSensors;
private int replicaCount = 0;

public CounterMutationCallback(Message<CounterMutation> respondTo, InetAddressAndPort respondToAddress, RequestSensors requestSensors)
public CounterMutationCallback(Message<CounterMutation> requestMessage, InetAddressAndPort respondToAddress)
{
this.respondTo = respondTo;
this.requestMessage = requestMessage;
this.respondToAddress = respondToAddress;
this.requestSensors = requestSensors;
}

/**
Expand All @@ -60,38 +59,49 @@ public void setReplicaCount(Integer replicaCount)
@Override
public void run()
{
Message.Builder<NoPayload> response = respondTo.emptyResponseBuilder();
Message.Builder<NoPayload> responseBuilder = requestMessage.emptyResponseBuilder();
int replicaMultiplier = replicaCount == 0 ?
1 : // replica count was not explicitly set (default). At the bare minimum, the response should account for the sensor values of the local replica (a.k.a. the mutation leader)
replicaCount;
addSensorsToResponse(response, requestSensors, replicaMultiplier);
MessagingService.instance().send(response.build(), respondToAddress);
addSensorsToResponse(responseBuilder, requestMessage.payload.getMutation(), replicaMultiplier);
MessagingService.instance().send(responseBuilder.build(), respondToAddress);
}

private static void addSensorsToResponse(Message.Builder<NoPayload> response, RequestSensors requestSensors, int replicaMultiplier)
private static void addSensorsToResponse(Message.Builder<NoPayload> response, Mutation mutation, int replicaMultiplier)
{
// Add write bytes sensors to the response
Function<String, String> requestParam = SensorsCustomParams::encodeTableInWriteBytesRequestParam;
Function<String, String> tableParam = SensorsCustomParams::encodeTableInWriteBytesTableParam;
int tables = mutation.getTableIds().size();

// Add internode bytes sensors to the response after updating each per-table sensor with the current response
// message size: this is missing the sensor values, but it's a good enough approximation
Collection<Sensor> requestSensors = RequestTracker.instance.get().getSensors(Type.INTERNODE_BYTES);
int perSensorSize = response.currentPayloadSize(MessagingService.current_version) / tables;
requestSensors.forEach(sensor -> RequestTracker.instance.get().incrementSensor(sensor.getContext(), sensor.getType(), perSensorSize));
RequestTracker.instance.get().syncAllSensors();
Function<String, String> requestParam = SensorsCustomParams::encodeTableInInternodeBytesRequestParam;
Function<String, String> tableParam = SensorsCustomParams::encodeTableInInternodeBytesTableParam;
addSensorsToResponse(requestSensors, requestParam, tableParam, response, replicaMultiplier);

Collection<Sensor> sensors = requestSensors.getSensors(Type.WRITE_BYTES);
addSensorsToResponse(sensors, requestParam, tableParam, response, replicaMultiplier);
// Add write bytes sensors to the response
requestParam = SensorsCustomParams::encodeTableInWriteBytesRequestParam;
tableParam = SensorsCustomParams::encodeTableInWriteBytesTableParam;
requestSensors = RequestTracker.instance.get().getSensors(Type.WRITE_BYTES);
addSensorsToResponse(requestSensors, requestParam, tableParam, response, replicaMultiplier);
}

private static void addSensorsToResponse(Collection<Sensor> sensors,
private static void addSensorsToResponse(Collection<Sensor> requestSensors,
Function<String, String> requestParamSupplier,
Function<String, String> tableParamSupplier,
Message.Builder<NoPayload> response,
int replicaMultiplier)
{
for (Sensor requestSensor : sensors)
for (Sensor requestSensor : requestSensors)
{
String requestBytesParam = requestParamSupplier.apply(requestSensor.getContext().getTable());
byte[] requestBytes = SensorsCustomParams.sensorValueAsBytes(requestSensor.getValue() * replicaMultiplier);
response.withCustomParam(requestBytesParam, requestBytes);

// for each table in the mutation, send the global per table counter write bytes as recorded by the registry
Optional<Sensor> registrySensor = SensorsRegistry.instance.getSensor(requestSensor.getContext(), requestSensor.getType());
Optional<Sensor> registrySensor = SensorsRegistry.instance.getOrCreateSensor(requestSensor.getContext(), requestSensor.getType());
registrySensor.ifPresent(sensor -> {
String tableBytesParam = tableParamSupplier.apply(sensor.getContext().getTable());
byte[] tableBytes = SensorsCustomParams.sensorValueAsBytes(sensor.getValue() * replicaMultiplier);
Expand Down
18 changes: 17 additions & 1 deletion src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,22 @@
*/
package org.apache.cassandra.db;

import java.util.Collection;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.concurrent.ExecutorLocals;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.sensors.Context;
import org.apache.cassandra.sensors.RequestSensors;
import org.apache.cassandra.sensors.Type;
import org.apache.cassandra.service.StorageProxy;

public class CounterMutationVerbHandler implements IVerbHandler<CounterMutation>
Expand All @@ -40,10 +48,18 @@ public void doVerb(final Message<CounterMutation> message)
logger.trace("Applying forwarded {}", cm);

// Initialize the sensor and set ExecutorLocals
Collection<TableMetadata> tables = message.payload.getPartitionUpdates().stream().map(PartitionUpdate::metadata).collect(Collectors.toSet());
RequestSensors requestSensors = new RequestSensors();
ExecutorLocals locals = ExecutorLocals.create(requestSensors);
ExecutorLocals.set(locals);

// Initialize internode bytes with the inbound message size:
tables.forEach(tm -> {
Context context = Context.from(tm);
requestSensors.registerSensor(context, Type.INTERNODE_BYTES);
requestSensors.incrementSensor(context, Type.INTERNODE_BYTES, message.payloadSize(MessagingService.current_version) / tables.size());
});

String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter();
// We should not wait for the result of the write in this thread,
// otherwise we could have a distributed deadlock between replicas
Expand All @@ -54,7 +70,7 @@ public void doVerb(final Message<CounterMutation> message)
// its own in that case.
StorageProxy.applyCounterMutationOnLeader(cm,
localDataCenter,
new CounterMutationCallback(message, message.from(), requestSensors),
new CounterMutationCallback(message, message.from()),
queryStartNanoTime);
}
}
62 changes: 43 additions & 19 deletions src/java/org/apache/cassandra/db/MutationVerbHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import java.util.Collection;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.cassandra.concurrent.ExecutorLocals;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.ForwardingInfo;
Expand All @@ -31,6 +33,8 @@
import org.apache.cassandra.net.NoPayload;
import org.apache.cassandra.net.ParamType;
import org.apache.cassandra.net.SensorsCustomParams;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.sensors.Context;
import org.apache.cassandra.sensors.RequestSensors;
import org.apache.cassandra.sensors.RequestTracker;
import org.apache.cassandra.sensors.Sensor;
Expand All @@ -42,41 +46,53 @@ public class MutationVerbHandler implements IVerbHandler<Mutation>
{
public static final MutationVerbHandler instance = new MutationVerbHandler();

private void respond(Message<?> respondTo, InetAddressAndPort respondToAddress)
private void respond(Message<Mutation> respondTo, InetAddressAndPort respondToAddress)
{
Tracing.trace("Enqueuing response to {}", respondToAddress);

Message.Builder<NoPayload> response = respondTo.emptyResponseBuilder();
addSensorsToResponse(response);
addSensorsToResponse(response, respondTo.payload);

MessagingService.instance().send(response.build(), respondToAddress);
}

private void addSensorsToResponse(Message.Builder<NoPayload> response)
private void addSensorsToResponse(Message.Builder<NoPayload> response, Mutation mutation)
{
int tables = mutation.getTableIds().size();

// Add write bytes sensors to the response
Function<String, String> requestParam = SensorsCustomParams::encodeTableInWriteBytesRequestParam;
Function<String, String> tableParam = SensorsCustomParams::encodeTableInWriteBytesTableParam;
Collection<Sensor> sensors = RequestTracker.instance.get().getSensors(Type.WRITE_BYTES);
addSensorsToResponse(sensors, requestParam, tableParam, response);
Collection<Sensor> requestSensors = RequestTracker.instance.get().getSensors(Type.WRITE_BYTES);
addSensorsToResponse(requestSensors, requestParam, tableParam, response);

// Add index write bytes sensors to the response
requestParam = SensorsCustomParams::encodeTableInIndexWriteBytesRequestParam;
tableParam = SensorsCustomParams::encodeTableInIndexWriteBytesTableParam;
sensors = RequestTracker.instance.get().getSensors(Type.INDEX_WRITE_BYTES);
addSensorsToResponse(sensors, requestParam, tableParam, response);
requestSensors = RequestTracker.instance.get().getSensors(Type.INDEX_WRITE_BYTES);
addSensorsToResponse(requestSensors, requestParam, tableParam, response);

// Add internode bytes sensors to the response after updating each per-table sensor with the current response
// message size: this is missing the sensor values, but it's a good enough approximation
int perSensorSize = response.currentPayloadSize(MessagingService.current_version) / tables;
requestSensors = RequestTracker.instance.get().getSensors(Type.INTERNODE_BYTES);
requestSensors.forEach(sensor -> RequestTracker.instance.get().incrementSensor(sensor.getContext(), sensor.getType(), perSensorSize));
RequestTracker.instance.get().syncAllSensors();
requestParam = SensorsCustomParams::encodeTableInInternodeBytesRequestParam;
tableParam = SensorsCustomParams::encodeTableInInternodeBytesTableParam;
addSensorsToResponse(requestSensors, requestParam, tableParam, response);
}

private void addSensorsToResponse(Collection<Sensor> sensors, Function<String, String> requestParamSupplier, Function<String, String> tableParamSupplier, Message.Builder<NoPayload> response)
private void addSensorsToResponse(Collection<Sensor> requestSensors, Function<String, String> requestParamSupplier, Function<String, String> tableParamSupplier, Message.Builder<NoPayload> response)
{
for (Sensor requestSensor : sensors)
for (Sensor requestSensor : requestSensors)
{
String requestBytesParam = requestParamSupplier.apply(requestSensor.getContext().getTable());
byte[] requestBytes = SensorsCustomParams.sensorValueAsBytes(requestSensor.getValue());
response.withCustomParam(requestBytesParam, requestBytes);

// for each table in the mutation, send the global per table write/index bytes as observed by the registry
Optional<Sensor> registrySensor = SensorsRegistry.instance.getSensor(requestSensor.getContext(), requestSensor.getType());
Optional<Sensor> registrySensor = SensorsRegistry.instance.getOrCreateSensor(requestSensor.getContext(), requestSensor.getType());
registrySensor.ifPresent(sensor -> {
String tableBytesParam = tableParamSupplier.apply(sensor.getContext().getTable());
byte[] tableBytes = SensorsCustomParams.sensorValueAsBytes(sensor.getValue());
Expand Down Expand Up @@ -109,10 +125,18 @@ public void doVerb(Message<Mutation> message)
try
{
// Initialize the sensor and set ExecutorLocals
RequestSensors sensors = new RequestSensors();
ExecutorLocals locals = ExecutorLocals.create(sensors);
Collection<TableMetadata> tables = message.payload.getPartitionUpdates().stream().map(PartitionUpdate::metadata).collect(Collectors.toList());
RequestSensors requestSensors = new RequestSensors();
ExecutorLocals locals = ExecutorLocals.create(requestSensors);
ExecutorLocals.set(locals);

// Initialize internode bytes with the inbound message size:
tables.forEach(tm -> {
Context context = Context.from(tm);
requestSensors.registerSensor(context, Type.INTERNODE_BYTES);
requestSensors.incrementSensor(context, Type.INTERNODE_BYTES, message.payloadSize(MessagingService.current_version) / tables.size());
});

message.payload.applyFuture(WriteOptions.DEFAULT).thenAccept(o -> respond(message, respondToAddress)).exceptionally(wto -> {
failed();
return null;
Expand All @@ -127,18 +151,18 @@ public void doVerb(Message<Mutation> message)
private static void forwardToLocalNodes(Message<Mutation> originalMessage, ForwardingInfo forwardTo)
{
Message.Builder<Mutation> builder =
Message.builder(originalMessage)
.withParam(ParamType.RESPOND_TO, originalMessage.from())
.withoutParam(ParamType.FORWARD_TO);
Message.builder(originalMessage)
.withParam(ParamType.RESPOND_TO, originalMessage.from())
.withoutParam(ParamType.FORWARD_TO);

boolean useSameMessageID = forwardTo.useSameMessageID(originalMessage.id());
// reuse the same Message if all ids are identical (as they will be for 4.0+ node originated messages)
Message<Mutation> message = useSameMessageID ? builder.build() : null;

forwardTo.forEach((id, target) ->
{
Tracing.trace("Enqueuing forwarded write to {}", target);
MessagingService.instance().send(useSameMessageID ? message : builder.withId(id).build(), target);
});
{
Tracing.trace("Enqueuing forwarded write to {}", target);
MessagingService.instance().send(useSameMessageID ? message : builder.withId(id).build(), target);
});
}
}
Loading

0 comments on commit ad66f41

Please sign in to comment.