From 052f9b2b5517e37c8bdcba0d502b57f1cdae6137 Mon Sep 17 00:00:00 2001 From: Richard Date: Tue, 8 Aug 2023 11:26:00 +0800 Subject: [PATCH] chore: improve redis queue and add example (#102) --- examples/queue/redis/app.yaml | 22 ++++++++++ examples/queue/redis/client.go | 50 +++++++++++++++++++++ examples/queue/redis/handler.go | 56 +++++++++++++++++++++++ examples/queue/redis/logger.yaml | 14 ++++++ examples/queue/redis/producer.go | 17 +++++++ examples/queue/redis/redis.yaml | 11 +++++ examples/queue/redis/server.go | 61 ++++++++++++++++++++++++++ pkg/transport/consumer/redis/server.go | 36 ++++++--------- 8 files changed, 244 insertions(+), 23 deletions(-) create mode 100644 examples/queue/redis/app.yaml create mode 100644 examples/queue/redis/client.go create mode 100644 examples/queue/redis/handler.go create mode 100644 examples/queue/redis/logger.yaml create mode 100644 examples/queue/redis/producer.go create mode 100644 examples/queue/redis/redis.yaml create mode 100644 examples/queue/redis/server.go diff --git a/examples/queue/redis/app.yaml b/examples/queue/redis/app.yaml new file mode 100644 index 0000000000..28e0df54ed --- /dev/null +++ b/examples/queue/redis/app.yaml @@ -0,0 +1,22 @@ +Name: eagle +Version: 1.0.0 +PprofPort: :5555 +Mode: debug # debug, release, test +JwtSecret: JWT_SECRET +JwtTimeout: 86400 +CookieName: jwt-token +SSL: true +CtxDefaultTimeout: 12 +CSRF: true +Debug: false +EnableTrace: false +EnablePprof: true + +HTTP: + Addr: :8080 + ReadTimeout: 3s + WriteTimeout: 3s +GRPC: + Addr: :9090 + ReadTimeout: 5s + WriteTimeout: 5s \ No newline at end of file diff --git a/examples/queue/redis/client.go b/examples/queue/redis/client.go new file mode 100644 index 0000000000..7ed35e28e8 --- /dev/null +++ b/examples/queue/redis/client.go @@ -0,0 +1,50 @@ +package main + +import ( + "sync" + "time" + + "github.com/go-eagle/eagle/pkg/config" + "github.com/hibiken/asynq" +) + +var ( + client *asynq.Client + once sync.Once +) + +type Config struct { + Queue struct { + Addr string + Password string + DB int + MinIdleConn int + DialTimeout time.Duration + ReadTimeout time.Duration + WriteTimeout time.Duration + PoolSize int + PoolTimeout time.Duration + Concurrency int //并发数 + } `json:"redis"` +} + +func GetClient() *asynq.Client { + once.Do(func() { + //c := config.New("config", config.WithEnv("local")) + c := config.New(".") + var cfg Config + if err := c.Load("redis", &cfg); err != nil { + panic(err) + } + client = asynq.NewClient(asynq.RedisClientOpt{ + Addr: cfg.Queue.Addr, + Password: cfg.Queue.Password, + DB: cfg.Queue.DB, + DialTimeout: cfg.Queue.DialTimeout, + ReadTimeout: cfg.Queue.ReadTimeout, + WriteTimeout: cfg.Queue.WriteTimeout, + PoolSize: cfg.Queue.PoolSize, + }) + }) + return client +} diff --git a/examples/queue/redis/handler.go b/examples/queue/redis/handler.go new file mode 100644 index 0000000000..e78cfc49d6 --- /dev/null +++ b/examples/queue/redis/handler.go @@ -0,0 +1,56 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "log" + + "github.com/pkg/errors" + + "github.com/hibiken/asynq" +) + +const ( + TypeEmailWelcome = "email:welcome" +) + +type EmailWelcomePayload struct { + UserID int64 +} + +//---------------------------------------------- +// Write a function NewXXXTask to create a task. +// A task consists of a type and a payload. +//---------------------------------------------- + +func NewEmailWelcomeTask(data EmailWelcomePayload) error { + payload, err := json.Marshal(data) + if err != nil { + return errors.Wrapf(err, "json marshal error, name: %s", TypeEmailWelcome) + } + task := asynq.NewTask(TypeEmailWelcome, payload) + _, err = GetClient().Enqueue(task) + if err != nil { + return errors.Wrapf(err, "Enqueue task error, name: %s", TypeEmailWelcome) + } + return nil +} + +//--------------------------------------------------------------- +// Write a function HandleXXXTask to handle the input task. +// Note that it satisfies the asynq.HandlerFunc interface. +// +// Handler doesn't need to be a function. You can define a type +// that satisfies asynq.Handler interface. See examples below. +//--------------------------------------------------------------- + +func HandleEmailWelcomeTask(ctx context.Context, t *asynq.Task) error { + var p EmailWelcomePayload + if err := json.Unmarshal(t.Payload(), &p); err != nil { + return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) + } + log.Printf("Sending Email to User: user_id=%d", p.UserID) + // Email delivery code ... + return nil +} diff --git a/examples/queue/redis/logger.yaml b/examples/queue/redis/logger.yaml new file mode 100644 index 0000000000..41d37ced3c --- /dev/null +++ b/examples/queue/redis/logger.yaml @@ -0,0 +1,14 @@ +Development: false +DisableCaller: false +DisableStacktrace: false +Encoding: console # json or console +Level: info # 日志级别,INFO, WARN, ERROR +Name: eagle +Writers: console # 有2个可选项:file,console 选择file会将日志记录到logger_file指定的日志文件中,选择console会将日志输出到标准输出,当然也可以两者同时选择 +LoggerFile: /tmp/log/eagle.log +LoggerWarnFile: /tmp/log/eagle.wf.log +LoggerErrorFile: /tmp/log/eagle.err.log +LogRollingPolicy: daily +LogRotateDate: 1 +LogRotateSize: 1 +LogBackupCount: 7 \ No newline at end of file diff --git a/examples/queue/redis/producer.go b/examples/queue/redis/producer.go new file mode 100644 index 0000000000..29fc29a420 --- /dev/null +++ b/examples/queue/redis/producer.go @@ -0,0 +1,17 @@ +package main + +import "time" + +// cd examples/queue/redis/consumer/ +// go run producer.go handler.go client.go +func main() { + for i := 0; i < 10; i++ { + err := NewEmailWelcomeTask(EmailWelcomePayload{ + UserID: time.Now().Unix(), + }) + if err != nil { + panic(err) + } + time.Sleep(time.Second) + } +} diff --git a/examples/queue/redis/redis.yaml b/examples/queue/redis/redis.yaml new file mode 100644 index 0000000000..1fdc09d1d8 --- /dev/null +++ b/examples/queue/redis/redis.yaml @@ -0,0 +1,11 @@ +Queue: + Addr: 127.0.0.1:6379 + Password: "" + DB: 0 + MinIdleConn: 200 + DialTimeout: 60s + ReadTimeout: 500ms + WriteTimeout: 500ms + PoolSize: 100 + PoolTimeout: 240s + EnableTrace: true diff --git a/examples/queue/redis/server.go b/examples/queue/redis/server.go new file mode 100644 index 0000000000..7226a7d450 --- /dev/null +++ b/examples/queue/redis/server.go @@ -0,0 +1,61 @@ +package main + +import ( + eagle "github.com/go-eagle/eagle/pkg/app" + "github.com/go-eagle/eagle/pkg/config" + logger "github.com/go-eagle/eagle/pkg/log" + redisMQ "github.com/go-eagle/eagle/pkg/transport/consumer/redis" + "github.com/hibiken/asynq" + "github.com/spf13/pflag" +) + +// redis queue consumer +// cd examples/queue/redis/consumer/ +// go run server.go handler.go client.go +func main() { + pflag.Parse() + + // init config + c := config.New(".") + var cfg eagle.Config + if err := c.Load("app", &cfg); err != nil { + panic(err) + } + // set global + eagle.Conf = &cfg + + logger.Init() + + srv := redisMQ.NewServer( + asynq.RedisClientOpt{Addr: "localhost:6379"}, + asynq.Config{ + // Specify how many concurrent workers to use + Concurrency: 10, + // Optionally specify multiple queues with different priority. + Queues: map[string]int{ + redisMQ.QueueCritical: 6, + redisMQ.QueueDefault: 3, + redisMQ.QueueLow: 1, + }, + // See the godoc for other configuration options + }, + ) + + // register handler + srv.RegisterHandler(TypeEmailWelcome, HandleEmailWelcomeTask) + // here register other handlers... + + // start app + app := eagle.New( + eagle.WithName(cfg.Name), + eagle.WithVersion(cfg.Version), + eagle.WithLogger(logger.GetLogger()), + eagle.WithServer( + srv, + ), + ) + + if err := app.Run(); err != nil { + panic(err) + } +} diff --git a/pkg/transport/consumer/redis/server.go b/pkg/transport/consumer/redis/server.go index 50e25ccbdd..d13b22e53c 100644 --- a/pkg/transport/consumer/redis/server.go +++ b/pkg/transport/consumer/redis/server.go @@ -2,45 +2,40 @@ package redis import ( "context" - "time" "github.com/pkg/errors" "github.com/hibiken/asynq" ) +const ( + // QueueCritical queue priority + QueueCritical = "critical" + QueueDefault = "default" + QueueLow = "low" +) + +// Server async server type Server struct { clientOpt asynq.RedisClientOpt // async server srv *asynq.Server mux *asynq.ServeMux - - // async schedule - sche *asynq.Scheduler } +// NewServer new async server func NewServer(redisOpt asynq.RedisClientOpt, asyncCfg asynq.Config) *Server { srv := &Server{ srv: asynq.NewServer(redisOpt, asyncCfg), mux: asynq.NewServeMux(), - sche: asynq.NewScheduler( - redisOpt, - &asynq.SchedulerOpts{Location: time.Local}, - ), } return srv } +// Start async server func (s *Server) Start(ctx context.Context) error { - go func() { - err := s.sche.Run() - if err != nil { - panic(err) - } - }() - err := s.srv.Run(s.mux) if err != nil { return errors.Wrapf(err, "failed to run async server: %v") @@ -49,18 +44,13 @@ func (s *Server) Start(ctx context.Context) error { return nil } +// Stop async server func (s *Server) Stop(ctx context.Context) error { s.srv.Shutdown() - s.sche.Shutdown() return nil } -// RegisterTask register task -func (s *Server) RegisterTask(schedule string, task *asynq.Task) (entryID string, err error) { - return s.sche.Register(schedule, task) -} - -// RegisterHandle register handler -func (s *Server) RegisterHandle(pattern string, handler func(context.Context, *asynq.Task) error) { +// RegisterHandler register handler +func (s *Server) RegisterHandler(pattern string, handler func(context.Context, *asynq.Task) error) { s.mux.HandleFunc(pattern, handler) }