Skip to content

Commit

Permalink
chore(storage): fix checksums for gRPC uploads (#7193)
Browse files Browse the repository at this point in the history
There was an API change so that checksums can now only be provided by the StartResumableUpload request rather than while uploading. Send the checksum at this stage instead.

Fixes #7033
  • Loading branch information
tritone committed Dec 27, 2022
1 parent 94723a2 commit a7720e4
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 24 deletions.
43 changes: 22 additions & 21 deletions storage/grpc_client.go
Expand Up @@ -1496,11 +1496,29 @@ func (w *gRPCWriter) startResumableUpload() error {
if err != nil {
return err
}
req := &storagepb.StartResumableWriteRequest{
WriteObjectSpec: spec,
CommonObjectRequestParams: toProtoCommonObjectRequestParams(w.encryptionKey),
}
// TODO: Currently the checksums are only sent on the first message
// of the stream, but in the future, we must also support sending it
// on the *last* message of the stream (instead of the first).
if w.sendCRC32C {
req.ObjectChecksums = &storagepb.ObjectChecksums{
Crc32C: proto.Uint32(w.attrs.CRC32C),
}
}
if len(w.attrs.MD5) != 0 {
if cs := req.GetObjectChecksums(); cs == nil {
req.ObjectChecksums = &storagepb.ObjectChecksums{
Md5Hash: w.attrs.MD5,
}
} else {
cs.Md5Hash = w.attrs.MD5
}
}
return run(w.ctx, func() error {
upres, err := w.c.raw.StartResumableWrite(w.ctx, &storagepb.StartResumableWriteRequest{
WriteObjectSpec: spec,
CommonObjectRequestParams: toProtoCommonObjectRequestParams(w.encryptionKey),
})
upres, err := w.c.raw.StartResumableWrite(w.ctx, req)
w.upid = upres.GetUploadId()
return err
}, w.settings.retry, w.settings.idempotent, setRetryHeaderGRPC(w.ctx))
Expand Down Expand Up @@ -1587,23 +1605,6 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
req.CommonObjectRequestParams = toProtoCommonObjectRequestParams(w.encryptionKey)
}

// TODO: Currently the checksums are only sent on the first message
// of the stream, but in the future, we must also support sending it
// on the *last* message of the stream (instead of the first).
if w.sendCRC32C {
req.ObjectChecksums = &storagepb.ObjectChecksums{
Crc32C: proto.Uint32(w.attrs.CRC32C),
}
}
if len(w.attrs.MD5) != 0 {
if cs := req.GetObjectChecksums(); cs == nil {
req.ObjectChecksums = &storagepb.ObjectChecksums{
Md5Hash: w.attrs.MD5,
}
} else {
cs.Md5Hash = w.attrs.MD5
}
}
}

err = w.stream.Send(req)
Expand Down
3 changes: 0 additions & 3 deletions storage/integration_test.go
Expand Up @@ -1082,9 +1082,6 @@ func TestIntegration_MultiMessageWriteGRPC(t *testing.T) {

func TestIntegration_MultiChunkWrite(t *testing.T) {
multiTransportTest(context.Background(), t, func(t *testing.T, ctx context.Context, bucket string, _ string, client *Client) {
if bucket == grpcBucketName {
t.Skip("https://github.com/googleapis/google-cloud-go/issues/7033")
}
h := testHelper{t}
obj := client.Bucket(bucket).Object(uidSpace.New()).Retryer(WithPolicy(RetryAlways))
defer h.mustDeleteObject(obj)
Expand Down

0 comments on commit a7720e4

Please sign in to comment.