Skip to content

Commit

Permalink
test(ds): Refactor replication suite
Browse files Browse the repository at this point in the history
  • Loading branch information
ieQu1 committed May 9, 2024
1 parent 63e51fc commit 07aa708
Show file tree
Hide file tree
Showing 3 changed files with 246 additions and 221 deletions.
247 changes: 27 additions & 220 deletions apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,6 @@
erpc:call(NODE, erlang, apply, [fun() -> BODY end, []])
).

-define(diff_opts, #{
context => 20, window => 1000, compare_fun => fun message_eq/2
}).

opts() ->
opts(#{}).

Expand Down Expand Up @@ -78,7 +74,9 @@ t_replication_transfers_snapshots('end', Config) ->
t_replication_transfers_snapshots(Config) ->
NMsgs = 400,
NClients = 5,
{Stream, TopicStreams} = interleaved_topic_messages(?FUNCTION_NAME, NClients, NMsgs),
{Stream, TopicStreams} = emqx_ds_test_helpers:interleaved_topic_messages(
?FUNCTION_NAME, NClients, NMsgs
),

Nodes = [Node, NodeOffline | _] = ?config(nodes, Config),
_Specs = [_, SpecOffline | _] = ?config(specs, Config),
Expand All @@ -100,7 +98,7 @@ t_replication_transfers_snapshots(Config) ->
ok = emqx_cth_cluster:stop_node(NodeOffline),

%% Fill the storage with messages and few additional generations.
apply_stream(?DB, Nodes -- [NodeOffline], Stream),
emqx_ds_test_helpers:apply_stream(?DB, Nodes -- [NodeOffline], Stream),

%% Restart the node.
[NodeOffline] = emqx_cth_cluster:restart(SpecOffline),
Expand All @@ -126,7 +124,7 @@ t_replication_transfers_snapshots(Config) ->
ok = timer:sleep(3_000),

%% Check that the DB has been restored:
verify_stream_effects(?FUNCTION_NAME, Nodes, TopicStreams)
emqx_ds_test_helpers:verify_stream_effects(?DB, ?FUNCTION_NAME, Nodes, TopicStreams)
end,
[]
).
Expand Down Expand Up @@ -154,7 +152,9 @@ t_rebalance('end', Config) ->
t_rebalance(Config) ->
NMsgs = 50,
NClients = 5,
{Stream0, TopicStreams} = interleaved_topic_messages(?FUNCTION_NAME, NClients, NMsgs),
{Stream0, TopicStreams} = emqx_ds_test_helpers:interleaved_topic_messages(
?FUNCTION_NAME, NClients, NMsgs
),
Nodes = [N1, N2 | _] = ?config(nodes, Config),
?check_trace(
#{timetrap => 30_000},
Expand Down Expand Up @@ -205,9 +205,9 @@ t_rebalance(Config) ->
],

%% 2. Start filling the storage:
apply_stream(?DB, Nodes, Stream),
emqx_ds_test_helpers:apply_stream(?DB, Nodes, Stream),
timer:sleep(5000),
verify_stream_effects(?FUNCTION_NAME, Nodes, TopicStreams),
emqx_ds_test_helpers:verify_stream_effects(?DB, ?FUNCTION_NAME, Nodes, TopicStreams),
[
?defer_assert(
?assertEqual(
Expand All @@ -233,8 +233,10 @@ t_rebalance(Config) ->

%% Scale down the cluster by removing the first node.
?assertEqual(ok, ds_repl_meta(N1, leave_db_site, [?DB, S1])),
ct:pal("Transitions (~p -> ~p): ~p~n", [Sites, tl(Sites), transitions(N1, ?DB)]),
?retry(1000, 10, ?assertEqual([], transitions(N2, ?DB))),
ct:pal("Transitions (~p -> ~p): ~p~n", [
Sites, tl(Sites), emqx_ds_test_helpers:transitions(N1, ?DB)
]),
?retry(1000, 10, ?assertEqual([], emqx_ds_test_helpers:transitions(N2, ?DB))),

%% Verify that at the end each node is now responsible for each shard.
?defer_assert(
Expand All @@ -245,7 +247,7 @@ t_rebalance(Config) ->
),

%% Verify that the messages are once again preserved after the rebalance:
verify_stream_effects(?FUNCTION_NAME, Nodes, TopicStreams)
emqx_ds_test_helpers:verify_stream_effects(?DB, ?FUNCTION_NAME, Nodes, TopicStreams)
end,
[]
).
Expand Down Expand Up @@ -294,7 +296,7 @@ t_join_leave_errors(Config) ->

%% Should be no-op.
?assertEqual(ok, ds_repl_meta(N1, join_db_site, [?DB, S1])),
?assertEqual([], transitions(N1, ?DB)),
?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB)),

