diff --git a/protocol/httpflv/http_flv.go b/protocol/httpflv/http_flv.go deleted file mode 100755 index 837f5ba5..00000000 --- a/protocol/httpflv/http_flv.go +++ /dev/null @@ -1,271 +0,0 @@ -package httpflv - -import ( - "encoding/json" - "net" - "net/http" - "strings" - "time" - "errors" - "github.com/gwuhaolin/livego/utils/uid" - "github.com/gwuhaolin/livego/protocol/amf" - "github.com/gwuhaolin/livego/av" - "github.com/gwuhaolin/livego/utils/pio" - "log" - "github.com/gwuhaolin/livego/protocol/rtmp" -) - -type Server struct { - handler av.Handler -} - -type stream struct { - Key string `json:"key"` - Id string `json:"id"` -} - -type streams struct { - Publishers []stream `json:"publishers"` - Players []stream `json:"players"` -} - -func NewServer(h av.Handler) *Server { - return &Server{ - handler: h, - } -} - -func (self *Server) Serve(l net.Listener) error { - mux := http.NewServeMux() - mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - self.handleConn(w, r) - }) - mux.HandleFunc("/streams", func(w http.ResponseWriter, r *http.Request) { - self.getStream(w, r) - }) - http.Serve(l, mux) - return nil -} - -func (s *Server) getStream(w http.ResponseWriter, r *http.Request) { - rtmpStream := s.handler.(*rtmp.RtmpStream) - if rtmpStream == nil { - return - } - msgs := new(streams) - for item := range rtmpStream.GetStreams().IterBuffered() { - if s, ok := item.Val.(*rtmp.Stream); ok { - if s.GetReader() != nil { - msg := stream{item.Key, s.GetReader().Info().UID} - msgs.Publishers = append(msgs.Publishers, msg) - } - } - } - - for item := range rtmpStream.GetStreams().IterBuffered() { - ws := item.Val.(*rtmp.Stream).GetWs() - for s := range ws.IterBuffered() { - if pw, ok := s.Val.(*rtmp.PackWriterCloser); ok { - if pw.GetWriter() != nil { - msg := stream{item.Key, pw.GetWriter().Info().UID} - msgs.Players = append(msgs.Players, msg) - } - } - } - } - resp, _ := json.Marshal(msgs) - w.Header().Set("Content-Type", "application/json") - w.Write(resp) - -} - -func (self *Server) handleConn(w http.ResponseWriter, r *http.Request) { - defer func() { - if r := recover(); r != nil { - log.Println("http flv handleConn panic: ", r) - } - }() - - url := r.URL.String() - u := r.URL.Path - if pos := strings.LastIndex(u, "."); pos < 0 || u[pos:] != ".flv" { - http.Error(w, "invalid path", http.StatusBadRequest) - return - } - path := strings.TrimSuffix(strings.TrimLeft(u, "/"), ".flv") - paths := strings.SplitN(path, "/", 2) - log.Println("url:", u, "path:", path, "paths:", paths) - - if len(paths) != 2 { - http.Error(w, "invalid path", http.StatusBadRequest) - return - } - - w.Header().Set("Access-Control-Allow-Origin", "*") - writer := NewFLVWriter(paths[0], paths[1], url, w) - - self.handler.HandleWriter(writer) - writer.Wait() -} - -const ( - headerLen = 11 - maxQueueNum = 1024 -) - -type FLVWriter struct { - Uid string - av.RWBaser - app, title, url string - buf []byte - closed bool - closedChan chan struct{} - ctx http.ResponseWriter - packetQueue chan av.Packet -} - -func NewFLVWriter(app, title, url string, ctx http.ResponseWriter) *FLVWriter { - ret := &FLVWriter{ - Uid: uid.NewId(), - app: app, - title: title, - url: url, - ctx: ctx, - RWBaser: av.NewRWBaser(time.Second * 10), - closedChan: make(chan struct{}), - buf: make([]byte, headerLen), - packetQueue: make(chan av.Packet, maxQueueNum), - } - - ret.ctx.Write([]byte{0x46, 0x4c, 0x56, 0x01, 0x05, 0x00, 0x00, 0x00, 0x09}) - pio.PutI32BE(ret.buf[:4], 0) - ret.ctx.Write(ret.buf[:4]) - go func() { - err := ret.SendPacket() - if err != nil { - log.Println("SendPacket error:", err) - ret.closed = true - } - }() - return ret -} - -func (self *FLVWriter) DropPacket(pktQue chan av.Packet, info av.Info) { - log.Printf("[%v] packet queue max!!!", info) - for i := 0; i < maxQueueNum-84; i++ { - tmpPkt, ok := <-pktQue - if ok && tmpPkt.IsVideo { - videoPkt, ok := tmpPkt.Header.(av.VideoPacketHeader) - // dont't drop sps config and dont't drop key frame - if ok && (videoPkt.IsSeq() || videoPkt.IsKeyFrame()) { - log.Println("insert keyframe to queue") - pktQue <- tmpPkt - } - - if len(pktQue) > maxQueueNum-10 { - <-pktQue - } - // drop other packet - <-pktQue - } - // try to don't drop audio - if ok && tmpPkt.IsAudio { - log.Println("insert audio to queue") - pktQue <- tmpPkt - } - } - log.Println("packet queue len: ", len(pktQue)) -} - -func (self *FLVWriter) Write(p av.Packet) error { - if !self.closed { - if len(self.packetQueue) >= maxQueueNum-24 { - self.DropPacket(self.packetQueue, self.Info()) - } else { - self.packetQueue <- p - } - return nil - } else { - return errors.New("closed") - } - -} - -// func (self *FLVWriter) Write(p av.Packet) error { -func (self *FLVWriter) SendPacket() error { - for { - p, ok := <-self.packetQueue - if ok { - self.RWBaser.SetPreTime() - h := self.buf[:headerLen] - typeID := av.TAG_VIDEO - if !p.IsVideo { - if p.IsMetadata { - var err error - typeID = av.TAG_SCRIPTDATAAMF0 - p.Data, err = amf.MetaDataReform(p.Data, amf.DEL) - if err != nil { - return err - } - } else { - typeID = av.TAG_AUDIO - } - } - dataLen := len(p.Data) - timestamp := p.TimeStamp - timestamp += self.BaseTimeStamp() - self.RWBaser.RecTimeStamp(timestamp, uint32(typeID)) - - preDataLen := dataLen + headerLen - timestampbase := timestamp & 0xffffff - timestampExt := timestamp >> 24 & 0xff - - pio.PutU8(h[0:1], uint8(typeID)) - pio.PutI24BE(h[1:4], int32(dataLen)) - pio.PutI24BE(h[4:7], int32(timestampbase)) - pio.PutU8(h[7:8], uint8(timestampExt)) - - if _, err := self.ctx.Write(h); err != nil { - return err - } - - if _, err := self.ctx.Write(p.Data); err != nil { - return err - } - - pio.PutI32BE(h[:4], int32(preDataLen)) - if _, err := self.ctx.Write(h[:4]); err != nil { - return err - } - } else { - return errors.New("closed") - } - - } - - return nil -} - -func (self *FLVWriter) Wait() { - select { - case <-self.closedChan: - return - } -} - -func (self *FLVWriter) Close(error) { - log.Println("http flv closed") - if !self.closed { - close(self.packetQueue) - close(self.closedChan) - } - self.closed = true -} - -func (self *FLVWriter) Info() (ret av.Info) { - ret.UID = self.Uid - ret.URL = self.url - ret.Key = self.app + "/" + self.title - ret.Inter = true - return -} diff --git a/protocol/httpflv/server.go b/protocol/httpflv/server.go new file mode 100644 index 00000000..a4af13ac --- /dev/null +++ b/protocol/httpflv/server.go @@ -0,0 +1,104 @@ +package httpflv + +import ( + "encoding/json" + "strings" + "net" + "net/http" + "log" + "github.com/gwuhaolin/livego/av" + "github.com/gwuhaolin/livego/protocol/rtmp" +) + +type Server struct { + handler av.Handler +} + +type stream struct { + Key string `json:"key"` + Id string `json:"id"` +} + +type streams struct { + Publishers []stream `json:"publishers"` + Players []stream `json:"players"` +} + +func NewServer(h av.Handler) *Server { + return &Server{ + handler: h, + } +} + +func (server *Server) Serve(l net.Listener) error { + mux := http.NewServeMux() + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + server.handleConn(w, r) + }) + mux.HandleFunc("/streams", func(w http.ResponseWriter, r *http.Request) { + server.getStream(w, r) + }) + http.Serve(l, mux) + return nil +} + +func (server *Server) getStream(w http.ResponseWriter, r *http.Request) { + rtmpStream := server.handler.(*rtmp.RtmpStream) + if rtmpStream == nil { + return + } + msgs := new(streams) + for item := range rtmpStream.GetStreams().IterBuffered() { + if s, ok := item.Val.(*rtmp.Stream); ok { + if s.GetReader() != nil { + msg := stream{item.Key, s.GetReader().Info().UID} + msgs.Publishers = append(msgs.Publishers, msg) + } + } + } + + for item := range rtmpStream.GetStreams().IterBuffered() { + ws := item.Val.(*rtmp.Stream).GetWs() + for s := range ws.IterBuffered() { + if pw, ok := s.Val.(*rtmp.PackWriterCloser); ok { + if pw.GetWriter() != nil { + msg := stream{item.Key, pw.GetWriter().Info().UID} + msgs.Players = append(msgs.Players, msg) + } + } + } + } + resp, _ := json.Marshal(msgs) + w.Header().Set("Content-Type", "application/json") + w.Write(resp) + +} + +func (server *Server) handleConn(w http.ResponseWriter, r *http.Request) { + defer func() { + if r := recover(); r != nil { + log.Println("http flv handleConn panic: ", r) + } + }() + + url := r.URL.String() + u := r.URL.Path + if pos := strings.LastIndex(u, "."); pos < 0 || u[pos:] != ".flv" { + http.Error(w, "invalid path", http.StatusBadRequest) + return + } + path := strings.TrimSuffix(strings.TrimLeft(u, "/"), ".flv") + paths := strings.SplitN(path, "/", 2) + log.Println("url:", u, "path:", path, "paths:", paths) + + if len(paths) != 2 { + http.Error(w, "invalid path", http.StatusBadRequest) + return + } + + w.Header().Set("Access-Control-Allow-Origin", "*") + writer := NewFLVWriter(paths[0], paths[1], url, w) + + server.handler.HandleWriter(writer) + writer.Wait() +} diff --git a/protocol/httpflv/writer.go b/protocol/httpflv/writer.go new file mode 100755 index 00000000..355f57e5 --- /dev/null +++ b/protocol/httpflv/writer.go @@ -0,0 +1,173 @@ +package httpflv + +import ( + "net/http" + "time" + "errors" + "log" + "github.com/gwuhaolin/livego/utils/uid" + "github.com/gwuhaolin/livego/protocol/amf" + "github.com/gwuhaolin/livego/av" + "github.com/gwuhaolin/livego/utils/pio" +) + +const ( + headerLen = 11 + maxQueueNum = 1024 +) + +type FLVWriter struct { + Uid string + av.RWBaser + app, title, url string + buf []byte + closed bool + closedChan chan struct{} + ctx http.ResponseWriter + packetQueue chan av.Packet +} + +func NewFLVWriter(app, title, url string, ctx http.ResponseWriter) *FLVWriter { + ret := &FLVWriter{ + Uid: uid.NewId(), + app: app, + title: title, + url: url, + ctx: ctx, + RWBaser: av.NewRWBaser(time.Second * 10), + closedChan: make(chan struct{}), + buf: make([]byte, headerLen), + packetQueue: make(chan av.Packet, maxQueueNum), + } + + ret.ctx.Write([]byte{0x46, 0x4c, 0x56, 0x01, 0x05, 0x00, 0x00, 0x00, 0x09}) + pio.PutI32BE(ret.buf[:4], 0) + ret.ctx.Write(ret.buf[:4]) + go func() { + err := ret.SendPacket() + if err != nil { + log.Println("SendPacket error:", err) + ret.closed = true + } + }() + return ret +} + +func (flvWriter *FLVWriter) DropPacket(pktQue chan av.Packet, info av.Info) { + log.Printf("[%v] packet queue max!!!", info) + for i := 0; i < maxQueueNum-84; i++ { + tmpPkt, ok := <-pktQue + if ok && tmpPkt.IsVideo { + videoPkt, ok := tmpPkt.Header.(av.VideoPacketHeader) + // dont't drop sps config and dont't drop key frame + if ok && (videoPkt.IsSeq() || videoPkt.IsKeyFrame()) { + log.Println("insert keyframe to queue") + pktQue <- tmpPkt + } + + if len(pktQue) > maxQueueNum-10 { + <-pktQue + } + // drop other packet + <-pktQue + } + // try to don't drop audio + if ok && tmpPkt.IsAudio { + log.Println("insert audio to queue") + pktQue <- tmpPkt + } + } + log.Println("packet queue len: ", len(pktQue)) +} + +func (flvWriter *FLVWriter) Write(p av.Packet) error { + if !flvWriter.closed { + if len(flvWriter.packetQueue) >= maxQueueNum-24 { + flvWriter.DropPacket(flvWriter.packetQueue, flvWriter.Info()) + } else { + flvWriter.packetQueue <- p + } + return nil + } else { + return errors.New("closed") + } + +} + +func (flvWriter *FLVWriter) SendPacket() error { + for { + p, ok := <-flvWriter.packetQueue + if ok { + flvWriter.RWBaser.SetPreTime() + h := flvWriter.buf[:headerLen] + typeID := av.TAG_VIDEO + if !p.IsVideo { + if p.IsMetadata { + var err error + typeID = av.TAG_SCRIPTDATAAMF0 + p.Data, err = amf.MetaDataReform(p.Data, amf.DEL) + if err != nil { + return err + } + } else { + typeID = av.TAG_AUDIO + } + } + dataLen := len(p.Data) + timestamp := p.TimeStamp + timestamp += flvWriter.BaseTimeStamp() + flvWriter.RWBaser.RecTimeStamp(timestamp, uint32(typeID)) + + preDataLen := dataLen + headerLen + timestampbase := timestamp & 0xffffff + timestampExt := timestamp >> 24 & 0xff + + pio.PutU8(h[0:1], uint8(typeID)) + pio.PutI24BE(h[1:4], int32(dataLen)) + pio.PutI24BE(h[4:7], int32(timestampbase)) + pio.PutU8(h[7:8], uint8(timestampExt)) + + if _, err := flvWriter.ctx.Write(h); err != nil { + return err + } + + if _, err := flvWriter.ctx.Write(p.Data); err != nil { + return err + } + + pio.PutI32BE(h[:4], int32(preDataLen)) + if _, err := flvWriter.ctx.Write(h[:4]); err != nil { + return err + } + } else { + return errors.New("closed") + } + + } + + return nil +} + +func (flvWriter *FLVWriter) Wait() { + select { + case <-flvWriter.closedChan: + return + } +} + +func (flvWriter *FLVWriter) Close(error) { + log.Println("http flv closed") + if !flvWriter.closed { + close(flvWriter.packetQueue) + close(flvWriter.closedChan) + } + flvWriter.closed = true +} + +func (flvWriter *FLVWriter) Info() (ret av.Info) { + ret.UID = flvWriter.Uid + ret.URL = flvWriter.url + ret.Key = flvWriter.app + "/" + flvWriter.title + ret.Inter = true + return +}