Skip to content

Commit

Permalink
Added SensorsInternodeTest + bug fixes.
Browse files Browse the repository at this point in the history
  • Loading branch information
sbtourist committed May 16, 2024
1 parent 1bba5f7 commit 0d57897
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 54 deletions.
6 changes: 4 additions & 2 deletions src/java/org/apache/cassandra/db/CounterMutationCallback.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,12 @@ public void run()

private static void addSensorsToResponse(Message.Builder<NoPayload> response, Mutation mutation, int replicaMultiplier)
{
int tables = mutation.getTableIds().size();

// Add internode bytes sensors to the response after updating each per-table sensor with the current response
// message size: this is missing the sensor values, but it's a good enough approximation
Collection<Sensor> requestSensors = RequestTracker.instance.get().getSensors(Type.INTERNODE_BYTES);
int perSensorSize = response.currentSize(MessagingService.current_version) / requestSensors.size();
int perSensorSize = response.currentSize(MessagingService.current_version) / tables;
requestSensors.forEach(sensor -> RequestTracker.instance.get().incrementSensor(sensor.getContext(), sensor.getType(), perSensorSize));
RequestTracker.instance.get().syncAllSensors();
Function<String, String> requestParam = SensorsCustomParams::encodeTableInInternodeBytesRequestParam;
Expand All @@ -99,7 +101,7 @@ private static void addSensorsToResponse(Collection<Sensor> requestSensors,
response.withCustomParam(requestBytesParam, requestBytes);

// for each table in the mutation, send the global per table counter write bytes as recorded by the registry
Optional<Sensor> registrySensor = SensorsRegistry.instance.getSensor(requestSensor.getContext(), requestSensor.getType());
Optional<Sensor> registrySensor = SensorsRegistry.instance.getOrCreateSensor(requestSensor.getContext(), requestSensor.getType());
registrySensor.ifPresent(sensor -> {
String tableBytesParam = tableParamSupplier.apply(sensor.getContext().getTable());
byte[] tableBytes = SensorsCustomParams.sensorValueAsBytes(sensor.getValue() * replicaMultiplier);
Expand Down
6 changes: 4 additions & 2 deletions src/java/org/apache/cassandra/db/MutationVerbHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ private void respond(Message<Mutation> respondTo, InetAddressAndPort respondToAd

private void addSensorsToResponse(Message.Builder<NoPayload> response, Mutation mutation)
{
int tables = mutation.getTableIds().size();

// Add write bytes sensors to the response
Function<String, String> requestParam = SensorsCustomParams::encodeTableInWriteBytesRequestParam;
Function<String, String> tableParam = SensorsCustomParams::encodeTableInWriteBytesTableParam;
Expand All @@ -72,7 +74,7 @@ private void addSensorsToResponse(Message.Builder<NoPayload> response, Mutation

// Add internode bytes sensors to the response after updating each per-table sensor with the current response
// message size: this is missing the sensor values, but it's a good enough approximation
int perSensorSize = response.currentSize(MessagingService.current_version) / requestSensors.size();
int perSensorSize = response.currentSize(MessagingService.current_version) / tables;
requestSensors = RequestTracker.instance.get().getSensors(Type.INTERNODE_BYTES);
requestSensors.forEach(sensor -> RequestTracker.instance.get().incrementSensor(sensor.getContext(), sensor.getType(), perSensorSize));
RequestTracker.instance.get().syncAllSensors();
Expand All @@ -90,7 +92,7 @@ private void addSensorsToResponse(Collection<Sensor> requestSensors, Function<St
response.withCustomParam(requestBytesParam, requestBytes);

// for each table in the mutation, send the global per table write/index bytes as observed by the registry
Optional<Sensor> registrySensor = SensorsRegistry.instance.getSensor(requestSensor.getContext(), requestSensor.getType());
Optional<Sensor> registrySensor = SensorsRegistry.instance.getOrCreateSensor(requestSensor.getContext(), requestSensor.getType());
registrySensor.ifPresent(sensor -> {
String tableBytesParam = tableParamSupplier.apply(sensor.getContext().getTable());
byte[] tableBytes = SensorsCustomParams.sensorValueAsBytes(sensor.getValue());
Expand Down
4 changes: 2 additions & 2 deletions src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ private void addInternodeSensorToResponse(Message.Builder<ReadResponse> reply, C
bytes);
});

Optional<Sensor> tableSensor = SensorsRegistry.instance.getSensor(context, Type.INTERNODE_BYTES);
Optional<Sensor> tableSensor = SensorsRegistry.instance.getOrCreateSensor(context, Type.INTERNODE_BYTES);
tableSensor.map(s -> SensorsCustomParams.sensorValueAsBytes(s.getValue())).ifPresent(bytes -> {
reply.withCustomParam(SensorsCustomParams.encodeTableInInternodeBytesTableParam(context.getTable()),
bytes);
Expand All @@ -133,7 +133,7 @@ private void addSensorDataToResponse(Message.Builder<ReadResponse> reply, Contex
readRequestSensor.map(s -> SensorsCustomParams.sensorValueAsBytes(s.getValue()))
.ifPresent(bytes -> reply.withCustomParam(requestBytesParam, bytes));

Optional<Sensor> readTableSensor = SensorsRegistry.instance.getSensor(context, type);
Optional<Sensor> readTableSensor = SensorsRegistry.instance.getOrCreateSensor(context, type);
readTableSensor.map(s -> SensorsCustomParams.sensorValueAsBytes(s.getValue()))
.ifPresent(bytes -> reply.withCustomParam(tableBytesParam, bytes));
}
Expand Down
71 changes: 23 additions & 48 deletions test/unit/org/apache/cassandra/sensors/SensorsInternodeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.SensorsCustomParams;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.utils.ByteBufferUtil;
Expand Down Expand Up @@ -120,11 +119,11 @@ public void testInternodeSensorsForRead()
.build()
.applyUnsafe();


DecoratedKey key = store.getPartitioner().decorateKey(ByteBufferUtil.bytes("0"));
ReadCommand command = Util.cmd(store, key).build();
Runnable handler = () -> ReadCommandVerbHandler.instance.doVerb(Message.builder(Verb.READ_REQ, command).build());
testInternodeSensors(handler, ImmutableSet.of(context));
Message request = Message.builder(Verb.READ_REQ, command).build();
Runnable handler = () -> ReadCommandVerbHandler.instance.doVerb(request);
testInternodeSensors(request, handler, ImmutableSet.of(context));
}

@Test
Expand All @@ -137,8 +136,9 @@ public void testInternodeSensorsForMutation()
.add("val", "0")
.build();

Runnable handler = () -> MutationVerbHandler.instance.doVerb(Message.builder(Verb.MUTATION_REQ, mutation).build());
testInternodeSensors(handler, ImmutableSet.of(context));
Message request = Message.builder(Verb.MUTATION_REQ, mutation).build();
Runnable handler = () -> MutationVerbHandler.instance.doVerb(request);
testInternodeSensors(request, handler, ImmutableSet.of(context));
}

@Test
Expand All @@ -164,8 +164,9 @@ public void testInternodeSensorsForBatchMutation()
.build());

Mutation mutation = Mutation.merge(mutations);
Runnable handler = () -> MutationVerbHandler.instance.doVerb(Message.builder(Verb.MUTATION_REQ, mutation).build());
testInternodeSensors(handler, ImmutableSet.of(context1, context2));
Message request = Message.builder(Verb.MUTATION_REQ, mutation).build();
Runnable handler = () -> MutationVerbHandler.instance.doVerb(request);
testInternodeSensors(request, handler, ImmutableSet.of(context1, context2));
}

@Test
Expand All @@ -182,56 +183,30 @@ public void testInternodeSensorsForCounterMutation()

CounterMutation counterMutation = new CounterMutation(mutation, ConsistencyLevel.ANY);

Runnable handler = () -> CounterMutationVerbHandler.instance.doVerb(Message.builder(Verb.COUNTER_MUTATION_REQ, counterMutation).build());
testInternodeSensors(handler, ImmutableSet.of(context));
Message request = Message.builder(Verb.COUNTER_MUTATION_REQ, counterMutation).build();
Runnable handler = () -> CounterMutationVerbHandler.instance.doVerb(request);
testInternodeSensors(request, handler, ImmutableSet.of(context));
}

private void testInternodeSensors(Runnable handler, Collection<Context> contexts)
private void testInternodeSensors(Message request, Runnable handler, Collection<Context> contexts)
{

/** TODO: not sure how to rewrite this test
// Boostraps the internode sensors in the registry. Notice that the first outbound message will not have any internode sensors
// because the resitry will be initilized after the first outbound message is intercepted.
// Run the handler:
handler.run();
Sensor firstInternodeBytesSensor = SensorsRegistry.instance.getSensor(contexts.iterator().next(), Type.INTERNODE_BYTES).get();
double baselineMsgSize = firstInternodeBytesSensor.getValue(); // the size of the first internode message, which doesn't include the internode sensors headers

for (Context context : contexts)
{
SensorsRegistry.instance.getSensor(context, Type.INTERNODE_BYTES).get().reset();
}
// Get the request size, response size and total size per table:
int tableCount = contexts.size();
int requestSizePerTable = request.serializedSize(MessagingService.current_version) / tableCount;
Message response = capturedOutboundMessages.get(capturedOutboundMessages.size() - 1);
int responseSizePerTable = response.serializedSize(MessagingService.current_version) / tableCount;
int total = requestSizePerTable + responseSizePerTable;

// Capture the first values of internode message sensors
handler.run();
// For each context/table, get the internode bytes and verify their value is between the request and total size:
// it can't be equal to the total size because we don't record the custom headers in the internode sensor.
for (Context context : contexts)
{
Sensor internodeBytesSensor = SensorsRegistry.instance.getSensor(context, Type.INTERNODE_BYTES).get();
double internodeBytes = internodeBytesSensor.getValue();
double tableCount = contexts.size();
assertThat(internodeBytes).isGreaterThan(baselineMsgSize / tableCount);
}
double internodeBytes = firstInternodeBytesSensor.getValue();
// handle the same command/mutation two more times
handler.run();
handler.run();
for (Context context : contexts)
{
Sensor internodeBytesSensor = SensorsRegistry.instance.getSensor(context, Type.INTERNODE_BYTES).get();
double newInternodeBytes = internodeBytesSensor.getValue();
assertThat(newInternodeBytes).isEqualTo(internodeBytes * 3.0);
// check the latest outbound message accomodated for the previous two internode messages
Message<?> message = capturedOutboundMessages.get(capturedOutboundMessages.size() - 1);
assertThat(message.header.customParams()).isNotNull();
String internodeBytesTableParam = SensorsCustomParams.encodeTableInInternodeBytesTableParam(context.getTable());
assertThat(message.header.customParams()).containsKey(internodeBytesTableParam);
double internodeBytesInHeader = SensorsTestUtil.bytesToDouble(message.header.customParams().get(internodeBytesTableParam));
assertThat(internodeBytesInHeader).isEqualTo(internodeBytes * 2.0);
assertThat(internodeBytes).isBetween(requestSizePerTable * 1.0, total * 1.0);
}
**/
}
}

1 comment on commit 0d57897

@cassci-bot
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Build rejected: 1 NEW test failure(s) in 3 builds., Build 3: ran 13367 tests with 4 failures and 109 skipped.
Butler analysis done on ds-cassandra-pr-gate/cc-ucu-intenode-sb vs last 16 runs of ds-cassandra-build-nightly/ds-trunk.
org.apache.cassandra.distributed.test.SSTableIdGenerationTest.testSnapshot: test failed in the recent build. No failures on upstream;
branch story: [F++] vs upstream: [++++++++++++++]; [NEW]
butler comparison

Please sign in to comment.