/
semaphore_map.go
105 lines (88 loc) · 2.17 KB
/
semaphore_map.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package syncmap
import (
"context"
"sync"
"sync/atomic"
"golang.org/x/sync/semaphore"
)
type Semaphore struct {
// According to atomic, MUST be the first element of a struct.
refCount int64
key string
m *SemaphoreMap
sema *semaphore.Weighted
}
func (s *Semaphore) Release(n int64) {
s.sema.Release(n)
refs := atomic.AddInt64(&s.refCount, -n)
if refs == 0 {
s.m.tryDelete(s)
}
}
// SemaphoreMap is a map of semaphores that automatically allocates and
// deallocates semaphores as necessary. It allows for fine-grained semephore
// use, such as resource allocations and concurrent resource limits, for a
// potentially large key space, while only consuming space propotional to the
// number of acquired semaphores.
type SemaphoreMap struct {
n int64
semas map[string]*Semaphore
lock sync.Mutex
}
func NewSemaphoreMap(n int64) *SemaphoreMap {
return &SemaphoreMap{n: n, semas: make(map[string]*Semaphore)}
}
func (m *SemaphoreMap) fetchSemaEmtry(key string, n int64) *Semaphore {
m.lock.Lock()
defer m.lock.Unlock()
s := m.semas[key]
if s == nil {
s = &Semaphore{key: key, m: m, sema: semaphore.NewWeighted(m.n)}
m.semas[key] = s
}
atomic.AddInt64(&s.refCount, n)
return s
}
func (m *SemaphoreMap) Acquire(key string, n int64) *Semaphore {
s := m.fetchSemaEmtry(key, n)
err := s.sema.Acquire(context.Background(), n)
if err != nil {
panic(err)
}
return s
}
func (m *SemaphoreMap) AcquireContext(ctx context.Context, key string, n int64) (*Semaphore, error) {
s := m.fetchSemaEmtry(key, n)
err := s.sema.Acquire(ctx, n)
if err != nil {
refs := atomic.AddInt64(&s.refCount, -n)
if refs == 0 {
m.tryDelete(s)
}
return nil, err
}
return s, nil
}
func (m *SemaphoreMap) TryAcquire(key string, n int64) *Semaphore {
s := m.fetchSemaEmtry(key, n)
didAcquire := s.sema.TryAcquire(n)
if !didAcquire {
refs := atomic.AddInt64(&s.refCount, -n)
if refs == 0 {
m.tryDelete(s)
}
return nil
}
return s
}
func (m *SemaphoreMap) tryDelete(s *Semaphore) {
m.lock.Lock()
defer m.lock.Unlock()
refs := atomic.LoadInt64(&s.refCount)
if refs < 0 {
panic("refCount < 0")
} else if refs != 0 {
return
}
delete(m.semas, s.key)
}