diff --git a/.gitignore b/.gitignore index 0b8c36b..7c38c28 100644 --- a/.gitignore +++ b/.gitignore @@ -18,4 +18,5 @@ go.sum # IDE and OS .idea + .DS_Store diff --git a/cmd/delayqueue/main.go b/cmd/delayqueue/main.go index aaa83b2..07ed6e8 100644 --- a/cmd/delayqueue/main.go +++ b/cmd/delayqueue/main.go @@ -133,7 +133,10 @@ func run() int { p := pool.New(s, l) q := queue.New(s, conf.DelayQueue.QueueName) - t := timer.New(l, time.Duration(conf.DelayQueue.TimerFetchInterval)*time.Millisecond) + t := timer.New( + l, time.Duration(conf.DelayQueue.TimerFetchInterval)*time.Millisecond, + time.Duration(conf.DelayQueue.TimerFetchDelay)*time.Millisecond, + ) return b, p, q, t }, ) diff --git a/config/config.go b/config/config.go index 67b8410..e3dff5a 100644 --- a/config/config.go +++ b/config/config.go @@ -50,6 +50,7 @@ type DelayQueue struct { BucketMaxFetchNum uint64 `yaml:"bucket_max_fetch_num,omitempty" json:"bucket_max_fetch_num,omitempty"` QueueName string `yaml:"queue_name,omitempty" json:"queue_name,omitempty"` TimerFetchInterval int `yaml:"timer_fetch_interval,omitempty" json:"timer_fetch_interval,omitempty"` + TimerFetchDelay int `yaml:"timer_fetch_delay,omitempty" json:"timer_fetch_delay,omitempty"` } // Redis redis configuration diff --git a/config/config.yaml b/config/config.yaml index 6047274..0bfef91 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -7,6 +7,8 @@ delay_queue: queue_name: "dq_queue" # queue redis key name bucket_max_fetch_num: 200 # max fetch number of jobs in the bucket timer_fetch_interval: 1000 # fetching job interval(ms), decrease interval may get better throughout. + timer_fetch_delay: 0 # fetch delay(ms), if there are still jobs in the bucket after the fetch, + # it will delay timer_fetch_delay ms for next fetch. Default is no wait. 
# redis config redis: diff --git a/config/config.yaml.example b/config/config.yaml.example index 6425efe..ef55d1e 100644 --- a/config/config.yaml.example +++ b/config/config.yaml.example @@ -7,6 +7,8 @@ delay_queue: queue_name: "dqqueue" # queue redis key name bucket_max_fetch_num: 250 # max fetch number of jobs in the bucket timer_fetch_interval: 2000 # fetching job interval(ms), decrease interval may get better throughout. + timer_fetch_delay: 0 # fetch delay(ms), if there are still jobs in the bucket after the fetch, + # it will delay timer_fetch_delay ms for next fetch. Default is no wait. # redis config redis: diff --git a/server/pprof.go b/server/pprof.go new file mode 100644 index 0000000..f291c86 --- /dev/null +++ b/server/pprof.go @@ -0,0 +1,128 @@ +package server + +import ( + "net/http/pprof" + "strings" + + "github.com/gin-gonic/gin" +) + +// WrapPProf adds several routes from package `net/http/pprof` to *gin.Engine object +func WrapPProf(router *gin.Engine) { + WrapGroup(&router.RouterGroup) +} + +// WrapGroup adds several routes from package `net/http/pprof` to *gin.RouterGroup object +func WrapGroup(router *gin.RouterGroup) { + routers := []struct { + Method string + Path string + Handler gin.HandlerFunc + }{ + {"GET", "/debug/pprof/", IndexHandler()}, + {"GET", "/debug/pprof/heap", HeapHandler()}, + {"GET", "/debug/pprof/goroutine", GoroutineHandler()}, + {"GET", "/debug/pprof/allocs", AllocsHandler()}, + {"GET", "/debug/pprof/block", BlockHandler()}, + {"GET", "/debug/pprof/threadcreate", ThreadCreateHandler()}, + {"GET", "/debug/pprof/cmdline", CmdlineHandler()}, + {"GET", "/debug/pprof/profile", ProfileHandler()}, + {"GET", "/debug/pprof/symbol", SymbolHandler()}, + {"POST", "/debug/pprof/symbol", SymbolHandler()}, + {"GET", "/debug/pprof/trace", TraceHandler()}, + {"GET", "/debug/pprof/mutex", MutexHandler()}, + } + + basePath := strings.TrimSuffix(router.BasePath(), "/") + var prefix string + + switch { + case basePath == "": + prefix = "" 
+ case strings.HasSuffix(basePath, "/debug"): + prefix = "/debug" + case strings.HasSuffix(basePath, "/debug/pprof"): + prefix = "/debug/pprof" + } + + for _, r := range routers { + router.Handle(r.Method, strings.TrimPrefix(r.Path, prefix), r.Handler) + } +} + +// IndexHandler will pass the call from /debug/pprof to pprof +func IndexHandler() gin.HandlerFunc { + return func(ctx *gin.Context) { + pprof.Index(ctx.Writer, ctx.Request) + } +} + +// HeapHandler will pass the call from /debug/pprof/heap to pprof +func HeapHandler() gin.HandlerFunc { + return func(ctx *gin.Context) { + pprof.Handler("heap").ServeHTTP(ctx.Writer, ctx.Request) + } +} + +// GoroutineHandler will pass the call from /debug/pprof/goroutine to pprof +func GoroutineHandler() gin.HandlerFunc { + return func(ctx *gin.Context) { + pprof.Handler("goroutine").ServeHTTP(ctx.Writer, ctx.Request) + } +} + +// AllocsHandler will pass the call from /debug/pprof/allocs to pprof +func AllocsHandler() gin.HandlerFunc { + return func(ctx *gin.Context) { + pprof.Handler("allocs").ServeHTTP(ctx.Writer, ctx.Request) + } +} + +// BlockHandler will pass the call from /debug/pprof/block to pprof +func BlockHandler() gin.HandlerFunc { + return func(ctx *gin.Context) { + pprof.Handler("block").ServeHTTP(ctx.Writer, ctx.Request) + } +} + +// ThreadCreateHandler will pass the call from /debug/pprof/threadcreate to pprof +func ThreadCreateHandler() gin.HandlerFunc { + return func(ctx *gin.Context) { + pprof.Handler("threadcreate").ServeHTTP(ctx.Writer, ctx.Request) + } +} + +// CmdlineHandler will pass the call from /debug/pprof/cmdline to pprof +func CmdlineHandler() gin.HandlerFunc { + return func(ctx *gin.Context) { + pprof.Cmdline(ctx.Writer, ctx.Request) + } +} + +// ProfileHandler will pass the call from /debug/pprof/profile to pprof +func ProfileHandler() gin.HandlerFunc { + return func(ctx *gin.Context) { + pprof.Profile(ctx.Writer, ctx.Request) + } +} + +// SymbolHandler will pass the call from 
/debug/pprof/symbol to pprof +func SymbolHandler() gin.HandlerFunc { + return func(ctx *gin.Context) { + pprof.Symbol(ctx.Writer, ctx.Request) + } +} + +// TraceHandler will pass the call from /debug/pprof/trace to pprof +func TraceHandler() gin.HandlerFunc { + return func(ctx *gin.Context) { + pprof.Trace(ctx.Writer, ctx.Request) + } +} + +// MutexHandler will pass the call from /debug/pprof/mutex to pprof +func MutexHandler() gin.HandlerFunc { + return func(ctx *gin.Context) { + pprof.Handler("mutex").ServeHTTP(ctx.Writer, ctx.Request) + } +} diff --git a/server/server.go b/server/server.go index b59bde5..bbec326 100644 --- a/server/server.go +++ b/server/server.go @@ -66,6 +66,8 @@ func (s *server) Init() { s.r.Use(gin.LoggerWithConfig(gin.LoggerConfig{})) } + WrapPProf(s.r) + regMetricFunc := setServerMetricHandlerAndMiddleware() regMetricFunc(s.r) } diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index f98d256..f43b405 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -18,7 +18,7 @@ import ( // It will test add job, consume and remove. func TestDelayQueueAddAndRemove(t *testing.T) { // push n jobs with delay within 1 min - DelayTimeSeconds := 60 + DelayTimeSeconds := 30 Jobs := 200 topic, key := "TestDelayQueueAddAndRemove-topic", "TestDelayQueueAddAndRemove-set" rand.Seed(time.Now().Unix()) @@ -64,7 +64,7 @@ func TestDelayQueueAddAndRemove(t *testing.T) { // check after 1.5 min, all jobs should be done t.Log("Sleeping") - time.Sleep(90 * time.Second) + time.Sleep(50 * time.Second) num, err := RecordNumbers(key) require.NoError(t, err) diff --git a/timer/timer.go b/timer/timer.go index 55f3348..bc8a7e7 100644 --- a/timer/timer.go +++ b/timer/timer.go @@ -10,7 +10,7 @@ import ( // TaskFunc only task function can be added to // the timer. -type TaskFunc func() (bool, error) +type TaskFunc func() (hasMore bool, err error) // Timer is for processing task. 
it checks buckets // for popping jobs. it will put ready jobs to queue. @@ -26,7 +26,8 @@ type timer struct { tasks []taskStub // task stub once sync.Once // once l log.Logger // logger - taskInterval time.Duration + taskInterval time.Duration // fetch interval + taskDelay time.Duration // fetch delay when the bucket still has jobs after a fetch. Default is no wait. } // taskStub task stub for function itself and context, @@ -38,11 +39,12 @@ type taskStub struct { l log.Logger } -func New(l log.Logger, taskInterval time.Duration) Timer { +func New(l log.Logger, taskInterval, taskDelay time.Duration) Timer { return &timer{ wg: sync.WaitGroup{}, l: l.WithModule("timer"), taskInterval: taskInterval, + taskDelay: taskDelay, } } @@ -64,7 +66,7 @@ func (t *timer) Run() { for _, task := range t.tasks { go func(task taskStub) { defer t.wg.Done() - task.run(t.taskInterval) + task.run(t.taskInterval, t.taskDelay) }(task) } @@ -84,7 +86,7 @@ func (t *timer) Close() { // run a task, and wait for context is done. // this can be implement with more thinking. -func (task taskStub) run(fetchInterval time.Duration) { +func (task taskStub) run(fetchInterval, fetchDelay time.Duration) { for { select { case <-task.ctx.Done(): @@ -95,10 +97,13 @@ task.l.Error("task run failed", log.String("err", err.Error())) time.Sleep(fetchInterval) continue - } else if hasMore { + } else if !hasMore { time.Sleep(fetchInterval) continue } + + // more jobs remain; wait fetchDelay before the next fetch + time.Sleep(fetchDelay) } } }