/
kingsmoot.go
235 lines (208 loc) · 5.52 KB
/
kingsmoot.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
package kingsmoot
import (
"errors"
"fmt"
"time"
)
type Role int8
const (
NotAMember Role = iota
Follower
Leader
Dead
)
var roles = []string{
"NotAMember",
"Follower",
"Leader",
"Dead"}
func (s Role) String() string {
return roles[s]
}
type Config struct {
Name string
DataStoreType string
Addresses []string
DsOpTimeout time.Duration
MasterDownAfter time.Duration
CustomConf map[string]string
}
type MemberShip struct {
Role Role
Leader string
}
type Candidate interface {
fmt.Stringer
UpdateMembership(memberShip MemberShip) error
}
type Kingsmoot struct {
conf *Config
endpoint string
c Candidate
role Role
currLeader string
ds DataStore
quitCh chan bool
}
func New(name string, addresses []string) (*Kingsmoot, error) {
conf := &Config{Name: name, DataStoreType: "etcdv2", Addresses: addresses, DsOpTimeout: 500 * time.Millisecond, MasterDownAfter: 30 * time.Second}
ds, err := CreateDatastore(conf)
if nil != err {
Info.Println("Could not connet to datastore Error: ", err)
return nil, err
}
return &Kingsmoot{conf: conf, ds: ds, quitCh: make(chan bool, 1)}, nil
}
func NewFromConf(conf *Config) (*Kingsmoot, error) {
ds, err := CreateDatastore(conf)
if nil != err {
Info.Println("Could not connet to datastore Error: ", err)
ds.Close()
return nil, err
}
return &Kingsmoot{conf: conf, ds: ds, quitCh: make(chan bool, 1)}, nil
}
func (km *Kingsmoot) Join(endpoint string, c Candidate) error {
if km.isDead() {
return errors.New("Kingsmoot closed, create new instance to join")
}
if km.endpoint != "" {
return errors.New(fmt.Sprintf("Already in use for %v, create new instance to join", km.endpoint))
}
km.endpoint = endpoint
km.c = c
if err := km.joinLeaderElection(); err != nil {
return err
}
go km.candidateLoop()
return nil
}
func (km *Kingsmoot) Leader() (string, error) {
return km.ds.Get(km.conf.Name)
}
func (km *Kingsmoot) isDead() bool {
return km.role == Dead
}
func (km *Kingsmoot) setRole(role Role) {
km.role = role
}
func (km *Kingsmoot) Exit() {
if km.isDead() {
return
}
km.setRole(Dead)
close(km.quitCh)
err := km.ds.CompareAndDel(km.conf.Name, km.endpoint)
if nil != err {
switch err.(Error).Code() {
case CompareFailed, KeyNotFound:
default:
Warning.Println("Error while exitting from kingsmoot", err)
}
}
err = km.ds.Close()
if nil != err {
Warning.Println("Error while exitting from kingsmoot", err)
}
}
func (km *Kingsmoot) candidateLoop() {
var err error
l := km.registerListener()
for !km.isDead() {
switch km.role {
case NotAMember, Follower:
km.joinLeaderElection()
case Leader:
km.refreshTTL()
}
select {
case <-time.After(km.conf.MasterDownAfter / 2):
case change := <-l.changeCh:
Info.Printf("Change event received : %v", change)
case <-km.quitCh:
Info.Println("Quit signal received")
case err = <-l.errCh:
Info.Printf("Error signal received : %v", err)
<-time.After(km.conf.MasterDownAfter)
km.ds.Watch(km.conf.Name, l)
}
}
}
func (km *Kingsmoot) joinLeaderElection() error {
var err error
currLeader, err := km.ds.PutIfAbsent(km.conf.Name, km.endpoint, km.conf.MasterDownAfter)
if err != nil {
switch err.(Error).Code() {
case KeyExists:
if currLeader == km.endpoint {
km.currLeader = currLeader
return km.lead()
} else if currLeader != km.currLeader {
km.currLeader = currLeader
return km.follow()
}
default:
Info.Printf("Leader election failed due to %v, going to kick out from election", err)
km.notAMember()
return errors.New(fmt.Sprintf("Leader election failed due to %v, going to kick out from election", err))
}
} else {
km.currLeader = currLeader
return km.lead()
}
return nil
}
type KeyChangeListener struct {
changeCh chan *Change
errCh chan error
}
func (l *KeyChangeListener) Notify(change *Change) {
l.changeCh <- change
}
func (l *KeyChangeListener) Bye(err error) {
l.errCh <- err
}
func (km *Kingsmoot) notAMember() {
err := km.c.UpdateMembership(MemberShip{Role: NotAMember})
if err != nil {
Fatal.Fatalf("Failed to update membership of %v due %v", km.c, err)
}
km.setRole(NotAMember)
km.currLeader = ""
}
func (km *Kingsmoot) lead() error {
Info.Printf("%v Elected as leader of %v", km.c, km.conf.Name)
err := km.c.UpdateMembership(MemberShip{Role: Leader})
if err != nil {
Info.Printf("%v Failed to start as leader due to %v, going to kick out from election", km.c, err)
km.notAMember()
return errors.New(fmt.Sprintf("%v Failed to start as leader due to %v, going to kick out from election", km.c, err))
}
km.setRole(Leader)
return nil
}
func (km *Kingsmoot) follow() error {
Info.Printf("%v Elected as follower of %v", km.c, km.currLeader)
err := km.c.UpdateMembership(MemberShip{Role: Follower, Leader: km.currLeader})
if err != nil {
Info.Printf("%v Failed to start as follower due to %v, going to kick out from election", km.c, err)
km.notAMember()
return errors.New(fmt.Sprintf("%v Failed to start as follower due to %v, going to kick out from election", km.c, err))
}
km.setRole(Follower)
return nil
}
func (km *Kingsmoot) refreshTTL() {
err := km.ds.RefreshTTL(km.conf.Name, km.endpoint, km.conf.MasterDownAfter)
if err != nil {
Info.Printf("%v is no more the leader due to %v, going to kick out from election", km.c, err)
km.notAMember()
return
}
km.setRole(Leader)
}
func (km *Kingsmoot) registerListener() *KeyChangeListener {
l := &KeyChangeListener{changeCh: make(chan *Change, 1), errCh: make(chan error, 1)}
km.ds.Watch(km.conf.Name, l)
return l
}