diff --git a/CHANGELOG.md b/CHANGELOG.md index b6a70d680..5339ac9b0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ - (Feature) Allow raw json value for license token-v2 - (Update) Replace `beta.kubernetes.io/arch` to `kubernetes.io/arch` in Operator Chart - (Feature) Add operator shutdown handler for graceful termination +- (Feature) Add agency leader discovery - (Feature) Add `ACSDeploymentSynced` condition type and fix comparison of `SecretHashes` method ## [1.2.12](https://github.com/arangodb/kube-arangodb/tree/1.2.12) (2022-05-10) diff --git a/pkg/deployment/agency/cache.go b/pkg/deployment/agency/cache.go index 8a518bbc4..bf81a1ab9 100644 --- a/pkg/deployment/agency/cache.go +++ b/pkg/deployment/agency/cache.go @@ -22,16 +22,52 @@ package agency import ( "context" + "fmt" "sync" + "time" "github.com/arangodb/go-driver/agency" api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1" + "github.com/arangodb/kube-arangodb/pkg/util/errors" ) +type health map[string]uint64 + +// IsHealthy returns true if all agencies have the same commit index. +// Returns false when: +// - agencies' list is empty. +// - agencies have different commit indices. +// - agencies have commit indices == 0. +func (h health) IsHealthy() bool { + var globalCommitIndex uint64 + first := true + + for _, commitIndex := range h { + if first { + globalCommitIndex = commitIndex + first = false + } else if commitIndex != globalCommitIndex { + return false + } + } + + return globalCommitIndex != 0 +} + +// Health describes an interface to check the health of the environment. +type Health interface { + // IsHealthy returns true when the environment is considered healthy. + IsHealthy() bool +} + type Cache interface { - Reload(ctx context.Context, client agency.Agency) (uint64, error) + Reload(ctx context.Context, clients []agency.Agency) (uint64, error) Data() (State, bool) CommitIndex() uint64 + // GetLeaderID returns a leader ID. 
+ GetLeaderID() string + // Health returns the health object and true when it is available. + Health() (Health, bool) } func NewCache(mode *api.DeploymentMode) Cache { @@ -57,7 +93,17 @@ func (c cacheSingle) CommitIndex() uint64 { return 0 } -func (c cacheSingle) Reload(ctx context.Context, client agency.Agency) (uint64, error) { +// GetLeaderID always returns an empty string for a single cache. +func (c cacheSingle) GetLeaderID() string { + return "" +} + +// Health always returns false for a single cache. +func (c cacheSingle) Health() (Health, bool) { + return nil, false +} + +func (c cacheSingle) Reload(_ context.Context, _ []agency.Agency) (uint64, error) { return 0, nil } @@ -66,48 +112,169 @@ func (c cacheSingle) Data() (State, bool) { } type cache struct { - lock sync.Mutex + lock sync.RWMutex valid bool commitIndex uint64 data State + + health Health + + leaderID string } func (c *cache) CommitIndex() uint64 { + c.lock.RLock() + defer c.lock.RUnlock() + return c.commitIndex } func (c *cache) Data() (State, bool) { - c.lock.Lock() - defer c.lock.Unlock() + c.lock.RLock() + defer c.lock.RUnlock() return c.data, c.valid } -func (c *cache) Reload(ctx context.Context, client agency.Agency) (uint64, error) { +// GetLeaderID returns a leader ID or empty string if a leader is not known. +func (c *cache) GetLeaderID() string { + c.lock.RLock() + defer c.lock.RUnlock() + + return c.leaderID +} + +// Health returns the last known health object and true, or nil and false if no health is known. +func (c *cache) Health() (Health, bool) { + c.lock.RLock() + defer c.lock.RUnlock() + + if c.health != nil { + return c.health, true + } + + return nil, false +} + +func (c *cache) Reload(ctx context.Context, clients []agency.Agency) (uint64, error) { c.lock.Lock() defer c.lock.Unlock() - cfg, err := getAgencyConfig(ctx, client) + leaderCli, leaderConfig, health, err := getLeader(ctx, clients) if err != nil { + // Invalidate a leader ID and agency state. + // In the next iteration leaderID will be set because `valid` will be false. 
+ c.leaderID = "" c.valid = false + return 0, err } - if cfg.CommitIndex == c.commitIndex && c.valid { + c.health = health + if leaderConfig.CommitIndex == c.commitIndex && c.valid { // We are on same index, nothing to do - return cfg.CommitIndex, err + return leaderConfig.CommitIndex, nil } - if data, err := loadState(ctx, client); err != nil { + // A leader should be known even if an agency state is invalid. + c.leaderID = leaderConfig.LeaderId + if data, err := loadState(ctx, leaderCli); err != nil { c.valid = false - return cfg.CommitIndex, err + return leaderConfig.CommitIndex, err } else { c.data = data c.valid = true - c.commitIndex = cfg.CommitIndex - return cfg.CommitIndex, nil + c.commitIndex = leaderConfig.CommitIndex + return leaderConfig.CommitIndex, nil } } + +// getLeader returns config and client to a leader agency, and health to check if agencies are on the same page. +// If there is no quorum for the leader then error is returned. +func getLeader(ctx context.Context, clients []agency.Agency) (agency.Agency, *agencyConfig, Health, error) { + var mutex sync.Mutex + var anyError error + var wg sync.WaitGroup + + cliLen := len(clients) + if cliLen == 0 { + return nil, nil, nil, errors.New("empty list of agencies' clients") + } + configs := make([]*agencyConfig, cliLen) + leaders := make(map[string]int) + + h := make(health) + // Fetch all configs from agencies. + wg.Add(cliLen) + for i, cli := range clients { + go func(iLocal int, cliLocal agency.Agency) { + defer wg.Done() + + ctxLocal, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + config, err := getAgencyConfig(ctxLocal, cliLocal) + + mutex.Lock() + defer mutex.Unlock() + + if err != nil { + anyError = err + return + } else if config == nil || config.LeaderId == "" { + anyError = fmt.Errorf("leader unknown for the agent %v", cliLocal.Connection().Endpoints()) + return + } + + // Write config on the same index where client is (It will be helpful later). 
+ configs[iLocal] = config + // Count leaders. + leaders[config.LeaderId]++ + h[config.Configuration.ID] = config.CommitIndex + }(i, cli) + } + wg.Wait() + + if anyError != nil { + return nil, nil, nil, wrapError(anyError, "not all agencies are responsive") + } + + if len(leaders) == 0 { + return nil, nil, nil, wrapError(anyError, "failed to get config from agencies") + } + + // Find the leader ID which has the most votes from all agencies. + maxVotes := 0 + var leaderID string + for id, votes := range leaders { + if votes > maxVotes { + maxVotes = votes + leaderID = id + } + } + + // Check if a leader has quorum from all possible agencies. + if maxVotes <= cliLen/2 { + message := fmt.Sprintf("no quorum for leader %s, votes %d of %d", leaderID, maxVotes, cliLen) + return nil, nil, nil, wrapError(anyError, message) + } + + // From here on, a leader with quorum is known. + for i, config := range configs { + if config != nil && config.Configuration.ID == leaderID { + return clients[i], config, h, nil + } + } + + return nil, nil, nil, wrapError(anyError, "the leader is not responsive") +} + +func wrapError(err error, message string) error { + if err != nil { + return errors.WithMessage(err, message) + } + + return errors.New(message) +} diff --git a/pkg/deployment/client/client_cache.go b/pkg/deployment/client/client_cache.go index 763c9ef07..bfb388bfc 100644 --- a/pkg/deployment/client/client_cache.go +++ b/pkg/deployment/client/client_cache.go @@ -31,6 +31,7 @@ import ( api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1" "github.com/arangodb/kube-arangodb/pkg/apis/shared" "github.com/arangodb/kube-arangodb/pkg/deployment/reconciler" + "github.com/arangodb/kube-arangodb/pkg/handlers/utils" "github.com/arangodb/kube-arangodb/pkg/util/arangod/conn" "github.com/arangodb/kube-arangodb/pkg/util/errors" "github.com/arangodb/kube-arangodb/pkg/util/k8sutil" @@ -46,7 +47,7 @@ type Cache interface { Get(ctx context.Context, group api.ServerGroup, id string) 
(driver.Client, error) GetDatabase(ctx context.Context) (driver.Client, error) GetDatabaseWithWrap(ctx context.Context, wraps ...ConnectionWrap) (driver.Client, error) - GetAgency(ctx context.Context) (agency.Agency, error) + GetAgency(ctx context.Context, agencyIDs ...string) (agency.Agency, error) } type CacheGen interface { @@ -167,13 +168,19 @@ func (cc *cache) GetDatabaseWithWrap(ctx context.Context, wraps ...ConnectionWra } // GetAgency returns a cached client for the agency -func (cc *cache) GetAgency(ctx context.Context) (agency.Agency, error) { +func (cc *cache) GetAgency(_ context.Context, agencyIDs ...string) (agency.Agency, error) { cc.mutex.Lock() defer cc.mutex.Unlock() // Not found, create a new client var dnsNames []string for _, m := range cc.in.GetStatusSnapshot().Members.Agents { + if len(agencyIDs) > 0 { + if !utils.StringList(agencyIDs).Has(m.ID) { + continue + } + } + endpoint, err := cc.in.GenerateMemberEndpoint(api.ServerGroupAgents, m) if err != nil { return nil, err diff --git a/pkg/deployment/context_impl.go b/pkg/deployment/context_impl.go index 5228a442c..5b392da71 100644 --- a/pkg/deployment/context_impl.go +++ b/pkg/deployment/context_impl.go @@ -30,9 +30,10 @@ import ( "github.com/arangodb/kube-arangodb/pkg/util/globals" - "github.com/arangodb/kube-arangodb/pkg/deployment/patch" "k8s.io/apimachinery/pkg/types" + "github.com/arangodb/kube-arangodb/pkg/deployment/patch" + "github.com/arangodb/kube-arangodb/pkg/deployment/reconcile" "github.com/arangodb/kube-arangodb/pkg/util/errors" @@ -51,6 +52,10 @@ import ( apiErrors "k8s.io/apimachinery/pkg/api/errors" + "github.com/rs/zerolog/log" + core "k8s.io/api/core/v1" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/arangodb/arangosync-client/client" "github.com/arangodb/arangosync-client/tasks" driver "github.com/arangodb/go-driver" @@ -70,9 +75,6 @@ import ( serviceaccountv1 "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/serviceaccount/v1" servicemonitorv1 
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/servicemonitor/v1" "github.com/arangodb/kube-arangodb/pkg/util/kclient" - "github.com/rs/zerolog/log" - core "k8s.io/api/core/v1" - meta "k8s.io/apimachinery/pkg/apis/meta/v1" ) var _ resources.Context = &Deployment{} @@ -234,9 +236,9 @@ func (d *Deployment) GetAgencyClientsWithPredicate(ctx context.Context, predicat return result, nil } -// GetAgency returns a connection to the entire agency. -func (d *Deployment) GetAgency(ctx context.Context) (agency.Agency, error) { - return d.clientCache.GetAgency(ctx) +// GetAgency returns a connection to the agency. +func (d *Deployment) GetAgency(ctx context.Context, agencyIDs ...string) (agency.Agency, error) { + return d.clientCache.GetAgency(ctx, agencyIDs...) } func (d *Deployment) getConnConfig() (http.ConnectionConfig, error) { diff --git a/pkg/deployment/deployment.go b/pkg/deployment/deployment.go index 016cb4335..d8dc4bdf2 100644 --- a/pkg/deployment/deployment.go +++ b/pkg/deployment/deployment.go @@ -27,42 +27,35 @@ import ( "sync/atomic" "time" - "github.com/arangodb/kube-arangodb/pkg/util/globals" - - "github.com/arangodb/kube-arangodb/pkg/deployment/agency" - - inspectorInterface "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector" - - deploymentClient "github.com/arangodb/kube-arangodb/pkg/deployment/client" - "github.com/arangodb/kube-arangodb/pkg/util/errors" - - "github.com/arangodb/kube-arangodb/pkg/deployment/patch" - "k8s.io/apimachinery/pkg/types" - - "github.com/arangodb/kube-arangodb/pkg/operator/scope" - - "github.com/arangodb/kube-arangodb/pkg/util/arangod/conn" - - "github.com/arangodb/kube-arangodb/pkg/deployment/resources/inspector" - - "github.com/arangodb/kube-arangodb/pkg/util/arangod" - - "github.com/arangodb/arangosync-client/client" "github.com/rs/zerolog" meta "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" + "github.com/arangodb/arangosync-client/client" + 
agencydriver "github.com/arangodb/go-driver/agency" + api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1" "github.com/arangodb/kube-arangodb/pkg/deployment/acs" "github.com/arangodb/kube-arangodb/pkg/deployment/acs/sutil" + "github.com/arangodb/kube-arangodb/pkg/deployment/agency" "github.com/arangodb/kube-arangodb/pkg/deployment/chaos" + deploymentClient "github.com/arangodb/kube-arangodb/pkg/deployment/client" memberState "github.com/arangodb/kube-arangodb/pkg/deployment/member" + "github.com/arangodb/kube-arangodb/pkg/deployment/patch" "github.com/arangodb/kube-arangodb/pkg/deployment/reconcile" "github.com/arangodb/kube-arangodb/pkg/deployment/reconciler" "github.com/arangodb/kube-arangodb/pkg/deployment/resilience" "github.com/arangodb/kube-arangodb/pkg/deployment/resources" + "github.com/arangodb/kube-arangodb/pkg/deployment/resources/inspector" + "github.com/arangodb/kube-arangodb/pkg/operator/scope" "github.com/arangodb/kube-arangodb/pkg/util" + "github.com/arangodb/kube-arangodb/pkg/util/arangod" + "github.com/arangodb/kube-arangodb/pkg/util/arangod/conn" + "github.com/arangodb/kube-arangodb/pkg/util/errors" + "github.com/arangodb/kube-arangodb/pkg/util/globals" "github.com/arangodb/kube-arangodb/pkg/util/k8sutil" + inspectorInterface "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector" "github.com/arangodb/kube-arangodb/pkg/util/kclient" "github.com/arangodb/kube-arangodb/pkg/util/trigger" ) @@ -169,11 +162,17 @@ func (d *Deployment) RefreshAgencyCache(ctx context.Context) (uint64, error) { lCtx, c := globals.GetGlobalTimeouts().Agency().WithTimeout(ctx) defer c() - a, err := d.GetAgency(lCtx) - if err != nil { - return 0, err + var clients []agencydriver.Agency + for _, m := range d.GetStatusSnapshot().Members.Agents { + a, err := d.GetAgency(lCtx, m.ID) + if err != nil { + return 0, err + } + + clients = append(clients, a) } - return d.agencyCache.Reload(lCtx, a) + + return d.agencyCache.Reload(lCtx, clients) } func (d *Deployment) 
SetAgencyMaintenanceMode(ctx context.Context, enabled bool) error { diff --git a/pkg/deployment/reconcile/action_context.go b/pkg/deployment/reconcile/action_context.go index 71bd3fbc0..7f8e49e14 100644 --- a/pkg/deployment/reconcile/action_context.go +++ b/pkg/deployment/reconcile/action_context.go @@ -22,6 +22,7 @@ package reconcile import ( "context" + "time" "github.com/arangodb/arangosync-client/client" "github.com/arangodb/go-driver/agency" @@ -29,8 +30,6 @@ import ( "github.com/rs/zerolog/log" core "k8s.io/api/core/v1" - "time" - "github.com/arangodb/go-driver" backupApi "github.com/arangodb/kube-arangodb/pkg/apis/backup/v1" api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1" @@ -315,9 +314,9 @@ func (ac *actionContext) GetAgencyClients(ctx context.Context) ([]driver.Connect return c, nil } -// GetAgency returns a connection to the entire agency. -func (ac *actionContext) GetAgency(ctx context.Context) (agency.Agency, error) { - a, err := ac.context.GetAgency(ctx) +// GetAgency returns a connection to the agency. +func (ac *actionContext) GetAgency(ctx context.Context, agencyIDs ...string) (agency.Agency, error) { + a, err := ac.context.GetAgency(ctx, agencyIDs...) 
if err != nil { return nil, errors.WithStack(err) } diff --git a/pkg/deployment/reconcile/plan_builder_test.go b/pkg/deployment/reconcile/plan_builder_test.go index 0e1d02227..b5d36e4ba 100644 --- a/pkg/deployment/reconcile/plan_builder_test.go +++ b/pkg/deployment/reconcile/plan_builder_test.go @@ -25,6 +25,7 @@ import ( "fmt" "io/ioutil" "testing" + "time" monitoringClient "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned/typed/monitoring/v1" "github.com/rs/zerolog" @@ -37,8 +38,6 @@ import ( "github.com/arangodb/arangosync-client/client" "github.com/arangodb/go-driver/agency" - "time" - "github.com/arangodb/go-driver" backupApi "github.com/arangodb/kube-arangodb/pkg/apis/backup/v1" api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1" @@ -308,7 +307,7 @@ func (c *testContext) GetServerClient(ctx context.Context, group api.ServerGroup panic("implement me") } -func (c *testContext) GetAgency(ctx context.Context) (agency.Agency, error) { +func (c *testContext) GetAgency(_ context.Context, _ ...string) (agency.Agency, error) { panic("implement me") } diff --git a/pkg/deployment/reconciler/context.go b/pkg/deployment/reconciler/context.go index b6b4719fd..d558a4277 100644 --- a/pkg/deployment/reconciler/context.go +++ b/pkg/deployment/reconciler/context.go @@ -160,7 +160,7 @@ type DeploymentAgencyClient interface { // GetAgencyClientsWithPredicate returns a client connection for every agency member which match condition. GetAgencyClientsWithPredicate(ctx context.Context, predicate func(id string) bool) ([]driver.Connection, error) // GetAgency returns a connection to the entire agency. - GetAgency(ctx context.Context) (agency.Agency, error) + GetAgency(ctx context.Context, agencyIDs ...string) (agency.Agency, error) } type DeploymentDatabaseClient interface {