Skip to content

Commit 8110293

Browse files
authored
[Feature] Add agency leader service (#991)
1 parent b4d44a9 commit 8110293

File tree

13 files changed

+295
-88
lines changed

13 files changed

+295
-88
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
- (Feature) Add operator shutdown handler for graceful termination
1111
- (Feature) Add agency leader discovery
1212
- (Feature) Add `ACSDeploymentSynced` condition type and fix comparison of `SecretHashes` method
13+
- (Feature) Add agency leader service
1314

1415
## [1.2.12](https://github.com/arangodb/kube-arangodb/tree/1.2.12) (2022-05-10)
1516
- (Feature) Add CoreV1 Endpoints Inspector

pkg/deployment/agency/cache.go

Lines changed: 23 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,15 @@ import (
3131
"github.com/arangodb/kube-arangodb/pkg/util/errors"
3232
)
3333

34-
type health map[string]uint64
34+
type health struct {
35+
leaderID string
36+
37+
commitIndexes map[string]uint64
38+
}
39+
40+
func (h health) LeaderID() string {
41+
return h.leaderID
42+
}
3543

3644
// IsHealthy returns true if all agencies have the same commit index.
3745
// Returns false when:
@@ -42,7 +50,7 @@ func (h health) IsHealthy() bool {
4250
var globalCommitIndex uint64
4351
first := true
4452

45-
for _, commitIndex := range h {
53+
for _, commitIndex := range h.commitIndexes {
4654
if first {
4755
globalCommitIndex = commitIndex
4856
first = false
@@ -58,14 +66,15 @@ func (h health) IsHealthy() bool {
5866
type Health interface {
5967
// IsHealthy return true when environment is considered as healthy.
6068
IsHealthy() bool
69+
70+
// LeaderID returns a leader ID or empty string if a leader is not known.
71+
LeaderID() string
6172
}
6273

6374
type Cache interface {
6475
Reload(ctx context.Context, clients []agency.Agency) (uint64, error)
6576
Data() (State, bool)
6677
CommitIndex() uint64
67-
// GetLeaderID returns a leader ID.
68-
GetLeaderID() string
6978
// Health returns true when healthy object is available.
7079
Health() (Health, bool)
7180
}
@@ -93,11 +102,6 @@ func (c cacheSingle) CommitIndex() uint64 {
93102
return 0
94103
}
95104

96-
// GetLeaderID returns always empty string for a single cache.
97-
func (c cacheSingle) GetLeaderID() string {
98-
return ""
99-
}
100-
101105
// Health returns always false for single cache.
102106
func (c cacheSingle) Health() (Health, bool) {
103107
return nil, false
@@ -121,8 +125,6 @@ type cache struct {
121125
data State
122126

123127
health Health
124-
125-
leaderID string
126128
}
127129

128130
func (c *cache) CommitIndex() uint64 {
@@ -139,14 +141,6 @@ func (c *cache) Data() (State, bool) {
139141
return c.data, c.valid
140142
}
141143

142-
// GetLeaderID returns a leader ID or empty string if a leader is not known.
143-
func (c *cache) GetLeaderID() string {
144-
c.lock.RLock()
145-
defer c.lock.RUnlock()
146-
147-
return c.leaderID
148-
}
149-
150144
// Health returns always false for single cache.
151145
func (c *cache) Health() (Health, bool) {
152146
c.lock.RLock()
@@ -167,7 +161,6 @@ func (c *cache) Reload(ctx context.Context, clients []agency.Agency) (uint64, er
167161
if err != nil {
168162
// Invalidate a leader ID and agency state.
169163
// In the next iteration leaderID will be sat because `valid` will be false.
170-
c.leaderID = ""
171164
c.valid = false
172165

173166
return 0, err
@@ -180,7 +173,6 @@ func (c *cache) Reload(ctx context.Context, clients []agency.Agency) (uint64, er
180173
}
181174

182175
// A leader should be known even if an agency state is invalid.
183-
c.leaderID = leaderConfig.LeaderId
184176
if data, err := loadState(ctx, leaderCli); err != nil {
185177
c.valid = false
186178
return leaderConfig.CommitIndex, err
@@ -194,7 +186,7 @@ func (c *cache) Reload(ctx context.Context, clients []agency.Agency) (uint64, er
194186

195187
// getLeader returns config and client to a leader agency, and health to check if agencies are on the same page.
196188
// If there is no quorum for the leader then error is returned.
197-
func getLeader(ctx context.Context, clients []agency.Agency) (agency.Agency, *agencyConfig, Health, error) {
189+
func getLeader(ctx context.Context, clients []agency.Agency) (agency.Agency, *Config, Health, error) {
198190
var mutex sync.Mutex
199191
var anyError error
200192
var wg sync.WaitGroup
@@ -203,10 +195,12 @@ func getLeader(ctx context.Context, clients []agency.Agency) (agency.Agency, *ag
203195
if cliLen == 0 {
204196
return nil, nil, nil, errors.New("empty list of agencies' clients")
205197
}
206-
configs := make([]*agencyConfig, cliLen)
207-
leaders := make(map[string]int)
198+
configs := make([]*Config, cliLen)
199+
leaders := make(map[string]int, cliLen)
208200

209-
h := make(health)
201+
var h health
202+
203+
h.commitIndexes = make(map[string]uint64, cliLen)
210204
// Fetch all configs from agencies.
211205
wg.Add(cliLen)
212206
for i, cli := range clients {
@@ -215,7 +209,7 @@ func getLeader(ctx context.Context, clients []agency.Agency) (agency.Agency, *ag
215209

216210
ctxLocal, cancel := context.WithTimeout(ctx, time.Second)
217211
defer cancel()
218-
config, err := getAgencyConfig(ctxLocal, cliLocal)
212+
config, err := GetAgencyConfig(ctxLocal, cliLocal)
219213

220214
mutex.Lock()
221215
defer mutex.Unlock()
@@ -232,7 +226,7 @@ func getLeader(ctx context.Context, clients []agency.Agency) (agency.Agency, *ag
232226
configs[iLocal] = config
233227
// Count leaders.
234228
leaders[config.LeaderId]++
235-
h[config.Configuration.ID] = config.CommitIndex
229+
h.commitIndexes[config.Configuration.ID] = config.CommitIndex
236230
}(i, cli)
237231
}
238232
wg.Wait()
@@ -255,6 +249,8 @@ func getLeader(ctx context.Context, clients []agency.Agency) (agency.Agency, *ag
255249
}
256250
}
257251

252+
h.leaderID = leaderID
253+
258254
// Check if a leader has quorum from all possible agencies.
259255
if maxVotes <= cliLen/2 {
260256
message := fmt.Sprintf("no quorum for leader %s, votes %d of %d", leaderID, maxVotes, cliLen)

pkg/deployment/agency/config.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,12 @@ import (
2929
"github.com/arangodb/go-driver/agency"
3030
)
3131

32-
func getAgencyConfig(ctx context.Context, client agency.Agency) (*agencyConfig, error) {
33-
conn := client.Connection()
32+
func GetAgencyConfig(ctx context.Context, client agency.Agency) (*Config, error) {
33+
return GetAgencyConfigC(ctx, client.Connection())
34+
}
3435

35-
req, err := client.Connection().NewRequest(http.MethodGet, "/_api/agency/config")
36+
func GetAgencyConfigC(ctx context.Context, conn driver.Connection) (*Config, error) {
37+
req, err := conn.NewRequest(http.MethodGet, "/_api/agency/config")
3638
if err != nil {
3739
return nil, err
3840
}
@@ -48,7 +50,7 @@ func getAgencyConfig(ctx context.Context, client agency.Agency) (*agencyConfig,
4850
return nil, err
4951
}
5052

51-
var c agencyConfig
53+
var c Config
5254

5355
if err := json.Unmarshal(data, &c); err != nil {
5456
return nil, err
@@ -57,7 +59,7 @@ func getAgencyConfig(ctx context.Context, client agency.Agency) (*agencyConfig,
5759
return &c, nil
5860
}
5961

60-
type agencyConfig struct {
62+
type Config struct {
6163
LeaderId string `json:"leaderId"`
6264

6365
CommitIndex uint64 `json:"commitIndex"`

pkg/deployment/agency/config_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func Test_Config_Unmarshal(t *testing.T) {
6767
"version": "3.10.0-devel"
6868
}`
6969

70-
var cfg agencyConfig
70+
var cfg Config
7171

7272
require.NoError(t, json.Unmarshal([]byte(data), &cfg))
7373

pkg/deployment/deployment.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,10 @@ func (d *Deployment) GetAgencyCache() (agency.State, bool) {
154154
return d.agencyCache.Data()
155155
}
156156

157+
func (d *Deployment) GetAgencyHealth() (agency.Health, bool) {
158+
return d.agencyCache.Health()
159+
}
160+
157161
func (d *Deployment) RefreshAgencyCache(ctx context.Context) (uint64, error) {
158162
if d.apiObject.Spec.Mode.Get() == api.DeploymentModeSingle {
159163
return 0, nil

pkg/deployment/deployment_inspector.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,10 @@ func (d *Deployment) inspectDeploymentWithError(ctx context.Context, lastInterva
188188
nextInterval = nextInterval.ReduceTo(x)
189189
}
190190

191+
if err := d.resources.EnsureLeader(ctx, d.GetCachedStatus()); err != nil {
192+
return minInspectionInterval, errors.Wrapf(err, "Creating agency pod leader failed")
193+
}
194+
191195
if err := d.resources.EnsureArangoMembers(ctx, d.GetCachedStatus()); err != nil {
192196
return minInspectionInterval, errors.Wrapf(err, "ArangoMember creation failed")
193197
}

pkg/deployment/reconcile/action_context.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,10 @@ func (ac *actionContext) GenerateMemberEndpoint(group api.ServerGroup, member ap
187187
return ac.context.GenerateMemberEndpoint(group, member)
188188
}
189189

190+
func (ac *actionContext) GetAgencyHealth() (agencyCache.Health, bool) {
191+
return ac.context.GetAgencyHealth()
192+
}
193+
190194
func (ac *actionContext) GetAgencyCache() (agencyCache.State, bool) {
191195
return ac.context.GetAgencyCache()
192196
}

pkg/deployment/reconcile/plan_builder_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,11 @@ type testContext struct {
8484
Inspector inspectorInterface.Inspector
8585
}
8686

87+
func (c *testContext) GetAgencyHealth() (agencyCache.Health, bool) {
88+
//TODO implement me
89+
panic("implement me")
90+
}
91+
8792
func (c *testContext) RenderPodForMember(ctx context.Context, acs sutil.ACS, spec api.DeploymentSpec, status api.DeploymentStatus, memberID string, imageInfo api.ImageInfo) (*core.Pod, error) {
8893
//TODO implement me
8994
panic("implement me")

pkg/deployment/reconciler/context.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ type DeploymentCachedStatus interface {
124124

125125
type ArangoAgencyGet interface {
126126
GetAgencyCache() (agencyCache.State, bool)
127+
GetAgencyHealth() (agencyCache.Health, bool)
127128
}
128129

129130
type ArangoAgency interface {

0 commit comments

Comments
 (0)