-
Notifications
You must be signed in to change notification settings - Fork 2
/
multidialer.go
73 lines (61 loc) · 2.41 KB
/
multidialer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
// Package multidialer implements a connection dialer that is able to connect
// to different apiservers to provide HA.
// This is used for bootstrapping components: since the apiserver Service is not
// present, client-go can connect to one apiserver and, if it is down, try
// to connect to the next available one.
// This is a simple mechanism to provide HA without relying on external components.
// It doesn't provide load balancing or any other advanced features; if a connection
// fails, it simply tries the next server in the pool.
// The retry logic is kept on the clients.
package multidialer
import (
"context"
"net"
"time"
"k8s.io/client-go/kubernetes"
)
// DialFunc is a shorthand for the signature of the DialContext method of
// net.Dialer. Any function with this shape can be plugged into a Dialer to
// open the underlying connections.
type DialFunc func(ctx context.Context, network, address string) (net.Conn, error)
// Dialer opens connections through Dial and DialContext.
// It iterates over a list of hosts that can be updated externally until it succeeds.
type Dialer struct {
// dial is the function used to open every underlying connection.
dial DialFunc
// resolver supplies the list of ready hosts to try and records the last
// host that accepted a connection.
resolver *resolver
}
// NewDialer returns a Dialer with an empty list of alternate hosts.
//
// When dial is non-nil it is used to open the underlying connections.
// When dial is nil, the DialContext method of a net.Dialer configured with a
// 30 second timeout and a 30 second keep-alive is used instead.
func NewDialer(dial DialFunc) *Dialer {
	if dial != nil {
		return NewDialerWithAlternateHosts(dial, []string{})
	}

	// No dial function supplied: fall back to a standard library dialer.
	fallback := &net.Dialer{
		Timeout:   30 * time.Second,
		KeepAlive: 30 * time.Second,
	}
	return NewDialerWithAlternateHosts(fallback.DialContext, []string{})
}
// NewDialerWithAlternateHosts creates a new Dialer instance.
//
// If dial is not nil, it will be used to create new underlying connections.
// Otherwise the DialContext method of a net.Dialer with a 30 second timeout
// and a 30 second keep-alive is used.
//
// alternateHosts is passed to NewResolver to build the Dialer's resolver; the
// hosts the resolver reports as ready are tried before the address given to
// Dial or DialContext.
func NewDialerWithAlternateHosts(dial DialFunc, alternateHosts []string) *Dialer {
	if dial == nil {
		// Honor the documented fallback. Without it a nil dial would be stored
		// as-is and the first DialContext call would panic on a nil function.
		dial = (&net.Dialer{Timeout: 30 * time.Second, KeepAlive: 30 * time.Second}).DialContext
	}
	return &Dialer{
		dial:     dial,
		resolver: NewResolver(alternateHosts),
	}
}
// Dial opens a connection exactly like DialContext, using
// context.Background() as the context.
func (d *Dialer) Dial(network, address string) (net.Conn, error) {
	ctx := context.Background()
	return d.DialContext(ctx, network, address)
}
// DialContext opens a connection, preferring the hosts the resolver currently
// reports as ready. Those hosts are tried one at a time, in the order returned
// by the resolver, and the first successful connection is returned.
//
// If no ready host accepts the connection (or there are none), it falls back
// to dialing the address supplied by the caller and returns that result,
// connection or error, unchanged.
func (d *Dialer) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
	readyHosts := d.resolver.listReady()
	for _, target := range readyHosts {
		c, dialErr := d.dial(ctx, network, target)
		if dialErr != nil {
			// Failures on alternate hosts are not reported; just move on to
			// the next candidate.
			continue
		}
		// Remember the host that worked so it can be used the next time.
		d.resolver.setLast(target)
		return c, nil
	}
	return d.dial(ctx, network, address)
}
// Start hands ctx and clientset to the start method of the Dialer's resolver.
// NOTE(review): presumably this begins keeping the list of ready hosts up to
// date from the cluster until ctx is done; confirm against the resolver
// implementation.
func (d *Dialer) Start(ctx context.Context, clientset kubernetes.Interface) {
	r := d.resolver
	r.start(ctx, clientset)
}