// Source: reservation.go, forked from coinbase/redisbetween.

package handlers
import (
"fmt"
"strings"
"sync"
"time"
"github.com/DataDog/datadog-go/statsd"
"github.com/coinbase/mongobetween/util"
"github.com/d2army/redisbetween/redis"
)
// reservation is the behaviour the Reservations registry requires of every
// entry it stores. No implementation is visible in this file, so the notes
// below describe the method signatures only.
type reservation interface {
	// close takes no arguments and returns nothing.
	// NOTE(review): presumably tears the reservation down; confirm against
	// the implementations.
	close()

	// upstreamWrite accepts a batch of messages and returns an error.
	// NOTE(review): presumably sends them to the upstream server; confirm.
	upstreamWrite(wm []*redis.Message) error

	// upstreamRead accepts a timeout and returns a batch of messages or an
	// error.
	upstreamRead(timeout time.Duration) ([]*redis.Message, error)

	// localWrite accepts a connection and a batch of messages and returns an
	// error.
	// NOTE(review): presumably delivers the messages to that local client
	// connection; confirm.
	localWrite(c *connection, wm []*redis.Message) error
}
// reservationsMonitor bundles the gauge callbacks created by
// newReservationsMonitor. Each increment/decrement pair comes from a single
// util.StatsdBackgroundGauge call; what invoking a callback does is defined
// by that helper and is not visible in this file.
type reservationsMonitor struct {
	// Pair for the "reservations.upstream_subscriptions" gauge.
	incrementUpstreamSubscriptions, decrementUpstreamSubscriptions util.StatsdBackgroundGaugeCallback

	// Pair for the "reservations.local_subscriptions" gauge.
	incrementLocalSubscriptions, decrementLocalSubscriptions util.StatsdBackgroundGaugeCallback

	// Pair for the "reservations.upstream_blockers" gauge.
	incrementUpstreamBlockers, decrementUpstreamBlockers util.StatsdBackgroundGaugeCallback

	// Pair for the "reservations.local_blockers" gauge.
	incrementLocalBlockers, decrementLocalBlockers util.StatsdBackgroundGaugeCallback
}
// Reservations is a registry of reservation values keyed by the string
// "prefix:source".
//
// The embedded sync.Mutex is taken by Close. The add, get, delete and
// checkMax methods in this file do not lock it themselves.
// NOTE(review): presumably callers hold the mutex around those calls; confirm
// at the call sites.
type Reservations struct {
	sync.Mutex

	// list maps "prefix:source" keys to their reservation.
	list map[string]reservation
	// maxSub and maxBlk are the limits checkMax applies to keys with the
	// "c" (subscriptions) and "b" (blockers) prefixes respectively.
	maxSub, maxBlk int
	// monitor holds the gauge callbacks built by newReservationsMonitor.
	monitor *reservationsMonitor
	// closed is set by Close; once true, add fails and get returns nil.
	closed bool
}
// reservationError is the error add returns when the requested key is
// already present in the registry.
type reservationError struct {
	// key is the full registry key that was already taken.
	key string
}

