rework msgpack buffers #2300

Merged
merged 5 commits into from
Jan 28, 2021

Member

@richardstartin richardstartin commented Jan 14, 2021

The main change here is that buffering is abstracted away from the format writers, so that a format writer only needs to be aware of the format. There is a new abstraction called StreamingBuffer with two flavours:

  • FlushingBuffer - fixed size, flushes when full; this is how we have been sending traces for months. Traces are awkward because they are large and the format is length-prefixed: we need to tell the agent how many traces to expect before sending them, which forbids streaming.
  • GrowableBuffer - resizes its buffer when necessary; it should only be used when there is an implicit limit on its growth in practice, e.g. for the serialized string table in the v0.5 trace format.
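
As a rough illustration of the two flavours, here is a minimal sketch. All names and signatures (`SketchBuffer`, `SketchFlushingBuffer`, `SketchGrowableBuffer`, the flush callback) are hypothetical and are not the API introduced by this PR; the doubling growth policy is also an assumption.

```java
import java.nio.ByteBuffer;
import java.util.function.Consumer;

// Hypothetical sketch: illustrative names only, not the PR's StreamingBuffer API.
interface SketchBuffer {
  void put(byte[] bytes);
}

/** Fixed capacity: hands the filled bytes to a callback when the next write would not fit. */
final class SketchFlushingBuffer implements SketchBuffer {
  private final ByteBuffer buffer;
  private final Consumer<ByteBuffer> onFlush;

  SketchFlushingBuffer(int capacity, Consumer<ByteBuffer> onFlush) {
    this.buffer = ByteBuffer.allocate(capacity);
    this.onFlush = onFlush;
  }

  @Override
  public void put(byte[] bytes) {
    if (bytes.length > buffer.capacity()) {
      throw new IllegalArgumentException("message larger than buffer");
    }
    if (bytes.length > buffer.remaining()) {
      flush(); // make room by emitting what has been buffered so far
    }
    buffer.put(bytes);
  }

  void flush() {
    buffer.flip();
    if (buffer.hasRemaining()) {
      onFlush.accept(buffer);
    }
    buffer.clear();
  }
}

/** Grows on demand: only safe when something else bounds how much is written. */
final class SketchGrowableBuffer implements SketchBuffer {
  private ByteBuffer buffer;

  SketchGrowableBuffer(int initialCapacity) {
    this.buffer = ByteBuffer.allocate(initialCapacity);
  }

  @Override
  public void put(byte[] bytes) {
    if (bytes.length > buffer.remaining()) {
      int required = buffer.position() + bytes.length;
      // Assumed policy: at least double, and always enough for this write.
      ByteBuffer bigger = ByteBuffer.allocate(Math.max(buffer.capacity() * 2, required));
      buffer.flip();
      bigger.put(buffer); // copy the bytes written so far
      buffer = bigger;
    }
    buffer.put(bytes);
  }

  int size() {
    return buffer.position();
  }

  int capacity() {
    return buffer.capacity();
  }
}
```

The point of the split is that the code producing bytes calls `put` without knowing whether the bytes will be flushed in chunks or accumulated.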

@richardstartin richardstartin added the "tag: do not merge" label Jan 14, 2021
@@ -25,4 +29,24 @@ int traceCount() {
abstract void writeTo(WritableByteChannel channel) throws IOException;

abstract RequestBody toRequest();

Member Author

the bleeding of msgpack into here seems a reasonable tradeoff for not needing to store the header in the buffer itself, which makes MsgPackWriter more generic.

Contributor

👍

Contributor

Would it make more sense to have these as static methods on a msgpack specific class?

Member Author

this effectively is a msgpack specific class and will be for the foreseeable future

return allocationFreeUTF8Encode(s);
}

private int allocationFreeUTF8Encode(CharSequence s) {
Member Author

UTF8BytesString obviated the need for these sorts of shenanigans, which are actually a lot slower than calling getBytes() because every charAt() and every put(byte) is bounds-checked. The more we use UTF8BytesString, the less we'll allocate in serialisation.
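
To make the idea concrete, here is a minimal sketch of a string wrapper that encodes once and then copies the cached bytes in bulk. `CachedUtf8` is a hypothetical stand-in written for illustration; it is not the real `UTF8BytesString`, and its lazy-caching behaviour is an assumption.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the idea; not the real UTF8BytesString.
final class CachedUtf8 {
  private final String string;
  private byte[] utf8; // encoded lazily, then reused for every serialisation

  CachedUtf8(String string) {
    this.string = string;
  }

  int encodedLength() {
    return bytes().length;
  }

  /** Bulk copy of the cached encoding: no per-character charAt() or put(byte) calls. */
  void transferTo(ByteBuffer buffer) {
    buffer.put(bytes());
  }

  private byte[] bytes() {
    byte[] b = utf8;
    if (b == null) {
      utf8 = b = string.getBytes(StandardCharsets.UTF_8);
    }
    return b;
  }
}
```

Serialising the same value repeatedly then allocates only on first use, at the cost of keeping the encoded bytes alive alongside the string.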

@@ -1,150 +1,3 @@
package datadog.trace.core.serialization;

import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
Member Author

This existed to share flushing logic between the abandoned protobuf writer and the msgpack writer. We won't be trying protobuf again, and the flushing has been moved into the buffer implementation.

@@ -391,9 +392,8 @@ class DDAgentApiTest extends DDSpecification {
}

Payload prepareTraces(String agentVersion, List<List<DDSpan>> traces) {
Member Author

This sort of thing is in too many places and made this change too difficult. I've patched up all of these, but this needs refactoring.

@@ -200,7 +201,7 @@ class DDAgentWriterCombinedTest extends DDSpecification {
when:
def mapper = agentVersion.equals("v0.5/traces") ? new TraceMapperV0_5() : new TraceMapperV0_4()
int traceSize = calculateSize(minimalTrace, mapper)
int maxedPayloadTraceCount = ((int) ((mapper.messageBufferSize() - 5) / traceSize))
Member Author

5 was the space reserved for a header.

Contributor

why no longer needed?

Member Author

because there is no header in the buffer any more, it's added when writing the bytes out to the network
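
A minimal sketch of writing the header at send time rather than reserving room for it in the buffer. It assumes the length prefix is a msgpack array32 header (marker byte 0xDD followed by a 4-byte big-endian count), which would account for the 5 bytes previously reserved; that choice of header is an assumption, and `HeaderOnWriteSketch` and its methods are hypothetical names, not this PR's code.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;

// Hypothetical helper for illustration; not the PR's Payload implementation.
final class HeaderOnWriteSketch {
  static final byte ARRAY32 = (byte) 0xDD; // msgpack array32 marker

  /** Builds the 5-byte header: marker plus big-endian element count. */
  static ByteBuffer array32Header(int count) {
    ByteBuffer header = ByteBuffer.allocate(5); // ByteBuffer is big-endian by default
    header.put(ARRAY32).putInt(count);
    header.flip();
    return header;
  }

  /** Writes the header first, then the body, which never had to hold the header itself. */
  static void writeTo(WritableByteChannel channel, int traceCount, ByteBuffer body)
      throws IOException {
    ByteBuffer header = array32Header(traceCount);
    while (header.hasRemaining()) {
      channel.write(header);
    }
    while (body.hasRemaining()) {
      channel.write(body);
    }
  }

  /** Convenience for trying it out: collects the output in memory. */
  static byte[] toBytes(int traceCount, byte[] body) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try (WritableByteChannel channel = Channels.newChannel(out)) {
      writeTo(channel, traceCount, ByteBuffer.wrap(body));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return out.toByteArray();
  }
}
```

Because the count is only needed when the bytes leave the process, the whole buffer capacity is available for trace data, which is why the test no longer subtracts 5.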

@@ -204,6 +205,13 @@ class TraceMapperV04PayloadTest extends DDSpecification {

@Override
int write(ByteBuffer src) {
if (captured.remaining() < src.remaining()) {
Member Author

letting this grow allows the test to explore more cases, rather than rejecting the output, but the increase in heap usage needs to be monitored.

@@ -32,49 +33,6 @@ import static org.msgpack.core.MessageFormat.UINT8

class TraceMapperV05PayloadTest extends DDSpecification {


def "dictionary overflow causes a flush"() {
Member Author

The dictionary doesn't flush any more. Keeping two isolated callbacks in sync, each triggered when a fixed capacity was reached, made this code extremely hard to reason about.

@@ -98,7 +56,9 @@ class TraceMapperV05PayloadTest extends DDSpecification {
UUID.randomUUID().toString(),
false))
int traceSize = calculateSize(repeatedTrace)
int tracesRequiredToOverflowBody = (traceMapper.messageBufferSize() + traceSize - 1) / traceSize
// 30KB body
Member Author

no need to use MBs of data to check there's a flush when the message buffer is full
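
The expression in the hunk above is integer ceiling division. A small sketch with made-up sizes (the 30720 and 100 below are illustrative values, not the test's real numbers, and `CeilDivSketch` is a hypothetical name):

```java
// Hypothetical helper mirroring the arithmetic in the test, for positive inputs.
final class CeilDivSketch {
  /** Smallest n such that n * traceSize >= bufferSize. */
  static int tracesRequiredToFill(int bufferSize, int traceSize) {
    return (bufferSize + traceSize - 1) / traceSize;
  }
}
```

With a 30720-byte buffer and 100-byte traces this gives 308, since 307 traces occupy only 30700 bytes; a small buffer therefore needs only a few hundred small traces to force a flush.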

@@ -53,6 +53,12 @@ public void transferTo(ByteBuffer buffer) {
buffer.put(utf8Bytes);
}

/** Writes the UTF8 encoding of the wrapped {@code String}. */
public byte[] getUtf8Bytes() {
Member Author

Ideally a naked reference to this byte[] should be avoided

@richardstartin richardstartin force-pushed the rgs/streaming-buffers branch 2 times, most recently from 4273061 to 19afa9c January 15, 2021 00:06
@richardstartin richardstartin marked this pull request as ready for review January 27, 2021 10:29
@richardstartin richardstartin requested a review from a team as a code owner January 27, 2021 10:29
@richardstartin richardstartin changed the title from "streaming buffers" to "rework msgpack buffers" Jan 27, 2021

public SerializingMetricWriter(WellKnownTags wellKnownTags, Sink sink) {
this.wellKnownTags = wellKnownTags;
this.writer = new MsgPackWriter(sink, ByteBuffer.allocate(1 << 20), EnumSet.of(SINGLE_MESSAGE));
this.buffer = new GrowableBuffer(512 << 10);
Contributor

In the doc for GrowableBuffer you say only use if bounded... what is it that makes metrics limited in size?

Member Author

the metrics points are stored in a bounded LRU cache
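
For readers unfamiliar with the pattern, a bounded LRU cache can be sketched with the JDK's `LinkedHashMap` in access order. This is only an illustration of why the number of entries, and so the serialized size, has a ceiling; `BoundedLruSketch` is a hypothetical class and the tracer's actual cache implementation is not shown in this thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch; not the tracer's actual metrics cache.
final class BoundedLruSketch<K, V> extends LinkedHashMap<K, V> {
  private static final long serialVersionUID = 1L;
  private final int maxEntries;

  BoundedLruSketch(int maxEntries) {
    super(16, 0.75f, true); // true = iterate in access order, least recently used first
    this.maxEntries = maxEntries;
  }

  @Override
  protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
    // Called after each insertion: evict once the bound is exceeded.
    return size() > maxEntries;
  }
}
```

With at most `maxEntries` points to serialize per report, a growable buffer stops growing once it has been sized for the largest report.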

@richardstartin richardstartin merged commit 2dbce2f into master Jan 28, 2021
@richardstartin richardstartin deleted the rgs/streaming-buffers branch January 28, 2021 16:55
@github-actions github-actions bot added this to the 0.73.0 milestone Jan 28, 2021