Skip to content

Commit

Permalink
refactor: announcer and dynconfig with v2 verison of the manager grpc
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <gaius.qi@gmail.com>
  • Loading branch information
gaius-qi committed Feb 6, 2023
1 parent fe7b7fd commit 1e7c9ea
Show file tree
Hide file tree
Showing 14 changed files with 225 additions and 222 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/compatibility-e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ jobs:
include:
- module: manager
image: manager
image-tag: v2.0.3
image-tag: v2.0.9-beta.0
chart-name: manager
- module: scheduler
image: scheduler
image: scheduler
image-tag: v2.0.3
chart-name: scheduler
- module: dfdaemon
Expand Down
14 changes: 7 additions & 7 deletions scheduler/announcer/announcer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ package announcer
import (
"context"

managerv1 "d7y.io/api/pkg/apis/manager/v1"
managerv2 "d7y.io/api/pkg/apis/manager/v2"

logger "d7y.io/dragonfly/v2/internal/dflog"
managerclient "d7y.io/dragonfly/v2/pkg/rpc/manager/client"
Expand All @@ -40,24 +40,24 @@ type Announcer interface {
// announcer provides announce function.
type announcer struct {
config *config.Config
managerClient managerclient.V1
managerClient managerclient.V2
done chan struct{}
}

// Option is a functional option for configuring the announcer.
type Option func(s *announcer)

// New returns a new Announcer interface.
func New(cfg *config.Config, managerClient managerclient.V1) (Announcer, error) {
func New(cfg *config.Config, managerClient managerclient.V2) (Announcer, error) {
a := &announcer{
config: cfg,
managerClient: managerClient,
done: make(chan struct{}),
}

// Register to manager.
if _, err := a.managerClient.UpdateScheduler(context.Background(), &managerv1.UpdateSchedulerRequest{
SourceType: managerv1.SourceType_SCHEDULER_SOURCE,
if _, err := a.managerClient.UpdateScheduler(context.Background(), &managerv2.UpdateSchedulerRequest{
SourceType: managerv2.SourceType_SCHEDULER_SOURCE,
HostName: a.config.Server.Host,
Ip: a.config.Server.AdvertiseIP.String(),
Port: int32(a.config.Server.Port),
Expand Down Expand Up @@ -90,8 +90,8 @@ func (a *announcer) Stop() error {
// announceSeedPeer announces peer information to manager.
func (a *announcer) announceToManager() error {
// Start keepalive to manager.
a.managerClient.KeepAlive(a.config.Manager.KeepAlive.Interval, &managerv1.KeepAliveRequest{
SourceType: managerv1.SourceType_SCHEDULER_SOURCE,
a.managerClient.KeepAlive(a.config.Manager.KeepAlive.Interval, &managerv2.KeepAliveRequest{
SourceType: managerv2.SourceType_SCHEDULER_SOURCE,
HostName: a.config.Server.Host,
Ip: a.config.Server.AdvertiseIP.String(),
ClusterId: uint64(a.config.Manager.SchedulerClusterID),
Expand Down
8 changes: 4 additions & 4 deletions scheduler/announcer/announcer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestAnnouncer_New(t *testing.T) {
tests := []struct {
name string
config *config.Config
mock func(m *mocks.MockV1MockRecorder)
mock func(m *mocks.MockV2MockRecorder)
expect func(t *testing.T, announcer Announcer, err error)
}{
{
Expand All @@ -51,7 +51,7 @@ func TestAnnouncer_New(t *testing.T) {
SchedulerClusterID: 1,
},
},
mock: func(m *mocks.MockV1MockRecorder) {
mock: func(m *mocks.MockV2MockRecorder) {
m.UpdateScheduler(gomock.Any(), gomock.Any()).Return(nil, nil).Times(1)
},
expect: func(t *testing.T, a Announcer, err error) {
Expand All @@ -78,7 +78,7 @@ func TestAnnouncer_New(t *testing.T) {
SchedulerClusterID: 1,
},
},
mock: func(m *mocks.MockV1MockRecorder) {
mock: func(m *mocks.MockV2MockRecorder) {
m.UpdateScheduler(gomock.Any(), gomock.Any()).Return(nil, errors.New("foo")).Times(1)
},
expect: func(t *testing.T, a Announcer, err error) {
Expand All @@ -92,7 +92,7 @@ func TestAnnouncer_New(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
mockManagerClient := mocks.NewMockV1(ctl)
mockManagerClient := mocks.NewMockV2(ctl)
tc.mock(mockManagerClient.EXPECT())

a, err := New(tc.config, mockManagerClient)
Expand Down
38 changes: 19 additions & 19 deletions scheduler/config/dynconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/status"

managerv1 "d7y.io/api/pkg/apis/manager/v1"
managerv2 "d7y.io/api/pkg/apis/manager/v2"

logger "d7y.io/dragonfly/v2/internal/dflog"
dc "d7y.io/dragonfly/v2/internal/dynconfig"
Expand All @@ -55,25 +55,25 @@ var (
)

type DynconfigData struct {
Scheduler *managerv1.Scheduler
Applications []*managerv1.Application
Scheduler *managerv2.Scheduler
Applications []*managerv2.Application
}

type DynconfigInterface interface {
// GetResolveSeedPeerAddrs returns the dynamic schedulers resolve addrs.
GetResolveSeedPeerAddrs() ([]resolver.Address, error)

// GetScheduler returns the scheduler config from manager.
GetScheduler() (*managerv1.Scheduler, error)
GetScheduler() (*managerv2.Scheduler, error)

// GetApplications returns the applications config from manager.
GetApplications() ([]*managerv1.Application, error)
GetApplications() ([]*managerv2.Application, error)

// GetSeedPeers returns the dynamic seed peers config from manager.
GetSeedPeers() ([]*managerv1.SeedPeer, error)
GetSeedPeers() ([]*managerv2.SeedPeer, error)

// GetSchedulerCluster returns the the scheduler cluster config from manager.
GetSchedulerCluster() (*managerv1.SchedulerCluster, error)
GetSchedulerCluster() (*managerv2.SchedulerCluster, error)

// GetSchedulerClusterConfig returns the scheduler cluster config.
GetSchedulerClusterConfig() (types.SchedulerClusterConfig, error)
Expand Down Expand Up @@ -129,7 +129,7 @@ func WithTransportCredentials(creds credentials.TransportCredentials) DynconfigO
}

// NewDynconfig returns a new dynconfig instence.
func NewDynconfig(rawManagerClient managerclient.V1, cacheDir string, cfg *Config, options ...DynconfigOption) (DynconfigInterface, error) {
func NewDynconfig(rawManagerClient managerclient.V2, cacheDir string, cfg *Config, options ...DynconfigOption) (DynconfigInterface, error) {
cachePath := filepath.Join(cacheDir, cacheFileName)
d := &dynconfig{
observers: map[Observer]struct{}{},
Expand Down Expand Up @@ -215,7 +215,7 @@ func (d *dynconfig) GetResolveSeedPeerAddrs() ([]resolver.Address, error) {
}

// GetScheduler returns the scheduler config from manager.
func (d *dynconfig) GetScheduler() (*managerv1.Scheduler, error) {
func (d *dynconfig) GetScheduler() (*managerv2.Scheduler, error) {
data, err := d.Get()
if err != nil {
return nil, err
Expand All @@ -229,7 +229,7 @@ func (d *dynconfig) GetScheduler() (*managerv1.Scheduler, error) {
}

// GetApplications returns the applications config from manager.
func (d *dynconfig) GetApplications() ([]*managerv1.Application, error) {
func (d *dynconfig) GetApplications() ([]*managerv2.Application, error) {
data, err := d.Get()
if err != nil {
return nil, err
Expand All @@ -247,7 +247,7 @@ func (d *dynconfig) GetApplications() ([]*managerv1.Application, error) {
}

// GetSeedPeers returns the the seed peers config from manager.
func (d *dynconfig) GetSeedPeers() ([]*managerv1.SeedPeer, error) {
func (d *dynconfig) GetSeedPeers() ([]*managerv2.SeedPeer, error) {
scheduler, err := d.GetScheduler()
if err != nil {
return nil, err
Expand All @@ -265,7 +265,7 @@ func (d *dynconfig) GetSeedPeers() ([]*managerv1.SeedPeer, error) {
}

// GetSchedulerCluster returns the the scheduler cluster config from manager.
func (d *dynconfig) GetSchedulerCluster() (*managerv1.SchedulerCluster, error) {
func (d *dynconfig) GetSchedulerCluster() (*managerv2.SchedulerCluster, error) {
scheduler, err := d.GetScheduler()
if err != nil {
return nil, err
Expand Down Expand Up @@ -376,21 +376,21 @@ func (d *dynconfig) Stop() error {

// Manager client for dynconfig.
type managerClient struct {
managerClient managerclient.V1
managerClient managerclient.V2
config *Config
}

// New the manager client used by dynconfig.
func newManagerClient(client managerclient.V1, cfg *Config) dc.ManagerClient {
func newManagerClient(client managerclient.V2, cfg *Config) dc.ManagerClient {
return &managerClient{
managerClient: client,
config: cfg,
}
}

func (mc *managerClient) Get() (any, error) {
getSchedulerResp, err := mc.managerClient.GetScheduler(context.Background(), &managerv1.GetSchedulerRequest{
SourceType: managerv1.SourceType_SCHEDULER_SOURCE,
getSchedulerResp, err := mc.managerClient.GetScheduler(context.Background(), &managerv2.GetSchedulerRequest{
SourceType: managerv2.SourceType_SCHEDULER_SOURCE,
HostName: mc.config.Server.Host,
Ip: mc.config.Server.AdvertiseIP.String(),
SchedulerClusterId: uint64(mc.config.Manager.SchedulerClusterID),
Expand All @@ -399,8 +399,8 @@ func (mc *managerClient) Get() (any, error) {
return nil, err
}

listApplicationsResp, err := mc.managerClient.ListApplications(context.Background(), &managerv1.ListApplicationsRequest{
SourceType: managerv1.SourceType_SCHEDULER_SOURCE,
listApplicationsResp, err := mc.managerClient.ListApplications(context.Background(), &managerv2.ListApplicationsRequest{
SourceType: managerv2.SourceType_SCHEDULER_SOURCE,
HostName: mc.config.Server.Host,
Ip: mc.config.Server.AdvertiseIP.String(),
})
Expand All @@ -425,7 +425,7 @@ func (mc *managerClient) Get() (any, error) {
}

// GetSeedPeerClusterConfigBySeedPeer returns the seed peer cluster config by seed peer.
func GetSeedPeerClusterConfigBySeedPeer(seedPeer *managerv1.SeedPeer) (types.SeedPeerClusterConfig, error) {
func GetSeedPeerClusterConfigBySeedPeer(seedPeer *managerv2.SeedPeer) (types.SeedPeerClusterConfig, error) {
if seedPeer == nil {
return types.SeedPeerClusterConfig{}, errors.New("invalid seed peer")
}
Expand Down

0 comments on commit 1e7c9ea

Please sign in to comment.