Skip to content

Commit

Permalink
etcd: switch lock session to lease manager
Browse files Browse the repository at this point in the history
This commit drops the currently used lock session acquisition logic and
replaces it with a new instance of the lease manager, adapting the call
sites. The lock session is still established regardless of whether the
client is used in normal or read-only mode, which will be improved while
subsequently refactoring the status checking logic.

Signed-off-by: Marco Iorio <marco.iorio@isovalent.com>
  • Loading branch information
giorio94 authored and julianwiedmann committed Nov 16, 2023
1 parent 8d18fc9 commit c6eb358
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 172 deletions.
208 changes: 39 additions & 169 deletions pkg/kvstore/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import (
"sigs.k8s.io/yaml"

"github.com/cilium/cilium/pkg/backoff"
"github.com/cilium/cilium/pkg/contexthelpers"
"github.com/cilium/cilium/pkg/controller"
"github.com/cilium/cilium/pkg/defaults"
"github.com/cilium/cilium/pkg/inctimer"
"github.com/cilium/cilium/pkg/lock"
Expand Down Expand Up @@ -62,8 +60,6 @@ const (
// by ListAndWatch operations. A 0 value equals to no limit.
EtcdListLimitOption = "etcd.limit"

etcdLockSessionRenewNamePrefix = "kvstore-etcd-lock-session-renew"

// etcdMaxKeysPerLease is the maximum number of keys that can be attached to a lease
etcdMaxKeysPerLease = 1000
)
Expand All @@ -74,8 +70,6 @@ var (
ErrLockLeaseExpired = errors.New("transaction did not succeed: lock lease expired")

randGen = rand.NewSafeRand(time.Now().UnixNano())

etcdLockSessionRenewControllerGroup = controller.NewGroup(etcdLockSessionRenewNamePrefix)
)

type etcdModule struct {
Expand Down Expand Up @@ -346,8 +340,7 @@ type etcdClient struct {
// stopStatusChecker is closed when the status checker can be terminated
stopStatusChecker chan struct{}

client *client.Client
controllers *controller.Manager
client *client.Client

// config and configPath are initialized once and never written to again, they can be accessed without locking
config *client.Config
Expand All @@ -359,11 +352,13 @@ type etcdClient struct {
// protects all sessions and sessionErr from concurrent access
lock.RWMutex

sessionErr error
lockSession *concurrency.Session
lockSessionCancel context.CancelFunc
sessionErr error

// leaseManager manages the acquisition of etcd leases for generic purposes
leaseManager *etcdLeaseManager
// lockLeaseManager manages the acquisition of etcd leases for locking
// purposes, associated with a shorter TTL
lockLeaseManager *etcdLeaseManager

// statusLock protects latestStatusSnapshot and latestErrorStatus for
// read/write access
Expand All @@ -389,10 +384,12 @@ type etcdClient struct {
}

// etcdMutex pairs an etcd concurrency.Mutex with a callback that is run when
// the mutex is unlocked.
type etcdMutex struct {
// mutex is the underlying etcd mutex released by Unlock.
mutex *concurrency.Mutex
mutex *concurrency.Mutex
// onUnlock is invoked by Unlock before the underlying mutex is released.
onUnlock func()
}

// Unlock runs the onUnlock callback and then releases the underlying etcd
// mutex, returning the error (if any) reported by the mutex.
func (e *etcdMutex) Unlock(ctx context.Context) error {
e.onUnlock()
return e.mutex.Unlock(ctx)
}

Expand All @@ -405,36 +402,6 @@ func (e *etcdClient) StatusCheckErrors() <-chan error {
return e.statusCheckErrors
}

// GetLockSessionLeaseID returns the current lease ID for the lock session.
// The lock session is read while holding the read lock.
func (e *etcdClient) GetLockSessionLeaseID() client.LeaseID {
e.RWMutex.RLock()
l := e.lockSession.Lease()
e.RWMutex.RUnlock()
return l
}

// checkLockSession verifies if the lease is still valid from the error
// returned by an etcd API call. If the error explicitly states that a lease
// was not found, we mark the session as an orphan for this etcd client. If we
// did not mark it as an orphan via Orphan(), the session would only be
// considered expired after the lease TTL. By making it an orphan we guarantee
// the session will be marked to be renewed.
func (e *etcdClient) checkLockSession(err error, leaseID client.LeaseID) {
if errors.Is(err, v3rpcErrors.ErrLeaseNotFound) {
e.closeLockSession(leaseID)
}
}

// closeLockSession marks the current lock session as an orphan, provided that
// its lease ID matches the given leaseID. The check is performed while holding
// the read lock.
func (e *etcdClient) closeLockSession(leaseID client.LeaseID) {
e.RWMutex.RLock()
// Only mark the session as an orphan if leaseID matches the lease ID of the
// current session, to avoid orphaning any other session.
if e.lockSession.Lease() == leaseID {
e.lockSession.Orphan()
}
e.RWMutex.RUnlock()
}

func (e *etcdClient) waitForInitLock(ctx context.Context) <-chan error {
initLockSucceeded := make(chan error)

Expand Down Expand Up @@ -494,25 +461,13 @@ func (e *etcdClient) isConnectedAndHasQuorum(ctx context.Context) error {
return fmt.Errorf("timeout while waiting for initial connection")
}

e.RLock()
ch := e.lockSession.Done()
e.RUnlock()

initLockSucceeded := e.waitForInitLock(ctxTimeout)
select {
// Catch disconnect event, no success
case <-ch:
recordQuorumError("session timeout")
return fmt.Errorf("etcd session ended")
// wait for initial lock to succeed
case err := <-initLockSucceeded:
if err != nil {
recordQuorumError("lock timeout")
return fmt.Errorf("unable to acquire lock: %w", err)
}

return nil
if err := <-initLockSucceeded; err != nil {
recordQuorumError("lock timeout")
return fmt.Errorf("unable to acquire lock: %w", err)
}

return nil
}

// Connected closes the returned channel when the etcd client is connected. If
Expand Down Expand Up @@ -546,68 +501,16 @@ func (e *etcdClient) Connected(ctx context.Context) <-chan error {
// connected with the kvstore.
func (e *etcdClient) Disconnected() <-chan struct{} {
<-e.firstSession
e.RLock()
ch := e.lockSession.Done()
e.RUnlock()
return ch
}

func (e *etcdClient) renewLockSession(ctx context.Context) error {
if err := e.waitForInitialSession(ctx); err != nil {
return err
}

e.RWMutex.RLock()
lockSessionChan := e.lockSession.Done()
e.RWMutex.RUnlock()
for {
session, err := e.lockLeaseManager.GetSession(context.Background(), InitLockPath)
if err == nil {
return session.Done()
}

select {
// session has ended
case <-lockSessionChan:
// controller has stopped or etcd client is closing
case <-ctx.Done():
return nil
}
// This is an attempt to avoid concurrent access of a session that was
// already expired. It's not perfect as there is still a period between the
// e.lockSession.Done() is closed and the e.Lock() is held where parallel go
// routines can get a lease ID of an already expired lease.
e.Lock()

if e.lockSessionCancel != nil {
e.lockSessionCancel()
e.lockSessionCancel = nil
}

// Create a context representing the lifetime of the lock session. It
// will timeout if the session creation does not succeed in time and
// persists until any of the below conditions are met:
// - The parent context is cancelled due to the etcd client closing or
// the controller being shut down
// - The above call to sessionCancel() cancels the session due to the
// session ending and requiring renewal.
sessionContext, sessionCancel, sessionSuccess := contexthelpers.NewConditionalTimeoutContext(ctx, statusCheckTimeout)
defer close(sessionSuccess)

newSession, err := concurrency.NewSession(
e.client,
concurrency.WithTTL(int(defaults.LockLeaseTTL.Seconds())),
concurrency.WithContext(sessionContext),
)
if err != nil {
e.UnlockIgnoreTime()
return fmt.Errorf("unable to renew etcd lock session: %s", err)
e.logger.WithError(err).Warning("Failed to acquire lock session")
time.Sleep(100 * time.Millisecond)
}
sessionSuccess <- true
log.Infof("Got new lock lease ID %x", newSession.Lease())

e.lockSession = newSession
e.lockSessionCancel = sessionCancel
e.UnlockIgnoreTime()

e.logger.WithField(fieldSession, newSession).Debug("Renewing etcd lock session")

return nil
}

func connectEtcdClient(ctx context.Context, config *client.Config, cfgPath string, errChan chan error, clientOptions clientOptions, opts *ExtraOptions) (BackendOperations, error) {
Expand Down Expand Up @@ -653,7 +556,6 @@ func connectEtcdClient(ctx context.Context, config *client.Config, cfgPath strin
return nil, err
}

var ls concurrency.Session
errorChan := make(chan error)

limiter := ciliumrate.NewAPILimiter(makeSessionName("etcd", opts), ciliumrate.APILimiterParameters{
Expand All @@ -666,9 +568,7 @@ func connectEtcdClient(ctx context.Context, config *client.Config, cfgPath strin
client: c,
config: config,
configPath: cfgPath,
lockSession: &ls,
firstSession: make(chan struct{}),
controllers: controller.NewManager(),
latestStatusSnapshot: "Waiting for initial connection to be established",
stopStatusChecker: make(chan struct{}),
extraOptions: opts,
Expand All @@ -689,20 +589,17 @@ func connectEtcdClient(ctx context.Context, config *client.Config, cfgPath strin
}

ec.leaseManager = newEtcdLeaseManager(c, leaseTTL, etcdMaxKeysPerLease, ec.expiredLeaseObserver, ec.logger)
ec.lockLeaseManager = newEtcdLeaseManager(c, defaults.LockLeaseTTL, etcdMaxKeysPerLease, nil, ec.logger)

// create session in parallel as this is a blocking operation
go func() {
lockSession, err := concurrency.NewSession(c, concurrency.WithTTL(int(defaults.LockLeaseTTL.Seconds())))
ls, err := ec.lockLeaseManager.GetSession(ctx, InitLockPath)
if err != nil {
errorChan <- err
close(errorChan)
return
}

ec.RWMutex.Lock()
ls = *lockSession
ec.RWMutex.Unlock()

log.Infof("Got lock lease ID %x", ls.Lease())
close(errorChan)
}()
Expand Down Expand Up @@ -778,19 +675,6 @@ func connectEtcdClient(ctx context.Context, config *client.Config, cfgPath strin
}
}()

ec.controllers.UpdateController(
makeSessionName(etcdLockSessionRenewNamePrefix, opts),
controller.ControllerParams{
Group: etcdLockSessionRenewControllerGroup,
// Stop controller function when etcd client is terminating
Context: ec.client.Ctx(),
DoFunc: func(ctx context.Context) error {
return ec.renewLockSession(ctx)
},
RunInterval: time.Duration(10) * time.Millisecond,
},
)

return ec, nil
}

Expand Down Expand Up @@ -824,28 +708,25 @@ func (e *etcdClient) waitForInitialSession(ctx context.Context) error {
}

func (e *etcdClient) LockPath(ctx context.Context, path string) (KVLocker, error) {
if err := e.waitForInitialSession(ctx); err != nil {
return nil, err
}

// Create the context first so that if a connectivity issue causes the
// RLock acquisition below to block, this timeout will run concurrently
// with the timeouts in renewSession() rather than running serially.
// Create the context first, so that the timeout also accounts for the time
// possibly required to acquire a new session (if not already established).
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()

e.RLock()
mu := concurrency.NewMutex(e.lockSession, path)
leaseID := e.lockSession.Lease()
e.RUnlock()
session, err := e.lockLeaseManager.GetSession(ctx, path)
if err != nil {
return nil, Hint(err)
}

err := mu.Lock(ctx)
mu := concurrency.NewMutex(session, path)
err = mu.Lock(ctx)
if err != nil {
e.checkLockSession(err, leaseID)
e.lockLeaseManager.CancelIfExpired(err, session.Lease())
return nil, Hint(err)
}

return &etcdMutex{mutex: mu}, nil
release := func() { e.lockLeaseManager.Release(path) }
return &etcdMutex{mutex: mu, onUnlock: release}, nil
}

func (e *etcdClient) DeletePrefix(ctx context.Context, path string) (err error) {
Expand Down Expand Up @@ -1151,7 +1032,6 @@ func (e *etcdClient) statusChecker() {
allConnected := len(endpoints) == ok

e.RWMutex.RLock()
lockSessionLeaseID := e.lockSession.Lease()
lastHeartbeat := e.lastHeartbeat
e.RWMutex.RUnlock()

Expand Down Expand Up @@ -1181,8 +1061,8 @@ func (e *etcdClient) statusChecker() {
e.latestStatusSnapshot = e.latestErrorStatus.Error()
default:
e.latestErrorStatus = nil
e.latestStatusSnapshot = fmt.Sprintf("etcd: %d/%d connected, leases=%d, lock lease-ID=%x, has-quorum=%s: %s",
ok, len(endpoints), e.leaseManager.TotalLeases(), lockSessionLeaseID, quorumString, strings.Join(newStatus, "; "))
// Report the lease counts of both managers: "leases" comes from the generic
// lease manager, "lock leases" from the lock lease manager.
e.latestStatusSnapshot = fmt.Sprintf("etcd: %d/%d connected, leases=%d, lock leases=%d, has-quorum=%s: %s",
ok, len(endpoints), e.leaseManager.TotalLeases(), e.lockLeaseManager.TotalLeases(), quorumString, strings.Join(newStatus, "; "))
}

e.statusLock.Unlock()
Expand Down Expand Up @@ -1790,25 +1670,15 @@ func (e *etcdClient) ListPrefix(ctx context.Context, prefix string) (v KeyValueP
// Close closes the etcd session
func (e *etcdClient) Close(ctx context.Context) {
close(e.stopStatusChecker)
sessionErr := e.waitForInitialSession(ctx)
if e.controllers != nil {
e.controllers.RemoveAll()
}
e.RLock()
defer e.RUnlock()
// Only close e.lockSession if the initial session was successful
if sessionErr == nil {
if err := e.lockSession.Close(); err != nil {
e.logger.WithError(err).Warning("Failed to revoke lock session while closing etcd client")
}
}
e.waitForInitialSession(ctx)

if err := e.client.Close(); err != nil {
e.logger.WithError(err).Warning("Failed to close etcd client")
}

// Wait until all child goroutines spawned by the lease manager have terminated.
// Wait until all child goroutines spawned by the lease managers have terminated.
e.leaseManager.Wait()
e.lockLeaseManager.Wait()
}

// GetCapabilities returns the capabilities of the backend
Expand Down
3 changes: 0 additions & 3 deletions pkg/kvstore/logfields.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ const (
// key revision
fieldRev = "revision"

// fieldSession refers to a connection/session with the kvstore
fieldSession = "session"

// fieldPrefix is the prefix of the key used in the operation
fieldPrefix = "prefix"

Expand Down

0 comments on commit c6eb358

Please sign in to comment.