Skip to content

Commit

Permalink
feat: bt fetcher reuse (#142)
Browse files Browse the repository at this point in the history
  • Loading branch information
monkeyWie committed Jul 6, 2023
1 parent abe3cec commit 32d2e46
Show file tree
Hide file tree
Showing 5 changed files with 374 additions and 42 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/GopeedLab/gopeed
go 1.19

require (
github.com/anacrolix/missinggo/v2 v2.7.2-0.20230527121029-a582b4f397b9
github.com/anacrolix/torrent v1.52.3
github.com/gorilla/handlers v1.5.1
github.com/gorilla/mux v1.8.0
Expand All @@ -27,7 +28,6 @@ require (
github.com/anacrolix/log v0.14.0 // indirect
github.com/anacrolix/missinggo v1.3.0 // indirect
github.com/anacrolix/missinggo/perf v1.0.0 // indirect
github.com/anacrolix/missinggo/v2 v2.7.2-0.20230527121029-a582b4f397b9 // indirect
github.com/anacrolix/mmsg v1.0.0 // indirect
github.com/anacrolix/multiless v0.3.0 // indirect
github.com/anacrolix/stm v0.4.0 // indirect
Expand Down
103 changes: 62 additions & 41 deletions internal/protocol/bt/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,29 @@ import (
"github.com/GopeedLab/gopeed/pkg/util"
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/storage"
"path"
"path/filepath"
"sync"
"sync/atomic"
"time"
)

var client *torrent.Client
var (
client *torrent.Client
lock sync.Mutex
torrentDirMap = make(map[string]string)
ftMap = make(map[string]*fileTorrentImpl)
)

type Fetcher struct {
ctl *controller.Controller

torrent *torrent.Torrent
meta *fetcher.FetcherMeta

ready atomic.Bool
progress fetcher.Progress

torrentPaths map[string]string
torrentReady atomic.Bool
create atomic.Bool
progress fetcher.Progress
}

func (f *Fetcher) Name() string {
Expand All @@ -38,42 +41,44 @@ func (f *Fetcher) Setup(ctl *controller.Controller) (err error) {
if f.meta == nil {
f.meta = &fetcher.FetcherMeta{}
}
var once sync.Once
once.Do(func() {
cfg := torrent.NewDefaultClientConfig()
cfg.ListenPort = 0
return
}

// Support custom download path for each torrent
pieceCompletion, err := storage.NewDefaultPieceCompletionForDir(cfg.DataDir)
if err != nil {
pieceCompletion = storage.NewMapPieceCompletion()
}
clientImplCloser := storage.NewFileOpts(storage.NewFileClientOpts{
ClientBaseDir: cfg.DataDir,
TorrentDirMaker: func(baseDir string, info *metainfo.Info, infoHash metainfo.Hash) string {
if dir, ok := f.torrentPaths[infoHash.String()]; ok {
return dir
}
return baseDir
},
PieceCompletion: pieceCompletion,
})
cfg.DefaultStorage = clientImplCloser
client, err = torrent.NewClient(cfg)
if err != nil {
return
}
func (f *Fetcher) initClient() (err error) {
lock.Lock()
defer lock.Unlock()

if client != nil {
return
}

f.torrentPaths = make(map[string]string)
cfg := torrent.NewDefaultClientConfig()
cfg.ListenPort = 0
cfg.DefaultStorage = newFileOpts(newFileClientOpts{
ClientBaseDir: cfg.DataDir,
HandleFileTorrent: func(infoHash metainfo.Hash, ft *fileTorrentImpl) {
if dir, ok := torrentDirMap[infoHash.String()]; ok {
ft.setTorrentDir(dir)
}
ftMap[infoHash.String()] = ft
},
})
client, err = torrent.NewClient(cfg)
return
}

func (f *Fetcher) Resolve(req *base.Request) error {
if err := f.addTorrent(req.URL); err != nil {
return err
}
defer f.torrent.Drop()
go func() {
// recycle unused torrent resource
time.Sleep(time.Minute * 3)
if !f.create.Load() {
f.torrentReady.Store(false)
f.safeDrop()
}
}()
res := &base.Resource{
Name: f.torrent.Name(),
Range: true,
Expand All @@ -95,6 +100,7 @@ func (f *Fetcher) Resolve(req *base.Request) error {
}

func (f *Fetcher) Create(opts *base.Options) (err error) {
f.create.Store(true)
f.meta.Opts = opts
if len(opts.SelectFiles) == 0 {
opts.SelectFiles = make([]int, 0)
Expand All @@ -103,16 +109,19 @@ func (f *Fetcher) Create(opts *base.Options) (err error) {
}
}
if opts.Path != "" {
f.torrentPaths[f.meta.Res.Hash] = path.Join(f.meta.Opts.Path, f.meta.Res.RootDir)
torrentDirMap[f.meta.Res.Hash] = path.Join(f.meta.Opts.Path, f.meta.Res.RootDir)
if ft, ok := ftMap[f.meta.Res.Hash]; ok {
// reuse resolve fetcher
ft.setTorrentDir(torrentDirMap[f.meta.Res.Hash])
}
}

f.progress = make(fetcher.Progress, len(f.meta.Opts.SelectFiles))
f.ready.Store(false)
return nil
}

func (f *Fetcher) Start() (err error) {
if !f.ready.Load() {
if !f.torrentReady.Load() {
if err = f.addTorrent(f.meta.Req.URL); err != nil {
return
}
Expand All @@ -130,8 +139,8 @@ func (f *Fetcher) Start() (err error) {
}

func (f *Fetcher) Pause() (err error) {
f.ready.Store(false)
f.torrent.Drop()
f.torrentReady.Store(false)
f.safeDrop()
return
}

Expand All @@ -140,13 +149,22 @@ func (f *Fetcher) Continue() (err error) {
}

func (f *Fetcher) Close() (err error) {
f.torrent.Drop()
f.safeDrop()
return nil
}

func (f *Fetcher) safeDrop() {
defer func() {
// ignore panic
_ = recover()
}()

f.torrent.Drop()
}

func (f *Fetcher) Wait() (err error) {
for {
if f.ready.Load() {
if f.torrentReady.Load() {
done := true
for _, selectIndex := range f.meta.Opts.SelectFiles {
file := f.torrent.Files()[selectIndex]
Expand Down Expand Up @@ -182,7 +200,7 @@ func (f *Fetcher) Meta() *fetcher.FetcherMeta {
}

func (f *Fetcher) Progress() fetcher.Progress {
if !f.ready.Load() {
if !f.torrentReady.Load() {
return f.progress
}
for i := range f.progress {
Expand All @@ -194,6 +212,9 @@ func (f *Fetcher) Progress() fetcher.Progress {
}

func (f *Fetcher) addTorrent(url string) (err error) {
if err = f.initClient(); err != nil {
return
}
schema := util.ParseSchema(url)
if schema == "MAGNET" {
f.torrent, err = client.AddMagnet(url)
Expand All @@ -216,7 +237,7 @@ func (f *Fetcher) addTorrent(url string) (err error) {
f.torrent.AddTrackers(announceList)
}
<-f.torrent.GotInfo()
f.ready.Store(true)
f.torrentReady.Store(true)
return
}

Expand Down
Loading

0 comments on commit 32d2e46

Please sign in to comment.