Skip to content

Commit

Permalink
Expose configuration of memberlist packet compression. (cortexproject…
Browse files Browse the repository at this point in the history
…#4346)

* Expose configuration of memberlist packet compression.

Allows manually specifying whether memberlist should compress packets
via a new configuration flag: `-memberlist.compression-enabled`.

This typically has little benefit for Cortex, as the ring state messages
are already compressed with Snappy, so the second layer of compression does
not achieve any additional saving. It's not clear cut whether there
might still be some benefit for internal memberlist messages; this needs
to be evaluated in an environment of some reasonable scale.

Signed-off-by: Steve Simpson <steve.simpson@grafana.com>

* Review comments.

Signed-off-by: Steve Simpson <steve.simpson@grafana.com>

* Review comments.

Signed-off-by: Steve Simpson <steve.simpson@grafana.com>
Signed-off-by: Alvin Lin <alvinlin@amazon.com>
  • Loading branch information
stevesg authored and alvinlin123 committed Jan 14, 2022
1 parent 869fe06 commit bba2774
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
* `-compactor.ring.heartbeat-period`
* `-store-gateway.sharding-ring.heartbeat-period`
* [ENHANCEMENT] Memberlist: optimized receive path for processing ring state updates, to help reduce CPU utilization in large clusters. #4345
* [ENHANCEMENT] Memberlist: expose configuration of memberlist packet compression via `-memberlist.compression-enabled`. #4346
* [BUGFIX] HA Tracker: when cleaning up obsolete elected replicas from KV store, tracker didn't update number of clusters per user correctly. #4336
* [FEATURE] Add shuffle sharding grouper and planner within compactor to allow further work towards parallelizing compaction #4318

Expand Down
5 changes: 5 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3796,6 +3796,11 @@ The `memberlist_config` configures the Gossip memberlist.
# CLI flag: -memberlist.dead-node-reclaim-time
[dead_node_reclaim_time: <duration> | default = 0s]

# Enable message compression. This can be used to reduce bandwidth usage at the
# cost of slightly more CPU utilization.
# CLI flag: -memberlist.compression-enabled
[compression_enabled: <boolean> | default = true]

# Other cluster members to join. Can be specified multiple times. It can be an
# IP, hostname or an entry specified in the DNS Service Discovery format (see
# https://cortexmetrics.io/docs/configuration/arguments/#dns-service-discovery
Expand Down
29 changes: 18 additions & 11 deletions integration/integration_memberlist_single_binary_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,21 @@ import (

func TestSingleBinaryWithMemberlist(t *testing.T) {
t.Run("default", func(t *testing.T) {
testSingleBinaryEnv(t, false)
testSingleBinaryEnv(t, false, nil)
})

t.Run("tls", func(t *testing.T) {
testSingleBinaryEnv(t, true)
testSingleBinaryEnv(t, true, nil)
})

t.Run("compression-disabled", func(t *testing.T) {
testSingleBinaryEnv(t, false, map[string]string{
"-memberlist.compression-enabled": "false",
})
})
}

func testSingleBinaryEnv(t *testing.T, tlsEnabled bool) {
func testSingleBinaryEnv(t *testing.T, tlsEnabled bool, flags map[string]string) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()
Expand Down Expand Up @@ -65,13 +71,13 @@ func testSingleBinaryEnv(t *testing.T, tlsEnabled bool) {
filepath.Join(s.SharedDir(), clientKeyFile),
))

cortex1 = newSingleBinary("cortex-1", memberlistDNS, "")
cortex2 = newSingleBinary("cortex-2", memberlistDNS, networkName+"-cortex-1:8000")
cortex3 = newSingleBinary("cortex-3", memberlistDNS, networkName+"-cortex-1:8000")
cortex1 = newSingleBinary("cortex-1", memberlistDNS, "", flags)
cortex2 = newSingleBinary("cortex-2", memberlistDNS, networkName+"-cortex-1:8000", flags)
cortex3 = newSingleBinary("cortex-3", memberlistDNS, networkName+"-cortex-1:8000", flags)
} else {
cortex1 = newSingleBinary("cortex-1", "", "")
cortex2 = newSingleBinary("cortex-2", "", networkName+"-cortex-1:8000")
cortex3 = newSingleBinary("cortex-3", "", networkName+"-cortex-1:8000")
cortex1 = newSingleBinary("cortex-1", "", "", flags)
cortex2 = newSingleBinary("cortex-2", "", networkName+"-cortex-1:8000", flags)
cortex3 = newSingleBinary("cortex-3", "", networkName+"-cortex-1:8000", flags)
}

// start cortex-1 first, as cortex-2 and cortex-3 both connect to cortex-1
Expand Down Expand Up @@ -109,7 +115,7 @@ func testSingleBinaryEnv(t *testing.T, tlsEnabled bool) {
require.NoError(t, s.Stop(cortex3))
}

func newSingleBinary(name string, servername string, join string) *e2ecortex.CortexService {
func newSingleBinary(name string, servername string, join string, testFlags map[string]string) *e2ecortex.CortexService {
flags := map[string]string{
"-ingester.final-sleep": "0s",
"-ingester.join-after": "0s", // join quickly
Expand All @@ -132,6 +138,7 @@ func newSingleBinary(name string, servername string, join string) *e2ecortex.Cor
mergeFlags(
ChunksStorageFlags(),
flags,
testFlags,
getTLSFlagsWithPrefix("memberlist", servername, servername == ""),
),
"",
Expand Down Expand Up @@ -170,7 +177,7 @@ func TestSingleBinaryWithMemberlistScaling(t *testing.T) {
if i > 0 {
join = e2e.NetworkContainerHostPort(networkName, "cortex-1", 8000)
}
c := newSingleBinary(name, "", join)
c := newSingleBinary(name, "", join, nil)
require.NoError(t, s.StartAndWaitReady(c))
instances = append(instances, c)
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/ring/kv/memberlist/memberlist_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ type KVConfig struct {
GossipNodes int `yaml:"gossip_nodes"`
GossipToTheDeadTime time.Duration `yaml:"gossip_to_dead_nodes_time"`
DeadNodeReclaimTime time.Duration `yaml:"dead_node_reclaim_time"`
EnableCompression bool `yaml:"compression_enabled"`

// List of members to join
JoinMembers flagext.StringSlice `yaml:"join_members"`
Expand Down Expand Up @@ -187,6 +188,7 @@ func (cfg *KVConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
f.DurationVar(&cfg.GossipToTheDeadTime, prefix+"memberlist.gossip-to-dead-nodes-time", mlDefaults.GossipToTheDeadTime, "How long to keep gossiping to dead nodes, to give them chance to refute their death.")
f.DurationVar(&cfg.DeadNodeReclaimTime, prefix+"memberlist.dead-node-reclaim-time", mlDefaults.DeadNodeReclaimTime, "How soon can dead node's name be reclaimed with new address. 0 to disable.")
f.IntVar(&cfg.MessageHistoryBufferBytes, prefix+"memberlist.message-history-buffer-bytes", 0, "How much space to use for keeping received and sent messages in memory for troubleshooting (two buffers). 0 to disable.")
f.BoolVar(&cfg.EnableCompression, prefix+"memberlist.compression-enabled", mlDefaults.EnableCompression, "Enable message compression. This can be used to reduce bandwidth usage at the cost of slightly more CPU utilization.")

cfg.TCPTransport.RegisterFlags(f, prefix)
}
Expand Down Expand Up @@ -380,6 +382,7 @@ func (m *KV) buildMemberlistConfig() (*memberlist.Config, error) {
mlCfg.GossipNodes = m.cfg.GossipNodes
mlCfg.GossipToTheDeadTime = m.cfg.GossipToTheDeadTime
mlCfg.DeadNodeReclaimTime = m.cfg.DeadNodeReclaimTime
mlCfg.EnableCompression = m.cfg.EnableCompression

if m.cfg.NodeName != "" {
mlCfg.Name = m.cfg.NodeName
Expand Down

0 comments on commit bba2774

Please sign in to comment.