This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Support batching of MetricPoints #2011

Merged: 10 commits into grafana:master on Feb 9, 2022

Conversation

@shanson7 (Collaborator) commented Oct 27, 2021

Resolves #1002

In our staging setup, this batching reduced the cluster-wide allocation rate from 620MB/s to 270MB/s. It also reduced CPU usage by ~8%. There was also a nominal reduction in RSS usage (probably due to fewer allocations), but it is <4% and hard to call statistically significant.

Another nice benefit is that the maximum startup ingest rate went from ~1 million to ~2 million dp/s.

Checklist

  • Update docs/inputs.md when format is finalized.

shanson7 and others added 3 commits October 22, 2021 11:59
Co-authored-by: djedruszczak <93140157+djedruszczak@users.noreply.github.com>
@Dieterbe (Contributor) commented Nov 19, 2021

Cool that you're tackling this. Nice results too.

I wonder if this will throw off anything around lag measurement/priority calculation.
Specifically:

  1. does the "cost" of 1 offset need to be the same for all peers? E.g. is clustering still fair/well-behaved if some peers consume large batches and others smaller ones, or single points? Or do we expressly forbid having such different types of messages in different partitions? (that would be fine with me)
  2. Do we assume anywhere that 1 offset = 1 metric? I think we do.

Also, docs/inputs.md needs to be updated. There's a kafka section and "future formats" at the bottom.

t.Fatalf("%s", iter.Err().Error())
}

if !reflect.DeepEqual(mp, outPoints) {
Contributor

For this test and the one below, I suggest making a deep copy of the input slice before passing it into any code under test.
That way, when we compare, we know we are comparing against a slice that cannot have been mistakenly modified by the code under test.
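A minimal sketch of that pattern (illustrative only; it assumes schema.MetricPoint contains no pointer fields, so a plain copy() amounts to a deep copy):

// Hypothetical test helper: copy the expected input before handing anything
// to the code under test, then compare the output against the untouched original.
func deepCopyPoints(in []schema.MetricPoint) []schema.MetricPoint {
	out := make([]schema.MetricPoint, len(in))
	copy(out, in) // MetricPoint holds no pointers, so this copies the values themselves
	return out
}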

@@ -319,6 +319,18 @@ func (k *KafkaMdm) consumePartition(topic string, partition int32, currentOffset

func (k *KafkaMdm) handleMsg(data []byte, partition int32) {
	format, isPointMsg := msg.IsPointMsg(data)
	if isPointMsg && format == msg.FormatMetricPointArray {
Contributor

Why make it such that batches can only contain MetricPointMsg?
At least, that's what we're implying here by how we use isPointMsg.

We already use the 2nd byte to describe the type of metrics embedded within a batch. I'd like us to be able to use types that are not MetricPointMsg. For example, we could have samples that are MetricPoint-like but don't have a timestamp field, and instead use a shared timestamp for the entire batch. Obviously such a type would not make sense outside of a batch context and shouldn't be called MetricPoint.

Perhaps the different types of batches can all be expressed as "first order types", i.e. scrap the 2nd byte. 256 options ought to be enough to describe all types (including all batched versions).

Collaborator Author

We don't really have all 256 values available, though. The MetricData is in msgp format, so it will start with an arbitrary byte (not sure if it's just one without examining msgp more closely). The second byte gives us a little more confidence that it's not a fluke overlap, as we have limited value combinations that are valid.
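For illustration, a rough sketch of the two-byte check being described (assumed shape and names, not necessarily the exact code in this PR):

// data[0] identifies the batch format, data[1] the format of the embedded points.
func isMetricPointArray(data []byte) bool {
	if len(data) < 2 {
		return false
	}
	first, second := Format(data[0]), Format(data[1])
	return first == FormatMetricPointArray &&
		(second == FormatMetricPoint || second == FormatMetricPointWithoutOrg)
}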

I'd like us to be able to use types that are not MetricPointMsg. For example, we could have samples that are MetricPoint-like but don't have a timestamp field, and instead use a shared timestamp for the entire batch

This would, of course, need a custom iterator for each format anyway. It seems like that concept and this implementation aren't mutually exclusive. Maybe it would be cleaner as a decoupled implementation (via an iterator interface) that does something like:

type MetricBatchIterator interface {
	...
}

if isPointMsg {
	...
} else if isMsgBatch {
	iter := getIterator(payload)
	// iterate over the iterator
} else {
	// decode as MetricData
}
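One possible shape for such an interface, purely as a sketch (the method set is assumed, mirroring the iter.Err() usage in the test snippet above):

// MetricBatchIterator walks the points embedded in one batch message,
// regardless of how that particular batch format encodes them.
type MetricBatchIterator interface {
	Next() bool                // advance to the next point; false when exhausted or on error
	Value() schema.MetricPoint // the current decoded point
	Err() error                // non-nil if decoding failed partway through
}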

Collaborator Author

Or maybe I misunderstood this comment and the idea is that the format for a batch should be more fluid and allow multiple formats in a single batch? This is an interesting idea, as it would allow embedding a MetricData in a batch. Currently our producer needs to flush the MetricPointArray whenever it needs to send a MetricData-formatted message, and this actually limits our maximum batch size in practice (averaging 50-60 points per batch).

Contributor

What we have in master today is: for any incoming message from the mdm topic, the first byte is available to encode a Format, correct? So today, we use 5 out of 256 possible values - see https://github.com/grafana/metrictank/blob/f1e39bc1cfcbf3977d6a0fd58e97a2925eccb2f5/schema/msg/format.go

My suggestion is to use this same first byte, and only this first byte, for the batch formats we want to support.

Collaborator Author

What we have in master today is: for any incoming message from the mdm topic, the first byte is available to encode a Format, correct?

Unfortunately, no. See kafkamdm.handleMsg. It doesn't seem to use a format byte for a plain old MetricData and seems to just call UnmarshalMsg directly. This means that there are some Msgp bytes that could come in and conflict with predefined format types. I don't even see where the MetricDataArray{Json,Msgp} format types are used.

@Dieterbe (Contributor) commented Feb 4, 2022

Unfortunately, no. See kafkamdm.handleMsg. It doesn't seem to use a format byte for a plain old MetricData and seems to just call UnmarshalMsg directly. This means that there are some Msgp bytes that could come in and conflict with predefined format types.

OMG. this is ugly. (and probably my doing).
It's exactly as you say.
I have reviewed the code for all of Grafana's relevant Go tools (tsdb-gw, mt-gateway, and carbon-relay) and confirmed they generate kafka messages that are messagepack-encoded MetricData payloads without a preceding version byte. (tsdb-gw and mt-gateway do support MetricPoint, in which case they add the version bytes.) Luckily, in metric_gen.go we can easily confirm the first generated byte is 0x89, representing the start of a 9-element map in messagepack encoding (I have also tested this with a small test program to be sure).
I presume 3rd party tools such as your publishers behave the same. Could you confirm? (just in case there's an alternative way to encode the same data)
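For reference, a rough reconstruction of such a check (not the actual test program; field values are arbitrary):

package main

import (
	"fmt"

	"github.com/grafana/metrictank/schema"
)

func main() {
	md := schema.MetricData{
		Name:     "some.metric",
		Interval: 10,
		Value:    1.5,
		Time:     1234567890,
		Mtype:    "gauge",
	}
	buf, err := md.MarshalMsg(nil)
	if err != nil {
		panic(err)
	}
	// MetricData encodes as a 9-element messagepack map, whose fixmap header byte is 0x89.
	fmt.Printf("first byte: %#x\n", buf[0])
}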

This leaves us 2 options:

  1. Make 0x89 the official format code for the MetricData format, and have some special exception in decoding/encoding steps to read from/write to the full buffer, knowing we should always use 0x89 as the first byte. This will work because I doubt the messagepack format will ever introduce a breaking change here, and if they do, we can always use a different format code as needed.
  2. Switch to a real, proper format code in 3 steps:
  • Add a new format to format.go for MetricData and the proper codepath to recognize it in the first byte and decode subsequent content as MetricData. Roll out upgraded metrictank clusters.
  • Update publishers to publish any MetricData with a real preceding version byte. Roll this out.
  • Remove the legacy metrictank code that assumes MetricData for unknown formats. Roll out upgraded metrictank clusters.

I suggest option 1.

I don't even see where the MetricDataArray{Json,Msgp} format types are used.

Looks like we have some legacy/unused code to publish such messages, but I don't think metrictank still receives/can handle such messages. I can do some more spelunking into this later.

Collaborator Author

I'm OK with making 0x89 the format value for MetricData, but it seems to me that the decode step supports eliding elements, so users may be skipping (for example) Tags or Unit, or, perhaps more interesting, skipping Value as an ill-advised optimization for zero values! This would result in a map of size 6-9, and it currently decodes with default values filled in. I tested this via a quick and dirty test case. Perhaps we should block off 0x85 through 0x90 for MetricData just to be safe? I doubt we will get to that point any time soon with formats. If we do, we can always reserve a byte for Extended formats.
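A minimal sketch of what blocking off that range could look like (purely illustrative; the range simply mirrors the comment above):

// Treat any first byte in this range as a MetricData payload, so encodings
// with elided fields (messagepack maps of size 6-9) are still handled as MetricData.
func looksLikeMetricData(firstByte byte) bool {
	return firstByte >= 0x85 && firstByte <= 0x90
}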

Anyway, assuming we solve that problem, I'm interested in what batch format(s) we want. I'm already quite invested in the one we have (we are fully rolled out to prod and it works well). As I said, the idea of supporting more compact batches (e.g. identical timestamps, shared OrgId, etc.) is not mutually exclusive with the proposed format. IMO, the single-byte cost of being able to support 2 different types in one is not a big deal.

I am, however, more interested in how we can make adding new formats less intrusive to the code that is doing the decoding. Above I proposed an Iterator interface that would allow multiple batched formats to be supported together, but maybe that should be extended to include the existing single-message formats (MetricPoint, MetricData, IndexControlMsg) and possibly even a format that allows any or all of these to be in a single batch.

schema/msg/msg.go (review thread outdated and resolved)
@shanson7 (Collaborator, Author)

I wonder if this will throw off anything around lag measurement/priority calculation. Specifically:

  1. does the "cost" of 1 offset need to be the same for all peers? E.g. is clustering still fair/well-behaved if some peers consume large batches and others smaller ones, or single points? Or do we expressly forbid having such different types of messages in different partitions? (that would be fine with me)
  2. Do we assume anywhere that 1 offset = 1 metric? I think we do.

Yeah, probably. I have a TODO to start recalculating the lag priority using the kafka message timestamps. I think that should ultimately be more accurate. Unfortunately, it's not a trivial change. I think in most places we are treating an offset as an offset. IIRC, the lag is computed using the offset growth against the offset lag, so as long as the number of messages/sec is roughly consistent the priority should remain accurate. This is what I saw when doing Metrictank restarts after making this change:
[screenshot: mt_priority during restarts]

Also, docs/inputs.md needs to be updated. There's a kafka section and "future formats" at the bottom.

Will do

shanson7 and others added 2 commits November 24, 2021 13:26
Co-authored-by: Dieter Plaetinck <dieter@plaetinck.be>
@Dieterbe (Contributor)

I think in most places we are treating an offset as an offset. IIRC, the lag is computed using the offset growth against the offset lag, so as long as the number of messages/sec is roughly consistent the priority should remain accurate. This is what I saw when doing Metrictank restarts after making this change

So with a consistent workload across all peers, we know that, relative between peers, it should work the same way. But there are a couple of places where we set thresholds based on absolute priority numbers, e.g. the max-priority config option.

@shanson7 (Collaborator, Author)

I think in most places we are treating an offset as an offset. IIRC, the lag is computed using the offset growth against the offset lag, so as long as the number of messages/sec is roughly consistent the priority should remain accurate. This is what I saw when doing Metrictank restarts after making this change

So with a consistent workload across all peers, we know that, relative between peers, it should work the same way. But there are a couple of places where we set thresholds based on absolute priority numbers, e.g. the max-priority config option.

Yes, but the priority is already an (attempted) normalization. It's a rough calculation using the rate of incoming kafka messages and lag to estimate how many seconds behind the instance is.
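In other words, roughly (a sketch of the normalization being described, not the actual metrictank code):

// priority approximates how many seconds behind this instance is:
// the remaining offset lag divided by the observed rate of incoming kafka messages.
func priority(offsetLag int64, msgsPerSec float64) int64 {
	if msgsPerSec <= 0 {
		return 0 // assumption: treat an idle partition as caught up
	}
	return int64(float64(offsetLag) / msgsPerSec)
}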

@Dieterbe (Contributor) commented Feb 8, 2022

Yes, but the priority is already an (attempted) normalization. It's a rough calculation using the rate of incoming kafka messages and lag to estimate how many seconds behind the instance is.

Good point. That is correct. You've been running it in prod for a while now, so I stopped worrying about an in-depth review wrt lag/priority/ready state etc.

I gave your PR another look. I think your 2-level-deep versioning scheme is fine.
Sounds like we agree that for the MetricData mess, the minimal amount of effort is fine (e.g. documenting it), but that is unrelated to this PR. I will create a new ticket for that. (edit: #2028)

As usual, nice work. Just the docs/inputs.md update left then. Perhaps also something like this:

diff --git a/schema/msg/format.go b/schema/msg/format.go
index 499064b0..69e3fba8 100644
--- a/schema/msg/format.go
+++ b/schema/msg/format.go
@@ -4,6 +4,24 @@ package msg
 
 type Format uint8
 
+// incoming messages in the mdm topic come in 3 flavors, and are mostly defined by the below constants
+
+// 1) MetricPoint observations and array variants
+// <FormatMetricPointWithoutOrg><28 more bytes>
+// <FormatMetricPoint><32 more bytes>
+// <FormatMetricPointArray><FormatMetricPoint|FormatMetricPointWithoutOrg><28 or more bytes>
+//
+// 2) index control messages
+//
+// <FormatIndexControlMessage><any other bytes>
+
+// 3) MetricData
+//
+// anything that has a first byte not matching any of the predefined constants.
+// in practice, the first byte of a MetricData will be 0x89,
+// though experts writing their own publishers and encoders may have found ways to
+// use bytes 0x85 through 0x90 as the first byte; this seems unlikely and is discouraged
+// see https://github.com/grafana/metrictank/issues/2028
+
 // identifier of message format
 const (
        FormatMetricDataArrayJson Format = iota

@Dieterbe (Contributor) left a comment

Exciting new feature! 💯

@Dieterbe Dieterbe merged commit 70a479a into grafana:master Feb 9, 2022
Successfully merging this pull request may close these issues: Metricpoint batch format.