From 9999ccd36c5d7f1a6dc6c9c26e89db5cfe7129f6 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Mon, 22 Apr 2024 17:39:31 +0200 Subject: [PATCH 01/14] feat(ds): Ignore safe cutoff time for streams without varying levels --- .../src/emqx_ds_storage_bitfield_lts.erl | 42 +++++++++++++++---- 1 file changed, 35 insertions(+), 7 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index 2ec6674b60f..1cbdb92ee08 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -89,6 +89,7 @@ data :: rocksdb:cf_handle(), trie :: emqx_ds_lts:trie(), keymappers :: array:array(emqx_ds_bitmask_keymapper:keymapper()), + ts_bits :: non_neg_integer(), ts_offset :: non_neg_integer() }). @@ -213,7 +214,8 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) -> data = DataCF, trie = Trie, keymappers = KeymapperCache, - ts_offset = TSOffsetBits + ts_offset = TSOffsetBits, + ts_bits = TSBits }. -spec post_creation_actions(emqx_ds_storage_layer:post_creation_context()) -> @@ -348,13 +350,39 @@ update_iterator( ) -> {ok, OldIter#{?last_seen_key => DSKey}}. -next(Shard, Schema = #s{ts_offset = TSOffset}, It, BatchSize) -> - %% Compute safe cutoff time. - %% It's the point in time where the last complete epoch ends, so we need to know - %% the current time to compute it. +next( + Shard, + Schema = #s{ts_offset = TSOffset, ts_bits = TSBits}, + It = #{?topic_filter := TF, ?storage_key := Stream}, + BatchSize +) -> init_counters(), - Now = emqx_ds:timestamp_us(), - SafeCutoffTime = (Now bsr TSOffset) bsl TSOffset, + %% Compute safe cutoff time. It's the point in time where the last + %% complete epoch ends, so we need to know the current time to + %% compute it. This is needed because new keys can be added before + %% the iterator. + IsWildcard = + case Stream of + {_StaticKey, []} -> false; + _ -> true + end, + SafeCutoffTime = + case IsWildcard of + true -> + Now = emqx_ds:timestamp_us(), + (Now bsr TSOffset) bsl TSOffset; + false -> + %% Iterators scanning streams without varying topic + %% levels can operate on incomplete epochs, since new + %% matching keys for the single topic are added in + %% lexicographic order. + %% + %% Note: this DOES NOT apply to non-wildcard topic + %% filters operating on streams with varying parts: + %% iterator can jump to the next topic and then it + %% won't backtrack. + 1 bsl TSBits - 1 + end, try next_until(Schema, It, SafeCutoffTime, BatchSize) after From bcfa7b2209170e32993e208732276220af3f0873 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 23 Apr 2024 00:20:59 +0200 Subject: [PATCH 02/14] fix(ds): Destroy LTS tries when the generation is dropped --- apps/emqx_durable_storage/src/emqx_ds_lts.erl | 7 +++++++ .../src/emqx_ds_storage_bitfield_lts.erl | 3 ++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_lts.erl index bd7cb3826ff..e087504de0a 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_lts.erl @@ -19,6 +19,7 @@ %% API: -export([ trie_create/1, trie_create/0, + destroy/1, trie_restore/2, trie_copy_learned_paths/2, topic_key/3, @@ -116,6 +117,12 @@ trie_create(UserOpts) -> trie_create() -> trie_create(#{}). +-spec destroy(trie()) -> ok. 
+destroy(#trie{trie = Trie, stats = Stats}) -> + catch ets:delete(Trie), + catch ets:delete(Stats), + ok. + %% @doc Restore trie from a dump -spec trie_restore(options(), [{_Key, _Val}]) -> trie(). trie_restore(Options, Dump) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index 1cbdb92ee08..80264da792a 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -240,7 +240,8 @@ post_creation_actions( s() ) -> ok. -drop(_Shard, DBHandle, GenId, CFRefs, #s{}) -> +drop(_Shard, DBHandle, GenId, CFRefs, #s{trie = Trie}) -> + emqx_ds_lts:destroy(Trie), {_, DataCF} = lists:keyfind(data_cf(GenId), 1, CFRefs), {_, TrieCF} = lists:keyfind(trie_cf(GenId), 1, CFRefs), ok = rocksdb:drop_column_family(DBHandle, DataCF), From 86d45522e328672787f0d9dddc02c75045a582b4 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 23 Apr 2024 01:01:11 +0200 Subject: [PATCH 03/14] fix(dsrepl): Don't reverse elements of batches --- .../src/emqx_ds_replication_layer.erl | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 61126c164b3..3ff87ab44c7 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -672,22 +672,15 @@ apply( ?tag := ?BATCH, ?batch_messages := MessagesIn }, - #{db_shard := DBShard, latest := Latest} = State + #{db_shard := DBShard, latest := Latest0} = State ) -> %% NOTE %% Unique timestamp tracking real time closely. %% With microsecond granularity it should be nearly impossible for it to run %% too far ahead than the real time clock. - {NLatest, Messages} = assign_timestamps(Latest, MessagesIn), - %% TODO - %% Batch is now reversed, but it should not make a lot of difference. - %% Even if it would be in order, it's still possible to write messages far away - %% in the past, i.e. when replica catches up with the leader. Storage layer - %% currently relies on wall clock time to decide if it's safe to iterate over - %% next epoch, this is likely wrong. Ideally it should rely on consensus clock - %% time instead. + {Latest, Messages} = assign_timestamps(Latest0, MessagesIn), Result = emqx_ds_storage_layer:store_batch(DBShard, Messages, #{}), - NState = State#{latest := NLatest}, + NState = State#{latest := Latest}, %% TODO: Need to measure effects of changing frequency of `release_cursor`. Effect = {release_cursor, RaftIdx, NState}, {NState, Result, Effect}; @@ -730,7 +723,7 @@ assign_timestamps(Latest, [MessageIn | Rest], Acc) -> assign_timestamps(Latest + 1, Rest, [Message | Acc]) end; assign_timestamps(Latest, [], Acc) -> - {Latest, Acc}. + {Latest, lists:reverse(Acc)}. assign_timestamp(TimestampUs, Message) -> {TimestampUs, Message}. 
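The fix above is the standard accumulate-and-reverse idiom: fold forward over the batch, cons each timestamped message onto the accumulator, and reverse once at the end, so assigned timestamps stay monotonic while the original message order is preserved. A minimal standalone sketch of the same pattern (assign_monotonic/3 and its timestamp-extraction fun TsOf are illustrative, not part of the patch):

    %% Assign monotonically increasing timestamps without reordering:
    assign_monotonic(Latest0, TsOf, Messages) ->
        {Latest, Rev} = lists:foldl(
            fun(Msg, {T, Acc}) ->
                %% Either the message's own (later) timestamp, or last + 1:
                T1 = max(T + 1, TsOf(Msg)),
                {T1, [{T1, Msg} | Acc]}
            end,
            {Latest0, []},
            Messages
        ),
        {Latest, lists:reverse(Rev)}.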
From 8ac9700aabf2db79ac043e8091645c312b7167ea Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 23 Apr 2024 16:44:43 +0200 Subject: [PATCH 04/14] feat(ds): Add an API for DB-global variables --- .../src/emqx_ds_builtin_db_sup.erl | 1 + .../src/emqx_ds_builtin_sup.erl | 30 ++++++++++++++++++- .../src/emqx_ds_replication_layer.erl | 1 + 3 files changed, 31 insertions(+), 1 deletion(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl index ef160050027..06e925c1b01 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl @@ -118,6 +118,7 @@ which_dbs() -> init({#?db_sup{db = DB}, DefaultOpts}) -> %% Spec for the top-level supervisor for the database: logger:notice("Starting DS DB ~p", [DB]), + emqx_ds_builtin_sup:clean_gvars(DB), emqx_ds_builtin_metrics:init_for_db(DB), Opts = emqx_ds_replication_layer_meta:open_db(DB, DefaultOpts), ok = start_ra_system(DB, Opts), diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_sup.erl index 45e81bdc9ab..30b72e5a804 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_builtin_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_sup.erl @@ -23,6 +23,7 @@ %% API: -export([start_db/2, stop_db/1]). +-export([set_gvar/3, get_gvar/3, clean_gvars/1]). %% behavior callbacks: -export([init/1]). @@ -39,6 +40,13 @@ -define(top, ?MODULE). -define(databases, emqx_ds_builtin_databases_sup). +-define(gvar_tab, emqx_ds_builtin_gvar). + +-record(gvar, { + k :: {emqx_ds:db(), _Key}, + v :: _Value +}). + %%================================================================================ %% API functions %%================================================================================ @@ -61,11 +69,30 @@ stop_db(DB) -> Pid when is_pid(Pid) -> _ = supervisor:terminate_child(?databases, DB), _ = supervisor:delete_child(?databases, DB), - ok; + clean_gvars(DB); undefined -> ok end. +%% @doc Set a DB-global variable. Please don't abuse this API. +-spec set_gvar(emqx_ds:db(), _Key, _Val) -> ok. +set_gvar(DB, Key, Val) -> + ets:insert(?gvar_tab, #gvar{k = {DB, Key}, v = Val}). + +-spec get_gvar(emqx_ds:db(), _Key, Val) -> Val. +get_gvar(DB, Key, Default) -> + case ets:lookup(?gvar_tab, {DB, Key}) of + [#gvar{v = Val}] -> + Val; + [] -> + Default + end. + +-spec clean_gvars(emqx_ds:db()) -> ok. +clean_gvars(DB) -> + ets:match_delete(?gvar_tab, #gvar{k = {DB, '_'}, _ = '_'}), + ok. + %%================================================================================ %% behavior callbacks %%================================================================================ @@ -96,6 +123,7 @@ init(?top) -> type => supervisor, shutdown => infinity }, + ets:new(?gvar_tab, [named_table, set, public, {keypos, #gvar.k}, {read_concurrency, true}]), %% SupFlags = #{ strategy => one_for_all, diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 3ff87ab44c7..9f9f28676e8 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -363,6 +363,7 @@ shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy) -> end, integer_to_binary(Hash). +-spec foreach_shard(emqx_ds:db(), fun((shard_id()) -> _)) -> ok. 
foreach_shard(DB, Fun) -> lists:foreach(Fun, list_shards(DB)). From 1ff2e02fd9755cad044b8b59b42abafc7916c26e Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 23 Apr 2024 02:36:33 +0200 Subject: [PATCH 05/14] feat(ds): Pass current time to the storage layer via argument --- .../src/emqx_ds_replication_layer.erl | 6 +++-- .../src/emqx_ds_storage_bitfield_lts.erl | 13 +++++----- .../src/emqx_ds_storage_layer.erl | 26 ++++++++++++------- .../src/emqx_ds_storage_reference.erl | 8 +++--- .../emqx_ds_storage_bitfield_lts_SUITE.erl | 16 ++++++++---- .../test/emqx_ds_test_helpers.erl | 26 ++++++++++++++----- 6 files changed, 61 insertions(+), 34 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 9f9f28676e8..c79d60d07ac 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -491,7 +491,7 @@ do_next_v1(DB, Shard, Iter, BatchSize) -> ShardId = {DB, Shard}, ?IF_STORAGE_RUNNING( ShardId, - emqx_ds_storage_layer:next(ShardId, Iter, BatchSize) + emqx_ds_storage_layer:next(ShardId, Iter, BatchSize, emqx_ds:timestamp_us()) ). -spec do_delete_next_v4( @@ -503,7 +503,9 @@ do_next_v1(DB, Shard, Iter, BatchSize) -> ) -> emqx_ds:delete_next_result(emqx_ds_storage_layer:delete_iterator()). do_delete_next_v4(DB, Shard, Iter, Selector, BatchSize) -> - emqx_ds_storage_layer:delete_next({DB, Shard}, Iter, Selector, BatchSize). + emqx_ds_storage_layer:delete_next( + {DB, Shard}, Iter, Selector, BatchSize, emqx_ds:timestamp_us() + ). -spec do_add_generation_v2(emqx_ds:db()) -> no_return(). do_add_generation_v2(_DB) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index 80264da792a..5947b230040 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -34,8 +34,8 @@ make_iterator/5, make_delete_iterator/5, update_iterator/4, - next/4, - delete_next/5, + next/5, + delete_next/6, post_creation_actions/1 ]). @@ -354,8 +354,9 @@ update_iterator( next( Shard, Schema = #s{ts_offset = TSOffset, ts_bits = TSBits}, - It = #{?topic_filter := TF, ?storage_key := Stream}, - BatchSize + It = #{?storage_key := Stream}, + BatchSize, + Now ) -> init_counters(), %% Compute safe cutoff time. It's the point in time where the last @@ -370,7 +371,6 @@ next( SafeCutoffTime = case IsWildcard of true -> - Now = emqx_ds:timestamp_us(), (Now bsr TSOffset) bsl TSOffset; false -> %% Iterators scanning streams without varying topic @@ -415,12 +415,11 @@ next_until(#s{db = DB, data = CF, keymappers = Keymappers}, It, SafeCutoffTime, rocksdb:iterator_close(ITHandle) end. -delete_next(Shard, Schema = #s{ts_offset = TSOffset}, It, Selector, BatchSize) -> +delete_next(Shard, Schema = #s{ts_offset = TSOffset}, It, Selector, BatchSize, Now) -> %% Compute safe cutoff time. %% It's the point in time where the last complete epoch ends, so we need to know %% the current time to compute it. 
init_counters(), - Now = emqx_message:timestamp_now(), SafeCutoffTime = (Now bsr TSOffset) bsl TSOffset, try delete_next_until(Schema, It, SafeCutoffTime, Selector, BatchSize) diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 4981c3fc18b..36dc813e5d5 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -31,8 +31,8 @@ make_iterator/4, make_delete_iterator/4, update_iterator/3, - next/3, - delete_next/4, + next/4, + delete_next/5, %% Generations update_config/3, @@ -223,9 +223,14 @@ ) -> emqx_ds:make_delete_iterator_result(_Iterator). --callback next(shard_id(), _Data, Iter, pos_integer()) -> +-callback next(shard_id(), _Data, Iter, pos_integer(), emqx_ds:time()) -> {ok, Iter, [emqx_types:message()]} | {error, _}. +-callback delete_next( + shard_id(), _Data, DeleteIterator, emqx_ds:delete_selector(), pos_integer(), emqx_ds:time() +) -> + {ok, DeleteIterator, _NDeleted :: non_neg_integer(), _IteratedOver :: non_neg_integer()}. + -callback post_creation_actions(post_creation_context()) -> _Data. -optional_callbacks([post_creation_actions/1]). @@ -377,13 +382,13 @@ update_iterator( {error, unrecoverable, generation_not_found} end. --spec next(shard_id(), iterator(), pos_integer()) -> +-spec next(shard_id(), iterator(), pos_integer(), emqx_ds:time()) -> emqx_ds:next_result(iterator()). -next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, BatchSize) -> +next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, BatchSize, Now) -> case generation_get(Shard, GenId) of #{module := Mod, data := GenData} -> Current = generation_current(Shard), - case Mod:next(Shard, GenData, GenIter0, BatchSize) of + case Mod:next(Shard, GenData, GenIter0, BatchSize, Now) of {ok, _GenIter, []} when GenId < Current -> %% This is a past generation. Storage layer won't write %% any more messages here. The iterator reached the end: @@ -399,18 +404,21 @@ next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, Batch {error, unrecoverable, generation_not_found} end. --spec delete_next(shard_id(), delete_iterator(), emqx_ds:delete_selector(), pos_integer()) -> +-spec delete_next( + shard_id(), delete_iterator(), emqx_ds:delete_selector(), pos_integer(), emqx_ds:time() +) -> emqx_ds:delete_next_result(delete_iterator()). delete_next( Shard, Iter = #{?tag := ?DELETE_IT, ?generation := GenId, ?enc := GenIter0}, Selector, - BatchSize + BatchSize, + Now ) -> case generation_get(Shard, GenId) of #{module := Mod, data := GenData} -> Current = generation_current(Shard), - case Mod:delete_next(Shard, GenData, GenIter0, Selector, BatchSize) of + case Mod:delete_next(Shard, GenData, GenIter0, Selector, BatchSize, Now) of {ok, _GenIter, _Deleted = 0, _IteratedOver = 0} when GenId < Current -> %% This is a past generation. Storage layer won't write %% any more messages here. The iterator reached the end: diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl index 7aa54b9f366..3caf2c732d7 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl @@ -37,8 +37,8 @@ make_iterator/5, make_delete_iterator/5, update_iterator/4, - next/4, - delete_next/5 + next/5, + delete_next/6 ]). 
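%% An aside on the arity bump above: the reference backend accepts the
%% new `Now' argument only to satisfy the widened behaviour. As the
%% hunks below show, it matches the argument as `_Now' and never prunes
%% by epoch; only the bitfield-LTS backend actually consults it.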
%% internal exports: @@ -154,7 +154,7 @@ update_iterator(_Shard, _Data, OldIter, DSKey) -> last_seen_message_key = DSKey }}. -next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize) -> +next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize, _Now) -> #it{topic_filter = TopicFilter, start_time = StartTime, last_seen_message_key = Key0} = It0, {ok, ITHandle} = rocksdb:iterator(DB, CF, []), Action = @@ -170,7 +170,7 @@ next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize) -> It = It0#it{last_seen_message_key = Key}, {ok, It, lists:reverse(Messages)}. -delete_next(_Shard, #s{db = DB, cf = CF}, It0, Selector, BatchSize) -> +delete_next(_Shard, #s{db = DB, cf = CF}, It0, Selector, BatchSize, _Now) -> #delete_it{ topic_filter = TopicFilter, start_time = StartTime, diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl index 78838e6758d..bb6d0f917d0 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl @@ -73,13 +73,15 @@ t_iterate(_Config) -> begin [{_Rank, Stream}] = emqx_ds_storage_layer:get_streams(?SHARD, parse_topic(Topic), 0), {ok, It} = emqx_ds_storage_layer:make_iterator(?SHARD, Stream, parse_topic(Topic), 0), - {ok, NextIt, MessagesAndKeys} = emqx_ds_storage_layer:next(?SHARD, It, 100), + {ok, NextIt, MessagesAndKeys} = emqx_ds_storage_layer:next( + ?SHARD, It, 100, emqx_ds:timestamp_us() + ), Messages = [Msg || {_DSKey, Msg} <- MessagesAndKeys], ?assertEqual( lists:map(fun integer_to_binary/1, Timestamps), payloads(Messages) ), - {ok, _, []} = emqx_ds_storage_layer:next(?SHARD, NextIt, 100) + {ok, _, []} = emqx_ds_storage_layer:next(?SHARD, NextIt, 100, emqx_ds:timestamp_us()) end || Topic <- Topics ], @@ -370,7 +372,7 @@ dump_stream(Shard, Stream, TopicFilter, StartTime) -> F(It, 0) -> error({too_many_iterations, It}); F(It, N) -> - case emqx_ds_storage_layer:next(Shard, It, BatchSize) of + case emqx_ds_storage_layer:next(Shard, It, BatchSize, emqx_ds:timestamp_us()) of end_of_stream -> []; {ok, _NextIt, []} -> @@ -542,7 +544,11 @@ delete(_Shard, [], _Selector) -> delete(Shard, Iterators, Selector) -> {NewIterators0, N} = lists:foldl( fun(Iterator0, {AccIterators, NAcc}) -> - case emqx_ds_storage_layer:delete_next(Shard, Iterator0, Selector, 10) of + case + emqx_ds_storage_layer:delete_next( + Shard, Iterator0, Selector, 10, emqx_ds:timestamp_us() + ) + of {ok, end_of_stream} -> {AccIterators, NAcc}; {ok, _Iterator1, 0} -> @@ -573,7 +579,7 @@ replay(_Shard, []) -> replay(Shard, Iterators) -> {NewIterators0, Messages0} = lists:foldl( fun(Iterator0, {AccIterators, AccMessages}) -> - case emqx_ds_storage_layer:next(Shard, Iterator0, 10) of + case emqx_ds_storage_layer:next(Shard, Iterator0, 10, emqx_ds:timestamp_us()) of {ok, end_of_stream} -> {AccIterators, AccMessages}; {ok, _Iterator1, []} -> diff --git a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl index be4f7bcdfc0..f5475223085 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl @@ -85,8 +85,14 @@ consume_stream(DB, Stream, TopicFilter, StartTime) -> consume_iter(DB, It) -> consume_iter(DB, It, #{}). -consume_iter(DB, It, Opts) -> - consume_iter_with(fun emqx_ds:next/3, [DB], It, Opts). 
+consume_iter(DB, It0, Opts) -> + consume_iter_with( + fun(It, BatchSize) -> + emqx_ds:next(DB, It, BatchSize) + end, + It0, + Opts + ). storage_consume(ShardId, TopicFilter) -> storage_consume(ShardId, TopicFilter, 0). @@ -108,16 +114,22 @@ storage_consume_stream(ShardId, Stream, TopicFilter, StartTime) -> storage_consume_iter(ShardId, It) -> storage_consume_iter(ShardId, It, #{}). -storage_consume_iter(ShardId, It, Opts) -> - consume_iter_with(fun emqx_ds_storage_layer:next/3, [ShardId], It, Opts). +storage_consume_iter(ShardId, It0, Opts) -> + consume_iter_with( + fun(It, BatchSize) -> + emqx_ds_storage_layer:next(ShardId, It, BatchSize, emqx_ds:timestamp_us()) + end, + It0, + Opts + ). -consume_iter_with(NextFun, Args, It0, Opts) -> +consume_iter_with(NextFun, It0, Opts) -> BatchSize = maps:get(batch_size, Opts, 5), - case erlang:apply(NextFun, Args ++ [It0, BatchSize]) of + case NextFun(It0, BatchSize) of {ok, It, _Msgs = []} -> {ok, It, []}; {ok, It1, Batch} -> - {ok, It, Msgs} = consume_iter_with(NextFun, Args, It1, Opts), + {ok, It, Msgs} = consume_iter_with(NextFun, It1, Opts), {ok, It, [Msg || {_DSKey, Msg} <- Batch] ++ Msgs}; {ok, Eos = end_of_stream} -> {ok, Eos, []}; From b2a633aca13d8af9e8465cbd8f8cda22f11431e2 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 23 Apr 2024 19:27:01 +0200 Subject: [PATCH 06/14] fix(ds): Use leader's clock for computing LTS safe cutoff time --- .../src/emqx_ds_builtin_sup.erl | 5 +- .../src/emqx_ds_replication_layer.erl | 50 +++++++++++++++++-- .../src/emqx_ds_replication_layer.hrl | 3 ++ .../src/emqx_ds_replication_layer_shard.erl | 1 + .../src/emqx_ds_storage_bitfield_lts.erl | 42 +++++++++++++--- .../src/emqx_ds_storage_layer.erl | 29 +++++++++-- 6 files changed, 112 insertions(+), 18 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_sup.erl index 30b72e5a804..97180535162 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_builtin_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_sup.erl @@ -77,7 +77,8 @@ stop_db(DB) -> %% @doc Set a DB-global variable. Please don't abuse this API. -spec set_gvar(emqx_ds:db(), _Key, _Val) -> ok. set_gvar(DB, Key, Val) -> - ets:insert(?gvar_tab, #gvar{k = {DB, Key}, v = Val}). + ets:insert(?gvar_tab, #gvar{k = {DB, Key}, v = Val}), + ok. -spec get_gvar(emqx_ds:db(), _Key, Val) -> Val. get_gvar(DB, Key, Default) -> @@ -123,7 +124,7 @@ init(?top) -> type => supervisor, shutdown => infinity }, - ets:new(?gvar_tab, [named_table, set, public, {keypos, #gvar.k}, {read_concurrency, true}]), + _ = ets:new(?gvar_tab, [named_table, set, public, {keypos, #gvar.k}, {read_concurrency, true}]), %% SupFlags = #{ strategy => one_for_all, diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index c79d60d07ac..90a26c4841a 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -36,7 +36,8 @@ update_iterator/3, next/3, delete_next/4, - shard_of_message/3 + shard_of_message/3, + current_timestamp/2 ]). %% internal exports: @@ -65,6 +66,7 @@ -export([ init/1, apply/3, + tick/2, snapshot_module/0 ]). @@ -161,6 +163,8 @@ -type timestamp_us() :: non_neg_integer(). +-define(gv_timestamp(SHARD), {gv_timestamp, SHARD}). 
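%% A usage sketch of the DB-global variable API that this define keys
%% into (the API itself was added in patch 04 of this series; `DB' is
%% any opened builtin database and `Shard' one of its shards):
%%
%%     ok = emqx_ds_builtin_sup:set_gvar(DB, ?gv_timestamp(Shard), TS),
%%     TS1 = emqx_ds_builtin_sup:get_gvar(DB, ?gv_timestamp(Shard), 0),
%%     ok = emqx_ds_builtin_sup:clean_gvars(DB).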
+ %%================================================================================ %% API functions %%================================================================================ @@ -367,6 +371,12 @@ shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy) -> foreach_shard(DB, Fun) -> lists:foreach(Fun, list_shards(DB)). +%% @doc Messages have been replicated up to this timestamp on the +%% local server +-spec current_timestamp(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> emqx_ds:time(). +current_timestamp(DB, Shard) -> + emqx_ds_builtin_sup:get_gvar(DB, ?gv_timestamp(Shard), 0). + %%================================================================================ %% behavior callbacks %%================================================================================ @@ -491,7 +501,9 @@ do_next_v1(DB, Shard, Iter, BatchSize) -> ShardId = {DB, Shard}, ?IF_STORAGE_RUNNING( ShardId, - emqx_ds_storage_layer:next(ShardId, Iter, BatchSize, emqx_ds:timestamp_us()) + emqx_ds_storage_layer:next( + ShardId, Iter, BatchSize, emqx_ds_replication_layer:current_timestamp(DB, Shard) + ) ). -spec do_delete_next_v4( @@ -504,7 +516,11 @@ do_next_v1(DB, Shard, Iter, BatchSize) -> emqx_ds:delete_next_result(emqx_ds_storage_layer:delete_iterator()). do_delete_next_v4(DB, Shard, Iter, Selector, BatchSize) -> emqx_ds_storage_layer:delete_next( - {DB, Shard}, Iter, Selector, BatchSize, emqx_ds:timestamp_us() + {DB, Shard}, + Iter, + Selector, + BatchSize, + emqx_ds_replication_layer:current_timestamp(DB, Shard) ). -spec do_add_generation_v2(emqx_ds:db()) -> no_return(). @@ -675,7 +691,7 @@ apply( ?tag := ?BATCH, ?batch_messages := MessagesIn }, - #{db_shard := DBShard, latest := Latest0} = State + #{db_shard := DBShard = {DB, Shard}, latest := Latest0} = State ) -> %% NOTE %% Unique timestamp tracking real time closely. @@ -686,6 +702,7 @@ apply( NState = State#{latest := Latest}, %% TODO: Need to measure effects of changing frequency of `release_cursor`. Effect = {release_cursor, RaftIdx, NState}, + emqx_ds_builtin_sup:set_gvar(DB, ?gv_timestamp(Shard), Latest), {NState, Result, Effect}; apply( _RaftMeta, @@ -711,7 +728,20 @@ apply( #{db_shard := DBShard} = State ) -> Result = emqx_ds_storage_layer:drop_generation(DBShard, GenId), - {State, Result}. + {State, Result}; +apply( + _RaftMeta, + #{?tag := storage_event, ?payload := CustomEvent}, + #{db_shard := DBShard, latest := Latest0} = State +) -> + {Timestamp, Latest} = ensure_monotonic_timestamp(emqx_ds:timestamp_us(), Latest0), + Effects = handle_custom_event(DBShard, Timestamp, CustomEvent), + {State#{latest := Latest}, ok, Effects}. + +-spec tick(integer(), ra_state()) -> ra_machine:effects(). +tick(TimeMs, #{db_shard := DBShard, latest := Latest}) -> + {Timestamp, _} = assign_timestamp(timestamp_to_timeus(TimeMs), Latest), + handle_custom_event(DBShard, Timestamp, tick). assign_timestamps(Latest, Messages) -> assign_timestamps(Latest, Messages, []). @@ -744,3 +774,13 @@ timeus_to_timestamp(TimestampUs) -> snapshot_module() -> emqx_ds_replication_snapshot. + +handle_custom_event(DBShard, Latest, Event) -> + try + Events = emqx_ds_storage_layer:handle_event(DBShard, Latest, Event), + [{append, #{?tag => storage_event, ?payload => I}} || I <- Events] + catch + EC:Err:Stacktrace -> + logger:error(#{EC => Err, stacktrace => Stacktrace, msg => "ds_storage_layer_tick"}), + [] + end. 
diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl index 70812fa187d..9608241439d 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl @@ -41,4 +41,7 @@ %% drop_generation -define(generation, 2). +%% custom events +-define(payload, 2). + -endif. diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl index e0e70596a2b..ac495be1c8c 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl @@ -16,6 +16,7 @@ -module(emqx_ds_replication_layer_shard). +%% API: -export([start_link/3]). %% Static server configuration diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index 5947b230040..7342b097dda 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -36,7 +36,9 @@ update_iterator/4, next/5, delete_next/6, - post_creation_actions/1 + post_creation_actions/1, + + handle_event/4 ]). %% internal exports: @@ -90,7 +92,8 @@ trie :: emqx_ds_lts:trie(), keymappers :: array:array(emqx_ds_bitmask_keymapper:keymapper()), ts_bits :: non_neg_integer(), - ts_offset :: non_neg_integer() + ts_offset :: non_neg_integer(), + gvars :: ets:table() }). -type s() :: #s{}. @@ -142,6 +145,10 @@ -define(DS_LTS_COUNTERS, [?DS_LTS_SEEK_COUNTER, ?DS_LTS_NEXT_COUNTER, ?DS_LTS_COLLISION_COUNTER]). +%% GVar used for idle detection: +-define(IDLE_DETECT, idle_detect). +-define(EPOCH(S, TS), (TS bsl S#s.ts_bits)). + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). -endif. @@ -215,7 +222,8 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) -> trie = Trie, keymappers = KeymapperCache, ts_offset = TSOffsetBits, - ts_bits = TSBits + ts_bits = TSBits, + gvars = ets:new(?MODULE, [public, set, {read_concurrency, true}]) }. -spec post_creation_actions(emqx_ds_storage_layer:post_creation_context()) -> @@ -240,8 +248,9 @@ post_creation_actions( s() ) -> ok. -drop(_Shard, DBHandle, GenId, CFRefs, #s{trie = Trie}) -> +drop(_Shard, DBHandle, GenId, CFRefs, #s{trie = Trie, gvars = GVars}) -> emqx_ds_lts:destroy(Trie), + catch ets:delete(GVars), {_, DataCF} = lists:keyfind(data_cf(GenId), 1, CFRefs), {_, TrieCF} = lists:keyfind(trie_cf(GenId), 1, CFRefs), ok = rocksdb:drop_column_family(DBHandle, DataCF), @@ -255,18 +264,21 @@ drop(_Shard, DBHandle, GenId, CFRefs, #s{trie = Trie}) -> emqx_ds:message_store_opts() ) -> emqx_ds:store_batch_result(). -store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) -> +store_batch(_ShardId, S = #s{db = DB, data = Data, gvars = Gvars}, Messages, _Options) -> {ok, Batch} = rocksdb:batch(), - lists:foreach( - fun({Timestamp, Msg}) -> + MaxTs = lists:foldl( + fun({Timestamp, Msg}, Acc) -> {Key, _} = make_key(S, Timestamp, Msg), Val = serialize(Msg), - rocksdb:put(DB, Data, Key, Val, []) + ok = rocksdb:put(DB, Data, Key, Val, []), + max(Acc, Timestamp) end, + 0, Messages ), Result = rocksdb:write_batch(DB, Batch, []), rocksdb:release_batch(Batch), + ets:insert(Gvars, {?IDLE_DETECT, false, MaxTs}), %% NOTE %% Strictly speaking, `{error, incomplete}` is a valid result but should be impossible to %% observe until there's `{no_slowdown, true}` in write options. 
@@ -469,6 +481,20 @@ delete_next_until( rocksdb:iterator_close(ITHandle) end. +handle_event(_ShardId, State = #s{gvars = Gvars}, Time, tick) -> + %% Cause replication layer to bump timestamp when idle + case ets:lookup(Gvars, ?IDLE_DETECT) of + [{?IDLE_DETECT, false, LastWrittenTs}] when + ?EPOCH(State, LastWrittenTs) > ?EPOCH(State, Time) + -> + ets:insert(Gvars, {?IDLE_DETECT, true, LastWrittenTs}), + [emqx_ds_storage_bitfield_lts_dummy_event]; + _ -> + [] + end; +handle_event(_ShardId, _Data, _Time, _Event) -> + []. + %%================================================================================ %% Internal functions %%================================================================================ diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 36dc813e5d5..fff3a77f36f 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -42,7 +42,10 @@ %% Snapshotting take_snapshot/1, - accept_snapshot/1 + accept_snapshot/1, + + %% Custom events + handle_event/3 ]). %% gen_server @@ -79,7 +82,6 @@ %% # "Record" integer keys. We use maps with integer keys to avoid persisting and sending %% records over the wire. - %% tags: -define(STREAM, 1). -define(IT, 2). @@ -201,6 +203,7 @@ -callback open(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), _Schema) -> _Data. +%% Delete the schema and data -callback drop(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), _RuntimeData) -> ok | {error, _Reason}. @@ -231,9 +234,11 @@ ) -> {ok, DeleteIterator, _NDeleted :: non_neg_integer(), _IteratedOver :: non_neg_integer()}. +-callback handle_event(shard_id(), _Data, emqx_ds:time(), CustomEvent | tick) -> [CustomEvent]. + -callback post_creation_actions(post_creation_context()) -> _Data. --optional_callbacks([post_creation_actions/1]). +-optional_callbacks([post_creation_actions/1, handle_event/4]). %%================================================================================ %% API for the replication layer @@ -857,6 +862,24 @@ handle_accept_snapshot(ShardId) -> Dir = db_dir(ShardId), emqx_ds_storage_snapshot:new_writer(Dir). +%% FIXME: currently this interface is a hack to handle safe cutoff +%% timestamp in LTS. It has many shortcomings (can lead to infinite +%% loops if the CBM is not careful; events from one generation may be +%% sent to the next one, etc.) and the API is not well thought out in +%% general. +%% +%% The mechanism of storage layer events should be refined later. +-spec handle_event(shard_id(), emqx_ds:time(), CustomEvent | tick) -> [CustomEvent]. +handle_event(Shard, Time, Event) -> + #{module := Mod, data := GenData} = generation_at(Shard, Time), + ?tp(emqx_ds_storage_layer_event, #{mod => Mod, time => Time, event => Event}), + case erlang:function_exported(Mod, handle_event, 4) of + true -> + Mod:handle_event(Shard, GenData, Time, Event); + false -> + [] + end. 
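%% For orientation, a minimal callback module satisfying the optional
%% handle_event/4 contract could look like the sketch below (purely
%% illustrative; the real implementation lives in
%% emqx_ds_storage_bitfield_lts):
%%
%%     handle_event(_ShardId, _Data, _Time, tick) ->
%%         %% Emit one custom event; the replication layer appends it
%%         %% to the Raft log, and it comes back through this same
%%         %% callback on every replica:
%%         [my_custom_event];
%%     handle_event(_ShardId, _Data, _Time, _CustomEvent) ->
%%         [].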
+ %%-------------------------------------------------------------------------------- %% Schema access %%-------------------------------------------------------------------------------- From 3642bcd1b6d5967dfa31940c1273aaf54e0ea51b Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Fri, 26 Apr 2024 01:03:35 +0200 Subject: [PATCH 07/14] docs(ds): Fix comment for the builtin DS metrics --- apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl index ce984db57d4..763d3860656 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl @@ -190,7 +190,7 @@ prometheus_per_db(NodeOrAggr) -> %% ... %% ''' %% -%% If `NodeOrAggr' = `node' then node name is appended to the list of +%% If `NodeOrAggr' = `aggr' then node name is appended to the list of %% labels. prometheus_per_db(NodeOrAggr, DB, Acc0) -> Labels = [ From 68ca891f410b2b165f1ae8d08ee5de1e666ed45a Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Mon, 6 May 2024 11:20:57 +0200 Subject: [PATCH 08/14] test(ds): Use streams to create traffic --- .../src/emqx_ds_replication_layer.erl | 51 ++- .../src/emqx_ds_replication_layer_shard.erl | 5 +- .../src/emqx_ds_storage_bitfield_lts.erl | 14 +- .../src/emqx_ds_storage_layer.erl | 7 + .../test/emqx_ds_replication_SUITE.erl | 407 +++++++++++++----- .../test/emqx_ds_test_helpers.erl | 2 + apps/emqx_utils/src/emqx_utils_stream.erl | 64 ++- .../test/emqx_utils_stream_tests.erl | 8 + 8 files changed, 425 insertions(+), 133 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 90a26c4841a..315a276ad53 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -88,6 +88,7 @@ ]). -include_lib("emqx_utils/include/emqx_message.hrl"). +-include_lib("snabbkaffe/include/trace.hrl"). -include("emqx_ds_replication_layer.hrl"). %%================================================================================ @@ -691,37 +692,39 @@ apply( ?tag := ?BATCH, ?batch_messages := MessagesIn }, - #{db_shard := DBShard = {DB, Shard}, latest := Latest0} = State + #{db_shard := DBShard = {DB, Shard}, latest := Latest0} = State0 ) -> %% NOTE %% Unique timestamp tracking real time closely. %% With microsecond granularity it should be nearly impossible for it to run %% too far ahead than the real time clock. + ?tp(ds_ra_apply_batch, #{db => DB, shard => Shard, batch => MessagesIn, ts => Latest0}), {Latest, Messages} = assign_timestamps(Latest0, MessagesIn), Result = emqx_ds_storage_layer:store_batch(DBShard, Messages, #{}), - NState = State#{latest := Latest}, + State = State0#{latest := Latest}, + set_ts(DBShard, Latest), %% TODO: Need to measure effects of changing frequency of `release_cursor`. 
- Effect = {release_cursor, RaftIdx, NState}, - emqx_ds_builtin_sup:set_gvar(DB, ?gv_timestamp(Shard), Latest), - {NState, Result, Effect}; + Effect = {release_cursor, RaftIdx, State}, + {State, Result, Effect}; apply( _RaftMeta, #{?tag := add_generation, ?since := Since}, - #{db_shard := DBShard, latest := Latest} = State + #{db_shard := DBShard, latest := Latest0} = State0 ) -> - {Timestamp, NLatest} = ensure_monotonic_timestamp(Since, Latest), + {Timestamp, Latest} = ensure_monotonic_timestamp(Since, Latest0), Result = emqx_ds_storage_layer:add_generation(DBShard, Timestamp), - NState = State#{latest := NLatest}, - {NState, Result}; + State = State0#{latest := Latest}, + set_ts(DBShard, Latest), + {State, Result}; apply( _RaftMeta, #{?tag := update_config, ?since := Since, ?config := Opts}, - #{db_shard := DBShard, latest := Latest} = State + #{db_shard := DBShard, latest := Latest0} = State0 ) -> - {Timestamp, NLatest} = ensure_monotonic_timestamp(Since, Latest), + {Timestamp, Latest} = ensure_monotonic_timestamp(Since, Latest0), Result = emqx_ds_storage_layer:update_config(DBShard, Timestamp, Opts), - NState = State#{latest := NLatest}, - {NState, Result}; + State = State0#{latest := Latest}, + {State, Result}; apply( _RaftMeta, #{?tag := drop_generation, ?generation := GenId}, @@ -730,17 +733,28 @@ apply( Result = emqx_ds_storage_layer:drop_generation(DBShard, GenId), {State, Result}; apply( - _RaftMeta, + #{index := RaftIdx}, #{?tag := storage_event, ?payload := CustomEvent}, #{db_shard := DBShard, latest := Latest0} = State ) -> {Timestamp, Latest} = ensure_monotonic_timestamp(emqx_ds:timestamp_us(), Latest0), + set_ts(DBShard, Latest), + ?tp( + debug, + emqx_ds_replication_layer_storage_event, + #{ + shard => DBShard, ts => Timestamp, payload => CustomEvent + } + ), Effects = handle_custom_event(DBShard, Timestamp, CustomEvent), {State#{latest := Latest}, ok, Effects}. -spec tick(integer(), ra_state()) -> ra_machine:effects(). -tick(TimeMs, #{db_shard := DBShard, latest := Latest}) -> +tick(TimeMs, #{db_shard := DBShard = {DB, Shard}, latest := Latest}) -> + %% Leader = emqx_ds_replication_layer_shard:lookup_leader(DB, Shard), {Timestamp, _} = assign_timestamp(timestamp_to_timeus(TimeMs), Latest), + ?tp(emqx_ds_replication_layer_tick, #{db => DB, shard => Shard, ts => Timestamp}), + set_ts(DBShard, Latest), handle_custom_event(DBShard, Timestamp, tick). assign_timestamps(Latest, Messages) -> @@ -781,6 +795,11 @@ handle_custom_event(DBShard, Latest, Event) -> [{append, #{?tag => storage_event, ?payload => I}} || I <- Events] catch EC:Err:Stacktrace -> - logger:error(#{EC => Err, stacktrace => Stacktrace, msg => "ds_storage_layer_tick"}), + ?tp(error, ds_storage_custom_even_fail, #{ + EC => Err, stacktrace => Stacktrace, event => Event + }), [] end. + +set_ts({DB, Shard}, TS) -> + emqx_ds_builtin_sup:set_gvar(DB, ?gv_timestamp(Shard), TS). 
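The tick/2 and handle_custom_event/3 additions above close the idle-detection loop. Schematically, one round of the cycle looks as follows (a description of the code above; the 100 ms tick_timeout is set in the next hunk):

    %% 1. Ra invokes tick/2 on the leader every tick_timeout ms;
    %% 2. tick/2 asks the storage layer whether the clock needs bumping
    %%    via handle_event(_, _, Time, tick);
    %% 3. every returned custom event is appended to the Raft log;
    %% 4. apply/3 of the resulting storage_event advances `latest', and
    %%    set_ts/2 publishes it through the gvar on each replica, so
    %%    iterators keep making progress even when nothing is written.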
diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl index ac495be1c8c..dca00222bbf 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl @@ -309,7 +309,8 @@ start_server(DB, Shard, #{replication_options := ReplicationOpts}) -> ClusterName = cluster_name(DB, Shard), LocalServer = local_server(DB, Shard), Servers = shard_servers(DB, Shard), - case ra:restart_server(DB, LocalServer) of + MutableConfig = #{tick_timeout => 100}, + case ra:restart_server(DB, LocalServer, MutableConfig) of {error, name_not_registered} -> Bootstrap = true, Machine = {module, emqx_ds_replication_layer, #{db => DB, shard => Shard}}, @@ -320,7 +321,7 @@ start_server(DB, Shard, #{replication_options := ReplicationOpts}) -> ], ReplicationOpts ), - ok = ra:start_server(DB, #{ + ok = ra:start_server(DB, MutableConfig#{ id => LocalServer, uid => server_uid(DB, Shard), cluster_name => ClusterName, diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index 7342b097dda..db50e49ddda 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -482,13 +482,17 @@ delete_next_until( end. handle_event(_ShardId, State = #s{gvars = Gvars}, Time, tick) -> - %% Cause replication layer to bump timestamp when idle case ets:lookup(Gvars, ?IDLE_DETECT) of - [{?IDLE_DETECT, false, LastWrittenTs}] when - ?EPOCH(State, LastWrittenTs) > ?EPOCH(State, Time) - -> + [{?IDLE_DETECT, Latch, LastWrittenTs}] -> + ok; + [] -> + Latch = false, + LastWrittenTs = 0 + end, + case Latch of + false when ?EPOCH(State, Time) > ?EPOCH(State, LastWrittenTs) -> ets:insert(Gvars, {?IDLE_DETECT, true, LastWrittenTs}), - [emqx_ds_storage_bitfield_lts_dummy_event]; + [dummy_event]; _ -> [] end; diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index fff3a77f36f..175a0d515cc 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -287,6 +287,13 @@ get_streams(Shard, TopicFilter, StartTime) -> case generation_get(Shard, GenId) of #{module := Mod, data := GenData} -> Streams = Mod:get_streams(Shard, GenData, TopicFilter, StartTime), + ?tp(get_streams_get_gen_topic, #{ + gen_id => GenId, + topic => TopicFilter, + start_time => StartTime, + streams => Streams, + gen_data => GenData + }), [ {GenId, ?stream_v2(GenId, InnerStream)} || InnerStream <- Streams diff --git a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl index 3b0e37c7fb9..b31b9b0c2ee 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl @@ -21,10 +21,18 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("stdlib/include/assert.hrl"). --include_lib("snabbkaffe/include/test_macros.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(DB, testdb). +-define(ON(NODE, BODY), + erpc:call(NODE, erlang, apply, [fun() -> BODY end, []]) +). + +-define(diff_opts, #{ + context => 20, window => 1000, max_failures => 1000, compare_fun => fun message_eq/2 +}). + opts() -> opts(#{}). 
@@ -32,7 +40,8 @@ opts(Overrides) -> maps:merge( #{ backend => builtin, - storage => {emqx_ds_storage_bitfield_lts, #{}}, + %% storage => {emqx_ds_storage_reference, #{}}, + storage => {emqx_ds_storage_bitfield_lts, #{epoch_bits => 1}}, n_shards => 16, n_sites => 1, replication_factor => 3, @@ -142,112 +151,140 @@ t_rebalance(init, Config) -> t_rebalance('end', Config) -> ok = emqx_cth_cluster:stop(?config(nodes, Config)). +%% This testcase verifies that the storage rebalancing works correctly: +%% 1. Join/leave operations are applied successfully. +%% 2. Message data survives the rebalancing. +%% 3. Shard cluster membership converges to the target replica allocation. +%% 4. Replication factor is respected. t_rebalance(Config) -> - %% This testcase verifies that the storage rebalancing works correctly: - %% 1. Join/leave operations are applied successfully. - %% 2. Message data survives the rebalancing. - %% 3. Shard cluster membership converges to the target replica allocation. - %% 4. Replication factor is respected. - - NMsgs = 800, + NMsgs = 50, NClients = 5, + %% List of fake client IDs: + Clients = [integer_to_binary(I) || I <- lists:seq(1, NClients)], + %% List of streams that generate messages for each "client" in its own topic: + TopicStreams = [ + {ClientId, emqx_utils_stream:limit_length(NMsgs, topic_messages(?FUNCTION_NAME, ClientId))} + || ClientId <- Clients + ], + %% Interleaved list of events: + Stream = emqx_utils_stream:interleave([{2, Stream} || {_ClientId, Stream} <- TopicStreams]), Nodes = [N1, N2, N3, N4] = ?config(nodes, Config), - - %% Initialize DB on the first node. - Opts = opts(#{n_shards => 16, n_sites => 1, replication_factor => 3}), - ?assertEqual(ok, erpc:call(N1, emqx_ds, open_db, [?DB, Opts])), - ?assertMatch( - Shards when length(Shards) == 16, - shards_online(N1, ?DB) - ), - - %% Open DB on the rest of the nodes. - ?assertEqual( - [{ok, ok} || _ <- [N2, N3, N4]], - erpc:multicall([N2, N3, N4], emqx_ds, open_db, [?DB, Opts]) - ), - - Sites = [S1, S2 | _Rest] = [ds_repl_meta(N, this_site) || N <- Nodes], - ct:pal("Sites: ~p~n", [Sites]), - - %% Only N1 should be responsible for all shards initially. - ?assertEqual( - [[S1] || _ <- Nodes], - [ds_repl_meta(N, db_sites, [?DB]) || N <- Nodes] - ), - - %% Fill the storage with messages and few additional generations. - %% This will force shards to trigger snapshot transfers during rebalance. - ClientMessages = emqx_utils:pmap( - fun(CID) -> - N = lists:nth(1 + (CID rem length(Nodes)), Nodes), - fill_storage(N, ?DB, NMsgs, #{client_id => integer_to_binary(CID)}) + ?check_trace( + #{timetrap => 30_000}, + begin + %% 0. Inject schedulings to make sure the messages are + %% written to the storage before, during, and after + %% rebalance: + ?force_ordering(#{?snk_kind := test_push_message, n := 10}, #{ + ?snk_kind := test_start_rebalance + }), + ?force_ordering(#{?snk_kind := test_start_rebalance}, #{ + ?snk_kind := test_push_message, n := 20 + }), + ?force_ordering(#{?snk_kind := test_end_rebalance}, #{ + ?snk_kind := test_push_message, n := 30 + }), + %% 1. Initialize DB on the first node. 
+ Opts = opts(#{n_shards => 16, n_sites => 1, replication_factor => 3}), + + ?assertEqual(ok, ?ON(N1, emqx_ds:open_db(?DB, Opts))), + ?assertMatch(Shards when length(Shards) == 16, shards_online(N1, ?DB)), + + %% 1.1 Open DB on the rest of the nodes: + [ + ?assertEqual(ok, ?ON(Node, emqx_ds:open_db(?DB, Opts))) + || Node <- Nodes + ], + + Sites = [S1, S2 | _] = [ds_repl_meta(N, this_site) || N <- Nodes], + ct:pal("Sites: ~p~n", [Sites]), + + %% 1.2 Verify that all nodes have the same view of metadata storage: + [ + ?defer_assert( + ?assertEqual( + [S1], + ?ON(Node, emqx_ds_replication_layer_meta:db_sites(?DB)), + #{ + msg => "Initially, only S1 should be responsible for all shards", + node => Node + } + ) + ) + || Node <- Nodes + ], + + %% 2. Start filling the storage: + spawn_link( + fun() -> + NodeStream = emqx_utils_stream:repeat(emqx_utils_stream:list(Nodes)), + apply_stream(?DB, NodeStream, Stream, 0) + end + ), + + %% 3. Start rebalance in the meanwhile: + ?tp(test_start_rebalance, #{}), + %% 3.1 Join the second site to the DB replication sites. + ?assertEqual(ok, ?ON(N1, emqx_ds_replication_layer_meta:join_db_site(?DB, S2))), + %% Should be no-op. + ?assertEqual(ok, ?ON(N2, emqx_ds_replication_layer_meta:join_db_site(?DB, S2))), + ct:pal("Transitions (~p -> ~p): ~p~n", [[S1], [S1, S2], transitions(N1, ?DB)]), + + ?retry(1000, 10, ?assertEqual([], transitions(N1, ?DB))), + + %% Now join the rest of the sites. + ?assertEqual(ok, ds_repl_meta(N2, assign_db_sites, [?DB, Sites])), + ct:pal("Transitions (~p -> ~p): ~p~n", [[S1, S2], Sites, transitions(N1, ?DB)]), + + ?retry(1000, 10, ?assertEqual([], transitions(N2, ?DB))), + + ?tp(test_end_rebalance, #{}), + [ + ?defer_assert( + ?assertEqual( + 16 * 3 div length(Nodes), + n_shards_online(Node, ?DB), + "Each node is now responsible for 3/4 of the shards" + ) + ) + || Node <- Nodes + ], + + %% Verify that the set of shard servers matches the target allocation. + Allocation = [ds_repl_meta(N, my_shards, [?DB]) || N <- Nodes], + ShardServers = [ + shard_server_info(N, ?DB, Shard, Site, readiness) + || {N, Site, Shards} <- lists:zip3(Nodes, Sites, Allocation), + Shard <- Shards + ], + ?assert( + lists:all(fun({_Server, Status}) -> Status == ready end, ShardServers), + ShardServers + ), + + %% Verify that the messages are preserved after the rebalance: + ?block_until(#{?snk_kind := all_done}), + timer:sleep(5000), + verify_stream_effects(?FUNCTION_NAME, Nodes, TopicStreams), + + %% Scale down the cluster by removing the first node. + ?assertEqual(ok, ds_repl_meta(N1, leave_db_site, [?DB, S1])), + ct:pal("Transitions (~p -> ~p): ~p~n", [Sites, tl(Sites), transitions(N1, ?DB)]), + ?retry(1000, 10, ?assertEqual([], transitions(N2, ?DB))), + + %% Verify that each node is now responsible for each shard. + ?defer_assert( + ?assertEqual( + [0, 16, 16, 16], + [n_shards_online(N, ?DB) || N <- Nodes] + ) + ), + + %% Verify that the messages are once again preserved after the rebalance: + verify_stream_effects(?FUNCTION_NAME, Nodes, TopicStreams) end, - lists:seq(1, NClients), - infinity - ), - Messages1 = lists:sort(fun compare_message/2, lists:append(ClientMessages)), - - %% Join the second site to the DB replication sites. - ?assertEqual(ok, ds_repl_meta(N1, join_db_site, [?DB, S2])), - %% Should be no-op. - ?assertEqual(ok, ds_repl_meta(N2, join_db_site, [?DB, S2])), - ct:pal("Transitions (~p -> ~p): ~p~n", [[S1], [S1, S2], transitions(N1, ?DB)]), - - %% Fill in some more messages *during* the rebalance. 
- MessagesRB1 = fill_storage(N4, ?DB, NMsgs, #{client_id => <<"RB1">>}), - - ?retry(1000, 10, ?assertEqual([], transitions(N1, ?DB))), - - %% Now join the rest of the sites. - ?assertEqual(ok, ds_repl_meta(N2, assign_db_sites, [?DB, Sites])), - ct:pal("Transitions (~p -> ~p): ~p~n", [[S1, S2], Sites, transitions(N1, ?DB)]), - - %% Fill in some more messages *during* the rebalance. - MessagesRB2 = fill_storage(N4, ?DB, NMsgs, #{client_id => <<"RB2">>}), - - ?retry(1000, 10, ?assertEqual([], transitions(N2, ?DB))), - - %% Verify that each node is now responsible for 3/4 of the shards. - ?assertEqual( - [(16 * 3) div length(Nodes) || _ <- Nodes], - [n_shards_online(N, ?DB) || N <- Nodes] - ), - - %% Verify that the set of shard servers matches the target allocation. - Allocation = [ds_repl_meta(N, my_shards, [?DB]) || N <- Nodes], - ShardServers = [ - shard_server_info(N, ?DB, Shard, Site, readiness) - || {N, Site, Shards} <- lists:zip3(Nodes, Sites, Allocation), - Shard <- Shards - ], - ?assert( - lists:all(fun({_Server, Status}) -> Status == ready end, ShardServers), - ShardServers - ), - - %% Verify that the messages are preserved after the rebalance. - Messages = Messages1 ++ MessagesRB1 ++ MessagesRB2, - MessagesN4 = lists:sort(fun compare_message/2, consume(N4, ?DB, ['#'], 0)), - ?assertEqual(sample(20, Messages), sample(20, MessagesN4)), - ?assertEqual(Messages, MessagesN4), - - %% Scale down the cluster by removing the first node. - ?assertEqual(ok, ds_repl_meta(N1, leave_db_site, [?DB, S1])), - ct:pal("Transitions (~p -> ~p): ~p~n", [Sites, tl(Sites), transitions(N1, ?DB)]), - - ?retry(1000, 10, ?assertEqual([], transitions(N2, ?DB))), - - %% Verify that each node is now responsible for each shard. - ?assertEqual( - [0, 16, 16, 16], - [n_shards_online(N, ?DB) || N <- Nodes] - ), - - %% Verify that the messages are once again preserved after the rebalance. - MessagesN3 = lists:sort(fun compare_message/2, consume(N3, ?DB, ['#'], 0)), - ?assertEqual(sample(20, Messages), sample(20, MessagesN3)), - ?assertEqual(Messages, MessagesN3). + [] + ). t_join_leave_errors(init, Config) -> Apps = [appspec(emqx_durable_storage)], @@ -400,6 +437,9 @@ t_rebalance_chaotic_converges(Config) -> ds_repl_meta(N1, db_sites, [?DB]) ), + %% Wait until the LTS timestamp is updated + timer:sleep(5000), + %% Check that all messages are still there. Messages = lists:append(TransitionMessages) ++ Messages0, MessagesDB = lists:sort(fun compare_message/2, consume(N1, ?DB, ['#'], 0)), @@ -502,14 +542,16 @@ fill_storage(Node, DB, NMsgs, Opts) -> fill_storage(Node, DB, NMsgs, I, Opts) when I < NMsgs -> PAddGen = maps:get(p_addgen, Opts, 0.001), R1 = push_message(Node, DB, I, Opts), - R2 = probably(PAddGen, fun() -> add_generation(Node, DB) end), + %probably(PAddGen, fun() -> add_generation(Node, DB) end), + R2 = [], R1 ++ R2 ++ fill_storage(Node, DB, NMsgs, I + 1, Opts); fill_storage(_Node, _DB, NMsgs, NMsgs, _Opts) -> []. 
push_message(Node, DB, I, Opts) ->
     Topic = emqx_topic:join([<<"topic">>, <<"foo">>, integer_to_binary(I)]),
-    {Bytes, _} = rand:bytes_s(120, rand:seed_s(default, I)),
+    %% {Bytes, _} = rand:bytes_s(5, rand:seed_s(default, I)),
+    Bytes = integer_to_binary(I),
     ClientId = maps:get(client_id, Opts, <<?MODULE_STRING>>),
     Message = message(ClientId, Topic, Bytes, I * 100),
     ok = erpc:call(Node, emqx_ds, store_batch, [DB, [Message], #{sync => true}]),
     [Message].

 probably(P, Fun) ->

 sample(N, List) ->
     L = length(List),
-    H = N div 2,
-    Filler = integer_to_list(L - N) ++ " more",
-    lists:sublist(List, H) ++ [Filler] ++ lists:sublist(List, L - H, L).
+    case L =< N of
+        true ->
+            List;
+        false ->
+            H = N div 2,
+            Filler = integer_to_list(L - N) ++ " more",
+            lists:sublist(List, H) ++ [Filler] ++ lists:sublist(List, L - H, L)
+    end.

 %%

 init_per_testcase(TCName, Config0) ->

 end_per_testcase(TCName, Config) ->
     ok = snabbkaffe:stop(),
     emqx_common_test_helpers:end_per_testcase(?MODULE, TCName, Config).
+
+without_extra(L) ->
+    [I#message{extra = #{}} || I <- L].
+
+%% Consume data from the DS storage on a given node as a stream:
+-type ds_stream() :: emqx_utils_stream:stream({emqx_ds:message_key(), emqx_types:message()}).
+
+%% Create a stream from the topic (wildcards are NOT supported for a
+%% good reason: order of messages is implementation-dependent!):
+-spec ds_topic_stream(binary(), binary(), node()) -> ds_stream().
+ds_topic_stream(ClientId, TopicBin, Node) ->
+    Topic = emqx_topic:words(TopicBin),
+    Shard = shard_of_clientid(Node, ClientId),
+    {ShardId, DSStreams} =
+        ?ON(
+            Node,
+            begin
+                DBShard = {?DB, Shard},
+                {DBShard, emqx_ds_storage_layer:get_streams(DBShard, Topic, 0)}
+            end
+        ),
+    %% Sort streams by their rank Y, and chain them together:
+    emqx_utils_stream:chain([
+        ds_topic_generation_stream(Node, ShardId, Topic, S)
+     || {_RankY, S} <- lists:sort(DSStreams)
+    ]).
+
+%% Note: produces messages with keys
+ds_topic_generation_stream(Node, Shard, Topic, Stream) ->
+    {ok, Iterator} = ?ON(
+        Node,
+        emqx_ds_storage_layer:make_iterator(Shard, Stream, Topic, 0)
+    ),
+    do_ds_topic_generation_stream(Node, Shard, Iterator).
+
+do_ds_topic_generation_stream(Node, Shard, It0) ->
+    Now = 99999999999999999999,
+    fun() ->
+        case ?ON(Node, emqx_ds_storage_layer:next(Shard, It0, 1, Now)) of
+            {ok, It, []} ->
+                [];
+            {ok, It, [KeyMsg]} ->
+                [KeyMsg | do_ds_topic_generation_stream(Node, Shard, It)]
+        end
+    end.
+
+%% Payload generation:
+
+apply_stream(DB, NodeStream0, Stream0, N) ->
+    case emqx_utils_stream:next(Stream0) of
+        [] ->
+            ?tp(all_done, #{});
+        [Msg = #message{from = From} | Stream] ->
+            [Node | NodeStream] = emqx_utils_stream:next(NodeStream0),
+            ?tp(
+                test_push_message,
+                maps:merge(
+                    emqx_message:to_map(Msg),
+                    #{n => N}
+                )
+            ),
+            ?ON(Node, emqx_ds:store_batch(DB, [Msg], #{sync => true})),
+            apply_stream(DB, NodeStream, Stream, N + 1)
+    end.
+
+%% @doc Create an infinite list of messages from a given client:
+topic_messages(TestCase, ClientId) ->
+    topic_messages(TestCase, ClientId, 0).
+
+topic_messages(TestCase, ClientId, N) ->
+    fun() ->
+        Msg = #message{
+            from = ClientId,
+            topic = client_topic(TestCase, ClientId),
+            timestamp = N * 100,
+            payload = integer_to_binary(N)
+        },
+        [Msg | topic_messages(TestCase, ClientId, N + 1)]
+    end.
+
+client_topic(TestCase, ClientId) when is_atom(TestCase) ->
+    client_topic(atom_to_binary(TestCase, utf8), ClientId);
+client_topic(TestCase, ClientId) when is_binary(TestCase) ->
+    <<TestCase/binary, "/", ClientId/binary>>.
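%% Note that topic_messages/2 above builds an infinite lazy stream:
%% every element is produced on demand and the tail is a fresh closure,
%% so it has to be capped before consumption. A usage sketch mirroring
%% t_rebalance above:
%%
%%     Stream0 = emqx_utils_stream:limit_length(
%%         NMsgs, topic_messages(?FUNCTION_NAME, ClientId)
%%     ),
%%     Stream = emqx_utils_stream:interleave([{2, Stream0}]),
%%     NodeStream = emqx_utils_stream:repeat(emqx_utils_stream:list(Nodes)),
%%     apply_stream(?DB, NodeStream, Stream, 0).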
+ +message_eq(Msg1, {Key, Msg2}) -> + %% Timestamps can be modified by the replication layer, ignore them: + Msg1#message{timestamp = 0} =:= Msg2#message{timestamp = 0}. + +%% Stream comparison: + +-spec verify_stream_effects(binary(), [node()], [{emqx_types:clientid(), ds_stream()}]) -> ok. +verify_stream_effects(TestCase, Nodes0, L) -> + lists:foreach( + fun({ClientId, Stream}) -> + Nodes = nodes_of_clientid(ClientId, Nodes0), + ct:pal("Nodes allocated for client ~p: ~p", [ClientId, Nodes]), + ?defer_assert( + ?assertMatch([_ | _], Nodes, ["No nodes have been allocated for ", ClientId]) + ), + [verify_stream_effects(TestCase, Node, ClientId, Stream) || Node <- Nodes] + end, + L + ). + +-spec verify_stream_effects(binary(), node(), emqx_types:clientid(), ds_stream()) -> ok. +verify_stream_effects(TestCase, Node, ClientId, ExpectedStream) -> + ct:pal("Checking consistency of effects for ~p on ~p", [ClientId, Node]), + ?defer_assert( + begin + snabbkaffe_diff:assert_lists_eq( + ExpectedStream, + ds_topic_stream(ClientId, client_topic(TestCase, ClientId), Node), + ?diff_opts#{comment => #{clientid => ClientId, node => Node}} + ), + ct:pal("Data for client ~p on ~p is consistent.", [ClientId, Node]) + end + ). + +%% Find which nodes from the list contain the shards for the given +%% client ID: +nodes_of_clientid(ClientId, Nodes = [N0 | _]) -> + Shard = shard_of_clientid(N0, ClientId), + SiteNodes = ?ON( + N0, + begin + Sites = emqx_ds_replication_layer_meta:replica_set(?DB, Shard), + lists:map(fun emqx_ds_replication_layer_meta:node/1, Sites) + end + ), + lists:filter( + fun(N) -> + lists:member(N, SiteNodes) + end, + Nodes + ). + +shard_of_clientid(Node, ClientId) -> + ?ON( + Node, + emqx_ds_replication_layer:shard_of_message(?DB, #message{from = ClientId}, clientid) + ). diff --git a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl index f5475223085..26469c68509 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl @@ -18,6 +18,8 @@ -compile(export_all). -compile(nowarn_export_all). +-include_lib("emqx_utils/include/emqx_message.hrl"). + %% RPC mocking mock_rpc() -> diff --git a/apps/emqx_utils/src/emqx_utils_stream.erl b/apps/emqx_utils/src/emqx_utils_stream.erl index fac536532eb..e22f97ed785 100644 --- a/apps/emqx_utils/src/emqx_utils_stream.erl +++ b/apps/emqx_utils/src/emqx_utils_stream.erl @@ -23,8 +23,11 @@ mqueue/1, map/2, transpose/1, + chain/1, chain/2, - repeat/1 + repeat/1, + interleave/1, + limit_length/2 ]). %% Evaluating @@ -118,6 +121,11 @@ transpose_tail(S, Tail) -> end end. +%% @doc Make a stream by concatenating multiple streams. +-spec chain([stream(X)]) -> stream(X). +chain(L) -> + lists:foldl(fun chain/2, empty(), L). + %% @doc Make a stream by chaining (concatenating) two streams. %% The second stream begins to produce values only after the first one is exhausted. -spec chain(stream(X), stream(Y)) -> stream(X | Y). @@ -144,6 +152,41 @@ repeat(S) -> end end. +%% @doc Interleave the elements of the streams. +%% +%% This function accepts a list of tuples where the first element +%% specifies size of the "batch" to be consumed from the stream at a +%% time (stream is the second tuple element). If element of the list +%% is a plain stream, then the batch size is assumed to be 1. +-spec interleave([stream(X) | {non_neg_integer(), stream(X)}]) -> stream(X). 
+interleave(L0) ->
+    L = lists:map(
+        fun
+            (Stream) when is_function(Stream) ->
+                {1, Stream};
+            (A = {N, _}) when N >= 0 ->
+                A
+        end,
+        L0
+    ),
+    fun() ->
+        do_interleave(0, L, [])
+    end.
+
+%% @doc Truncate a stream to the given length
+-spec limit_length(non_neg_integer(), stream(X)) -> stream(X).
+limit_length(0, _) ->
+    fun() -> [] end;
+limit_length(N, S) when N >= 0 ->
+    fun() ->
+        case next(S) of
+            [] ->
+                [];
+            [X | S1] ->
+                [X | limit_length(N - 1, S1)]
+        end
+    end.
+
 %%
 
 %% @doc Produce the next value from the stream.
@@ -237,3 +280,22 @@ csv_read_line([Line | Lines]) ->
     {Fields, Lines};
 csv_read_line([]) ->
     eof.
+
+do_interleave(_, [], []) ->
+    [];
+do_interleave(N, [{N, S} | Rest], Rev) ->
+    do_interleave(0, Rest, [{N, S} | Rev]);
+do_interleave(_, [], Rev) ->
+    do_interleave(0, lists:reverse(Rev), []);
+do_interleave(I, [{N, S} | Rest], Rev) when I < N ->
+    case next(S) of
+        [] ->
+            do_interleave(0, Rest, Rev);
+        [X | S1] ->
+            [
+                X
+                | fun() ->
+                    do_interleave(I + 1, [{N, S1} | Rest], Rev)
+                end
+            ]
+    end.
diff --git a/apps/emqx_utils/test/emqx_utils_stream_tests.erl b/apps/emqx_utils/test/emqx_utils_stream_tests.erl
index 60b67a4ff05..0b117215ef6 100644
--- a/apps/emqx_utils/test/emqx_utils_stream_tests.erl
+++ b/apps/emqx_utils/test/emqx_utils_stream_tests.erl
@@ -157,6 +157,14 @@ mqueue_test() ->
     emqx_utils_stream:consume(emqx_utils_stream:mqueue(400))
   ).
 
+interleave_test() ->
+    S1 = emqx_utils_stream:list([1, 2, 3]),
+    S2 = emqx_utils_stream:list([a, b, c, d]),
+    ?assertEqual(
+        [1, 2, a, b, 3, c, d],
+        emqx_utils_stream:consume(emqx_utils_stream:interleave([{2, S1}, {2, S2}]))
+    ).
+
 csv_test() ->
     Data1 = <<"h1,h2,h3\r\nvv1,vv2,vv3\r\nvv4,vv5,vv6">>,
     ?assertEqual(
From 1ddbbca90e9a3f70df15abf5fe9100ae35ce57c0 Mon Sep 17 00:00:00 2001
From: ieQu1 <99872536+ieQu1@users.noreply.github.com>
Date: Tue, 7 May 2024 18:26:59 +0200
Subject: [PATCH 09/14] feat(ds): Allow incremental update of the LTS trie

---
 apps/emqx_durable_storage/src/emqx_ds_lts.erl | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/apps/emqx_durable_storage/src/emqx_ds_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_lts.erl
index e087504de0a..184f709e993 100644
--- a/apps/emqx_durable_storage/src/emqx_ds_lts.erl
+++ b/apps/emqx_durable_storage/src/emqx_ds_lts.erl
@@ -21,6 +21,7 @@
     trie_create/1, trie_create/0,
     destroy/1,
     trie_restore/2,
+    trie_update/2,
     trie_copy_learned_paths/2,
     topic_key/3,
     match_topics/2,
@@ -126,7 +127,11 @@ destroy(#trie{trie = Trie, stats = Stats}) ->
 %% @doc Restore trie from a dump
 -spec trie_restore(options(), [{_Key, _Val}]) -> trie().
 trie_restore(Options, Dump) ->
-    Trie = trie_create(Options),
+    trie_update(trie_create(Options), Dump).
+
+%% @doc Update a trie with a dump of operations (used for replication)
+-spec trie_update(trie(), [{_Key, _Val}]) -> trie().
+trie_update(Trie, Dump) -> lists:foreach( fun({{StateFrom, Token}, StateTo}) -> trie_insert(Trie, StateFrom, Token, StateTo) From f250f00f3f1d8c2fb6ccc181d920850272bd7cac Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Wed, 8 May 2024 12:12:27 +0200 Subject: [PATCH 10/14] chore: Bump snabbkaffe version to 1.0.10 --- apps/emqx/rebar.config | 2 +- mix.exs | 2 +- rebar.config | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index b58cd0cb787..27648a88dc9 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -34,7 +34,7 @@ {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}}, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}, {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}, - {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.8"}}}, + {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.10"}}}, {ra, "2.7.3"} ]}. diff --git a/mix.exs b/mix.exs index 8fb920c8721..f90585423cb 100644 --- a/mix.exs +++ b/mix.exs @@ -71,7 +71,7 @@ defmodule EMQXUmbrella.MixProject do {:telemetry, "1.1.0"}, # in conflict by emqtt and hocon {:getopt, "1.0.2", override: true}, - {:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.8", override: true}, + {:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.10", override: true}, {:hocon, github: "emqx/hocon", tag: "0.42.2", override: true}, {:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.3", override: true}, {:esasl, github: "emqx/esasl", tag: "0.2.1"}, diff --git a/rebar.config b/rebar.config index a0488b5091f..98f8e217745 100644 --- a/rebar.config +++ b/rebar.config @@ -96,7 +96,7 @@ {observer_cli, "1.7.1"}, {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}}, {getopt, "1.0.2"}, - {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.8"}}}, + {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.10"}}}, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.42.2"}}}, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}}, {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.1"}}}, From 2236af84bac27e71772ae85a978ec7ab0e398e37 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Wed, 8 May 2024 10:01:16 +0200 Subject: [PATCH 11/14] feat(ds): two-stage storage commit on the storage level --- .../src/emqx_ds_replication_layer.erl | 3 +- .../src/emqx_ds_storage_bitfield_lts.erl | 107 +++++++++++++++--- .../src/emqx_ds_storage_layer.erl | 78 ++++++++++--- .../src/emqx_ds_storage_reference.erl | 24 ++-- .../test/emqx_ds_replication_SUITE.erl | 1 + 5 files changed, 167 insertions(+), 46 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 315a276ad53..afc8db40d69 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -733,7 +733,7 @@ apply( Result = emqx_ds_storage_layer:drop_generation(DBShard, GenId), {State, Result}; apply( - #{index := RaftIdx}, + _RaftMeta, #{?tag := storage_event, ?payload := CustomEvent}, #{db_shard := DBShard, latest := Latest0} = State ) -> @@ -754,7 +754,6 @@ tick(TimeMs, #{db_shard := DBShard = {DB, Shard}, latest := Latest}) -> %% Leader = 
emqx_ds_replication_layer_shard:lookup_leader(DB, Shard), {Timestamp, _} = assign_timestamp(timestamp_to_timeus(TimeMs), Latest), ?tp(emqx_ds_replication_layer_tick, #{db => DB, shard => Shard, ts => Timestamp}), - set_ts(DBShard, Latest), handle_custom_event(DBShard, Timestamp, tick). assign_timestamps(Latest, Messages) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index db50e49ddda..d05296a2983 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -28,7 +28,8 @@ create/4, open/5, drop/5, - store_batch/4, + prepare_batch/4, + commit_batch/3, get_streams/4, get_delete_streams/4, make_iterator/5, @@ -68,6 +69,9 @@ -define(start_time, 3). -define(storage_key, 4). -define(last_seen_key, 5). +-define(cooked_payloads, 6). +-define(cooked_lts_ops, 7). +-define(cooked_ts, 8). -type options() :: #{ @@ -90,18 +94,28 @@ db :: rocksdb:db_handle(), data :: rocksdb:cf_handle(), trie :: emqx_ds_lts:trie(), + trie_cf :: rocksdb:cf_handle(), keymappers :: array:array(emqx_ds_bitmask_keymapper:keymapper()), ts_bits :: non_neg_integer(), ts_offset :: non_neg_integer(), gvars :: ets:table() }). +-define(lts_persist_ops, emqx_ds_storage_bitfield_lts_ops). + -type s() :: #s{}. -type stream() :: emqx_ds_lts:msg_storage_key(). -type delete_stream() :: emqx_ds_lts:msg_storage_key(). +-type cooked_batch() :: + #{ + ?cooked_payloads := [{binary(), binary()}], + ?cooked_lts_ops := [{binary(), binary()}], + ?cooked_ts := integer() + }. + -type iterator() :: #{ ?tag := ?IT, @@ -220,6 +234,7 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) -> db = DBHandle, data = DataCF, trie = Trie, + trie_cf = TrieCF, keymappers = KeymapperCache, ts_offset = TSOffsetBits, ts_bits = TSBits, @@ -257,24 +272,65 @@ drop(_Shard, DBHandle, GenId, CFRefs, #s{trie = Trie, gvars = GVars}) -> ok = rocksdb:drop_column_family(DBHandle, TrieCF), ok. --spec store_batch( +-spec prepare_batch( emqx_ds_storage_layer:shard_id(), s(), [{emqx_ds:time(), emqx_types:message()}], emqx_ds:message_store_opts() ) -> - emqx_ds:store_batch_result(). -store_batch(_ShardId, S = #s{db = DB, data = Data, gvars = Gvars}, Messages, _Options) -> + {ok, cooked_batch()}. +prepare_batch(_ShardId, S, Messages, _Options) -> + _ = erase(?lts_persist_ops), + {Payloads, MaxTs} = + lists:mapfoldl( + fun({Timestamp, Msg}, Acc) -> + {Key, _} = make_key(S, Timestamp, Msg), + Payload = {Key, message_to_value_v1(Msg)}, + {Payload, max(Acc, Timestamp)} + end, + 0, + Messages + ), + {ok, #{ + ?cooked_payloads => Payloads, + ?cooked_lts_ops => pop_lts_persist_ops(), + ?cooked_ts => MaxTs + }}. + +-spec commit_batch( + emqx_ds_storage_layer:shard_id(), + s(), + cooked_batch() +) -> ok. 
+commit_batch( + _ShardId, + _Data, + #{?cooked_payloads := [], ?cooked_lts_ops := LTS} +) -> + %% Assert: + [] = LTS, + ok; +commit_batch( + _ShardId, + #s{db = DB, data = DataCF, trie = Trie, trie_cf = TrieCF, gvars = Gvars}, + #{?cooked_lts_ops := LtsOps, ?cooked_payloads := Payloads, ?cooked_ts := MaxTs} +) -> {ok, Batch} = rocksdb:batch(), - MaxTs = lists:foldl( - fun({Timestamp, Msg}, Acc) -> - {Key, _} = make_key(S, Timestamp, Msg), - Val = serialize(Msg), - ok = rocksdb:put(DB, Data, Key, Val, []), - max(Acc, Timestamp) + %% Commit LTS trie to the storage: + lists:foreach( + fun({Key, Val}) -> + ok = rocksdb:batch_put(Batch, TrieCF, term_to_binary(Key), term_to_binary(Val)) + end, + LtsOps + ), + %% Apply LTS ops to the memory cache: + _ = emqx_ds_lts:trie_update(Trie, LtsOps), + %% Commit payloads: + lists:foreach( + fun({Key, Val}) -> + ok = rocksdb:batch_put(Batch, DataCF, Key, term_to_binary(Val)) end, - 0, - Messages + Payloads ), Result = rocksdb:write_batch(DB, Batch, []), rocksdb:release_batch(Batch), @@ -780,9 +836,6 @@ value_v1_to_message({Id, Qos, From, Flags, Headers, Topic, Payload, Timestamp, E extra = Extra }. -serialize(Msg) -> - term_to_binary(message_to_value_v1(Msg)). - deserialize(Blob) -> value_v1_to_message(binary_to_term(Blob)). @@ -810,7 +863,8 @@ make_keymapper(TopicIndexBytes, BitsPerTopicLevel, TSBits, TSOffsetBits, N) -> -spec restore_trie(pos_integer(), rocksdb:db_handle(), rocksdb:cf_handle()) -> emqx_ds_lts:trie(). restore_trie(TopicIndexBytes, DB, CF) -> PersistCallback = fun(Key, Val) -> - rocksdb:put(DB, CF, term_to_binary(Key), term_to_binary(Val), []) + push_lts_persist_op(Key, Val), + ok end, {ok, IT} = rocksdb:iterator(DB, CF, []), try @@ -858,8 +912,29 @@ data_cf(GenId) -> trie_cf(GenId) -> "emqx_ds_storage_bitfield_lts_trie" ++ integer_to_list(GenId). +-spec push_lts_persist_op(_Key, _Val) -> ok. +push_lts_persist_op(Key, Val) -> + case erlang:get(?lts_persist_ops) of + undefined -> + erlang:put(?lts_persist_ops, [{Key, Val}]); + L when is_list(L) -> + erlang:put(?lts_persist_ops, [{Key, Val} | L]) + end. + +-spec pop_lts_persist_ops() -> [{_Key, _Val}]. +pop_lts_persist_ops() -> + case erlang:erase(?lts_persist_ops) of + undefined -> + []; + L when is_list(L) -> + L + end. + -ifdef(TEST). +serialize(Msg) -> + term_to_binary(message_to_value_v1(Msg)). + serialize_deserialize_test() -> Msg = #message{ id = <<"message_id_val">>, diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 175a0d515cc..df1253e1c78 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -26,6 +26,9 @@ %% Data store_batch/3, + prepare_batch/3, + commit_batch/2, + get_streams/3, get_delete_streams/3, make_iterator/4, @@ -66,7 +69,8 @@ shard_id/0, options/0, prototype/0, - post_creation_context/0 + post_creation_context/0, + cooked_batch/0 ]). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -86,6 +90,7 @@ -define(STREAM, 1). -define(IT, 2). -define(DELETE_IT, 3). +-define(COOKED_BATCH, 4). %% keys: -define(tag, 1). @@ -132,6 +137,13 @@ ?enc := term() }. +-opaque cooked_batch() :: + #{ + ?tag := ?COOKED_BATCH, + ?generation := gen_id(), + ?enc := term() + }. + %%%% Generation: -define(GEN_KEY(GEN_ID), {generation, GEN_ID}). @@ -207,13 +219,19 @@ -callback drop(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), _RuntimeData) -> ok | {error, _Reason}. 
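+%% NOTE: Writing a batch is split in two stages: `prepare_batch'
+%% only cooks the messages into a backend-specific opaque term and
+%% does not touch the column families, while `commit_batch'
+%% persists the cooked batch. Callers that do not need the
+%% intermediate term can use `store_batch/3', which simply chains
+%% the two, roughly:
+%%
+%%   {ok, Cooked} = Mod:prepare_batch(Shard, GenData, Messages, Opts),
+%%   ok = Mod:commit_batch(Shard, GenData, Cooked)
+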
--callback store_batch( +-callback prepare_batch( shard_id(), _Data, - [{emqx_ds:time(), emqx_types:message()}], + [{emqx_ds:time(), emqx_types:message()}, ...], emqx_ds:message_store_opts() ) -> - emqx_ds:store_batch_result(). + {ok, term()} | {error, _}. + +-callback commit_batch( + shard_id(), + _Data, + _CookedBatch +) -> ok. -callback get_streams(shard_id(), _Data, emqx_ds:topic_filter(), emqx_ds:time()) -> [_Stream]. @@ -261,20 +279,54 @@ drop_shard(Shard) -> emqx_ds:message_store_opts() ) -> emqx_ds:store_batch_result(). -store_batch(Shard, Messages = [{Time, _Msg} | _], Options) -> +store_batch(Shard, Messages, Options) -> + ?tp(emqx_ds_storage_layer_store_batch, #{ + shard => Shard, messages => Messages, options => Options + }), + case prepare_batch(Shard, Messages, Options) of + {ok, CookedBatch} -> + commit_batch(Shard, CookedBatch); + ignore -> + ok; + Error = {error, _} -> + Error + end. + +-spec prepare_batch( + shard_id(), + [{emqx_ds:time(), emqx_types:message()}], + emqx_ds:message_store_opts() +) -> {ok, cooked_batch()} | ignore | {error, _}. +prepare_batch(Shard, Messages = [{Time, _Msg} | _], Options) -> %% NOTE %% We assume that batches do not span generations. Callers should enforce this. - ?tp(emqx_ds_storage_layer_store_batch, #{ + ?tp(emqx_ds_storage_layer_prepare_batch, #{ shard => Shard, messages => Messages, options => Options }), - #{module := Mod, data := GenData} = generation_at(Shard, Time), + {GenId, #{module := Mod, data := GenData}} = generation_at(Shard, Time), T0 = erlang:monotonic_time(microsecond), - Result = Mod:store_batch(Shard, GenData, Messages, Options), + Result = + case Mod:prepare_batch(Shard, GenData, Messages, Options) of + {ok, CookedBatch} -> + {ok, #{?tag => ?COOKED_BATCH, ?generation => GenId, ?enc => CookedBatch}}; + Error = {error, _} -> + Error + end, T1 = erlang:monotonic_time(microsecond), + %% TODO store->prepare emqx_ds_builtin_metrics:observe_store_batch_time(Shard, T1 - T0), Result; -store_batch(_Shard, [], _Options) -> - ok. +prepare_batch(_Shard, [], _Options) -> + ignore. + +-spec commit_batch(shard_id(), cooked_batch()) -> ok. +commit_batch(Shard, #{?tag := ?COOKED_BATCH, ?generation := GenId, ?enc := CookedBatch}) -> + #{?GEN_KEY(GenId) := #{module := Mod, data := GenData}} = get_schema_runtime(Shard), + T0 = erlang:monotonic_time(microsecond), + Result = Mod:commit_batch(Shard, GenData, CookedBatch), + T1 = erlang:monotonic_time(microsecond), + emqx_ds_builtin_metrics:observe_store_batch_time(Shard, T1 - T0), + Result. -spec get_streams(shard_id(), emqx_ds:topic_filter(), emqx_ds:time()) -> [{integer(), stream()}]. @@ -878,7 +930,7 @@ handle_accept_snapshot(ShardId) -> %% The mechanism of storage layer events should be refined later. -spec handle_event(shard_id(), emqx_ds:time(), CustomEvent | tick) -> [CustomEvent]. handle_event(Shard, Time, Event) -> - #{module := Mod, data := GenData} = generation_at(Shard, Time), + {_GenId, #{module := Mod, data := GenData}} = generation_at(Shard, Time), ?tp(emqx_ds_storage_layer_event, #{mod => Mod, time => Time, event => Event}), case erlang:function_exported(Mod, handle_event, 4) of true -> @@ -919,7 +971,7 @@ generations_since(Shard, Since) -> Schema ). --spec generation_at(shard_id(), emqx_ds:time()) -> generation(). +-spec generation_at(shard_id(), emqx_ds:time()) -> {gen_id(), generation()}. generation_at(Shard, Time) -> Schema = #{current_generation := Current} = get_schema_runtime(Shard), generation_at(Time, Current, Schema). 
@@ -930,7 +982,7 @@ generation_at(Time, GenId, Schema) ->
         #{since := Since} when Time < Since andalso GenId > 0 ->
             generation_at(Time, prev_generation_id(GenId), Schema);
         _ ->
-            Gen
+            {GenId, Gen}
     end.
 
 -define(PERSISTENT_TERM(SHARD), {emqx_ds_storage_layer, SHARD}).
diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl
index 3caf2c732d7..10007488c23 100644
--- a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl
+++ b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl
@@ -31,7 +31,8 @@
     create/4,
     open/5,
     drop/5,
-    store_batch/4,
+    prepare_batch/4,
+    commit_batch/3,
     get_streams/4,
     get_delete_streams/4,
     make_iterator/5,
@@ -101,12 +102,14 @@ drop(_ShardId, DBHandle, _GenId, _CFRefs, #s{cf = CFHandle}) ->
     ok = rocksdb:drop_column_family(DBHandle, CFHandle),
     ok.
 
-store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options = #{atomic := true}) ->
+prepare_batch(_ShardId, _Data, Messages, _Options) ->
+    {ok, Messages}.
+
+commit_batch(_ShardId, #s{db = DB, cf = CF}, Messages) ->
     {ok, Batch} = rocksdb:batch(),
     lists:foreach(
-        fun(Msg) ->
-            Id = erlang:unique_integer([monotonic]),
-            Key = <<Id:64>>,
+        fun({TS, Msg}) ->
+            Key = <<TS:64>>,
             Val = term_to_binary(Msg),
             rocksdb:batch_put(Batch, CF, Key, Val)
         end,
         Messages
     ),
     Res = rocksdb:write_batch(DB, Batch, _WriteOptions = []),
     rocksdb:release_batch(Batch),
-    Res;
-store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options) ->
-    lists:foreach(
-        fun({Timestamp, Msg}) ->
-            Key = <<Timestamp:64>>,
-            Val = term_to_binary(Msg),
-            rocksdb:put(DB, CF, Key, Val, [])
-        end,
-        Messages
-    ).
+    Res.
 
 get_streams(_Shard, _Data, _TopicFilter, _StartTime) ->
     [#stream{}].
diff --git a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl index b31b9b0c2ee..4670dfeb060 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl @@ -159,6 +159,7 @@ t_rebalance('end', Config) -> t_rebalance(Config) -> NMsgs = 50, NClients = 5, + NEvents = NMsgs * NClients, %% List of fake client IDs: Clients = [integer_to_binary(I) || I <- lists:seq(1, NClients)], %% List of streams that generate messages for each "client" in its own topic: From a0a39770434f20c497de9e55357116d5fa3c944b Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Wed, 8 May 2024 23:15:46 +0200 Subject: [PATCH 12/14] feat(ds): Assign latest timestamp deterministically --- .../src/emqx_ds_replication_layer.erl | 12 ++-- .../src/emqx_ds_replication_layer.hrl | 1 + .../test/emqx_ds_replication_SUITE.erl | 68 +++++++++++++------ apps/emqx_utils/src/emqx_utils_stream.erl | 32 +++++---- 4 files changed, 76 insertions(+), 37 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index afc8db40d69..c6fdc69aac4 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -734,20 +734,20 @@ apply( {State, Result}; apply( _RaftMeta, - #{?tag := storage_event, ?payload := CustomEvent}, + #{?tag := storage_event, ?payload := CustomEvent, ?now := Now}, #{db_shard := DBShard, latest := Latest0} = State ) -> - {Timestamp, Latest} = ensure_monotonic_timestamp(emqx_ds:timestamp_us(), Latest0), + Latest = max(Latest0, Now), set_ts(DBShard, Latest), ?tp( debug, emqx_ds_replication_layer_storage_event, #{ - shard => DBShard, ts => Timestamp, payload => CustomEvent + shard => DBShard, payload => CustomEvent, latest => Latest } ), - Effects = handle_custom_event(DBShard, Timestamp, CustomEvent), - {State#{latest := Latest}, ok, Effects}. + Effects = handle_custom_event(DBShard, Latest, CustomEvent), + {State#{latest => Latest}, ok, Effects}. -spec tick(integer(), ra_state()) -> ra_machine:effects(). tick(TimeMs, #{db_shard := DBShard = {DB, Shard}, latest := Latest}) -> @@ -791,7 +791,7 @@ snapshot_module() -> handle_custom_event(DBShard, Latest, Event) -> try Events = emqx_ds_storage_layer:handle_event(DBShard, Latest, Event), - [{append, #{?tag => storage_event, ?payload => I}} || I <- Events] + [{append, #{?tag => storage_event, ?payload => I, ?now => Latest}} || I <- Events] catch EC:Err:Stacktrace -> ?tp(error, ds_storage_custom_even_fail, #{ diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl index 9608241439d..4472b5a47b3 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl @@ -43,5 +43,6 @@ %% custom events -define(payload, 2). +-define(now, 3). -endif. 
diff --git a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl index 4670dfeb060..f3a61d377c4 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl @@ -41,7 +41,7 @@ opts(Overrides) -> #{ backend => builtin, %% storage => {emqx_ds_storage_reference, #{}}, - storage => {emqx_ds_storage_bitfield_lts, #{epoch_bits => 1}}, + storage => {emqx_ds_storage_bitfield_lts, #{epoch_bits => 10}}, n_shards => 16, n_sites => 1, replication_factor => 3, @@ -159,7 +159,6 @@ t_rebalance('end', Config) -> t_rebalance(Config) -> NMsgs = 50, NClients = 5, - NEvents = NMsgs * NClients, %% List of fake client IDs: Clients = [integer_to_binary(I) || I <- lists:seq(1, NClients)], %% List of streams that generate messages for each "client" in its own topic: @@ -168,7 +167,16 @@ t_rebalance(Config) -> || ClientId <- Clients ], %% Interleaved list of events: - Stream = emqx_utils_stream:interleave([{2, Stream} || {_ClientId, Stream} <- TopicStreams]), + Stream0 = emqx_utils_stream:interleave( + [{2, Stream} || {_ClientId, Stream} <- TopicStreams], true + ), + Stream = emqx_utils_stream:interleave( + [ + {50, Stream0}, + emqx_utils_stream:const(add_generation) + ], + false + ), Nodes = [N1, N2, N3, N4] = ?config(nodes, Config), ?check_trace( #{timetrap => 30_000}, @@ -176,15 +184,22 @@ t_rebalance(Config) -> %% 0. Inject schedulings to make sure the messages are %% written to the storage before, during, and after %% rebalance: - ?force_ordering(#{?snk_kind := test_push_message, n := 10}, #{ - ?snk_kind := test_start_rebalance - }), - ?force_ordering(#{?snk_kind := test_start_rebalance}, #{ - ?snk_kind := test_push_message, n := 20 - }), - ?force_ordering(#{?snk_kind := test_end_rebalance}, #{ - ?snk_kind := test_push_message, n := 30 - }), + ?force_ordering( + #{?snk_kind := test_push_message, n := 10}, + #{?snk_kind := test_start_rebalance} + ), + ?force_ordering( + #{?snk_kind := test_start_rebalance1}, + #{?snk_kind := test_push_message, n := 20} + ), + ?force_ordering( + #{?snk_kind := test_push_message, n := 30}, + #{?snk_kind := test_start_rebalance2} + ), + ?force_ordering( + #{?snk_kind := test_end_rebalance}, + #{?snk_kind := test_push_message, n := 40} + ), %% 1. Initialize DB on the first node. Opts = opts(#{n_shards => 16, n_sites => 1, replication_factor => 3}), @@ -224,7 +239,7 @@ t_rebalance(Config) -> ), %% 3. Start rebalance in the meanwhile: - ?tp(test_start_rebalance, #{}), + ?tp(test_start_rebalance1, #{}), %% 3.1 Join the second site to the DB replication sites. ?assertEqual(ok, ?ON(N1, emqx_ds_replication_layer_meta:join_db_site(?DB, S2))), %% Should be no-op. @@ -233,6 +248,7 @@ t_rebalance(Config) -> ?retry(1000, 10, ?assertEqual([], transitions(N1, ?DB))), + ?tp(test_start_rebalance2, #{}), %% Now join the rest of the sites. ?assertEqual(ok, ds_repl_meta(N2, assign_db_sites, [?DB, Sites])), ct:pal("Transitions (~p -> ~p): ~p~n", [[S1, S2], Sites, transitions(N1, ?DB)]), @@ -619,7 +635,9 @@ without_extra(L) -> -type ds_stream() :: emqx_utils_stream:stream({emqx_ds:message_key(), emqx_types:message()}). %% Create a stream from the topic (wildcards are NOT supported for a -%% good reason: order of messages is implementation-dependent!): +%% good reason: order of messages is implementation-dependent!). +%% +%% Note: stream produces messages with keys -spec ds_topic_stream(binary(), binary(), node()) -> ds_stream(). 
ds_topic_stream(ClientId, TopicBin, Node) -> Topic = emqx_topic:words(TopicBin), @@ -638,7 +656,6 @@ ds_topic_stream(ClientId, TopicBin, Node) -> || {_RankY, S} <- lists:sort(DSStreams) ]). -%% Note: produces messages with keys ds_topic_generation_stream(Node, Shard, Topic, Stream) -> {ok, Iterator} = ?ON( Node, @@ -647,11 +664,20 @@ ds_topic_generation_stream(Node, Shard, Topic, Stream) -> do_ds_topic_generation_stream(Node, Shard, Iterator). do_ds_topic_generation_stream(Node, Shard, It0) -> - Now = 99999999999999999999, fun() -> - case ?ON(Node, emqx_ds_storage_layer:next(Shard, It0, 1, Now)) of + case + ?ON( + Node, + begin + Now = emqx_ds_replication_layer:current_timestamp(?DB, Shard), + emqx_ds_storage_layer:next(Shard, It0, 1, Now) + end + ) + of {ok, It, []} -> []; + {ok, end_of_stream} -> + []; {ok, It, [KeyMsg]} -> [KeyMsg | do_ds_topic_generation_stream(Node, Shard, It)] end @@ -673,7 +699,11 @@ apply_stream(DB, NodeStream0, Stream0, N) -> ) ), ?ON(Node, emqx_ds:store_batch(DB, [Msg], #{sync => true})), - apply_stream(DB, NodeStream, Stream, N + 1) + apply_stream(DB, NodeStream, Stream, N + 1); + [add_generation | Stream] -> + [Node | NodeStream] = emqx_utils_stream:next(NodeStream0), + %% add_generation(Node, DB), + apply_stream(DB, NodeStream, Stream, N) end. %% @doc Create an infinite list of messages from a given client: @@ -724,7 +754,7 @@ verify_stream_effects(TestCase, Node, ClientId, ExpectedStream) -> snabbkaffe_diff:assert_lists_eq( ExpectedStream, ds_topic_stream(ClientId, client_topic(TestCase, ClientId), Node), - ?diff_opts#{comment => #{clientid => ClientId, node => Node}} + ?diff_opts ), ct:pal("Data for client ~p on ~p is consistent.", [ClientId, Node]) end diff --git a/apps/emqx_utils/src/emqx_utils_stream.erl b/apps/emqx_utils/src/emqx_utils_stream.erl index e22f97ed785..a38deceebe2 100644 --- a/apps/emqx_utils/src/emqx_utils_stream.erl +++ b/apps/emqx_utils/src/emqx_utils_stream.erl @@ -20,13 +20,14 @@ -export([ empty/0, list/1, + const/1, mqueue/1, map/2, transpose/1, chain/1, chain/2, repeat/1, - interleave/1, + interleave/2, limit_length/2 ]). @@ -72,6 +73,11 @@ list([]) -> list([X | Rest]) -> fun() -> [X | list(Rest)] end. +%% @doc Make a stream with a single element infinitely repeated +-spec const(T) -> stream(T). +const(T) -> + fun() -> [T | const(T)] end. + %% @doc Make a stream out of process message queue. -spec mqueue(timeout()) -> stream(any()). mqueue(Timeout) -> @@ -158,8 +164,8 @@ repeat(S) -> %% specifies size of the "batch" to be consumed from the stream at a %% time (stream is the second tuple element). If element of the list %% is a plain stream, then the batch size is assumed to be 1. --spec interleave([stream(X) | {non_neg_integer(), stream(X)}]) -> stream(X). -interleave(L0) -> +-spec interleave([stream(X) | {non_neg_integer(), stream(X)}], boolean()) -> stream(X). +interleave(L0, ContinueAtEmpty) -> L = lists:map( fun (Stream) when is_function(Stream) -> @@ -170,7 +176,7 @@ interleave(L0) -> L0 ), fun() -> - do_interleave(0, L, []) + do_interleave(ContinueAtEmpty, 0, L, []) end. %% @doc Truncate list to the given length @@ -281,21 +287,23 @@ csv_read_line([Line | Lines]) -> csv_read_line([]) -> eof. 
-do_interleave(_, [], []) -> +do_interleave(_Cont, _, [], []) -> []; -do_interleave(N, [{N, S} | Rest], Rev) -> - do_interleave(0, Rest, [{N, S} | Rev]); -do_interleave(_, [], Rev) -> - do_interleave(0, lists:reverse(Rev), []); -do_interleave(I, [{N, S} | Rest], Rev) when I < N -> +do_interleave(Cont, N, [{N, S} | Rest], Rev) -> + do_interleave(Cont, 0, Rest, [{N, S} | Rev]); +do_interleave(Cont, _, [], Rev) -> + do_interleave(Cont, 0, lists:reverse(Rev), []); +do_interleave(Cont, I, [{N, S} | Rest], Rev) when I < N -> case next(S) of + [] when Cont -> + do_interleave(Cont, 0, Rest, Rev); [] -> - do_interleave(0, Rest, Rev); + []; [X | S1] -> [ X | fun() -> - do_interleave(I + 1, [{N, S1} | Rest], Rev) + do_interleave(Cont, I + 1, [{N, S1} | Rest], Rev) end ] end. From 63e51fca665b1cf83d72fb3a9453e29b2a57dbb1 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Thu, 9 May 2024 01:20:15 +0200 Subject: [PATCH 13/14] test(ds): Use streams to fill the storage --- .../test/emqx_ds_replication_SUITE.erl | 399 ++++++++---------- .../test/emqx_utils_stream_tests.erl | 2 +- 2 files changed, 184 insertions(+), 217 deletions(-) diff --git a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl index f3a61d377c4..9a89b0519aa 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl @@ -30,7 +30,7 @@ ). -define(diff_opts, #{ - context => 20, window => 1000, max_failures => 1000, compare_fun => fun message_eq/2 + context => 20, window => 1000, compare_fun => fun message_eq/2 }). opts() -> @@ -76,64 +76,59 @@ t_replication_transfers_snapshots('end', Config) -> ok = emqx_cth_cluster:stop(?config(nodes, Config)). t_replication_transfers_snapshots(Config) -> - NMsgs = 4000, + NMsgs = 400, + NClients = 5, + {Stream, TopicStreams} = interleaved_topic_messages(?FUNCTION_NAME, NClients, NMsgs), + Nodes = [Node, NodeOffline | _] = ?config(nodes, Config), _Specs = [_, SpecOffline | _] = ?config(specs, Config), + ?check_trace( + begin + %% Initialize DB on all nodes and wait for it to be online. + Opts = opts(#{n_shards => 1, n_sites => 3}), + ?assertEqual( + [{ok, ok} || _ <- Nodes], + erpc:multicall(Nodes, emqx_ds, open_db, [?DB, Opts]) + ), + ?retry( + 500, + 10, + ?assertMatch([[_], [_], [_]], [shards_online(N, ?DB) || N <- Nodes]) + ), - %% Initialize DB on all nodes and wait for it to be online. - Opts = opts(#{n_shards => 1, n_sites => 3}), - ?assertEqual( - [{ok, ok} || _ <- Nodes], - erpc:multicall(Nodes, emqx_ds, open_db, [?DB, Opts]) - ), - ?retry( - 500, - 10, - ?assertMatch([[_], [_], [_]], [shards_online(N, ?DB) || N <- Nodes]) - ), - - %% Stop the DB on the "offline" node. - ok = emqx_cth_cluster:stop_node(NodeOffline), + %% Stop the DB on the "offline" node. + ok = emqx_cth_cluster:stop_node(NodeOffline), - %% Fill the storage with messages and few additional generations. - Messages = fill_storage(Node, ?DB, NMsgs, #{p_addgen => 0.01}), + %% Fill the storage with messages and few additional generations. + apply_stream(?DB, Nodes -- [NodeOffline], Stream), - %% Restart the node. - [NodeOffline] = emqx_cth_cluster:restart(SpecOffline), - {ok, SRef} = snabbkaffe:subscribe( - ?match_event(#{ - ?snk_kind := dsrepl_snapshot_accepted, - ?snk_meta := #{node := NodeOffline} - }) - ), - ?assertEqual( - ok, - erpc:call(NodeOffline, emqx_ds, open_db, [?DB, opts()]) - ), + %% Restart the node. 
+ [NodeOffline] = emqx_cth_cluster:restart(SpecOffline), + {ok, SRef} = snabbkaffe:subscribe( + ?match_event(#{ + ?snk_kind := dsrepl_snapshot_accepted, + ?snk_meta := #{node := NodeOffline} + }) + ), + ?assertEqual( + ok, + erpc:call(NodeOffline, emqx_ds, open_db, [?DB, opts()]) + ), - %% Trigger storage operation and wait the replica to be restored. - _ = add_generation(Node, ?DB), - ?assertMatch( - {ok, _}, - snabbkaffe:receive_events(SRef) - ), + %% Trigger storage operation and wait the replica to be restored. + _ = add_generation(Node, ?DB), + ?assertMatch( + {ok, _}, + snabbkaffe:receive_events(SRef) + ), - %% Wait until any pending replication activities are finished (e.g. Raft log entries). - ok = timer:sleep(3_000), + %% Wait until any pending replication activities are finished (e.g. Raft log entries). + ok = timer:sleep(3_000), - %% Check that the DB has been restored. - Shard = hd(shards(NodeOffline, ?DB)), - MessagesOffline = lists:keysort( - #message.timestamp, - consume_shard(NodeOffline, ?DB, Shard, ['#'], 0) - ), - ?assertEqual( - sample(40, Messages), - sample(40, MessagesOffline) - ), - ?assertEqual( - Messages, - MessagesOffline + %% Check that the DB has been restored: + verify_stream_effects(?FUNCTION_NAME, Nodes, TopicStreams) + end, + [] ). t_rebalance(init, Config) -> @@ -159,50 +154,13 @@ t_rebalance('end', Config) -> t_rebalance(Config) -> NMsgs = 50, NClients = 5, - %% List of fake client IDs: - Clients = [integer_to_binary(I) || I <- lists:seq(1, NClients)], - %% List of streams that generate messages for each "client" in its own topic: - TopicStreams = [ - {ClientId, emqx_utils_stream:limit_length(NMsgs, topic_messages(?FUNCTION_NAME, ClientId))} - || ClientId <- Clients - ], - %% Interleaved list of events: - Stream0 = emqx_utils_stream:interleave( - [{2, Stream} || {_ClientId, Stream} <- TopicStreams], true - ), - Stream = emqx_utils_stream:interleave( - [ - {50, Stream0}, - emqx_utils_stream:const(add_generation) - ], - false - ), - Nodes = [N1, N2, N3, N4] = ?config(nodes, Config), + {Stream0, TopicStreams} = interleaved_topic_messages(?FUNCTION_NAME, NClients, NMsgs), + Nodes = [N1, N2 | _] = ?config(nodes, Config), ?check_trace( #{timetrap => 30_000}, begin - %% 0. Inject schedulings to make sure the messages are - %% written to the storage before, during, and after - %% rebalance: - ?force_ordering( - #{?snk_kind := test_push_message, n := 10}, - #{?snk_kind := test_start_rebalance} - ), - ?force_ordering( - #{?snk_kind := test_start_rebalance1}, - #{?snk_kind := test_push_message, n := 20} - ), - ?force_ordering( - #{?snk_kind := test_push_message, n := 30}, - #{?snk_kind := test_start_rebalance2} - ), - ?force_ordering( - #{?snk_kind := test_end_rebalance}, - #{?snk_kind := test_push_message, n := 40} - ), %% 1. Initialize DB on the first node. 
Opts = opts(#{n_shards => 16, n_sites => 1, replication_factor => 3}), - ?assertEqual(ok, ?ON(N1, emqx_ds:open_db(?DB, Opts))), ?assertMatch(Shards when length(Shards) == 16, shards_online(N1, ?DB)), @@ -215,6 +173,22 @@ t_rebalance(Config) -> Sites = [S1, S2 | _] = [ds_repl_meta(N, this_site) || N <- Nodes], ct:pal("Sites: ~p~n", [Sites]), + Sequence = [ + %% Join the second site to the DB replication sites: + {N1, join_db_site, S2}, + %% Should be a no-op: + {N2, join_db_site, S2}, + %% Now join the rest of the sites: + {N2, assign_db_sites, Sites} + ], + Stream = emqx_utils_stream:interleave( + [ + {50, Stream0}, + emqx_utils_stream:list(Sequence) + ], + true + ), + %% 1.2 Verify that all nodes have the same view of metadata storage: [ ?defer_assert( @@ -231,31 +205,9 @@ t_rebalance(Config) -> ], %% 2. Start filling the storage: - spawn_link( - fun() -> - NodeStream = emqx_utils_stream:repeat(emqx_utils_stream:list(Nodes)), - apply_stream(?DB, NodeStream, Stream, 0) - end - ), - - %% 3. Start rebalance in the meanwhile: - ?tp(test_start_rebalance1, #{}), - %% 3.1 Join the second site to the DB replication sites. - ?assertEqual(ok, ?ON(N1, emqx_ds_replication_layer_meta:join_db_site(?DB, S2))), - %% Should be no-op. - ?assertEqual(ok, ?ON(N2, emqx_ds_replication_layer_meta:join_db_site(?DB, S2))), - ct:pal("Transitions (~p -> ~p): ~p~n", [[S1], [S1, S2], transitions(N1, ?DB)]), - - ?retry(1000, 10, ?assertEqual([], transitions(N1, ?DB))), - - ?tp(test_start_rebalance2, #{}), - %% Now join the rest of the sites. - ?assertEqual(ok, ds_repl_meta(N2, assign_db_sites, [?DB, Sites])), - ct:pal("Transitions (~p -> ~p): ~p~n", [[S1, S2], Sites, transitions(N1, ?DB)]), - - ?retry(1000, 10, ?assertEqual([], transitions(N2, ?DB))), - - ?tp(test_end_rebalance, #{}), + apply_stream(?DB, Nodes, Stream), + timer:sleep(5000), + verify_stream_effects(?FUNCTION_NAME, Nodes, TopicStreams), [ ?defer_assert( ?assertEqual( @@ -279,17 +231,12 @@ t_rebalance(Config) -> ShardServers ), - %% Verify that the messages are preserved after the rebalance: - ?block_until(#{?snk_kind := all_done}), - timer:sleep(5000), - verify_stream_effects(?FUNCTION_NAME, Nodes, TopicStreams), - %% Scale down the cluster by removing the first node. ?assertEqual(ok, ds_repl_meta(N1, leave_db_site, [?DB, S1])), ct:pal("Transitions (~p -> ~p): ~p~n", [Sites, tl(Sites), transitions(N1, ?DB)]), ?retry(1000, 10, ?assertEqual([], transitions(N2, ?DB))), - %% Verify that each node is now responsible for each shard. + %% Verify that at the end each node is now responsible for each shard. ?defer_assert( ?assertEqual( [0, 16, 16, 16], @@ -387,81 +334,77 @@ t_rebalance_chaotic_converges(Config) -> NMsgs = 500, Nodes = [N1, N2, N3] = ?config(nodes, Config), - %% Initialize DB on first two nodes. - Opts = opts(#{n_shards => 16, n_sites => 2, replication_factor => 3}), - ?assertEqual( - [{ok, ok}, {ok, ok}], - erpc:multicall([N1, N2], emqx_ds, open_db, [?DB, Opts]) - ), + NClients = 5, + {Stream0, TopicStreams} = interleaved_topic_messages(?FUNCTION_NAME, NClients, NMsgs), - %% Open DB on the last node. - ?assertEqual( - ok, - erpc:call(N3, emqx_ds, open_db, [?DB, Opts]) - ), + ?check_trace( + #{}, + begin + %% Initialize DB on first two nodes. + Opts = opts(#{n_shards => 16, n_sites => 2, replication_factor => 3}), - %% Find out which sites there are. 
- Sites = [S1, S2, S3] = [ds_repl_meta(N, this_site) || N <- Nodes], - ct:pal("Sites: ~p~n", [Sites]), + ?assertEqual( + [{ok, ok}, {ok, ok}], + erpc:multicall([N1, N2], emqx_ds, open_db, [?DB, Opts]) + ), - %% Initially, the DB is assigned to [S1, S2]. - ?retry(500, 10, ?assertEqual([16, 16], [n_shards_online(N, ?DB) || N <- [N1, N2]])), - ?assertEqual( - lists:sort([S1, S2]), - ds_repl_meta(N1, db_sites, [?DB]) - ), + %% Open DB on the last node. + ?assertEqual( + ok, + erpc:call(N3, emqx_ds, open_db, [?DB, Opts]) + ), - %% Fill the storage with messages and few additional generations. - Messages0 = lists:append([ - fill_storage(N1, ?DB, NMsgs, #{client_id => <<"C1">>}), - fill_storage(N2, ?DB, NMsgs, #{client_id => <<"C2">>}), - fill_storage(N3, ?DB, NMsgs, #{client_id => <<"C3">>}) - ]), - - %% Construct a chaotic transition sequence that changes assignment to [S2, S3]. - Sequence = [ - {N1, join_db_site, S3}, - {N2, leave_db_site, S2}, - {N3, leave_db_site, S1}, - {N1, join_db_site, S2}, - {N2, join_db_site, S1}, - {N3, leave_db_site, S3}, - {N1, leave_db_site, S1}, - {N2, join_db_site, S3} - ], + %% Find out which sites there are. + Sites = [S1, S2, S3] = [ds_repl_meta(N, this_site) || N <- Nodes], + ct:pal("Sites: ~p~n", [Sites]), - %% Apply the sequence while also filling the storage with messages. - TransitionMessages = lists:map( - fun({N, Operation, Site}) -> - %% Apply the transition. - ?assertEqual(ok, ds_repl_meta(N, Operation, [?DB, Site])), - %% Give some time for at least one transition to complete. - Transitions = transitions(N, ?DB), - ct:pal("Transitions after ~p: ~p", [Operation, Transitions]), - ?retry(200, 10, ?assertNotEqual(Transitions, transitions(N, ?DB))), - %% Fill the storage with messages. - CID = integer_to_binary(erlang:system_time()), - fill_storage(N, ?DB, NMsgs, #{client_id => CID}) - end, - Sequence - ), + Sequence = [ + {N1, join_db_site, S3}, + {N2, leave_db_site, S2}, + {N3, leave_db_site, S1}, + {N1, join_db_site, S2}, + {N2, join_db_site, S1}, + {N3, leave_db_site, S3}, + {N1, leave_db_site, S1}, + {N2, join_db_site, S3} + ], - %% Wait for the last transition to complete. - ?retry(500, 20, ?assertEqual([], transitions(N1, ?DB))), + %% Interleaved list of events: + Stream = emqx_utils_stream:interleave( + [ + {50, Stream0}, + emqx_utils_stream:list(Sequence) + ], + true + ), - ?assertEqual( - lists:sort([S2, S3]), - ds_repl_meta(N1, db_sites, [?DB]) - ), + ?retry(500, 10, ?assertEqual([16, 16], [n_shards_online(N, ?DB) || N <- [N1, N2]])), + ?assertEqual( + lists:sort([S1, S2]), + ds_repl_meta(N1, db_sites, [?DB]), + "Initially, the DB is assigned to [S1, S2]" + ), - %% Wait until the LTS timestamp is updated - timer:sleep(5000), + apply_stream(?DB, Nodes, Stream), - %% Check that all messages are still there. - Messages = lists:append(TransitionMessages) ++ Messages0, - MessagesDB = lists:sort(fun compare_message/2, consume(N1, ?DB, ['#'], 0)), - ?assertEqual(sample(20, Messages), sample(20, MessagesDB)), - ?assertEqual(Messages, MessagesDB). + %% Wait for the last transition to complete. + ?retry(500, 20, ?assertEqual([], transitions(N1, ?DB))), + + ?defer_assert( + ?assertEqual( + lists:sort([S2, S3]), + ds_repl_meta(N1, db_sites, [?DB]) + ) + ), + + %% Wait until the LTS timestamp is updated: + timer:sleep(5000), + + %% Check that all messages are still there. + verify_stream_effects(?FUNCTION_NAME, Nodes, TopicStreams) + end, + [] + ). 
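+
+%% The next testcase verifies that the target replica set is still
+%% reached when nodes holding replicas are stopped and restarted
+%% while the replica-set transitions are in progress.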
t_rebalance_offline_restarts(init, Config) -> Apps = [appspec(emqx_durable_storage)], @@ -535,7 +478,15 @@ ds_repl_meta(Node, Fun) -> ds_repl_meta(Node, Fun, []). ds_repl_meta(Node, Fun, Args) -> - erpc:call(Node, emqx_ds_replication_layer_meta, Fun, Args). + try + erpc:call(Node, emqx_ds_replication_layer_meta, Fun, Args) + catch + EC:Err:Stack -> + ct:pal("emqx_ds_replication_layer_meta:~p(~p) @~p failed:~n~p:~p~nStack: ~p", [ + Fun, Args, Node, EC, Err, Stack + ]), + error(meta_op_failed) + end. ds_repl_shard(Node, Fun, Args) -> erpc:call(Node, emqx_ds_replication_layer_shard, Fun, Args). @@ -553,27 +504,6 @@ shards_online(Node, DB) -> n_shards_online(Node, DB) -> length(shards_online(Node, DB)). -fill_storage(Node, DB, NMsgs, Opts) -> - fill_storage(Node, DB, NMsgs, 0, Opts). - -fill_storage(Node, DB, NMsgs, I, Opts) when I < NMsgs -> - PAddGen = maps:get(p_addgen, Opts, 0.001), - R1 = push_message(Node, DB, I, Opts), - %probably(PAddGen, fun() -> add_generation(Node, DB) end), - R2 = [], - R1 ++ R2 ++ fill_storage(Node, DB, NMsgs, I + 1, Opts); -fill_storage(_Node, _DB, NMsgs, NMsgs, _Opts) -> - []. - -push_message(Node, DB, I, Opts) -> - Topic = emqx_topic:join([<<"topic">>, <<"foo">>, integer_to_binary(I)]), - %% {Bytes, _} = rand:bytes_s(5, rand:seed_s(default, I)), - Bytes = integer_to_binary(I), - ClientId = maps:get(client_id, Opts, <>), - Message = message(ClientId, Topic, Bytes, I * 100), - ok = erpc:call(Node, emqx_ds, store_batch, [DB, [Message], #{sync => true}]), - [Message]. - add_generation(Node, DB) -> ok = erpc:call(Node, emqx_ds, add_generation, [DB]), []. @@ -674,7 +604,7 @@ do_ds_topic_generation_stream(Node, Shard, It0) -> end ) of - {ok, It, []} -> + {ok, _It, []} -> []; {ok, end_of_stream} -> []; @@ -685,11 +615,19 @@ do_ds_topic_generation_stream(Node, Shard, It0) -> %% Payload generation: +apply_stream(DB, Nodes, Stream) -> + apply_stream( + DB, + emqx_utils_stream:repeat(emqx_utils_stream:list(Nodes)), + Stream, + 0 + ). + apply_stream(DB, NodeStream0, Stream0, N) -> case emqx_utils_stream:next(Stream0) of [] -> ?tp(all_done, #{}); - [Msg = #message{from = From} | Stream] -> + [Msg = #message{} | Stream] -> [Node | NodeStream] = emqx_utils_stream:next(NodeStream0), ?tp( test_push_message, @@ -701,12 +639,40 @@ apply_stream(DB, NodeStream0, Stream0, N) -> ?ON(Node, emqx_ds:store_batch(DB, [Msg], #{sync => true})), apply_stream(DB, NodeStream, Stream, N + 1); [add_generation | Stream] -> - [Node | NodeStream] = emqx_utils_stream:next(NodeStream0), + %% FIXME: + [_Node | NodeStream] = emqx_utils_stream:next(NodeStream0), %% add_generation(Node, DB), - apply_stream(DB, NodeStream, Stream, N) + apply_stream(DB, NodeStream, Stream, N); + [{Node, Operation, Arg} | Stream] when + Operation =:= join_db_site; Operation =:= leave_db_site; Operation =:= assign_db_sites + -> + ?tp(notice, test_apply_operation, #{node => Node, operation => Operation, arg => Arg}), + %% Apply the transition. + ?assertEqual(ok, ds_repl_meta(Node, Operation, [DB, Arg])), + %% Give some time for at least one transition to complete. + Transitions = transitions(Node, ?DB), + ct:pal("Transitions after ~p: ~p", [Operation, Transitions]), + ?retry(200, 10, ?assertNotEqual(Transitions, transitions(Node, DB))), + apply_stream(DB, NodeStream0, Stream, N); + [Fun | Stream] when is_function(Fun) -> + Fun(), + apply_stream(DB, NodeStream0, Stream, N) end. 
%% @doc Create an infinite list of messages from a given client: +interleaved_topic_messages(TestCase, NClients, NMsgs) -> + %% List of fake client IDs: + Clients = [integer_to_binary(I) || I <- lists:seq(1, NClients)], + TopicStreams = [ + {ClientId, emqx_utils_stream:limit_length(NMsgs, topic_messages(TestCase, ClientId))} + || ClientId <- Clients + ], + %% Interleaved stream of messages: + Stream = emqx_utils_stream:interleave( + [{2, Stream} || {_ClientId, Stream} <- TopicStreams], true + ), + {Stream, TopicStreams}. + topic_messages(TestCase, ClientId) -> topic_messages(TestCase, ClientId, 0). @@ -726,7 +692,7 @@ client_topic(TestCase, ClientId) when is_atom(TestCase) -> client_topic(TestCase, ClientId) when is_binary(TestCase) -> <>. -message_eq(Msg1, {Key, Msg2}) -> +message_eq(Msg1, {_Key, Msg2}) -> %% Timestamps can be modified by the replication layer, ignore them: Msg1#message{timestamp = 0} =:= Msg2#message{timestamp = 0}. @@ -734,7 +700,7 @@ message_eq(Msg1, {Key, Msg2}) -> -spec verify_stream_effects(binary(), [node()], [{emqx_types:clientid(), ds_stream()}]) -> ok. verify_stream_effects(TestCase, Nodes0, L) -> - lists:foreach( + Checked = lists:flatmap( fun({ClientId, Stream}) -> Nodes = nodes_of_clientid(ClientId, Nodes0), ct:pal("Nodes allocated for client ~p: ~p", [ClientId, Nodes]), @@ -744,7 +710,8 @@ verify_stream_effects(TestCase, Nodes0, L) -> [verify_stream_effects(TestCase, Node, ClientId, Stream) || Node <- Nodes] end, L - ). + ), + ?defer_assert(?assertMatch([_ | _], Checked, "Some messages have been verified")). -spec verify_stream_effects(binary(), node(), emqx_types:clientid(), ds_stream()) -> ok. verify_stream_effects(TestCase, Node, ClientId, ExpectedStream) -> diff --git a/apps/emqx_utils/test/emqx_utils_stream_tests.erl b/apps/emqx_utils/test/emqx_utils_stream_tests.erl index 0b117215ef6..0ab3cdb70cc 100644 --- a/apps/emqx_utils/test/emqx_utils_stream_tests.erl +++ b/apps/emqx_utils/test/emqx_utils_stream_tests.erl @@ -162,7 +162,7 @@ interleave_test() -> S2 = emqx_utils_stream:list([a, b, c, d]), ?assertEqual( [1, 2, a, b, 3, c, d], - emqx_utils_stream:consume(emqx_utils_stream:interleave([{2, S1}, {2, S2}])) + emqx_utils_stream:consume(emqx_utils_stream:interleave([{2, S1}, {2, S2}], true)) ). csv_test() -> From 07aa708894c7234650d11865536195164ede05cf Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Thu, 9 May 2024 03:56:56 +0200 Subject: [PATCH 14/14] test(ds): Refactor replication suite --- .../test/emqx_ds_replication_SUITE.erl | 247 ++---------------- .../test/emqx_ds_storage_SUITE.erl | 2 +- .../test/emqx_ds_test_helpers.erl | 218 ++++++++++++++++ 3 files changed, 246 insertions(+), 221 deletions(-) diff --git a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl index 9a89b0519aa..04c57aa80b1 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl @@ -29,10 +29,6 @@ erpc:call(NODE, erlang, apply, [fun() -> BODY end, []]) ). --define(diff_opts, #{ - context => 20, window => 1000, compare_fun => fun message_eq/2 -}). - opts() -> opts(#{}). 
@@ -78,7 +74,9 @@ t_replication_transfers_snapshots('end', Config) -> t_replication_transfers_snapshots(Config) -> NMsgs = 400, NClients = 5, - {Stream, TopicStreams} = interleaved_topic_messages(?FUNCTION_NAME, NClients, NMsgs), + {Stream, TopicStreams} = emqx_ds_test_helpers:interleaved_topic_messages( + ?FUNCTION_NAME, NClients, NMsgs + ), Nodes = [Node, NodeOffline | _] = ?config(nodes, Config), _Specs = [_, SpecOffline | _] = ?config(specs, Config), @@ -100,7 +98,7 @@ t_replication_transfers_snapshots(Config) -> ok = emqx_cth_cluster:stop_node(NodeOffline), %% Fill the storage with messages and few additional generations. - apply_stream(?DB, Nodes -- [NodeOffline], Stream), + emqx_ds_test_helpers:apply_stream(?DB, Nodes -- [NodeOffline], Stream), %% Restart the node. [NodeOffline] = emqx_cth_cluster:restart(SpecOffline), @@ -126,7 +124,7 @@ t_replication_transfers_snapshots(Config) -> ok = timer:sleep(3_000), %% Check that the DB has been restored: - verify_stream_effects(?FUNCTION_NAME, Nodes, TopicStreams) + emqx_ds_test_helpers:verify_stream_effects(?DB, ?FUNCTION_NAME, Nodes, TopicStreams) end, [] ). @@ -154,7 +152,9 @@ t_rebalance('end', Config) -> t_rebalance(Config) -> NMsgs = 50, NClients = 5, - {Stream0, TopicStreams} = interleaved_topic_messages(?FUNCTION_NAME, NClients, NMsgs), + {Stream0, TopicStreams} = emqx_ds_test_helpers:interleaved_topic_messages( + ?FUNCTION_NAME, NClients, NMsgs + ), Nodes = [N1, N2 | _] = ?config(nodes, Config), ?check_trace( #{timetrap => 30_000}, @@ -205,9 +205,9 @@ t_rebalance(Config) -> ], %% 2. Start filling the storage: - apply_stream(?DB, Nodes, Stream), + emqx_ds_test_helpers:apply_stream(?DB, Nodes, Stream), timer:sleep(5000), - verify_stream_effects(?FUNCTION_NAME, Nodes, TopicStreams), + emqx_ds_test_helpers:verify_stream_effects(?DB, ?FUNCTION_NAME, Nodes, TopicStreams), [ ?defer_assert( ?assertEqual( @@ -233,8 +233,10 @@ t_rebalance(Config) -> %% Scale down the cluster by removing the first node. ?assertEqual(ok, ds_repl_meta(N1, leave_db_site, [?DB, S1])), - ct:pal("Transitions (~p -> ~p): ~p~n", [Sites, tl(Sites), transitions(N1, ?DB)]), - ?retry(1000, 10, ?assertEqual([], transitions(N2, ?DB))), + ct:pal("Transitions (~p -> ~p): ~p~n", [ + Sites, tl(Sites), emqx_ds_test_helpers:transitions(N1, ?DB) + ]), + ?retry(1000, 10, ?assertEqual([], emqx_ds_test_helpers:transitions(N2, ?DB))), %% Verify that at the end each node is now responsible for each shard. ?defer_assert( @@ -245,7 +247,7 @@ t_rebalance(Config) -> ), %% Verify that the messages are once again preserved after the rebalance: - verify_stream_effects(?FUNCTION_NAME, Nodes, TopicStreams) + emqx_ds_test_helpers:verify_stream_effects(?DB, ?FUNCTION_NAME, Nodes, TopicStreams) end, [] ). @@ -294,7 +296,7 @@ t_join_leave_errors(Config) -> %% Should be no-op. ?assertEqual(ok, ds_repl_meta(N1, join_db_site, [?DB, S1])), - ?assertEqual([], transitions(N1, ?DB)), + ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB)), %% Impossible to leave the last site. ?assertEqual( @@ -305,12 +307,12 @@ t_join_leave_errors(Config) -> %% "Move" the DB to the other node. ?assertEqual(ok, ds_repl_meta(N1, join_db_site, [?DB, S2])), ?assertEqual(ok, ds_repl_meta(N2, leave_db_site, [?DB, S1])), - ?assertMatch([_ | _], transitions(N1, ?DB)), - ?retry(1000, 10, ?assertEqual([], transitions(N1, ?DB))), + ?assertMatch([_ | _], emqx_ds_test_helpers:transitions(N1, ?DB)), + ?retry(1000, 10, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB))), %% Should be no-op. 
?assertEqual(ok, ds_repl_meta(N2, leave_db_site, [?DB, S1])), - ?assertEqual([], transitions(N1, ?DB)). + ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB)). t_rebalance_chaotic_converges(init, Config) -> Apps = [appspec(emqx_durable_storage)], @@ -335,7 +337,9 @@ t_rebalance_chaotic_converges(Config) -> Nodes = [N1, N2, N3] = ?config(nodes, Config), NClients = 5, - {Stream0, TopicStreams} = interleaved_topic_messages(?FUNCTION_NAME, NClients, NMsgs), + {Stream0, TopicStreams} = emqx_ds_test_helpers:interleaved_topic_messages( + ?FUNCTION_NAME, NClients, NMsgs + ), ?check_trace( #{}, @@ -385,10 +389,10 @@ t_rebalance_chaotic_converges(Config) -> "Initially, the DB is assigned to [S1, S2]" ), - apply_stream(?DB, Nodes, Stream), + emqx_ds_test_helpers:apply_stream(?DB, Nodes, Stream), %% Wait for the last transition to complete. - ?retry(500, 20, ?assertEqual([], transitions(N1, ?DB))), + ?retry(500, 20, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB))), ?defer_assert( ?assertEqual( @@ -401,7 +405,7 @@ t_rebalance_chaotic_converges(Config) -> timer:sleep(5000), %% Check that all messages are still there. - verify_stream_effects(?FUNCTION_NAME, Nodes, TopicStreams) + emqx_ds_test_helpers:verify_stream_effects(?DB, ?FUNCTION_NAME, Nodes, TopicStreams) end, [] ). @@ -447,7 +451,7 @@ t_rebalance_offline_restarts(Config) -> %% Shut down N3 and then remove it from the DB. ok = emqx_cth_cluster:stop_node(N3), ?assertEqual(ok, ds_repl_meta(N1, leave_db_site, [?DB, S3])), - Transitions = transitions(N1, ?DB), + Transitions = emqx_ds_test_helpers:transitions(N1, ?DB), ct:pal("Transitions: ~p~n", [Transitions]), %% Wait until at least one transition completes. @@ -462,7 +466,7 @@ t_rebalance_offline_restarts(Config) -> ), %% Target state should still be reached eventually. - ?retry(1000, 20, ?assertEqual([], transitions(N1, ?DB))), + ?retry(1000, 20, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB))), ?assertEqual(lists:sort([S1, S2]), ds_repl_meta(N1, db_sites, [?DB])). %% @@ -491,10 +495,6 @@ ds_repl_meta(Node, Fun, Args) -> ds_repl_shard(Node, Fun, Args) -> erpc:call(Node, emqx_ds_replication_layer_shard, Fun, Args). -transitions(Node, DB) -> - Shards = shards(Node, DB), - [{S, T} || S <- Shards, T <- ds_repl_meta(Node, replica_set_transitions, [DB, S])]. - shards(Node, DB) -> erpc:call(Node, emqx_ds_replication_layer_meta, shards, [DB]). @@ -557,196 +557,3 @@ init_per_testcase(TCName, Config0) -> end_per_testcase(TCName, Config) -> ok = snabbkaffe:stop(), emqx_common_test_helpers:end_per_testcase(?MODULE, TCName, Config). - -without_extra(L) -> - [I#message{extra = #{}} || I <- L]. - -%% Consume data from the DS storage on a given node as a stream: --type ds_stream() :: emqx_utils_stream:stream({emqx_ds:message_key(), emqx_types:message()}). - -%% Create a stream from the topic (wildcards are NOT supported for a -%% good reason: order of messages is implementation-dependent!). -%% -%% Note: stream produces messages with keys --spec ds_topic_stream(binary(), binary(), node()) -> ds_stream(). -ds_topic_stream(ClientId, TopicBin, Node) -> - Topic = emqx_topic:words(TopicBin), - Shard = shard_of_clientid(Node, ClientId), - {ShardId, DSStreams} = - ?ON( - Node, - begin - DBShard = {?DB, Shard}, - {DBShard, emqx_ds_storage_layer:get_streams(DBShard, Topic, 0)} - end - ), - %% Sort streams by their rank Y, and chain them together: - emqx_utils_stream:chain([ - ds_topic_generation_stream(Node, ShardId, Topic, S) - || {_RankY, S} <- lists:sort(DSStreams) - ]). 
- -ds_topic_generation_stream(Node, Shard, Topic, Stream) -> - {ok, Iterator} = ?ON( - Node, - emqx_ds_storage_layer:make_iterator(Shard, Stream, Topic, 0) - ), - do_ds_topic_generation_stream(Node, Shard, Iterator). - -do_ds_topic_generation_stream(Node, Shard, It0) -> - fun() -> - case - ?ON( - Node, - begin - Now = emqx_ds_replication_layer:current_timestamp(?DB, Shard), - emqx_ds_storage_layer:next(Shard, It0, 1, Now) - end - ) - of - {ok, _It, []} -> - []; - {ok, end_of_stream} -> - []; - {ok, It, [KeyMsg]} -> - [KeyMsg | do_ds_topic_generation_stream(Node, Shard, It)] - end - end. - -%% Payload generation: - -apply_stream(DB, Nodes, Stream) -> - apply_stream( - DB, - emqx_utils_stream:repeat(emqx_utils_stream:list(Nodes)), - Stream, - 0 - ). - -apply_stream(DB, NodeStream0, Stream0, N) -> - case emqx_utils_stream:next(Stream0) of - [] -> - ?tp(all_done, #{}); - [Msg = #message{} | Stream] -> - [Node | NodeStream] = emqx_utils_stream:next(NodeStream0), - ?tp( - test_push_message, - maps:merge( - emqx_message:to_map(Msg), - #{n => N} - ) - ), - ?ON(Node, emqx_ds:store_batch(DB, [Msg], #{sync => true})), - apply_stream(DB, NodeStream, Stream, N + 1); - [add_generation | Stream] -> - %% FIXME: - [_Node | NodeStream] = emqx_utils_stream:next(NodeStream0), - %% add_generation(Node, DB), - apply_stream(DB, NodeStream, Stream, N); - [{Node, Operation, Arg} | Stream] when - Operation =:= join_db_site; Operation =:= leave_db_site; Operation =:= assign_db_sites - -> - ?tp(notice, test_apply_operation, #{node => Node, operation => Operation, arg => Arg}), - %% Apply the transition. - ?assertEqual(ok, ds_repl_meta(Node, Operation, [DB, Arg])), - %% Give some time for at least one transition to complete. - Transitions = transitions(Node, ?DB), - ct:pal("Transitions after ~p: ~p", [Operation, Transitions]), - ?retry(200, 10, ?assertNotEqual(Transitions, transitions(Node, DB))), - apply_stream(DB, NodeStream0, Stream, N); - [Fun | Stream] when is_function(Fun) -> - Fun(), - apply_stream(DB, NodeStream0, Stream, N) - end. - -%% @doc Create an infinite list of messages from a given client: -interleaved_topic_messages(TestCase, NClients, NMsgs) -> - %% List of fake client IDs: - Clients = [integer_to_binary(I) || I <- lists:seq(1, NClients)], - TopicStreams = [ - {ClientId, emqx_utils_stream:limit_length(NMsgs, topic_messages(TestCase, ClientId))} - || ClientId <- Clients - ], - %% Interleaved stream of messages: - Stream = emqx_utils_stream:interleave( - [{2, Stream} || {_ClientId, Stream} <- TopicStreams], true - ), - {Stream, TopicStreams}. - -topic_messages(TestCase, ClientId) -> - topic_messages(TestCase, ClientId, 0). - -topic_messages(TestCase, ClientId, N) -> - fun() -> - Msg = #message{ - from = ClientId, - topic = client_topic(TestCase, ClientId), - timestamp = N * 100, - payload = integer_to_binary(N) - }, - [Msg | topic_messages(TestCase, ClientId, N + 1)] - end. - -client_topic(TestCase, ClientId) when is_atom(TestCase) -> - client_topic(atom_to_binary(TestCase, utf8), ClientId); -client_topic(TestCase, ClientId) when is_binary(TestCase) -> - <>. - -message_eq(Msg1, {_Key, Msg2}) -> - %% Timestamps can be modified by the replication layer, ignore them: - Msg1#message{timestamp = 0} =:= Msg2#message{timestamp = 0}. - -%% Stream comparison: - --spec verify_stream_effects(binary(), [node()], [{emqx_types:clientid(), ds_stream()}]) -> ok. 
-verify_stream_effects(TestCase, Nodes0, L) -> - Checked = lists:flatmap( - fun({ClientId, Stream}) -> - Nodes = nodes_of_clientid(ClientId, Nodes0), - ct:pal("Nodes allocated for client ~p: ~p", [ClientId, Nodes]), - ?defer_assert( - ?assertMatch([_ | _], Nodes, ["No nodes have been allocated for ", ClientId]) - ), - [verify_stream_effects(TestCase, Node, ClientId, Stream) || Node <- Nodes] - end, - L - ), - ?defer_assert(?assertMatch([_ | _], Checked, "Some messages have been verified")). - --spec verify_stream_effects(binary(), node(), emqx_types:clientid(), ds_stream()) -> ok. -verify_stream_effects(TestCase, Node, ClientId, ExpectedStream) -> - ct:pal("Checking consistency of effects for ~p on ~p", [ClientId, Node]), - ?defer_assert( - begin - snabbkaffe_diff:assert_lists_eq( - ExpectedStream, - ds_topic_stream(ClientId, client_topic(TestCase, ClientId), Node), - ?diff_opts - ), - ct:pal("Data for client ~p on ~p is consistent.", [ClientId, Node]) - end - ). - -%% Find which nodes from the list contain the shards for the given -%% client ID: -nodes_of_clientid(ClientId, Nodes = [N0 | _]) -> - Shard = shard_of_clientid(N0, ClientId), - SiteNodes = ?ON( - N0, - begin - Sites = emqx_ds_replication_layer_meta:replica_set(?DB, Shard), - lists:map(fun emqx_ds_replication_layer_meta:node/1, Sites) - end - ), - lists:filter( - fun(N) -> - lists:member(N, SiteNodes) - end, - Nodes - ). - -shard_of_clientid(Node, ClientId) -> - ?ON( - Node, - emqx_ds_replication_layer:shard_of_message(?DB, #message{from = ClientId}, clientid) - ). diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_SUITE.erl index eaddab0c663..39158c7ef01 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_SUITE.erl @@ -23,7 +23,7 @@ -include_lib("stdlib/include/assert.hrl"). opts() -> - #{storage => {emqx_ds_storage_bitfield_lts, #{}}}. + #{storage => {emqx_ds_storage_reference, #{}}}. %% diff --git a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl index 26469c68509..3a01451992e 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl @@ -19,6 +19,12 @@ -compile(nowarn_export_all). -include_lib("emqx_utils/include/emqx_message.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("stdlib/include/assert.hrl"). + +-define(ON(NODE, BODY), + erpc:call(NODE, erlang, apply, [fun() -> BODY end, []]) +). %% RPC mocking @@ -59,8 +65,220 @@ mock_rpc_result(gen_rpc, ExpectFun) -> end end). +%% Consume data from the DS storage on a given node as a stream: +-type ds_stream() :: emqx_utils_stream:stream({emqx_ds:message_key(), emqx_types:message()}). + +%% @doc Create an infinite list of messages from a given client: +interleaved_topic_messages(TestCase, NClients, NMsgs) -> + %% List of fake client IDs: + Clients = [integer_to_binary(I) || I <- lists:seq(1, NClients)], + TopicStreams = [ + {ClientId, emqx_utils_stream:limit_length(NMsgs, topic_messages(TestCase, ClientId))} + || ClientId <- Clients + ], + %% Interleaved stream of messages: + Stream = emqx_utils_stream:interleave( + [{2, Stream} || {_ClientId, Stream} <- TopicStreams], true + ), + {Stream, TopicStreams}. + +topic_messages(TestCase, ClientId) -> + topic_messages(TestCase, ClientId, 0). 
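+
+%% Usage sketch (illustrative only; the test case name `t_my_case' and
+%% the sizes are made up). The streams built by these helpers are lazy:
+%% messages materialize only when forced with emqx_utils_stream:next/1:
+%%
+%%   {Interleaved, PerClient} =
+%%       emqx_ds_test_helpers:interleaved_topic_messages(t_my_case, 2, 100),
+%%   [#message{} | _Rest] = emqx_utils_stream:next(Interleaved).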

+
+topic_messages(TestCase, ClientId, N) ->
+    fun() ->
+        Msg = #message{
+            from = ClientId,
+            topic = client_topic(TestCase, ClientId),
+            timestamp = N * 100,
+            payload = integer_to_binary(N)
+        },
+        [Msg | topic_messages(TestCase, ClientId, N + 1)]
+    end.
+
+client_topic(TestCase, ClientId) when is_atom(TestCase) ->
+    client_topic(atom_to_binary(TestCase, utf8), ClientId);
+client_topic(TestCase, ClientId) when is_binary(TestCase) ->
+    <<TestCase/binary, "/", ClientId/binary>>.
+
+ds_topic_generation_stream(DB, Node, Shard, Topic, Stream) ->
+    {ok, Iterator} = ?ON(
+        Node,
+        emqx_ds_storage_layer:make_iterator(Shard, Stream, Topic, 0)
+    ),
+    do_ds_topic_generation_stream(DB, Node, Shard, Iterator).
+
+do_ds_topic_generation_stream(DB, Node, Shard, It0) ->
+    fun() ->
+        case
+            ?ON(
+                Node,
+                begin
+                    Now = emqx_ds_replication_layer:current_timestamp(DB, Shard),
+                    emqx_ds_storage_layer:next(Shard, It0, 1, Now)
+                end
+            )
+        of
+            {ok, _It, []} ->
+                [];
+            {ok, end_of_stream} ->
+                [];
+            {ok, It, [KeyMsg]} ->
+                [KeyMsg | do_ds_topic_generation_stream(DB, Node, Shard, It)]
+        end
+    end.
+
+%% Payload generation:
+
+apply_stream(DB, Nodes, Stream) ->
+    apply_stream(
+        DB,
+        emqx_utils_stream:repeat(emqx_utils_stream:list(Nodes)),
+        Stream,
+        0
+    ).
+
+apply_stream(DB, NodeStream0, Stream0, N) ->
+    case emqx_utils_stream:next(Stream0) of
+        [] ->
+            ?tp(all_done, #{});
+        [Msg = #message{} | Stream] ->
+            [Node | NodeStream] = emqx_utils_stream:next(NodeStream0),
+            ?tp(
+                test_push_message,
+                maps:merge(
+                    emqx_message:to_map(Msg),
+                    #{n => N}
+                )
+            ),
+            ?ON(Node, emqx_ds:store_batch(DB, [Msg], #{sync => true})),
+            apply_stream(DB, NodeStream, Stream, N + 1);
+        [add_generation | Stream] ->
+            %% FIXME:
+            [_Node | NodeStream] = emqx_utils_stream:next(NodeStream0),
+            %% add_generation(Node, DB),
+            apply_stream(DB, NodeStream, Stream, N);
+        [{Node, Operation, Arg} | Stream] when
+            Operation =:= join_db_site; Operation =:= leave_db_site; Operation =:= assign_db_sites
+        ->
+            ?tp(notice, test_apply_operation, #{node => Node, operation => Operation, arg => Arg}),
+            %% Apply the transition.
+            ?assertEqual(
+                ok,
+                ?ON(
+                    Node,
+                    emqx_ds_replication_layer_meta:Operation(DB, Arg)
+                )
+            ),
+            %% Give some time for at least one transition to complete.
+            Transitions = transitions(Node, DB),
+            ct:pal("Transitions after ~p: ~p", [Operation, Transitions]),
+            ?retry(200, 10, ?assertNotEqual(Transitions, transitions(Node, DB))),
+            apply_stream(DB, NodeStream0, Stream, N);
+        [Fun | Stream] when is_function(Fun) ->
+            Fun(),
+            apply_stream(DB, NodeStream0, Stream, N)
+    end.
+
+transitions(Node, DB) ->
+    ?ON(
+        Node,
+        begin
+            Shards = emqx_ds_replication_layer_meta:shards(DB),
+            [
+                {S, T}
+             || S <- Shards, T <- emqx_ds_replication_layer_meta:replica_set_transitions(DB, S)
+            ]
+        end
+    ).
+
+%% Stream comparison
+
+message_eq(Msg1, {_Key, Msg2}) ->
+    %% Timestamps can be modified by the replication layer, ignore them:
+    Msg1#message{timestamp = 0} =:= Msg2#message{timestamp = 0}.
+
 %% Consuming streams and iterators
+-spec verify_stream_effects(atom(), binary(), [node()], [{emqx_types:clientid(), ds_stream()}]) ->
+    ok.
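+%% Typically called from a suite after the workload has been applied and
+%% all replica-set transitions have settled, e.g.:
+%%   emqx_ds_test_helpers:verify_stream_effects(?DB, ?FUNCTION_NAME, Nodes, TopicStreams)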
+verify_stream_effects(DB, TestCase, Nodes0, L) -> + Checked = lists:flatmap( + fun({ClientId, Stream}) -> + Nodes = nodes_of_clientid(DB, ClientId, Nodes0), + ct:pal("Nodes allocated for client ~p: ~p", [ClientId, Nodes]), + ?defer_assert( + ?assertMatch([_ | _], Nodes, ["No nodes have been allocated for ", ClientId]) + ), + [verify_stream_effects(DB, TestCase, Node, ClientId, Stream) || Node <- Nodes] + end, + L + ), + ?defer_assert(?assertMatch([_ | _], Checked, "Some messages have been verified")). + +-spec verify_stream_effects(atom(), binary(), node(), emqx_types:clientid(), ds_stream()) -> ok. +verify_stream_effects(DB, TestCase, Node, ClientId, ExpectedStream) -> + ct:pal("Checking consistency of effects for ~p on ~p", [ClientId, Node]), + DiffOpts = #{context => 20, window => 1000, compare_fun => fun message_eq/2}, + ?defer_assert( + begin + snabbkaffe_diff:assert_lists_eq( + ExpectedStream, + ds_topic_stream(DB, ClientId, client_topic(TestCase, ClientId), Node), + DiffOpts + ), + ct:pal("Data for client ~p on ~p is consistent.", [ClientId, Node]) + end + ). + +%% Create a stream from the topic (wildcards are NOT supported for a +%% good reason: order of messages is implementation-dependent!). +%% +%% Note: stream produces messages with keys +-spec ds_topic_stream(atom(), binary(), binary(), node()) -> ds_stream(). +ds_topic_stream(DB, ClientId, TopicBin, Node) -> + Topic = emqx_topic:words(TopicBin), + Shard = shard_of_clientid(DB, Node, ClientId), + {ShardId, DSStreams} = + ?ON( + Node, + begin + DBShard = {DB, Shard}, + {DBShard, emqx_ds_storage_layer:get_streams(DBShard, Topic, 0)} + end + ), + %% Sort streams by their rank Y, and chain them together: + emqx_utils_stream:chain([ + ds_topic_generation_stream(DB, Node, ShardId, Topic, S) + || {_RankY, S} <- lists:sort(DSStreams) + ]). + +%% Find which nodes from the list contain the shards for the given +%% client ID: +nodes_of_clientid(DB, ClientId, Nodes = [N0 | _]) -> + Shard = shard_of_clientid(DB, N0, ClientId), + SiteNodes = ?ON( + N0, + begin + Sites = emqx_ds_replication_layer_meta:replica_set(DB, Shard), + lists:map(fun emqx_ds_replication_layer_meta:node/1, Sites) + end + ), + lists:filter( + fun(N) -> + lists:member(N, SiteNodes) + end, + Nodes + ). + +shard_of_clientid(DB, Node, ClientId) -> + ?ON( + Node, + emqx_ds_replication_layer:shard_of_message(DB, #message{from = ClientId}, clientid) + ). + +%% Consume eagerly: + consume(DB, TopicFilter) -> consume(DB, TopicFilter, 0).