Skip to content

Commit

Permalink
Merge pull request #12612 from zmstone/0226-fix-schema-registry-repli…
Browse files Browse the repository at this point in the history
…cation

refactor: change schema registry SERD_TAB to ets
  • Loading branch information
zmstone committed Feb 29, 2024
2 parents 5bb769e + 10c1245 commit 5afe000
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 130 deletions.
28 changes: 9 additions & 19 deletions apps/emqx_schema_registry/include/emqx_schema_registry.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -10,32 +10,29 @@

%% Note: this has the `_ee_' segment for backwards compatibility.
-define(SCHEMA_REGISTRY_SHARD, emqx_ee_schema_registry_shard).
-define(SERDE_TAB, emqx_ee_schema_registry_serde_tab).
-define(PROTOBUF_CACHE_TAB, emqx_ee_schema_registry_protobuf_cache_tab).

%% ETS table for serde build results.
-define(SERDE_TAB, emqx_schema_registry_serde_tab).

-define(EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME,
<<"__CiYAWBja87PleCyKZ58h__SparkPlug_B_BUILT-IN">>
).
-type schema_name() :: binary().
-type schema_source() :: binary().

-type serde_args() :: list().

-type encoded_data() :: iodata().
-type decoded_data() :: map().
-type serializer() ::
fun((decoded_data()) -> encoded_data())
| fun((decoded_data(), term()) -> encoded_data()).
-type deserializer() ::
fun((encoded_data()) -> decoded_data())
| fun((encoded_data(), term()) -> decoded_data()).
-type destructor() :: fun(() -> ok).
-type serde_type() :: avro.

-type serde_type() :: avro | protobuf.
-type serde_opts() :: map().

-record(serde, {
name :: schema_name(),
serializer :: serializer(),
deserializer :: deserializer(),
destructor :: destructor()
type :: serde_type(),
eval_context :: term()
}).
-type serde() :: #serde{}.

Expand All @@ -50,11 +47,4 @@
module_binary :: binary()
}.

-type serde_map() :: #{
name := schema_name(),
serializer := serializer(),
deserializer := deserializer(),
destructor := destructor()
}.

-endif.
2 changes: 1 addition & 1 deletion apps/emqx_schema_registry/rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
{emqx, {path, "../emqx"}},
{emqx_utils, {path, "../emqx_utils"}},
{emqx_rule_engine, {path, "../emqx_rule_engine"}},
{erlavro, {git, "https://github.com/klarna/erlavro.git", {tag, "2.9.8"}}},
{erlavro, {git, "https://github.com/emqx/erlavro.git", {tag, "2.10.0"}}},
{gpb, "4.19.9"}
]}.

Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_schema_registry/src/emqx_schema_registry.app.src
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{application, emqx_schema_registry, [
{description, "EMQX Schema Registry"},
{vsn, "0.1.8"},
{vsn, "0.2.0"},
{registered, [emqx_schema_registry_sup]},
{mod, {emqx_schema_registry_app, []}},
{included_applications, [
Expand Down
46 changes: 14 additions & 32 deletions apps/emqx_schema_registry/src/emqx_schema_registry.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
%% API
-export([
start_link/0,
get_serde/1,
add_schema/2,
get_schema/1,
delete_schema/1,
Expand All @@ -38,6 +37,11 @@
import_config/1
]).

%% for testing
-export([
get_serde/1
]).

-type schema() :: #{
type := serde_type(),
source := binary(),
Expand All @@ -51,13 +55,13 @@
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

-spec get_serde(schema_name()) -> {ok, serde_map()} | {error, not_found}.
-spec get_serde(schema_name()) -> {ok, serde()} | {error, not_found}.
get_serde(SchemaName) ->
case ets:lookup(?SERDE_TAB, to_bin(SchemaName)) of
[] ->
{error, not_found};
[Serde] ->
{ok, serde_to_map(Serde)}
{ok, Serde}
end.

-spec get_schema(schema_name()) -> {ok, map()} | {error, not_found}.
Expand Down Expand Up @@ -214,21 +218,15 @@ terminate(_Reason, _State) ->
%%-------------------------------------------------------------------------------------------------

create_tables() ->
ok = mria:create_table(?SERDE_TAB, [
{type, ordered_set},
{rlog_shard, ?SCHEMA_REGISTRY_SHARD},
{storage, ram_copies},
{record_name, serde},
{attributes, record_info(fields, serde)}
]),
ok = emqx_utils_ets:new(?SERDE_TAB, [public, {keypos, #serde.name}]),
ok = mria:create_table(?PROTOBUF_CACHE_TAB, [
{type, set},
{rlog_shard, ?SCHEMA_REGISTRY_SHARD},
{storage, rocksdb_copies},
{record_name, protobuf_cache},
{attributes, record_info(fields, protobuf_cache)}
]),
ok = mria:wait_for_tables([?SERDE_TAB, ?PROTOBUF_CACHE_TAB]),
ok = mria:wait_for_tables([?PROTOBUF_CACHE_TAB]),
ok.

do_build_serdes(Schemas) ->
Expand Down Expand Up @@ -290,15 +288,8 @@ do_build_serde(Name, Serde) when not is_binary(Name) ->
do_build_serde(to_bin(Name), Serde);
do_build_serde(Name, #{type := Type, source := Source}) ->
try
{Serializer, Deserializer, Destructor} =
emqx_schema_registry_serde:make_serde(Type, Name, Source),
Serde = #serde{
name = Name,
serializer = Serializer,
deserializer = Deserializer,
destructor = Destructor
},
ok = mria:dirty_write(?SERDE_TAB, Serde),
Serde = emqx_schema_registry_serde:make_serde(Type, Name, Source),
true = ets:insert(?SERDE_TAB, Serde),
ok
catch
Kind:Error:Stacktrace ->
Expand All @@ -320,9 +311,9 @@ ensure_serde_absent(Name) when not is_binary(Name) ->
ensure_serde_absent(to_bin(Name));
ensure_serde_absent(Name) ->
case get_serde(Name) of
{ok, #{destructor := Destructor}} ->
Destructor(),
ok = mria:dirty_delete(?SERDE_TAB, Name);
{ok, Serde} ->
_ = ets:delete(?SERDE_TAB, Name),
ok = emqx_schema_registry_serde:destroy(Serde);
{error, not_found} ->
ok
end.
Expand All @@ -346,12 +337,3 @@ schema_name_bin_to_atom(Bin) when size(Bin) > 255 ->
);
schema_name_bin_to_atom(Bin) ->
binary_to_atom(Bin, utf8).

-spec serde_to_map(serde()) -> serde_map().
serde_to_map(#serde{} = Serde) ->
#{
name => Serde#serde.name,
serializer => Serde#serde.serializer,
deserializer => Serde#serde.deserializer,
destructor => Serde#serde.destructor
}.
95 changes: 50 additions & 45 deletions apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,20 @@
-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").

-elvis([{elvis_style, invalid_dynamic_call, #{ignore => [emqx_schema_registry_serde]}}]).

%% API
-export([
decode/2,
decode/3,
encode/2,
encode/3,
make_serde/3,
handle_rule_function/2
handle_rule_function/2,
destroy/1
]).

-export([
eval_decode/2,
eval_encode/2
]).

%%------------------------------------------------------------------------------
Expand Down Expand Up @@ -70,8 +74,8 @@ decode(SerdeName, RawData, VarArgs) when is_list(VarArgs) ->
case emqx_schema_registry:get_serde(SerdeName) of
{error, not_found} ->
error({serde_not_found, SerdeName});
{ok, #{deserializer := Deserializer}} ->
apply(Deserializer, [RawData | VarArgs])
{ok, Serde} ->
eval_decode(Serde, [RawData | VarArgs])
end.

-spec encode(schema_name(), decoded_data()) -> encoded_data().
Expand All @@ -83,55 +87,56 @@ encode(SerdeName, EncodedData, VarArgs) when is_list(VarArgs) ->
case emqx_schema_registry:get_serde(SerdeName) of
{error, not_found} ->
error({serde_not_found, SerdeName});
{ok, #{serializer := Serializer}} ->
apply(Serializer, [EncodedData | VarArgs])
{ok, Serde} ->
eval_encode(Serde, [EncodedData | VarArgs])
end.

-spec make_serde(serde_type(), schema_name(), schema_source()) ->
{serializer(), deserializer(), destructor()}.
make_serde(avro, Name, Source0) ->
Source = inject_avro_name(Name, Source0),
Serializer = avro:make_simple_encoder(Source, _Opts = []),
Deserializer = avro:make_simple_decoder(Source, [{map_type, map}, {record_type, map}]),
Destructor = fun() ->
?tp(serde_destroyed, #{type => avro, name => Name}),
ok
end,
{Serializer, Deserializer, Destructor};
-spec make_serde(serde_type(), schema_name(), schema_source()) -> serde().
make_serde(avro, Name, Source) ->
Store0 = avro_schema_store:new([map]),
%% import the schema into the map store with an assigned name
%% if it's a named schema (e.g. struct), then Name is added as alias
Store = avro_schema_store:import_schema_json(Name, Source, Store0),
#serde{
name = Name,
type = avro,
eval_context = Store
};
make_serde(protobuf, Name, Source) ->
SerdeMod = make_protobuf_serde_mod(Name, Source),
Serializer =
fun(DecodedData0, MessageName0) ->
DecodedData = emqx_utils_maps:safe_atom_key_map(DecodedData0),
MessageName = binary_to_existing_atom(MessageName0, utf8),
SerdeMod:encode_msg(DecodedData, MessageName)
end,
Deserializer =
fun(EncodedData, MessageName0) ->
MessageName = binary_to_existing_atom(MessageName0, utf8),
Decoded = SerdeMod:decode_msg(EncodedData, MessageName),
emqx_utils_maps:binary_key_map(Decoded)
end,
Destructor =
fun() ->
unload_code(SerdeMod),
?tp(serde_destroyed, #{type => protobuf, name => Name}),
ok
end,
{Serializer, Deserializer, Destructor}.
#serde{
name = Name,
type = protobuf,
eval_context = SerdeMod
}.

eval_decode(#serde{type = avro, name = Name, eval_context = Store}, [Data]) ->
Opts = avro:make_decoder_options([{map_type, map}, {record_type, map}]),
avro_binary_decoder:decode(Data, Name, Store, Opts);
eval_decode(#serde{type = protobuf, eval_context = SerdeMod}, [EncodedData, MessageName0]) ->
MessageName = binary_to_existing_atom(MessageName0, utf8),
Decoded = apply(SerdeMod, decode_msg, [EncodedData, MessageName]),
emqx_utils_maps:binary_key_map(Decoded).

eval_encode(#serde{type = avro, name = Name, eval_context = Store}, [Data]) ->
avro_binary_encoder:encode(Store, Name, Data);
eval_encode(#serde{type = protobuf, eval_context = SerdeMod}, [DecodedData0, MessageName0]) ->
DecodedData = emqx_utils_maps:safe_atom_key_map(DecodedData0),
MessageName = binary_to_existing_atom(MessageName0, utf8),
apply(SerdeMod, encode_msg, [DecodedData, MessageName]).

destroy(#serde{type = avro, name = _Name}) ->
?tp(serde_destroyed, #{type => avro, name => _Name}),
ok;
destroy(#serde{type = protobuf, name = _Name, eval_context = SerdeMod}) ->
unload_code(SerdeMod),
?tp(serde_destroyed, #{type => protobuf, name => _Name}),
ok.

%%------------------------------------------------------------------------------
%% Internal fns
%%------------------------------------------------------------------------------

-spec inject_avro_name(schema_name(), schema_source()) -> schema_source().
inject_avro_name(Name, Source0) ->
%% The schema checks that the source is a valid JSON when
%% typechecking, so we shouldn't need to validate here.
Schema0 = emqx_utils_json:decode(Source0, [return_maps]),
Schema = Schema0#{<<"name">> => Name},
emqx_utils_json:encode(Schema).

-spec make_protobuf_serde_mod(schema_name(), schema_source()) -> module().
make_protobuf_serde_mod(Name, Source) ->
{SerdeMod0, SerdeModFileName} = protobuf_serde_mod_name(Name),
Expand Down

0 comments on commit 5afe000

Please sign in to comment.