Skip to content

Commit

Permalink
Merge 07aa708 into 018d79b
Browse files Browse the repository at this point in the history
  • Loading branch information
ieQu1 committed May 9, 2024
2 parents 018d79b + 07aa708 commit 0974ed6
Show file tree
Hide file tree
Showing 19 changed files with 1,008 additions and 379 deletions.
2 changes: 1 addition & 1 deletion apps/emqx/rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
{emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}},
{pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
{recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}},
{snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.8"}}},
{snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.10"}}},
{ra, "2.7.3"}
]}.

Expand Down
1 change: 1 addition & 0 deletions apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ which_dbs() ->
init({#?db_sup{db = DB}, DefaultOpts}) ->
%% Spec for the top-level supervisor for the database:
logger:notice("Starting DS DB ~p", [DB]),
emqx_ds_builtin_sup:clean_gvars(DB),
emqx_ds_builtin_metrics:init_for_db(DB),
Opts = emqx_ds_replication_layer_meta:open_db(DB, DefaultOpts),
ok = start_ra_system(DB, Opts),
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ prometheus_per_db(NodeOrAggr) ->
%% ...
%% '''
%%
%% If `NodeOrAggr' = `node' then node name is appended to the list of
%% If `NodeOrAggr' = `aggr' then node name is appended to the list of
%% labels.
prometheus_per_db(NodeOrAggr, DB, Acc0) ->
Labels = [
Expand Down
31 changes: 30 additions & 1 deletion apps/emqx_durable_storage/src/emqx_ds_builtin_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

%% API:
-export([start_db/2, stop_db/1]).
-export([set_gvar/3, get_gvar/3, clean_gvars/1]).

%% behavior callbacks:
-export([init/1]).
Expand All @@ -39,6 +40,13 @@
-define(top, ?MODULE).
-define(databases, emqx_ds_builtin_databases_sup).

-define(gvar_tab, emqx_ds_builtin_gvar).

%% Row stored in the `?gvar_tab' ETS table by `set_gvar/3'.
-record(gvar, {
%% Composite key: the DB that owns the variable and the caller-chosen key.
k :: {emqx_ds:db(), _Key},
%% Value associated with the key.
v :: _Value
}).

%%================================================================================
%% API functions
%%================================================================================
Expand All @@ -61,11 +69,31 @@ stop_db(DB) ->
Pid when is_pid(Pid) ->
_ = supervisor:terminate_child(?databases, DB),
_ = supervisor:delete_child(?databases, DB),
ok;
clean_gvars(DB);
undefined ->
ok
end.

%% @doc Set a DB-global variable. Please don't abuse this API.
%%
%% The value is written to the `?gvar_tab' ETS table under the composite
%% key `{DB, Key}'.
-spec set_gvar(emqx_ds:db(), _Key, _Val) -> ok.
set_gvar(DB, Key, Val) ->
    Entry = #gvar{k = {DB, Key}, v = Val},
    %% ets:insert/2 always returns `true'; failures surface as exceptions.
    true = ets:insert(?gvar_tab, Entry),
    ok.

%% @doc Read the DB-global variable stored under `{DB, Key}'.
%% Returns `Default' when the table holds no entry for that key.
-spec get_gvar(emqx_ds:db(), _Key, Val) -> Val.
get_gvar(DB, Key, Default) ->
    Found = ets:lookup(?gvar_tab, {DB, Key}),
    case Found of
        [] ->
            Default;
        [#gvar{v = Stored}] ->
            Stored
    end.

%% @doc Remove every DB-global variable whose key belongs to `DB'.
-spec clean_gvars(emqx_ds:db()) -> ok.
clean_gvars(DB) ->
    %% Match any key of the form `{DB, _}' regardless of the stored value.
    Pattern = #gvar{k = {DB, '_'}, _ = '_'},
    true = ets:match_delete(?gvar_tab, Pattern),
    ok.

%%================================================================================
%% behavior callbacks
%%================================================================================
Expand Down Expand Up @@ -96,6 +124,7 @@ init(?top) ->
type => supervisor,
shutdown => infinity
},
_ = ets:new(?gvar_tab, [named_table, set, public, {keypos, #gvar.k}, {read_concurrency, true}]),
%%
SupFlags = #{
strategy => one_for_all,
Expand Down
14 changes: 13 additions & 1 deletion apps/emqx_durable_storage/src/emqx_ds_lts.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
%% API:
-export([
trie_create/1, trie_create/0,
destroy/1,
trie_restore/2,
trie_update/2,
trie_copy_learned_paths/2,
topic_key/3,
match_topics/2,
Expand Down Expand Up @@ -116,10 +118,20 @@ trie_create(UserOpts) ->
trie_create() ->
trie_create(#{}).

%% @doc Delete the ETS tables referenced by the `trie' and `stats' fields
%% of the trie record. Always returns `ok', including when a table has
%% already been deleted.
-spec destroy(trie()) -> ok.
destroy(#trie{trie = Trie, stats = Stats}) ->
    lists:foreach(
        fun(Tab) ->
            try ets:delete(Tab) of
                true -> ok
            catch
                %% ets:delete/1 raises `badarg' when the table does not
                %% exist (anymore); treat that as "already destroyed".
                %% Any other exception is unexpected and is allowed to
                %% propagate instead of being silently swallowed.
                error:badarg -> ok
            end
        end,
        [Trie, Stats]
    ),
    ok.

%% @doc Restore trie from a dump
-spec trie_restore(options(), [{_Key, _Val}]) -> trie().
trie_restore(Options, Dump) ->
Trie = trie_create(Options),
trie_update(trie_create(Options), Dump).

%% @doc Update a trie with a dump of operations (used for replication)
-spec trie_update(trie(), [{_Key, _Val}]) -> trie().
trie_update(Trie, Dump) ->
lists:foreach(
fun({{StateFrom, Token}, StateTo}) ->
trie_insert(Trie, StateFrom, Token, StateTo)
Expand Down
104 changes: 79 additions & 25 deletions apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
update_iterator/3,
next/3,
delete_next/4,
shard_of_message/3
shard_of_message/3,
current_timestamp/2
]).

%% internal exports:
Expand Down Expand Up @@ -65,6 +66,7 @@
-export([
init/1,
apply/3,
tick/2,

snapshot_module/0
]).
Expand All @@ -86,6 +88,7 @@
]).

-include_lib("emqx_utils/include/emqx_message.hrl").
-include_lib("snabbkaffe/include/trace.hrl").
-include("emqx_ds_replication_layer.hrl").

%%================================================================================
Expand Down Expand Up @@ -161,6 +164,8 @@

-type timestamp_us() :: non_neg_integer().

-define(gv_timestamp(SHARD), {gv_timestamp, SHARD}).

%%================================================================================
%% API functions
%%================================================================================
Expand Down Expand Up @@ -363,9 +368,16 @@ shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy) ->
end,
integer_to_binary(Hash).

%% Call `Fun' on each shard id returned by `list_shards(DB)', in list order.
-spec foreach_shard(emqx_ds:db(), fun((shard_id()) -> _)) -> ok.
foreach_shard(DB, Fun) ->
    Shards = list_shards(DB),
    lists:foreach(Fun, Shards).

%% @doc Timestamp up to which messages have been replicated on the
%% local server for the given shard.
%%
%% The value is read from the DB-global variable `?gv_timestamp(Shard)';
%% `0' is returned when that variable has not been set.
-spec current_timestamp(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> emqx_ds:time().
current_timestamp(DB, Shard) ->
    GVarKey = ?gv_timestamp(Shard),
    emqx_ds_builtin_sup:get_gvar(DB, GVarKey, 0).

%%================================================================================
%% behavior callbacks
%%================================================================================
Expand Down Expand Up @@ -490,7 +502,9 @@ do_next_v1(DB, Shard, Iter, BatchSize) ->
ShardId = {DB, Shard},
?IF_STORAGE_RUNNING(
ShardId,
emqx_ds_storage_layer:next(ShardId, Iter, BatchSize)
emqx_ds_storage_layer:next(
ShardId, Iter, BatchSize, emqx_ds_replication_layer:current_timestamp(DB, Shard)
)
).

-spec do_delete_next_v4(
Expand All @@ -502,7 +516,13 @@ do_next_v1(DB, Shard, Iter, BatchSize) ->
) ->
emqx_ds:delete_next_result(emqx_ds_storage_layer:delete_iterator()).
do_delete_next_v4(DB, Shard, Iter, Selector, BatchSize) ->
emqx_ds_storage_layer:delete_next({DB, Shard}, Iter, Selector, BatchSize).
emqx_ds_storage_layer:delete_next(
{DB, Shard},
Iter,
Selector,
BatchSize,
emqx_ds_replication_layer:current_timestamp(DB, Shard)
).

-spec do_add_generation_v2(emqx_ds:db()) -> no_return().
do_add_generation_v2(_DB) ->
Expand Down Expand Up @@ -672,50 +692,69 @@ apply(
?tag := ?BATCH,
?batch_messages := MessagesIn
},
#{db_shard := DBShard, latest := Latest} = State
#{db_shard := DBShard = {DB, Shard}, latest := Latest0} = State0
) ->
%% NOTE
%% Unique timestamp tracking real time closely.
%% With microsecond granularity it should be nearly impossible for it to run
%% too far ahead than the real time clock.
{NLatest, Messages} = assign_timestamps(Latest, MessagesIn),
%% TODO
%% Batch is now reversed, but it should not make a lot of difference.
%% Even if it would be in order, it's still possible to write messages far away
%% in the past, i.e. when replica catches up with the leader. Storage layer
%% currently relies on wall clock time to decide if it's safe to iterate over
%% next epoch, this is likely wrong. Ideally it should rely on consensus clock
%% time instead.
?tp(ds_ra_apply_batch, #{db => DB, shard => Shard, batch => MessagesIn, ts => Latest0}),
{Latest, Messages} = assign_timestamps(Latest0, MessagesIn),
Result = emqx_ds_storage_layer:store_batch(DBShard, Messages, #{}),
NState = State#{latest := NLatest},
State = State0#{latest := Latest},
set_ts(DBShard, Latest),
%% TODO: Need to measure effects of changing frequency of `release_cursor`.
Effect = {release_cursor, RaftIdx, NState},
{NState, Result, Effect};
Effect = {release_cursor, RaftIdx, State},
{State, Result, Effect};
apply(
_RaftMeta,
#{?tag := add_generation, ?since := Since},
#{db_shard := DBShard, latest := Latest} = State
#{db_shard := DBShard, latest := Latest0} = State0
) ->
{Timestamp, NLatest} = ensure_monotonic_timestamp(Since, Latest),
{Timestamp, Latest} = ensure_monotonic_timestamp(Since, Latest0),
Result = emqx_ds_storage_layer:add_generation(DBShard, Timestamp),
NState = State#{latest := NLatest},
{NState, Result};
State = State0#{latest := Latest},
set_ts(DBShard, Latest),
{State, Result};
apply(
_RaftMeta,
#{?tag := update_config, ?since := Since, ?config := Opts},
#{db_shard := DBShard, latest := Latest} = State
#{db_shard := DBShard, latest := Latest0} = State0
) ->
{Timestamp, NLatest} = ensure_monotonic_timestamp(Since, Latest),
{Timestamp, Latest} = ensure_monotonic_timestamp(Since, Latest0),
Result = emqx_ds_storage_layer:update_config(DBShard, Timestamp, Opts),
NState = State#{latest := NLatest},
{NState, Result};
State = State0#{latest := Latest},
{State, Result};
apply(
_RaftMeta,
#{?tag := drop_generation, ?generation := GenId},
#{db_shard := DBShard} = State
) ->
Result = emqx_ds_storage_layer:drop_generation(DBShard, GenId),
{State, Result}.
{State, Result};
apply(
_RaftMeta,
#{?tag := storage_event, ?payload := CustomEvent, ?now := Now},
#{db_shard := DBShard, latest := Latest0} = State
) ->
Latest = max(Latest0, Now),
set_ts(DBShard, Latest),
?tp(
debug,
emqx_ds_replication_layer_storage_event,
#{
shard => DBShard, payload => CustomEvent, latest => Latest
}
),
Effects = handle_custom_event(DBShard, Latest, CustomEvent),
{State#{latest => Latest}, ok, Effects}.

%% @doc Periodic tick of the replication state machine.
%%
%% Converts `TimeMs' to microseconds, emits a trace point and forwards a
%% `tick' event to the storage layer via `handle_custom_event/3'. The
%% returned list of effects is whatever that function produces.
%%
%% NOTE(review): presumably invoked by Ra as the optional `tick/2' machine
%% callback with wall-clock milliseconds; confirm against ra_machine docs.
-spec tick(integer(), ra_state()) -> ra_machine:effects().
tick(TimeMs, #{db_shard := DBShard = {DB, Shard}, latest := Latest}) ->
    %% The wall clock may lag behind the latest timestamp already assigned
    %% by this state machine. Never report a timestamp older than `Latest',
    %% mirroring what the `storage_event' command handler does.
    Timestamp = max(timestamp_to_timeus(TimeMs), Latest),
    ?tp(emqx_ds_replication_layer_tick, #{db => DB, shard => Shard, ts => Timestamp}),
    handle_custom_event(DBShard, Timestamp, tick).

assign_timestamps(Latest, Messages) ->
assign_timestamps(Latest, Messages, []).
Expand All @@ -730,7 +769,7 @@ assign_timestamps(Latest, [MessageIn | Rest], Acc) ->
assign_timestamps(Latest + 1, Rest, [Message | Acc])
end;
assign_timestamps(Latest, [], Acc) ->
{Latest, Acc}.
{Latest, lists:reverse(Acc)}.

%% Pair a term with a timestamp. Both arguments are returned unchanged as
%% `{TimestampUs, Message}'.
assign_timestamp(TimestampUs, Message) ->
    Stamped = {TimestampUs, Message},
    Stamped.
Expand All @@ -748,3 +787,18 @@ timeus_to_timestamp(TimestampUs) ->

%% Returns the module name `emqx_ds_replication_snapshot'.
%% NOTE(review): presumably the Ra machine `snapshot_module/0' callback
%% selecting the snapshot implementation; confirm against ra_machine docs.
snapshot_module() ->
emqx_ds_replication_snapshot.

%% Pass `Event' to `emqx_ds_storage_layer:handle_event/3' and wrap every
%% event it returns into an `{append, Command}' tuple, where `Command' is a
%% `storage_event' command stamped with `Latest'.
%%
%% Any exception raised while doing so is reported through an error trace
%% point and results in an empty list, so a failing handler does not take
%% down the caller.
handle_custom_event(DBShard, Latest, Event) ->
    try
        FollowUps = emqx_ds_storage_layer:handle_event(DBShard, Latest, Event),
        [
            {append, #{?tag => storage_event, ?payload => FollowUp, ?now => Latest}}
         || FollowUp <- FollowUps
        ]
    catch
        Class:Reason:Stack ->
            %% The exception class is used as the map key for its reason.
            ?tp(error, ds_storage_custom_even_fail, #{
                Class => Reason, stacktrace => Stack, event => Event
            }),
            []
    end.

%% Publish `TS' as the current timestamp of the shard by storing it in the
%% DB-global variable `?gv_timestamp(Shard)'.
set_ts({DB, Shard}, TS) ->
    GVarKey = ?gv_timestamp(Shard),
    emqx_ds_builtin_sup:set_gvar(DB, GVarKey, TS).
4 changes: 4 additions & 0 deletions apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,8 @@
%% drop_generation
-define(generation, 2).

%% custom events
-define(payload, 2).
-define(now, 3).

-endif.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

-module(emqx_ds_replication_layer_shard).

%% API:
-export([start_link/3]).

%% Static server configuration
Expand Down Expand Up @@ -325,7 +326,8 @@ start_server(DB, Shard, #{replication_options := ReplicationOpts}) ->
ClusterName = cluster_name(DB, Shard),
LocalServer = local_server(DB, Shard),
Servers = shard_servers(DB, Shard),
case ra:restart_server(DB, LocalServer) of
MutableConfig = #{tick_timeout => 100},
case ra:restart_server(DB, LocalServer, MutableConfig) of
{error, name_not_registered} ->
Bootstrap = true,
Machine = {module, emqx_ds_replication_layer, #{db => DB, shard => Shard}},
Expand All @@ -336,7 +338,7 @@ start_server(DB, Shard, #{replication_options := ReplicationOpts}) ->
],
ReplicationOpts
),
ok = ra:start_server(DB, #{
ok = ra:start_server(DB, MutableConfig#{
id => LocalServer,
uid => server_uid(DB, Shard),
cluster_name => ClusterName,
Expand Down

0 comments on commit 0974ed6

Please sign in to comment.