Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
simplify: Partition() can never return a non-nil error
Browse files Browse the repository at this point in the history
  • Loading branch information
Dieterbe committed Apr 23, 2019
1 parent 6e76a26 commit d883f6c
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 26 deletions.
24 changes: 9 additions & 15 deletions cluster/partitioner/partitioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

type Partitioner interface {
Key(schema.PartitionedMetric, []byte) []byte
Partition(schema.PartitionedMetric, int32) (int32, error)
Partition(schema.PartitionedMetric, int32) int32
}

// NewKafka returns the appropriate kafka partitioner
Expand Down Expand Up @@ -42,17 +42,14 @@ func (k KafkaByOrg) Key(m schema.PartitionedMetric, b []byte) []byte {
}

// Partition partitions just like sarama.HashPartitioner but without needing a *sarama.ProducerMessage
func (k KafkaByOrg) Partition(m schema.PartitionedMetric, numPartitions int32) (int32, error) {
func (k KafkaByOrg) Partition(m schema.PartitionedMetric, numPartitions int32) int32 {
k.hasher.Reset()
_, err := k.hasher.Write(k.Key(m, nil))
if err != nil {
return -1, err
}
k.hasher.Write(k.Key(m, nil)) // fnv can never return a non-nil error
partition := int32(k.hasher.Sum32()) % numPartitions
if partition < 0 {
partition = -partition
}
return partition, nil
return partition
}

// KafkaBySeries partitions a schema.PartitionedMetric by Series name, using hashing equivalent to sarama.HashPartitioner
Expand All @@ -65,17 +62,14 @@ func (k KafkaBySeries) Key(m schema.PartitionedMetric, b []byte) []byte {
}

// Partition partitions just like sarama.HashPartitioner but without needing a *sarama.ProducerMessage
func (k KafkaBySeries) Partition(m schema.PartitionedMetric, numPartitions int32) (int32, error) {
func (k KafkaBySeries) Partition(m schema.PartitionedMetric, numPartitions int32) int32 {
k.hasher.Reset()
_, err := k.hasher.Write(k.Key(m, nil))
if err != nil {
return -1, err
}
k.hasher.Write(k.Key(m, nil)) // fnv can never return a non-nil error
partition := int32(k.hasher.Sum32()) % numPartitions
if partition < 0 {
partition = -partition
}
return partition, nil
return partition
}

// KafkaBySeriesWithTags partitions a schema.PartitionedMetric by nameWithTags, using a custom xxhash+jump hashing scheme
Expand All @@ -87,7 +81,7 @@ func (k KafkaBySeriesWithTags) Key(m schema.PartitionedMetric, b []byte) []byte
}

// Partition partitions using a custom xxhash+jump hashing scheme
func (k KafkaBySeriesWithTags) Partition(m schema.PartitionedMetric, numPartitions int32) (int32, error) {
func (k KafkaBySeriesWithTags) Partition(m schema.PartitionedMetric, numPartitions int32) int32 {
jumpKey := xxhash.Sum64(k.Key(m, nil))
return jump.Hash(jumpKey, int(numPartitions)), nil
return jump.Hash(jumpKey, int(numPartitions))
}
7 changes: 1 addition & 6 deletions cmd/mt-index-migrate/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,7 @@ func getDefs(session *gocql.Session, defsChan chan *schema.MetricDefinition) {
if *numPartitions == 1 {
mdef.Partition = 0
} else {
p, err := partitioner.Partition(&mdef, int32(*numPartitions))
if err != nil {
log.Fatalf("failed to get partition id of metric. %s", err.Error())
} else {
mdef.Partition = p
}
mdef.Partition = partitioner.Partition(&mdef, int32(*numPartitions))
}
defsChan <- &mdef
}
Expand Down
6 changes: 1 addition & 5 deletions cmd/mt-whisper-importer-writer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,11 +238,7 @@ func (s *Server) chunksHandler(w http.ResponseWriter, req *http.Request) {
throwError(fmt.Sprintf("Invalid MetricData.Id: %s", err))
}

partition, err := s.Partitioner.Partition(&metric.MetricData, int32(*numPartitions))
if err != nil {
throwError(fmt.Sprintf("Error partitioning: %q", err))
return
}
partition := s.Partitioner.Partition(&metric.MetricData, int32(*numPartitions))
s.Index.AddOrUpdate(mkey, &metric.MetricData, partition)

for archiveIdx, a := range metric.Archives {
Expand Down

0 comments on commit d883f6c

Please sign in to comment.