Skip to content

Commit

Permalink
feat(firestore): adds snapshot reads impl. (#6718)
Browse files Browse the repository at this point in the history
* feat(firestore): adds snapshot reads impl.
  • Loading branch information
telpirion committed Oct 17, 2022
1 parent f968297 commit 43cc5bc
Show file tree
Hide file tree
Showing 14 changed files with 428 additions and 45 deletions.
71 changes: 62 additions & 9 deletions firestore/client.go
Expand Up @@ -36,6 +36,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
)

// resourcePrefixHeader is the name of the metadata header used to indicate
Expand All @@ -53,9 +54,10 @@ const DetectProjectID = "*detect-project-id*"

// A Client provides access to the Firestore service.
type Client struct {
c *vkit.Client
projectID string
databaseID string // A client is tied to a single database.
c *vkit.Client
projectID string
databaseID string // A client is tied to a single database.
readSettings *readSettings // readSettings allows setting a snapshot time to read the database
}

// NewClient creates a new Firestore client that uses the given project.
Expand Down Expand Up @@ -94,9 +96,10 @@ func NewClient(ctx context.Context, projectID string, opts ...option.ClientOptio
}
vc.SetGoogleClientInfo("gccl", internal.Version)
c := &Client{
c: vc,
projectID: projectID,
databaseID: "(default)", // always "(default)", for now
c: vc,
projectID: projectID,
databaseID: "(default)", // always "(default)", for now
readSettings: &readSettings{},
}
return c, nil
}
Expand Down Expand Up @@ -199,10 +202,10 @@ func (c *Client) GetAll(ctx context.Context, docRefs []*DocumentRef) (_ []*Docum
ctx = trace.StartSpan(ctx, "cloud.google.com/go/firestore.GetAll")
defer func() { trace.EndSpan(ctx, err) }()

return c.getAll(ctx, docRefs, nil)
return c.getAll(ctx, docRefs, nil, nil)
}

