This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

partitionBy bySeriesWithTags (aka "shard by tag") #1282

Closed
wants to merge 18 commits
16 changes: 13 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default.

95 changes: 65 additions & 30 deletions cluster/partitioner/partitioner.go
@@ -2,51 +2,86 @@ package partitioner

import (
"fmt"
"hash"
"hash/fnv"

"github.com/Shopify/sarama"
"github.com/cespare/xxhash"
jump "github.com/dgryski/go-jump"
"github.com/raintank/schema"
)

type Partitioner interface {
Partition(schema.PartitionedMetric, int32) (int32, error)
Key(schema.PartitionedMetric, []byte) []byte
Partition(schema.PartitionedMetric, int32) int32
Contributor:

Is this interface still used anywhere? Same question for the tsdb code, by the way; I can't see where it's used.

Contributor (author):

I was wondering the same thing and looked around a bit, and also didn't see it being used anywhere. I'll just remove it.

}

type Kafka struct {
PartitionBy string
Partitioner sarama.Partitioner
}

func NewKafka(partitionBy string) (*Kafka, error) {
// NewKafka returns the appropriate kafka partitioner
// note: for "bySeriesWithTags", do not feed a key obtained from it into a sarama producer
// that uses the default HashPartitioner lest assignments be different!
// for simplicity's sake we recommend you simply construct sarama messages with the partition set
// from the Partition method, which also allows you to not publish any message key.
func NewKafka(partitionBy string) (Partitioner, error) {
switch partitionBy {
case "byOrg":
return KafkaByOrg{fnv.New32a()}, nil
case "bySeries":
default:
return nil, fmt.Errorf("partitionBy must be one of 'byOrg|bySeries'. got %s", partitionBy)
return KafkaBySeries{fnv.New32a()}, nil
case "bySeriesWithTags":
return KafkaBySeriesWithTags{}, nil
}
return &Kafka{
PartitionBy: partitionBy,
Partitioner: sarama.NewHashPartitioner(""),
}, nil
return nil, fmt.Errorf("partitionBy must be one of 'byOrg|bySeries|bySeriesWithTags'. got %s", partitionBy)
}

func (k *Kafka) Partition(m schema.PartitionedMetric, numPartitions int32) (int32, error) {
key, err := k.GetPartitionKey(m, nil)
if err != nil {
return 0, err
// KafkaByOrg partitions a schema.PartitionedMetric by OrgId, using hashing equivalent to sarama.HashPartitioner
type KafkaByOrg struct {
hasher hash.Hash32
}

func (k KafkaByOrg) Key(m schema.PartitionedMetric, b []byte) []byte {
return m.KeyByOrgId(b)
}

// Partition partitions just like sarama.HashPartitioner but without needing a *sarama.ProducerMessage
func (k KafkaByOrg) Partition(m schema.PartitionedMetric, numPartitions int32) int32 {
k.hasher.Reset()
k.hasher.Write(k.Key(m, nil)) // fnv can never return a non-nil error
partition := int32(k.hasher.Sum32()) % numPartitions
if partition < 0 {
partition = -partition
}
return k.Partitioner.Partition(&sarama.ProducerMessage{Key: sarama.ByteEncoder(key)}, numPartitions)
return partition
}

func (k *Kafka) GetPartitionKey(m schema.PartitionedMetric, b []byte) ([]byte, error) {
switch k.PartitionBy {
case "byOrg":
// partition by organisation: metrics for the same org should go to the same
// partition/MetricTank (optimize for locality~performance)
return m.KeyByOrgId(b), nil
case "bySeries":
// partition by series: metrics are distributed across all metrictank instances
// to allow horizontal scalability
return m.KeyBySeries(b), nil
// KafkaBySeries partitions a schema.PartitionedMetric by Series name, using hashing equivalent to sarama.HashPartitioner
type KafkaBySeries struct {
hasher hash.Hash32
}

func (k KafkaBySeries) Key(m schema.PartitionedMetric, b []byte) []byte {
return m.KeyBySeries(b)
}

// Partition partitions just like sarama.HashPartitioner but without needing a *sarama.ProducerMessage
func (k KafkaBySeries) Partition(m schema.PartitionedMetric, numPartitions int32) int32 {
k.hasher.Reset()
k.hasher.Write(k.Key(m, nil)) // fnv can never return a non-nil error
partition := int32(k.hasher.Sum32()) % numPartitions
if partition < 0 {
partition = -partition
}
return b, fmt.Errorf("unknown partitionBy setting.")
return partition
}

// KafkaBySeriesWithTags partitions a schema.PartitionedMetric by nameWithTags, using a custom xxhash+jump hashing scheme
// DO NOT feed a key obtained from this into a sarama producer that uses the default HashPartitioner lest assignments be different!
type KafkaBySeriesWithTags struct{}

func (k KafkaBySeriesWithTags) Key(m schema.PartitionedMetric, b []byte) []byte {
return m.KeyBySeriesWithTags(b)
}

// Partition partitions using a custom xxhash+jump hashing scheme
func (k KafkaBySeriesWithTags) Partition(m schema.PartitionedMetric, numPartitions int32) int32 {
jumpKey := xxhash.Sum64(k.Key(m, nil))
return jump.Hash(jumpKey, int(numPartitions))
}
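
The NewKafka doc comment above warns that a "bySeriesWithTags" key must not be fed back into a producer that still uses sarama's default HashPartitioner, since the fnv-based and xxhash+jump schemes assign differently. Below is a minimal usage sketch under assumptions not shown in this diff: the import path github.com/grafana/metrictank/cluster/partitioner, a broker on localhost:9092, and placeholder topic name and partition count. The partition is computed up front and the message carries no key.

```go
package main

import (
	"log"

	"github.com/Shopify/sarama"
	"github.com/raintank/schema"

	partitioner "github.com/grafana/metrictank/cluster/partitioner" // assumed import path
)

func main() {
	// "byOrg" and "bySeries" remain sarama-HashPartitioner compatible; "bySeriesWithTags" is not
	part, err := partitioner.NewKafka("bySeriesWithTags")
	if err != nil {
		log.Fatal(err)
	}

	config := sarama.NewConfig()
	// honor the Partition field set on each message instead of re-hashing a key
	config.Producer.Partitioner = sarama.NewManualPartitioner
	config.Producer.Return.Successes = true

	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	var md schema.MetricData   // populated elsewhere; *MetricData satisfies schema.PartitionedMetric
	numPartitions := int32(32) // placeholder; normally discovered from the cluster

	msg := &sarama.ProducerMessage{
		Topic:     "mdm",
		Partition: part.Partition(&md, numPartitions), // xxhash+jump for bySeriesWithTags
		Value:     sarama.ByteEncoder(nil),            // serialized metric payload goes here
		// no Key: the partition is already decided
	}
	if _, _, err := producer.SendMessage(msg); err != nil {
		log.Fatal(err)
	}
}
```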
9 changes: 2 additions & 7 deletions cmd/mt-index-migrate/main.go
@@ -22,7 +22,7 @@ var (
dstCassAddr = flag.String("dst-cass-addr", "localhost", "Address of cassandra host to migrate to.")
srcKeyspace = flag.String("src-keyspace", "raintank", "Cassandra keyspace in use on source.")
dstKeyspace = flag.String("dst-keyspace", "raintank", "Cassandra keyspace in use on destination.")
partitionScheme = flag.String("partition-scheme", "byOrg", "method used for partitioning metrics. (byOrg|bySeries)")
partitionScheme = flag.String("partition-scheme", "byOrg", "method used for partitioning metrics. (byOrg|bySeries|bySeriesWithTags)")
numPartitions = flag.Int("num-partitions", 1, "number of partitions in cluster")
schemaFile = flag.String("schema-file", "/etc/metrictank/schema-idx-cassandra.toml", "File containing the needed schemas in case database needs initializing")

@@ -181,12 +181,7 @@ func getDefs(session *gocql.Session, defsChan chan *schema.MetricDefinition) {
if *numPartitions == 1 {
mdef.Partition = 0
} else {
p, err := partitioner.Partition(&mdef, int32(*numPartitions))
if err != nil {
log.Fatalf("failed to get partition id of metric. %s", err.Error())
} else {
mdef.Partition = p
}
mdef.Partition = partitioner.Partition(&mdef, int32(*numPartitions))
}
defsChan <- &mdef
}
8 changes: 2 additions & 6 deletions cmd/mt-whisper-importer-writer/main.go
@@ -50,7 +50,7 @@ var (
partitionScheme = globalFlags.String(
"partition-scheme",
"bySeries",
"method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries)",
"method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries|bySeriesWithTags)",
)
uriPath = globalFlags.String(
"uri-path",
@@ -238,11 +238,7 @@ func (s *Server) chunksHandler(w http.ResponseWriter, req *http.Request) {
throwError(fmt.Sprintf("Invalid MetricData.Id: %s", err))
}

partition, err := s.Partitioner.Partition(&metric.MetricData, int32(*numPartitions))
if err != nil {
throwError(fmt.Sprintf("Error partitioning: %q", err))
return
}
partition := s.Partitioner.Partition(&metric.MetricData, int32(*numPartitions))
s.Index.AddOrUpdate(mkey, &metric.MetricData, partition)

for archiveIdx, a := range metric.Archives {
4 changes: 2 additions & 2 deletions docs/tools.md
@@ -189,7 +189,7 @@ Flags:
-num-partitions int
number of partitions in cluster (default 1)
-partition-scheme string
method used for partitioning metrics. (byOrg|bySeries) (default "byOrg")
method used for partitioning metrics. (byOrg|bySeries|bySeriesWithTags) (default "byOrg")
-schema-file string
File containing the needed schemas in case database needs initializing (default "/etc/metrictank/schema-idx-cassandra.toml")
-src-cass-addr string
@@ -763,7 +763,7 @@ global config flags:
-overwrite-chunks
If true existing chunks may be overwritten (default true)
-partition-scheme string
method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries) (default "bySeries")
method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries|bySeriesWithTags) (default "bySeries")
-ttls string
list of ttl strings used by MT separated by ',' (default "35d")
-uri-path string
83 changes: 46 additions & 37 deletions stacktest/fakemetrics/out/kafkamdm/kafkamdm.go
@@ -18,14 +18,16 @@ import (

type KafkaMdm struct {
out.OutStats
topic string
brokers []string
config *sarama.Config
client sarama.SyncProducer
hash hash.Hash32
part *p.Kafka
lmPart LastNumPartitioner
partScheme string
topic string
brokers []string
config *sarama.Config
client sarama.Client
numPartitions int32 // by convention we do not alter the partitions of live topics, so this only needs to be set once
producer sarama.SyncProducer
hash hash.Hash32
part p.Partitioner
lmPart LastNumPartitioner
partScheme string
}

// map the last number in the metricname to the partition
@@ -46,7 +48,7 @@ func (p *LastNumPartitioner) Partition(m schema.PartitionedMetric, numPartitions
}

// key is by metric name, but won't be used for partition setting
func (p *LastNumPartitioner) GetPartitionKey(m schema.PartitionedMetric, b []byte) ([]byte, error) {
func (p *LastNumPartitioner) Key(m schema.PartitionedMetric, b []byte) ([]byte, error) {
return m.KeyBySeries(b), nil
}

@@ -72,43 +74,55 @@ func New(topic string, brokers []string, codec string, stats met.Backend, partit
return nil, err
}

client, err := sarama.NewSyncProducer(brokers, config)
client, err := sarama.NewClient(brokers, config)
if err != nil {
return nil, err
}
var part *p.Kafka
partitions, err := client.Partitions(topic)
if err != nil {
return nil, err
}
numPartitions := int32(len(partitions))
producer, err := sarama.NewSyncProducerFromClient(client)
if err != nil {
return nil, err
}
var part p.Partitioner
var lmPart LastNumPartitioner
switch partitionScheme {
case "byOrg":
part, err = p.NewKafka("byOrg")
case "bySeries":
part, err = p.NewKafka("bySeries")
case "byOrg", "bySeries", "bySeriesWithTags":
part, err = p.NewKafka(partitionScheme)
case "lastNum":
lmPart = LastNumPartitioner{}
// sets partition based on message partition field
config.Producer.Partitioner = sarama.NewManualPartitioner
lmPart = LastNumPartitioner{}
default:
err = fmt.Errorf("partitionScheme must be one of 'byOrg|bySeries|lastNum'. got %s", partitionScheme)
err = fmt.Errorf("partitionScheme must be one of 'byOrg|bySeries|bySeriesWithTags|lastNum'. got %s", partitionScheme)
}
if err != nil {
return nil, err
}

return &KafkaMdm{
OutStats: out.NewStats(stats, "kafka-mdm"),
topic: topic,
brokers: brokers,
config: config,
client: client,
hash: fnv.New32a(),
part: part,
lmPart: lmPart,
partScheme: partitionScheme,
OutStats: out.NewStats(stats, "kafka-mdm"),
topic: topic,
brokers: brokers,
config: config,
client: client,
producer: producer,
numPartitions: numPartitions,
hash: fnv.New32a(),
part: part,
lmPart: lmPart,
partScheme: partitionScheme,
}, nil
}

func (k *KafkaMdm) Close() error {
return k.client.Close()
err := k.client.Close()
if err != nil {
return err
}
return k.producer.Close()
}

func (k *KafkaMdm) Flush(metrics []*schema.MetricData) error {
@@ -143,21 +157,16 @@ func (k *KafkaMdm) Flush(metrics []*schema.MetricData) error {
Value: sarama.ByteEncoder(data),
}
} else {
key, err := k.part.GetPartitionKey(metric, nil)
if err != nil {
return fmt.Errorf("Failed to get partition for metric. %s", err)
}

payload[i] = &sarama.ProducerMessage{
Key: sarama.ByteEncoder(key),
Topic: k.topic,
Value: sarama.ByteEncoder(data),
Partition: k.part.Partition(metric, k.numPartitions),
Topic: k.topic,
Value: sarama.ByteEncoder(data),
}
}

}
prePub := time.Now()
err := k.client.SendMessages(payload)
err := k.producer.SendMessages(payload)
if err != nil {
k.PublishErrors.Inc(1)
if errors, ok := err.(sarama.ProducerErrors); ok {
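
The kafkamdm changes above split the single SyncProducer into a sarama.Client plus a producer built from it, so the partition count can be looked up once at startup instead of per message. A minimal sketch of that pattern, with the broker address and topic name as placeholders:

```go
package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true // required for SyncProducer

	// one client both discovers the partition count and backs the producer
	client, err := sarama.NewClient([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// by convention the partition count of a live topic does not change,
	// so it is fetched once at startup rather than per message
	partitions, err := client.Partitions("mdm")
	if err != nil {
		log.Fatal(err)
	}
	numPartitions := int32(len(partitions))

	producer, err := sarama.NewSyncProducerFromClient(client)
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	log.Printf("topic %q has %d partitions", "mdm", numPartitions)
}
```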