88 "io"
99 "net"
1010 "net/url"
11+ "os"
1112 "sync"
1213 "time"
1314
@@ -16,11 +17,18 @@ import (
1617 "golang.org/x/net/proxy"
1718 "nhooyr.io/websocket"
1819
20+ "cdr.dev/slog"
21+ "cdr.dev/slog/sloggers/sloghuman"
22+
1923 "cdr.dev/coder-cli/coder-sdk"
2024)
2125
2226// DialOptions are configurable options for a wsnet connection.
2327type DialOptions struct {
28+ // Logger is an optional logger to use for logging mostly debug messages. If
29+ // set to nil, nothing will be logged.
30+ Log * slog.Logger
31+
2432 // ICEServers is an array of STUN or TURN servers to use for negotiation purposes.
2533 // See: https://developer.mozilla.org/en-US/docs/Web/API/RTCConfiguration/iceServers
2634 ICEServers []webrtc.ICEServer
@@ -36,6 +44,17 @@ type DialOptions struct {
3644
3745// DialWebsocket dials the broker with a WebSocket and negotiates a connection.
3846func DialWebsocket (ctx context.Context , broker string , netOpts * DialOptions , wsOpts * websocket.DialOptions ) (* Dialer , error ) {
47+ if netOpts == nil {
48+ netOpts = & DialOptions {}
49+ }
50+ if netOpts .Log == nil {
51+ // This logger will log nothing.
52+ log := slog .Make ()
53+ netOpts .Log = & log
54+ }
55+ log := * netOpts .Log
56+
57+ log .Debug (ctx , "connecting to broker" , slog .F ("broker" , broker ))
3958 conn , resp , err := websocket .Dial (ctx , broker , wsOpts )
4059 if err != nil {
4160 if resp != nil {
@@ -46,6 +65,8 @@ func DialWebsocket(ctx context.Context, broker string, netOpts *DialOptions, wsO
4665 }
4766 return nil , fmt .Errorf ("dial websocket: %w" , err )
4867 }
68+ log .Debug (ctx , "connected to broker" )
69+
4970 nconn := websocket .NetConn (ctx , conn , websocket .MessageBinary )
5071 defer func () {
5172 _ = nconn .Close ()
@@ -60,6 +81,11 @@ func Dial(ctx context.Context, conn net.Conn, options *DialOptions) (*Dialer, er
6081 if options == nil {
6182 options = & DialOptions {}
6283 }
84+ if options .Log == nil {
85+ log := slog .Make (sloghuman .Sink (os .Stderr )).Leveled (slog .LevelInfo ).Named ("wsnet_dial" )
86+ options .Log = & log
87+ }
88+ log := * options .Log
6389 if options .ICEServers == nil {
6490 options .ICEServers = []webrtc.ICEServer {}
6591 }
@@ -71,13 +97,20 @@ func Dial(ctx context.Context, conn net.Conn, options *DialOptions) (*Dialer, er
7197 token : options .TURNProxyAuthToken ,
7298 }
7399 }
100+
101+ log .Debug (ctx , "creating peer connection" , slog .F ("options" , options ), slog .F ("turn_proxy" , turnProxy ))
74102 rtc , err := newPeerConnection (options .ICEServers , turnProxy )
75103 if err != nil {
76104 return nil , fmt .Errorf ("create peer connection: %w" , err )
77105 }
106+ log .Debug (ctx , "created peer connection" )
107+ rtc .OnConnectionStateChange (func (pcs webrtc.PeerConnectionState ) {
108+ log .Debug (ctx , "connection state change" , slog .F ("state" , pcs .String ()))
109+ })
78110
79111 flushCandidates := proxyICECandidates (rtc , conn )
80112
113+ log .Debug (ctx , "creating control channel" , slog .F ("proto" , controlChannel ))
81114 ctrl , err := rtc .CreateDataChannel (controlChannel , & webrtc.DataChannelInit {
82115 Protocol : stringPtr (controlChannel ),
83116 Ordered : boolPtr (true ),
@@ -90,6 +123,7 @@ func Dial(ctx context.Context, conn net.Conn, options *DialOptions) (*Dialer, er
90123 if err != nil {
91124 return nil , fmt .Errorf ("create offer: %w" , err )
92125 }
126+ log .Debug (ctx , "created offer" , slog .F ("offer" , offer ))
93127 err = rtc .SetLocalDescription (offer )
94128 if err != nil {
95129 return nil , fmt .Errorf ("set local offer: %w" , err )
@@ -100,21 +134,25 @@ func Dial(ctx context.Context, conn net.Conn, options *DialOptions) (*Dialer, er
100134 turnProxyURL = options .TURNProxyURL .String ()
101135 }
102136
103- offerMessage , err := json . Marshal ( & BrokerMessage {
137+ bmsg := BrokerMessage {
104138 Offer : & offer ,
105139 Servers : options .ICEServers ,
106140 TURNProxyURL : turnProxyURL ,
107- })
141+ }
142+ log .Debug (ctx , "sending offer message" , slog .F ("msg" , bmsg ))
143+ offerMessage , err := json .Marshal (& bmsg )
108144 if err != nil {
109145 return nil , fmt .Errorf ("marshal offer message: %w" , err )
110146 }
147+
111148 _ , err = conn .Write (offerMessage )
112149 if err != nil {
113150 return nil , fmt .Errorf ("write offer: %w" , err )
114151 }
115152 flushCandidates ()
116153
117154 dialer := & Dialer {
155+ log : log ,
118156 conn : conn ,
119157 ctrl : ctrl ,
120158 rtc : rtc ,
@@ -128,6 +166,7 @@ func Dial(ctx context.Context, conn net.Conn, options *DialOptions) (*Dialer, er
128166// inside a workspace. The opposing end of the WebSocket messages
129167// should be proxied with a Listener.
130168type Dialer struct {
169+ log slog.Logger
131170 conn net.Conn
132171 ctrl * webrtc.DataChannel
133172 ctrlrw datachannel.ReadWriteCloser
@@ -152,20 +191,25 @@ func (d *Dialer) negotiate(ctx context.Context) (err error) {
152191 defer func () {
153192 _ = d .conn .Close ()
154193 }()
155- err := waitForConnectionOpen (ctx , d .rtc )
194+
195+ err := waitForConnectionOpen (context .Background (), d .rtc )
156196 if err != nil {
197+ d .log .Debug (ctx , "negotiation error" , slog .Error (err ))
157198 if errors .Is (err , context .DeadlineExceeded ) {
158199 _ = d .conn .Close ()
159200 }
160- errCh <- err
201+ errCh <- fmt . Errorf ( "wait for connection to open: %w" , err )
161202 return
162203 }
204+
163205 d .rtc .OnConnectionStateChange (func (pcs webrtc.PeerConnectionState ) {
164206 if pcs == webrtc .PeerConnectionStateConnected {
207+ d .log .Debug (ctx , "connected" )
165208 return
166209 }
167210
168211 // Close connections opened when RTC was alive.
212+ d .log .Warn (ctx , "closing connections due to connection state change" , slog .F ("pcs" , pcs .String ()))
169213 d .connClosersMut .Lock ()
170214 defer d .connClosersMut .Unlock ()
171215 for _ , connCloser := range d .connClosers {
@@ -175,6 +219,7 @@ func (d *Dialer) negotiate(ctx context.Context) (err error) {
175219 })
176220 }()
177221
222+ d .log .Debug (ctx , "beginning negotiation" )
178223 for {
179224 var msg BrokerMessage
180225 err = decoder .Decode (& msg )
@@ -184,6 +229,8 @@ func (d *Dialer) negotiate(ctx context.Context) (err error) {
184229 if err != nil {
185230 return fmt .Errorf ("read: %w" , err )
186231 }
232+ d .log .Debug (ctx , "got message from handshake conn" , slog .F ("msg" , msg ))
233+
187234 if msg .Candidate != "" {
188235 c := webrtc.ICECandidateInit {
189236 Candidate : msg .Candidate ,
@@ -192,17 +239,22 @@ func (d *Dialer) negotiate(ctx context.Context) (err error) {
192239 pendingCandidates = append (pendingCandidates , c )
193240 continue
194241 }
242+
243+ d .log .Debug (ctx , "adding remote ICE candidate" , slog .F ("c" , c ))
195244 err = d .rtc .AddICECandidate (c )
196245 if err != nil {
197246 return fmt .Errorf ("accept ice candidate: %s: %w" , msg .Candidate , err )
198247 }
199248 continue
200249 }
250+
201251 if msg .Answer != nil {
252+ d .log .Debug (ctx , "received answer" , slog .F ("a" , * msg .Answer ))
202253 err = d .rtc .SetRemoteDescription (* msg .Answer )
203254 if err != nil {
204255 return fmt .Errorf ("set answer: %w" , err )
205256 }
257+
206258 for _ , candidate := range pendingCandidates {
207259 err = d .rtc .AddICECandidate (candidate )
208260 if err != nil {
@@ -212,11 +264,15 @@ func (d *Dialer) negotiate(ctx context.Context) (err error) {
212264 pendingCandidates = nil
213265 continue
214266 }
267+
215268 if msg .Error != "" {
216- return errors .New (msg .Error )
269+ d .log .Debug (ctx , "got error from peer" , slog .F ("err" , msg .Error ))
270+ return fmt .Errorf ("error from peer: %v" , msg .Error )
217271 }
272+
218273 return fmt .Errorf ("unhandled message: %+v" , msg )
219274 }
275+
220276 return <- errCh
221277}
222278
@@ -234,6 +290,7 @@ func (d *Dialer) activeConnections() int {
234290// Close closes the RTC connection.
235291// All data channels dialed will be closed.
236292func (d * Dialer ) Close () error {
293+ d .log .Debug (context .Background (), "close called" )
237294 return d .rtc .Close ()
238295}
239296
@@ -242,6 +299,7 @@ func (d *Dialer) Ping(ctx context.Context) error {
242299 if d .ctrl .ReadyState () == webrtc .DataChannelStateClosed || d .ctrl .ReadyState () == webrtc .DataChannelStateClosing {
243300 return webrtc .ErrConnectionClosed
244301 }
302+
245303 // Since we control the client and server we could open this
246304 // data channel with `Negotiated` true to reduce traffic being
247305 // sent when the RTC connection is opened.
@@ -257,6 +315,7 @@ func (d *Dialer) Ping(ctx context.Context) error {
257315 }
258316 d .pingMut .Lock ()
259317 defer d .pingMut .Unlock ()
318+ d .log .Debug (ctx , "sending ping" )
260319 _ , err = d .ctrlrw .Write ([]byte {'a' })
261320 if err != nil {
262321 return fmt .Errorf ("write: %w" , err )
@@ -281,13 +340,18 @@ func (d *Dialer) Ping(ctx context.Context) error {
281340
282341// DialContext dials the network and address on the remote listener.
283342func (d * Dialer ) DialContext (ctx context.Context , network , address string ) (net.Conn , error ) {
343+ proto := fmt .Sprintf ("%s:%s" , network , address )
344+ ctx = slog .With (ctx , slog .F ("proto" , proto ))
345+
346+ d .log .Debug (ctx , "opening data channel" )
284347 dc , err := d .rtc .CreateDataChannel ("proxy" , & webrtc.DataChannelInit {
285348 Ordered : boolPtr (network != "udp" ),
286- Protocol : stringPtr ( fmt . Sprintf ( "%s:%s" , network , address )) ,
349+ Protocol : & proto ,
287350 })
288351 if err != nil {
289352 return nil , fmt .Errorf ("create data channel: %w" , err )
290353 }
354+
291355 d .connClosersMut .Lock ()
292356 d .connClosers = append (d .connClosers , dc )
293357 d .connClosersMut .Unlock ()
@@ -296,10 +360,18 @@ func (d *Dialer) DialContext(ctx context.Context, network, address string) (net.
296360 if err != nil {
297361 return nil , fmt .Errorf ("wait for open: %w" , err )
298362 }
363+
364+ ctx = slog .With (ctx , slog .F ("dc_id" , dc .ID ()))
365+ d .log .Debug (ctx , "data channel opened" )
366+
299367 rw , err := dc .Detach ()
300368 if err != nil {
301369 return nil , fmt .Errorf ("detach: %w" , err )
302370 }
371+ d .log .Debug (ctx , "data channel detached" )
372+
373+ ctx , cancel := context .WithTimeout (ctx , time .Second * 5 )
374+ defer cancel ()
303375
304376 errCh := make (chan error )
305377 go func () {
@@ -309,6 +381,7 @@ func (d *Dialer) DialContext(ctx context.Context, network, address string) (net.
309381 errCh <- fmt .Errorf ("read dial response: %w" , err )
310382 return
311383 }
384+ d .log .Debug (ctx , "dial response" , slog .F ("res" , res ))
312385 if res .Err == "" {
313386 close (errCh )
314387 return
@@ -323,8 +396,7 @@ func (d *Dialer) DialContext(ctx context.Context, network, address string) (net.
323396 }
324397 errCh <- err
325398 }()
326- ctx , cancel := context .WithTimeout (ctx , time .Second * 5 )
327- defer cancel ()
399+
328400 select {
329401 case err := <- errCh :
330402 if err != nil {
@@ -343,5 +415,7 @@ func (d *Dialer) DialContext(ctx context.Context, network, address string) (net.
343415 rw : rw ,
344416 }
345417 c .init ()
418+
419+ d .log .Debug (ctx , "dial channel ready" )
346420 return c , nil
347421}
0 commit comments