-
Notifications
You must be signed in to change notification settings - Fork 38
/
synchronizer.go
188 lines (156 loc) · 4.69 KB
/
synchronizer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
package slidingwindow
import (
"log"
"time"
)
// Datastore represents the central datastore.
type Datastore interface {
// Add adds delta to the count of the window represented
// by start, and returns the new count.
Add(key string, start, delta int64) (int64, error)
// Get returns the count of the window represented by start.
Get(key string, start int64) (int64, error)
}
// syncHelper is a helper that will be leveraged by both BlockingSynchronizer
// and NonblockingSynchronizer.
type syncHelper struct {
store Datastore
syncInterval time.Duration
inProgress bool // Whether the synchronization is in progress.
lastSynced time.Time
}
func newSyncHelper(store Datastore, syncInterval time.Duration) *syncHelper {
return &syncHelper{store: store, syncInterval: syncInterval}
}
// IsTimeUp returns whether it's time to sync data to the central datastore.
func (h *syncHelper) IsTimeUp(now time.Time) bool {
return !h.inProgress && now.Sub(h.lastSynced) >= h.syncInterval
}
func (h *syncHelper) InProgress() bool {
return h.inProgress
}
func (h *syncHelper) Begin(now time.Time) {
h.inProgress = true
h.lastSynced = now
}
func (h *syncHelper) End() {
h.inProgress = false
}
func (h *syncHelper) Sync(req SyncRequest) (resp SyncResponse, err error) {
var newCount int64
if req.Changes > 0 {
newCount, err = h.store.Add(req.Key, req.Start, req.Changes)
} else {
newCount, err = h.store.Get(req.Key, req.Start)
}
if err != nil {
return SyncResponse{}, err
}
return SyncResponse{
OK: true,
Start: req.Start,
Changes: req.Changes,
OtherChanges: newCount - req.Count,
}, nil
}
// BlockingSynchronizer does synchronization in a blocking mode and consumes
// no extra goroutine.
//
// It's recommended to use BlockingSynchronizer in low-concurrency scenarios,
// either for higher accuracy, or for less goroutine consumption.
type BlockingSynchronizer struct {
helper *syncHelper
}
func NewBlockingSynchronizer(store Datastore, syncInterval time.Duration) *BlockingSynchronizer {
return &BlockingSynchronizer{
helper: newSyncHelper(store, syncInterval),
}
}
func (s *BlockingSynchronizer) Start() {}
func (s *BlockingSynchronizer) Stop() {}
// Sync sends the window's count to the central datastore, and then update
// the window's count according to the response from the datastore.
func (s *BlockingSynchronizer) Sync(now time.Time, makeReq MakeFunc, handleResp HandleFunc) {
if s.helper.IsTimeUp(now) {
s.helper.Begin(now)
resp, err := s.helper.Sync(makeReq())
if err != nil {
log.Printf("err: %v\n", err)
}
handleResp(resp)
s.helper.End()
}
}
// NonblockingSynchronizer does synchronization in a non-blocking mode. To achieve
// this, it needs to spawn a goroutine to exchange data with the central datastore.
//
// It's recommended to always use NonblockingSynchronizer in high-concurrency scenarios.
type NonblockingSynchronizer struct {
reqC chan SyncRequest
respC chan SyncResponse
stopC chan struct{}
exitC chan struct{}
helper *syncHelper
}
func NewNonblockingSynchronizer(store Datastore, syncInterval time.Duration) *NonblockingSynchronizer {
return &NonblockingSynchronizer{
reqC: make(chan SyncRequest),
respC: make(chan SyncResponse),
stopC: make(chan struct{}),
exitC: make(chan struct{}),
helper: newSyncHelper(store, syncInterval),
}
}
func (s *NonblockingSynchronizer) Start() {
go s.syncLoop()
}
func (s *NonblockingSynchronizer) Stop() {
close(s.stopC)
<-s.exitC
}
// syncLoop is a worker that receives a sync request and generates the
// corresponding sync response.
func (s *NonblockingSynchronizer) syncLoop() {
for {
select {
case req := <-s.reqC:
resp, err := s.helper.Sync(req)
if err != nil {
log.Printf("err: %v\n", err)
}
select {
case s.respC <- resp:
case <-s.stopC:
goto exit
}
case <-s.stopC:
goto exit
}
}
exit:
close(s.exitC)
}
// Sync tries to send the window's count to the central datastore, or to update
// the window's count according to the response from the latest synchronization.
// Since the exchange with the datastore is always slower than the execution of Sync,
// usually Sync must be called at least twice to update the window's count finally.
func (s *NonblockingSynchronizer) Sync(now time.Time, makeReq MakeFunc, handleResp HandleFunc) {
if s.helper.IsTimeUp(now) {
// Just try to sync. If this fails, we assume the previous synchronization
// is still ongoing, and we wait for the next time.
select {
case s.reqC <- makeReq():
s.helper.Begin(now)
default:
}
}
if s.helper.InProgress() {
// Try to get the response from the latest synchronization.
select {
case resp := <-s.respC:
handleResp(resp)
s.helper.End()
default:
}
}
}