Skip to content

Commit

Permalink
Refactor NACK tracking in stream allocator (livekit#1623)
Browse files Browse the repository at this point in the history
* log some NACKs

* split out NACK tracker

* remove debug
  • Loading branch information
boks1971 authored and hautvfami committed Jul 21, 2023
1 parent 7b295f2 commit 7a3fd3d
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 46 deletions.
56 changes: 15 additions & 41 deletions pkg/sfu/streamallocator/channelobserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type ChannelObserver struct {
logger logger.Logger

estimateTrend *TrendDetector
nackTracker *NackTracker

nackWindowStartTime time.Time
packets uint32
Expand All @@ -87,43 +88,26 @@ func NewChannelObserver(params ChannelObserverParams, logger logger.Logger) *Cha
DownwardTrendThreshold: params.EstimateDownwardTrendThreshold,
CollapseValues: params.EstimateCollapseValues,
}),
nackTracker: NewNackTracker(NackTrackerParams{
Name: params.Name + "-estimate",
Logger: logger,
WindowMinDuration: params.NackWindowMinDuration,
WindowMaxDuration: params.NackWindowMaxDuration,
RatioThreshold: params.NackRatioThreshold,
}),
}
}

// SeedEstimate passes estimate to the Seed method of the estimate trend detector.
func (c *ChannelObserver) SeedEstimate(estimate int64) {
c.estimateTrend.Seed(estimate)
}

func (c *ChannelObserver) SeedNack(packets uint32, repeatedNacks uint32) {
c.packets = packets
c.repeatedNacks = repeatedNacks
}

// AddEstimate passes estimate to the AddValue method of the estimate trend detector.
func (c *ChannelObserver) AddEstimate(estimate int64) {
c.estimateTrend.AddValue(estimate)
}

func (c *ChannelObserver) AddNack(packets uint32, repeatedNacks uint32) {
if c.params.NackWindowMaxDuration != 0 && !c.nackWindowStartTime.IsZero() && time.Since(c.nackWindowStartTime) > c.params.NackWindowMaxDuration {
c.nackWindowStartTime = time.Time{}
c.packets = 0
c.repeatedNacks = 0
}

//
// Start NACK monitoring window only when a repeated NACK happens.
// This allows locking tightly to when NACKs start happening and
// check if the NACKs keep adding up (potentially a sign of congestion)
// or isolated losses
//
if c.repeatedNacks == 0 && repeatedNacks != 0 {
c.nackWindowStartTime = time.Now()
}

if !c.nackWindowStartTime.IsZero() {
c.packets += packets
c.repeatedNacks += repeatedNacks
}
c.nackTracker.Add(packets, repeatedNacks)
}

