Skip to content

Commit

Permalink
Avoid closure to reduce life span of objects. (livekit#1809)
Browse files Browse the repository at this point in the history
A subscription in subscription manager could live till the source
track goes away even though the participant with that subscription
is long gone due to closure on source track removal. Handle it by using
trackID to look up on source track removal.

Also, logging SDPs when a negotiation failure happens to check
if there are any mismatches.
  • Loading branch information
boks1971 authored and hautvfami committed Jul 21, 2023
1 parent 5820a60 commit 7080d4d
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 9 deletions.
29 changes: 20 additions & 9 deletions pkg/rtc/subscriptionmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,26 +436,27 @@ func (m *SubscriptionManager) subscribe(s *trackSubscription) error {
return ErrSubscriptionLimitExceeded
}

res := m.params.TrackResolver(m.params.Participant.Identity(), s.trackID)
trackID := s.trackID
res := m.params.TrackResolver(m.params.Participant.Identity(), trackID)
s.logger.Debugw("resolved track", "result", res)

if res.TrackChangedNotifier != nil && s.setChangedNotifier(res.TrackChangedNotifier) {
// set callback only when we haven't done it before
// we set the observer before checking for existence of track, so that we may get notified
// when the track becomes available
res.TrackChangedNotifier.AddObserver(string(m.params.Participant.ID()), func() {
m.queueReconcile(s.trackID)
m.queueReconcile(trackID)
})
}
if res.TrackRemovedNotifier != nil && s.setRemovedNotifier(res.TrackRemovedNotifier) {
res.TrackRemovedNotifier.AddObserver(string(m.params.Participant.ID()), func() {
// re-resolve the track in case the same track had been re-published
res := m.params.TrackResolver(m.params.Participant.Identity(), s.trackID)
res := m.params.TrackResolver(m.params.Participant.Identity(), trackID)
if res.Track != nil {
// do not unsubscribe, track is still available
return
}
s.handleSourceTrackRemoved()
m.handleSourceTrackRemoved(trackID)
})
}

Expand All @@ -474,7 +475,7 @@ func (m *SubscriptionManager) subscribe(s *trackSubscription) error {
// that we discover permissions were denied
permChanged := s.setHasPermission(res.HasPermission)
if permChanged {
m.params.Participant.SubscriptionPermissionUpdate(s.getPublisherID(), s.trackID, res.HasPermission)
m.params.Participant.SubscriptionPermissionUpdate(s.getPublisherID(), trackID, res.HasPermission)
}
if !res.HasPermission {
return ErrNoTrackPermission
Expand All @@ -493,8 +494,8 @@ func (m *SubscriptionManager) subscribe(s *trackSubscription) error {
if err != nil {
s.logger.Infow("failed to bind track", "err", err)
s.maybeRecordError(m.params.Telemetry, m.params.Participant.ID(), err, true)
m.UnsubscribeFromTrack(s.trackID)
m.params.OnSubscriptionError(s.trackID, false, err)
m.UnsubscribeFromTrack(trackID)
m.params.OnSubscriptionError(trackID, false, err)
return
}
s.setBound()
Expand All @@ -516,7 +517,7 @@ func (m *SubscriptionManager) subscribe(s *trackSubscription) error {
go m.params.OnTrackSubscribed(subTrack)
}

m.params.Logger.Debugw("subscribed to track", "trackID", s.trackID, "subscribedAudioCount", m.subscribedAudioCount.Load(), "subscribedVideoCount", m.subscribedVideoCount.Load())
m.params.Logger.Debugw("subscribed to track", "trackID", trackID, "subscribedAudioCount", m.subscribedAudioCount.Load(), "subscribedVideoCount", m.subscribedVideoCount.Load())

// add mark the participant as someone we've subscribed to
firstSubscribe := false
Expand All @@ -529,7 +530,7 @@ func (m *SubscriptionManager) subscribe(s *trackSubscription) error {
m.subscribedTo[publisherID] = pTracks
firstSubscribe = true
}
pTracks[s.trackID] = struct{}{}
pTracks[trackID] = struct{}{}
m.lock.Unlock()

if changedCB != nil && firstSubscribe {
Expand Down Expand Up @@ -558,6 +559,16 @@ func (m *SubscriptionManager) unsubscribe(s *trackSubscription) error {
return nil
}

func (m *SubscriptionManager) handleSourceTrackRemoved(trackID livekit.TrackID) {
m.lock.Lock()
sub := m.subscriptions[trackID]
m.lock.Unlock()

if sub != nil {
sub.handleSourceTrackRemoved()
}
}

// DownTrack closing is how the publisher signifies that the subscription is no longer fulfilled
// this could be due to a few reasons:
// - subscriber-initiated unsubscribe
Expand Down
7 changes: 7 additions & 0 deletions pkg/rtc/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -1610,6 +1610,13 @@ func (t *PCTransport) setupSignalStateCheckTimer() {
failed := t.negotiationState != NegotiationStateNone

if t.negotiateCounter.Load() == negotiateVersion && failed {
t.params.Logger.Infow(
"negotiation timed out",
"localCurrent", t.pc.CurrentLocalDescription(),
"localPending", t.pc.PendingLocalDescription(),
"remoteCurrent", t.pc.CurrentRemoteDescription(),
"remotePending", t.pc.PendingRemoteDescription(),
)
if onNegotiationFailed := t.getOnNegotiationFailed(); onNegotiationFailed != nil {
onNegotiationFailed()
}
Expand Down

0 comments on commit 7080d4d

Please sign in to comment.