Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
Merge remote branch 'cloudant:3102-fix-config_subscription'
This closes #25

Signed-off-by: ILYA Khlopotov <iilyak@ca.ibm.com>
  • Loading branch information
iilyak committed Aug 23, 2016
2 parents 15615b2 + 432264a commit 252467cb4a27637090b5f9006483f5b7ab551699
Showing 2 changed files with 138 additions and 51 deletions.
@@ -12,7 +12,7 @@

-module(mem3_shards).
-behaviour(gen_server).
-vsn(2).
-vsn(3).
-behaviour(config_listener).

-export([init/1, terminate/2, code_change/3]).
@@ -36,6 +36,7 @@
-define(DBS, mem3_dbs).
-define(SHARDS, mem3_shards).
-define(ATIMES, mem3_atimes).
-define(RELISTEN_DELAY, 5000).

start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
@@ -173,12 +174,10 @@ handle_config_change("mem3", "shards_db", _DbName, _, _) ->
handle_config_change(_, _, _, _, _) ->
{ok, nil}.

handle_config_terminate(_, stop, _) -> ok;
handle_config_terminate(_, _, _) ->
spawn(fun() ->
timer:sleep(5000),
config:listen_for_changes(?MODULE, nil)
end).
handle_config_terminate(_, stop, _) ->
ok;
handle_config_terminate(_Server, _Reason, _State) ->
erlang:send_after(?RELISTEN_DELAY, whereis(?MODULE), restart_config_listener).

