Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
- (Feature) Allow raw json value for license token-v2
- (Update) Replace `beta.kubernetes.io/arch` to `kubernetes.io/arch` in Operator Chart
- (Feature) Add operator shutdown handler for graceful termination
- (Feature) Add agency leader discovery
- (Feature) Add `ACSDeploymentSynced` condition type and fix comparison of `SecretHashes` method

## [1.2.12](https://github.com/arangodb/kube-arangodb/tree/1.2.12) (2022-05-10)
Expand Down
193 changes: 180 additions & 13 deletions pkg/deployment/agency/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,52 @@ package agency

import (
"context"
"fmt"
"sync"
"time"

"github.com/arangodb/go-driver/agency"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
)

type health map[string]uint64

// IsHealthy returns true if all agencies have the same commit index.
// Returns false when:
// - agencies' list is empty.
// - agencies have different commit indices.
// - agencies have commit indices == 0.
func (h health) IsHealthy() bool {
var globalCommitIndex uint64
first := true

for _, commitIndex := range h {
if first {
globalCommitIndex = commitIndex
first = false
} else if commitIndex != globalCommitIndex {
return false
}
}

return globalCommitIndex != 0
}

// Health describes interface to check healthy of the environment.
type Health interface {
// IsHealthy return true when environment is considered as healthy.
IsHealthy() bool
}

type Cache interface {
Reload(ctx context.Context, client agency.Agency) (uint64, error)
Reload(ctx context.Context, clients []agency.Agency) (uint64, error)
Data() (State, bool)
CommitIndex() uint64
// GetLeaderID returns a leader ID.
GetLeaderID() string
// Health returns true when healthy object is available.
Health() (Health, bool)
}

