-
Notifications
You must be signed in to change notification settings - Fork 9.8k
/
inflights.go
132 lines (117 loc) · 3.63 KB
/
inflights.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tracker
// Inflights limits the number of MsgApp (represented by the largest index
// contained within) sent to followers but not yet acknowledged by them. Callers
// use Full() to check whether more messages can be sent, call Add() whenever
// they are sending a new append, and release "quota" via FreeLE() whenever an
// ack is received.
type Inflights struct {
// the starting index in the buffer
start int
// number of inflights in the buffer
count int
// the size of the buffer
size int
// buffer contains the index of the last entry
// inside one message.
buffer []uint64
}
// NewInflights sets up an Inflights that allows up to 'size' inflight messages.
func NewInflights(size int) *Inflights {
return &Inflights{
size: size,
}
}
// Clone returns an *Inflights that is identical to but shares no memory with
// the receiver.
func (in *Inflights) Clone() *Inflights {
ins := *in
ins.buffer = append([]uint64(nil), in.buffer...)
return &ins
}
// Add notifies the Inflights that a new message with the given index is being
// dispatched. Full() must be called prior to Add() to verify that there is room
// for one more message, and consecutive calls to add Add() must provide a
// monotonic sequence of indexes.
func (in *Inflights) Add(inflight uint64) {
if in.Full() {
panic("cannot add into a Full inflights")
}
next := in.start + in.count
size := in.size
if next >= size {
next -= size
}
if next >= len(in.buffer) {
in.grow()
}
in.buffer[next] = inflight
in.count++
}
// grow the inflight buffer by doubling up to inflights.size. We grow on demand
// instead of preallocating to inflights.size to handle systems which have
// thousands of Raft groups per process.
func (in *Inflights) grow() {
newSize := len(in.buffer) * 2
if newSize == 0 {
newSize = 1
} else if newSize > in.size {
newSize = in.size
}
newBuffer := make([]uint64, newSize)
copy(newBuffer, in.buffer)
in.buffer = newBuffer
}
// FreeLE frees the inflights smaller or equal to the given `to` flight.
func (in *Inflights) FreeLE(to uint64) {
if in.count == 0 || to < in.buffer[in.start] {
// out of the left side of the window
return
}
idx := in.start
var i int
for i = 0; i < in.count; i++ {
if to < in.buffer[idx] { // found the first large inflight
break
}
// increase index and maybe rotate
size := in.size
if idx++; idx >= size {
idx -= size
}
}
// free i inflights and set new start index
in.count -= i
in.start = idx
if in.count == 0 {
// inflights is empty, reset the start index so that we don't grow the
// buffer unnecessarily.
in.start = 0
}
}
// FreeFirstOne releases the first inflight. This is a no-op if nothing is
// inflight.
func (in *Inflights) FreeFirstOne() { in.FreeLE(in.buffer[in.start]) }
// Full returns true if no more messages can be sent at the moment.
func (in *Inflights) Full() bool {
return in.count == in.size
}
// Count returns the number of inflight messages.
func (in *Inflights) Count() int { return in.count }
// reset frees all inflights.
func (in *Inflights) reset() {
in.count = 0
in.start = 0
}