init([]) ->
ets:new(?SHARDS, [
@@ -235,6 +234,9 @@ handle_info({'DOWN', _, _, Pid, Reason}, #st{changes_pid=Pid}=St) ->
handle_info({start_listener, Seq}, St) ->
{NewPid, _} = spawn_monitor(fun() -> listen_for_changes(Seq) end),
{noreply, St#st{changes_pid=NewPid}};
handle_info(restart_config_listener, State) ->
ok = config:listen_for_changes(?MODULE, nil),
{noreply, State};
handle_info(_Msg, St) ->
{noreply, St}.

@@ -12,7 +12,7 @@

-module(mem3_sync_event_listener).
-behavior(couch_event_listener).
-behavior(config_listener).
-vsn(1).

-export([
start_link/0
@@ -26,13 +26,14 @@
handle_info/2
]).

-export([
handle_config_change/5,
handle_config_terminate/3
]).

-include_lib("mem3/include/mem3.hrl").

-ifdef(TEST).
-define(RELISTEN_DELAY, 500).
-else.
-define(RELISTEN_DELAY, 5000).
-endif.

-record(state, {
nodes,
shards,
@@ -59,7 +60,7 @@ start_link() ->
couch_event_listener:start_link(?MODULE, [], [all_dbs]).

init(_) ->
config:listen_for_changes(?MODULE, undefined),
ok = subscribe_for_config(),
Delay = config:get_integer("mem3", "sync_delay", 5000),
Frequency = config:get_integer("mem3", "sync_frequency", 500),
Buckets = lists:duplicate(Delay div Frequency + 1, sets:new()),
@@ -110,48 +111,32 @@ handle_cast(Msg, St) ->

handle_info(timeout, St) ->
maybe_push_shards(St);
handle_info({config_change, "mem3", "sync_delay", Value, _}, St) ->
set_config(set_delay, Value, "ignoring bad value for mem3.sync_delay"),
maybe_push_shards(St);
handle_info({config_change, "mem3", "sync_frequency", Value, _}, St) ->
set_config(set_frequency, Value, "ignoring bad value for mem3.sync_frequency"),
maybe_push_shards(St);
handle_info({gen_event_EXIT, _Handler, _Reason}, St) ->
erlang:send_after(?RELISTEN_DELAY, self(), restart_config_listener),
maybe_push_shards(St);
handle_info(restart_config_listener, St) ->
ok = subscribe_for_config(),
maybe_push_shards(St);
handle_info({get_state, Ref, Caller}, St) ->
Caller ! {Ref, St},
{ok, St};
handle_info(Msg, St) ->
couch_log:notice("unexpected info to mem3_sync_event_listener: ~p", [Msg]),
maybe_push_shards(St).

handle_config_change("mem3", "sync_delay", Delay0, _, St) ->
try list_to_integer(Delay0) of
Delay1 ->
couch_event_listener:cast(
?MODULE,
{set_delay, Delay1}
)
catch error:badarg ->
couch_log:warning(
"ignoring bad value for mem3.sync_delay: ~p",
[Delay0]
)
end,
{ok, St};
handle_config_change("mem3", "sync_frequency", Frequency0, _, St) ->
try list_to_integer(Frequency0) of
Frequency1 ->
couch_event_listener:cast(
?MODULE,
{set_frequency, Frequency1}
)
set_config(Cmd, Value, Error) ->
try list_to_integer(Value) of
IntegerValue ->
couch_event_listener:cast(self(), {Cmd, IntegerValue})
catch error:badarg ->
couch_log:warning(
"ignoring bad value for mem3.sync_frequency: ~p",
[Frequency0]
)
end,
{ok, St};
handle_config_change(_, _, _, _, St) ->
{ok, St}.

handle_config_terminate(_, stop, _) -> ok;
handle_config_terminate(_Server, _Reason, St) ->
Fun = fun() ->
timer:sleep(5000),
config:listen_for_changes(?MODULE, St)
end,
spawn(Fun).
couch_log:warning("~s: ~p", [Error, Value])
end.

bucket_shard(ShardName, [B|Bs]=Buckets0) ->
case waiting(ShardName, Buckets0) of
@@ -222,3 +207,103 @@ push_shard(ShardName) ->
catch error:database_does_not_exist ->
ok
end.

subscribe_for_config() ->
config:subscribe_for_changes([
{"mem3", "sync_delay"},
{"mem3", "sync_frequency"}
]).

-ifdef(TEST).
-include_lib("couch/include/couch_eunit.hrl").

setup() ->
ok = meck:new(couch_event, [passthrough]),
ok = meck:expect(couch_event, register_all, ['_'], ok),

ok = meck:new(config_notifier, [passthrough]),
ok = meck:expect(config_notifier, handle_event, [
{[{'_', '_', "error", '_'}, '_'], meck:raise(throw, raised_error)},
{['_', '_'], meck:passthrough()}
]),

application:start(config),
{ok, Pid} = ?MODULE:start_link(),
erlang:unlink(Pid),
meck:wait(config_notifier, subscribe, '_', 1000),
Pid.

teardown(Pid) ->
exit(Pid, shutdown),
application:stop(config),
(catch meck:unload(couch_event)),
(catch meck:unload(config_notifier)),
ok.

subscribe_for_config_test_() ->
{
"Subscrive for configuration changes",
{
foreach,
fun setup/0, fun teardown/1,
[
fun should_set_sync_delay/1,
fun should_set_sync_frequency/1,
fun should_restart_listener/1,
fun should_terminate/1
]
}
}.

should_set_sync_delay(Pid) ->
?_test(begin
config:set("mem3", "sync_delay", "123", false),
?assertMatch(#state{delay = 123}, capture(Pid)),
ok
end).

should_set_sync_frequency(Pid) ->
?_test(begin
config:set("mem3", "sync_frequency", "456", false),
?assertMatch(#state{frequency = 456}, capture(Pid)),
ok
end).

should_restart_listener(Pid) ->
?_test(begin
meck:reset(config_notifier),
config:set("mem3", "sync_frequency", "error", false),

meck:wait(config_notifier, subscribe, '_', 1000),
ok
end).

should_terminate(Pid) ->
?_test(begin
?assert(is_process_alive(Pid)),

EventMgr = whereis(config_event),

RestartFun = fun() -> exit(EventMgr, kill) end,
test_util:with_process_restart(config_event, RestartFun),

?assertNot(is_process_alive(EventMgr)),
?assertNot(is_process_alive(Pid)),
?assert(is_process_alive(whereis(config_event))),
ok
end).

capture(Pid) ->
Ref = make_ref(),
WaitFun = fun() ->
Pid ! {get_state, Ref, self()},
receive
{Ref, State} -> State
after 0 ->
wait
end
end,
test_util:wait(WaitFun).


-endif.

0 comments on commit 252467c

Please sign in to comment.