From c51c88e51141ff1ae343b0dea8ddfc9bb250b696 Mon Sep 17 00:00:00 2001 From: Antonio Navarro Perez Date: Wed, 1 Dec 2021 19:59:21 +0100 Subject: [PATCH] Fix storage and torrent reader. (#101) --- .github/workflows/build.yaml | 5 +- fs/storage.go | 16 +++-- fs/storage_test.go | 4 +- fs/torrent.go | 128 +++++++++++++++++++++++------------ fs/torrent_test.go | 34 ++++++++-- 5 files changed, 129 insertions(+), 58 deletions(-) diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 9092ce2..c570e4f 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -119,8 +119,6 @@ jobs: shell: bash run: | make build - if: matrix.os != 'windows-latest' # Remove windows builds. Difficult to maintain. - - name: Cross-compile shell: bash run: | @@ -133,8 +131,7 @@ jobs: if-no-files-found: error name: build-${{ matrix.job_name }} path: bin/* - if: matrix.os != 'windows-latest' # Remove windows builds. Difficult to maintain. - + - name: Release uses: softprops/action-gh-release@v1 if: startsWith(github.ref, 'refs/tags/') diff --git a/fs/storage.go b/fs/storage.go index e0134dc..9271b23 100644 --- a/fs/storage.go +++ b/fs/storage.go @@ -127,17 +127,21 @@ func (s *storage) createParent(p string, f File) error { func (s *storage) Children(path string) (map[string]File, error) { path = clean(path) - l := make(map[string]File) - for n, f := range s.children[path] { - l[n] = f + files, err := s.getDirFromFs(path) + if err == nil { + return files, nil } - if _, ok := s.children[path]; ok { - return l, nil + if !os.IsNotExist(err) { + return nil, err } - return s.getDirFromFs(path) + l := make(map[string]File) + for n, f := range s.children[path] { + l[n] = f + } + return l, nil } func (s *storage) Get(path string) (File, error) { diff --git a/fs/storage_test.go b/fs/storage_test.go index 1038d5f..38d4068 100644 --- a/fs/storage_test.go +++ b/fs/storage_test.go @@ -73,8 +73,8 @@ func TestStorage(t *testing.T) { require.Equal(&Dummy{}, file) files, err = s.Children("/path/special_file.test") - require.Error(err) - require.Nil(files) + require.NoError(err) + require.NotNil(files) files, err = s.Children("/path/special_file.test/dir/here") require.NoError(err) diff --git a/fs/torrent.go b/fs/torrent.go index e66f796..33ebd7e 100644 --- a/fs/torrent.go +++ b/fs/torrent.go @@ -6,7 +6,9 @@ import ( "sync" "time" + "github.com/anacrolix/missinggo/v2" "github.com/anacrolix/torrent" + "github.com/distribyted/distribyted/iio" ) var _ Filesystem = &Torrent{} @@ -56,9 +58,9 @@ func (fs *Torrent) load() { <-t.GotInfo() for _, file := range t.Files() { fs.s.Add(&torrentFile{ - reader: file.NewReader(), - len: file.Length(), - timeout: fs.readTimeout, + readerFunc: file.NewReader, + len: file.Length(), + timeout: fs.readTimeout, }, file.Path()) } } @@ -76,55 +78,36 @@ func (fs *Torrent) ReadDir(path string) (map[string]File, error) { return fs.s.Children(path) } -var _ File = &torrentFile{} - -type torrentFile struct { - mu sync.Mutex - - reader torrent.Reader - len int64 - - timeout int +type reader interface { + iio.Reader + missinggo.ReadContexter } -func (d *torrentFile) Size() int64 { - return d.len -} +type readAtWrapper struct { + timeout int + mu sync.Mutex -func (d *torrentFile) IsDir() bool { - return false + torrent.Reader + io.ReaderAt + io.Closer } -func (d *torrentFile) Close() error { - return d.reader.Close() +func newReadAtWrapper(r torrent.Reader, timeout int) reader { + return &readAtWrapper{Reader: r, timeout: timeout} } -func (d *torrentFile) Read(p []byte) (n int, err error) { - ctx, cancel := context.WithCancel(context.Background()) - timer := time.AfterFunc( - time.Duration(d.timeout)*time.Second, - func() { - cancel() - }, - ) - - defer timer.Stop() - - return d.reader.ReadContext(ctx, p) -} - -func (d *torrentFile) ReadAt(p []byte, off int64) (int, error) { - d.mu.Lock() - defer d.mu.Unlock() - _, err := d.reader.Seek(off, io.SeekStart) +func (rw *readAtWrapper) ReadAt(p []byte, off int64) (int, error) { + rw.mu.Lock() + defer rw.mu.Unlock() + _, err := rw.Seek(off, io.SeekStart) if err != nil { return 0, err } - i, err := d.readAtLeast(p, len(p)) - return i, err + + return readAtLeast(rw, rw.timeout, p, len(p)) } -func (d *torrentFile) readAtLeast(buf []byte, min int) (n int, err error) { +func readAtLeast(r missinggo.ReadContexter, timeout int, buf []byte, min int) (n int, err error) { if len(buf) < min { return 0, io.ErrShortBuffer } @@ -133,13 +116,13 @@ func (d *torrentFile) readAtLeast(buf []byte, min int) (n int, err error) { ctx, cancel := context.WithCancel(context.Background()) timer := time.AfterFunc( - time.Duration(d.timeout)*time.Second, + time.Duration(timeout)*time.Second, func() { cancel() }, ) - nn, err = d.reader.ReadContext(ctx, buf[n:]) + nn, err = r.ReadContext(ctx, buf[n:]) n += nn timer.Stop() @@ -151,3 +134,64 @@ func (d *torrentFile) readAtLeast(buf []byte, min int) (n int, err error) { } return } + +func (rw *readAtWrapper) Close() error { + rw.mu.Lock() + defer rw.mu.Unlock() + return rw.Reader.Close() +} + +var _ File = &torrentFile{} + +type torrentFile struct { + readerFunc func() torrent.Reader + reader reader + len int64 + timeout int +} + +func (d *torrentFile) load() { + if d.reader != nil { + return + } + d.reader = newReadAtWrapper(d.readerFunc(), d.timeout) +} + +func (d *torrentFile) Size() int64 { + return d.len +} + +func (d *torrentFile) IsDir() bool { + return false +} + +func (d *torrentFile) Close() error { + var err error + if d.reader != nil { + err = d.reader.Close() + } + + d.reader = nil + + return err +} + +func (d *torrentFile) Read(p []byte) (n int, err error) { + d.load() + ctx, cancel := context.WithCancel(context.Background()) + timer := time.AfterFunc( + time.Duration(d.timeout)*time.Second, + func() { + cancel() + }, + ) + + defer timer.Stop() + + return d.reader.ReadContext(ctx, p) +} + +func (d *torrentFile) ReadAt(p []byte, off int64) (n int, err error) { + d.load() + return d.reader.ReadAt(p, off) +} diff --git a/fs/torrent_test.go b/fs/torrent_test.go index a1c09a3..1cb7439 100644 --- a/fs/torrent_test.go +++ b/fs/torrent_test.go @@ -81,7 +81,7 @@ func TestTorrentFilesystem(t *testing.T) { require.NoError(f.Close()) } -func TestReadAtWrapper(t *testing.T) { +func TestReadAtTorrent(t *testing.T) { require := require.New(t) to, err := Cli.AddMagnet(testMagnet) @@ -91,9 +91,9 @@ func TestReadAtWrapper(t *testing.T) { torrFile := to.Files()[0] tf := torrentFile{ - reader: torrFile.NewReader(), - len: torrFile.Length(), - timeout: 500, + readerFunc: torrFile.NewReader, + len: torrFile.Length(), + timeout: 500, } defer tf.Close() @@ -109,3 +109,29 @@ func TestReadAtWrapper(t *testing.T) { require.Equal(5, n) require.Equal([]byte{0x49, 0x44, 0x33, 0x3, 0x0}, toRead) } + +func TestReadAtWrapper(t *testing.T) { + t.Parallel() + + require := require.New(t) + + to, err := Cli.AddMagnet(testMagnet) + require.NoError(err) + + <-to.GotInfo() + torrFile := to.Files()[0] + + r := newReadAtWrapper(torrFile.NewReader(), 10) + defer r.Close() + + toRead := make([]byte, 5) + n, err := r.ReadAt(toRead, 6) + require.NoError(err) + require.Equal(5, n) + require.Equal([]byte{0x0, 0x0, 0x1f, 0x76, 0x54}, toRead) + + n, err = r.ReadAt(toRead, 0) + require.NoError(err) + require.Equal(5, n) + require.Equal([]byte{0x49, 0x44, 0x33, 0x3, 0x0}, toRead) +}