Skip to content

Commit

Permalink
feat(storage) Implement io.WriterTo in Reader
Browse files Browse the repository at this point in the history
This allows the gRPC Reader to write directly into the application
write buffer, saving a data copy.

Users can get the benefit of this directly by explicitly calling
Reader.WriteTo, but they can also benefit implicitly if they are
calling io.Copy.

A bunch of checksum logic had to be moved from the parent Reader
into the transport Readers to make this work, since we need to
update the checksum for every message read in WriteTo.
  • Loading branch information
tritone committed Mar 28, 2024
1 parent fbf19aa commit 6252744
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 28 deletions.
115 changes: 108 additions & 7 deletions storage/grpc_client.go
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/base64"
"errors"
"fmt"
"hash/crc32"
"io"
"net/url"
"os"
Expand Down Expand Up @@ -1042,6 +1043,16 @@ func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRange
// This is the size of the entire object, even if only a range was requested.
size := obj.GetSize()

// Only support checksums when reading an entire object, not a range.
var (
wantCRC uint32
checkCRC bool
)
if checksums := msg.GetObjectChecksums(); checksums != nil && checksums.Crc32C != nil && params.offset == 0 && params.length < 0 {
wantCRC = checksums.GetCrc32C()
checkCRC = true
}

r = &Reader{
Attrs: ReaderObjectAttrs{
Size: size,
Expand All @@ -1063,7 +1074,10 @@ func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRange
settings: s,
zeroRange: params.length == 0,
databuf: databuf,
wantCRC: wantCRC,
checkCRC: checkCRC,
},
checkCRC: checkCRC,
}

cr := msg.GetContentRange()
Expand All @@ -1081,12 +1095,6 @@ func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRange
r.reader.Close()
}

// Only support checksums when reading an entire object, not a range.
if checksums := msg.GetObjectChecksums(); checksums != nil && checksums.Crc32C != nil && params.offset == 0 && params.length < 0 {
r.wantCRC = checksums.GetCrc32C()
r.checkCRC = true
}

return r, nil
}

Expand Down Expand Up @@ -1464,12 +1472,36 @@ type gRPCReader struct {
databuf []byte
cancel context.CancelFunc
settings *settings
checkCRC bool // should we check the CRC?
wantCRC uint32 // the CRC32c value the server sent in the header
gotCRC uint32 // running crc
}

// Update the running CRC with the data in the slice, if CRC checking was enabled.
func (r *gRPCReader) updateCRC(b []byte) {
if r.checkCRC {
r.gotCRC = crc32.Update(r.gotCRC, crc32cTable, b)
}
}

// Checks whether the CRC matches at the conclusion of a read, if CRC checking was enabled.
func (r *gRPCReader) runCRCCheck() error {
if r.checkCRC {
if r.gotCRC != r.wantCRC {
return fmt.Errorf("storage: bad CRC on read: got %d, want %d", r.gotCRC, r.wantCRC)
}
}
return nil
}