func (c *Client) getAll(ctx context.Context, docRefs []*DocumentRef, tid []byte) (_ []*DocumentSnapshot, err error) {
func (c *Client) getAll(ctx context.Context, docRefs []*DocumentRef, tid []byte, rs *readSettings) (_ []*DocumentSnapshot, err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/firestore.Client.BatchGetDocuments")
defer func() { trace.EndSpan(ctx, err) }()

Expand All @@ -219,9 +222,18 @@ func (c *Client) getAll(ctx context.Context, docRefs []*DocumentRef, tid []byte)
Database: c.path(),
Documents: docNames,
}

// Note that transaction ID and other consistency selectors are mutually exclusive.
// We respect the transaction first, any read options passed by the caller second,
// and any read options stored in the client third.
if rt, hasOpts := parseReadTime(c, rs); hasOpts {
req.ConsistencySelector = &pb.BatchGetDocumentsRequest_ReadTime{ReadTime: rt}
}

if tid != nil {
req.ConsistencySelector = &pb.BatchGetDocumentsRequest_Transaction{tid}
req.ConsistencySelector = &pb.BatchGetDocumentsRequest_Transaction{Transaction: tid}
}

streamClient, err := c.c.BatchGetDocuments(withResourceHeader(ctx, req.Database), req)
if err != nil {
return nil, err
Expand Down Expand Up @@ -306,6 +318,15 @@ func (c *Client) BulkWriter(ctx context.Context) *BulkWriter {
return bw
}

// WithReadOptions specifies constraints for accessing documents from the database,
// e.g. at what time snapshot to read the documents.
func (c *Client) WithReadOptions(opts ...ReadOption) *Client {
for _, ro := range opts {
ro.apply(c.readSettings)
}
return c
}

// commit calls the Commit RPC outside of a transaction.
func (c *Client) commit(ctx context.Context, ws []*pb.Write) (_ []*WriteResult, err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/firestore.Client.commit")
Expand Down Expand Up @@ -381,3 +402,35 @@ func (ec emulatorCreds) GetRequestMetadata(ctx context.Context, uri ...string) (
func (ec emulatorCreds) RequireTransportSecurity() bool {
return false
}

// ReadTime specifies a time-specific snapshot of the database to read.
func ReadTime(t time.Time) ReadOption {
return readTime(t)
}

type readTime time.Time

func (rt readTime) apply(rs *readSettings) {
rs.readTime = time.Time(rt)
}

// ReadOption interface allows for abstraction of computing read time settings.
type ReadOption interface {
apply(*readSettings)
}

// readSettings contains the ReadOptions for a read operation
type readSettings struct {
readTime time.Time
}

// parseReadTime ensures that fallback order of read options is respected.
func parseReadTime(c *Client, rs *readSettings) (*timestamppb.Timestamp, bool) {
if rs != nil && !rs.readTime.IsZero() {
return &timestamppb.Timestamp{Seconds: int64(rs.readTime.Unix())}, true
}
if c.readSettings != nil && !c.readSettings.readTime.IsZero() {
return &timestamppb.Timestamp{Seconds: int64(c.readSettings.readTime.Unix())}, true
}
return nil, false
}
58 changes: 52 additions & 6 deletions firestore/client_test.go
Expand Up @@ -17,16 +17,19 @@ package firestore
import (
"context"
"testing"
"time"

tspb "github.com/golang/protobuf/ptypes/timestamp"
pb "google.golang.org/genproto/googleapis/firestore/v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
)

var testClient = &Client{
projectID: "projectID",
databaseID: "(default)",
projectID: "projectID",
databaseID: "(default)",
readSettings: &readSettings{},
}

func TestClientCollectionAndDoc(t *testing.T) {
Expand All @@ -45,16 +48,18 @@ func TestClientCollectionAndDoc(t *testing.T) {
path: "projects/projectID/databases/(default)/documents/X",
parentPath: db + "/documents",
},
readSettings: &readSettings{},
}
if !testEqual(coll1, wantc1) {
t.Fatalf("got\n%+v\nwant\n%+v", coll1, wantc1)
}
doc1 := testClient.Doc("X/a")
wantd1 := &DocumentRef{
Parent: coll1,
ID: "a",
Path: "projects/projectID/databases/(default)/documents/X/a",
shortPath: "X/a",
Parent: coll1,
ID: "a",
Path: "projects/projectID/databases/(default)/documents/X/a",
shortPath: "X/a",
readSettings: &readSettings{},
}

if !testEqual(doc1, wantd1) {
Expand Down Expand Up @@ -309,3 +314,44 @@ func TestGetAllErrors(t *testing.T) {
t.Error("got nil, want error")
}
}

func TestClient_WithReadOptions(t *testing.T) {
ctx := context.Background()
c, srv, cleanup := newMock(t)
defer cleanup()

const dbPath = "projects/projectID/databases/(default)"
const docPath = dbPath + "/documents/C/a"
tm := time.Date(2021, time.February, 20, 0, 0, 0, 0, time.UTC)

dr := &DocumentRef{
Parent: &CollectionRef{
c: c,
},
ID: "123",
Path: docPath,
}

srv.addRPC(&pb.BatchGetDocumentsRequest{
Database: dbPath,
Documents: []string{docPath},
ConsistencySelector: &pb.BatchGetDocumentsRequest_ReadTime{
ReadTime: &timestamppb.Timestamp{Seconds: tm.Unix()},
},
}, []interface{}{
&pb.BatchGetDocumentsResponse{
ReadTime: &timestamppb.Timestamp{Seconds: tm.Unix()},
Result: &pb.BatchGetDocumentsResponse_Found{
Found: &pb.Document{},
},
},
})

_, err := c.WithReadOptions(ReadTime(tm)).GetAll(ctx, []*DocumentRef{
dr,
})

if err != nil {
t.Fatal(err)
}
}
17 changes: 16 additions & 1 deletion firestore/collref.go
Expand Up @@ -49,6 +49,10 @@ type CollectionRef struct {

// Use the methods of Query on a CollectionRef to create and run queries.
Query

// readSettings specifies constraints for reading documents in the collection
// e.g. read time
readSettings *readSettings
}

func newTopLevelCollRef(c *Client, dbPath, id string) *CollectionRef {
Expand All @@ -64,6 +68,7 @@ func newTopLevelCollRef(c *Client, dbPath, id string) *CollectionRef {
path: dbPath + "/documents/" + id,
parentPath: dbPath + "/documents",
},
readSettings: &readSettings{},
}
}

Expand All @@ -82,6 +87,7 @@ func newCollRefWithParent(c *Client, parent *DocumentRef, id string) *Collection
path: parent.Path + "/" + id,
parentPath: parent.Path,
},
readSettings: &readSettings{},
}
}

Expand Down Expand Up @@ -121,7 +127,7 @@ func (c *CollectionRef) Add(ctx context.Context, data interface{}) (*DocumentRef
// missing documents. A missing document is a document that does not exist but has
// sub-documents.
func (c *CollectionRef) DocumentRefs(ctx context.Context) *DocumentRefIterator {
return newDocumentRefIterator(ctx, c, nil)
return newDocumentRefIterator(ctx, c, nil, c.readSettings)
}

const alphanum = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
Expand All @@ -136,3 +142,12 @@ func uniqueID() string {
}
return string(b)
}

