-
Notifications
You must be signed in to change notification settings - Fork 0
/
connect.go
186 lines (160 loc) · 5.16 KB
/
connect.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
package cluster
import (
"crypto/tls"
"encoding/base64"
"encoding/pem"
"fmt"
"net"
"time"
"github.com/pkg/errors"
lxd "github.com/lxc/lxd/client"
"github.com/lxc/lxd/lxd/db"
"github.com/lxc/lxd/lxd/instance/instancetype"
"github.com/lxc/lxd/shared"
"github.com/lxc/lxd/shared/api"
"github.com/lxc/lxd/shared/version"
)
// Connect opens a client connection to the cluster member at the given
// address via lxd.ConnectLXD, presenting the certificate in cert both as the
// expected server certificate and as the client key pair.
//
// When notify is false, the call first polls the package's event listener
// table for an entry matching the address, checking up to 20 times with a
// 500ms pause after each miss, and fails if none appears. When notify is
// true that wait is skipped and the user agent is set to the special value
// 'lxd-cluster-notifier' instead of the regular LXD user agent, so that the
// receiving side can tell an internal cluster request from a regular client
// request.
func Connect(address string, cert *shared.CertInfo, notify bool) (lxd.InstanceServer, error) {
	if !notify {
		// Regular connections require an established events API connection.
		found := false
		for attempt := 0; attempt < 20 && !found; attempt++ {
			listenersLock.Lock()
			_, found = listeners[address]
			listenersLock.Unlock()

			if !found {
				time.Sleep(500 * time.Millisecond)
			}
		}

		if !found {
			return nil, fmt.Errorf("Missing event connection with target cluster member")
		}
	}

	userAgent := version.UserAgent
	if notify {
		userAgent = "lxd-cluster-notifier"
	}

	// SkipGetServer is enabled on every connection created here.
	args := &lxd.ConnectionArgs{
		TLSServerCert: string(cert.PublicKey()),
		TLSClientCert: string(cert.PublicKey()),
		TLSClientKey:  string(cert.PrivateKey()),
		SkipGetServer: true,
		UserAgent:     userAgent,
	}

	return lxd.ConnectLXD("https://"+address, args)
}
// ConnectIfInstanceIsRemote looks up, inside a cluster database transaction,
// the address of the node hosting the instance with the given project, name
// and type. An empty address means the instance lives on the local node, in
// which case both the returned client and the error are nil. Otherwise a
// regular (non-notify) connection to that node is opened and returned.
func ConnectIfInstanceIsRemote(cluster *db.Cluster, project, name string, cert *shared.CertInfo, instanceType instancetype.Type) (lxd.InstanceServer, error) {
	var nodeAddress string

	lookup := func(tx *db.ClusterTx) error {
		addr, err := tx.ContainerNodeAddress(project, name, instanceType)
		nodeAddress = addr
		return err
	}

	if err := cluster.Transaction(lookup); err != nil {
		return nil, err
	}

	if nodeAddress != "" {
		return Connect(nodeAddress, cert, false)
	}

	// The instance is on this very node: nothing to connect to.
	return nil, nil
}
// ConnectIfVolumeIsRemote figures out the address of the node on which the
// volume with the given name and type is defined in the given pool. If that
// is not the local node it connects to it and returns the connected client,
// otherwise it returns a nil client and a nil error.
//
// The lookup is always performed against the "default" project.
//
// If more than one node has a matching volume, the pool driver is consulted:
// for "ceph" and "cephfs" pools a nil client and nil error are returned
// (presumably because such volumes are reachable from any node; confirm
// against the storage layer), for any other driver an error is returned.
//
// If the lookup succeeds but yields no address at all, an error is returned.
func ConnectIfVolumeIsRemote(cluster *db.Cluster, poolID int64, volumeName string, volumeType int, cert *shared.CertInfo) (lxd.InstanceServer, error) {
	var addresses []string // Node addresses
	err := cluster.Transaction(func(tx *db.ClusterTx) error {
		var err error
		addresses, err = tx.StorageVolumeNodeAddresses(poolID, "default", volumeName, volumeType)
		return err
	})
	if err != nil {
		return nil, err
	}

	// Guard the index access below: an empty result must surface as an
	// error rather than as an out-of-range panic.
	if len(addresses) == 0 {
		return nil, fmt.Errorf("no node has a volume named %s", volumeName)
	}

	if len(addresses) > 1 {
		var driver string
		err := cluster.Transaction(func(tx *db.ClusterTx) error {
			var err error
			driver, err = tx.StoragePoolDriver(poolID)
			return err
		})
		if err != nil {
			return nil, err
		}

		// Multiple matches are tolerated only for ceph-backed pools.
		if driver == "ceph" || driver == "cephfs" {
			return nil, nil
		}

		return nil, fmt.Errorf("more than one node has a volume named %s", volumeName)
	}

	address := addresses[0]
	if address == "" {
		// Same convention as ConnectIfInstanceIsRemote: an empty address
		// is treated as "defined on this node", so there is nothing to
		// connect to.
		return nil, nil
	}

	return Connect(address, cert, false)
}
// SetupTrust is a convenience around InstanceServer.CreateCertificate that
// adds the given client certificate to the trusted pool of the cluster at the
// given address, using the given password.
//
// cert is the PEM-encoded client certificate to register, targetAddress is
// the address of the cluster node to contact, targetCert is the certificate
// expected from that node, and targetPassword is sent along with the request.
//
// The certificate is registered with type "client" under the name
// "lxd.cluster.<fingerprint>". A "Certificate already in trust store" reply
// from the target is treated as success. An error is returned if connecting
// fails, if cert contains no PEM block, if its fingerprint cannot be
// calculated, or if the target rejects the certificate for another reason.
func SetupTrust(cert, targetAddress, targetCert, targetPassword string) error {
	// Connect to the target cluster node.
	args := &lxd.ConnectionArgs{
		TLSServerCert: targetCert,
		UserAgent:     version.UserAgent,
	}

	target, err := lxd.ConnectLXD(fmt.Sprintf("https://%s", targetAddress), args)
	if err != nil {
		return errors.Wrap(err, "failed to connect to target cluster node")
	}

	block, _ := pem.Decode([]byte(cert))
	if block == nil {
		// err is nil at this point, so wrapping it would produce a nil
		// error and silently report success; build a fresh error instead.
		return errors.New("failed to decode certificate")
	}

	// The API expects the DER bytes of the certificate, base64-encoded.
	certificate := base64.StdEncoding.EncodeToString(block.Bytes)
	post := api.CertificatesPost{
		Password:    targetPassword,
		Certificate: certificate,
	}

	fingerprint, err := shared.CertFingerprintStr(cert)
	if err != nil {
		return errors.Wrap(err, "failed to calculate fingerprint")
	}

	post.Name = fmt.Sprintf("lxd.cluster.%s", fingerprint)
	post.Type = "client"

	err = target.CreateCertificate(post)
	if err != nil && err.Error() != "Certificate already in trust store" {
		return errors.Wrap(err, "Failed to add client cert to cluster")
	}

	return nil
}
// hasConnectivity reports whether a TLS connection to the member at the given
// address can be established, using a client configuration derived from cert
// and a one-second dial timeout. Any failure, whether building that
// configuration or dialing, yields false.
func hasConnectivity(cert *shared.CertInfo, address string) bool {
	tlsConfig, err := tlsClientConfig(cert)
	if err != nil {
		return false
	}

	probe, err := tls.DialWithDialer(&net.Dialer{Timeout: time.Second}, "tcp", address, tlsConfig)
	if err != nil {
		return false
	}

	// Only reachability matters, so the connection is dropped right away.
	probe.Close()
	return true
}