Skip to content

Commit

Permalink
Support simulating subscriber bandwidth. (livekit#1609)
Browse files Browse the repository at this point in the history
* Support simualting subscriber bandwidth.

When non-zero, a full allocation is triggered.
Also, probes are stopped.

When set to zero, normal probing mechanism should catch up.

Adding `allowPause` override which can be a connection option.

* fix log

* allowPause in participant params
  • Loading branch information
boks1971 authored and hautvfami committed Jul 21, 2023
1 parent 93355c5 commit ca41b75
Show file tree
Hide file tree
Showing 8 changed files with 187 additions and 10 deletions.
3 changes: 3 additions & 0 deletions pkg/rtc/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ type ParticipantParams struct {
VersionGenerator utils.TimedVersionGenerator
TrackResolver types.MediaTrackResolver
DisableDynacast bool
SubscriberAllowPause bool
}

type ParticipantImpl struct {
Expand Down Expand Up @@ -1035,6 +1036,8 @@ func (p *ParticipantImpl) setupTransportManager() error {
tm.OnAnyTransportNegotiationFailed(p.onAnyTransportNegotiationFailed)

tm.OnDataMessage(p.onDataMessage)

tm.SetSubscriberAllowPause(p.params.SubscriberAllowPause)
p.TransportManager = tm
return nil
}
Expand Down
10 changes: 8 additions & 2 deletions pkg/rtc/room.go
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@ func (r *Room) OnMetadataUpdate(f func(metadata string)) {
func (r *Room) SimulateScenario(participant types.LocalParticipant, simulateScenario *livekit.SimulateScenario) error {
switch scenario := simulateScenario.Scenario.(type) {
case *livekit.SimulateScenario_SpeakerUpdate:
r.Logger.Infow("simulating speaker update", "participant", participant.Identity())
r.Logger.Infow("simulating speaker update", "participant", participant.Identity(), "duration", scenario.SpeakerUpdate)
go func() {
<-time.After(time.Duration(scenario.SpeakerUpdate) * time.Second)
r.sendSpeakerChanges([]*livekit.SpeakerInfo{{
Expand Down Expand Up @@ -723,13 +723,19 @@ func (r *Room) SimulateScenario(participant types.LocalParticipant, simulateScen
if err := participant.Close(true, types.ParticipantCloseReasonSimulateServerLeave); err != nil {
return err
}

case *livekit.SimulateScenario_SwitchCandidateProtocol:
r.Logger.Infow("simulating switch candidate protocol", "participant", participant.Identity())
participant.ICERestart(&livekit.ICEConfig{
PreferenceSubscriber: livekit.ICECandidateType(scenario.SwitchCandidateProtocol),
PreferencePublisher: livekit.ICECandidateType(scenario.SwitchCandidateProtocol),
})
case *livekit.SimulateScenario_SubscriberBandwidth:
if scenario.SubscriberBandwidth > 0 {
r.Logger.Infow("simulating subscriber bandwidth start", "participant", participant.Identity(), "bandwidth", scenario.SubscriberBandwidth)
} else {
r.Logger.Infow("simulating subscriber bandwidth end", "participant", participant.Identity())
}
participant.SetSubscriberChannelCapacity(scenario.SubscriberBandwidth)
}
return nil
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/rtc/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -1118,6 +1118,22 @@ func (t *PCTransport) RemoveTrackFromStreamAllocator(subTrack types.SubscribedTr
t.streamAllocator.RemoveTrack(subTrack.DownTrack())
}

func (t *PCTransport) SetAllowPauseOfStreamAllocator(allowPause bool) {
if t.streamAllocator == nil {
return
}

t.streamAllocator.SetAllowPause(allowPause)
}

func (t *PCTransport) SetChannelCapacityOfStreamAllocator(channelCapacity int64) {
if t.streamAllocator == nil {
return
}

t.streamAllocator.SetChannelCapacity(channelCapacity)
}

func (t *PCTransport) GetICEConnectionType() types.ICEConnectionType {
unknown := types.ICEConnectionTypeUnknown
if t.pc == nil {
Expand Down
8 changes: 8 additions & 0 deletions pkg/rtc/transportmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -782,3 +782,11 @@ func (t *TransportManager) SetSignalSourceValid(valid bool) {
t.signalSourceValid.Store(valid)
t.params.Logger.Debugw("signal source valid", "valid", valid)
}

func (t *TransportManager) SetSubscriberAllowPause(allowPause bool) {
t.subscriber.SetAllowPauseOfStreamAllocator(allowPause)
}

func (t *TransportManager) SetSubscriberChannelCapacity(channelCapacity int64) {
t.subscriber.SetChannelCapacityOfStreamAllocator(channelCapacity)
}
4 changes: 4 additions & 0 deletions pkg/rtc/types/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,10 @@ type LocalParticipant interface {

UpdateSubscribedQuality(nodeID livekit.NodeID, trackID livekit.TrackID, maxQualities []SubscribedCodecQuality) error
UpdateMediaLoss(nodeID livekit.NodeID, trackID livekit.TrackID, fractionalLoss uint32) error

// down stream bandwidth management
SetSubscriberAllowPause(allowPause bool)
SetSubscriberChannelCapacity(channelCapacity int64)
}

// Room is a container of participants, and can provide room-level actions
Expand Down
78 changes: 78 additions & 0 deletions pkg/rtc/types/typesfakes/fake_local_participant.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/service/roommanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ func (r *RoomManager) StartSession(
ReconnectOnSubscriptionError: reconnectOnSubscriptionError,
VersionGenerator: r.versionGenerator,
TrackResolver: room.ResolveMediaTrackForSubscriber,
SubscriberAllowPause: r.config.RTC.CongestionControl.AllowPause,
})
if err != nil {
return err
Expand Down
Loading

0 comments on commit ca41b75

Please sign in to comment.