Add test for realtime queue data buildup on sink clusters #545

Merged
merged 2 commits into master

3 participants

@kellymclaughlin
Collaborator

No description provided.

@kellymclaughlin
Collaborator

This test is associated with riak_repl#547.

@lordnull
Collaborator

The rt_cascading test is not named in the same manner as the rest of the repl tests, but it wasn't caught before it was merged in. The only other test that uses rt_ at the beginning of its name is one for riak_test. To avoid a similar fate of confusion for this test, I'd recommend renaming it to 'repl_rt_cascading_rtq'.

@kellymclaughlin
Collaborator

> The rt_cascading test is not named in the same manner as the rest of the repl tests, but it wasn't caught before it was merged in. The only other test that uses rt_ at the beginning of its name is one for riak_test. To avoid a similar fate of confusion for this test, I'd recommend renaming it to 'repl_rt_cascading_rtq'.

Great point, will make that change.

@andrewjstone
  • code inspection
  • test fails without riak_repl#547
  • test passes with riak_repl#547
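
For anyone reproducing the verification above, here is a rough sketch of how a test like this is typically invoked with the riak_test runner; the rtdev config name is an assumption about the local setup:

    ./riak_test -c rtdev -t repl_rt_cascading_rtq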
@kellymclaughlin merged commit bfb35d5 into master
Showing with 185 additions and 0 deletions.
  1. +185 −0 tests/repl_rt_cascading_rtq.erl
tests/repl_rt_cascading_rtq.erl
@@ -0,0 +1,185 @@
+-module(repl_rt_cascading_rtq).
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-define(TEST_BUCKET, <<"rt-cascading-rtq-systest-a">>).
+
+setup() ->
+    rt:set_conf(all, [{"buckets.default.allow_mult", "false"}]),
+
+    {SourceLeader, SinkLeaderA, SinkLeaderB, _, _, _} = ClusterNodes = make_clusters(),
+
+    connect_clusters(SourceLeader, SinkLeaderA, "SinkA"),
+    connect_clusters(SourceLeader, SinkLeaderB, "SinkB"),
+    ClusterNodes.
+
+confirm() ->
+    SetupData = setup(),
+    rtq_data_buildup_test(SetupData),
+    pass.
+
+%% This test case is designed to ensure that there is no realtime
+%% queue buildup on sink nodes that do not serve as source nodes for
+%% any other clusters. It constructs a simple topology with a single
+%% source cluster replicating to two sinks. The topology for this test
+%% is as follows:
+%%
+%%          +--------+
+%%          | Source |
+%%          +--------+
+%%            ^    ^
+%%           /      \
+%%          V        V
+%%    +-------+    +-------+
+%%    | SinkA |    | SinkB |
+%%    +-------+    +-------+
+rtq_data_buildup_test(ClusterNodes) ->
+    {SourceLeader, SinkLeaderA, SinkLeaderB, SourceNodes, _SinkANodes, _SinkBNodes} = ClusterNodes,
+
+    %% Enable RT replication from the source cluster to "SinkA"
+    lager:info("Enabling realtime between ~p and ~p", [SourceLeader, SinkLeaderA]),
+    enable_rt(SourceLeader, SourceNodes, "SinkA"),
+    %% Enable RT replication from the source cluster to "SinkB"
+    lager:info("Enabling realtime between ~p and ~p", [SourceLeader, SinkLeaderB]),
+    enable_rt(SourceLeader, SourceNodes, "SinkB"),
+
+    %% Get the baseline byte count of the realtime queue for each sink cluster
+    SinkAInitialQueueSize = rtq_bytes(SinkLeaderA),
+    SinkBInitialQueueSize = rtq_bytes(SinkLeaderB),
+
+    %% Write keys to the source cluster and verify they replicate to both sinks
+    KeyCount = 1001,
+    write_to_cluster(SourceLeader, 1, KeyCount),
+    read_from_cluster(SinkLeaderA, 1, KeyCount, 0),
+    read_from_cluster(SinkLeaderB, 1, KeyCount, 0),
+
+    %% Verify the realtime queue is still at its initial size on both sink clusters
+    ?assertEqual(SinkAInitialQueueSize, rtq_bytes(SinkLeaderA)),
+    ?assertEqual(SinkBInitialQueueSize, rtq_bytes(SinkLeaderB)).
+
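+%% @doc Return the number of bytes currently held in a node's realtime
+%% queue, as reported by riak_repl2_rtq:status/0.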
+rtq_bytes(Node) ->
+    RtqStatus = rpc:call(Node, riak_repl2_rtq, status, []),
+    proplists:get_value(bytes, RtqStatus).
+
+make_clusters() ->
+    NodeCount = rt_config:get(num_nodes, 6),
+    lager:info("Deploy ~p nodes", [NodeCount]),
+    Nodes = deploy_nodes(NodeCount, true),
+
+    {SourceNodes, SinkNodes} = lists:split(2, Nodes),
+    {SinkANodes, SinkBNodes} = lists:split(2, SinkNodes),
+    lager:info("SinkANodes: ~p", [SinkANodes]),
+    lager:info("SinkBNodes: ~p", [SinkBNodes]),
+
+    lager:info("Build source cluster"),
+    repl_util:make_cluster(SourceNodes),
+
+    lager:info("Build sink cluster A"),
+    repl_util:make_cluster(SinkANodes),
+
+    lager:info("Build sink cluster B"),
+    repl_util:make_cluster(SinkBNodes),
+
+    SourceFirst = hd(SourceNodes),
+    AFirst = hd(SinkANodes),
+    BFirst = hd(SinkBNodes),
+
+    %% Name the clusters
+    repl_util:name_cluster(SourceFirst, "Source"),
+    repl_util:name_cluster(AFirst, "SinkA"),
+    repl_util:name_cluster(BFirst, "SinkB"),
+
+    lager:info("Waiting for convergence."),
+    rt:wait_until_ring_converged(SourceNodes),
+    rt:wait_until_ring_converged(SinkANodes),
+    rt:wait_until_ring_converged(SinkBNodes),
+
+    lager:info("Waiting for transfers to complete."),
+    rt:wait_until_transfers_complete(SourceNodes),
+    rt:wait_until_transfers_complete(SinkANodes),
+    rt:wait_until_transfers_complete(SinkBNodes),
+
+    %% Wait for a leader on the source cluster
+    lager:info("waiting for leader to converge on the source cluster"),
+    ?assertEqual(ok, repl_util:wait_until_leader_converge(SourceNodes)),
+
+    %% Wait for a leader on the first sink cluster
+    lager:info("waiting for leader to converge on sink cluster A"),
+    ?assertEqual(ok, repl_util:wait_until_leader_converge(SinkANodes)),
+
+    %% Wait for a leader on the second sink cluster
+    lager:info("waiting for leader to converge on sink cluster B"),
+    ?assertEqual(ok, repl_util:wait_until_leader_converge(SinkBNodes)),
+
+    SourceLeader = repl_util:get_leader(SourceFirst),
+    ALeader = repl_util:get_leader(AFirst),
+    BLeader = repl_util:get_leader(BFirst),
+
+    %% Uncomment the following 2 lines to verify that pre-2.0 versions
+    %% of Riak behave as expected if cascading writes are disabled for
+    %% the sink clusters.
+    %% disable_cascading(ALeader, SinkANodes),
+    %% disable_cascading(BLeader, SinkBNodes),
+
+    lager:info("Source Leader: ~p SinkALeader: ~p SinkBLeader: ~p", [SourceLeader, ALeader, BLeader]),
+    {SourceLeader, ALeader, BLeader, SourceNodes, SinkANodes, SinkBNodes}.
+
+%% @doc Connect the Source node to the sink cluster listening on Port.
+%% (Currently unused; connect_clusters/3 below is used instead.)
+connect_cluster(Source, Port, Name) ->
+    lager:info("Connecting ~p to ~p for cluster ~p.",
+               [Source, Port, Name]),
+    repl_util:connect_cluster(Source, "127.0.0.1", Port),
+    ?assertEqual(ok, repl_util:wait_for_connection(Source, Name)).
+
+%% @doc Connect two clusters for replication using their respective leader nodes.
+connect_clusters(SourceLeader, SinkLeader, SinkName) ->
+    SinkPort = repl_util:get_port(SinkLeader),
+    lager:info("connect source cluster to ~p on port ~p", [SinkName, SinkPort]),
+    repl_util:connect_cluster(SourceLeader, "127.0.0.1", SinkPort),
+    ?assertEqual(ok, repl_util:wait_for_connection(SourceLeader, SinkName)).
+
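+%% @doc Build the riak_repl app config used for every node. Fullsync is
+%% disabled so that only realtime replication is exercised. Note that
+%% the cascading-writes argument is currently ignored.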
+cluster_conf(_CascadingWrites) ->
+    [
+     {riak_repl,
+      [
+       %% turn off fullsync
+       {fullsync_on_connect, false},
+       {fullsync_interval, disabled},
+       {max_fssource_cluster, 20},
+       {max_fssource_node, 20},
+       {max_fssink_node, 20},
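+       %% Cap each node's realtime queue at 1 MiB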
+       {rtq_max_bytes, 1048576}
+      ]}
+    ].
+
+deploy_nodes(NumNodes, true) ->
+    rt:deploy_nodes(NumNodes, cluster_conf(always));
+deploy_nodes(NumNodes, false) ->
+    rt:deploy_nodes(NumNodes, cluster_conf(never)).
+
+%% @doc Turn on realtime replication from the cluster led by SourceLeader
+%% to the sink cluster named SinkName. The clusters must already have
+%% been named and connected.
+enable_rt(SourceLeader, SourceNodes, SinkName) ->
+    repl_util:enable_realtime(SourceLeader, SinkName),
+    rt:wait_until_ring_converged(SourceNodes),
+
+    repl_util:start_realtime(SourceLeader, SinkName),
+    rt:wait_until_ring_converged(SourceNodes).
+
+%% @doc Disable cascading realtime writes on the cluster led by Leader.
+disable_cascading(Leader, Nodes) ->
+    rpc:call(Leader, riak_repl_console, realtime_cascades, [["never"]]),
+    rt:wait_until_ring_converged(Nodes).
+
+%% @doc Write a series of keys to the cluster and ensure they are all written.
+write_to_cluster(Node, Start, End) ->
+    lager:info("Writing ~p keys to node ~p.", [End - Start + 1, Node]),
+    ?assertEqual([],
+                 repl_util:do_write(Node, Start, End, ?TEST_BUCKET, 1)).
+
+%% @doc Read a series of keys from the cluster, asserting that the
+%% expected number of errors is seen.
+read_from_cluster(Node, Start, End, Errors) ->
+    lager:info("Reading ~p keys from node ~p.", [End - Start + 1, Node]),
+    Res = rt:systest_read(Node, Start, End, ?TEST_BUCKET, 1),
+    ?assertEqual(Errors, length(Res)).