Skip to content

Commit

Permalink
chore: improve redis queue and add example (#102)
Browse files Browse the repository at this point in the history
  • Loading branch information
qloog committed Aug 8, 2023
1 parent b4e4b80 commit 052f9b2
Show file tree
Hide file tree
Showing 8 changed files with 244 additions and 23 deletions.
22 changes: 22 additions & 0 deletions examples/queue/redis/app.yaml
@@ -0,0 +1,22 @@
Name: eagle
Version: 1.0.0
PprofPort: :5555
Mode: debug # debug, release, test
JwtSecret: JWT_SECRET
JwtTimeout: 86400
CookieName: jwt-token
SSL: true
CtxDefaultTimeout: 12
CSRF: true
Debug: false
EnableTrace: false
EnablePprof: true

HTTP:
Addr: :8080
ReadTimeout: 3s
WriteTimeout: 3s
GRPC:
Addr: :9090
ReadTimeout: 5s
WriteTimeout: 5s
50 changes: 50 additions & 0 deletions examples/queue/redis/client.go
@@ -0,0 +1,50 @@
package main

import (
"sync"
"time"

"github.com/go-eagle/eagle/pkg/config"
"github.com/hibiken/asynq"
)

var (
client *asynq.Client
once sync.Once
)

type Config struct {
Queue struct {
Addr string
Password string
DB int
MinIdleConn int
DialTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
PoolSize int
PoolTimeout time.Duration
Concurrency int //并发数
} `json:"redis"`
}

func GetClient() *asynq.Client {
once.Do(func() {
//c := config.New("config", config.WithEnv("local"))
c := config.New(".")
var cfg Config
if err := c.Load("redis", &cfg); err != nil {
panic(err)
}
client = asynq.NewClient(asynq.RedisClientOpt{
Addr: cfg.Queue.Addr,
Password: cfg.Queue.Password,
DB: cfg.Queue.DB,
DialTimeout: cfg.Queue.DialTimeout,
ReadTimeout: cfg.Queue.ReadTimeout,
WriteTimeout: cfg.Queue.WriteTimeout,
PoolSize: cfg.Queue.PoolSize,
})
})
return client
}
56 changes: 56 additions & 0 deletions examples/queue/redis/handler.go
@@ -0,0 +1,56 @@
package main

import (
"context"
"encoding/json"
"fmt"
"log"

"github.com/pkg/errors"

"github.com/hibiken/asynq"
)

const (
TypeEmailWelcome = "email:welcome"
)

type EmailWelcomePayload struct {
UserID int64
}

//----------------------------------------------
// Write a function NewXXXTask to create a task.
// A task consists of a type and a payload.
//----------------------------------------------

func NewEmailWelcomeTask(data EmailWelcomePayload) error {
payload, err := json.Marshal(data)
if err != nil {
return errors.Wrapf(err, "json marshal error, name: %s", TypeEmailWelcome)
}
task := asynq.NewTask(TypeEmailWelcome, payload)
_, err = GetClient().Enqueue(task)
if err != nil {
return errors.Wrapf(err, "Enqueue task error, name: %s", TypeEmailWelcome)
}
return nil
}

//---------------------------------------------------------------
// Write a function HandleXXXTask to handle the input task.
// Note that it satisfies the asynq.HandlerFunc interface.
//
// Handler doesn't need to be a function. You can define a type
// that satisfies asynq.Handler interface. See examples below.
//---------------------------------------------------------------

func HandleEmailWelcomeTask(ctx context.Context, t *asynq.Task) error {
var p EmailWelcomePayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
}
log.Printf("Sending Email to User: user_id=%d", p.UserID)
// Email delivery code ...
return nil
}
14 changes: 14 additions & 0 deletions examples/queue/redis/logger.yaml
@@ -0,0 +1,14 @@
Development: false
DisableCaller: false
DisableStacktrace: false
Encoding: console # json or console
Level: info # 日志级别,INFO, WARN, ERROR
Name: eagle
Writers: console # 有2个可选项:file,console 选择file会将日志记录到logger_file指定的日志文件中,选择console会将日志输出到标准输出,当然也可以两者同时选择
LoggerFile: /tmp/log/eagle.log
LoggerWarnFile: /tmp/log/eagle.wf.log
LoggerErrorFile: /tmp/log/eagle.err.log
LogRollingPolicy: daily
LogRotateDate: 1
LogRotateSize: 1
LogBackupCount: 7
17 changes: 17 additions & 0 deletions examples/queue/redis/producer.go
@@ -0,0 +1,17 @@
package main

