-
Notifications
You must be signed in to change notification settings - Fork 25
/
election.go
133 lines (117 loc) · 3.89 KB
/
election.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package election
import (
"context"
"sync/atomic"
agentinfo "github.com/fluxninja/aperture/v2/pkg/agent-info"
"github.com/fluxninja/aperture/v2/pkg/config"
etcd "github.com/fluxninja/aperture/v2/pkg/etcd/client"
"github.com/fluxninja/aperture/v2/pkg/info"
"github.com/fluxninja/aperture/v2/pkg/log"
"github.com/fluxninja/aperture/v2/pkg/notifiers"
panichandler "github.com/fluxninja/aperture/v2/pkg/panic-handler"
"github.com/fluxninja/aperture/v2/pkg/utils"
concurrencyv3 "go.etcd.io/etcd/client/v3/concurrency"
"go.uber.org/fx"
)
// FxTagBase is the base name of the fx tag that identifies the election
// result Tracker. ElectionIn.Trackers repeats this literal in its struct tag
// (`name:"etcd_election"`); keep the two in sync.
var FxTagBase = "etcd_election"

// FxTag is the fx name tag, derived from FxTagBase, that identifies the
// election result Tracker.
var FxTag = config.NameTag(FxTagBase)

// ElectionResultKey is the key under which the election result is written
// to the election Tracker.
var ElectionResultKey = notifiers.Key("election_result")
// Module returns the fx options for etcd-based leader election, with one
// election per agent group. It registers the Trackers named FxTagBase and
// provides *Election via ProvideElection.
func Module() fx.Option {
	trackers := notifiers.TrackersConstructor{Name: FxTagBase}.Annotate()
	provider := fx.Provide(ProvideElection)
	return fx.Options(trackers, provider)
}
// ElectionIn holds the fx-injected dependencies of ProvideElection.
type ElectionIn struct {
	fx.In
	// Lifecycle is used to register the hooks that start and stop the campaign.
	Lifecycle fx.Lifecycle
	// Shutdowner is used to request an application shutdown when the
	// election cannot proceed (no etcd session, or campaigning fails).
	Shutdowner fx.Shutdowner
	// Session supplies the etcd session on which the election is created.
	Session *etcd.Session
	// AgentInfo supplies the agent group; the election key is derived from it,
	// so there is one election per agent group.
	AgentInfo *agentinfo.AgentInfo
	// Trackers receives the election result under ElectionResultKey.
	// The tag literal must match FxTagBase ("etcd_election"); Go struct tags
	// are string literals and cannot reference that variable.
	Trackers notifiers.Trackers `name:"etcd_election"`
}
// ProvideElection provides a wrapper around etcd based leader election.
//
// The returned *Election is inert until the fx lifecycle starts:
//   - OnStart launches a goroutine that waits for the etcd session. It then
//     campaigns under "/election/<agent group>", using this host's UUID as the
//     campaign value. When the campaign is won, IsLeader starts returning true
//     and "true" is written to the Trackers under ElectionResultKey.
//   - OnStop cancels the campaign and waits for the goroutine to exit,
//     bounded by the stop context. If leadership was acquired, it then
//     resigns.
//
// If obtaining the session or campaigning fails, an application shutdown is
// requested. Errors caused by the lifecycle stopping are treated as a normal
// exit rather than a failure.
func ProvideElection(in ElectionIn) (*Election, error) {
	ctx, cancel := context.WithCancel(context.Background())
	election := &Election{
		doneChan: make(chan struct{}),
	}
	// won records that Campaign returned successfully, even if the lifecycle
	// was stopped right afterwards, so that OnStop still resigns. It is
	// written by the election goroutine before doneChan is closed and read by
	// OnStop only after receiving from doneChan; the close orders the accesses.
	var won bool

	in.Lifecycle.Append(fx.Hook{
		OnStart: func(_ context.Context) error {
			// A goroutine to do leader election
			panichandler.Go(func() {
				defer close(election.doneChan)

				session, err := in.Session.WaitSession(ctx)
				if err != nil {
					if ctx.Err() != nil {
						// Stopped before a session was available: not a failure.
						return
					}
					log.Error().Err(err).Msg("Failed to get etcd session for leader election")
					utils.Shutdown(in.Shutdowner)
					return
				}

				// Create an election for this client
				election.election = concurrencyv3.NewElection(session, "/election/"+in.AgentInfo.GetAgentGroup())

				// Campaign blocks until this node becomes leader or ctx is canceled.
				err = election.election.Campaign(ctx, info.GetHostInfo().Uuid)
				if err != nil {
					if ctx.Err() != nil {
						// Campaign was aborted by OnStop: a normal shutdown.
						return
					}
					log.Error().Err(err).Msg("Unable to elect a leader")
					utils.Shutdown(in.Shutdowner)
					return
				}
				won = true

				if ctx.Err() != nil {
					// Won, but shutdown has already begun: do not announce
					// leadership. OnStop still resigns because won is set.
					return
				}
				// This is the leader
				election.isLeader.Store(true)
				log.Info().Msg("Node is now a leader")
				in.Trackers.WriteEvent(ElectionResultKey, []byte("true"))
			})
			return nil
		},
		OnStop: func(stopCtx context.Context) error {
			cancel()
			// Wait for the election goroutine to finish, but do not outlive
			// the stop deadline. On timeout, election state is left untouched
			// because the goroutine may still be running.
			select {
			case <-election.Done():
			case <-stopCtx.Done():
				return stopCtx.Err()
			}
			election.isLeader.Store(false)
			if !won {
				return nil
			}
			// Campaign succeeded, so election.election is non-nil; release
			// the leadership key instead of waiting for the lease to expire.
			err := election.election.Resign(stopCtx)
			if err != nil {
				log.Error().Err(err).Msg("Unable to resign from the election")
			}
			return err
		},
	})
	return election, nil
}
// Election is a wrapper around etcd election that also records whether this
// node has won it. Instances are created by ProvideElection (which returns
// *Election) and driven by the fx lifecycle hooks registered there. The struct
// holds an atomic.Bool, so it should not be copied; use it through a pointer.
type Election struct {
	// election is assigned by the election goroutine once the etcd session
	// is available; it remains nil if no session could be obtained.
	election *concurrencyv3.Election
	// isLeader is set once the campaign is won and cleared on stop.
	isLeader atomic.Bool
	// doneChan is closed when the election goroutine exits: after becoming
	// the leader, after cancellation, or after an error.
	// Note: chan is used here instead of WaitGroup, so that waiting on Done()
	// before the election has started blocks instead of returning
	// immediately (a WaitGroup with a zero count would not block).
	doneChan chan struct{}
}
// IsLeader reports whether this node has won the election and has not yet
// given up leadership on stop. It reads an atomic flag, so it is safe to
// call from any goroutine.
func (e *Election) IsLeader() bool {
	leader := e.isLeader.Load()
	return leader
}
// Done returns a channel that is closed once the election goroutine has
// finished, so callers can block on it to wait for the election outcome.
//
// When the channel is closed then either:
//   - the node became a leader (IsLeader() == true), or
//   - leader election stopped without leadership, e.g. it was canceled
//     (IsLeader() == false).
func (e *Election) Done() <-chan struct{} {
	var done <-chan struct{} = e.doneChan
	return done
}