Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(firestore): add opencensus tracing support #2942

Merged
merged 8 commits into from Feb 4, 2021
13 changes: 11 additions & 2 deletions firestore/client.go
Expand Up @@ -199,7 +199,10 @@ func (c *Client) GetAll(ctx context.Context, docRefs []*DocumentRef) (_ []*Docum
return c.getAll(ctx, docRefs, nil)
}

func (c *Client) getAll(ctx context.Context, docRefs []*DocumentRef, tid []byte) ([]*DocumentSnapshot, error) {
func (c *Client) getAll(ctx context.Context, docRefs []*DocumentRef, tid []byte) (_ []*DocumentSnapshot, err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/firestore.Client.BatchGetDocuments")
defer func() { trace.EndSpan(ctx, err) }()

var docNames []string
docIndices := map[string][]int{} // doc name to positions in docRefs
for i, dr := range docRefs {
Expand Down Expand Up @@ -267,6 +270,9 @@ func (c *Client) getAll(ctx context.Context, docRefs []*DocumentRef, tid []byte)

// Collections returns an iterator over the top-level collections.
func (c *Client) Collections(ctx context.Context) *CollectionIterator {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/firestore.Client.ListCollectionIds")
defer func() { trace.EndSpan(ctx, nil) }()

it := &CollectionIterator{
client: c,
it: c.c.ListCollectionIds(
Expand All @@ -286,7 +292,10 @@ func (c *Client) Batch() *WriteBatch {
}

// commit calls the Commit RPC outside of a transaction.
func (c *Client) commit(ctx context.Context, ws []*pb.Write) ([]*WriteResult, error) {
func (c *Client) commit(ctx context.Context, ws []*pb.Write) (_ []*WriteResult, err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/firestore.Client.commit")
defer func() { trace.EndSpan(ctx, err) }()

req := &pb.CommitRequest{
Database: c.path(),
Writes: ws,
Expand Down
3 changes: 3 additions & 0 deletions firestore/docref.go
Expand Up @@ -677,6 +677,9 @@ func (d *DocumentRef) Update(ctx context.Context, updates []Update, preconds ...

// Collections returns an iterator over the immediate sub-collections of the document.
func (d *DocumentRef) Collections(ctx context.Context) *CollectionIterator {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/firestore.DocumentRef.ListCollectionIds")
defer func() { trace.EndSpan(ctx, nil) }()

client := d.Parent.c
it := &CollectionIterator{
client: client,
Expand Down
4 changes: 4 additions & 0 deletions firestore/list_documents.go
Expand Up @@ -18,6 +18,7 @@ import (
"context"

vkit "cloud.google.com/go/firestore/apiv1"
"cloud.google.com/go/internal/trace"
"google.golang.org/api/iterator"
pb "google.golang.org/genproto/googleapis/firestore/v1"
)
Expand All @@ -33,6 +34,9 @@ type DocumentRefIterator struct {
}

func newDocumentRefIterator(ctx context.Context, cr *CollectionRef, tid []byte) *DocumentRefIterator {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/firestore.ListDocuments")
defer func() { trace.EndSpan(ctx, nil) }()

client := cr.c
req := &pb.ListDocumentsRequest{
Parent: cr.parentPath,
Expand Down
7 changes: 5 additions & 2 deletions firestore/query.go
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"cloud.google.com/go/internal/btree"
"cloud.google.com/go/internal/trace"
"github.com/golang/protobuf/ptypes/wrappers"
"google.golang.org/api/iterator"
pb "google.golang.org/genproto/googleapis/firestore/v1"
Expand Down Expand Up @@ -699,7 +700,10 @@ func newQueryDocumentIterator(ctx context.Context, q *Query, tid []byte) *queryD
}
}

func (it *queryDocumentIterator) next() (*DocumentSnapshot, error) {
func (it *queryDocumentIterator) next() (_ *DocumentSnapshot, err error) {
it.ctx = trace.StartSpan(it.ctx, "cloud.google.com/go/firestore.Query.RunQuery")
defer func() { trace.EndSpan(it.ctx, err) }()

client := it.q.c
if it.streamClient == nil {
sq, err := it.q.toProto()
Expand All @@ -719,7 +723,6 @@ func (it *queryDocumentIterator) next() (*DocumentSnapshot, error) {
}
}
var res *pb.RunQueryResponse
var err error
for {
res, err = it.streamClient.Recv()
if err == io.EOF {
Expand Down
5 changes: 5 additions & 0 deletions firestore/transaction.go
Expand Up @@ -116,11 +116,13 @@ func (c *Client) RunTransaction(ctx context.Context, f func(context.Context, *Tr
// TODO(jba): get backoff time from gRPC trailer metadata? See
// extractRetryDelay in https://code.googlesource.com/gocloud/+/master/spanner/retry.go.
for i := 0; i < t.maxAttempts; i++ {
t.ctx = trace.StartSpan(t.ctx, "cloud.google.com/go/firestore.Client.BeginTransaction")
AlisskaPie marked this conversation as resolved.
Show resolved Hide resolved
var res *pb.BeginTransactionResponse
res, err = t.c.c.BeginTransaction(t.ctx, &pb.BeginTransactionRequest{
Database: db,
Options: txOpts,
})
trace.EndSpan(t.ctx, err)
if err != nil {
return err
}
Expand All @@ -136,11 +138,14 @@ func (c *Client) RunTransaction(ctx context.Context, f func(context.Context, *Tr
// Prefer f's returned error to rollback error.
return err
}
t.ctx = trace.StartSpan(t.ctx, "cloud.google.com/go/firestore.Client.Commit")
AlisskaPie marked this conversation as resolved.
Show resolved Hide resolved
_, err = t.c.c.Commit(t.ctx, &pb.CommitRequest{
Database: t.c.path(),
Writes: t.writes,
Transaction: t.id,
})
trace.EndSpan(t.ctx, err)

// If a read-write transaction returns Aborted, retry.
// On success or other failures, return here.
if t.readOnly || status.Code(err) != codes.Aborted {
Expand Down
8 changes: 6 additions & 2 deletions firestore/watch.go
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"cloud.google.com/go/internal/btree"
"cloud.google.com/go/internal/trace"
"github.com/golang/protobuf/ptypes"
gax "github.com/googleapis/gax-go/v2"
pb "google.golang.org/genproto/googleapis/firestore/v1"
Expand Down Expand Up @@ -492,9 +493,12 @@ func (s *watchStream) recv() (*pb.ListenResponse, error) {
}
}

func (s *watchStream) open() (pb.Firestore_ListenClient, error) {
func (s *watchStream) open() (lc pb.Firestore_ListenClient, err error) {
s.ctx = trace.StartSpan(s.ctx, "cloud.google.com/go/firestore.watchStream.Listen")
defer func() { trace.EndSpan(s.ctx, err) }()

dbPath := s.c.path()
lc, err := s.c.c.Listen(withResourceHeader(s.ctx, dbPath))
lc, err = s.c.c.Listen(withResourceHeader(s.ctx, dbPath))
if err == nil {
err = lc.Send(&pb.ListenRequest{
Database: dbPath,
Expand Down