
Implement throttling #144

Merged: 4 commits into master from throttled-queue, Jun 29, 2021
Conversation

yigarashi-9 (Contributor)

Related issues: #138 #139 #141

This PR implements a throttled dispatcher using max_dispatches_per_second and max_burst_size.

yigarashi-9 self-assigned this on Jun 28, 2021
if dps == 0 {
dps = rate.Inf
}
limiter := rate.NewLimiter(dps, int(m.MaxBurstSize))
yigarashi-9 (Contributor, Author) commented on Jun 28, 2021

When MaxDispatchesPerSecond is empty, we set it to Inf, meaning no limit. MaxBurstSize is also empty in that case, but we don't have to care about it because it is ignored.

https://pkg.go.dev/golang.org/x/time/rate#pkg-constants

Inf is the infinite rate limit; it allows all events (even if burst is zero).

https://pkg.go.dev/golang.org/x/time/rate#Limiter.WaitN

The burst limit is ignored if the rate limit is Inf.
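
For illustration, here is a minimal standalone sketch (not code from this PR) showing that an Inf limiter allows events even when the burst is zero:

package main

import (
	"context"
	"fmt"

	"golang.org/x/time/rate"
)

func main() {
	// rate.Inf ignores the burst, so even burst = 0 never blocks or errors.
	limiter := rate.NewLimiter(rate.Inf, 0)
	if err := limiter.Wait(context.Background()); err != nil {
		fmt.Println("unexpected error:", err)
		return
	}
	fmt.Println("event allowed immediately despite burst = 0")
}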

@@ -155,8 +183,11 @@ Loop:
go func(job jobqueue.Job) {
defer wg.Done()
defer func() { <-d.sem }()
rslt := d.worker.Work(job)
d.jobqueue.Complete(job, rslt)
err := d.limiter.Wait(ctx)
yigarashi-9 (Contributor, Author)

https://pkg.go.dev/golang.org/x/time/rate#Limiter.WaitN

It returns an error if n exceeds the Limiter's burst size, the Context is canceled, or the expected wait time exceeds the Context's Deadline.

tarao (Member)

[QUESTION]

If there is more than one goroutine waiting on limiter.Wait() here, can we assume that they will be unblocked in FIFO order?

yigarashi-9 (Contributor, Author)

Yes, the order is preserved. Wait internally uses the reserveN API, which returns a delay that takes earlier reservations into account. So, for example, if the first goroutine waits for 2 seconds, the next waits for 4 seconds.
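
To make the cumulative delay concrete, here is a minimal standalone sketch (not code from this PR; sequential calls for simplicity, and the 0.5 dispatches-per-second limit is just an illustrative value):

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// One token every 2 seconds, burst of 1.
	limiter := rate.NewLimiter(rate.Limit(0.5), 1)
	ctx := context.Background()

	limiter.Wait(ctx) // consume the initial burst token

	start := time.Now()
	for i := 1; i <= 2; i++ {
		// Each Wait queues behind earlier reservations, so this prints
		// roughly 2s, then 4s.
		limiter.Wait(ctx)
		fmt.Printf("call %d unblocked after ~%v\n", i, time.Since(start).Round(time.Second))
	}
}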

tarao (Member) left a comment

@yigarashi-9 I have two questions.

rslt := d.worker.Work(job)
d.jobqueue.Complete(job, rslt)
err := d.limiter.Wait(ctx)
if err == nil {
tarao (Member)

[QUESTION]

It looks like the job will be lost when it exceeds the burst size. Shouldn't we retry the job later?

yigarashi-9 (Contributor, Author)

No, I don't think we should.

https://pkg.go.dev/golang.org/x/time/rate#Limiter.WaitN

It returns an error if n exceeds the Limiter's burst size

In this case, n is always 1 because we use Wait. Although we would have to care about burst size = 0, such a job queue is basically broken (a burst size of 0 allows no execution), so we will never encounter that case.
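
For illustration, a minimal standalone sketch (not code from this PR) of the only ways Wait with n = 1 fails: a zero burst with a finite rate, or a context that is canceled or whose deadline cannot be met:

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// Burst 0 with a finite rate: Wait fails immediately, since n = 1 exceeds the burst.
	broken := rate.NewLimiter(rate.Limit(1), 0)
	fmt.Println(broken.Wait(context.Background())) // non-nil error

	// A deadline the expected wait time cannot meet: Wait also fails.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	slow := rate.NewLimiter(rate.Limit(0.1), 1) // one token every 10 seconds
	slow.Wait(ctx)              // consume the burst token
	fmt.Println(slow.Wait(ctx)) // non-nil error
}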

However, as you point out, some jobs would stay grabbed when the dispatcher is stopped. Should we do something about that? If so, is there a recommended way to recover?

tarao (Member)

We will never encounter the case.

Ah, I see, I had misunderstood this.

If so, is there a recommended way to recover?

We don't have to worry about this. We already have a mechanism to recover grabbed jobs in primary-backup job queue activation.

tarao (Member) left a comment
@yigarashi-9
yigarashi-9 (Contributor, Author)

Thank you!

yigarashi-9 merged commit ec649c9 into master on Jun 29, 2021
yigarashi-9 deleted the throttled-queue branch on June 29, 2021 at 05:25