forked from gqf2008/babylon
-
Notifications
You must be signed in to change notification settings - Fork 0
/
rtmp_broadcast.go
125 lines (112 loc) · 2.87 KB
/
rtmp_broadcast.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package rtmp
import (
log "github.com/cihub/seelog"
"sync"
"time"
)
// broadcasts is the package-level registry of live broadcasts, keyed by
// stream path. start_broadcast inserts an entry and an entry is deleted when
// its broadcast is stopped.
// NOTE(review): nothing in this file guards the map with a lock — confirm
// that concurrent access from several goroutines cannot happen.
var (
broadcasts = make(map[string]*Broadcast)
)
// Terminal is a channel of booleans. Nothing in this file uses it any more;
// the only references left are commented out.
type Terminal chan bool

// newTerminal allocates a Terminal that can buffer exactly one value, so a
// single send never blocks even if no receiver is waiting yet.
func newTerminal() Terminal {
	const capacity = 1
	term := make(Terminal, capacity)
	return term
}
// start_broadcast sets up a Broadcast for producer, registers it in the
// package-level broadcasts map under producer.path, and starts its dispatch
// goroutine.
//
// vl and al are forwarded to newChannel as the sizes of the video and audio
// channels that are attached to the producer.
func start_broadcast(producer *RtmpNetStream, vl, al int) {
	// Allocate the media channels and hook them up to the producer.
	feed := newChannel(producer.conn.remoteAddr, vl, al)
	producer.AttachAudio(feed.audioChannel)
	producer.AttachVideo(feed.videoChannel)

	// Assemble the broadcast field by field.
	b := new(Broadcast)
	b.path = producer.path
	b.lock = new(sync.Mutex)
	b.consumers = make(map[string]*RtmpNetStream, 0)
	b.producer = producer
	b.control = make(chan interface{}, 10)

	// Publish it before the loop starts so lookups by path can find it.
	broadcasts[producer.path] = b
	b.start()
}
// find_broadcast looks up the broadcast registered under path.
// The boolean reports whether an entry exists; when it does not, the
// returned pointer is nil.
func find_broadcast(path string) (*Broadcast, bool) {
	if b, found := broadcasts[path]; found {
		return b, true
	}
	return nil, false
}
// newChannel builds a Channel labelled id whose video and audio channels are
// created with make using sizes vl and al respectively.
func newChannel(id string, vl, al int) Channel {
	var ch Channel
	ch.id = id
	ch.videoChannel = make(VideoChannel, vl)
	ch.audioChannel = make(AudioChannel, al)
	return ch
}
// Channel groups one video channel and one audio channel under an id.
type Channel struct {
// id labels the channel; start_broadcast passes the producer connection's
// remoteAddr here.
id string
// videoChannel is created by newChannel with the requested video size.
videoChannel VideoChannel //StreamChannel
// audioChannel is created by newChannel with the requested audio size.
audioChannel AudioChannel //StreamChannel
}
// Broadcast relays the audio and video messages of one producer stream to a
// set of consumer streams. The relaying is done by the goroutine launched in
// start.
type Broadcast struct {
// lock is allocated by start_broadcast; no code in this file locks it.
lock *sync.Mutex
// path is the key under which this broadcast is stored in broadcasts.
path string
// producer is the stream whose audiochan/videochan are read by the
// dispatch goroutine.
producer *RtmpNetStream
// out is never assigned in this file.
out Channel
// consumers holds the attached consumer streams keyed by their
// conn.remoteAddr.
consumers map[string]*RtmpNetStream
// control carries requests to the dispatch goroutine: a *RtmpNetStream to
// add or remove a consumer, or the string "stop" to shut down.
control chan interface{}
//terminal Terminal
}
// stop removes the broadcast from the package-level registry and then queues
// the "stop" command on the control channel for the dispatch goroutine.
// The send blocks while the control channel's buffer is full.
func (p *Broadcast) stop() {
	const stopCommand = "stop"

	// Unregister first so no new lookup by path can find this broadcast.
	delete(broadcasts, p.path)
	p.control <- stopCommand
}
// start launches the broadcast's dispatch goroutine and returns immediately.
//
// The goroutine loops over four event sources:
//   - p.producer.audiochan: every message is cloned and passed to SendAudio
//     of each consumer; a send error is reported through notifyError.
//   - p.producer.videochan: same, using SendVideo.
//   - p.control: a *RtmpNetStream is removed from the consumer set when its
//     closed flag is set and added otherwise (keyed by conn.remoteAddr); the
//     string "stop" closes every consumer and ends the goroutine. Any other
//     value is ignored.
//   - an idle timer: if none of the above fires for idleTimeout, the
//     broadcast is unregistered, all consumers and the producer are closed,
//     and the goroutine ends.
//
// A panic inside the loop is recovered and logged, which also ends the
// goroutine.
func (p *Broadcast) start() {
	// idleTimeout is the longest period the loop waits for any event before
	// it considers the producer dead.
	const idleTimeout = 90 * time.Second

	go func(p *Broadcast) {
		defer func() {
			if e := recover(); e != nil {
				log.Critical(e)
			}
			log.Info("Broadcast " + p.path + " stopped")
		}()

		// closeConsumers detaches and closes every consumer currently attached.
		closeConsumers := func() {
			for addr, consumer := range p.consumers {
				delete(p.consumers, addr)
				consumer.Close()
			}
		}

		log.Info("Broadcast " + p.path + " started")
		for {
			select {
			case amsg := <-p.producer.audiochan:
				for _, s := range p.consumers {
					if err := s.SendAudio(amsg.Clone()); err != nil {
						notifyError(s, err)
					}
				}
			case vmsg := <-p.producer.videochan:
				for _, s := range p.consumers {
					if err := s.SendVideo(vmsg.Clone()); err != nil {
						notifyError(s, err)
					}
				}
			case obj := <-p.control:
				switch req := obj.(type) {
				case *RtmpNetStream:
					if req.closed {
						delete(p.consumers, req.conn.remoteAddr)
					} else {
						p.consumers[req.conn.remoteAddr] = req
					}
					log.Debugf("Broadcast %v consumers %v", p.path, len(p.consumers))
				case string:
					if req == "stop" {
						closeConsumers()
						return
					}
				}
			case <-time.After(idleTimeout):
				// The reported duration is derived from idleTimeout so the
				// message cannot drift from the real timeout again.
				log.Warn("Broadcast " + p.path + " Video | Audio Buffer Empty,Timeout " + idleTimeout.String())
				// Tear down inline instead of calling p.stop(): stop sends
				// "stop" on p.control, but this goroutine is the only reader
				// and is about to return, so the command would never be
				// handled (consumers left open) and the send could block
				// forever on a full buffer.
				delete(broadcasts, p.path)
				closeConsumers()
				p.producer.Close()
				return
			}
		}
	}(p)
}
// addConsumer binds s to this broadcast and asks the dispatch goroutine to
// add it to the consumer set.
func (p *Broadcast) addConsumer(s *RtmpNetStream) {
	// Set the back-reference before queuing the stream, so it is already in
	// place by the time the dispatch goroutine receives the request.
	s.dispatcher = p

	var request interface{} = s
	p.control <- request
}
// removeConsumer marks s as closed and queues it on the control channel; the
// dispatch goroutine drops streams whose closed flag is set from the
// consumer set.
func (p *Broadcast) removeConsumer(s *RtmpNetStream) {
	// The closed flag is what tells the dispatch loop this is a removal.
	s.closed = true

	var request interface{} = s
	p.control <- request
}