diff --git a/config/config.go b/config/config.go index a7e7017..067614e 100644 --- a/config/config.go +++ b/config/config.go @@ -5,7 +5,6 @@ import ( "fmt" "github.com/fsnotify/fsnotify" "github.com/mitchellh/mapstructure" - "github.com/rclone/rclone/fs" "github.com/sirupsen/logrus" "github.com/spf13/viper" "log" @@ -23,7 +22,6 @@ type UsersConfig struct { DownloadDir string NeedDownload bool TransBiliId string - UserHeaders map[string]string ExtraConfig map[string]interface{} } type ModuleConfig struct { @@ -46,10 +44,6 @@ type MainConfig struct { DownloadDir []string UploadDir string Module []ModuleConfig - PprofHost string - OutboundAddrs []string - DomainRewrite map[string]([]string) - RedisHost string ExpressPort string EnableTS2MP4 bool ExtraConfig map[string]interface{} @@ -123,10 +117,6 @@ func ReloadConfig() (bool, error) { if err != nil { fmt.Printf("Struct config error: %s", err) } - /*modules := viper.AllSettings()["module"].([]interface{}) - for i := 0; i < len(modules); i++ { - Config.Module[i].ExtraConfig = modules[i].(map[string]interface{}) - }*/ Config = config UpdateLogLevel() return true, nil @@ -147,18 +137,6 @@ func LevelStrParse(levelStr string) (level logrus.Level) { } func UpdateLogLevel() { - fs.Config.LogLevel = fs.LogLevelInfo - if Config.RLogLevel == "debug" { - fs.Config.LogLevel = fs.LogLevelDebug - } else if Config.RLogLevel == "info" { - fs.Config.LogLevel = fs.LogLevelInfo - } else if Config.RLogLevel == "warn" { - fs.Config.LogLevel = fs.LogLevelWarning - } else if Config.RLogLevel == "error" { - fs.Config.LogLevel = fs.LogLevelError - } - logrus.Printf("Set rclone logrus level to %s", fs.Config.LogLevel) - if ConsoleHook != nil { level := LevelStrParse(Config.LogLevel) ConsoleHook.LogLevel = level diff --git a/config/logs.go b/config/logs.go index dcaf712..08994da 100644 --- a/config/logs.go +++ b/config/logs.go @@ -3,9 +3,7 @@ package config import ( "fmt" "github.com/fzxiao233/Vtb_Record/utils" - "github.com/knq/sdhook" "github.com/orandin/lumberjackrus" - "github.com/rclone/rclone/fs" "github.com/sirupsen/logrus" "io" "io/ioutil" @@ -46,7 +44,6 @@ func (hook *WriterHook) Levels() []logrus.Level { var ConsoleHook *WriterHook var FileHook *lumberjackrus.Hook -var GoogleHook *sdhook.StackdriverHook // Can't be func init as we need the parsed config func InitLog() { @@ -94,20 +91,5 @@ func InitLog() { logrus.AddHook(FileHook) - GoogleHook, err = sdhook.New( - sdhook.GoogleLoggingAgent(), - sdhook.LogName(Config.LogFile), - sdhook.Levels(logrus.AllLevels[:logrus.DebugLevel+1]...), - ) - if err != nil { - logrus.WithField("prof", true).Warnf("Failed to initialize the sdhook: %v", err) - } else { - logrus.AddHook(GoogleHook) - } - - fs.LogPrint = func(level fs.LogLevel, text string) { - logrus.WithField("src", "rclone").Infof(fmt.Sprintf("%-6s: %s", level, text)) - } - UpdateLogLevel() } diff --git a/config/prof.go b/config/prof.go index d48d5de..72146c4 100644 --- a/config/prof.go +++ b/config/prof.go @@ -1,11 +1,7 @@ package config import ( - "context" - "github.com/gogf/greuse" log "github.com/sirupsen/logrus" - "net" - "net/http" "runtime" "runtime/debug" "time" @@ -27,38 +23,7 @@ func PrintMemUsage() { m.NumGC) } -var PprofServer *http.Server - func InitProfiling() { - go func() { - logger := log.WithField("prof", true) - ticker := time.NewTicker(time.Minute * 1) - for { - //go http.ListenAndServe("0.0.0.0:49314", nil) - if PprofServer == nil || PprofServer.Addr != Config.PprofHost { - logger.Warnf("Starting pprof server") - if PprofServer != nil { - go PprofServer.Shutdown(context.Background()) - } - //PprofServer = &http.Server{Addr: config.Config.PprofHost, Handler: nil} - listener, err := greuse.Listen("tcp", Config.PprofHost) - if listener == nil { - logger.Warnf("Error creating reusable listener, creating a normal one instead!") - listener, err = net.Listen("tcp", Config.PprofHost) - } - if err != nil { - logger.WithError(err).Warnf("Failed to reuse-listen addr") - } - PprofServer = &http.Server{ - Addr: Config.PprofHost, - } - //go PprofServer.ListenAndServe() - go PprofServer.Serve(listener) - } - <-ticker.C - } - }() - go func() { ticker := time.NewTicker(time.Minute * 1) for { @@ -70,13 +35,10 @@ func InitProfiling() { go func() { ticker := time.NewTicker(time.Millisecond * 3000) for { - //start := time.Now() runtime.GC() - //log.WithField("prof", true).Debugf("G C & scvg use %s", time.Now().Sub(start)) <-ticker.C } }() - ticker := time.NewTicker(time.Second * 3) for { start := time.Now() diff --git a/go.mod b/go.mod index 7f71383..010998b 100644 --- a/go.mod +++ b/go.mod @@ -5,15 +5,14 @@ go 1.13 require ( github.com/bitly/go-simplejson v0.5.0 github.com/etherlabsio/go-m3u8 v0.1.2 - github.com/fsnotify/fsnotify v1.4.7 + github.com/fsnotify/fsnotify v1.4.9 github.com/fzxiao233/Go-Emoji-Utils v0.0.0-20200305114615-005e99b02c2f github.com/go-redis/redis v6.15.8+incompatible + github.com/go-redis/redis/v8 v8.7.1 github.com/gogf/greuse v1.1.0 github.com/hashicorp/golang-lru v0.5.4 github.com/knq/sdhook v0.0.0-20190801142816-0b7fa827d09a github.com/mitchellh/mapstructure v1.1.2 - github.com/onsi/ginkgo v1.12.0 // indirect - github.com/onsi/gomega v1.9.0 // indirect github.com/orandin/lumberjackrus v1.0.1 github.com/patrickmn/go-cache v2.1.0+incompatible github.com/rclone/rclone v1.52.2 @@ -23,8 +22,7 @@ require ( github.com/tidwall/pretty v1.0.1 // indirect github.com/valyala/bytebufferpool v1.0.0 go.uber.org/ratelimit v0.1.0 - golang.org/x/crypto v0.0.0-20200423211502-4bdfaf469ed5 // indirect - golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e + golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 golang.org/x/time v0.0.0-20191024005414-555d28b269f0 google.golang.org/protobuf v1.23.0 // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect diff --git a/live/plugins/pubsub.go b/live/plugins/pubsub.go deleted file mode 100644 index faee012..0000000 --- a/live/plugins/pubsub.go +++ /dev/null @@ -1,24 +0,0 @@ -package plugins - -import ( - "github.com/fzxiao233/Vtb_Record/config" - "github.com/go-redis/redis" -) - -var RedisClient *redis.Client - -func initRedis() *redis.Client { - RedisClient := redis.NewClient( - &redis.Options{ - Addr: config.Config.RedisHost, - Password: "", - DB: 0, - }) - return RedisClient -} -func Publish(data []byte, channel string) { - if RedisClient == nil { - RedisClient = initRedis() - } - _ = RedisClient.Publish(channel, data) -} diff --git a/live/plugins/uploader.go b/live/plugins/uploader.go deleted file mode 100644 index 24ba6a5..0000000 --- a/live/plugins/uploader.go +++ /dev/null @@ -1,49 +0,0 @@ -package plugins - -import ( - "encoding/json" - "github.com/fzxiao233/Vtb_Record/live/videoworker" - log "github.com/sirupsen/logrus" -) - -type UploadDict struct { - Title string - Filename string - Date string - Path string - User string - OriginTitle string `json:"Origin_Title"` - ASS string - Txt string - OriginTarget string `json:"originTarget"` -} - -type PluginUploader struct { -} - -func (p *PluginUploader) LiveStart(process *videoworker.ProcessVideo) error { - return nil -} - -func (p *PluginUploader) DownloadStart(process *videoworker.ProcessVideo) error { - video := process.LiveStatus.Video - u := UploadDict{ - Title: video.Title, - Filename: video.FileName, - Date: video.Date, - Path: video.FilePath, - User: video.UsersConfig.Name, - OriginTitle: video.Title, - ASS: "", - Txt: video.TransRecordPath, - OriginTarget: video.Target, - } - data, _ := json.Marshal(u) - log.Debug(string(data)) - Publish(data, "upload") - return nil -} - -func (p *PluginUploader) LiveEnd(process *videoworker.ProcessVideo) error { - return nil -} diff --git a/live/videoworker/downloader/provgo/downloaderGo.go b/live/videoworker/downloader/provgo/downloaderGo.go deleted file mode 100644 index 29fd870..0000000 --- a/live/videoworker/downloader/provgo/downloaderGo.go +++ /dev/null @@ -1,199 +0,0 @@ -package provgo - -import ( - "context" - "fmt" - "github.com/bitly/go-simplejson" - "github.com/fzxiao233/Vtb_Record/config" - "github.com/fzxiao233/Vtb_Record/live/interfaces" - "github.com/fzxiao233/Vtb_Record/live/videoworker/downloader/provbase" - "github.com/fzxiao233/Vtb_Record/utils" - log "github.com/sirupsen/logrus" - "go.uber.org/ratelimit" - "golang.org/x/sync/semaphore" - "io" - "os" - - "strings" -) - -type DownloaderGo struct { - provbase.Downloader - cookie string - proxy string - useAlt bool -} - -func addStreamlinkProxy(co []string, proxy string) []string { - co = append(co, "--http-proxy", "socks5://"+proxy) - return co -} - -var rl ratelimit.Limiter -var randData []byte - -func init() { - rl = ratelimit.New(1) - randFile, err := os.Open("randData") - if err == nil { - randData = make([]byte, 6*1024*1024) - io.ReadFull(randFile, randData) - } -} - -var StreamlinkSemaphore = semaphore.NewWeighted(3) - -func updateInfo(video *interfaces.VideoInfo, proxy string, cookie string, isAlt bool) (needAbort bool, err error, infoJson *simplejson.Json) { - needAbort = false - rl.Take() - logger := log.WithField("video", video).WithField("alt", isAlt) - var conf string - if isAlt { - conf = "AltStreamLinkArgs" - } else { - conf = "StreamLinkArgs" - } - _arg, ok := video.UsersConfig.ExtraConfig[conf] - arg := []string{} - if ok { - for _, a := range _arg.([]interface{}) { - arg = append(arg, a.(string)) - } - } - arg = append(arg, []string{"--json"}...) - if proxy != "" { - arg = addStreamlinkProxy(arg, proxy) - } - if cookie != "" { - hasCookie := false - for _, c := range arg { - if c == "--http-cookies" { - hasCookie = true - } - } - if !hasCookie { - arg = append(arg, []string{"--http-cookies", cookie}...) - } - } - arg = append(arg, video.Target, config.Config.DownloadQuality) - logger.Infof("start to query, command %s", arg) - StreamlinkSemaphore.Acquire(context.Background(), 1) - ret, stderr := utils.ExecShellEx(logger, false, "streamlink", arg...) - StreamlinkSemaphore.Release(1) - if stderr != "" { - logger.Infof("Streamlink err output: %s", stderr) - if strings.Contains(stderr, "(abort)") { - err = fmt.Errorf("streamlink requested abort") - needAbort = true - return - } - } - if ret == "" { - err = fmt.Errorf("streamlink returned unexpected json") - return - } - _ret := []byte(ret) - infoJson, _ = simplejson.NewJson(_ret) - if infoJson == nil { - err = fmt.Errorf("JSON parsed failed: %s", ret) - return - } - slErr := infoJson.Get("error").MustString() - if slErr != "" { - err = fmt.Errorf("Streamlink error: " + slErr) - if strings.Contains(stderr, "(abort)") { - log.WithField("video", video).WithError(err).Warnf("streamlink requested abort") - needAbort = true - } - return - } - err = nil - return -} - -func parseHttpJson(infoJson *simplejson.Json) (string, map[string]string, error) { - jret := infoJson.Get("url") - if jret == nil { - return "", nil, fmt.Errorf("Not a good json ret: no url") - } - url := jret.MustString() - headers := make(map[string]string) - jret = infoJson.Get("headers") - if jret == nil { - return "", nil, fmt.Errorf("Not a good json ret: no headers") - } - for k, v := range jret.MustMap() { - headers[k] = v.(string) - } - return url, headers, nil -} - -func (d *DownloaderGo) StartDownload(video *interfaces.VideoInfo, proxy string, cookie string, filepath string) error { - logger := log.WithField("video", video) - d.cookie = cookie - d.proxy = proxy - d.useAlt = false - - var err error - var infoJson *simplejson.Json - var streamtype string - var needAbort bool - for i := 0; i < 6; i++ { - if i < 3 { - needAbort, err, infoJson = updateInfo(video, proxy, cookie, false) - } else { - d.useAlt = true - needAbort, err, infoJson = updateInfo(video, proxy, cookie, true) - } - if needAbort { - // if we didn't entered live - logger.Warnf("Streamlink requested to abort because: %s", err) - panic("forceabort") - } - if err == nil { - err = func() error { - jret := infoJson.Get("type") - if jret == nil { - return fmt.Errorf("Not a good json ret: no type") - } - streamtype = jret.MustString() - if streamtype == "" { - return fmt.Errorf("Not a good json ret: %s", infoJson) - } - return nil - }() - if err != nil { - continue - } - if streamtype == "http" || streamtype == "hls" { - url, headers, err := parseHttpJson(infoJson) - if err != nil { - return err - } - //needMove := config.Config.UploadDir == config.Config.DownloadDir - needMove := false - if streamtype == "http" { - logger.Infof("start to download httpstream %s", url) - return doDownloadHttp(logger, filepath, url, headers, needMove) - } else { - if strings.Contains(url, "gotcha103") { - //fuck qiniu - //entry.Errorf("Not supporting qiniu cdn... %s", m3u8url) - logger.Warnf("We're getting qiniu cdn... %s, having shitty downloading experiences", url) - //continue - } - logger.Infof("start to download hls stream %s", url) - return d.doDownloadHls(logger, filepath, video, url, headers, needMove) - } - } else { - return fmt.Errorf("Unknown stream type: %s", streamtype) - } - } else { - logger.WithField("alt", d.useAlt).Infof("Failed to query m3u8 url, err: %s", err) - if needAbort { - return fmt.Errorf("abort") - } - } - } - return err -} diff --git a/live/videoworker/downloader/provgo/downloaderGoHLS.go b/live/videoworker/downloader/provgo/downloaderGoHLS.go deleted file mode 100644 index e34c8a9..0000000 --- a/live/videoworker/downloader/provgo/downloaderGoHLS.go +++ /dev/null @@ -1,892 +0,0 @@ -package provgo - -import ( - "bytes" - "crypto/tls" - "fmt" - m3u8Parser "github.com/etherlabsio/go-m3u8/m3u8" - "github.com/fzxiao233/Vtb_Record/live/interfaces" - "github.com/fzxiao233/Vtb_Record/live/videoworker/downloader/stealth" - "github.com/fzxiao233/Vtb_Record/utils" - lru "github.com/hashicorp/golang-lru" - "github.com/patrickmn/go-cache" - log "github.com/sirupsen/logrus" - "github.com/valyala/bytebufferpool" - "go.uber.org/ratelimit" - "io" - "math/rand" - "net/http" - "net/url" - "path" - "strconv" - "strings" - "sync" - "time" -) - -type HLSSegment struct { - SegNo int - SegArriveTime time.Time - Url string - //Data []byte - Data *bytes.Buffer -} - -type HLSDownloader struct { - Logger *log.Entry - M3U8UrlRewriter stealth.URLRewriter - AltAsMain bool - OutPath string - Video *interfaces.VideoInfo - Cookie string - - HLSUrl string - HLSHeader map[string]string - AltHLSUrl string - AltHLSHeader map[string]string - UrlUpdating sync.Mutex - AltUrlUpdating sync.Mutex - - Clients []*http.Client - AltClients []*http.Client - allClients []*http.Client - - SeqMap sync.Map - AltSeqMap *lru.Cache - SegLen float64 - FinishSeq int - lastSeqNo int - Stopped bool - AltStopped bool - output io.Writer - segRl ratelimit.Limiter - - firstSeqChan chan int - hasAlt bool - - errChan chan error - alterrChan chan error - - forceRefreshChan chan int - altforceRefreshChan chan int - - downloadErr *cache.Cache - altdownloadErr *cache.Cache - - altSegErr sync.Map -} - -var bufPool bytebufferpool.Pool - -var IsStub = false - -// download each segment -func (d *HLSDownloader) handleSegment(segData *HLSSegment) bool { - // rate limit the download speed... - d.segRl.Take() - if IsStub { - return true - } - - logger := d.Logger.WithField("alt", false) - - // download using a client - downChan := make(chan *bytes.Buffer) - defer func() { - defer func() { - recover() - }() - close(downChan) - }() - doDownload := func(client *http.Client) { - s := time.Now() - newbuf, err := utils.HttpGetBuffer(client, segData.Url, d.HLSHeader, nil) - if err != nil { - logger.WithError(err).Infof("Err when download segment %s", segData.Url) - // if it's 404, then we'll never be able to download it later, stop the useless retry - if strings.HasSuffix(err.Error(), "404") { - func() { - defer func() { - recover() - }() - ch := downChan - if ch == nil { - return - } - ch <- nil - }() - } - } else { - usedTime := time.Now().Sub(s) - if usedTime > time.Second*15 { - // we used too much time to download a segment - logger.Infof("Download %d used %s", segData.SegNo, usedTime) - } - func() { - defer func() { - recover() - }() - ch := downChan - if ch == nil { - return - } - ch <- newbuf - }() - } - } - - // prepare the client - onlyAlt := false - // gotcha104 is tencent yun, only m3u8 blocked the foreign ip, so after that we simply ignore it - /*if strings.Contains(segData.Url, "gotcha104") { - onlyAlt = true - }*/ - i := 0 - clients := d.allClients - if onlyAlt { - clients = d.AltClients - if len(clients) == 0 { - clients = d.allClients - } - } else { - // TODO: Refactor this - if strings.Contains(segData.Url, "gotcha105") { - clients = make([]*http.Client, 0) - clients = append(clients, d.Clients...) - clients = append(clients, d.Clients...) // double same client - } else if strings.Contains(segData.Url, "gotcha104") { - clients = []*http.Client{} - clients = append(clients, d.AltClients...) - clients = append(clients, d.Clients...) - } else if strings.Contains(segData.Url, "googlevideo.com") { - clients = []*http.Client{} - clients = append(clients, d.Clients...) - } - } - - // we one by one use each clients to download the segment, the first returned downloader wins - // normally each hls seg will exist for 1 minutes - round := 0 -breakout: - for { - i %= len(clients) - go doDownload(clients[i]) - i += 1 - select { - case ret := <-downChan: - close(downChan) - if ret == nil { // unrecoverable error, so reture at once - return false - } - segData.Data = ret - break breakout - case <-time.After(15 * time.Second): - // wait 10 second for each download try - } - if i == len(clients) { - logger.Warnf("Failed all-clients to download segment %d", segData.SegNo) - round++ - } - if time.Now().Sub(segData.SegArriveTime) > 300*time.Second { - logger.Warnf("Failed to download segment %d within timeout...", segData.SegNo) - return false - } - } - if round > 0 { - // log the too long seg download and alt seg download - logger.Infof("Downloaded segment %d: len %v", segData.SegNo, segData.Data.Len()) - } else { - logger.Debugf("Downloaded segment %d: len %v", segData.SegNo, segData.Data.Len()) - } - return true -} - -type ParserStatus int32 - -const ( - Parser_OK ParserStatus = 0 - Parser_FAIL ParserStatus = 1 - Parser_REDIRECT ParserStatus = 2 -) - -// parse the m3u8 file to get segment number and url -func (d *HLSDownloader) m3u8Parser(parsedurl *url.URL, m3u8 string, isAlt bool) (status ParserStatus, additionalData interface{}) { - logger := d.Logger.WithField("alt", isAlt) - relaUrl := "http" + "://" + parsedurl.Host + path.Dir(parsedurl.Path) - hostUrl := "http" + "://" + parsedurl.Host - // if url is /XXX.ts, then it's related to host, if the url is XXX.ts, then it's related to url path - getSegUrl := func(url string) string { - if strings.HasPrefix(url, "http") { - return url - } else if url[0:1] == "/" { - return hostUrl + url - } else { - return relaUrl + "/" + url - } - } - - playlist, err := m3u8Parser.ReadString(m3u8) - if err != nil { - return Parser_FAIL, err - } - - curseq := playlist.Sequence - - if curseq == -1 { - // curseq parse failed - logger.Warnf("curseq parse failed!!!") - return Parser_FAIL, nil - } - - segs := make([]string, 0) - - seg_i := 0 - for _, _item := range playlist.Items { - switch item := _item.(type) { - case *m3u8Parser.PlaylistItem: - //log.Debugf("Got redirect m3u8, redirecting to %s", item.URI) - return Parser_REDIRECT, item.URI - case *m3u8Parser.SegmentItem: - seqNo := curseq + seg_i - if playlist.IsLive() && seg_i == 0 { - d.SegLen = item.Duration - } - seg_i += 1 - segs = append(segs, item.Segment) - - if !isAlt { - _segData, loaded := d.SeqMap.LoadOrStore(seqNo, &HLSSegment{SegNo: seqNo, SegArriveTime: time.Now(), Url: getSegUrl(item.Segment)}) - if !loaded { - segData := _segData.(*HLSSegment) - logger.Debugf("Got new seg %d %s", seqNo, segData.Url) - go d.handleSegment(segData) - } - } else { - d.AltSeqMap.PeekOrAdd(seqNo, &HLSSegment{SegNo: seqNo, SegArriveTime: time.Now(), Url: getSegUrl(item.Segment)}) - } - } - } - if !isAlt && d.firstSeqChan != nil { - d.firstSeqChan <- curseq - d.firstSeqChan = nil - } - if !isAlt { - d.lastSeqNo = curseq + len(segs) - } - if !playlist.IsLive() { - d.FinishSeq = curseq + len(segs) - 1 - } - - return Parser_OK, nil -} - -func (d *HLSDownloader) forceRefresh(isAlt bool) { - defer func() { - recover() - }() - ch := d.forceRefreshChan - if !isAlt { - ch = d.forceRefreshChan - } else { - ch = d.altforceRefreshChan - } - if ch == nil { - return - } - ch <- 1 -} - -func (d *HLSDownloader) sendErr(err error) { - defer func() { - recover() - }() - ch := d.errChan - if ch == nil { - return - } - ch <- err -} - -func (d *HLSDownloader) getHLSUrl(isAlt bool) (curUrl string, curHeader map[string]string) { - if !isAlt { - d.UrlUpdating.Lock() - curUrl = d.HLSUrl - curHeader = d.HLSHeader - d.UrlUpdating.Unlock() - } else { - d.AltUrlUpdating.Lock() - curUrl = d.AltHLSUrl - curHeader = d.AltHLSHeader - d.AltUrlUpdating.Unlock() - } - return -} - -func (d *HLSDownloader) setHLSUrl(isAlt bool, curUrl string, curHeader map[string]string) { - if !isAlt { - d.UrlUpdating.Lock() - d.HLSUrl = curUrl - if curHeader != nil { - d.HLSHeader = curHeader - } - d.UrlUpdating.Unlock() - } else { - d.AltUrlUpdating.Lock() - d.AltHLSUrl = curUrl - if curHeader != nil { - d.AltHLSHeader = curHeader - } - d.AltUrlUpdating.Unlock() - } - return -} - -type M3u8ParserCallback interface { - m3u8Parser(parsedurl *url.URL, m3u8 string, isAlt bool) (status ParserStatus, additionalData interface{}) -} - -// the core worker that download the m3u8 file -func (d *HLSDownloader) m3u8Handler(isAlt bool, parser M3u8ParserCallback) error { - var err error - logger := d.Logger.WithField("alt", isAlt) - - // if too many errors occurred during the m3u8 downloading, then we refresh the url - errCache := d.downloadErr - if isAlt { - errCache = d.altdownloadErr - } - errCache.DeleteExpired() - if errCache.ItemCount() >= 5 { - errs := make([]interface{}, 0, 10) - for _, e := range errCache.Items() { - errs = append(errs, e) - } - errCache.Flush() - url, _ := d.getHLSUrl(isAlt) - logger.WithField("errors", errs).Warnf("Too many err occured downloading %s, refreshing m3u8url...", url) - d.forceRefresh(isAlt) - //time.Sleep(5 * time.Second) - } - - // setup the worker chan - retchan := make(chan []byte, 1) - defer func() { - defer func() { - recover() - }() - close(retchan) - }() - - if retchan == nil { - retchan = make(chan []byte, 1) - } - - // prepare the url - var curUrl string - var curHeader map[string]string - curUrl, curHeader = d.getHLSUrl(isAlt) - if curUrl == "" { - logger.Infof("got empty m3u8 url") - d.forceRefresh(isAlt) - time.Sleep(10 * time.Second) - return nil - } - _, err = url.Parse(curUrl) - if err != nil { - logger.WithError(err).Warnf("m3u8 url parse fail") - d.forceRefresh(isAlt) - //time.Sleep(10 * time.Second) - return nil - } - curUrl, useMain, useAlt := d.M3U8UrlRewriter.Rewrite(curUrl) // do some transform to avoid the rate limit from provider - - // request the m3u8 - doQuery := func(client *http.Client) { - m3u8CurUrl := curUrl - for { - if _, ok := curHeader["Accept-Encoding"]; ok { // if there's custom Accept-Encoding, http.Client won't process them for us - delete(curHeader, "Accept-Encoding") - } - _m3u8, err := utils.HttpGet(client, m3u8CurUrl, curHeader) - if err != nil { - d.M3U8UrlRewriter.Callback(m3u8CurUrl, err) - logger.WithError(err).Debugf("Download m3u8 failed") - // if it's 404, then we need to abort - if strings.HasSuffix(err.Error(), "404") { - func() { - defer func() { - recover() - }() - ch := retchan - if ch == nil { - return - } - ch <- nil // abort! - }() - } else { - if !isAlt { - d.downloadErr.SetDefault(strconv.Itoa(int(time.Now().Unix())), err) - } else { - d.altdownloadErr.SetDefault(strconv.Itoa(int(time.Now().Unix())), err) - } - } - } else { - func() { - defer func() { - recover() - }() - ch := retchan - if ch == nil { - return - } - ch <- _m3u8 // abort! - }() - //logger.Debugf("Downloaded m3u8 in %s", time.Now().Sub(start)) - m3u8 := string(_m3u8) - m3u8parsedurl, _ := url.Parse(m3u8CurUrl) - //ret, info := d.m3u8Parser(m3u8parsedurl, m3u8, isAlt) - ret, info := parser.m3u8Parser(m3u8parsedurl, m3u8, isAlt) - if ret == Parser_REDIRECT { - newUrl := info.(string) - log.Tracef("Got redirect to %s!", newUrl) - m3u8CurUrl = newUrl - continue - } else if ret == Parser_OK { - // perfect! - } else { - // oh no - logger.Warnf("Failed to parse m3u8: %s", m3u8) - } - } - return - } - } - - clients := []*http.Client{} - if useMain == 0 { - clients = append(clients, d.AltClients...) - } else if useAlt == 0 { - clients = append(clients, d.Clients...) - clients = append(clients, d.Clients...) - clients = append(clients, d.AltClients...) - } else { - if useAlt > useMain { - clients = append(clients, d.AltClients...) - clients = append(clients, d.Clients...) - } else { - clients = d.allClients - } - } - if len(clients) == 0 { - clients = d.allClients - } - - timeout := time.Millisecond * 1500 - if isAlt { - timeout = time.Millisecond * 2500 - } -breakout: - for i, client := range clients { - go doQuery(client) - select { - case ret := <-retchan: - close(retchan) - retchan = nil - if ret == nil { - //logger.Info("Unrecoverable m3u8 download err, aborting") - return fmt.Errorf("Unrecoverable m3u8 download err, aborting, url: %s", curUrl) - } - if !isAlt { - d.downloadErr.Flush() - } else { - d.altdownloadErr.Flush() - } - break breakout - case <-time.After(timeout): // failed to download within timeout, issue another req - logger.Debugf("Download m3u8 %s timeout with client %d", curUrl, i) - } - } - return nil -} - -// query main m3u8 every 2 seconds -func (d *HLSDownloader) Downloader() { - curDuration := 2.0 - ticker := time.NewTicker(time.Duration(float64(time.Second) * curDuration)) - breakflag := false - for { - go func() { - err := d.m3u8Handler(false, d) - if err != nil { - d.sendErr(err) // we have error, break out now - breakflag = true - return - } - }() - if breakflag { - return - } - if d.FinishSeq > 0 { - d.Stopped = true - } - if d.Stopped { - break - } - <-ticker.C // if the handler takes too long, the next tick will arrive at once - if d.SegLen < curDuration { - ticker.Stop() - curDuration = d.SegLen * 0.8 - if curDuration < 0.8 { - curDuration = 0.8 - } - d.Logger.Infof("Using new hls interval: %f", curDuration) - ticker = time.NewTicker(time.Duration(float64(time.Second) * curDuration)) - } - } - ticker.Stop() -} - -// update the main hls stream's link -func (d *HLSDownloader) Worker() { - ticker := time.NewTicker(time.Minute * 40) - defer ticker.Stop() - for { - if d.forceRefreshChan == nil { - d.forceRefreshChan = make(chan int) - } - if d.Stopped { - <-ticker.C // avoid busy loop - } else { - select { - case _ = <-ticker.C: - - case _ = <-d.forceRefreshChan: - d.Logger.Info("Got forceRefresh signal, refresh at once!") - isClose := false - func() { - defer func() { - panicMsg := recover() - if panicMsg != nil { - isClose = true - } - }() - close(d.forceRefreshChan) - d.forceRefreshChan = nil // avoid multiple refresh - }() - if isClose { - return - } - } - } - retry := 0 - for { - // try at most 20 times - retry += 1 - if retry > 1 { - time.Sleep(30 * time.Second) - if retry > 20 { - d.sendErr(fmt.Errorf("failed to update playlist in 20 attempts")) - return - } - if d.Stopped { - return - } - } - alt := d.AltAsMain - - // check if we have error or need abort - needAbort, err, infoJson := updateInfo(d.Video, "", d.Cookie, alt) - if needAbort { - d.Logger.WithError(err).Warnf("Streamlink requests to abort, worker finishing...") - // if we have entered live - d.sendErr(fmt.Errorf("Streamlink requests to abort: %s", err)) - return - } - if err != nil { - d.Logger.WithError(err).Warnf("Failed to update playlist") - continue - } - m3u8url, headers, err := parseHttpJson(infoJson) - if err != nil { - d.Logger.WithError(err).Warnf("Failed to parse json ret") - continue - } - - // update hls url - d.Logger.Infof("Got new m3u8url: %s", m3u8url) - if m3u8url == "" { - d.Logger.Warnf("Got empty m3u8 url...: %s", infoJson) - continue - } - d.UrlUpdating.Lock() - d.HLSUrl = m3u8url - d.HLSHeader = headers - d.UrlUpdating.Unlock() - break - } - if d.Stopped { - return - } - } -} - -// test stub for writer -func (d *HLSDownloader) WriterStub() { - for { - timer := time.NewTimer(time.Second * time.Duration((50+rand.Intn(20))/10)) - d.output.Write(randData) - <-timer.C - } -} - -// Responsible to write out each segments -func (d *HLSDownloader) Writer() { - // get the seq of first segment, then start the writing - curSeq := <-d.firstSeqChan - for { - // calculate the load time, so that we can check the timeout - loadTime := time.Second * 0 - //d.Logger.Debugf("Loading segment %d", curSeq) - for { - _val, ok := d.SeqMap.Load(curSeq) - if ok { - // the segment has already been retrieved - val := _val.(*HLSSegment) - if curSeq >= 30 { - d.SeqMap.Delete(curSeq - 30) - } - - if val.Data != nil { - // segment has been downloaded - timeoutChan := make(chan int, 1) - go func(timeoutChan chan int, startTime time.Time, segNo int) { - // detect writing timeout - timer := time.NewTimer(15 * time.Second) - select { - case <-timeoutChan: - d.Logger.Debugf("Wrote segment %d in %s", segNo, time.Now().Sub(startTime)) - case <-timer.C: - d.Logger.Warnf("Write segment %d too slow...", curSeq) - timer2 := time.NewTimer(60 * time.Second) - select { - case <-timeoutChan: - d.Logger.Debugf("Wrote segment %d in %s", segNo, time.Now().Sub(startTime)) - case <-timer2.C: - d.Logger.Errorf("Write segment %d timeout!!!!!!!", curSeq) - } - } - }(timeoutChan, time.Now(), curSeq) - _, err := d.output.Write(val.Data.Bytes()) - timeoutChan <- 1 - - //bufPool.Put(val.Data) - val.Data = nil - if err != nil { - d.sendErr(err) - return - } - break - } - // segment still not downloaded, increase the load time - } else { - // segment is not loaded - if d.lastSeqNo > 3 && d.lastSeqNo+2 < curSeq { // seqNo got reset to 0 - // exit ASAP so that alt stream will be preserved - d.sendErr(fmt.Errorf("Failed to load segment %d due to segNo got reset to %d", curSeq, d.lastSeqNo)) - return - } else { - // detect if we are lagged (e.g. we are currently at seg2, still waiting for seg3 to appear, however seg4 5 6 7 has already been downloaded) - isLagged := false - d.SeqMap.Range(func(key, value interface{}) bool { - if key.(int) > curSeq+3 && value.(*HLSSegment).Data != nil { - d.Logger.Warnf("curSeq %d lags behind segData %d!", curSeq, key.(int)) - isLagged = true - return false - } else { - return true - } - }) - if isLagged && loadTime > 15*time.Second { // exit ASAP so that alt stream will be preserved - d.sendErr(fmt.Errorf("Failed to load segment %d within m3u8 timeout due to lag...", curSeq)) - return - } - } - } - time.Sleep(500 * time.Millisecond) - loadTime += 500 * time.Millisecond - // if load time is too long, then likely the recording is interrupted - if loadTime == 1*time.Minute || loadTime == 150*time.Second || loadTime == 240*time.Second { - go d.AltSegDownloader() // trigger alt download in advance, so we can avoid more loss - } - if loadTime > 5*time.Minute { // segNo shouldn't return to 0 within 5 min - d.sendErr(fmt.Errorf("Failed to load segment %d within timeout...", curSeq)) - return - } - if curSeq == d.FinishSeq { // successfully finished - d.sendErr(nil) - return - } - } - curSeq += 1 - } -} - -func (d *HLSDownloader) startDownload() error { - var err error - - d.FinishSeq = -1 - // rate limit, so we won't break up all things - d.segRl = ratelimit.New(1) - d.SegLen = 2.0 - - writer := utils.GetWriter(d.OutPath) - d.output = writer - defer writer.Close() - - d.allClients = make([]*http.Client, 0) - d.allClients = append(d.allClients, d.Clients...) - d.allClients = append(d.allClients, d.AltClients...) - - d.AltSeqMap, _ = lru.New(16) - d.errChan = make(chan error) - d.alterrChan = make(chan error) - d.firstSeqChan = make(chan int) - d.forceRefreshChan = make(chan int) - d.altforceRefreshChan = make(chan int) - d.downloadErr = cache.New(30*time.Second, 5*time.Minute) - d.altdownloadErr = cache.New(30*time.Second, 5*time.Minute) - - d.hasAlt = false - if _, ok := d.Video.UsersConfig.ExtraConfig["AltStreamLinkArgs"]; ok { - d.hasAlt = true - } - - if !d.hasAlt && d.AltAsMain { - return fmt.Errorf("Current live does not have alt source") - } - - if IsStub { - d.hasAlt = false - go d.WriterStub() - } else { - go d.Writer() - } - - go d.Downloader() - go d.Worker() - - if !d.AltAsMain && d.hasAlt { - d.Logger.Infof("Use alt downloader") - - // start the alt downloader 60 seconds later to avoid the burst query of streamlink - time.AfterFunc(60*time.Second, func() { - go func() { - for { - d.AltWorker() - if d.AltStopped { - break - } - } - }() - d.altforceRefreshChan <- 1 - // start the downloader later so that the url is already initialized - time.AfterFunc(30*time.Second, d.AltDownloader) - }) - } else { - d.Logger.Infof("Disabled alt downloader") - } - - startTime := time.Now() - err = <-d.errChan - usedTime := time.Now().Sub(startTime) - if err == nil { - d.Logger.Infof("HLS Download successfully!") - d.AltStopped = true - } else { - d.Logger.Infof("HLS Download failed: %s", err) - if d.hasAlt { - if usedTime > 1*time.Minute { - go d.AltWriter() - } else { - d.AltStopped = true - } - } - } - func() { - defer func() { - recover() - }() - close(d.errChan) - close(d.forceRefreshChan) - }() - d.Stopped = true - d.SeqMap = sync.Map{} - defer func() { - go func() { - time.Sleep(3 * time.Minute) - d.AltStopped = true - }() - }() - return err -} - -// initialize the go hls downloader -func (dd *DownloaderGo) doDownloadHls(entry *log.Entry, output string, video *interfaces.VideoInfo, m3u8url string, headers map[string]string, needMove bool) error { - clients := []*http.Client{ - { - Transport: &http.Transport{ - ResponseHeaderTimeout: 20 * time.Second, - TLSNextProto: make(map[string]func(authority string, c *tls.Conn) http.RoundTripper), - //DisableCompression: true, - DisableKeepAlives: false, - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, - DialContext: http.DefaultTransport.(*http.Transport).DialContext, - DialTLS: http.DefaultTransport.(*http.Transport).DialTLS, - }, - Timeout: 60 * time.Second, - }, - } - - _altproxy, ok := video.UsersConfig.ExtraConfig["AltProxy"] - var altproxy string - var altclients []*http.Client - if ok { - altproxy = _altproxy.(string) - proxyUrl, _ := url.Parse("socks5://" + altproxy) - altclients = []*http.Client{ - { - Transport: &http.Transport{ - TLSNextProto: make(map[string]func(authority string, c *tls.Conn) http.RoundTripper), - Proxy: http.ProxyURL(proxyUrl), - //DisableCompression: true, - DisableKeepAlives: false, - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, - }, - Timeout: 100 * time.Second, - }, - } - } else { - altclients = []*http.Client{} - } - - d := &HLSDownloader{ - Logger: entry, - AltAsMain: dd.useAlt, - HLSUrl: m3u8url, - HLSHeader: headers, - AltHLSUrl: m3u8url, - AltHLSHeader: headers, - Clients: clients, - AltClients: altclients, - Video: video, - OutPath: output, - Cookie: dd.cookie, - M3U8UrlRewriter: stealth.GetRewriter(), - //output: out, - } - - err := d.startDownload() - time.Sleep(1 * time.Second) - utils.ExecShell("/home/misty/rclone", "rc", "vfs/forget", "dir="+path.Dir(output)) - return err -} diff --git a/live/videoworker/downloader/provgo/downloaderGoHLSAlt.go b/live/videoworker/downloader/provgo/downloaderGoHLSAlt.go deleted file mode 100644 index dc23235..0000000 --- a/live/videoworker/downloader/provgo/downloaderGoHLSAlt.go +++ /dev/null @@ -1,440 +0,0 @@ -package provgo - -import ( - "bytes" - "context" - "fmt" - "github.com/fzxiao233/Vtb_Record/utils" - "golang.org/x/sync/semaphore" - "net/http" - "strings" - "sync" - "time" -) - -func (d *HLSDownloader) handleAltSegment(segData *HLSSegment) (bool, []error) { - d.segRl.Take() - if IsStub { - return true, nil - } - - logger := d.Logger.WithField("alt", true) - downChan := make(chan *bytes.Buffer) - defer func() { - defer func() { - recover() - }() - close(downChan) - }() - // alt seg download is much slower (because we use mainland node), so longer timeout - ALT_TIMEOUT := 35 * time.Second - - errs := []error{} - errMutex := sync.Mutex{} - doDownload := func(client *http.Client) { - s := time.Now() - newbuf, err := utils.HttpGetBuffer(client, segData.Url, d.HLSHeader, nil) - if err != nil { - errMutex.Lock() - errs = append(errs, err) - errMutex.Unlock() - if strings.HasSuffix(err.Error(), "404") { - func() { - defer func() { - recover() - }() - ch := downChan - if ch == nil { - return - } - ch <- nil - }() - } - } else { - usedTime := time.Now().Sub(s) - if usedTime > ALT_TIMEOUT { - logger.Infof("Download %d used %s", segData.SegNo, usedTime) - } - func() { - defer func() { - recover() - }() - ch := downChan - if ch == nil { - return - } - ch <- newbuf - }() - } - } - onlyAlt := false - // gotcha104 is tencent yun, only m3u8 blocked the foreign ip, so after that we simply ignore it - /*if strings.Contains(segData.Url, "gotcha104") { - onlyAlt = true - }*/ - i := 0 - clients := d.Clients - if onlyAlt { - clients = d.AltClients - if len(clients) == 0 { - clients = d.allClients - } - } else { - // TODO: Refactor this - if strings.Contains(segData.Url, "gotcha105") { - clients = make([]*http.Client, 0) - clients = append(clients, d.Clients...) - clients = append(clients, d.Clients...) // double same client - } else if strings.Contains(segData.Url, "gotcha104") { - clients = []*http.Client{} - clients = append(clients, d.Clients...) - clients = append(clients, d.AltClients...) - } else if strings.Contains(segData.Url, "googlevideo.com") { - clients = []*http.Client{} - clients = append(clients, d.Clients...) - } - } - round := 0 -breakout: - for { - i %= len(clients) - go doDownload(clients[i]) - i += 1 - select { - case ret := <-downChan: - close(downChan) - if ret == nil { // unrecoverable error, so return at once - errMutex.Lock() - reterr := make([]error, len(errs)) - copy(reterr, errs) - errMutex.Unlock() - return false, reterr - } - segData.Data = ret - break breakout - case <-time.After(ALT_TIMEOUT): - // wait 10 second for each download try - } - if i == len(clients) { - //logger.Warnf("Failed all-clients to download segment %d", segData.SegNo) - round++ - } - if round == 2 { - logger.WithField("errors", errs).Warnf("Failed to download alt segment %d after 2 round, giving up", segData.SegNo) - errMutex.Lock() - reterr := make([]error, len(errs)) - copy(reterr, errs) - errMutex.Unlock() - return true, reterr // true but not setting segment, so not got removed - } - } - return true, nil -} - -// download alt m3u8 every 3 seconds -func (d *HLSDownloader) AltDownloader() { - ticker := time.NewTicker(time.Second * 5) - defer ticker.Stop() - for { - err := d.m3u8Handler(true, d) - if err != nil { - if strings.Contains(err.Error(), "aborting") { // for aborting errors, we sleep for a while to avoid too much error - time.Sleep(10 * time.Second) - } else { - d.Logger.WithError(err).Infof("Alt m3u8 download failed") - } - } - if d.AltStopped { - break - } - <-ticker.C - } -} - -// update the alt hls stream's link -func (d *HLSDownloader) AltWorker() { - logger := d.Logger.WithField("alt", true) - ticker := time.NewTicker(time.Minute * 40) - defer ticker.Stop() - - if d.AltHLSUrl == "" { - d.AltUrlUpdating.Lock() - d.AltHLSUrl = d.HLSUrl - d.AltHLSHeader = d.HLSHeader - d.AltUrlUpdating.Unlock() - } - - for { - if d.altforceRefreshChan == nil { - time.Sleep(120 * time.Second) - d.altforceRefreshChan = make(chan int) - } - select { - case _ = <-ticker.C: - - case _ = <-d.altforceRefreshChan: - logger.Info("Got altforceRefresh signal, refresh at once!") - isClose := false - func() { - defer func() { - panicMsg := recover() - if panicMsg != nil { - isClose = true - } - }() - ch := d.altforceRefreshChan - d.altforceRefreshChan = nil // avoid multiple refresh - close(ch) - }() - if isClose { - return - } - } - retry := 0 - for { - retry += 1 - if retry > 1 { - time.Sleep(30 * time.Second) - if retry > 5 { - logger.Warnf("failed to update playlist in 5 attempts, fallback to main hls") - d.AltUrlUpdating.Lock() - d.AltHLSUrl = d.HLSUrl - d.AltHLSHeader = d.HLSHeader - d.AltUrlUpdating.Unlock() - return - } - if d.AltStopped { - return - } - } - needAbort, err, infoJson := updateInfo(d.Video, "", d.Cookie, true) - if needAbort { - logger.WithError(err).Warnf("Alt streamlink requested to abort") - for { - if d.AltStopped { - return - } - time.Sleep(10 * time.Second) - } - } - if err != nil { - logger.Warnf("Failed to update playlist: %s", err) - continue - } - m3u8url, headers, err := parseHttpJson(infoJson) - if err != nil { - logger.WithError(err).Warnf("Failed to parse json, rawData: %s", infoJson) - continue - } - - logger.Infof("Got new m3u8url: %s", m3u8url) - if m3u8url == "" { - logger.Warnf("Got empty m3u8 url...: %s", infoJson) - continue - } - // if we only have qiniu - if strings.Contains(m3u8url, "gotcha103") { - // fuck qiniu, we have to specially handle gotcha103... - logger.Infof("We got qiniu cdn... %s", m3u8url) - // if we have different althlsurl, then we've got other cdn other than qiniu cdn, so we retry! - // todo: still fallback if we failed too much - url1 := d.HLSUrl[strings.Index(d.HLSUrl, "://")+3:] - url2 := d.AltHLSUrl[strings.Index(d.AltHLSUrl, "://")+3:] - urlhost1 := url1[:strings.Index(url1, "/")] - urlhost2 := url2[:strings.Index(url2, "/")] - if urlhost1 == urlhost2 { - m3u8url = d.HLSUrl - headers = d.HLSHeader - } else { - logger.Infof("We got a good alt m3u8 before: %s, not replacing it", d.AltHLSUrl) - m3u8url = "" - time.Sleep(270 * time.Second) // additional sleep time for this reason - continue // use the retry logic - } - } - - if m3u8url != "" { - logger.Infof("Updated AltHLSUrl: %s", m3u8url) - d.AltUrlUpdating.Lock() - d.AltHLSUrl = m3u8url - d.AltHLSHeader = headers - d.AltUrlUpdating.Unlock() - } - break - } - if d.AltStopped { - return - } - } -} - -var AltDownSem = semaphore.NewWeighted(8) - -// Download the segments located in the alt cache -func (d *HLSDownloader) AltSegDownloader() { - AltSemaphore.Acquire(context.Background(), 1) - defer AltSemaphore.Release(1) - for _, _segNo := range d.AltSeqMap.Keys() { - segNo := _segNo.(int) - _segData, ok := d.AltSeqMap.Peek(segNo) - if ok { - segData := _segData.(*HLSSegment) - if segData.Data == nil { - go func(segNo int, segData *HLSSegment) { - if segData.Data == nil { - ret, errs := d.handleAltSegment(segData) - if !ret { - d.AltSeqMap.Remove(segNo) - } - if errs != nil { - _ori_errs, loaded := d.altSegErr.LoadOrStore(fmt.Sprintf("%d|%s", segNo, segData.Url), errs) - ori_errs := _ori_errs.([]error) - if !loaded { - for _, c := range errs { - ori_errs = append(ori_errs, c) - } - } - } - } - }(segNo, segData) - time.Sleep(1 * time.Second) - } - } - } -} - -var AltSemaphore = semaphore.NewWeighted(30) - -// AltWriter writes the alt hls stream's segments into _tail.ts files -func (d *HLSDownloader) AltWriter() { - AltSemaphore.Acquire(context.Background(), 1) - defer AltSemaphore.Release(1) - defer d.AltSeqMap.Purge() - - if d.AltSeqMap.Len() == 0 { - d.AltStopped = true - return - } - writer := utils.GetWriter(utils.AddSuffix(d.OutPath, "tail")) - defer writer.Close() - d.Logger.Infof("Started to write tail video!") - - // download seg 2 times, in 35 seconds totally - d.AltSegDownloader() - time.Sleep(15 * time.Second) - d.AltStopped = true - func() { - defer func() { - recover() - }() - close(d.altforceRefreshChan) - }() - d.AltSegDownloader() - time.Sleep(20 * time.Second) - errMapCpy := map[string][]error{} - d.altSegErr.Range(func(key, value interface{}) bool { - k := key.(string) - v := value.([]error) - errMapCpy[k] = v - return true - }) - d.Logger.Infof("Errors during alt download: %v", errMapCpy) - segs := []int{} - for _, _segNo := range d.AltSeqMap.Keys() { - segNo := _segNo.(int) - _segData, ok := d.AltSeqMap.Peek(segNo) - if ok { - if _segData.(*HLSSegment).Data != nil { - segs = append(segs, segNo) - } - } - } - - // check the tail video parts - min := 10000000000 - max := -1000 - for _, v := range d.AltSeqMap.Keys() { - if v.(int) < min { - min = v.(int) - } - if v.(int) > max { - max = v.(int) - } - } - d.Logger.Infof("Got tail segs: %v, key max: %d, min %d", segs, max, min) - - // sometimes the cdn will reset everything back to 1 and then restart, so after wrote the - // last segments, we try to write the first parts - resetNo := 0 - if min < 25 { - for i := min; i < 25; i++ { - if seg, ok := d.AltSeqMap.Peek(i); ok { - if seg.(*HLSSegment).Data != nil { - resetNo = i + 1 - continue - } - } - break - } - } - - // select the last part of tail video - startNo := min - lastGood := max - for i := startNo; i <= max; i++ { - if seg, ok := d.AltSeqMap.Peek(i); ok { - if seg.(*HLSSegment).Data != nil { - lastGood = startNo - continue - } - } - if i > max-3 { - continue - } - startNo = i - } - if startNo > max { - startNo = lastGood - } - - // write tail videos - d.Logger.Infof("Going to write segment %d to %d", startNo, max) - var i int - for i = startNo + 1; i <= max; i++ { - if _seg, ok := d.AltSeqMap.Peek(i); ok { - seg := _seg.(*HLSSegment) - if seg.Data != nil { - _, err := writer.Write(seg.Data.Bytes()) - //bufPool.Put(seg.Data) - seg.Data = nil - if err != nil { - d.Logger.Warnf("Failed to write to tail video, err: %s", err) - return - } - continue - } - } - break - } - d.Logger.Infof("Finished writing segment %d to %d", startNo+1, i) - - if resetNo != 0 { - for i := min; i < resetNo; i++ { - if _seg, ok := d.AltSeqMap.Peek(i); ok { - seg := _seg.(*HLSSegment) - if seg.Data != nil { - _, err := writer.Write(seg.Data.Bytes()) - //bufPool.Put(seg.Data) - seg.Data = nil - if err != nil { - d.Logger.Warnf("Failed to write to tail video, err: %s", err) - return - } - continue - } - } - break - } - d.Logger.Infof("Finished writing reset segment %d to %d", 1, resetNo-1) - } -} diff --git a/live/videoworker/downloader/provgo/downloaderGoHLS_test.go b/live/videoworker/downloader/provgo/downloaderGoHLS_test.go deleted file mode 100644 index c50da4d..0000000 --- a/live/videoworker/downloader/provgo/downloaderGoHLS_test.go +++ /dev/null @@ -1,53 +0,0 @@ -package provgo - -import ( - "crypto/tls" - "github.com/fzxiao233/Vtb_Record/config" - "github.com/fzxiao233/Vtb_Record/live/interfaces" - "github.com/fzxiao233/Vtb_Record/live/videoworker/downloader/stealth" - log "github.com/sirupsen/logrus" - "net/http" - "testing" - "time" -) - -var STUB_URL = "http://127.0.0.1:8000/a.m3u8" - -func TestMain(m *testing.M) { - config.PrepareConfig() - - m.Run() -} - -func CreateDownloader() *HLSDownloader { - clients := []*http.Client{ - { - Transport: &http.Transport{ - ResponseHeaderTimeout: 20 * time.Second, - TLSNextProto: make(map[string]func(authority string, c *tls.Conn) http.RoundTripper), - //DisableCompression: true, - DisableKeepAlives: false, - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, - DialContext: http.DefaultTransport.(*http.Transport).DialContext, - DialTLS: http.DefaultTransport.(*http.Transport).DialTLS, - }, - Timeout: 60 * time.Second, - }, - } - - downloader := &HLSDownloader{ - Logger: log.WithField("test", true), - AltAsMain: false, - HLSUrl: STUB_URL, - HLSHeader: map[string]string{}, - AltHLSUrl: STUB_URL, - AltHLSHeader: map[string]string{}, - Clients: clients, - AltClients: []*http.Client{}, - Video: &interfaces.VideoInfo{}, - OutPath: "stub", - Cookie: "stub", - M3U8UrlRewriter: stealth.GetRewriter(), - } - return downloader -} diff --git a/live/videoworker/downloader/provgo/downloaderGoHttp.go b/live/videoworker/downloader/provgo/downloaderGoHttp.go deleted file mode 100644 index 009176b..0000000 --- a/live/videoworker/downloader/provgo/downloaderGoHttp.go +++ /dev/null @@ -1,93 +0,0 @@ -package provgo - -import ( - "crypto/tls" - "fmt" - "github.com/fzxiao233/Vtb_Record/utils" - log "github.com/sirupsen/logrus" - "io" - "net/http" -) - -func doDownloadHttp(entry *log.Entry, output string, url string, headers map[string]string, needMove bool) error { - // Create the file - /*out, err := os.Create(output) - if err != nil { - return err - } - if !needMove { - defer func () { - go out.Close() - }() - } else { - defer out.Close() - }*/ - out := utils.GetWriter(output) - defer out.Close() - - transport := &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, - } - - client := &http.Client{ - Transport: transport, - } - // Get the data - req, _ := http.NewRequest("GET", url, nil) - for k, v := range headers { - req.Header.Set(k, v) - } - - resp, err := client.Do(req) - if err != nil { - return err - } - defer resp.Body.Close() - - // Check server response - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("downloader got bad status: %s", resp.Status) - } - - buf := make([]byte, 1024*1024*3) // 1M buffer - src := resp.Body - dst := out - for { - // Writer the body to file - written := int64(0) - for { - nr, er := src.Read(buf) - if nr > 0 { - nw, ew := dst.Write(buf[0:nr]) - if nw > 0 { - written += int64(nw) - } - if ew != nil { - err = ew - break - } - if nr != nw { - err = io.ErrShortWrite - break - } - } - if er != nil { - err = er - break - } - } - - //written, err := io.CopyBuffer(out, resp.Body, buf) - entry.Infof("Wrote %d, err: %s", written, err) - if err == nil { - return nil - } else if err == io.EOF { - entry.Info("Stream ended") - return nil - } else { - return err - } - } - - return nil -} diff --git a/live/videoworker/downloader/provstreamlink/downloadStreamlink.go b/live/videoworker/downloader/provstreamlink/downloadStreamlink.go index 9e65cdb..2b079a1 100644 --- a/live/videoworker/downloader/provstreamlink/downloadStreamlink.go +++ b/live/videoworker/downloader/provstreamlink/downloadStreamlink.go @@ -1,15 +1,11 @@ package provstreamlink import ( - "bufio" - "bytes" "github.com/fzxiao233/Vtb_Record/config" "github.com/fzxiao233/Vtb_Record/live/interfaces" "github.com/fzxiao233/Vtb_Record/live/videoworker/downloader/provbase" "github.com/fzxiao233/Vtb_Record/utils" log "github.com/sirupsen/logrus" - "io" - "os/exec" ) func addStreamlinkProxy(co []string, proxy string) []string { @@ -22,66 +18,14 @@ type DownloaderStreamlink struct { } func (d *DownloaderStreamlink) StartDownload(video *interfaces.VideoInfo, proxy string, cookie string, filepath string) error { - _arg, ok := video.UsersConfig.ExtraConfig["StreamLinkArgs"] arg := []string{} - if ok { - for _, a := range _arg.([]interface{}) { - arg = append(arg, a.(string)) - } - } - //arg = append(arg, []string{"--force", "-o", filepath}...) + arg = append(arg, []string{"--force", "-o", filepath}...) if proxy != "" { arg = addStreamlinkProxy(arg, proxy) } arg = append(arg, video.Target, config.Config.DownloadQuality) logger := log.WithField("video", video) logger.Infof("start to download %s, command %s", filepath, arg) - //utils.ExecShellEx(logger, true, "streamlink", arg...) - downloader := &StreamlinkDownload{ - Logger: logger, - Video: video, - Filepath: filepath, - StreamlinkCommand: arg, - } - return downloader.doDownload() -} - -type StreamlinkDownload struct { - Logger *log.Entry - Video *interfaces.VideoInfo - Filepath string - StreamlinkCommand []string -} - -func (d *StreamlinkDownload) doDownload() error { - out := utils.GetWriter(d.Filepath) - defer out.Close() - d.StreamlinkCommand = append(d.StreamlinkCommand, []string{"--force", "--stdout"}...) - var stderrBuf bytes.Buffer - co := exec.Command("streamlink_", d.StreamlinkCommand...) - stdoutIn, _ := co.StdoutPipe() - stderrIn, _ := co.StderrPipe() - stderr := &stderrBuf - - _ = co.Start() - go func() { - //_, errStderr = io.Copy(stderr, stderrIn) - in := bufio.NewScanner(stderrIn) - for in.Scan() { - stderr.Write(in.Bytes()) - d.Logger.Info(in.Text()) // write each line to your log, or anything you need - } - }() - errChan := make(chan error) - go func() { - _, errStdout := io.Copy(out, stdoutIn) - if errStdout != nil { - d.Logger.WithError(errStdout).Warn("Error during writing streamlink video") - } - errChan <- errStdout - }() - - _ = co.Wait() - err := <-errChan - return err + utils.ExecShell("streamlink", arg...) + return nil } diff --git a/live/videoworker/downloader/wrapper.go b/live/videoworker/downloader/wrapper.go index 0f58dfd..0869d71 100644 --- a/live/videoworker/downloader/wrapper.go +++ b/live/videoworker/downloader/wrapper.go @@ -2,7 +2,6 @@ package downloader import ( "github.com/fzxiao233/Vtb_Record/live/videoworker/downloader/provbase" - "github.com/fzxiao233/Vtb_Record/live/videoworker/downloader/provgo" "github.com/fzxiao233/Vtb_Record/live/videoworker/downloader/provstreamlink" log "github.com/sirupsen/logrus" ) @@ -12,8 +11,6 @@ type Downloader = provbase.Downloader func GetDownloader(providerName string) *Downloader { if providerName == "" || providerName == "streamlink" { return &Downloader{&provstreamlink.DownloaderStreamlink{}} - } else if providerName == "go" { - return &Downloader{&provgo.DownloaderGo{}} } else { log.Fatalf("Unknown download provider %s", providerName) return nil diff --git a/live/videoworker/videoProcesser.go b/live/videoworker/videoProcesser.go index be4edab..8946ced 100644 --- a/live/videoworker/videoProcesser.go +++ b/live/videoworker/videoProcesser.go @@ -177,9 +177,9 @@ func (p *ProcessVideo) startDownloadVideo() { logger.Info("Do postprocessing...") videoName := p.postProcessing() if videoName != "" { - //video := p.LiveStatus.Video - //video.FileName = videoName - //video.FilePath = video.UsersConfig.DownloadDir + "/" + video.FileName + video := p.LiveStatus.Video + video.FileName = videoName + video.FilePath = video.UsersConfig.DownloadDir + "/" + video.FileName } p.finish <- 1 } @@ -256,48 +256,16 @@ func (p *ProcessVideo) getFullTitle() string { } func (p *ProcessVideo) postProcessing() string { - logger := log.WithField("video", p.LiveStatus.Video) pathSlice := []string{config.Config.UploadDir, p.LiveStatus.Video.UsersConfig.Name} // , p.getFullTitle() dirpath := strings.Join(pathSlice, "/") _, err := utils.MakeDir(filepath.Dir(dirpath)) if err != nil { return "" } - if config.Config.EnableTS2MP4 { return p.convertToMp4(dirpath) - } else { - dirpath += "/" - dirpath += p.getFullTitle() - //err := os.Rename(p.LiveStatus.Video.UsersConfig.DownloadDir, dirpath) - - doMove := func(src string, dst string, quiet bool) string { - err = utils.MoveFiles(src, dst) - if err != nil { - if !quiet { - logger.WithError(err).Warnf("Failed to rename from [%s] to [%s]!", src, dst) - } else { - logger.WithError(err).Infof("Post renaming from [%s] to [%s] failed!", src, dst) - } - } else { - logger.Infof("Renamed %s to %s", src, dst) - } - if err != nil { - return "" - } - return dirpath - } - - srcdir := p.LiveStatus.Video.UsersConfig.DownloadDir - dstdir := dirpath - time.AfterFunc(time.Second*60, func() { - for i := 0; i < 5; i++ { - doMove(srcdir, dstdir, true) - time.Sleep(time.Second * 60) - } - }) - return doMove(srcdir, dstdir, false) } + return dirpath } func (p *ProcessVideo) convertToMp4(dirpath string) string { @@ -326,7 +294,7 @@ func (p *ProcessVideo) mergeVideo(outpath string) string { co += aPath + "|" } logger := log.WithField("video", p.LiveStatus.Video) - utils.ExecShellEx(logger, true, "ffmpeg", "-i", co, "-c", "copy", "-f", "mp4", outpath) + utils.ExecShell("ffmpeg", "-i", co, "-c", "copy", "-f", "mp4", outpath) if !utils.IsFileExist(outpath) { logger.Warnf("%s the video file don't exist", outpath) return "" @@ -339,7 +307,7 @@ func (p *ProcessVideo) mergeVideo(outpath string) string { func (p *ProcessVideo) ts2mp4(tsPath string, outpath string) string { logger := log.WithField("video", p.LiveStatus.Video) - utils.ExecShellEx(logger, true, "ffmpeg", "-i", tsPath, "-c", "copy", "-f", "mp4", outpath) + utils.ExecShell("ffmpeg", "-i", tsPath, "-c", "copy", "-f", "mp4", outpath) if !utils.IsFileExist(outpath) { logger.Warnf("%s the video file don't exist", outpath) return "" diff --git a/main.go b/main.go index 63498bb..b92ff2c 100644 --- a/main.go +++ b/main.go @@ -1,8 +1,6 @@ package main import ( - "context" - "crypto/tls" "fmt" "github.com/fzxiao233/Vtb_Record/config" "github.com/fzxiao233/Vtb_Record/live" @@ -10,19 +8,12 @@ import ( "github.com/fzxiao233/Vtb_Record/live/plugins" "github.com/fzxiao233/Vtb_Record/live/videoworker" "github.com/fzxiao233/Vtb_Record/utils" - "github.com/rclone/rclone/fs" - rconfig "github.com/rclone/rclone/fs/config" - "github.com/rclone/rclone/fs/operations" log "github.com/sirupsen/logrus" - "io" "math/rand" - "net" "net/http" _ "net/http/pprof" "os" "os/signal" - "strconv" - "strings" "sync" "syscall" "time" @@ -42,9 +33,6 @@ func arrangeTask() { status := make([]map[string]bool, len(config.Config.Module)) for i, module := range config.Config.Module { status[i] = make(map[string]bool, len(module.Users)) - /*for j, _ := range status[i] { - status[i][j] = false - }*/ } go func() { @@ -52,16 +40,8 @@ func arrangeTask() { for { if config.ConfigChanged { allDone := true - /*for mod_i, _ := range status { - for _, ch := range status[mod_i] { - if ch != false { - allDone = false - } - } - }*/ if allDone { time.Sleep(4 * time.Second) // wait to ensure the config is fully written - rconfig.LoadConfig() ret, err := config.ReloadConfig() if ret { if err == nil { @@ -76,10 +56,6 @@ func arrangeTask() { } }() - var uploadDir = config.Config.UploadDir - if uploadDir != "" { - utils.MakeDir(uploadDir) - } for _, dir := range config.Config.DownloadDir { utils.MakeDir(dir) } @@ -121,7 +97,6 @@ func arrangeTask() { } time.Sleep(time.Duration(config.Config.NormalCheckSec) * time.Second) - // wait all live to finish before exit :) if SafeStop { break } @@ -154,14 +129,7 @@ func handleInterrupt() { go func() { <-c log.Warnf("Ctrl+C pressed in Terminal!") - operations.RcatFiles.Range(func(key, value interface{}) bool { - fn := key.(string) - log.Infof("Closing opened file: %s", fn) - in := value.(io.ReadCloser) - in.Close() - return true - }) - time.Sleep(20 * time.Second) // wait rclone upload finish.. + time.Sleep(5 * time.Second) // wait rclone upload finish.. os.Exit(0) }() } @@ -180,149 +148,10 @@ func handleUpdate() { func main() { handleInterrupt() handleUpdate() - fs.Config.StreamingUploadCutoff = fs.SizeSuffix(0) - fs.Config.IgnoreChecksum = true - fs.Config.NoGzip = true rand.Seed(time.Now().UnixNano()) - fs.Config.UserAgent = "google-api-go-client/0.5" - - http.DefaultTransport = &http.Transport{ - DisableKeepAlives: true, // disable keep alive to avoid connection reset - DisableCompression: true, - IdleConnTimeout: time.Second * 20, - ForceAttemptHTTP2: false, - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, - DialContext: func(ctx context.Context, network, addr string) (conn net.Conn, err error) { - /*addrparts := strings.SplitN(addr, ":", 2) - if domains, ok := config.Config.DomainRewrite[addrparts[0]]; ok { - addr = utils.RandChooseStr(domains) + ":" + addrparts[1] - }*/ - _addr := addr - if domains, ok := config.Config.DomainRewrite[addr]; ok { - addr = utils.RandChooseStr(domains) - log.Debugf("Overrided %s to %s", _addr, addr) - } - /*if addr == "www.googleapis.com:443" { - //addr = "216.58.198.206:443" - addrs := []string{"private.googleapis.com:443", "www.googleapis.com:443"} - addr = addrs[rand.Intn(len(addrs))] - }*/ - needLB := true // do we need to load balance? we do it in a opt-out fashion - if _, err := strconv.Atoi(addr[0:1]); err == nil { - // is it an IP Address? - needLB = false - } - if config.Config.OutboundAddrs != nil && len(config.Config.OutboundAddrs) > 0 { - var outIp string - if addr == "STICKY_IP" { - outIp = config.Config.OutboundAddrs[0] - addr = _addr // revert to original ip - } else if needLB { - outIp = utils.RandChooseStr(config.Config.OutboundAddrs) - } else { - outIp = "" - } - if outIp != "" { - return (&net.Dialer{ - Timeout: 30 * time.Second, - KeepAlive: 30 * time.Second, - LocalAddr: &net.TCPAddr{ - IP: net.ParseIP(outIp), - Port: 0, - }, - }).DialContext(ctx, network, addr) - } - } - return net.Dial(network, addr) - }, - } - - if false { - dialer := &net.Dialer{ - Timeout: 10 * time.Second, - KeepAlive: 30 * time.Second, - } - addrReplace := func(addr string) string { - if addr == "www.googleapis.com:443" { - //addr = "216.58.198.206:443" - addrs := []string{"private.googleapis.com:443", "10.224.1.3:19999", "10.224.1.3:19999"} - //addrs := []string{"10.224.1.3:19999"} - addr = addrs[rand.Intn(len(addrs))] - } - return addr - } - dialTls := - func(network, addr string) (conn net.Conn, err error) { - addr = addrReplace(addr) - if !strings.HasSuffix(addr, ":443") { - return dialer.Dial(network, addr) - } - c, err := tls.Dial(network, addr, &tls.Config{InsecureSkipVerify: true}) - if err != nil { - //log.Println("DialTls Err:", err) - return nil, err - } - //log.Println("doing handshake") - err = c.Handshake() - if err != nil { - return c, err - } - //log.Println(c.RemoteAddr()) - return c, c.Handshake() - } - //dialTls := nil - http.DefaultTransport = &http.Transport{ - DisableKeepAlives: true, // disable keep alive to avoid connection reset - DisableCompression: true, - IdleConnTimeout: time.Second * 10, - ForceAttemptHTTP2: false, - DialTLS: dialTls, - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, - //DialContext: func(ctx context.Context, network, addr string) (conn net.Conn, err error) { - /*ipaddr := "10.168.1." + strconv.Itoa(100 + rand.Intn(20)) - netaddr, _ := net.ResolveIPAddr("ip", ipaddr) - return (&net.Dialer{ - LocalAddr: &net.TCPAddr{ - IP: netaddr.IP, - }, - Timeout: 8 * time.Second, - }).DialContext(ctx, network, addr)*/ - /*if addr == "www.googleapis.com:443" { - //addr = "216.58.198.206:443" - addrs := []string{"private.googleapis.com", "www.googleapis.com:443"} - rand.Intn(len(addrs)) - addr = "216.58.198.206:443" - } - return dialer.DialContext(ctx, network, addr)*/ - //}, - //ForceAttemptHTTP2: true, - } - } - /*http.DefaultTransport = &http3.RoundTripper{ - QuicConfig: &quic.Config{ - MaxIdleTimeout: time.Second * 20, - MaxIncomingStreams: 0, - MaxIncomingUniStreams: 0, - StatelessResetKey: nil, - KeepAlive: false, - }, - }*/ http.DefaultClient.Transport = http.DefaultTransport - fs.Config.Transfers = 20 - fs.Config.ConnectTimeout = time.Second * 2 - fs.Config.Timeout = time.Second * 4 - fs.Config.TPSLimit = 0 - fs.Config.LowLevelRetries = 120 - //fs.Config.NoGzip = false - - // moved to config package - //confPath := flag.String("config", "config.json", "config.json location") - //flag.Parse() - //viper.SetConfigFile(*confPath) - //config.InitConfig() config.PrepareConfig() - config.InitLog() go config.InitProfiling() arrangeTask() diff --git a/utils/file.go b/utils/file.go index 2b3c93e..4993e13 100644 --- a/utils/file.go +++ b/utils/file.go @@ -4,37 +4,10 @@ import ( "context" _ "github.com/rclone/rclone/backend/all" "github.com/rclone/rclone/cmd" - "github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs/operations" "github.com/rclone/rclone/fs/sync" - log "github.com/sirupsen/logrus" - "io" - "time" ) -type FileType int - -const ( - FILE_NORMAL FileType = 0 - FILE_RCLONE FileType = 1 -) - -type FsType int - -const ( - FS_OS FsType = 0 - FS_RCLONE FsType = 1 -) - -type FileSystem struct { - FsType FsType - FsData interface{} -} - -type FsRcloneData struct { - rcloneFs *fs.Fs -} - func MkdirAll(path string) error { fdst := cmd.NewFsDir([]string{path}) err := operations.Mkdir(context.Background(), fdst, "") @@ -48,15 +21,3 @@ func MoveFiles(src string, dst string) error { } return operations.MoveFile(context.Background(), fdst, fsrc, srcFileName, srcFileName) } - -func GetWriter(dst string) io.WriteCloser { - reader, writer := io.Pipe() - fdst, dstFileName := cmd.NewFsDstFile([]string{dst}) - go func() { - _, err := operations.Rcat(context.Background(), fdst, dstFileName, reader, time.Now()) - if err != nil { - log.Warnf("Rcat [%s] Error! err: %s", dst, err) - } - }() - return writer -} diff --git a/utils/shellSupport.go b/utils/shellSupport.go index 2a87161..430680e 100644 --- a/utils/shellSupport.go +++ b/utils/shellSupport.go @@ -1,64 +1,36 @@ package utils import ( - "bufio" "bytes" - log "github.com/sirupsen/logrus" "io" + "log" + "os" "os/exec" ) -func ExecShell(name string, arg ...string) (string, string) { - return ExecShellEx(log.NewEntry(log.StandardLogger()), true, name, arg...) -} - -func ExecShellEx(entry *log.Entry, redirect bool, name string, arg ...string) (string, string) { +func ExecShell(name string, arg ...string) string { var stdoutBuf, stderrBuf bytes.Buffer co := exec.Command(name, arg...) stdoutIn, _ := co.StdoutPipe() stderrIn, _ := co.StderrPipe() - stdout := &stdoutBuf - stderr := &stderrBuf - - //stdout := io.MultiWriter(os.Stdout, &stdoutBuf) - //stderr := io.MultiWriter(os.Stderr, &stderrBuf) - + var errStdout, errStderr error + stdout := io.MultiWriter(os.Stdout, &stdoutBuf) + stderr := io.MultiWriter(os.Stderr, &stderrBuf) _ = co.Start() - if redirect { - go func() { - //_, errStdout = io.Copy(stdout, stdoutIn) - in := bufio.NewScanner(stdoutIn) - for in.Scan() { - stdout.Write(in.Bytes()) - entry.Info(in.Text()) // write each line to your log, or anything you need - } - }() - go func() { - //_, errStderr = io.Copy(stderr, stderrIn) - in := bufio.NewScanner(stderrIn) - for in.Scan() { - stderr.Write(in.Bytes()) - entry.Info(in.Text()) // write each line to your log, or anything you need - } - }() - } else { - var errStdout, errStderr error - go func() { - _, errStdout = io.Copy(stdout, stdoutIn) - }() - go func() { - _, errStderr = io.Copy(stderr, stderrIn) - }() - if errStderr != nil { - entry.Warnf("%v", errStderr) - } - if errStdout != nil { - entry.Warnf("%v", errStdout) - } + go func() { + _, errStdout = io.Copy(stdout, stdoutIn) + }() + go func() { + _, errStderr = io.Copy(stderr, stderrIn) + }() + if errStderr != nil { + log.Printf("%v", errStderr) + } + if errStdout != nil { + log.Printf("%v", errStdout) } - _ = co.Wait() outStr, errStr := string(stdoutBuf.Bytes()), string(stderrBuf.Bytes()) //println(outStr + errStr) - return outStr, errStr + return outStr + errStr }