Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor schema for the durable storage #12562

Merged
merged 10 commits into from Mar 4, 2024
9 changes: 9 additions & 0 deletions apps/emqx/src/emqx_config.erl
Expand Up @@ -497,6 +497,15 @@ fill_defaults(RawConf, Opts) ->
).

-spec fill_defaults(module(), raw_config(), hocon_tconf:opts()) -> map().
fill_defaults(_SchemaMod, RawConf = #{<<"durable_storage">> := _}, _) ->
%% FIXME: kludge to prevent `emqx_config' module from filling in
%% the default values for backends and layouts. These records are
%% inside unions, and adding default values there will add
%% incompatible fields.
%%
%% Note: this function is called for each individual conf root, so
%% this clause only affects this particular subtree.
RawConf;
fill_defaults(SchemaMod, RawConf, Opts0) ->
Opts = maps:merge(#{required => false, make_serializable => true}, Opts0),
hocon_tconf:check_plain(
Expand Down
266 changes: 266 additions & 0 deletions apps/emqx/src/emqx_ds_schema.erl
@@ -0,0 +1,266 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

%% @doc Schema for EMQX_DS databases.
-module(emqx_ds_schema).

%% API:
-export([schema/0, translate_builtin/1]).

%% Behavior callbacks:
-export([fields/1, desc/1, namespace/0]).

-include("emqx_schema.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("hocon/include/hocon_types.hrl").

%%================================================================================
%% Type declarations
%%================================================================================

%%================================================================================
%% API
%%================================================================================

%% @doc Translate the checked configuration of a `builtin' durable
%% storage backend into a map with the keys `backend', `n_shards',
%% `replication_factor' and `storage'.
%%
%% `fields(builtin)' registers this function as the default value of the
%% hidden `_config_handler' field. Keys of the input map other than the
%% four matched below are ignored.
%%
%% Fails with `function_clause' when the input is not a builtin backend
%% config or when its layout is not one of the shapes handled by
%% `translate_layout/1'.
-spec translate_builtin(map()) ->
    #{
        backend := builtin,
        n_shards := term(),
        replication_factor := term(),
        storage := {module(), map()}
    }.
translate_builtin(#{
    backend := builtin,
    n_shards := NShards,
    replication_factor := ReplFactor,
    layout := Layout
}) ->
    #{
        backend => builtin,
        n_shards => NShards,
        replication_factor => ReplFactor,
        storage => translate_layout(Layout)
    }.

%% Map a layout config to a `{StorageModule, StorageOptions}' pair.
translate_layout(#{
    type := wildcard_optimized,
    bits_per_topic_level := BitsPerTopicLevel,
    epoch_bits := EpochBits,
    topic_index_bytes := TIBytes
}) ->
    {emqx_ds_storage_bitfield_lts, #{
        bits_per_topic_level => BitsPerTopicLevel,
        topic_index_bytes => TIBytes,
        epoch_bits => EpochBits
    }};
translate_layout(#{type := reference}) ->
    %% The reference layout takes no options.
    {emqx_ds_storage_reference, #{}}.

%%================================================================================
%% Behavior callbacks
%%================================================================================

%% Schema namespace under which the structs of this module are
%% registered.
namespace() -> durable_storage.

%% @doc Return a one-element field list describing `messages'.
%%
%% The field type is built by `ds_schema/1' (a union of the available
%% backends), and the field defaults to the `builtin' backend.
schema() ->
    MessagesMeta = #{
        default =>
            #{
                <<"backend">> => builtin
            },
        importance => ?IMPORTANCE_MEDIUM,
        desc => ?DESC(messages)
    },
    [{messages, ds_schema(MessagesMeta)}].

