Skip to content
Permalink
Browse files
Merge pull request #8 from cloudant/update_handle_config_terminate
Reconfigure IOQ on config update
  • Loading branch information
iilyak committed Feb 19, 2019
2 parents aa88b1e + b5b801a commit 89fe01fdada8ffdd7afc8aed7521f8c1c99d23f8
Showing 2 changed files with 150 additions and 0 deletions.
@@ -12,11 +12,16 @@

-module(ioq_sup).
-behaviour(supervisor).
-vsn(1).
-behaviour(config_listener).
-export([start_link/0, init/1]).
-export([get_ioq2_servers/0]).
-export([handle_config_change/5, handle_config_terminate/3]).
-export([processes/1]).

%% Helper macro for declaring children of supervisor
-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
-define(CHILD_WITH_ARGS(I, Type, Args), {I, {I, start_link, Args}, permanent, 5000, Type, [I]}).

start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
@@ -27,6 +32,7 @@ init([]) ->
{ok, {
{one_for_one, 5, 10},
[
?CHILD_WITH_ARGS(config_listener_mon, worker, [?MODULE, nil]),
?CHILD(ioq_server, worker),
?CHILD(ioq_osq, worker)
| IOQ2Children
@@ -47,3 +53,42 @@ get_ioq2_servers() ->
lists:map(fun(I) ->
list_to_atom("ioq_server_" ++ integer_to_list(I))
end, lists:seq(1, erlang:system_info(schedulers))).

handle_config_change("ioq", _Key, _Val, _Persist, St) ->
gen_server:cast(ioq_server, update_config),
{ok, St};
handle_config_change("ioq2" ++ _, _Key, _Val, _Persist, St) ->
lists:foreach(fun({_Id, Pid}) ->
gen_server:call(Pid, update_config)
end, processes(ioq2)),
{ok, St};
handle_config_change(_Sec, _Key, _Val, _Persist, St) ->
{ok, St}.

handle_config_terminate(_Server, _Reason, _State) ->
gen_server:cast(ioq_server, update_config),
spawn(fun() ->
lists:foreach(fun({_Id, Pid}) ->
gen_server:call(Pid, update_config)
end, processes(ioq2))
end),
ok.

processes(ioq2) ->
filter_children("^ioq_server_.*$");
processes(ioq) ->
filter_children("^ioq_server$");
processes(config_listener_mon) ->
filter_children("^config_listener_mon$");
processes(Arg) ->
{error, [
{expected_one_of, [ioq, ioq2, config_listener_mon]},
{got, Arg}]}.

filter_children(RegExp) ->
lists:filtermap(fun({Id, P, _, _}) ->
case re:run(atom_to_list(Id), RegExp) of
{match, _} -> {true, {Id, P}};
_ -> false
end
end, supervisor:which_children(?MODULE)).
@@ -30,6 +30,111 @@
}
]).

config_update_test_() ->
{
"Test config updates",
{
foreach,
fun() -> test_util:start_applications([config, ioq]) end,
fun test_util:stop_applications/1,
[
fun t_restart_config_listener/1,
fun t_update_ioq_config/1,
fun t_update_ioq2_config/1,
fun t_update_ioq_config_on_listener_restart/1,
fun t_update_ioq2_config_on_listener_restart/1
]
}
}.

t_restart_config_listener(_) ->
?_test(begin
[{_, ConfigMonitor}] = ioq_sup:processes(config_listener_mon),
?assert(is_process_alive(ConfigMonitor)),
test_util:stop_sync(ConfigMonitor),
?assertNot(is_process_alive(ConfigMonitor)),
NewConfigMonitor = test_util:wait(fun() ->
case ioq_sup:processes(config_listener_mon) of
[] -> wait;
[{_, Pid}] -> Pid
end
end),
?assert(is_process_alive(NewConfigMonitor))
end).

t_update_ioq_config(_) ->
?_test(begin
[{_, IoqServer}] = ioq_sup:processes(ioq),
gen_server:call(IoqServer, {set_concurrency, 10}),
?assertEqual(10, gen_server:call(IoqServer, get_concurrency)),
?assert(is_process_alive(IoqServer)),
config:set("ioq", "concurrency", "200", false),
?assertNotEqual(timeout, test_util:wait(fun() ->
case gen_server:call(IoqServer, get_concurrency) of
200 -> 200;
_ -> wait
end
end)),
?assert(is_process_alive(IoqServer))
end).

t_update_ioq_config_on_listener_restart(_) ->
?_test(begin
[{_, IoqServer}] = ioq_sup:processes(ioq),
DefaultConcurrency = gen_server:call(IoqServer, get_concurrency),
gen_server:call(IoqServer, {set_concurrency, 10}),
?assertEqual(10, gen_server:call(IoqServer, get_concurrency)),
?assert(is_process_alive(IoqServer)),

[{_, ConfigMonitor}] = ioq_sup:processes(config_listener_mon),
?assert(is_process_alive(ConfigMonitor)),
test_util:stop_sync(ConfigMonitor),

?assertNotEqual(timeout, test_util:wait(fun() ->
case gen_server:call(IoqServer, get_concurrency) of
DefaultConcurrency -> ok;
_ -> wait
end
end)),
?assert(is_process_alive(IoqServer))
end).

t_update_ioq2_config(_) ->
?_test(begin
[{_, IoqServer} | _] = ioq_sup:processes(ioq2),
gen_server:call(IoqServer, {set_concurrency, 10}),
?assertEqual(10, gen_server:call(IoqServer, get_concurrency)),
?assert(is_process_alive(IoqServer)),
config:set("ioq2", "concurrency", "200", false),
?assertNotEqual(timeout, test_util:wait(fun() ->
case gen_server:call(IoqServer, get_concurrency) of
200 -> 200;
_ -> wait
end
end)),
?assert(is_process_alive(IoqServer))
end).

t_update_ioq2_config_on_listener_restart(_) ->
?_test(begin
[{_, IoqServer} | _] = ioq_sup:processes(ioq2),
DefaultConcurrency = gen_server:call(IoqServer, get_concurrency),
gen_server:call(IoqServer, {set_concurrency, 10}),
?assertEqual(10, gen_server:call(IoqServer, get_concurrency)),
?assert(is_process_alive(IoqServer)),

[{_, ConfigMonitor}] = ioq_sup:processes(config_listener_mon),
?assert(is_process_alive(ConfigMonitor)),
test_util:stop_sync(ConfigMonitor),

?assertNotEqual(timeout, test_util:wait(fun() ->
case gen_server:call(IoqServer, get_concurrency) of
DefaultConcurrency -> ok;
_ -> wait
end
end)),
?assert(is_process_alive(IoqServer))
end).

priorities_test_() ->
{ok, ShardP} = ioq_config:build_shard_priorities(?SHARDS_CONFIG),

0 comments on commit 89fe01f

Please sign in to comment.