diff --git a/apps/emqx/src/emqx_config.erl b/apps/emqx/src/emqx_config.erl index 195f116b1e..53e1ae1d5c 100644 --- a/apps/emqx/src/emqx_config.erl +++ b/apps/emqx/src/emqx_config.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -497,6 +497,12 @@ fill_defaults(RawConf, Opts) -> ). -spec fill_defaults(module(), raw_config(), hocon_tconf:opts()) -> map(). +fill_defaults(_SchemaMod, RawConf = #{<<"durable_storage">> := _}, _) -> + %% FIXME: kludge to prevent `emqx_config' module from filling in + %% the default values for backends and layouts. These records are + %% inside unions, and adding default values there will add + %% incompatible fields. + RawConf; fill_defaults(SchemaMod, RawConf, Opts0) -> Opts = maps:merge(#{required => false, make_serializable => true}, Opts0), hocon_tconf:check_plain( diff --git a/apps/emqx/src/emqx_ds_schema.erl b/apps/emqx/src/emqx_ds_schema.erl new file mode 100644 index 0000000000..25d1442184 --- /dev/null +++ b/apps/emqx/src/emqx_ds_schema.erl @@ -0,0 +1,245 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @doc Schema for EMQX_DS databases. +-module(emqx_ds_schema). + +%% API: +-export([schema/0, translate_builtin/1]). + +%% Behavior callbacks: +-export([fields/1, desc/1, namespace/0]). + +-include("emqx_schema.hrl"). +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("hocon/include/hocon_types.hrl"). + +%%================================================================================ +%% Type declarations +%%================================================================================ + +%%================================================================================ +%% API +%%================================================================================ + +translate_builtin(#{ + backend := builtin, + n_shards := NShards, + replication_factor := ReplFactor, + layout := Layout +}) -> + Storage = + case Layout of + #{ + type := wildcard_optimized, + bits_per_topic_level := BitsPerTopicLevel, + epoch_bits := EpochBits, + topic_index_bytes := TIBytes + } -> + {emqx_ds_storage_bitfield_lts, #{ + bits_per_topic_level => BitsPerTopicLevel, + topic_index_bytes => TIBytes, + epoch_bits => EpochBits + }}; + #{type := reference} -> + {emqx_ds_storage_reference, #{}} + end, + #{ + backend => builtin, + n_shards => NShards, + replication_factor => ReplFactor, + storage => Storage + }. + +%%================================================================================ +%% Behavior callbacks +%%================================================================================ + +namespace() -> + durable_storage. + +schema() -> + [ + {"messages", + ds_schema(#{ + desc => ?DESC(messages), + importance => ?IMPORTANCE_HIDDEN, + default => + #{ + <<"backend">> => builtin + } + })} + ]. 
+ +fields("builtin") -> + %% Schema for the builtin backend: + [ + {"backend", + sc( + builtin, + #{ + importance => ?IMPORTANCE_MEDIUM, + 'readOnly' => true, + default => builtin, + desc => ?DESC(builtin) + } + )}, + {"_config_handler", + sc( + {module(), atom()}, + #{ + importance => ?IMPORTANCE_HIDDEN, + 'readOnly' => true, + default => {?MODULE, translate_builtin} + } + )}, + {"data_dir", + sc( + string(), + #{ + desc => ?DESC(builtin_data_dir), + mapping => "emqx_durable_storage.db_data_dir", + required => false, + importance => ?IMPORTANCE_MEDIUM + } + )}, + {"n_shards", + sc( + pos_integer(), + #{ + importance => ?IMPORTANCE_MEDIUM, + desc => ?DESC(builtin_n_shards), + default => 16 + } + )}, + {"replication_factor", + sc( + pos_integer(), + #{ + default => 3, + importance => ?IMPORTANCE_HIDDEN + } + )}, + {"egress", + sc( + ref("builtin_egress"), + #{ + desc => ?DESC(builtin_egress), + importance => ?IMPORTANCE_MEDIUM + } + )}, + {"layout", + sc( + hoconsc:union([ + ref("layout_builtin_wildcard_optimized"), ref("layout_builtin_reference") + ]), + #{ + desc => ?DESC(builtin_layout), + importance => ?IMPORTANCE_HIDDEN, + default => + #{ + <<"type">> => wildcard_optimized + } + } + )} + ]; +fields("builtin_egress") -> + [ + {"max_items", + sc( + pos_integer(), + #{ + default => 1000, + mapping => "emqx_durable_storage.egress_batch_size", + importance => ?IMPORTANCE_HIDDEN + } + )}, + {"flush_interval", + sc( + emqx_schema:timeout_duration_ms(), + #{ + default => 100, + mapping => "emqx_durable_storage.egress_flush_interval", + importance => ?IMPORTANCE_HIDDEN + } + )} + ]; +fields("layout_builtin_wildcard_optimized") -> + [ + {"type", + sc( + wildcard_optimized, + #{ + desc => ?DESC(layout_wildcard_optimized), + 'readOnly' => true, + default => wildcard_optimized + } + )}, + {"bits_per_topic_level", + sc( + range(1, 64), + #{ + default => 64, + importance => ?IMPORTANCE_HIDDEN + } + )}, + {"epoch_bits", + sc( + range(0, 64), + #{ + default => 10, + importance 
=> ?IMPORTANCE_MEDIUM, + desc => ?DESC(wildcard_optimized_epoch_bits) + } + )}, + {"topic_index_bytes", + sc( + pos_integer(), + #{ + default => 4, + importance => ?IMPORTANCE_HIDDEN + } + )} + ]; +fields("layout_builtin_reference") -> + [ + {"type", + sc( + reference, + #{'readOnly' => true} + )} + ]. + +desc(_) -> + undefined. + +%%================================================================================ +%% Internal functions +%%================================================================================ + +ds_schema(Options) -> + sc( + hoconsc:union([ + ref("builtin") + | emqx_schema_hooks:injection_point('session_persistence.storage_backends', []) + ]), + Options + ). + +sc(Type, Meta) -> hoconsc:mk(Type, Meta). + +ref(StructName) -> hoconsc:ref(?MODULE, StructName). diff --git a/apps/emqx/src/emqx_persistent_message.erl b/apps/emqx/src/emqx_persistent_message.erl index b178a742c0..9787dfd9a8 100644 --- a/apps/emqx/src/emqx_persistent_message.erl +++ b/apps/emqx/src/emqx_persistent_message.erl @@ -52,7 +52,7 @@ is_persistence_enabled() -> -spec storage_backend() -> emqx_ds:create_db_opts(). storage_backend() -> - storage_backend(emqx_config:get([session_persistence, storage])). + storage_backend([durable_storage, messages]). %% Dev-only option: force all messages to go through %% `emqx_persistent_session_ds': @@ -60,23 +60,9 @@ storage_backend() -> force_ds() -> emqx_config:get([session_persistence, force_persistence]). -storage_backend(#{ - builtin := #{ - enable := true, - n_shards := NShards, - replication_factor := ReplicationFactor - } -}) -> - #{ - backend => builtin, - storage => {emqx_ds_storage_bitfield_lts, #{}}, - n_shards => NShards, - replication_factor => ReplicationFactor - }; -storage_backend(#{ - fdb := #{enable := true} = FDBConfig -}) -> - FDBConfig#{backend => fdb}. +storage_backend(Path) -> + ConfigTree = #{'_config_handler' := {Module, Function}} = emqx_config:get(Path), + apply(Module, Function, [ConfigTree]). 
%%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index d7c477dea8..300b9decb3 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -252,6 +252,11 @@ roots(medium) -> sc( ref("overload_protection"), #{importance => ?IMPORTANCE_HIDDEN} + )}, + {"durable_storage", + sc( + ref("durable_storage"), + #{importance => ?IMPORTANCE_HIDDEN} )} ]; roots(low) -> @@ -1700,16 +1705,6 @@ fields("session_persistence") -> default => false } )}, - {"storage", - sc( - ref("session_storage_backend"), #{ - desc => ?DESC(session_persistence_storage), - validator => fun validate_backend_enabled/1, - default => #{ - <<"builtin">> => #{} - } - } - )}, {"max_batch_size", sc( pos_integer(), @@ -1847,7 +1842,9 @@ fields("session_storage_backend_builtin") -> importance => ?IMPORTANCE_HIDDEN } )} - ]. + ]; +fields("durable_storage") -> + emqx_ds_schema:schema(). mqtt_listener(Bind) -> base_listener(Bind) ++ @@ -2123,17 +2120,6 @@ ensure_list(V) -> filter(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined]. -validate_backend_enabled(Config) -> - Enabled = maps:filter(fun(_, #{<<"enable">> := E}) -> E end, Config), - case maps:to_list(Enabled) of - [{_Type, _BackendConfig}] -> - ok; - _Conflicts = [_ | _] -> - {error, multiple_enabled_backends}; - _None = [] -> - {error, no_enabled_backend} - end. - %% @private This function defines the SSL opts which are commonly used by %% SSL listener and client. -spec common_ssl_opts_schema(map(), server | client) -> hocon_schema:field_schema(). 
diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index 0ca1daa1c3..14989bdd81 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -50,7 +50,7 @@ init_per_testcase(t_message_gc = TestCase, Config) -> Opts = #{ extra_emqx_conf => "\n session_persistence.message_retention_period = 1s" - "\n session_persistence.storage.builtin.n_shards = 3" + "\n durable_storage.messages.n_shards = 3" }, common_init_per_testcase(TestCase, [{n_shards, 3} | Config], Opts); init_per_testcase(TestCase, Config) -> diff --git a/rel/i18n/emqx_ds_schema.hocon b/rel/i18n/emqx_ds_schema.hocon new file mode 100644 index 0000000000..ec69dca4ee --- /dev/null +++ b/rel/i18n/emqx_ds_schema.hocon @@ -0,0 +1,35 @@ +emqx_ds_schema { + +messages.desc: +"""Configuration related to the durable storage of MQTT messages.""" + +builtin.desc: +"""Builtin session storage backend utilizing embedded RocksDB key-value store.""" + +builtin_data_dir.desc: +"""File system directory where the database is located.""" + +builtin_n_shards.desc: +"""The builtin durable storage partitions data into shards. +This configuration parameter defines the number of shards. +Please note that it takes effect only during the initialization of the durable storage database. +Changing this configuration parameter after the database has already been created won't have any effect.""" + +builtin_egress.desc: +"""Configuration related to the buffering of outgoing messages to the shards.""" + +builtin_layout.desc: +"""Storage layout is a method of arranging messages from various topics and clients on disk.
+ +Depending on the type of workload and the topic structure, different types of strategies for storing the data can be employed to maximize efficiency of the replay.""" + + +layout_wildcard_optimized.desc: +"""_Wildcard-optimized_ layout is designed to maximize the throughput of the wildcard subscriptions covering large numbers of topics.""" + +wildcard_optimized_epoch_bits.desc: +"""Wildcard-optimized layout partitions messages recorded at different times into "epochs". +Each epoch can be consumed by the subscribers as a batch. +Generally, larger epochs lead to higher throughput of subscribers; however, currently they may increase latency.""" + +}