Merge pull request #477 from grafana/consistentHashing-v2
Consistent hashing v2 - experimental
Dieterbe committed Nov 19, 2021
2 parents f19c22c + 7caa2d3 commit 37996a1
Showing 16 changed files with 244 additions and 189 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -59,7 +59,7 @@ The conditions are AND-ed. Regexes are more resource intensive and hence should
* for grafanaNet / kafkaMdm / Google PubSub routes, there is only a single endpoint so that's where the data goes. For standard/carbon routes you can control how data gets routed into destinations (note that destinations have settings to match on prefix/sub/regex, just like routes):
* sendAllMatch: send all metrics to all the defined endpoints (possibly, and commonly only 1 endpoint).
* sendFirstMatch: send the metrics to the first endpoint that matches it.
* consistentHashing: the algorithm is the same as Carbon's consistent hashing.
* consistentHashing (older carbon consistent hashing behavior)/consistentHashing-v2 (experimental new behavior). (see [config docs](docs/config.md#carbon-route) and [PR 477](https://github.com/grafana/carbon-relay-ng/pull/477) for details)
* round robin: the route is a RR pool (not implemented)


6 changes: 4 additions & 2 deletions cfg/table.go
@@ -190,7 +190,7 @@ func InitRoutes(table table.Interface, config Config, meta toml.MetaData) error
return fmt.Errorf("error adding route '%s'", routeConfig.Key)
}
table.AddRoute(route)
case "consistentHashing":
case "consistentHashing", "consistentHashing-v2":
destinations, err := imperatives.ParseDestinations(routeConfig.Destinations, table, false, routeConfig.Key)
if err != nil {
log.Error(err.Error())
@@ -200,7 +200,9 @@ func InitRoutes(table table.Interface, config Config, meta toml.MetaData) error
return fmt.Errorf("must get at least 2 destination for route '%s'", routeConfig.Key)
}

route, err := route.NewConsistentHashing(routeConfig.Key, matcher, destinations)
withFix := (routeConfig.Type == "consistentHashing-v2")

route, err := route.NewConsistentHashing(routeConfig.Key, matcher, destinations, withFix)
if err != nil {
log.Error(err.Error())
return fmt.Errorf("error adding route '%s'", routeConfig.Key)
29 changes: 19 additions & 10 deletions docs/config.md
@@ -107,16 +107,25 @@ max = -1

### Options

setting | mandatory | values | default | description
---------------|-----------|-----------------------------------------------|---------|------------
key | Y | string | N/A |
type | Y | sendAllMatch/sendFirstMatch/consistentHashing | N/A | send to all destinations vs first matching destination vs distribute via consistent hashing
prefix | N | string | "" |
notPrefix | N | string | "" |
sub | N | string | "" |
notSub | N | string | "" |
regex | N | string | "" |
notRegex | N | string | "" |
setting | mandatory | values | default | description
---------------|-----------|-------------------|---------|------------
key | Y | string | N/A |
type | Y | See below | N/A | See below
prefix | N | string | "" |
notPrefix | N | string | "" |
sub | N | string | "" |
notSub | N | string | "" |
regex | N | string | "" |
notRegex | N | string | "" |

The following route types are supported:

* `sendAllMatch` : send to all destinations
* `sendFirstMatch` : send to first matching destination
* `consistentHashing` : distribute via consistent hashing as done in Graphite until December 2013 (likely up to version 0.9.12). (https://github.com/graphite-project/carbon/pull/196)
* `consistentHashing-v2` : distribute via consistent hashing as done in Graphite/carbon as of https://github.com/graphite-project/carbon/pull/196 (**experimental**). See [PR 477](https://github.com/grafana/carbon-relay-ng/pull/477) for more information.

Note that the carbon style consistent hashing does [not accurately balance workload across nodes](https://github.com/graphite-project/carbon/issues/485). See [issue 211](https://github.com/grafana/carbon-relay-ng/issues/211)
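
As a rough illustration of what "distribute via consistent hashing" means here, the sketch below looks up a destination on a sorted ring of 16-bit positions: it takes the first entry at or after the key's position and wraps to the start when there is none. The `entry` type, the `lookup` function and the destination names are hypothetical and are not taken from carbon-relay-ng; the project's actual lookup code is not part of this diff.

```go
package main

import (
	"fmt"
	"sort"
)

// entry is a hypothetical ring entry: a position on the 16-bit ring
// and the name of the destination that owns it.
type entry struct {
	Position uint16
	Dest     string
}

// lookup returns the destination of the first ring entry whose position
// is >= keyPos, wrapping around to the first entry if there is none.
// The ring must be non-empty and sorted by Position.
func lookup(ring []entry, keyPos uint16) string {
	i := sort.Search(len(ring), func(i int) bool {
		return ring[i].Position >= keyPos
	})
	if i == len(ring) {
		i = 0 // past the last entry: wrap around the ring
	}
	return ring[i].Dest
}

func main() {
	ring := []entry{{10, "a"}, {200, "b"}, {60000, "c"}}
	for _, pos := range []uint16{5, 11, 60001} {
		fmt.Printf("key position %d -> destination %s\n", pos, lookup(ring, pos))
	}
}
```

With only a few entries per destination the arcs between positions can differ a lot in size, which is one intuition for why this style of hashing may balance load unevenly.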

### Examples

5 changes: 3 additions & 2 deletions docs/tcp-admin-interface.md
@@ -41,7 +41,8 @@ commands:
<type>:
sendAllMatch send metrics in the route to all destinations
sendFirstMatch send metrics in the route to the first one that matches it
consistentHashing distribute metrics between destinations using a hash algorithm
consistentHashing distribute metrics between destinations using a hash algorithm, old-carbon style
consistentHashing-v2 distribute metrics between destinations using a hash algorithm, current carbon style (experimental. see PR 477)
<opts>:
prefix=<str> only take in metrics that have this prefix
notPrefix=<str> only take in metrics that don't have this prefix
@@ -51,7 +52,7 @@ commands:
notRegex=<regex> only take in metrics that don't match this regex (expensive!)
<dest>: <addr> <opts>
<addr> a tcp endpoint. i.e. ip:port or hostname:port
for consistentHashing routes, an instance identifier can also be present:
for consistentHashing and consistentHashing-v2 routes, an instance identifier can also be present:
hostname:port:instance
The instance is used to disambiguate multiple endpoints on the same host, as the Carbon-compatible consistent hashing algorithm does not take the port into account.
<opts>:
10 changes: 7 additions & 3 deletions imperatives/imperatives.go
@@ -24,6 +24,7 @@ const (
addRouteSendAllMatch
addRouteSendFirstMatch
addRouteConsistentHashing
addRouteConsistentHashingV2
addRouteGrafanaNet
addRouteKafkaMdm
addRoutePubSub
@@ -103,6 +104,7 @@ var tokens = []toki.Def{
{Token: addRouteSendAllMatch, Pattern: "addRoute sendAllMatch"},
{Token: addRouteSendFirstMatch, Pattern: "addRoute sendFirstMatch"},
{Token: addRouteConsistentHashing, Pattern: "addRoute consistentHashing"},
{Token: addRouteConsistentHashingV2, Pattern: "addRoute consistentHashing-v2"},
{Token: addRouteGrafanaNet, Pattern: "addRoute grafanaNet"},
{Token: addRouteKafkaMdm, Pattern: "addRoute kafkaMdm"},
{Token: addRoutePubSub, Pattern: "addRoute pubsub"},
@@ -202,7 +204,9 @@ func Apply(table table.Interface, cmd string) error {
case addRouteSendFirstMatch:
return readAddRoute(s, table, route.NewSendFirstMatch)
case addRouteConsistentHashing:
return readAddRouteConsistentHashing(s, table)
return readAddRouteConsistentHashing(s, table, false)
case addRouteConsistentHashingV2:
return readAddRouteConsistentHashing(s, table, true)
case addRouteGrafanaNet:
return readAddRouteGrafanaNet(s, table)
case addRouteKafkaMdm:
@@ -437,7 +441,7 @@ func readAddRoute(s *toki.Scanner, table table.Interface, constructor func(key s
return nil
}

func readAddRouteConsistentHashing(s *toki.Scanner, table table.Interface) error {
func readAddRouteConsistentHashing(s *toki.Scanner, table table.Interface, withFix bool) error {
t := s.Next()
if t.Token != word {
return errFmtAddRoute
@@ -462,7 +466,7 @@ func readAddRouteConsistentHashing(s *toki.Scanner, table table.Interface) error
return fmt.Errorf("must get at least 2 destination for route '%s'", key)
}

route, err := route.NewConsistentHashing(key, matcher, destinations)
route, err := route.NewConsistentHashing(key, matcher, destinations, withFix)
if err != nil {
return err
}
7 changes: 1 addition & 6 deletions route/cloudwatch.go
@@ -59,7 +59,7 @@ func NewCloudWatch(key string, matcher matcher.Matcher, awsProfile, awsRegion, a
awsNamespace: awsNamespace,
storageResolution: storageResolution,
putMetricDataInput: cloudwatch.PutMetricDataInput{Namespace: aws.String(awsNamespace)},
baseRoute: baseRoute{sync.Mutex{}, atomic.Value{}, key},
baseRoute: baseRoute{"CloudWatch", sync.Mutex{}, atomic.Value{}, key},
buf: make(chan []byte, bufSize),
blocking: blocking,
bufSize: bufSize,
@@ -221,8 +221,3 @@ func (r *CloudWatch) Shutdown() error {
close(r.buf)
return nil
}

// Snapshot clones the current config for update operations
func (r *CloudWatch) Snapshot() Snapshot {
return makeSnapshot(&r.baseRoute, "CloudWatch")
}
30 changes: 25 additions & 5 deletions route/consistent_hashing.go
@@ -4,10 +4,11 @@ import (
"bytes"
"crypto/md5"
"encoding/binary"
dest "github.com/grafana/carbon-relay-ng/destination"
"sort"
"strconv"
"strings"

dest "github.com/grafana/carbon-relay-ng/destination"
)

type hashRingEntry struct {
@@ -38,6 +39,9 @@ type ConsistentHasher struct {
Ring hashRing
destinations []*dest.Destination
replicaCount int

// Align with https://github.com/graphite-project/carbon/commit/024f9e67ca47619438951c59154c0dec0b0518c7#diff-1486787206e06af358b8d935577e76f5
withFix bool // See https://github.com/grafana/carbon-relay-ng/pull/477 for details.
}

func computeRingPosition(key []byte) uint16 {
@@ -48,12 +52,15 @@ func computeRingPosition(key []byte) uint16 {
return Position
}

func NewConsistentHasher(destinations []*dest.Destination) ConsistentHasher {
return NewConsistentHasherReplicaCount(destinations, 100)
func NewConsistentHasher(destinations []*dest.Destination, withFix bool) ConsistentHasher {
return NewConsistentHasherReplicaCount(destinations, 100, withFix)
}

func NewConsistentHasherReplicaCount(destinations []*dest.Destination, replicaCount int) ConsistentHasher {
hashRing := ConsistentHasher{replicaCount: replicaCount}
func NewConsistentHasherReplicaCount(destinations []*dest.Destination, replicaCount int, withFix bool) ConsistentHasher {
hashRing := ConsistentHasher{
replicaCount: replicaCount,
withFix: withFix,
}
for _, d := range destinations {
hashRing.AddDestination(d)
}
@@ -85,6 +92,19 @@ func (h *ConsistentHasher) AddDestination(d *dest.Destination) {
keyBuf.WriteString(":")
keyBuf.WriteString(strconv.Itoa(i))
position := computeRingPosition(keyBuf.Bytes())
if h.withFix {
outer:
for {
for i := 0; i < len(h.Ring); i++ {
if position == h.Ring[i].Position {
position++
continue outer
}
}
break
}
}

newRingEntries[i].Position = position
newRingEntries[i].Hostname = server[0]
newRingEntries[i].Instance = d.Instance
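
The `withFix` branch above bumps a colliding position until it lands on a free slot. A minimal standalone sketch of that idea follows. It swaps the linear scan over `h.Ring` for a map of used positions, and the name `nextFreePosition` and the full-ring guard are assumptions added for illustration, not part of the project.

```go
package main

import (
	"fmt"
	"math"
)

// nextFreePosition returns position if it is unused, otherwise the next
// higher unused position. uint16 arithmetic wraps from 65535 back to 0.
// It reports false when every one of the 65536 positions is taken.
// (Hypothetical helper; the diff scans the ring slice instead of a map.)
func nextFreePosition(used map[uint16]bool, position uint16) (uint16, bool) {
	if len(used) > math.MaxUint16 {
		return 0, false // ring is full, no free slot exists
	}
	for used[position] {
		position++ // collision: try the next slot, wrapping on overflow
	}
	return position, true
}

func main() {
	used := map[uint16]bool{}
	for _, hashed := range []uint16{100, 100, 100, 65535, 65535} {
		free, ok := nextFreePosition(used, hashed)
		if !ok {
			fmt.Println("ring is full")
			return
		}
		used[free] = true
		fmt.Printf("hashed to %d, stored at %d\n", hashed, free)
	}
}
```

Because the position is a `uint16`, incrementing past 65535 wraps to 0, so a completely full ring would never find a free slot without a guard. The loop in the diff appears to share that property, although a ring that full is unlikely in practice.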
31 changes: 30 additions & 1 deletion route/consistent_hashing_test.go
@@ -1,6 +1,7 @@
package route

import (
"fmt"
"testing"

"github.com/bmizerany/assert"
@@ -12,12 +13,40 @@ func TestConsistentHashingComputeRingPosition(t *testing.T) {
assert.Equal(t, uint16(54301), computeRingPosition([]byte("")))
}
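
The body of `computeRingPosition` is not shown in this diff. One implementation consistent with the imports in `consistent_hashing.go` (`crypto/md5`, `encoding/binary`) and with the expected value 54301 for the empty key is to read the first two bytes of the MD5 digest as a big-endian `uint16`. The sketch below assumes exactly that; `ringPosition` is a hypothetical stand-in, not the project's function.

```go
package main

import (
	"crypto/md5"
	"encoding/binary"
	"fmt"
)

// ringPosition maps a key onto the 16-bit ring by taking the first two
// bytes of the key's MD5 digest as a big-endian unsigned integer.
// (Assumed behavior, chosen to match the test value for the empty key.)
func ringPosition(key []byte) uint16 {
	sum := md5.Sum(key)
	return binary.BigEndian.Uint16(sum[:2])
}

func main() {
	// MD5 of the empty input starts with bytes 0xd4 0x1d, i.e. 0xd41d = 54301.
	fmt.Println(ringPosition([]byte("")))
}
```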

func TestIssue335(t *testing.T) {
initialDestinations := []*destination.Destination{
{Addr: "10.20.34.114:12003"},
{Addr: "10.20.39.104:12003"},
{Addr: "10.20.40.161:12003"},
{Addr: "10.20.35.158:12003"},
{Addr: "10.20.37.70:12003"},
{Addr: "10.20.40.126:12003"},
{Addr: "10.20.33.78:12003"},
{Addr: "10.20.39.19:12003"},
{Addr: "10.20.42.66:12003"},
{Addr: "10.20.34.131:12003"},
{Addr: "10.20.38.55:12003"},
{Addr: "10.20.41.75:12003"},
{Addr: "10.20.32.8:12003"},
{Addr: "10.20.37.165:12003"},
}
hasherWithoutFix := NewConsistentHasherReplicaCount(initialDestinations, 2, false)
hasherWithFix := NewConsistentHasherReplicaCount(initialDestinations, 2, true)
fmt.Println("len without fix and with fix - should be different", len(hasherWithoutFix.Ring), len(hasherWithFix.Ring))
for _, v := range hasherWithFix.Ring {
if v.Position == 59418 {
fmt.Println("found our missing value!") // this should trigger
}
fmt.Printf("%+v\n", v)
}
}

func TestConsistentHashingDestinations(t *testing.T) {
initialDestinations := []*destination.Destination{
{Addr: "10.0.0.1"},
{Addr: "127.0.0.1:2003", Instance: "a"},
{Addr: "127.0.0.1:2004", Instance: "b"}}
hasher := NewConsistentHasherReplicaCount(initialDestinations, 2)
hasher := NewConsistentHasherReplicaCount(initialDestinations, 2, false)
expectedHashRing := hashRing{
hashRingEntry{Position: uint16(7885), Hostname: "127.0.0.1", Instance: "a", DestinationIndex: 1},
hashRingEntry{Position: uint16(10461), Hostname: "127.0.0.1", Instance: "b", DestinationIndex: 2},
4 changes: 2 additions & 2 deletions route/grafananet.go
@@ -175,7 +175,7 @@ func NewGrafanaNet(key string, matcher matcher.Matcher, cfg GrafanaNetConfig) (R
cleanAddr := util.AddrToPath(cfg.Addr)

r := &GrafanaNet{
baseRoute: baseRoute{sync.Mutex{}, atomic.Value{}, key},
baseRoute: baseRoute{"GrafanaNet", sync.Mutex{}, atomic.Value{}, key},
Cfg: cfg,
schemas: schemas,
schemasStr: schemas.String(),
@@ -467,7 +467,7 @@ func (route *GrafanaNet) Shutdown() error {
}

func (route *GrafanaNet) Snapshot() Snapshot {
snapshot := makeSnapshot(&route.baseRoute, "GrafanaNet")
snapshot := route.baseRoute.Snapshot()
snapshot.Addr = route.Cfg.Addr
return snapshot
}
6 changes: 1 addition & 5 deletions route/kafkamdm.go
@@ -61,7 +61,7 @@ func NewKafkaMdm(key string, matcher matcher.Matcher, topic, codec, schemasFile,
cleanAddr := util.AddrToPath(brokers[0])

r := &KafkaMdm{
baseRoute: baseRoute{sync.Mutex{}, atomic.Value{}, key},
baseRoute: baseRoute{"KafkaMdm", sync.Mutex{}, atomic.Value{}, key},
topic: topic,
brokers: brokers,
buf: make(chan []byte, bufSize),
@@ -288,10 +288,6 @@ func (r *KafkaMdm) Shutdown() error {
return nil
}

func (r *KafkaMdm) Snapshot() Snapshot {
return makeSnapshot(&r.baseRoute, "KafkaMdm")
}

func getCompression(codec string) (sarama.CompressionCodec, error) {
switch codec {
case "none":
7 changes: 1 addition & 6 deletions route/pubsub.go
@@ -64,7 +64,7 @@ type PubSub struct {
// We will automatically run the route and the destination
func NewPubSub(key string, matcher matcher.Matcher, project, topic, format, codec string, bufSize, flushMaxSize, flushMaxWait int, blocking bool) (Route, error) {
r := &PubSub{
baseRoute: baseRoute{sync.Mutex{}, atomic.Value{}, key},
baseRoute: baseRoute{"pubsub", sync.Mutex{}, atomic.Value{}, key},
project: project,
topic: topic,
format: format,
@@ -251,8 +251,3 @@ func (r *PubSub) Shutdown() error {
r.psTopic.Stop()
return nil
}

// Snapshot clones the current config for update operations
func (r *PubSub) Snapshot() Snapshot {
return makeSnapshot(&r.baseRoute, "pubsub")
}