Skip to content

Commit

Permalink
reproduce
Browse files Browse the repository at this point in the history
  • Loading branch information
aalekseevx committed Apr 23, 2024
1 parent a9e88d2 commit f8a4a76
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 6 deletions.
68 changes: 63 additions & 5 deletions peerconnection_media_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"errors"
"fmt"
"io"
"regexp"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -1255,43 +1256,70 @@ func TestPeerConnection_Simulcast(t *testing.T) {
return ridCount == 3
}

var rtxPacketRead atomic.Int32
var wg sync.WaitGroup
wg.Add(3)

pcAnswer.OnTrack(func(trackRemote *TrackRemote, _ *RTPReceiver) {
ridMapLock.Lock()
defer ridMapLock.Unlock()
ridMap[trackRemote.RID()] = ridMap[trackRemote.RID()] + 1
ridMapLock.Unlock()

defer wg.Done()

for {
pkt, _, err := trackRemote.ReadRTP()
if err != nil {
break
}
if pkt.PayloadType == 97 {
rtxPacketRead.Add(1)
}
}
})

parameters := sender.GetParameters()
assert.Equal(t, "a", parameters.Encodings[0].RID)
assert.Equal(t, "b", parameters.Encodings[1].RID)
assert.Equal(t, "c", parameters.Encodings[2].RID)

var midID, ridID uint8
var midID, ridID, rsid uint8
for _, extension := range parameters.HeaderExtensions {
switch extension.URI {
case sdp.SDESMidURI:
midID = uint8(extension.ID)
case sdp.SDESRTPStreamIDURI:
ridID = uint8(extension.ID)
case sdesRepairRTPStreamIDURI:
rsid = uint8(extension.ID)
}
}
assert.NotZero(t, midID)
assert.NotZero(t, ridID)
assert.NotZero(t, rsid)

err = signalPairWithModification(pcOffer, pcAnswer, func (sdp string) string {
// Original chrome sdp contains no ssrc info https://pastebin.com/raw/JTjX6zg6
re := regexp.MustCompile("(?m)[\r\n]+^.*a=ssrc.*$")
res := re.ReplaceAllString(sdp, "")
return res
})
assert.NoError(t, err)

assert.NoError(t, signalPair(pcOffer, pcAnswer))

// padding only packets should not affect simulcast probe
var sequenceNumber uint16
for sequenceNumber = 0; sequenceNumber < simulcastProbeCount+10; sequenceNumber++ {
time.Sleep(20 * time.Millisecond)

for _, track := range []*TrackLocalStaticRTP{vp8WriterA, vp8WriterB, vp8WriterC} {
for i, track := range []*TrackLocalStaticRTP{vp8WriterA, vp8WriterB, vp8WriterC} {
pkt := &rtp.Packet{
Header: rtp.Header{
Version: 2,
SequenceNumber: sequenceNumber,
PayloadType: 96,
Padding: true,
SSRC: uint32(i),
},
Payload: []byte{0x00, 0x02},
}
Expand All @@ -1304,12 +1332,13 @@ func TestPeerConnection_Simulcast(t *testing.T) {
for ; !ridsFullfilled(); sequenceNumber++ {
time.Sleep(20 * time.Millisecond)

for _, track := range []*TrackLocalStaticRTP{vp8WriterA, vp8WriterB, vp8WriterC} {
for i, track := range []*TrackLocalStaticRTP{vp8WriterA, vp8WriterB, vp8WriterC} {
pkt := &rtp.Packet{
Header: rtp.Header{
Version: 2,
SequenceNumber: sequenceNumber,
PayloadType: 96,
SSRC: uint32(i),
},
Payload: []byte{0x00},
}
Expand All @@ -1321,7 +1350,36 @@ func TestPeerConnection_Simulcast(t *testing.T) {
}

assertRidCorrect(t)

for i := 0; i < simulcastProbeCount+10; i++ {
sequenceNumber++
time.Sleep(10 * time.Millisecond)

for j, track := range []*TrackLocalStaticRTP{vp8WriterA, vp8WriterB, vp8WriterC} {
pkt := &rtp.Packet{
Header: rtp.Header{
Version: 2,
SequenceNumber: sequenceNumber,
PayloadType: 97,
SSRC: uint32(100 + j),
},
Payload: []byte{0x00, 0x00, 0x00, 0x00, 0x00},
}
assert.NoError(t, pkt.Header.SetExtension(midID, []byte("0")))
assert.NoError(t, pkt.Header.SetExtension(ridID, []byte(track.RID())))
assert.NoError(t, pkt.Header.SetExtension(rsid, []byte(track.RID())))

assert.NoError(t, track.WriteRTP(pkt))
}
}

time.Sleep(time.Second)

closePairNow(t, pcOffer, pcAnswer)

wg.Wait()

assert.Greater(t, rtxPacketRead.Load(), int32(0), "no rtx packet read")
})

t.Run("RTCP", func(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions rtpreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,8 @@ func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *intercep
binary.BigEndian.PutUint32(b[8:12], uint32(track.track.SSRC()))
copy(b[headerLength:i-2], b[headerLength+2:i])

fmt.Println("trying to write to track.repairStreamChannel")

select {
case <-r.closed:
r.rtxPool.Put(b) // nolint:staticcheck
Expand Down Expand Up @@ -523,6 +525,7 @@ func (r *RTPReceiver) setRTPReadDeadline(deadline time.Time, reader *TrackRemote
// readRTX returns an RTX packet if one is available on the RTX track, otherwise returns nil
func (r *RTPReceiver) readRTX(reader *TrackRemote) *rtxPacketWithAttributes {
if !reader.HasRTX() {
fmt.Println("readRTX not reading from t.repairStreamChannel, reader.HasRTX == false")
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion track_local_static.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (s *TrackLocalStaticRTP) writeRTP(p *rtp.Packet) error {
writeErrs := []error{}

for _, b := range s.bindings {
p.Header.SSRC = uint32(b.ssrc)
// p.Header.SSRC = uint32(b.ssrc)
p.Header.PayloadType = uint8(b.payloadType)
if _, err := b.writeStream.WriteRTP(&p.Header, p.Payload); err != nil {
writeErrs = append(writeErrs, err)
Expand Down

0 comments on commit f8a4a76

Please sign in to comment.