Skip to content

Commit

Permalink
Merge pull request #6 from runner365/master
Browse files Browse the repository at this point in the history
新增功能和修改异常崩溃bug
  • Loading branch information
gwuhaolin committed Jul 5, 2017
2 parents 9d1384d + b2ca5ce commit d17eee1
Show file tree
Hide file tree
Showing 16 changed files with 1,016 additions and 141 deletions.
3 changes: 2 additions & 1 deletion av/av.go
Expand Up @@ -62,6 +62,7 @@ type Packet struct {
IsVideo bool
IsMetadata bool
TimeStamp uint32 // dts
StreamID uint32
Header PacketHeader
Data []byte
}
Expand Down Expand Up @@ -148,5 +149,5 @@ type WriteCloser interface {
Closer
Alive
CalcTime
Write(Packet) error
Write(*Packet) error
}
74 changes: 74 additions & 0 deletions configure/liveconfig.go
@@ -0,0 +1,74 @@
package configure

import (
"encoding/json"
"io/ioutil"
"log"
)

/*
{
[
{
"application":"live",
"live":"on",
"hls":"on",
"static_push":["rtmp://xx/live"]
}
]
}
*/
type Application struct {
Appname string
Liveon string
Hlson string
Static_push []string
}

type ServerCfg struct {
Server []Application
}

var RtmpServercfg ServerCfg

func LoadConfig(configfilename string) error {
log.Printf("starting load configure file(%s)......", configfilename)
data, err := ioutil.ReadFile(configfilename)
if err != nil {
log.Printf("ReadFile %s error:%v", configfilename, err)
return err
}

log.Printf("loadconfig: \r\n%s", string(data))

err = json.Unmarshal(data, &RtmpServercfg)
if err != nil {
log.Printf("json.Unmarshal error:%v", err)
return err
}
log.Printf("get config json data:%v", RtmpServercfg)
return nil
}

func CheckAppName(appname string) bool {
for _, app := range RtmpServercfg.Server {
if (app.Appname == appname) && (app.Liveon == "on") {
return true
}
}
return false
}

func GetStaticPushUrlList(appname string) ([]string, bool) {
for _, app := range RtmpServercfg.Server {
if (app.Appname == appname) && (app.Liveon == "on") {
if len(app.Static_push) > 0 {
return app.Static_push, true
} else {
return nil, false
}
}

}
return nil, false
}
2 changes: 1 addition & 1 deletion container/flv/muxer.go
Expand Up @@ -72,7 +72,7 @@ func NewFLVWriter(app, title, url string, ctx *os.File) *FLVWriter {
return ret
}

