Skip to content

Commit a455082

Browse files
authored
feat(bigquery/storage/managedwriter): improve protobuf support (#4589)
* feat(bigquery/storage/managedwriter): improve protobuf support After some ongoing discussions with the Storage API team, this PR improves support for proto2/proto3 syntax in protocol buffer code. * Updates testdata so that we have a proto2 and proto3 form of our SimpleMessage data. * Add reference schemas to testdata. * Updates proto conversion code in the adapt package so it's creating proto2 messages by default. The code paths for doing proto3 conversions are present, but not exported yet as the storage API doesn't properly handle proto3 expectations for conversion. Namely, conversion doesn't properly account for default values and use of wrapper types. * Adds benchmarks for dynamic schema generation and static serialization, to aid in some internal discussions.
1 parent 84f86a6 commit a455082

11 files changed

+1607
-262
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
// Copyright 2021 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package adapt_test
16+
17+
import (
18+
"fmt"
19+
"testing"
20+
"time"
21+
22+
"cloud.google.com/go/bigquery"
23+
"cloud.google.com/go/bigquery/storage/managedwriter/adapt"
24+
"cloud.google.com/go/bigquery/storage/managedwriter/testdata"
25+
26+
"google.golang.org/protobuf/proto"
27+
"google.golang.org/protobuf/reflect/protoreflect"
28+
"google.golang.org/protobuf/types/known/wrapperspb"
29+
)
30+
31+
var benchDescriptor protoreflect.Descriptor
32+
33+
func BenchmarkStorageSchemaToDescriptor(b *testing.B) {
34+
syntaxLabels := []string{"proto2", "proto3"}
35+
for _, bm := range []struct {
36+
name string
37+
in bigquery.Schema
38+
}{
39+
{
40+
name: "SingleField",
41+
in: bigquery.Schema{
42+
{Name: "field", Type: bigquery.StringFieldType},
43+
},
44+
},
45+
{
46+
name: "NestedRecord",
47+
in: bigquery.Schema{
48+
{Name: "field1", Type: bigquery.StringFieldType},
49+
{Name: "field2", Type: bigquery.IntegerFieldType},
50+
{Name: "field3", Type: bigquery.BooleanFieldType},
51+
{
52+
Name: "field4",
53+
Type: bigquery.RecordFieldType,
54+
Schema: bigquery.Schema{
55+
{Name: "recordfield1", Type: bigquery.GeographyFieldType},
56+
{Name: "recordfield2", Type: bigquery.TimestampFieldType},
57+
},
58+
},
59+
},
60+
},
61+
{
62+
name: "SimpleMessage",
63+
in: testdata.SimpleMessageSchema,
64+
},
65+
{
66+
name: "GithubArchiveSchema",
67+
in: testdata.GithubArchiveSchema,
68+
},
69+
} {
70+
for _, s := range syntaxLabels {
71+
b.Run(fmt.Sprintf("%s-%s", bm.name, s), func(b *testing.B) {
72+
convSchema, err := adapt.BQSchemaToStorageTableSchema(bm.in)
73+
if err != nil {
74+
b.Errorf("%q: schema conversion fail: %v", bm.name, err)
75+
}
76+
for n := 0; n < b.N; n++ {
77+
if s == "proto3" {
78+
benchDescriptor, err = adapt.StorageSchemaToProto3Descriptor(convSchema, "root")
79+
} else {
80+
benchDescriptor, err = adapt.StorageSchemaToProto2Descriptor(convSchema, "root")
81+
}
82+
if err != nil {
83+
b.Errorf("failed to convert %q: %v", bm.name, err)
84+
}
85+
}
86+
})
87+
}
88+
}
89+
}
90+
91+
var staticBytes []byte
92+
93+
func BenchmarkStaticProtoSerialization(b *testing.B) {
94+
for _, bm := range []struct {
95+
name string
96+
in bigquery.Schema
97+
syntax string
98+
setterF func() protoreflect.ProtoMessage
99+
}{
100+
{
101+
name: "SimpleMessageProto2",
102+
setterF: func() protoreflect.ProtoMessage {
103+
return &testdata.SimpleMessageProto2{
104+
Name: proto.String(fmt.Sprintf("test-%d", time.Now().UnixNano())),
105+
Value: proto.Int64(time.Now().UnixNano()),
106+
}
107+
},
108+
},
109+
{
110+
name: "SimpleMessageProto3",
111+
setterF: func() protoreflect.ProtoMessage {
112+
return &testdata.SimpleMessageProto3{
113+
Name: fmt.Sprintf("test-%d", time.Now().UnixNano()),
114+
Value: &wrapperspb.Int64Value{Value: time.Now().UnixNano()},
115+
}
116+
},
117+
},
118+
{
119+
name: "GithubArchiveProto2",
120+
setterF: func() protoreflect.ProtoMessage {
121+
nowNano := time.Now().UnixNano()
122+
return &testdata.GithubArchiveMessageProto2{
123+
Type: proto.String("SomeEvent"),
124+
Public: proto.Bool(nowNano%2 == 0),
125+
Payload: proto.String(fmt.Sprintf("stuff %d", nowNano)),
126+
Repo: &testdata.GithubArchiveRepoProto2{
127+
Id: proto.Int64(nowNano),
128+
Name: proto.String("staticname"),
129+
Url: proto.String(fmt.Sprintf("foo.com/%d", nowNano)),
130+
},
131+
Actor: &testdata.GithubArchiveEntityProto2{
132+
Id: proto.Int64(nowNano % 1000),
133+
Login: proto.String(fmt.Sprintf("login-%d", nowNano%1000)),
134+
GravatarId: proto.String(fmt.Sprintf("grav-%d", nowNano%1000000)),
135+
AvatarUrl: proto.String(fmt.Sprintf("https://something.com/img/%d", nowNano%10000000)),
136+
Url: proto.String(fmt.Sprintf("https://something.com/img/%d", nowNano%10000000)),
137+
},
138+
Org: &testdata.GithubArchiveEntityProto2{
139+
Id: proto.Int64(nowNano % 1000),
140+
Login: proto.String(fmt.Sprintf("login-%d", nowNano%1000)),
141+
GravatarId: proto.String(fmt.Sprintf("grav-%d", nowNano%1000000)),
142+
AvatarUrl: proto.String(fmt.Sprintf("https://something.com/img/%d", nowNano%10000000)),
143+
Url: proto.String(fmt.Sprintf("https://something.com/img/%d", nowNano%10000000)),
144+
},
145+
CreatedAt: proto.Int64(nowNano),
146+
Id: proto.String(fmt.Sprintf("id%d", nowNano)),
147+
Other: proto.String("other"),
148+
}
149+
},
150+
},
151+
{
152+
// Only set a single top-level field in a larger message.
153+
name: "GithubArchiveProto2_Sparse",
154+
setterF: func() protoreflect.ProtoMessage {
155+
nowNano := time.Now().UnixNano()
156+
return &testdata.GithubArchiveMessageProto2{
157+
Id: proto.String(fmt.Sprintf("id%d", nowNano)),
158+
}
159+
},
160+
},
161+
{
162+
name: "GithubArchiveProto3",
163+
setterF: func() protoreflect.ProtoMessage {
164+
nowNano := time.Now().UnixNano()
165+
return &testdata.GithubArchiveMessageProto3{
166+
Type: &wrapperspb.StringValue{Value: "SomeEvent"},
167+
Public: &wrapperspb.BoolValue{Value: nowNano%2 == 0},
168+
Payload: &wrapperspb.StringValue{Value: fmt.Sprintf("stuff %d", nowNano)},
169+
Repo: &testdata.GithubArchiveRepoProto3{
170+
Id: &wrapperspb.Int64Value{Value: nowNano},
171+
Name: &wrapperspb.StringValue{Value: "staticname"},
172+
Url: &wrapperspb.StringValue{Value: fmt.Sprintf("foo.com/%d", nowNano)},
173+
},
174+
Actor: &testdata.GithubArchiveEntityProto3{
175+
Id: &wrapperspb.Int64Value{Value: nowNano % 1000},
176+
Login: &wrapperspb.StringValue{Value: fmt.Sprintf("login-%d", nowNano%1000)},
177+
GravatarId: &wrapperspb.StringValue{Value: fmt.Sprintf("grav-%d", nowNano%1000000)},
178+
AvatarUrl: &wrapperspb.StringValue{Value: fmt.Sprintf("https://something.com/img/%d", nowNano%10000000)},
179+
Url: &wrapperspb.StringValue{Value: fmt.Sprintf("https://something.com/img/%d", nowNano%10000000)},
180+
},
181+
Org: &testdata.GithubArchiveEntityProto3{
182+
Id: &wrapperspb.Int64Value{Value: nowNano % 1000},
183+
Login: &wrapperspb.StringValue{Value: fmt.Sprintf("login-%d", nowNano%1000)},
184+
GravatarId: &wrapperspb.StringValue{Value: fmt.Sprintf("grav-%d", nowNano%1000000)},
185+
AvatarUrl: &wrapperspb.StringValue{Value: fmt.Sprintf("https://something.com/img/%d", nowNano%10000000)},
186+
Url: &wrapperspb.StringValue{Value: fmt.Sprintf("https://something.com/img/%d", nowNano%10000000)},
187+
},
188+
CreatedAt: &wrapperspb.Int64Value{Value: nowNano},
189+
Id: &wrapperspb.StringValue{Value: fmt.Sprintf("id%d", nowNano)},
190+
Other: &wrapperspb.StringValue{Value: "other"},
191+
}
192+
},
193+
},
194+
{
195+
// Only set a single field in a larger message.
196+
name: "GithubArchiveProto3_Sparse",
197+
setterF: func() protoreflect.ProtoMessage {
198+
nowNano := time.Now().UnixNano()
199+
return &testdata.GithubArchiveMessageProto3{
200+
Id: &wrapperspb.StringValue{Value: fmt.Sprintf("id%d", nowNano)},
201+
}
202+
},
203+
},
204+
} {
205+
b.Run(bm.name, func(b *testing.B) {
206+
var totalBytes int64
207+
for n := 0; n < b.N; n++ {
208+
m := bm.setterF()
209+
out, err := proto.Marshal(m)
210+
if err != nil {
211+
b.Errorf("%q %q: Marshal: %v", bm.name, bm.syntax, err)
212+
}
213+
totalBytes = totalBytes + int64(len(out))
214+
staticBytes = out
215+
}
216+
b.Logf("N=%d, avg bytes/message: %d", b.N, totalBytes/int64(b.N))
217+
})
218+
}
219+
}

Diff for: bigquery/storage/managedwriter/adapt/protoconversion.go

+42-21
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,25 @@ import (
2727
"google.golang.org/protobuf/types/known/wrapperspb"
2828
)
2929

30-
// bqModeToFieldLabelMap holds mapping from field schema mode to proto label.
31-
// proto3 no longer allows use of REQUIRED labels, so we solve that elsewhere
32-
// and simply use optional.
33-
var bqModeToFieldLabelMap = map[storagepb.TableFieldSchema_Mode]descriptorpb.FieldDescriptorProto_Label{
30+
var bqModeToFieldLabelMapProto2 = map[storagepb.TableFieldSchema_Mode]descriptorpb.FieldDescriptorProto_Label{
31+
storagepb.TableFieldSchema_NULLABLE: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL,
32+
storagepb.TableFieldSchema_REPEATED: descriptorpb.FieldDescriptorProto_LABEL_REPEATED,
33+
storagepb.TableFieldSchema_REQUIRED: descriptorpb.FieldDescriptorProto_LABEL_REQUIRED,
34+
}
35+
36+
var bqModeToFieldLabelMapProto3 = map[storagepb.TableFieldSchema_Mode]descriptorpb.FieldDescriptorProto_Label{
3437
storagepb.TableFieldSchema_NULLABLE: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL,
3538
storagepb.TableFieldSchema_REPEATED: descriptorpb.FieldDescriptorProto_LABEL_REPEATED,
3639
storagepb.TableFieldSchema_REQUIRED: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL,
3740
}
3841

42+
func convertModeToLabel(mode storagepb.TableFieldSchema_Mode, useProto3 bool) *descriptorpb.FieldDescriptorProto_Label {
43+
if useProto3 {
44+
return bqModeToFieldLabelMapProto3[mode].Enum()
45+
}
46+
return bqModeToFieldLabelMapProto2[mode].Enum()
47+
}
48+
3949
// Allows conversion between BQ schema type and FieldDescriptorProto's type.
4050
var bqTypeToFieldTypeMap = map[storagepb.TableFieldSchema_Type]descriptorpb.FieldDescriptorProto_Type{
4151
storagepb.TableFieldSchema_BIGNUMERIC: descriptorpb.FieldDescriptorProto_TYPE_BYTES,
@@ -106,15 +116,24 @@ func (dm dependencyCache) add(schema *storagepb.TableSchema, descriptor protoref
106116
return nil
107117
}
108118

109-
// StorageSchemaToDescriptor builds a protoreflect.Descriptor for a given table schema.
110-
func StorageSchemaToDescriptor(inSchema *storagepb.TableSchema, scope string) (protoreflect.Descriptor, error) {
119+
// StorageSchemaToProto2Descriptor builds a protoreflect.Descriptor for a given table schema using proto2 syntax.
120+
func StorageSchemaToProto2Descriptor(inSchema *storagepb.TableSchema, scope string) (protoreflect.Descriptor, error) {
111121
dc := make(dependencyCache)
112122
// TODO: b/193064992 tracks support for wrapper types. In the interim, disable wrapper usage.
113123
return storageSchemaToDescriptorInternal(inSchema, scope, &dc, false)
114124
}
115125

126+
// StorageSchemaToProto3Descriptor builds a protoreflect.Descriptor for a given table schema using proto3 syntax.
127+
//
128+
// NOTE: Currently the write API doesn't yet support proto3 behaviors (default value, wrapper types, etc), but this is provided for
129+
// completeness.
130+
func StorageSchemaToProto3Descriptor(inSchema *storagepb.TableSchema, scope string) (protoreflect.Descriptor, error) {
131+
dc := make(dependencyCache)
132+
return storageSchemaToDescriptorInternal(inSchema, scope, &dc, true)
133+
}
134+
116135
// internal implementation of the conversion code.
117-
func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope string, cache *dependencyCache, allowWrapperTypes bool) (protoreflect.Descriptor, error) {
136+
func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope string, cache *dependencyCache, useProto3 bool) (protoreflect.Descriptor, error) {
118137
if inSchema == nil {
119138
return nil, newConversionError(scope, fmt.Errorf("no input schema was provided"))
120139
}
@@ -145,7 +164,7 @@ func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope st
145164
deps = append(deps, foundDesc.ParentFile())
146165
}
147166
// construct field descriptor for the message
148-
fdp, err := tableFieldSchemaToFieldDescriptorProto(f, fNumber, string(foundDesc.FullName()), allowWrapperTypes)
167+
fdp, err := tableFieldSchemaToFieldDescriptorProto(f, fNumber, string(foundDesc.FullName()), useProto3)
149168
if err != nil {
150169
return nil, newConversionError(scope, fmt.Errorf("couldn't convert field to FieldDescriptorProto: %v", err))
151170
}
@@ -155,7 +174,7 @@ func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope st
155174
ts := &storagepb.TableSchema{
156175
Fields: f.GetFields(),
157176
}
158-
desc, err := storageSchemaToDescriptorInternal(ts, currentScope, cache, allowWrapperTypes)
177+
desc, err := storageSchemaToDescriptorInternal(ts, currentScope, cache, useProto3)
159178
if err != nil {
160179
return nil, newConversionError(currentScope, fmt.Errorf("couldn't convert message: %v", err))
161180
}
@@ -166,14 +185,14 @@ func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope st
166185
if err != nil {
167186
return nil, newConversionError(currentScope, fmt.Errorf("failed to add descriptor to dependency cache: %v", err))
168187
}
169-
fdp, err := tableFieldSchemaToFieldDescriptorProto(f, fNumber, currentScope, allowWrapperTypes)
188+
fdp, err := tableFieldSchemaToFieldDescriptorProto(f, fNumber, currentScope, useProto3)
170189
if err != nil {
171190
return nil, newConversionError(currentScope, fmt.Errorf("couldn't compute field schema : %v", err))
172191
}
173192
fields = append(fields, fdp)
174193
}
175194
} else {
176-
fd, err := tableFieldSchemaToFieldDescriptorProto(f, fNumber, currentScope, allowWrapperTypes)
195+
fd, err := tableFieldSchemaToFieldDescriptorProto(f, fNumber, currentScope, useProto3)
177196
if err != nil {
178197
return nil, newConversionError(currentScope, err)
179198
}
@@ -201,6 +220,9 @@ func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope st
201220
Syntax: proto.String("proto3"),
202221
Dependency: depNames,
203222
}
223+
if !useProto3 {
224+
fdp.Syntax = proto.String("proto2")
225+
}
204226

205227
// We'll need a FileDescriptorSet as we have a FileDescriptorProto for the current
206228
// descriptor we're building, but we need to include all the referenced dependencies.
@@ -223,33 +245,32 @@ func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope st
223245
}
224246

