Skip to content

Commit

Permalink
feat: scheduler supports to disable redis (#2389)
Browse files Browse the repository at this point in the history
feat: scheduler support to disable redis

Signed-off-by: Gaius <gaius.qi@gmail.com>
  • Loading branch information
gaius-qi committed May 25, 2023
1 parent e4deba0 commit f65513a
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 37 deletions.
6 changes: 6 additions & 0 deletions pkg/redis/redis.go
Expand Up @@ -51,6 +51,7 @@ const (
ProbedCountNamespace = "probed-count"
)

// NewRedis returns a new redis client.
func NewRedis(cfg *redis.UniversalOptions) (redis.UniversalClient, error) {
redis.SetLogger(&redisLogger{})
client := redis.NewUniversalClient(&redis.UniversalOptions{
Expand All @@ -68,6 +69,11 @@ func NewRedis(cfg *redis.UniversalOptions) (redis.UniversalClient, error) {
return client, nil
}

// IsEnabled check redis is enabled.
func IsEnabled(addrs []string) bool {
return len(addrs) != 0
}

// MakeNamespaceKeyInManager make namespace key in manager.
func MakeNamespaceKeyInManager(namespace string) string {
return fmt.Sprintf("%s:%s", types.ManagerName, namespace)
Expand Down
4 changes: 0 additions & 4 deletions scheduler/config/config.go
Expand Up @@ -445,10 +445,6 @@ func (cfg *Config) Validate() error {
return errors.New("server requires parameter host")
}

if len(cfg.Database.Redis.Addrs) == 0 {
return errors.New("redis requires parameter addrs")
}

if cfg.Database.Redis.BrokerDB < 0 {
return errors.New("redis requires parameter brokerDB")
}
Expand Down
13 changes: 0 additions & 13 deletions scheduler/config/config_test.go
Expand Up @@ -259,19 +259,6 @@ func TestConfig_Validate(t *testing.T) {
assert.EqualError(err, "server requires parameter host")
},
},
{
name: "redis requires parameter addrs",
config: New(),
mock: func(cfg *Config) {
cfg.Manager = mockManagerConfig
cfg.Database.Redis = mockRedisConfig
cfg.Database.Redis.Addrs = []string{}
},
expect: func(t *testing.T, err error) {
assert := assert.New(t)
assert.EqualError(err, "redis requires parameter addrs")
},
},
{
name: "redis requires parameter brokerDB",
config: New(),
Expand Down
43 changes: 23 additions & 20 deletions scheduler/scheduler.go
Expand Up @@ -107,18 +107,6 @@ type Server struct {
func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, error) {
s := &Server{config: cfg}

// Initialize redis client.
rdb, err := pkgredis.NewRedis(&redis.UniversalOptions{
Addrs: cfg.Database.Redis.Addrs,
MasterName: cfg.Database.Redis.MasterName,
DB: cfg.Database.Redis.NetworkTopologyDB,
Username: cfg.Database.Redis.Username,
Password: cfg.Database.Redis.Password,
})
if err != nil {
return nil, err
}

// Initialize Storage.
storage, err := storage.New(
d.DataDir(),
Expand Down Expand Up @@ -263,27 +251,42 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err
svr := rpcserver.New(cfg, resource, scheduling, dynconfig, s.storage, schedulerServerOptions...)
s.grpcServer = svr

// Initialize job service.
if cfg.Job.Enable {
s.job, err = job.New(cfg, resource)
// Initialize redis client.
var rdb redis.UniversalClient
if pkgredis.IsEnabled(cfg.Database.Redis.Addrs) {
rdb, err = pkgredis.NewRedis(&redis.UniversalOptions{
Addrs: cfg.Database.Redis.Addrs,
MasterName: cfg.Database.Redis.MasterName,
DB: cfg.Database.Redis.NetworkTopologyDB,
Username: cfg.Database.Redis.Username,
Password: cfg.Database.Redis.Password,
})
if err != nil {
return nil, err
}
}

// Initialize metrics.
if cfg.Metrics.Enable {
s.metricsServer = metrics.New(&cfg.Metrics, s.grpcServer)
// Initialize job service.
if cfg.Job.Enable && pkgredis.IsEnabled(cfg.Database.Redis.Addrs) {
s.job, err = job.New(cfg, resource)
if err != nil {
return nil, err
}
}

// Initialize network topology service.
if cfg.NetworkTopology.Enable {
if cfg.NetworkTopology.Enable && pkgredis.IsEnabled(cfg.Database.Redis.Addrs) {
s.networkTopology, err = networktopology.NewNetworkTopology(cfg.NetworkTopology, rdb, resource, s.storage)
if err != nil {
return nil, err
}
}

// Initialize metrics.
if cfg.Metrics.Enable {
s.metricsServer = metrics.New(&cfg.Metrics, s.grpcServer)
}

return s, nil
}

Expand All @@ -301,7 +304,7 @@ func (s *Server) Serve() error {
logger.Info("gc start successfully")

// Serve Job.
if s.config.Job.Enable {
if s.config.Job.Enable && pkgredis.IsEnabled(s.config.Database.Redis.Addrs) {
s.job.Serve()
logger.Info("job start successfully")
}
Expand Down

0 comments on commit f65513a

Please sign in to comment.