Skip to content

Commit

Permalink
Fix storage and torrent reader. (#101)
Browse files Browse the repository at this point in the history
  • Loading branch information
ajnavarro committed Dec 1, 2021
1 parent 0b89fda commit c51c88e
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 58 deletions.
5 changes: 1 addition & 4 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,6 @@ jobs:
shell: bash
run: |
make build
if: matrix.os != 'windows-latest' # Remove windows builds. Difficult to maintain.

- name: Cross-compile
shell: bash
run: |
Expand All @@ -133,8 +131,7 @@ jobs:
if-no-files-found: error
name: build-${{ matrix.job_name }}
path: bin/*
if: matrix.os != 'windows-latest' # Remove windows builds. Difficult to maintain.


- name: Release
uses: softprops/action-gh-release@v1
if: startsWith(github.ref, 'refs/tags/')
Expand Down
16 changes: 10 additions & 6 deletions fs/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,17 +127,21 @@ func (s *storage) createParent(p string, f File) error {
func (s *storage) Children(path string) (map[string]File, error) {
path = clean(path)

l := make(map[string]File)
for n, f := range s.children[path] {
l[n] = f
files, err := s.getDirFromFs(path)
if err == nil {
return files, nil
}

if _, ok := s.children[path]; ok {
return l, nil
if !os.IsNotExist(err) {
return nil, err
}

return s.getDirFromFs(path)
l := make(map[string]File)
for n, f := range s.children[path] {
l[n] = f
}

return l, nil
}

func (s *storage) Get(path string) (File, error) {
Expand Down
4 changes: 2 additions & 2 deletions fs/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ func TestStorage(t *testing.T) {
require.Equal(&Dummy{}, file)

files, err = s.Children("/path/special_file.test")
require.Error(err)
require.Nil(files)
require.NoError(err)
require.NotNil(files)

files, err = s.Children("/path/special_file.test/dir/here")
require.NoError(err)
Expand Down
128 changes: 86 additions & 42 deletions fs/torrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"sync"
"time"

"github.com/anacrolix/missinggo/v2"
"github.com/anacrolix/torrent"
"github.com/distribyted/distribyted/iio"
)

var _ Filesystem = &Torrent{}
Expand Down Expand Up @@ -56,9 +58,9 @@ func (fs *Torrent) load() {
<-t.GotInfo()
for _, file := range t.Files() {
fs.s.Add(&torrentFile{
reader: file.NewReader(),
len: file.Length(),
timeout: fs.readTimeout,
readerFunc: file.NewReader,
len: file.Length(),
timeout: fs.readTimeout,
}, file.Path())
}
}
Expand All @@ -76,55 +78,36 @@ func (fs *Torrent) ReadDir(path string) (map[string]File, error) {
return fs.s.Children(path)
}

var _ File = &torrentFile{}

type torrentFile struct {
mu sync.Mutex

reader torrent.Reader
len int64

timeout int
type reader interface {
iio.Reader
missinggo.ReadContexter
}

func (d *torrentFile) Size() int64 {
return d.len
}
type readAtWrapper struct {
timeout int
mu sync.Mutex

func (d *torrentFile) IsDir() bool {
return false
torrent.Reader
io.ReaderAt
io.Closer
}

func (d *torrentFile) Close() error {
return d.reader.Close()
func newReadAtWrapper(r torrent.Reader, timeout int) reader {
return &readAtWrapper{Reader: r, timeout: timeout}
}

func (d *torrentFile) Read(p []byte) (n int, err error) {
ctx, cancel := context.WithCancel(context.Background())
timer := time.AfterFunc(
time.Duration(d.timeout)*time.Second,
func() {
cancel()
},
)

defer timer.Stop()

return d.reader.ReadContext(ctx, p)
}

func (d *torrentFile) ReadAt(p []byte, off int64) (int, error) {
d.mu.Lock()
defer d.mu.Unlock()
_, err := d.reader.Seek(off, io.SeekStart)
func (rw *readAtWrapper) ReadAt(p []byte, off int64) (int, error) {
rw.mu.Lock()
defer rw.mu.Unlock()
_, err := rw.Seek(off, io.SeekStart)
if err != nil {
return 0, err
}
i, err := d.readAtLeast(p, len(p))
return i, err

return readAtLeast(rw, rw.timeout, p, len(p))
}

func (d *torrentFile) readAtLeast(buf []byte, min int) (n int, err error) {
func readAtLeast(r missinggo.ReadContexter, timeout int, buf []byte, min int) (n int, err error) {
if len(buf) < min {
return 0, io.ErrShortBuffer
}
Expand All @@ -133,13 +116,13 @@ func (d *torrentFile) readAtLeast(buf []byte, min int) (n int, err error) {

ctx, cancel := context.WithCancel(context.Background())
timer := time.AfterFunc(
time.Duration(d.timeout)*time.Second,
time.Duration(timeout)*time.Second,
func() {
cancel()
},
)

nn, err = d.reader.ReadContext(ctx, buf[n:])
nn, err = r.ReadContext(ctx, buf[n:])
n += nn

timer.Stop()
Expand All @@ -151,3 +134,64 @@ func (d *torrentFile) readAtLeast(buf []byte, min int) (n int, err error) {
}
return
}

func (rw *readAtWrapper) Close() error {
rw.mu.Lock()
defer rw.mu.Unlock()
return rw.Reader.Close()
}

var _ File = &torrentFile{}

type torrentFile struct {
readerFunc func() torrent.Reader
reader reader
len int64
timeout int
}

func (d *torrentFile) load() {
if d.reader != nil {
return
}
d.reader = newReadAtWrapper(d.readerFunc(), d.timeout)
}

func (d *torrentFile) Size() int64 {
return d.len
}

func (d *torrentFile) IsDir() bool {
return false
}

func (d *torrentFile) Close() error {
var err error
if d.reader != nil {
err = d.reader.Close()
}

d.reader = nil

return err
}

func (d *torrentFile) Read(p []byte) (n int, err error) {
d.load()
ctx, cancel := context.WithCancel(context.Background())
timer := time.AfterFunc(
time.Duration(d.timeout)*time.Second,
func() {
cancel()
},
)

defer timer.Stop()

return d.reader.ReadContext(ctx, p)
}

func (d *torrentFile) ReadAt(p []byte, off int64) (n int, err error) {
d.load()
return d.reader.ReadAt(p, off)
}
34 changes: 30 additions & 4 deletions fs/torrent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func TestTorrentFilesystem(t *testing.T) {
require.NoError(f.Close())
}

func TestReadAtWrapper(t *testing.T) {
func TestReadAtTorrent(t *testing.T) {
require := require.New(t)

to, err := Cli.AddMagnet(testMagnet)
Expand All @@ -91,9 +91,9 @@ func TestReadAtWrapper(t *testing.T) {
torrFile := to.Files()[0]

tf := torrentFile{
reader: torrFile.NewReader(),
len: torrFile.Length(),
timeout: 500,
readerFunc: torrFile.NewReader,
len: torrFile.Length(),
timeout: 500,
}

defer tf.Close()
Expand All @@ -109,3 +109,29 @@ func TestReadAtWrapper(t *testing.T) {
require.Equal(5, n)
require.Equal([]byte{0x49, 0x44, 0x33, 0x3, 0x0}, toRead)
}

func TestReadAtWrapper(t *testing.T) {
t.Parallel()

require := require.New(t)

to, err := Cli.AddMagnet(testMagnet)
require.NoError(err)

<-to.GotInfo()
torrFile := to.Files()[0]

r := newReadAtWrapper(torrFile.NewReader(), 10)
defer r.Close()

toRead := make([]byte, 5)
n, err := r.ReadAt(toRead, 6)
require.NoError(err)
require.Equal(5, n)
require.Equal([]byte{0x0, 0x0, 0x1f, 0x76, 0x54}, toRead)

n, err = r.ReadAt(toRead, 0)
require.NoError(err)
require.Equal(5, n)
require.Equal([]byte{0x49, 0x44, 0x33, 0x3, 0x0}, toRead)
}

0 comments on commit c51c88e

Please sign in to comment.