// Error implements the error interface and names the colliding key.
func (e reservationError) Error() string {
	const format = "reservation with key '%s' already exists"
	return fmt.Sprintf(format, e.key)
}
// NewReservations returns an empty, open Reservations registry.
//
// maxSub and maxBlk are stored as the limits checkMax enforces for the "c"
// (subscriptions) and "b" (blockers) prefixes. sd is passed to
// newReservationsMonitor to build the gauge callbacks.
func NewReservations(maxSub, maxBlk int, sd *statsd.Client) *Reservations {
	r := new(Reservations)
	r.list = make(map[string]reservation)
	r.maxSub, r.maxBlk = maxSub, maxBlk
	r.monitor = newReservationsMonitor(sd)
	return r
}
// newReservationsMonitor creates one background gauge per reservation
// category (upstream/local subscriptions, upstream/local blockers) and
// returns their increment/decrement callbacks in a reservationsMonitor.
//
// The gauges are created in a fixed order and each one is given an empty,
// non-nil tag slice.
func newReservationsMonitor(sd *statsd.Client) *reservationsMonitor {
	m := &reservationsMonitor{}

	m.incrementUpstreamSubscriptions, m.decrementUpstreamSubscriptions =
		util.StatsdBackgroundGauge(sd, "reservations.upstream_subscriptions", []string{})
	m.incrementLocalSubscriptions, m.decrementLocalSubscriptions =
		util.StatsdBackgroundGauge(sd, "reservations.local_subscriptions", []string{})
	m.incrementUpstreamBlockers, m.decrementUpstreamBlockers =
		util.StatsdBackgroundGauge(sd, "reservations.upstream_blockers", []string{})
	m.incrementLocalBlockers, m.decrementLocalBlockers =
		util.StatsdBackgroundGauge(sd, "reservations.local_blockers", []string{})

	return m
}
// add registers res under the key "prefix:source".
//
// It returns an error if the registry has been closed, or a reservationError
// if the key is already present; the closed check takes precedence. On
// success the reservation is stored and nil is returned.
//
// add does not lock r.
// NOTE(review): presumably the caller holds the embedded mutex; confirm.
func (r *Reservations) add(prefix string, source string, res reservation) error {
	key := fmt.Sprintf("%s:%s", prefix, source)

	switch _, taken := r.list[key]; {
	case r.closed:
		return fmt.Errorf("reservations are closed: %s", key)
	case taken:
		return reservationError{key: key}
	}

	r.list[key] = res
	return nil
}
// get returns the reservation stored under "prefix:source".
//
// The result is nil when the registry has been closed or when no entry
// exists for that key.
//
// get does not lock r.
// NOTE(review): presumably the caller holds the embedded mutex; confirm.
func (r *Reservations) get(prefix string, source string) reservation {
	if !r.closed {
		return r.list[fmt.Sprintf("%s:%s", prefix, source)]
	}
	return nil
}
// delete removes the reservation stored under "prefix:source".
//
// It returns an error when no such key exists. Unlike add and get, it does
// not consult the closed flag, so entries can still be removed after Close.
//
// delete does not lock r.
// NOTE(review): presumably the caller holds the embedded mutex; confirm.
func (r *Reservations) delete(prefix string, source string) error {
	key := fmt.Sprintf("%s:%s", prefix, source)

	if _, found := r.list[key]; found {
		delete(r.list, key)
		return nil
	}
	return fmt.Errorf("reservation with key '%s' does not exist", key)
}
// checkMax reports whether one more reservation of the given kind would
// exceed its configured limit.
//
// prefix selects the kind: "c" is checked against maxSub (subscriptions) and
// "b" against maxBlk (blockers). Any other prefix has no limit and always
// yields nil. An error is returned when the number of existing reservations
// of that kind is already at or above the limit.
//
// checkMax does not lock r.
// NOTE(review): presumably the caller holds the embedded mutex; confirm.
func (r *Reservations) checkMax(prefix string) error {
	var (
		limit int
		kind  string
	)
	switch prefix {
	case "c":
		limit, kind = r.maxSub, "subscriptions"
	case "b":
		limit, kind = r.maxBlk, "blockers"
	default:
		// No limit is configured for other prefixes, so there is nothing to
		// count.
		return nil
	}

	// Keys are stored as "prefix:source" (see add). Matching on the prefix
	// plus its separator keeps a key such as "cx:..." from being counted
	// against the "c" limit.
	keyPrefix := prefix + ":"
	count := 0
	for key := range r.list {
		if strings.HasPrefix(key, keyPrefix) {
			count++
		}
	}

	if count >= limit {
		return fmt.Errorf("new reservation would exceed configured max %s (%d)", kind, limit)
	}
	return nil
}
// Close marks the registry as closed while holding the embedded mutex.
//
// After Close, add returns an error and get returns nil. Close does not
// remove entries from the registry or call close on them.
func (r *Reservations) Close() {
	r.Lock()
	r.closed = true
	r.Unlock()
}