Exceptions may occur when closing the pool #20

Closed
zhu121 opened this issue Dec 10, 2021 · 7 comments

zhu121 commented Dec 10, 2021

If the pool has a bounded size and is busy, a panic occurs when the pool is stopped while another goroutine is still submitting tasks to it.

Test code:

func TestStop(t *testing.T) {
	pool := New(1, 4, MinWorkers(1), IdleTimeout(1*time.Second))
	// Simulate some users sending tasks to the pool
	go func() {
		for i := 0; i < 30; i++ {
			pool.Submit(func() {
				fmt.Println("do task")
				time.Sleep(3 * time.Second)
			})
		}
	}()
	// Suppose the server is shut down after a period of time
	time.Sleep(5 * time.Second)
	pool.StopAndWait()
}

panic: send on closed channel
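
The panic message itself comes from the Go runtime: sending on a channel that has already been closed always panics. Presumably the pool closes an internal task channel when it stops, though that is an assumption about its internals rather than something shown in this thread. A minimal standard-library sketch of the underlying behaviour (sendPanics is a hypothetical helper, not part of the pool):

```go
package main

// sendPanics performs a send on ch and reports whether that send panicked.
// It is a hypothetical helper used only to illustrate channel semantics.
func sendPanics(ch chan int) (panicked bool) {
	defer func() {
		// recover() returns a non-nil value only while a panic is unwinding.
		if r := recover(); r != nil {
			panicked = true
		}
	}()
	ch <- 1 // panics with "send on closed channel" if ch was closed
	return false
}
```

Calling sendPanics on an open buffered channel with free capacity returns false; calling it on a channel that was passed to close() returns true.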

Owner

alitto commented Dec 11, 2021

Hey @zhu121,

This is the expected behaviour in that scenario. When the process is shutting down, it's the application's responsibility to make sure no more tasks are submitted to the pool once it has been stopped (by calling StopAndWait()).

A common pattern to signal a goroutine to stop its processing (in this case, stop sending tasks to the pool) is to use an "exit" channel, doing something like this:

func TestStop(t *testing.T) {
	pool := New(1, 30, MinWorkers(1), IdleTimeout(1*time.Second))

	// Simulate some users sending tasks to the pool
	quit := make(chan struct{})
	go func() {
		for i := 0; i < 30; i++ {
			select {
			case <-quit:
				// Quit signal received, stop sending tasks
				return
			default:
				pool.Submit(func() {
					fmt.Println("do task")
					time.Sleep(1 * time.Second)
				})
			}
			time.Sleep(1 * time.Second)
		}
	}()

	// Suppose the server is shut down after a period of time
	time.Sleep(5 * time.Second)

	// Send exit signal
	quit <- struct{}{}

	// Wait for all tasks to complete
	pool.StopAndWait()
}

Author

zhu121 commented Dec 13, 2021

Another approach I can think of is to modify the submit function like this:

func (p *WorkerPool) submit(task func(), canWaitForIdleWorker bool) (submitted bool) {
	defer func() {
		// Recover from the panic so the caller's goroutine does not crash,
		// and report the failed submission through the return value
		if r := recover(); r != nil {
			submitted = false
		}
	}()
	// rest of the function
}

Owner

alitto commented Dec 15, 2021

I see. I agree that submitting a task to a stopped worker pool should not panic if the call is made using the TrySubmit() method, which should return false in this case. However, we should panic if the call is made using the Submit() method, since that one is meant to block until the task is queued and panic otherwise.
I'll look at it more closely when I have some time, but feel free to open a pull request with the suggested changes in the meantime. Thanks!
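
The contract described above can be illustrated with a toy single-worker pool. This is only a sketch under stated assumptions, not the library's implementation: toyPool, errStopped, trySubmit, submit and stopAndWait are all hypothetical names, and the stopped flag guarded by a mutex is just one way to avoid sending on a closed channel.

```go
package main

import (
	"errors"
	"sync"
)

// errStopped is a hypothetical error value used as the panic payload.
var errStopped = errors.New("pool has been stopped")

// toyPool is a minimal single-worker pool used only to illustrate the contract.
type toyPool struct {
	mu      sync.RWMutex
	stopped bool
	tasks   chan func()
	wg      sync.WaitGroup
}

func newToyPool(capacity int) *toyPool {
	p := &toyPool{tasks: make(chan func(), capacity)}
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		for task := range p.tasks { // exits once tasks is closed and drained
			task()
		}
	}()
	return p
}

// trySubmit never blocks: it returns false if the pool is stopped or full.
func (p *toyPool) trySubmit(task func()) bool {
	p.mu.RLock()
	defer p.mu.RUnlock()
	if p.stopped {
		return false
	}
	select {
	case p.tasks <- task:
		return true
	default:
		return false
	}
}

// submit blocks until the task is queued and panics if the pool is stopped.
// The read lock is held during the send, so stopAndWait cannot close the
// channel underneath a blocked submit; the worker keeps draining meanwhile.
func (p *toyPool) submit(task func()) {
	p.mu.RLock()
	defer p.mu.RUnlock()
	if p.stopped {
		panic(errStopped)
	}
	p.tasks <- task
}

// stopAndWait rejects further submissions and waits for queued tasks to finish.
func (p *toyPool) stopAndWait() {
	p.mu.Lock()
	if !p.stopped {
		p.stopped = true
		close(p.tasks)
	}
	p.mu.Unlock()
	p.wg.Wait()
}
```

In this sketch, trySubmit reports a stopped pool through its return value, while submit surfaces it as a panic that the caller may recover from.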

Owner

alitto commented Dec 26, 2021

Hey @zhu121! I finally had some time to work on this and opened a pull request (#22) to improve the handling of task submission when the pool is being stopped. The latest release of pond (1.6.1) includes these changes. Please take a look when you get a chance and let me know if that's in line with your suggestion.

Author

zhu121 commented Dec 27, 2021

A small idea: would adding recover() to the deferred function in submit() reduce the mental burden on users? As it stands, when calling TrySubmit() you need to check for a false result to stop submitting tasks, and when calling Submit() you need to add your own recover() to stop submitting tasks and make sure the program does not terminate abnormally.
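
The caller-side recover() mentioned here can be written once and reused. The following is a minimal sketch, assuming the pool exposes a method with the shape func(task func()) that panics when the pool is stopped; safeSubmit is a hypothetical helper, not part of the library:

```go
package main

import "fmt"

// safeSubmit calls submit(task) and converts a panic raised by submit into an
// error. submit stands in for a pool method such as Submit (an assumption).
func safeSubmit(submit func(task func()), task func()) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("task not submitted: %v", r)
		}
	}()
	submit(task)
	return nil
}
```

A call might look like err := safeSubmit(pool.Submit, task), with the producer loop exiting on the first non-nil error. Note that this wrapper turns any panic raised inside submit into an error, not only the one caused by a stopped pool.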

Owner

alitto commented Dec 27, 2021

Adding recover() to the Submit() function and returning an error instead of panicking changes its signature from func Submit() to func Submit() error, which kind of breaks the current contract and could impact existing users of the library.
The call to Submit() can only fail if the pool has been stopped manually by calling Stop() or StopAndWait(), so it's meant to be used when you are confident the pool will not be unexpectedly stopped. TrySubmit() is meant to be a non-blocking alternative to Submit(), which lets the caller decide what to do if the task was not accepted by the pool.
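
One way a caller might "decide what to do" with a rejected task is to retry a few times and then give up. This is a sketch only: submitWithRetry and its parameters are hypothetical, and trySubmit stands in for a method with the shape func(task func()) bool, such as TrySubmit() as described in this thread.

```go
package main

import "time"

// submitWithRetry calls trySubmit up to attempts times, sleeping for backoff
// between failed attempts. It reports whether the task was eventually accepted.
func submitWithRetry(trySubmit func(task func()) bool, task func(), attempts int, backoff time.Duration) bool {
	for i := 0; i < attempts; i++ {
		if trySubmit(task) {
			return true
		}
		// Do not sleep after the final failed attempt.
		if i < attempts-1 {
			time.Sleep(backoff)
		}
	}
	return false
}
```

A false result here does not distinguish a full pool from a stopped one, so a caller that is shutting down would typically stop producing tasks rather than keep retrying.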

Author

zhu121 commented Dec 29, 2021

Modifying the function's signature directly would indeed affect existing users. Thank you for your answer. I'll close this issue for now.

zhu121 closed this as completed Dec 29, 2021