%% Impossible to leave the last site.
?assertEqual(
Expand All @@ -305,12 +307,12 @@ t_join_leave_errors(Config) ->
%% "Move" the DB to the other node.
?assertEqual(ok, ds_repl_meta(N1, join_db_site, [?DB, S2])),
?assertEqual(ok, ds_repl_meta(N2, leave_db_site, [?DB, S1])),
?assertMatch([_ | _], transitions(N1, ?DB)),
?retry(1000, 10, ?assertEqual([], transitions(N1, ?DB))),
?assertMatch([_ | _], emqx_ds_test_helpers:transitions(N1, ?DB)),
?retry(1000, 10, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB))),

%% Should be no-op.
?assertEqual(ok, ds_repl_meta(N2, leave_db_site, [?DB, S1])),
?assertEqual([], transitions(N1, ?DB)).
?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB)).

t_rebalance_chaotic_converges(init, Config) ->
Apps = [appspec(emqx_durable_storage)],
Expand All @@ -335,7 +337,9 @@ t_rebalance_chaotic_converges(Config) ->
Nodes = [N1, N2, N3] = ?config(nodes, Config),

NClients = 5,
{Stream0, TopicStreams} = interleaved_topic_messages(?FUNCTION_NAME, NClients, NMsgs),
{Stream0, TopicStreams} = emqx_ds_test_helpers:interleaved_topic_messages(
?FUNCTION_NAME, NClients, NMsgs
),

?check_trace(
#{},
Expand Down Expand Up @@ -385,10 +389,10 @@ t_rebalance_chaotic_converges(Config) ->
"Initially, the DB is assigned to [S1, S2]"
),

apply_stream(?DB, Nodes, Stream),
emqx_ds_test_helpers:apply_stream(?DB, Nodes, Stream),

%% Wait for the last transition to complete.
?retry(500, 20, ?assertEqual([], transitions(N1, ?DB))),
?retry(500, 20, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB))),

?defer_assert(
?assertEqual(
Expand All @@ -401,7 +405,7 @@ t_rebalance_chaotic_converges(Config) ->
timer:sleep(5000),

%% Check that all messages are still there.
verify_stream_effects(?FUNCTION_NAME, Nodes, TopicStreams)
emqx_ds_test_helpers:verify_stream_effects(?DB, ?FUNCTION_NAME, Nodes, TopicStreams)
end,
[]
).
Expand Down Expand Up @@ -447,7 +451,7 @@ t_rebalance_offline_restarts(Config) ->
%% Shut down N3 and then remove it from the DB.
ok = emqx_cth_cluster:stop_node(N3),
?assertEqual(ok, ds_repl_meta(N1, leave_db_site, [?DB, S3])),
Transitions = transitions(N1, ?DB),
Transitions = emqx_ds_test_helpers:transitions(N1, ?DB),
ct:pal("Transitions: ~p~n", [Transitions]),

%% Wait until at least one transition completes.
Expand All @@ -462,7 +466,7 @@ t_rebalance_offline_restarts(Config) ->
),

%% Target state should still be reached eventually.
?retry(1000, 20, ?assertEqual([], transitions(N1, ?DB))),
?retry(1000, 20, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB))),
?assertEqual(lists:sort([S1, S2]), ds_repl_meta(N1, db_sites, [?DB])).

%%
Expand Down Expand Up @@ -491,10 +495,6 @@ ds_repl_meta(Node, Fun, Args) ->
ds_repl_shard(Node, Fun, Args) ->
erpc:call(Node, emqx_ds_replication_layer_shard, Fun, Args).

