Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
- (Feature) Add operator shutdown handler for graceful termination
- (Feature) Add agency leader discovery
- (Feature) Add `ACSDeploymentSynced` condition type and fix comparison of `SecretHashes` method
- (Feature) Add agency leader service

## [1.2.12](https://github.com/arangodb/kube-arangodb/tree/1.2.12) (2022-05-10)
- (Feature) Add CoreV1 Endpoints Inspector
Expand Down
50 changes: 23 additions & 27 deletions pkg/deployment/agency/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,15 @@ import (
"github.com/arangodb/kube-arangodb/pkg/util/errors"
)

type health map[string]uint64
// health is the concrete Health implementation assembled while querying the agencies.
type health struct {
// leaderID is the ID of the leader chosen from the agencies' votes.
leaderID string

// commitIndexes maps an agency's configuration ID to the commit index it reported.
commitIndexes map[string]uint64
}

// LeaderID returns the leader ID stored in this health object.
// The value is an empty string when no leader ID has been recorded.
func (h health) LeaderID() string {
return h.leaderID
}

// IsHealthy returns true if all agencies have the same commit index.
// Returns false when:
Expand All @@ -42,7 +50,7 @@ func (h health) IsHealthy() bool {
var globalCommitIndex uint64
first := true

for _, commitIndex := range h {
for _, commitIndex := range h.commitIndexes {
if first {
globalCommitIndex = commitIndex
first = false
Expand All @@ -58,14 +66,15 @@ func (h health) IsHealthy() bool {
// Health exposes the health information gathered from the agencies.
type Health interface {
// IsHealthy returns true when the environment is considered healthy.
IsHealthy() bool

// LeaderID returns a leader ID or empty string if a leader is not known.
LeaderID() string
}

// Cache provides access to cached agency data.
type Cache interface {
// Reload refreshes the cache using the given agency clients.
// It returns a commit index and an error if the refresh failed.
Reload(ctx context.Context, clients []agency.Agency) (uint64, error)
// Data returns the cached State and a flag telling whether it is valid.
Data() (State, bool)
// CommitIndex returns a commit index; the single cache always returns 0.
CommitIndex() uint64
// GetLeaderID returns a leader ID.
GetLeaderID() string
// Health returns the Health object and true when such an object is available.
Health() (Health, bool)
}
Expand Down Expand Up @@ -93,11 +102,6 @@ func (c cacheSingle) CommitIndex() uint64 {
return 0
}

// GetLeaderID always returns an empty string for a single cache.
func (c cacheSingle) GetLeaderID() string {
return ""
}

// Health always returns false for a single cache.
func (c cacheSingle) Health() (Health, bool) {
return nil, false
Expand All @@ -121,8 +125,6 @@ type cache struct {
data State

health Health

leaderID string
}

func (c *cache) CommitIndex() uint64 {
Expand All @@ -139,14 +141,6 @@ func (c *cache) Data() (State, bool) {
return c.data, c.valid
}

// GetLeaderID returns a leader ID or empty string if a leader is not known.
func (c *cache) GetLeaderID() string {
// Hold the read lock while reading leaderID.
c.lock.RLock()
defer c.lock.RUnlock()

return c.leaderID
}

// Health returns the cached health object and true when such an object is available.
func (c *cache) Health() (Health, bool) {
c.lock.RLock()
Expand All @@ -167,7 +161,6 @@ func (c *cache) Reload(ctx context.Context, clients []agency.Agency) (uint64, er
if err != nil {
// Invalidate a leader ID and agency state.
// In the next iteration leaderID will be set because `valid` will be false.
c.leaderID = ""
c.valid = false

return 0, err
Expand All @@ -180,7 +173,6 @@ func (c *cache) Reload(ctx context.Context, clients []agency.Agency) (uint64, er
}

// A leader should be known even if an agency state is invalid.
c.leaderID = leaderConfig.LeaderId
if data, err := loadState(ctx, leaderCli); err != nil {
c.valid = false
return leaderConfig.CommitIndex, err
Expand All @@ -194,7 +186,7 @@ func (c *cache) Reload(ctx context.Context, clients []agency.Agency) (uint64, er

// getLeader returns config and client to a leader agency, and health to check if agencies are on the same page.
// If there is no quorum for the leader then error is returned.
func getLeader(ctx context.Context, clients []agency.Agency) (agency.Agency, *agencyConfig, Health, error) {
func getLeader(ctx context.Context, clients []agency.Agency) (agency.Agency, *Config, Health, error) {
var mutex sync.Mutex
var anyError error
var wg sync.WaitGroup
Expand All @@ -203,10 +195,12 @@ func getLeader(ctx context.Context, clients []agency.Agency) (agency.Agency, *ag
if cliLen == 0 {
return nil, nil, nil, errors.New("empty list of agencies' clients")
}
configs := make([]*agencyConfig, cliLen)
leaders := make(map[string]int)
configs := make([]*Config, cliLen)
leaders := make(map[string]int, cliLen)

h := make(health)
var h health

h.commitIndexes = make(map[string]uint64, cliLen)
// Fetch all configs from agencies.
wg.Add(cliLen)
for i, cli := range clients {
Expand All @@ -215,7 +209,7 @@ func getLeader(ctx context.Context, clients []agency.Agency) (agency.Agency, *ag

ctxLocal, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
config, err := getAgencyConfig(ctxLocal, cliLocal)
config, err := GetAgencyConfig(ctxLocal, cliLocal)

mutex.Lock()
defer mutex.Unlock()
Expand All @@ -232,7 +226,7 @@ func getLeader(ctx context.Context, clients []agency.Agency) (agency.Agency, *ag
configs[iLocal] = config
// Count leaders.
leaders[config.LeaderId]++
h[config.Configuration.ID] = config.CommitIndex
h.commitIndexes[config.Configuration.ID] = config.CommitIndex
}(i, cli)
}
wg.Wait()
Expand All @@ -255,6 +249,8 @@ func getLeader(ctx context.Context, clients []agency.Agency) (agency.Agency, *ag
}
}

h.leaderID = leaderID

// Check if a leader has quorum from all possible agencies.
if maxVotes <= cliLen/2 {
message := fmt.Sprintf("no quorum for leader %s, votes %d of %d", leaderID, maxVotes, cliLen)
Expand Down
12 changes: 7 additions & 5 deletions pkg/deployment/agency/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ import (
"github.com/arangodb/go-driver/agency"
)

func getAgencyConfig(ctx context.Context, client agency.Agency) (*agencyConfig, error) {
conn := client.Connection()
// GetAgencyConfig fetches the agency configuration through the connection of the given agency client.
// It delegates to GetAgencyConfigC.
func GetAgencyConfig(ctx context.Context, client agency.Agency) (*Config, error) {
return GetAgencyConfigC(ctx, client.Connection())
}

req, err := client.Connection().NewRequest(http.MethodGet, "/_api/agency/config")
func GetAgencyConfigC(ctx context.Context, conn driver.Connection) (*Config, error) {
req, err := conn.NewRequest(http.MethodGet, "/_api/agency/config")
if err != nil {
return nil, err
}
Expand All @@ -48,7 +50,7 @@ func getAgencyConfig(ctx context.Context, client agency.Agency) (*agencyConfig,
return nil, err
}

var c agencyConfig
var c Config

if err := json.Unmarshal(data, &c); err != nil {
return nil, err
Expand All @@ -57,7 +59,7 @@ func getAgencyConfig(ctx context.Context, client agency.Agency) (*agencyConfig,
return &c, nil
}

type agencyConfig struct {
type Config struct {
LeaderId string `json:"leaderId"`

CommitIndex uint64 `json:"commitIndex"`
Expand Down
2 changes: 1 addition & 1 deletion pkg/deployment/agency/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func Test_Config_Unmarshal(t *testing.T) {
"version": "3.10.0-devel"
}`

var cfg agencyConfig
var cfg Config

require.NoError(t, json.Unmarshal([]byte(data), &cfg))

Expand Down
4 changes: 4 additions & 0 deletions pkg/deployment/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,10 @@ func (d *Deployment) GetAgencyCache() (agency.State, bool) {
return d.agencyCache.Data()
}

// GetAgencyHealth returns the result of the Health call on the deployment's agency cache:
// the health object and a flag telling whether it is available.
func (d *Deployment) GetAgencyHealth() (agency.Health, bool) {
return d.agencyCache.Health()
}

func (d *Deployment) RefreshAgencyCache(ctx context.Context) (uint64, error) {
if d.apiObject.Spec.Mode.Get() == api.DeploymentModeSingle {
return 0, nil
Expand Down
4 changes: 4 additions & 0 deletions pkg/deployment/deployment_inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,10 @@ func (d *Deployment) inspectDeploymentWithError(ctx context.Context, lastInterva
nextInterval = nextInterval.ReduceTo(x)
}

if err := d.resources.EnsureLeader(ctx, d.GetCachedStatus()); err != nil {
return minInspectionInterval, errors.Wrapf(err, "Creating agency pod leader failed")
}

if err := d.resources.EnsureArangoMembers(ctx, d.GetCachedStatus()); err != nil {
return minInspectionInterval, errors.Wrapf(err, "ArangoMember creation failed")
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/deployment/reconcile/action_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,10 @@ func (ac *actionContext) GenerateMemberEndpoint(group api.ServerGroup, member ap
return ac.context.GenerateMemberEndpoint(group, member)
}

// GetAgencyHealth forwards the call to the wrapped context and returns its result unchanged.
func (ac *actionContext) GetAgencyHealth() (agencyCache.Health, bool) {
return ac.context.GetAgencyHealth()
}

func (ac *actionContext) GetAgencyCache() (agencyCache.State, bool) {
return ac.context.GetAgencyCache()
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/deployment/reconcile/plan_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ type testContext struct {
Inspector inspectorInterface.Inspector
}

// GetAgencyHealth is an unimplemented stub on the test context; it panics when called.
func (c *testContext) GetAgencyHealth() (agencyCache.Health, bool) {
//TODO implement me
panic("implement me")
}

func (c *testContext) RenderPodForMember(ctx context.Context, acs sutil.ACS, spec api.DeploymentSpec, status api.DeploymentStatus, memberID string, imageInfo api.ImageInfo) (*core.Pod, error) {
//TODO implement me
panic("implement me")
Expand Down
1 change: 1 addition & 0 deletions pkg/deployment/reconciler/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ type DeploymentCachedStatus interface {

// ArangoAgencyGet groups the read-only accessors for agency cache data.
type ArangoAgencyGet interface {
// GetAgencyCache returns the agency state and a boolean flag reported by the provider.
GetAgencyCache() (agencyCache.State, bool)
// GetAgencyHealth returns the agency health and a boolean flag reported by the provider.
GetAgencyHealth() (agencyCache.Health, bool)
}

type ArangoAgency interface {
Expand Down
Loading