Skip to content

Commit

Permalink
Adds module to load/dump self-describing protocol buffers (#270)
Browse files Browse the repository at this point in the history
  • Loading branch information
popematt committed Jul 5, 2023
1 parent 35f07ca commit 02d3b8a
Show file tree
Hide file tree
Showing 5 changed files with 267 additions and 0 deletions.
100 changes: 100 additions & 0 deletions amazon/ionbenchmark/proto_tools.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
#!/usr/bin/env python3
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

"""
A collection of tools to help with protocol buffers for use in the Ion Benchmark CLI.
Usage:
proto_tools.py <command> [<args>...]
proto_tools.py --help
Commands:{commands}
Options:
-h, --help
"""

import types
from docopt import docopt as _docopt
from google.protobuf import descriptor_pb2 as _descriptor_pb2
from self_describing_proto import SelfDescribingProtoSerde


def wrap_command():
    """
    Wrap a protocol buffer in a SelfDescribingMessage protocol buffer.

    Usage:
        proto_tools.py wrap <schema_descriptor_file> <type_name> <input_file> [<output_file>]

    Arguments:
        <schema_descriptor_file>    A Protobuf FileDescriptorSet, generated using `protoc --descriptor_set_out ...`.
        <type_name>                 The type of <input_file>. Must be included in <schema_descriptor_file>.
        <input_file>                A protocol buffer to be wrapped in a self-describing protocol buffer.
        <output_file>               Where the wrapped protocol buffer should be saved. Default: `<input_file>.wrapped`
    """
    # NOTE: the docstring above is not just documentation. docopt parses it to build the CLI for this sub-command,
    # and _list_commands() uses its first non-blank line as the command summary.
    cli = _docopt(wrap_command.__doc__, help=True)
    source_path = cli['<input_file>']
    target_path = cli['<output_file>'] or "{}.wrapped".format(source_path)

    # Load the schema (a serialized FileDescriptorSet) that describes the type of the input message.
    schema = _descriptor_pb2.FileDescriptorSet()
    with open(cli['<schema_descriptor_file>'], "rb") as schema_stream:
        schema.ParseFromString(schema_stream.read())

    # Build a message class for the requested type from the schema and create an empty instance of it.
    serde = SelfDescribingProtoSerde()
    message_class = serde.generate_class_definition(cli['<type_name>'], schema)
    payload = message_class()

    # Parse the raw input into the dynamic message, then write it back out inside the self-describing wrapper.
    with open(source_path, "rb") as source_stream:
        payload.ParseFromString(source_stream.read())
    with open(target_path, "wb") as target_stream:
        serde.dump(payload, target_stream)


def unwrap_command():
    """
    Unwrap the inner protocol buffer of a SelfDescribingMessage protocol buffer.

    Usage:
        proto_tools.py unwrap <input_file> [<output_file>]

    Arguments:
        <input_file>     The file to unwrap
        <output_file>    The destination for the unwrapped file. Default: `<input_file>.unwrapped`
    """
    # NOTE: the docstring above is parsed by docopt to build the CLI for this sub-command, and _list_commands()
    # uses its first non-blank line as the command summary.
    #
    # Parse against THIS command's usage text. Parsing against wrap_command.__doc__ would reject every valid
    # `unwrap` invocation, because that usage pattern expects the literal command `wrap` and two extra arguments.
    arguments = _docopt(unwrap_command.__doc__, help=True)
    input_file = arguments['<input_file>']
    output_file = arguments['<output_file>'] or "{}.unwrapped".format(input_file)

    sd_proto = SelfDescribingProtoSerde()
    # Decode the wrapper and rebuild the inner message from the embedded descriptor set.
    with open(input_file, "rb") as fp:
        obj = sd_proto.load(fp)
    # Write only the inner message's own serialized bytes, i.e. without the self-describing wrapper.
    with open(output_file, "wb") as fp:
        fp.write(obj.SerializeToString())


def _list_commands():
    """
    Build the "Commands" section of the top-level help text.

    Every module-level function whose name ends in `_command` is treated as a sub-command. Each one contributes a
    single line made of the command name (the function name without the suffix) followed by the first non-blank line
    of the function's docstring.

    :return: The formatted lines joined into one string; an empty string when no commands are defined.
    """
    suffix = "_command"
    entries = []
    # Iterate over a snapshot of the module globals so the loop is unaffected by changes to the namespace.
    for name, member in list(globals().items()):
        if not (isinstance(member, types.FunctionType) and name.endswith(suffix)):
            continue
        # The summary is the first docstring line that is not blank after stripping; "" if there is none.
        stripped_lines = (line.strip() for line in (member.__doc__ or "").split('\n'))
        summary = next((line for line in stripped_lines if line), "")
        entries.append("\n {: <16}{}".format(name.removesuffix(suffix), summary))
    return "".join(entries)


if __name__ == '__main__':
    # Top-level dispatcher: parse only the sub-command name here and let the matching `<name>_command` function
    # parse its own arguments from its own docstring.
    docs = __doc__.format(commands=_list_commands())
    # options_first=True stops option parsing at the first positional argument, so everything after <command>
    # is left for the sub-command's own docopt call.
    args = _docopt(docs, help=True, options_first=True)
    func_name = "{}_command".format(args['<command>'])
    # Look the handler up directly in the module namespace. (`globals().items().mapping` is just a proxy for the
    # same dict and only exists on Python 3.10+.) Only dispatch to real functions, mirroring _list_commands().
    handler = globals().get(func_name)
    if isinstance(handler, types.FunctionType):
        handler()
    else:
        # SystemExit with a string prints the message to stderr and exits with status 1, exactly like exit(msg),
        # but does not depend on the `site` module having injected the `exit` helper.
        raise SystemExit("%r is not a proto_tools.py command. See 'proto_tools.py --help'." % args['<command>'])
20 changes: 20 additions & 0 deletions amazon/ionbenchmark/self_describing_proto.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

// This is the source for the generated `self_describing_proto_pb2.py` module. It should not need to be regenerated,
// but if you do need to regenerate it for some reason, run the following command in the root of the repo:
//
// protoc ./amazon/ionbenchmark/self_describing_proto.proto --python_out ./amazon/ionbenchmark/

syntax = "proto3";

import "google/protobuf/any.proto";
import "google/protobuf/descriptor.proto";

// Envelope that pairs a serialized message with the descriptors needed to decode it, following the
// "self-describing messages" technique from the Protocol Buffers documentation.
message SelfDescribingMessage {
// Set of FileDescriptorProtos which describe the type and its dependencies.
google.protobuf.FileDescriptorSet descriptor_set = 1;

// The message and its type, encoded as an Any message.
// NOTE(review): the Python serde in this change stores the bare full type name in `type_url` (no
// "type.googleapis.com/" prefix) and uses it as the lookup key when decoding; confirm before using other tooling.
google.protobuf.Any message = 2;
}
118 changes: 118 additions & 0 deletions amazon/ionbenchmark/self_describing_proto.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

from google.protobuf import message as _message
from google.protobuf import message_factory as _message_factory
from google.protobuf import reflection as _reflection

from self_describing_proto_pb2 import SelfDescribingMessage

import six


class SelfDescribingProtoSerde:
    """
    This class provides functions for reading and writing self-describing protocol buffers.

    This class uses the technique described in https://protobuf.dev/programming-guides/techniques/#self-description to
    create a mostly self-describing protocol buffer. (It's not truly self-describing because the reader must know that
    the serialized data uses the Self-describing Messages convention.)

    To create a self-describing protocol buffer, see proto_tools.py.

    This class makes no guarantees of thread-safety because each instance of this class has its own single instance of
    the SelfDescribingMessage wrapper that it reuses for every load(s) call.

    TODO: Add support for importing "well-known types" (https://protobuf.dev/reference/protobuf/google.protobuf/).
    """
    def __init__(self, cache_type_info=False, reuse_inner_object=False):
        """
        :param cache_type_info: Controls whether the type descriptor set and generated classes should be cached. Caching
            the generated classes for load(s) can be significantly faster than not caching, but requires more memory for
            the cache. WARNING: there is no cache eviction implemented yet.
            (Has no effect for dump(s).)
        :param reuse_inner_object: When True, load(s) returns the same inner message instance for a given
            (descriptor set, type name) pair on every call instead of creating a new one, so the object returned by a
            previous call is overwritten by the next call for the same type.
            (Has no effect if cache_type_info is False. Has no effect for dump(s).)
        """
        self._cache_type_info = cache_type_info
        self._reuse_inner_object = reuse_inner_object
        # Wrapper message reused by every loads() call; this is why instances are not thread-safe.
        self._cached_outer_object = SelfDescribingMessage()
        # Both caches are keyed as {serialized descriptor set: {type name: value}}.
        self._cached_inner_definitions = {}
        self._cached_inner_objects = {}

    @staticmethod
    def generate_class_definition(type_name, descriptor_set):
        """
        Generate a Python class for the given type_name using the provided descriptor_set.

        :param type_name: Full name of the message type; it must be defined by one of the files in descriptor_set.
        :param descriptor_set: A FileDescriptorSet whose `file` entries describe the type.
        :return: A message class (not an instance) whose DESCRIPTOR is that of the requested type.
        :raises KeyError: If type_name is not among the message types built from descriptor_set.
        """
        messages_types = _message_factory.GetMessages(descriptor_set.file)
        message_type = messages_types[type_name]()

        class DynamicMessage(six.with_metaclass(_reflection.GeneratedProtocolMessageType, _message.Message)):
            DESCRIPTOR = message_type.DESCRIPTOR

        return DynamicMessage

    def _get_inner_object_instance(self, type_name, descriptor_set):
        """
        Get an instance of the inner message class for the given type, ready to be parsed into.

        Honors the caching options passed to the constructor: the generated class is cached when cache_type_info is
        set, and the instance itself is cached (and returned again on later calls) when reuse_inner_object is also set.

        :param type_name: Full name of the inner message type.
        :param descriptor_set: The FileDescriptorSet that describes type_name.
        :return: A message instance of the generated class for type_name.
        """
        if not self._cache_type_info:
            # No caching requested: regenerate the class on every call.
            return SelfDescribingProtoSerde.generate_class_definition(type_name, descriptor_set)()

        # The serialized descriptor set is used as the cache key, so equal descriptor sets share cache entries.
        descriptor_set_key = descriptor_set.SerializeToString()

        definitions = self._cached_inner_definitions.setdefault(descriptor_set_key, {})
        if type_name not in definitions:
            definitions[type_name] = SelfDescribingProtoSerde.generate_class_definition(type_name, descriptor_set)
        clazz = definitions[type_name]

        if not self._reuse_inner_object:
            return clazz()

        instances = self._cached_inner_objects.setdefault(descriptor_set_key, {})
        if type_name not in instances:
            instances[type_name] = clazz()
        return instances[type_name]

    def loads(self, s):
        """
        Deserialize a self-describing protocol buffer from bytes/string.

        :param s: The serialized SelfDescribingMessage.
        :return: The inner message, as an instance of a class generated from the embedded descriptor set.
        """
        # The wrapper object is reused across calls; it is re-populated from `s` here.
        outer_obj = self._cached_outer_object
        outer_obj.ParseFromString(s)
        # dumps() stores the inner type's full name in `type_url`, so it doubles as the lookup key for the type.
        inner_obj = self._get_inner_object_instance(outer_obj.message.type_url, outer_obj.descriptor_set)
        inner_obj.ParseFromString(outer_obj.message.value)
        return inner_obj

    def load(self, fp):
        """
        Deserialize a self-describing protocol buffer from a file.

        :param fp: A readable binary file object; its entire remaining content is read.
        :return: The inner message (see loads()).
        """
        return self.loads(fp.read())

    def dumps(self, obj):
        """
        Serialize a protocol buffer message as self-describing protocol buffer bytes/string.

        Only the file descriptor of the file that defines obj's type is embedded; descriptors of files it imports are
        not added.

        :param obj: The protocol buffer message to wrap.
        :return: The serialized SelfDescribingMessage containing obj and the descriptor of its defining file.
        """
        # It seems to be faster to create a new SelfDescribingMessage for each call than it is to use the cached object
        # and check and/or clear the descriptor for each call.
        outer_object = SelfDescribingMessage()
        # The descriptor must go into the object that is actually serialized below. Writing it into
        # self._cached_outer_object instead would produce output with an empty descriptor set, which loads() cannot
        # decode, and would also grow the cached wrapper on every call.
        obj.DESCRIPTOR.file.CopyToProto(outer_object.descriptor_set.file.add())

        outer_object.message.type_url = obj.DESCRIPTOR.full_name
        outer_object.message.value = obj.SerializeToString()
        return outer_object.SerializeToString()

    def dump(self, obj, fp):
        """
        Serialize a protocol buffer message as a self-describing protocol buffer file.

        :param obj: The protocol buffer message to wrap.
        :param fp: A writable binary file object that receives the serialized wrapper.
        """
        fp.write(self.dumps(obj))
28 changes: 28 additions & 0 deletions amazon/ionbenchmark/self_describing_proto_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ cbor~=1.0.0
cbor2~=5.4.6
python-rapidjson~=1.9
ujson~=5.7.0
protobuf>=4.0.0

0 comments on commit 02d3b8a

Please sign in to comment.