Skip to content

Commit

Permalink
feat(bigquery/storage/managedwriter/adapt): add RANGE support to adapt (
Browse files Browse the repository at this point in the history
#9836)

This PR adds support for schema conversions and proto conversions to the adapt subpackage. For proto conversion, we build a static name for range message types rather than using the normal first-encountered-message naming, as the schema for a range field will always be consistent for a given element type.

This PR also moves adapt to using subtests for table-driven testing to make it easier to isolate specific test cases.

Towards: https://togithub.com/googleapis/google-cloud-go/issues/9017
  • Loading branch information
shollyman committed Apr 24, 2024
1 parent bd18728 commit ae25253
Show file tree
Hide file tree
Showing 4 changed files with 449 additions and 80 deletions.
137 changes: 123 additions & 14 deletions bigquery/storage/managedwriter/adapt/protoconversion.go
Expand Up @@ -62,6 +62,13 @@ var bqTypeToFieldTypeMap = map[storagepb.TableFieldSchema_Type]descriptorpb.Fiel
storagepb.TableFieldSchema_STRUCT: descriptorpb.FieldDescriptorProto_TYPE_MESSAGE,
storagepb.TableFieldSchema_TIME: descriptorpb.FieldDescriptorProto_TYPE_INT64,
storagepb.TableFieldSchema_TIMESTAMP: descriptorpb.FieldDescriptorProto_TYPE_INT64,
storagepb.TableFieldSchema_RANGE: descriptorpb.FieldDescriptorProto_TYPE_MESSAGE,
}

// allowedRangeTypes lists the element types accepted when building a RANGE
// message; any other element type is rejected with an error.
var allowedRangeTypes = []storagepb.TableFieldSchema_Type{
storagepb.TableFieldSchema_DATE,
storagepb.TableFieldSchema_DATETIME,
storagepb.TableFieldSchema_TIMESTAMP,
}