func (writer *FLVWriter) Write(p av.Packet) error {
func (writer *FLVWriter) Write(p *av.Packet) error {
writer.RWBaser.SetPreTime()
h := writer.buf[:headerLen]
typeID := av.TAG_VIDEO
Expand Down
10 changes: 10 additions & 0 deletions livego.cfg
@@ -0,0 +1,10 @@
{
"server": [
{
"appname":"live",
"liveon":"on",
"hlson":"on"
}
]
}

42 changes: 30 additions & 12 deletions main.go → livego.go
Expand Up @@ -2,21 +2,23 @@ package main

import (
"flag"
"net"
"time"
"log"
"github.com/gwuhaolin/livego/protocol/rtmp"
"github.com/gwuhaolin/livego/configure"
"github.com/gwuhaolin/livego/protocol/hls"
"github.com/gwuhaolin/livego/protocol/httpflv"
"github.com/gwuhaolin/livego/protocol/httpopera"
"github.com/gwuhaolin/livego/protocol/rtmp"
"log"
"net"
"time"
)

var (
version = "master"
rtmpAddr = flag.String("rtmp-addr", ":1935", "RTMP server listen address")
httpFlvAddr = flag.String("httpflv-addr", ":7001", "HTTP-FLV server listen address")
hlsAddr = flag.String("hls-addr", ":7002", "HLS server listen address")
operaAddr = flag.String("manage-addr", ":8080", "HTTP manage interface server listen address")
version = "master"
rtmpAddr = flag.String("rtmp-addr", ":1935", "RTMP server listen address")
httpFlvAddr = flag.String("httpflv-addr", ":7001", "HTTP-FLV server listen address")
hlsAddr = flag.String("hls-addr", ":7002", "HLS server listen address")
operaAddr = flag.String("manage-addr", ":8090", "HTTP manage interface server listen address")
configfilename = flag.String("cfgfile", "livego.cfg", "live configure filename")
)

func init() {
Expand Down Expand Up @@ -49,7 +51,16 @@ func startRtmp(stream *rtmp.RtmpStream, hlsServer *hls.Server) {
log.Fatal(err)
}

rtmpServer := rtmp.NewRtmpServer(stream, hlsServer)
var rtmpServer *rtmp.Server

if hlsServer == nil {
rtmpServer = rtmp.NewRtmpServer(stream, nil)
log.Printf("hls server disable....")
} else {
rtmpServer = rtmp.NewRtmpServer(stream, hlsServer)
log.Printf("hls server enable....")
}

defer func() {
if r := recover(); r != nil {
log.Println("RTMP server panic: ", r)
Expand Down Expand Up @@ -83,7 +94,7 @@ func startHTTPOpera(stream *rtmp.RtmpStream) {
if err != nil {
log.Fatal(err)
}
opServer := httpopera.NewServer(stream)
opServer := httpopera.NewServer(stream, *rtmpAddr)
go func() {
defer func() {
if r := recover(); r != nil {
Expand All @@ -104,9 +115,16 @@ func main() {
}
}()
log.Println("start livego, version", version)
err := configure.LoadConfig(*configfilename)
if err != nil {
return
}

stream := rtmp.NewRtmpStream()
hlsServer := startHls()
startHTTPFlv(stream)
//startHTTPOpera(stream)
startHTTPOpera(stream)

startRtmp(stream, hlsServer)
//startRtmp(stream, nil)
}
28 changes: 20 additions & 8 deletions protocol/hls/source.go
Expand Up @@ -35,7 +35,7 @@ type Source struct {
tsCache *TSCacheItem
tsparser *parser.CodecParser
closed bool
packetQueue chan av.Packet
packetQueue chan *av.Packet
}

func NewSource(info av.Info) *Source {
Expand All @@ -51,7 +51,7 @@ func NewSource(info av.Info) *Source {
tsCache: NewTSCacheItem(info.Key),
tsparser: parser.NewCodecParser(),
bwriter: bytes.NewBuffer(make([]byte, 100*1024)),
packetQueue: make(chan av.Packet, maxQueueNum),
packetQueue: make(chan *av.Packet, maxQueueNum),
}
go func() {
err := s.SendPacket()
Expand All @@ -67,7 +67,7 @@ func (source *Source) GetCacheInc() *TSCacheItem {
return source.tsCache
}

func (source *Source) DropPacket(pktQue chan av.Packet, info av.Info) {
func (source *Source) DropPacket(pktQue chan *av.Packet, info av.Info) {
log.Printf("[%v] packet queue max!!!", info)
for i := 0; i < maxQueueNum-84; i++ {
tmpPkt, ok := <-pktQue
Expand Down Expand Up @@ -95,16 +95,27 @@ func (source *Source) DropPacket(pktQue chan av.Packet, info av.Info) {
log.Println("packet queue len: ", len(pktQue))
}

func (source *Source) Write(p av.Packet) error {
func (source *Source) Write(p *av.Packet) (err error) {
err = nil
if source.closed {
err = errors.New("hls source closed")
return
}
source.SetPreTime()
defer func() {
if e := recover(); e != nil {
errString := fmt.Sprintf("hls source has already been closed:%v", e)
err = errors.New(errString)
}
}()
if len(source.packetQueue) >= maxQueueNum-24 {
source.DropPacket(source.packetQueue, source.info)
} else {
if !source.closed {
source.packetQueue <- p
}
}
return nil
return
}

func (source *Source) SendPacket() error {
Expand All @@ -114,6 +125,7 @@ func (source *Source) SendPacket() error {
log.Println("hls SendPacket panic: ", r)
}
}()

log.Printf("[%v] hls sender start", source.info)
for {
if source.closed {
Expand All @@ -126,7 +138,7 @@ func (source *Source) SendPacket() error {
continue
}

err := source.demuxer.Demux(&p)
err := source.demuxer.Demux(p)
if err == flv.ErrAvcEndSEQ {
log.Println(err)
continue
Expand All @@ -136,7 +148,7 @@ func (source *Source) SendPacket() error {
return err
}
}
compositionTime, isSeq, err := source.parse(&p)
compositionTime, isSeq, err := source.parse(p)
if err != nil {
log.Println(err)
}
Expand All @@ -146,7 +158,7 @@ func (source *Source) SendPacket() error {
if source.btswriter != nil {
source.stat.update(p.IsVideo, p.TimeStamp)
source.calcPtsDts(p.IsVideo, p.TimeStamp, uint32(compositionTime))
source.tsMux(&p)
source.tsMux(p)
}
} else {
return errors.New("closed")
Expand Down
32 changes: 20 additions & 12 deletions protocol/httpflv/writer.go
Expand Up @@ -3,6 +3,7 @@ package httpflv
import (
"time"
"errors"
"fmt"
"log"
"net/http"
"github.com/gwuhaolin/livego/utils/uid"
Expand All @@ -17,14 +18,14 @@ const (
)

type FLVWriter struct {
Uid string
Uid string
av.RWBaser
app, title, url string
buf []byte
closed bool
closedChan chan struct{}
ctx http.ResponseWriter
packetQueue chan av.Packet
packetQueue chan *av.Packet
}

func NewFLVWriter(app, title, url string, ctx http.ResponseWriter) *FLVWriter {
Expand All @@ -37,7 +38,7 @@ func NewFLVWriter(app, title, url string, ctx http.ResponseWriter) *FLVWriter {
RWBaser: av.NewRWBaser(time.Second * 10),
closedChan: make(chan struct{}),
buf: make([]byte, headerLen),
packetQueue: make(chan av.Packet, maxQueueNum),
packetQueue: make(chan *av.Packet, maxQueueNum),
}

ret.ctx.Write([]byte{0x46, 0x4c, 0x56, 0x01, 0x05, 0x00, 0x00, 0x00, 0x09})
Expand All @@ -53,7 +54,7 @@ func NewFLVWriter(app, title, url string, ctx http.ResponseWriter) *FLVWriter {
return ret
}

func (flvWriter *FLVWriter) DropPacket(pktQue chan av.Packet, info av.Info) {
func (flvWriter *FLVWriter) DropPacket(pktQue chan *av.Packet, info av.Info) {
log.Printf("[%v] packet queue max!!!", info)
for i := 0; i < maxQueueNum-84; i++ {
tmpPkt, ok := <-pktQue
Expand All @@ -80,18 +81,25 @@ func (flvWriter *FLVWriter) DropPacket(pktQue chan av.Packet, info av.Info) {
log.Println("packet queue len: ", len(pktQue))
}

func (flvWriter *FLVWriter) Write(p av.Packet) error {
if !flvWriter.closed {
if len(flvWriter.packetQueue) >= maxQueueNum-24 {
flvWriter.DropPacket(flvWriter.packetQueue, flvWriter.Info())
} else {
flvWriter.packetQueue <- p
func (flvWriter *FLVWriter) Write(p *av.Packet) (err error) {
err = nil
if flvWriter.closed {
err = errors.New("flvwrite source closed")
return
}
defer func() {
if e := recover(); e != nil {
errString := fmt.Sprintf("FLVWriter has already been closed:%v", e)
err = errors.New(errString)
}
return nil
}()
if len(flvWriter.packetQueue) >= maxQueueNum-24 {
flvWriter.DropPacket(flvWriter.packetQueue, flvWriter.Info())
} else {
return errors.New("closed")
flvWriter.packetQueue <- p
}

return
}

func (flvWriter *FLVWriter) SendPacket() error {
Expand Down

0 comments on commit d17eee1

Please sign in to comment.