forked from cockroachdb/cockroach
-
Notifications
You must be signed in to change notification settings - Fork 0
/
clock_offset.go
303 lines (277 loc) · 10.9 KB
/
clock_offset.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
// Copyright 2014 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License. See the AUTHORS file
// for names of contributors.
//
// Author: Kathy Spradlin (kathyspradlin@gmail.com)
package rpc
import (
"math"
"sort"
"sync"
"time"
"github.com/cockroachdb/cockroach/proto"
"github.com/cockroachdb/cockroach/util/hlc"
"github.com/cockroachdb/cockroach/util/log"
)
var (
// monitorInterval is how often the cluster offset is measured; it is the
// pause between two rounds of MonitorRemoteOffsets. It has no initializer
// here and is assigned in init().
monitorInterval time.Duration
)
// init sets monitorInterval to ten times heartbeatInterval.
// heartbeatInterval is not declared in this file.
func init() {
monitorInterval = heartbeatInterval * 10
}
// RemoteClockMonitor keeps track of the most recent measurements of remote
// offsets from this node to connected nodes.
//
// UpdateOffset and buildEndpointList access offsets and lastMonitoredAt while
// holding mu. MonitorRemoteOffsets updates lastMonitoredAt under mu as well,
// but formats offsets without the lock in its fatal-error log calls.
type RemoteClockMonitor struct {
offsets map[string]proto.RemoteOffset // Maps remote string addr to the offset measurement currently kept for it.
lClock *hlc.Clock // The server clock.
mu sync.Mutex
// Wall time in nanoseconds when we last monitored cluster offset. It is
// taken from lClock.PhysicalNow() at the end of every monitoring round;
// measurements with an older MeasuredAt are dropped by buildEndpointList
// and are always overwritten by UpdateOffset.
lastMonitoredAt int64
}
// ClusterOffsetInterval is the best interval we can construct to estimate this
// node's offset from the cluster.
//
// findOffsetInterval returns the interval [0, 0] when no remote clocks are
// being tracked, and sets both bounds to math.MaxInt64 (together with a
// non-nil error) when no majority of remote intervals overlap.
type ClusterOffsetInterval struct {
Lowerbound int64 // The lowerbound on the offset in nanoseconds.
Upperbound int64 // The upperbound on the offset in nanoseconds.
}
// MajorityIntervalNotFoundError is the error reported when the offset
// intervals estimated for the connected remote clocks share no region that a
// majority of them overlap.
type MajorityIntervalNotFoundError struct{}

// Error implements the error interface. The description is fixed; the error
// carries no further detail.
func (MajorityIntervalNotFoundError) Error() string {
	const description = "a majority of connected remote clocks have failed " +
		"to encompass the true time for the cluster"
	return description
}
// endpoint is one boundary of the offset interval estimated for a single
// remote clock: either the low end or the high end of that interval.
//
// For example, remote clock offset bounds of [-5, 10] are converted into the
// two endpoints
//	endpoint{offset: -5, endType: -1}
//	endpoint{offset: 10, endType: +1}
type endpoint struct {
	offset  int64 // The boundary offset represented by this endpoint.
	endType int   // -1 if lowpoint, +1 if highpoint.
}

// endpointList is a slice of endpoints. It implements sort.Interface so that
// it can be ordered by endpoint offset.
type endpointList []endpoint

// Len returns the number of endpoints in the list.
func (l endpointList) Len() int { return len(l) }

// Swap exchanges the endpoints stored at positions i and j.
func (l endpointList) Swap(i, j int) {
	l[j], l[i] = l[i], l[j]
}

