Skip to content

Commit

Permalink
Add internode sensors per ks and table
Browse files Browse the repository at this point in the history
  • Loading branch information
aymkhalil committed May 13, 2024
1 parent 36499a2 commit caa569f
Show file tree
Hide file tree
Showing 14 changed files with 227 additions and 119 deletions.
27 changes: 16 additions & 11 deletions src/java/org/apache/cassandra/db/CounterMutationCallback.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Optional;
import java.util.function.Function;

import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
Expand Down Expand Up @@ -65,27 +66,31 @@ public void run()
int replicaMultiplier = replicaCount == 0 ?
1 : // replica count was not explicitly set (default). At the bare minimum, we should send the response accommodating for the local replica (aka. mutation leader) sensor values
replicaCount;
String keyspace = respondTo.payload.getKeyspaceName();
addSensorsToResponse(response, keyspace, requestSensors, replicaMultiplier);
addSensorsToResponse(response, respondTo.payload.getMutation(), requestSensors, replicaMultiplier);
MessagingService.instance().send(response.build(), respondToAddress);
}

private static void addSensorsToResponse(Message.Builder<NoPayload> response, String keyspace, RequestSensors requestSensors, int replicaMultiplier)
private static void addSensorsToResponse(Message.Builder<NoPayload> response, Mutation mutation, RequestSensors requestSensors, int replicaMultiplier)
{
// Add internode message sensors to the response
Context context = new Context(keyspace);
Optional<Sensor> internodeBytesSensor = SensorsRegistry.instance.getSensor(context, Type.INTERNODE_MSG_BYTES);
internodeBytesSensor.map(s -> SensorsCustomParams.sensorValueAsBytes(s.getValue())).ifPresent(bytes -> response.withCustomParam(SensorsCustomParams.INTERNODE_MSG_BYTES, bytes));

Optional<Sensor> internodeCountSensor = SensorsRegistry.instance.getSensor(context, Type.INTERNODE_MSG_COUNT);
internodeCountSensor.map(s -> SensorsCustomParams.sensorValueAsBytes(s.getValue())).ifPresent(count -> response.withCustomParam(SensorsCustomParams.INTERNODE_MSG_COUNT, count));

// Add write bytes sensors to the response
Function<String, String> requestParam = SensorsCustomParams::encodeTableInWriteBytesRequestParam;
Function<String, String> tableParam = SensorsCustomParams::encodeTableInWriteBytesTableParam;

Collection<Sensor> sensors = requestSensors.getSensors(Type.WRITE_BYTES);
addSensorsToResponse(sensors, requestParam, tableParam, response, replicaMultiplier);

// Add internode message sensors to the response
for (PartitionUpdate update : mutation.getPartitionUpdates())
{
Context context = Context.from(update.metadata());
Optional<Sensor> internodeBytesSensor = SensorsRegistry.instance.getSensor(context, Type.INTERNODE_MSG_BYTES);
String internodeBytesTableParam = SensorsCustomParams.encodeTableInInternodeBytesTableParam(context.getTable());
internodeBytesSensor.map(s -> SensorsCustomParams.sensorValueAsBytes(s.getValue())).ifPresent(bytes -> response.withCustomParam(internodeBytesTableParam, bytes));

Optional<Sensor> internodeCountSensor = SensorsRegistry.instance.getSensor(context, Type.INTERNODE_MSG_COUNT);
String internodeCountTableParam = SensorsCustomParams.encodeTableInInternodeCountTableParam(context.getTable());
internodeCountSensor.map(s -> SensorsCustomParams.sensorValueAsBytes(s.getValue())).ifPresent(count -> response.withCustomParam(internodeCountTableParam, count));
}
}

