forked from knative/serving
-
Notifications
You must be signed in to change notification settings - Fork 0
/
breaker.go
69 lines (61 loc) · 2.2 KB
/
breaker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
/*
Copyright 2018 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package queue
import "fmt"
type token struct{}
// Breaker is a component that enforces a concurrency limit on the
// execution of a function. It also maintains a queue of function
// executions in excess of the concurrency limit. Function call attempts
// beyond the limit of the queue are failed immediately.
type Breaker struct {
pendingRequests chan token
activeRequests chan token
}
// NewBreaker creates a Breaker with the desired queue depth and
// concurrency limit.
func NewBreaker(queueDepth, maxConcurrency int32) *Breaker {
if queueDepth <= 0 {
panic(fmt.Sprintf("Queue depth must be greater than 0. Got %v.", queueDepth))
}
if maxConcurrency <= 0 {
panic(fmt.Sprintf("Max concurrency must be greater than 0. Got %v.", maxConcurrency))
}
return &Breaker{
pendingRequests: make(chan token, queueDepth),
activeRequests: make(chan token, maxConcurrency),
}
}
// Maybe conditionally executes thunk based on the Breaker concurrency
// and queue parameters. If the concurrency limit and queue capacity are
// already consumed, Maybe returns immediately without calling thunk. If
// the thunk was executed, Maybe returns true, else false.
func (b *Breaker) Maybe(thunk func()) bool {
var t token
select {
default:
// Pending request queue is full. Report failure.
return false
case b.pendingRequests <- t:
// Pending request has capacity.
// Wait for capacity in the active queue.
b.activeRequests <- t
// Release capacity in the pending request queue.
<-b.pendingRequests
// Defer releasing capacity in the active request queue.
defer func() { <-b.activeRequests }()
// Do the thing.
thunk()
// Report success
return true
}
}