
Worker Blocks When Concurrency Is Set To 0 #535

Closed
mavolin opened this issue Apr 23, 2020 · 4 comments · Fixed by #553
Comments

@mavolin

mavolin commented Apr 23, 2020

I just played around a bit with the example and found a strange behavior that I can't make sense of and that contradicts what the docs describe.

package main

import (
	"fmt"
	"os"
	"strings"
	"time"

	"github.com/RichardKnop/machinery/v1"
	"github.com/RichardKnop/machinery/v1/config"
	"github.com/RichardKnop/machinery/v1/log"
	"github.com/RichardKnop/machinery/v1/tasks"
)

func main() {
	switch os.Args[1] {
	case "worker":
		if err := worker(); err != nil {
			panic(err)
		}
	case "send":
		if err := send(); err != nil {
			panic(err)
		}
	default:
		panic("invalid option: " + strings.Join(os.Args[1:], " "))
	}
}

func startServer() (*machinery.Server, error) {
	server, err := machinery.NewServer(&config.Config{
		Broker:        "redis://localhost:6379",
		ResultBackend: "redis://localhost:6378",
	})
	if err != nil {
		return nil, err
	}

	return server, server.RegisterTasks(map[string]interface{}{
		"add": func() (int64, error) {
			log.INFO.Println("add")

			return 1 + 1, nil
		},
		"sleep": func() error {
			log.INFO.Println("going to sleep")

			time.Sleep(2 * time.Second)

			log.INFO.Println("awake")

			return nil
		},
	})
}

func worker() error {
	server, err := startServer()
	if err != nil {
		return err
	}

	worker := server.NewWorker("machine1", 0) // note that the second param is set to 0

	return worker.Launch()
}

func send() error {
	server, err := startServer()
	if err != nil {
		return err
	}

	start := time.Now()

	sleep, err := server.SendTask(&tasks.Signature{
		Name: "sleep",
	})
	if err != nil {
		panic(err)
	}

	fmt.Printf("send sleep: %s\n", time.Since(start))

	go func() {
		// Get blocks, polling the result backend every 5ms, until the task finishes.
		_, err := sleep.Get(time.Millisecond * 5)
		if err != nil {
			panic(err)
		}

		fmt.Printf("return sleep: %s\n", time.Since(start))
	}()

	add, err := server.SendTask(&tasks.Signature{Name: "add"})
	if err != nil {
		panic(err)
	}

	fmt.Printf("send add: %s\n", time.Since(start))

	go func() {
		// Get blocks, polling the result backend every 5ms, until the task finishes.
		_, err := add.Get(time.Millisecond * 5)
		if err != nil {
			panic(err)
		}

		fmt.Printf("return add: %s\n", time.Since(start))
	}()

	time.Sleep(3 * time.Second)

	return nil
}

1 will serialize task execution while 0 makes the number of concurrently executed tasks unlimited (default)

Since this is what the docs state, I would expect add to return first, as both tasks should be executed concurrently: sleep sleeps for two seconds, while add returns immediately.

However, the logs show that sleep is executed first (as expected) but blocks the worker until it finishes, so add returns last.

Logs:

DEBUG: 2020/04/24 00:42:21 redis.go:324 Received new message: {"UUID":"task_998fb807-1e1d-4170-ba15-da7330494fcf","Name":"sleep","RoutingKey":"","ETA":null,"GroupUUID":"","GroupTaskCount":0,"Args":null,"Headers":{},"Priority":0,"Immutable":false,"RetryCount":0,"RetryTimeout":0,"OnSuccess":null,"OnError":null,"ChordCallback":null,"BrokerMessageGroupId":"","SQSReceiptHandle":"","StopTaskDeletionOnError":false,"IgnoreWhenTaskNotRegistered":false}
INFO: 2020/04/24 00:42:21 machinery.go:46  going to sleep 
INFO: 2020/04/24 00:42:23 machinery.go:50  awake 
DEBUG: 2020/04/24 00:42:23 worker.go:257 Processed task task_998fb807-1e1d-4170-ba15-da7330494fcf. Results = []
DEBUG: 2020/04/24 00:42:23 redis.go:324 Received new message: {"UUID":"task_c47f88f7-7cf0-4387-a664-3c9b1aa89960","Name":"add","RoutingKey":"","ETA":null,"GroupUUID":"","GroupTaskCount":0,"Args":null,"Headers":{},"Priority":0,"Immutable":false,"RetryCount":0,"RetryTimeout":0,"OnSuccess":null,"OnError":null,"ChordCallback":null,"BrokerMessageGroupId":"","SQSReceiptHandle":"","StopTaskDeletionOnError":false,"IgnoreWhenTaskNotRegistered":false}
INFO: 2020/04/24 00:42:23 machinery.go:41  add 
DEBUG: 2020/04/24 00:42:23 worker.go:257 Processed task task_c47f88f7-7cf0-4387-a664-3c9b1aa89960. Results = 2
send sleep: 1.867531ms
send add: 2.511851ms
return add: 2.008545171s
return sleep: 2.008583332s

However, if I change the second parameter of server.NewWorker to 2 or higher, the tasks are executed concurrently and add returns first, as one would expect.

Logs:

DEBUG: 2020/04/24 00:49:03 redis.go:324 Received new message: {"UUID":"task_45ca2c56-fba8-4261-b946-c4c2ca554100","Name":"sleep","RoutingKey":"","ETA":null,"GroupUUID":"","GroupTaskCount":0,"Args":null,"Headers":{},"Priority":0,"Immutable":false,"RetryCount":0,"RetryTimeout":0,"OnSuccess":null,"OnError":null,"ChordCallback":null,"BrokerMessageGroupId":"","SQSReceiptHandle":"","StopTaskDeletionOnError":false,"IgnoreWhenTaskNotRegistered":false}
DEBUG: 2020/04/24 00:49:03 redis.go:324 Received new message: {"UUID":"task_f91ed834-f6cc-432e-b9c1-8556ed1f7a8f","Name":"add","RoutingKey":"","ETA":null,"GroupUUID":"","GroupTaskCount":0,"Args":null,"Headers":{},"Priority":0,"Immutable":false,"RetryCount":0,"RetryTimeout":0,"OnSuccess":null,"OnError":null,"ChordCallback":null,"BrokerMessageGroupId":"","SQSReceiptHandle":"","StopTaskDeletionOnError":false,"IgnoreWhenTaskNotRegistered":false}
INFO: 2020/04/24 00:49:03 machinery.go:41  add 
INFO: 2020/04/24 00:49:03 machinery.go:46  going to sleep 
DEBUG: 2020/04/24 00:49:03 worker.go:257 Processed task task_f91ed834-f6cc-432e-b9c1-8556ed1f7a8f. Results = 2
INFO: 2020/04/24 00:49:05 machinery.go:50  awake 
DEBUG: 2020/04/24 00:49:05 worker.go:257 Processed task task_45ca2c56-fba8-4261-b946-c4c2ca554100. Results = []
send sleep: 2.197295ms
send add: 3.284421ms
return add: 8.883116ms
return sleep: 2.009574316s

Am I missing something? If so, what is the correct way to allow for unlimited concurrent tasks?

@caldempsey

caldempsey commented Apr 25, 2020

As a workaround, perhaps set it to max int until the maintainers can get to you.
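
A minimal sketch of that workaround, adapted from the worker() function in the original report (the concrete value and the effectivelyUnlimited name are assumptions; some brokers size an internal pool from this parameter, so a generously large finite number may be more practical than a literal max int):

func worker() error {
	server, err := startServer()
	if err != nil {
		return err
	}

	// Workaround sketch: pass a very large concurrency value instead of 0.
	// Brokers may size internal buffers from this value, so an extreme value
	// like max int could be wasteful; a generous finite limit usually suffices.
	const effectivelyUnlimited = 100000
	worker := server.NewWorker("machine1", effectivelyUnlimited)

	return worker.Launch()
}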

@henripqt

henripqt commented May 12, 2020

Which broker are you using?

I've faced the same issue using the Redis broker, and after some digging in the code I found that the concurrency is forced to 1 whenever it is < 1:

// StartConsuming enters a loop and waits for incoming messages
func (b *BrokerGR) StartConsuming(consumerTag string, concurrency int, taskProcessor iface.TaskProcessor) (bool, error) {
	b.consumingWG.Add(1)
	defer b.consumingWG.Done()

	if concurrency < 1 {
		concurrency = 1
	}
	// ...

The documentation, however, states:

Example: 1 will serialize task execution while 0 makes the number of concurrently executed tasks unlimited

Maybe we should update the documentation.

@icecraft

@henripqt I have faced this too and posted an issue: #529.

@mavolin

mavolin commented May 15, 2020

@henripqt I was using the Redis broker as well. I hope they'll allow 0 = infinity, because max int is kind of ugly.

RichardKnop added a commit that referenced this issue May 25, 2020
According to the README:
> Use the second parameter of `server.NewWorker` to limit the number of concurrently running
> worker.Process() calls (per worker). Example: 1 will serialize task execution while 0 makes
> the number of concurrently executed tasks unlimited (default).

This change will set the concurrency to 2 * num CPUs when concurrency = 0. While it isn't "unlimited", it is nevertheless a practical choice for most workloads (mixed CPU and I/O) and better than 1. This change should make the behavior more consistent with the docs.

Should also fix #535

Co-authored-by: Charles Chan <charles.wh.chan@users.noreply.github.com>
Co-authored-by: Richard Knop <RichardKnop@users.noreply.github.com>
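
For reference, a minimal sketch of the behavior that commit message describes, written as a standalone helper (the effectiveConcurrency name is hypothetical; the real change lives inside the broker/worker code and may differ in its details):

package main

import (
	"fmt"
	"runtime"
)

// effectiveConcurrency illustrates the new mapping: a requested concurrency
// of 0 (or any value < 1) becomes 2 * NumCPU instead of being clamped to 1.
func effectiveConcurrency(requested int) int {
	if requested < 1 {
		return runtime.NumCPU() * 2
	}
	return requested
}

func main() {
	fmt.Println(effectiveConcurrency(0)) // e.g. 16 on an 8-core machine
	fmt.Println(effectiveConcurrency(1)) // 1 still serializes task execution
}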