Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consistent hashing v2 #477

Merged
merged 4 commits into from
Nov 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ The conditions are AND-ed. Regexes are more resource intensive and hence should
* for grafanaNet / kafkaMdm / Google PubSub routes, there is only a single endpoint so that's where the data goes. For standard/carbon routes you can control how data gets routed into destinations (note that destinations have settings to match on prefix/sub/regex, just like routes):
* sendAllMatch: send all metrics to all the defined endpoints (possibly, and commonly only 1 endpoint).
* sendFirstMatch: send the metrics to the first endpoint that matches it.
* consistentHashing: the algorithm is the same as Carbon's consistent hashing.
* consistentHashing (older carbon consistent hashing behavior)/consistentHashing-v2 (experimental new behavior). (see [config docs](docs/config.md#carbon-route) and [PR 447](https://github.com/grafana/carbon-relay-ng/pull/477)for details)
* round robin: the route is a RR pool (not implemented)


Expand Down
6 changes: 4 additions & 2 deletions cfg/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func InitRoutes(table table.Interface, config Config, meta toml.MetaData) error
return fmt.Errorf("error adding route '%s'", routeConfig.Key)
}
table.AddRoute(route)
case "consistentHashing":
case "consistentHashing", "consistentHashing-v2":
destinations, err := imperatives.ParseDestinations(routeConfig.Destinations, table, false, routeConfig.Key)
if err != nil {
log.Error(err.Error())
Expand All @@ -200,7 +200,9 @@ func InitRoutes(table table.Interface, config Config, meta toml.MetaData) error
return fmt.Errorf("must get at least 2 destination for route '%s'", routeConfig.Key)
}

route, err := route.NewConsistentHashing(routeConfig.Key, matcher, destinations)
withFix := (routeConfig.Type == "consistentHashing-v2")

route, err := route.NewConsistentHashing(routeConfig.Key, matcher, destinations, withFix)
if err != nil {
log.Error(err.Error())
return fmt.Errorf("error adding route '%s'", routeConfig.Key)
Expand Down
29 changes: 19 additions & 10 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,25 @@ max = -1

### Options

setting | mandatory | values | default | description
---------------|-----------|-----------------------------------------------|---------|------------
key | Y | string | N/A |
type | Y | sendAllMatch/sendFirstMatch/consistentHashing | N/A | send to all destinations vs first matching destination vs distribute via consistent hashing
prefix | N | string | "" |
notPrefix | N | string | "" |
sub | N | string | "" |
notSub | N | string | "" |
regex | N | string | "" |
notRegex | N | string | "" |
setting | mandatory | values | default | description
---------------|-----------|-------------------|---------|------------
key | Y | string | N/A |
type | Y | See below | N/A | See below
prefix | N | string | "" |
notPrefix | N | string | "" |
sub | N | string | "" |
notSub | N | string | "" |
regex | N | string | "" |
notRegex | N | string | "" |

The following route types are supported:

* `sendAllMatch` : send to all destinations
* `sendFirstMatch` : send to first matching destination
* `consistentHashing` : distribute via consistent hashing as done in Graphite until december 2013. (I think up to version 0.9.12) (https://github.com/graphite-project/carbon/pull/196)
* `consistentHashing-v2` : distribute via consistent hashing as done in Graphite/carbon as of https://github.com/graphite-project/carbon/pull/196 (**experimental**) See [PR 447](https://github.com/grafana/carbon-relay-ng/pull/477) for more information.

Note that the carbon style consistent hashing does [not accurately balance workload across nodes](https://github.com/graphite-project/carbon/issues/485). See [issue 211](https://github.com/grafana/carbon-relay-ng/issues/211)

### Examples

Expand Down
5 changes: 3 additions & 2 deletions docs/tcp-admin-interface.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ commands:
<type>:
sendAllMatch send metrics in the route to all destinations
sendFirstMatch send metrics in the route to the first one that matches it
consistentHashing distribute metrics between destinations using a hash algorithm
consistentHashing distribute metrics between destinations using a hash algorithm, old-carbon style
consistentHashing-v2 distribute metrics between destinations using a hash algorithm, current carbon style (experimental. see PR 477)
<opts>:
prefix=<str> only take in metrics that have this prefix
notPrefix=<str> only take in metrics that don't have this prefix
Expand All @@ -51,7 +52,7 @@ commands:
notRegex=<regex> only take in metrics that don't match this regex (expensive!)
<dest>: <addr> <opts>
<addr> a tcp endpoint. i.e. ip:port or hostname:port
for consistentHashing routes, an instance identifier can also be present:
for consistentHashing and consistentHashing-v2 routes, an instance identifier can also be present:
hostname:port:instance
The instance is used to disambiguate multiple endpoints on the same host, as the Carbon-compatible consistent hashing algorithm does not take the port into account.
<opts>:
Expand Down
10 changes: 7 additions & 3 deletions imperatives/imperatives.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const (
addRouteSendAllMatch
addRouteSendFirstMatch
addRouteConsistentHashing
addRouteConsistentHashingV2
addRouteGrafanaNet
addRouteKafkaMdm
addRoutePubSub
Expand Down Expand Up @@ -103,6 +104,7 @@ var tokens = []toki.Def{
{Token: addRouteSendAllMatch, Pattern: "addRoute sendAllMatch"},
{Token: addRouteSendFirstMatch, Pattern: "addRoute sendFirstMatch"},
{Token: addRouteConsistentHashing, Pattern: "addRoute consistentHashing"},
{Token: addRouteConsistentHashingV2, Pattern: "addRoute consistentHashing-v2"},
{Token: addRouteGrafanaNet, Pattern: "addRoute grafanaNet"},
{Token: addRouteKafkaMdm, Pattern: "addRoute kafkaMdm"},
{Token: addRoutePubSub, Pattern: "addRoute pubsub"},
Expand Down Expand Up @@ -202,7 +204,9 @@ func Apply(table table.Interface, cmd string) error {
case addRouteSendFirstMatch:
return readAddRoute(s, table, route.NewSendFirstMatch)
case addRouteConsistentHashing:
return readAddRouteConsistentHashing(s, table)
return readAddRouteConsistentHashing(s, table, false)
case addRouteConsistentHashingV2:
return readAddRouteConsistentHashing(s, table, true)
case addRouteGrafanaNet:
return readAddRouteGrafanaNet(s, table)
case addRouteKafkaMdm:
Expand Down Expand Up @@ -437,7 +441,7 @@ func readAddRoute(s *toki.Scanner, table table.Interface, constructor func(key s
return nil
}

func readAddRouteConsistentHashing(s *toki.Scanner, table table.Interface) error {
func readAddRouteConsistentHashing(s *toki.Scanner, table table.Interface, withFix bool) error {
t := s.Next()
if t.Token != word {
return errFmtAddRoute
Expand All @@ -462,7 +466,7 @@ func readAddRouteConsistentHashing(s *toki.Scanner, table table.Interface) error
return fmt.Errorf("must get at least 2 destination for route '%s'", key)
}

route, err := route.NewConsistentHashing(key, matcher, destinations)
route, err := route.NewConsistentHashing(key, matcher, destinations, withFix)
if err != nil {
return err
}
Expand Down
7 changes: 1 addition & 6 deletions route/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func NewCloudWatch(key string, matcher matcher.Matcher, awsProfile, awsRegion, a
awsNamespace: awsNamespace,
storageResolution: storageResolution,
putMetricDataInput: cloudwatch.PutMetricDataInput{Namespace: aws.String(awsNamespace)},
baseRoute: baseRoute{sync.Mutex{}, atomic.Value{}, key},
baseRoute: baseRoute{"CloudWatch", sync.Mutex{}, atomic.Value{}, key},
buf: make(chan []byte, bufSize),
blocking: blocking,
bufSize: bufSize,
Expand Down Expand Up @@ -221,8 +221,3 @@ func (r *CloudWatch) Shutdown() error {
close(r.buf)
return nil
}

// Snapshot clones the current config for update operations
func (r *CloudWatch) Snapshot() Snapshot {
return makeSnapshot(&r.baseRoute, "CloudWatch")
}
30 changes: 25 additions & 5 deletions route/consistent_hashing.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import (
"bytes"
"crypto/md5"
"encoding/binary"
dest "github.com/grafana/carbon-relay-ng/destination"
"sort"
"strconv"
"strings"

dest "github.com/grafana/carbon-relay-ng/destination"
)

type hashRingEntry struct {
Expand Down Expand Up @@ -38,6 +39,9 @@ type ConsistentHasher struct {
Ring hashRing
destinations []*dest.Destination
replicaCount int

// Align with https://github.com/graphite-project/carbon/commit/024f9e67ca47619438951c59154c0dec0b0518c7#diff-1486787206e06af358b8d935577e76f5
withFix bool // See https://github.com/grafana/carbon-relay-ng/pull/477 for details.
}

func computeRingPosition(key []byte) uint16 {
Expand All @@ -48,12 +52,15 @@ func computeRingPosition(key []byte) uint16 {
return Position
}

func NewConsistentHasher(destinations []*dest.Destination) ConsistentHasher {
return NewConsistentHasherReplicaCount(destinations, 100)
func NewConsistentHasher(destinations []*dest.Destination, withFix bool) ConsistentHasher {
return NewConsistentHasherReplicaCount(destinations, 100, withFix)
}

func NewConsistentHasherReplicaCount(destinations []*dest.Destination, replicaCount int) ConsistentHasher {
hashRing := ConsistentHasher{replicaCount: replicaCount}
func NewConsistentHasherReplicaCount(destinations []*dest.Destination, replicaCount int, withFix bool) ConsistentHasher {
hashRing := ConsistentHasher{
replicaCount: replicaCount,
withFix: withFix,
}
for _, d := range destinations {
hashRing.AddDestination(d)
}
Expand Down Expand Up @@ -85,6 +92,19 @@ func (h *ConsistentHasher) AddDestination(d *dest.Destination) {
keyBuf.WriteString(":")
keyBuf.WriteString(strconv.Itoa(i))
position := computeRingPosition(keyBuf.Bytes())
if h.withFix {
outer:
for {
for i := 0; i < len(h.Ring); i++ {
if position == h.Ring[i].Position {
position++
continue outer
}
}
break
}
}

newRingEntries[i].Position = position
newRingEntries[i].Hostname = server[0]
newRingEntries[i].Instance = d.Instance
Expand Down
31 changes: 30 additions & 1 deletion route/consistent_hashing_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package route

import (
"fmt"
"testing"

"github.com/bmizerany/assert"
Expand All @@ -12,12 +13,40 @@ func TestConsistentHashingComputeRingPosition(t *testing.T) {
assert.Equal(t, uint16(54301), computeRingPosition([]byte("")))
}

func TestIssue335(t *testing.T) {
initialDestinations := []*destination.Destination{
{Addr: "10.20.34.114:12003"},
{Addr: "10.20.39.104:12003"},
{Addr: "10.20.40.161:12003"},
{Addr: "10.20.35.158:12003"},
{Addr: "10.20.37.70:12003"},
{Addr: "10.20.40.126:12003"},
{Addr: "10.20.33.78:12003"},
{Addr: "10.20.39.19:12003"},
{Addr: "10.20.42.66:12003"},
{Addr: "10.20.34.131:12003"},
{Addr: "10.20.38.55:12003"},
{Addr: "10.20.41.75:12003"},
{Addr: "10.20.32.8:12003"},
{Addr: "10.20.37.165:12003"},
}
hasherWithoutFix := NewConsistentHasherReplicaCount(initialDestinations, 2, false)
hasherWithFix := NewConsistentHasherReplicaCount(initialDestinations, 2, true)
fmt.Println("len without fix and with fix - should be different", len(hasherWithoutFix.Ring), len(hasherWithFix.Ring))
for _, v := range hasherWithFix.Ring {
if v.Position == 59418 {
fmt.Println("found our missing value!") // this should trigger
}
fmt.Printf("%+v\n", v)
}
}

func TestConsistentHashingDestinations(t *testing.T) {
initialDestinations := []*destination.Destination{
{Addr: "10.0.0.1"},
{Addr: "127.0.0.1:2003", Instance: "a"},
{Addr: "127.0.0.1:2004", Instance: "b"}}
hasher := NewConsistentHasherReplicaCount(initialDestinations, 2)
hasher := NewConsistentHasherReplicaCount(initialDestinations, 2, false)
expectedHashRing := hashRing{
hashRingEntry{Position: uint16(7885), Hostname: "127.0.0.1", Instance: "a", DestinationIndex: 1},
hashRingEntry{Position: uint16(10461), Hostname: "127.0.0.1", Instance: "b", DestinationIndex: 2},
Expand Down
4 changes: 2 additions & 2 deletions route/grafananet.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func NewGrafanaNet(key string, matcher matcher.Matcher, cfg GrafanaNetConfig) (R
cleanAddr := util.AddrToPath(cfg.Addr)

r := &GrafanaNet{
baseRoute: baseRoute{sync.Mutex{}, atomic.Value{}, key},
baseRoute: baseRoute{"GrafanaNet", sync.Mutex{}, atomic.Value{}, key},
Cfg: cfg,
schemas: schemas,
schemasStr: schemas.String(),
Expand Down Expand Up @@ -467,7 +467,7 @@ func (route *GrafanaNet) Shutdown() error {
}

func (route *GrafanaNet) Snapshot() Snapshot {
snapshot := makeSnapshot(&route.baseRoute, "GrafanaNet")
snapshot := route.baseRoute.Snapshot()
snapshot.Addr = route.Cfg.Addr
return snapshot
}
6 changes: 1 addition & 5 deletions route/kafkamdm.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func NewKafkaMdm(key string, matcher matcher.Matcher, topic, codec, schemasFile,
cleanAddr := util.AddrToPath(brokers[0])

r := &KafkaMdm{
baseRoute: baseRoute{sync.Mutex{}, atomic.Value{}, key},
baseRoute: baseRoute{"KafkaMdm", sync.Mutex{}, atomic.Value{}, key},
topic: topic,
brokers: brokers,
buf: make(chan []byte, bufSize),
Expand Down Expand Up @@ -288,10 +288,6 @@ func (r *KafkaMdm) Shutdown() error {
return nil
}

func (r *KafkaMdm) Snapshot() Snapshot {
return makeSnapshot(&r.baseRoute, "KafkaMdm")
}

func getCompression(codec string) (sarama.CompressionCodec, error) {
switch codec {
case "none":
Expand Down
7 changes: 1 addition & 6 deletions route/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type PubSub struct {
// We will automatically run the route and the destination
func NewPubSub(key string, matcher matcher.Matcher, project, topic, format, codec string, bufSize, flushMaxSize, flushMaxWait int, blocking bool) (Route, error) {
r := &PubSub{
baseRoute: baseRoute{sync.Mutex{}, atomic.Value{}, key},
baseRoute: baseRoute{"pubsub", sync.Mutex{}, atomic.Value{}, key},
project: project,
topic: topic,
format: format,
Expand Down Expand Up @@ -251,8 +251,3 @@ func (r *PubSub) Shutdown() error {
r.psTopic.Stop()
return nil
}

// Snapshot clones the current config for update operations
func (r *PubSub) Snapshot() Snapshot {
return makeSnapshot(&r.baseRoute, "pubsub")
}
Loading