Skip to content

Commit

Permalink
Merge pull request #10409 from thalesmg/protobuf-schema-v50
Browse files Browse the repository at this point in the history
feat(schema_registry): add support for protobuf schemas
  • Loading branch information
thalesmg committed Apr 17, 2023
2 parents 0c727fc + 9d15247 commit 586cd54
Show file tree
Hide file tree
Showing 15 changed files with 640 additions and 101 deletions.
5 changes: 1 addition & 4 deletions build
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,7 @@ make_docs() {
}

assert_no_compile_time_only_deps() {
if [ "$("$FIND" "_build/$PROFILE/rel/emqx/lib/" -maxdepth 1 -name 'gpb-*' -type d)" != "" ]; then
echo "gpb should not be included in the release"
exit 1
fi
:
}

make_rel() {
Expand Down
1 change: 1 addition & 0 deletions changes/ee/feat-10409.en.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add support for [Protocol Buffers](https://protobuf.dev/) schemas in Schema Registry.
21 changes: 19 additions & 2 deletions lib-ee/emqx_ee_schema_registry/include/emqx_ee_schema_registry.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,19 @@

-define(SCHEMA_REGISTRY_SHARD, emqx_ee_schema_registry_shard).
-define(SERDE_TAB, emqx_ee_schema_registry_serde_tab).
-define(PROTOBUF_CACHE_TAB, emqx_ee_schema_registry_protobuf_cache_tab).

-type schema_name() :: binary().
-type schema_source() :: binary().

-type encoded_data() :: iodata().
-type decoded_data() :: map().
-type serializer() :: fun((decoded_data()) -> encoded_data()).
-type deserializer() :: fun((encoded_data()) -> decoded_data()).
-type serializer() ::
fun((decoded_data()) -> encoded_data())
| fun((decoded_data(), term()) -> encoded_data()).
-type deserializer() ::
fun((encoded_data()) -> decoded_data())
| fun((encoded_data(), term()) -> decoded_data()).
-type destructor() :: fun(() -> ok).
-type serde_type() :: avro.
-type serde_opts() :: map().
Expand All @@ -29,6 +34,18 @@
destructor :: destructor()
}).
-type serde() :: #serde{}.

-record(protobuf_cache, {
fingerprint,
module,
module_binary
}).
-type protobuf_cache() :: #protobuf_cache{
fingerprint :: binary(),
module :: module(),
module_binary :: binary()
}.

-type serde_map() :: #{
name := schema_name(),
serializer := serializer(),
Expand Down
3 changes: 2 additions & 1 deletion lib-ee/emqx_ee_schema_registry/rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
{deps, [
{emqx, {path, "../../apps/emqx"}},
{emqx_utils, {path, "../../apps/emqx_utils"}},
{erlavro, {git, "https://github.com/klarna/erlavro.git", {tag, "2.9.8"}}}
{erlavro, {git, "https://github.com/klarna/erlavro.git", {tag, "2.9.8"}}},
{gpb, "4.19.7"}
]}.

{shell, [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
{applications, [
kernel,
stdlib,
erlavro
erlavro,
gpb
]},
{env, []},
{modules, []},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,14 @@ create_tables() ->
{record_name, serde},
{attributes, record_info(fields, serde)}
]),
ok = mria:wait_for_tables([?SERDE_TAB]),
ok = mria:create_table(?PROTOBUF_CACHE_TAB, [
{type, set},
{rlog_shard, ?SCHEMA_REGISTRY_SHARD},
{storage, disc_only_copies},
{record_name, protobuf_cache},
{attributes, record_info(fields, protobuf_cache)}
]),
ok = mria:wait_for_tables([?SERDE_TAB, ?PROTOBUF_CACHE_TAB]),
ok.

