Skip to content

Commit

Permalink
feat(sessds): Move config schema to a separate root
Browse files Browse the repository at this point in the history
  • Loading branch information
ieQu1 committed Feb 22, 2024
1 parent 30f1f03 commit c2f3f28
Show file tree
Hide file tree
Showing 6 changed files with 300 additions and 42 deletions.
8 changes: 7 additions & 1 deletion apps/emqx/src/emqx_config.erl
@@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -497,6 +497,12 @@ fill_defaults(RawConf, Opts) ->
).

-spec fill_defaults(module(), raw_config(), hocon_tconf:opts()) -> map().
fill_defaults(_SchemaMod, RawConf = #{<<"durable_storage">> := _}, _) ->
%% FIXME: kludge to prevent `emqx_config' module from filling in
%% the default values for backends and layouts. These records are
%% inside unions, and adding default values there will add
%% incompatible fields.
RawConf;
fill_defaults(SchemaMod, RawConf, Opts0) ->
Opts = maps:merge(#{required => false, make_serializable => true}, Opts0),
hocon_tconf:check_plain(
Expand Down
245 changes: 245 additions & 0 deletions apps/emqx/src/emqx_ds_schema.erl
@@ -0,0 +1,245 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

%% @doc Schema for EMQX_DS databases.
-module(emqx_ds_schema).

%% API:
-export([schema/0, translate_builtin/1]).

%% Behavior callbacks:
-export([fields/1, desc/1, namespace/0]).

-include("emqx_schema.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("hocon/include/hocon_types.hrl").

%%================================================================================
%% Type declarations
%%================================================================================

%%================================================================================
%% API
%%================================================================================

translate_builtin(#{
backend := builtin,
n_shards := NShards,
replication_factor := ReplFactor,
layout := Layout
}) ->
Storage =
case Layout of
#{
type := wildcard_optimized,
bits_per_topic_level := BitsPerTopicLevel,
epoch_bits := EpochBits,
topic_index_bytes := TIBytes
} ->
{emqx_ds_storage_bitfield_lts, #{
bits_per_topic_level => BitsPerTopicLevel,
topic_index_bytes => TIBytes,
epoch_bits => EpochBits
}};
#{type := reference} ->
{emqx_ds_storage_reference, #{}}
end,
#{
backend => builtin,
n_shards => NShards,
replication_factor => ReplFactor,
storage => Storage
}.

%%================================================================================
%% Behavior callbacks
%%================================================================================

namespace() ->
durable_storage.

schema() ->
[
{"messages",
ds_schema(#{
desc => ?DESC(messages),
importance => ?IMPORTANCE_HIDDEN,
default =>
#{
<<"backend">> => builtin
}
})}
].

fields("builtin") ->
%% Schema for the builtin backend:
[
{"backend",
sc(
builtin,
#{
importance => ?IMPORTANCE_MEDIUM,
'readOnly' => true,
default => builtin,
desc => ?DESC(builtin)
}
)},
{"_config_handler",
sc(
{module(), atom()},
#{
importance => ?IMPORTANCE_HIDDEN,
'readOnly' => true,
default => {?MODULE, translate_builtin}
}
)},
{"data_dir",
sc(
string(),
#{
desc => ?DESC(builtin_data_dir),
mapping => "emqx_durable_storage.db_data_dir",
required => false,
importance => ?IMPORTANCE_MEDIUM
}
)},
{"n_shards",
sc(
pos_integer(),
#{
importance => ?IMPORTANCE_MEDIUM,
desc => ?DESC(builtin_n_shards),
default => 16
}
)},
{"replication_factor",
sc(
pos_integer(),
#{
default => 3,
importance => ?IMPORTANCE_HIDDEN
}
)},
{"egress",
sc(
ref("builtin_egress"),
#{
desc => ?DESC(builtin_egress),
importance => ?IMPORTANCE_MEDIUM
}
)},
{"layout",
sc(
hoconsc:union([
ref("layout_builtin_wildcard_optimized"), ref("layout_builtin_reference")
]),
#{
desc => ?DESC(builtin_layout),
importance => ?IMPORTANCE_HIDDEN,
default =>
#{
<<"type">> => wildcard_optimized
}
}
)}
];
fields("builtin_egress") ->
[
{"max_items",
sc(
pos_integer(),
#{
default => 1000,
mapping => "emqx_durable_storage.egress_batch_size",
importance => ?IMPORTANCE_HIDDEN
}
)},
{"flush_interval",
sc(
emqx_schema:timeout_duration_ms(),
#{
default => 100,
mapping => "emqx_durable_storage.egress_flush_interval",
importance => ?IMPORTANCE_HIDDEN
}
)}
];
fields("layout_builtin_wildcard_optimized") ->
[
{"type",
sc(
wildcard_optimized,
#{
desc => ?DESC(layout_wildcard_optimized),
'readOnly' => true,
default => wildcard_optimized
}
)},
{"bits_per_topic_level",
sc(
range(1, 64),
#{
default => 64,
importance => ?IMPORTANCE_HIDDEN
}
)},
{"epoch_bits",
sc(
range(0, 64),
#{
default => 10,
importance => ?IMPORTANCE_MEDIUM,
desc => ?DESC(wildcard_optimized_epoch_bits)
}
)},
{"topic_index_bytes",
sc(
pos_integer(),
#{
default => 4,
importance => ?IMPORTANCE_HIDDEN
}
)}
];
fields("layout_builtin_reference") ->
[
{"type",
sc(
reference,
#{'readOnly' => true}
)}
].

desc(_) ->
undefined.

%%================================================================================
%% Internal functions
%%================================================================================

ds_schema(Options) ->
sc(
hoconsc:union([
ref("builtin")
| emqx_schema_hooks:injection_point('session_persistence.storage_backends', [])
]),
Options
).

sc(Type, Meta) -> hoconsc:mk(Type, Meta).

ref(StructName) -> hoconsc:ref(?MODULE, StructName).
22 changes: 4 additions & 18 deletions apps/emqx/src/emqx_persistent_message.erl
Expand Up @@ -52,31 +52,17 @@ is_persistence_enabled() ->

-spec storage_backend() -> emqx_ds:create_db_opts().
storage_backend() ->
storage_backend(emqx_config:get([session_persistence, storage])).
storage_backend([durable_storage, messages]).

%% Dev-only option: force all messages to go through
%% `emqx_persistent_session_ds':
-spec force_ds() -> boolean().
force_ds() ->
emqx_config:get([session_persistence, force_persistence]).

storage_backend(#{
builtin := #{
enable := true,
n_shards := NShards,
replication_factor := ReplicationFactor
}
}) ->
#{
backend => builtin,
storage => {emqx_ds_storage_bitfield_lts, #{}},
n_shards => NShards,
replication_factor => ReplicationFactor
};
storage_backend(#{
fdb := #{enable := true} = FDBConfig
}) ->
FDBConfig#{backend => fdb}.
storage_backend(Path) ->
ConfigTree = #{'_config_handler' := {Module, Function}} = emqx_config:get(Path),
apply(Module, Function, [ConfigTree]).

