From f65513a0099f820532cb0159c366cd9a3fd9036a Mon Sep 17 00:00:00 2001 From: Gaius Date: Thu, 25 May 2023 21:15:31 +0800 Subject: [PATCH] feat: scheduler supports to disable redis (#2389) feat: scheduler support to disable redis Signed-off-by: Gaius --- pkg/redis/redis.go | 6 +++++ scheduler/config/config.go | 4 --- scheduler/config/config_test.go | 13 ---------- scheduler/scheduler.go | 43 ++++++++++++++++++--------------- 4 files changed, 29 insertions(+), 37 deletions(-) diff --git a/pkg/redis/redis.go b/pkg/redis/redis.go index 2aeae7ba207..b770673ed42 100644 --- a/pkg/redis/redis.go +++ b/pkg/redis/redis.go @@ -51,6 +51,7 @@ const ( ProbedCountNamespace = "probed-count" ) +// NewRedis returns a new redis client. func NewRedis(cfg *redis.UniversalOptions) (redis.UniversalClient, error) { redis.SetLogger(&redisLogger{}) client := redis.NewUniversalClient(&redis.UniversalOptions{ @@ -68,6 +69,11 @@ func NewRedis(cfg *redis.UniversalOptions) (redis.UniversalClient, error) { return client, nil } +// IsEnabled check redis is enabled. +func IsEnabled(addrs []string) bool { + return len(addrs) != 0 +} + // MakeNamespaceKeyInManager make namespace key in manager. func MakeNamespaceKeyInManager(namespace string) string { return fmt.Sprintf("%s:%s", types.ManagerName, namespace) diff --git a/scheduler/config/config.go b/scheduler/config/config.go index fe15710b32a..8aea8c6ed5f 100644 --- a/scheduler/config/config.go +++ b/scheduler/config/config.go @@ -445,10 +445,6 @@ func (cfg *Config) Validate() error { return errors.New("server requires parameter host") } - if len(cfg.Database.Redis.Addrs) == 0 { - return errors.New("redis requires parameter addrs") - } - if cfg.Database.Redis.BrokerDB < 0 { return errors.New("redis requires parameter brokerDB") } diff --git a/scheduler/config/config_test.go b/scheduler/config/config_test.go index ecfe93fb74d..92217a88c61 100644 --- a/scheduler/config/config_test.go +++ b/scheduler/config/config_test.go @@ -259,19 +259,6 @@ func TestConfig_Validate(t *testing.T) { assert.EqualError(err, "server requires parameter host") }, }, - { - name: "redis requires parameter addrs", - config: New(), - mock: func(cfg *Config) { - cfg.Manager = mockManagerConfig - cfg.Database.Redis = mockRedisConfig - cfg.Database.Redis.Addrs = []string{} - }, - expect: func(t *testing.T, err error) { - assert := assert.New(t) - assert.EqualError(err, "redis requires parameter addrs") - }, - }, { name: "redis requires parameter brokerDB", config: New(), diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 0a5fb671ae1..0ec9b42500c 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -107,18 +107,6 @@ type Server struct { func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, error) { s := &Server{config: cfg} - // Initialize redis client. - rdb, err := pkgredis.NewRedis(&redis.UniversalOptions{ - Addrs: cfg.Database.Redis.Addrs, - MasterName: cfg.Database.Redis.MasterName, - DB: cfg.Database.Redis.NetworkTopologyDB, - Username: cfg.Database.Redis.Username, - Password: cfg.Database.Redis.Password, - }) - if err != nil { - return nil, err - } - // Initialize Storage. storage, err := storage.New( d.DataDir(), @@ -263,27 +251,42 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err svr := rpcserver.New(cfg, resource, scheduling, dynconfig, s.storage, schedulerServerOptions...) s.grpcServer = svr - // Initialize job service. - if cfg.Job.Enable { - s.job, err = job.New(cfg, resource) + // Initialize redis client. + var rdb redis.UniversalClient + if pkgredis.IsEnabled(cfg.Database.Redis.Addrs) { + rdb, err = pkgredis.NewRedis(&redis.UniversalOptions{ + Addrs: cfg.Database.Redis.Addrs, + MasterName: cfg.Database.Redis.MasterName, + DB: cfg.Database.Redis.NetworkTopologyDB, + Username: cfg.Database.Redis.Username, + Password: cfg.Database.Redis.Password, + }) if err != nil { return nil, err } } - // Initialize metrics. - if cfg.Metrics.Enable { - s.metricsServer = metrics.New(&cfg.Metrics, s.grpcServer) + // Initialize job service. + if cfg.Job.Enable && pkgredis.IsEnabled(cfg.Database.Redis.Addrs) { + s.job, err = job.New(cfg, resource) + if err != nil { + return nil, err + } } // Initialize network topology service. - if cfg.NetworkTopology.Enable { + if cfg.NetworkTopology.Enable && pkgredis.IsEnabled(cfg.Database.Redis.Addrs) { s.networkTopology, err = networktopology.NewNetworkTopology(cfg.NetworkTopology, rdb, resource, s.storage) if err != nil { return nil, err } } + // Initialize metrics. + if cfg.Metrics.Enable { + s.metricsServer = metrics.New(&cfg.Metrics, s.grpcServer) + } + return s, nil } @@ -301,7 +304,7 @@ func (s *Server) Serve() error { logger.Info("gc start successfully") // Serve Job. - if s.config.Job.Enable { + if s.config.Job.Enable && pkgredis.IsEnabled(s.config.Database.Redis.Addrs) { s.job.Serve() logger.Info("job start successfully") }