-
Notifications
You must be signed in to change notification settings - Fork 13
/
leader_elector.go
116 lines (97 loc) · 3.03 KB
/
leader_elector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package kubernetes
import (
"context"
"fmt"
"github.com/asaskevich/EventBus"
"github.com/Trendyol/go-dcp/config"
"github.com/Trendyol/go-dcp/membership"
"github.com/Trendyol/go-dcp/models"
"github.com/Trendyol/go-dcp/leaderelector"
"github.com/Trendyol/go-dcp/logger"
"github.com/Trendyol/go-dcp/helpers"
"k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
)
type leaderElector struct {
client Client
handler leaderelector.Handler
bus EventBus.Bus
myIdentity *models.Identity
leaderElectorConfig *config.KubernetesLeaderElector
}
// Run launches leader election on a new goroutine and returns immediately.
//
// Inside that goroutine leaderelection.RunOrDie is called with ctx and a
// LeaseLock whose name, namespace and timings are read from
// le.leaderElectorConfig; le.myIdentity is used as the lock identity and
// ReleaseOnCancel is enabled.
//
// Each election callback logs at debug level, updates the "role" label via
// le.client and then notifies le.handler.
func (le *leaderElector) Run(ctx context.Context) {
	// This instance acquired the lease.
	onStartedLeading := func(_ context.Context) {
		logger.Log.Debug("granted to leader")
		le.client.AddLabel("role", "leader")
		le.handler.OnBecomeLeader()
	}

	// This instance no longer holds the lease.
	onStoppedLeading := func() {
		logger.Log.Debug("revoked from leader")
		le.client.RemoveLabel("role")
		le.handler.OnResignLeader()
	}

	// A leader was observed; only act when it is some other instance.
	onNewLeader := func(identity string) {
		leader := models.NewIdentityFromStr(identity)
		if !le.myIdentity.Equal(leader) {
			logger.Log.Debug("granted to follower for leader: %s", leader.Name)
			le.client.AddLabel("role", "follower")
			le.handler.OnBecomeFollower(leader)
		}
	}

	go func() {
		settings := le.leaderElectorConfig

		// The lock and election config are assembled here, inside the
		// goroutine, so nothing is read from le before the goroutine runs.
		lock := &resourcelock.LeaseLock{
			LeaseMeta: v1.ObjectMeta{
				Name:      settings.LeaseLockName,
				Namespace: settings.LeaseLockNamespace,
			},
			Client: le.client.CoordinationV1(),
			LockConfig: resourcelock.ResourceLockConfig{
				Identity: le.myIdentity.String(),
			},
		}

		leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
			Lock:            lock,
			ReleaseOnCancel: true,
			LeaseDuration:   settings.LeaseDuration,
			RenewDeadline:   settings.RenewDeadline,
			RetryPeriod:     settings.RetryPeriod,
			Callbacks: leaderelection.LeaderCallbacks{
				OnStartedLeading: onStartedLeading,
				OnStoppedLeading: onStoppedLeading,
				OnNewLeader:      onNewLeader,
			},
		})
	}()
}
// Close removes membershipChangedListener from the membership-changed bus
// event. A failed unsubscribe is logged as an error and otherwise ignored;
// nothing is returned to the caller.
func (le *leaderElector) Close() {
	if err := le.bus.Unsubscribe(
		helpers.MembershipChangedBusEventName,
		le.membershipChangedListener,
	); err != nil {
		logger.Log.Error("error while unsubscribe: %v", err)
	}
}
// membershipChangedListener is the bus callback for membership changes. It
// sets the "member" label through le.client to "<MemberNumber>_<TotalMembers>"
// taken from model.
func (le *leaderElector) membershipChangedListener(model *membership.Model) {
	memberLabel := fmt.Sprintf("%v_%v", model.MemberNumber, model.TotalMembers)
	le.client.AddLabel("member", memberLabel)
}
// NewLeaderElector builds a leaderElector from the given collaborators and
// subscribes its membershipChangedListener to the membership-changed event
// on bus via SubscribeAsync.
//
// The election settings are obtained from config.GetKubernetesLeaderElector().
// If the subscription fails, the error is logged and the function panics
// with that error.
func NewLeaderElector(
	client Client,
	config *config.Dcp,
	myIdentity *models.Identity,
	handler leaderelector.Handler,
	bus EventBus.Bus,
) leaderelector.LeaderElector {
	elector := &leaderElector{
		client:              client,
		myIdentity:          myIdentity,
		handler:             handler,
		leaderElectorConfig: config.GetKubernetesLeaderElector(),
		bus:                 bus,
	}

	if err := bus.SubscribeAsync(
		helpers.MembershipChangedBusEventName,
		elector.membershipChangedListener,
		true,
	); err != nil {
		logger.Log.Error("error while subscribe to membership changed event, err: %v", err)
		panic(err)
	}

	return elector
}