Skip to content

Commit

Permalink
manager: continue downloading when idle
Browse files Browse the repository at this point in the history
  • Loading branch information
MagicalTux committed Jan 13, 2020
1 parent c9c1cc0 commit d42772f
Showing 1 changed file with 118 additions and 12 deletions.
130 changes: 118 additions & 12 deletions manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type DownloadManager struct {
clients map[string]*dlClient
mapLock sync.Mutex
cd *sync.Cond
idleCnt uintptr

openFiles map[[32]byte]*File
openFilesLk sync.RWMutex
Expand Down Expand Up @@ -133,15 +134,22 @@ func (dl *DownloadManager) getClient(u string, handler DownloadTarget) *dlClient

func (dlm *DownloadManager) managerTask() {
for {
time.Sleep(10 * time.Second)

dlm.intervalReap()
if dlm.intervalProcess() {
time.Sleep(10 * time.Second)
} else {
time.Sleep(1 * time.Second)
}
}
}

func (dlm *DownloadManager) intervalReap() {
func (dlm *DownloadManager) intervalProcess() bool {
dlm.mapLock.Lock()
defer dlm.mapLock.Unlock()

if len(dlm.clients) == 0 {
return true // idle
}

change := false
now := time.Now()

Expand All @@ -152,31 +160,129 @@ func (dlm *DownloadManager) intervalReap() {

if cl.expire.Before(now) {
delete(dlm.clients, u)
cl.Close()
go cl.Close() // let close run in thread so we don't get locked
change = true
}

if cl.handler == nil {
continue
}

if atomic.LoadUintptr(&dlm.idleCnt) == 0 {
atomic.AddUintptr(&dlm.idleCnt, 1)
atomic.AddUintptr(&cl.taskCnt, 1)
go cl.idleTaskRun()
}
}

if change {
dlm.cd.Broadcast()
}
return false
}

func (dl *dlClient) idleTaskRun() {
// this is run in a separate process
defer atomic.AddUintptr(&dl.taskCnt, ^uintptr(0))
defer atomic.AddUintptr(&dl.dlm.idleCnt, ^uintptr(0))

// increase timer now to avoid deletion
dl.expire = time.Now().Add(time.Minute)

dl.lk.Lock()
defer dl.lk.Unlock()

if dl.reader != nil {
cnt := dl.handler.WantsFollowing(dl.rPos)
if cnt > 0 {
rPos := dl.rPos
// let's just read this from existing reader
buf := make([]byte, cnt)
n, err := io.ReadFull(dl.reader.Body, buf)
if err != nil && err != io.ErrUnexpectedEOF {
log.Printf("idle read failed: %s", err)
dl.reader.Body.Close()
dl.reader = nil
}
dl.rPos += int64(n)

// feed it
err = dl.handler.IngestData(buf[:n], rPos)
if err != nil {
log.Printf("idle write failed: %s", err)
}
return
}

dl.reader.Body.Close()
dl.reader = nil
}

// let's just ask where to start
off := dl.handler.FirstMissing()
if off < 0 {
// do not download
return
}

// spawn a new reader
req, err := http.NewRequest("GET", dl.url, nil)

if off != 0 {
req.Header.Set("Range", fmt.Sprintf("bytes=%d-", off))
}

log.Printf("idle: initializing HTTP connection download at byte %d~", off)

// should respond with code 206 Partial Content
resp, err := dl.dlm.Client.Do(req)
if err != nil {
log.Printf("idle download failed: %s", err)
return
}
if resp.StatusCode > 299 {
// that's bad
resp.Body.Close()
log.Printf("idle download failed due to status %s", resp.Status)
return
}
dl.reader = resp
dl.rPos = off

cnt := dl.handler.WantsFollowing(off)
if cnt <= 0 {
// why?
return
}

buf := make([]byte, cnt)
n, err := io.ReadFull(dl.reader.Body, buf)
if err != nil && err != io.ErrUnexpectedEOF {
log.Printf("idle read failed: %s", err)
dl.reader.Body.Close()
dl.reader = nil
}
dl.rPos += int64(n)

// feed it
err = dl.handler.IngestData(buf[:n], off)
if err != nil {
log.Printf("idle write failed: %s", err)
}
return
}

func (dl *DownloadManager) internalReap() {
upd := false
// reap (lock already acquired by caller)
// attempt to reap at least one idle client
// (lock already acquired by caller)
for u, cl := range dl.clients {
if atomic.LoadUintptr(&cl.taskCnt) == 0 {
// can reap this
delete(dl.clients, u)
cl.Close()
upd = true
break
}
}

if upd {
dl.cd.Broadcast()
}
}

func (dl *dlClient) Close() error {
Expand Down

0 comments on commit d42772f

Please sign in to comment.