fields(builtin) ->
    %% Schema for the builtin backend:
    [
        %% Backend tag, typed as the singleton atom `builtin'.
        {backend,
            sc(
                builtin,
                #{
                    'readOnly' => true,
                    default => builtin,
                    importance => ?IMPORTANCE_MEDIUM,
                    desc => ?DESC(builtin_backend)
                }
            )},
        %% Hidden `{Module, Function}' pair; defaults to
        %% `translate_builtin/1' of this module.
        {'_config_handler',
            sc(
                {module(), atom()},
                #{
                    'readOnly' => true,
                    default => {?MODULE, translate_builtin},
                    importance => ?IMPORTANCE_HIDDEN
                }
            )},
        %% Optional; mapped to the `db_data_dir' setting of the
        %% `emqx_durable_storage' application.
        {data_dir,
            sc(
                string(),
                #{
                    mapping => "emqx_durable_storage.db_data_dir",
                    required => false,
                    importance => ?IMPORTANCE_MEDIUM,
                    desc => ?DESC(builtin_data_dir)
                }
            )},
        {n_shards,
            sc(
                pos_integer(),
                #{
                    default => 16,
                    importance => ?IMPORTANCE_MEDIUM,
                    desc => ?DESC(builtin_n_shards)
                }
            )},
        %% Hidden option, hence no description.
        {replication_factor,
            sc(
                pos_integer(),
                #{
                    default => 3,
                    importance => ?IMPORTANCE_HIDDEN
                }
            )},
        {local_write_buffer,
            sc(
                ref(builtin_local_write_buffer),
                #{
                    importance => ?IMPORTANCE_MEDIUM,
                    desc => ?DESC(builtin_local_write_buffer)
                }
            )},
        %% Union of the layouts returned by `builtin_layouts/0';
        %% `wildcard_optimized' is selected by default.
        {layout,
            sc(
                hoconsc:union(builtin_layouts()),
                #{
                    desc => ?DESC(builtin_layout),
                    importance => ?IMPORTANCE_MEDIUM,
                    default =>
                        #{
                            <<"type">> => wildcard_optimized
                        }
                }
            )}
    ];
fields(builtin_local_write_buffer) ->
    %% Schema for the local write buffer of the builtin backend. Both
    %% options are mapped to `emqx_durable_storage' application settings.
    [
        %% Mapped to `egress_batch_size'.
        {max_items,
            sc(
                pos_integer(),
                #{
                    default => 1000,
                    mapping => "emqx_durable_storage.egress_batch_size",
                    importance => ?IMPORTANCE_MEDIUM,
                    desc => ?DESC(builtin_local_write_buffer_max_items)
                }
            )},
        %% Mapped to `egress_flush_interval'.
        {flush_interval,
            sc(
                emqx_schema:timeout_duration_ms(),
                #{
                    default => 100,
                    mapping => "emqx_durable_storage.egress_flush_interval",
                    importance => ?IMPORTANCE_MEDIUM,
                    desc => ?DESC(builtin_local_write_buffer_flush_interval)
                }
            )}
    ];
fields(layout_builtin_wildcard_optimized) ->
[
{type,
sc(
wildcard_optimized,
#{
'readOnly' => true,
default => wildcard_optimized,
desc => ?DESC(layout_builtin_wildcard_optimized_type)
}
)},
{bits_per_topic_level,
sc(
range(1, 64),
#{
default => 64,
importance => ?IMPORTANCE_HIDDEN
}
)},
{epoch_bits,
sc(
range(0, 64),
#{
default => 10,
importance => ?IMPORTANCE_MEDIUM,
desc => ?DESC(wildcard_optimized_epoch_bits)
}
)},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we could turn that option into a time interval instead. Work on the consistent replication has so far shown that one of the most practical ways to deal with ordering guarantees is to just use microsecond-precision timestamps for message keys, but changing precision will suddenly make this option backward-incompatible. One significant downside is that we won't be able to respect the specified value exactly and will need to tell the user somehow what the actual epoch size is.

Copy link
Member Author

@ieQu1 ieQu1 Feb 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I mentioned in the other PR that using a microsecond-precision timestamp for serialization is fine, but it must be implemented in a way that affects NEITHER the EMQX application NOR the storage layer. Otherwise we leak abstractions from the replication layer.

EMQX application owns the #message record, and assumes that the timestamp is in milliseconds. DS must store and restore message as is (at least type- and unit-wise), so changing replication layer in the future doesn't result in breakage.

There is no reason why the storage layer must be designed otherwise. I guess the proper way to address this is to pass a unique message id (which could be derived from the microsecond TS) from the replication layer to the storage layer, instead of generating it in the storage layer.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

...Made it hidden for now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

EMQX application owns the #message record, and assumes that the timestamp is in milliseconds.

That's why I mentioned message key and not emqx-level message timestamp.

NOR the storage layer.

Arguable. I do not honestly see why storage layer could not accommodate replication layer needs and handle timestamps with variable precision for the sake of overall simplicity and compactness.

Copy link
Member Author

@ieQu1 ieQu1 Feb 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Arguable. I do not honestly see why storage layer could not accommodate replication layer needs and handle timestamps with variable precision for the sake of overall simplicity and compactness.

Well, I would like to avoid it if possible, since it will, to some degree, break the layered model of the DS. It should be possible to replace the replication layer (if we come up with a new & faster replication protocol, for example), and reuse the storage layer modules. Storage layer may in theory implement some clever way of serializing messages that depends on the timestamp's fixed size.
It's all hypothetical, of course, but it illustrates why we should strive to keep the layers untangled.