// WithReadOptions specifies constraints for accessing documents from the database,
// e.g. at what time snapshot to read the documents.
func (c *CollectionRef) WithReadOptions(opts ...ReadOption) *CollectionRef {
for _, ro := range opts {
ro.apply(c.readSettings)
}
return c
}
43 changes: 39 additions & 4 deletions firestore/collref_test.go
Expand Up @@ -17,19 +17,22 @@ package firestore
import (
"context"
"testing"
"time"

"github.com/golang/protobuf/proto"
pb "google.golang.org/genproto/googleapis/firestore/v1"
"google.golang.org/protobuf/types/known/timestamppb"
)

func TestDoc(t *testing.T) {
coll := testClient.Collection("C")
got := coll.Doc("d")
want := &DocumentRef{
Parent: coll,
ID: "d",
Path: "projects/projectID/databases/(default)/documents/C/d",
shortPath: "C/d",
Parent: coll,
ID: "d",
Path: "projects/projectID/databases/(default)/documents/C/d",
shortPath: "C/d",
readSettings: &readSettings{},
}
if !testEqual(got, want) {
t.Errorf("got %+v, want %+v", got, want)
Expand Down Expand Up @@ -98,3 +101,35 @@ func TestNilErrors(t *testing.T) {
t.Fatalf("got <%v>, want <%v>", err, errNilDocRef)
}
}

func TestCollRef_WithReadOptions(t *testing.T) {
ctx := context.Background()
c, srv, cleanup := newMock(t)
defer cleanup()

const dbPath = "projects/projectID/databases/(default)"
const docPath = dbPath + "/documents/C/a"
tm := time.Date(2021, time.February, 20, 0, 0, 0, 0, time.UTC)

srv.addRPC(&pb.ListDocumentsRequest{
Parent: dbPath,
CollectionId: "myCollection",
ShowMissing: true,
ConsistencySelector: &pb.ListDocumentsRequest_ReadTime{
ReadTime: &timestamppb.Timestamp{Seconds: tm.Unix()},
},
}, []interface{}{
&pb.ListDocumentsResponse{
Documents: []*pb.Document{
{
Name: docPath,
},
},
},
})

_, err := c.Collection("myCollection").WithReadOptions(ReadTime(tm)).DocumentRefs(ctx).GetAll()
if err == nil {
t.Fatal(err)
}
}
28 changes: 21 additions & 7 deletions firestore/docref.go
Expand Up @@ -47,14 +47,18 @@ type DocumentRef struct {

// The ID of the document: the last component of the resource path.
ID string

// The options (only read time currently supported) for reading this document
readSettings *readSettings
}

func newDocRef(parent *CollectionRef, id string) *DocumentRef {
return &DocumentRef{
Parent: parent,
ID: id,
Path: parent.Path + "/" + id,
shortPath: parent.selfPath + "/" + id,
Parent: parent,
ID: id,
Path: parent.Path + "/" + id,
shortPath: parent.selfPath + "/" + id,
readSettings: &readSettings{},
}
}

Expand All @@ -77,7 +81,8 @@ func (d *DocumentRef) Get(ctx context.Context) (_ *DocumentSnapshot, err error)
if d == nil {
return nil, errNilDocRef
}
docsnaps, err := d.Parent.c.getAll(ctx, []*DocumentRef{d}, nil)

docsnaps, err := d.Parent.c.getAll(ctx, []*DocumentRef{d}, nil, d.readSettings)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -803,7 +808,7 @@ type DocumentSnapshotIterator struct {
// Next is not expected to return iterator.Done unless it is called after Stop.
// Rarely, networking issues may also cause iterator.Done to be returned.
func (it *DocumentSnapshotIterator) Next() (*DocumentSnapshot, error) {
btree, _, readTime, err := it.ws.nextSnapshot()
btree, _, rt, err := it.ws.nextSnapshot()
if err != nil {
if err == io.EOF {
err = iterator.Done
Expand All @@ -812,7 +817,7 @@ func (it *DocumentSnapshotIterator) Next() (*DocumentSnapshot, error) {
return nil, err
}
if btree.Len() == 0 { // document deleted
return &DocumentSnapshot{Ref: it.docref, ReadTime: readTime}, nil
return &DocumentSnapshot{Ref: it.docref, ReadTime: rt}, nil
}
snap, _ := btree.At(0)
return snap.(*DocumentSnapshot), nil
Expand All @@ -824,3 +829,12 @@ func (it *DocumentSnapshotIterator) Next() (*DocumentSnapshot, error) {
func (it *DocumentSnapshotIterator) Stop() {
it.ws.stop()
}

// WithReadOptions specifies constraints for accessing documents from the database,
// e.g. at what time snapshot to read the documents.
func (d *DocumentRef) WithReadOptions(opts ...ReadOption) *DocumentRef {
for _, ro := range opts {
ro.apply(d.readSettings)
}
return d
}

0 comments on commit 43cc5bc

Please sign in to comment.