forked from livekit/livekit
-
Notifications
You must be signed in to change notification settings - Fork 0
/
subscription_monitor.go
158 lines (129 loc) · 3.24 KB
/
subscription_monitor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package supervisor
import (
"errors"
"sync"
"time"
"github.com/abdulhaseeb08/livekit-server/pkg/rtc/types"
"github.com/abdulhaseeb08/protocol/livekit"
"github.com/abdulhaseeb08/protocol/logger"
"github.com/gammazero/deque"
)
const (
transitionWaitDuration = 10 * time.Second
)
var (
errSubscribeTimeout = errors.New("subscribe time out")
errUnsubscribeTimeout = errors.New("unsubscribe time out")
)
type transition struct {
isSubscribe bool
at time.Time
}
type SubscriptionMonitorParams struct {
TrackID livekit.TrackID
Logger logger.Logger
}
type SubscriptionMonitor struct {
params SubscriptionMonitorParams
lock sync.RWMutex
desiredTransitions deque.Deque
subscribedTrack types.SubscribedTrack
lastError error
}
func NewSubscriptionMonitor(params SubscriptionMonitorParams) *SubscriptionMonitor {
s := &SubscriptionMonitor{
params: params,
}
s.desiredTransitions.SetMinCapacity(2)
return s
}
func (s *SubscriptionMonitor) PostEvent(ome types.OperationMonitorEvent, omd types.OperationMonitorData) {
switch ome {
case types.OperationMonitorEventUpdateSubscription:
s.updateSubscription(omd.(bool))
case types.OperationMonitorEventSetSubscribedTrack:
s.setSubscribedTrack(omd.(types.SubscribedTrack))
case types.OperationMonitorEventClearSubscribedTrack:
s.clearSubscribedTrack(omd.(types.SubscribedTrack))
}
}
func (s *SubscriptionMonitor) updateSubscription(isSubscribe bool) {
s.lock.Lock()
s.desiredTransitions.PushBack(
&transition{
isSubscribe: isSubscribe,
at: time.Now(),
},
)
s.update()
s.lock.Unlock()
}
func (s *SubscriptionMonitor) setSubscribedTrack(subTrack types.SubscribedTrack) {
s.lock.Lock()
s.subscribedTrack = subTrack
s.update()
s.lock.Unlock()
}
func (s *SubscriptionMonitor) clearSubscribedTrack(subTrack types.SubscribedTrack) {
s.lock.Lock()
if s.subscribedTrack == subTrack {
s.subscribedTrack = nil
} else {
s.params.Logger.Errorw("mismatched subscribed track on clear", nil, "trackID", s.params.TrackID)
}
s.update()
s.lock.Unlock()
}
func (s *SubscriptionMonitor) Check() error {
s.lock.RLock()
if s.lastError != nil {
s.lock.RUnlock()
// return an error only once
return nil
}
var tx *transition
if s.desiredTransitions.Len() > 0 {
tx = s.desiredTransitions.Front().(*transition)
}
s.lock.RUnlock()
if tx == nil {
return nil
}
if time.Since(tx.at) > transitionWaitDuration {
// timed out waiting for transition
var err error
if tx.isSubscribe {
err = errSubscribeTimeout
} else {
err = errUnsubscribeTimeout
}
s.lock.Lock()
s.lastError = err
s.lock.Unlock()
return err
}
// give more time for transition to happen
return nil
}
func (s *SubscriptionMonitor) IsIdle() bool {
s.lock.RLock()
defer s.lock.RUnlock()
return s.desiredTransitions.Len() == 0 && s.subscribedTrack == nil
}
func (s *SubscriptionMonitor) update() {
for {
var tx *transition
if s.desiredTransitions.Len() > 0 {
tx = s.desiredTransitions.PopFront().(*transition)
}
if tx == nil {
return
}
if (tx.isSubscribe && s.subscribedTrack == nil) || (!tx.isSubscribe && s.subscribedTrack != nil) {
// put it back as the condition is not satisfied
s.desiredTransitions.PushFront(tx)
return
}
s.lastError = nil
}
}