Skip to content

Commit

Permalink
feat: optimize scheduler start server (#558)
Browse files Browse the repository at this point in the history
* feat: optimize scheduler start server

Signed-off-by: Gaius <gaius.qi@gmail.com>
  • Loading branch information
gaius-qi committed Aug 19, 2021
1 parent 8380b21 commit f530cb8
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 179 deletions.
4 changes: 2 additions & 2 deletions cdnsystem/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,9 +220,9 @@ type KeepAliveConfig struct {
}

type HostConfig struct {
// Peerhost location for scheduler
// Location for scheduler
Location string `mapstructure:"location" yaml:"location"`

// Peerhost idc for scheduler
// IDC for scheduler
IDC string `mapstructure:"idc" yaml:"idc"`
}
4 changes: 2 additions & 2 deletions client/config/peerhost.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,9 @@ type SchedulerOption struct {
type HostOption struct {
// SecurityDomain is the security domain
SecurityDomain string `mapstructure:"securityDomain" yaml:"securityDomain"`
// Peerhost location for scheduler
// Location for scheduler
Location string `mapstructure:"location" yaml:"location"`
// Peerhost idc for scheduler
// IDC for scheduler
IDC string `mapstructure:"idc" yaml:"idc"`
// Peerhost net topology for scheduler
NetTopology string `mapstructure:"netTopology" yaml:"netTopology"`
Expand Down
1 change: 1 addition & 0 deletions cmd/manager/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,5 +90,6 @@ func runManager() error {
if err != nil {
return err
}
dependency.SetupQuitSignalHandler(func() { svr.Stop() })
return svr.Serve()
}
2 changes: 1 addition & 1 deletion cmd/scheduler/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,6 @@ func runScheduler() error {
logger.Errorf("get scheduler server error: %s", err)
return err
}

dependency.SetupQuitSignalHandler(func() { svr.Stop() })
return svr.Serve()
}
59 changes: 25 additions & 34 deletions manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"d7y.io/dragonfly/v2/manager/service"
"d7y.io/dragonfly/v2/pkg/rpc"
"d7y.io/dragonfly/v2/pkg/rpc/manager"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
)

Expand Down Expand Up @@ -102,55 +101,47 @@ func New(cfg *config.Config) (*Server, error) {
}

func (s *Server) Serve() error {
g := errgroup.Group{}

// GRPC listener
lis, _, err := rpc.ListenWithPortRange(s.config.Server.GRPC.Listen, s.config.Server.GRPC.PortRange.Start, s.config.Server.GRPC.PortRange.End)
if err != nil {
logger.Errorf("failed to net listen: %+v", err)
return err
logger.Fatalf("failed to net listen: %+v", err)
}
defer lis.Close()

// Serve GRPC
g.Go(func() error {
defer lis.Close()
grpcServer := grpc.NewServer()
manager.RegisterManagerServer(grpcServer, s.service)
logger.Infof("serve grpc at %s://%s", lis.Addr().Network(), lis.Addr().String())
if err := grpcServer.Serve(lis); err != nil {
logger.Errorf("failed to start manager grpc server: %+v", err)
// Serve Proxy
go func() {
logger.Info("serve proxy")
if err := s.proxyServer.Serve(); err != nil {
logger.Fatalf("failed to start manager proxy server: %+v", err)
}
return nil
})
}()

// Serve REST
g.Go(func() error {
go func() {
logger.Infof("serve rest at %s", s.restServer.Addr)
if err := s.restServer.ListenAndServe(); err != nil {
logger.Errorf("failed to start manager rest server: %+v", err)
return err
logger.Fatalf("failed to start manager rest server: %+v", err)
}
return nil
})
}()

// Serve Proxy
g.Go(func() error {
if err := s.proxyServer.Serve(); err != nil {
logger.Errorf("failed to start manager proxy server: %+v", err)
return err
}
return nil
})
// Serve GRPC
grpcServer := grpc.NewServer()
manager.RegisterManagerServer(grpcServer, s.service)
logger.Infof("serve grpc at %s://%s", lis.Addr().Network(), lis.Addr().String())
if err := grpcServer.Serve(lis); err != nil {
logger.Errorf("failed to start manager grpc server: %+v", err)
return err
}

return g.Wait()
return nil
}

func (s *Server) Stop() {
// Stop Proxy
s.proxyServer.Stop()

// Stop REST
err := s.restServer.Shutdown(context.TODO())
if err != nil {
if err := s.restServer.Shutdown(context.TODO()); err != nil {
logger.Errorf("failed to stop manager rest server: %+v", err)
}

// Stop Proxy
s.proxyServer.Stop()
}
2 changes: 0 additions & 2 deletions manager/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ func New(cfg *config.RedisConfig) Proxy {
}

func (p *proxy) Serve() error {
logger.Infof("proxy start to listen port %s", p.from)
listener, err := net.Listen("tcp", p.from)
if err != nil {
return err
Expand All @@ -68,7 +67,6 @@ func (p *proxy) Serve() error {
}

func (p *proxy) Stop() {
logger.Infof("proxy stop to listen port %s", p.from)
if p.done == nil {
return
}
Expand Down
176 changes: 74 additions & 102 deletions scheduler/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,78 @@ type Config struct {

func New() *Config {
return &Config{
Scheduler: NewDefaultSchedulerConfig(),
Server: NewDefaultServerConfig(),
DynConfig: NewDefaultDynConfig(),
Manager: NewDefaultManagerConfig(),
Host: NewHostConfig(),
Job: NewDefaultJobConfig(),
}
}

func NewHostConfig() *HostConfig {
return &HostConfig{
Location: "",
IDC: "",
Scheduler: &SchedulerConfig{
DisableCDN: false,
ABTest: false,
AScheduler: "",
BScheduler: "",
WorkerNum: runtime.GOMAXPROCS(0),
BackSourceCount: 3,
AccessWindow: 3 * time.Minute,
CandidateParentCount: 10,
Scheduler: "basic",
CDNLoad: 100,
ClientLoad: 10,
OpenMonitor: true,
GC: &GCConfig{
PeerGCInterval: 5 * time.Minute,
TaskGCInterval: 5 * time.Minute,
PeerTTL: 10 * time.Minute,
PeerTTI: 3 * time.Minute,
TaskTTL: 10 * time.Minute,
TaskTTI: 3 * time.Minute,
},
},
Server: &ServerConfig{
IP: iputils.HostIP,
Host: iputils.HostName,
Port: 8002,
},
DynConfig: &DynConfig{
Type: dc.LocalSourceType,
ExpireTime: 30 * time.Second,
CDNDirPath: "",
Data: &DynconfigData{
CDNs: []*CDN{
{
HostName: "localhost",
IP: "127.0.0.1",
Port: 8003,
DownloadPort: 8001,
SecurityGroup: "",
Location: "",
IDC: "",
NetTopology: "",
},
},
},
},
Manager: &ManagerConfig{
Addr: "",
SchedulerClusterID: 0,
KeepAlive: KeepAliveConfig{
Interval: 5 * time.Second,
RetryMaxAttempts: 100000000,
RetryInitBackOff: 5,
RetryMaxBackOff: 10,
},
},
Host: &HostConfig{
Location: "",
IDC: "",
},
Job: &JobConfig{
GlobalWorkerNum: 10,
SchedulerWorkerNum: 10,
LocalWorkerNum: 10,
Redis: &RedisConfig{
Host: "",
Port: 6379,
Password: "",
BrokerDB: 1,
BackendDB: 2,
},
},
}
}

Expand Down Expand Up @@ -79,93 +138,6 @@ func (c *Config) Validate() error {
return nil
}

func NewDefaultDynConfig() *DynConfig {
return &DynConfig{
Type: dc.LocalSourceType,
ExpireTime: 30 * time.Second,
CDNDirPath: "",
Data: &DynconfigData{
CDNs: []*CDN{
{
HostName: "localhost",
IP: "127.0.0.1",
Port: 8003,
DownloadPort: 8001,
SecurityGroup: "",
Location: "",
IDC: "",
NetTopology: "",
},
},
},
}
}

func NewDefaultServerConfig() *ServerConfig {
return &ServerConfig{
IP: iputils.HostIP,
Host: iputils.HostName,
Port: 8002,
}
}

func NewDefaultSchedulerConfig() *SchedulerConfig {
return &SchedulerConfig{
DisableCDN: false,
ABTest: false,
AScheduler: "",
BScheduler: "",
WorkerNum: runtime.GOMAXPROCS(0),
BackSourceCount: 3,
AccessWindow: 3 * time.Minute,
CandidateParentCount: 10,
Scheduler: "basic",
CDNLoad: 100,
ClientLoad: 10,
OpenMonitor: true,
GC: NewDefaultGCConfig(),
}
}

func NewDefaultGCConfig() *GCConfig {
return &GCConfig{
PeerGCInterval: 5 * time.Minute,
TaskGCInterval: 5 * time.Minute,
PeerTTL: 10 * time.Minute,
PeerTTI: 3 * time.Minute,
TaskTTL: 10 * time.Minute,
TaskTTI: 3 * time.Minute,
}
}

func NewDefaultManagerConfig() *ManagerConfig {
return &ManagerConfig{
Addr: "",
SchedulerClusterID: 0,
KeepAlive: KeepAliveConfig{
Interval: 5 * time.Second,
RetryMaxAttempts: 100000000,
RetryInitBackOff: 5,
RetryMaxBackOff: 10,
},
}
}

func NewDefaultJobConfig() *JobConfig {
return &JobConfig{
GlobalWorkerNum: 10,
SchedulerWorkerNum: 10,
LocalWorkerNum: 10,
Redis: &RedisConfig{
Host: "",
Port: 6379,
Password: "",
BrokerDB: 1,
BackendDB: 2,
},
}
}

func (c *Config) Convert() error {
if c.Manager.Addr != "" && c.Job.Redis.Host == "" {
host, _, err := net.SplitHostPort(c.Manager.Addr)
Expand Down Expand Up @@ -249,10 +221,10 @@ type GCConfig struct {
}

type HostConfig struct {
// Peerhost location for scheduler
// Location for scheduler
Location string `mapstructure:"location" yaml:"location"`

// Peerhost idc for scheduler
// IDC for scheduler
IDC string `mapstructure:"idc" yaml:"idc"`
}

Expand Down

0 comments on commit f530cb8

Please sign in to comment.