Skip to content

Commit

Permalink
feat: support concurrent recursive download (#1714)
Browse files Browse the repository at this point in the history
Signed-off-by: Jim Ma <majinjing3@gmail.com>
  • Loading branch information
jim3ma committed Sep 29, 2022
1 parent daf86fc commit 4584ef0
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 30 deletions.
36 changes: 21 additions & 15 deletions client/config/peerhost.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,21 +290,22 @@ type HostOption struct {
}

type DownloadOption struct {
DefaultPattern string `mapstructure:"defaultPattern" yaml:"defaultPattern"`
TotalRateLimit util.RateLimit `mapstructure:"totalRateLimit" yaml:"totalRateLimit"`
PerPeerRateLimit util.RateLimit `mapstructure:"perPeerRateLimit" yaml:"perPeerRateLimit"`
TrafficShaperType string `mapstructure:"trafficShaperType" yaml:"trafficShaperType"`
PieceDownloadTimeout time.Duration `mapstructure:"pieceDownloadTimeout" yaml:"pieceDownloadTimeout"`
GRPCDialTimeout time.Duration `mapstructure:"grpcDialTimeout" yaml:"grpcDialTimeout"`
DownloadGRPC ListenOption `mapstructure:"downloadGRPC" yaml:"downloadGRPC"`
PeerGRPC ListenOption `mapstructure:"peerGRPC" yaml:"peerGRPC"`
CalculateDigest bool `mapstructure:"calculateDigest" yaml:"calculateDigest"`
Transport *TransportOption `mapstructure:"transportOption" yaml:"transportOption"`
GetPiecesMaxRetry int `mapstructure:"getPiecesMaxRetry" yaml:"getPiecesMaxRetry"`
Prefetch bool `mapstructure:"prefetch" yaml:"prefetch"`
WatchdogTimeout time.Duration `mapstructure:"watchdogTimeout" yaml:"watchdogTimeout"`
Concurrent *ConcurrentOption `mapstructure:"concurrent" yaml:"concurrent"`
SyncPieceViaHTTPS bool `mapstructure:"syncPieceViaHTTPS" yaml:"syncPieceViaHTTPS"`
DefaultPattern string `mapstructure:"defaultPattern" yaml:"defaultPattern"`
TotalRateLimit util.RateLimit `mapstructure:"totalRateLimit" yaml:"totalRateLimit"`
PerPeerRateLimit util.RateLimit `mapstructure:"perPeerRateLimit" yaml:"perPeerRateLimit"`
TrafficShaperType string `mapstructure:"trafficShaperType" yaml:"trafficShaperType"`
PieceDownloadTimeout time.Duration `mapstructure:"pieceDownloadTimeout" yaml:"pieceDownloadTimeout"`
GRPCDialTimeout time.Duration `mapstructure:"grpcDialTimeout" yaml:"grpcDialTimeout"`
DownloadGRPC ListenOption `mapstructure:"downloadGRPC" yaml:"downloadGRPC"`
PeerGRPC ListenOption `mapstructure:"peerGRPC" yaml:"peerGRPC"`
CalculateDigest bool `mapstructure:"calculateDigest" yaml:"calculateDigest"`
Transport *TransportOption `mapstructure:"transportOption" yaml:"transportOption"`
GetPiecesMaxRetry int `mapstructure:"getPiecesMaxRetry" yaml:"getPiecesMaxRetry"`
Prefetch bool `mapstructure:"prefetch" yaml:"prefetch"`
WatchdogTimeout time.Duration `mapstructure:"watchdogTimeout" yaml:"watchdogTimeout"`
Concurrent *ConcurrentOption `mapstructure:"concurrent" yaml:"concurrent"`
RecursiveConcurrent RecursiveConcurrent `mapstructure:"recursiveConcurrent" yaml:"recursiveConcurrent"`
SyncPieceViaHTTPS bool `mapstructure:"syncPieceViaHTTPS" yaml:"syncPieceViaHTTPS"`
}

type TransportOption struct {
Expand Down Expand Up @@ -332,6 +333,11 @@ type ConcurrentOption struct {
MaxAttempts int `mapstructure:"maxAttempts" yaml:"maxAttempts"`
}

type RecursiveConcurrent struct {
// GoroutineCount indicates the concurrent goroutine count for every recursive task
GoroutineCount int `mapstructure:"goroutineCount" yaml:"goroutineCount"`
}

type ProxyOption struct {
// WARNING: when add more option, please update ProxyOption.unmarshal function
ListenOption `mapstructure:",squash" yaml:",inline"`
Expand Down
3 changes: 3 additions & 0 deletions client/config/peerhost_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ var peerHostConfig = func() *DaemonOption {
PieceDownloadTimeout: 30 * time.Second,
GRPCDialTimeout: 10 * time.Second,
GetPiecesMaxRetry: 100,
RecursiveConcurrent: RecursiveConcurrent{
GoroutineCount: 32,
},,
TotalRateLimit: util.RateLimit{
Limit: rate.Limit(DefaultTotalDownloadLimit),
},
Expand Down
3 changes: 3 additions & 0 deletions client/config/peerhost_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ var peerHostConfig = func() *DaemonOption {
PieceDownloadTimeout: 30 * time.Second,
GRPCDialTimeout: 10 * time.Second,
GetPiecesMaxRetry: 100,
RecursiveConcurrent: RecursiveConcurrent{
GoroutineCount: 32,
},
TotalRateLimit: util.RateLimit{
Limit: rate.Limit(DefaultTotalDownloadLimit),
},
Expand Down
3 changes: 2 additions & 1 deletion client/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,8 @@ func New(opt *config.DaemonOption, d dfpath.Dfpath) (Daemon, error) {
peerServerOption = append(peerServerOption, grpc.Creds(tlsCredentials))
}

rpcManager, err := rpcserver.New(host, peerTaskManager, storageManager, defaultPattern, downloadServerOption, peerServerOption)
rpcManager, err := rpcserver.New(host, peerTaskManager, storageManager, defaultPattern,
opt.Download.RecursiveConcurrent.GoroutineCount, downloadServerOption, peerServerOption)
if err != nil {
return nil, err
}
Expand Down
69 changes: 56 additions & 13 deletions client/daemon/rpcserver/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"path"
"path/filepath"
"strings"
"sync"
"syscall"
"time"

Expand Down Expand Up @@ -72,25 +73,27 @@ type Server interface {

type server struct {
util.KeepAlive
peerHost *schedulerv1.PeerHost
peerTaskManager peer.TaskManager
storageManager storage.Manager
defaultPattern commonv1.Pattern
peerHost *schedulerv1.PeerHost
peerTaskManager peer.TaskManager
storageManager storage.Manager
defaultPattern commonv1.Pattern
recursiveConcurrent int

downloadServer *grpc.Server
peerServer *grpc.Server
uploadAddr string
}

func New(peerHost *schedulerv1.PeerHost, peerTaskManager peer.TaskManager,
storageManager storage.Manager, defaultPattern commonv1.Pattern,
storageManager storage.Manager, defaultPattern commonv1.Pattern, recursiveConcurrent int,
downloadOpts []grpc.ServerOption, peerOpts []grpc.ServerOption) (Server, error) {
s := &server{
KeepAlive: util.NewKeepAlive("rpc server"),
peerHost: peerHost,
peerTaskManager: peerTaskManager,
storageManager: storageManager,
defaultPattern: defaultPattern,
KeepAlive: util.NewKeepAlive("rpc server"),
peerHost: peerHost,
peerTaskManager: peerTaskManager,
storageManager: storageManager,
defaultPattern: defaultPattern,
recursiveConcurrent: recursiveConcurrent,
}

sd := &seeder{
Expand Down Expand Up @@ -367,6 +370,36 @@ func (s *server) doRecursiveDownload(ctx context.Context, req *dfdaemonv1.DownRe
return status.Errorf(codes.FailedPrecondition, err.Error())
}

var (
requestCh = make(chan *dfdaemonv1.DownRequest)
stopCh = make(chan struct{})
wg = sync.WaitGroup{}
lock = sync.Mutex{}
downloadErrors []error
)
defer close(stopCh)

for i := 0; i < s.recursiveConcurrent; i++ {
go func(i int) {
for {
select {
case req := <-requestCh:
logger.Debugf("downloader %d start to download %s", i, req.Url)
if err := s.doDownload(ctx, req, stream, ""); err != nil {
logger.Errorf("download %#v error: %s", req, err.Error())
lock.Lock()
downloadErrors = append(downloadErrors, err)
lock.Unlock()
}
logger.Debugf("downloader %d completed download %s", i, req.Url)
wg.Done()
case <-stopCh:
return
}
}
}(i)
}

var queue deque.Deque[*dfdaemonv1.DownRequest]
queue.PushBack(req)
downloadMap := map[url.URL]struct{}{}
Expand Down Expand Up @@ -418,11 +451,21 @@ func (s *server) doRecursiveDownload(ctx context.Context, req *dfdaemonv1.DownRe
return err
}

if err = s.doDownload(ctx, childReq, stream, ""); err != nil {
return err
}
wg.Add(1)
requestCh <- childReq
}
}

// wait all sent task done or error
wg.Wait()
lock.Lock()
if len(downloadErrors) > 0 {
// just return first error
lock.Unlock()
return downloadErrors[0]
}
lock.Unlock()

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion client/daemon/rpcserver/rpcserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestServer_New(t *testing.T) {
var defaultPattern commonv1.Pattern = 0
var mockdownloadOpts []grpc.ServerOption
var mockpeerOpts []grpc.ServerOption
_, err := New(mockpeerHost, mockpeerTaskManager, mockStorageManger, defaultPattern, mockdownloadOpts, mockpeerOpts)
_, err := New(mockpeerHost, mockpeerTaskManager, mockStorageManger, defaultPattern, 16, mockdownloadOpts, mockpeerOpts)
tc.expect(t, err)
})
}
Expand Down

0 comments on commit 4584ef0

Please sign in to comment.