Skip to content

Commit

Permalink
refactor: scheduler announce task (#1407)
Browse files Browse the repository at this point in the history
* feat: scheduler announce normal task

Signed-off-by: Gaius <gaius.qi@gmail.com>

* feat: remove cid

Signed-off-by: Gaius <gaius.qi@gmail.com>
  • Loading branch information
gaius-qi committed Jun 22, 2022
1 parent bf430c3 commit 2f07346
Show file tree
Hide file tree
Showing 36 changed files with 1,184 additions and 716 deletions.
12 changes: 6 additions & 6 deletions client/config/peerhost.go
Expand Up @@ -38,7 +38,7 @@ import (
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/dfnet"
netip "d7y.io/dragonfly/v2/pkg/net/ip"
"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
rpcbase "d7y.io/dragonfly/v2/pkg/rpc/base"
"d7y.io/dragonfly/v2/pkg/unit"
)

Expand Down Expand Up @@ -146,18 +146,18 @@ func (p *DaemonOption) Validate() error {
return nil
}

func ConvertPattern(p string, defaultPattern scheduler.Pattern) scheduler.Pattern {
func ConvertPattern(p string, defaultPattern rpcbase.Pattern) rpcbase.Pattern {
switch p {
case PatternP2P:
return scheduler.Pattern_P2P
return rpcbase.Pattern_P2P
case PatternSeedPeer:
return scheduler.Pattern_SEED_PEER
return rpcbase.Pattern_SEED_PEER
case PatternSource:
return scheduler.Pattern_SOURCE
return rpcbase.Pattern_SOURCE
case "":
return defaultPattern
}
logger.Warnf("unknown pattern, use default pattern: %s", scheduler.Pattern_name[int32(defaultPattern)])
logger.Warnf("unknown pattern, use default pattern: %s", rpcbase.Pattern_name[int32(defaultPattern)])
return defaultPattern
}

Expand Down
3 changes: 2 additions & 1 deletion client/daemon/daemon.go
Expand Up @@ -53,6 +53,7 @@ import (
"d7y.io/dragonfly/v2/pkg/idgen"
"d7y.io/dragonfly/v2/pkg/reachable"
"d7y.io/dragonfly/v2/pkg/rpc"
"d7y.io/dragonfly/v2/pkg/rpc/base"
"d7y.io/dragonfly/v2/pkg/rpc/manager"
managerclient "d7y.io/dragonfly/v2/pkg/rpc/manager/client"
"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
Expand Down Expand Up @@ -116,7 +117,7 @@ func New(opt *config.DaemonOption, d dfpath.Dfpath) (Daemon, error) {
schedulers []*manager.Scheduler
dynconfig config.Dynconfig
managerClient managerclient.Client
defaultPattern = config.ConvertPattern(opt.Download.DefaultPattern, scheduler.Pattern_P2P)
defaultPattern = config.ConvertPattern(opt.Download.DefaultPattern, base.Pattern_P2P)
)

if opt.Scheduler.Manager.Enable == true {
Expand Down
10 changes: 4 additions & 6 deletions client/daemon/peer/peertask_manager.go
Expand Up @@ -62,7 +62,7 @@ type TaskManager interface {
StatTask(ctx context.Context, taskID string) (*scheduler.Task, error)

// AnnouncePeerTask announces peer task info to P2P network
AnnouncePeerTask(ctx context.Context, meta storage.PeerTaskMetadata, cid string, urlMeta *base.UrlMeta) error
AnnouncePeerTask(ctx context.Context, meta storage.PeerTaskMetadata, url string, taskType base.TaskType, urlMeta *base.UrlMeta) error

GetPieceManager() PieceManager

Expand Down Expand Up @@ -170,8 +170,6 @@ func NewPeerTaskManager(
return ptm, nil
}

var _ TaskManager = (*peerTaskManager)(nil)

func (ptm *peerTaskManager) findPeerTaskConductor(taskID string) (*peerTaskConductor, bool) {
pt, ok := ptm.runningPeerTasks.Load(taskID)
if !ok {
Expand Down Expand Up @@ -416,8 +414,8 @@ func (ptm *peerTaskManager) GetPieceManager() PieceManager {
}

func (ptm *peerTaskManager) AnnouncePeerTask(ctx context.Context,
meta storage.PeerTaskMetadata, cid string, urlMeta *base.UrlMeta) error {
log := logger.With("function", "AnnouncePeerTask", "taskID", meta.TaskID, "peerID", meta.PeerID, "CID", cid)
meta storage.PeerTaskMetadata, url string, taskType base.TaskType, urlMeta *base.UrlMeta) error {
log := logger.With("function", "AnnouncePeerTask", "taskID", meta.TaskID, "peerID", meta.PeerID, "URL", url)

// Check if the given task is completed in local storageManager
if ptm.storageManager.FindCompletedTask(meta.TaskID) == nil {
Expand Down Expand Up @@ -448,7 +446,7 @@ func (ptm *peerTaskManager) AnnouncePeerTask(ctx context.Context,
piecePacket.DstAddr = fmt.Sprintf("%s:%d", ptm.host.Ip, ptm.host.DownPort)
req := &scheduler.AnnounceTaskRequest{
TaskId: meta.TaskID,
Cid: cid,
Url: url,
UrlMeta: urlMeta,
PeerHost: ptm.host,
PiecePacket: piecePacket,
Expand Down
8 changes: 4 additions & 4 deletions client/daemon/peer/peertask_manager_mock_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion client/daemon/peer/peertask_stream.go
Expand Up @@ -45,7 +45,7 @@ type StreamTaskRequest struct {
// peer's id and must be global uniqueness
PeerID string
// Pattern to register to scheduler
Pattern scheduler.Pattern
Pattern base.Pattern
}

// StreamTask represents a peer task with stream io for reading directly without once more disk io
Expand Down
2 changes: 1 addition & 1 deletion client/daemon/peer/piece_manager.go
Expand Up @@ -489,7 +489,7 @@ func (pm *pieceManager) processPieceFromFile(ctx context.Context, ptm storage.Pe
}

func (pm *pieceManager) ImportFile(ctx context.Context, ptm storage.PeerTaskMetadata, tsd storage.TaskStorageDriver, req *dfdaemon.ImportTaskRequest) error {
log := logger.With("function", "ImportFile", "Cid", req.Cid, "taskID", ptm.TaskID)
log := logger.With("function", "ImportFile", "URL", req.Url, "taskID", ptm.TaskID)
// get file size and compute piece size and piece count
stat, err := os.Stat(req.Path)
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions client/daemon/proxy/proxy.go
Expand Up @@ -43,6 +43,7 @@ import (
"d7y.io/dragonfly/v2/client/daemon/peer"
"d7y.io/dragonfly/v2/client/daemon/transport"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/rpc/base"
"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
pkgstrings "d7y.io/dragonfly/v2/pkg/strings"
)
Expand Down Expand Up @@ -98,7 +99,7 @@ type Proxy struct {
defaultFilter string

// defaultFilter is used for registering steam task
defaultPattern scheduler.Pattern
defaultPattern base.Pattern

// tracer is used for telemetry
tracer trace.Tracer
Expand Down Expand Up @@ -216,7 +217,7 @@ func WithDefaultFilter(f string) Option {
}

// WithDefaultPattern sets default pattern for downloading
func WithDefaultPattern(pattern scheduler.Pattern) Option {
func WithDefaultPattern(pattern base.Pattern) Option {
return func(p *Proxy) *Proxy {
p.defaultPattern = pattern
return p
Expand Down
3 changes: 2 additions & 1 deletion client/daemon/proxy/proxy_manager.go
Expand Up @@ -33,6 +33,7 @@ import (
"d7y.io/dragonfly/v2/client/config"
"d7y.io/dragonfly/v2/client/daemon/peer"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/rpc/base"
"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
)

Expand All @@ -51,7 +52,7 @@ type proxyManager struct {

var _ Manager = (*proxyManager)(nil)

func NewProxyManager(peerHost *scheduler.PeerHost, peerTaskManager peer.TaskManager, defaultPattern scheduler.Pattern, opts *config.ProxyOption) (Manager, error) {
func NewProxyManager(peerHost *scheduler.PeerHost, peerTaskManager peer.TaskManager, defaultPattern base.Pattern, opts *config.ProxyOption) (Manager, error) {
// proxy is option, when nil, just disable it
if opts == nil {
logger.Infof("proxy config is empty, disabled")
Expand Down
28 changes: 14 additions & 14 deletions client/daemon/rpcserver/rpcserver.go
Expand Up @@ -62,15 +62,15 @@ type server struct {
peerHost *scheduler.PeerHost
peerTaskManager peer.TaskManager
storageManager storage.Manager
defaultPattern scheduler.Pattern
defaultPattern base.Pattern

downloadServer *grpc.Server
peerServer *grpc.Server
uploadAddr string
}

func New(peerHost *scheduler.PeerHost, peerTaskManager peer.TaskManager,
storageManager storage.Manager, defaultPattern scheduler.Pattern,
storageManager storage.Manager, defaultPattern base.Pattern,
downloadOpts []grpc.ServerOption, peerOpts []grpc.ServerOption) (Server, error) {
s := &server{
KeepAlive: clientutil.NewKeepAlive("rpc server"),
Expand Down Expand Up @@ -411,8 +411,8 @@ func (s *server) doDownload(ctx context.Context, req *dfdaemongrpc.DownRequest,

func (s *server) StatTask(ctx context.Context, req *dfdaemongrpc.StatTaskRequest) error {
s.Keep()
taskID := idgen.TaskID(req.Cid, req.UrlMeta)
log := logger.With("function", "StatTask", "Cid", req.Cid, "Tag", req.UrlMeta.Tag, "taskID", taskID, "LocalOnly", req.LocalOnly)
taskID := idgen.TaskID(req.Url, req.UrlMeta)
log := logger.With("function", "StatTask", "URL", req.Url, "Tag", req.UrlMeta.Tag, "taskID", taskID, "LocalOnly", req.LocalOnly)

log.Info("new stat task request")
if completed := s.isTaskCompleted(taskID); completed {
Expand Down Expand Up @@ -448,8 +448,8 @@ func (s *server) isTaskCompleted(taskID string) bool {
func (s *server) ImportTask(ctx context.Context, req *dfdaemongrpc.ImportTaskRequest) error {
s.Keep()
peerID := idgen.PeerID(s.peerHost.Ip)
taskID := idgen.TaskID(req.Cid, req.UrlMeta)
log := logger.With("function", "ImportTask", "Cid", req.Cid, "Tag", req.UrlMeta.Tag, "taskID", taskID, "file", req.Path)
taskID := idgen.TaskID(req.Url, req.UrlMeta)
log := logger.With("function", "ImportTask", "URL", req.Url, "Tag", req.UrlMeta.Tag, "taskID", taskID, "file", req.Path)

log.Info("new import task request")
ptm := storage.PeerTaskMetadata{
Expand All @@ -459,7 +459,7 @@ func (s *server) ImportTask(ctx context.Context, req *dfdaemongrpc.ImportTaskReq
announceFunc := func() {
// TODO: retry announce on error
start := time.Now()
err := s.peerTaskManager.AnnouncePeerTask(context.Background(), ptm, req.Cid, req.UrlMeta)
err := s.peerTaskManager.AnnouncePeerTask(context.Background(), ptm, req.Url, req.Type, req.UrlMeta)
if err != nil {
log.Warnf("Failed to announce task to scheduler: %s", err)
} else {
Expand Down Expand Up @@ -509,8 +509,8 @@ func (s *server) ImportTask(ctx context.Context, req *dfdaemongrpc.ImportTaskReq

func (s *server) ExportTask(ctx context.Context, req *dfdaemongrpc.ExportTaskRequest) error {
s.Keep()
taskID := idgen.TaskID(req.Cid, req.UrlMeta)
log := logger.With("function", "ExportTask", "Cid", req.Cid, "Tag", req.UrlMeta.Tag, "taskID", taskID, "destination", req.Output)
taskID := idgen.TaskID(req.Url, req.UrlMeta)
log := logger.With("function", "ExportTask", "URL", req.Url, "Tag", req.UrlMeta.Tag, "taskID", taskID, "destination", req.Output)

log.Info("new export task request")
task := s.storageManager.FindCompletedTask(taskID)
Expand All @@ -536,7 +536,7 @@ func (s *server) exportFromLocal(ctx context.Context, req *dfdaemongrpc.ExportTa
return s.storageManager.Store(ctx, &storage.StoreRequest{
CommonTaskRequest: storage.CommonTaskRequest{
PeerID: peerID,
TaskID: idgen.TaskID(req.Cid, req.UrlMeta),
TaskID: idgen.TaskID(req.Url, req.UrlMeta),
Destination: req.Output,
},
StoreDataOnly: true,
Expand All @@ -545,7 +545,7 @@ func (s *server) exportFromLocal(ctx context.Context, req *dfdaemongrpc.ExportTa

func (s *server) exportFromPeers(ctx context.Context, log *logger.SugaredLoggerOnWith, req *dfdaemongrpc.ExportTaskRequest) error {
peerID := idgen.PeerID(s.peerHost.Ip)
taskID := idgen.TaskID(req.Cid, req.UrlMeta)
taskID := idgen.TaskID(req.Url, req.UrlMeta)

task, err := s.peerTaskManager.StatTask(ctx, taskID)
if err != nil {
Expand All @@ -572,7 +572,7 @@ func (s *server) exportFromPeers(ctx context.Context, log *logger.SugaredLoggerO
downError error
)
downRequest := &dfdaemongrpc.DownRequest{
Url: req.Cid,
Url: req.Url,
Output: req.Output,
Timeout: req.Timeout,
Limit: req.Limit,
Expand Down Expand Up @@ -621,8 +621,8 @@ func call(ctx context.Context, peerID string, drc chan *dfdaemongrpc.DownResult,

func (s *server) DeleteTask(ctx context.Context, req *dfdaemongrpc.DeleteTaskRequest) error {
s.Keep()
taskID := idgen.TaskID(req.Cid, req.UrlMeta)
log := logger.With("function", "DeleteTask", "Cid", req.Cid, "Tag", req.UrlMeta.Tag, "taskID", taskID)
taskID := idgen.TaskID(req.Url, req.UrlMeta)
log := logger.With("function", "DeleteTask", "URL", req.Url, "Tag", req.UrlMeta.Tag, "taskID", taskID)

log.Info("new delete task request")
task := s.storageManager.FindCompletedTask(taskID)
Expand Down
8 changes: 4 additions & 4 deletions client/daemon/test/mock/peer/peertask_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions client/daemon/transport/transport.go
Expand Up @@ -41,7 +41,6 @@ import (
logger "d7y.io/dragonfly/v2/internal/dflog"
nethttp "d7y.io/dragonfly/v2/pkg/net/http"
"d7y.io/dragonfly/v2/pkg/rpc/base"
"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
)

var _ *logger.SugaredLoggerOnWith // pin this package for no log code generation
Expand Down Expand Up @@ -69,7 +68,7 @@ type transport struct {
defaultFilter string

// defaultFilter is used for registering steam task
defaultPattern scheduler.Pattern
defaultPattern base.Pattern

// defaultBiz is used when http request without X-Dragonfly-Biz Header
defaultBiz string
Expand Down Expand Up @@ -124,7 +123,7 @@ func WithDefaultFilter(f string) Option {
}

// WithDefaultPattern sets default pattern
func WithDefaultPattern(pattern scheduler.Pattern) Option {
func WithDefaultPattern(pattern base.Pattern) Option {
return func(rt *transport) *transport {
rt.defaultPattern = pattern
return rt
Expand Down
9 changes: 5 additions & 4 deletions client/dfcache/dfcache.go
Expand Up @@ -101,7 +101,7 @@ func statTask(ctx context.Context, client daemonclient.DaemonClient, cfg *config

func newStatRequest(cfg *config.DfcacheConfig) *dfdaemon.StatTaskRequest {
return &dfdaemon.StatTaskRequest{
Cid: newCid(cfg.Cid),
Url: newCid(cfg.Cid),
UrlMeta: &base.UrlMeta{
Tag: cfg.Tag,
},
Expand Down Expand Up @@ -161,7 +161,8 @@ func importTask(ctx context.Context, client daemonclient.DaemonClient, cfg *conf

func newImportRequest(cfg *config.DfcacheConfig) *dfdaemon.ImportTaskRequest {
return &dfdaemon.ImportTaskRequest{
Cid: newCid(cfg.Cid),
Type: base.TaskType_DfCache,
Url: newCid(cfg.Cid),
Path: cfg.Path,
UrlMeta: &base.UrlMeta{
Tag: cfg.Tag,
Expand Down Expand Up @@ -228,7 +229,7 @@ func exportTask(ctx context.Context, client daemonclient.DaemonClient, cfg *conf

func newExportRequest(cfg *config.DfcacheConfig) *dfdaemon.ExportTaskRequest {
return &dfdaemon.ExportTaskRequest{
Cid: newCid(cfg.Cid),
Url: newCid(cfg.Cid),
Output: cfg.Output,
Timeout: uint64(cfg.Timeout),
Limit: float64(cfg.RateLimit),
Expand Down Expand Up @@ -292,7 +293,7 @@ func deleteTask(ctx context.Context, client daemonclient.DaemonClient, cfg *conf

func newDeleteRequest(cfg *config.DfcacheConfig) *dfdaemon.DeleteTaskRequest {
return &dfdaemon.DeleteTaskRequest{
Cid: newCid(cfg.Cid),
Url: newCid(cfg.Cid),
UrlMeta: &base.UrlMeta{
Tag: cfg.Tag,
},
Expand Down

0 comments on commit 2f07346

Please sign in to comment.