From 05cd21cbfb42546ea108ee81129891090a29f629 Mon Sep 17 00:00:00 2001 From: Forrest Marshall Date: Thu, 13 Jul 2023 15:23:37 +0000 Subject: [PATCH 1/2] etcd client pool --- lib/backend/etcdbk/etcd.go | 81 +++++++++++++--------- lib/backend/etcdbk/etcd_test.go | 2 +- lib/backend/etcdbk/helpers.go | 39 +++++++++++ lib/backend/etcdbk/helpers_test.go | 108 +++++++++++++++++++++++++++++ 4 files changed, 197 insertions(+), 33 deletions(-) create mode 100644 lib/backend/etcdbk/helpers.go create mode 100644 lib/backend/etcdbk/helpers_test.go diff --git a/lib/backend/etcdbk/etcd.go b/lib/backend/etcdbk/etcd.go index 3de5f7719b07c..d79f77920ed56 100644 --- a/lib/backend/etcdbk/etcd.go +++ b/lib/backend/etcdbk/etcd.go @@ -38,7 +38,6 @@ import ( "go.etcd.io/etcd/api/v3/mvccpb" "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" clientv3 "go.etcd.io/etcd/client/v3" - "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -53,6 +52,11 @@ import ( cq "github.com/gravitational/teleport/lib/utils/concurrentqueue" ) +const ( + // defaultClientPoolSize is the default number of etcd clients to use + defaultClientPoolSize = 3 +) + var ( writeRequests = prometheus.NewCounter( prometheus.CounterOpts{ @@ -140,7 +144,7 @@ type EtcdBackend struct { nodes []string *log.Entry cfg *Config - client *clientv3.Client + clients roundRobin[*clientv3.Client] cancelC chan bool stopC chan bool clock clockwork.Clock @@ -181,6 +185,8 @@ type Config struct { // MaxClientMsgSizeBytes optionally specifies the size limit on client send message size. // See https://github.com/etcd-io/etcd/blob/221f0cc107cb3497eeb20fb241e1bcafca2e9115/clientv3/config.go#L49 MaxClientMsgSizeBytes int `json:"etcd_max_client_msg_size_bytes,omitempty"` + // ClientPoolSize is the number of concurrent clients to use. + ClientPoolSize int `json:"client_pool_size,omitempty"` } // GetName returns the name of etcd backend as it appears in 'storage/type' section @@ -265,6 +271,7 @@ func New(ctx context.Context, params backend.Params, opts ...Option) (*EtcdBacke b := &EtcdBackend{ Entry: log.WithFields(log.Fields{trace.Component: GetName()}), cfg: cfg, + clients: newRoundRobin[*clientv3.Client](nil), // initialized below in reconnect() nodes: cfg.Nodes, cancelC: make(chan bool, 1), stopC: make(chan bool, 1), @@ -284,7 +291,7 @@ func New(ctx context.Context, params backend.Params, opts ...Option) (*EtcdBacke timeout, cancel := context.WithTimeout(ctx, time.Second*3*time.Duration(len(cfg.Nodes))) defer cancel() for _, n := range cfg.Nodes { - status, err := b.client.Status(timeout, n) + status, err := b.clients.Next().Status(timeout, n) if err != nil { return nil, trace.Wrap(err) } @@ -339,6 +346,10 @@ func (cfg *Config) Validate() error { // trim newlines as passwords in files tend to have newlines cfg.Password = strings.TrimSpace(string(out)) } + + if cfg.ClientPoolSize < 1 { + cfg.ClientPoolSize = defaultClientPoolSize + } return nil } @@ -353,7 +364,11 @@ func (b *EtcdBackend) Clock() clockwork.Clock { func (b *EtcdBackend) Close() error { b.cancel() b.buf.Close() - return b.client.Close() + var errs []error + for _, clt := range b.clients.items { + errs = append(errs, clt.Close()) + } + return trace.NewAggregate(errs...) } // CloseWatchers closes all the watchers @@ -363,11 +378,12 @@ func (b *EtcdBackend) CloseWatchers() { } func (b *EtcdBackend) reconnect(ctx context.Context) error { - if b.client != nil { - if err := b.client.Close(); err != nil { + for _, clt := range b.clients.items { + if err := clt.Close(); err != nil { b.Entry.WithError(err).Warning("Failed closing existing etcd client on reconnect.") } } + b.clients.items = nil tlsConfig := utils.TLSConfig(nil) @@ -406,22 +422,23 @@ func (b *EtcdBackend) reconnect(ctx context.Context) error { tlsConfig.ClientCAs = certPool } - clt, err := clientv3.New(clientv3.Config{ - Endpoints: b.nodes, - TLS: tlsConfig, - DialTimeout: b.cfg.DialTimeout, - DialOptions: []grpc.DialOption{grpc.WithBlock()}, - Username: b.cfg.Username, - Password: b.cfg.Password, - MaxCallSendMsgSize: b.cfg.MaxClientMsgSizeBytes, - }) - if err != nil { - if errors.Is(err, context.DeadlineExceeded) { - return trace.WrapWithMessage(err, "timed out dialing etcd endpoints: %s", b.nodes) + for i := 0; i < b.cfg.ClientPoolSize; i++ { + clt, err := clientv3.New(clientv3.Config{ + Endpoints: b.nodes, + TLS: tlsConfig, + DialTimeout: b.cfg.DialTimeout, + Username: b.cfg.Username, + Password: b.cfg.Password, + MaxCallSendMsgSize: b.cfg.MaxClientMsgSizeBytes, + }) + if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + return trace.WrapWithMessage(err, "timed out dialing etcd endpoints: %s", b.nodes) + } + return trace.Wrap(err) } - return trace.Wrap(err) + b.clients.items = append(b.clients.items, clt) } - b.client = clt return nil } @@ -495,7 +512,7 @@ func (b *EtcdBackend) watchEvents(ctx context.Context) error { emitDone := make(chan struct{}) // watcher must be registered before we initialize the buffer - eventsC := b.client.Watch(ctx, b.cfg.Key, clientv3.WithPrefix()) + eventsC := b.clients.Next().Watch(ctx, b.cfg.Key, clientv3.WithPrefix()) // set buffer to initialized state. b.buf.SetInit() @@ -585,7 +602,7 @@ func (b *EtcdBackend) GetRange(ctx context.Context, startKey, endKey []byte, lim opts = append(opts, clientv3.WithLimit(int64(limit))) } start := b.clock.Now() - re, err := b.client.Get(ctx, b.prependPrefix(startKey), opts...) + re, err := b.clients.Next().Get(ctx, b.prependPrefix(startKey), opts...) batchReadLatencies.Observe(time.Since(start).Seconds()) batchReadRequests.Inc() if err := convertErr(err); err != nil { @@ -618,7 +635,7 @@ func (b *EtcdBackend) Create(ctx context.Context, item backend.Item) (*backend.L } } start := b.clock.Now() - re, err := b.client.Txn(ctx). + re, err := b.clients.Next().Txn(ctx). If(clientv3.Compare(clientv3.CreateRevision(b.prependPrefix(item.Key)), "=", 0)). Then(clientv3.OpPut(b.prependPrefix(item.Key), base64.StdEncoding.EncodeToString(item.Value), opts...)). Commit() @@ -643,7 +660,7 @@ func (b *EtcdBackend) Update(ctx context.Context, item backend.Item) (*backend.L } } start := b.clock.Now() - re, err := b.client.Txn(ctx). + re, err := b.clients.Next().Txn(ctx). If(clientv3.Compare(clientv3.CreateRevision(b.prependPrefix(item.Key)), "!=", 0)). Then(clientv3.OpPut(b.prependPrefix(item.Key), base64.StdEncoding.EncodeToString(item.Value), opts...)). Commit() @@ -680,7 +697,7 @@ func (b *EtcdBackend) CompareAndSwap(ctx context.Context, expected backend.Item, encodedPrev := base64.StdEncoding.EncodeToString(expected.Value) start := b.clock.Now() - re, err := b.client.Txn(ctx). + re, err := b.clients.Next().Txn(ctx). If(clientv3.Compare(clientv3.Value(b.prependPrefix(expected.Key)), "=", encodedPrev)). Then(clientv3.OpPut(b.prependPrefix(expected.Key), base64.StdEncoding.EncodeToString(replaceWith.Value), opts...)). Commit() @@ -710,7 +727,7 @@ func (b *EtcdBackend) Put(ctx context.Context, item backend.Item) (*backend.Leas } } start := b.clock.Now() - _, err := b.client.Put( + _, err := b.clients.Next().Put( ctx, b.prependPrefix(item.Key), base64.StdEncoding.EncodeToString(item.Value), @@ -739,7 +756,7 @@ func (b *EtcdBackend) KeepAlive(ctx context.Context, lease backend.Lease, expire return trace.Wrap(err) } opts = append(opts, clientv3.WithIgnoreValue()) - _, err := b.client.Put(ctx, b.prependPrefix(lease.Key), "", opts...) + _, err := b.clients.Next().Put(ctx, b.prependPrefix(lease.Key), "", opts...) err = convertErr(err) if trace.IsNotFound(err) { return trace.NotFound("item %q is not found", string(lease.Key)) @@ -750,7 +767,7 @@ func (b *EtcdBackend) KeepAlive(ctx context.Context, lease backend.Lease, expire // Get returns a single item or not found error func (b *EtcdBackend) Get(ctx context.Context, key []byte) (*backend.Item, error) { - re, err := b.client.Get(ctx, b.prependPrefix(key)) + re, err := b.clients.Next().Get(ctx, b.prependPrefix(key)) if err != nil { return nil, convertErr(err) } @@ -768,7 +785,7 @@ func (b *EtcdBackend) Get(ctx context.Context, key []byte) (*backend.Item, error // Delete deletes item by key func (b *EtcdBackend) Delete(ctx context.Context, key []byte) error { start := b.clock.Now() - re, err := b.client.Delete(ctx, b.prependPrefix(key)) + re, err := b.clients.Next().Delete(ctx, b.prependPrefix(key)) writeLatencies.Observe(time.Since(start).Seconds()) writeRequests.Inc() if err != nil { @@ -790,7 +807,7 @@ func (b *EtcdBackend) DeleteRange(ctx context.Context, startKey, endKey []byte) return trace.BadParameter("missing parameter endKey") } start := b.clock.Now() - _, err := b.client.Delete(ctx, b.prependPrefix(startKey), clientv3.WithRange(b.prependPrefix(endKey))) + _, err := b.clients.Next().Delete(ctx, b.prependPrefix(startKey), clientv3.WithRange(b.prependPrefix(endKey))) writeLatencies.Observe(time.Since(start).Seconds()) writeRequests.Inc() if err != nil { @@ -814,7 +831,7 @@ func (b *EtcdBackend) setupLease(ctx context.Context, item backend.Item, lease * bucket := roundUp(item.Expires, b.leaseBucket) leaseID, err := utils.FnCacheGet(ctx, b.leaseCache, leaseKey{bucket: bucket}, func(ctx context.Context) (clientv3.LeaseID, error) { ttl := b.ttl(bucket) - elease, err := b.client.Grant(ctx, seconds(ttl)) + elease, err := b.clients.Next().Grant(ctx, seconds(ttl)) if err != nil { return 0, convertErr(err) } @@ -865,7 +882,7 @@ func (b *EtcdBackend) fromEvent(ctx context.Context, e clientv3.Event) (*backend // reduce the number of requests per shared ttl we cache the results per lease id. if e.Kv.Lease != 0 { ttl, err := utils.FnCacheGet(ctx, b.leaseCache, ttlKey{leaseID: e.Kv.Lease}, func(ctx context.Context) (int64, error) { - re, err := b.client.TimeToLive(ctx, clientv3.LeaseID(e.Kv.Lease)) + re, err := b.clients.Next().TimeToLive(ctx, clientv3.LeaseID(e.Kv.Lease)) if err != nil { return 0, convertErr(err) } diff --git a/lib/backend/etcdbk/etcd_test.go b/lib/backend/etcdbk/etcd_test.go index 301761bbb3431..0f79fbdd973fd 100644 --- a/lib/backend/etcdbk/etcd_test.go +++ b/lib/backend/etcdbk/etcd_test.go @@ -153,7 +153,7 @@ func TestPrefix(t *testing.T) { func requireKV(ctx context.Context, t *testing.T, bk *EtcdBackend, key, val string) { t.Logf("assert that key %q contains value %q", key, val) - resp, err := bk.client.Get(ctx, key) + resp, err := bk.clients.Next().Get(ctx, key) require.NoError(t, err) require.Len(t, resp.Kvs, 1) require.Equal(t, key, string(resp.Kvs[0].Key)) diff --git a/lib/backend/etcdbk/helpers.go b/lib/backend/etcdbk/helpers.go new file mode 100644 index 0000000000000..31e14558b5aae --- /dev/null +++ b/lib/backend/etcdbk/helpers.go @@ -0,0 +1,39 @@ +/* +Copyright 2023 Gravitational, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package etcdbk + +import "sync/atomic" + +// roundRobin is a helper for distributing load across multiple resources in a round-robin +// fashion (used to implement simple client pooling). +type roundRobin[T any] struct { + ct *atomic.Uint64 + items []T +} + +func newRoundRobin[T any](items []T) roundRobin[T] { + return roundRobin[T]{ + ct: new(atomic.Uint64), + items: items, + } +} + +func (r roundRobin[T]) Next() T { + n := r.ct.Add(1) - 1 + l := uint64(len(r.items)) + return r.items[int(n%l)] +} diff --git a/lib/backend/etcdbk/helpers_test.go b/lib/backend/etcdbk/helpers_test.go new file mode 100644 index 0000000000000..bc775e0e93aed --- /dev/null +++ b/lib/backend/etcdbk/helpers_test.go @@ -0,0 +1,108 @@ +/* +Copyright 2023 Gravitational, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package etcdbk + +import ( + "sync" + "sync/atomic" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestRoundRobinConcurrent(t *testing.T) { + t.Parallel() + + const workers = 100 + const rounds = 100 + + rr := newRoundRobin([]bool{true, false}) + + var tct atomic.Uint64 + var fct atomic.Uint64 + + var wg sync.WaitGroup + + for w := 0; w < workers; w++ { + wg.Add(1) + go func() { + defer wg.Done() + for r := 0; r < rounds; r++ { + if rr.Next() { + tct.Add(1) + } else { + fct.Add(1) + } + } + }() + } + + wg.Wait() + + require.Equal(t, workers*rounds, int(tct.Load()+fct.Load())) + require.InDelta(t, tct.Load(), fct.Load(), 1.0) +} + +func TestRoundRobinSequential(t *testing.T) { + t.Parallel() + tts := []struct { + desc string + items []string + expect []string + }{ + { + desc: "single-item", + items: []string{"foo"}, + expect: []string{ + "foo", + "foo", + "foo", + }, + }, + { + desc: "multi-item", + items: []string{ + "foo", + "bar", + "bin", + "baz", + }, + expect: []string{ + "foo", + "bar", + "bin", + "baz", + "foo", + "bar", + "bin", + "baz", + "foo", + "bar", + "bin", + "baz", + }, + }, + } + for _, tt := range tts { + t.Run(tt.desc, func(t *testing.T) { + rr := newRoundRobin(tt.items) + for _, exp := range tt.expect { + require.Equal(t, exp, rr.Next()) + } + }) + } +} From 84d1157b9f4c509586cb26f1544a0d00ec922ad3 Mon Sep 17 00:00:00 2001 From: Forrest Marshall Date: Thu, 13 Jul 2023 16:38:39 +0000 Subject: [PATCH 2/2] bumb backlog grace period --- lib/backend/defaults.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/lib/backend/defaults.go b/lib/backend/defaults.go index e723c573847fe..357ce94db5b44 100644 --- a/lib/backend/defaults.go +++ b/lib/backend/defaults.go @@ -22,10 +22,12 @@ const ( // DefaultBufferCapacity is a default circular buffer size // used by backends to fan out events DefaultBufferCapacity = 1024 - // DefaultBacklogGracePeriod is the default amount of time - // that the circular buffer will tolerate an event backlog - // in one of its watchers. - DefaultBacklogGracePeriod = time.Second * 30 + // DefaultBacklogGracePeriod is the default amount of time that the circular buffer + // will tolerate an event backlog in one of its watchers. Value was selected to be + // just under 1m since 1m is typically the highest rate that high volume events + // (e.g. heartbeats) are be created. If a watcher can't catch up in under a minute, + // it probably won't catch up. + DefaultBacklogGracePeriod = time.Second * 59 // DefaultPollStreamPeriod is a default event poll stream period DefaultPollStreamPeriod = time.Second // DefaultEventsTTL is a default events TTL period