
feat(firestore): adds Bulkwriter support to Firestore client #5946

Merged · 92 commits · Jul 21, 2022
Commits (92)
1826c91
feat: adds Bulkwriter support to Firestore client
Apr 26, 2022
98da527
feat: more bw work
Apr 29, 2022
61dc7cc
feat: more callers bulkwriter
May 19, 2022
b93be38
fix: array issue
May 20, 2022
c9a5416
iter
May 20, 2022
cb5836a
iter
May 20, 2022
0b76b51
iter
May 21, 2022
59bfee1
iter
May 21, 2022
b7569ed
iter
May 26, 2022
3af83e0
iter
May 26, 2022
9a90b8a
iter
May 27, 2022
aa871db
iter
May 31, 2022
c44dc41
iter
May 31, 2022
816f42c
iter
May 31, 2022
13c0ce5
Merge branch 'main' into firestore-bulkwriter
May 31, 2022
6d616c4
Merge branch 'main' into firestore-bulkwriter
Jun 1, 2022
7facc45
add unit tests
Jun 1, 2022
11d79b7
iter
Jun 1, 2022
18feb31
iter
Jun 1, 2022
0e6acd1
Merge branch 'main' into firestore-bulkwriter
Jun 1, 2022
9f93108
iter
Jun 3, 2022
16c74c6
iter
Jun 6, 2022
80c0031
Merge branch 'main' into firestore-bulkwriter
Jun 6, 2022
423b11d
fix: test, linter
Jun 6, 2022
0def0f0
fix: linter, integration test
Jun 6, 2022
a575c80
Merge branch 'main' into firestore-bulkwriter
Jun 7, 2022
ac29450
feat: sync stuff
Jun 7, 2022
8e0a3b0
feat: integration test
Jun 13, 2022
457b5c6
Merge branch 'main' into firestore-bulkwriter
Jun 13, 2022
8ec9862
fix: integration test
Jun 13, 2022
a9c6171
Merge branch 'main' into firestore-bulkwriter
Jun 13, 2022
b7ee2f1
fix: support for Go 1.15
Jun 13, 2022
8fdecc3
Merge branch 'main' into firestore-bulkwriter
Jun 16, 2022
7f56415
chore(main): release bigquery 1.33.0 (#6057)
release-please[bot] Jun 16, 2022
4cd7e44
fix: per reviewer
Jun 16, 2022
9526432
fix: per reviewer
Jun 16, 2022
aedab83
fix: per reviewer
Jun 17, 2022
2bf15ac
fix: per reviewer
Jun 17, 2022
d688edb
Merge branch 'main' into firestore-bulkwriter
Jun 17, 2022
014804d
fix: per reviewer and linter
Jun 17, 2022
d047e26
added rate increase
Jun 17, 2022
dc7c766
added rate mutex; bundle size; retry flush
Jun 21, 2022
1c44771
Merge branch 'main' into firestore-bulkwriter
Jun 21, 2022
5cae326
Merge branch 'main' into firestore-bulkwriter
Jun 21, 2022
d14a47d
fix: per reviewer
Jun 22, 2022
d982b9c
Merge branch 'main' into firestore-bulkwriter
Jun 22, 2022
1094a00
fix: per linter
Jun 22, 2022
7eadfa3
Merge branch 'main' into firestore-bulkwriter
Jun 22, 2022
00d3130
fix: per reviewer
Jun 22, 2022
3969f3f
fix: per reviewer, clean up
Jun 23, 2022
7f389bc
feat: added error-checking test
Jun 23, 2022
28db505
Merge branch 'main' into firestore-bulkwriter
Jun 23, 2022
85ee986
fix: flattened op & job structs
Jun 24, 2022
bedfa89
fix: removed retry bundler
Jun 24, 2022
ee9b5dd
Merge branch 'main' into firestore-bulkwriter
Jun 24, 2022
a688985
Merge branch 'main' into firestore-bulkwriter
Jun 27, 2022
a783376
fix: per reviewer
Jun 27, 2022
b9e91be
fix: per reviewer
Jun 27, 2022
86d123e
fix: per reviewer
Jun 27, 2022
6820791
fix: contexts and stuff
Jun 28, 2022
41fb8ca
Merge branch 'main' into firestore-bulkwriter
Jun 28, 2022
ce894c6
Merge branch 'main' into firestore-bulkwriter
Jun 28, 2022
34f6532
Merge branch 'main' into firestore-bulkwriter
Jun 29, 2022
e9e8d09
Merge branch 'main' into firestore-bulkwriter
Jun 29, 2022
3e0b321
Merge branch 'main' into firestore-bulkwriter
Jun 30, 2022
6979fac
reverting changes to go.sum
Jul 12, 2022
2d80565
Merge branch 'main' into firestore-bulkwriter
Jul 12, 2022
e7dd6c5
revert changes to go.sum
Jul 12, 2022
36371a1
per reviewer
Jul 12, 2022
5879b65
Merge branch 'main' into firestore-bulkwriter
Jul 13, 2022
b5339e1
iter
Jul 13, 2022
4dda3f1
Revert "per reviewer"
Jul 13, 2022
28a1db7
fixed broken tests
Jul 13, 2022
224ecc6
per reviewer
Jul 13, 2022
a10c4ae
per reviewer
Jul 13, 2022
ef437bf
removed internal cancel() function
Jul 13, 2022
274b192
docs
Jul 13, 2022
ea52e21
Merge branch 'main' into firestore-bulkwriter
Jul 14, 2022
2c2928c
per reviewer
Jul 14, 2022
47f2d01
closing channels when done
Jul 15, 2022
175c1a3
Merge branch 'main' into firestore-bulkwriter
Jul 15, 2022
bd78417
per reviewer
Jul 15, 2022
0c0ff92
per reviewer
Jul 15, 2022
3dffdd0
Merge branch 'main' into firestore-bulkwriter
Jul 18, 2022
99b22f2
per linter
Jul 18, 2022
86c7cfb
Merge branch 'main' into firestore-bulkwriter
Jul 19, 2022
18f88ae
per reviewer
Jul 19, 2022
9b198c6
Merge branch 'main' into firestore-bulkwriter
Jul 19, 2022
8fa743a
Merge branch 'main' into firestore-bulkwriter
Jul 20, 2022
46253eb
per reviewer
Jul 20, 2022
a630e5d
Merge branch 'main' into firestore-bulkwriter
Jul 20, 2022
285e048
per reviewer
Jul 21, 2022
325 changes: 325 additions & 0 deletions firestore/bulkwriter.go
@@ -0,0 +1,325 @@
package firestore

import (
"context"
"errors"
"fmt"
"sync"
"time"

vkit "cloud.google.com/go/firestore/apiv1"
"golang.org/x/time/rate"
"google.golang.org/api/support/bundler"
pb "google.golang.org/genproto/googleapis/firestore/v1"
)

const (
// maxBatchSize is the max number of writes to send in a request
maxBatchSize = 20
// maxRetryAttempts is the max number of times to retry a write
maxRetryAttempts = 10
// defaultStartingMaximumOpsPerSecond is the starting max number of requests to the service per second
defaultStartingMaximumOpsPerSecond = 500
// maxWritesPerSecond is the starting limit of writes allowed to callers per second
maxWritesPerSecond = maxBatchSize * defaultStartingMaximumOpsPerSecond
)

// bulkWriterResult contains the WriteResult or error results from an individual
// write to the database.
type bulkWriterResult struct {
result *pb.WriteResult // (cached) result from the operation
err error // (cached) any errors that occurred
}

// BulkWriterJob provides read-only access to the results of a BulkWriter write attempt.
type BulkWriterJob struct {
resultChan chan bulkWriterResult // send errors and results to this channel
write *pb.Write // the writes to apply to the database
attempts int // number of times this write has been attempted
resultsLock sync.Mutex // guards the cached wr and e values for the job
result *WriteResult // (cached) result from the operation
err error // (cached) any errors that occurred
ctx context.Context // context for canceling/timing out results
}

// Results gets the results of the BulkWriter write attempt.
// This method blocks if the results for this BulkWriterJob haven't been
// received.
func (j *BulkWriterJob) Results() (*WriteResult, error) {
j.resultsLock.Lock()
defer j.resultsLock.Unlock()
if j.result == nil && j.err == nil {
j.result, j.err = j.processResults() // cache the results for additional calls
}
return j.result, j.err
}

// processResults checks for errors returned from send() and packages up the
// results as WriteResult objects
func (j *BulkWriterJob) processResults() (*WriteResult, error) {
select {
case <-j.ctx.Done():
return nil, j.ctx.Err()
case bwr := <-j.resultChan:
if bwr.err != nil {
return nil, bwr.err
}
return writeResultFromProto(bwr.result)
}
}

// setError ensures that an error is returned on the error channel of BulkWriterJob.
func (j *BulkWriterJob) setError(e error) {
bwr := bulkWriterResult{
err: e,
result: nil,
}
j.resultChan <- bwr
close(j.resultChan)
}

// A BulkWriter supports concurrent writes to multiple documents. The BulkWriter
// submits document writes in maximum batches of 20 writes per request. Each
// request can contain many different document writes: create, delete, update,
// and set are all supported.
//
// Only one operation (create, set, update, delete) per document is allowed.
// BulkWriter cannot promise atomicity: individual writes can fail or succeed
// independent of each other. BulkWriter does not apply writes in any guaranteed
// order; thus a document cannot be Set immediately after it is Created by the
// same BulkWriter.
type BulkWriter struct {
database string // the database as resource name: projects/[PROJECT]/databases/[DATABASE]
start time.Time // when this BulkWriter was started; used to calculate qps and rate increases
vc *vkit.Client // internal client
maxOpsPerSecond int // number of requests that can be sent per second
docUpdatePaths map[string]bool // document paths with corresponding writes in the queue
limiter rate.Limiter // limits writes to the service to maxWritesPerSecond
bundler *bundler.Bundler // handle bundling up writes to Firestore
ctx context.Context // context for canceling all BulkWriter operations
isOpenLock sync.Mutex // guards against setting isOpen concurrently
isOpen bool // whether this BulkWriter still accepts new writes
}

// newBulkWriter creates a new instance of the BulkWriter.
func newBulkWriter(ctx context.Context, c *Client, database string) *BulkWriter {
// Although typically we shouldn't store Context objects, in this case we
// need to pass this Context through to the Bundler handler.
ctx = withResourceHeader(ctx, c.path())

bw := &BulkWriter{
database: database,
start: time.Now(),
vc: c.c,
isOpen: true,
maxOpsPerSecond: defaultStartingMaximumOpsPerSecond,
docUpdatePaths: make(map[string]bool),
ctx: ctx,
limiter: *rate.NewLimiter(rate.Limit(maxWritesPerSecond), 1),
}

// can't initialize within struct above; need instance reference to BulkWriter.send()
bw.bundler = bundler.NewBundler(&BulkWriterJob{}, bw.send)
bw.bundler.HandlerLimit = bw.maxOpsPerSecond
bw.bundler.BundleCountThreshold = maxBatchSize

return bw
}

// End sends all enqueued writes in parallel and closes the BulkWriter to new requests.
// After calling End(), calling any additional method automatically returns
// with an error. This method completes when there are no more pending writes
// in the queue.
func (bw *BulkWriter) End() {
@enocom (Member), Jul 20, 2022:

I think there might be a minor race condition here. Here's the idea:

  • Goroutine 1 in checkWriteConditions acquires the lock, sees that isOpen is true, and then releases the lock
  • Goroutine 2 in End acquires the lock, sets isOpen to false, releases the lock, and then flushes
  • Goroutine 1 meanwhile starts a write

The question is: does the write succeed or not in this case?

One possible option would be to use a read-write mutex, where the read mutex wraps the entire write attempt and the write mutex wraps isOpen here in End.

@telpirion (Contributor, Author):

Very good point. Here's what I've done to address this:

  • Changed BulkWriter.isOpenLock to a RWMutex.
  • Added a Lock() and deferred Unlock() at the top of each DB mutator/write function (Create(), Delete(), etc.).

Each mutator holds the lock while queueing up the write for the Bundler. If End() is called during this time, it will block while the mutator holds the lock.
bw.isOpenLock.Lock()
bw.isOpen = false
bw.isOpenLock.Unlock()
bw.Flush()
}

// Flush commits all writes that have been enqueued up to this point in parallel.
// This method blocks execution.
func (bw *BulkWriter) Flush() {
bw.bundler.Flush()
}

// Create adds a document creation write to the queue of writes to send.
// Note: You cannot write to (Create, Update, Set, or Delete) the same document more than once.
func (bw *BulkWriter) Create(doc *DocumentRef, datum interface{}) (*BulkWriterJob, error) {
err := bw.checkWriteConditions(doc)
if err != nil {
return nil, err
}

w, err := doc.newCreateWrites(datum)
if err != nil {
return nil, fmt.Errorf("firestore: cannot create %v with %v", doc.ID, datum)
}

if len(w) > 1 {
return nil, fmt.Errorf("firestore: too many document writes sent to bulkwriter")
}

j := bw.write(w[0])
return j, nil
}

// Delete adds a document deletion write to the queue of writes to send.
// Note: You cannot write to (Create, Update, Set, or Delete) the same document more than once.
func (bw *BulkWriter) Delete(doc *DocumentRef, preconds ...Precondition) (*BulkWriterJob, error) {
err := bw.checkWriteConditions(doc)
if err != nil {
return nil, err
}

w, err := doc.newDeleteWrites(preconds)
if err != nil {
return nil, fmt.Errorf("firestore: cannot delete doc %v", doc.ID)
}

if len(w) > 1 {
return nil, fmt.Errorf("firestore: too many document writes sent to bulkwriter")
}

j := bw.write(w[0])
return j, nil
}

// Set adds a document set write to the queue of writes to send.
// Note: You cannot write to (Create, Update, Set, or Delete) the same document more than once.
func (bw *BulkWriter) Set(doc *DocumentRef, datum interface{}, opts ...SetOption) (*BulkWriterJob, error) {
err := bw.checkWriteConditions(doc)
if err != nil {
return nil, err
}

w, err := doc.newSetWrites(datum, opts)
if err != nil {
return nil, fmt.Errorf("firestore: cannot set %v on doc %v", datum, doc.ID)
}

if len(w) > 1 {
return nil, fmt.Errorf("firestore: too many writes sent to bulkwriter")
}

j := bw.write(w[0])
return j, nil
}

// Update adds a document update write to the queue of writes to send.
// Note: You cannot write to (Create, Update, Set, or Delete) the same document more than once.
func (bw *BulkWriter) Update(doc *DocumentRef, updates []Update, preconds ...Precondition) (*BulkWriterJob, error) {
err := bw.checkWriteConditions(doc)
if err != nil {
return nil, err
}

w, err := doc.newUpdatePathWrites(updates, preconds)
if err != nil {
return nil, fmt.Errorf("firestore: cannot update doc %v", doc.ID)
}

if len(w) > 1 {
return nil, fmt.Errorf("firestore: too many writes sent to bulkwriter")
}

j := bw.write(w[0])
return j, nil
}

// checkWriteConditions determines whether this write attempt is valid. It returns
// an error if either the BulkWriter has already been closed or if it
// receives a nil document reference.
func (bw *BulkWriter) checkWriteConditions(doc *DocumentRef) error {
bw.isOpenLock.Lock()
open := bw.isOpen
bw.isOpenLock.Unlock()
if !open {
return errors.New("firestore: BulkWriter has been closed")
}

if doc == nil {
return errors.New("firestore: nil document contents")
}

_, havePath := bw.docUpdatePaths[doc.shortPath]
if havePath {
return fmt.Errorf("firestore: BulkWriter received duplicate write for path: %v", doc.shortPath)
}

bw.docUpdatePaths[doc.shortPath] = true

return nil
}

// write packages up write requests into bulkWriterJob objects.
func (bw *BulkWriter) write(w *pb.Write) *BulkWriterJob {

j := &BulkWriterJob{
resultChan: make(chan bulkWriterResult, 1),
write: w,
ctx: bw.ctx,
}

bw.limiter.Wait(bw.ctx)
// ignore operation size constraints and related errors; can't be inferred at compile time
// Bundler is set to accept an unlimited amount of bytes
_ = bw.bundler.Add(j, 0)

return j
}

// send transmits writes to the service and matches response results to job channels.
func (bw *BulkWriter) send(i interface{}) {
bwj := i.([]*BulkWriterJob)

if len(bwj) == 0 {
return
}

var ws []*pb.Write
for _, w := range bwj {
ws = append(ws, w.write)
}

bwr := &pb.BatchWriteRequest{
Database: bw.database,
Writes: ws,
Labels: map[string]string{},
}

select {
case <-bw.ctx.Done():
return
default:
resp, err := bw.vc.BatchWrite(bw.ctx, bwr)
if err != nil {
// Do we need to be selective about what kind of errors we send?
for _, j := range bwj {
j.setError(err)
}
return
}
// Match write results with BulkWriterJob objects
for i, res := range resp.WriteResults {
s := resp.Status[i]
c := s.GetCode()
if c != 0 { // Should we do an explicit check against rpc.Code enum?
j := bwj[i]
j.attempts++

// Do we need separate retry bundler?
if j.attempts < maxRetryAttempts {
// ignore operation size constraints and related errors; job size can't be inferred at compile time
// Bundler is set to accept an unlimited amount of bytes
_ = bw.bundler.Add(j, 0)
} else {
j.setError(fmt.Errorf("firestore: write failed with status: %v", s))
}
continue
}

bwj[i].resultChan <- bulkWriterResult{err: nil, result: res}
close(bwj[i].resultChan)
}
}
}