forked from asonawalla/gazette
/
reader.go
324 lines (283 loc) · 11.4 KB
/
reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
package client
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"github.com/LiveRamp/gazette/v2/pkg/codecs"
pb "github.com/LiveRamp/gazette/v2/pkg/protocol"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// Reader adapts a Read RPC to the io.Reader interface. It additionally supports
// directly reading Fragment URLs advertised but not proxied by the broker (eg,
// because DoNotProxy of the ReadRequest is true). A Reader is invalidated by its
// first returned error, with the exception of ErrOffsetJump: this error is
// returned to notify the client that the next Journal offset to be Read is not
// the offset that was requested, but the Reader is prepared to continue at the
// updated offset.
type Reader struct {
Request pb.ReadRequest // ReadRequest of the Reader.
Response pb.ReadResponse // Most recent ReadResponse from broker.
ctx context.Context
client pb.RoutedJournalClient // Client against which Read is dispatched.
stream pb.Journal_ReadClient // Server stream.
direct io.ReadCloser // Directly opened Fragment URL.
}
// NewReader returns a Reader initialized with the given BrokerClient and ReadRequest.
func NewReader(ctx context.Context, client pb.RoutedJournalClient, req pb.ReadRequest) *Reader {
var r = &Reader{
Request: req,
ctx: ctx,
client: client,
}
return r
}
func (r *Reader) Read(p []byte) (n int, err error) {
// If we have an open direct reader of a persisted fragment, delegate to it.
if r.direct != nil {
if n, err = r.direct.Read(p); err != nil {
_ = r.direct.Close()
}
r.Request.Offset += int64(n)
return
}
// Is there remaining content in the last ReadResponse?
if l, d := len(r.Response.Content), int(r.Request.Offset-r.Response.Offset); l != 0 && l > d {
n = copy(p, r.Response.Content[d:])
r.Request.Offset += int64(n)
return
}
// Lazy initialization: begin the Read RPC.
if r.stream == nil {
if r.stream, err = r.client.Read(
pb.WithDispatchItemRoute(r.ctx, r.client, r.Request.Journal.String(), false),
&r.Request,
); err == nil {
n, err = r.Read(p) // Recurse to attempt read against opened |r.stream|.
} else {
err = mapGRPCCtxErr(r.ctx, err)
}
return
}
// Read and Validate the next frame.
if err = r.stream.RecvMsg(&r.Response); err == nil {
if err = r.Response.Validate(); err != nil {
err = pb.ExtendContext(err, "ReadResponse")
} else if r.Response.Status == pb.Status_OK && r.Response.Offset < r.Request.Offset {
err = pb.NewValidationError("invalid ReadResponse offset (%d; expected >= %d)",
r.Response.Offset, r.Request.Offset) // Violation of Read API contract.
}
} else {
err = mapGRPCCtxErr(r.ctx, err)
}
// A note on resource leaks: an invariant of Read is that in invocations where
// the returned error != nil, an error has also been read from |r.stream|,
// implying that the gRPC stream has been torn down. The exception is if
// response validation fails, which indicates a client / server API version
// incompatibility and cannot happen in normal operation.
if err == nil {
// If a Header was sent, advise of its advertised journal Route.
if r.Response.Header != nil {
r.client.UpdateRoute(r.Request.Journal.String(), &r.Response.Header.Route)
}
if r.Request.Offset < r.Response.Offset {
// Offset jumps are uncommon, but possible if fragments were removed,
// or if the requested offset was -1.
r.Request.Offset = r.Response.Offset
err = ErrOffsetJump
}
if r.Response.Status == pb.Status_OK {
// Return empty read, to allow inspection of the updated |r.Response|.
} else {
// The broker will send a stream closure following a !OK status.
// Recurse to read that closure, and _then_ return a final error.
n, err = r.Read(p)
}
return
} else if err != io.EOF {
// We read an error _other_ than a graceful tear-down of the stream.
return
}
// We read a graceful stream closure (err == io.EOF).
// If the frame preceding EOF provided a fragment URL, open it directly.
if !r.Request.MetadataOnly && r.Response.Status == pb.Status_OK && r.Response.FragmentUrl != "" {
if r.direct, err = OpenFragmentURL(r.ctx, *r.Response.Fragment,
r.Request.Offset, r.Response.FragmentUrl); err == nil {
n, err = r.Read(p) // Recurse to attempt read against opened |r.direct|.
}
return
}
// Otherwise, map Status of the preceding frame into a more specific error.
switch r.Response.Status {
case pb.Status_OK:
if err != io.EOF {
panic(err.Error()) // Status_OK implies graceful stream closure.
}
case pb.Status_NOT_JOURNAL_BROKER:
err = ErrNotJournalBroker
case pb.Status_OFFSET_NOT_YET_AVAILABLE:
err = ErrOffsetNotYetAvailable
default:
err = errors.New(r.Response.Status.String())
}
return
}
// AdjustedOffset returns the current journal offset, adjusted for content read
// by |br| (which wraps this Reader) but not yet consumed from |br|'s buffer.
func (r *Reader) AdjustedOffset(br *bufio.Reader) int64 {
return r.Request.Offset - int64(br.Buffered())
}
// Seek provides a limited form of seeking support. Specifically, iff a
// Fragment URL is being directly read, the Seek offset is ahead of the current
// Reader offset, and the Fragment also covers the desired Seek offset, then a
// seek is performed by reading and discarding to the seeked offset. Seek will
// otherwise return ErrSeekRequiresNewReader.
func (r *Reader) Seek(offset int64, whence int) (int64, error) {
switch whence {
case io.SeekStart:
// |offset| is already absolute.
case io.SeekCurrent:
offset = r.Request.Offset + offset
case io.SeekEnd:
return r.Request.Offset, errors.New("io.SeekEnd whence is not supported")
default:
panic("invalid whence")
}
if r.direct == nil || offset < r.Request.Offset || offset >= r.Response.Fragment.End {
return r.Request.Offset, ErrSeekRequiresNewReader
}
var _, err = io.CopyN(ioutil.Discard, r, offset-r.Request.Offset)
return r.Request.Offset, err
}
// OpenFragmentURL directly opens |fragment|, which must be available at URL
// |url|, and returns a *FragmentReader which has been pre-seeked to |offset|.
func OpenFragmentURL(ctx context.Context, fragment pb.Fragment, offset int64, url string) (*FragmentReader, error) {
var req, err = http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
if fragment.CompressionCodec == pb.CompressionCodec_GZIP_OFFLOAD_DECOMPRESSION {
// Require that the server send us un-encoded content, offloading
// decompression onto the storage API. Go's standard `gzip` package is slow,
// and we also see a parallelism benefit by offloading decompression work
// onto the cloud storage system.
req.Header.Set("Accept-Encoding", "identity")
} else if fragment.CompressionCodec == pb.CompressionCodec_GZIP {
// Explicitly request gzip. Doing so disables the http client's transparent
// handling for gzip decompression if the Fragment happened to be written with
// "Content-Encoding: gzip", and it instead directly surfaces the compressed
// bytes to us.
req.Header.Set("Accept-Encoding", "gzip")
}
resp, err := httpClient.Do(req.WithContext(ctx))
if err != nil {
return nil, err
} else if resp.StatusCode != http.StatusOK {
_ = resp.Body.Close()
return nil, fmt.Errorf("!OK fetching (%s, %q)", resp.Status, url)
}
// Technically the store _must_ decompress in response to honor our
// Accept-Encoding header, but some implementations (eg, Minio) don't.
if fragment.CompressionCodec == pb.CompressionCodec_GZIP_OFFLOAD_DECOMPRESSION &&
resp.Header.Get("Content-Encoding") == "gzip" {
fragment.CompressionCodec = pb.CompressionCodec_GZIP // Decompress client-side.
}
return NewFragmentReader(resp.Body, fragment, offset)
}
// NewFragmentReader wraps |rc|, which is a io.ReadCloser of raw Fragment bytes,
// with a returned *FragmentReader which has been pre-seeked to |offset|.
func NewFragmentReader(rc io.ReadCloser, fragment pb.Fragment, offset int64) (*FragmentReader, error) {
var decomp, err = codecs.NewCodecReader(rc, fragment.CompressionCodec)
if err != nil {
_ = rc.Close()
return nil, err
}
var fr = &FragmentReader{
decomp: decomp,
raw: rc,
Fragment: fragment,
Offset: fragment.Begin,
}
// Attempt to seek to |offset| within the fragment.
var delta = offset - fragment.Begin
if _, err = io.CopyN(ioutil.Discard, fr, delta); err != nil {
_ = fr.Close()
return nil, err
}
return fr, nil
}
// FragmentReader is a io.ReadCloser of a Fragment.
type FragmentReader struct {
pb.Fragment // Fragment being read.
Offset int64 // Next journal offset to be read, in range [Begin, End).
decomp io.ReadCloser
raw io.ReadCloser
}
// Read returns the next bytes of decompressed Fragment content. When Read
// returns, Offset has been updated to reflect the next byte to be read.
// Read returns EOF only if the underlying Reader returns EOF at precisely
// Offset == Fragment.End. If the underlying Reader is too short,
// io.ErrUnexpectedEOF is returned. If it's too long, ErrDidNotReadExpectedEOF
// is returned.
func (fr *FragmentReader) Read(p []byte) (n int, err error) {
n, err = fr.decomp.Read(p)
fr.Offset += int64(n)
if fr.Offset > fr.Fragment.End {
// Did we somehow read beyond Fragment.End?
n -= int(fr.Offset - fr.Fragment.End)
err = ErrDidNotReadExpectedEOF
} else if err == io.EOF && fr.Offset != fr.Fragment.End {
// Did we read EOF before the reaching Fragment.End?
err = io.ErrUnexpectedEOF
}
return
}
// Close closes the underlying ReadCloser and associated
// decompressor (if any).
func (fr *FragmentReader) Close() error {
var errA, errB = fr.decomp.Close(), fr.raw.Close()
if errA != nil {
return errA
}
return errB
}
// InstallFileTransport registers a file:// protocol handler rooted at |root|
// with the http.Client used by OpenFragmentURL. The returned cleanup function
// removes the handler and restores the prior http.Client.
func InstallFileTransport(root string) (remove func()) {
var transport = new(http.Transport)
*transport = *http.DefaultTransport.(*http.Transport) // Clone.
transport.RegisterProtocol("file", http.NewFileTransport(http.Dir(root)))
var prevClient = httpClient
httpClient = &http.Client{Transport: transport}
return func() { httpClient = prevClient }
}
// mapGRPCCtxErr returns ctx.Err() iff |err| represents a gRPC error with a
// status code matching ctx.Err(). Otherwise, it returns |err| unmodified.
// In other words, this routine "unwraps" gRPC errors which have their root cause
// in a local Context error, instead mapping to the common context package errors.
func mapGRPCCtxErr(ctx context.Context, err error) error {
if ctx.Err() == context.DeadlineExceeded && status.Code(err) == codes.DeadlineExceeded {
return ctx.Err()
}
if ctx.Err() == context.Canceled && status.Code(err) == codes.Canceled {
return ctx.Err()
}
return err
}
var (
// Map common broker error statuses into named errors.
ErrNotJournalBroker = errors.New(pb.Status_NOT_JOURNAL_BROKER.String())
ErrNotJournalPrimaryBroker = errors.New(pb.Status_NOT_JOURNAL_PRIMARY_BROKER.String())
ErrOffsetNotYetAvailable = errors.New(pb.Status_OFFSET_NOT_YET_AVAILABLE.String())
ErrWrongAppendOffset = errors.New(pb.Status_WRONG_APPEND_OFFSET.String())
ErrOffsetJump = errors.New("offset jump")
ErrSeekRequiresNewReader = errors.New("seek offset requires new Reader")
ErrDidNotReadExpectedEOF = errors.New("did not read EOF at expected Fragment.End")
// httpClient is the http.Client used by OpenFragmentURL
httpClient = http.DefaultClient
)