Skip to content

Commit 4f7193b

Browse files
authored
feat(bigquery/storage/managedwriter): add base client (#4422)
* feat(bigquery/storage/managedwriter): add base client. This PR adds a base client and implements some of the surface. All the streaming client abstractions are elided and will be introduced in subsequent PRs, but this PR does include non-streaming RPC methods. Alongside the client, we introduce an option type (WriterOption) for constructing a client in a variadic fashion. The client contains an internal settings type, streamSettings, which contains fields of note for both the streaming client abstraction and its flow controller. Testing: this PR contains unit tests, but doesn't include integration tests; those will be hoisted in soon. * clarify docstring * address comment lint errors * refactor into an explicit Client and ManagedStream type * adjust NewManagedStream signature, make dest table optional * update comment
1 parent 10fd816 commit 4f7193b

File tree

5 files changed

+537
-0
lines changed

5 files changed

+537
-0
lines changed

Diff for: bigquery/storage/managedwriter/client.go

+161
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
// Copyright 2021 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package managedwriter
16+
17+
import (
18+
"context"
19+
"fmt"
20+
"runtime"
21+
"strings"
22+
23+
storage "cloud.google.com/go/bigquery/storage/apiv1beta2"
24+
"google.golang.org/api/option"
25+
storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2"
26+
)
27+
28+
// Client is a managed BigQuery Storage write client scoped to a single project.
type Client struct {
	// rawClient is the underlying generated BigQuery Storage write client
	// through which all RPCs are issued.
	rawClient *storage.BigQueryWriteClient

	// projectID is the project this client was constructed for.
	projectID string
}
33+
34+
// NewClient instantiates a new client.
35+
func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (c *Client, err error) {
36+
numConns := runtime.GOMAXPROCS(0)
37+
if numConns > 4 {
38+
numConns = 4
39+
}
40+
o := []option.ClientOption{
41+
option.WithGRPCConnectionPool(numConns),
42+
}
43+
o = append(o, opts...)
44+
45+
rawClient, err := storage.NewBigQueryWriteClient(ctx, o...)
46+
if err != nil {
47+
return nil, err
48+
}
49+
50+
return &Client{
51+
rawClient: rawClient,
52+
projectID: projectID,
53+
}, nil
54+
}
55+
56+
// NewManagedStream establishes a new managed stream for appending data into a table.
57+
func (c *Client) NewManagedStream(ctx context.Context, opts ...WriterOption) (*ManagedStream, error) {
58+
59+
ms := &ManagedStream{
60+
streamSettings: defaultStreamSettings(),
61+
c: c,
62+
}
63+
64+
// apply writer options
65+
for _, opt := range opts {
66+
opt(ms)
67+
}
68+
69+
if err := c.validateOptions(ctx, ms); err != nil {
70+
return nil, err
71+
}
72+
73+
if ms.streamSettings.streamID == "" {
74+
// not instantiated with a stream, construct one.
75+
streamName := fmt.Sprintf("%s/_default", ms.destinationTable)
76+
if ms.streamSettings.streamType != DefaultStream {
77+
// For everything but a default stream, we create a new stream on behalf of the user.
78+
req := &storagepb.CreateWriteStreamRequest{
79+
Parent: ms.destinationTable,
80+
WriteStream: &storagepb.WriteStream{
81+
Type: streamTypeToEnum(ms.streamSettings.streamType),
82+
}}
83+
resp, err := ms.c.rawClient.CreateWriteStream(ctx, req)
84+
if err != nil {
85+
return nil, fmt.Errorf("couldn't create write stream: %v", err)
86+
}
87+
streamName = resp.GetName()
88+
}
89+
ms.streamSettings.streamID = streamName
90+
// TODO(followup CLs): instantiate an appendstream client, flow controller, etc.
91+
}
92+
93+
return ms, nil
94+
}
95+
96+
// validateOptions is used to validate that we received a sane/compatible set of WriterOptions
97+
// for constructing a new managed stream.
98+
func (c *Client) validateOptions(ctx context.Context, ms *ManagedStream) error {
99+
if ms == nil {
100+
return fmt.Errorf("no managed stream definition")
101+
}
102+
if ms.streamSettings.streamID != "" {
103+
// User supplied a stream, we need to verify it exists.
104+
info, err := c.getWriteStream(ctx, ms.streamSettings.streamID)
105+
if err != nil {
106+
return fmt.Errorf("a streamname was specified, but lookup of stream failed: %v", err)
107+
}
108+
// update type and destination based on stream metadata
109+
ms.streamSettings.streamType = StreamType(info.Type.String())
110+
ms.destinationTable = tableParentFromStreamName(ms.streamSettings.streamID)
111+
}
112+
if ms.destinationTable == "" {
113+
return fmt.Errorf("no destination table specified")
114+
}
115+
// we could auto-select DEFAULT here, but let's force users to be specific for now.
116+
if ms.StreamType() == "" {
117+
return fmt.Errorf("stream type wasn't specified")
118+
}
119+
return nil
120+
}
121+
122+
// BatchCommit is used to commit one or more PendingStream streams belonging to the same table
123+
// as a single transaction. Streams must be finalized before committing.
124+
//
125+
// TODO: this currently exposes the raw proto response, but a future CL will wrap this with a nicer type.
126+
func (c *Client) BatchCommit(ctx context.Context, parentTable string, streamNames []string) (*storagepb.BatchCommitWriteStreamsResponse, error) {
127+
128+
// determine table from first streamName, as all must share the same table.
129+
if len(streamNames) <= 0 {
130+
return nil, fmt.Errorf("no streamnames provided")
131+
}
132+
133+
req := &storagepb.BatchCommitWriteStreamsRequest{
134+
Parent: tableParentFromStreamName(streamNames[0]),
135+
WriteStreams: streamNames,
136+
}
137+
return c.rawClient.BatchCommitWriteStreams(ctx, req)
138+
}
139+
140+
// getWriteStream returns information about a given write stream.
141+
//
142+
// It's primarily used for setup validation, and not exposed directly to end users.
143+
func (c *Client) getWriteStream(ctx context.Context, streamName string) (*storagepb.WriteStream, error) {
144+
req := &storagepb.GetWriteStreamRequest{
145+
Name: streamName,
146+
}
147+
return c.rawClient.GetWriteStream(ctx, req)
148+
}
149+
150+
// tableParentFromStreamName returns the corresponding parent table
// identifier given a fully qualified streamname.
func tableParentFromStreamName(streamName string) string {
	// Stream IDs have the following prefix:
	//   projects/{project}/datasets/{dataset}/tables/{table}/blah
	parts := strings.SplitN(streamName, "/", 7)
	if len(parts) == 7 {
		// Keep only the table-identifier portion (first six segments).
		return strings.Join(parts[:6], "/")
	}
	// Invalid/short input; just pass back the input unchanged.
	return streamName
}

Diff for: bigquery/storage/managedwriter/client_test.go

+48
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
// Copyright 2021 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package managedwriter
16+
17+
import "testing"
18+
19+
func TestTableParentFromStreamName(t *testing.T) {
20+
testCases := []struct {
21+
in string
22+
want string
23+
}{
24+
{
25+
"bad",
26+
"bad",
27+
},
28+
{
29+
"projects/foo/datasets/bar/tables/baz",
30+
"projects/foo/datasets/bar/tables/baz",
31+
},
32+
{
33+
"projects/foo/datasets/bar/tables/baz/zip/zam/zoomie",
34+
"projects/foo/datasets/bar/tables/baz",
35+
},
36+
{
37+
"projects/foo/datasets/bar/tables/baz/_default",
38+
"projects/foo/datasets/bar/tables/baz",
39+
},
40+
}
41+
42+
for _, tc := range testCases {
43+
got := tableParentFromStreamName(tc.in)
44+
if got != tc.want {
45+
t.Errorf("mismatch on %s: got %s want %s", tc.in, got, tc.want)
46+
}
47+
}
48+
}

Diff for: bigquery/storage/managedwriter/managed_stream.go

+142
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
// Copyright 2021 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package managedwriter
16+
17+
import (
18+
"context"
19+
20+
storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2"
21+
"google.golang.org/protobuf/types/known/wrapperspb"
22+
)
23+
24+
// StreamType indicates the type of stream this write client is managing.
// See streamTypeToEnum for the mapping onto the storage API's enum values.
type StreamType string

var (
	// DefaultStream most closely mimics the legacy bigquery
	// tabledata.insertAll semantics. Successful inserts are
	// committed immediately, and there's no tracking offsets as
	// all writes go into a "default" stream that always exists
	// for a table.
	DefaultStream StreamType = "DEFAULT"

	// CommittedStream appends data immediately, but creates a
	// discrete stream for the work so that offset tracking can
	// be used to track writes.
	CommittedStream StreamType = "COMMITTED"

	// BufferedStream is a form of checkpointed stream, that allows
	// you to advance the offset of visible rows via Flush operations.
	BufferedStream StreamType = "BUFFERED"

	// PendingStream is a stream in which no data is made visible to
	// readers until the stream is finalized and committed explicitly.
	PendingStream StreamType = "PENDING"
)
48+
49+
func streamTypeToEnum(t StreamType) storagepb.WriteStream_Type {
50+
switch t {
51+
case CommittedStream:
52+
return storagepb.WriteStream_COMMITTED
53+
case PendingStream:
54+
return storagepb.WriteStream_PENDING
55+
case BufferedStream:
56+
return storagepb.WriteStream_BUFFERED
57+
default:
58+
return storagepb.WriteStream_TYPE_UNSPECIFIED
59+
}
60+
}
61+
62+
// ManagedStream is the abstraction over a single write stream.
type ManagedStream struct {
	// streamSettings governs the stream's identity, type, and
	// flow-control limits.
	streamSettings *streamSettings

	// destinationTable is the fully qualified table identifier, of the form
	// projects/{project}/datasets/{dataset}/tables/{table}.
	destinationTable string

	// c is the parent Client used to issue RPCs on behalf of this stream.
	c *Client
}
68+
69+
// streamSettings govern behavior of the append stream RPCs.
type streamSettings struct {

	// streamID contains the reference to the destination stream.
	streamID string

	// streamType governs behavior of the client, such as how
	// offset handling is managed.
	streamType StreamType

	// MaxInflightRequests governs how many unacknowledged
	// append writes can be outstanding into the system.
	MaxInflightRequests int

	// MaxInflightBytes governs how many unacknowledged
	// request bytes can be outstanding into the system.
	MaxInflightBytes int

	// TracePrefix sets a suitable prefix for the trace ID set on
	// append requests. Useful for diagnostic purposes.
	TracePrefix string
}
91+
92+
func defaultStreamSettings() *streamSettings {
93+
return &streamSettings{
94+
streamType: DefaultStream,
95+
MaxInflightRequests: 1000,
96+
MaxInflightBytes: 0,
97+
TracePrefix: "defaultManagedWriter",
98+
}
99+
}
100+
101+
// StreamName returns the corresponding write stream ID being managed by this writer.
102+
func (ms *ManagedStream) StreamName() string {
103+
return ms.streamSettings.streamID
104+
}
105+
106+
// StreamType returns the configured type for this stream.
107+
func (ms *ManagedStream) StreamType() StreamType {
108+
return ms.streamSettings.streamType
109+
}
110+
111+
// FlushRows advances the offset at which rows in a BufferedStream are visible. Calling
112+
// this method for other stream types yields an error.
113+
func (ms *ManagedStream) FlushRows(ctx context.Context, offset int64) (int64, error) {
114+
req := &storagepb.FlushRowsRequest{
115+
WriteStream: ms.streamSettings.streamID,
116+
Offset: &wrapperspb.Int64Value{
117+
Value: offset,
118+
},
119+
}
120+
resp, err := ms.c.rawClient.FlushRows(ctx, req)
121+
if err != nil {
122+
return 0, err
123+
}
124+
return resp.GetOffset(), nil
125+
}
126+
127+
// Finalize is used to mark a stream as complete, and thus ensure no further data can
128+
// be appended to the stream. You cannot finalize a DefaultStream, as it always exists.
129+
//
130+
// Finalizing does not advance the current offset of a BufferedStream, nor does it commit
131+
// data in a PendingStream.
132+
func (ms *ManagedStream) Finalize(ctx context.Context) (int64, error) {
133+
// TODO: consider blocking for in-flight appends once we have an appendStream plumbed in.
134+
req := &storagepb.FinalizeWriteStreamRequest{
135+
Name: ms.streamSettings.streamID,
136+
}
137+
resp, err := ms.c.rawClient.FinalizeWriteStream(ctx, req)
138+
if err != nil {
139+
return 0, err
140+
}
141+
return resp.GetRowCount(), nil
142+
}

0 commit comments

Comments
 (0)