forked from hashicorp/consul
-
Notifications
You must be signed in to change notification settings - Fork 0
/
retry_join.go
130 lines (110 loc) · 3.14 KB
/
retry_join.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package agent
import (
"fmt"
"log"
"strings"
"time"
"github.com/hashicorp/consul/lib"
discover "github.com/hashicorp/go-discover"
discoverk8s "github.com/hashicorp/go-discover/provider/k8s"
)
// retryJoinLAN builds a retryJoiner for the "LAN" cluster from the agent's
// LAN retry-join configuration and runs it. If the joiner returns an error,
// that error is sent on a.retryJoinCh.
func (a *Agent) retryJoinLAN() {
	joiner := retryJoiner{
		cluster:     "LAN",
		addrs:       a.config.RetryJoinLAN,
		maxAttempts: a.config.RetryJoinMaxAttemptsLAN,
		interval:    a.config.RetryJoinIntervalLAN,
		join:        a.JoinLAN,
		logger:      a.logger,
	}

	err := joiner.retryJoin()
	if err == nil {
		return
	}
	a.retryJoinCh <- err
}
// retryJoinWAN builds a retryJoiner for the "WAN" cluster from the agent's
// WAN retry-join configuration and runs it. If the joiner returns an error,
// that error is sent on a.retryJoinCh.
func (a *Agent) retryJoinWAN() {
	joiner := retryJoiner{
		cluster:     "WAN",
		addrs:       a.config.RetryJoinWAN,
		maxAttempts: a.config.RetryJoinMaxAttemptsWAN,
		interval:    a.config.RetryJoinIntervalWAN,
		join:        a.JoinWAN,
		logger:      a.logger,
	}

	err := joiner.retryJoin()
	if err == nil {
		return
	}
	a.retryJoinCh <- err
}
// retryJoiner bundles everything needed to keep retrying a cluster join
// until it either succeeds or the allowed retries run out.
type retryJoiner struct {
	// cluster names the serf cluster being joined, e.g. "LAN" or "WAN".
	// It is only used in log and error messages.
	cluster string

	// addrs holds the join targets. An entry containing "provider=" is
	// treated as a go-discover configuration and resolved on every
	// attempt; any other entry is passed to join unchanged.
	addrs []string

	// maxAttempts bounds the number of failed join attempts. retryJoin
	// gives up once the failure count exceeds this value; a value <= 0
	// disables the limit.
	maxAttempts int

	// interval is how long to wait between two join attempts.
	interval time.Duration

	// join adds the discovered or configured servers to the serf
	// cluster and reports how many were joined.
	join func(addrs []string) (n int, err error)

	// logger is the agent logger. Log messages should carry the
	// "agent: " prefix.
	logger *log.Logger
}
// retryJoin tries to join the cluster, sleeping r.interval between
// attempts, until a join succeeds or the retry limit is exceeded.
//
// It returns nil immediately when no addresses are configured and nil
// after the first successful join. It returns an error if the go-discover
// client cannot be created, or once the number of failed attempts exceeds
// r.maxAttempts (only enforced when r.maxAttempts > 0).
func (r *retryJoiner) retryJoin() error {
	if len(r.addrs) == 0 {
		return nil
	}

	// Copy the default providers, and then add the non-default k8s
	// provider. The copy keeps the package-level discover.Providers map
	// untouched.
	providers := make(map[string]discover.Provider, len(discover.Providers)+1)
	for k, v := range discover.Providers {
		providers[k] = v
	}
	providers["k8s"] = &discoverk8s.Provider{}

	disco, err := discover.New(
		discover.WithUserAgent(lib.UserAgent()),
		discover.WithProviders(providers),
	)
	if err != nil {
		return err
	}

	r.logger.Printf("[INFO] agent: Retry join %s is supported for: %s", r.cluster, strings.Join(disco.Names(), " "))
	r.logger.Printf("[INFO] agent: Joining %s cluster...", r.cluster)

	attempt := 0
	for {
		// Resolve the join targets afresh on every attempt so that
		// discovery results can change between retries.
		var addrs []string
		for _, addr := range r.addrs {
			if !strings.Contains(addr, "provider=") {
				// Plain address: use as-is.
				addrs = append(addrs, addr)
				continue
			}

			servers, err := disco.Addrs(addr, r.logger)
			if err != nil {
				// A failing provider is logged and skipped; other
				// entries may still yield servers.
				r.logger.Printf("[ERR] agent: Join %s: %s", r.cluster, err)
				continue
			}
			addrs = append(addrs, servers...)
			r.logger.Printf("[INFO] agent: Discovered %s servers: %s", r.cluster, strings.Join(servers, " "))
		}

		// joinErr records why this attempt failed. It is deliberately a
		// single variable visible to the warning below: previously the
		// join error was shadowed and the warning logged "<nil>".
		var joinErr error
		if len(addrs) == 0 {
			joinErr = fmt.Errorf("No servers to join")
		} else {
			n, err := r.join(addrs)
			if err == nil {
				r.logger.Printf("[INFO] agent: Join %s completed. Synced with %d initial agents", r.cluster, n)
				return nil
			}
			joinErr = err
		}

		attempt++
		if r.maxAttempts > 0 && attempt > r.maxAttempts {
			return fmt.Errorf("agent: max join %s retry exhausted, exiting", r.cluster)
		}

		r.logger.Printf("[WARN] agent: Join %s failed: %v, retrying in %v", r.cluster, joinErr, r.interval)
		time.Sleep(r.interval)
	}
}