I don't think it will lead to any complexity, to be honest. The bitfield_lts module makes it quite trivial to map the serialization key next to the timestamp, so, in effect, one gets a microsecond timestamp in the RocksDB key. It's simply a matter of the API.

Sure, treating microseconds as a separate entity will make it impossible to configure bitfield_lts module to seek to an epoch with a microsecond precision, but I don't see it as a problem, because the top-level API (emqx_ds:make_iterator) won't be able to make use of it anyway.

{topic_index_bytes,
sc(
pos_integer(),
#{
default => 4,
importance => ?IMPORTANCE_HIDDEN
}
)}
];
fields(layout_builtin_reference) ->
    %% Schema for the reference layout: only a hidden, read-only `type'
    %% tag typed as the singleton atom `reference'.
    TypeMeta = #{
        'readOnly' => true,
        importance => ?IMPORTANCE_HIDDEN
    },
    [{type, sc(reference, TypeMeta)}].

%% Description reference for a struct of this schema; `undefined' for
%% any struct that has no description.
desc(Struct) ->
    case Struct of
        builtin ->
            ?DESC(builtin);
        builtin_local_write_buffer ->
            ?DESC(builtin_local_write_buffer);
        layout_builtin_wildcard_optimized ->
            ?DESC(layout_builtin_wildcard_optimized);
        _ ->
            undefined
    end.

%%================================================================================
%% Internal functions
%%================================================================================

%% Build the schema of a durable storage database: a union of the
%% builtin backend and any backends registered at the
%% `durable_storage.backends' injection point. `Options' is passed
%% through as the field metadata.
ds_schema(Options) ->
    BuiltinRef = ref(builtin),
    Injected = emqx_schema_hooks:injection_point('durable_storage.backends', []),
    Backends = hoconsc:union([BuiltinRef | Injected]),
    sc(Backends, Options).

%% Layouts accepted by the `layout' union of the builtin backend.
-ifdef(TEST).
builtin_layouts() ->
    %% Reference layout stores everything in one stream, so it's not
    %% suitable for production use. However, it's very simple and
    %% produces a very predictable replay order, which can be useful
    %% for testing and debugging:
    [
        ref(layout_builtin_wildcard_optimized),
        ref(layout_builtin_reference)
    ].
-else.
builtin_layouts() ->
    [
        ref(layout_builtin_wildcard_optimized)
    ].
-endif.

%% Shorthand for `hoconsc:mk/2'.
sc(Type, Meta) ->
    hoconsc:mk(Type, Meta).

%% Reference to a struct defined by `fields/1' of this module.
ref(StructName) ->
    hoconsc:ref(?MODULE, StructName).
22 changes: 4 additions & 18 deletions apps/emqx/src/emqx_persistent_message.erl
Expand Up @@ -52,31 +52,17 @@ is_persistence_enabled() ->

%% @doc Options for creating the durable storage database, derived from
%% the config subtree at `[durable_storage, messages]'.
-spec storage_backend() -> emqx_ds:create_db_opts().
storage_backend() ->
    storage_backend([durable_storage, messages]).

%% Dev-only option: when it is set, all messages are forced to go
%% through `emqx_persistent_session_ds'. The value is read from
%% `session_persistence.force_persistence'.
-spec force_ds() -> boolean().
force_ds() ->
    ConfigPath = [session_persistence, force_persistence],
    emqx_config:get(ConfigPath).

%% Read the config subtree at `Path' and hand it to the handler stored
%% under its `_config_handler' key, a `{Module, Function}' pair. The
%% handler's return value is returned as is.
%%
%% Crashes with `badmatch' if the subtree has no `_config_handler' entry
%% of that shape.
storage_backend(Path) ->
    ConfigTree = #{'_config_handler' := {Module, Function}} = emqx_config:get(Path),
    Module:Function(ConfigTree).

%%--------------------------------------------------------------------

Expand Down
2 changes: 1 addition & 1 deletion apps/emqx/src/emqx_persistent_session_ds.erl
Expand Up @@ -733,7 +733,7 @@ fetch_new_messages(Session = #{s := S}, ClientInfo) ->
fetch_new_messages([], Session, _ClientInfo) ->
Session;
fetch_new_messages([I | Streams], Session0 = #{inflight := Inflight}, ClientInfo) ->
BatchSize = emqx_config:get([session_persistence, max_batch_size]),
BatchSize = emqx_config:get([session_persistence, batch_size]),
case emqx_persistent_session_ds_inflight:n_buffered(all, Inflight) >= BatchSize of
true ->
%% Buffer is full:
Expand Down