Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEATURE REQUEST] Add callback for task state changes #864

Open
hungtcs opened this issue Apr 17, 2024 · 2 comments · May be fixed by #865
Open

[FEATURE REQUEST] Add callback for task state changes #864

hungtcs opened this issue Apr 17, 2024 · 2 comments · May be fixed by #865
Assignees
Labels
enhancement New feature or request

Comments

@hungtcs
Copy link

hungtcs commented Apr 17, 2024

Is your feature request related to a problem? Please describe.

I'm currently working on a feature that notifies other services when a task has finished.
Unfortunately, I found that asynq doesn't provide an api for this feature.

I currently do this via middleware, but it has some flaws and doesn't work well.

One of the key issues is that the life cycle of the task does not actually end when the middleware is executed.
If another service is notified of the end of the task at this point, however, it may still be active when the other service gets the task status from the inspector.

func TaskFinishEventMiddleware() asynq.MiddlewareFunc {
	return func(h asynq.Handler) asynq.Handler {
		return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
			var taskId string
			var queueName string
			if val, ok := asynq.GetQueueName(ctx); ok {
				queueName = val
			} else {
				return fmt.Errorf("can not get queue name from task context")
			}
			if val, ok := asynq.GetTaskID(ctx); ok {
				taskId = val
			} else {
				return fmt.Errorf("can not get task id from task context")
			}

			var taskErr = h.ProcessTask(ctx, t)

			// Send notifications to other services to notify the end of the task
						
			return taskErr
		})
	}
}

Describe the solution you'd like

One solution is to add a global callback function that is called when the task state changes.

var server = asynq.NewServer(redisClientOpt, asynq.Config{
	Queues:   queues,
	Logger:   log.Log,
	LogLevel: logLevel,

	// Add new callback 
	TaskStateChange: func(taskInfo *asynq.TaskInfo) {

	},
})

Describe alternatives you've considered

I'm currently using middleware, but it's not working well.

@hungtcs hungtcs added the enhancement New feature or request label Apr 17, 2024
@hungtcs hungtcs linked a pull request Apr 18, 2024 that will close this issue
@hibare
Copy link

hibare commented Jul 17, 2024

Looking for same feature, probably experiment with middleware for now. Having a concrete solution would be awesome.

@kamikazechaser
Copy link
Collaborator

I think the middleware should suffice, taskErr will report if it is a retry, if there is no error, you can use the inpsector to grab the latest state.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging a pull request may close this issue.

4 participants