-
Notifications
You must be signed in to change notification settings - Fork 907
/
migration_connection.go
176 lines (145 loc) · 4.27 KB
/
migration_connection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
package main
import (
"context"
"crypto/x509"
"encoding/pem"
"fmt"
"io"
"net/http"
"net/url"
"sync"
"time"
"github.com/gorilla/websocket"
"github.com/canonical/lxd/shared"
"github.com/canonical/lxd/shared/api"
"github.com/canonical/lxd/shared/logger"
"github.com/canonical/lxd/shared/tcp"
"github.com/canonical/lxd/shared/ws"
)
// setupWebsocketDialer uses a certificate to parse and configure a websocket.Dialer.
func setupWebsocketDialer(certificate string) (*websocket.Dialer, error) {
var err error
var cert *x509.Certificate
if certificate != "" {
certBlock, _ := pem.Decode([]byte(certificate))
if certBlock == nil {
return nil, fmt.Errorf("Failed PEM decoding certificate")
}
cert, err = x509.ParseCertificate(certBlock.Bytes)
if err != nil {
return nil, fmt.Errorf("Failed parsing certificate: %w", err)
}
}
config, err := shared.GetTLSConfig(cert)
if err != nil {
return nil, fmt.Errorf("Failed configuring TLS: %w", err)
}
dialer := &websocket.Dialer{
TLSClientConfig: config,
NetDialContext: shared.RFC3493Dialer,
HandshakeTimeout: time.Second * 5,
}
return dialer, nil
}
// newMigrationConn configures a new migration connection handler.
func newMigrationConn(secret string, outgoingDialer *websocket.Dialer, outgoingURL *url.URL) *migrationConn {
return &migrationConn{
secret: secret,
outgoingDialer: outgoingDialer,
outgoingURL: outgoingURL,
connected: make(chan struct{}),
}
}
// migrationConn represents a handler for both accepting and making new migration connections.
type migrationConn struct {
mu sync.Mutex
secret string
outgoingDialer *websocket.Dialer
outgoingURL *url.URL
conn *websocket.Conn
connected chan struct{}
disconnected bool
}
// Secret returns the secret for this connection.
func (c *migrationConn) Secret() string {
return c.secret
}
// AcceptIncoming takes an incoming HTTP request and upgrades it to a websocket.
func (c *migrationConn) AcceptIncoming(r *http.Request, w http.ResponseWriter) error {
c.mu.Lock()
defer c.mu.Unlock()
if c.disconnected {
return fmt.Errorf("Connection already disconnected")
}
if c.conn != nil {
return api.StatusErrorf(http.StatusConflict, "Connection already established")
}
var err error
c.conn, err = ws.Upgrader.Upgrade(w, r, nil)
if err != nil {
return fmt.Errorf("Failed upgrading incoming request to websocket: %w", err)
}
// Set TCP timeout options.
remoteTCP, _ := tcp.ExtractConn(c.conn.UnderlyingConn())
if remoteTCP != nil {
err = tcp.SetTimeouts(remoteTCP, 0)
if err != nil {
logger.Warn("Failed setting TCP timeouts on incoming websocket connection", logger.Ctx{"err": err})
}
}
close(c.connected)
return nil
}
// WebSocket returns the underlying websocket connection.
// If the connection isn't yet active it will either wait for an incoming connection or if configured, will atempt
// to initiate a new outbound connection. If the context is cancelled before the connection is established it
// will return with an error.
func (c *migrationConn) WebSocket(ctx context.Context) (*websocket.Conn, error) {
c.mu.Lock()
if c.disconnected {
c.mu.Unlock()
return nil, fmt.Errorf("Connection already disconnected")
}
if c.conn != nil {
c.mu.Unlock()
return c.conn, nil
}
if c.outgoingURL != nil && c.outgoingDialer != nil {
var err error
q := c.outgoingURL.Query()
q.Set("secret", c.secret)
c.outgoingURL.RawQuery = q.Encode()
c.conn, _, err = c.outgoingDialer.DialContext(ctx, c.outgoingURL.String(), http.Header{})
if err != nil {
c.mu.Unlock()
return nil, err
}
c.mu.Unlock()
return c.conn, nil
}
c.mu.Unlock()
select {
case <-c.connected:
return c.conn, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
// WebsocketIO calls WebSocket and returns it wrapped for io.ReadWriteCloser compatibility.
func (c *migrationConn) WebsocketIO(ctx context.Context) (io.ReadWriteCloser, error) {
wsConn, err := c.WebSocket(ctx)
if err != nil {
return nil, err
}
return ws.NewWrapper(wsConn), nil
}
// Close closes the connection (if established) and marks it as disconnected so that it cannot be used again.
func (c *migrationConn) Close() {
c.mu.Lock()
defer c.mu.Unlock()
c.disconnected = true
if c.conn != nil {
c.conn.Close()
c.conn = nil
}
}