Skip to content

Commit

Permalink
fix locks in some cases, add beginning of idle downloader code (wip)
Browse files Browse the repository at this point in the history
  • Loading branch information
MagicalTux committed Jan 13, 2020
1 parent 3ce840e commit 0c9a09d
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 11 deletions.
5 changes: 4 additions & 1 deletion close.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ import "os"
// Close will close the file and make sure data is synced on the disk if the
// download is still partial.
func (f *File) Close() error {
err := f.SavePart()
f.lk.Lock()
defer f.lk.Unlock()

err := f.savePart()

f.dlm.openFilesLk.Lock()
delete(f.dlm.openFiles, f.hash)
Expand Down
12 changes: 6 additions & 6 deletions download.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (f *File) downloadFull() error {
if err != nil {
// ok that's a big failure
f.status.Clear() // because likely corrupt
f.SavePart()
f.savePart()
return err
}

Expand All @@ -41,7 +41,7 @@ func (f *File) downloadFull() error {
f.complete = true
f.hasSize = true
f.size = n
f.SavePart()
f.savePart()

return nil
}
Expand Down Expand Up @@ -81,7 +81,7 @@ func (f *File) needBlocks(start, end uint32) error {
}

posByte := int64(start) * f.blkSize
f.local.Seek(posByte, io.SeekStart)
//f.local.Seek(posByte, io.SeekStart)
buf := make([]byte, f.blkSize)

for start <= end {
Expand All @@ -93,7 +93,7 @@ func (f *File) needBlocks(start, end uint32) error {
}

//log.Printf("downloading block %d (%d bytes)", start, n)
_, err := f.dlm.readUrl(f.url, buf[:n], posByte)
_, err := f.dlm.readUrl(f.url, buf[:n], posByte, f)
if err != nil {
log.Printf("download error: %s", err)
if f.status.IsEmpty() && posByte != 0 {
Expand All @@ -106,7 +106,7 @@ func (f *File) needBlocks(start, end uint32) error {
return err
}

_, err = f.local.Write(buf[:n])
_, err = f.local.WriteAt(buf[:n], posByte)
if err != nil {
// failed to write (disk full?)
log.Printf("write error: %s", err)
Expand All @@ -121,7 +121,7 @@ func (f *File) needBlocks(start, end uint32) error {
posByte += f.blkSize
}

f.SavePart()
f.savePart()

return nil
}
102 changes: 102 additions & 0 deletions idle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package smartremote

import (
"errors"
"log"
)

func (f *File) WantsFollowing(offset int64) int {
f.lk.Lock()
defer f.lk.Unlock()

block := offset / f.blkSize
if block*f.blkSize != offset {
// wrong offset (not block aligned), don't care
return 0
}

if f.status.Contains(uint32(block)) {
return 0
}

return int(f.blkSize)
}

func (f *File) FirstMissing() int64 {
f.lk.Lock()
defer f.lk.Unlock()

if f.complete {
// nothing to download
return -1
}

if f.status.IsEmpty() {
// everything to download
return 0
}

err := f.getSize()
if err != nil {
log.Printf("failed to get file size: %s", err)
return -1 // can't be helped
}

// computer number of blocks
blkCount := f.size / f.blkSize
if f.size%f.blkSize != 0 {
blkCount += 1
}

if int64(f.status.GetCardinality()) == blkCount {
// we already have all blocks
f.complete = true
f.savePart()
return -1
}

// find out first missing block
// TODO this can probably be optimized, roaring api may have something
for i := int64(0); i < blkCount; i++ {
if !f.status.Contains(uint32(i)) {
return i * f.blkSize
}
}

// ?????
// did we have more blocks in status than we need? did file size change? this sounds like everything is likely corrupted...
return -1
}

func (f *File) IngestData(b []byte, offset int64) error {
f.lk.Lock()
defer f.lk.Unlock()

return f.feed(b, offset)
}

func (f *File) feed(b []byte, offset int64) error {
block := offset / f.blkSize
if block*f.blkSize != offset {
return errors.New("invalid offset (not block aligned)")
}
count := int64(len(b)) / f.blkSize
if f.blkSize*count != int64(len(b)) {
if count > 1 {
// trim
b = b[:count*f.blkSize]
} else {
return errors.New("invalid buffer length (not block aligned)")
}
}

_, err := f.local.WriteAt(b, offset)
if err != nil {
return err
}

// mark blocks as received
f.status.AddRange(uint64(block), uint64(block+count)) // [rangeStart, rangeEnd)

return nil
}
16 changes: 12 additions & 4 deletions manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ import (
"time"
)

type DownloadTarget interface {
WantsFollowing(offset int64) int // returns >0 if the bytes starting at offset are useful
FirstMissing() int64 // returns first missing byte, or -1 if file is complete
IngestData(b []byte, offset int64) error // stores data received
}

type DownloadManager struct {
// MaxConcurrent is the maximum number of concurrent downloads.
// changing it might not be effective immediately. Default is 10
Expand Down Expand Up @@ -38,6 +44,7 @@ type dlClient struct {
dlm *DownloadManager
url string
taskCnt uintptr // currently running/pending tasks
handler DownloadTarget

reader *http.Response
rPos int64 // in bytes
Expand Down Expand Up @@ -73,17 +80,17 @@ func (dl *DownloadManager) For(u string) io.ReaderAt {
}

func (dlr *dlReaderAt) ReadAt(p []byte, off int64) (int, error) {
return dlr.dl.readUrl(dlr.url, p, off)
return dlr.dl.readUrl(dlr.url, p, off, nil)
}

func (dlm *DownloadManager) readUrl(url string, p []byte, off int64) (int, error) {
dl := dlm.getClient(url)
func (dlm *DownloadManager) readUrl(url string, p []byte, off int64, handler DownloadTarget) (int, error) {
dl := dlm.getClient(url, handler)
defer atomic.AddUintptr(&dl.taskCnt, ^uintptr(0))

return dl.ReadAt(p, off)
}

func (dl *DownloadManager) getClient(u string) *dlClient {
func (dl *DownloadManager) getClient(u string, handler DownloadTarget) *dlClient {
dl.mapLock.Lock()

for {
Expand All @@ -109,6 +116,7 @@ func (dl *DownloadManager) getClient(u string) *dlClient {
url: u,
taskCnt: 1, // pre-init at 1 to avoid reap
expire: time.Now().Add(300 * time.Second),
handler: handler,
}
dl.clients[u] = cl
dl.mapLock.Unlock()
Expand Down
7 changes: 7 additions & 0 deletions part.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ import (
)

func (f *File) SavePart() error {
f.lk.Lock()
defer f.lk.Unlock()

return f.savePart()
}

func (f *File) savePart() error {
// save partial file
if f.complete {
// remove part file if any
Expand Down

0 comments on commit 0c9a09d

Please sign in to comment.