// Less reports whether the endpoint at i sorts before the endpoint at j.
// Endpoints are ordered by ascending offset; when two offsets are equal the
// one with the smaller endType (a lowpoint) comes first.
func (l endpointList) Less(i, j int) bool {
	a, b := l[i], l[j]
	if a.offset != b.offset {
		return a.offset < b.offset
	}
	return a.endType < b.endType
}
// newRemoteClockMonitor creates a RemoteClockMonitor that reads time from the
// given server clock. The monitor starts with an empty, non-nil offsets map
// and a zero lastMonitoredAt.
func newRemoteClockMonitor(clock *hlc.Clock) *RemoteClockMonitor {
	monitor := &RemoteClockMonitor{lClock: clock}
	monitor.offsets = make(map[string]proto.RemoteOffset)
	return monitor
}
// UpdateOffset records a new clock measurement for the remote at addr. It
// takes r.mu, so it may be called concurrently with the other methods that
// take r.mu. Calling it on a nil monitor is a no-op.
//
// The stored offset for addr is replaced only if one of the following three
// cases holds:
// 1. There is no prior offset for that address.
// 2. The old offset for addr was measured before r.lastMonitoredAt. We never
// use values during monitoring that are older than r.lastMonitoredAt.
// 3. The new offset's error is smaller than the old offset's error. Note:
// InfiniteOffsets implicitly have the largest error.
//
// The third case lets the next findOffsetInterval() invocation work with the
// most precise reading of the remote clock. The remote clock may be measured
// several times between two cluster offset calculations, and the reading with
// the smallest error is the one worth keeping. Because monitorInterval >
// heartbeatInterval, there are several chances to read the remote clock
// accurately. monitorInterval should not be too large, however, or the
// calculation would end up relying on old information.
func (r *RemoteClockMonitor) UpdateOffset(addr string, offset proto.RemoteOffset) {
	if r == nil {
		return
	}
	r.mu.Lock()
	defer r.mu.Unlock()

	prev, seen := r.offsets[addr]
	switch {
	case !seen:
		// Case 1: first measurement for this address.
	case prev.MeasuredAt < r.lastMonitoredAt:
		// Case 2: whatever offset is, prev was never going to be used again,
		// because it predates the last cluster offset calculation.
	case !offset.Equal(proto.InfiniteOffset) && offset.Error < prev.Error:
		// Case 3: prev is still current (the previous case did not match),
		// but the new reading is more precise.
	default:
		// Keep the existing measurement.
		return
	}
	r.offsets[addr] = offset
}
// MonitorRemoteOffsets periodically checks that the offset of this server's
// clock from the true cluster time is within MaxOffset. If the offset interval
// cannot be determined, or lies entirely outside of MaxOffset, this method
// logs a fatal error, which is meant to terminate the node.
//
// The method loops forever and never returns, so it blocks its caller. Each
// round sleeps for monitorInterval, computes the offset interval, performs the
// checks (skipped when the clock's MaxOffset is 0), and finally records the
// current physical time in r.lastMonitoredAt.
func (r *RemoteClockMonitor) MonitorRemoteOffsets() {
if log.V(1) {
log.Infof("monitoring cluster offset")
}
for {
time.Sleep(monitorInterval)
// findOffsetInterval also drops measurements older than
// r.lastMonitoredAt, i.e. those not refreshed since the previous round.
offsetInterval, err := r.findOffsetInterval()
// By the contract of the hlc, if the value is 0, then safety checking
// of the max offset is disabled. However we may still want to
// propagate the information to a status node.
// TODO(embark): once there is a framework for collecting timeseries
// data about the db, propagate the offset status to that.
// Don't forget to protect r.offsets through the Mutex if those
// Fatalf's below ever turn into something less destructive.
if r.lClock.MaxOffset() != 0 {
if err != nil {
// r.offsets is read here without holding r.mu; see the note above.
log.Fatalf("clock offset from the cluster time "+
"for remote clocks %v could not be determined: %s",
r.offsets, err)
}
if !isHealthyOffsetInterval(offsetInterval, r.lClock.MaxOffset()) {
// r.offsets is read here without holding r.mu; see the note above.
log.Fatalf("clock offset from the cluster time "+
"for remote clocks: %v is in interval: %v, which "+
"indicates that the true offset is greater than %d",
r.offsets, offsetInterval, r.lClock.MaxOffset())
}
if log.V(1) {
log.Infof("healthy cluster offset: %v", offsetInterval)
}
}
// Mark the end of this round. Measurements taken before this instant are
// ignored by the next round and may be freely replaced by UpdateOffset.
r.mu.Lock()
r.lastMonitoredAt = r.lClock.PhysicalNow()
r.mu.Unlock()
}
}
// isHealthyOffsetInterval reports whether the given ClusterOffsetInterval is
// compatible with the node's offset being within maxOffset, i.e. whether the
// interval shares at least one point with [-maxOffset, +maxOffset]. For
// example, with an offset interval of [-20, -11] and a maxOffset of 10
// nanoseconds the result is false: no point in the interval is within
// maxOffset, so the clock offset must be too great.
func isHealthyOffsetInterval(i ClusterOffsetInterval, maxOffset time.Duration) bool {
	limit := maxOffset.Nanoseconds()
	// Healthy unless the interval lies entirely above +limit or entirely
	// below -limit.
	return i.Lowerbound <= limit && i.Upperbound >= -limit
}
// findOffsetInterval measures this node's probable offset from the rest of the
// cluster and reports it as a ClusterOffsetInterval. For example, a result of
// [-5, 10] indicates that this node's offset is likely between -5 and 10
// nanoseconds from the average clock of the cluster.
//
// The intersection algorithm used here is documented at:
// http://infolab.stanford.edu/pub/cstr/reports/csl/tr/83/247/CSL-TR-83-247.pdf,
// commonly known as Marzullo's algorithm. If a remote clock is correct, its
// offset interval should encompass this clock's offset from the cluster time
// (see buildEndpointList()). If the majority of remote clocks are correct,
// their intervals should overlap over some region, which should include the
// true offset from the cluster time. That region is what is returned.
//
// With no remote clocks the interval [0, 0] is returned with a nil error. If
// no region is covered by more than half of the remote intervals, both bounds
// are set to math.MaxInt64 and a MajorityIntervalNotFoundError is returned.
func (r *RemoteClockMonitor) findOffsetInterval() (ClusterOffsetInterval, error) {
	points := r.buildEndpointList()
	sort.Sort(points)
	// Every remote clock contributes exactly two endpoints.
	numClocks := len(points) / 2
	if log.V(1) {
		log.Infof("finding offset interval for monitorInterval: %d, numOffsets %d",
			monitorInterval, numClocks)
	}
	if numClocks == 0 {
		return ClusterOffsetInterval{}, nil
	}

	var (
		maxOverlap int   // Largest number of simultaneously open intervals seen.
		overlap    int   // Number of intervals open at the current endpoint.
		lo, hi     int64 // Bounds of the region where maxOverlap was reached.
	)
	// Sweep the sorted endpoints: a lowpoint (endType -1) opens an interval,
	// a highpoint (endType +1) closes one.
	for idx := range points {
		overlap -= points[idx].endType
		if overlap <= maxOverlap {
			continue
		}
		maxOverlap = overlap
		lo = points[idx].offset
		// The last endpoint is a highpoint, which lowers overlap, so this
		// line is never reached with idx == len(points)-1.
		hi = points[idx+1].offset
	}

	// Fewer than a majority of connected remote clocks seem to encompass the
	// central offset from the cluster: an error condition.
	if maxOverlap <= numClocks/2 {
		unknown := ClusterOffsetInterval{
			Lowerbound: math.MaxInt64,
			Upperbound: math.MaxInt64,
		}
		return unknown, MajorityIntervalNotFoundError{}
	}

	// A majority of offset intervals overlap on [lo, hi], which should
	// contain the true cluster offset.
	return ClusterOffsetInterval{Lowerbound: lo, Upperbound: hi}, nil
}
// buildEndpointList converts every RemoteOffset held by the monitor into an
// interval that should encompass this node's true offset from the cluster
// time, and returns the list of all interval endpoints (two per interval, in
// no particular order). It holds r.mu for its whole duration.
//
// As a side effect, RemoteOffsets that haven't been updated since the last
// monitoring are removed from the monitor. (Side effects are nasty, but this
// avoids walking the map a second time under the lock.)
//
// A RemoteOffset o is represented by the interval
// [o.Offset - o.Error - MaxOffset, o.Offset + o.Error + MaxOffset],
// where MaxOffset is the furthest a node's clock can deviate from the cluster
// time. The offset between this node and the remote clock itself lies within
// [o.Offset - o.Error, o.Offset + o.Error]; widening that by MaxOffset accounts
// for the remote clock being at most MaxOffset away from the cluster time, so
// the widened interval ought to contain this node's offset from the true
// cluster time rather than just its offset from the remote clock.
func (r *RemoteClockMonitor) buildEndpointList() endpointList {
	r.mu.Lock()
	defer r.mu.Unlock()

	list := make(endpointList, 0, 2*len(r.offsets))
	for addr, measured := range r.offsets {
		// A measurement that hasn't been refreshed since the offset was last
		// monitored indicates that we no longer have a connection to that
		// addr; forget it.
		if measured.MeasuredAt < r.lastMonitoredAt {
			delete(r.offsets, addr)
			continue
		}
		// Half-width of the interval around the measured offset.
		// NOTE(review): this int64 arithmetic is unchecked; an offset or error
		// near math.MaxInt64 would wrap around — confirm whether sentinel
		// values such as proto.InfiniteOffset can reach this point.
		radius := measured.Error + r.lClock.MaxOffset().Nanoseconds()
		list = append(list,
			endpoint{offset: measured.Offset - radius, endType: -1},
			endpoint{offset: measured.Offset + radius, endType: +1},
		)
	}
	return list
}