func (c *ChannelObserver) GetLowestEstimate() int64 {
Expand All @@ -134,29 +118,20 @@ func (c *ChannelObserver) GetHighestEstimate() int64 {
return c.estimateTrend.GetHighest()
}

func (c *ChannelObserver) GetNackRatio() (uint32, uint32, float64) {
ratio := 0.0
if c.packets != 0 {
ratio = float64(c.repeatedNacks) / float64(c.packets)
if ratio > 1.0 {
ratio = 1.0
}
}

return c.packets, c.repeatedNacks, ratio
func (c *ChannelObserver) GetNackRatio() float64 {
return c.nackTracker.GetRatio()
}

func (c *ChannelObserver) GetTrend() (ChannelTrend, ChannelCongestionReason) {
estimateDirection := c.estimateTrend.GetDirection()
_, _, nackRatio := c.GetNackRatio()

switch {
case estimateDirection == TrendDirectionDownward:
c.logger.Debugw( "stream allocator: channel observer: estimate is trending downward", "channel", c.ToString())
c.logger.Debugw("stream allocator: channel observer: estimate is trending downward", "channel", c.ToString())
return ChannelTrendCongesting, ChannelCongestionReasonEstimate

case c.params.NackWindowMinDuration != 0 && !c.nackWindowStartTime.IsZero() && time.Since(c.nackWindowStartTime) > c.params.NackWindowMinDuration && nackRatio > c.params.NackRatioThreshold:
c.logger.Debugw( "stream allocator: channel observer: high rate of repeated NACKs", "channel", c.ToString())
case c.nackTracker.IsTriggered():
c.logger.Debugw("stream allocator: channel observer: high rate of repeated NACKs", "channel", c.ToString())
return ChannelTrendCongesting, ChannelCongestionReasonLoss

case estimateDirection == TrendDirectionUpward:
Expand All @@ -167,8 +142,7 @@ func (c *ChannelObserver) GetTrend() (ChannelTrend, ChannelCongestionReason) {
}

func (c *ChannelObserver) ToString() string {
packets, repeatedNacks, nackRatio := c.GetNackRatio()
return fmt.Sprintf("name: %s, estimate: %s, packets: %d, repeatedNacks: %d, nackRatio: %f", c.params.Name, c.estimateTrend.ToString(), packets, repeatedNacks, nackRatio)
return fmt.Sprintf("name: %s, estimate: {%s}, nack {%s}", c.params.Name, c.estimateTrend.ToString(), c.nackTracker.ToString())
}

// ------------------------------------------------
85 changes: 85 additions & 0 deletions pkg/sfu/streamallocator/nacktracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package streamallocator

import (
"fmt"
"time"

"github.com/livekit/protocol/logger"
)

// ------------------------------------------------

// NackTrackerParams holds the configuration of a NackTracker.
type NackTrackerParams struct {
// Name is included in the string produced by NackTracker.ToString.
Name string
// Logger is stored with the params; no NackTracker method in this file reads it.
Logger logger.Logger
// WindowMinDuration is how long a window must have been open before
// IsTriggered can report true. Zero means IsTriggered always reports false.
WindowMinDuration time.Duration
// WindowMaxDuration is the window age beyond which Add resets the window
// and its counters. Zero disables that reset.
WindowMaxDuration time.Duration
// RatioThreshold is the value GetRatio must exceed for IsTriggered to report true.
RatioThreshold float64
}

// NackTracker accumulates packet and repeated NACK counts over a time window
// and reports the ratio of repeated NACKs to packets within that window.
type NackTracker struct {
params NackTrackerParams

// windowStartTime is when the current window was opened; the zero value
// means no window is open.
windowStartTime time.Time
// packets is the sum of the packet counts passed to Add while a window is open.
packets uint32
// repeatedNacks is the sum of the repeated NACK counts passed to Add while
// a window is open.
repeatedNacks uint32
}

// NewNackTracker returns a NackTracker configured with params. The tracker
// starts with no open window and zeroed counters.
func NewNackTracker(params NackTrackerParams) *NackTracker {
	tracker := new(NackTracker)
	tracker.params = params
	return tracker
}

// Add records a sample of packets and repeatedNacks.
//
// An open window older than WindowMaxDuration (when that is non-zero) is
// discarded together with its counters before the sample is considered.
// Samples are only accumulated while a window is open.
func (n *NackTracker) Add(packets uint32, repeatedNacks uint32) {
	// Expire a window that has outlived the configured maximum.
	if maxAge := n.params.WindowMaxDuration; maxAge != 0 && !n.windowStartTime.IsZero() {
		if time.Since(n.windowStartTime) > maxAge {
			n.windowStartTime = time.Time{}
			n.packets, n.repeatedNacks = 0, 0
		}
	}

	// The window is opened by the first repeated NACK, not by ordinary
	// traffic. Anchoring it to the moment repeated NACKs begin makes it
	// possible to tell whether they keep piling up (potentially a sign of
	// congestion) or were isolated losses.
	if repeatedNacks != 0 && n.repeatedNacks == 0 {
		n.windowStartTime = time.Now()
	}

	// Nothing is counted until a window is open.
	if n.windowStartTime.IsZero() {
		return
	}
	n.packets += packets
	n.repeatedNacks += repeatedNacks
}

// GetRatio returns the accumulated repeated NACK count divided by the
// accumulated packet count, capped at 1.0. It returns 0 when no packets have
// been accumulated.
func (n *NackTracker) GetRatio() float64 {
	if n.packets == 0 {
		return 0.0
	}

	ratio := float64(n.repeatedNacks) / float64(n.packets)
	if ratio > 1.0 {
		return 1.0
	}
	return ratio
}

// IsTriggered reports whether the repeated NACK ratio exceeds RatioThreshold.
// It can only report true when WindowMinDuration is non-zero and a window has
// been open for longer than WindowMinDuration.
func (n *NackTracker) IsTriggered() bool {
	minAge := n.params.WindowMinDuration
	if minAge == 0 || n.windowStartTime.IsZero() {
		return false
	}

	// The window has not been open long enough to judge the ratio yet.
	if time.Since(n.windowStartTime) <= minAge {
		return false
	}

	return n.GetRatio() > n.params.RatioThreshold
}

// ToString returns a one-line summary of the tracker: its name (n), the
// window start time and elapsed time separated by '|' (t, empty when no
// window is open), accumulated packets (p), accumulated repeated NACKs (rn)
// and their ratio (rn/p).
func (n *NackTracker) ToString() string {
	window := ""
	if !n.windowStartTime.IsZero() {
		// The "t: " label is supplied by the outer format string, so it is
		// not repeated here (previously the output read "t: t: ...").
		window = fmt.Sprintf("%+v|%+v", n.windowStartTime, time.Since(n.windowStartTime))
	}
	return fmt.Sprintf("n: %s, t: %s, p: %d, rn: %d, rn/p: %f", n.params.Name, window, n.packets, n.repeatedNacks, n.GetRatio())
}

// ------------------------------------------------
7 changes: 2 additions & 5 deletions pkg/sfu/streamallocator/streamallocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -781,10 +781,9 @@ func (s *StreamAllocator) handleNewEstimateInNonProbe() {

var estimateToCommit int64
expectedBandwidthUsage := s.getExpectedBandwidthUsage()
packets, repeatedNacks, nackRatio := s.channelObserver.GetNackRatio()
switch reason {
case ChannelCongestionReasonLoss:
estimateToCommit = int64(float64(expectedBandwidthUsage) * (1.0 - NackRatioAttenuator*nackRatio))
estimateToCommit = int64(float64(expectedBandwidthUsage) * (1.0 - NackRatioAttenuator*s.channelObserver.GetNackRatio()))
if estimateToCommit > s.lastReceivedEstimate {
estimateToCommit = s.lastReceivedEstimate
}
Expand All @@ -799,9 +798,7 @@ func (s *StreamAllocator) handleNewEstimateInNonProbe() {
"new(bps)", estimateToCommit,
"lastReceived(bps)", s.lastReceivedEstimate,
"expectedUsage(bps)", expectedBandwidthUsage,
"packets", packets,
"repeatedNacks", repeatedNacks,
"nackRatio", nackRatio,
"channel", s.channelObserver.ToString(),
)
s.committedChannelCapacity = estimateToCommit

Expand Down

0 comments on commit 7a3fd3d

Please sign in to comment.