Skip to content

Commit

Permalink
* Use only the payload size as internode bytes.
Browse files Browse the repository at this point in the history
* Cache the payload size where possible.
  • Loading branch information
sbtourist committed May 23, 2024
1 parent 0d57897 commit a14504f
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ private static void addSensorsToResponse(Message.Builder<NoPayload> response, Mu
// Add internode bytes sensors to the response after updating each per-table sensor with the current response
// message size: this is missing the sensor values, but it's a good enough approximation
Collection<Sensor> requestSensors = RequestTracker.instance.get().getSensors(Type.INTERNODE_BYTES);
int perSensorSize = response.currentSize(MessagingService.current_version) / tables;
int perSensorSize = response.currentPayloadSize(MessagingService.current_version) / tables;
requestSensors.forEach(sensor -> RequestTracker.instance.get().incrementSensor(sensor.getContext(), sensor.getType(), perSensorSize));
RequestTracker.instance.get().syncAllSensors();
Function<String, String> requestParam = SensorsCustomParams::encodeTableInInternodeBytesRequestParam;
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/db/MutationVerbHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ private void addSensorsToResponse(Message.Builder<NoPayload> response, Mutation

// Add internode bytes sensors to the response after updating each per-table sensor with the current response
// message size: this is missing the sensor values, but it's a good enough approximation
int perSensorSize = response.currentSize(MessagingService.current_version) / tables;
int perSensorSize = response.currentPayloadSize(MessagingService.current_version) / tables;
requestSensors = RequestTracker.instance.get().getSensors(Type.INTERNODE_BYTES);
requestSensors.forEach(sensor -> RequestTracker.instance.get().incrementSensor(sensor.getContext(), sensor.getType(), perSensorSize));
RequestTracker.instance.get().syncAllSensors();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void doVerb(Message<ReadCommand> message)
}

Message.Builder<ReadResponse> reply = message.responseWithBuilder(response);
int size = reply.currentSize(MessagingService.current_version);
int size = reply.currentPayloadSize(MessagingService.current_version);
RequestTracker.instance.get().incrementSensor(context, Type.INTERNODE_BYTES, size);
RequestTracker.instance.get().syncAllSensors();

Expand Down
35 changes: 31 additions & 4 deletions src/java/org/apache/cassandra/net/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,8 @@ public static class Builder<T>

private boolean hasId;

private Message cachedMessage;

private Builder()
{
}
Expand Down Expand Up @@ -607,10 +609,12 @@ public Message<T> build()
return doBuild(hasId ? id : nextId());
}

public int currentSize(int version)
public int currentPayloadSize(int version)
{
// use dummy id just for the sake of computing the serialized size
return doBuild(0).serializedSize(version);
Message<T> tmp = doBuild(0);
cachedMessage = tmp;
return tmp.payloadSize(version);
}

private Message<T> doBuild(long id)
Expand All @@ -622,7 +626,11 @@ private Message<T> doBuild(long id)
if (payload == null)
throw new IllegalArgumentException();

return new Message<>(new Header(id, verb, from, createdAtNanos, expiresAtNanos, flags, params), payload);
Message<T> tmp = new Message<>(new Header(id, verb, from, createdAtNanos, expiresAtNanos, flags, params), payload);
if (cachedMessage != null)
tmp.maybeCachePayloadSize(cachedMessage);

return tmp;
}
}

Expand Down Expand Up @@ -1473,7 +1481,7 @@ public int serializedSize(int version)
private int payloadSizeSG10 = -1;
private int payloadSizeDSE68 = -1;

private int payloadSize(int version)
protected int payloadSize(int version)
{
switch (version)
{
Expand Down Expand Up @@ -1506,6 +1514,25 @@ private int payloadSize(int version)
}
}

protected void maybeCachePayloadSize(Message other)
{
if (payload == other.payload)
{
if (other.payloadSize30 > 0)
payloadSize30 = other.payloadSize30;
if (other.payloadSize3014 > 0)
payloadSize3014 = other.payloadSize3014;
if (other.payloadSize40 > 0)
payloadSize40 = other.payloadSize40;
if (other.payloadSize41 > 0)
payloadSize41 = other.payloadSize41;
if (other.payloadSizeSG10 > 0)
payloadSizeSG10 = other.payloadSizeSG10;
if (other.payloadSizeDSE68 > 0)
payloadSizeDSE68 = other.payloadSizeDSE68;
}
}

static class OversizedMessageException extends RuntimeException
{
OversizedMessageException(int size)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public void testCounterMutationCallback()
.withFlag(MessageFlag.CALL_BACK_ON_FAILURE)
.withParam(TRACE_SESSION, UUID.randomUUID())
.build();
int responseSize = msg.emptyResponseBuilder().currentSize(MessagingService.current_version);
int responseSize = msg.emptyResponseBuilder().currentPayloadSize(MessagingService.current_version);

RequestSensors requestSensors = new RequestSensors();
RequestTracker.instance.set(requestSensors);
Expand Down

1 comment on commit a14504f

@cassci-bot
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Build rejected: 2 NEW test failure(s) in 4 builds., Build 4: ran 13370 tests with 2 failures and 109 skipped.
Butler analysis done on ds-cassandra-pr-gate/cc-ucu-intenode-sb vs last 16 runs of ds-cassandra-build-nightly/ds-trunk.
org.apache.cassandra.distributed.test.UnableToParseClientMessageTest.badHeader[version=5/v5]: test failed in the recent build. No failures on upstream;
branch story: [F+++] vs upstream: [+++++++++++++++]; [NEW]
org.apache.cassandra.distributed.test.SSTableIdGenerationTest.testSnapshot: test looks flaky. No failures on upstream;
branch story: [+F++] vs upstream: [+++++++++++++++]; [NEW]
butler comparison

Please sign in to comment.