From 0678d405fd6e9f26109107829ee4268fc93ce955 Mon Sep 17 00:00:00 2001 From: Tomasz Mielech Date: Fri, 13 May 2022 08:08:14 +0200 Subject: [PATCH 1/4] add agency leader discovery --- CHANGELOG.md | 1 + pkg/deployment/agency/cache.go | 128 +++++++++++++++--- pkg/deployment/client/client_cache.go | 11 +- pkg/deployment/context_impl.go | 16 ++- pkg/deployment/deployment.go | 49 ++++--- pkg/deployment/reconcile/action_context.go | 9 +- pkg/deployment/reconcile/plan_builder_test.go | 5 +- pkg/deployment/reconciler/context.go | 2 +- pkg/util/strings.go | 11 ++ 9 files changed, 170 insertions(+), 62 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 869cbb5d0..53e7b0022 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ - (Feature) Allow raw json value for license token-v2 - (Update) Replace `beta.kubernetes.io/arch` to `kubernetes.io/arch` in Operator Chart - (Feature) Add operator shutdown handler for graceful termination +- (Feature) Add agency leader discovery ## [1.2.12](https://github.com/arangodb/kube-arangodb/tree/1.2.12) (2022-05-10) - (Feature) Add CoreV1 Endpoints Inspector diff --git a/pkg/deployment/agency/cache.go b/pkg/deployment/agency/cache.go index 8a518bbc4..ffb16ba1d 100644 --- a/pkg/deployment/agency/cache.go +++ b/pkg/deployment/agency/cache.go @@ -22,16 +22,18 @@ package agency import ( "context" + "fmt" "sync" "github.com/arangodb/go-driver/agency" api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1" + "github.com/arangodb/kube-arangodb/pkg/util/errors" ) type Cache interface { - Reload(ctx context.Context, client agency.Agency) (uint64, error) + Reload(ctx context.Context, clients []agency.Agency) (uint64, error) Data() (State, bool) - CommitIndex() uint64 + GetLeaderID() string } func NewCache(mode *api.DeploymentMode) Cache { @@ -53,11 +55,12 @@ func NewSingleCache() Cache { type cacheSingle struct { } -func (c cacheSingle) CommitIndex() uint64 { - return 0 +// GetLeaderID returns always empty string for a single cache. +func (c cacheSingle) GetLeaderID() string { + return "" } -func (c cacheSingle) Reload(ctx context.Context, client agency.Agency) (uint64, error) { +func (c cacheSingle) Reload(_ context.Context, _ []agency.Agency) (uint64, error) { return 0, nil } @@ -66,48 +69,135 @@ func (c cacheSingle) Data() (State, bool) { } type cache struct { - lock sync.Mutex + lock sync.RWMutex valid bool commitIndex uint64 data State -} -func (c *cache) CommitIndex() uint64 { - return c.commitIndex + leaderID string } func (c *cache) Data() (State, bool) { - c.lock.Lock() - defer c.lock.Unlock() + c.lock.RLock() + defer c.lock.RUnlock() return c.data, c.valid } -func (c *cache) Reload(ctx context.Context, client agency.Agency) (uint64, error) { +// GetLeaderID returns a leader ID or empty string if a leader is not known. +func (c *cache) GetLeaderID() string { + return c.leaderID +} + +func (c *cache) Reload(ctx context.Context, clients []agency.Agency) (uint64, error) { c.lock.Lock() defer c.lock.Unlock() - cfg, err := getAgencyConfig(ctx, client) + leaderCli, leaderConfig, err := getLeader(ctx, clients) if err != nil { + // Invalidate a leader ID and agency state. + // In the next iteration leaderID will be sat because `valid` will be false. + c.leaderID = "" c.valid = false + return 0, err } - if cfg.CommitIndex == c.commitIndex && c.valid { + if leaderConfig.CommitIndex == c.commitIndex && c.valid { // We are on same index, nothing to do - return cfg.CommitIndex, err + return leaderConfig.CommitIndex, nil } - if data, err := loadState(ctx, client); err != nil { + // A leader should be known even if an agency state is invalid. + c.leaderID = leaderConfig.LeaderId + if data, err := loadState(ctx, leaderCli); err != nil { c.valid = false - return cfg.CommitIndex, err + return leaderConfig.CommitIndex, err } else { c.data = data c.valid = true - c.commitIndex = cfg.CommitIndex - return cfg.CommitIndex, nil + c.commitIndex = leaderConfig.CommitIndex + return leaderConfig.CommitIndex, nil + } +} + +// getLeader returns config and client to a leader agency. +// If there is no quorum for the leader then error is returned. +func getLeader(ctx context.Context, clients []agency.Agency) (agency.Agency, *agencyConfig, error) { + var mutex sync.Mutex + var anyError error + var wg sync.WaitGroup + + cliLen := len(clients) + if cliLen == 0 { + return nil, nil, errors.New("empty list of agencies' clients") + } + configs := make([]*agencyConfig, cliLen) + leaders := make(map[string]int) + + // Fetch all configs from agencies. + wg.Add(cliLen) + for i, cli := range clients { + go func(iLocal int, cliLocal agency.Agency) { + defer wg.Done() + config, err := getAgencyConfig(ctx, cliLocal) + + mutex.Lock() + defer mutex.Unlock() + + if err != nil { + anyError = err + return + } else if config == nil || config.LeaderId == "" { + anyError = fmt.Errorf("leader unknown for the agent %v", cliLocal.Connection().Endpoints()) + return + } + + // Write config on the same index where client is (It will be helpful later). + configs[iLocal] = config + // Count leaders. + leaders[config.LeaderId]++ + }(i, cli) + } + wg.Wait() + + if len(leaders) == 0 { + return nil, nil, wrapError(anyError, "failed to get config from agencies") } + + // Find the leader ID which has the most votes from all agencies. + maxVotes := 0 + var leaderID string + for id, votes := range leaders { + if votes > maxVotes { + maxVotes = votes + leaderID = id + } + } + + // Check if a leader has quorum from all possible agencies. + if maxVotes <= cliLen/2 { + message := fmt.Sprintf("no quorum for leader %s, votes %d of %d", leaderID, maxVotes, cliLen) + return nil, nil, wrapError(anyError, message) + } + + // From here on, a leader with quorum is known. + for i, config := range configs { + if config != nil && config.LeaderId == leaderID { + return clients[i], config, nil + } + } + + return nil, nil, wrapError(anyError, "the leader is not responsive") +} + +func wrapError(err error, message string) error { + if err != nil { + return errors.WithMessage(err, message) + } + + return errors.New(message) } diff --git a/pkg/deployment/client/client_cache.go b/pkg/deployment/client/client_cache.go index 763c9ef07..f728f610a 100644 --- a/pkg/deployment/client/client_cache.go +++ b/pkg/deployment/client/client_cache.go @@ -31,6 +31,7 @@ import ( api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1" "github.com/arangodb/kube-arangodb/pkg/apis/shared" "github.com/arangodb/kube-arangodb/pkg/deployment/reconciler" + "github.com/arangodb/kube-arangodb/pkg/util" "github.com/arangodb/kube-arangodb/pkg/util/arangod/conn" "github.com/arangodb/kube-arangodb/pkg/util/errors" "github.com/arangodb/kube-arangodb/pkg/util/k8sutil" @@ -46,7 +47,7 @@ type Cache interface { Get(ctx context.Context, group api.ServerGroup, id string) (driver.Client, error) GetDatabase(ctx context.Context) (driver.Client, error) GetDatabaseWithWrap(ctx context.Context, wraps ...ConnectionWrap) (driver.Client, error) - GetAgency(ctx context.Context) (agency.Agency, error) + GetAgency(ctx context.Context, agencyIDs ...string) (agency.Agency, error) } type CacheGen interface { @@ -167,13 +168,19 @@ func (cc *cache) GetDatabaseWithWrap(ctx context.Context, wraps ...ConnectionWra } // GetAgency returns a cached client for the agency -func (cc *cache) GetAgency(ctx context.Context) (agency.Agency, error) { +func (cc *cache) GetAgency(_ context.Context, agencyIDs ...string) (agency.Agency, error) { cc.mutex.Lock() defer cc.mutex.Unlock() // Not found, create a new client var dnsNames []string for _, m := range cc.in.GetStatusSnapshot().Members.Agents { + if len(agencyIDs) > 0 { + if !util.IsStringInSlice(m.ID, agencyIDs) { + continue + } + } + endpoint, err := cc.in.GenerateMemberEndpoint(api.ServerGroupAgents, m) if err != nil { return nil, err diff --git a/pkg/deployment/context_impl.go b/pkg/deployment/context_impl.go index 5228a442c..5b392da71 100644 --- a/pkg/deployment/context_impl.go +++ b/pkg/deployment/context_impl.go @@ -30,9 +30,10 @@ import ( "github.com/arangodb/kube-arangodb/pkg/util/globals" - "github.com/arangodb/kube-arangodb/pkg/deployment/patch" "k8s.io/apimachinery/pkg/types" + "github.com/arangodb/kube-arangodb/pkg/deployment/patch" + "github.com/arangodb/kube-arangodb/pkg/deployment/reconcile" "github.com/arangodb/kube-arangodb/pkg/util/errors" @@ -51,6 +52,10 @@ import ( apiErrors "k8s.io/apimachinery/pkg/api/errors" + "github.com/rs/zerolog/log" + core "k8s.io/api/core/v1" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/arangodb/arangosync-client/client" "github.com/arangodb/arangosync-client/tasks" driver "github.com/arangodb/go-driver" @@ -70,9 +75,6 @@ import ( serviceaccountv1 "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/serviceaccount/v1" servicemonitorv1 "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/servicemonitor/v1" "github.com/arangodb/kube-arangodb/pkg/util/kclient" - "github.com/rs/zerolog/log" - core "k8s.io/api/core/v1" - meta "k8s.io/apimachinery/pkg/apis/meta/v1" ) var _ resources.Context = &Deployment{} @@ -234,9 +236,9 @@ func (d *Deployment) GetAgencyClientsWithPredicate(ctx context.Context, predicat return result, nil } -// GetAgency returns a connection to the entire agency. -func (d *Deployment) GetAgency(ctx context.Context) (agency.Agency, error) { - return d.clientCache.GetAgency(ctx) +// GetAgency returns a connection to the agency. +func (d *Deployment) GetAgency(ctx context.Context, agencyIDs ...string) (agency.Agency, error) { + return d.clientCache.GetAgency(ctx, agencyIDs...) } func (d *Deployment) getConnConfig() (http.ConnectionConfig, error) { diff --git a/pkg/deployment/deployment.go b/pkg/deployment/deployment.go index 016cb4335..d8dc4bdf2 100644 --- a/pkg/deployment/deployment.go +++ b/pkg/deployment/deployment.go @@ -27,42 +27,35 @@ import ( "sync/atomic" "time" - "github.com/arangodb/kube-arangodb/pkg/util/globals" - - "github.com/arangodb/kube-arangodb/pkg/deployment/agency" - - inspectorInterface "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector" - - deploymentClient "github.com/arangodb/kube-arangodb/pkg/deployment/client" - "github.com/arangodb/kube-arangodb/pkg/util/errors" - - "github.com/arangodb/kube-arangodb/pkg/deployment/patch" - "k8s.io/apimachinery/pkg/types" - - "github.com/arangodb/kube-arangodb/pkg/operator/scope" - - "github.com/arangodb/kube-arangodb/pkg/util/arangod/conn" - - "github.com/arangodb/kube-arangodb/pkg/deployment/resources/inspector" - - "github.com/arangodb/kube-arangodb/pkg/util/arangod" - - "github.com/arangodb/arangosync-client/client" "github.com/rs/zerolog" meta "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" + "github.com/arangodb/arangosync-client/client" + agencydriver "github.com/arangodb/go-driver/agency" + api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1" "github.com/arangodb/kube-arangodb/pkg/deployment/acs" "github.com/arangodb/kube-arangodb/pkg/deployment/acs/sutil" + "github.com/arangodb/kube-arangodb/pkg/deployment/agency" "github.com/arangodb/kube-arangodb/pkg/deployment/chaos" + deploymentClient "github.com/arangodb/kube-arangodb/pkg/deployment/client" memberState "github.com/arangodb/kube-arangodb/pkg/deployment/member" + "github.com/arangodb/kube-arangodb/pkg/deployment/patch" "github.com/arangodb/kube-arangodb/pkg/deployment/reconcile" "github.com/arangodb/kube-arangodb/pkg/deployment/reconciler" "github.com/arangodb/kube-arangodb/pkg/deployment/resilience" "github.com/arangodb/kube-arangodb/pkg/deployment/resources" + "github.com/arangodb/kube-arangodb/pkg/deployment/resources/inspector" + "github.com/arangodb/kube-arangodb/pkg/operator/scope" "github.com/arangodb/kube-arangodb/pkg/util" + "github.com/arangodb/kube-arangodb/pkg/util/arangod" + "github.com/arangodb/kube-arangodb/pkg/util/arangod/conn" + "github.com/arangodb/kube-arangodb/pkg/util/errors" + "github.com/arangodb/kube-arangodb/pkg/util/globals" "github.com/arangodb/kube-arangodb/pkg/util/k8sutil" + inspectorInterface "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector" "github.com/arangodb/kube-arangodb/pkg/util/kclient" "github.com/arangodb/kube-arangodb/pkg/util/trigger" ) @@ -169,11 +162,17 @@ func (d *Deployment) RefreshAgencyCache(ctx context.Context) (uint64, error) { lCtx, c := globals.GetGlobalTimeouts().Agency().WithTimeout(ctx) defer c() - a, err := d.GetAgency(lCtx) - if err != nil { - return 0, err + var clients []agencydriver.Agency + for _, m := range d.GetStatusSnapshot().Members.Agents { + a, err := d.GetAgency(lCtx, m.ID) + if err != nil { + return 0, err + } + + clients = append(clients, a) } - return d.agencyCache.Reload(lCtx, a) + + return d.agencyCache.Reload(lCtx, clients) } func (d *Deployment) SetAgencyMaintenanceMode(ctx context.Context, enabled bool) error { diff --git a/pkg/deployment/reconcile/action_context.go b/pkg/deployment/reconcile/action_context.go index 71bd3fbc0..7f8e49e14 100644 --- a/pkg/deployment/reconcile/action_context.go +++ b/pkg/deployment/reconcile/action_context.go @@ -22,6 +22,7 @@ package reconcile import ( "context" + "time" "github.com/arangodb/arangosync-client/client" "github.com/arangodb/go-driver/agency" @@ -29,8 +30,6 @@ import ( "github.com/rs/zerolog/log" core "k8s.io/api/core/v1" - "time" - "github.com/arangodb/go-driver" backupApi "github.com/arangodb/kube-arangodb/pkg/apis/backup/v1" api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1" @@ -315,9 +314,9 @@ func (ac *actionContext) GetAgencyClients(ctx context.Context) ([]driver.Connect return c, nil } -// GetAgency returns a connection to the entire agency. -func (ac *actionContext) GetAgency(ctx context.Context) (agency.Agency, error) { - a, err := ac.context.GetAgency(ctx) +// GetAgency returns a connection to the agency. +func (ac *actionContext) GetAgency(ctx context.Context, agencyIDs ...string) (agency.Agency, error) { + a, err := ac.context.GetAgency(ctx, agencyIDs...) if err != nil { return nil, errors.WithStack(err) } diff --git a/pkg/deployment/reconcile/plan_builder_test.go b/pkg/deployment/reconcile/plan_builder_test.go index 0e1d02227..b5d36e4ba 100644 --- a/pkg/deployment/reconcile/plan_builder_test.go +++ b/pkg/deployment/reconcile/plan_builder_test.go @@ -25,6 +25,7 @@ import ( "fmt" "io/ioutil" "testing" + "time" monitoringClient "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned/typed/monitoring/v1" "github.com/rs/zerolog" @@ -37,8 +38,6 @@ import ( "github.com/arangodb/arangosync-client/client" "github.com/arangodb/go-driver/agency" - "time" - "github.com/arangodb/go-driver" backupApi "github.com/arangodb/kube-arangodb/pkg/apis/backup/v1" api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1" @@ -308,7 +307,7 @@ func (c *testContext) GetServerClient(ctx context.Context, group api.ServerGroup panic("implement me") } -func (c *testContext) GetAgency(ctx context.Context) (agency.Agency, error) { +func (c *testContext) GetAgency(_ context.Context, _ ...string) (agency.Agency, error) { panic("implement me") } diff --git a/pkg/deployment/reconciler/context.go b/pkg/deployment/reconciler/context.go index b6b4719fd..d558a4277 100644 --- a/pkg/deployment/reconciler/context.go +++ b/pkg/deployment/reconciler/context.go @@ -160,7 +160,7 @@ type DeploymentAgencyClient interface { // GetAgencyClientsWithPredicate returns a client connection for every agency member which match condition. GetAgencyClientsWithPredicate(ctx context.Context, predicate func(id string) bool) ([]driver.Connection, error) // GetAgency returns a connection to the entire agency. - GetAgency(ctx context.Context) (agency.Agency, error) + GetAgency(ctx context.Context, agencyIDs ...string) (agency.Agency, error) } type DeploymentDatabaseClient interface { diff --git a/pkg/util/strings.go b/pkg/util/strings.go index b4bc5376a..c0bf68fa1 100644 --- a/pkg/util/strings.go +++ b/pkg/util/strings.go @@ -86,3 +86,14 @@ func DiffStrings(compareWhat, compareTo []string) []string { return append(diff, DiffStringsOneWay(compareTo, compareWhat)...) } + +// IsStringInSlice returns true if haystack contains needle. +func IsStringInSlice(needle string, haystack []string) bool { + for _, v := range haystack { + if v == needle { + return true + } + } + + return false +} From b187bd5ee924cc496cff04d8f6de181e7c811278 Mon Sep 17 00:00:00 2001 From: Tomasz Mielech Date: Fri, 13 May 2022 10:14:43 +0200 Subject: [PATCH 2/4] check agency healthy --- pkg/deployment/agency/cache.go | 91 ++++++++++++++++++++++++--- pkg/deployment/client/client_cache.go | 4 +- pkg/util/strings.go | 11 ---- 3 files changed, 84 insertions(+), 22 deletions(-) diff --git a/pkg/deployment/agency/cache.go b/pkg/deployment/agency/cache.go index ffb16ba1d..0d20b22d8 100644 --- a/pkg/deployment/agency/cache.go +++ b/pkg/deployment/agency/cache.go @@ -24,16 +24,50 @@ import ( "context" "fmt" "sync" + "time" "github.com/arangodb/go-driver/agency" api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1" "github.com/arangodb/kube-arangodb/pkg/util/errors" ) +type health map[string]uint64 + +// IsHealthy returns true if all agencies have the same commit index. +// Returns false when: +// - agencies' list is empty. +// - agencies have different commit indices. +// - agencies have commit indices == 0. +func (h health) IsHealthy() bool { + var globalCommitIndex uint64 + first := true + + for _, commitIndex := range h { + if first { + globalCommitIndex = commitIndex + first = false + } else if commitIndex != globalCommitIndex { + return false + } + } + + return globalCommitIndex != 0 +} + +// Health describes interface to check healthy of the environment. +type Health interface { + // IsHealthy return true when environment is considered as healthy. + IsHealthy() bool +} + type Cache interface { Reload(ctx context.Context, clients []agency.Agency) (uint64, error) Data() (State, bool) + CommitIndex() uint64 + // GetLeaderID returns a leader ID. GetLeaderID() string + // Health returns true when healthy object is available. + Health() (Health, bool) } func NewCache(mode *api.DeploymentMode) Cache { @@ -55,11 +89,20 @@ func NewSingleCache() Cache { type cacheSingle struct { } +func (c cacheSingle) CommitIndex() uint64 { + return 0 +} + // GetLeaderID returns always empty string for a single cache. func (c cacheSingle) GetLeaderID() string { return "" } +// Health returns always false for single cache. +func (c cacheSingle) Health() (Health, bool) { + return nil, false +} + func (c cacheSingle) Reload(_ context.Context, _ []agency.Agency) (uint64, error) { return 0, nil } @@ -77,9 +120,18 @@ type cache struct { data State + health Health + leaderID string } +func (c *cache) CommitIndex() uint64 { + c.lock.RLock() + defer c.lock.RUnlock() + + return c.commitIndex +} + func (c *cache) Data() (State, bool) { c.lock.RLock() defer c.lock.RUnlock() @@ -89,14 +141,29 @@ func (c *cache) Data() (State, bool) { // GetLeaderID returns a leader ID or empty string if a leader is not known. func (c *cache) GetLeaderID() string { + c.lock.RLock() + defer c.lock.RUnlock() + return c.leaderID } +// Health returns always false for single cache. +func (c *cache) Health() (Health, bool) { + c.lock.RLock() + defer c.lock.RUnlock() + + if c.health != nil { + return c.health, true + } + + return nil, false +} + func (c *cache) Reload(ctx context.Context, clients []agency.Agency) (uint64, error) { c.lock.Lock() defer c.lock.Unlock() - leaderCli, leaderConfig, err := getLeader(ctx, clients) + leaderCli, leaderConfig, health, err := getLeader(ctx, clients) if err != nil { // Invalidate a leader ID and agency state. // In the next iteration leaderID will be sat because `valid` will be false. @@ -106,6 +173,7 @@ func (c *cache) Reload(ctx context.Context, clients []agency.Agency) (uint64, er return 0, err } + c.health = health if leaderConfig.CommitIndex == c.commitIndex && c.valid { // We are on same index, nothing to do return leaderConfig.CommitIndex, nil @@ -124,26 +192,30 @@ func (c *cache) Reload(ctx context.Context, clients []agency.Agency) (uint64, er } } -// getLeader returns config and client to a leader agency. +// getLeader returns config and client to a leader agency, and health to check if agencies are on the same page. // If there is no quorum for the leader then error is returned. -func getLeader(ctx context.Context, clients []agency.Agency) (agency.Agency, *agencyConfig, error) { +func getLeader(ctx context.Context, clients []agency.Agency) (agency.Agency, *agencyConfig, Health, error) { var mutex sync.Mutex var anyError error var wg sync.WaitGroup cliLen := len(clients) if cliLen == 0 { - return nil, nil, errors.New("empty list of agencies' clients") + return nil, nil, nil, errors.New("empty list of agencies' clients") } configs := make([]*agencyConfig, cliLen) leaders := make(map[string]int) + h := make(health) // Fetch all configs from agencies. wg.Add(cliLen) for i, cli := range clients { go func(iLocal int, cliLocal agency.Agency) { defer wg.Done() - config, err := getAgencyConfig(ctx, cliLocal) + + ctxLocal, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + config, err := getAgencyConfig(ctxLocal, cliLocal) mutex.Lock() defer mutex.Unlock() @@ -160,12 +232,13 @@ func getLeader(ctx context.Context, clients []agency.Agency) (agency.Agency, *ag configs[iLocal] = config // Count leaders. leaders[config.LeaderId]++ + h[config.LeaderId] = config.CommitIndex }(i, cli) } wg.Wait() if len(leaders) == 0 { - return nil, nil, wrapError(anyError, "failed to get config from agencies") + return nil, nil, nil, wrapError(anyError, "failed to get config from agencies") } // Find the leader ID which has the most votes from all agencies. @@ -181,17 +254,17 @@ func getLeader(ctx context.Context, clients []agency.Agency) (agency.Agency, *ag // Check if a leader has quorum from all possible agencies. if maxVotes <= cliLen/2 { message := fmt.Sprintf("no quorum for leader %s, votes %d of %d", leaderID, maxVotes, cliLen) - return nil, nil, wrapError(anyError, message) + return nil, nil, nil, wrapError(anyError, message) } // From here on, a leader with quorum is known. for i, config := range configs { if config != nil && config.LeaderId == leaderID { - return clients[i], config, nil + return clients[i], config, h, nil } } - return nil, nil, wrapError(anyError, "the leader is not responsive") + return nil, nil, nil, wrapError(anyError, "the leader is not responsive") } func wrapError(err error, message string) error { diff --git a/pkg/deployment/client/client_cache.go b/pkg/deployment/client/client_cache.go index f728f610a..bfb388bfc 100644 --- a/pkg/deployment/client/client_cache.go +++ b/pkg/deployment/client/client_cache.go @@ -31,7 +31,7 @@ import ( api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1" "github.com/arangodb/kube-arangodb/pkg/apis/shared" "github.com/arangodb/kube-arangodb/pkg/deployment/reconciler" - "github.com/arangodb/kube-arangodb/pkg/util" + "github.com/arangodb/kube-arangodb/pkg/handlers/utils" "github.com/arangodb/kube-arangodb/pkg/util/arangod/conn" "github.com/arangodb/kube-arangodb/pkg/util/errors" "github.com/arangodb/kube-arangodb/pkg/util/k8sutil" @@ -176,7 +176,7 @@ func (cc *cache) GetAgency(_ context.Context, agencyIDs ...string) (agency.Agenc var dnsNames []string for _, m := range cc.in.GetStatusSnapshot().Members.Agents { if len(agencyIDs) > 0 { - if !util.IsStringInSlice(m.ID, agencyIDs) { + if !utils.StringList(agencyIDs).Has(m.ID) { continue } } diff --git a/pkg/util/strings.go b/pkg/util/strings.go index c0bf68fa1..b4bc5376a 100644 --- a/pkg/util/strings.go +++ b/pkg/util/strings.go @@ -86,14 +86,3 @@ func DiffStrings(compareWhat, compareTo []string) []string { return append(diff, DiffStringsOneWay(compareTo, compareWhat)...) } - -// IsStringInSlice returns true if haystack contains needle. -func IsStringInSlice(needle string, haystack []string) bool { - for _, v := range haystack { - if v == needle { - return true - } - } - - return false -} From a9fd7356e05abf9dc095270bd65b79b7ebe15a25 Mon Sep 17 00:00:00 2001 From: Tomasz Mielech Date: Fri, 13 May 2022 11:14:10 +0200 Subject: [PATCH 3/4] fix for agency health map --- pkg/deployment/agency/cache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/deployment/agency/cache.go b/pkg/deployment/agency/cache.go index 0d20b22d8..787924fcf 100644 --- a/pkg/deployment/agency/cache.go +++ b/pkg/deployment/agency/cache.go @@ -232,7 +232,7 @@ func getLeader(ctx context.Context, clients []agency.Agency) (agency.Agency, *ag configs[iLocal] = config // Count leaders. leaders[config.LeaderId]++ - h[config.LeaderId] = config.CommitIndex + h[config.Configuration.ID] = config.CommitIndex }(i, cli) } wg.Wait() From c7d8499855c357363f9a23021c5494fc476ca234 Mon Sep 17 00:00:00 2001 From: Tomasz Mielech Date: Mon, 23 May 2022 08:13:21 +0200 Subject: [PATCH 4/4] fix for agency leader discovery --- pkg/deployment/agency/cache.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/deployment/agency/cache.go b/pkg/deployment/agency/cache.go index 787924fcf..bf81a1ab9 100644 --- a/pkg/deployment/agency/cache.go +++ b/pkg/deployment/agency/cache.go @@ -237,6 +237,10 @@ func getLeader(ctx context.Context, clients []agency.Agency) (agency.Agency, *ag } wg.Wait() + if anyError != nil { + return nil, nil, nil, wrapError(anyError, "not all agencies are responsive") + } + if len(leaders) == 0 { return nil, nil, nil, wrapError(anyError, "failed to get config from agencies") } @@ -259,7 +263,7 @@ func getLeader(ctx context.Context, clients []agency.Agency) (agency.Agency, *ag // From here on, a leader with quorum is known. for i, config := range configs { - if config != nil && config.LeaderId == leaderID { + if config != nil && config.Configuration.ID == leaderID { return clients[i], config, h, nil } }