%%--------------------------------------------------------------------

Expand Down
30 changes: 8 additions & 22 deletions apps/emqx/src/emqx_schema.erl
Expand Up @@ -252,6 +252,11 @@ roots(medium) ->
sc(
ref("overload_protection"),
#{importance => ?IMPORTANCE_HIDDEN}
)},
{"durable_storage",
sc(
ref("durable_storage"),
#{importance => ?IMPORTANCE_HIDDEN}
)}
];
roots(low) ->
Expand Down Expand Up @@ -1700,16 +1705,6 @@ fields("session_persistence") ->
default => false
}
)},
{"storage",
sc(
ref("session_storage_backend"), #{
desc => ?DESC(session_persistence_storage),
validator => fun validate_backend_enabled/1,
default => #{
<<"builtin">> => #{}
}
}
)},
{"max_batch_size",
sc(
pos_integer(),
Expand Down Expand Up @@ -1847,7 +1842,9 @@ fields("session_storage_backend_builtin") ->
importance => ?IMPORTANCE_HIDDEN
}
)}
].
];
fields("durable_storage") ->
emqx_ds_schema:schema().

mqtt_listener(Bind) ->
base_listener(Bind) ++
Expand Down Expand Up @@ -2123,17 +2120,6 @@ ensure_list(V) ->
filter(Opts) ->
[{K, V} || {K, V} <- Opts, V =/= undefined].

validate_backend_enabled(Config) ->
Enabled = maps:filter(fun(_, #{<<"enable">> := E}) -> E end, Config),
case maps:to_list(Enabled) of
[{_Type, _BackendConfig}] ->
ok;
_Conflicts = [_ | _] ->
{error, multiple_enabled_backends};
_None = [] ->
{error, no_enabled_backend}
end.

%% @private This function defines the SSL opts which are commonly used by
%% SSL listener and client.
-spec common_ssl_opts_schema(map(), server | client) -> hocon_schema:field_schema().
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx/test/emqx_persistent_messages_SUITE.erl
Expand Up @@ -50,7 +50,7 @@ init_per_testcase(t_message_gc = TestCase, Config) ->
Opts = #{
extra_emqx_conf =>
"\n session_persistence.message_retention_period = 1s"
"\n session_persistence.storage.builtin.n_shards = 3"
"\n durable_storage.messages.n_shards = 3"
},
common_init_per_testcase(TestCase, [{n_shards, 3} | Config], Opts);
init_per_testcase(TestCase, Config) ->
Expand Down

0 comments on commit c2f3f28

Please sign in to comment.