do_build_serdes(Schemas) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,29 @@ fields(avro) ->
mk(emqx_schema:json_binary(), #{required => true, desc => ?DESC("schema_source")})},
{description, mk(binary(), #{default => <<>>, desc => ?DESC("schema_description")})}
];
fields(protobuf) ->
[
{type, mk(protobuf, #{required => true, desc => ?DESC("schema_type")})},
{source, mk(binary(), #{required => true, desc => ?DESC("schema_source")})},
{description, mk(binary(), #{default => <<>>, desc => ?DESC("schema_description")})}
];
fields("get_avro") ->
[{name, mk(binary(), #{required => true, desc => ?DESC("schema_name")})} | fields(avro)];
fields("get_protobuf") ->
[{name, mk(binary(), #{required => true, desc => ?DESC("schema_name")})} | fields(protobuf)];
fields("put_avro") ->
fields(avro);
fields("put_protobuf") ->
fields(protobuf);
fields("post_" ++ Type) ->
fields("get_" ++ Type).

desc(?CONF_KEY_ROOT) ->
?DESC("schema_registry_root");
desc(avro) ->
?DESC("avro_type");
desc(protobuf) ->
?DESC("protobuf_type");
desc(_) ->
undefined.

Expand Down Expand Up @@ -96,7 +108,7 @@ mk(Type, Meta) -> hoconsc:mk(Type, Meta).
ref(Name) -> hoconsc:ref(?MODULE, Name).

supported_serde_types() ->
[avro].
[avro, protobuf].

refs() ->
[ref(Type) || Type <- supported_serde_types()].
Expand All @@ -105,6 +117,8 @@ refs(#{<<"type">> := TypeAtom} = Value) when is_atom(TypeAtom) ->
refs(Value#{<<"type">> := atom_to_binary(TypeAtom)});
refs(#{<<"type">> := <<"avro">>}) ->
[ref(avro)];
refs(#{<<"type">> := <<"protobuf">>}) ->
[ref(protobuf)];
refs(_) ->
Expected = lists:join(" | ", [atom_to_list(T) || T <- supported_serde_types()]),
throw(#{
Expand All @@ -113,12 +127,14 @@ refs(_) ->
}).

refs_get_api() ->
[ref("get_avro")].
[ref("get_avro"), ref("get_protobuf")].

refs_get_api(#{<<"type">> := TypeAtom} = Value) when is_atom(TypeAtom) ->
refs(Value#{<<"type">> := atom_to_binary(TypeAtom)});
refs_get_api(#{<<"type">> := <<"avro">>}) ->
[ref("get_avro")];
refs_get_api(#{<<"type">> := <<"protobuf">>}) ->
[ref("get_protobuf")];
refs_get_api(_) ->
Expected = lists:join(" | ", [atom_to_list(T) || T <- supported_serde_types()]),
throw(#{
Expand Down
132 changes: 132 additions & 0 deletions lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_serde.erl
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
-module(emqx_ee_schema_registry_serde).

-include("emqx_ee_schema_registry.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").

-elvis([{elvis_style, invalid_dynamic_call, #{ignore => [emqx_ee_schema_registry_serde]}}]).

%% API
-export([
decode/2,
Expand Down Expand Up @@ -55,6 +58,27 @@ make_serde(avro, Name, Source0) ->
?tp(serde_destroyed, #{type => avro, name => Name}),
ok
end,
{Serializer, Deserializer, Destructor};
make_serde(protobuf, Name, Source) ->
SerdeMod = make_protobuf_serde_mod(Name, Source),
Serializer =
fun(DecodedData0, MessageName0) ->
DecodedData = emqx_utils_maps:safe_atom_key_map(DecodedData0),
MessageName = binary_to_existing_atom(MessageName0, utf8),
SerdeMod:encode_msg(DecodedData, MessageName)
end,
Deserializer =
fun(EncodedData, MessageName0) ->
MessageName = binary_to_existing_atom(MessageName0, utf8),
Decoded = SerdeMod:decode_msg(EncodedData, MessageName),
emqx_utils_maps:binary_key_map(Decoded)
end,
Destructor =
fun() ->
unload_code(SerdeMod),
?tp(serde_destroyed, #{type => protobuf, name => Name}),
ok
end,
{Serializer, Deserializer, Destructor}.

%%------------------------------------------------------------------------------
Expand All @@ -68,3 +92,111 @@ inject_avro_name(Name, Source0) ->
Schema0 = emqx_utils_json:decode(Source0, [return_maps]),
Schema = Schema0#{<<"name">> => Name},
emqx_utils_json:encode(Schema).

-spec make_protobuf_serde_mod(schema_name(), schema_source()) -> module().
make_protobuf_serde_mod(Name, Source) ->
{SerdeMod0, SerdeModFileName} = protobuf_serde_mod_name(Name),
case lazy_generate_protobuf_code(SerdeMod0, Source) of
{ok, SerdeMod, ModBinary} ->
load_code(SerdeMod, SerdeModFileName, ModBinary),
SerdeMod;
{error, #{error := Error, warnings := Warnings}} ->
?SLOG(
warning,
#{
msg => "error_generating_protobuf_code",
error => Error,
warnings => Warnings
}
),
error({invalid_protobuf_schema, Error})
end.

-spec protobuf_serde_mod_name(schema_name()) -> {module(), string()}.
protobuf_serde_mod_name(Name) ->
%% must be a string (list)
SerdeModName = "$schema_parser_" ++ binary_to_list(Name),
SerdeMod = list_to_atom(SerdeModName),
%% the "path" to the module, for `code:load_binary'.
SerdeModFileName = SerdeModName ++ ".memory",
{SerdeMod, SerdeModFileName}.

-spec lazy_generate_protobuf_code(module(), schema_source()) ->
{ok, module(), binary()} | {error, #{error := term(), warnings := [term()]}}.
lazy_generate_protobuf_code(SerdeMod0, Source) ->
%% We run this inside a transaction with locks to avoid running
%% the compile on all nodes; only one will get the lock, compile
%% the schema, and other nodes will simply read the final result.
{atomic, Res} = mria:transaction(
?SCHEMA_REGISTRY_SHARD,
fun lazy_generate_protobuf_code_trans/2,
[SerdeMod0, Source]
),
Res.

-spec lazy_generate_protobuf_code_trans(module(), schema_source()) ->
{ok, module(), binary()} | {error, #{error := term(), warnings := [term()]}}.
lazy_generate_protobuf_code_trans(SerdeMod0, Source) ->
Fingerprint = erlang:md5(Source),
_ = mnesia:lock({record, ?PROTOBUF_CACHE_TAB, Fingerprint}, write),
case mnesia:read(?PROTOBUF_CACHE_TAB, Fingerprint) of
[#protobuf_cache{module = SerdeMod, module_binary = ModBinary}] ->
?tp(schema_registry_protobuf_cache_hit, #{}),
{ok, SerdeMod, ModBinary};
[] ->
?tp(schema_registry_protobuf_cache_miss, #{}),
case generate_protobuf_code(SerdeMod0, Source) of
{ok, SerdeMod, ModBinary} ->
CacheEntry = #protobuf_cache{
fingerprint = Fingerprint,
module = SerdeMod,
module_binary = ModBinary
},
ok = mnesia:write(?PROTOBUF_CACHE_TAB, CacheEntry, write),
{ok, SerdeMod, ModBinary};
{ok, SerdeMod, ModBinary, _Warnings} ->
CacheEntry = #protobuf_cache{
fingerprint = Fingerprint,
module = SerdeMod,
module_binary = ModBinary
},
ok = mnesia:write(?PROTOBUF_CACHE_TAB, CacheEntry, write),
{ok, SerdeMod, ModBinary};
error ->
{error, #{error => undefined, warnings => []}};
{error, Error} ->
{error, #{error => Error, warnings => []}};
{error, Error, Warnings} ->
{error, #{error => Error, warnings => Warnings}}
end
end.

generate_protobuf_code(SerdeMod, Source) ->
gpb_compile:string(
SerdeMod,
Source,
[
binary,
strings_as_binaries,
{maps, true},
%% Fixme: currently, some bug in `gpb' prevents this
%% option from working with `oneof' types... We're then
%% forced to use atom key maps.
%% {maps_key_type, binary},
{maps_oneof, flat},
{verify, always},
{maps_unset_optional, omitted}
]
).

-spec load_code(module(), string(), binary()) -> ok.
load_code(SerdeMod, SerdeModFileName, ModBinary) ->
_ = code:purge(SerdeMod),
{module, SerdeMod} = code:load_binary(SerdeMod, SerdeModFileName, ModBinary),
ok.

-spec unload_code(module()) -> ok.
unload_code(SerdeMod) ->
_ = code:purge(SerdeMod),
_ = code:delete(SerdeMod),
ok.

0 comments on commit 586cd54

Please sign in to comment.