From 63e51fca665b1cf83d72fb3a9453e29b2a57dbb1 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Thu, 9 May 2024 01:20:15 +0200 Subject: [PATCH] test(ds): Use streams to fill the storage --- .../test/emqx_ds_replication_SUITE.erl | 399 ++++++++---------- .../test/emqx_utils_stream_tests.erl | 2 +- 2 files changed, 184 insertions(+), 217 deletions(-) diff --git a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl index f3a61d377c..9a89b0519a 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl @@ -30,7 +30,7 @@ ). -define(diff_opts, #{ - context => 20, window => 1000, max_failures => 1000, compare_fun => fun message_eq/2 + context => 20, window => 1000, compare_fun => fun message_eq/2 }). opts() -> @@ -76,64 +76,59 @@ t_replication_transfers_snapshots('end', Config) -> ok = emqx_cth_cluster:stop(?config(nodes, Config)). t_replication_transfers_snapshots(Config) -> - NMsgs = 4000, + NMsgs = 400, + NClients = 5, + {Stream, TopicStreams} = interleaved_topic_messages(?FUNCTION_NAME, NClients, NMsgs), + Nodes = [Node, NodeOffline | _] = ?config(nodes, Config), _Specs = [_, SpecOffline | _] = ?config(specs, Config), + ?check_trace( + begin + %% Initialize DB on all nodes and wait for it to be online. + Opts = opts(#{n_shards => 1, n_sites => 3}), + ?assertEqual( + [{ok, ok} || _ <- Nodes], + erpc:multicall(Nodes, emqx_ds, open_db, [?DB, Opts]) + ), + ?retry( + 500, + 10, + ?assertMatch([[_], [_], [_]], [shards_online(N, ?DB) || N <- Nodes]) + ), - %% Initialize DB on all nodes and wait for it to be online. - Opts = opts(#{n_shards => 1, n_sites => 3}), - ?assertEqual( - [{ok, ok} || _ <- Nodes], - erpc:multicall(Nodes, emqx_ds, open_db, [?DB, Opts]) - ), - ?retry( - 500, - 10, - ?assertMatch([[_], [_], [_]], [shards_online(N, ?DB) || N <- Nodes]) - ), - - %% Stop the DB on the "offline" node. - ok = emqx_cth_cluster:stop_node(NodeOffline), + %% Stop the DB on the "offline" node. + ok = emqx_cth_cluster:stop_node(NodeOffline), - %% Fill the storage with messages and few additional generations. - Messages = fill_storage(Node, ?DB, NMsgs, #{p_addgen => 0.01}), + %% Fill the storage with messages and few additional generations. + apply_stream(?DB, Nodes -- [NodeOffline], Stream), - %% Restart the node. - [NodeOffline] = emqx_cth_cluster:restart(SpecOffline), - {ok, SRef} = snabbkaffe:subscribe( - ?match_event(#{ - ?snk_kind := dsrepl_snapshot_accepted, - ?snk_meta := #{node := NodeOffline} - }) - ), - ?assertEqual( - ok, - erpc:call(NodeOffline, emqx_ds, open_db, [?DB, opts()]) - ), + %% Restart the node. + [NodeOffline] = emqx_cth_cluster:restart(SpecOffline), + {ok, SRef} = snabbkaffe:subscribe( + ?match_event(#{ + ?snk_kind := dsrepl_snapshot_accepted, + ?snk_meta := #{node := NodeOffline} + }) + ), + ?assertEqual( + ok, + erpc:call(NodeOffline, emqx_ds, open_db, [?DB, opts()]) + ), - %% Trigger storage operation and wait the replica to be restored. - _ = add_generation(Node, ?DB), - ?assertMatch( - {ok, _}, - snabbkaffe:receive_events(SRef) - ), + %% Trigger storage operation and wait the replica to be restored. + _ = add_generation(Node, ?DB), + ?assertMatch( + {ok, _}, + snabbkaffe:receive_events(SRef) + ), - %% Wait until any pending replication activities are finished (e.g. Raft log entries). - ok = timer:sleep(3_000), + %% Wait until any pending replication activities are finished (e.g. Raft log entries). + ok = timer:sleep(3_000), - %% Check that the DB has been restored. - Shard = hd(shards(NodeOffline, ?DB)), - MessagesOffline = lists:keysort( - #message.timestamp, - consume_shard(NodeOffline, ?DB, Shard, ['#'], 0) - ), - ?assertEqual( - sample(40, Messages), - sample(40, MessagesOffline) - ), - ?assertEqual( - Messages, - MessagesOffline + %% Check that the DB has been restored: + verify_stream_effects(?FUNCTION_NAME, Nodes, TopicStreams) + end, + [] ). t_rebalance(init, Config) -> @@ -159,50 +154,13 @@ t_rebalance('end', Config) -> t_rebalance(Config) -> NMsgs = 50, NClients = 5, - %% List of fake client IDs: - Clients = [integer_to_binary(I) || I <- lists:seq(1, NClients)], - %% List of streams that generate messages for each "client" in its own topic: - TopicStreams = [ - {ClientId, emqx_utils_stream:limit_length(NMsgs, topic_messages(?FUNCTION_NAME, ClientId))} - || ClientId <- Clients - ], - %% Interleaved list of events: - Stream0 = emqx_utils_stream:interleave( - [{2, Stream} || {_ClientId, Stream} <- TopicStreams], true - ), - Stream = emqx_utils_stream:interleave( - [ - {50, Stream0}, - emqx_utils_stream:const(add_generation) - ], - false - ), - Nodes = [N1, N2, N3, N4] = ?config(nodes, Config), + {Stream0, TopicStreams} = interleaved_topic_messages(?FUNCTION_NAME, NClients, NMsgs), + Nodes = [N1, N2 | _] = ?config(nodes, Config), ?check_trace( #{timetrap => 30_000}, begin - %% 0. Inject schedulings to make sure the messages are - %% written to the storage before, during, and after - %% rebalance: - ?force_ordering( - #{?snk_kind := test_push_message, n := 10}, - #{?snk_kind := test_start_rebalance} - ), - ?force_ordering( - #{?snk_kind := test_start_rebalance1}, - #{?snk_kind := test_push_message, n := 20} - ), - ?force_ordering( - #{?snk_kind := test_push_message, n := 30}, - #{?snk_kind := test_start_rebalance2} - ), - ?force_ordering( - #{?snk_kind := test_end_rebalance}, - #{?snk_kind := test_push_message, n := 40} - ), %% 1. Initialize DB on the first node. Opts = opts(#{n_shards => 16, n_sites => 1, replication_factor => 3}), - ?assertEqual(ok, ?ON(N1, emqx_ds:open_db(?DB, Opts))), ?assertMatch(Shards when length(Shards) == 16, shards_online(N1, ?DB)), @@ -215,6 +173,22 @@ t_rebalance(Config) -> Sites = [S1, S2 | _] = [ds_repl_meta(N, this_site) || N <- Nodes], ct:pal("Sites: ~p~n", [Sites]), + Sequence = [ + %% Join the second site to the DB replication sites: + {N1, join_db_site, S2}, + %% Should be a no-op: + {N2, join_db_site, S2}, + %% Now join the rest of the sites: + {N2, assign_db_sites, Sites} + ], + Stream = emqx_utils_stream:interleave( + [ + {50, Stream0}, + emqx_utils_stream:list(Sequence) + ], + true + ), + %% 1.2 Verify that all nodes have the same view of metadata storage: [ ?defer_assert( @@ -231,31 +205,9 @@ t_rebalance(Config) -> ], %% 2. Start filling the storage: - spawn_link( - fun() -> - NodeStream = emqx_utils_stream:repeat(emqx_utils_stream:list(Nodes)), - apply_stream(?DB, NodeStream, Stream, 0) - end - ), - - %% 3. Start rebalance in the meanwhile: - ?tp(test_start_rebalance1, #{}), - %% 3.1 Join the second site to the DB replication sites. - ?assertEqual(ok, ?ON(N1, emqx_ds_replication_layer_meta:join_db_site(?DB, S2))), - %% Should be no-op. - ?assertEqual(ok, ?ON(N2, emqx_ds_replication_layer_meta:join_db_site(?DB, S2))), - ct:pal("Transitions (~p -> ~p): ~p~n", [[S1], [S1, S2], transitions(N1, ?DB)]), - - ?retry(1000, 10, ?assertEqual([], transitions(N1, ?DB))), - - ?tp(test_start_rebalance2, #{}), - %% Now join the rest of the sites. - ?assertEqual(ok, ds_repl_meta(N2, assign_db_sites, [?DB, Sites])), - ct:pal("Transitions (~p -> ~p): ~p~n", [[S1, S2], Sites, transitions(N1, ?DB)]), - - ?retry(1000, 10, ?assertEqual([], transitions(N2, ?DB))), - - ?tp(test_end_rebalance, #{}), + apply_stream(?DB, Nodes, Stream), + timer:sleep(5000), + verify_stream_effects(?FUNCTION_NAME, Nodes, TopicStreams), [ ?defer_assert( ?assertEqual( @@ -279,17 +231,12 @@ t_rebalance(Config) -> ShardServers ), - %% Verify that the messages are preserved after the rebalance: - ?block_until(#{?snk_kind := all_done}), - timer:sleep(5000), - verify_stream_effects(?FUNCTION_NAME, Nodes, TopicStreams), - %% Scale down the cluster by removing the first node. ?assertEqual(ok, ds_repl_meta(N1, leave_db_site, [?DB, S1])), ct:pal("Transitions (~p -> ~p): ~p~n", [Sites, tl(Sites), transitions(N1, ?DB)]), ?retry(1000, 10, ?assertEqual([], transitions(N2, ?DB))), - %% Verify that each node is now responsible for each shard. + %% Verify that at the end each node is now responsible for each shard. ?defer_assert( ?assertEqual( [0, 16, 16, 16], @@ -387,81 +334,77 @@ t_rebalance_chaotic_converges(Config) -> NMsgs = 500, Nodes = [N1, N2, N3] = ?config(nodes, Config), - %% Initialize DB on first two nodes. - Opts = opts(#{n_shards => 16, n_sites => 2, replication_factor => 3}), - ?assertEqual( - [{ok, ok}, {ok, ok}], - erpc:multicall([N1, N2], emqx_ds, open_db, [?DB, Opts]) - ), + NClients = 5, + {Stream0, TopicStreams} = interleaved_topic_messages(?FUNCTION_NAME, NClients, NMsgs), - %% Open DB on the last node. - ?assertEqual( - ok, - erpc:call(N3, emqx_ds, open_db, [?DB, Opts]) - ), + ?check_trace( + #{}, + begin + %% Initialize DB on first two nodes. + Opts = opts(#{n_shards => 16, n_sites => 2, replication_factor => 3}), - %% Find out which sites there are. - Sites = [S1, S2, S3] = [ds_repl_meta(N, this_site) || N <- Nodes], - ct:pal("Sites: ~p~n", [Sites]), + ?assertEqual( + [{ok, ok}, {ok, ok}], + erpc:multicall([N1, N2], emqx_ds, open_db, [?DB, Opts]) + ), - %% Initially, the DB is assigned to [S1, S2]. - ?retry(500, 10, ?assertEqual([16, 16], [n_shards_online(N, ?DB) || N <- [N1, N2]])), - ?assertEqual( - lists:sort([S1, S2]), - ds_repl_meta(N1, db_sites, [?DB]) - ), + %% Open DB on the last node. + ?assertEqual( + ok, + erpc:call(N3, emqx_ds, open_db, [?DB, Opts]) + ), - %% Fill the storage with messages and few additional generations. - Messages0 = lists:append([ - fill_storage(N1, ?DB, NMsgs, #{client_id => <<"C1">>}), - fill_storage(N2, ?DB, NMsgs, #{client_id => <<"C2">>}), - fill_storage(N3, ?DB, NMsgs, #{client_id => <<"C3">>}) - ]), - - %% Construct a chaotic transition sequence that changes assignment to [S2, S3]. - Sequence = [ - {N1, join_db_site, S3}, - {N2, leave_db_site, S2}, - {N3, leave_db_site, S1}, - {N1, join_db_site, S2}, - {N2, join_db_site, S1}, - {N3, leave_db_site, S3}, - {N1, leave_db_site, S1}, - {N2, join_db_site, S3} - ], + %% Find out which sites there are. + Sites = [S1, S2, S3] = [ds_repl_meta(N, this_site) || N <- Nodes], + ct:pal("Sites: ~p~n", [Sites]), - %% Apply the sequence while also filling the storage with messages. - TransitionMessages = lists:map( - fun({N, Operation, Site}) -> - %% Apply the transition. - ?assertEqual(ok, ds_repl_meta(N, Operation, [?DB, Site])), - %% Give some time for at least one transition to complete. - Transitions = transitions(N, ?DB), - ct:pal("Transitions after ~p: ~p", [Operation, Transitions]), - ?retry(200, 10, ?assertNotEqual(Transitions, transitions(N, ?DB))), - %% Fill the storage with messages. - CID = integer_to_binary(erlang:system_time()), - fill_storage(N, ?DB, NMsgs, #{client_id => CID}) - end, - Sequence - ), + Sequence = [ + {N1, join_db_site, S3}, + {N2, leave_db_site, S2}, + {N3, leave_db_site, S1}, + {N1, join_db_site, S2}, + {N2, join_db_site, S1}, + {N3, leave_db_site, S3}, + {N1, leave_db_site, S1}, + {N2, join_db_site, S3} + ], - %% Wait for the last transition to complete. - ?retry(500, 20, ?assertEqual([], transitions(N1, ?DB))), + %% Interleaved list of events: + Stream = emqx_utils_stream:interleave( + [ + {50, Stream0}, + emqx_utils_stream:list(Sequence) + ], + true + ), - ?assertEqual( - lists:sort([S2, S3]), - ds_repl_meta(N1, db_sites, [?DB]) - ), + ?retry(500, 10, ?assertEqual([16, 16], [n_shards_online(N, ?DB) || N <- [N1, N2]])), + ?assertEqual( + lists:sort([S1, S2]), + ds_repl_meta(N1, db_sites, [?DB]), + "Initially, the DB is assigned to [S1, S2]" + ), - %% Wait until the LTS timestamp is updated - timer:sleep(5000), + apply_stream(?DB, Nodes, Stream), - %% Check that all messages are still there. - Messages = lists:append(TransitionMessages) ++ Messages0, - MessagesDB = lists:sort(fun compare_message/2, consume(N1, ?DB, ['#'], 0)), - ?assertEqual(sample(20, Messages), sample(20, MessagesDB)), - ?assertEqual(Messages, MessagesDB). + %% Wait for the last transition to complete. + ?retry(500, 20, ?assertEqual([], transitions(N1, ?DB))), + + ?defer_assert( + ?assertEqual( + lists:sort([S2, S3]), + ds_repl_meta(N1, db_sites, [?DB]) + ) + ), + + %% Wait until the LTS timestamp is updated: + timer:sleep(5000), + + %% Check that all messages are still there. + verify_stream_effects(?FUNCTION_NAME, Nodes, TopicStreams) + end, + [] + ). t_rebalance_offline_restarts(init, Config) -> Apps = [appspec(emqx_durable_storage)], @@ -535,7 +478,15 @@ ds_repl_meta(Node, Fun) -> ds_repl_meta(Node, Fun, []). ds_repl_meta(Node, Fun, Args) -> - erpc:call(Node, emqx_ds_replication_layer_meta, Fun, Args). + try + erpc:call(Node, emqx_ds_replication_layer_meta, Fun, Args) + catch + EC:Err:Stack -> + ct:pal("emqx_ds_replication_layer_meta:~p(~p) @~p failed:~n~p:~p~nStack: ~p", [ + Fun, Args, Node, EC, Err, Stack + ]), + error(meta_op_failed) + end. ds_repl_shard(Node, Fun, Args) -> erpc:call(Node, emqx_ds_replication_layer_shard, Fun, Args). @@ -553,27 +504,6 @@ shards_online(Node, DB) -> n_shards_online(Node, DB) -> length(shards_online(Node, DB)). -fill_storage(Node, DB, NMsgs, Opts) -> - fill_storage(Node, DB, NMsgs, 0, Opts). - -fill_storage(Node, DB, NMsgs, I, Opts) when I < NMsgs -> - PAddGen = maps:get(p_addgen, Opts, 0.001), - R1 = push_message(Node, DB, I, Opts), - %probably(PAddGen, fun() -> add_generation(Node, DB) end), - R2 = [], - R1 ++ R2 ++ fill_storage(Node, DB, NMsgs, I + 1, Opts); -fill_storage(_Node, _DB, NMsgs, NMsgs, _Opts) -> - []. - -push_message(Node, DB, I, Opts) -> - Topic = emqx_topic:join([<<"topic">>, <<"foo">>, integer_to_binary(I)]), - %% {Bytes, _} = rand:bytes_s(5, rand:seed_s(default, I)), - Bytes = integer_to_binary(I), - ClientId = maps:get(client_id, Opts, <>), - Message = message(ClientId, Topic, Bytes, I * 100), - ok = erpc:call(Node, emqx_ds, store_batch, [DB, [Message], #{sync => true}]), - [Message]. - add_generation(Node, DB) -> ok = erpc:call(Node, emqx_ds, add_generation, [DB]), []. @@ -674,7 +604,7 @@ do_ds_topic_generation_stream(Node, Shard, It0) -> end ) of - {ok, It, []} -> + {ok, _It, []} -> []; {ok, end_of_stream} -> []; @@ -685,11 +615,19 @@ do_ds_topic_generation_stream(Node, Shard, It0) -> %% Payload generation: +apply_stream(DB, Nodes, Stream) -> + apply_stream( + DB, + emqx_utils_stream:repeat(emqx_utils_stream:list(Nodes)), + Stream, + 0 + ). + apply_stream(DB, NodeStream0, Stream0, N) -> case emqx_utils_stream:next(Stream0) of [] -> ?tp(all_done, #{}); - [Msg = #message{from = From} | Stream] -> + [Msg = #message{} | Stream] -> [Node | NodeStream] = emqx_utils_stream:next(NodeStream0), ?tp( test_push_message, @@ -701,12 +639,40 @@ apply_stream(DB, NodeStream0, Stream0, N) -> ?ON(Node, emqx_ds:store_batch(DB, [Msg], #{sync => true})), apply_stream(DB, NodeStream, Stream, N + 1); [add_generation | Stream] -> - [Node | NodeStream] = emqx_utils_stream:next(NodeStream0), + %% FIXME: + [_Node | NodeStream] = emqx_utils_stream:next(NodeStream0), %% add_generation(Node, DB), - apply_stream(DB, NodeStream, Stream, N) + apply_stream(DB, NodeStream, Stream, N); + [{Node, Operation, Arg} | Stream] when + Operation =:= join_db_site; Operation =:= leave_db_site; Operation =:= assign_db_sites + -> + ?tp(notice, test_apply_operation, #{node => Node, operation => Operation, arg => Arg}), + %% Apply the transition. + ?assertEqual(ok, ds_repl_meta(Node, Operation, [DB, Arg])), + %% Give some time for at least one transition to complete. + Transitions = transitions(Node, ?DB), + ct:pal("Transitions after ~p: ~p", [Operation, Transitions]), + ?retry(200, 10, ?assertNotEqual(Transitions, transitions(Node, DB))), + apply_stream(DB, NodeStream0, Stream, N); + [Fun | Stream] when is_function(Fun) -> + Fun(), + apply_stream(DB, NodeStream0, Stream, N) end. %% @doc Create an infinite list of messages from a given client: +interleaved_topic_messages(TestCase, NClients, NMsgs) -> + %% List of fake client IDs: + Clients = [integer_to_binary(I) || I <- lists:seq(1, NClients)], + TopicStreams = [ + {ClientId, emqx_utils_stream:limit_length(NMsgs, topic_messages(TestCase, ClientId))} + || ClientId <- Clients + ], + %% Interleaved stream of messages: + Stream = emqx_utils_stream:interleave( + [{2, Stream} || {_ClientId, Stream} <- TopicStreams], true + ), + {Stream, TopicStreams}. + topic_messages(TestCase, ClientId) -> topic_messages(TestCase, ClientId, 0). @@ -726,7 +692,7 @@ client_topic(TestCase, ClientId) when is_atom(TestCase) -> client_topic(TestCase, ClientId) when is_binary(TestCase) -> <>. -message_eq(Msg1, {Key, Msg2}) -> +message_eq(Msg1, {_Key, Msg2}) -> %% Timestamps can be modified by the replication layer, ignore them: Msg1#message{timestamp = 0} =:= Msg2#message{timestamp = 0}. @@ -734,7 +700,7 @@ message_eq(Msg1, {Key, Msg2}) -> -spec verify_stream_effects(binary(), [node()], [{emqx_types:clientid(), ds_stream()}]) -> ok. verify_stream_effects(TestCase, Nodes0, L) -> - lists:foreach( + Checked = lists:flatmap( fun({ClientId, Stream}) -> Nodes = nodes_of_clientid(ClientId, Nodes0), ct:pal("Nodes allocated for client ~p: ~p", [ClientId, Nodes]), @@ -744,7 +710,8 @@ verify_stream_effects(TestCase, Nodes0, L) -> [verify_stream_effects(TestCase, Node, ClientId, Stream) || Node <- Nodes] end, L - ). + ), + ?defer_assert(?assertMatch([_ | _], Checked, "Some messages have been verified")). -spec verify_stream_effects(binary(), node(), emqx_types:clientid(), ds_stream()) -> ok. verify_stream_effects(TestCase, Node, ClientId, ExpectedStream) -> diff --git a/apps/emqx_utils/test/emqx_utils_stream_tests.erl b/apps/emqx_utils/test/emqx_utils_stream_tests.erl index 0b117215ef..0ab3cdb70c 100644 --- a/apps/emqx_utils/test/emqx_utils_stream_tests.erl +++ b/apps/emqx_utils/test/emqx_utils_stream_tests.erl @@ -162,7 +162,7 @@ interleave_test() -> S2 = emqx_utils_stream:list([a, b, c, d]), ?assertEqual( [1, 2, a, b, 3, c, d], - emqx_utils_stream:consume(emqx_utils_stream:interleave([{2, S1}, {2, S2}])) + emqx_utils_stream:consume(emqx_utils_stream:interleave([{2, S1}, {2, S2}], true)) ). csv_test() ->