@@ -159,9 +159,11 @@ func (l *listener) dial(ctx context.Context) (<-chan error, error) {
159159// so the cognitive overload linter has been disabled.
160160// nolint:gocognit,nestif
161161func (l * listener ) negotiate (ctx context.Context , conn net.Conn ) {
162+ id := atomic .AddInt64 (& l .nextConnNumber , 1 )
163+ ctx = slog .With (ctx , slog .F ("conn_id" , id ))
164+
162165 var (
163166 err error
164- id = atomic .AddInt64 (& l .nextConnNumber , 1 )
165167 decoder = json .NewDecoder (conn )
166168 rtc * webrtc.PeerConnection
167169 // If candidates are sent before an offer, we place them here.
@@ -171,7 +173,7 @@ func (l *listener) negotiate(ctx context.Context, conn net.Conn) {
171173 // Sends the error provided then closes the connection.
172174 // If RTC isn't connected, we'll close it.
173175 closeError = func (err error ) {
174- l .log .Warn (ctx , "negotiation error, closing connection" , slog .Error (err ))
176+ // l.log.Warn(ctx, "negotiation error, closing connection", slog.Error(err))
175177
176178 d , _ := json .Marshal (& BrokerMessage {
177179 Error : err .Error (),
@@ -187,7 +189,6 @@ func (l *listener) negotiate(ctx context.Context, conn net.Conn) {
187189 }
188190 )
189191
190- ctx = slog .With (ctx , slog .F ("conn_id" , id ))
191192 l .log .Info (ctx , "accepted new session from broker connection, negotiating" )
192193
193194 for {
@@ -255,17 +256,26 @@ func (l *listener) negotiate(ctx context.Context, conn net.Conn) {
255256 return
256257 }
257258 rtc .OnConnectionStateChange (func (pcs webrtc.PeerConnectionState ) {
258- l .log .Debug (ctx , "connection state change" , slog .F ("state" , pcs .String ()))
259- if pcs == webrtc .PeerConnectionStateConnecting {
259+ l .log .Info (ctx , "connection state change" , slog .F ("state" , pcs .String ()))
260+ switch pcs {
261+ case webrtc .PeerConnectionStateConnected :
260262 return
263+ case webrtc .PeerConnectionStateConnecting :
264+ // Safe to close the negotiating WebSocket.
265+ _ = conn .Close ()
266+ return
267+ }
268+
269+ // Close connections opened when RTC was alive.
270+ l .connClosersMut .Lock ()
271+ defer l .connClosersMut .Unlock ()
272+ for _ , connCloser := range l .connClosers {
273+ _ = connCloser .Close ()
261274 }
262- _ = conn . Close ( )
275+ l . connClosers = make ([]io. Closer , 0 )
263276 })
264277
265278 flushCandidates := proxyICECandidates (rtc , conn )
266- l .connClosersMut .Lock ()
267- l .connClosers = append (l .connClosers , rtc )
268- l .connClosersMut .Unlock ()
269279 rtc .OnDataChannel (l .handle (ctx , msg ))
270280
271281 l .log .Debug (ctx , "set remote description" , slog .F ("offer" , * msg .Offer ))
@@ -420,6 +430,9 @@ func (l *listener) handle(ctx context.Context, msg BrokerMessage) func(dc *webrt
420430 dc : dc ,
421431 rw : rw ,
422432 }
433+ l .connClosersMut .Lock ()
434+ l .connClosers = append (l .connClosers , co )
435+ l .connClosersMut .Unlock ()
423436 co .init ()
424437 defer nc .Close ()
425438 defer co .Close ()
0 commit comments