diff --git a/CHANGELOG.md b/CHANGELOG.md index 64066a61bf..2bc8af553d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ * `-compactor.ring.heartbeat-period` * `-store-gateway.sharding-ring.heartbeat-period` * [ENHANCEMENT] Memberlist: optimized receive path for processing ring state updates, to help reduce CPU utilization in large clusters. #4345 +* [ENHANCEMENT] Memberlist: expose configuration of memberlist packet compression via `-memberlist.compression-enabled`. #4346 * [BUGFIX] HA Tracker: when cleaning up obsolete elected replicas from KV store, tracker didn't update number of cluster per user correctly. #4336 * [FEATURE] Add shuffle sharding grouper and planner within compactor to allow further work towards parallelizing compaction #4318 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 22859a244e..0f70a3f3e5 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3796,6 +3796,11 @@ The `memberlist_config` configures the Gossip memberlist. # CLI flag: -memberlist.dead-node-reclaim-time [dead_node_reclaim_time: <duration> | default = 0s] +# Enable message compression. This can be used to reduce bandwidth usage at the +# cost of slightly more CPU utilization. +# CLI flag: -memberlist.compression-enabled +[compression_enabled: <boolean> | default = true] + # Other cluster members to join. Can be specified multiple times. 
It can be an # IP, hostname or an entry specified in the DNS Service Discovery format (see # https://cortexmetrics.io/docs/configuration/arguments/#dns-service-discovery diff --git a/integration/integration_memberlist_single_binary_test.go b/integration/integration_memberlist_single_binary_test.go index 53c1a945fd..f4cd7b7914 100644 --- a/integration/integration_memberlist_single_binary_test.go +++ b/integration/integration_memberlist_single_binary_test.go @@ -23,15 +23,21 @@ import ( func TestSingleBinaryWithMemberlist(t *testing.T) { t.Run("default", func(t *testing.T) { - testSingleBinaryEnv(t, false) + testSingleBinaryEnv(t, false, nil) }) t.Run("tls", func(t *testing.T) { - testSingleBinaryEnv(t, true) + testSingleBinaryEnv(t, true, nil) + }) + + t.Run("compression-disabled", func(t *testing.T) { + testSingleBinaryEnv(t, false, map[string]string{ + "-memberlist.compression-enabled": "false", + }) }) } -func testSingleBinaryEnv(t *testing.T, tlsEnabled bool) { +func testSingleBinaryEnv(t *testing.T, tlsEnabled bool, flags map[string]string) { s, err := e2e.NewScenario(networkName) require.NoError(t, err) defer s.Close() @@ -65,13 +71,13 @@ func testSingleBinaryEnv(t *testing.T, tlsEnabled bool) { filepath.Join(s.SharedDir(), clientKeyFile), )) - cortex1 = newSingleBinary("cortex-1", memberlistDNS, "") - cortex2 = newSingleBinary("cortex-2", memberlistDNS, networkName+"-cortex-1:8000") - cortex3 = newSingleBinary("cortex-3", memberlistDNS, networkName+"-cortex-1:8000") + cortex1 = newSingleBinary("cortex-1", memberlistDNS, "", flags) + cortex2 = newSingleBinary("cortex-2", memberlistDNS, networkName+"-cortex-1:8000", flags) + cortex3 = newSingleBinary("cortex-3", memberlistDNS, networkName+"-cortex-1:8000", flags) } else { - cortex1 = newSingleBinary("cortex-1", "", "") - cortex2 = newSingleBinary("cortex-2", "", networkName+"-cortex-1:8000") - cortex3 = newSingleBinary("cortex-3", "", networkName+"-cortex-1:8000") + cortex1 = newSingleBinary("cortex-1", "", "", 
flags) + cortex2 = newSingleBinary("cortex-2", "", networkName+"-cortex-1:8000", flags) + cortex3 = newSingleBinary("cortex-3", "", networkName+"-cortex-1:8000", flags) } // start cortex-1 first, as cortex-2 and cortex-3 both connect to cortex-1 @@ -109,7 +115,7 @@ func testSingleBinaryEnv(t *testing.T, tlsEnabled bool) { require.NoError(t, s.Stop(cortex3)) } -func newSingleBinary(name string, servername string, join string) *e2ecortex.CortexService { +func newSingleBinary(name string, servername string, join string, testFlags map[string]string) *e2ecortex.CortexService { flags := map[string]string{ "-ingester.final-sleep": "0s", "-ingester.join-after": "0s", // join quickly @@ -132,6 +138,7 @@ func newSingleBinary(name string, servername string, join string) *e2ecortex.Cor mergeFlags( ChunksStorageFlags(), flags, + testFlags, getTLSFlagsWithPrefix("memberlist", servername, servername == ""), ), "", @@ -170,7 +177,7 @@ func TestSingleBinaryWithMemberlistScaling(t *testing.T) { if i > 0 { join = e2e.NetworkContainerHostPort(networkName, "cortex-1", 8000) } - c := newSingleBinary(name, "", join) + c := newSingleBinary(name, "", join, nil) require.NoError(t, s.StartAndWaitReady(c)) instances = append(instances, c) } diff --git a/pkg/ring/kv/memberlist/memberlist_client.go b/pkg/ring/kv/memberlist/memberlist_client.go index 7e45868bd9..79432d4668 100644 --- a/pkg/ring/kv/memberlist/memberlist_client.go +++ b/pkg/ring/kv/memberlist/memberlist_client.go @@ -136,6 +136,7 @@ type KVConfig struct { GossipNodes int `yaml:"gossip_nodes"` GossipToTheDeadTime time.Duration `yaml:"gossip_to_dead_nodes_time"` DeadNodeReclaimTime time.Duration `yaml:"dead_node_reclaim_time"` + EnableCompression bool `yaml:"compression_enabled"` // List of members to join JoinMembers flagext.StringSlice `yaml:"join_members"` @@ -187,6 +188,7 @@ func (cfg *KVConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { f.DurationVar(&cfg.GossipToTheDeadTime, 
prefix+"memberlist.gossip-to-dead-nodes-time", mlDefaults.GossipToTheDeadTime, "How long to keep gossiping to dead nodes, to give them chance to refute their death.") f.DurationVar(&cfg.DeadNodeReclaimTime, prefix+"memberlist.dead-node-reclaim-time", mlDefaults.DeadNodeReclaimTime, "How soon can dead node's name be reclaimed with new address. 0 to disable.") f.IntVar(&cfg.MessageHistoryBufferBytes, prefix+"memberlist.message-history-buffer-bytes", 0, "How much space to use for keeping received and sent messages in memory for troubleshooting (two buffers). 0 to disable.") + f.BoolVar(&cfg.EnableCompression, prefix+"memberlist.compression-enabled", mlDefaults.EnableCompression, "Enable message compression. This can be used to reduce bandwidth usage at the cost of slightly more CPU utilization.") cfg.TCPTransport.RegisterFlags(f, prefix) } @@ -380,6 +382,7 @@ func (m *KV) buildMemberlistConfig() (*memberlist.Config, error) { mlCfg.GossipNodes = m.cfg.GossipNodes mlCfg.GossipToTheDeadTime = m.cfg.GossipToTheDeadTime mlCfg.DeadNodeReclaimTime = m.cfg.DeadNodeReclaimTime + mlCfg.EnableCompression = m.cfg.EnableCompression if m.cfg.NodeName != "" { mlCfg.Name = m.cfg.NodeName