forked from asonawalla/gazette
/
index.go
230 lines (191 loc) · 6.62 KB
/
index.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
package fragment
import (
"context"
"sync"
"time"
pb "github.com/LiveRamp/gazette/v2/pkg/protocol"
"golang.org/x/net/trace"
)
const (
	// offsetJumpAgeThreshold tunes the "offset jump" recovery path of
	// Index.Query: when a covering fragment cannot be found, we allow serving
	// a *greater* fragment so long as it was last modified at least this long
	// ago. The large threshold avoids racing in-flight fragment uploads.
	offsetJumpAgeThreshold = 6 * time.Hour
)
// Index maintains a queryable index of local and remote journal Fragments.
type Index struct {
	ctx            context.Context // Context over the lifetime of the Index.
	set            CoverSet        // All Fragments of the index (local and remote).
	local          CoverSet        // Local Fragments only (having non-nil File).
	condCh         chan struct{}   // Condition variable; closed & replaced to notify blocked queries on each |set| update.
	firstRefreshCh chan struct{}   // Closed when the first remote index load has completed.
	mu             sync.RWMutex    // Guards |set|, |local|, and |condCh|.
}
// NewIndex returns a new, empty Index.
func NewIndex(ctx context.Context) *Index {
	var fi = new(Index)
	fi.ctx = ctx
	fi.condCh = make(chan struct{})
	fi.firstRefreshCh = make(chan struct{})
	return fi
}
// Query the Index for a Fragment matching the ReadRequest.
//
// On success the response carries the longest Fragment overlapping the
// requested offset, the current journal write head, a signed URL when the
// Fragment is remote and persisted, and the local File (nil if remote).
// If the offset is not yet covered and |req.Block| is set, Query parks until
// the index updates or either Context is cancelled; otherwise it returns
// Status_OFFSET_NOT_YET_AVAILABLE.
func (fi *Index) Query(ctx context.Context, req *pb.ReadRequest) (*pb.ReadResponse, File, error) {
	defer fi.mu.RUnlock()
	fi.mu.RLock()

	var resp = &pb.ReadResponse{
		Offset: req.Offset,
	}

	// Special handling for reads at the Journal Write head: offset -1 maps
	// to the current end of the indexed offset space.
	if resp.Offset == -1 {
		resp.Offset = fi.set.EndOffset()
	}

	for {
		var ind, found = fi.set.LongestOverlappingFragment(resp.Offset)
		// Snapshot the condition channel now, while the lock is held: a
		// concurrent index update replaces fi.condCh after closing it.
		var condCh = fi.condCh
		var err error

		// If the requested offset isn't covered by the index, but we do have a
		// Fragment covering a *greater* offset, where that Fragment is also older
		// than a large time.Duration, then: skip forward the request offset to
		// the Fragment offset. This case allows us to recover from "holes" or
		// deletions in the offset space of a Journal, while not impacting races
		// which can occur between delayed persistence to the Fragment store
		// vs hand-off of Journals to new brokers (eg, a new broker which isn't
		// yet aware of a Fragment currently being uploaded, should block a read
		// of an offset covered by that Fragment until it becomes available).
		if !found && ind != len(fi.set) &&
			fi.set[ind].ModTime != 0 &&
			fi.set[ind].ModTime < timeNow().Add(-offsetJumpAgeThreshold).Unix() {
			resp.Offset = fi.set[ind].Begin
			found = true
		}

		if found {
			resp.Status = pb.Status_OK
			resp.WriteHead = fi.set.EndOffset()
			resp.Fragment = new(pb.Fragment)
			*resp.Fragment = fi.set[ind].Fragment

			// Only remote, persisted Fragments (non-empty BackingStore and a
			// non-zero ModTime) can be served via signed URL.
			if resp.Fragment.BackingStore != "" && resp.Fragment.ModTime != 0 {
				resp.FragmentUrl, err = SignGetURL(*resp.Fragment, time.Minute)
			}
			addTrace(ctx, "Index.Query(%s) => %s, localFile: %t", req, resp, fi.set[ind].File != nil)
			return resp, fi.set[ind].File, err
		}

		if !req.Block {
			resp.Status = pb.Status_OFFSET_NOT_YET_AVAILABLE
			resp.WriteHead = fi.set.EndOffset()
			addTrace(ctx, "Index.Query(%s) => %s", req, resp)
			return resp, nil, nil
		}

		addTrace(ctx, " ... stalled in Index.Query(%s)", req)

		// Wait for |condCh| to signal, or for the request |ctx| or Index
		// Context to be cancelled. The read lock is released while parked so
		// that index updates (which take the write lock) can proceed.
		fi.mu.RUnlock()
		select {
		case <-condCh:
			// Pass.
		case <-ctx.Done():
			err = ctx.Err()
		case <-fi.ctx.Done():
			err = fi.ctx.Err()
		}
		// Re-acquire before returning or looping: both the deferred RUnlock
		// and the next iteration's index reads require the lock to be held.
		fi.mu.RLock()

		if err != nil {
			return nil, nil, err
		}
	}
}
// EndOffset returns the last (largest) End offset in the index.
func (fi *Index) EndOffset() int64 {
	fi.mu.RLock()
	var end = fi.set.EndOffset()
	fi.mu.RUnlock()
	return end
}
// SpoolCommit adds local Spool Fragment |frag| to the index.
func (fi *Index) SpoolCommit(frag Fragment) {
	fi.mu.Lock()
	defer fi.mu.Unlock()

	// Add to both the full index and the local-only index, then notify any
	// queries parked on the previous condition channel.
	fi.set, _ = fi.set.Add(frag)
	fi.local, _ = fi.local.Add(frag)
	fi.wakeBlockedQueries()
}
// ReplaceRemote replaces all remote Fragments in the index with |set|,
// retaining local-only Fragments, and signals the first remote refresh.
func (fi *Index) ReplaceRemote(set CoverSet) {
	defer fi.mu.Unlock()
	fi.mu.Lock()

	// Remove local fragments which are also present in |set|. This removes
	// references to held File instances, allowing them to be finalized by the
	// garbage collector. As Fragment Files have only the single open file-
	// descriptor and no remaining hard links, this also releases associated
	// disk and OS page buffer resources. Note that we cannot directly Close
	// these Fragment Files (and must instead rely on GC to collect them),
	// as they may still be referenced by concurrent read requests.
	fi.local = CoverSetDifference(fi.local, set)

	// Extend |set| with remaining local Fragments not already in |set|.
	for _, frag := range fi.local {
		var ok bool
		if set, ok = set.Add(frag); !ok {
			// By construction, a local Fragment surviving CoverSetDifference
			// must extend |set|; anything else is a logic error.
			panic("expected local fragment to not be covered")
		}
	}
	fi.set = set
	fi.wakeBlockedQueries()

	// Close |firstRefreshCh| exactly once, to unblock
	// WaitForFirstRemoteRefresh callers. A non-blocking receive detects
	// whether a prior ReplaceRemote already closed it.
	select {
	case <-fi.firstRefreshCh:
		// Already closed.
	default:
		close(fi.firstRefreshCh)
	}
}
// wakeBlockedQueries wakes all queries waiting for an index update.
// fi.mu must already be held.
func (fi *Index) wakeBlockedQueries() {
	// Swap in a fresh condition channel for future readers to park on, and
	// close the prior one to signal readers already parked that the index
	// has been updated.
	var prior = fi.condCh
	fi.condCh = make(chan struct{})
	close(prior)
}
// WaitForFirstRemoteRefresh blocks until ReplaceRemote has been called at least
// one time, or until the context is cancelled.
func (fi *Index) WaitForFirstRemoteRefresh(ctx context.Context) error {
	// Fast path: the first remote load may have already completed.
	select {
	case <-fi.firstRefreshCh:
		return nil
	default:
	}
	addTrace(ctx, " ... stalled in Index.WaitForFirstRemoteRefresh()")

	select {
	case <-fi.ctx.Done():
		return fi.ctx.Err()
	case <-ctx.Done():
		return ctx.Err()
	case <-fi.firstRefreshCh:
		return nil
	}
}
// Inspect invokes |callback| with a CoverSet representing a snapshot of all
// Fragments in the index. The index cannot change while |callback| executes.
func (fi *Index) Inspect(callback func(CoverSet) error) error {
	defer fi.mu.RUnlock()
	fi.mu.RLock()
	return callback(fi.set)
}
// WalkAllStores enumerates Fragments from each of |stores| into the returned
// CoverSet, or returns an encountered error.
func WalkAllStores(ctx context.Context, name pb.Journal, stores []pb.FragmentStore) (CoverSet, error) {
	var set CoverSet

	for _, fs := range stores {
		var walkErr = List(ctx, fs, name, func(f pb.Fragment) {
			// The Add result is ignored: enumeration is strictly additive,
			// and CoverSet.Add resolves any overlap.
			set, _ = set.Add(Fragment{Fragment: f})
		})
		if walkErr != nil {
			return CoverSet{}, walkErr
		}
	}
	return set, nil
}
// timeNow indirects time.Now so that it may be stubbed (eg, by tests).
var timeNow = time.Now
// addTrace lazily formats |format| and |args| into the trace attached to
// |ctx|, if any; it is a no-op when |ctx| carries no trace.
func addTrace(ctx context.Context, format string, args ...interface{}) {
	tr, ok := trace.FromContext(ctx)
	if !ok {
		return
	}
	tr.LazyPrintf(format, args...)
}