Skip to content

Commit

Permalink
Fix data race of RTX packet
Browse files Browse the repository at this point in the history
Fix data race of RTX packet
  • Loading branch information
cnderrauber committed Feb 5, 2024
1 parent f68b789 commit 219c6a3
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 2 deletions.
21 changes: 19 additions & 2 deletions rtpreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@ type trackStreams struct {
type rtxPacketWithAttributes struct {
pkt []byte
attributes interceptor.Attributes
pool *sync.Pool
}

func (p *rtxPacketWithAttributes) release() {
if p.pkt != nil {
b := p.pkt[:cap(p.pkt)]
p.pool.Put(b) // nolint:staticcheck
p.pkt = nil
}
}

// RTPReceiver allows an application to inspect the receipt of a TrackRemote
Expand All @@ -59,6 +68,8 @@ type RTPReceiver struct {

// A reference to the associated api object
api *API

rtxPool sync.Pool
}

// NewRTPReceiver constructs a new RTPReceiver
Expand All @@ -74,6 +85,9 @@ func (api *API) NewRTPReceiver(kind RTPCodecType, transport *DTLSTransport) (*RT
closed: make(chan interface{}),
received: make(chan interface{}),
tracks: []trackStreams{},
rtxPool: sync.Pool{New: func() interface{} {
return make([]byte, api.settingEngine.getReceiveMTU())
}},
}

return r, nil
Expand Down Expand Up @@ -411,10 +425,11 @@ func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *intercep
track.repairStreamChannel = make(chan rtxPacketWithAttributes)

go func() {
b := make([]byte, r.api.settingEngine.getReceiveMTU())
for {
b := r.rtxPool.Get().([]byte) // nolint:forcetypeassert
i, attributes, err := track.repairInterceptor.Read(b, nil)
if err != nil {
r.rtxPool.Put(b) // nolint:staticcheck
return
}

Expand All @@ -435,6 +450,7 @@ func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *intercep

if i-int(headerLength)-paddingLength < 2 {
// BWE probe packet, ignore
r.rtxPool.Put(b) // nolint:staticcheck
continue
}

Expand All @@ -450,8 +466,9 @@ func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *intercep

select {
case <-r.closed:
r.rtxPool.Put(b) // nolint:staticcheck
return
case track.repairStreamChannel <- rtxPacketWithAttributes{pkt: b[:i-2], attributes: attributes}:
case track.repairStreamChannel <- rtxPacketWithAttributes{pkt: b[:i-2], attributes: attributes, pool: &r.rtxPool}:
}
}
}()
Expand Down
1 change: 1 addition & 0 deletions track_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ func (t *TrackRemote) Read(b []byte) (n int, attributes interceptor.Attributes,
if rtxPacketReceived := r.readRTX(t); rtxPacketReceived != nil {
n = copy(b, rtxPacketReceived.pkt)
attributes = rtxPacketReceived.attributes
rtxPacketReceived.release()
err = nil
} else {
// If there's no separate RTX track (or there's a separate RTX track but no RTX packet waiting), wait for and return
Expand Down

0 comments on commit 219c6a3

Please sign in to comment.