forked from livekit/livekit
-
Notifications
You must be signed in to change notification settings - Fork 0
/
downtrackspreader.go
132 lines (105 loc) · 2.94 KB
/
downtrackspreader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package sfu
import (
"runtime"
"sync"
"go.uber.org/atomic"
"github.com/abdulhaseeb08/protocol/livekit"
"github.com/abdulhaseeb08/protocol/logger"
)
// DownTrackSpreaderParams holds the configuration for a DownTrackSpreader.
type DownTrackSpreaderParams struct {
// Threshold is the down-track count at or above which Broadcast switches
// from serial writes to parallel fan-out. 0 disables parallelization.
Threshold int
Logger logger.Logger
}
// DownTrackSpreader maintains a set of down tracks keyed by subscriber
// participant ID and fans out writes to all of them, optionally in parallel.
type DownTrackSpreader struct {
params DownTrackSpreaderParams
// downTrackMu guards downTracks and downTracksShadow.
downTrackMu sync.RWMutex
downTracks map[livekit.ParticipantID]TrackSender
// downTracksShadow is a copy-on-write snapshot of downTracks' values,
// rebuilt on every Store/Free so readers can iterate without holding
// the lock for the duration of the iteration.
downTracksShadow []TrackSender
// numProcs bounds the number of goroutines used by the parallel path
// of Broadcast; set at construction to min(NumCPU, GOMAXPROCS).
numProcs int
}
// NewDownTrackSpreader creates a DownTrackSpreader with the given params.
// The parallel fan-out width is capped at the smaller of the CPU count and
// the current GOMAXPROCS setting.
func NewDownTrackSpreader(params DownTrackSpreaderParams) *DownTrackSpreader {
	procs := runtime.NumCPU()
	if maxProcs := runtime.GOMAXPROCS(0); maxProcs < procs {
		procs = maxProcs
	}
	return &DownTrackSpreader{
		params:     params,
		downTracks: make(map[livekit.ParticipantID]TrackSender),
		numProcs:   procs,
	}
}
// GetDownTracks returns the current snapshot of down tracks. The returned
// slice is never mutated after publication, so callers may iterate it
// without holding the lock.
func (d *DownTrackSpreader) GetDownTracks() []TrackSender {
	d.downTrackMu.RLock()
	tracks := d.downTracksShadow
	d.downTrackMu.RUnlock()
	return tracks
}
// ResetAndGetDownTracks atomically clears all registered down tracks and
// returns the snapshot that was current at the moment of the reset.
func (d *DownTrackSpreader) ResetAndGetDownTracks() []TrackSender {
	d.downTrackMu.Lock()
	defer d.downTrackMu.Unlock()

	removed := d.downTracksShadow
	d.downTracksShadow = nil
	d.downTracks = make(map[livekit.ParticipantID]TrackSender)
	return removed
}
// Store registers (or replaces) the down track for its subscriber and
// refreshes the read-only snapshot.
func (d *DownTrackSpreader) Store(ts TrackSender) {
	d.downTrackMu.Lock()
	defer d.downTrackMu.Unlock()

	d.downTracks[ts.SubscriberID()] = ts
	d.shadowDownTracks()
}
// Free removes the down track registered for subscriberID, if any, and
// refreshes the read-only snapshot.
func (d *DownTrackSpreader) Free(subscriberID livekit.ParticipantID) {
	d.downTrackMu.Lock()
	defer d.downTrackMu.Unlock()

	delete(d.downTracks, subscriberID)
	d.shadowDownTracks()
}
// HasDownTrack reports whether a down track is registered for subscriberID.
func (d *DownTrackSpreader) HasDownTrack(subscriberID livekit.ParticipantID) bool {
	d.downTrackMu.RLock()
	_, found := d.downTracks[subscriberID]
	d.downTrackMu.RUnlock()
	return found
}
// Broadcast invokes writer once for every registered down track.
// Below params.Threshold (or when Threshold is 0) the writes are serial;
// otherwise they are spread over up to numProcs goroutines, which claim
// fixed-size batches off a shared atomic cursor until all tracks are done.
func (d *DownTrackSpreader) Broadcast(writer func(TrackSender)) {
	downTracks := d.GetDownTracks()
	if d.params.Threshold == 0 || len(downTracks) < d.params.Threshold {
		// serial - not enough down tracks for parallelization to outweigh overhead
		for _, dt := range downTracks {
			writer(dt)
		}
		return
	}

	// parallel - enables much more efficient multi-core utilization
	//
	// Batches of 2 amortize the cursor contention while still balancing load:
	// WriteRTP takes about 50µs on average, so each claimed batch is ~100µs
	// of work.
	const batch uint64 = 2
	cursor := atomic.NewUint64(0)
	limit := uint64(len(downTracks))

	var wg sync.WaitGroup
	wg.Add(d.numProcs)
	for p := 0; p < d.numProcs; p++ {
		go func() {
			defer wg.Done()
			for {
				upper := cursor.Add(batch)
				lower := upper - batch
				if lower >= limit {
					// all batches claimed
					return
				}
				if upper > limit {
					upper = limit
				}
				for i := lower; i < upper; i++ {
					writer(downTracks[i])
				}
			}
		}()
	}
	wg.Wait()
}
// DownTrackCount returns the number of currently registered down tracks.
func (d *DownTrackSpreader) DownTrackCount() int {
	d.downTrackMu.RLock()
	count := len(d.downTracksShadow)
	d.downTrackMu.RUnlock()
	return count
}
// shadowDownTracks rebuilds the read-only snapshot slice from the map.
// Must be called with downTrackMu held for writing. A fresh slice is
// published each time so previously returned snapshots stay immutable.
func (d *DownTrackSpreader) shadowDownTracks() {
	shadow := make([]TrackSender, 0, len(d.downTracks))
	for _, ts := range d.downTracks {
		shadow = append(shadow, ts)
	}
	d.downTracksShadow = shadow
}