Commit c9c1cc0

feed dropped data so it is not lost

MagicalTux committed Jan 13, 2020
1 parent 0c9a09d commit c9c1cc0

Showing 2 changed files with 57 additions and 2 deletions.
idle.go: 4 additions & 0 deletions
@@ -100,3 +100,7 @@ func (f *File) feed(b []byte, offset int64) error {
 
 	return nil
 }
+
+func (f *File) getBlockSize() int64 {
+	return f.blkSize
+}
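
With feed() already defined on *File and getBlockSize() added by this commit, *File now satisfies the unexported downloadFeed interface introduced in manager.go below. A compile-time assertion of that fact, a common Go idiom shown here only for illustration (it is not part of the commit):

var _ downloadFeed = (*File)(nil) // fails to compile if *File stops implementing downloadFeed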
manager.go: 53 additions & 2 deletions
@@ -17,6 +17,13 @@ type DownloadTarget interface {
 	IngestData(b []byte, offset int64) error // stores data received
 }
 
+type downloadFeed interface {
+	// feed can only be called during a download, for locking reasons;
+	// it does the same as IngestData() but without taking the lock, which is already held during the download
+	feed(b []byte, offset int64) error
+	getBlockSize() int64
+}
+
 type DownloadManager struct {
 	// MaxConcurrent is the maximum number of concurrent downloads.
 	// changing it might not be effective immediately. Default is 10
@@ -64,7 +71,7 @@ func NewDownloadManager() *DownloadManager {
 		MaxConcurrent: 10,
 		Client:        http.DefaultClient,
 		TmpDir:        os.TempDir(),
-		MaxDataJump:   131072,
+		MaxDataJump:   512 * 1024, // 512kB
 		clients:       make(map[string]*dlClient),
 		openFiles:     make(map[[32]byte]*File),
 	}
@@ -187,6 +194,50 @@ func (dl *dlClient) Close() error {
 	return nil
 }
 
+func (dl *dlClient) dropDataCount(cnt, startPos int64) error {
+	feeder, ok := dl.handler.(downloadFeed)
+	if !ok {
+		_, err := io.CopyN(nullWriter{}, dl.reader.Body, cnt)
+		return err
+	}
+
+	// download data in block-sized buffers
+	sz := feeder.getBlockSize()
+	if sz <= 0 || cnt < sz {
+		// the handler doesn't want data, or there is too little to be worth keeping
+		_, err := io.CopyN(nullWriter{}, dl.reader.Body, cnt)
+		return err
+	}
+
+	buf := make([]byte, sz)
+
+	for cnt > 0 {
+		if cnt < sz {
+			// not enough left for a full block; discard the remainder
+			_, err := io.CopyN(nullWriter{}, dl.reader.Body, cnt)
+			return err
+		}
+
+		_, err := io.ReadFull(dl.reader.Body, buf)
+		if err != nil {
+			return err
+		}
+
+		cnt -= sz
+
+		err = feeder.feed(buf, startPos)
+		if err != nil {
+			// feeding failed; give up and discard the rest
+			_, err := io.CopyN(nullWriter{}, dl.reader.Body, cnt)
+			return err
+		}
+
+		startPos += sz
+	}
+
+	return nil
+}
+
 func (dl *dlClient) ReadAt(p []byte, off int64) (int, error) {
 	dl.lk.Lock()
 	defer dl.lk.Unlock()
@@ -201,7 +252,7 @@ func (dl *dlClient) ReadAt(p []byte, off int64) (int, error) {
 	} else if dl.rPos < off {
 		if off-dl.rPos < dl.dlm.MaxDataJump {
 			// drop that amount of data to move rPos forward
-			_, err := io.CopyN(nullWriter{}, dl.reader.Body, off-dl.rPos)
+			err := dl.dropDataCount(off-dl.rPos, dl.rPos)
 			if err != nil {
 				// failed, drop connection & retry
 				dl.reader.Body.Close()
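Both dropDataCount and ReadAt rely on a nullWriter type defined elsewhere in the package and not shown in this diff; presumably it is a trivial io.Writer that discards its input, filling the same role as the standard library's io.Discard. A sketch of its assumed shape:

// assumed shape of the package's nullWriter (not part of this diff)
type nullWriter struct{}

func (nullWriter) Write(p []byte) (int, error) {
	return len(p), nil // report success, discard everything
}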

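Net effect of the commit: when ReadAt has to jump forward by less than MaxDataJump on an open connection, the skipped bytes are no longer thrown away; a handler that implements downloadFeed receives them in block-sized chunks, and only the sub-block remainder is discarded. A self-contained sketch of that chunking logic, using a hypothetical in-memory feeder with a 64-byte block size (not part of the commit):

package main

import (
	"bytes"
	"fmt"
	"io"
)

// memFeed is a stand-in for the real handler; block size and behavior are assumptions.
type memFeed struct{ fed int64 }

func (m *memFeed) feed(b []byte, offset int64) error { m.fed += int64(len(b)); return nil }
func (m *memFeed) getBlockSize() int64               { return 64 }

func main() {
	body := bytes.NewReader(make([]byte, 300)) // pretend 300 bytes must be skipped
	m := &memFeed{}

	cnt, startPos := int64(300), int64(0)
	sz := m.getBlockSize()
	buf := make([]byte, sz)

	for cnt > 0 {
		if cnt < sz {
			// remainder smaller than a block: discard it, as dropDataCount does
			if _, err := io.CopyN(io.Discard, body, cnt); err != nil {
				panic(err)
			}
			break
		}
		if _, err := io.ReadFull(body, buf); err != nil {
			panic(err)
		}
		cnt -= sz
		if err := m.feed(buf, startPos); err != nil {
			panic(err)
		}
		startPos += sz
	}

	fmt.Printf("fed %d bytes in full blocks, discarded %d bytes\n", m.fed, 300-m.fed)
	// prints: fed 256 bytes in full blocks, discarded 44 bytes
}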