From 8907e5afb38536caef101f6eef3b620144a53076 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Wed, 21 Feb 2024 16:29:57 +0100 Subject: [PATCH 01/10] chore(sessds): Remove deprecated schema --- apps/emqx/src/emqx_schema.erl | 115 ---------------------------------- rel/i18n/emqx_schema.hocon | 83 ------------------------ 2 files changed, 198 deletions(-) diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 33cf4c2135..f1849ff5ee 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -295,16 +295,6 @@ roots(low) -> converter => fun flapping_detect_converter/2 } )}, - {persistent_session_store, - sc( - ref("persistent_session_store"), - #{ - %% NOTE - %% Due to some quirks in interaction between `emqx_config` and - %% `hocon_tconf`, schema roots cannot currently be deprecated. - importance => ?IMPORTANCE_HIDDEN - } - )}, {session_persistence, sc( ref("session_persistence"), @@ -324,111 +314,6 @@ roots(low) -> )} ]. -fields("persistent_session_store") -> - Deprecated = #{deprecated => {since, "5.4.0"}}, - [ - {"enabled", - sc( - boolean(), - Deprecated#{ - default => false, - %% TODO(5.2): change field name to 'enable' and keep 'enabled' as an alias - aliases => [enable], - desc => ?DESC(persistent_session_store_enabled) - } - )}, - {"ds", - sc( - boolean(), - Deprecated#{ - default => false, - importance => ?IMPORTANCE_HIDDEN - } - )}, - {"on_disc", - sc( - boolean(), - Deprecated#{ - default => true, - desc => ?DESC(persistent_store_on_disc) - } - )}, - {"ram_cache", - sc( - boolean(), - Deprecated#{ - default => false, - desc => ?DESC(persistent_store_ram_cache) - } - )}, - {"backend", - sc( - hoconsc:union([ref("persistent_session_builtin")]), - Deprecated#{ - default => #{ - <<"type">> => <<"builtin">>, - <<"session">> => - #{<<"ram_cache">> => true}, - <<"session_messages">> => - #{<<"ram_cache">> => true}, - <<"messages">> => - #{<<"ram_cache">> => false} - }, - desc => ?DESC(persistent_session_store_backend) - } - )}, - {"max_retain_undelivered", - sc( - duration(), - Deprecated#{ - default => <<"1h">>, - desc => ?DESC(persistent_session_store_max_retain_undelivered) - } - )}, - {"message_gc_interval", - sc( - duration(), - Deprecated#{ - default => <<"1h">>, - desc => ?DESC(persistent_session_store_message_gc_interval) - } - )}, - {"session_message_gc_interval", - sc( - duration(), - Deprecated#{ - default => <<"1m">>, - desc => ?DESC(persistent_session_store_session_message_gc_interval) - } - )} - ]; -fields("persistent_table_mria_opts") -> - [ - {"ram_cache", - sc( - boolean(), - #{ - default => true, - desc => ?DESC(persistent_store_ram_cache) - } - )} - ]; -fields("persistent_session_builtin") -> - [ - {"type", sc(hoconsc:enum([builtin]), #{default => builtin, desc => ""})}, - {"session", - sc(ref("persistent_table_mria_opts"), #{ - desc => ?DESC(persistent_session_builtin_session_table) - })}, - {"session_messages", - sc(ref("persistent_table_mria_opts"), #{ - desc => ?DESC(persistent_session_builtin_sess_msg_table) - })}, - {"messages", - sc(ref("persistent_table_mria_opts"), #{ - desc => ?DESC(persistent_session_builtin_messages_table) - })} - ]; fields("stats") -> [ {"enable", diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon index 1d795783b8..039c540b6b 100644 --- a/rel/i18n/emqx_schema.hocon +++ b/rel/i18n/emqx_schema.hocon @@ -148,12 +148,6 @@ mqtt_max_subscriptions.desc: mqtt_max_subscriptions.label: """Max Subscriptions""" -persistent_session_builtin_messages_table.desc: -"""Performance tuning options for built-in messages table.""" - -persistent_session_builtin_messages_table.label: -"""Persistent messages""" - sysmon_os_cpu_low_watermark.desc: """The threshold, as percentage of system CPU load, for how much system cpu can be used before the corresponding alarm is cleared. Disabled on Windows platform""" @@ -370,12 +364,6 @@ sysmon_top_num_items.desc: sysmon_top_num_items.label: """Top num items""" -persistent_session_builtin_session_table.desc: -"""Performance tuning options for built-in session table.""" - -persistent_session_builtin_session_table.label: -"""Persistent session""" - mqtt_upgrade_qos.desc: """Force upgrade of QoS level according to subscription.""" @@ -518,14 +506,6 @@ mqtt_max_inflight.desc: mqtt_max_inflight.label: """Max Inflight""" -persistent_session_store_enabled.desc: -"""Use the database to store information about persistent sessions. -This makes it possible to migrate a client connection to another -cluster node if a node is stopped.""" - -persistent_session_store_enabled.label: -"""Enable persistent session store""" - fields_deflate_opts_level.desc: """Compression level.""" @@ -544,14 +524,6 @@ fields_mqtt_quic_listener_load_balancing_mode.desc: fields_mqtt_quic_listener_load_balancing_mode.label: """Load balancing mode""" -persistent_session_store_session_message_gc_interval.desc: -"""The starting interval for garbage collection of transient data for -persistent session messages. This does not affect the lifetime length -of persistent session messages.""" - -persistent_session_store_session_message_gc_interval.label: -"""Session message GC interval""" - server_ssl_opts_schema_ocsp_refresh_http_timeout.desc: """The timeout for the HTTP request when checking OCSP responses.""" @@ -612,12 +584,6 @@ broker_session_locking_strategy.desc: - `quorum`: select some nodes to lock the session - `all`: lock the session on all the nodes in the cluster""" -persistent_store_ram_cache.desc: -"""Maintain a copy of the data in RAM for faster access.""" - -persistent_store_ram_cache.label: -"""RAM cache""" - fields_mqtt_quic_listener_stream_recv_window_default.desc: """Initial stream receive window size. Default: 32678""" @@ -834,14 +800,6 @@ force_shutdown_max_heap_size.desc: force_shutdown_max_heap_size.label: """Total heap size""" -persistent_store_on_disc.desc: -"""Save information about the persistent sessions on disc. -If this option is enabled, persistent sessions will survive full restart of the cluster. -Otherwise, all the data will be stored in RAM, and it will be lost when all the nodes in the cluster are stopped.""" - -persistent_store_on_disc.label: -"""Persist on disc""" - mqtt_ignore_loop_deliver.desc: """Whether the messages sent by the MQTT v3.1.1/v3.1.0 client will be looped back to the publisher itself, similar to No Local in MQTT 5.0.""" @@ -1051,13 +1009,6 @@ base_listener_limiter.desc: base_listener_limiter.label: """Type of the rate limit.""" -persistent_session_store_backend.desc: -"""Database management system used to store information about persistent sessions and messages. -- `builtin`: Use the embedded database (mria)""" - -persistent_session_store_backend.label: -"""Backend""" - alarm_validity_period.desc: """Retention time of deactivated alarms. Alarms are not deleted immediately when deactivated, but after the retention time.""" @@ -1095,14 +1046,6 @@ To disable this feature, input "" in the text box below. Only appli mqtt_response_information.label: """Response Information""" -persistent_session_store_max_retain_undelivered.desc: -"""The time messages that was not delivered to a persistent session -is stored before being garbage collected if the node the previous -session was handled on restarts of is stopped.""" - -persistent_session_store_max_retain_undelivered.label: -"""Max retain undelivered""" - fields_mqtt_quic_listener_migration_enabled.desc: """Enable clients to migrate IP addresses and tuples. Requires a cooperative load-balancer, or no load-balancer. Default: 1 (Enabled)""" @@ -1199,12 +1142,6 @@ until the subscriber disconnects. - `local`: send to a random local subscriber. If local subscriber was not found, send to a random subscriber cluster-wide""" -persistent_session_builtin_sess_msg_table.desc: -"""Performance tuning options for built-in session messages table.""" - -persistent_session_builtin_sess_msg_table.label: -"""Persistent session messages""" - mqtt_mqueue_store_qos0.desc: """Specifies whether to store QoS 0 messages in the message queue while the connection is down but the session remains.""" @@ -1389,14 +1326,6 @@ Supported configurations are the following: mqtt_peer_cert_as_clientid.label: """Use Peer Certificate as Client ID""" -persistent_session_store_message_gc_interval.desc: -"""The starting interval for garbage collection of undelivered messages to -a persistent session. This affects how often the "max_retain_undelivered" -is checked for removal.""" - -persistent_session_store_message_gc_interval.label: -"""Message GC interval""" - broker_shared_dispatch_ack_enabled.desc: """Deprecated. This was designed to avoid dispatching messages to a shared-subscription session which has the client disconnected. @@ -1606,18 +1535,6 @@ session_persistence_enable.desc: """Use durable storage for client sessions persistence. If enabled, sessions configured to outlive client connections, along with their corresponding messages, will be durably stored and survive broker downtime.""" -session_persistence_storage.desc: -"""Durable storage backend to use for session persistence.""" - -session_storage_backend_enable.desc: -"""Enable this backend.""" - -session_builtin_n_shards.desc: -"""Number of shards used for storing the messages.""" - -session_storage_backend_builtin.desc: -"""Builtin session storage backend utilizing embedded RocksDB key-value store.""" - session_ds_session_gc_interval.desc: """The interval at which session garbage collection is executed for persistent sessions.""" From 24337ecec7ca66138cc4bec6edfe2c99da299693 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Wed, 21 Feb 2024 16:31:06 +0100 Subject: [PATCH 02/10] feat(sessds): Move config schema to a separate root --- apps/emqx/src/emqx_config.erl | 9 + apps/emqx/src/emqx_ds_schema.erl | 245 ++++++++++++++++++ apps/emqx/src/emqx_persistent_message.erl | 22 +- apps/emqx/src/emqx_schema.erl | 91 +------ .../test/emqx_persistent_messages_SUITE.erl | 2 +- rel/i18n/emqx_ds_schema.hocon | 35 +++ 6 files changed, 301 insertions(+), 103 deletions(-) create mode 100644 apps/emqx/src/emqx_ds_schema.erl create mode 100644 rel/i18n/emqx_ds_schema.hocon diff --git a/apps/emqx/src/emqx_config.erl b/apps/emqx/src/emqx_config.erl index a52db329ae..5147f2b6de 100644 --- a/apps/emqx/src/emqx_config.erl +++ b/apps/emqx/src/emqx_config.erl @@ -497,6 +497,15 @@ fill_defaults(RawConf, Opts) -> ). -spec fill_defaults(module(), raw_config(), hocon_tconf:opts()) -> map(). +fill_defaults(_SchemaMod, RawConf = #{<<"durable_storage">> := _}, _) -> + %% FIXME: kludge to prevent `emqx_config' module from filling in + %% the default values for backends and layouts. These records are + %% inside unions, and adding default values there will add + %% incompatible fields. + %% + %% Note: this function is called for each individual conf root, so + %% this clause only affects this particular subtree. + RawConf; fill_defaults(SchemaMod, RawConf, Opts0) -> Opts = maps:merge(#{required => false, make_serializable => true}, Opts0), hocon_tconf:check_plain( diff --git a/apps/emqx/src/emqx_ds_schema.erl b/apps/emqx/src/emqx_ds_schema.erl new file mode 100644 index 0000000000..46b1716d00 --- /dev/null +++ b/apps/emqx/src/emqx_ds_schema.erl @@ -0,0 +1,245 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @doc Schema for EMQX_DS databases. +-module(emqx_ds_schema). + +%% API: +-export([schema/0, translate_builtin/1]). + +%% Behavior callbacks: +-export([fields/1, desc/1, namespace/0]). + +-include("emqx_schema.hrl"). +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("hocon/include/hocon_types.hrl"). + +%%================================================================================ +%% Type declarations +%%================================================================================ + +%%================================================================================ +%% API +%%================================================================================ + +translate_builtin(#{ + backend := builtin, + n_shards := NShards, + replication_factor := ReplFactor, + layout := Layout +}) -> + Storage = + case Layout of + #{ + type := wildcard_optimized, + bits_per_topic_level := BitsPerTopicLevel, + epoch_bits := EpochBits, + topic_index_bytes := TIBytes + } -> + {emqx_ds_storage_bitfield_lts, #{ + bits_per_topic_level => BitsPerTopicLevel, + topic_index_bytes => TIBytes, + epoch_bits => EpochBits + }}; + #{type := reference} -> + {emqx_ds_storage_reference, #{}} + end, + #{ + backend => builtin, + n_shards => NShards, + replication_factor => ReplFactor, + storage => Storage + }. + +%%================================================================================ +%% Behavior callbacks +%%================================================================================ + +namespace() -> + durable_storage. + +schema() -> + [ + {"messages", + ds_schema(#{ + desc => ?DESC(messages), + importance => ?IMPORTANCE_HIDDEN, + default => + #{ + <<"backend">> => builtin + } + })} + ]. + +fields("builtin") -> + %% Schema for the builtin backend: + [ + {"backend", + sc( + builtin, + #{ + importance => ?IMPORTANCE_MEDIUM, + 'readOnly' => true, + default => builtin, + desc => ?DESC(builtin) + } + )}, + {"_config_handler", + sc( + {module(), atom()}, + #{ + importance => ?IMPORTANCE_HIDDEN, + 'readOnly' => true, + default => {?MODULE, translate_builtin} + } + )}, + {"data_dir", + sc( + string(), + #{ + desc => ?DESC(builtin_data_dir), + mapping => "emqx_durable_storage.db_data_dir", + required => false, + importance => ?IMPORTANCE_MEDIUM + } + )}, + {"n_shards", + sc( + pos_integer(), + #{ + importance => ?IMPORTANCE_MEDIUM, + desc => ?DESC(builtin_n_shards), + default => 16 + } + )}, + {"replication_factor", + sc( + pos_integer(), + #{ + default => 3, + importance => ?IMPORTANCE_HIDDEN + } + )}, + {"egress", + sc( + ref("builtin_egress"), + #{ + desc => ?DESC(builtin_egress), + importance => ?IMPORTANCE_MEDIUM + } + )}, + {"layout", + sc( + hoconsc:union([ + ref("layout_builtin_wildcard_optimized"), ref("layout_builtin_reference") + ]), + #{ + desc => ?DESC(builtin_layout), + importance => ?IMPORTANCE_HIDDEN, + default => + #{ + <<"type">> => wildcard_optimized + } + } + )} + ]; +fields("builtin_egress") -> + [ + {"max_items", + sc( + pos_integer(), + #{ + default => 1000, + mapping => "emqx_durable_storage.egress_batch_size", + importance => ?IMPORTANCE_HIDDEN + } + )}, + {"flush_interval", + sc( + emqx_schema:timeout_duration_ms(), + #{ + default => 100, + mapping => "emqx_durable_storage.egress_flush_interval", + importance => ?IMPORTANCE_HIDDEN + } + )} + ]; +fields("layout_builtin_wildcard_optimized") -> + [ + {"type", + sc( + wildcard_optimized, + #{ + desc => ?DESC(layout_wildcard_optimized), + 'readOnly' => true, + default => wildcard_optimized + } + )}, + {"bits_per_topic_level", + sc( + range(1, 64), + #{ + default => 64, + importance => ?IMPORTANCE_HIDDEN + } + )}, + {"epoch_bits", + sc( + range(0, 64), + #{ + default => 10, + importance => ?IMPORTANCE_MEDIUM, + desc => ?DESC(wildcard_optimized_epoch_bits) + } + )}, + {"topic_index_bytes", + sc( + pos_integer(), + #{ + default => 4, + importance => ?IMPORTANCE_HIDDEN + } + )} + ]; +fields("layout_builtin_reference") -> + [ + {"type", + sc( + reference, + #{'readOnly' => true} + )} + ]. + +desc(_) -> + undefined. + +%%================================================================================ +%% Internal functions +%%================================================================================ + +ds_schema(Options) -> + sc( + hoconsc:union([ + ref("builtin") + | emqx_schema_hooks:injection_point('durable_storage.backends', []) + ]), + Options + ). + +sc(Type, Meta) -> hoconsc:mk(Type, Meta). + +ref(StructName) -> hoconsc:ref(?MODULE, StructName). diff --git a/apps/emqx/src/emqx_persistent_message.erl b/apps/emqx/src/emqx_persistent_message.erl index b178a742c0..9787dfd9a8 100644 --- a/apps/emqx/src/emqx_persistent_message.erl +++ b/apps/emqx/src/emqx_persistent_message.erl @@ -52,7 +52,7 @@ is_persistence_enabled() -> -spec storage_backend() -> emqx_ds:create_db_opts(). storage_backend() -> - storage_backend(emqx_config:get([session_persistence, storage])). + storage_backend([durable_storage, messages]). %% Dev-only option: force all messages to go through %% `emqx_persistent_session_ds': @@ -60,23 +60,9 @@ storage_backend() -> force_ds() -> emqx_config:get([session_persistence, force_persistence]). -storage_backend(#{ - builtin := #{ - enable := true, - n_shards := NShards, - replication_factor := ReplicationFactor - } -}) -> - #{ - backend => builtin, - storage => {emqx_ds_storage_bitfield_lts, #{}}, - n_shards => NShards, - replication_factor => ReplicationFactor - }; -storage_backend(#{ - fdb := #{enable := true} = FDBConfig -}) -> - FDBConfig#{backend => fdb}. +storage_backend(Path) -> + ConfigTree = #{'_config_handler' := {Module, Function}} = emqx_config:get(Path), + apply(Module, Function, [ConfigTree]). %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index f1849ff5ee..65034f9edd 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -254,6 +254,11 @@ roots(medium) -> sc( ref("overload_protection"), #{importance => ?IMPORTANCE_HIDDEN} + )}, + {"durable_storage", + sc( + ref("durable_storage"), + #{importance => ?IMPORTANCE_HIDDEN} )} ]; roots(low) -> @@ -1654,16 +1659,6 @@ fields("session_persistence") -> default => false } )}, - {"storage", - sc( - ref("session_storage_backend"), #{ - desc => ?DESC(session_persistence_storage), - validator => fun validate_backend_enabled/1, - default => #{ - <<"builtin">> => #{} - } - } - )}, {"max_batch_size", sc( pos_integer(), @@ -1739,69 +1734,8 @@ fields("session_persistence") -> } )} ]; -fields("session_storage_backend") -> - [ - {"builtin", - sc(ref("session_storage_backend_builtin"), #{ - desc => ?DESC(session_storage_backend_builtin), - required => {false, recursively} - })} - ] ++ emqx_schema_hooks:injection_point('session_persistence.storage_backends', []); -fields("session_storage_backend_builtin") -> - [ - {"enable", - sc( - boolean(), - #{ - desc => ?DESC(session_storage_backend_enable), - default => true - } - )}, - {"data_dir", - sc( - string(), - #{ - desc => ?DESC(session_builtin_data_dir), - mapping => "emqx_durable_storage.db_data_dir", - required => false, - importance => ?IMPORTANCE_LOW - } - )}, - {"n_shards", - sc( - pos_integer(), - #{ - desc => ?DESC(session_builtin_n_shards), - default => 16 - } - )}, - {"replication_factor", - sc( - pos_integer(), - #{ - default => 3, - importance => ?IMPORTANCE_HIDDEN - } - )}, - {"egress_batch_size", - sc( - pos_integer(), - #{ - default => 1000, - mapping => "emqx_durable_storage.egress_batch_size", - importance => ?IMPORTANCE_HIDDEN - } - )}, - {"egress_flush_interval", - sc( - timeout_duration_ms(), - #{ - default => 100, - mapping => "emqx_durable_storage.egress_flush_interval", - importance => ?IMPORTANCE_HIDDEN - } - )} - ]. +fields("durable_storage") -> + emqx_ds_schema:schema(). mqtt_listener(Bind) -> base_listener(Bind) ++ @@ -2077,17 +2011,6 @@ ensure_list(V) -> filter(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined]. -validate_backend_enabled(Config) -> - Enabled = maps:filter(fun(_, #{<<"enable">> := E}) -> E end, Config), - case maps:to_list(Enabled) of - [{_Type, _BackendConfig}] -> - ok; - _Conflicts = [_ | _] -> - {error, multiple_enabled_backends}; - _None = [] -> - {error, no_enabled_backend} - end. - %% @private This function defines the SSL opts which are commonly used by %% SSL listener and client. -spec common_ssl_opts_schema(map(), server | client) -> hocon_schema:field_schema(). diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index 0ca1daa1c3..14989bdd81 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -50,7 +50,7 @@ init_per_testcase(t_message_gc = TestCase, Config) -> Opts = #{ extra_emqx_conf => "\n session_persistence.message_retention_period = 1s" - "\n session_persistence.storage.builtin.n_shards = 3" + "\n durable_storage.messages.n_shards = 3" }, common_init_per_testcase(TestCase, [{n_shards, 3} | Config], Opts); init_per_testcase(TestCase, Config) -> diff --git a/rel/i18n/emqx_ds_schema.hocon b/rel/i18n/emqx_ds_schema.hocon new file mode 100644 index 0000000000..d80c086e92 --- /dev/null +++ b/rel/i18n/emqx_ds_schema.hocon @@ -0,0 +1,35 @@ +emqx_ds_schema { + +messages.desc: +"""Configuration related to the durable storage of MQTT messages.""" + +builtin.desc: +"""Builtin session storage backend utilizing embedded RocksDB key-value store.""" + +builtin_data_dir.desc: +"""File system directory where the database is located.""" + +builtin_n_shards.desc: +"""The builtin durable storage partitions data into shards. +This configuration parameter defines the number of shards. +Please note that it takes effect only during the initialization of the durable storage database. +Changing this configuration parameter after the database has been already created won't take any effect.""" + +builtin_egress.desc: +"""Configuration related to the buffering of messages from the local node to the shard leader.""" + +builtin_layout.desc: +"""Storage layout is a method of arranging messages from various topics and clients on disc. + +Depending on the type of workload and the topic structure, different types of strategies for storing the data can be employed to maximize efficency of the replay.""" + + +layout_wildcard_optimized.desc: +"""_Wildcard-optimized_ layout is designed to maximize the throughput of the wildcard subscriptions covering large numbers of topics.""" + +wildcard_optimized_epoch_bits.desc: +"""Wildcard-optimized layout partitions messages recorded at different times into "epochs". +Each epoch can be consumed by the subscribers as a batch. +Generally, larger epochs lead to higher throughput of subscribers, however currently they may increase latency.""" + +} From 17ab3c636293325f563733882cee0f50f309d46b Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Fri, 23 Feb 2024 11:43:18 +0100 Subject: [PATCH 03/10] chore(ds_schema): Use atoms for record and field names --- apps/emqx/src/emqx_ds_schema.erl | 44 ++++++++++++++++---------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/apps/emqx/src/emqx_ds_schema.erl b/apps/emqx/src/emqx_ds_schema.erl index 46b1716d00..4efeffc5f1 100644 --- a/apps/emqx/src/emqx_ds_schema.erl +++ b/apps/emqx/src/emqx_ds_schema.erl @@ -74,7 +74,7 @@ namespace() -> schema() -> [ - {"messages", + {messages, ds_schema(#{ desc => ?DESC(messages), importance => ?IMPORTANCE_HIDDEN, @@ -85,10 +85,10 @@ schema() -> })} ]. -fields("builtin") -> +fields(builtin) -> %% Schema for the builtin backend: [ - {"backend", + {backend, sc( builtin, #{ @@ -98,7 +98,7 @@ fields("builtin") -> desc => ?DESC(builtin) } )}, - {"_config_handler", + {'_config_handler', sc( {module(), atom()}, #{ @@ -107,7 +107,7 @@ fields("builtin") -> default => {?MODULE, translate_builtin} } )}, - {"data_dir", + {data_dir, sc( string(), #{ @@ -117,7 +117,7 @@ fields("builtin") -> importance => ?IMPORTANCE_MEDIUM } )}, - {"n_shards", + {n_shards, sc( pos_integer(), #{ @@ -126,7 +126,7 @@ fields("builtin") -> default => 16 } )}, - {"replication_factor", + {replication_factor, sc( pos_integer(), #{ @@ -134,18 +134,18 @@ fields("builtin") -> importance => ?IMPORTANCE_HIDDEN } )}, - {"egress", + {egress, sc( - ref("builtin_egress"), + ref(builtin_egress), #{ desc => ?DESC(builtin_egress), importance => ?IMPORTANCE_MEDIUM } )}, - {"layout", + {layout, sc( hoconsc:union([ - ref("layout_builtin_wildcard_optimized"), ref("layout_builtin_reference") + ref(layout_builtin_wildcard_optimized), ref(layout_builtin_reference) ]), #{ desc => ?DESC(builtin_layout), @@ -157,9 +157,9 @@ fields("builtin") -> } )} ]; -fields("builtin_egress") -> +fields(builtin_egress) -> [ - {"max_items", + {max_items, sc( pos_integer(), #{ @@ -168,7 +168,7 @@ fields("builtin_egress") -> importance => ?IMPORTANCE_HIDDEN } )}, - {"flush_interval", + {flush_interval, sc( emqx_schema:timeout_duration_ms(), #{ @@ -178,9 +178,9 @@ fields("builtin_egress") -> } )} ]; -fields("layout_builtin_wildcard_optimized") -> +fields(layout_builtin_wildcard_optimized) -> [ - {"type", + {type, sc( wildcard_optimized, #{ @@ -189,7 +189,7 @@ fields("layout_builtin_wildcard_optimized") -> default => wildcard_optimized } )}, - {"bits_per_topic_level", + {bits_per_topic_level, sc( range(1, 64), #{ @@ -197,7 +197,7 @@ fields("layout_builtin_wildcard_optimized") -> importance => ?IMPORTANCE_HIDDEN } )}, - {"epoch_bits", + {epoch_bits, sc( range(0, 64), #{ @@ -206,7 +206,7 @@ fields("layout_builtin_wildcard_optimized") -> desc => ?DESC(wildcard_optimized_epoch_bits) } )}, - {"topic_index_bytes", + {topic_index_bytes, sc( pos_integer(), #{ @@ -215,9 +215,9 @@ fields("layout_builtin_wildcard_optimized") -> } )} ]; -fields("layout_builtin_reference") -> +fields(layout_builtin_reference) -> [ - {"type", + {type, sc( reference, #{'readOnly' => true} @@ -234,7 +234,7 @@ desc(_) -> ds_schema(Options) -> sc( hoconsc:union([ - ref("builtin") + ref(builtin) | emqx_schema_hooks:injection_point('durable_storage.backends', []) ]), Options From 786e30056b0697aacd416d65205f9d56c2736233 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Fri, 23 Feb 2024 11:55:23 +0100 Subject: [PATCH 04/10] docs(ds): Add labels to the i18n for the storage schema --- rel/i18n/emqx_ds_schema.hocon | 45 +++++++++++++++++++++++------------ 1 file changed, 30 insertions(+), 15 deletions(-) diff --git a/rel/i18n/emqx_ds_schema.hocon b/rel/i18n/emqx_ds_schema.hocon index d80c086e92..999c62f8ee 100644 --- a/rel/i18n/emqx_ds_schema.hocon +++ b/rel/i18n/emqx_ds_schema.hocon @@ -1,35 +1,50 @@ emqx_ds_schema { +messages.label: "MQTT message storage" messages.desc: -"""Configuration related to the durable storage of MQTT messages.""" + """~ + Configuration related to the durable storage of MQTT messages.~""" +builtin.label: "Builtin backend" builtin.desc: -"""Builtin session storage backend utilizing embedded RocksDB key-value store.""" + """~ + Builtin session storage backend utilizing embedded RocksDB key-value store.~""" +builtin_data_dir.label: "Database location" builtin_data_dir.desc: -"""File system directory where the database is located.""" + """~ + File system directory where the database is located.~""" +builtin_n_shards.label: "Number of shards" builtin_n_shards.desc: -"""The builtin durable storage partitions data into shards. -This configuration parameter defines the number of shards. -Please note that it takes effect only during the initialization of the durable storage database. -Changing this configuration parameter after the database has been already created won't take any effect.""" + """~ + The builtin durable storage partitions data into shards. + This configuration parameter defines the number of shards. + Please note that it takes effect only during the initialization of the durable storage database. + Changing this configuration parameter after the database has been already created won't take any effect.~""" +builtin_egress.label: "Egress configuration" builtin_egress.desc: -"""Configuration related to the buffering of messages from the local node to the shard leader.""" + """~ + Configuration related to the buffering of messages from the local node to the shard leader.~""" +builtin_layout.label: "Storage layout" builtin_layout.desc: -"""Storage layout is a method of arranging messages from various topics and clients on disc. - -Depending on the type of workload and the topic structure, different types of strategies for storing the data can be employed to maximize efficency of the replay.""" + """~ + Storage layout is a method of arranging messages from various topics and clients on disc. + Depending on the type of workload and the topic structure, different types of strategies for storing the data can be employed to maximize efficency of the replay.~""" +layout_wildcard_optimized.label: "Wildcard-optimized storage layout" layout_wildcard_optimized.desc: -"""_Wildcard-optimized_ layout is designed to maximize the throughput of the wildcard subscriptions covering large numbers of topics.""" + """~ + _Wildcard-optimized_ layout is designed to maximize the throughput of the wildcard subscriptions covering large numbers of topics.~""" +wildcard_optimized_epoch_bits.label: "Epoch size" wildcard_optimized_epoch_bits.desc: -"""Wildcard-optimized layout partitions messages recorded at different times into "epochs". -Each epoch can be consumed by the subscribers as a batch. -Generally, larger epochs lead to higher throughput of subscribers, however currently they may increase latency.""" + """~ + Wildcard-optimized layout partitions messages recorded at different times into "epochs". + Each epoch can be consumed by the subscribers as a batch. + Generally, larger epochs lead to higher throughput of subscribers, however currently they may increase latency.~""" } From 94b0ab983d1749d39ce04eaa38083f031cdb0429 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Fri, 23 Feb 2024 12:36:09 +0100 Subject: [PATCH 05/10] docs(ds): Add descriptions for the builtin's egress config --- apps/emqx/src/emqx_ds_schema.erl | 8 +++++--- apps/emqx/src/emqx_schema.erl | 2 +- rel/i18n/emqx_ds_schema.hocon | 11 +++++++++++ 3 files changed, 17 insertions(+), 4 deletions(-) diff --git a/apps/emqx/src/emqx_ds_schema.erl b/apps/emqx/src/emqx_ds_schema.erl index 4efeffc5f1..b1aff4a901 100644 --- a/apps/emqx/src/emqx_ds_schema.erl +++ b/apps/emqx/src/emqx_ds_schema.erl @@ -77,7 +77,7 @@ schema() -> {messages, ds_schema(#{ desc => ?DESC(messages), - importance => ?IMPORTANCE_HIDDEN, + importance => ?IMPORTANCE_MEDIUM, default => #{ <<"backend">> => builtin @@ -165,7 +165,8 @@ fields(builtin_egress) -> #{ default => 1000, mapping => "emqx_durable_storage.egress_batch_size", - importance => ?IMPORTANCE_HIDDEN + importance => ?IMPORTANCE_MEDIUM, + desc => ?DESC(egress_max_items) } )}, {flush_interval, @@ -174,7 +175,8 @@ fields(builtin_egress) -> #{ default => 100, mapping => "emqx_durable_storage.egress_flush_interval", - importance => ?IMPORTANCE_HIDDEN + importance => ?IMPORTANCE_MEDIUM, + desc => ?DESC(egress_flush_interval) } )} ]; diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 65034f9edd..6a2e37585f 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -258,7 +258,7 @@ roots(medium) -> {"durable_storage", sc( ref("durable_storage"), - #{importance => ?IMPORTANCE_HIDDEN} + #{importance => ?IMPORTANCE_MEDIUM} )} ]; roots(low) -> diff --git a/rel/i18n/emqx_ds_schema.hocon b/rel/i18n/emqx_ds_schema.hocon index 999c62f8ee..452d462587 100644 --- a/rel/i18n/emqx_ds_schema.hocon +++ b/rel/i18n/emqx_ds_schema.hocon @@ -47,4 +47,15 @@ wildcard_optimized_epoch_bits.desc: Each epoch can be consumed by the subscribers as a batch. Generally, larger epochs lead to higher throughput of subscribers, however currently they may increase latency.~""" +egress_max_items.label: "Max items" +egress_max_items.desc: + """~ + This configuration parameter defines maximum number of buffered messages stored in the egress buffer.~""" + +egress_flush_interval.label: "Flush interval" +egress_flush_interval.desc: + """~ + Maximum linger time for the buffered messages. + Egress buffer will be flushed _at least_ as often as `flush_interval`.~""" + } From 91ddbbcc3f3a46c555f0c7484b7afa24cc8bc3dc Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Fri, 23 Feb 2024 12:37:14 +0100 Subject: [PATCH 06/10] fix(sessds): Replace min- and max- batch size with batch_size --- apps/emqx/src/emqx_persistent_session_ds.erl | 2 +- apps/emqx/src/emqx_schema.erl | 22 ++++++++++-- rel/i18n/emqx_schema.hocon | 37 ++++++++++++++------ 3 files changed, 47 insertions(+), 14 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 7494aca95f..2cbf65b472 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -733,7 +733,7 @@ fetch_new_messages(Session = #{s := S}, ClientInfo) -> fetch_new_messages([], Session, _ClientInfo) -> Session; fetch_new_messages([I | Streams], Session0 = #{inflight := Inflight}, ClientInfo) -> - BatchSize = emqx_config:get([session_persistence, max_batch_size]), + BatchSize = emqx_config:get([session_persistence, batch_size]), case emqx_persistent_session_ds_inflight:n_buffered(all, Inflight) >= BatchSize of true -> %% Buffer is full: diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 6a2e37585f..3577656b77 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -258,7 +258,10 @@ roots(medium) -> {"durable_storage", sc( ref("durable_storage"), - #{importance => ?IMPORTANCE_MEDIUM} + #{ + importance => ?IMPORTANCE_MEDIUM, + desc => ?DESC(durable_storage) + } )} ]; roots(low) -> @@ -1659,20 +1662,33 @@ fields("session_persistence") -> default => false } )}, + {"batch_size", + sc( + pos_integer(), + #{ + default => 100, + desc => ?DESC(session_ds_batch_size), + importance => ?IMPORTANCE_MEDIUM + } + )}, + %% Deprecated, now the replayer always use constant batch size: {"max_batch_size", sc( pos_integer(), #{ default => 100, - desc => ?DESC(session_ds_max_batch_size) + desc => ?DESC(session_ds_max_batch_size), + importance => ?IMPORTANCE_HIDDEN } )}, + %% Deprecated, now the replayer always use constant batch size: {"min_batch_size", sc( pos_integer(), #{ default => 100, - desc => ?DESC(session_ds_min_batch_size) + desc => ?DESC(session_ds_min_batch_size), + importance => ?IMPORTANCE_HIDDEN } )}, {"idle_poll_interval", diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon index 039c540b6b..c28076a189 100644 --- a/rel/i18n/emqx_schema.hocon +++ b/rel/i18n/emqx_schema.hocon @@ -1531,29 +1531,46 @@ resource_tags.label: resource_tags.desc: """Tags to annotate this config entry.""" +session_persistence_enable.label: +"""Enable session persistence""" + session_persistence_enable.desc: """Use durable storage for client sessions persistence. -If enabled, sessions configured to outlive client connections, along with their corresponding messages, will be durably stored and survive broker downtime.""" +If enabled, sessions configured to outlive client connections, along with their corresponding messages, will be durably stored and survive broker downtime. + +:::warning +This feature is currently experimental. Please don't enable it in the producation environments that contain valuable data. +:::""" + + +session_ds_session_gc_interval.label: +"""Session garbage collection interval""" session_ds_session_gc_interval.desc: """The interval at which session garbage collection is executed for persistent sessions.""" +session_ds_session_gc_batch_size.label: +"""Session garbage collection batch size""" + session_ds_session_gc_batch_size.desc: """The size of each batch of expired persistent sessions to be garbage collected per iteration.""" -session_ds_max_batch_size.desc: -"""This value affects the flow control for the persistent sessions. -The session queries the DB for the new messages in batches. -Size of the batch doesn't exceed this value or `ReceiveMaximum`, whichever is smaller.""" +session_ds_batch_size.label: +"""Batch size""" -session_ds_min_batch_size.desc: +session_ds_batch_size.desc: """This value affects the flow control for the persistent sessions. -The session will query the DB for the new messages when the value of `FreeSpace` variable is larger than this value or `ReceiveMaximum` / 2, whichever is smaller. +Persistent session queries the durable message storage in batches. +This value specifies size of the batch. + +Note: larger batches generally improve the throughput and overall performance of the system, but increase RAM usage per client.""" -`FreeSpace` is calculated as `ReceiveMaximum` for the session - number of inflight messages.""" +durable_storage.label: +"""Durable storage""" -session_ds_message_retention_period.desc: -"""The minimum amount of time that messages should be retained for. After messages have been in storage for at least this period of time, they'll be dropped.""" +durable_storage.desc: +"""Configuration related to the EMQX durable storages. +EMQX uses durable storages to offload various data, such as MQTT messages, to disc.""" } From c18fc6a4bbad8da46648b648048f361803123d3f Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Fri, 23 Feb 2024 12:41:50 +0100 Subject: [PATCH 07/10] fix(sessds): Remove deprecated configuration parameters --- apps/emqx/src/emqx_schema.erl | 20 -------------------- 1 file changed, 20 deletions(-) diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 3577656b77..5d2459a81b 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1671,26 +1671,6 @@ fields("session_persistence") -> importance => ?IMPORTANCE_MEDIUM } )}, - %% Deprecated, now the replayer always use constant batch size: - {"max_batch_size", - sc( - pos_integer(), - #{ - default => 100, - desc => ?DESC(session_ds_max_batch_size), - importance => ?IMPORTANCE_HIDDEN - } - )}, - %% Deprecated, now the replayer always use constant batch size: - {"min_batch_size", - sc( - pos_integer(), - #{ - default => 100, - desc => ?DESC(session_ds_min_batch_size), - importance => ?IMPORTANCE_HIDDEN - } - )}, {"idle_poll_interval", sc( timeout_duration(), From dd2e35345ffe71957173b2d4b0544a225d849798 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Fri, 23 Feb 2024 13:41:05 +0100 Subject: [PATCH 08/10] docs(ds): Apply remarks --- rel/i18n/emqx_ds_schema.hocon | 27 +++++++++++++++++++++------ 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/rel/i18n/emqx_ds_schema.hocon b/rel/i18n/emqx_ds_schema.hocon index 452d462587..7468746206 100644 --- a/rel/i18n/emqx_ds_schema.hocon +++ b/rel/i18n/emqx_ds_schema.hocon @@ -13,7 +13,9 @@ builtin.desc: builtin_data_dir.label: "Database location" builtin_data_dir.desc: """~ - File system directory where the database is located.~""" + File system directory where the database is located. + + By default it is equal to `node.data_dir`.~""" builtin_n_shards.label: "Number of shards" builtin_n_shards.desc: @@ -33,19 +35,32 @@ builtin_layout.desc: """~ Storage layout is a method of arranging messages from various topics and clients on disc. - Depending on the type of workload and the topic structure, different types of strategies for storing the data can be employed to maximize efficency of the replay.~""" + Depending on the type of workload and the topic structure, different types of strategies for storing the data can be employed to maximize efficiency of reading messages from the durable storage.~""" layout_wildcard_optimized.label: "Wildcard-optimized storage layout" layout_wildcard_optimized.desc: """~ - _Wildcard-optimized_ layout is designed to maximize the throughput of the wildcard subscriptions covering large numbers of topics.~""" + _Wildcard-optimized_ layout is designed to maximize the throughput of wildcard subscriptions covering large numbers of topics. + + For example, it can handle scenarios where a very large number of clients publish data to the topics containing their client ID, such as: `sensor/%device-version%/%clientid%/temperature`, `sensor/%device-version%/%clientid%/pressure`, etc. + This layout will automatically group such topics into a single stream, so a client subscribing to a topic filter containing wildcards (such as `sensor/+/+/temperature`) will be able to consume messages published by all devices as a single batch. + + This layout is efficient for non-wildcard subscriptions as well.~""" -wildcard_optimized_epoch_bits.label: "Epoch size" +wildcard_optimized_epoch_bits.label: "Epoch bits" wildcard_optimized_epoch_bits.desc: """~ Wildcard-optimized layout partitions messages recorded at different times into "epochs". - Each epoch can be consumed by the subscribers as a batch. - Generally, larger epochs lead to higher throughput of subscribers, however currently they may increase latency.~""" + Reading messages from a single epoch can be done very efficiently, so larger epochs improve the throughput of subscribers, but may increase end-to-end latency + + Time span covered by each epoch grows exponentially with the value of `epoch_bits`: + + - `epoch_bits = 1`: epoch time = 1 millisecond + - `epoch_bits = 2`: 2 milliseconds + ... + - `epoch_bits = 10`: 1024 milliseconds + - `epoch_bits = 13`: ~8 seconds + ....~""" egress_max_items.label: "Max items" egress_max_items.desc: From e126393cbfc454f6ea61ff135ff895166fa81b51 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Fri, 23 Feb 2024 14:56:41 +0100 Subject: [PATCH 09/10] fix(ds): Fix schema checker warnings for DS schema --- apps/emqx/src/emqx_ds_schema.erl | 69 ++++++++++++++++++++----------- apps/emqx/src/emqx_schema.erl | 8 ++-- changes/ce/fix-12562.en.md | 3 ++ rel/i18n/emqx_ds_schema.hocon | 55 +++++++++++++++--------- rel/i18n/emqx_schema.hocon | 2 +- scripts/spellcheck/dicts/emqx.txt | 2 + 6 files changed, 90 insertions(+), 49 deletions(-) create mode 100644 changes/ce/fix-12562.en.md diff --git a/apps/emqx/src/emqx_ds_schema.erl b/apps/emqx/src/emqx_ds_schema.erl index b1aff4a901..ef8300670d 100644 --- a/apps/emqx/src/emqx_ds_schema.erl +++ b/apps/emqx/src/emqx_ds_schema.erl @@ -76,12 +76,12 @@ schema() -> [ {messages, ds_schema(#{ - desc => ?DESC(messages), - importance => ?IMPORTANCE_MEDIUM, default => #{ <<"backend">> => builtin - } + }, + importance => ?IMPORTANCE_MEDIUM, + desc => ?DESC(messages) })} ]. @@ -92,38 +92,38 @@ fields(builtin) -> sc( builtin, #{ - importance => ?IMPORTANCE_MEDIUM, 'readOnly' => true, default => builtin, - desc => ?DESC(builtin) + importance => ?IMPORTANCE_MEDIUM, + desc => ?DESC(builtin_backend) } )}, {'_config_handler', sc( {module(), atom()}, #{ - importance => ?IMPORTANCE_HIDDEN, 'readOnly' => true, - default => {?MODULE, translate_builtin} + default => {?MODULE, translate_builtin}, + importance => ?IMPORTANCE_HIDDEN } )}, {data_dir, sc( string(), #{ - desc => ?DESC(builtin_data_dir), mapping => "emqx_durable_storage.db_data_dir", required => false, - importance => ?IMPORTANCE_MEDIUM + importance => ?IMPORTANCE_MEDIUM, + desc => ?DESC(builtin_data_dir) } )}, {n_shards, sc( pos_integer(), #{ + default => 16, importance => ?IMPORTANCE_MEDIUM, - desc => ?DESC(builtin_n_shards), - default => 16 + desc => ?DESC(builtin_n_shards) } )}, {replication_factor, @@ -134,22 +134,20 @@ fields(builtin) -> importance => ?IMPORTANCE_HIDDEN } )}, - {egress, + {local_write_buffer, sc( - ref(builtin_egress), + ref(builtin_local_write_buffer), #{ - desc => ?DESC(builtin_egress), - importance => ?IMPORTANCE_MEDIUM + importance => ?IMPORTANCE_MEDIUM, + desc => ?DESC(builtin_local_write_buffer) } )}, {layout, sc( - hoconsc:union([ - ref(layout_builtin_wildcard_optimized), ref(layout_builtin_reference) - ]), + hoconsc:union(builtin_layouts()), #{ desc => ?DESC(builtin_layout), - importance => ?IMPORTANCE_HIDDEN, + importance => ?IMPORTANCE_MEDIUM, default => #{ <<"type">> => wildcard_optimized @@ -157,7 +155,7 @@ fields(builtin) -> } )} ]; -fields(builtin_egress) -> +fields(builtin_local_write_buffer) -> [ {max_items, sc( @@ -166,7 +164,7 @@ fields(builtin_egress) -> default => 1000, mapping => "emqx_durable_storage.egress_batch_size", importance => ?IMPORTANCE_MEDIUM, - desc => ?DESC(egress_max_items) + desc => ?DESC(builtin_local_write_buffer_max_items) } )}, {flush_interval, @@ -176,7 +174,7 @@ fields(builtin_egress) -> default => 100, mapping => "emqx_durable_storage.egress_flush_interval", importance => ?IMPORTANCE_MEDIUM, - desc => ?DESC(egress_flush_interval) + desc => ?DESC(builtin_local_write_buffer_flush_interval) } )} ]; @@ -186,9 +184,9 @@ fields(layout_builtin_wildcard_optimized) -> sc( wildcard_optimized, #{ - desc => ?DESC(layout_wildcard_optimized), 'readOnly' => true, - default => wildcard_optimized + default => wildcard_optimized, + desc => ?DESC(layout_builtin_wildcard_optimized_type) } )}, {bits_per_topic_level, @@ -222,10 +220,19 @@ fields(layout_builtin_reference) -> {type, sc( reference, - #{'readOnly' => true} + #{ + 'readOnly' => true, + importance => ?IMPORTANCE_HIDDEN + } )} ]. +desc(builtin) -> + ?DESC(builtin); +desc(builtin_local_write_buffer) -> + ?DESC(builtin_local_write_buffer); +desc(layout_builtin_wildcard_optimized) -> + ?DESC(layout_builtin_wildcard_optimized); desc(_) -> undefined. @@ -242,6 +249,18 @@ ds_schema(Options) -> Options ). +-ifndef(TEST). +builtin_layouts() -> + [ref(layout_builtin_wildcard_optimized)]. +-else. +builtin_layouts() -> + %% Reference layout stores everything in one stream, so it's not + %% suitable for production use. However, it's very simple and + %% produces a very predictabale replay order, which can be useful + %% for testing and debugging: + [ref(layout_builtin_wildcard_optimized), ref(layout_builtin_reference)]. +-endif. + sc(Type, Meta) -> hoconsc:mk(Type, Meta). ref(StructName) -> hoconsc:ref(?MODULE, StructName). diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 5d2459a81b..7889a13de8 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -255,9 +255,9 @@ roots(medium) -> ref("overload_protection"), #{importance => ?IMPORTANCE_HIDDEN} )}, - {"durable_storage", + {durable_storage, sc( - ref("durable_storage"), + ref(durable_storage), #{ importance => ?IMPORTANCE_MEDIUM, desc => ?DESC(durable_storage) @@ -1730,7 +1730,7 @@ fields("session_persistence") -> } )} ]; -fields("durable_storage") -> +fields(durable_storage) -> emqx_ds_schema:schema(). mqtt_listener(Bind) -> @@ -1985,6 +1985,8 @@ desc("crl_cache") -> "Global CRL cache options."; desc("session_persistence") -> "Settings governing durable sessions persistence."; +desc(durable_storage) -> + ?DESC(durable_storage); desc(_) -> undefined. diff --git a/changes/ce/fix-12562.en.md b/changes/ce/fix-12562.en.md new file mode 100644 index 0000000000..af16d1cc38 --- /dev/null +++ b/changes/ce/fix-12562.en.md @@ -0,0 +1,3 @@ +Add a new configuration root: `durable_storage`. + +This configuration tree contains the settings related to the new persistent session feature. diff --git a/rel/i18n/emqx_ds_schema.hocon b/rel/i18n/emqx_ds_schema.hocon index 7468746206..89a276275e 100644 --- a/rel/i18n/emqx_ds_schema.hocon +++ b/rel/i18n/emqx_ds_schema.hocon @@ -10,25 +10,46 @@ builtin.desc: """~ Builtin session storage backend utilizing embedded RocksDB key-value store.~""" +builtin_backend.label: "Backend type" +builtin_backend.desc: + """~ + Built-in backend.~""" + builtin_data_dir.label: "Database location" builtin_data_dir.desc: """~ File system directory where the database is located. - By default it is equal to `node.data_dir`.~""" + By default, it is equal to `node.data_dir`.~""" builtin_n_shards.label: "Number of shards" builtin_n_shards.desc: """~ - The builtin durable storage partitions data into shards. + The built-in durable storage partitions data into shards. This configuration parameter defines the number of shards. Please note that it takes effect only during the initialization of the durable storage database. Changing this configuration parameter after the database has been already created won't take any effect.~""" -builtin_egress.label: "Egress configuration" -builtin_egress.desc: +builtin_local_write_buffer.label: "Local write buffer" +builtin_local_write_buffer.desc: + """~ + Configuration related to the buffering of messages sent from the local node to the shard leader. + + EMQX accumulates PUBLISH messages from the local clients in a write buffer before committing them to the durable storage. + This helps to hide network latency between EMQX nodes and improves write throughput.~""" + +builtin_local_write_buffer_max_items.label: "Max items" +builtin_local_write_buffer_max_items.desc: + """~ + This configuration parameter defines maximum number of messages stored in the local write buffer.~""" + +builtin_local_write_buffer_flush_interval.label: "Flush interval" +builtin_local_write_buffer_flush_interval.desc: """~ - Configuration related to the buffering of messages from the local node to the shard leader.~""" + Maximum linger time for the buffered messages. + Local write buffer will be flushed _at least_ as often as `flush_interval`. + + Larger values of `flush_interval` may lead to higher throughput and better overall performance, but may increase end-to-end latency.~""" builtin_layout.label: "Storage layout" builtin_layout.desc: @@ -37,8 +58,8 @@ builtin_layout.desc: Depending on the type of workload and the topic structure, different types of strategies for storing the data can be employed to maximize efficiency of reading messages from the durable storage.~""" -layout_wildcard_optimized.label: "Wildcard-optimized storage layout" -layout_wildcard_optimized.desc: +layout_builtin_wildcard_optimized.label: "Wildcard-optimized storage layout" +layout_builtin_wildcard_optimized.desc: """~ _Wildcard-optimized_ layout is designed to maximize the throughput of wildcard subscriptions covering large numbers of topics. @@ -47,11 +68,16 @@ layout_wildcard_optimized.desc: This layout is efficient for non-wildcard subscriptions as well.~""" +layout_builtin_wildcard_optimized_type.label: "Layout type" +layout_builtin_wildcard_optimized_type.desc: + """~ + Wildcard-optimized layout type.~""" + wildcard_optimized_epoch_bits.label: "Epoch bits" wildcard_optimized_epoch_bits.desc: """~ Wildcard-optimized layout partitions messages recorded at different times into "epochs". - Reading messages from a single epoch can be done very efficiently, so larger epochs improve the throughput of subscribers, but may increase end-to-end latency + Reading messages from a single epoch can be done very efficiently, so larger epochs improve the throughput of subscribers, but may increase end-to-end latency. Time span covered by each epoch grows exponentially with the value of `epoch_bits`: @@ -60,17 +86,6 @@ wildcard_optimized_epoch_bits.desc: ... - `epoch_bits = 10`: 1024 milliseconds - `epoch_bits = 13`: ~8 seconds - ....~""" - -egress_max_items.label: "Max items" -egress_max_items.desc: - """~ - This configuration parameter defines maximum number of buffered messages stored in the egress buffer.~""" - -egress_flush_interval.label: "Flush interval" -egress_flush_interval.desc: - """~ - Maximum linger time for the buffered messages. - Egress buffer will be flushed _at least_ as often as `flush_interval`.~""" + ...~""" } diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon index c28076a189..0a0f71cfe3 100644 --- a/rel/i18n/emqx_schema.hocon +++ b/rel/i18n/emqx_schema.hocon @@ -1539,7 +1539,7 @@ session_persistence_enable.desc: If enabled, sessions configured to outlive client connections, along with their corresponding messages, will be durably stored and survive broker downtime. :::warning -This feature is currently experimental. Please don't enable it in the producation environments that contain valuable data. +This feature is currently experimental. Please don't enable it in the production environments that contain valuable data. :::""" diff --git a/scripts/spellcheck/dicts/emqx.txt b/scripts/spellcheck/dicts/emqx.txt index c2f5f54ef7..bb8bb397aa 100644 --- a/scripts/spellcheck/dicts/emqx.txt +++ b/scripts/spellcheck/dicts/emqx.txt @@ -284,9 +284,11 @@ TDengine clickhouse FormatType RocketMQ +RocksDB Keyspace OpenTSDB saml +storages idp ocpp OCPP From fe4c7cd2dc6152973605e1b9ab99d323c87e6772 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Fri, 23 Feb 2024 18:07:56 +0100 Subject: [PATCH 10/10] fix(ds): Apply review remarks, and hide certain fields --- apps/emqx/src/emqx_ds_schema.erl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/apps/emqx/src/emqx_ds_schema.erl b/apps/emqx/src/emqx_ds_schema.erl index ef8300670d..5c552404e0 100644 --- a/apps/emqx/src/emqx_ds_schema.erl +++ b/apps/emqx/src/emqx_ds_schema.erl @@ -138,7 +138,7 @@ fields(builtin) -> sc( ref(builtin_local_write_buffer), #{ - importance => ?IMPORTANCE_MEDIUM, + importance => ?IMPORTANCE_HIDDEN, desc => ?DESC(builtin_local_write_buffer) } )}, @@ -163,7 +163,7 @@ fields(builtin_local_write_buffer) -> #{ default => 1000, mapping => "emqx_durable_storage.egress_batch_size", - importance => ?IMPORTANCE_MEDIUM, + importance => ?IMPORTANCE_HIDDEN, desc => ?DESC(builtin_local_write_buffer_max_items) } )}, @@ -173,7 +173,7 @@ fields(builtin_local_write_buffer) -> #{ default => 100, mapping => "emqx_durable_storage.egress_flush_interval", - importance => ?IMPORTANCE_MEDIUM, + importance => ?IMPORTANCE_HIDDEN, desc => ?DESC(builtin_local_write_buffer_flush_interval) } )} @@ -202,7 +202,7 @@ fields(layout_builtin_wildcard_optimized) -> range(0, 64), #{ default => 10, - importance => ?IMPORTANCE_MEDIUM, + importance => ?IMPORTANCE_HIDDEN, desc => ?DESC(wildcard_optimized_epoch_bits) } )},