chore(storage): implement Read with gRPC #4401

Merged Aug 20, 2021 (97 commits)
Changes from 68 commits
Commits
dbe86e0
feat(storage): implement Read with gRPC
noahdietz Jul 8, 2021
97bdd9c
remove print statements
noahdietz Jul 8, 2021
a22b845
subtract from remaining
noahdietz Jul 8, 2021
3e54d74
add leftovers buffer
noahdietz Jul 9, 2021
d10fffb
fix: refactor to apiv2
noahdietz Jul 15, 2021
4f175ba
Merge branch 'master' into storage-grpc
noahdietz Jul 19, 2021
5431e4b
fix: fix close bug and comments
noahdietz Jul 19, 2021
9f4e06c
fix: return io.EOF
noahdietz Jul 20, 2021
021bdba
fix: refactor read
noahdietz Jul 20, 2021
785716f
fix: use seen/size instead of remain
noahdietz Jul 20, 2021
e73853e
fix: set ReaderObjectAttrs
noahdietz Jul 20, 2021
a83b1fb
fix: use attrs.size
noahdietz Jul 20, 2021
128f942
fix: clean up comments & reset usedLeftovers flag
noahdietz Jul 20, 2021
45182ba
fix: do not retry permission denied
noahdietz Jul 21, 2021
517bc3b
fix: properly return EOF after leftovers
noahdietz Jul 21, 2021
223047d
fix: make grpc factories internal
noahdietz Jul 22, 2021
86c5852
fix: add grpc read integration tests
noahdietz Jul 22, 2021
03e401d
chore: skip integration tests
noahdietz Jul 22, 2021
a316055
Merge branch 'master' into storage-grpc
noahdietz Jul 22, 2021
a1aaee5
fix: add stream context cancellation
noahdietz Jul 22, 2021
0cc04c6
fix: refactor use of remain in readWithGRPC
noahdietz Jul 22, 2021
7788408
fix: consolidate returning EOF to single check
noahdietz Jul 22, 2021
35eb60a
fix: move grpc reader init into NewRangeReader
noahdietz Jul 23, 2021
2d878d3
fix: add experimental comments
noahdietz Jul 23, 2021
f0765d3
Merge remote-tracking branch 'origin' into storage-grpc
noahdietz Jul 23, 2021
cbaa917
fix: refactor grpc test data upload
noahdietz Jul 23, 2021
8bc11ec
fix: use io.Copy and bytes.Buffer
noahdietz Jul 23, 2021
d7d987c
fix: refactor test data upload
noahdietz Jul 26, 2021
49dcfea
fix: use bytes.Repeat for test data
noahdietz Jul 26, 2021
46b48b9
fix: use testConfig for test hc
noahdietz Jul 27, 2021
8340ca2
refactor grpc tests
noahdietz Jul 27, 2021
8a5aafd
fix: add clientOptions wrapper
noahdietz Jul 27, 2021
70a7a83
fix: use 5MB in chunk test
noahdietz Jul 27, 2021
6da4783
Merge branch 'master' into storage-grpc
noahdietz Jul 27, 2021
b84e7c4
clean up test error cases
noahdietz Jul 27, 2021
4f332f5
fix: recv on open
noahdietz Jul 29, 2021
79f81ba
fix: refactor v1 change for v2
noahdietz Jul 29, 2021
4f0fd31
add todo for first recv
noahdietz Jul 29, 2021
ada7a91
refactor recv after stream open
noahdietz Jul 29, 2021
79f1043
fatal if reader cannot be made in test
noahdietz Jul 29, 2021
f51ce4d
defer delete with http obj
noahdietz Jul 29, 2021
5d140dc
comment incremental checksum
noahdietz Jul 29, 2021
b74850f
offset chunk size
noahdietz Jul 29, 2021
ff2319d
always attempt to use leftovers
noahdietz Jul 29, 2021
914badb
Merge branch 'master' into storage-grpc
noahdietz Jul 30, 2021
8a93ba4
format read bucket name into resource name
noahdietz Jul 30, 2021
6d0981f
support reading relative to end
noahdietz Jul 30, 2021
1201c43
skip new test
noahdietz Jul 30, 2021
431c43e
Merge branch 'master' into storage-grpc
noahdietz Aug 3, 2021
6c9a4bf
fix: add status check to shouldRetry
noahdietz Aug 3, 2021
de95ad8
add (meta)generation conds
noahdietz Aug 3, 2021
b344e72
unskip v2 tests
noahdietz Aug 3, 2021
9ae1f8f
comment grpc read retry semantics
noahdietz Aug 3, 2021
9d75227
Merge branch 'master' into storage-grpc
noahdietz Aug 4, 2021
b5ca286
add testConfigGRPC to integrations
noahdietz Aug 4, 2021
b53c367
Merge branch 'master' into storage-grpc
noahdietz Aug 4, 2021
0ddac16
chore: promote compute to ga
noahdietz Aug 5, 2021
74dd212
Merge branch 'master' into storage-grpc
noahdietz Aug 9, 2021
85988fc
Merge remote-tracking branch 'upstream/master'
noahdietz Aug 9, 2021
8659a2f
add bucket name helper
noahdietz Aug 9, 2021
7896345
Merge branch 'master' into storage-grpc
noahdietz Aug 10, 2021
ea887cc
Merge branch 'master' into storage-grpc
noahdietz Aug 10, 2021
9c867b9
Merge branch 'master' into storage-grpc
noahdietz Aug 10, 2021
a360627
reorder mod deps
noahdietz Aug 10, 2021
673c722
update integration test bucket name
noahdietz Aug 10, 2021
3288c41
Merge remote-tracking branch 'upstream/master'
noahdietz Aug 13, 2021
1f6d2e6
Merge branch 'master' into storage-grpc
noahdietz Aug 13, 2021
a70d2ae
address test feedback
noahdietz Aug 13, 2021
abada74
rename to newHybridClient
noahdietz Aug 13, 2021
1faa0f7
add TODO about leftovers management
noahdietz Aug 13, 2021
28b0ce8
expand on per-chunk checksum comment
noahdietz Aug 13, 2021
4443d1a
return read error when stream is nil
noahdietz Aug 13, 2021
b5587c7
return early after copying leftovers
noahdietz Aug 13, 2021
ebda6d0
revert unnecessary conditional check
noahdietz Aug 13, 2021
7c9c992
refactor into reopenStream, add TODO for gzip
noahdietz Aug 13, 2021
d22e194
Merge branch 'master' into storage-grpc
noahdietz Aug 13, 2021
54cb273
refactor Recv with Retry
noahdietz Aug 13, 2021
adb3e2b
Merge branch 'master' into storage-grpc
noahdietz Aug 13, 2021
7af41ef
fix doubling up on retry
noahdietz Aug 13, 2021
5275065
Merge branch 'master' into storage-grpc
noahdietz Aug 16, 2021
b5ba94d
Merge remote-tracking branch 'upstream/master'
noahdietz Aug 17, 2021
023bd88
Merge remote-tracking branch 'upstream/master'
noahdietz Aug 18, 2021
269bd7d
Merge remote-tracking branch 'upstream/master'
noahdietz Aug 19, 2021
7a37ff6
Merge branch 'master' of https://github.com/noahdietz/google-cloud-go…
noahdietz Aug 19, 2021
9e247ff
address feedback
noahdietz Aug 19, 2021
7507f82
chore: use ephemeral buckets for grpc tests
noahdietz Aug 19, 2021
fd1cb10
remove duplicate todo
noahdietz Aug 19, 2021
ec26534
revert a random change that got in
noahdietz Aug 19, 2021
f3b8217
document use for bucket helper
noahdietz Aug 19, 2021
9534b2a
Merge branch 'master' into storage-grpc
noahdietz Aug 19, 2021
e869740
add more commentary
noahdietz Aug 19, 2021
72eb42e
Merge branch 'storage-grpc' of https://github.com/noahdietz/google-cl…
noahdietz Aug 19, 2021
26d778d
refactor code and add more comments
noahdietz Aug 19, 2021
fdbd0ba
fix reader size semantics
noahdietz Aug 19, 2021
7a6bb73
add concurrency issue note
noahdietz Aug 19, 2021
2955c34
Merge branch 'master' into storage-grpc
noahdietz Aug 19, 2021
d2553d3
Merge branch 'master' into storage-grpc
tritone Aug 20, 2021
1 change: 1 addition & 0 deletions storage/go.mod
@@ -12,4 +12,5 @@ require (
google.golang.org/api v0.52.0
google.golang.org/genproto v0.0.0-20210805201207-89edb61ffb67
google.golang.org/grpc v1.39.1
google.golang.org/protobuf v1.27.1
)
209 changes: 209 additions & 0 deletions storage/integration_test.go
@@ -28,6 +28,7 @@ import (
"io"
"io/ioutil"
"log"
"math"
"math/rand"
"mime/multipart"
"net/http"
@@ -61,6 +62,7 @@ const (
// TODO(jba): move to testutil, factor out from firestore/integration_test.go.
envFirestoreProjID = "GCLOUD_TESTS_GOLANG_FIRESTORE_PROJECT_ID"
envFirestorePrivateKey = "GCLOUD_TESTS_GOLANG_FIRESTORE_KEY"
grpcBucket = "golang-grpc-test-integration"
)

var (
@@ -203,6 +205,22 @@ func testConfig(ctx context.Context, t *testing.T) *Client {
return client
}

// testConfigGRPC returns a gRPC-based client to access GCS. testConfigGRPC
// skips the current test when being run in Short mode.
func testConfigGRPC(ctx context.Context, t *testing.T) (gc *Client) {
if testing.Short() {
t.Skip("Integration tests skipped in short mode")
}

var err error
gc, err = newHybridClient(ctx, nil)
if err != nil {
log.Fatalf("newHybridClient: %v", err)
}

return
}

// config is like testConfig, but it doesn't need a *testing.T.
func config(ctx context.Context) *Client {
ts := testutil.TokenSource(ctx, ScopeFullControl)
@@ -744,6 +762,197 @@ func TestIntegration_ObjectsRangeReader(t *testing.T) {
}
}

func TestIntegration_ObjectReadGRPC(t *testing.T) {
ctx := context.Background()

// Create an HTTP client to upload test data and a gRPC client to test with.
hc := testConfig(ctx, t)
defer hc.Close()
gc := testConfigGRPC(ctx, t)
defer gc.Close()

content := []byte("Hello, world this is a grpc request")

// Upload test data.
name := uidSpace.New()
ho := hc.Bucket(grpcBucket).Object(name)
if err := writeObject(ctx, ho, "text/plain", content); err != nil {
t.Fatal(err)
}
defer func() {
if err := ho.Delete(ctx); err != nil {
log.Printf("failed to delete test object: %v", err)
}
}()

obj := gc.Bucket(grpcBucket).Object(name)

// Using a negative length to indicate reading to the end.
r, err := obj.NewRangeReader(ctx, 0, -1)
if err != nil {
t.Fatal(err)
}
defer r.Close()

b := new(bytes.Buffer)
b.Grow(len(content))

n, err := io.Copy(b, r)
if err != nil {
t.Fatal(err)
}
if n == 0 {
t.Fatal("Expected to have read more than 0 bytes")
}

got := b.String()
want := string(content)
if diff := cmp.Diff(got, want); diff != "" {
t.Errorf("got(-),want(+):\n%s", diff)
}
}

func TestIntegration_ObjectReadChunksGRPC(t *testing.T) {
ctx := context.Background()

// Create an HTTP client to upload test data and a gRPC client to test with.
hc := testConfig(ctx, t)
defer hc.Close()
gc := testConfigGRPC(ctx, t)
defer gc.Close()

// Use a larger blob to test chunking logic. This is exactly 5 MiB (5<<20 bytes).
content := bytes.Repeat([]byte("a"), 5<<20)

// Upload test data.
name := uidSpace.New()
ho := hc.Bucket(grpcBucket).Object(name)
if err := writeObject(ctx, ho, "text/plain", content); err != nil {
t.Fatal(err)
}
defer func() {
if err := ho.Delete(ctx); err != nil {
log.Printf("failed to delete test object: %v", err)
}
}()

obj := gc.Bucket(grpcBucket).Object(name)

r, err := obj.NewReader(ctx)
if err != nil {
t.Fatal(err)
}
defer r.Close()

bufSize := len(content)
buf := make([]byte, bufSize)

// Read in smaller chunks, offset to provoke reading across a Recv boundary.
chunk := 4<<10 + 1234
offset := 0
for {
end := math.Min(float64(offset+chunk), float64(bufSize))
n, err := r.Read(buf[offset:int(end)])
if err == io.EOF {
break
}
if err != nil {
t.Fatal(err)
}
offset += n
}

// TODO: Verify content with the checksums.
}

func TestIntegration_ObjectReadRelativeToEndGRPC(t *testing.T) {
ctx := context.Background()

// Create an HTTP client to upload test data and a gRPC client to test with.
hc := testConfig(ctx, t)
defer hc.Close()
gc := testConfigGRPC(ctx, t)
defer gc.Close()

content := []byte("Hello, world this is a grpc request")

// Upload test data.
name := uidSpace.New()
ho := hc.Bucket(grpcBucket).Object(name)
if err := writeObject(ctx, ho, "text/plain", content); err != nil {
t.Fatal(err)
}
defer func() {
if err := ho.Delete(ctx); err != nil {
log.Printf("failed to delete test object: %v", err)
}
}()

obj := gc.Bucket(grpcBucket).Object(name)

offset := 7
// Using a negative offset to start reading relative to the end of the
// object, and length to indicate reading to the end.
r, err := obj.NewRangeReader(ctx, int64(offset*-1), -1)
if err != nil {
t.Fatal(err)
}
defer r.Close()

b := new(bytes.Buffer)
b.Grow(offset)

n, err := io.Copy(b, r)
if err != nil {
t.Fatal(err)
}
if n == 0 {
t.Fatal("Expected to have read more than 0 bytes")
}

got := b.String()
want := string(content[len(content)-offset:])
if diff := cmp.Diff(got, want); diff != "" {
t.Errorf("got(-),want(+):\n%s", diff)
}
}
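The offset and length semantics exercised by the two range tests above can be modeled locally. This is a sketch under stated assumptions: `resolveRange` is a hypothetical helper, not part of the storage package, and it assumes that a negative offset counts back from the end of the object and must be paired with a negative length, while a negative length alone means "read to the end".

```go
package main

import (
	"errors"
	"fmt"
)

// resolveRange is a hypothetical helper that maps an (offset, length) pair
// onto the half-open byte range [start, end) of an object with size bytes.
func resolveRange(size, offset, length int64) (start, end int64, err error) {
	if offset < 0 {
		// Assumption: reading relative to the end always reads to the end.
		if length >= 0 {
			return 0, 0, errors.New("negative offset requires negative length")
		}
		start = size + offset
		if start < 0 {
			start = 0
		}
		return start, size, nil
	}
	if offset > size {
		return 0, 0, errors.New("offset past end of object")
	}
	// A negative or oversized length is clamped to the end of the object.
	if length < 0 || offset+length > size {
		return offset, size, nil
	}
	return offset, offset + length, nil
}

func main() {
	content := []byte("Hello, world this is a grpc request")
	start, end, err := resolveRange(int64(len(content)), -7, -1)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%q\n", content[start:end]) // prints "request"
}
```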

func TestIntegration_ConditionalDownloadGRPC(t *testing.T) {
ctx := context.Background()

// Create an HTTP client to upload test data and a gRPC client to test with.
hc := testConfig(ctx, t)
defer hc.Close()
gc := testConfigGRPC(ctx, t)
defer gc.Close()
h := testHelper{t}

o := hc.Bucket(grpcBucket).Object("condread")
defer o.Delete(ctx)

wc := o.NewWriter(ctx)
wc.ContentType = "text/plain"
h.mustWrite(wc, []byte("foo"))

gen := wc.Attrs().Generation
metaGen := wc.Attrs().Metageneration

obj := gc.Bucket(grpcBucket).Object(o.ObjectName())

if _, err := obj.Generation(gen + 1).NewReader(ctx); err == nil {
t.Fatalf("Unexpected successful download with nonexistent Generation")
}
if _, err := obj.If(Conditions{MetagenerationMatch: metaGen + 1}).NewReader(ctx); err == nil {
t.Fatalf("Unexpected successful download with failed preconditions IfMetaGenerationMatch")
}
if _, err := obj.If(Conditions{GenerationMatch: gen + 1}).NewReader(ctx); err == nil {
t.Fatalf("Unexpected successful download with failed preconditions IfGenerationMatch")
}
if _, err := obj.If(Conditions{GenerationMatch: gen}).NewReader(ctx); err != nil {
t.Fatalf("Download failed: %v", err)
}
}
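The precondition cases in this test follow a simple rule, which the sketch below models locally. It is illustrative only: the real check is evaluated by the service, and `object`, `conditions`, and `checkConditions` are hypothetical names. The one assumption carried over from `Conditions` is that a zero field means the condition is not set.

```go
package main

import (
	"errors"
	"fmt"
)

// object holds the two version counters the preconditions compare against.
type object struct {
	generation     int64
	metageneration int64
}

// conditions is a hypothetical, reduced model of the Conditions type.
// A zero field means the condition is not set.
type conditions struct {
	generationMatch     int64
	metagenerationMatch int64
}

var errPreconditionFailed = errors.New("precondition failed")

// checkConditions returns errPreconditionFailed when any set condition does
// not match the object's current counters.
func checkConditions(o object, c conditions) error {
	if c.generationMatch != 0 && c.generationMatch != o.generation {
		return errPreconditionFailed
	}
	if c.metagenerationMatch != 0 && c.metagenerationMatch != o.metageneration {
		return errPreconditionFailed
	}
	return nil
}

func main() {
	o := object{generation: 42, metageneration: 1}
	fmt.Println(checkConditions(o, conditions{metagenerationMatch: o.metageneration + 1})) // precondition failed
	fmt.Println(checkConditions(o, conditions{generationMatch: o.generation + 1}))         // precondition failed
	fmt.Println(checkConditions(o, conditions{generationMatch: o.generation}))             // <nil>
}
```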

func TestIntegration_ConditionalDownload(t *testing.T) {
ctx := context.Background()
client := testConfig(ctx, t)
9 changes: 9 additions & 0 deletions storage/invoke.go
@@ -23,6 +23,8 @@ import (
"cloud.google.com/go/internal"
gax "github.com/googleapis/gax-go/v2"
"google.golang.org/api/googleapi"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// runWithRetry calls the function until it returns nil or a non-retryable error, or
@@ -64,6 +66,13 @@ func shouldRetry(err error) bool {
return true
}
}
// HTTP 429, 502, 503, and 504 all map to gRPC UNAVAILABLE per
// https://grpc.github.io/grpc/core/md_doc_http-grpc-status-mapping.html.
//
// This is only necessary for the experimental gRPC-based media operations.
if st, ok := status.FromError(err); ok && st.Code() == codes.Unavailable {
return true
}
// Unwrap is only supported in go1.13.x+
if e, ok := err.(interface{ Unwrap() error }); ok {
return shouldRetry(e.Unwrap())