From d42772f180d44f08640a1576154dca43918a2aaf Mon Sep 17 00:00:00 2001 From: Mark Karpeles Date: Mon, 13 Jan 2020 12:12:20 +0900 Subject: [PATCH] manager: continue downloading when idle --- manager.go | 130 ++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 118 insertions(+), 12 deletions(-) diff --git a/manager.go b/manager.go index 601b6f0..5de4cd2 100644 --- a/manager.go +++ b/manager.go @@ -42,6 +42,7 @@ type DownloadManager struct { clients map[string]*dlClient mapLock sync.Mutex cd *sync.Cond + idleCnt uintptr openFiles map[[32]byte]*File openFilesLk sync.RWMutex @@ -133,15 +134,22 @@ func (dl *DownloadManager) getClient(u string, handler DownloadTarget) *dlClient func (dlm *DownloadManager) managerTask() { for { - time.Sleep(10 * time.Second) - - dlm.intervalReap() + if dlm.intervalProcess() { + time.Sleep(10 * time.Second) + } else { + time.Sleep(1 * time.Second) + } } } -func (dlm *DownloadManager) intervalReap() { +func (dlm *DownloadManager) intervalProcess() bool { dlm.mapLock.Lock() defer dlm.mapLock.Unlock() + + if len(dlm.clients) == 0 { + return true // idle + } + change := false now := time.Now() @@ -152,31 +160,129 @@ func (dlm *DownloadManager) intervalReap() { if cl.expire.Before(now) { delete(dlm.clients, u) - cl.Close() + go cl.Close() // let close run in thread so we don't get locked change = true } + + if cl.handler == nil { + continue + } + + if atomic.LoadUintptr(&dlm.idleCnt) == 0 { + atomic.AddUintptr(&dlm.idleCnt, 1) + atomic.AddUintptr(&cl.taskCnt, 1) + go cl.idleTaskRun() + } } if change { dlm.cd.Broadcast() } + return false +} + +func (dl *dlClient) idleTaskRun() { + // this is run in a separate process + defer atomic.AddUintptr(&dl.taskCnt, ^uintptr(0)) + defer atomic.AddUintptr(&dl.dlm.idleCnt, ^uintptr(0)) + + // increase timer now to avoid deletion + dl.expire = time.Now().Add(time.Minute) + + dl.lk.Lock() + defer dl.lk.Unlock() + + if dl.reader != nil { + cnt := dl.handler.WantsFollowing(dl.rPos) + if cnt > 0 { + rPos := dl.rPos + // let's just read this from existing reader + buf := make([]byte, cnt) + n, err := io.ReadFull(dl.reader.Body, buf) + if err != nil && err != io.ErrUnexpectedEOF { + log.Printf("idle read failed: %s", err) + dl.reader.Body.Close() + dl.reader = nil + } + dl.rPos += int64(n) + + // feed it + err = dl.handler.IngestData(buf[:n], rPos) + if err != nil { + log.Printf("idle write failed: %s", err) + } + return + } + + dl.reader.Body.Close() + dl.reader = nil + } + + // let's just ask where to start + off := dl.handler.FirstMissing() + if off < 0 { + // do not download + return + } + + // spawn a new reader + req, err := http.NewRequest("GET", dl.url, nil) + + if off != 0 { + req.Header.Set("Range", fmt.Sprintf("bytes=%d-", off)) + } + + log.Printf("idle: initializing HTTP connection download at byte %d~", off) + + // should respond with code 206 Partial Content + resp, err := dl.dlm.Client.Do(req) + if err != nil { + log.Printf("idle download failed: %s", err) + return + } + if resp.StatusCode > 299 { + // that's bad + resp.Body.Close() + log.Printf("idle download failed due to status %s", resp.Status) + return + } + dl.reader = resp + dl.rPos = off + + cnt := dl.handler.WantsFollowing(off) + if cnt <= 0 { + // why? + return + } + + buf := make([]byte, cnt) + n, err := io.ReadFull(dl.reader.Body, buf) + if err != nil && err != io.ErrUnexpectedEOF { + log.Printf("idle read failed: %s", err) + dl.reader.Body.Close() + dl.reader = nil + } + dl.rPos += int64(n) + + // feed it + err = dl.handler.IngestData(buf[:n], off) + if err != nil { + log.Printf("idle write failed: %s", err) + } + return } func (dl *DownloadManager) internalReap() { - upd := false - // reap (lock already acquired by caller) + // attempt to reap at least one idle client + // (lock already acquired by caller) for u, cl := range dl.clients { if atomic.LoadUintptr(&cl.taskCnt) == 0 { // can reap this delete(dl.clients, u) cl.Close() - upd = true + break } } - - if upd { - dl.cd.Broadcast() - } } func (dl *dlClient) Close() error {