transitions(Node, DB) ->
Shards = shards(Node, DB),
[{S, T} || S <- Shards, T <- ds_repl_meta(Node, replica_set_transitions, [DB, S])].

shards(Node, DB) ->
erpc:call(Node, emqx_ds_replication_layer_meta, shards, [DB]).

Expand Down Expand Up @@ -557,196 +557,3 @@ init_per_testcase(TCName, Config0) ->
end_per_testcase(TCName, Config) ->
ok = snabbkaffe:stop(),
emqx_common_test_helpers:end_per_testcase(?MODULE, TCName, Config).

without_extra(L) ->
[I#message{extra = #{}} || I <- L].

%% Consume data from the DS storage on a given node as a stream:
-type ds_stream() :: emqx_utils_stream:stream({emqx_ds:message_key(), emqx_types:message()}).

%% Create a stream from the topic (wildcards are NOT supported for a
%% good reason: order of messages is implementation-dependent!).
%%
%% Note: stream produces messages with keys
-spec ds_topic_stream(binary(), binary(), node()) -> ds_stream().
ds_topic_stream(ClientId, TopicBin, Node) ->
Topic = emqx_topic:words(TopicBin),
Shard = shard_of_clientid(Node, ClientId),
{ShardId, DSStreams} =
?ON(
Node,
begin
DBShard = {?DB, Shard},
{DBShard, emqx_ds_storage_layer:get_streams(DBShard, Topic, 0)}
end
),
%% Sort streams by their rank Y, and chain them together:
emqx_utils_stream:chain([
ds_topic_generation_stream(Node, ShardId, Topic, S)
|| {_RankY, S} <- lists:sort(DSStreams)
]).

ds_topic_generation_stream(Node, Shard, Topic, Stream) ->
{ok, Iterator} = ?ON(
Node,
emqx_ds_storage_layer:make_iterator(Shard, Stream, Topic, 0)
),
do_ds_topic_generation_stream(Node, Shard, Iterator).

do_ds_topic_generation_stream(Node, Shard, It0) ->
fun() ->
case
?ON(
Node,
begin
Now = emqx_ds_replication_layer:current_timestamp(?DB, Shard),
emqx_ds_storage_layer:next(Shard, It0, 1, Now)
end
)
of
{ok, _It, []} ->
[];
{ok, end_of_stream} ->
[];
{ok, It, [KeyMsg]} ->
[KeyMsg | do_ds_topic_generation_stream(Node, Shard, It)]
end
end.

%% Payload generation:

apply_stream(DB, Nodes, Stream) ->
apply_stream(
DB,
emqx_utils_stream:repeat(emqx_utils_stream:list(Nodes)),
Stream,
0
).

apply_stream(DB, NodeStream0, Stream0, N) ->
case emqx_utils_stream:next(Stream0) of
[] ->
?tp(all_done, #{});
[Msg = #message{} | Stream] ->
[Node | NodeStream] = emqx_utils_stream:next(NodeStream0),
?tp(
test_push_message,
maps:merge(
emqx_message:to_map(Msg),
#{n => N}
)
),
?ON(Node, emqx_ds:store_batch(DB, [Msg], #{sync => true})),
apply_stream(DB, NodeStream, Stream, N + 1);
[add_generation | Stream] ->
%% FIXME:
[_Node | NodeStream] = emqx_utils_stream:next(NodeStream0),
%% add_generation(Node, DB),
apply_stream(DB, NodeStream, Stream, N);
[{Node, Operation, Arg} | Stream] when
Operation =:= join_db_site; Operation =:= leave_db_site; Operation =:= assign_db_sites
->
?tp(notice, test_apply_operation, #{node => Node, operation => Operation, arg => Arg}),
%% Apply the transition.
?assertEqual(ok, ds_repl_meta(Node, Operation, [DB, Arg])),
%% Give some time for at least one transition to complete.
Transitions = transitions(Node, ?DB),
ct:pal("Transitions after ~p: ~p", [Operation, Transitions]),
?retry(200, 10, ?assertNotEqual(Transitions, transitions(Node, DB))),
apply_stream(DB, NodeStream0, Stream, N);
[Fun | Stream] when is_function(Fun) ->
Fun(),
apply_stream(DB, NodeStream0, Stream, N)
end.

%% @doc Create an infinite list of messages from a given client:
interleaved_topic_messages(TestCase, NClients, NMsgs) ->
%% List of fake client IDs:
Clients = [integer_to_binary(I) || I <- lists:seq(1, NClients)],
TopicStreams = [
{ClientId, emqx_utils_stream:limit_length(NMsgs, topic_messages(TestCase, ClientId))}
|| ClientId <- Clients
],
%% Interleaved stream of messages:
Stream = emqx_utils_stream:interleave(
[{2, Stream} || {_ClientId, Stream} <- TopicStreams], true
),
{Stream, TopicStreams}.

topic_messages(TestCase, ClientId) ->
topic_messages(TestCase, ClientId, 0).

topic_messages(TestCase, ClientId, N) ->
fun() ->
Msg = #message{
from = ClientId,
topic = client_topic(TestCase, ClientId),
timestamp = N * 100,
payload = integer_to_binary(N)
},
[Msg | topic_messages(TestCase, ClientId, N + 1)]
end.

client_topic(TestCase, ClientId) when is_atom(TestCase) ->
client_topic(atom_to_binary(TestCase, utf8), ClientId);
client_topic(TestCase, ClientId) when is_binary(TestCase) ->
<<TestCase/binary, "/", ClientId/binary>>.

message_eq(Msg1, {_Key, Msg2}) ->
%% Timestamps can be modified by the replication layer, ignore them:
Msg1#message{timestamp = 0} =:= Msg2#message{timestamp = 0}.

%% Stream comparison:

-spec verify_stream_effects(binary(), [node()], [{emqx_types:clientid(), ds_stream()}]) -> ok.
verify_stream_effects(TestCase, Nodes0, L) ->
Checked = lists:flatmap(
fun({ClientId, Stream}) ->
Nodes = nodes_of_clientid(ClientId, Nodes0),
ct:pal("Nodes allocated for client ~p: ~p", [ClientId, Nodes]),
?defer_assert(
?assertMatch([_ | _], Nodes, ["No nodes have been allocated for ", ClientId])
),
[verify_stream_effects(TestCase, Node, ClientId, Stream) || Node <- Nodes]
end,
L
),
?defer_assert(?assertMatch([_ | _], Checked, "Some messages have been verified")).

-spec verify_stream_effects(binary(), node(), emqx_types:clientid(), ds_stream()) -> ok.
verify_stream_effects(TestCase, Node, ClientId, ExpectedStream) ->
ct:pal("Checking consistency of effects for ~p on ~p", [ClientId, Node]),
?defer_assert(
begin
snabbkaffe_diff:assert_lists_eq(
ExpectedStream,
ds_topic_stream(ClientId, client_topic(TestCase, ClientId), Node),
?diff_opts
),
ct:pal("Data for client ~p on ~p is consistent.", [ClientId, Node])
end
).

%% Find which nodes from the list contain the shards for the given
%% client ID:
nodes_of_clientid(ClientId, Nodes = [N0 | _]) ->
Shard = shard_of_clientid(N0, ClientId),
SiteNodes = ?ON(
N0,
begin
Sites = emqx_ds_replication_layer_meta:replica_set(?DB, Shard),
lists:map(fun emqx_ds_replication_layer_meta:node/1, Sites)
end
),
lists:filter(
fun(N) ->
lists:member(N, SiteNodes)
end,
Nodes
).

shard_of_clientid(Node, ClientId) ->
?ON(
Node,
emqx_ds_replication_layer:shard_of_message(?DB, #message{from = ClientId}, clientid)
).
2 changes: 1 addition & 1 deletion apps/emqx_durable_storage/test/emqx_ds_storage_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
-include_lib("stdlib/include/assert.hrl").

opts() ->
#{storage => {emqx_ds_storage_bitfield_lts, #{}}}.
#{storage => {emqx_ds_storage_reference, #{}}}.

%%

Expand Down
Loading

0 comments on commit 07aa708

Please sign in to comment.