forked from docker-archive/classicswarm
/
candidate.go
127 lines (106 loc) · 2.63 KB
/
candidate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package leadership
import (
"sync"
"time"
"github.com/docker/libkv/store"
)
// Candidate runs the leader election algorithm asynchronously. It competes
// for a lock in the backing store and reports changes of leadership over
// channels handed out by RunForElection.
type Candidate struct {
	// Election parameters, set once by NewCandidate.
	client  store.Store   // store used to create the election lock
	key     string        // key the election lock is created on
	node    string        // written as the lock value to identify this candidate
	lockTTL time.Duration // TTL passed to the store when creating the lock

	// Leadership state. update and Resign take lock while accessing leader.
	lock   sync.Mutex
	leader bool

	// Channels created by RunForElection and closed when campaign returns.
	electedCh chan bool
	errCh     chan error

	// Control channels created by NewCandidate.
	resignCh chan bool     // carries step-down requests from Resign to campaign
	stopCh   chan struct{} // closed by Stop
}
// NewCandidate creates a new Candidate that will compete for the lock stored
// at key in client, identifying itself as node and requesting ttl as the lock
// TTL. The returned Candidate is idle until RunForElection is called.
func NewCandidate(client store.Store, key, node string, ttl time.Duration) *Candidate {
	candidate := new(Candidate)

	candidate.client = client
	candidate.key = key
	candidate.node = node
	candidate.lockTTL = ttl

	// A fresh candidate is never the leader.
	candidate.leader = false

	// The control channels exist for the whole life of the candidate; the
	// notification channels are only created by RunForElection.
	candidate.resignCh = make(chan bool)
	candidate.stopCh = make(chan struct{})

	return candidate
}
// IsLeader returns true if the candidate is currently a leader, i.e. the last
// status recorded by the election loop was "elected".
//
// NOTE(review): the flag is read without taking c.lock, while update writes it
// under that lock, so a concurrent call is not synchronized with the election
// goroutine.
func (c *Candidate) IsLeader() bool {
	elected := c.leader
	return elected
}
// RunForElection starts the leader election algorithm. Updates in status are
// pushed through the returned channels.
//
// The first channel delivers signals on acquiring or losing leadership: it
// sends true if we become the leader, and false if we lose it. The second
// channel delivers an error that aborts the election. Both channels are
// closed once the election has terminated.
//
// If the lock cannot be created, the error is delivered on the error channel
// from a separate goroutine and both channels are then closed. The error must
// not be sent synchronously: the channels are unbuffered and no receiver can
// exist before this method has returned them, so a direct send would block
// the caller forever.
func (c *Candidate) RunForElection() (<-chan bool, <-chan error) {
	c.electedCh = make(chan bool)
	c.errCh = make(chan error)

	lock, err := c.client.NewLock(c.key, &store.LockOptions{
		Value:     []byte(c.node),
		TTL:       c.lockTTL,
		RenewLock: make(chan struct{}),
	})
	if err != nil {
		electedCh, errCh := c.electedCh, c.errCh
		go func() {
			// Same shutdown sequence as campaign, so consumers see a
			// closed channel pair whichever way the election ends.
			defer close(electedCh)
			defer close(errCh)

			select {
			case errCh <- err:
			case <-c.stopCh:
				// Stop was called; do not leak this goroutine waiting
				// for a receiver.
			}
		}()
		return electedCh, errCh
	}

	go c.campaign(lock)
	return c.electedCh, c.errCh
}
// Stop running for election. It closes the stop channel, which the election
// loop observes while it holds leadership: it then releases the lock and
// terminates.
//
// Stop must be called at most once; closing an already closed channel panics.
func (c *Candidate) Stop() {
	stop := c.stopCh
	close(stop)
}
// Resign forces the candidate to step-down and try again.
// If the candidate is not a leader, it doesn't have any effect.
// Candidate will retry immediately to acquire the leadership. If no-one else
// took it, then the Candidate will end up being a leader again.
//
// Resign blocks until the election goroutine has picked up the request or
// Stop has been called. If leadership changes between the check and the
// hand-off, the request is served the next time the candidate is leader.
func (c *Candidate) Resign() {
	// Only hold the mutex for the read. The election goroutine needs the
	// same mutex in update; keeping it while blocked on the send below can
	// deadlock both sides when the goroutine is on its way to update(false)
	// (lock lost, or a previous resignation still being processed).
	c.lock.Lock()
	leader := c.leader
	c.lock.Unlock()

	if !leader {
		return
	}

	select {
	case c.resignCh <- true:
	case <-c.stopCh:
		// The election goroutine exits on stop without clearing the
		// leader flag, so nobody would ever receive the request.
	}
}
// update records the new leadership status and then notifies the consumer of
// the elected channel. The send blocks until the status has been received.
func (c *Candidate) update(status bool) {
	// The mutex only guards the leader flag. It is released before the
	// blocking send so that other users of the lock (Resign) are not stalled
	// for as long as the consumer takes to drain the unbuffered channel.
	c.lock.Lock()
	c.leader = status
	c.lock.Unlock()

	c.electedCh <- status
}
// campaign is the election loop run in its own goroutine by RunForElection.
// Each round announces follower status, tries to take the store lock,
// announces leadership once the lock is held, and then waits for a reason to
// give it up. The elected and error channels are closed when it returns.
func (c *Candidate) campaign(locker store.Locker) {
	defer close(c.electedCh)
	defer close(c.errCh)

	for {
		// Every round begins as a follower.
		c.update(false)

		lost, err := locker.Lock(nil)
		if err != nil {
			// Unable to take part in the election: report and quit.
			c.errCh <- err
			return
		}

		// The lock is ours, which makes us the leader.
		c.update(true)

		select {
		case <-lost:
			// The lock is gone and someone else may lead now; start
			// another round.
		case <-c.resignCh:
			// Step down on request by releasing the lock, then start
			// another round.
			locker.Unlock()
		case <-c.stopCh:
			// Shutting down: release the lock if we still lead, then
			// leave the loop for good. update(true) has just run, so
			// this check is expected to pass.
			if c.leader {
				locker.Unlock()
			}
			return
		}
	}
}