Skip to content

Commit a2af4de

Browse files
authored
feat(bigquery/storage/managedwriter): improve method parity in managedwriter (#5007)
This PR exposes the raw methods for creating and committing streams to the wrapped managedwriter client. It allows users to interact with all the methods of the underlying API using the managedwriter client (which itself wraps the raw v1 client). The disadvantage is that it couples managedwriter directly to v1, as it accepts requests in the v1 namespace. The existing append interactions all use abstractions local to the managedwriter. PR also gets rid of the utility method for batch committing write streams; there's not a great deal of utility saved here vs the underlying method. Towards: #4366
1 parent 587bba5 commit a2af4de

File tree

2 files changed

+23
-21
lines changed

2 files changed

+23
-21
lines changed

Diff for: bigquery/storage/managedwriter/client.go

+16-20
Original file line numberDiff line numberDiff line change
@@ -177,28 +177,24 @@ func (c *Client) validateOptions(ctx context.Context, ms *ManagedStream) error {
177177
return nil
178178
}
179179

180-
// BatchCommit is used to commit one or more PendingStream streams belonging to the same table
181-
// as a single transaction. Streams must be finalized before committing.
180+
// BatchCommitWriteStreams atomically commits a group of PENDING streams that belong to the same
181+
// parent table.
182182
//
183-
// Format of the parentTable is: projects/{project}/datasets/{dataset}/tables/{table} and the utility
184-
// function TableParentFromStreamName can be used to derive this from a Stream's name.
185-
//
186-
// If the returned response contains stream errors, this indicates that the batch commit failed and no data was
187-
// committed.
188-
//
189-
// TODO: currently returns the raw response. Determine how we want to surface StreamErrors.
190-
func (c *Client) BatchCommit(ctx context.Context, parentTable string, streamNames []string) (*storagepb.BatchCommitWriteStreamsResponse, error) {
191-
192-
// determine table from first streamName, as all must share the same table.
193-
if len(streamNames) <= 0 {
194-
return nil, fmt.Errorf("no streamnames provided")
195-
}
183+
// Streams must be finalized before commit and cannot be committed multiple
184+
// times. Once a stream is committed, data in the stream becomes available
185+
// for read operations.
186+
func (c *Client) BatchCommitWriteStreams(ctx context.Context, req *storagepb.BatchCommitWriteStreamsRequest, opts ...gax.CallOption) (*storagepb.BatchCommitWriteStreamsResponse, error) {
187+
return c.rawClient.BatchCommitWriteStreams(ctx, req, opts...)
188+
}
196189

197-
req := &storagepb.BatchCommitWriteStreamsRequest{
198-
Parent: TableParentFromStreamName(streamNames[0]),
199-
WriteStreams: streamNames,
200-
}
201-
return c.rawClient.BatchCommitWriteStreams(ctx, req)
190+
// CreateWriteStream creates a write stream to the given table.
191+
// Additionally, every table has a special stream named ‘_default’
192+
// to which data can be written. This stream doesn’t need to be created using
193+
// CreateWriteStream. It is a stream that can be used simultaneously by any
194+
// number of clients. Data written to this stream is considered committed as
195+
// soon as an acknowledgement is received.
196+
func (c *Client) CreateWriteStream(ctx context.Context, req *storagepb.CreateWriteStreamRequest, opts ...gax.CallOption) (*storagepb.WriteStream, error) {
197+
return c.rawClient.CreateWriteStream(ctx, req, opts...)
202198
}
203199

204200
// getWriteStream returns information about a given write stream.

Diff for: bigquery/storage/managedwriter/integration_test.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"cloud.google.com/go/internal/uid"
2929
"go.opencensus.io/stats/view"
3030
"google.golang.org/api/option"
31+
storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1"
3132
"google.golang.org/protobuf/encoding/protojson"
3233
"google.golang.org/protobuf/proto"
3334
"google.golang.org/protobuf/reflect/protodesc"
@@ -415,7 +416,12 @@ func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl
415416
}
416417

417418
// Commit stream and validate.
418-
resp, err := mwClient.BatchCommit(ctx, TableParentFromStreamName(ms.StreamName()), []string{ms.StreamName()})
419+
req := &storagepb.BatchCommitWriteStreamsRequest{
420+
Parent: TableParentFromStreamName(ms.StreamName()),
421+
WriteStreams: []string{ms.StreamName()},
422+
}
423+
424+
resp, err := mwClient.BatchCommitWriteStreams(ctx, req)
419425
if err != nil {
420426
t.Errorf("client.BatchCommit: %v", err)
421427
}

0 commit comments

Comments
 (0)