Skip to content

Commit

Permalink
refactor: scheduler grpc (#1310)
Browse files Browse the repository at this point in the history
* refactor: scheduler grpc

Signed-off-by: Gaius <gaius.qi@gmail.com>

* feat: generate protoc

Signed-off-by: Gaius <gaius.qi@gmail.com>
  • Loading branch information
gaius-qi committed May 18, 2022
1 parent 14d866f commit 7610d47
Show file tree
Hide file tree
Showing 28 changed files with 821 additions and 1,298 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/compatibility-e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ env:
KIND_VERSION: v0.11.1
CONTAINERD_VERSION: v1.5.2
KIND_CONFIG_PATH: test/testdata/kind/config.yaml
DRAGONFLY_STABLE_IMAGE_TAG: v2.0.3-beta.2
DRAGONFLY_STABLE_IMAGE_TAG: v2.0.3-beta.3
DRAGONFLY_CHARTS_PATH: deploy/helm-charts/charts/dragonfly
DRAGONFLY_CHARTS_CONFIG_PATH: test/testdata/charts/config.yaml
DRAGONFLY_FILE_SERVER_PATH: test/testdata/k8s/file-server.yaml
Expand Down
7 changes: 5 additions & 2 deletions client/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ type clientDaemon struct {
dfpath dfpath.Dfpath
schedulers []*manager.Scheduler
managerClient managerclient.Client
schedulerClient schedulerclient.SchedulerClient
schedulerClient schedulerclient.Client
}

func New(opt *config.DaemonOption, d dfpath.Dfpath) (Daemon, error) {
Expand Down Expand Up @@ -144,7 +144,10 @@ func New(opt *config.DaemonOption, d dfpath.Dfpath) (Daemon, error) {

var opts []grpc.DialOption
if opt.Options.Telemetry.Jaeger != "" {
opts = append(opts, grpc.WithChainUnaryInterceptor(otelgrpc.UnaryClientInterceptor()), grpc.WithChainStreamInterceptor(otelgrpc.StreamClientInterceptor()))
opts = append(opts,
grpc.WithChainUnaryInterceptor(otelgrpc.UnaryClientInterceptor()),
grpc.WithChainStreamInterceptor(otelgrpc.StreamClientInterceptor()),
)
}
sched, err := schedulerclient.GetClientByAddr(addrs, opts...)
if err != nil {
Expand Down
13 changes: 10 additions & 3 deletions client/daemon/peer/peertask_conductor.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ type peerTaskConductor struct {

// schedule options
schedulerOption config.SchedulerOption
schedulerClient schedulerclient.SchedulerClient
schedulerClient schedulerclient.Client

// peer task meta info
peerID string
Expand All @@ -108,7 +108,7 @@ type peerTaskConductor struct {
tinyData *TinyData

// peerPacketStream stands schedulerclient.PeerPacketStream from scheduler
peerPacketStream schedulerclient.PeerPacketStream
peerPacketStream scheduler.Scheduler_ReportPieceResultClient
// peerPacket is the latest available peers from peerPacketCh
// Deprecated: remove in future release
peerPacket atomic.Value // *scheduler.PeerPacket
Expand Down Expand Up @@ -179,6 +179,7 @@ func (ptm *peerTaskManager) newPeerTaskConductor(
span.SetAttributes(semconv.HTTPURLKey.String(request.Url))

taskID := idgen.TaskID(request.Url, request.UrlMeta)
request.TaskId = taskID

var (
log *logger.SugaredLoggerOnWith
Expand Down Expand Up @@ -324,7 +325,7 @@ func (pt *peerTaskConductor) register() error {
}
}

peerPacketStream, err := pt.schedulerClient.ReportPieceResult(pt.ctx, result.TaskId, pt.request)
peerPacketStream, err := pt.schedulerClient.ReportPieceResult(pt.ctx, pt.request)
pt.Infof("step 2: start report piece result")
if err != nil {
pt.span.RecordError(err)
Expand Down Expand Up @@ -1469,6 +1470,9 @@ func (pt *peerTaskConductor) done() {
schedulerclient.NewEndOfPiece(pt.taskID, pt.peerID, pt.readyPieces.Settled()))
pt.Debugf("end piece result sent: %v, peer task finished", err)

err = pt.peerPacketStream.CloseSend()
pt.Debugf("close stream result: %v", err)

err = pt.schedulerClient.ReportPeerResult(
peerResultCtx,
&scheduler.PeerResult{
Expand Down Expand Up @@ -1521,6 +1525,9 @@ func (pt *peerTaskConductor) fail() {
schedulerclient.NewEndOfPiece(pt.taskID, pt.peerID, pt.readyPieces.Settled()))
pt.Debugf("end piece result sent: %v, peer task finished", err)

err = pt.peerPacketStream.CloseSend()
pt.Debugf("close stream result: %v", err)

ctx := trace.ContextWithSpan(context.Background(), trace.SpanFromContext(pt.ctx))
peerResultCtx, peerResultSpan := tracer.Start(ctx, config.SpanReportPeerResult)
defer peerResultSpan.End()
Expand Down
12 changes: 8 additions & 4 deletions client/daemon/peer/peertask_dummy.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"d7y.io/dragonfly/v2/pkg/dfnet"
"d7y.io/dragonfly/v2/pkg/rpc/base"
"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client"
)

// when scheduler is not available, use dummySchedulerClient to back source
Expand All @@ -36,7 +35,7 @@ func (d *dummySchedulerClient) RegisterPeerTask(ctx context.Context, request *sc
panic("should not call this function")
}

func (d *dummySchedulerClient) ReportPieceResult(ctx context.Context, s string, request *scheduler.PeerTaskRequest, option ...grpc.CallOption) (schedulerclient.PeerPacketStream, error) {
func (d *dummySchedulerClient) ReportPieceResult(ctx context.Context, request *scheduler.PeerTaskRequest, option ...grpc.CallOption) (scheduler.Scheduler_ReportPieceResultClient, error) {
return &dummyPeerPacketStream{}, nil
}

Expand Down Expand Up @@ -68,13 +67,18 @@ func (d *dummySchedulerClient) GetState() []dfnet.NetAddr {
}

type dummyPeerPacketStream struct {
grpc.ClientStream
}

func (d *dummyPeerPacketStream) Recv() (pp *scheduler.PeerPacket, err error) {
func (d *dummyPeerPacketStream) Recv() (*scheduler.PeerPacket, error) {
// TODO set base.Code_SchedNeedBackSource in *scheduler.PeerPacket instead of error
return nil, dferrors.New(base.Code_SchedNeedBackSource, "")
}

func (d *dummyPeerPacketStream) Send(pr *scheduler.PieceResult) (err error) {
func (d *dummyPeerPacketStream) Send(pr *scheduler.PieceResult) error {
return nil
}

func (d *dummyPeerPacketStream) CloseSend() error {
return nil
}
4 changes: 2 additions & 2 deletions client/daemon/peer/peertask_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func init() {

type peerTaskManager struct {
host *scheduler.PeerHost
schedulerClient schedulerclient.SchedulerClient
schedulerClient schedulerclient.Client
schedulerOption config.SchedulerOption
pieceManager PieceManager
storageManager storage.Manager
Expand All @@ -140,7 +140,7 @@ func NewPeerTaskManager(
host *scheduler.PeerHost,
pieceManager PieceManager,
storageManager storage.Manager,
schedulerClient schedulerclient.SchedulerClient,
schedulerClient schedulerclient.Client,
schedulerOption config.SchedulerOption,
perPeerRateLimit rate.Limit,
multiplex bool,
Expand Down
19 changes: 11 additions & 8 deletions client/daemon/peer/peertask_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ import (
daemonserver "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/server"
"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client"
mock_scheduler "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client/mocks"
mock_scheduler_client "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client/mocks"
mock_scheduler "d7y.io/dragonfly/v2/pkg/rpc/scheduler/mocks"
"d7y.io/dragonfly/v2/pkg/source"
"d7y.io/dragonfly/v2/pkg/source/clients/httpprotocol"
sourceMock "d7y.io/dragonfly/v2/pkg/source/mock"
Expand Down Expand Up @@ -85,7 +86,7 @@ type componentsOption struct {

//go:generate mockgen -package mock_server_grpc -source ../../../pkg/rpc/dfdaemon/dfdaemon.pb.go -destination ../test/mock/daemongrpc/daemon_server_grpc.go
func setupPeerTaskManagerComponents(ctrl *gomock.Controller, opt componentsOption) (
schedulerclient.SchedulerClient, storage.Manager) {
schedulerclient.Client, storage.Manager) {
port := int32(freeport.GetPort())
// 1. set up a mock daemon server for uploading pieces info
var daemon = mock_daemon.NewMockDaemonServer(ctrl)
Expand Down Expand Up @@ -177,7 +178,7 @@ func setupPeerTaskManagerComponents(ctrl *gomock.Controller, opt componentsOptio
time.Sleep(100 * time.Millisecond)

// 2. setup a scheduler
pps := mock_scheduler.NewMockPeerPacketStream(ctrl)
pps := mock_scheduler.NewMockScheduler_ReportPieceResultClient(ctrl)
pps.EXPECT().Send(gomock.Any()).AnyTimes().DoAndReturn(
func(pr *scheduler.PieceResult) error {
return nil
Expand Down Expand Up @@ -212,7 +213,9 @@ func setupPeerTaskManagerComponents(ctrl *gomock.Controller, opt componentsOptio
StealPeers: nil,
}, nil
})
sched := mock_scheduler.NewMockSchedulerClient(ctrl)
pps.EXPECT().CloseSend().AnyTimes()

sched := mock_scheduler_client.NewMockClient(ctrl)
sched.EXPECT().RegisterPeerTask(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
func(ctx context.Context, ptr *scheduler.PeerTaskRequest, opts ...grpc.CallOption) (*scheduler.RegisterResult, error) {
switch opt.scope {
Expand Down Expand Up @@ -250,9 +253,9 @@ func setupPeerTaskManagerComponents(ctrl *gomock.Controller, opt componentsOptio
DirectPiece: nil,
}, nil
})
sched.EXPECT().ReportPieceResult(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
func(ctx context.Context, taskId string, ptr *scheduler.PeerTaskRequest, opts ...grpc.CallOption) (
schedulerclient.PeerPacketStream, error) {
sched.EXPECT().ReportPieceResult(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
func(ctx context.Context, ptr *scheduler.PeerTaskRequest, opts ...grpc.CallOption) (
scheduler.Scheduler_ReportPieceResultClient, error) {
return pps, nil
})
sched.EXPECT().ReportPeerResult(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
Expand All @@ -274,7 +277,7 @@ func setupPeerTaskManagerComponents(ctrl *gomock.Controller, opt componentsOptio
type mockManager struct {
testSpec *testSpec
peerTaskManager *peerTaskManager
schedulerClient schedulerclient.SchedulerClient
schedulerClient schedulerclient.Client
storageManager storage.Manager
}

Expand Down
12 changes: 7 additions & 5 deletions client/daemon/peer/peertask_stream_backsource_partial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,16 @@ import (
daemonserver "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/server"
"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client"
mock_scheduler "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client/mocks"
mock_scheduler_client "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client/mocks"
mock_scheduler "d7y.io/dragonfly/v2/pkg/rpc/scheduler/mocks"
"d7y.io/dragonfly/v2/pkg/source"
"d7y.io/dragonfly/v2/pkg/source/clients/httpprotocol"
sourceMock "d7y.io/dragonfly/v2/pkg/source/mock"
"d7y.io/dragonfly/v2/pkg/util/digestutils"
)

func setupBackSourcePartialComponents(ctrl *gomock.Controller, testBytes []byte, opt componentsOption) (
schedulerclient.SchedulerClient, storage.Manager) {
schedulerclient.Client, storage.Manager) {
port := int32(freeport.GetPort())
// 1. set up a mock daemon server for uploading pieces info
var daemon = mock_daemon.NewMockDaemonServer(ctrl)
Expand Down Expand Up @@ -111,7 +112,7 @@ func setupBackSourcePartialComponents(ctrl *gomock.Controller, testBytes []byte,
time.Sleep(100 * time.Millisecond)

// 2. setup a scheduler
pps := mock_scheduler.NewMockPeerPacketStream(ctrl)
pps := mock_scheduler.NewMockScheduler_ReportPieceResultClient(ctrl)
var (
wg = sync.WaitGroup{}
backSourceSent = atomic.Bool{}
Expand Down Expand Up @@ -159,7 +160,8 @@ func setupBackSourcePartialComponents(ctrl *gomock.Controller, testBytes []byte,
StealPeers: nil,
}, nil
})
sched := mock_scheduler.NewMockSchedulerClient(ctrl)
pps.EXPECT().CloseSend().AnyTimes()
sched := mock_scheduler_client.NewMockClient(ctrl)
sched.EXPECT().RegisterPeerTask(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
func(ctx context.Context, ptr *scheduler.PeerTaskRequest, opts ...grpc.CallOption) (*scheduler.RegisterResult, error) {
return &scheduler.RegisterResult{
Expand All @@ -169,7 +171,7 @@ func setupBackSourcePartialComponents(ctrl *gomock.Controller, testBytes []byte,
}, nil
})
sched.EXPECT().ReportPieceResult(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
func(ctx context.Context, taskId string, ptr *scheduler.PeerTaskRequest, opts ...grpc.CallOption) (schedulerclient.PeerPacketStream, error) {
func(ctx context.Context, ptr *scheduler.PeerTaskRequest, opts ...grpc.CallOption) (scheduler.Scheduler_ReportPieceResultClient, error) {
return pps, nil
})
sched.EXPECT().ReportPeerResult(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
Expand Down
4 changes: 2 additions & 2 deletions client/daemon/rpcserver/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,8 @@ func (s *server) SyncPieceTasks(sync dfdaemongrpc.Daemon_SyncPieceTasksServer) e
if !attributeSent && len(p.PieceInfos) > 0 {
exa, e := s.storageManager.GetExtendAttribute(ctx,
&storage.PeerTaskMetadata{
PeerID: request.TaskId,
TaskID: request.DstPid,
PeerID: request.DstPid,
TaskID: request.TaskId,
})
if e != nil {
log.Errorf("get extend attribute error: %s", e.Error())
Expand Down

0 comments on commit 7610d47

Please sign in to comment.