import "time"

// cd examples/queue/redis/consumer/
// go run producer.go handler.go client.go
func main() {
for i := 0; i < 10; i++ {
err := NewEmailWelcomeTask(EmailWelcomePayload{
UserID: time.Now().Unix(),
})
if err != nil {
panic(err)
}
time.Sleep(time.Second)
}
}
11 changes: 11 additions & 0 deletions examples/queue/redis/redis.yaml
@@ -0,0 +1,11 @@
Queue:
Addr: 127.0.0.1:6379
Password: ""
DB: 0
MinIdleConn: 200
DialTimeout: 60s
ReadTimeout: 500ms
WriteTimeout: 500ms
PoolSize: 100
PoolTimeout: 240s
EnableTrace: true
61 changes: 61 additions & 0 deletions examples/queue/redis/server.go
@@ -0,0 +1,61 @@
package main

import (
eagle "github.com/go-eagle/eagle/pkg/app"
"github.com/go-eagle/eagle/pkg/config"
logger "github.com/go-eagle/eagle/pkg/log"
redisMQ "github.com/go-eagle/eagle/pkg/transport/consumer/redis"
"github.com/hibiken/asynq"
"github.com/spf13/pflag"
)

// redis queue consumer
// cd examples/queue/redis/consumer/
// go run server.go handler.go client.go
func main() {
pflag.Parse()

// init config
c := config.New(".")
var cfg eagle.Config
if err := c.Load("app", &cfg); err != nil {
panic(err)
}
// set global
eagle.Conf = &cfg

logger.Init()

srv := redisMQ.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{
// Specify how many concurrent workers to use
Concurrency: 10,
// Optionally specify multiple queues with different priority.
Queues: map[string]int{
redisMQ.QueueCritical: 6,
redisMQ.QueueDefault: 3,
redisMQ.QueueLow: 1,
},
// See the godoc for other configuration options
},
)

// register handler
srv.RegisterHandler(TypeEmailWelcome, HandleEmailWelcomeTask)
// here register other handlers...

// start app
app := eagle.New(
eagle.WithName(cfg.Name),
eagle.WithVersion(cfg.Version),
eagle.WithLogger(logger.GetLogger()),
eagle.WithServer(
srv,
),
)

if err := app.Run(); err != nil {
panic(err)
}
}
36 changes: 13 additions & 23 deletions pkg/transport/consumer/redis/server.go
Expand Up @@ -2,45 +2,40 @@ package redis

import (
"context"
"time"

"github.com/pkg/errors"

"github.com/hibiken/asynq"
)

const (
// QueueCritical queue priority
QueueCritical = "critical"
QueueDefault = "default"
QueueLow = "low"
)

// Server async server
type Server struct {
clientOpt asynq.RedisClientOpt

// async server
srv *asynq.Server
mux *asynq.ServeMux

// async schedule
sche *asynq.Scheduler
}

// NewServer new async server
func NewServer(redisOpt asynq.RedisClientOpt, asyncCfg asynq.Config) *Server {
srv := &Server{
srv: asynq.NewServer(redisOpt, asyncCfg),
mux: asynq.NewServeMux(),
sche: asynq.NewScheduler(
redisOpt,
&asynq.SchedulerOpts{Location: time.Local},
),
}

return srv
}

// Start async server
func (s *Server) Start(ctx context.Context) error {
go func() {
err := s.sche.Run()
if err != nil {
panic(err)
}
}()

err := s.srv.Run(s.mux)
if err != nil {
return errors.Wrapf(err, "failed to run async server: %v")
Expand All @@ -49,18 +44,13 @@ func (s *Server) Start(ctx context.Context) error {
return nil
}

// Stop async server
func (s *Server) Stop(ctx context.Context) error {
s.srv.Shutdown()
s.sche.Shutdown()
return nil
}

// RegisterTask register task
func (s *Server) RegisterTask(schedule string, task *asynq.Task) (entryID string, err error) {
return s.sche.Register(schedule, task)
}

// RegisterHandle register handler
func (s *Server) RegisterHandle(pattern string, handler func(context.Context, *asynq.Task) error) {
// RegisterHandler register handler
func (s *Server) RegisterHandler(pattern string, handler func(context.Context, *asynq.Task) error) {
s.mux.HandleFunc(pattern, handler)
}

0 comments on commit 052f9b2

Please sign in to comment.