Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Opinionated changes to internode tracking #1121

Merged
merged 3 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 24 additions & 31 deletions src/java/org/apache/cassandra/db/CounterMutationCallback.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@
import java.util.Optional;
import java.util.function.Function;

import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.NoPayload;
import org.apache.cassandra.net.SensorsCustomParams;
import org.apache.cassandra.sensors.Context;
import org.apache.cassandra.sensors.RequestSensors;
import org.apache.cassandra.sensors.RequestTracker;
import org.apache.cassandra.sensors.Sensor;
import org.apache.cassandra.sensors.SensorsRegistry;
import org.apache.cassandra.sensors.Type;
Expand All @@ -39,16 +38,14 @@
*/
public class CounterMutationCallback implements Runnable
{
private final Message<CounterMutation> respondTo;
private final Message<CounterMutation> requestMessage;
private final InetAddressAndPort respondToAddress;
private final RequestSensors requestSensors;
private int replicaCount = 0;

public CounterMutationCallback(Message<CounterMutation> respondTo, InetAddressAndPort respondToAddress, RequestSensors requestSensors)
public CounterMutationCallback(Message<CounterMutation> requestMessage, InetAddressAndPort respondToAddress)
{
this.respondTo = respondTo;
this.requestMessage = requestMessage;
this.respondToAddress = respondToAddress;
this.requestSensors = requestSensors;
}

/**
Expand All @@ -62,44 +59,40 @@ public void setReplicaCount(Integer replicaCount)
@Override
public void run()
{
Message.Builder<NoPayload> response = respondTo.emptyResponseBuilder();
Message.Builder<NoPayload> responseBuilder = requestMessage.emptyResponseBuilder();
int replicaMultiplier = replicaCount == 0 ?
1 : // replica count was not explicitly set (default). At the bare minimum, we should send the response accomodating for the local replica (aka. mutation leader) sensor values
replicaCount;
addSensorsToResponse(response, respondTo.payload.getMutation(), requestSensors, replicaMultiplier);
MessagingService.instance().send(response.build(), respondToAddress);
addSensorsToResponse(responseBuilder, requestMessage.payload.getMutation(), replicaMultiplier);
MessagingService.instance().send(responseBuilder.build(), respondToAddress);
}

private static void addSensorsToResponse(Message.Builder<NoPayload> response, Mutation mutation, RequestSensors requestSensors, int replicaMultiplier)
private static void addSensorsToResponse(Message.Builder<NoPayload> response, Mutation mutation, int replicaMultiplier)
{
// Add write bytes sensors to the response
Function<String, String> requestParam = SensorsCustomParams::encodeTableInWriteBytesRequestParam;
Function<String, String> tableParam = SensorsCustomParams::encodeTableInWriteBytesTableParam;

Collection<Sensor> sensors = requestSensors.getSensors(Type.WRITE_BYTES);
addSensorsToResponse(sensors, requestParam, tableParam, response, replicaMultiplier);

// Add internode message sensors to the response
for (PartitionUpdate update : mutation.getPartitionUpdates())
{
Context context = Context.from(update.metadata());
Optional<Sensor> internodeBytesSensor = SensorsRegistry.instance.getSensor(context, Type.INTERNODE_MSG_BYTES);
String internodeBytesTableParam = SensorsCustomParams.encodeTableInInternodeBytesTableParam(context.getTable());
internodeBytesSensor.map(s -> SensorsCustomParams.sensorValueAsBytes(s.getValue())).ifPresent(bytes -> response.withCustomParam(internodeBytesTableParam, bytes));
// Add internode bytes sensors to the response after updating each per-table sensor with the current response
// message size: this is missing the sensor values, but it's a good enough approximation
Collection<Sensor> requestSensors = RequestTracker.instance.get().getSensors(Type.INTERNODE_BYTES);
int perSensorSize = response.currentSize(MessagingService.current_version) / requestSensors.size();
requestSensors.forEach(sensor -> RequestTracker.instance.get().incrementSensor(sensor.getContext(), sensor.getType(), perSensorSize));
RequestTracker.instance.get().syncAllSensors();
Function<String, String> requestParam = SensorsCustomParams::encodeTableInInternodeBytesRequestParam;
Function<String, String> tableParam = SensorsCustomParams::encodeTableInInternodeBytesTableParam;
addSensorsToResponse(requestSensors, requestParam, tableParam, response, replicaMultiplier);

Optional<Sensor> internodeCountSensor = SensorsRegistry.instance.getSensor(context, Type.INTERNODE_MSG_COUNT);
String internodeCountTableParam = SensorsCustomParams.encodeTableInInternodeCountTableParam(context.getTable());
internodeCountSensor.map(s -> SensorsCustomParams.sensorValueAsBytes(s.getValue())).ifPresent(count -> response.withCustomParam(internodeCountTableParam, count));
}
// Add write bytes sensors to the response
requestParam = SensorsCustomParams::encodeTableInWriteBytesRequestParam;
tableParam = SensorsCustomParams::encodeTableInWriteBytesTableParam;
requestSensors = RequestTracker.instance.get().getSensors(Type.WRITE_BYTES);
addSensorsToResponse(requestSensors, requestParam, tableParam, response, replicaMultiplier);
}

private static void addSensorsToResponse(Collection<Sensor> sensors,
private static void addSensorsToResponse(Collection<Sensor> requestSensors,
Function<String, String> requestParamSupplier,
Function<String, String> tableParamSupplier,
Message.Builder<NoPayload> response,
int replicaMultiplier)
{
for (Sensor requestSensor : sensors)
for (Sensor requestSensor : requestSensors)
{
String requestBytesParam = requestParamSupplier.apply(requestSensor.getContext().getTable());
byte[] requestBytes = SensorsCustomParams.sensorValueAsBytes(requestSensor.getValue() * replicaMultiplier);
Expand Down
15 changes: 12 additions & 3 deletions src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.sensors.Context;
import org.apache.cassandra.sensors.RequestSensors;
import org.apache.cassandra.sensors.Type;
import org.apache.cassandra.service.StorageProxy;

public class CounterMutationVerbHandler implements IVerbHandler<CounterMutation>
Expand All @@ -45,12 +48,18 @@ public void doVerb(final Message<CounterMutation> message)
logger.trace("Applying forwarded {}", cm);

// Initialize the sensor and set ExecutorLocals
String keyspace = cm.getKeyspaceName();
Collection<TableMetadata> tables = message.payload.getPartitionUpdates().stream().map(PartitionUpdate::metadata).collect(Collectors.toSet());
RequestSensors requestSensors = new RequestSensors(keyspace, tables);
RequestSensors requestSensors = new RequestSensors();
ExecutorLocals locals = ExecutorLocals.create(requestSensors);
ExecutorLocals.set(locals);

// Initialize internode bytes with the inbound message size:
tables.forEach(tm -> {
Context context = Context.from(tm);
requestSensors.registerSensor(context, Type.INTERNODE_BYTES);
requestSensors.incrementSensor(context, Type.INTERNODE_BYTES, message.serializedSize(MessagingService.current_version) / tables.size());
});

String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter();
// We should not wait for the result of the write in this thread,
// otherwise we could have a distributed deadlock between replicas
Expand All @@ -61,7 +70,7 @@ public void doVerb(final Message<CounterMutation> message)
// it's own in that case.
StorageProxy.applyCounterMutationOnLeader(cm,
localDataCenter,
new CounterMutationCallback(message, message.from(), requestSensors),
new CounterMutationCallback(message, message.from()),
queryStartNanoTime);
}
}
53 changes: 25 additions & 28 deletions src/java/org/apache/cassandra/db/MutationVerbHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,38 +61,29 @@ private void addSensorsToResponse(Message.Builder<NoPayload> response, Mutation
// Add write bytes sensors to the response
Function<String, String> requestParam = SensorsCustomParams::encodeTableInWriteBytesRequestParam;
Function<String, String> tableParam = SensorsCustomParams::encodeTableInWriteBytesTableParam;
Collection<Sensor> sensors = RequestTracker.instance.get().getSensors(Type.WRITE_BYTES);
addSensorsToResponse(sensors, requestParam, tableParam, response);
Collection<Sensor> requestSensors = RequestTracker.instance.get().getSensors(Type.WRITE_BYTES);
addSensorsToResponse(requestSensors, requestParam, tableParam, response);

// Add index write bytes sensors to the response
requestParam = SensorsCustomParams::encodeTableInIndexWriteBytesRequestParam;
tableParam = SensorsCustomParams::encodeTableInIndexWriteBytesTableParam;
sensors = RequestTracker.instance.get().getSensors(Type.INDEX_WRITE_BYTES);
addSensorsToResponse(sensors, requestParam, tableParam, response);

// Add internode message sensors to the response
for (PartitionUpdate update : mutation.getPartitionUpdates())
{
Context context = Context.from(update.metadata());
Optional<Sensor> internodeBytes = SensorsRegistry.instance.getSensor(context, Type.INTERNODE_MSG_BYTES);
internodeBytes.ifPresent(sensor -> {
byte[] bytes = SensorsCustomParams.sensorValueAsBytes(sensor.getValue());
String internodeBytesTableParam = SensorsCustomParams.encodeTableInInternodeBytesTableParam(context.getTable());
response.withCustomParam(internodeBytesTableParam, bytes);
});

Optional<Sensor> internodeCount = SensorsRegistry.instance.getSensor(context, Type.INTERNODE_MSG_COUNT);
internodeCount.ifPresent(sensor -> {
byte[] count = SensorsCustomParams.sensorValueAsBytes(sensor.getValue());
String internodeCountTableParam = SensorsCustomParams.encodeTableInInternodeCountTableParam(context.getTable());
response.withCustomParam(internodeCountTableParam, count);
});
}
requestSensors = RequestTracker.instance.get().getSensors(Type.INDEX_WRITE_BYTES);
addSensorsToResponse(requestSensors, requestParam, tableParam, response);

// Add internode bytes sensors to the response after updating each per-table sensor with the current response

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I commented in the ReadCommandVerbHandler regarding this - I mean if we go with the approximation, we need to make it consistent - here we are including the sensors values from above (WRITE_BYTES and INDEX_WRITE_BYTES) are already added, but in the read example, READ_BYTES is added after we calculate the size.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, we can refactor this.

// message size: this is missing the sensor values, but it's a good enough approximation
int perSensorSize = response.currentSize(MessagingService.current_version) / requestSensors.size();
requestSensors = RequestTracker.instance.get().getSensors(Type.INTERNODE_BYTES);
requestSensors.forEach(sensor -> RequestTracker.instance.get().incrementSensor(sensor.getContext(), sensor.getType(), perSensorSize));
RequestTracker.instance.get().syncAllSensors();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the syncAllSensors is redundant (and in other handlers) - it is already taken care of when mutation is applied.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But this comes after the mutation is applied right?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh right. In that case, the syncAllSensors call in the apply method (or when the row iterator is closed in the read case) is redundant. Should we just keep the call in the verb handler? Now that you made idempotent, I'm less concerned about it but still a small performance hit

requestParam = SensorsCustomParams::encodeTableInInternodeBytesRequestParam;
tableParam = SensorsCustomParams::encodeTableInInternodeBytesTableParam;
addSensorsToResponse(requestSensors, requestParam, tableParam, response);
}

private void addSensorsToResponse(Collection<Sensor> sensors, Function<String, String> requestParamSupplier, Function<String, String> tableParamSupplier, Message.Builder<NoPayload> response)
private void addSensorsToResponse(Collection<Sensor> requestSensors, Function<String, String> requestParamSupplier, Function<String, String> tableParamSupplier, Message.Builder<NoPayload> response)
{
for (Sensor requestSensor : sensors)
for (Sensor requestSensor : requestSensors)
{
String requestBytesParam = requestParamSupplier.apply(requestSensor.getContext().getTable());
byte[] requestBytes = SensorsCustomParams.sensorValueAsBytes(requestSensor.getValue());
Expand Down Expand Up @@ -132,12 +123,18 @@ public void doVerb(Message<Mutation> message)
try
{
// Initialize the sensor and set ExecutorLocals
String keyspace = message.payload.getKeyspaceName();
Collection<TableMetadata> tables = message.payload.getPartitionUpdates().stream().map(PartitionUpdate::metadata).collect(Collectors.toList());
RequestSensors sensors = new RequestSensors(keyspace, tables);
ExecutorLocals locals = ExecutorLocals.create(sensors);
RequestSensors requestSensors = new RequestSensors();
ExecutorLocals locals = ExecutorLocals.create(requestSensors);
ExecutorLocals.set(locals);

// Initialize internode bytes with the inbound message size:
tables.forEach(tm -> {
Context context = Context.from(tm);
requestSensors.registerSensor(context, Type.INTERNODE_BYTES);
requestSensors.incrementSensor(context, Type.INTERNODE_BYTES, message.serializedSize(MessagingService.current_version) / tables.size());
});

message.payload.applyFuture(WriteOptions.DEFAULT).thenAccept(o -> respond(message, respondToAddress)).exceptionally(wto -> {
failed();
return null;
Expand Down
42 changes: 29 additions & 13 deletions src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.SensorsCustomParams;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.sensors.Context;
import org.apache.cassandra.sensors.RequestSensors;
import org.apache.cassandra.sensors.RequestTracker;
Expand Down Expand Up @@ -62,12 +63,18 @@ public void doVerb(Message<ReadCommand> message)

// Initialize the sensor and set ExecutorLocals
Context context = Context.from(command);
String keyspace = command.metadata().keyspace;
RequestSensors sensors = new RequestSensors(keyspace, ImmutableSet.of(command.metadata()));
sensors.registerSensor(context, Type.READ_BYTES);
ExecutorLocals locals = ExecutorLocals.create(sensors);
ImmutableSet<TableMetadata> tables = ImmutableSet.of(command.metadata());

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is always a single table, not sure we need the Set.

RequestSensors requestSensors = new RequestSensors();
requestSensors.registerSensor(context, Type.READ_BYTES);
ExecutorLocals locals = ExecutorLocals.create(requestSensors);
ExecutorLocals.set(locals);

// Initialize internode bytes with the inbound message size:
tables.forEach(tm -> {
requestSensors.registerSensor(context, Type.INTERNODE_BYTES);
requestSensors.incrementSensor(context, Type.INTERNODE_BYTES, message.serializedSize(MessagingService.current_version) / tables.size());
});

long timeout = message.expiresAtNanos() - message.createdAtNanos();
command.setMonitoringTime(message.createdAtNanos(), message.isCrossNode(), timeout, DatabaseDescriptor.getSlowQueryTimeout(NANOSECONDS));

Expand All @@ -85,22 +92,31 @@ public void doVerb(Message<ReadCommand> message)
return;
}

Tracing.trace("Enqueuing response to {}", message.from());
Message.Builder<ReadResponse> reply = message.responseWithBuilder(response);
addReadBytesSensorToResponse(reply, context);
int size = reply.currentSize(MessagingService.current_version);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At this point, the sensor custom headers themselves as not added so just confirming you intentionally want to exclude them? On one hand, they are not part of the user data, but at the end of the day they are bytes consuming network BW so I would've thought we should include them.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes this is intentional. Rationale is they should be a pale share of the overall message size in prod.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get it, but bc it doesn't cost us anything to include them (just calculating the size right before reply.build() I think why not just include all sensors do - it is as accurate of a measure of network bytes as it gets. Alternatively, we could just keep the payload and forget the headers altogether.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bc it doesn't cost us anything to include them (just calculating the size right before reply.build() I think why not just include all sensors

It's a chicken and egg problem :) How do you include the internode bytes in the header if you haven't computed them yet?

Alternatively, we could just keep the payload and forget the headers altogether.

I actually like this :)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a chicken and egg problem :) How do you include the internode bytes in the header if you haven't computed them yet?

I'm just reference to the other UCU sensors (WRITE/READ/INDEX BYTES) can be included. The minimal bytes footprint to exclude is the one that actually captures INTERNODE BYTES

But payload only works for me too

RequestTracker.instance.get().incrementSensor(context, Type.INTERNODE_BYTES, size);
RequestTracker.instance.get().syncAllSensors();

addInternodeSensorToResponse(reply, context);
addReadBytesSensorToResponse(reply, context);

Tracing.trace("Enqueuing response to {}", message.from());
MessagingService.instance().send(reply.build(), message.from());
}

private void addInternodeSensorToResponse(Message.Builder<ReadResponse> reply, Context context)
{
Optional<Sensor> internodeBytesSensor = SensorsRegistry.instance.getSensor(context, Type.INTERNODE_MSG_BYTES);
String internodeBytesTableParam = SensorsCustomParams.encodeTableInInternodeBytesTableParam(context.getTable());
internodeBytesSensor.map(s -> SensorsCustomParams.sensorValueAsBytes(s.getValue())).ifPresent(bytes -> reply.withCustomParam(internodeBytesTableParam, bytes));

Optional<Sensor> internodeCountSensor = SensorsRegistry.instance.getSensor(context, Type.INTERNODE_MSG_COUNT);
String internodeCountTableParam = SensorsCustomParams.encodeTableInInternodeCountTableParam(context.getTable());
internodeCountSensor.map(s -> SensorsCustomParams.sensorValueAsBytes(s.getValue())).ifPresent(count -> reply.withCustomParam(internodeCountTableParam, count));
Optional<Sensor> requestSensor = RequestTracker.instance.get().getSensor(context, Type.INTERNODE_BYTES);
requestSensor.map(s -> SensorsCustomParams.sensorValueAsBytes(s.getValue())).ifPresent(bytes -> {
reply.withCustomParam(SensorsCustomParams.encodeTableInInternodeBytesRequestParam(context.getTable()),
bytes);
});

Optional<Sensor> tableSensor = SensorsRegistry.instance.getSensor(context, Type.INTERNODE_BYTES);
tableSensor.map(s -> SensorsCustomParams.sensorValueAsBytes(s.getValue())).ifPresent(bytes -> {
reply.withCustomParam(SensorsCustomParams.encodeTableInInternodeBytesTableParam(context.getTable()),
bytes);
});
}

private void addReadBytesSensorToResponse(Message.Builder<ReadResponse> reply, Context context)
Expand Down
20 changes: 19 additions & 1 deletion src/java/org/apache/cassandra/net/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,25 @@ public Message<T> build()
if (payload == null)
throw new IllegalArgumentException();

return new Message<>(new Header(hasId ? id : nextId(), verb, from, createdAtNanos, expiresAtNanos, flags, params), payload);
return doBuild(hasId ? id : nextId());
}

public int currentSize(int version)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So basically we build the message twice now - once we calculate the size, and another when want to build the actual response, is this ok performance wise? This was one of the reasons to use register to the outbound sink (tradeoff being complexity of the overall approach, just making sure you think the redundant message build is worth it)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Point taken. The most expensive computation is probably the payload size, what if we cache it?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure how caching the payload size would help. It seems you call it only once? I was also thinking building the message to calculate the size has a memory hit.

The size calculation of the message and including it in the headers is a chicken and egg problem. tbh don't see an effective way around it.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems you call it only once?

Once per message, but we build the message twice. The reason we cache it in the message is exactly the fact that it's the most expensive size computation.

I was also thinking building the message to calculate the size has a memory hit.

Building the Message is just a bunch of objects assignment, isn't it?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once per message, but we build the message twice. The reason we cache it in the message is exactly the fact that it's the most expensive size computation.

Exactly - that's why I wasn't sure how caching would help, maybe I misunderstood your suggestion.

Building the Message is just a bunch of objects assignment, isn't it?

Fair point. No more concerns here.

{
// use dummy id just for the sake of computing the serialized size
return doBuild(0).serializedSize(version);
}

private Message<T> doBuild(long id)
{
if (verb == null)
throw new IllegalArgumentException();
if (from == null)
throw new IllegalArgumentException();
if (payload == null)
throw new IllegalArgumentException();

return new Message<>(new Header(id, verb, from, createdAtNanos, expiresAtNanos, flags, params), payload);
}
}

Expand Down
Loading