New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(storage): Implement io.WriterTo in Reader #9659
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,7 @@ import ( | |
"encoding/base64" | ||
"errors" | ||
"fmt" | ||
"hash/crc32" | ||
"io" | ||
"net/url" | ||
"os" | ||
|
@@ -1042,6 +1043,16 @@ func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRange | |
// This is the size of the entire object, even if only a range was requested. | ||
size := obj.GetSize() | ||
|
||
// Only support checksums when reading an entire object, not a range. | ||
var ( | ||
wantCRC uint32 | ||
checkCRC bool | ||
) | ||
if checksums := msg.GetObjectChecksums(); checksums != nil && checksums.Crc32C != nil && params.offset == 0 && params.length < 0 { | ||
wantCRC = checksums.GetCrc32C() | ||
checkCRC = true | ||
} | ||
|
||
r = &Reader{ | ||
Attrs: ReaderObjectAttrs{ | ||
Size: size, | ||
|
@@ -1063,7 +1074,10 @@ func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRange | |
settings: s, | ||
zeroRange: params.length == 0, | ||
databuf: databuf, | ||
wantCRC: wantCRC, | ||
checkCRC: checkCRC, | ||
}, | ||
checkCRC: checkCRC, | ||
} | ||
|
||
cr := msg.GetContentRange() | ||
|
@@ -1081,12 +1095,6 @@ func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRange | |
r.reader.Close() | ||
} | ||
|
||
// Only support checksums when reading an entire object, not a range. | ||
if checksums := msg.GetObjectChecksums(); checksums != nil && checksums.Crc32C != nil && params.offset == 0 && params.length < 0 { | ||
r.wantCRC = checksums.GetCrc32C() | ||
r.checkCRC = true | ||
} | ||
|
||
return r, nil | ||
} | ||
|
||
|
@@ -1464,12 +1472,36 @@ type gRPCReader struct { | |
databuf []byte | ||
cancel context.CancelFunc | ||
settings *settings | ||
checkCRC bool // should we check the CRC? | ||
wantCRC uint32 // the CRC32c value the server sent in the header | ||
gotCRC uint32 // running crc | ||
} | ||
|
||
// Update the running CRC with the data in the slice, if CRC checking was enabled. | ||
func (r *gRPCReader) updateCRC(b []byte) { | ||
if r.checkCRC { | ||
r.gotCRC = crc32.Update(r.gotCRC, crc32cTable, b) | ||
} | ||
} | ||
|
||
// Checks whether the CRC matches at the conclusion of a read, if CRC checking was enabled. | ||
func (r *gRPCReader) runCRCCheck() error { | ||
if r.checkCRC { | ||
if r.gotCRC != r.wantCRC { | ||
tritone marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return fmt.Errorf("storage: bad CRC on read: got %d, want %d", r.gotCRC, r.wantCRC) | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
// Read reads bytes into the user's buffer from an open gRPC stream. | ||
func (r *gRPCReader) Read(p []byte) (int, error) { | ||
// The entire object has been read by this reader, return EOF. | ||
// The entire object has been read by this reader, check the checksum if | ||
// necessary and return EOF. | ||
if r.size == r.seen || r.zeroRange { | ||
if err := r.runCRCCheck(); err != nil { | ||
return 0, err | ||
} | ||
return 0, io.EOF | ||
} | ||
|
||
|
@@ -1487,6 +1519,7 @@ func (r *gRPCReader) Read(p []byte) (int, error) { | |
if len(r.leftovers) > 0 { | ||
n = copy(p, r.leftovers) | ||
r.seen += int64(n) | ||
r.updateCRC(p[:n]) | ||
r.leftovers = r.leftovers[n:] | ||
return n, nil | ||
} | ||
|
@@ -1512,10 +1545,78 @@ func (r *gRPCReader) Read(p []byte) (int, error) { | |
r.leftovers = content[n:] | ||
} | ||
r.seen += int64(n) | ||
r.updateCRC(p[:n]) | ||
|
||
return n, nil | ||
} | ||
|
||
// WriteTo writes all the data requested by the Reader into w, implementing | ||
// io.WriterTo. | ||
func (r *gRPCReader) WriteTo(w io.Writer) (int64, error) { | ||
// The entire object has been read by this reader, check the checksum if | ||
// necessary and return EOF. | ||
if r.size == r.seen || r.zeroRange { | ||
if err := r.runCRCCheck(); err != nil { | ||
return 0, err | ||
} | ||
return 0, io.EOF | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I noticed this above in Read too, but I wonder if these should be storage-specific errors so users have more context? Or maybe wrapping happens at a higher layer. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yeah, I think there is still more we can do to have more coherently wrapped errors. It's been on the back burner. |
||
} | ||
|
||
// No stream to read from, either never initialized or Close was called. | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is this handled in a layer above, or is it documented as such on the public path through which this method is called? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It's handled in a layer above; the stream is always initialized in NewRangeReader. Really, the only way you can trigger this as an end user is by calling Close and then trying to read more. |
||
// Note: There is a potential concurrency issue if multiple routines are | ||
// using the same reader. One encounters an error and the stream is closed | ||
// and then reopened while the other routine attempts to read from it. | ||
if r.stream == nil { | ||
return 0, fmt.Errorf("reader has been closed") | ||
tritone marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
// Track bytes already seen before this call. | ||
var alreadySeen = r.seen | ||
|
||
// Write any leftovers to the stream. There will be some leftovers from the | ||
// original NewRangeReader call. | ||
if len(r.leftovers) > 0 { | ||
// Write() will write the entire leftovers slice unless there is an error. | ||
written, err := w.Write(r.leftovers) | ||
r.seen += int64(written) | ||
r.updateCRC(r.leftovers) | ||
r.leftovers = nil | ||
if err != nil { | ||
return r.seen - alreadySeen, err | ||
} | ||
} | ||
|
||
// Loop and receive additional messages until the entire data is written. | ||
for { | ||
// Attempt to receive the next message on the stream. | ||
// Will terminate with io.EOF once data has all come through. | ||
// recv() handles stream reopening and retry logic so no need for retries here. | ||
msg, err := r.recv() | ||
if err != nil { | ||
if err == io.EOF { | ||
// We are done; check the checksum if necessary and return. | ||
err = r.runCRCCheck() | ||
} | ||
return r.seen - alreadySeen, err | ||
} | ||
|
||
// TODO: Determine if we need to capture incremental CRC32C for this | ||
// chunk. The Object CRC32C checksum is captured when directed to read | ||
// the entire Object. If directed to read a range, we may need to | ||
// calculate the range's checksum for verification if the checksum is | ||
// present in the response here. | ||
// TODO: Figure out if we need to support decompressive transcoding | ||
// https://cloud.google.com/storage/docs/transcoding. | ||
written, err := w.Write(msg) | ||
r.seen += int64(written) | ||
r.updateCRC(msg) | ||
if err != nil { | ||
return r.seen - alreadySeen, err | ||
} | ||
} | ||
|
||
} | ||
|
||
// Close cancels the read stream's context in order for it to be closed and | ||
// collected. | ||
func (r *gRPCReader) Close() error { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't have the full context here, but do you really want params.length to be negative here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe this is so that we only check the checksums on the entire object read, in which case params.length would be negative to indicate no limit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, Brenna is correct.