Skip to content

Commit

Permalink
Merge pull request #57 from fzxiao233/fix_keep
Browse files Browse the repository at this point in the history
[invalid]改进断流处理
  • Loading branch information
AlotOfBlahaj committed May 30, 2020
2 parents 24f5bc4 + b299aa3 commit 28ecc41
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 34 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,6 @@ require (
github.com/onsi/ginkgo v1.12.0 // indirect
github.com/onsi/gomega v1.9.0 // indirect
github.com/spf13/viper v1.6.2
github.com/tidwall/gjson v1.6.0 // indirect
github.com/tidwall/pretty v1.0.1 // indirect
)
56 changes: 32 additions & 24 deletions plugins/videoProcesser.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,35 @@ type ProcessVideo struct {
videoPathList VideoPathList
liveTrace LiveTrace
monitor monitor.VideoMonitor
isLive bool
end chan int
}

func (p *ProcessVideo) startDownloadVideo(ch chan string) {
p.videoPathList = VideoPathList{}
for {
aFilePath := worker.DownloadVideo(p.liveStatus.video)
time.Sleep(time.Millisecond * 100)
if aFilePath != "" {
p.videoPathList = append(p.videoPathList, aFilePath)
}
videoName := p.liveStatus.video.Title + ".ts"
if utils.Config.EnableTS2MP4 {
if len(p.videoPathList) > 1 {
videoName = p.videoPathList.mergeVideo(p.liveStatus.video.Title, p.liveStatus.video.UsersConfig.DownloadDir)
} else {
videoName = ts2mp4(aFilePath, p.liveStatus.video.UsersConfig.DownloadDir, p.liveStatus.video.Title)
}
time.Sleep(time.Millisecond * 100)
if !p.isLive {
break
}
if videoName == "" {
return
}
var videoName string
if utils.Config.EnableTS2MP4 {
if len(p.videoPathList) > 1 {
videoName = p.videoPathList.mergeVideo(p.liveStatus.video.Title, p.liveStatus.video.UsersConfig.DownloadDir)
} else {
videoName = ts2mp4(p.videoPathList[0], p.liveStatus.video.UsersConfig.DownloadDir, p.liveStatus.video.Title)
}
ch <- videoName
break

}
if videoName == "" {
p.end <- 1
return
}
ch <- videoName
}

func (p *ProcessVideo) isNeedDownload() bool {
Expand All @@ -48,34 +52,38 @@ func (p *ProcessVideo) isNeedDownload() bool {

func (p *ProcessVideo) StartProcessVideo() {
log.Printf("%s|%s|%s is living. start to process", p.liveStatus.video.Provider, p.liveStatus.video.UsersConfig.Name, p.liveStatus.video.Title)
p.isLive = true // 默认在直播中
ch := make(chan string)
video := p.liveStatus.video
end := make(chan int)
p.end = make(chan int)
go worker.CQBot(video)
go p.keepLiveAlive()
if p.isNeedDownload() {
p.liveStatus.video.TransRecordPath = worker.StartRecord(video)
go p.startDownloadVideo(ch)
go p.distributeVideo(end, <-ch)
} else {
go p.keepLiveAlive(end)
go p.distributeVideo(<-ch)
}
<-end
<-p.end
worker.CloseRecord(video)
}

func (p *ProcessVideo) distributeVideo(end chan int, fileName string) {
func (p *ProcessVideo) distributeVideo(fileName string) {
video := p.liveStatus.video
video.FileName = fileName
video.FilePath = video.UsersConfig.DownloadDir + "/" + video.FileName
worker.UploadVideo(video)
end <- 1
p.end <- 1
}

func (p *ProcessVideo) keepLiveAlive(end chan int) {
func (p *ProcessVideo) keepLiveAlive() {
ticker := time.NewTicker(time.Second * 30)
for {
if p.isNewLive() {
end <- 1
p.isLive = false
if p.isNeedDownload() {
return // 需要下载时不由此控制end
}
p.end <- 1
return
}
<-ticker.C
Expand All @@ -85,10 +93,10 @@ func (p *ProcessVideo) keepLiveAlive(end chan int) {
func (p *ProcessVideo) isNewLive() bool {
newLiveStatus := p.liveTrace(p.monitor, p.liveStatus.video.UsersConfig)
if newLiveStatus.isLive == false || (p.liveStatus.isLive == true && p.liveStatus.video.Title != newLiveStatus.video.Title || p.liveStatus.video.StreamingLink != newLiveStatus.video.StreamingLink) {
log.Printf("%s|%s|%s is new live or offline", p.liveStatus.video.Provider, p.liveStatus.video.UsersConfig.Name, p.liveStatus.video.Title)
log.Printf("[isNewLive]%s|%s|%s is new live or offline", p.liveStatus.video.Provider, p.liveStatus.video.UsersConfig.Name, p.liveStatus.video.Title)
return true
} else {
log.Printf("%s|%s|%s KeepAlive", p.liveStatus.video.Provider, p.liveStatus.video.UsersConfig.Name, p.liveStatus.video.Title)
log.Printf("[isNewLive]%s|%s|%s KeepAlive", p.liveStatus.video.Provider, p.liveStatus.video.UsersConfig.Name, p.liveStatus.video.Title)
return false
}
}
Expand Down
8 changes: 4 additions & 4 deletions plugins/worker/downloadCore.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,20 @@ func downloadByStreamlink(video *structUtils.VideoInfo) {
arg = addStreamlinkProxy(arg)
}
arg = append(arg, video.Target, utils.Config.DownloadQuality)
log.Printf("start to download %s", video.FilePath)
log.Printf("[Downloader]start to download %s", video.FilePath)
utils.ExecShell("streamlink", arg...)
}

func DownloadVideo(video *structUtils.VideoInfo) string {
log.Printf("%s|%s start to download", video.Provider, video.UsersConfig.Name)
log.Printf("[Downloader]%s|%s start to download", video.Provider, video.UsersConfig.Name)
video.Title = utils.RemoveIllegalChar(video.Title)
video.FilePath = utils.GenerateFilepath(video.UsersConfig.Name, video.Title+".ts")
video.UsersConfig.DownloadDir = utils.GenerateDownloadDir(video.UsersConfig.Name)
downloadByStreamlink(video)
if !utils.IsFileExist(video.FilePath) {
log.Printf("downloader: %s the video file don't exist", video.Title)
log.Printf("[Downloader] %s the video file don't exist", video.Title)
return ""
}
log.Printf("%s download successfully", video.FilePath)
log.Printf("[Downloader]%s download successfully", video.FilePath)
return video.FilePath
}
7 changes: 1 addition & 6 deletions utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,7 @@ func HttpGet(url string, header map[string]string) ([]byte, error) {
}
res, err := client.Do(req)
if res != nil {
defer func() {
err := res.Body.Close()
if err != nil {
return
}
}()
defer res.Body.Close()
}
if err != nil {
err = fmt.Errorf("HttpGet error %w", err)
Expand Down

0 comments on commit 28ecc41

Please sign in to comment.