Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Opinionated changes to internode tracking #1121

Merged
merged 3 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 26 additions & 31 deletions src/java/org/apache/cassandra/db/CounterMutationCallback.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@
import java.util.Optional;
import java.util.function.Function;

import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.NoPayload;
import org.apache.cassandra.net.SensorsCustomParams;
import org.apache.cassandra.sensors.Context;
import org.apache.cassandra.sensors.RequestSensors;
import org.apache.cassandra.sensors.RequestTracker;
import org.apache.cassandra.sensors.Sensor;
import org.apache.cassandra.sensors.SensorsRegistry;
import org.apache.cassandra.sensors.Type;
Expand All @@ -39,16 +38,14 @@
*/
public class CounterMutationCallback implements Runnable
{
private final Message<CounterMutation> respondTo;
private final Message<CounterMutation> requestMessage;
private final InetAddressAndPort respondToAddress;
private final RequestSensors requestSensors;
private int replicaCount = 0;

public CounterMutationCallback(Message<CounterMutation> respondTo, InetAddressAndPort respondToAddress, RequestSensors requestSensors)
public CounterMutationCallback(Message<CounterMutation> requestMessage, InetAddressAndPort respondToAddress)
{
this.respondTo = respondTo;
this.requestMessage = requestMessage;
this.respondToAddress = respondToAddress;
this.requestSensors = requestSensors;
}

/**
Expand All @@ -62,51 +59,49 @@ public void setReplicaCount(Integer replicaCount)
@Override
public void run()
{
Message.Builder<NoPayload> response = respondTo.emptyResponseBuilder();
Message.Builder<NoPayload> responseBuilder = requestMessage.emptyResponseBuilder();
int replicaMultiplier = replicaCount == 0 ?
1 : // replica count was not explicitly set (default). At the bare minimum, we should send the response accommodating for the local replica (aka. mutation leader) sensor values
replicaCount;
addSensorsToResponse(response, respondTo.payload.getMutation(), requestSensors, replicaMultiplier);
MessagingService.instance().send(response.build(), respondToAddress);
addSensorsToResponse(responseBuilder, requestMessage.payload.getMutation(), replicaMultiplier);
MessagingService.instance().send(responseBuilder.build(), respondToAddress);
}

private static void addSensorsToResponse(Message.Builder<NoPayload> response, Mutation mutation, RequestSensors requestSensors, int replicaMultiplier)
private static void addSensorsToResponse(Message.Builder<NoPayload> response, Mutation mutation, int replicaMultiplier)
{
// Add write bytes sensors to the response
Function<String, String> requestParam = SensorsCustomParams::encodeTableInWriteBytesRequestParam;
Function<String, String> tableParam = SensorsCustomParams::encodeTableInWriteBytesTableParam;

Collection<Sensor> sensors = requestSensors.getSensors(Type.WRITE_BYTES);
addSensorsToResponse(sensors, requestParam, tableParam, response, replicaMultiplier);
int tables = mutation.getTableIds().size();

// Add internode message sensors to the response
for (PartitionUpdate update : mutation.getPartitionUpdates())
{
Context context = Context.from(update.metadata());
Optional<Sensor> internodeBytesSensor = SensorsRegistry.instance.getSensor(context, Type.INTERNODE_MSG_BYTES);
String internodeBytesTableParam = SensorsCustomParams.encodeTableInInternodeBytesTableParam(context.getTable());
internodeBytesSensor.map(s -> SensorsCustomParams.sensorValueAsBytes(s.getValue())).ifPresent(bytes -> response.withCustomParam(internodeBytesTableParam, bytes));
// Add internode bytes sensors to the response after updating each per-table sensor with the current response
// message size: this is missing the sensor values, but it's a good enough approximation
Collection<Sensor> requestSensors = RequestTracker.instance.get().getSensors(Type.INTERNODE_BYTES);
int perSensorSize = response.currentPayloadSize(MessagingService.current_version) / tables;
requestSensors.forEach(sensor -> RequestTracker.instance.get().incrementSensor(sensor.getContext(), sensor.getType(), perSensorSize));
RequestTracker.instance.get().syncAllSensors();
Function<String, String> requestParam = SensorsCustomParams::encodeTableInInternodeBytesRequestParam;
Function<String, String> tableParam = SensorsCustomParams::encodeTableInInternodeBytesTableParam;
addSensorsToResponse(requestSensors, requestParam, tableParam, response, replicaMultiplier);

Optional<Sensor> internodeCountSensor = SensorsRegistry.instance.getSensor(context, Type.INTERNODE_MSG_COUNT);
String internodeCountTableParam = SensorsCustomParams.encodeTableInInternodeCountTableParam(context.getTable());
internodeCountSensor.map(s -> SensorsCustomParams.sensorValueAsBytes(s.getValue())).ifPresent(count -> response.withCustomParam(internodeCountTableParam, count));
}
// Add write bytes sensors to the response
requestParam = SensorsCustomParams::encodeTableInWriteBytesRequestParam;
tableParam = SensorsCustomParams::encodeTableInWriteBytesTableParam;
requestSensors = RequestTracker.instance.get().getSensors(Type.WRITE_BYTES);
addSensorsToResponse(requestSensors, requestParam, tableParam, response, replicaMultiplier);
}

private static void addSensorsToResponse(Collection<Sensor> sensors,
private static void addSensorsToResponse(Collection<Sensor> requestSensors,
Function<String, String> requestParamSupplier,
Function<String, String> tableParamSupplier,
Message.Builder<NoPayload> response,
int replicaMultiplier)
{
for (Sensor requestSensor : sensors)
for (Sensor requestSensor : requestSensors)
{
String requestBytesParam = requestParamSupplier.apply(requestSensor.getContext().getTable());
byte[] requestBytes = SensorsCustomParams.sensorValueAsBytes(requestSensor.getValue() * replicaMultiplier);
response.withCustomParam(requestBytesParam, requestBytes);

// for each table in the mutation, send the global per table counter write bytes as recorded by the registry
Optional<Sensor> registrySensor = SensorsRegistry.instance.getSensor(requestSensor.getContext(), requestSensor.getType());
Optional<Sensor> registrySensor = SensorsRegistry.instance.getOrCreateSensor(requestSensor.getContext(), requestSensor.getType());
registrySensor.ifPresent(sensor -> {
String tableBytesParam = tableParamSupplier.apply(sensor.getContext().getTable());
byte[] tableBytes = SensorsCustomParams.sensorValueAsBytes(sensor.getValue() * replicaMultiplier);
Expand Down
15 changes: 12 additions & 3 deletions src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.sensors.Context;
import org.apache.cassandra.sensors.RequestSensors;
import org.apache.cassandra.sensors.Type;
import org.apache.cassandra.service.StorageProxy;

public class CounterMutationVerbHandler implements IVerbHandler<CounterMutation>
Expand All @@ -45,12 +48,18 @@ public void doVerb(final Message<CounterMutation> message)
logger.trace("Applying forwarded {}", cm);

// Initialize the sensor and set ExecutorLocals
String keyspace = cm.getKeyspaceName();
Collection<TableMetadata> tables = message.payload.getPartitionUpdates().stream().map(PartitionUpdate::metadata).collect(Collectors.toSet());
RequestSensors requestSensors = new RequestSensors(keyspace, tables);
RequestSensors requestSensors = new RequestSensors();
ExecutorLocals locals = ExecutorLocals.create(requestSensors);
ExecutorLocals.set(locals);

// Initialize internode bytes with the inbound message size:
tables.forEach(tm -> {
Context context = Context.from(tm);
requestSensors.registerSensor(context, Type.INTERNODE_BYTES);
requestSensors.incrementSensor(context, Type.INTERNODE_BYTES, message.serializedSize(MessagingService.current_version) / tables.size());
});

String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter();
// We should not wait for the result of the write in this thread,
// otherwise we could have a distributed deadlock between replicas
Expand All @@ -61,7 +70,7 @@ public void doVerb(final Message<CounterMutation> message)
// it's own in that case.
StorageProxy.applyCounterMutationOnLeader(cm,
localDataCenter,
new CounterMutationCallback(message, message.from(), requestSensors),
new CounterMutationCallback(message, message.from()),
queryStartNanoTime);
}
}
57 changes: 28 additions & 29 deletions src/java/org/apache/cassandra/db/MutationVerbHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,48 +58,41 @@ private void respond(Message<Mutation> respondTo, InetAddressAndPort respondToAd

private void addSensorsToResponse(Message.Builder<NoPayload> response, Mutation mutation)
{
int tables = mutation.getTableIds().size();

// Add write bytes sensors to the response
Function<String, String> requestParam = SensorsCustomParams::encodeTableInWriteBytesRequestParam;
Function<String, String> tableParam = SensorsCustomParams::encodeTableInWriteBytesTableParam;
Collection<Sensor> sensors = RequestTracker.instance.get().getSensors(Type.WRITE_BYTES);
addSensorsToResponse(sensors, requestParam, tableParam, response);
Collection<Sensor> requestSensors = RequestTracker.instance.get().getSensors(Type.WRITE_BYTES);
addSensorsToResponse(requestSensors, requestParam, tableParam, response);

// Add index write bytes sensors to the response
requestParam = SensorsCustomParams::encodeTableInIndexWriteBytesRequestParam;
tableParam = SensorsCustomParams::encodeTableInIndexWriteBytesTableParam;
sensors = RequestTracker.instance.get().getSensors(Type.INDEX_WRITE_BYTES);
addSensorsToResponse(sensors, requestParam, tableParam, response);

// Add internode message sensors to the response
for (PartitionUpdate update : mutation.getPartitionUpdates())
{
Context context = Context.from(update.metadata());
Optional<Sensor> internodeBytes = SensorsRegistry.instance.getSensor(context, Type.INTERNODE_MSG_BYTES);
internodeBytes.ifPresent(sensor -> {
byte[] bytes = SensorsCustomParams.sensorValueAsBytes(sensor.getValue());
String internodeBytesTableParam = SensorsCustomParams.encodeTableInInternodeBytesTableParam(context.getTable());
response.withCustomParam(internodeBytesTableParam, bytes);
});

Optional<Sensor> internodeCount = SensorsRegistry.instance.getSensor(context, Type.INTERNODE_MSG_COUNT);
internodeCount.ifPresent(sensor -> {
byte[] count = SensorsCustomParams.sensorValueAsBytes(sensor.getValue());
String internodeCountTableParam = SensorsCustomParams.encodeTableInInternodeCountTableParam(context.getTable());
response.withCustomParam(internodeCountTableParam, count);
});
}
requestSensors = RequestTracker.instance.get().getSensors(Type.INDEX_WRITE_BYTES);
addSensorsToResponse(requestSensors, requestParam, tableParam, response);

// Add internode bytes sensors to the response after updating each per-table sensor with the current response

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I commented in the ReadCommandVerbHandler regarding this - I mean, if we go with the approximation, we need to make it consistent - here the sensor values from above (WRITE_BYTES and INDEX_WRITE_BYTES) have already been added when we calculate the size, but in the read example, READ_BYTES is added after we calculate the size.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, we can refactor this.

// message size: this is missing the sensor values, but it's a good enough approximation
int perSensorSize = response.currentPayloadSize(MessagingService.current_version) / tables;
requestSensors = RequestTracker.instance.get().getSensors(Type.INTERNODE_BYTES);
requestSensors.forEach(sensor -> RequestTracker.instance.get().incrementSensor(sensor.getContext(), sensor.getType(), perSensorSize));
RequestTracker.instance.get().syncAllSensors();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the syncAllSensors call is redundant here (and in the other handlers) - it is already taken care of when the mutation is applied.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But this comes after the mutation is applied, right?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh right. In that case, the syncAllSensors call in the apply method (or when the row iterator is closed in the read case) is redundant. Should we just keep the call in the verb handler? Now that you made it idempotent, I'm less concerned about it, but it is still a small performance hit.

requestParam = SensorsCustomParams::encodeTableInInternodeBytesRequestParam;
tableParam = SensorsCustomParams::encodeTableInInternodeBytesTableParam;
addSensorsToResponse(requestSensors, requestParam, tableParam, response);
}

private void addSensorsToResponse(Collection<Sensor> sensors, Function<String, String> requestParamSupplier, Function<String, String> tableParamSupplier, Message.Builder<NoPayload> response)
private void addSensorsToResponse(Collection<Sensor> requestSensors, Function<String, String> requestParamSupplier, Function<String, String> tableParamSupplier, Message.Builder<NoPayload> response)
{
for (Sensor requestSensor : sensors)
for (Sensor requestSensor : requestSensors)
{
String requestBytesParam = requestParamSupplier.apply(requestSensor.getContext().getTable());
byte[] requestBytes = SensorsCustomParams.sensorValueAsBytes(requestSensor.getValue());
response.withCustomParam(requestBytesParam, requestBytes);

// for each table in the mutation, send the global per table write/index bytes as observed by the registry
Optional<Sensor> registrySensor = SensorsRegistry.instance.getSensor(requestSensor.getContext(), requestSensor.getType());
Optional<Sensor> registrySensor = SensorsRegistry.instance.getOrCreateSensor(requestSensor.getContext(), requestSensor.getType());
registrySensor.ifPresent(sensor -> {
String tableBytesParam = tableParamSupplier.apply(sensor.getContext().getTable());
byte[] tableBytes = SensorsCustomParams.sensorValueAsBytes(sensor.getValue());
Expand Down Expand Up @@ -132,12 +125,18 @@ public void doVerb(Message<Mutation> message)
try
{
// Initialize the sensor and set ExecutorLocals
String keyspace = message.payload.getKeyspaceName();
Collection<TableMetadata> tables = message.payload.getPartitionUpdates().stream().map(PartitionUpdate::metadata).collect(Collectors.toList());
RequestSensors sensors = new RequestSensors(keyspace, tables);
ExecutorLocals locals = ExecutorLocals.create(sensors);
RequestSensors requestSensors = new RequestSensors();
ExecutorLocals locals = ExecutorLocals.create(requestSensors);
ExecutorLocals.set(locals);

// Initialize internode bytes with the inbound message size:
tables.forEach(tm -> {
Context context = Context.from(tm);
requestSensors.registerSensor(context, Type.INTERNODE_BYTES);
requestSensors.incrementSensor(context, Type.INTERNODE_BYTES, message.serializedSize(MessagingService.current_version) / tables.size());
});

message.payload.applyFuture(WriteOptions.DEFAULT).thenAccept(o -> respond(message, respondToAddress)).exceptionally(wto -> {
failed();
return null;
Expand Down
Loading