-
Notifications
You must be signed in to change notification settings - Fork 25
/
memory.go
134 lines (123 loc) · 3.66 KB
/
memory.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package lock
import (
"context"
"sync"
"github.com/aldor007/mort/pkg/monitoring"
"github.com/aldor007/mort/pkg/response"
"go.uber.org/zap"
)
// MemoryLock is in memory lock for single mort instance
type MemoryLock struct {
lock sync.RWMutex
internal map[string]lockData
}
// NewMemoryLock create a new empty instance of MemoryLock
func NewMemoryLock() *MemoryLock {
m := &MemoryLock{}
m.internal = make(map[string]lockData)
return m
}
func notifyListeners(lock lockData, respFactory func() (*response.Response, bool)) {
for _, q := range lock.notifyQueue {
select {
case <-q.Cancel:
// Observer revoked his interest in obtaining the response.
close(q.ResponseChan)
continue
default:
}
resp, ok := respFactory()
if ok {
// As the response channel is under our package control
// we are sure that it was initiated with a single place for a response
// and there is no need to use select.
q.ResponseChan <- resp
}
close(q.ResponseChan)
}
}
// NotifyAndRelease tries notify all waiting goroutines about response
func (m *MemoryLock) NotifyAndRelease(_ context.Context, key string, originalResponse *response.Response) {
m.lock.Lock()
lock, ok := m.internal[key]
if !ok {
m.lock.Unlock()
return
}
delete(m.internal, key)
m.lock.Unlock()
if len(lock.notifyQueue) == 0 {
return
}
monitoring.Log().Info("Notify lock queue", zap.String("key", key), zap.Int("len", len(lock.notifyQueue)))
// Notify all listeners by sending them a copy of originalResponse.
//
// Current synchronous notification is simpler compared to asynchronous implementation.
// The asynchronous implementation might be tricky since the response in not buffered mode must be
// protected from being read before it is copied. Otherwise CopyWithStream in a worst case will deliver partial body
// since it can read in parallel with HTTP handler. To prevent such behavior extra temporary copy of response
// must be created before returning from this method. Of course such creation must
// also take into account whether the originalResponse is buffered or not.
// The time spend on notifying listeners is negligible compared to the total time of image processing,
// so making this process asynchronous makes almost no sense.
notifyListeners(lock, func() (*response.Response, bool) {
if originalResponse == nil {
return nil, false
} else if originalResponse.IsBuffered() {
res, err := originalResponse.Copy()
return res, err == nil
} else {
res, err := originalResponse.CopyWithStream()
return res, err == nil
}
})
}
// Lock create unique entry in memory map
func (m *MemoryLock) Lock(_ context.Context, key string) (LockResult, bool) {
m.lock.Lock()
defer m.lock.Unlock()
lock, ok := m.internal[key]
result := LockResult{}
if !ok {
lock = lockData{}
lock.notifyQueue = make([]LockResult, 0, 5)
} else {
result = lock.AddWatcher()
}
m.internal[key] = lock
return result, !ok
}
func (m *MemoryLock) forceLockAndAddWatch(_ context.Context, key string) (LockResult, bool) {
m.lock.Lock()
defer m.lock.Unlock()
lock, ok := m.internal[key]
result := LockResult{}
if !ok {
lock = lockData{}
lock.notifyQueue = make([]LockResult, 0, 5)
result = lock.AddWatcher()
} else {
result = lock.AddWatcher()
}
m.internal[key] = lock
return result, !ok
}
// Release remove entry from memory map
func (m *MemoryLock) Release(_ context.Context, key string) {
m.lock.RLock()
_, ok := m.internal[key]
m.lock.RUnlock()
if ok {
m.lock.Lock()
defer m.lock.Unlock()
res, exists := m.internal[key]
if !exists {
return
}
notifyListeners(res, func() (*response.Response, bool) {
return nil, false
})
delete(m.internal, key)
return
}
}