Skip to content

Commit

Permalink
new api File.Complete() to force immediate completion of download, mo…
Browse files Browse the repository at this point in the history
…ve logging to download manager
  • Loading branch information
MagicalTux committed Oct 25, 2020
1 parent 8fca53e commit 1155eb3
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 20 deletions.
17 changes: 8 additions & 9 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package smartremote
import (
"fmt"
"io"
"log"
"net/http"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -119,7 +118,7 @@ func (dl *dlClient) ReadAt(p []byte, off int64) (int, error) {
req.Header.Set("Range", fmt.Sprintf("bytes=%d-", off))
}

log.Printf("initializing HTTP connection download at byte %d~", off)
dl.dlm.logf("initializing HTTP connection download at byte %d~", off)

// should respond with code 206 Partial Content
resp, err := dl.dlm.Client.Do(req)
Expand Down Expand Up @@ -176,7 +175,7 @@ func (dl *dlClient) idleTaskRun() {
buf := make([]byte, cnt)
n, err := io.ReadFull(dl.reader.Body, buf)
if err != nil && err != io.ErrUnexpectedEOF {
log.Printf("idle read failed: %s", err)
dl.dlm.logf("idle read failed: %s", err)
dl.reader.Body.Close()
dl.reader = nil
}
Expand All @@ -185,7 +184,7 @@ func (dl *dlClient) idleTaskRun() {
// feed it
err = dl.handler.ingestData(buf[:n], rPos)
if err != nil {
log.Printf("idle write failed: %s", err)
dl.dlm.logf("idle write failed: %s", err)
}
return
}
Expand All @@ -211,18 +210,18 @@ func (dl *dlClient) idleTaskRun() {
req.Header.Set("Range", fmt.Sprintf("bytes=%d-", off))
}

log.Printf("idle: initializing HTTP connection download at byte %d~", off)
dl.dlm.logf("idle: initializing HTTP connection download at byte %d~", off)

// should respond with code 206 Partial Content
resp, err := dl.dlm.Client.Do(req)
if err != nil {
log.Printf("idle download failed: %s", err)
dl.dlm.logf("idle download failed: %s", err)
return
}
if resp.StatusCode > 299 {
// that's bad
resp.Body.Close()
log.Printf("idle download failed due to status %s", resp.Status)
dl.dlm.logf("idle download failed due to status %s", resp.Status)
return
}
dl.reader = resp
Expand All @@ -237,7 +236,7 @@ func (dl *dlClient) idleTaskRun() {
buf := make([]byte, cnt)
n, err := io.ReadFull(dl.reader.Body, buf)
if err != nil && err != io.ErrUnexpectedEOF {
log.Printf("idle read failed: %s", err)
dl.dlm.logf("idle read failed: %s", err)
dl.reader.Body.Close()
dl.reader = nil
}
Expand All @@ -246,7 +245,7 @@ func (dl *dlClient) idleTaskRun() {
// feed it (use separate thread to avoid deadlock)
err = dl.handler.ingestData(buf[:n], off)
if err != nil {
log.Printf("idle write failed: %s", err)
dl.dlm.logf("idle write failed: %s", err)
}
return
}
11 changes: 5 additions & 6 deletions download.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"errors"
"fmt"
"io"
"log"
)

func (f *File) downloadFull() error {
Expand Down Expand Up @@ -60,15 +59,15 @@ func (f *File) needBlocks(start, end uint32) error {

// trim start/end based on known downloaded blocks
for {
if f.status.Contains(start) && (start < end) {
if (start < end) && f.status.Contains(start) {
start += 1
} else {
break
}
}

for {
if f.status.Contains(end) && (end > start) {
if (end > start) && f.status.Contains(end) {
end -= 1
} else {
break
Expand All @@ -92,10 +91,10 @@ func (f *File) needBlocks(start, end uint32) error {
n = f.size - posByte
}

//log.Printf("downloading block %d (%d bytes)", start, n)
//f.dlm.logf("downloading block %d (%d bytes)", start, n)
_, err := f.dlm.readUrl(f.url, buf[:n], posByte, f)
if err != nil {
log.Printf("download error: %s", err)
f.dlm.logf("download error: %s", err)
if f.status.IsEmpty() && posByte != 0 {
// this is typically linked to backend refusing to let us do partial download
return f.downloadFull()
Expand All @@ -109,7 +108,7 @@ func (f *File) needBlocks(start, end uint32) error {
_, err = f.local.WriteAt(buf[:n], posByte)
if err != nil {
// failed to write (disk full?)
log.Printf("write error: %s", err)
f.dlm.logf("write error: %s", err)
return err
}

Expand Down
3 changes: 1 addition & 2 deletions factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"crypto/sha256"
"encoding/base64"
"io"
"log"
"os"
"path/filepath"

Expand Down Expand Up @@ -80,7 +79,7 @@ func (dlm *DownloadManager) OpenTo(u, localPath string) (*File, error) {
}
err = f.readPart()
if err != nil {
log.Printf("failed to resume download: %s", err)
dlm.logf("failed to resume download: %s", err)
// truncate
fp.Truncate(0)
}
Expand Down
5 changes: 2 additions & 3 deletions idle.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package smartremote

import (
"errors"
"log"
)

func (f *File) wantsFollowing(offset int64) int {
Expand Down Expand Up @@ -41,7 +40,7 @@ func (f *File) isComplete() bool {
return false
}

log.Printf("idle: file is now complete, marking as such")
f.dlm.logf("file is now complete, marking as such")

f.complete = true
f.savePart()
Expand All @@ -56,7 +55,7 @@ func (f *File) firstMissing() int64 {

err := f.getSize()
if err != nil {
log.Printf("failed to get file size: %s", err)
f.dlm.logf("failed to get file size: %s", err)
return -1 // can't be helped
}

Expand Down
10 changes: 10 additions & 0 deletions manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package smartremote

import (
"io"
"log"
"net/http"
"os"
"sync"
Expand Down Expand Up @@ -32,6 +33,8 @@ type DownloadManager struct {

openFiles map[[32]byte]*File
openFilesLk sync.RWMutex

*log.Logger
}

type dlReaderAt struct {
Expand All @@ -50,6 +53,7 @@ func NewDownloadManager() *DownloadManager {
clients: make(map[string]*dlClient),
openFiles: make(map[[32]byte]*File),
idleTrigger: make(chan struct{}),
Logger: log.New(os.Stderr, "", log.LstdFlags),
}
dl.cd = sync.NewCond(&dl.mapLock)

Expand Down Expand Up @@ -176,3 +180,9 @@ func (dl *DownloadManager) internalReap() {
}
}
}

func (dl *DownloadManager) logf(format string, args ...interface{}) {
if dl.Logger != nil {
dl.Logger.Printf(format, args...)
}
}
34 changes: 34 additions & 0 deletions read.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,3 +184,37 @@ func (f *File) readAt(p []byte, off int64) (int, error) {
// let the OS handle the rest
return f.local.ReadAt(p, off)
}

// Complete will download the whole file locally, returning errors in case of
// failure.
func (f *File) Complete() error {
// read data
f.lk.Lock()
defer f.lk.Unlock()

if f.complete {
// file is complete, no need to do anything
return nil
}

// get size
err := f.getSize()
if err != nil {
return err
}

blkCount := uint32(f.getBlockCount())

// find out first missing blocks
for i := uint32(0); i < blkCount; i++ {
if !f.status.Contains(uint32(i)) {
err = f.needBlocks(i, i)
if err != nil {
return err
}
}
}
f.isComplete()

return nil
}

0 comments on commit 1155eb3

Please sign in to comment.