feat(bigquery/storage/managedwriter): add append stream plumbing #4452
Skipped reviewing the tests for now; I'll look at those on the next pass in case refactors are needed. Just a handful of comments.
ms := &ManagedStream{
	streamSettings: defaultStreamSettings(),
	c: c,
	ctx: ctx,
Note to Cody: check if we need this. Could it be passed in later? At the very least we might want to document how context is used for ManagedStream?
My thinking may be a bit wonky here, since the BQ client has a bunch of retained context.
Decided to stick with the retained context for now and document it. We could potentially get clever using the contexts passed in to AppendRows, but then the lifetime of the contexts becomes even more ambiguous (e.g. a single append context gets used for the receive processor, etc.).
}

// Always return the retained ARC if the arg differs.
if arc != ms.arc {
This will also differ when ms.arc is nil. Does this need an extra check?
The only time ms.arc will be nil is when arc is nil (we have never opened a connection), so we'll skip this conditional.
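The comparison being discussed can be sketched as follows. This is an illustration under assumptions, not the PR's implementation: `conn` is a hypothetical placeholder for the AppendRows client, and opening a connection is simulated with a counter.

```go
package main

import (
	"fmt"
	"sync"
)

// conn is a hypothetical placeholder for the AppendRows client.
type conn struct{ id int }

type managedStream struct {
	mu     sync.Mutex
	arc    *conn // nil until the first connection is opened
	opened int   // number of connections opened so far
}

// getStream returns a usable connection. The arc argument is the connection
// the caller last used (nil if it has never had one).
func (ms *managedStream) getStream(arc *conn) *conn {
	ms.mu.Lock()
	defer ms.mu.Unlock()
	// Always return the retained connection if the arg differs: the caller's
	// view is stale and the connection has already been replaced.
	if arc != ms.arc {
		return ms.arc
	}
	// Either both are nil (never opened) or the caller holds the current
	// connection and is asking for a replacement. Open a new one.
	ms.opened++
	ms.arc = &conn{id: ms.opened}
	return ms.arc
}

func main() {
	ms := &managedStream{}
	first := ms.getStream(nil)    // nothing retained yet: opens connection 1
	same := ms.getStream(nil)     // stale nil argument: gets retained connection 1
	second := ms.getStream(first) // caller holds the current one: opens connection 2
	fmt.Println(first.id, same.id, second.id) // 1 1 2
}
```

In this sketch the nil case falls through to the open path only when both the argument and the retained connection are nil, which matches the reasoning in the reply above.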
func (ms *ManagedStream) append(pw *pendingWrite) error {
	return ms.call(func(arc storagepb.BigQueryWrite_AppendRowsClient, ch chan *pendingWrite) error {
		// TODO: we should only send stream ID and schema for the first message in a new stream, but
Oooh, good point. Will think about that.
Plumbed this in. I ended up removing the intermediate call() abstraction, since we only used it for sending appends and for issuing CloseSend(); those two methods now call getStream directly. This ensures the sync.Once fires only for appends.
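A rough sketch of the sync.Once idea, for readers following along. Everything here is hypothetical (`appendRequest`, `openConn`, `appendRows`, `closeSend`, and the field names are illustrative, not the library's API); the assumption is that each new connection gets a fresh `sync.Once`, and only the append path consumes it.

```go
package main

import (
	"fmt"
	"sync"
)

// appendRequest is a hypothetical stand-in for the AppendRows request.
type appendRequest struct {
	WriteStream string
	Schema      string
	Rows        []string
}

type managedStream struct {
	streamID string
	schema   string
	setup    *sync.Once      // replaced each time a connection is opened
	sent     []appendRequest // records what would have gone on the wire
}

// openConn simulates opening a connection and arms a fresh Once for it.
func (ms *managedStream) openConn() { ms.setup = new(sync.Once) }

// appendRows attaches the stream ID and schema only to the first request
// sent on the current connection.
func (ms *managedStream) appendRows(rows ...string) {
	req := appendRequest{Rows: rows}
	ms.setup.Do(func() {
		req.WriteStream = ms.streamID
		req.Schema = ms.schema
	})
	ms.sent = append(ms.sent, req)
}

// closeSend deliberately does not touch ms.setup, so closing a connection
// can never consume the Once that belongs to the first append.
func (ms *managedStream) closeSend() {}

func main() {
	ms := &managedStream{streamID: "example-stream", schema: "example-schema"}
	ms.openConn()
	ms.appendRows("a")
	ms.appendRows("b")
	ms.closeSend()
	for i, r := range ms.sent {
		fmt.Printf("append %d: stream=%q schema=%q\n", i, r.WriteStream, r.Schema)
	}
}
```

Only the first append carries the stream ID and schema; the second goes out with both fields empty.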
LGTM
This PR adds enough of the wiring to the client to begin testing via integration tests. It adopts a pattern similar to the pullstream in pubsub, in that it abstracts individual calls from stream state management.
There are two significant units of future work that may yield changes here:
For the sake of traffic efficiency, we only want to add things like the stream ID, schema, and trace ID to the first append on any stream.
For stream connection retry, we may want to resend writes that were sent but never acknowledged. For default/committed streams, this behavior may yield additional, duplicate writes (at-least-once semantics). For buffered/pending streams, it means either the library or the user should know to expect "data already present" for these resent writes.
Towards #4366