Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
sbtourist committed May 15, 2024
1 parent caa569f commit 1d7ec4a
Show file tree
Hide file tree
Showing 14 changed files with 170 additions and 171 deletions.
41 changes: 17 additions & 24 deletions src/java/org/apache/cassandra/db/CounterMutationCallback.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@
import java.util.Optional;
import java.util.function.Function;

import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.NoPayload;
import org.apache.cassandra.net.SensorsCustomParams;
import org.apache.cassandra.sensors.Context;
import org.apache.cassandra.sensors.RequestSensors;
import org.apache.cassandra.sensors.RequestTracker;
import org.apache.cassandra.sensors.Sensor;
import org.apache.cassandra.sensors.SensorsRegistry;
import org.apache.cassandra.sensors.Type;
Expand All @@ -41,14 +40,12 @@ public class CounterMutationCallback implements Runnable
{
private final Message<CounterMutation> respondTo;
private final InetAddressAndPort respondToAddress;
private final RequestSensors requestSensors;
private int replicaCount = 0;

public CounterMutationCallback(Message<CounterMutation> respondTo, InetAddressAndPort respondToAddress, RequestSensors requestSensors)
public CounterMutationCallback(Message<CounterMutation> respondTo, InetAddressAndPort respondToAddress)
{
this.respondTo = respondTo;
this.respondToAddress = respondToAddress;
this.requestSensors = requestSensors;
}

/**
Expand All @@ -66,40 +63,36 @@ public void run()
int replicaMultiplier = replicaCount == 0 ?
1 : // replica count was not explicitly set (default). At the bare minimum, we should send the response accomodating for the local replica (aka. mutation leader) sensor values
replicaCount;
addSensorsToResponse(response, respondTo.payload.getMutation(), requestSensors, replicaMultiplier);
addSensorsToResponse(response, respondTo.payload.getMutation(), replicaMultiplier);
MessagingService.instance().send(response.build(), respondToAddress);
}

private static void addSensorsToResponse(Message.Builder<NoPayload> response, Mutation mutation, RequestSensors requestSensors, int replicaMultiplier)
private static void addSensorsToResponse(Message.Builder<NoPayload> response, Mutation mutation, int replicaMultiplier)
{
// Add write bytes sensors to the response
Function<String, String> requestParam = SensorsCustomParams::encodeTableInWriteBytesRequestParam;
Function<String, String> tableParam = SensorsCustomParams::encodeTableInWriteBytesTableParam;
Collection<Sensor> requestSensors = RequestTracker.instance.get().getSensors(Type.WRITE_BYTES);
addSensorsToResponse(requestSensors, requestParam, tableParam, response, replicaMultiplier);

Collection<Sensor> sensors = requestSensors.getSensors(Type.WRITE_BYTES);
addSensorsToResponse(sensors, requestParam, tableParam, response, replicaMultiplier);

// Add internode message sensors to the response
for (PartitionUpdate update : mutation.getPartitionUpdates())
{
Context context = Context.from(update.metadata());
Optional<Sensor> internodeBytesSensor = SensorsRegistry.instance.getSensor(context, Type.INTERNODE_MSG_BYTES);
String internodeBytesTableParam = SensorsCustomParams.encodeTableInInternodeBytesTableParam(context.getTable());
internodeBytesSensor.map(s -> SensorsCustomParams.sensorValueAsBytes(s.getValue())).ifPresent(bytes -> response.withCustomParam(internodeBytesTableParam, bytes));

Optional<Sensor> internodeCountSensor = SensorsRegistry.instance.getSensor(context, Type.INTERNODE_MSG_COUNT);
String internodeCountTableParam = SensorsCustomParams.encodeTableInInternodeCountTableParam(context.getTable());
internodeCountSensor.map(s -> SensorsCustomParams.sensorValueAsBytes(s.getValue())).ifPresent(count -> response.withCustomParam(internodeCountTableParam, count));
}
// Add internode bytes sensors to the response after updating each per-table sensor with the current response
// message size: this is missing the sensor values, but it's a good enough approximation
int perSensorSize = response.currentSize(MessagingService.current_version) / requestSensors.size();
requestSensors = RequestTracker.instance.get().getSensors(Type.INTERNODE_MSG_BYTES);
requestSensors.forEach(sensor -> RequestTracker.instance.get().incrementSensor(sensor.getContext(), sensor.getType(), perSensorSize));
RequestTracker.instance.get().syncAllSensors();
requestParam = SensorsCustomParams::encodeTableInInternodeBytesRequestParam;
tableParam = SensorsCustomParams::encodeTableInInternodeBytesTableParam;
addSensorsToResponse(requestSensors, requestParam, tableParam, response, replicaMultiplier);
}

private static void addSensorsToResponse(Collection<Sensor> sensors,
private static void addSensorsToResponse(Collection<Sensor> requestSensors,
Function<String, String> requestParamSupplier,
Function<String, String> tableParamSupplier,
Message.Builder<NoPayload> response,
int replicaMultiplier)
{
for (Sensor requestSensor : sensors)
for (Sensor requestSensor : requestSensors)
{
String requestBytesParam = requestParamSupplier.apply(requestSensor.getContext().getTable());
byte[] requestBytes = SensorsCustomParams.sensorValueAsBytes(requestSensor.getValue() * replicaMultiplier);
Expand Down
15 changes: 12 additions & 3 deletions src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.sensors.Context;
import org.apache.cassandra.sensors.RequestSensors;
import org.apache.cassandra.sensors.Type;
import org.apache.cassandra.service.StorageProxy;

public class CounterMutationVerbHandler implements IVerbHandler<CounterMutation>
Expand All @@ -45,12 +48,18 @@ public void doVerb(final Message<CounterMutation> message)
logger.trace("Applying forwarded {}", cm);

// Initialize the sensor and set ExecutorLocals
String keyspace = cm.getKeyspaceName();
Collection<TableMetadata> tables = message.payload.getPartitionUpdates().stream().map(PartitionUpdate::metadata).collect(Collectors.toSet());
RequestSensors requestSensors = new RequestSensors(keyspace, tables);
RequestSensors requestSensors = new RequestSensors();
ExecutorLocals locals = ExecutorLocals.create(requestSensors);
ExecutorLocals.set(locals);

// Initialize internode bytes with the inbound message size:
tables.forEach(tm -> {
Context context = Context.from(tm);
requestSensors.registerSensor(context, Type.INTERNODE_MSG_BYTES);
requestSensors.incrementSensor(context, Type.INTERNODE_MSG_BYTES, message.serializedSize(MessagingService.current_version));
});

String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter();
// We should not wait for the result of the write in this thread,
// otherwise we could have a distributed deadlock between replicas
Expand All @@ -61,7 +70,7 @@ public void doVerb(final Message<CounterMutation> message)
// it's own in that case.
StorageProxy.applyCounterMutationOnLeader(cm,
localDataCenter,
new CounterMutationCallback(message, message.from(), requestSensors),
new CounterMutationCallback(message, message.from()),
queryStartNanoTime);
}
}
53 changes: 25 additions & 28 deletions src/java/org/apache/cassandra/db/MutationVerbHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,38 +61,29 @@ private void addSensorsToResponse(Message.Builder<NoPayload> response, Mutation
// Add write bytes sensors to the response
Function<String, String> requestParam = SensorsCustomParams::encodeTableInWriteBytesRequestParam;
Function<String, String> tableParam = SensorsCustomParams::encodeTableInWriteBytesTableParam;
Collection<Sensor> sensors = RequestTracker.instance.get().getSensors(Type.WRITE_BYTES);
addSensorsToResponse(sensors, requestParam, tableParam, response);
Collection<Sensor> requestSensors = RequestTracker.instance.get().getSensors(Type.WRITE_BYTES);
addSensorsToResponse(requestSensors, requestParam, tableParam, response);

// Add index write bytes sensors to the response
requestParam = SensorsCustomParams::encodeTableInIndexWriteBytesRequestParam;
tableParam = SensorsCustomParams::encodeTableInIndexWriteBytesTableParam;
sensors = RequestTracker.instance.get().getSensors(Type.INDEX_WRITE_BYTES);
addSensorsToResponse(sensors, requestParam, tableParam, response);

// Add internode message sensors to the response
for (PartitionUpdate update : mutation.getPartitionUpdates())
{
Context context = Context.from(update.metadata());
Optional<Sensor> internodeBytes = SensorsRegistry.instance.getSensor(context, Type.INTERNODE_MSG_BYTES);
internodeBytes.ifPresent(sensor -> {
byte[] bytes = SensorsCustomParams.sensorValueAsBytes(sensor.getValue());
String internodeBytesTableParam = SensorsCustomParams.encodeTableInInternodeBytesTableParam(context.getTable());
response.withCustomParam(internodeBytesTableParam, bytes);
});

Optional<Sensor> internodeCount = SensorsRegistry.instance.getSensor(context, Type.INTERNODE_MSG_COUNT);
internodeCount.ifPresent(sensor -> {
byte[] count = SensorsCustomParams.sensorValueAsBytes(sensor.getValue());
String internodeCountTableParam = SensorsCustomParams.encodeTableInInternodeCountTableParam(context.getTable());
response.withCustomParam(internodeCountTableParam, count);
});
}
requestSensors = RequestTracker.instance.get().getSensors(Type.INDEX_WRITE_BYTES);
addSensorsToResponse(requestSensors, requestParam, tableParam, response);

// Add internode bytes sensors to the response after updating each per-table sensor with the current response
// message size: this is missing the sensor values, but it's a good enough approximation
int perSensorSize = response.currentSize(MessagingService.current_version) / requestSensors.size();
requestSensors = RequestTracker.instance.get().getSensors(Type.INTERNODE_MSG_BYTES);
requestSensors.forEach(sensor -> RequestTracker.instance.get().incrementSensor(sensor.getContext(), sensor.getType(), perSensorSize));
RequestTracker.instance.get().syncAllSensors();
requestParam = SensorsCustomParams::encodeTableInInternodeBytesRequestParam;
tableParam = SensorsCustomParams::encodeTableInInternodeBytesTableParam;
addSensorsToResponse(requestSensors, requestParam, tableParam, response);
}

private void addSensorsToResponse(Collection<Sensor> sensors, Function<String, String> requestParamSupplier, Function<String, String> tableParamSupplier, Message.Builder<NoPayload> response)
private void addSensorsToResponse(Collection<Sensor> requestSensors, Function<String, String> requestParamSupplier, Function<String, String> tableParamSupplier, Message.Builder<NoPayload> response)
{
for (Sensor requestSensor : sensors)
for (Sensor requestSensor : requestSensors)
{
String requestBytesParam = requestParamSupplier.apply(requestSensor.getContext().getTable());
byte[] requestBytes = SensorsCustomParams.sensorValueAsBytes(requestSensor.getValue());
Expand Down Expand Up @@ -132,12 +123,18 @@ public void doVerb(Message<Mutation> message)
try
{
// Initialize the sensor and set ExecutorLocals
String keyspace = message.payload.getKeyspaceName();
Collection<TableMetadata> tables = message.payload.getPartitionUpdates().stream().map(PartitionUpdate::metadata).collect(Collectors.toList());
RequestSensors sensors = new RequestSensors(keyspace, tables);
ExecutorLocals locals = ExecutorLocals.create(sensors);
RequestSensors requestSensors = new RequestSensors();
ExecutorLocals locals = ExecutorLocals.create(requestSensors);
ExecutorLocals.set(locals);

// Initialize internode bytes with the inbound message size:
tables.forEach(tm -> {
Context context = Context.from(tm);
requestSensors.registerSensor(context, Type.INTERNODE_MSG_BYTES);
requestSensors.incrementSensor(context, Type.INTERNODE_MSG_BYTES, message.serializedSize(MessagingService.current_version));
});

message.payload.applyFuture(WriteOptions.DEFAULT).thenAccept(o -> respond(message, respondToAddress)).exceptionally(wto -> {
failed();
return null;
Expand Down
42 changes: 29 additions & 13 deletions src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.SensorsCustomParams;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.sensors.Context;
import org.apache.cassandra.sensors.RequestSensors;
import org.apache.cassandra.sensors.RequestTracker;
Expand Down Expand Up @@ -62,12 +63,18 @@ public void doVerb(Message<ReadCommand> message)

// Initialize the sensor and set ExecutorLocals
Context context = Context.from(command);
String keyspace = command.metadata().keyspace;
RequestSensors sensors = new RequestSensors(keyspace, ImmutableSet.of(command.metadata()));
sensors.registerSensor(context, Type.READ_BYTES);
ExecutorLocals locals = ExecutorLocals.create(sensors);
ImmutableSet<TableMetadata> tables = ImmutableSet.of(command.metadata());
RequestSensors requestSensors = new RequestSensors();
requestSensors.registerSensor(context, Type.READ_BYTES);
ExecutorLocals locals = ExecutorLocals.create(requestSensors);
ExecutorLocals.set(locals);

// Initialize internode bytes with the inbound message size:
tables.forEach(tm -> {
requestSensors.registerSensor(context, Type.INTERNODE_MSG_BYTES);
requestSensors.incrementSensor(context, Type.INTERNODE_MSG_BYTES, message.serializedSize(MessagingService.current_version));
});

long timeout = message.expiresAtNanos() - message.createdAtNanos();
command.setMonitoringTime(message.createdAtNanos(), message.isCrossNode(), timeout, DatabaseDescriptor.getSlowQueryTimeout(NANOSECONDS));

Expand All @@ -85,22 +92,31 @@ public void doVerb(Message<ReadCommand> message)
return;
}

Tracing.trace("Enqueuing response to {}", message.from());
Message.Builder<ReadResponse> reply = message.responseWithBuilder(response);
addReadBytesSensorToResponse(reply, context);
int size = reply.currentSize(MessagingService.current_version);
RequestTracker.instance.get().incrementSensor(context, Type.INTERNODE_MSG_BYTES, size);
RequestTracker.instance.get().syncAllSensors();

addInternodeSensorToResponse(reply, context);
addReadBytesSensorToResponse(reply, context);

Tracing.trace("Enqueuing response to {}", message.from());
MessagingService.instance().send(reply.build(), message.from());
}

private void addInternodeSensorToResponse(Message.Builder<ReadResponse> reply, Context context)
{
Optional<Sensor> internodeBytesSensor = SensorsRegistry.instance.getSensor(context, Type.INTERNODE_MSG_BYTES);
String internodeBytesTableParam = SensorsCustomParams.encodeTableInInternodeBytesTableParam(context.getTable());
internodeBytesSensor.map(s -> SensorsCustomParams.sensorValueAsBytes(s.getValue())).ifPresent(bytes -> reply.withCustomParam(internodeBytesTableParam, bytes));

Optional<Sensor> internodeCountSensor = SensorsRegistry.instance.getSensor(context, Type.INTERNODE_MSG_COUNT);
String internodeCountTableParam = SensorsCustomParams.encodeTableInInternodeCountTableParam(context.getTable());
internodeCountSensor.map(s -> SensorsCustomParams.sensorValueAsBytes(s.getValue())).ifPresent(count -> reply.withCustomParam(internodeCountTableParam, count));
Optional<Sensor> requestSensor = RequestTracker.instance.get().getSensor(context, Type.INTERNODE_MSG_BYTES);
requestSensor.map(s -> SensorsCustomParams.sensorValueAsBytes(s.getValue())).ifPresent(bytes -> {
reply.withCustomParam(SensorsCustomParams.encodeTableInInternodeBytesRequestParam(context.getTable()),
bytes);
});

Optional<Sensor> tableSensor = SensorsRegistry.instance.getSensor(context, Type.INTERNODE_MSG_BYTES);
tableSensor.map(s -> SensorsCustomParams.sensorValueAsBytes(s.getValue())).ifPresent(bytes -> {
reply.withCustomParam(SensorsCustomParams.encodeTableInInternodeBytesTableParam(context.getTable()),
bytes);
});
}

private void addReadBytesSensorToResponse(Message.Builder<ReadResponse> reply, Context context)
Expand Down
20 changes: 19 additions & 1 deletion src/java/org/apache/cassandra/net/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,25 @@ public Message<T> build()
if (payload == null)
throw new IllegalArgumentException();

return new Message<>(new Header(hasId ? id : nextId(), verb, from, createdAtNanos, expiresAtNanos, flags, params), payload);
return doBuild(hasId ? id : nextId());
}

public int currentSize(int version)
{
// use dummy id just for the sake of computing the serialized size
return doBuild(0).serializedSize(version);
}

private Message<T> doBuild(long id)
{
if (verb == null)
throw new IllegalArgumentException();
if (from == null)
throw new IllegalArgumentException();
if (payload == null)
throw new IllegalArgumentException();

return new Message<>(new Header(id, verb, from, createdAtNanos, expiresAtNanos, flags, params), payload);
}
}

Expand Down
Loading

0 comments on commit 1d7ec4a

Please sign in to comment.