Skip to content

Commit

Permalink
Handle timeouts / downs when computing state (#40)
Browse files Browse the repository at this point in the history
* [RTI-2893] Handle timeouts / downs when computing state

* [RTI-2893] Refactor
  • Loading branch information
elbrujohalcon committed Sep 18, 2018
1 parent d83936c commit 43a2aed
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 36 deletions.
61 changes: 31 additions & 30 deletions src/mero.erl
Original file line number Diff line number Diff line change
Expand Up @@ -334,41 +334,42 @@ shard_crc32(Key, ShardSize) ->

%% @doc: Returns the state of the sockets of a Cluster
state(ClusterName) ->
{Links, Monitors, Free, Connected, Connecting, Failed, MessageQueue} =
lists:foldr(
fun
({Cluster, _, _, Pool, _},
{ALinks, AMonitors, AFree, AConnected, AConnecting, AFailed, AMessageQueue})
when (Cluster == ClusterName) ->
begin
St = mero_pool:state(Pool),
{
ALinks + proplists:get_value(links, St),
AMonitors + proplists:get_value(monitors, St),
AFree + proplists:get_value(free, St),
AConnected + proplists:get_value(num_connected, St),
AConnecting + proplists:get_value(num_connecting, St),
AFailed + proplists:get_value(num_failed_connecting, St),
AMessageQueue + proplists:get_value(message_queue_len, St)}

end;
(_, Acc) ->
Acc
end, {0, 0, 0, 0, 0, 0, 0}, mero_cluster:child_definitions()),
[
{links, Links},
{monitors, Monitors},
{free, Free},
{connected, Connected},
{connecting, Connecting},
{failed, Failed},
{message_queue_len, MessageQueue}].

ZeroState = [
{links, 0},
{monitors, 0},
{free, 0},
{connected, 0},
{connecting, 0},
{failed, 0},
{message_queue_len, 0}
],
lists:foldr(
fun
({Cluster, _, _, Pool, _}, Acc) when (Cluster == ClusterName) ->
inc_state(mero_pool:state(Pool), Acc);
(_, Acc) ->
Acc
end, ZeroState, mero_cluster:child_definitions()).

%% @doc: Returns the state of the sockets for all clusters
state() ->
[{Cluster, state(Cluster)} || Cluster <- mero_cluster:clusters()].

inc_state({error, _}, Acc) ->
Acc;
inc_state(St, Acc) ->
lists:map(
fun ({connected, AccV}) ->
{connected, AccV + proplists:get_value(num_connected, St)};
({connecting, AccV}) ->
{connecting, AccV + proplists:get_value(num_connecting, St)};
({failed, AccV}) ->
{failed, AccV + proplists:get_value(num_failed_connecting, St)};
({K, AccV}) ->
{K, AccV + proplists:get_value(K, St)}
end, Acc).


deep_state(ClusterName) ->
F = fun
({Cluster, _, _, Pool, _}, Acc) when (Cluster == ClusterName) ->
Expand Down
97 changes: 91 additions & 6 deletions test/mero_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,14 @@
multiget_undefineds/1,
set/1,
undefined_counter/1,
cas/1,
mincrease_counter/1,
cas/1,
madd/1,
mset/1,
mcas/1
mcas/1,
state_ok/1,
state_error/1,
state_timeout/1
]).

-define(HOST, "127.0.0.1").
Expand Down Expand Up @@ -88,7 +90,10 @@ groups() ->
multiget_undefineds,
set,
undefined_counter,
cas
cas,
state_ok,
state_error,
state_timeout
]
},
{binary_protocol, [shuffle, {repeat_until_any_fail, 1}],
Expand All @@ -109,7 +114,10 @@ groups() ->
cas,
madd,
mset,
mcas
mcas,
state_ok,
state_error,
state_timeout
]
}
].
Expand Down Expand Up @@ -146,15 +154,21 @@ init_per_group(binary_protocol, Config) ->
end_per_group(_GroupName, _Config) ->
ok.

init_per_testcase(_Module, Conf) ->
init_per_testcase(TestCase, Conf) when TestCase == state_error; TestCase == state_timeout ->
meck:new(mero_pool, [passthrough]),
init_per_testcase(default, Conf);
init_per_testcase(_TestCase, Conf) ->
application:load(mero),
ClusterConfig = ?config(cluster_config, Conf),
Pids = mero_test_util:start_server(ClusterConfig, 1, 1, 30000, 90000),
mero_conf:timeout_write(1000),
mero_conf:timeout_read(1000),
[{pids, Pids} | Conf].

end_per_testcase(_Module, Conf) ->
end_per_testcase(TestCase, Conf) when TestCase == state_error; TestCase == state_timeout ->
meck:unload(mero_pool),
end_per_testcase(default, Conf);
end_per_testcase(_TestCase, Conf) ->
{ok, Pids} = proplists:get_value(pids, Conf),
mero_test_util:stop_servers(Pids),
ok = application:stop(mero),
Expand Down Expand Up @@ -463,6 +477,77 @@ mcas(_) ->
),
?assertEqual(Expected, mero:mcas(cluster, NUpdates, 5000)).

state_ok(_) ->
State = mero:state(),
?assertEqual(
[
{connected, 1},
{connecting, 0},
{failed, 0},
{free, 1},
{links, 3},
{message_queue_len, 0},
{monitors, 0}
], lists:sort(proplists:get_value(cluster2, State))),
?assertEqual(
[
{connected, 2},
{connecting, 0},
{failed, 0},
{free, 2},
{links, 6},
{message_queue_len, 0},
{monitors, 0}
], lists:sort(proplists:get_value(cluster, State))).

state_error(_) ->
meck:expect(mero_pool, state, 1, {error, down}),
State = mero:state(),
?assertEqual(
[
{connected, 0},
{connecting, 0},
{failed, 0},
{free, 0},
{links, 0},
{message_queue_len, 0},
{monitors, 0}
], lists:sort(proplists:get_value(cluster2, State))),
?assertEqual(
[
{connected, 0},
{connecting, 0},
{failed, 0},
{free, 0},
{links, 0},
{message_queue_len, 0},
{monitors, 0}
], lists:sort(proplists:get_value(cluster, State))).

state_timeout(_) ->
meck:expect(mero_pool, state, 1, {error, timeout}),
State = mero:state(),
?assertEqual(
[
{connected, 0},
{connecting, 0},
{failed, 0},
{free, 0},
{links, 0},
{message_queue_len, 0},
{monitors, 0}
], lists:sort(proplists:get_value(cluster2, State))),
?assertEqual(
[
{connected, 0},
{connecting, 0},
{failed, 0},
{free, 0},
{links, 0},
{message_queue_len, 0},
{monitors, 0}
], lists:sort(proplists:get_value(cluster, State))).


%%%=============================================================================
%%% Internal functions
Expand Down

0 comments on commit 43a2aed

Please sign in to comment.