This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

[WIP] add partitioner keyBySeriesWithTags #1146

Closed
wants to merge 12 commits

Conversation

@replay (Contributor) commented Nov 22, 2018

This is for: #1123

Still doing some testing

@replay changed the title [WIP] add partitioner bySeriesWithTags → [WIP] add partitioner keyBySeriesWithTags Nov 22, 2018
@replay (Contributor, Author) commented Nov 22, 2018

I've done testing like this:

  • Modified the docker env docker-cluster to add a tsdb-gw to it, enabled tags and used the keyBySeriesWithTags partitioner
  • Brought it up
  • Fed it with fakemetrics data with tags
  • Browsed the data in grafana, looked OK (name auto complete, tag auto complete, querying of data by name, querying by tag)
  • Changed the fakemetrics settings to feed it metrics without tags
  • Browsed the data in grafana again, still looked as expected

After discussion with Dieter I'm now going to add jump consistent hashing to the new partitioner, because that way we should hopefully reach a more even distribution across the partitions. I'm going to test that too once it's done.

@replay (Contributor, Author) commented Nov 22, 2018

@Dieterbe I've been reading more through how the partitioning code works:

Currently our "Partitioner" in MT doesn't actually do the partitioning itself; it only generates a key which is then used as input for Sarama's default partitioner (HashPartitioner): https://github.com/Shopify/sarama/blob/master/partitioner.go#L162

The Sarama default partitioner uses FNV-1a (32 bit) for hashing. We could probably run the key through jump before passing it to the Sarama HashPartitioner, but I'm not sure that would really help: I haven't read through the implementations of FNV and Jump, so I don't know how the distribution would be affected by combining the two. If we wanted to do this properly, we should build a Jump partitioner and pass it to Sarama via its configuration struct, but that's out of scope for this PR: https://github.com/Shopify/sarama/blob/master/config.go#L126
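
For reference, jump consistent hashing (Lamping & Veach, "A Fast, Minimal Memory, Consistent Hash Algorithm") is tiny. A minimal Go sketch; the series key would first have to be reduced to a uint64, e.g. with FNV-64a (the example key is hypothetical):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// jumpHash maps a 64-bit key to one of numBuckets buckets, moving only
// ~1/n of the keys when the bucket count grows from n-1 to n.
func jumpHash(key uint64, numBuckets int32) int32 {
	var b, j int64 = -1, 0
	for j < int64(numBuckets) {
		b = j
		key = key*2862933555777941757 + 1
		j = int64(float64(b+1) * (float64(int64(1)<<31) / float64((key>>33)+1)))
	}
	return int32(b)
}

func main() {
	h := fnv.New64a()
	h.Write([]byte("some.id.of.a.metric.1;tag1=val1")) // hypothetical key
	fmt.Println(jumpHash(h.Sum64(), 32))               // partition in [0,32)
}
```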

@woodsaj (Member) commented Nov 22, 2018

partitioner.NewKafka() is there to provide a partitioner for tools that don't use kafka, e.g. mt-index-migrate.

We could use our partitioner everywhere by simply changing the sarama config to use the ManualPartitioner and setting the partition ID in the kafka message. This is what notifierKafka does, as it already knows the partition ID to use.
https://github.com/grafana/metrictank/blob/master/mdata/notifierKafka/cfg.go#L80
https://github.com/grafana/metrictank/blob/master/mdata/notifierKafka/notifierKafka.go#L237

We actually want to do this anyway, as it means we then don't need to include the key field in the kafka messages, which will save a lot of bytes.
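
A sketch of what that could look like with sarama (function names and the package are hypothetical): the producer is configured with the ManualPartitioner, which routes each message to whatever msg.Partition says, so no key needs to be serialized into the message.

```go
package kafkapub // hypothetical package

import "github.com/Shopify/sarama"

// newProducer returns a producer that honors msg.Partition as-is.
func newProducer(brokers []string) (sarama.SyncProducer, error) {
	config := sarama.NewConfig()
	config.Producer.Partitioner = sarama.NewManualPartitioner
	config.Producer.Return.Successes = true // required by SyncProducer
	return sarama.NewSyncProducer(brokers, config)
}

// sendToPartition publishes data to a partition we computed ourselves,
// without setting a Key on the message.
func sendToPartition(p sarama.SyncProducer, topic string, partition int32, data []byte) error {
	_, _, err := p.SendMessage(&sarama.ProducerMessage{
		Topic:     topic,
		Partition: partition,
		Value:     sarama.ByteEncoder(data),
	})
	return err
}
```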

@Dieterbe (Contributor) commented Nov 22, 2018

Before switching to jumphash let's see if we actually observe uneven distributions. That's easy to check via mt-index-cat, both in docker-compose and in our prod or ops instances.

@replay (Contributor, Author) commented Nov 22, 2018

If we change the partitioner to ManualPartitioner then we'll need to make sure there won't be any problems when TSDB gets updated, because metrics would get assigned different partitions unless we reimplement Sarama's default partitioner. That should probably be fine, but it will require more testing.

@replay (Contributor, Author) commented Nov 22, 2018

I just found something interesting when I wanted to test the distribution:

Ran fakemetrics, with tags, generating 10k series, feeding into a tsdb with keyBySeriesWithTags.
Then I wanted to see the distribution:

```
/ #  mt-index-cat  -from 60min cass -hosts cassandra:9042 '{{.Partition }}\n' | sort | uniq -c
2018-11-22 20:18:36.251 [INFO] cassandra-idx: ensuring that keyspace metrictank exist.
2018-11-22 20:18:36.251 [INFO] cassandra-idx: ensuring that table metric_idx exist.
     83 0
   3104 1
   1851 3
   1851 5
   3194 7
```

Note that the 4 counts which are assigned to odd partition IDs add up to exactly 10k, so I'm assuming those are the ones generated by fakemetrics.

So I was confused about why all the partitions assigned to the fakemetrics metrics are odd numbers and checked how the distribution looks in our ops cluster. There the distribution looks better:

```
/tmp #  mt-index-cat  -from 60min cass -hosts cassandra:9042 -keyspace mt_12574_ops '{{.Partition }}\n'  > /tmp/index
2018-11-22 20:32:13.782 [INFO] cassandra-idx: ensuring that keyspace mt_12574_ops exist.
2018-11-22 20:32:13.783 [INFO] cassandra-idx: ensuring that table metric_idx exist.
/tmp # cat /tmp/index | sort | uniq -c
  29719 0
  29354 1
  29306 10
  29723 11
  29622 12
  29539 13
  29318 14
  29069 15
  29419 16
  29328 17
  29441 18
  29442 19
  29295 2
  29496 20
  29387 21
  29431 22
  29605 23
  28278 24
  28142 25
  28314 26
  41760 27
  28230 28
  28490 29
  29620 3
  28439 30
  28272 31
  29318 4
  29485 5
  29431 6
  29092 7
  29641 8
  29023 9
```

I still can't explain why all the fakemetrics-generated metrics end up with odd partition IDs, but I can reproduce it. This does the same as sarama's default partitioner and results in the same numbers (map[+3:+1851 +1:+3104 +7:+3194 +5:+1851]):
https://play.golang.org/p/imHecmzobwx

When playing with the metric names in that play.golang link it's obvious that patterns emerge, e.g. if the tags get removed and only the metric format `some.id.of.a.metric.%d` is used, then the distribution is better:

map[+6:+1258 +3:+1263 +4:+1248 +1:+1247 +2:+1242 +7:+1252 +0:+1252 +5:+1238]

I don't know why this is happening, but it looks like under certain circumstances the distribution of the default partitioner is not great.
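
For reference, this is roughly what that playground code does: a sketch of sarama's HashPartitioner logic (FNV-1a 32 bit, modulo the partition count), with a made-up key shape:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partition mimics sarama's HashPartitioner: hash the key with FNV-1a
// (32 bit), take it modulo numPartitions, and flip negatives positive.
func partition(key []byte, numPartitions int32) int32 {
	h := fnv.New32a()
	h.Write(key)
	p := int32(h.Sum32()) % numPartitions
	if p < 0 {
		p = -p
	}
	return p
}

func main() {
	counts := make(map[int32]int)
	for i := 0; i < 10000; i++ {
		// made-up key shape; the observed skew depends heavily on it
		key := fmt.Sprintf("some.id.of.a.metric.%d;tag1=val1;tag2=val2", i)
		counts[partition([]byte(key), 8)]++
	}
	fmt.Println(counts)
}
```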

@woodsaj (Member) commented Nov 23, 2018

> we'll need to make sure there's not going to be any problems when TSDB gets updated, because metrics would get different partitions assigned unless we reimplement Sarama's default partitioner.

That is exactly what https://github.com/grafana/metrictank/blob/master/cluster/partitioner/partitioner.go#L19 gives you.

github.com/grafana/metrictank/cluster/partitioner.Kafka actually uses the sarama code directly.
https://github.com/grafana/metrictank/blob/master/cluster/partitioner/partitioner.go#L28

@replay (Contributor, Author) commented Nov 23, 2018

I'm doing another round of tests. There are mainly two things that I need to check for:

  1. We want a good distribution of metrics among the partitions when the new partitioning scheme bySeriesWithTags is chosen
  2. If another partitioning scheme has been chosen, we want to be sure that the partition assignments of metrics stay the same as before this PR

Regarding point 1) I generated 10k fakemetrics again and pushed them into tsdb-gw with the bySeriesWithTags partitioning scheme. The distribution looks very good:

```
/ #  mt-index-cat  -from 60min cass -hosts cassandra:9042 '{{.Partition }}\n' | sort | uniq -c
2018-11-23 16:20:18.676 [INFO] cassandra-idx: ensuring that keyspace metrictank exist.
2018-11-23 16:20:18.677 [INFO] cassandra-idx: ensuring that table metric_idx exist.
   1326 0
   1241 1
   1241 2
   1234 3
   1278 4
   1243 5
   1286 6
   1234 7
```

Regarding point 2) I generated the same 10k fakemetrics 4 times: twice with the master branch (partitioning byOrg and bySeries) and twice with my working branch (same two schemes). Each time I dumped the index into a file in the format <name> <partition>, so I can verify that the resulting partition assignments are identical:

```
mst@mst-nb1:~/index_distribution$ wc -l *
  10083 index_master_byOrg
  10083 index_master_bySeries
  10083 index_working_branch_byOrg
  10083 index_working_branch_bySeries
  40332 total
mst@mst-nb1:~/index_distribution$ for file in *; do md5=$(cat $file | sort | md5sum); echo "$md5 $file"; done | sort
0ec1ef8f0d021b968445dbc152f30e1f  - index_master_byOrg
0ec1ef8f0d021b968445dbc152f30e1f  - index_working_branch_byOrg
8003a2490e11f0cb7aee53d29fa663a7  - index_master_bySeries
8003a2490e11f0cb7aee53d29fa663a7  - index_working_branch_bySeries
```

@replay changed the title [WIP] add partitioner keyBySeriesWithTags → add partitioner keyBySeriesWithTags Nov 23, 2018
@replay (Contributor, Author) commented Nov 23, 2018

If this PR gets accepted I'll create a corresponding PR to github.com/raintank/schema before merging, so we can keep everything in sync.

```
@@ -81,6 +85,23 @@ func (m *MetricData) KeyBySeries(b []byte) []byte {
	return b
}

func (m *MetricData) KeyBySeriesWithTags(b []byte) []byte {
	nameWithTagsBuffer := bytes.NewBuffer(b[:0])
```
Contributor

it shouldn't be up to this function to modify pre-existing data in the slice.
if you want to discard previous data (we generally do), it's up to the caller to reset len back to 0.
this function should just append data behind pre-existing data, like the other functions (above) do.

for some examples of this practice in the standard library, see
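
(A sketch of the append-only style being asked for, assuming the Name string and Tags []string fields of MetricData, and ignoring the tag sorting/validation a real implementation would need:)

```go
// KeyBySeriesWithTags appends "name;tag1;tag2;..." behind whatever data
// is already in b; discarding old data is left to the caller (b[:0]).
func (m *MetricData) KeyBySeriesWithTags(b []byte) []byte {
	b = append(b, m.Name...)
	for _, tag := range m.Tags {
		b = append(b, ';')
		b = append(b, tag...)
	}
	return b
}
```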

Contributor

i just checked all the call sites, they already honor this pattern (by just passing in nil slices)

"testing"
)

func TestNameWithTagFromMetricData(t *testing.T) {
Contributor

i think we are probably doing a bit too much (in general, not just here) testing of particular implementations, rather than behavior.
that said i don't have a concrete suggestion, so ...

Contributor Author

Usually my reason to add tests like this one is just to verify that my implementation actually works. They wouldn't necessarily need to be committed; I could just not commit them, but then somebody who wants to modify the implementation would have to test it again independently.

```go
func (m *MetricDefinition) KeyBySeriesWithTags(b []byte) []byte {
	return []byte(m.NameWithTags())
}
```

Contributor

probably better to follow the same style as KeyBySeries and use the given input slice.
note that `b = append(b, []byte(foo)...)` has recently been optimized. It doesn't allocate anymore:
https://go-review.googlesource.com/c/go/+/109517

here's the difference in action:

```
~/g/s/g/D/s/append ❯❯❯ cat append_test.go
package main

import "testing"

var a []byte
var nameWithTags = "hello"

func NameWithTags() string {
	return nameWithTags
}

func Benchmark1(b *testing.B) {
	for i := 0; i < b.N; i++ {
		a = []byte(NameWithTags())
	}
	b.Log(len(a))
}

func Benchmark2(b *testing.B) {
	for i := 0; i < b.N; i++ {
		a = a[:0]
		a = append(a, []byte(NameWithTags())...)
	}
	b.Log(len(a))
}
~/g/s/g/D/s/append ❯❯❯ go test -bench . -benchmem -v .
goos: linux
goarch: amd64
pkg: github.com/Dieterbe/sandbox/append
Benchmark1-8   	50000000	        29.1 ns/op	       8 B/op	       1 allocs/op
--- BENCH: Benchmark1-8
    append_test.go:16: 5
    append_test.go:16: 5
    append_test.go:16: 5
    append_test.go:16: 5
    append_test.go:16: 5
Benchmark2-8   	100000000	        13.1 ns/op	       0 B/op	       0 allocs/op
--- BENCH: Benchmark2-8
    append_test.go:24: 5
    append_test.go:24: 5
    append_test.go:24: 5
    append_test.go:24: 5
    append_test.go:24: 5
PASS
ok  	github.com/Dieterbe/sandbox/append	2.811s
```

of course, since currently our callers don't actually recycle slices, we won't see a real benefit, but i'd rather apply the best practice here now so we can reap the benefit when we do start recycling.
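
(Applied here, the suggested style would be something like:)

```go
func (m *MetricDefinition) KeyBySeriesWithTags(b []byte) []byte {
	return append(b, m.NameWithTags()...)
}
```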

@Dieterbe (Contributor) commented Nov 29, 2018

it's worth pondering whether it makes sense to just use the Id field as the key.
after all, the Id is generated from name + tags + unit, mtype and interval, and is (almost?) always already set on MetricData and MetricDefinition values, so we wouldn't have the code, nor the runtime overhead, of computing something similar when we want to hash.

so the question boils down to: how useful is it to keep series hashed to the same partition when only their interval / mtype / unit changes? I think it may be useful in the future, if we ever get to the point of introducing optimizations (e.g. merging series together when the unit changes) which would benefit from the data locality. and also we need them together for pruning to work properly.

case "bySeriesWithTags":
return &Kafka{
PartitionBy: partitionBy,
Partitioner: &jumpPartitioner{},
Contributor

can we please stop using pointers when there's no need for them.
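
(i.e., assuming jumpPartitioner implements the partitioner interface with value receivers, the value form would simply be:)

```go
Partitioner: jumpPartitioner{},
```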

@woodsaj (Member) commented Nov 29, 2018

If the id field is used, changing the interval of a series would no longer work properly, as the metricDefs for a single series name could be on different mt instances. I think queries might still work, but loading/pruning would break, as we need to know all of the metricDefs for a series name to know whether a single metricDef is too old or not.

```
// so we keep adding slices of 8 bytes to the jump key as uint64 values
// if the result wraps around the uint64 boundary it doesn't matter because
// this only needs to be fast and consistent
key = append(key, make([]byte, 8-len(key)%8)...)
```
Contributor

we can probably do this logic without having to allocate here.
I think i saw something similar in the bstream code. i'll give it a shot
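
(One allocation-free approach, sketched under the assumption that the surrounding code sums the key as uint64 chunks: read full 8-byte words directly and fold the 0-7 trailing bytes into one last word, which behaves as if the key were zero-padded:)

```go
package partitioner // hypothetical package name

import "encoding/binary"

// chunkSum adds up the key as little-endian uint64 chunks without
// padding the slice; wrap-around on overflow is fine for hashing.
func chunkSum(key []byte) uint64 {
	var sum uint64
	for len(key) >= 8 {
		sum += binary.LittleEndian.Uint64(key[:8])
		key = key[8:]
	}
	var tail uint64
	for i := 0; i < len(key); i++ {
		tail |= uint64(key[i]) << (8 * uint(i))
	}
	return sum + tail
}
```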

@Dieterbe changed the title add partitioner keyBySeriesWithTags → [WIP] add partitioner keyBySeriesWithTags Dec 27, 2018
Dieterbe added a commit to raintank/schema that referenced this pull request Feb 11, 2019
This reverts commit 2c7af35.
This was too hastily merged.
See grafana/metrictank#1146 for continuation
@Dieterbe (Contributor) commented

closing in favor of #1282

@Dieterbe closed this Apr 12, 2019
@Dieterbe deleted the partition_with_tags branch December 19, 2019 15:45