feat(bigquery/storage/managedwriter): support variadic appends #5102
Conversation
LGTM from a Go perspective. I will let Tim sign off from the BQ perspective.
A few questions.
@@ -66,7 +67,9 @@ func (ar *AppendResult) GetResult(ctx context.Context) (int64, error) {
 // append request.
 type pendingWrite struct {
 	request *storagepb.AppendRowsRequest
 	result  *AppendResult
+	// for schema evolution cases, accept a new schema
+	newSchema *descriptorpb.DescriptorProto
Will this property need to be removed once 205756033 is resolved? It might be redundant with AppendRowsRequest.proto_rows.writer_schema.proto_descriptor.
Or am I understanding that email thread correctly, that we won't be exposing the underlying AppendRowsRequest to users?
In this veneer I'm wrapping the append request. My expectation is we'll end up with per-stream caches of schema and per-stream append queues for the multiplexing case. Currently we just retain a single schema and append queue (though in Go it's a channel rather than a queue).
Is this also the reason why you can optionally have a new schema updated as part of a return error? I was puzzled how one can automatically handle schema updates at runtime, though. Is there any realistic example of this?
Yeah, the returned schema is the notification of schema extension. It's a case where you do something like an ALTER TABLE ADD COLUMN or extend schema via tables.update while streaming data, and the change gets acknowledged by the streaming backend by setting the new schema. My expectation is we'll add a callback registration for this as well, but not in this change.
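A rough sketch of that flow, assuming the UpdateSchemaDescriptor() option this PR introduces, a hypothetical examplepb.RowV2 message compiled against the extended schema, and the ms/ctx variables from the tests below:

// After e.g. an ALTER TABLE ADD COLUMN, derive a descriptor from a message
// type that knows about the new column (examplepb.RowV2 is hypothetical).
m2 := &examplepb.RowV2{
	Name:  proto.String("a row"),
	Extra: proto.String("value for the newly added column"),
}
dp := protodesc.ToDescriptorProto(m2.ProtoReflect().Descriptor())

b, err := proto.Marshal(m2)
if err != nil {
	// handle marshal error
}
// Attach the updated schema to an append; the veneer retains it for
// subsequent appends on this stream.
results, err := ms.AppendRows(ctx, [][]byte{b}, managedwriter.UpdateSchemaDescriptor(dp))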
// setup a new stream.
ms, err := mwClient.NewManagedStream(ctx,
	WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
Interesting. So the users don't see the stream ID, either? I guess that gives us enough flexibility for the current schema evolution workaround.
There are effectively two paths to getting a managed stream: allow NewManagedStream() to deal with the stream construction by specifying table/type/etc., or do it yourself and pass the stream in via the WithStreamName() option.
I expect users who explicitly do stream construction to be more likely to be doing dynamic proto schema stuff, as one of the things you get back from stream creation is table schema.
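A minimal sketch of the two paths, using the option names from this PR; the stream type constant and the schema-descriptor option usage are assumptions based on the surrounding discussion:

// Path 1: let NewManagedStream construct the stream from table/type options.
ms, err := mwClient.NewManagedStream(ctx,
	managedwriter.WithDestinationTable("projects/myproj/datasets/mydataset/tables/mytable"),
	managedwriter.WithType(managedwriter.PendingStream), // assumed constant name
	managedwriter.WithSchemaDescriptor(descriptorProto),
)

// Path 2: create the stream yourself (the create response also carries the
// table schema) and hand its resource name to the managed stream.
ms2, err := mwClient.NewManagedStream(ctx,
	managedwriter.WithStreamName(streamName),
	managedwriter.WithSchemaDescriptor(descriptorProto),
)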
The only thing I was wondering is why, in a typed language, this API has us pass the table as a formatted string. Wouldn't it make more sense to request three separate parameters, or some kind of struct if you want to make parts of it optional? Having to format it ourselves feels a bit weird. I mean, it's possible, and I've done so, but it feels odd.
Two reasons: this is a bit of a standard in the APIs (see aip.dev for more context), and we want to avoid the circular dependencies that would come from having the managedwriter depend on cloud.google.com/go/bigquery directly.
#5017 will make it easier to generate the string for the table resources if you end up using this option from a bigquery resource.
	return ms.arc, ms.pending, nil
}
if arc != ms.arc && forceReconnect && ms.arc != nil {
	// TODO: is closing send sufficient?
Was this TODO verified by an integration test?
Or do we need to create a new stream if it's not the default stream?
// The format of the row data is binary serialized protocol buffer bytes. The message must be compatible
// with the schema currently set for the stream.
//
// Use the sentinel value NoStreamOffset to omit sending of the offset value.
From my testing, users must send an offset when using PENDING mode, and they can't send one when using the default stream, right? I didn't test with BUFFERED, so maybe it's optional there? Maybe there's a better way to communicate when NoStreamOffset should be set?
Interestingly enough, this gets to the other breaking change I've been considering here: remove offset as a static argument from the AppendRows() function and add a WithOffset() AppendOption.
The origin of the NoStreamOffset sentinel was to simplify the AppendResult, as Go's lack of null vs. default values makes it more complex to deal with the optional offset. I didn't want to do *int64, but it's an option to consider as well.
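For reference, the sentinel approach amounts to something like this (the exact constant value is the library's choice; -1 here is an assumption):

// NoStreamOffset marks an append that carries no explicit offset, avoiding a
// *int64 in the public surface at the cost of reserving one int64 value.
const NoStreamOffset int64 = -1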
Given this is already a kind of managed writer, I wonder if this cannot be abstracted away completely? But I guess it needs to be exposed so as not to force a particular way of handling errors in a retryable manner? Either way, I don't find the NoStreamOffset sentinel value a big deal; it works fine.
Went ahead and made the offset part of the variadic options.
This changes the signatures for appending to:
- No offset set: <ManagedStream>.AppendRows(ctx, data)
- Offset set: <ManagedStream>.AppendRows(ctx, data, WithOffset(offset))
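Putting it together, an append plus result check on an explicitly created stream might look like this sketch; GetResult() appears in the diff above, while the [][]byte data shape and the slice of AppendResult are assumptions about this era of the API:

// Enqueue an append at an explicit offset, then block for the backend's ack.
results, err := ms.AppendRows(ctx, [][]byte{rowBytes}, managedwriter.WithOffset(curOffset))
if err != nil {
	// handle enqueue error
}
ackedOffset, err := results[0].GetResult(ctx)
if err != nil {
	// handle append failure (possibly retryable)
}
_ = ackedOffset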
	Value: proto.Int64(180),
	Other: proto.String("hello evolution"),
}
descriptorProto = protodesc.ToDescriptorProto(m2.ProtoReflect().Descriptor())
This I find one of the harder parts for a beginner starting to make use of proto models for streaming into the Storage API. It's quite a chain of calls, and not really something you would figure out by just looking at the API; the only way I learned how to do this was by checking these examples. I wonder if there isn't an easier way to just pass in the generated proto type somehow. Dunno. I haven't found an easy way myself for my bqwriter wrapper, otherwise I would have already done it.
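For what it's worth, the whole chain fits in one helper; a sketch that should cover simple, self-contained messages (nested or imported types may need extra normalization):

import (
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/types/descriptorpb"
)

// descriptorFor derives the DescriptorProto the write API expects from any
// protoc-generated message: generated type -> reflection descriptor -> proto form.
func descriptorFor(m proto.Message) *descriptorpb.DescriptorProto {
	return protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
}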
	return ms.arc, ms.pending, nil
}
if arc != ms.arc && forceReconnect && ms.arc != nil {
	// TODO: is closing send sufficient?
	(*ms.arc).CloseSend()
This smells fishy, or is it just me?
It's a temporary issue until the backend allows schema change on an already open stream connection; I'm not enamored of it but this will get cleaned up.
AppendOption design LGTM, but one worry:
By hiding the fact that we're recreating the stream, does it make the use of offset harder to understand / make it a breaking change when the API no longer requires a stream reset? Though, perhaps there's some signal we send at the end of the stream so that folks don't need to know specifically that a schema change could cause that?
Management of the network stream connection is abstracted from the user. This PR uses a CloseSend() on the network stream to cleanly signal a new connection: existing appends in flight will still process on the recv side, and the next append (either new or due to a retry) will pick up a new connection. Schema change notification from the backend isn't currently in this veneer (but will come in a future PR).
If you change the schema in a compatible way, retrying an old proto message with a new schema that has additional fields/tags shouldn't be an issue; that's the power of proto extension in a nutshell. If the table is changed to an incompatible schema, the stream itself is invalid (for explicitly created streams). Default streams are a special case here, but essentially a similar metadata inconsistency to what the existing tabledata.insertall surfaces when schema changes arrive.
LGTM after clarifying offline that it's reconnecting but not creating a new (backend) stream. Hooray for ambiguous streams 🙄
BREAKING CHANGE: adds a variadic option to the AppendRows() method, removes offset argument
This updates the call signature to allow variadic appends, and introduces two new AppendOption options: one for setting the offset in an optional fashion (WithOffset()), and one for updating the schema (UpdateSchemaDescriptor()). Due to current API limitations, passing a schema update means we need to close/reconnect the open connection; this should eventually be resolved in the backend. Internal issue 205756033 tracks this.
In practice this means the following changes need attention by consumers of this library:
For the "don't set an offset" behavior:
mystream.AppendRows(ctx, data, managedwriter.NoStreamOffset)
mystream.AppendRows(ctx, data)
For the "set an an offset" behavior:
mystream.AppendRows(ctx, data, offset)
mystream.AppendRows(ctx, data, managedwriter.WithOffset(offset))
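Because the options are variadic they also compose; for example, an append that both sets an offset and signals a schema change (a sketch; names and the result shape are illustrative):

results, err := mystream.AppendRows(ctx, data,
	managedwriter.WithOffset(offset),
	managedwriter.UpdateSchemaDescriptor(newDescriptor),
)
if err != nil {
	// handle enqueue error
}
_, err = results[0].GetResult(ctx) // block for the append acknowledgement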