Skip to content

Commit c54aa74

Browse files
authored
feat(bigquery/storage/managedwriter/adapt): add NormalizeDescriptor (#4681)
* feat(bigquery/storage/managedwriter/adapt): add NormalizeDescriptor. This functionality supports the "bring your own proto" case for writing data. Towards: #4366
1 parent b9226eb commit c54aa74

File tree

6 files changed

+1424
-1
lines changed

6 files changed

+1424
-1
lines changed

Diff for: bigquery/storage/managedwriter/adapt/protoconversion.go

+134
Original file line number | Diff line number | Diff line change
@@ -279,3 +279,137 @@ func tableFieldSchemaToFieldDescriptorProto(field *storagepb.TableFieldSchema, i
279279
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
280280
}, nil
281281
}
282+
283+
// NormalizeDescriptor builds a self-contained DescriptorProto suitable for communicating schema
284+
// information with the BigQuery Storage write API. It's primarily used for cases where users are
285+
// interested in sending data using a predefined protocol buffer message.
286+
//
287+
// The storage API accepts a single DescriptorProto for decoding message data. In many cases, a message
288+
// is comprised of multiple independent messages, from the same .proto file or from multiple sources. Rather
289+
// than being forced to communicate all these messages independently, what this method does is rewrite the
290+
// DescriptorProto to inline all messages as nested submessages. As the backend only cares about the types
291+
// and not the namespaces when decoding, this is sufficient for the needs of the API's representation.
292+
//
293+
// In addition to nesting messages, this method also handles some encapsulation of enum types to avoid possible
294+
// conflicts due to ambiguities.
295+
func NormalizeDescriptor(in protoreflect.MessageDescriptor) (*descriptorpb.DescriptorProto, error) {
296+
return normalizeDescriptorInternal(in, newStringSet(), newStringSet(), newStringSet(), nil)
297+
}
298+
299+
func normalizeDescriptorInternal(in protoreflect.MessageDescriptor, visitedTypes, enumTypes, structTypes *stringSet, root *descriptorpb.DescriptorProto) (*descriptorpb.DescriptorProto, error) {
300+
if in == nil {
301+
return nil, fmt.Errorf("no messagedescriptor provided")
302+
}
303+
resultDP := &descriptorpb.DescriptorProto{}
304+
if root == nil {
305+
root = resultDP
306+
}
307+
fullProtoName := string(in.FullName())
308+
resultDP.Name = proto.String(normalizeName(fullProtoName))
309+
visitedTypes.add(fullProtoName)
310+
for i := 0; i < in.Fields().Len(); i++ {
311+
inField := in.Fields().Get(i)
312+
resultFDP := protodesc.ToFieldDescriptorProto(inField)
313+
if inField.Kind() == protoreflect.MessageKind || inField.Kind() == protoreflect.GroupKind {
314+
// Handle fields that reference messages.
315+
// Groups are a proto2-ism which predated nested messages.
316+
msgFullName := string(inField.Message().FullName())
317+
if !skipNormalization(msgFullName) {
318+
// for everything but well known types, normalize.
319+
normName := normalizeName(string(msgFullName))
320+
if structTypes.contains(msgFullName) {
321+
resultFDP.TypeName = proto.String(normName)
322+
} else {
323+
if visitedTypes.contains(msgFullName) {
324+
return nil, fmt.Errorf("recursize type not supported: %s", inField.FullName())
325+
}
326+
visitedTypes.add(msgFullName)
327+
dp, err := normalizeDescriptorInternal(inField.Message(), visitedTypes, enumTypes, structTypes, root)
328+
if err != nil {
329+
return nil, fmt.Errorf("error converting message %s: %v", inField.FullName(), err)
330+
}
331+
root.NestedType = append(root.NestedType, dp)
332+
visitedTypes.delete(msgFullName)
333+
lastNested := root.GetNestedType()[len(root.GetNestedType())-1].GetName()
334+
resultFDP.TypeName = proto.String(lastNested)
335+
}
336+
}
337+
}
338+
if inField.Kind() == protoreflect.EnumKind {
339+
// For enums, in order to avoid value conflict, we will always define
340+
// a enclosing struct called enum_full_name_E that includes the actual
341+
// enum.
342+
enumFullName := string(inField.Enum().FullName())
343+
enclosingTypeName := normalizeName(enumFullName) + "_E"
344+
enumName := string(inField.Enum().Name())
345+
actualFullName := fmt.Sprintf("%s.%s", enclosingTypeName, enumName)
346+
if enumTypes.contains(enumFullName) {
347+
resultFDP.TypeName = proto.String(actualFullName)
348+
} else {
349+
enumDP := protodesc.ToEnumDescriptorProto(inField.Enum())
350+
enumDP.Name = proto.String(enumName)
351+
resultDP.NestedType = append(resultDP.NestedType, &descriptorpb.DescriptorProto{
352+
Name: proto.String(enclosingTypeName),
353+
EnumType: []*descriptorpb.EnumDescriptorProto{enumDP},
354+
})
355+
resultFDP.TypeName = proto.String(actualFullName)
356+
enumTypes.add(enumFullName)
357+
}
358+
}
359+
resultDP.Field = append(resultDP.Field, resultFDP)
360+
}
361+
structTypes.add(fullProtoName)
362+
return resultDP, nil
363+
}
364+
365+
// stringSet is a minimal set of strings backed by a map.
// The zero value is an empty set that is ready to use.
type stringSet struct {
	m map[string]struct{}
}

// contains reports whether k is a member of the set.
func (s *stringSet) contains(k string) bool {
	_, ok := s.m[k]
	return ok
}

// add inserts k into the set. Adding an existing member is a no-op.
func (s *stringSet) add(k string) {
	if s.m == nil {
		// Lazily allocate so a zero-value stringSet does not panic on first write.
		s.m = make(map[string]struct{})
	}
	s.m[k] = struct{}{}
}

// delete removes k from the set. Removing a missing member is a no-op.
func (s *stringSet) delete(k string) {
	delete(s.m, k)
}

// newStringSet returns an empty, initialized stringSet.
func newStringSet() *stringSet {
	return &stringSet{
		m: make(map[string]struct{}),
	}
}
387+
388+
// normalizeName turns a fully qualified proto name into a flat identifier by
// replacing every "." with "_".
func normalizeName(in string) string {
	const (
		separator   = "."
		replacement = "_"
	)
	return strings.ReplaceAll(in, separator, replacement)
}
391+
392+
// normalizationSkipList holds the fully qualified names of message types that are
// left out of the self-contained structure, i.e. they are not normalized.
// It is currently empty; see the TODO below.
var normalizationSkipList = []string{
	/*
		TODO: when backend supports resolving well known types, this list should be enabled.
		"google.protobuf.DoubleValue",
		"google.protobuf.FloatValue",
		"google.protobuf.Int64Value",
		"google.protobuf.UInt64Value",
		"google.protobuf.Int32Value",
		"google.protobuf.UInt32Value",
		"google.protobuf.BoolValue",
		"google.protobuf.StringValue",
		"google.protobuf.BytesValue",
	*/
}

// skipNormalization reports whether fullName exactly matches an entry in
// normalizationSkipList.
func skipNormalization(fullName string) bool {
	for i := range normalizationSkipList {
		if normalizationSkipList[i] == fullName {
			return true
		}
	}
	return false
}

0 commit comments

Comments
 (0)