// Primitive types which can leverage packed encoding when repeated/arrays.
Expand Down Expand Up @@ -105,12 +112,26 @@ var bqTypeToWrapperMap = map[storagepb.TableFieldSchema_Type]string{
// filename used by well known types proto
var wellKnownTypesWrapperName = "google/protobuf/wrappers.proto"

// rangeTypesPrefix is prepended to the lowercased element type name to form the
// message type name used for RANGE fields.
var rangeTypesPrefix = "rangemessage_range_"

// dependencyCache is used to reduce the number of unique messages we generate by caching based on the tableschema.
//
// Keys are based on the base64-encoded serialized tableschema value.
type dependencyCache map[string]protoreflect.MessageDescriptor
type dependencyCache struct {
// keyed by element type
rangeTypes map[storagepb.TableFieldSchema_Type]protoreflect.MessageDescriptor
// general cache
msgs map[string]protoreflect.MessageDescriptor
}

// newDependencyCache returns an empty cache with both of its maps allocated,
// so it can be written to immediately.
func newDependencyCache() *dependencyCache {
	dc := new(dependencyCache)
	dc.rangeTypes = map[storagepb.TableFieldSchema_Type]protoreflect.MessageDescriptor{}
	dc.msgs = map[string]protoreflect.MessageDescriptor{}
	return dc
}

func (dm dependencyCache) get(schema *storagepb.TableSchema) protoreflect.MessageDescriptor {
func (dm *dependencyCache) get(schema *storagepb.TableSchema) protoreflect.MessageDescriptor {
if dm == nil {
return nil
}
Expand All @@ -119,15 +140,23 @@ func (dm dependencyCache) get(schema *storagepb.TableSchema) protoreflect.Messag
return nil
}
encoded := base64.StdEncoding.EncodeToString(b)
if desc, ok := (dm)[encoded]; ok {
if desc, ok := dm.msgs[encoded]; ok {
return desc
}
return nil
}

func (dm dependencyCache) getFileDescriptorProtos() []*descriptorpb.FileDescriptorProto {
func (dm *dependencyCache) getFileDescriptorProtos() []*descriptorpb.FileDescriptorProto {
var fdpList []*descriptorpb.FileDescriptorProto
for _, d := range dm {
// emit encountered messages.
for _, d := range dm.msgs {
if fd := d.ParentFile(); fd != nil {
fdp := protodesc.ToFileDescriptorProto(fd)
fdpList = append(fdpList, fdp)
}
}
// emit any range value types used.
for _, d := range dm.rangeTypes {
if fd := d.ParentFile(); fd != nil {
fdp := protodesc.ToFileDescriptorProto(fd)
fdpList = append(fdpList, fdp)
Expand All @@ -136,7 +165,7 @@ func (dm dependencyCache) getFileDescriptorProtos() []*descriptorpb.FileDescript
return fdpList
}

func (dm dependencyCache) add(schema *storagepb.TableSchema, descriptor protoreflect.MessageDescriptor) error {
func (dm *dependencyCache) add(schema *storagepb.TableSchema, descriptor protoreflect.MessageDescriptor) error {
if dm == nil {
return fmt.Errorf("cache is nil")
}
Expand All @@ -145,24 +174,72 @@ func (dm dependencyCache) add(schema *storagepb.TableSchema, descriptor protoref
return fmt.Errorf("failed to serialize tableschema: %w", err)
}
encoded := base64.StdEncoding.EncodeToString(b)
(dm)[encoded] = descriptor
dm.msgs[encoded] = descriptor
return nil
}

// addRangeByElementType returns the message descriptor used to represent a RANGE
// whose elements are of the given type, building and caching it on first use.
//
// The generated message has two NULLABLE fields, "start" and "end", both of the
// element type. Its name is rangeTypesPrefix followed by the lowercased element
// type name, so every range with the same element type shares one cached message.
//
// An error is returned if the cache is nil, if typ is not listed in
// allowedRangeTypes, or if descriptor generation fails.
func (dm *dependencyCache) addRangeByElementType(typ storagepb.TableFieldSchema_Type, useProto3 bool) (protoreflect.MessageDescriptor, error) {
	// Mirror the nil handling of add(): report an error rather than panic.
	if dm == nil {
		return nil, fmt.Errorf("cache is nil")
	}
	if md, present := dm.rangeTypes[typ]; present {
		// already added, do nothing.
		return md, nil
	}
	// Not yet present. Validate the element type before building the message.
	allowed := false
	for _, a := range allowedRangeTypes {
		if typ == a {
			allowed = true
			break
		}
	}
	if !allowed {
		return nil, fmt.Errorf("range does not support %q as a valid element type", typ.String())
	}
	ts := &storagepb.TableSchema{
		Fields: []*storagepb.TableFieldSchema{
			{
				Name: "start",
				Type: typ,
				Mode: storagepb.TableFieldSchema_NULLABLE,
			},
			{
				Name: "end",
				Type: typ,
				Mode: storagepb.TableFieldSchema_NULLABLE,
			},
		},
	}
	// we put the range types outside the hierarchical namespace as they're effectively BQ-specific well-known types.
	msgTypeName := fmt.Sprintf("%s%s", rangeTypesPrefix, strings.ToLower(typ.String()))
	// use a new dependency cache, as we don't want to taint the main one due to matching schema
	md, err := storageSchemaToDescriptorInternal(ts, msgTypeName, newDependencyCache(), useProto3)
	if err != nil {
		// %w keeps the underlying error available to errors.Is / errors.As.
		return nil, fmt.Errorf("failed to generate range descriptor %q: %w", msgTypeName, err)
	}
	// Guard against a zero-value cache whose map was never allocated.
	if dm.rangeTypes == nil {
		dm.rangeTypes = make(map[storagepb.TableFieldSchema_Type]protoreflect.MessageDescriptor)
	}
	dm.rangeTypes[typ] = md
	return md, nil
}

// getRange returns the cached range message descriptor for the given element
// type. It returns nil when the cache itself is nil or when no descriptor has
// been added for that element type.
func (dm *dependencyCache) getRange(typ storagepb.TableFieldSchema_Type) protoreflect.MessageDescriptor {
	// Mirror the nil handling of get(): a nil cache simply has no entries.
	if dm == nil {
		return nil
	}
	if md, ok := dm.rangeTypes[typ]; ok {
		return md
	}
	return nil
}

// StorageSchemaToProto2Descriptor builds a protoreflect.Descriptor for a given table schema using proto2 syntax.
func StorageSchemaToProto2Descriptor(inSchema *storagepb.TableSchema, scope string) (protoreflect.Descriptor, error) {
dc := make(dependencyCache)
dc := newDependencyCache()
// TODO: b/193064992 tracks support for wrapper types. In the interim, disable wrapper usage.
return storageSchemaToDescriptorInternal(inSchema, scope, &dc, false)
return storageSchemaToDescriptorInternal(inSchema, scope, dc, false)
}

// StorageSchemaToProto3Descriptor builds a protoreflect.Descriptor for a given table schema using proto3 syntax.
//
// NOTE: Currently the write API doesn't yet support proto3 behaviors (default value, wrapper types, etc), but this is provided for
// completeness.
func StorageSchemaToProto3Descriptor(inSchema *storagepb.TableSchema, scope string) (protoreflect.Descriptor, error) {
dc := make(dependencyCache)
return storageSchemaToDescriptorInternal(inSchema, scope, &dc, true)
dc := newDependencyCache()
return storageSchemaToDescriptorInternal(inSchema, scope, dc, true)
}

// Internal implementation of the conversion code.
Expand All @@ -178,10 +255,11 @@ func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope st
for _, f := range inSchema.GetFields() {
fNumber = fNumber + 1
currentScope := fmt.Sprintf("%s__%s", scope, f.GetName())
// If we're dealing with a STRUCT type, we must deal with sub messages.
// As multiple submessages may share the same type definition, we use a dependency cache
// and interrogate it / populate it as we're going.

if f.Type == storagepb.TableFieldSchema_STRUCT {
// If we're dealing with a STRUCT type, we must deal with sub messages.
// As multiple submessages may share the same type definition, we use a dependency cache
// and interrogate it / populate it as we're going.
foundDesc := cache.get(&storagepb.TableSchema{Fields: f.GetFields()})
if foundDesc != nil {
// check to see if we already have this in current dependency list
Expand Down Expand Up @@ -225,6 +303,30 @@ func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope st
fields = append(fields, fdp)
}
} else {
if f.Type == storagepb.TableFieldSchema_RANGE {
// Range handling is a special case of general struct handling.
ret := f.GetRangeElementType()
if ret == nil {
return nil, fmt.Errorf("field %q is a RANGE, but doesn't include RangeElementType info", f.GetName())
}
foundDesc, err := cache.addRangeByElementType(ret.GetType(), useProto3)
if err != nil {
return nil, err
}
if foundDesc != nil {
haveDep := false
for _, dep := range deps {
if messageDependsOnFile(foundDesc, dep) {
haveDep = true
break
}
}
// If dep is missing, add to current dependencies.
if !haveDep {
deps = append(deps, foundDesc.ParentFile())
}
}
}
fd, err := tableFieldSchemaToFieldDescriptorProto(f, fNumber, currentScope, useProto3)
if err != nil {
return nil, newConversionError(currentScope, err)
Expand Down Expand Up @@ -318,6 +420,13 @@ func tableFieldSchemaToFieldDescriptorProto(field *storagepb.TableFieldSchema, i
TypeName: proto.String(scope),
Label: convertModeToLabel(field.GetMode(), useProto3),
}
} else if field.GetType() == storagepb.TableFieldSchema_RANGE {
fdp = &descriptorpb.FieldDescriptorProto{
Name: proto.String(name),
Number: proto.Int32(idx),
TypeName: proto.String(fmt.Sprintf("%s%s", rangeTypesPrefix, strings.ToLower(field.GetRangeElementType().GetType().String()))),
Label: convertModeToLabel(field.GetMode(), useProto3),
}
} else {
// For (REQUIRED||REPEATED) fields for proto3, or all cases for proto2, we can use the expected scalar types.
if field.GetMode() != storagepb.TableFieldSchema_NULLABLE || !useProto3 {
Expand Down

0 comments on commit ae25253

Please sign in to comment.