Skip to content

Commit

Permalink
fix(bigquery/storage/managedwriter/adapt): schema->protodescriptor (#…
Browse files Browse the repository at this point in the history
…6267)

This PR addresses a transitive dependency loss when working with nested
messages.  Previously, we only constructed a descriptor using the direct
dependencies, and this changes the implementation to use the cache of
all encountered descriptors rather than the local list of dependencies.

There's still some opportunities for improvement with regards to
proto3, in particular the wrapper types.  However, I'm deferring that
until we have that sorted out in the backend a bit better.

This also changes the testing slightly.  I've added a comment to the
test as well, but our problem is evaluating the dependencies for
correctness.  Due to the dynamic nature of how descriptors are created
we cannot use something like proto.Equal.

To address this, we make two comparisons.  In the first, we check the
descriptor directly without its dependencies.  In the second, we
call the normalizer to give us a self-contained descriptor proto, where
we can validate everything is present.  The gap here of course is that
a bug in the normalizer may mask a problem with the conversion, so
consideration is warranted when we revisit this.

Fixes: #6258

Co-authored-by: Steffany Brown <30247553+steffnay@users.noreply.github.com>
  • Loading branch information
shollyman and steffnay committed Jun 29, 2022
1 parent a216796 commit a017230
Show file tree
Hide file tree
Showing 2 changed files with 246 additions and 27 deletions.
44 changes: 29 additions & 15 deletions bigquery/storage/managedwriter/adapt/protoconversion.go
Expand Up @@ -86,9 +86,9 @@ var wellKnownTypesWrapperName = "google/protobuf/wrappers.proto"
// dependencyCache is used to reduce the number of unique messages we generate by caching based on the tableschema.
//
// keys are based on the base64-encoded serialized tableschema value.
type dependencyCache map[string]protoreflect.Descriptor
type dependencyCache map[string]protoreflect.MessageDescriptor

