-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
125 lines (114 loc) · 3.25 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package rtsp2
import (
"net"
"net/http"
"strconv"
"github.com/AlexxIT/go2rtc/pkg/rtsp"
"github.com/AlexxIT/go2rtc/pkg/tcp"
"go.uber.org/zap"
"m7s.live/engine/v4"
"m7s.live/engine/v4/config"
"m7s.live/engine/v4/util"
)
type RTSP2Config struct {
config.HTTP
config.Publish
config.Subscribe
config.Pull
config.Push
config.TCP
}
var conf = RTSP2Config{
TCP: config.TCP{ListenAddr: ":554"},
}
var RTSP2Plugin = engine.InstallPlugin(&conf)
func (c *RTSP2Config) OnEvent(event any) {
switch v := event.(type) {
case engine.FirstConfig:
for streamPath, url := range conf.PullOnStart {
if err := RTSP2Plugin.Pull(streamPath, url, new(RTSPPuller), 0); err != nil {
RTSP2Plugin.Error("pull", zap.String("streamPath", streamPath), zap.String("url", url), zap.Error(err))
}
}
case engine.SEpublish:
if url, ok := conf.PushList[v.Target.Path]; ok {
if err := RTSP2Plugin.Push(v.Target.Path, url, new(RTSPPusher), false); err != nil {
RTSP2Plugin.Error("push", zap.String("streamPath", v.Target.Path), zap.String("url", url), zap.Error(err))
}
}
case engine.InvitePublish: //按需拉流
if url, ok := conf.PullOnSub[v.Target]; ok {
if err := RTSP2Plugin.Pull(v.Target, url, new(RTSPPuller), 0); err != nil {
RTSP2Plugin.Error("pull", zap.String("streamPath", v.Target), zap.String("url", url), zap.Error(err))
}
}
}
}
func (c *RTSP2Config) ServeTCP(conn net.Conn) {
server := rtsp.NewServer(conn)
server.Listen(func(msg any) {
RTSP2Plugin.Debug("rtsp", zap.Any("msg", msg))
switch msg := msg.(type) {
case *tcp.Response:
switch msg.Request.Method {
case rtsp.MethodRecord, rtsp.MethodPlay:
go server.Start()
}
case string:
switch msg {
case rtsp.MethodDescribe:
var suber RTSPSubscriber
suber.Conn = server
if err := RTSP2Plugin.Subscribe(server.URL.Path, &suber); err != nil {
server.Stop()
}
case rtsp.MethodAnnounce:
var puber RTSPPublisher
puber.Conn = server
if err := RTSP2Plugin.Publish(server.URL.Path, &puber); err != nil {
server.Stop()
} else {
puber.setTracks()
if puber.AudioTrack == nil {
puber.Publisher.Config.PubAudio = false
}
if puber.VideoTrack == nil {
puber.Publisher.Config.PubVideo = false
}
}
}
}
})
server.Accept()
}
func filterStreams() (ss []*engine.Stream) {
engine.Streams.Range(func(key string, s *engine.Stream) {
switch s.Publisher.(type) {
case *RTSPPublisher, *RTSPPuller:
ss = append(ss, s)
}
})
return
}
func (*RTSP2Config) API_list(w http.ResponseWriter, r *http.Request) {
util.ReturnFetchValue(filterStreams, w, r)
}
func (*RTSP2Config) API_Pull(rw http.ResponseWriter, r *http.Request) {
query := r.URL.Query()
save, _ := strconv.Atoi(query.Get("save"))
err := RTSP2Plugin.Pull(query.Get("streamPath"), query.Get("target"), new(RTSPPuller), save)
if err != nil {
util.ReturnError(util.APIErrorQueryParse, err.Error(), rw, r)
} else {
util.ReturnOK(rw, r)
}
}
func (*RTSP2Config) API_Push(rw http.ResponseWriter, r *http.Request) {
query := r.URL.Query()
err := RTSP2Plugin.Push(query.Get("streamPath"), query.Get("target"), new(RTSPPusher), query.Has("save"))
if err != nil {
util.ReturnError(util.APIErrorQueryParse, err.Error(), rw, r)
} else {
util.ReturnOK(rw, r)
}
}