225247
// tableFieldSchemaToFieldDescriptorProto builds individual field descriptors for a proto message.
226-
// We're using proto3 syntax, but BigQuery supports the notion of NULLs which conflicts with proto3 default value
227-
// behavior. To enable it, we look for nullable fields in the schema that should be scalars, and use the
228-
// well-known wrapper types.
248+
//
249+
// For proto3, in cases where the mode is nullable, we use the well-known wrapper types.
250+
// For proto2, we propagate the mode->label annotation as expected.
229251
//
230252
// Messages are always nullable, and repeated fields are as well.
231-
func tableFieldSchemaToFieldDescriptorProto(field *storagepb.TableFieldSchema, idx int32, scope string, allowWrapperTypes bool) (*descriptorpb.FieldDescriptorProto, error) {
253+
func tableFieldSchemaToFieldDescriptorProto(field *storagepb.TableFieldSchema, idx int32, scope string, useProto3 bool) (*descriptorpb.FieldDescriptorProto, error) {
232254
name := strings.ToLower(field.GetName())
233255
if field.GetType() == storagepb.TableFieldSchema_STRUCT {
234256
return &descriptorpb.FieldDescriptorProto{
235257
Name: proto.String(name),
236258
Number: proto.Int32(idx),
237259
TypeName: proto.String(scope),
238-
Label: bqModeToFieldLabelMap[field.GetMode()].Enum(),
260+
Label: convertModeToLabel(field.GetMode(), useProto3),
239261
}, nil
240262
}
241263

242-
// For (REQUIRED||REPEATED) fields, we use the expected scalar types, but the proto is
243-
// still marked OPTIONAL (proto3 semantics).
244-
if field.GetMode() != storagepb.TableFieldSchema_NULLABLE || !allowWrapperTypes {
264+
// For (REQUIRED||REPEATED) fields for proto3, or all cases for proto2, we can use the expected scalar types.
265+
if field.GetMode() != storagepb.TableFieldSchema_NULLABLE || !useProto3 {
245266
return &descriptorpb.FieldDescriptorProto{
246267
Name: proto.String(name),
247268
Number: proto.Int32(idx),
248269
Type: bqTypeToFieldTypeMap[field.GetType()].Enum(),
249-
Label: bqModeToFieldLabelMap[field.GetMode()].Enum(),
270+
Label: convertModeToLabel(field.GetMode(), useProto3),
250271
}, nil
251272
}
252-
// For NULLABLE, optionally use wrapper types.
273+
// For NULLABLE proto3 fields, use a wrapper type.
253274
return &descriptorpb.FieldDescriptorProto{
254275
Name: proto.String(name),
255276
Number: proto.Int32(idx),

0 commit comments

Comments
 (0)