private static void addSensorsToResponse(Collection<Sensor> sensors,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@
*/
package org.apache.cassandra.db;

import java.util.Collection;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.concurrent.ExecutorLocals;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.sensors.RequestSensors;
import org.apache.cassandra.service.StorageProxy;

Expand All @@ -41,7 +46,8 @@ public void doVerb(final Message<CounterMutation> message)

// Initialize the sensor and set ExecutorLocals
String keyspace = cm.getKeyspaceName();
RequestSensors requestSensors = new RequestSensors(keyspace);
Collection<TableMetadata> tables = message.payload.getPartitionUpdates().stream().map(PartitionUpdate::metadata).collect(Collectors.toSet());
RequestSensors requestSensors = new RequestSensors(keyspace, tables);
ExecutorLocals locals = ExecutorLocals.create(requestSensors);
ExecutorLocals.set(locals);

Expand Down
54 changes: 31 additions & 23 deletions src/java/org/apache/cassandra/db/MutationVerbHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import java.util.Collection;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.cassandra.concurrent.ExecutorLocals;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.ForwardingInfo;
Expand All @@ -31,6 +33,7 @@
import org.apache.cassandra.net.NoPayload;
import org.apache.cassandra.net.ParamType;
import org.apache.cassandra.net.SensorsCustomParams;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.sensors.Context;
import org.apache.cassandra.sensors.RequestSensors;
import org.apache.cassandra.sensors.RequestTracker;
Expand All @@ -48,13 +51,12 @@ private void respond(Message<Mutation> respondTo, InetAddressAndPort respondToAd
Tracing.trace("Enqueuing response to {}", respondToAddress);

Message.Builder<NoPayload> response = respondTo.emptyResponseBuilder();
String keyspace = respondTo.payload.getKeyspaceName();
addSensorsToResponse(response, keyspace);
addSensorsToResponse(response, respondTo.payload);

MessagingService.instance().send(response.build(), respondToAddress);
}

private void addSensorsToResponse(Message.Builder<NoPayload> response, String keyspace)
private void addSensorsToResponse(Message.Builder<NoPayload> response, Mutation mutation)
{
// Add write bytes sensors to the response
Function<String, String> requestParam = SensorsCustomParams::encodeTableInWriteBytesRequestParam;
Expand All @@ -69,18 +71,23 @@ private void addSensorsToResponse(Message.Builder<NoPayload> response, String ke
addSensorsToResponse(sensors, requestParam, tableParam, response);

// Add internode message sensors to the response
Context context = new Context(keyspace);
Optional<Sensor> internodeBytes = SensorsRegistry.instance.getSensor(context, Type.INTERNODE_MSG_BYTES);
internodeBytes.ifPresent(sensor -> {
byte[] bytes = SensorsCustomParams.sensorValueAsBytes(sensor.getValue());
response.withCustomParam(SensorsCustomParams.INTERNODE_MSG_BYTES, bytes);
});

Optional<Sensor> internodeCount = SensorsRegistry.instance.getSensor(context, Type.INTERNODE_MSG_COUNT);
internodeCount.ifPresent(sensor -> {
byte[] count = SensorsCustomParams.sensorValueAsBytes(sensor.getValue());
response.withCustomParam(SensorsCustomParams.INTERNODE_MSG_COUNT, count);
});
for (PartitionUpdate update : mutation.getPartitionUpdates())
{
Context context = Context.from(update.metadata());
Optional<Sensor> internodeBytes = SensorsRegistry.instance.getSensor(context, Type.INTERNODE_MSG_BYTES);
internodeBytes.ifPresent(sensor -> {
byte[] bytes = SensorsCustomParams.sensorValueAsBytes(sensor.getValue());
String internodeBytesTableParam = SensorsCustomParams.encodeTableInInternodeBytesTableParam(context.getTable());
response.withCustomParam(internodeBytesTableParam, bytes);
});

Optional<Sensor> internodeCount = SensorsRegistry.instance.getSensor(context, Type.INTERNODE_MSG_COUNT);
internodeCount.ifPresent(sensor -> {
byte[] count = SensorsCustomParams.sensorValueAsBytes(sensor.getValue());
String internodeCountTableParam = SensorsCustomParams.encodeTableInInternodeCountTableParam(context.getTable());
response.withCustomParam(internodeCountTableParam, count);
});
}
}

private void addSensorsToResponse(Collection<Sensor> sensors, Function<String, String> requestParamSupplier, Function<String, String> tableParamSupplier, Message.Builder<NoPayload> response)
Expand Down Expand Up @@ -126,7 +133,8 @@ public void doVerb(Message<Mutation> message)
{
// Initialize the sensor and set ExecutorLocals
String keyspace = message.payload.getKeyspaceName();
RequestSensors sensors = new RequestSensors(keyspace);
Collection<TableMetadata> tables = message.payload.getPartitionUpdates().stream().map(PartitionUpdate::metadata).collect(Collectors.toList());
RequestSensors sensors = new RequestSensors(keyspace, tables);
ExecutorLocals locals = ExecutorLocals.create(sensors);
ExecutorLocals.set(locals);

Expand All @@ -144,18 +152,18 @@ public void doVerb(Message<Mutation> message)
private static void forwardToLocalNodes(Message<Mutation> originalMessage, ForwardingInfo forwardTo)
{
Message.Builder<Mutation> builder =
Message.builder(originalMessage)
.withParam(ParamType.RESPOND_TO, originalMessage.from())
.withoutParam(ParamType.FORWARD_TO);
Message.builder(originalMessage)
.withParam(ParamType.RESPOND_TO, originalMessage.from())
.withoutParam(ParamType.FORWARD_TO);

boolean useSameMessageID = forwardTo.useSameMessageID(originalMessage.id());
// reuse the same Message if all ids are identical (as they will be for 4.0+ node originated messages)
Message<Mutation> message = useSameMessageID ? builder.build() : null;

forwardTo.forEach((id, target) ->
{
Tracing.trace("Enqueuing forwarded write to {}", target);
MessagingService.instance().send(useSameMessageID ? message : builder.withId(id).build(), target);
});
{
Tracing.trace("Enqueuing forwarded write to {}", target);
MessagingService.instance().send(useSameMessageID ? message : builder.withId(id).build(), target);
});
}
}
14 changes: 8 additions & 6 deletions src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.Optional;

import com.google.common.collect.ImmutableSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -62,7 +63,7 @@ public void doVerb(Message<ReadCommand> message)
// Initialize the sensor and set ExecutorLocals
Context context = Context.from(command);
String keyspace = command.metadata().keyspace;
RequestSensors sensors = new RequestSensors(keyspace);
RequestSensors sensors = new RequestSensors(keyspace, ImmutableSet.of(command.metadata()));
sensors.registerSensor(context, Type.READ_BYTES);
ExecutorLocals locals = ExecutorLocals.create(sensors);
ExecutorLocals.set(locals);
Expand Down Expand Up @@ -93,12 +94,13 @@ public void doVerb(Message<ReadCommand> message)

private void addInternodeSensorToResponse(Message.Builder<ReadResponse> reply, Context context)
{
Context internodeSensorContext = new Context(context.getKeyspace());
Optional<Sensor> internodeBytesSensor = SensorsRegistry.instance.getSensor(internodeSensorContext, Type.INTERNODE_MSG_BYTES);
internodeBytesSensor.map(s -> SensorsCustomParams.sensorValueAsBytes(s.getValue())).ifPresent(bytes -> reply.withCustomParam(SensorsCustomParams.INTERNODE_MSG_BYTES, bytes));
Optional<Sensor> internodeBytesSensor = SensorsRegistry.instance.getSensor(context, Type.INTERNODE_MSG_BYTES);
String internodeBytesTableParam = SensorsCustomParams.encodeTableInInternodeBytesTableParam(context.getTable());
internodeBytesSensor.map(s -> SensorsCustomParams.sensorValueAsBytes(s.getValue())).ifPresent(bytes -> reply.withCustomParam(internodeBytesTableParam, bytes));

Optional<Sensor> internodeCountSensor = SensorsRegistry.instance.getSensor(internodeSensorContext, Type.INTERNODE_MSG_COUNT);
internodeCountSensor.map(s -> SensorsCustomParams.sensorValueAsBytes(s.getValue())).ifPresent(count -> reply.withCustomParam(SensorsCustomParams.INTERNODE_MSG_COUNT, count));
Optional<Sensor> internodeCountSensor = SensorsRegistry.instance.getSensor(context, Type.INTERNODE_MSG_COUNT);
String internodeCountTableParam = SensorsCustomParams.encodeTableInInternodeCountTableParam(context.getTable());
internodeCountSensor.map(s -> SensorsCustomParams.sensorValueAsBytes(s.getValue())).ifPresent(count -> reply.withCustomParam(internodeCountTableParam, count));
}

private void addReadBytesSensorToResponse(Message.Builder<ReadResponse> reply, Context context)
Expand Down
23 changes: 14 additions & 9 deletions src/java/org/apache/cassandra/net/MessagingService.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.metrics.MessagingMetrics;
import org.apache.cassandra.nodes.Nodes;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.sensors.Context;
import org.apache.cassandra.sensors.RequestSensors;
import org.apache.cassandra.sensors.RequestTracker;
import org.apache.cassandra.sensors.SensorsRegistry;
import org.apache.cassandra.sensors.Type;
import org.apache.cassandra.service.AbstractWriteResponseHandler;
import org.apache.cassandra.utils.ExecutorUtils;
Expand Down Expand Up @@ -633,9 +633,6 @@ public void waitUntilListening() throws InterruptedException
inboundSockets.open().await();
}

