forked from openshift/origin
-
Notifications
You must be signed in to change notification settings - Fork 1
/
etcd.go
149 lines (134 loc) · 4.64 KB
/
etcd.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package etcd
import (
"fmt"
"net"
"net/http"
"net/http/httputil"
"time"
newetcdclient "github.com/coreos/etcd/client"
etcdclient "github.com/coreos/go-etcd/etcd"
"github.com/golang/glog"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/client/restclient"
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
knet "k8s.io/kubernetes/pkg/util/net"
configapi "github.com/openshift/origin/pkg/cmd/server/api"
)
// GetAndTestEtcdClient builds an etcd client from the supplied connection info
// via EtcdClient and then probes it with TestEtcdClient. The client is returned
// only when both steps succeed; otherwise the first error encountered is
// returned together with a nil client.
// TODO: switch this function to use EtcdHelper.
func GetAndTestEtcdClient(etcdClientInfo configapi.EtcdConnectionInfo) (*etcdclient.Client, error) {
	client, err := EtcdClient(etcdClientInfo)
	if err == nil {
		// Only probe the server when the client could actually be constructed.
		err = TestEtcdClient(client)
	}
	if err != nil {
		return nil, err
	}
	return client, nil
}
// EtcdClient creates a go-etcd client for the URLs listed in etcdClientInfo.
// The client uses the transport produced by newClientTransport and installs
// NeverRetryOnFailure as its retry policy. An error is returned when the TLS
// configuration cannot be built from the configured certificate files.
func EtcdClient(etcdClientInfo configapi.EtcdConnectionInfo) (*etcdclient.Client, error) {
	transport, err := newClientTransport(etcdClientInfo)
	if err != nil {
		return nil, err
	}

	etcdClient := etcdclient.NewClient(etcdClientInfo.URLs)
	etcdClient.SetTransport(transport)
	etcdClient.CheckRetry = NeverRetryOnFailure
	return etcdClient, nil
}

// MakeNewEtcdClient creates a coreos/etcd client for the URLs listed in
// etcdClientInfo, using the transport produced by newClientTransport. An error
// is returned when the TLS configuration cannot be built or when the client
// library rejects the resulting configuration.
func MakeNewEtcdClient(etcdClientInfo configapi.EtcdConnectionInfo) (newetcdclient.Client, error) {
	transport, err := newClientTransport(etcdClientInfo)
	if err != nil {
		return nil, err
	}

	cfg := newetcdclient.Config{
		Endpoints: etcdClientInfo.URLs,
		// TODO: Determine if transport needs optimization
		Transport: transport,
	}
	return newetcdclient.New(cfg)
}

// newClientTransport builds the HTTP transport shared by EtcdClient and
// MakeNewEtcdClient so both client flavours connect with identical settings.
// TLS is configured from the client certificate, client key and CA file named
// in etcdClientInfo.
func newClientTransport(etcdClientInfo configapi.EtcdConnectionInfo) (*http.Transport, error) {
	tlsConfig, err := restclient.TLSConfigFor(&restclient.Config{
		TLSClientConfig: restclient.TLSClientConfig{
			CertFile: etcdClientInfo.ClientCert.CertFile,
			KeyFile:  etcdClientInfo.ClientCert.KeyFile,
			CAFile:   etcdClientInfo.CA,
		},
	})
	if err != nil {
		return nil, err
	}

	return knet.SetTransportDefaults(&http.Transport{
		TLSClientConfig: tlsConfig,
		Dial: (&net.Dialer{
			// default from http.DefaultTransport
			Timeout: 30 * time.Second,
			// Lower the keep alive for connections.
			KeepAlive: 1 * time.Second,
		}).Dial,
		// Because watches are very bursty, defends against long delays in watch reconnections.
		MaxIdleConnsPerHost: 500,
	}), nil
}
// TestEtcdClient checks that a go-etcd client can talk to its server. It
// repeatedly issues a Get for "/" and returns nil as soon as the call either
// succeeds or fails with an etcd "not found" error (both prove the server
// answered). Between failed attempts it sleeps 50ms; once the attempt counter
// exceeds 100 the last error is wrapped and returned.
func TestEtcdClient(etcdClient *etcdclient.Client) error {
	attempt := 0
	for {
		_, err := etcdClient.Get("/", false, false)
		switch {
		case err == nil, etcdutil.IsEtcdNotFound(err):
			// The server responded, so the client is usable.
			return nil
		case attempt > 100:
			return fmt.Errorf("could not reach etcd: %v", err)
		}
		time.Sleep(50 * time.Millisecond)
		attempt++
	}
}
// TestNewEtcdClient verifies that a coreos/etcd client is functional. It
// repeatedly issues a Get for "/" through the keys API and returns nil as soon
// as the call either succeeds or fails with an etcd "not found" error. Between
// failed attempts it sleeps 50ms; once the attempt counter exceeds 100 the last
// error is wrapped and returned.
func TestNewEtcdClient(etcdClient newetcdclient.Client) error {
	// The keys API wrapper does not change between attempts, so build it once.
	keys := newetcdclient.NewKeysAPI(etcdClient)
	for i := 0; ; i++ {
		_, err := keys.Get(context.Background(), "/", nil)
		if err == nil || etcdutil.IsEtcdNotFound(err) {
			break
		}
		if i > 100 {
			return fmt.Errorf("could not reach etcd: %v", err)
		}
		time.Sleep(50 * time.Millisecond)
	}
	return nil
}
// NeverRetryOnFailure is a retry function for the etcdClient. With more than
// one machine in the cluster it defers to etcdclient.DefaultCheckRetry. If
// there's only one machine, master election doesn't make much sense, so we
// don't bother to retry: the failed response is dumped to the log and the
// error that triggered the check is returned directly.
//
// The dump result is kept in its own variable. Reusing err for it would
// overwrite the request error with the (normally nil) dump error, and go-etcd
// interprets a nil result from CheckRetry as permission to retry.
func NeverRetryOnFailure(cluster *etcdclient.Cluster, numReqs int, lastResp http.Response, err error) error {
	if len(cluster.Machines) > 1 {
		return etcdclient.DefaultCheckRetry(cluster, numReqs, lastResp, err)
	}

	content, dumpErr := httputil.DumpResponse(&lastResp, true)
	if dumpErr != nil {
		glog.Errorf("failure dumping response: %v", dumpErr)
	} else {
		glog.Errorf("etcd failure response: %s", string(content))
	}

	if err == nil {
		// No request error was supplied; fall back to the dump error so a
		// failed dump is still reported, as it was before.
		return dumpErr
	}
	return err
}