Skip to content

Commit

Permalink
DownTrack scoring when RR is not received. (livekit#1664)
Browse files Browse the repository at this point in the history
  • Loading branch information
boks1971 authored and hautvfami committed Jul 21, 2023
1 parent 9f36ad8 commit 32a3c60
Show file tree
Hide file tree
Showing 4 changed files with 242 additions and 173 deletions.
176 changes: 113 additions & 63 deletions pkg/sfu/buffer/rtpstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type RTPDeltaInfo struct {
type Snapshot struct {
startTime time.Time
extStartSN uint32
extStartSNOverridden uint32
packetsDuplicate uint32
bytesDuplicate uint64
headerBytesDuplicate uint64
Expand Down Expand Up @@ -294,7 +295,11 @@ func (r *RTPStats) NewSnapshotId() uint32 {

id := r.nextSnapshotId
if r.initialized {
r.snapshots[id] = &Snapshot{startTime: time.Now(), extStartSN: r.extStartSN}
r.snapshots[id] = &Snapshot{
startTime: time.Now(),
extStartSN: r.extStartSN,
extStartSNOverridden: r.extStartSN,
}
}

r.nextSnapshotId++
Expand Down Expand Up @@ -334,7 +339,11 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa

// initialize snapshots if any
for i := uint32(FirstSnapshotId); i < r.nextSnapshotId; i++ {
r.snapshots[i] = &Snapshot{startTime: r.startTime, extStartSN: r.extStartSN}
r.snapshots[i] = &Snapshot{
startTime: r.startTime,
extStartSN: r.extStartSN,
extStartSNOverridden: r.extStartSN,
}
}
}

Expand Down Expand Up @@ -526,6 +535,13 @@ func (r *RTPStats) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt uint32
return
}

func (r *RTPStats) LastReceiverReport() time.Time {
r.lock.RLock()
defer r.lock.RUnlock()

return r.lastRRTime
}

func (r *RTPStats) UpdateNack(nackCount uint32) {
r.lock.Lock()
defer r.lock.Unlock()
Expand Down Expand Up @@ -777,7 +793,7 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, srDataExt *RTCPSenderReportD

func (r *RTPStats) SnapshotRtcpReceptionReport(ssrc uint32, proxyFracLost uint8, snapshotId uint32) *rtcp.ReceptionReport {
r.lock.Lock()
then, now := r.getAndResetSnapshot(snapshotId)
then, now := r.getAndResetSnapshot(snapshotId, false)
r.lock.Unlock()

if now == nil || then == nil {
Expand All @@ -799,17 +815,8 @@ func (r *RTPStats) SnapshotRtcpReceptionReport(ssrc uint32, proxyFracLost uint8,
return nil
}

packetsLost := uint32(0)
if r.params.IsReceiverReportDriven {
// receiver report driven should not be set for streams that need to generate reception report, but including code here for consistency
packetsLost = now.packetsLostOverridden - then.packetsLostOverridden
if int32(packetsLost) < 0 {
packetsLost = 0
}
} else {
intervalStats := r.getIntervalStats(uint16(then.extStartSN), uint16(now.extStartSN))
packetsLost = intervalStats.packetsLost
}
intervalStats := r.getIntervalStats(uint16(then.extStartSN), uint16(now.extStartSN))
packetsLost := intervalStats.packetsLost
lossRate := float32(packetsLost) / float32(packetsExpected)
fracLost := uint8(lossRate * 256.0)
if proxyFracLost > fracLost {
Expand All @@ -823,12 +830,6 @@ func (r *RTPStats) SnapshotRtcpReceptionReport(ssrc uint32, proxyFracLost uint8,
dlsr |= (delayMS % 1e3) * 65536 / 1000
}

jitter := r.jitter
if r.params.IsReceiverReportDriven {
// receiver report driven should not be set for streams that need to generate reception report, but including code here for consistency
jitter = r.jitterOverridden
}

lastSR := uint32(0)
if r.srDataExt != nil {
lastSR = uint32(r.srDataExt.SenderReportData.NTPTimestamp >> 16)
Expand All @@ -838,15 +839,15 @@ func (r *RTPStats) SnapshotRtcpReceptionReport(ssrc uint32, proxyFracLost uint8,
FractionLost: fracLost,
TotalLost: r.packetsLost,
LastSequenceNumber: now.extStartSN,
Jitter: uint32(jitter),
Jitter: uint32(r.jitter),
LastSenderReport: lastSR,
Delay: dlsr,
}
}

func (r *RTPStats) DeltaInfo(snapshotId uint32) *RTPDeltaInfo {
r.lock.Lock()
then, now := r.getAndResetSnapshot(snapshotId)
then, now := r.getAndResetSnapshot(snapshotId, false)
r.lock.Unlock()

if now == nil || then == nil {
Expand All @@ -868,53 +869,93 @@ func (r *RTPStats) DeltaInfo(snapshotId uint32) *RTPDeltaInfo {
return nil
}
if packetsExpected == 0 {
if r.params.IsReceiverReportDriven {
// not received RTCP RR (OR) publisher is not producing any data
return nil
}

return &RTPDeltaInfo{
StartTime: startTime,
Duration: endTime.Sub(startTime),
}
}

packetsLost := uint32(0)
packetsMissing := uint32(0)
intervalStats := r.getIntervalStats(uint16(then.extStartSN), uint16(now.extStartSN))
if r.params.IsReceiverReportDriven {
packetsMissing = intervalStats.packetsLost
return &RTPDeltaInfo{
StartTime: startTime,
Duration: endTime.Sub(startTime),
Packets: packetsExpected - intervalStats.packetsPadding,
Bytes: intervalStats.bytes,
HeaderBytes: intervalStats.headerBytes,
PacketsDuplicate: now.packetsDuplicate - then.packetsDuplicate,
BytesDuplicate: now.bytesDuplicate - then.bytesDuplicate,
HeaderBytesDuplicate: now.headerBytesDuplicate - then.headerBytesDuplicate,
PacketsPadding: intervalStats.packetsPadding,
BytesPadding: intervalStats.bytesPadding,
HeaderBytesPadding: intervalStats.headerBytesPadding,
PacketsLost: intervalStats.packetsLost,
Frames: intervalStats.frames,
RttMax: then.maxRtt,
JitterMax: then.maxJitter / float64(r.params.ClockRate) * 1e6,
Nacks: now.nacks - then.nacks,
Plis: now.plis - then.plis,
Firs: now.firs - then.firs,
}
}

packetsLost = now.packetsLostOverridden - then.packetsLostOverridden
if int32(packetsLost) < 0 {
packetsLost = 0
}
func (r *RTPStats) DeltaInfoOverridden(snapshotId uint32) *RTPDeltaInfo {
if !r.params.IsReceiverReportDriven {
return nil
}

if packetsLost > packetsExpected {
r.logger.Warnw(
"unexpected number of packets lost",
fmt.Errorf(
"start: %d, end: %d, expected: %d, lost: report: %d, interval: %d",
then.extStartSN,
now.extStartSN,
packetsExpected,
now.packetsLostOverridden-then.packetsLostOverridden,
intervalStats.packetsLost,
),
)
packetsLost = packetsExpected
}
} else {
packetsLost = intervalStats.packetsLost
r.lock.Lock()
then, now := r.getAndResetSnapshot(snapshotId, true)
r.lock.Unlock()

if now == nil || then == nil {
return nil
}

maxJitter := then.maxJitter
if r.params.IsReceiverReportDriven {
// discount jitter from publisher side + internal processing
maxJitter = then.maxJitterOverridden - maxJitter
if maxJitter < 0.0 {
maxJitter = 0.0
}
r.lock.RLock()
defer r.lock.RUnlock()

startTime := then.startTime
endTime := now.startTime

packetsExpected := now.extStartSNOverridden - then.extStartSNOverridden
if packetsExpected > NumSequenceNumbers {
r.logger.Warnw(
"too many packets expected in delta",
fmt.Errorf("start: %d, end: %d, expected: %d", then.extStartSNOverridden, now.extStartSNOverridden, packetsExpected),
)
return nil
}
if packetsExpected == 0 {
// not received RTCP RR (OR) publisher is not producing any data
return nil
}

intervalStats := r.getIntervalStats(uint16(then.extStartSNOverridden), uint16(now.extStartSNOverridden))
packetsMissing := intervalStats.packetsLost
packetsLost := now.packetsLostOverridden - then.packetsLostOverridden
if int32(packetsLost) < 0 {
packetsLost = 0
}

if packetsLost > packetsExpected {
r.logger.Warnw(
"unexpected number of packets lost",
fmt.Errorf(
"start: %d, end: %d, expected: %d, lost: report: %d, interval: %d",
then.extStartSNOverridden,
now.extStartSNOverridden,
packetsExpected,
now.packetsLostOverridden-then.packetsLostOverridden,
intervalStats.packetsLost,
),
)
packetsLost = packetsExpected
}

// discount jitter from publisher side + internal processing
maxJitter := then.maxJitterOverridden - then.maxJitter
if maxJitter < 0.0 {
maxJitter = 0.0
}
maxJitterTime := maxJitter / float64(r.params.ClockRate) * 1e6

Expand Down Expand Up @@ -1288,24 +1329,33 @@ func (r *RTPStats) updateGapHistogram(gap int) {
}
}

func (r *RTPStats) getAndResetSnapshot(snapshotId uint32) (*Snapshot, *Snapshot) {
func (r *RTPStats) getAndResetSnapshot(snapshotId uint32, override bool) (*Snapshot, *Snapshot) {
if !r.initialized || (r.params.IsReceiverReportDriven && r.lastRRTime.IsZero()) {
return nil, nil
}

then := r.snapshots[snapshotId]
if then == nil {
then = &Snapshot{
startTime: r.startTime,
extStartSN: r.extStartSN,
startTime: r.startTime,
extStartSN: r.extStartSN,
extStartSNOverridden: r.extStartSN,
}
r.snapshots[snapshotId] = then
}

var startTime time.Time
if override && r.params.IsReceiverReportDriven {
startTime = r.lastRRTime
} else {
startTime = time.Now()
}

// snapshot now
r.snapshots[snapshotId] = &Snapshot{
startTime: time.Now(),
extStartSN: r.getExtHighestSNAdjusted() + 1,
startTime: startTime,
extStartSN: r.getExtHighestSN() + 1,
extStartSNOverridden: r.getExtHighestSNAdjusted() + 1,
packetsDuplicate: r.packetsDuplicate,
bytesDuplicate: r.bytesDuplicate,
headerBytesDuplicate: r.headerBytesDuplicate,
Expand Down
Loading

0 comments on commit 32a3c60

Please sign in to comment.