/**
* Tracks outbound message size and count in the Sensors Registry
*/
private boolean trackOutboundMessages(Message<?> message, InetAddressAndPort ignored)
{
RequestSensors requestSensors = RequestTracker.instance.get();
Expand All @@ -644,11 +641,19 @@ private boolean trackOutboundMessages(Message<?> message, InetAddressAndPort ign

String keyspace = requestSensors.getKeyspace();
double size = message.serializedSize(MessagingService.current_version);
Context context = new Context(keyspace);
requestSensors.registerSensor(context, Type.INTERNODE_MSG_BYTES);
requestSensors.registerSensor(context, Type.INTERNODE_MSG_COUNT);
requestSensors.incrementSensor(context, Type.INTERNODE_MSG_BYTES, size);
requestSensors.incrementSensor(context, Type.INTERNODE_MSG_COUNT, 1.0);
// split the internode message bytes and count evenly between the tables tracked by the request sensors
int tablesCount = requestSensors.getTables().size();
double internodeBytesPerTable = size / tablesCount;
double internodeCountPerTable = 1.0d / tablesCount;

for (TableMetadata tm : requestSensors.getTables())
{
Context context = new Context(keyspace, tm.name, tm.id.toString());
requestSensors.registerSensor(context, Type.INTERNODE_MSG_BYTES);
requestSensors.registerSensor(context, Type.INTERNODE_MSG_COUNT);
requestSensors.incrementSensor(context, Type.INTERNODE_MSG_BYTES, internodeBytesPerTable);
requestSensors.incrementSensor(context, Type.INTERNODE_MSG_COUNT, internodeCountPerTable);
}
requestSensors.syncAllSensors();

return true;
Expand Down
17 changes: 14 additions & 3 deletions src/java/org/apache/cassandra/net/SensorsCustomParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.cassandra.net;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.cassandra.sensors.Sensor;

Expand Down Expand Up @@ -67,12 +66,14 @@ public final class SensorsCustomParams
public static final String INDEX_WRITE_BYTES_TABLE_TEMPLATE = "INDEX_WRITE_BYTES_TABLE.%s";
/**
* The total internode message bytes received by the writer or coordinator for a given keyspace.
* To support batch writes, table name is encoded in the following format: INTERNODE_MSG_BYTES_TABLE.<table>
*/
public static final String INTERNODE_MSG_BYTES = "INTERNODE_MSG_BYTES";
public static final String INTERNODE_MSG_BYTES_TABLE_TEMPLATE = "INTERNODE_MSG_BYTES_TABLE.%s";
/**
* The total internode message count received by the writer or coordinator for a given keyspace.
* To support batch writes, table name is encoded in the following format: INTERNODE_MSG_COUNT_TABLE.<table>
*/
public static final String INTERNODE_MSG_COUNT = "INTERNODE_MSG_COUNT";
public static final String INTERNODE_MSG_COUNT_TABLE_TEMPLATE = "INTERNODE_MSG_COUNT_TABLE.%s";

private SensorsCustomParams()
{
Expand Down Expand Up @@ -116,4 +117,14 @@ public static String encodeTableInIndexWriteBytesTableParam(String tableName)
{
return String.format(INDEX_WRITE_BYTES_TABLE_TEMPLATE, tableName);
}

/**
 * Builds the custom param name used to carry the internode message bytes of a single table.
 *
 * @param tableName the table name substituted into {@link #INTERNODE_MSG_BYTES_TABLE_TEMPLATE}
 * @return the template formatted with {@code tableName}
 */
public static String encodeTableInInternodeBytesTableParam(String tableName)
{
    final String paramName = String.format(INTERNODE_MSG_BYTES_TABLE_TEMPLATE, tableName);
    return paramName;
}

/**
 * Builds the custom param name used to carry the internode message count of a single table.
 *
 * @param tableName the table name substituted into {@link #INTERNODE_MSG_COUNT_TABLE_TEMPLATE}
 * @return the template formatted with {@code tableName}
 */
public static String encodeTableInInternodeCountTableParam(String tableName)
{
    final String paramName = String.format(INTERNODE_MSG_COUNT_TABLE_TEMPLATE, tableName);
    return paramName;
}
}
8 changes: 0 additions & 8 deletions src/java/org/apache/cassandra/sensors/Context.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,6 @@ public Context(String keyspace, String table, String tableId)
this.tableId = tableId;
}

/**
* Use to track sensors at the keyspace level.
*/
public Context(String keyspace)
{
this(keyspace, null, null);
}

public String getKeyspace()
{
return keyspace;
Expand Down
20 changes: 17 additions & 3 deletions src/java/org/apache/cassandra/sensors/RequestSensors.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@

package org.apache.cassandra.sensors;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
Expand All @@ -26,6 +30,9 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;

import com.google.common.collect.ImmutableSet;

import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.Pair;

/**
Expand All @@ -47,16 +54,18 @@ public class RequestSensors
private final Supplier<SensorsRegistry> sensorsRegistry;
private final ConcurrentMap<Pair<Context, Type>, Sensor> sensors = new ConcurrentHashMap<>();
private final String keyspace;
private final Collection<TableMetadata> tables;

public RequestSensors(String keyspace)
public RequestSensors(String keyspace, Collection<TableMetadata> tables)
{
this(() -> SensorsRegistry.instance, keyspace);
this(() -> SensorsRegistry.instance, keyspace, tables);
}

public RequestSensors(Supplier<SensorsRegistry> sensorsRegistry, String keyspace)
public RequestSensors(Supplier<SensorsRegistry> sensorsRegistry, String keyspace, Collection<TableMetadata> tables)
{
this.sensorsRegistry = sensorsRegistry;
this.keyspace = keyspace;
this.tables = tables;
}

public void registerSensor(Context context, Type type)
Expand Down Expand Up @@ -84,6 +93,11 @@ public String getKeyspace()
return keyspace;
}

/**
 * Returns the tables this request's sensors were created for.
 *
 * @return the same collection instance that was passed to the constructor (no copy is made)
 */
public Collection<TableMetadata> getTables()
{
    return this.tables;
}

public void syncAllSensors()
{
sensors.values().forEach(s -> sensorsRegistry.get().updateSensor(s.getContext(), s.getType(), s.getValue()));
Expand Down
Loading

1 comment on commit caa569f

@cassci-bot
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Build rejected: 1 NEW test failure(s) in 14 builds., Build 14: ran 13369 tests with 1 failures and 109 skipped.
Butler analysis done on ds-cassandra-pr-gate/cc-ucu-intenode vs last 16 runs of ds-cassandra-build-nightly/ds-trunk.
org.apache.cassandra.distributed.test.hostreplacement.HostReplacementTest.replaceDownedHost: test looks flaky. No failures on upstream;
branch story: [+F++] vs upstream: [+++++++++++++]; [NEW]
butler comparison

Please sign in to comment.