func (dm dependencyCache) get(schema *storagepb.TableSchema) protoreflect.Descriptor {
func (dm dependencyCache) get(schema *storagepb.TableSchema) protoreflect.MessageDescriptor {
if dm == nil {
return nil
}
Expand All @@ -103,7 +103,18 @@ func (dm dependencyCache) get(schema *storagepb.TableSchema) protoreflect.Descri
return nil
}

func (dm dependencyCache) add(schema *storagepb.TableSchema, descriptor protoreflect.Descriptor) error {
func (dm dependencyCache) getFileDescriptorProtos() []*descriptorpb.FileDescriptorProto {
var fdpList []*descriptorpb.FileDescriptorProto
for _, d := range dm {
if fd := d.ParentFile(); fd != nil {
fdp := protodesc.ToFileDescriptorProto(fd)
fdpList = append(fdpList, fdp)
}
}
return fdpList
}

func (dm dependencyCache) add(schema *storagepb.TableSchema, descriptor protoreflect.MessageDescriptor) error {
if dm == nil {
return fmt.Errorf("cache is nil")
}
Expand Down Expand Up @@ -133,7 +144,7 @@ func StorageSchemaToProto3Descriptor(inSchema *storagepb.TableSchema, scope stri
}

// internal implementation of the conversion code.
func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope string, cache *dependencyCache, useProto3 bool) (protoreflect.Descriptor, error) {
func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope string, cache *dependencyCache, useProto3 bool) (protoreflect.MessageDescriptor, error) {
if inSchema == nil {
return nil, newConversionError(scope, fmt.Errorf("no input schema was provided"))
}
Expand Down Expand Up @@ -206,9 +217,7 @@ func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope st
}

// Use the local dependencies to generate a list of filenames.
depNames := []string{
wellKnownTypesWrapperName,
}
depNames := []string{wellKnownTypesWrapperName}
for _, d := range deps {
depNames = append(depNames, d.ParentFile().Path())
}
Expand All @@ -226,22 +235,27 @@ func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope st

// We'll need a FileDescriptorSet as we have a FileDescriptorProto for the current
// descriptor we're building, but we need to include all the referenced dependencies.
fds := &descriptorpb.FileDescriptorSet{
File: []*descriptorpb.FileDescriptorProto{
fdp,
protodesc.ToFileDescriptorProto(wrapperspb.File_google_protobuf_wrappers_proto),
},

fdpList := []*descriptorpb.FileDescriptorProto{
fdp,
protodesc.ToFileDescriptorProto(wrapperspb.File_google_protobuf_wrappers_proto),
}
for _, d := range deps {
fds.File = append(fds.File, protodesc.ToFileDescriptorProto(d))
fdpList = append(fdpList, cache.getFileDescriptorProtos()...)

fds := &descriptorpb.FileDescriptorSet{
File: fdpList,
}

// Load the set into a registry, then interrogate it for the descriptor corresponding to the top level message.
files, err := protodesc.NewFiles(fds)
if err != nil {
return nil, err
}
return files.FindDescriptorByName(protoreflect.FullName(scope))
found, err := files.FindDescriptorByName(protoreflect.FullName(scope))
if err != nil {
return nil, err
}
return found.(protoreflect.MessageDescriptor), nil
}

// tableFieldSchemaToFieldDescriptorProto builds individual field descriptors for a proto message.
Expand Down
229 changes: 217 additions & 12 deletions bigquery/storage/managedwriter/adapt/protoconversion_test.go
Expand Up @@ -31,12 +31,28 @@ import (
"google.golang.org/protobuf/types/dynamicpb"
)

// TestSchemaToProtoConversion validates behavior around converting table schemas to
// a descriptor. The challenges here are that we use dynamic proto registries to
// do this work, which means that we're unable to do things like proto.Equal comparisons
// between MessageDescriptors directly.
//
// Instead, we compare to two forms of the message in DescriptorProto form: In the first,
// we ensure the structure of the outer message is as expected. In the second, we compare
// to the normalized form of the DescriptorProto as that encapsulates all the dependencies
// within the NestedTypes definition.
func TestSchemaToProtoConversion(t *testing.T) {
testCases := []struct {
description string
bq *storagepb.TableSchema
wantProto2 *descriptorpb.DescriptorProto
wantProto3 *descriptorpb.DescriptorProto
// The un-normalized descriptor (sans dependencies)
wantProto2 *descriptorpb.DescriptorProto
// Normalized descriptor (all dependencies nested)
wantProto2Normalized *descriptorpb.DescriptorProto

// The un-normalized descriptor (sans dependencies)
wantProto3 *descriptorpb.DescriptorProto
// Normalized descriptor
wantProto3Normalized *descriptorpb.DescriptorProto
}{
{
description: "basic",
Expand All @@ -58,6 +74,18 @@ func TestSchemaToProtoConversion(t *testing.T) {
{Name: proto.String("baz"), Number: proto.Int32(3), Type: descriptorpb.FieldDescriptorProto_TYPE_BYTES.Enum(), Label: descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()},
},
},
wantProto2Normalized: &descriptorpb.DescriptorProto{
Name: proto.String("root"),
Field: []*descriptorpb.FieldDescriptorProto{
{
Name: proto.String("foo"),
Number: proto.Int32(1),
Type: descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()},
{Name: proto.String("bar"), Number: proto.Int32(2), Type: descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum(), Label: descriptorpb.FieldDescriptorProto_LABEL_REQUIRED.Enum()},
{Name: proto.String("baz"), Number: proto.Int32(3), Type: descriptorpb.FieldDescriptorProto_TYPE_BYTES.Enum(), Label: descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()},
},
},
wantProto3: &descriptorpb.DescriptorProto{
Name: proto.String("root"),
Field: []*descriptorpb.FieldDescriptorProto{
Expand Down Expand Up @@ -107,6 +135,43 @@ func TestSchemaToProtoConversion(t *testing.T) {
},
},
},
wantProto2Normalized: &descriptorpb.DescriptorProto{
Name: proto.String("root"),
Field: []*descriptorpb.FieldDescriptorProto{
{
Name: proto.String("curdate"),
Number: proto.Int32(1),
Type: descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
{
Name: proto.String("rec"),
Number: proto.Int32(2),
Type: descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum(),
TypeName: proto.String("root__rec"),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
},
NestedType: []*descriptorpb.DescriptorProto{
{
Name: proto.String("root__rec"),
Field: []*descriptorpb.FieldDescriptorProto{
{
Name: proto.String("userid"),
Number: proto.Int32(1),
Type: descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_REQUIRED.Enum(),
},
{
Name: proto.String("location"),
Number: proto.Int32(2),
Type: descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
},
},
},
},
wantProto3: &descriptorpb.DescriptorProto{
Name: proto.String("root"),
Field: []*descriptorpb.FieldDescriptorProto{
Expand Down Expand Up @@ -205,37 +270,177 @@ func TestSchemaToProtoConversion(t *testing.T) {
},
},
},
{
description: "multiple nesting levels",
bq: &storagepb.TableSchema{
Fields: []*storagepb.TableFieldSchema{
{
Name: "outer_struct",
Type: storagepb.TableFieldSchema_STRUCT,
Mode: storagepb.TableFieldSchema_NULLABLE,
Fields: []*storagepb.TableFieldSchema{
{
Name: "inner_struct",
Type: storagepb.TableFieldSchema_STRUCT,
Mode: storagepb.TableFieldSchema_NULLABLE,
Fields: []*storagepb.TableFieldSchema{
{Name: "leaf_one", Type: storagepb.TableFieldSchema_INT64, Mode: storagepb.TableFieldSchema_NULLABLE},
{Name: "leaf_two", Type: storagepb.TableFieldSchema_INT64, Mode: storagepb.TableFieldSchema_NULLABLE},
},
},
},
},
{
Name: "other_field",
Type: storagepb.TableFieldSchema_INT64,
Mode: storagepb.TableFieldSchema_NULLABLE,
},
},
},
wantProto2: &descriptorpb.DescriptorProto{
Name: proto.String("root"),
Field: []*descriptorpb.FieldDescriptorProto{
{
Name: proto.String("outer_struct"),
Number: proto.Int32(1),
Type: descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum(),
TypeName: proto.String(".root__outer_struct"),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
{
Name: proto.String("other_field"),
Number: proto.Int32(2),
Type: descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
},
},
wantProto2Normalized: &descriptorpb.DescriptorProto{
Name: proto.String("root"),
Field: []*descriptorpb.FieldDescriptorProto{
{
Name: proto.String("outer_struct"),
Number: proto.Int32(1),
Type: descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum(),
TypeName: proto.String("root__outer_struct"),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
{
Name: proto.String("other_field"),
Number: proto.Int32(2),
Type: descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
},
NestedType: []*descriptorpb.DescriptorProto{
{
Name: proto.String("root__outer_struct__inner_struct"),
Field: []*descriptorpb.FieldDescriptorProto{
{
Name: proto.String("leaf_one"),
Number: proto.Int32(1),
Type: descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
{
Name: proto.String("leaf_two"),
Number: proto.Int32(2),
Type: descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
},
},
{
Name: proto.String("root__outer_struct"),
Field: []*descriptorpb.FieldDescriptorProto{
{
Name: proto.String("inner_struct"),
Number: proto.Int32(1),
Type: descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum(),
TypeName: proto.String("root__outer_struct__inner_struct"),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
},
},
},
},
wantProto3: &descriptorpb.DescriptorProto{
Name: proto.String("root"),
Field: []*descriptorpb.FieldDescriptorProto{
{
Name: proto.String("outer_struct"),
Number: proto.Int32(1),
Type: descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum(),
TypeName: proto.String(".root__outer_struct"),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
{
Name: proto.String("other_field"),
Number: proto.Int32(2),
Type: descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum(),
TypeName: proto.String(".google.protobuf.Int64Value"),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
},
},
},
}
for _, tc := range testCases {
// Proto2
p2d, err := StorageSchemaToProto2Descriptor(tc.bq, "root")
if err != nil {
t.Errorf("case (%s) failed proto2 conversion: %v", tc.description, err)
t.Fatalf("case (%s) failed proto2 conversion: %v", tc.description, err)
}

// convert it to DP form
// Convert to MessageDescriptor.
mDesc, ok := p2d.(protoreflect.MessageDescriptor)
if !ok {
t.Errorf("%s: couldn't convert proto2 to messagedescriptor", tc.description)
}
gotDP := protodesc.ToDescriptorProto(mDesc)
if diff := cmp.Diff(gotDP, tc.wantProto2, protocmp.Transform()); diff != "" {
t.Errorf("%s proto2: -got, +want:\n%s", tc.description, diff)
// Check the non-normalized case.
if tc.wantProto2 != nil {
gotDP := protodesc.ToDescriptorProto(mDesc)
if diff := cmp.Diff(gotDP, tc.wantProto2, protocmp.Transform()); diff != "" {
t.Errorf("%s proto2: -got, +want:\n%s", tc.description, diff)
}
}
// Check the normalized case.
if tc.wantProto2Normalized != nil {
gotDP, err := NormalizeDescriptor(mDesc)
if err != nil {
t.Errorf("failed to normalize: %v", err)
}
if diff := cmp.Diff(gotDP, tc.wantProto2Normalized, protocmp.Transform()); diff != "" {
t.Errorf("%s proto2normalized: -got, +want:\n%s", tc.description, diff)
}
}

p3d, err := StorageSchemaToProto3Descriptor(tc.bq, "root")
if err != nil {
t.Fatalf("case (%s) failed proto3 conversion: %v", tc.description, err)
}

// Convert to MessageDescriptor.
mDesc, ok = p3d.(protoreflect.MessageDescriptor)
if !ok {
t.Errorf("%s: couldn't convert proto3 to messagedescriptor", tc.description)
}
gotDP = protodesc.ToDescriptorProto(mDesc)
if diff := cmp.Diff(gotDP, tc.wantProto3, protocmp.Transform()); diff != "" {
t.Errorf("%s proto3: -got, +want:\n%s", tc.description, diff)
// Check the non-normalized case.
if tc.wantProto3 != nil {
gotDP := protodesc.ToDescriptorProto(mDesc)
if diff := cmp.Diff(gotDP, tc.wantProto3, protocmp.Transform()); diff != "" {
t.Errorf("%s proto2: -got, +want:\n%s", tc.description, diff)
}
}
// Check the normalized case.
if tc.wantProto3Normalized != nil {
gotDP, err := NormalizeDescriptor(mDesc)
if err != nil {
t.Errorf("failed to normalize: %v", err)
}
if diff := cmp.Diff(gotDP, tc.wantProto3Normalized, protocmp.Transform()); diff != "" {
t.Errorf("%s proto3normalized: -got, +want:\n%s", tc.description, diff)
}
}

}
}

Expand Down

0 comments on commit a017230

Please sign in to comment.