@@ -163,9 +163,11 @@ func (l *listener) negotiate(ctx context.Context, conn net.Conn) {
163163 ctx = slog .With (ctx , slog .F ("conn_id" , id ))
164164
165165 var (
166- err error
167- decoder = json .NewDecoder (conn )
168- rtc * webrtc.PeerConnection
166+ err error
167+ decoder = json .NewDecoder (conn )
168+ rtc * webrtc.PeerConnection
169+ connClosers = make ([]io.Closer , 0 )
170+ connClosersMut sync.Mutex
169171 // If candidates are sent before an offer, we place them here.
170172 // We currently have no assurances to ensure this can't happen,
171173 // so it's better to buffer and process than fail.
@@ -255,6 +257,9 @@ func (l *listener) negotiate(ctx context.Context, conn net.Conn) {
255257 closeError (err )
256258 return
257259 }
260+ l .connClosersMut .Lock ()
261+ l .connClosers = append (l .connClosers , rtc )
262+ l .connClosersMut .Unlock ()
258263 rtc .OnConnectionStateChange (func (pcs webrtc.PeerConnectionState ) {
259264 l .log .Info (ctx , "connection state change" , slog .F ("state" , pcs .String ()))
260265 switch pcs {
@@ -267,16 +272,16 @@ func (l *listener) negotiate(ctx context.Context, conn net.Conn) {
267272 }
268273
269274 // Close connections opened when RTC was alive.
270- l . connClosersMut .Lock ()
271- defer l . connClosersMut .Unlock ()
272- for _ , connCloser := range l . connClosers {
275+ connClosersMut .Lock ()
276+ defer connClosersMut .Unlock ()
277+ for _ , connCloser := range connClosers {
273278 _ = connCloser .Close ()
274279 }
275- l . connClosers = make ([]io.Closer , 0 )
280+ connClosers = make ([]io.Closer , 0 )
276281 })
277282
278283 flushCandidates := proxyICECandidates (rtc , conn )
279- rtc .OnDataChannel (l .handle (ctx , msg ))
284+ rtc .OnDataChannel (l .handle (ctx , msg , & connClosers , & connClosersMut ))
280285
281286 l .log .Debug (ctx , "set remote description" , slog .F ("offer" , * msg .Offer ))
282287 err = rtc .SetRemoteDescription (* msg .Offer )
@@ -329,7 +334,7 @@ func (l *listener) negotiate(ctx context.Context, conn net.Conn) {
329334}
330335
331336// nolint:gocognit
332- func (l * listener ) handle (ctx context.Context , msg BrokerMessage ) func (dc * webrtc.DataChannel ) {
337+ func (l * listener ) handle (ctx context.Context , msg BrokerMessage , connClosers * []io. Closer , connClosersMut * sync. Mutex ) func (dc * webrtc.DataChannel ) {
333338 return func (dc * webrtc.DataChannel ) {
334339 if dc .Protocol () == controlChannel {
335340 // The control channel handles pings.
@@ -430,9 +435,9 @@ func (l *listener) handle(ctx context.Context, msg BrokerMessage) func(dc *webrt
430435 dc : dc ,
431436 rw : rw ,
432437 }
433- l . connClosersMut .Lock ()
434- l . connClosers = append (l . connClosers , co )
435- l . connClosersMut .Unlock ()
438+ connClosersMut .Lock ()
439+ * connClosers = append (* connClosers , co )
440+ connClosersMut .Unlock ()
436441 co .init ()
437442 defer nc .Close ()
438443 defer co .Close ()
0 commit comments