This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

partitionBy bySeriesWithTags (aka "shard by tag") #1282

Closed · wants to merge 18 commits

Conversation

@Dieterbe (Contributor) commented Apr 12, 2019:

fix #1123
This PR is a reworked version of #1146
It is intended as a more rigorous approach to implementing bySeriesWithTags.
Changing our sharding scheme is not a trivial operation (there is a lot of deployment overhead), so I want to make sure we get it right.
This PR tells a story: if you go through the commits one by one, you'll see I've added several candidate implementations of the actual sharding, plus a testing framework to validate them.
Once we agree that the tests are conclusive and we pick a winner, we can simply remove the other implementations.

Criteria (somewhat in order of importance):

  1. distribution performance (metrics must be as evenly distributed across partitions as possible)
  2. maturity of library
  3. computational performance

In particular, I want to answer questions such as:

  1. Does jump hashing bring value? (It has a reputation for producing very even distributions.) And is the Sarama hasher good enough? (Note: it uses 32-bit FNV-1a, whereas JumpPartitionerFnv uses 64-bit FNV-1a.)
  2. Since jump needs a uint64 input, which pre-hashing step do we use to feed input into it? (See the sketch after the chat excerpt below.)
  3. How correct is @dgryski in the conversation below? (He is a hashing-algorithm wizard.) There is a plethora of hash functions that all seem like viable candidates (siphash, metro, xxhash, fnv, ...).
Dieter Plaetinck 10:42 PM
hash master @dgryski, is there a best practice for converting arbitrary-length []byte slices to uint64's for feeding into go-jump? the byte slices are string ids (metric names)
Damian Gryski 10:43 PM
@dieter_be any fast hash function; go-metro or what-have-you
Dieter Plaetinck 10:46 PM
@dgryski so not some kind of hand-written loop that goes over all byte values and adds them together or something? also, which library would you recommend for a production app?
Damian Gryski 10:47 PM
@dieter_be pick a fast one: https://github.com/dgryski/go-metro
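
To make the above concrete, here is a minimal sketch of the approach under discussion (illustrative only, not code from this PR; it assumes the upstream go-metro and go-jump APIs): pre-hash the arbitrary-length key to a uint64 with a fast hash (go-metro here; xxhash works the same way), then feed that into jump consistent hashing to pick a partition.

package main

import (
	"fmt"

	jump "github.com/dgryski/go-jump"
	metro "github.com/dgryski/go-metro"
)

// partitionKey maps a series key (e.g. a metric name with tags) onto one of numPartitions.
func partitionKey(key []byte, numPartitions int32) int32 {
	h := metro.Hash64(key, 0)               // arbitrary-length bytes -> uint64, seed 0
	return jump.Hash(h, int(numPartitions)) // uint64 -> [0, numPartitions)
}

func main() {
	fmt.Println(partitionKey([]byte("some.metric.name;dc=us-east;host=a1"), 32))
}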

As far as analysis goes, I tested with fakemetrics and 4 datasets that I pulled from some of our HM instances (with permission).
Of course I can't share the contents or name names here, though ops is our internal monitoring instance.

-rw-r--r--  1 dieter dieter 1.5G Apr 12 15:49 fng.txt
-rw-r--r--  1 dieter dieter 860M Nov 30 23:47 id.txt
-rw-r--r--  1 dieter dieter 113M Nov 30 22:14 ops.txt
-rw-r--r--  1 dieter dieter 774K Nov 30 23:52 rtm.txt

Now, there's obviously a lot of output, some of it more relevant than the rest (e.g. no one should run large amounts of metrics on a small number of partitions). I have mentioned some tips in cluster/partitioner/partitioner_test.go.
So I have just grepped for the dataset/partition-count combinations that seem most relevant (r is an alias for grep).

~/g/s/g/g/m/c/partitioner ❯❯❯ grep '/32.*fng' results.txt

              sarama/32          fng   3024891 -> 0.002749
          jump-mauro/32          fng   3024891 -> 0.003652
            jump-fnv/32          fng   3024891 -> 0.003544
          jump-metro/32          fng   3024891 -> 0.003134
            jump-sip/32          fng   3024891 -> 0.003200
         jump-xxhash/32          fng   3024891 -> 0.003050
~/g/s/g/g/m/c/partitioner ❯❯❯ grep '/128.*fng' results.txt

              sarama/128         fng   3024891 -> 0.005799
          jump-mauro/128         fng   3024891 -> 0.006008
            jump-fnv/128         fng   3024891 -> 0.007099
          jump-metro/128         fng   3024891 -> 0.006588
            jump-sip/128         fng   3024891 -> 0.006446
         jump-xxhash/128         fng   3024891 -> 0.006058
~/g/s/g/g/m/c/partitioner ❯❯❯ grep '/32.*ops' results.txt

              sarama/32          ops   1009213 -> 0.007097
          jump-mauro/32          ops   1009213 -> 0.006237
            jump-fnv/32          ops   1009213 -> 0.006406
          jump-metro/32          ops   1009213 -> 0.006473
            jump-sip/32          ops   1009213 -> 0.004689
         jump-xxhash/32          ops   1009213 -> 0.006385
~/g/s/g/g/m/c/partitioner ❯❯❯ grep '/128.*ops' results.txt

              sarama/128         ops   1009213 -> 0.012501
          jump-mauro/128         ops   1009213 -> 0.012525
            jump-fnv/128         ops   1009213 -> 0.012115
          jump-metro/128         ops   1009213 -> 0.011460
            jump-sip/128         ops   1009213 -> 0.012769
         jump-xxhash/128         ops   1009213 -> 0.013628
~/g/s/g/g/m/c/partitioner ❯❯❯ grep '/32.*id' results.txt

              sarama/32           id    687540 -> 0.007669
          jump-mauro/32           id    687540 -> 0.006550
            jump-fnv/32           id    687540 -> 0.007153
          jump-metro/32           id    687540 -> 0.008065
            jump-sip/32           id    687540 -> 0.006398
         jump-xxhash/32           id    687540 -> 0.005489
~/g/s/g/g/m/c/partitioner ❯❯❯ grep '/128.*id' results.txt

              sarama/128          id    687540 -> 0.013978
          jump-mauro/128          id    687540 -> 0.014216
            jump-fnv/128          id    687540 -> 0.012401
          jump-metro/128          id    687540 -> 0.012911
            jump-sip/128          id    687540 -> 0.013363
         jump-xxhash/128          id    687540 -> 0.014056
~/g/s/g/g/m/c/partitioner ❯❯❯ grep '/8.*rtm' results.txt

              sarama/8           rtm      3424 -> 0.030103
          jump-mauro/8           rtm      3424 -> 0.054707
            jump-fnv/8           rtm      3424 -> 0.024084
          jump-metro/8           rtm      3424 -> 0.051627
            jump-sip/8           rtm      3424 -> 0.032121
         jump-xxhash/8           rtm      3424 -> 0.031434

Original scaling guidance is 25-50k up to 250k-500k metrics per instance (per 8 shards), though in prod we see 1-2M per 8 shards.
So the test targets should be, for:
1 shard: 62.5k-250k
32 shards: 2-8M
128 shards: about 8-32M total

nameWithTagsBuffer.WriteString(t)
}

return nameWithTagsBuffer.Bytes()
Dieterbe (PR author):
@robert-milan do you think we can make this better?

Contributor:

We could use a pool, and also call Grow on the buffer before writing to it, but we would just be guessing at the size. This could decrease allocations.

I haven't followed the entire code path, but it looks like we are always passing in nil for the b []byte, so that doesn't help us at all. I think a pool makes the most sense.
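
A minimal sketch of that pooling idea (illustrative only; the helper name keyBySeriesWithTags and the exact key format are assumptions, not the PR's code):

package partitioner

import (
	"bytes"
	"sync"
)

// A pool of reusable buffers, so building the key doesn't allocate a fresh buffer each time.
var nameWithTagsBufferPool = sync.Pool{
	New: func() interface{} { return new(bytes.Buffer) },
}

// keyBySeriesWithTags builds "name;tag1;tag2;..." using a pooled buffer. Grow is
// called with the exact final size here; with real inputs it may only be a guess.
// The result is copied out because the pooled buffer's bytes are reused after Put.
func keyBySeriesWithTags(name string, tags []string) []byte {
	buf := nameWithTagsBufferPool.Get().(*bytes.Buffer)
	defer nameWithTagsBufferPool.Put(buf)
	buf.Reset()

	size := len(name)
	for _, t := range tags {
		size += 1 + len(t)
	}
	buf.Grow(size)

	buf.WriteString(name)
	for _, t := range tags {
		buf.WriteByte(';')
		buf.WriteString(t)
	}

	out := make([]byte, buf.Len())
	copy(out, buf.Bytes())
	return out
}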

@Dieterbe (PR author) commented Apr 18, 2019:

Right, we could use a pool, but that's a concern for the caller.
I should have been clearer; specifically, I wonder:

  1. whether you think it's silly to use a human-friendly ASCII separator like ; rather than 0x00 or something. I don't see any downside to using something friendly, though since it's only a single char we should probably use WriteRune instead of WriteString.
  2. whether there is anything to take into account w.r.t. the upcoming interning. To be precise, I don't want to end up in a situation where the output differs once we support interning, or where keeping the output the same makes the implementation suboptimal.

Contributor:

I might be missing something, as I have not reviewed the PR in great detail, but as I understand it this is only used for partitioning. My PR deals strictly with interning in the index. The only time I touch any MetricData is when I convert it into a MetricDefinition, so I don't think the two affect each other.

  1. I don't think it matters. Also, I would use WriteByte instead of WriteRune. Are we worried about needing an escape sequence (like when processing stream data) that we have to scan for or something? That would require a different answer.

  2. Since this only appears to operate on MetricData I don't think it will be an issue.

@Dieterbe (PR author):

I want to do a bit more analysis before drawing conclusions.

@dgryski commented Apr 12, 2019:

An even distribution is important, as is the "peak-to-mean" ratio, i.e. how wide your distribution is: what is the maximum number of elements mapped to a single shard vs. the mean? A small standard deviation will help with capacity planning.

As for choosing a hash function, metro and cespare's xxhash should both be sufficiently fast and give an equivalent distribution. Siphash will also give a good distribution but will be slower (assuming you're using siphash2-4). You'll get a speedup with siphash1-3, which should give the same distribution, but it will probably still end up slower than both metrohash and xxhash.

Edit: my siphash1-3 implementation: https://github.com/dgryski/go-sip13
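
For reference, a minimal sketch of the two numbers reported in the results in this thread (illustrative, not the actual test code in cluster/partitioner/partitioner_test.go, whose exact formulas may differ): the coefficient of variation of the per-partition counts, and the percent difference between the smallest and largest partition.

package partitioner

import "math"

// distributionStats summarizes how evenly metrics are spread across partitions:
// cov is the coefficient of variation (stddev/mean) of the per-partition counts,
// diffPct is the percent difference between the smallest and largest partition
// (it becomes +Inf when some partition receives nothing at all).
func distributionStats(counts []int) (cov, diffPct float64) {
	min, max := counts[0], counts[0]
	var sum float64
	for _, c := range counts {
		sum += float64(c)
		if c < min {
			min = c
		}
		if c > max {
			max = c
		}
	}
	mean := sum / float64(len(counts))

	var sq float64
	for _, c := range counts {
		d := float64(c) - mean
		sq += d * d
	}
	cov = math.Sqrt(sq/float64(len(counts))) / mean
	diffPct = 100 * float64(max-min) / float64(min)
	return cov, diffPct
}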

@Dieterbe (PR author):

I have added a "% diff between min and max"; the new results are in. I also removed the test for 1k metrics because it was basically useless.
Interestingly, sarama's partitioner definitely has a worst case, as can be seen here:

~/g/s/g/g/m/c/partitioner ❯❯❯ grep '32.*fake-true.*1000000\b'  results.txt
              sarama/32    fake-true   1000000 -> cov=1.109, diff=+Inf%
          jump-mauro/32    fake-true   1000000 -> cov=0.005, diff=2.21%
            jump-fnv/32    fake-true   1000000 -> cov=0.006, diff=2.25%
          jump-metro/32    fake-true   1000000 -> cov=0.005, diff=2.17%
            jump-sip/32    fake-true   1000000 -> cov=0.005, diff=2.40%
         jump-xxhash/32    fake-true   1000000 -> cov=0.004, diff=1.55%
~/g/s/g/g/m/c/partitioner ❯❯❯ grep '32.*fake-true.*10000000\b'  results.txt
              sarama/32    fake-true  10000000 -> cov=1.107, diff=+Inf%
          jump-mauro/32    fake-true  10000000 -> cov=0.001, diff=0.55%
            jump-fnv/32    fake-true  10000000 -> cov=0.002, diff=0.59%
          jump-metro/32    fake-true  10000000 -> cov=0.002, diff=0.66%
            jump-sip/32    fake-true  10000000 -> cov=0.002, diff=0.90%
         jump-xxhash/32    fake-true  10000000 -> cov=0.002, diff=0.75%
~/g/s/g/g/m/c/partitioner ❯❯❯ grep '128.*fake-true.*10000000\b'  results.txt
              sarama/128   fake-true  10000000 -> cov=1.107, diff=+Inf%
          jump-mauro/128   fake-true  10000000 -> cov=0.003, diff=1.46%
            jump-fnv/128   fake-true  10000000 -> cov=0.003, diff=1.70%
          jump-metro/128   fake-true  10000000 -> cov=0.004, diff=1.99%
            jump-sip/128   fake-true  10000000 -> cov=0.004, diff=1.93%
         jump-xxhash/128   fake-true  10000000 -> cov=0.003, diff=1.57%

In fact, all cases with an Inf percentage diff are the sarama fake-true cases.

Doing the previous tests again:

~/g/s/g/g/m/c/partitioner ❯❯❯ grep '/32.*fng' results.txt
              sarama/32          fng   3024891 -> cov=0.003, diff=1.27%
          jump-mauro/32          fng   3024891 -> cov=0.004, diff=1.45%
            jump-fnv/32          fng   3024891 -> cov=0.004, diff=2.04%
          jump-metro/32          fng   3024891 -> cov=0.003, diff=1.16%
            jump-sip/32          fng   3024891 -> cov=0.003, diff=1.45%
         jump-xxhash/32          fng   3024891 -> cov=0.003, diff=1.55%
~/g/s/g/g/m/c/partitioner ❯❯❯ grep '/128.*fng' results.txt
              sarama/128         fng   3024891 -> cov=0.006, diff=2.97%
          jump-mauro/128         fng   3024891 -> cov=0.006, diff=3.83%
            jump-fnv/128         fng   3024891 -> cov=0.007, diff=3.96%
          jump-metro/128         fng   3024891 -> cov=0.007, diff=3.66%
            jump-sip/128         fng   3024891 -> cov=0.006, diff=3.08%
         jump-xxhash/128         fng   3024891 -> cov=0.006, diff=3.12%
~/g/s/g/g/m/c/partitioner ❯❯❯ grep '/32.*ops' results.txt
              sarama/32          ops   1009213 -> cov=0.007, diff=3.03%
          jump-mauro/32          ops   1009213 -> cov=0.006, diff=2.71%
            jump-fnv/32          ops   1009213 -> cov=0.006, diff=2.77%
          jump-metro/32          ops   1009213 -> cov=0.006, diff=3.18%
            jump-sip/32          ops   1009213 -> cov=0.005, diff=1.83%
         jump-xxhash/32          ops   1009213 -> cov=0.006, diff=2.63%
~/g/s/g/g/m/c/partitioner ❯❯❯ grep '/128.*ops' results.txt
              sarama/128         ops   1009213 -> cov=0.013, diff=6.39%
          jump-mauro/128         ops   1009213 -> cov=0.013, diff=6.31%
            jump-fnv/128         ops   1009213 -> cov=0.012, diff=7.46%
          jump-metro/128         ops   1009213 -> cov=0.011, diff=5.93%
            jump-sip/128         ops   1009213 -> cov=0.013, diff=6.47%
         jump-xxhash/128         ops   1009213 -> cov=0.014, diff=7.96%
~/g/s/g/g/m/c/partitioner ❯❯❯ grep '/32.*id' results.txt
              sarama/32           id    687540 -> cov=0.008, diff=2.68%
          jump-mauro/32           id    687540 -> cov=0.007, diff=3.18%
            jump-fnv/32           id    687540 -> cov=0.007, diff=3.11%
          jump-metro/32           id    687540 -> cov=0.008, diff=2.88%
            jump-sip/32           id    687540 -> cov=0.006, diff=2.68%
         jump-xxhash/32           id    687540 -> cov=0.005, diff=2.23%
~/g/s/g/g/m/c/partitioner ❯❯❯ grep '/128.*id' results.txt
              sarama/128          id    687540 -> cov=0.014, diff=8.13%
          jump-mauro/128          id    687540 -> cov=0.014, diff=6.73%
            jump-fnv/128          id    687540 -> cov=0.012, diff=6.63%
          jump-metro/128          id    687540 -> cov=0.013, diff=8.26%
            jump-sip/128          id    687540 -> cov=0.013, diff=7.45%
         jump-xxhash/128          id    687540 -> cov=0.014, diff=7.52%
~/g/s/g/g/m/c/partitioner ❯❯❯ grep '/8.*rtm' results.txt
              sarama/8           rtm      3424 -> cov=0.030, diff=9.73%
          jump-mauro/8           rtm      3424 -> cov=0.055, diff=17.14%
            jump-fnv/8           rtm      3424 -> cov=0.024, diff=8.43%
          jump-metro/8           rtm      3424 -> cov=0.052, diff=17.41%
            jump-sip/8           rtm      3424 -> cov=0.032, diff=9.51%
         jump-xxhash/8           rtm      3424 -> cov=0.031, diff=9.69%

It's hard to find meaningful differences between the implementations, especially between metro, sip and xxhash (as predicted).
I'm inclined to go with jump-xxhash because it's fast and the library seems popular; it's also used by other projects such as InfluxDB.

// partition by series: metrics are distributed across all metrictank instances
// to allow horizontal scalability
return m.KeyBySeries(b), nil
func (p jumpPartitionerMauro) RequiresConsistency() bool {
Member:

What is RequiresConsistency() for? Is this meant to be part of the "Partitioner" interface?

@Dieterbe (PR author):

Went with xxhash+jump; removed all the others.

  • Callers like tsdb-gw no longer have to set the key property, and can instead use ManualPartitioner and just obtain the partition themselves. (Actually, they could already do this before by calling Kafka.Partition(), but it used to instantiate a temporary sarama message for some reason. Not anymore.)

How does it look now?

"github.com/raintank/schema"
)

type Partitioner interface {
	Partition(schema.PartitionedMetric, int32) (int32, error)
	Partition(schema.PartitionedMetric, int32) int32
Contributor:

Is this interface still used anywhere? Same question for the tsdb code, btw; I can't see where it's used.

Dieterbe (PR author):

I was wondering the same thing and looked around a bit, and also didn't see it being used anywhere. I'll just remove it.


func (k *Kafka) Partition(m schema.PartitionedMetric, numPartitions int32) int32 {
	key := k.GetPartitionKey(m, nil)
	return (jumpPartitioner{}).PartitionKey(key, numPartitions)
@replay (Contributor) commented Apr 22, 2019:

It seems like this is the only place where the jumpPartitioner is ever used, so what's the remaining benefit of making it satisfy the sarama.Partitioner interface? Because it currently implements that interface, we need to first instantiate it and then call a method on that instance, which seems unnecessary if it's never used as a sarama partitioner. So we might as well just have a standalone function like partitionKeyWithJumpHash([]byte, int32) int32 instead of instantiating this struct and then calling the method on it.

Dieterbe (PR author):

Hmm, yes. This and some other things here are rather confusing.
I'll see if I can refactor it.

@Dieterbe (PR author):

@replay @woodsaj how does it look now?
I kept the interface, but now it is actually used.

@woodsaj (Member) left a comment:

This seems like a huge mess. You can't add a new implementation of an interface "Partitioner" and then add comments to instruct users not to use certain methods because they will get unexpected and incorrect results. I don't even understand how you could think this is a good idea.

I am all for moving to xxhash and jump, but to do that we need to completely refactor how we do partitioning.

I suggest we just get rid of the metrictank/cluster/partitioner package and move everything into raintank/schema.

The "schema.PartitionedMetric" interface should just be updated to something like:

type PartitionByMethod uint8
const (
	PartitionByOrg PartitionByMethod = iota
	PartitionBySeries
	PartitionBySeriesWithTags
)
type PartitionedMetric interface {
	Validate() error
	SetId()
	GetPartitionID(method PartitionByMethod, partitions int32) int32
}
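
A rough sketch of how such a GetPartitionID could dispatch on the method, using the xxhash+jump combination chosen above (illustrative only; the field choices, the ';' separator and the lack of error handling are assumptions, and the implementation that eventually lands in raintank/schema may differ):

package main

import (
	"encoding/binary"
	"fmt"
	"strings"

	"github.com/cespare/xxhash/v2"
	jump "github.com/dgryski/go-jump"
)

type PartitionByMethod uint8

const (
	PartitionByOrg PartitionByMethod = iota
	PartitionBySeries
	PartitionBySeriesWithTags
)

// getPartitionID picks the partition key according to the method, then maps it
// to a partition with xxhash + jump.
func getPartitionID(method PartitionByMethod, orgID uint32, name string, tags []string, partitions int32) int32 {
	var key []byte
	switch method {
	case PartitionByOrg:
		var b [4]byte
		binary.LittleEndian.PutUint32(b[:], orgID)
		key = b[:]
	case PartitionBySeries:
		key = []byte(name)
	case PartitionBySeriesWithTags:
		key = []byte(name + ";" + strings.Join(tags, ";"))
	}
	return jump.Hash(xxhash.Sum64(key), int(partitions))
}

func main() {
	fmt.Println(getPartitionID(PartitionBySeriesWithTags, 1, "some.metric", []string{"dc=us-east", "host=a1"}, 32))
}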

@Dieterbe (PR author):

The awoods-style partitioning is now merged in schema: raintank/schema#26.
I will create a new PR to replace this one.

@replay (Contributor) commented Jul 10, 2019:

This one looks good to me. But if you want to replace it, that's fine too.

@robert-milan (Contributor):

Closed in favor of #1427

@Dieterbe (PR author):

Note:
In hindsight, it turns out there was a bug in BenchmarkJumpPartitionerFnv: it was allocating on each iteration due to the constructor.
With this fix (fix.txt) the benchmark becomes:

taskset --cpu-list 1,2 go test . -run='^$' -test.benchmem -bench .
goos: linux
goarch: amd64
pkg: github.com/grafana/metrictank/cluster/partitioner
BenchmarkPartitionerSarama-2       	 5727127	       206 ns/op	       0 B/op	       0 allocs/op
BenchmarkJumpPartitionerMauro-2    	 4032470	       284 ns/op	     210 B/op	       1 allocs/op
BenchmarkJumpPartitionerFnv-2      	 4189651	       287 ns/op	       0 B/op	       0 allocs/op
BenchmarkJumpPartitionerMetro-2    	11723461	       103 ns/op	       0 B/op	       0 allocs/op
BenchmarkJumpPartitionerSip-2      	 8235633	       146 ns/op	       0 B/op	       0 allocs/op
BenchmarkJumpPartitionerXxhash-2   	12077864	        99.7 ns/op	       0 B/op	       0 allocs/op
PASS
ok  	github.com/grafana/metrictank/cluster/partitioner	8.389s

BenchmarkJumpPartitionerFnv is now faster and no longer allocates, but the conclusion remains the same (xxhash still leads).
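
For illustration, the shape of that fix as a sketch (assuming the constructor is what gets hoisted; the actual change is in the attached fix.txt, and this stand-in type is not the PR's code): construction happens once before the timed loop, so the per-iteration numbers measure only the partitioning itself.

package partitioner

import (
	"hash/fnv"
	"testing"

	jump "github.com/dgryski/go-jump"
)

// fnvJumpPartitioner is an illustrative stand-in for the FNV-based jump partitioner.
type fnvJumpPartitioner struct{}

func newFnvJumpPartitioner() fnvJumpPartitioner { return fnvJumpPartitioner{} }

func (fnvJumpPartitioner) PartitionKey(key []byte, numPartitions int32) int32 {
	h := fnv.New64a()
	h.Write(key)
	return jump.Hash(h.Sum64(), int(numPartitions))
}

var sink int32 // assigned to, so the compiler can't optimize the benchmarked call away

func BenchmarkJumpPartitionerFnv(b *testing.B) {
	p := newFnvJumpPartitioner() // constructed once, outside the timed loop
	key := []byte("some.metric.name;dc=us-east;host=a1")
	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		sink = p.PartitionKey(key, 32)
	}
}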

Successfully merging this pull request may close these issues:
  • partitioning metrics by nameWithTags (aka shard by tag)

5 participants