This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
/
leaderelection.go
81 lines (67 loc) · 2.14 KB
/
leaderelection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package controller
import (
"context"
"fmt"
"os"
v12 "k8s.io/client-go/kubernetes/typed/coordination/v1"
"github.com/flyteorg/flytepropeller/pkg/controller/config"
"k8s.io/apimachinery/pkg/util/rand"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"
)
// podNameEnvVar is the name of the environment variable that is looked up to
// obtain the pod name. In the pod spec it has to be populated like this:
//
//	env:
//	  - name: POD_NAME
//	    valueFrom:
//	      fieldRef:
//	        fieldPath: metadata.name
const podNameEnvVar = "POD_NAME"
// newResourceLock creates a new config map resource lock for use in a leader
// election loop.
//
// It returns (nil, nil) when leader election is disabled in options, and an
// error when it is enabled but no lock config map name is configured.
// Otherwise the lock is built with resourcelock.New for the configured
// namespace/name, using getUniqueLeaderID() as this candidate's identity and
// passing eventRecorder through in the lock config.
func newResourceLock(corev1 v1.CoreV1Interface, coordinationV1 v12.CoordinationV1Interface, eventRecorder record.EventRecorder, options config.LeaderElectionConfig) (resourcelock.Interface, error) {
	if !options.Enabled {
		return nil, nil
	}

	// Validate the Name field directly instead of the String() rendering: a
	// "namespace/name" rendering still contains the separator when both parts
	// are empty, so a length check on it cannot detect a missing config map.
	// NOTE(review): assumes LockConfigMap is a k8s types.NamespacedName — confirm.
	if options.LockConfigMap.Name == "" {
		return nil, fmt.Errorf("to enable leader election, a config map must be provided")
	}

	return resourcelock.New(resourcelock.ConfigMapsResourceLock,
		options.LockConfigMap.Namespace,
		options.LockConfigMap.Name,
		corev1,
		coordinationV1,
		resourcelock.ResourceLockConfig{
			// The identity must be unique per candidate.
			Identity:      getUniqueLeaderID(),
			EventRecorder: eventRecorder,
		})
}
// getUniqueLeaderID returns the identity this process uses for the leader
// election lock.
//
// A non-empty value of the podNameEnvVar environment variable is returned
// as-is. Otherwise the identity is "<hostname>_<suffix>", where suffix comes
// from rand.String(10); if the hostname cannot be determined it is left empty
// and the suffix alone distinguishes the identity.
func getUniqueLeaderID() string {
	// A variable that is set but empty would yield an empty identity, so it is
	// treated the same as an unset one and falls through to the generated ID.
	if podName := os.Getenv(podNameEnvVar); podName != "" {
		return podName
	}

	// Best effort: a hostname lookup failure is tolerated on purpose because
	// the suffix appended below is still present.
	hostname, err := os.Hostname()
	if err != nil {
		hostname = ""
	}

	return fmt.Sprintf("%v_%v", hostname, rand.String(10))
}
// newLeaderElector assembles a leaderelection.LeaderElectionConfig from the
// given lock and cfg and hands it to leaderelection.NewLeaderElector.
//
// The lease duration, renew deadline and retry period are taken from cfg.
// leaderFn is registered as the OnStartedLeading callback and leaderStoppedFn
// as the OnStoppedLeading callback. The elector and error are returned exactly
// as NewLeaderElector produces them.
func newLeaderElector(lock resourcelock.Interface, cfg config.LeaderElectionConfig,
	leaderFn func(ctx context.Context), leaderStoppedFn func()) (*leaderelection.LeaderElector, error) {
	callbacks := leaderelection.LeaderCallbacks{
		OnStartedLeading: leaderFn,
		OnStoppedLeading: leaderStoppedFn,
	}

	electionCfg := leaderelection.LeaderElectionConfig{
		Lock:          lock,
		LeaseDuration: cfg.LeaseDuration.Duration,
		RenewDeadline: cfg.RenewDeadline.Duration,
		RetryPeriod:   cfg.RetryPeriod.Duration,
		Callbacks:     callbacks,
	}

	return leaderelection.NewLeaderElector(electionCfg)
}