func NewCache(mode *api.DeploymentMode) Cache {
Expand All @@ -57,7 +93,17 @@ func (c cacheSingle) CommitIndex() uint64 {
return 0
}

func (c cacheSingle) Reload(ctx context.Context, client agency.Agency) (uint64, error) {
// GetLeaderID returns always empty string for a single cache.
func (c cacheSingle) GetLeaderID() string {
return ""
}

// Health returns always false for single cache.
func (c cacheSingle) Health() (Health, bool) {
return nil, false
}

func (c cacheSingle) Reload(_ context.Context, _ []agency.Agency) (uint64, error) {
return 0, nil
}

Expand All @@ -66,48 +112,169 @@ func (c cacheSingle) Data() (State, bool) {
}

type cache struct {
lock sync.Mutex
lock sync.RWMutex

valid bool

commitIndex uint64

data State

health Health

leaderID string
}

func (c *cache) CommitIndex() uint64 {
c.lock.RLock()
defer c.lock.RUnlock()

return c.commitIndex
}

func (c *cache) Data() (State, bool) {
c.lock.Lock()
defer c.lock.Unlock()
c.lock.RLock()
defer c.lock.RUnlock()

return c.data, c.valid
}

func (c *cache) Reload(ctx context.Context, client agency.Agency) (uint64, error) {
// GetLeaderID returns a leader ID or empty string if a leader is not known.
func (c *cache) GetLeaderID() string {
c.lock.RLock()
defer c.lock.RUnlock()

return c.leaderID
}

// Health returns always false for single cache.
func (c *cache) Health() (Health, bool) {
c.lock.RLock()
defer c.lock.RUnlock()

if c.health != nil {
return c.health, true
}

return nil, false
}

func (c *cache) Reload(ctx context.Context, clients []agency.Agency) (uint64, error) {
c.lock.Lock()
defer c.lock.Unlock()

cfg, err := getAgencyConfig(ctx, client)
leaderCli, leaderConfig, health, err := getLeader(ctx, clients)
if err != nil {
// Invalidate a leader ID and agency state.
// In the next iteration leaderID will be sat because `valid` will be false.
c.leaderID = ""
c.valid = false

return 0, err
}

if cfg.CommitIndex == c.commitIndex && c.valid {
c.health = health
if leaderConfig.CommitIndex == c.commitIndex && c.valid {
// We are on same index, nothing to do
return cfg.CommitIndex, err
return leaderConfig.CommitIndex, nil
}

if data, err := loadState(ctx, client); err != nil {
// A leader should be known even if an agency state is invalid.
c.leaderID = leaderConfig.LeaderId
if data, err := loadState(ctx, leaderCli); err != nil {
c.valid = false
return cfg.CommitIndex, err
return leaderConfig.CommitIndex, err
} else {
c.data = data
c.valid = true
c.commitIndex = cfg.CommitIndex
return cfg.CommitIndex, nil
c.commitIndex = leaderConfig.CommitIndex
return leaderConfig.CommitIndex, nil
}
}

// getLeader returns config and client to a leader agency, and health to check if agencies are on the same page.
// If there is no quorum for the leader then error is returned.
func getLeader(ctx context.Context, clients []agency.Agency) (agency.Agency, *agencyConfig, Health, error) {
var mutex sync.Mutex
var anyError error
var wg sync.WaitGroup

cliLen := len(clients)
if cliLen == 0 {
return nil, nil, nil, errors.New("empty list of agencies' clients")
}
configs := make([]*agencyConfig, cliLen)
leaders := make(map[string]int)

h := make(health)
// Fetch all configs from agencies.
wg.Add(cliLen)
for i, cli := range clients {
go func(iLocal int, cliLocal agency.Agency) {
defer wg.Done()

ctxLocal, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
config, err := getAgencyConfig(ctxLocal, cliLocal)

mutex.Lock()
defer mutex.Unlock()

if err != nil {
anyError = err
return
} else if config == nil || config.LeaderId == "" {
anyError = fmt.Errorf("leader unknown for the agent %v", cliLocal.Connection().Endpoints())
return
}

// Write config on the same index where client is (It will be helpful later).
configs[iLocal] = config
// Count leaders.
leaders[config.LeaderId]++
h[config.Configuration.ID] = config.CommitIndex
}(i, cli)
}
wg.Wait()

if anyError != nil {
return nil, nil, nil, wrapError(anyError, "not all agencies are responsive")
}

if len(leaders) == 0 {
return nil, nil, nil, wrapError(anyError, "failed to get config from agencies")
}

// Find the leader ID which has the most votes from all agencies.
maxVotes := 0
var leaderID string
for id, votes := range leaders {
if votes > maxVotes {
maxVotes = votes
leaderID = id
}
}

// Check if a leader has quorum from all possible agencies.
if maxVotes <= cliLen/2 {
message := fmt.Sprintf("no quorum for leader %s, votes %d of %d", leaderID, maxVotes, cliLen)
return nil, nil, nil, wrapError(anyError, message)
}

// From here on, a leader with quorum is known.
for i, config := range configs {
if config != nil && config.Configuration.ID == leaderID {
return clients[i], config, h, nil
}
}

return nil, nil, nil, wrapError(anyError, "the leader is not responsive")
}

func wrapError(err error, message string) error {
if err != nil {
return errors.WithMessage(err, message)
}

return errors.New(message)
}
11 changes: 9 additions & 2 deletions pkg/deployment/client/client_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/apis/shared"
"github.com/arangodb/kube-arangodb/pkg/deployment/reconciler"
"github.com/arangodb/kube-arangodb/pkg/handlers/utils"
"github.com/arangodb/kube-arangodb/pkg/util/arangod/conn"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
Expand All @@ -46,7 +47,7 @@ type Cache interface {
Get(ctx context.Context, group api.ServerGroup, id string) (driver.Client, error)
GetDatabase(ctx context.Context) (driver.Client, error)
GetDatabaseWithWrap(ctx context.Context, wraps ...ConnectionWrap) (driver.Client, error)
GetAgency(ctx context.Context) (agency.Agency, error)
GetAgency(ctx context.Context, agencyIDs ...string) (agency.Agency, error)
}

type CacheGen interface {
Expand Down Expand Up @@ -167,13 +168,19 @@ func (cc *cache) GetDatabaseWithWrap(ctx context.Context, wraps ...ConnectionWra
}

// GetAgency returns a cached client for the agency
func (cc *cache) GetAgency(ctx context.Context) (agency.Agency, error) {
func (cc *cache) GetAgency(_ context.Context, agencyIDs ...string) (agency.Agency, error) {
cc.mutex.Lock()
defer cc.mutex.Unlock()

// Not found, create a new client
var dnsNames []string
for _, m := range cc.in.GetStatusSnapshot().Members.Agents {
if len(agencyIDs) > 0 {
if !utils.StringList(agencyIDs).Has(m.ID) {
continue
}
}

endpoint, err := cc.in.GenerateMemberEndpoint(api.ServerGroupAgents, m)
if err != nil {
return nil, err
Expand Down
16 changes: 9 additions & 7 deletions pkg/deployment/context_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ import (

"github.com/arangodb/kube-arangodb/pkg/util/globals"

"github.com/arangodb/kube-arangodb/pkg/deployment/patch"
"k8s.io/apimachinery/pkg/types"

"github.com/arangodb/kube-arangodb/pkg/deployment/patch"

"github.com/arangodb/kube-arangodb/pkg/deployment/reconcile"

"github.com/arangodb/kube-arangodb/pkg/util/errors"
Expand All @@ -51,6 +52,10 @@ import (

apiErrors "k8s.io/apimachinery/pkg/api/errors"

"github.com/rs/zerolog/log"
core "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/arangodb/arangosync-client/client"
"github.com/arangodb/arangosync-client/tasks"
driver "github.com/arangodb/go-driver"
Expand All @@ -70,9 +75,6 @@ import (
serviceaccountv1 "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/serviceaccount/v1"
servicemonitorv1 "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/servicemonitor/v1"
"github.com/arangodb/kube-arangodb/pkg/util/kclient"
"github.com/rs/zerolog/log"
core "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var _ resources.Context = &Deployment{}
Expand Down Expand Up @@ -234,9 +236,9 @@ func (d *Deployment) GetAgencyClientsWithPredicate(ctx context.Context, predicat
return result, nil
}

// GetAgency returns a connection to the entire agency.
func (d *Deployment) GetAgency(ctx context.Context) (agency.Agency, error) {
return d.clientCache.GetAgency(ctx)
// GetAgency returns a connection to the agency.
func (d *Deployment) GetAgency(ctx context.Context, agencyIDs ...string) (agency.Agency, error) {
return d.clientCache.GetAgency(ctx, agencyIDs...)
}

func (d *Deployment) getConnConfig() (http.ConnectionConfig, error) {
Expand Down
Loading