Skip to content

Commit

Permalink
worker: added graceful shutdown functionality + client factory func
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitrymomot committed Jun 29, 2023
1 parent efd1fec commit da5c9d7
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 1 deletion.
6 changes: 5 additions & 1 deletion worker/_example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func main() {
}

// Init asynq client
asynqClient := asynq.NewClient(redisConnOpt)
asynqClient, err := asynq.NewClient(redisConnOpt)
defer asynqClient.Close()

// Create a context with a timeout and set the Server's context
Expand All @@ -38,6 +38,8 @@ func main() {
redisConnOpt, logger,
worker.WithSchedulerLocation("UTC"), // options are not required
)
defer schedulerServer.Shutdown()

// Init scheduler handlers
testScheduler := NewScheduler(nil)
// Run the scheduler
Expand All @@ -51,6 +53,8 @@ func main() {
redisConnOpt, logger,
worker.WithQueueName("default"), // options are not required
)
defer queueServer.Shutdown()

// Init worker handlers
testWorker := NewWorker(nil)
// Run the worker
Expand Down
19 changes: 19 additions & 0 deletions worker/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package worker

import (
"fmt"

"github.com/hibiken/asynq"
)

// NewClient creates a new asynq client from the given redis client instance.
func NewClient(redisConnStr string) (*asynq.Client, asynq.RedisConnOpt, error) {
// Redis connect options for asynq client
redisConnOpt, err := asynq.ParseRedisURI(redisConnStr)
if err != nil {
return nil, nil, fmt.Errorf("worket.NewClient: failed to parse redis connection string: %w", err)
}

// Init asynq client
return asynq.NewClient(redisConnOpt), redisConnOpt, nil
}
7 changes: 7 additions & 0 deletions worker/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,10 @@ func registerQueueHandlers(handlers ...taskHandler) *asynq.ServeMux {

return mux
}

// Shutdown gracefully shuts down the queue server by waiting for all
// in-flight tasks to finish processing before shutdown.
func (srv *QueueServer) Shutdown() {
srv.Server.Stop()
srv.Server.Shutdown()
}
6 changes: 6 additions & 0 deletions worker/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,9 @@ func (srv *SchedulerServer) Run(handlers ...schedulerHandler) func() error {
return srv.Scheduler.Run()
}
}

// Shutdown gracefully shuts down the scheduler server by waiting for all
// pending tasks to be processed.
func (srv *SchedulerServer) Shutdown() {
srv.Scheduler.Shutdown()
}

0 comments on commit da5c9d7

Please sign in to comment.