// Read reads bytes into the user's buffer from an open gRPC stream.
func (r *gRPCReader) Read(p []byte) (int, error) {
// The entire object has been read by this reader, return EOF.
// The entire object has been read by this reader, check the checksum if
// necessary and return EOF.
if r.size == r.seen || r.zeroRange {
if err := r.runCRCCheck(); err != nil {
return 0, err
}
return 0, io.EOF
}

Expand All @@ -1487,6 +1519,7 @@ func (r *gRPCReader) Read(p []byte) (int, error) {
if len(r.leftovers) > 0 {
n = copy(p, r.leftovers)
r.seen += int64(n)
r.updateCRC(p[:n])
r.leftovers = r.leftovers[n:]
return n, nil
}
Expand All @@ -1512,10 +1545,78 @@ func (r *gRPCReader) Read(p []byte) (int, error) {
r.leftovers = content[n:]
}
r.seen += int64(n)
r.updateCRC(p[:n])

return n, nil
}

// WriteTo writes all the data requested by the Reader into w, implementing
// io.WriterTo.
func (r *gRPCReader) WriteTo(w io.Writer) (int64, error) {
// The entire object has been read by this reader, check the checksum if
// necessary and return EOF.
if r.size == r.seen || r.zeroRange {
if err := r.runCRCCheck(); err != nil {
return 0, err
}
return 0, io.EOF
}

// No stream to read from, either never initialized or Close was called.
// Note: There is a potential concurrency issue if multiple routines are
// using the same reader. One encounters an error and the stream is closed
// and then reopened while the other routine attempts to read from it.
if r.stream == nil {
return 0, fmt.Errorf("reader has been closed")
}

// Track bytes written during before call.
var alreadySeen = r.seen

// Write any leftovers to the stream. There will be some leftovers from the
// original NewRangeReader call.
if len(r.leftovers) > 0 {
// Write() will write the entire leftovers slice unless there is an error.
written, err := w.Write(r.leftovers)
r.seen += int64(written)
r.updateCRC(r.leftovers)
r.leftovers = nil
if err != nil {
return r.seen - alreadySeen, err
}
}

// Loop and receive additional messages until the entire data is written.
for {
// Attempt to receive the next message on the stream.
// Will terminate with io.EOF once data has all come through.
// recv() handles stream reopening and retry logic so no need for retries here.
msg, err := r.recv()
if err != nil {
if err == io.EOF {
// We are done; check the checksum if necessary and return.
err = r.runCRCCheck()
}
return r.seen - alreadySeen, err
}

// TODO: Determine if we need to capture incremental CRC32C for this
// chunk. The Object CRC32C checksum is captured when directed to read
// the entire Object. If directed to read a range, we may need to
// calculate the range's checksum for verification if the checksum is
// present in the response here.
// TODO: Figure out if we need to support decompressive transcoding
// https://cloud.google.com/storage/docs/transcoding.
written, err := w.Write(msg)
r.seen += int64(written)
r.updateCRC(msg)
if err != nil {
return r.seen - alreadySeen, err
}
}

}

// Close cancels the read stream's context in order for it to be closed and
// collected.
func (r *gRPCReader) Close() error {
Expand Down
34 changes: 27 additions & 7 deletions storage/http_client.go
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/base64"
"errors"
"fmt"
"hash/crc32"
"io"
"io/ioutil"
"net/http"
Expand Down Expand Up @@ -1218,9 +1219,12 @@ func (c *httpStorageClient) DeleteNotification(ctx context.Context, bucket strin
}

type httpReader struct {
body io.ReadCloser
seen int64
reopen func(seen int64) (*http.Response, error)
body io.ReadCloser
seen int64
reopen func(seen int64) (*http.Response, error)
checkCRC bool // should we check the CRC?
wantCRC uint32 // the CRC32c value the server sent in the header
gotCRC uint32 // running crc
}

func (r *httpReader) Read(p []byte) (int, error) {
Expand All @@ -1229,7 +1233,22 @@ func (r *httpReader) Read(p []byte) (int, error) {
m, err := r.body.Read(p[n:])
n += m
r.seen += int64(m)
if err == nil || err == io.EOF {
if r.checkCRC {
r.gotCRC = crc32.Update(r.gotCRC, crc32cTable, p[:n])
}
if err == nil {
return n, err
}
if err == io.EOF {
// Check CRC here. It would be natural to check it in Close, but
// everybody defers Close on the assumption that it doesn't return
// anything worth looking at.
if r.checkCRC {
if r.gotCRC != r.wantCRC {
return n, fmt.Errorf("storage: bad CRC on read: got %d, want %d",
r.gotCRC, r.wantCRC)
}
}
return n, err
}
// Read failed (likely due to connection issues), but we will try to reopen
Expand Down Expand Up @@ -1435,11 +1454,12 @@ func parseReadResponse(res *http.Response, params *newRangeReaderParams, reopen
Attrs: attrs,
size: size,
remain: remain,
wantCRC: crc,
checkCRC: checkCRC,
reader: &httpReader{
reopen: reopen,
body: body,
reopen: reopen,
body: body,
wantCRC: crc,
checkCRC: checkCRC,
},
}, nil
}
26 changes: 12 additions & 14 deletions storage/reader.go
Expand Up @@ -198,9 +198,7 @@ var emptyBody = ioutil.NopCloser(strings.NewReader(""))
type Reader struct {
Attrs ReaderObjectAttrs
seen, remain, size int64
checkCRC bool // should we check the CRC?
wantCRC uint32 // the CRC32c value the server sent in the header
gotCRC uint32 // running crc
checkCRC bool // Did we check the CRC? This is now only used by tests.

reader io.ReadCloser
ctx context.Context
Expand All @@ -218,17 +216,17 @@ func (r *Reader) Read(p []byte) (int, error) {
if r.remain != -1 {
r.remain -= int64(n)
}
if r.checkCRC {
r.gotCRC = crc32.Update(r.gotCRC, crc32cTable, p[:n])
// Check CRC here. It would be natural to check it in Close, but
// everybody defers Close on the assumption that it doesn't return
// anything worth looking at.
if err == io.EOF {
if r.gotCRC != r.wantCRC {
return n, fmt.Errorf("storage: bad CRC on read: got %d, want %d",
r.gotCRC, r.wantCRC)
}
}
return n, err
}

// WriteTo writes all the data from the Reader to w. Fulfills the io.WriterTo interface.
// This is called implicitly when calling io.Copy on a Reader.
func (r *Reader) WriteTo(w io.Writer) (int64, error) {
// This implicitly calls r.reader.WriteTo for gRPC only. JSON and XML don't have an
// implementation of WriteTo.
n, err := io.Copy(w, r.reader)
if r.remain != -1 {
r.remain -= int64(n)
}
return n, err
}
Expand Down

0 comments on commit 6252744

Please sign in to comment.