Skip to content

Commit

Permalink
Opinionated changes to internode tracking:
Browse files Browse the repository at this point in the history
* Removed the message count sensor, as bytes are the more important and accurate measure.
* Tracked both inbound and outbound bytes, as writes take up inbound
bytes, while reads take up outbound bytes.
* Added internode sensors to custom params in the same place where we add the other sensors.
* Added internode sensors for request.
  • Loading branch information
sbtourist committed May 15, 2024
1 parent 1d7ec4a commit 8aa88ca
Show file tree
Hide file tree
Showing 11 changed files with 109 additions and 84 deletions.
30 changes: 15 additions & 15 deletions src/java/org/apache/cassandra/db/CounterMutationCallback.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@
*/
public class CounterMutationCallback implements Runnable
{
private final Message<CounterMutation> respondTo;
private final Message<CounterMutation> requestMessage;
private final InetAddressAndPort respondToAddress;
private int replicaCount = 0;

public CounterMutationCallback(Message<CounterMutation> respondTo, InetAddressAndPort respondToAddress)
public CounterMutationCallback(Message<CounterMutation> requestMessage, InetAddressAndPort respondToAddress)
{
this.respondTo = respondTo;
this.requestMessage = requestMessage;
this.respondToAddress = respondToAddress;
}

Expand All @@ -59,30 +59,30 @@ public void setReplicaCount(Integer replicaCount)
@Override
public void run()
{
Message.Builder<NoPayload> response = respondTo.emptyResponseBuilder();
Message.Builder<NoPayload> responseBuilder = requestMessage.emptyResponseBuilder();
int replicaMultiplier = replicaCount == 0 ?
1 : // replica count was not explicitly set (default). At the bare minimum, the response should account for the sensor values of the local replica (a.k.a. the mutation leader)
replicaCount;
addSensorsToResponse(response, respondTo.payload.getMutation(), replicaMultiplier);
MessagingService.instance().send(response.build(), respondToAddress);
addSensorsToResponse(responseBuilder, requestMessage.payload.getMutation(), replicaMultiplier);
MessagingService.instance().send(responseBuilder.build(), respondToAddress);
}

private static void addSensorsToResponse(Message.Builder<NoPayload> response, Mutation mutation, int replicaMultiplier)
{
// Add write bytes sensors to the response
Function<String, String> requestParam = SensorsCustomParams::encodeTableInWriteBytesRequestParam;
Function<String, String> tableParam = SensorsCustomParams::encodeTableInWriteBytesTableParam;
Collection<Sensor> requestSensors = RequestTracker.instance.get().getSensors(Type.WRITE_BYTES);
addSensorsToResponse(requestSensors, requestParam, tableParam, response, replicaMultiplier);

// Add internode bytes sensors to the response after updating each per-table sensor with the current response
// message size: this is missing the sensor values, but it's a good enough approximation
Collection<Sensor> requestSensors = RequestTracker.instance.get().getSensors(Type.INTERNODE_BYTES);
int perSensorSize = response.currentSize(MessagingService.current_version) / requestSensors.size();
requestSensors = RequestTracker.instance.get().getSensors(Type.INTERNODE_MSG_BYTES);
requestSensors.forEach(sensor -> RequestTracker.instance.get().incrementSensor(sensor.getContext(), sensor.getType(), perSensorSize));
RequestTracker.instance.get().syncAllSensors();
requestParam = SensorsCustomParams::encodeTableInInternodeBytesRequestParam;
tableParam = SensorsCustomParams::encodeTableInInternodeBytesTableParam;
Function<String, String> requestParam = SensorsCustomParams::encodeTableInInternodeBytesRequestParam;
Function<String, String> tableParam = SensorsCustomParams::encodeTableInInternodeBytesTableParam;
addSensorsToResponse(requestSensors, requestParam, tableParam, response, replicaMultiplier);

// Add write bytes sensors to the response
requestParam = SensorsCustomParams::encodeTableInWriteBytesRequestParam;
tableParam = SensorsCustomParams::encodeTableInWriteBytesTableParam;
requestSensors = RequestTracker.instance.get().getSensors(Type.WRITE_BYTES);
addSensorsToResponse(requestSensors, requestParam, tableParam, response, replicaMultiplier);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ public void doVerb(final Message<CounterMutation> message)
// Initialize internode bytes with the inbound message size:
tables.forEach(tm -> {
Context context = Context.from(tm);
requestSensors.registerSensor(context, Type.INTERNODE_MSG_BYTES);
requestSensors.incrementSensor(context, Type.INTERNODE_MSG_BYTES, message.serializedSize(MessagingService.current_version));
requestSensors.registerSensor(context, Type.INTERNODE_BYTES);
requestSensors.incrementSensor(context, Type.INTERNODE_BYTES, message.serializedSize(MessagingService.current_version) / tables.size());
});

String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter();
Expand Down
6 changes: 3 additions & 3 deletions src/java/org/apache/cassandra/db/MutationVerbHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ private void addSensorsToResponse(Message.Builder<NoPayload> response, Mutation
// Add internode bytes sensors to the response after updating each per-table sensor with the current response
// message size: this is missing the sensor values, but it's a good enough approximation
int perSensorSize = response.currentSize(MessagingService.current_version) / requestSensors.size();
requestSensors = RequestTracker.instance.get().getSensors(Type.INTERNODE_MSG_BYTES);
requestSensors = RequestTracker.instance.get().getSensors(Type.INTERNODE_BYTES);
requestSensors.forEach(sensor -> RequestTracker.instance.get().incrementSensor(sensor.getContext(), sensor.getType(), perSensorSize));
RequestTracker.instance.get().syncAllSensors();
requestParam = SensorsCustomParams::encodeTableInInternodeBytesRequestParam;
Expand Down Expand Up @@ -131,8 +131,8 @@ public void doVerb(Message<Mutation> message)
// Initialize internode bytes with the inbound message size:
tables.forEach(tm -> {
Context context = Context.from(tm);
requestSensors.registerSensor(context, Type.INTERNODE_MSG_BYTES);
requestSensors.incrementSensor(context, Type.INTERNODE_MSG_BYTES, message.serializedSize(MessagingService.current_version));
requestSensors.registerSensor(context, Type.INTERNODE_BYTES);
requestSensors.incrementSensor(context, Type.INTERNODE_BYTES, message.serializedSize(MessagingService.current_version) / tables.size());
});

message.payload.applyFuture(WriteOptions.DEFAULT).thenAccept(o -> respond(message, respondToAddress)).exceptionally(wto -> {
Expand Down
10 changes: 5 additions & 5 deletions src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ public void doVerb(Message<ReadCommand> message)

// Initialize internode bytes with the inbound message size:
tables.forEach(tm -> {
requestSensors.registerSensor(context, Type.INTERNODE_MSG_BYTES);
requestSensors.incrementSensor(context, Type.INTERNODE_MSG_BYTES, message.serializedSize(MessagingService.current_version));
requestSensors.registerSensor(context, Type.INTERNODE_BYTES);
requestSensors.incrementSensor(context, Type.INTERNODE_BYTES, message.serializedSize(MessagingService.current_version) / tables.size());
});

long timeout = message.expiresAtNanos() - message.createdAtNanos();
Expand All @@ -94,7 +94,7 @@ public void doVerb(Message<ReadCommand> message)

Message.Builder<ReadResponse> reply = message.responseWithBuilder(response);
int size = reply.currentSize(MessagingService.current_version);
RequestTracker.instance.get().incrementSensor(context, Type.INTERNODE_MSG_BYTES, size);
RequestTracker.instance.get().incrementSensor(context, Type.INTERNODE_BYTES, size);
RequestTracker.instance.get().syncAllSensors();

addInternodeSensorToResponse(reply, context);
Expand All @@ -106,13 +106,13 @@ public void doVerb(Message<ReadCommand> message)

private void addInternodeSensorToResponse(Message.Builder<ReadResponse> reply, Context context)
{
Optional<Sensor> requestSensor = RequestTracker.instance.get().getSensor(context, Type.INTERNODE_MSG_BYTES);
Optional<Sensor> requestSensor = RequestTracker.instance.get().getSensor(context, Type.INTERNODE_BYTES);
requestSensor.map(s -> SensorsCustomParams.sensorValueAsBytes(s.getValue())).ifPresent(bytes -> {
reply.withCustomParam(SensorsCustomParams.encodeTableInInternodeBytesRequestParam(context.getTable()),
bytes);
});

Optional<Sensor> tableSensor = SensorsRegistry.instance.getSensor(context, Type.INTERNODE_MSG_BYTES);
Optional<Sensor> tableSensor = SensorsRegistry.instance.getSensor(context, Type.INTERNODE_BYTES);
tableSensor.map(s -> SensorsCustomParams.sensorValueAsBytes(s.getValue())).ifPresent(bytes -> {
reply.withCustomParam(SensorsCustomParams.encodeTableInInternodeBytesTableParam(context.getTable()),
bytes);
Expand Down
33 changes: 27 additions & 6 deletions src/java/org/apache/cassandra/sensors/RequestSensors.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import java.util.stream.Collectors;

Expand All @@ -47,7 +48,8 @@ public class RequestSensors
{
private final Supplier<SensorsRegistry> sensorsRegistry;
private final ConcurrentMap<Pair<Context, Type>, Sensor> sensors = new ConcurrentHashMap<>();
private AtomicBoolean synced = new AtomicBoolean(false);
private final ConcurrentMap<Sensor, Double> latestSyncedValuePerSensor = new ConcurrentHashMap<>();
private final ReadWriteLock updateLock = new ReentrantReadWriteLock();

public RequestSensors()
{
Expand Down Expand Up @@ -76,14 +78,33 @@ public Set<Sensor> getSensors(Type type)

public void incrementSensor(Context context, Type type, double value)
{
Optional.ofNullable(sensors.get(Pair.create(context, type))).ifPresent(s -> s.increment(value));
synced.set(false);
updateLock.readLock().lock();
try
{
Optional.ofNullable(sensors.get(Pair.create(context, type))).ifPresent(s -> s.increment(value));
}
finally
{
updateLock.readLock().unlock();
}
}

public void syncAllSensors()
{
if (synced.compareAndSet(false, true))
sensors.values().forEach(s -> sensorsRegistry.get().updateSensor(s.getContext(), s.getType(), s.getValue()));
updateLock.writeLock().lock();
try
{
sensors.values().forEach(sensor -> {
double current = latestSyncedValuePerSensor.getOrDefault(sensor, 0d);
double update = sensor.getValue() - current;
latestSyncedValuePerSensor.put(sensor, sensor.getValue());
sensorsRegistry.get().incrementSensor(sensor.getContext(), sensor.getType(), update);
});
}
finally
{
updateLock.writeLock().unlock();
}
}

@Override
Expand Down
12 changes: 6 additions & 6 deletions src/java/org/apache/cassandra/sensors/SensorsRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@
* </ul>
* The returned sensors are global, meaning that their value spans across requests/responses, but cannot be modified either
* directly or indirectly via this class (whose update methods are package protected). In order to modify a sensor value,
* it must be registered to a request/response via {@link RequestSensors#registerSensor(Type)} and incremented via
* {@link RequestSensors#incrementSensor(Type, double)}, then synced via {@link RequestSensors#syncAllSensors()}, which
* it must be registered to a request/response via {@link RequestSensors#registerSensor(Context, Type)} and incremented via
* {@link RequestSensors#incrementSensor(Context, Type, double)}, then synced via {@link RequestSensors#syncAllSensors()}, which
* will update the related global sensors.
* <br/><br/>
* Given sensors are tied to a context, that is to a given keyspace and table, their global instance will be deleted
Expand Down Expand Up @@ -146,16 +146,16 @@ public Optional<Sensor> getOrCreateSensor(Context context, Type type)
}
}

protected void updateSensor(Context context, Type type, double value)
protected void incrementSensor(Context context, Type type, double value)
{
getOrCreateSensor(context, type).ifPresent(s -> s.increment(value));
}

protected Future<Void> updateSensorAsync(Context context, Type type, double value, long delay, TimeUnit unit)
protected Future<Void> incrementSensorAsync(Context context, Type type, double value, long delay, TimeUnit unit)
{
return asyncUpdater.onTimeout(() ->
getOrCreateSensor(context, type).ifPresent(s -> s.increment(value)),
delay, unit);
getOrCreateSensor(context, type).ifPresent(s -> s.increment(value)),
delay, unit);
}

public Set<Sensor> getSensorsByKeyspace(String keyspace)
Expand Down
3 changes: 1 addition & 2 deletions src/java/org/apache/cassandra/sensors/Type.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@
*/
public enum Type
{
CLIENT_BYTES,
INTERNODE_MSG_BYTES,
INTERNODE_BYTES,

READ_BYTES,

Expand Down
23 changes: 18 additions & 5 deletions test/unit/org/apache/cassandra/db/CounterMutationCallbackTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.concurrent.TimeUnit;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -120,21 +119,22 @@ public void testCounterMutationCallback()
// dummy mutation
TableMetadata metadata = MockSchema.newTableMetadata(KEYSPACE1, CF_COUTNER);
Mutation mutation = new Mutation(PartitionUpdate.simpleBuilder(metadata, "").build());
CounterMutation counterMutation = new CounterMutation(mutation, null);
Message<CounterMutation> msg =
Message.builder(Verb.COUNTER_MUTATION_REQ, counterMutation)
CounterMutation counterMutation = new CounterMutation(mutation, ConsistencyLevel.ANY); // CL here just for serialization, otherwise ignored
Message<CounterMutation> msg = Message.builder(Verb.COUNTER_MUTATION_REQ, counterMutation)
.withId(1)
.from(FBUtilities.getLocalAddressAndPort())
.withCreatedAt(approxTime.now())
.withExpiresAt(approxTime.now() + TimeUnit.SECONDS.toNanos(1))
.withFlag(MessageFlag.CALL_BACK_ON_FAILURE)
.withParam(TRACE_SESSION, UUID.randomUUID())
.build();
int responseSize = msg.emptyResponseBuilder().currentSize(MessagingService.current_version);

RequestSensors requestSensors = new RequestSensors();
RequestTracker.instance.set(requestSensors);

Context context = Context.from(Keyspace.open(KEYSPACE1).getMetadata().tables.get(CF_COUTNER).get());
requestSensors.registerSensor(context, Type.INTERNODE_BYTES);
requestSensors.registerSensor(context, Type.WRITE_BYTES);
requestSensors.incrementSensor(context, Type.WRITE_BYTES, COUNTER_MUTATION_BYTES); // mimic a counter mutation of size 56 bytes on the leader node
requestSensors.syncAllSensors();
Expand All @@ -149,9 +149,12 @@ public void testCounterMutationCallback()
assertThat(localSensor.getValue()).isEqualTo(COUNTER_MUTATION_BYTES);
Sensor registerSensor = SensorsRegistry.instance.getSensor(context, Type.WRITE_BYTES).get();
assertThat(registerSensor.getValue()).isEqualTo(COUNTER_MUTATION_BYTES);
localSensor = SensorsTestUtil.getThreadLocalRequestSensor(context, Type.INTERNODE_BYTES);
assertThat(localSensor.getValue()).isEqualTo(responseSize);
registerSensor = SensorsRegistry.instance.getSensor(context, Type.INTERNODE_BYTES).get();
assertThat(registerSensor.getValue()).isEqualTo(responseSize);

// verify that the custom headers carry the sensor values adjusted for the replica count
assertThat(localSensor.getValue()).isEqualTo(COUNTER_MUTATION_BYTES);
assertThat(capturedOutboundMessages).size().isEqualTo(1);
Map<String, byte[]> customParam = capturedOutboundMessages.get(0).header.customParams();
assertThat(customParam).isNotNull();
Expand All @@ -166,5 +169,15 @@ public void testCounterMutationCallback()
double actual = SensorsCustomParams.sensorValueFromBytes(v);
assertThat(actual).isEqualTo(expectedSensorValue);
});
assertThat(customParam).hasEntrySatisfying("INTERNODE_MSG_BYTES_REQUEST.Counter",
v -> {
double actual = SensorsCustomParams.sensorValueFromBytes(v);
assertThat(actual).isEqualTo(responseSize * Math.max(replicaCount, 1));
});
assertThat(customParam).hasEntrySatisfying("INTERNODE_MSG_BYTES_TABLE.Counter",
v -> {
double actual = SensorsCustomParams.sensorValueFromBytes(v);
assertThat(actual).isEqualTo(responseSize * Math.max(replicaCount, 1));
});
}
}
Loading

1 comment on commit 8aa88ca

@cassci-bot
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Build rejected: 247 NEW test failure(s) in 1 builds., Build 1: ran 13370 tests with 251 failures and 109 skipped.
Butler analysis done on ds-cassandra-pr-gate/cc-ucu-intenode-sb vs last 16 runs of ds-cassandra-build-nightly/ds-trunk.
Showing only first 13 NEW test failures
org.apache.cassandra.distributed.test.metrics.StreamingMetricsTest.testMetricsWithRepairAndStreamingToTwoNodes: test is constantly failing. No failures on upstream;
branch story: [F] vs upstream: [++++++++++++++]; [NEW]
org.apache.cassandra.distributed.test.ReadRepairTest.testGCableTombstoneResurrectionOnRangeSliceQuery: test is constantly failing. No failures on upstream;
branch story: [F] vs upstream: [++++++++++++++]; [NEW]
org.apache.cassandra.distributed.test.ShortReadProtectionTest.testDescendingOrder[0: read_cl=ALL flush=false paging=false]: test is constantly failing. No failures on upstream;
branch story: [F] vs upstream: [++++++++++++++]; [NEW]
org.apache.cassandra.distributed.test.ShortReadProtectionTest.testDeletePartitionWithStatic[0: read_cl=ALL flush=false paging=false]: test is constantly failing. No failures on upstream;
branch story: [F] vs upstream: [++++++++++++++]; [NEW]
replica_side_filtering_test.TestAllowFiltering.test_update_on_collection: test is constantly failing. No failures on upstream;
branch story: [F] vs upstream: [+++++++++++++]; [NEW]
org.apache.cassandra.distributed.test.ReplicaFilteringProtectionTest.testMissedUpdatesBelowCachingWarnThreshold: test is constantly failing. No failures on upstream;
branch story: [F] vs upstream: [++++++++++++++]; [NEW]
org.apache.cassandra.distributed.test.SimpleReadWriteTest.testRangeQuery[5: compressor=DeflateCompressor reverse=true]: test is constantly failing. No failures on upstream;
branch story: [F] vs upstream: [++++++++++++++]; [NEW]
org.apache.cassandra.distributed.test.SimpleReadWriteTest.testPartitionQuery[9: compressor=ZstdCompressor reverse=true]: test is constantly failing. No failures on upstream;
branch story: [F] vs upstream: [++++++++++++++]; [NEW]
org.apache.cassandra.distributed.test.NetstatsRepairStreamingTest.testWithCompressionEnabled: test is constantly failing. No failures on upstream;
branch story: [F] vs upstream: [++++++++++++++]; [NEW]
org.apache.cassandra.distributed.test.ShortReadProtectionTest.testAscendingOrder[7: read_cl=QUORUM flush=true paging=true]: test is constantly failing. No failures on upstream;
branch story: [F] vs upstream: [++++++++++++++]; [NEW]
org.apache.cassandra.distributed.test.OptimiseStreamsRepairTest.testBasic: test is constantly failing. No failures on upstream;
branch story: [F] vs upstream: [++++++++++++++]; [NEW]
org.apache.cassandra.distributed.test.PreviewRepairTest.testStartNonIntersectingPreviewRepair: test is constantly failing. No failures on upstream;
branch story: [F] vs upstream: [++++++++++++++]; [NEW]
org.apache.cassandra.distributed.test.ShortReadProtectionTest.testMultipleMissedRows[1: read_cl=ALL flush=false paging=true]: test is constantly failing. No failures on upstream;
branch story: [F] vs upstream: [++++++++++++++]; [NEW]
butler comparison

Please sign in to comment.