@@ -22,16 +22,52 @@ package agency
2222
2323import (
2424 "context"
25+ "fmt"
2526 "sync"
27+ "time"
2628
2729 "github.com/arangodb/go-driver/agency"
2830 api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
31+ "github.com/arangodb/kube-arangodb/pkg/util/errors"
2932)
3033
// health maps an agency member ID to the commit index that member reported.
type health map[string]uint64

// IsHealthy reports whether every recorded agency member is on the same,
// non-zero commit index.
//
// It returns false when:
//   - no member has been recorded,
//   - at least two members report different commit indices,
//   - the shared commit index is 0.
func (h health) IsHealthy() bool {
	if len(h) == 0 {
		// Nothing recorded, so there is nothing to agree on.
		return false
	}

	var (
		reference uint64
		seen      bool
	)
	for _, index := range h {
		if !seen {
			// The first visited entry becomes the value all others must match.
			reference, seen = index, true
			continue
		}
		if index != reference {
			return false
		}
	}

	return reference != 0
}
56+
// Health is implemented by anything that can report whether the environment
// it describes is currently healthy.
type Health interface {
	// IsHealthy returns true when the environment is considered healthy.
	IsHealthy() bool
}
62+
3163type Cache interface {
32- Reload (ctx context.Context , client agency.Agency ) (uint64 , error )
64+ Reload (ctx context.Context , clients [] agency.Agency ) (uint64 , error )
3365 Data () (State , bool )
3466 CommitIndex () uint64
67+ // GetLeaderID returns a leader ID.
68+ GetLeaderID () string
69+ // Health returns true when healthy object is available.
70+ Health () (Health , bool )
3571}
3672
3773func NewCache (mode * api.DeploymentMode ) Cache {
@@ -57,7 +93,17 @@ func (c cacheSingle) CommitIndex() uint64 {
5793 return 0
5894}
5995
60- func (c cacheSingle ) Reload (ctx context.Context , client agency.Agency ) (uint64 , error ) {
96+ // GetLeaderID returns always empty string for a single cache.
97+ func (c cacheSingle ) GetLeaderID () string {
98+ return ""
99+ }
100+
101+ // Health returns always false for single cache.
102+ func (c cacheSingle ) Health () (Health , bool ) {
103+ return nil , false
104+ }
105+
106+ func (c cacheSingle ) Reload (_ context.Context , _ []agency.Agency ) (uint64 , error ) {
61107 return 0 , nil
62108}
63109
@@ -66,48 +112,169 @@ func (c cacheSingle) Data() (State, bool) {
66112}
67113
68114type cache struct {
69- lock sync.Mutex
115+ lock sync.RWMutex
70116
71117 valid bool
72118
73119 commitIndex uint64
74120
75121 data State
122+
123+ health Health
124+
125+ leaderID string
76126}
77127
78128func (c * cache ) CommitIndex () uint64 {
129+ c .lock .RLock ()
130+ defer c .lock .RUnlock ()
131+
79132 return c .commitIndex
80133}
81134
82135func (c * cache ) Data () (State , bool ) {
83- c .lock .Lock ()
84- defer c .lock .Unlock ()
136+ c .lock .RLock ()
137+ defer c .lock .RUnlock ()
85138
86139 return c .data , c .valid
87140}
88141
89- func (c * cache ) Reload (ctx context.Context , client agency.Agency ) (uint64 , error ) {
142+ // GetLeaderID returns a leader ID or empty string if a leader is not known.
143+ func (c * cache ) GetLeaderID () string {
144+ c .lock .RLock ()
145+ defer c .lock .RUnlock ()
146+
147+ return c .leaderID
148+ }
149+
150+ // Health returns always false for single cache.
151+ func (c * cache ) Health () (Health , bool ) {
152+ c .lock .RLock ()
153+ defer c .lock .RUnlock ()
154+
155+ if c .health != nil {
156+ return c .health , true
157+ }
158+
159+ return nil , false
160+ }
161+
162+ func (c * cache ) Reload (ctx context.Context , clients []agency.Agency ) (uint64 , error ) {
90163 c .lock .Lock ()
91164 defer c .lock .Unlock ()
92165
93- cfg , err := getAgencyConfig (ctx , client )
166+ leaderCli , leaderConfig , health , err := getLeader (ctx , clients )
94167 if err != nil {
168+ // Invalidate a leader ID and agency state.
169+ // In the next iteration leaderID will be sat because `valid` will be false.
170+ c .leaderID = ""
95171 c .valid = false
172+
96173 return 0 , err
97174 }
98175
99- if cfg .CommitIndex == c .commitIndex && c .valid {
176+ c .health = health
177+ if leaderConfig .CommitIndex == c .commitIndex && c .valid {
100178 // We are on same index, nothing to do
101- return cfg .CommitIndex , err
179+ return leaderConfig .CommitIndex , nil
102180 }
103181
104- if data , err := loadState (ctx , client ); err != nil {
182+ // A leader should be known even if an agency state is invalid.
183+ c .leaderID = leaderConfig .LeaderId
184+ if data , err := loadState (ctx , leaderCli ); err != nil {
105185 c .valid = false
106- return cfg .CommitIndex , err
186+ return leaderConfig .CommitIndex , err
107187 } else {
108188 c .data = data
109189 c .valid = true
110- c .commitIndex = cfg .CommitIndex
111- return cfg .CommitIndex , nil
190+ c .commitIndex = leaderConfig .CommitIndex
191+ return leaderConfig .CommitIndex , nil
112192 }
113193}
194+
195+ // getLeader returns config and client to a leader agency, and health to check if agencies are on the same page.
196+ // If there is no quorum for the leader then error is returned.
197+ func getLeader (ctx context.Context , clients []agency.Agency ) (agency.Agency , * agencyConfig , Health , error ) {
198+ var mutex sync.Mutex
199+ var anyError error
200+ var wg sync.WaitGroup
201+
202+ cliLen := len (clients )
203+ if cliLen == 0 {
204+ return nil , nil , nil , errors .New ("empty list of agencies' clients" )
205+ }
206+ configs := make ([]* agencyConfig , cliLen )
207+ leaders := make (map [string ]int )
208+
209+ h := make (health )
210+ // Fetch all configs from agencies.
211+ wg .Add (cliLen )
212+ for i , cli := range clients {
213+ go func (iLocal int , cliLocal agency.Agency ) {
214+ defer wg .Done ()
215+
216+ ctxLocal , cancel := context .WithTimeout (ctx , time .Second )
217+ defer cancel ()
218+ config , err := getAgencyConfig (ctxLocal , cliLocal )
219+
220+ mutex .Lock ()
221+ defer mutex .Unlock ()
222+
223+ if err != nil {
224+ anyError = err
225+ return
226+ } else if config == nil || config .LeaderId == "" {
227+ anyError = fmt .Errorf ("leader unknown for the agent %v" , cliLocal .Connection ().Endpoints ())
228+ return
229+ }
230+
231+ // Write config on the same index where client is (It will be helpful later).
232+ configs [iLocal ] = config
233+ // Count leaders.
234+ leaders [config .LeaderId ]++
235+ h [config .Configuration .ID ] = config .CommitIndex
236+ }(i , cli )
237+ }
238+ wg .Wait ()
239+
240+ if anyError != nil {
241+ return nil , nil , nil , wrapError (anyError , "not all agencies are responsive" )
242+ }
243+
244+ if len (leaders ) == 0 {
245+ return nil , nil , nil , wrapError (anyError , "failed to get config from agencies" )
246+ }
247+
248+ // Find the leader ID which has the most votes from all agencies.
249+ maxVotes := 0
250+ var leaderID string
251+ for id , votes := range leaders {
252+ if votes > maxVotes {
253+ maxVotes = votes
254+ leaderID = id
255+ }
256+ }
257+
258+ // Check if a leader has quorum from all possible agencies.
259+ if maxVotes <= cliLen / 2 {
260+ message := fmt .Sprintf ("no quorum for leader %s, votes %d of %d" , leaderID , maxVotes , cliLen )
261+ return nil , nil , nil , wrapError (anyError , message )
262+ }
263+
264+ // From here on, a leader with quorum is known.
265+ for i , config := range configs {
266+ if config != nil && config .Configuration .ID == leaderID {
267+ return clients [i ], config , h , nil
268+ }
269+ }
270+
271+ return nil , nil , nil , wrapError (anyError , "the leader is not responsive" )
272+ }
273+
274+ func wrapError (err error , message string ) error {
275+ if err != nil {
276+ return errors .WithMessage (err , message )
277+ }
278+
279+ return errors .New (message )
280+ }
0 commit comments