-
Notifications
You must be signed in to change notification settings - Fork 4.4k
/
materializer.go
227 lines (193 loc) · 6.07 KB
/
materializer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
package submatview
import (
	"context"
	"errors"
	"sync"
	"time"

	"github.com/hashicorp/go-hclog"

	"github.com/hashicorp/consul/lib/retry"
	"github.com/hashicorp/consul/proto/pbsubscribe"
)
// View receives events from, and returns results to, the materializer. A View
// is responsible for converting the pbsubscribe.Event.Payload into the local
// type, and storing it so that it can be returned by Result().
//
// In the code visible here, every View method is invoked while the
// materializer's lock is held.
type View interface {
	// Update is called when one or more events are received. The first call will
	// include _all_ events in the initial snapshot, which may be an empty set.
	// Subsequent calls will contain one or more update events in the order they
	// are received. When Update returns an error, updateView returns that error
	// without storing the new index or notifying waiters.
	Update(events []*pbsubscribe.Event) error

	// Result returns the type-specific cache result based on the state. When no
	// events have been delivered yet the result should be an empty value type
	// suitable to return to clients in case there is an empty result on the
	// servers. The index the materialized view represents is maintained
	// separately and is passed in, in case the return type has an Index field
	// that needs populating. This allows implementations to not worry about
	// maintaining indexes seen during Update.
	Result(index uint64) interface{}

	// Reset returns the view to the zero state, in preparation for receiving a
	// new snapshot.
	Reset()
}
// Result is what materializer.query returns: the value produced by the View
// together with the index it was read at.
type Result struct {
	// Index is the materializer index observed by the query. On the
	// context-cancelled path of query only Value is refreshed, so Index may be
	// older than the index Value was built from.
	Index uint64
	// Value is whatever View.Result returned.
	Value interface{}
	// Cached is true if the requested value was already available locally. If
	// it is false, the query had to wait for an update (or returned with an
	// error) instead of being answered immediately.
	Cached bool
}
// Deps bundles the dependencies needed to build a materializer.
// NOTE(review): nothing in the visible part of this file consumes Deps;
// presumably a constructor defined elsewhere does - confirm.
type Deps struct {
	// View is the View implementation that stores the materialized state.
	View View
	// Logger is the logger used to report failures.
	Logger hclog.Logger
	// Waiter controls retry backoff. newMaterializer substitutes
	// defaultWaiter() when it is handed a nil waiter.
	Waiter *retry.Waiter
	// Request builds the subscribe request for a given index.
	// NOTE(review): presumably index is the index to resume the stream from -
	// verify against the caller.
	Request func(index uint64) *pbsubscribe.SubscribeRequest
}
// materializer consumes the event stream, handling any framing events, and
// allows for querying the materialized view. (The stream-consuming code is
// not in this part of the file.)
type materializer struct {
	// retryWaiter tracks consecutive failures: handleError reads its failure
	// count and updateView resets it after every successful update.
	retryWaiter *retry.Waiter
	// logger is used by handleError to report failed subscribe calls.
	logger hclog.Logger

	// lock protects the mutable state - all fields below it must only be accessed
	// while holding lock.
	lock sync.Mutex
	// index is the index stored by the last successful updateView, or 0 after
	// reset.
	index uint64
	// view holds the materialized state.
	view View
	// updateCh is closed, and replaced with a fresh channel, by
	// notifyUpdateLocked to wake every query that is blocked waiting.
	updateCh chan struct{}
	// err is the error passed to the most recent notifyUpdateLocked call; nil
	// after a successful update.
	err error
}
// newMaterializer builds a materializer around view. A nil waiter is replaced
// by defaultWaiter(). The update channel is created here so that queries can
// block on it before the first update arrives.
func newMaterializer(logger hclog.Logger, view View, waiter *retry.Waiter) *materializer {
	if waiter == nil {
		waiter = defaultWaiter()
	}
	return &materializer{
		retryWaiter: waiter,
		logger:      logger,
		view:        view,
		updateCh:    make(chan struct{}),
	}
}
// query blocks until the index of the view is greater than minIndex, an error
// is published by notifyUpdateLocked, or ctx is done.
//
//   - If the view is already past minIndex (and has loaded a snapshot), the
//     current result is returned immediately with Cached set to true.
//   - If an update notification carries an error, that error is returned with
//     a Result whose Index is current but whose Value is the one read when
//     query started.
//   - If ctx is done first, ctx.Err() is returned and Value is refreshed to
//     the latest view state; Index keeps the last value this query observed.
func (m *materializer) query(ctx context.Context, minIndex uint64) (Result, error) {
	// Snapshot the state and grab the channel that the next update will close.
	m.lock.Lock()
	res := Result{
		Index: m.index,
		Value: m.view.Result(m.index),
	}
	waitCh := m.updateCh
	m.lock.Unlock()

	// If our index is already past minIndex return right away. An index of zero
	// means no snapshot has been loaded yet, so we must wait for one on the
	// update channel.
	if res.Index > 0 && res.Index > minIndex {
		res.Cached = true
		return res, nil
	}

	for {
		select {
		case <-waitCh:
			// The view was updated (or an error was published).
			m.lock.Lock()
			res.Index = m.index

			if m.err != nil {
				err := m.err
				m.lock.Unlock()
				return res, err
			}

			if res.Index > minIndex {
				res.Value = m.view.Result(m.index)
				m.lock.Unlock()
				return res, nil
			}

			// Not far enough yet: the channel we waited on is closed, so pick
			// up its replacement and keep waiting.
			waitCh = m.updateCh
			m.lock.Unlock()

		case <-ctx.Done():
			// Update the result value to the latest because callers may still
			// use the value when the error is context.DeadlineExceeded.
			m.lock.Lock()
			res.Value = m.view.Result(m.index)
			m.lock.Unlock()
			return res, ctx.Err()
		}
	}
}
// currentIndex returns the index currently stored in the materializer, read
// while holding the lock.
func (m *materializer) currentIndex() uint64 {
	m.lock.Lock()
	defer m.lock.Unlock()
	return m.index
}
// notifyUpdateLocked records err as the latest outcome, then wakes every
// waiting query by closing the current update channel, which is replaced with
// a fresh one for subsequent waiters. It must be called while holding m.lock.
func (m *materializer) notifyUpdateLocked(err error) {
	// Install the replacement first; waiters only observe the new state after
	// they re-acquire m.lock, which the caller still holds.
	previous := m.updateCh
	m.updateCh = make(chan struct{})
	m.err = err
	close(previous)
}
// reset returns the materializer to its initial state so that a new stream
// can be started from scratch: the index goes back to zero and the view is
// reset. Waiting queries are not notified.
func (m *materializer) reset() {
	m.lock.Lock()
	defer m.lock.Unlock()

	m.index = 0
	m.view.Reset()
}
// updateView applies a sequence of events to the view and, on success, stores
// index as the view's index, wakes any waiting queries with a nil error and
// resets the retry waiter. If the view rejects the events its error is
// returned and none of the other state is touched.
func (m *materializer) updateView(events []*pbsubscribe.Event, index uint64) error {
	m.lock.Lock()
	defer m.lock.Unlock()

	err := m.view.Update(events)
	if err == nil {
		m.index = index
		m.notifyUpdateLocked(nil)
		m.retryWaiter.Reset()
	}
	return err
}
// handleError deals with a failed subscribe call for req. When
// isNonTemporaryOrConsecutiveFailure reports true for err and the current
// failure count, err is published to waiting queries via notifyUpdateLocked.
// In every case the failure is logged at error level together with the topic,
// the failure count and the subject of the subscription.
func (m *materializer) handleError(req *pbsubscribe.SubscribeRequest, err error) {
	failures := m.retryWaiter.Failures()

	if isNonTemporaryOrConsecutiveFailure(err, failures) {
		m.lock.Lock()
		m.notifyUpdateLocked(err)
		m.lock.Unlock()
	}

	logger := m.logger.With(
		"err", err,
		"topic", req.Topic,
		"failure_count", failures+1,
	)

	// Describe the subject of the subscription: wildcard, named subject, or
	// the legacy key field.
	switch {
	case req.GetWildcardSubject():
		logger = logger.With("wildcard_subject", true)
	case req.GetNamedSubject() != nil:
		logger = logger.With("key", req.GetNamedSubject().Key)
	default:
		logger = logger.With("key", req.Key) // nolint:staticcheck // SA1019 intentional use of deprecated field
	}

	logger.Error("subscribe call failed")
}
// isNonTemporaryOrConsecutiveFailure reports whether err should be treated as
// a hard failure. It returns true if failures > 0, or if no error in err's
// chain reports itself as temporary.
//
// The chain is searched with errors.As, so a temporary error that has been
// wrapped (for example with fmt.Errorf and %w) is still recognised instead of
// being misclassified as non-temporary. The first error in the chain that has
// a Temporary method decides the outcome. A nil err is not temporary, so the
// function returns true for it.
func isNonTemporaryOrConsecutiveFailure(err error, failures int) bool {
	if failures > 0 {
		return true
	}

	// This method set is what net and other std lib packages use to mark an
	// error as temporary/recoverable.
	var temp interface {
		Temporary() bool
	}
	return !errors.As(err, &temp) || !temp.Temporary()
}
// defaultWaiter builds the retry.Waiter that newMaterializer falls back to
// when it is given a nil waiter.
func defaultWaiter() *retry.Waiter {
	// Start backing off with small increments (200-400ms) which will double
	// each attempt.
	// NOTE(review): the previous comment listed the progression as topping out
	// at 6000ms, which does not match the 60s MaxWait configured below, and
	// stated that retry.Wait applies the Max limit after jitter - confirm both
	// against the retry package.
	waiter := retry.Waiter{
		MinFailures: 1,
		MinWait:     0,
		MaxWait:     60 * time.Second,
		Factor:      200 * time.Millisecond,
		Jitter:      retry.NewJitter(100),
	}
	return &waiter
}