Skip to content
Permalink
Browse files
Callback driven API for config_listener behaviour
This implementaion replaces a mix of message driven and callback driven
API for config_listener. In particular it replaces gen_event_EXIT
message with a call to Module:handle_config_stop(Pid, Reason, State).
This fixes the problem of using config:listen_for_changes in supervisor
context where there is no way to handle arbitrary messages.

COUCHDB-2561
  • Loading branch information
iilyak committed Feb 4, 2015
1 parent ee24f3e commit e151ae430454121db74a051b61bd23f582bcc39b
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 11 deletions.
@@ -175,7 +175,7 @@ delete(Section, Key, Persist, Reason) when is_list(Section), is_list(Key) ->


listen_for_changes(CallbackModule, InitialState) ->
config_listener:start(CallbackModule, InitialState).
gen_server:call(?MODULE, {listen_for_changes, CallbackModule, InitialState}).

init(IniFiles) ->
ets:new(?MODULE, [named_table, set, protected]),
@@ -250,14 +250,26 @@ handle_call(reload, _From, Config) ->
ets:delete(?MODULE, K)
end
end, nil, ?MODULE),
{reply, ok, Config}.

{reply, ok, Config};
handle_call({listen_for_changes, CallbackModule, InitialState},
{Subscriber, _}, Config) ->
Reply = config_listener:start(CallbackModule, {Subscriber, InitialState}),
{reply, Reply, Config}.

handle_cast(stop, State) ->
{stop, normal, State};
handle_cast(_Msg, State) ->
{noreply, State}.

handle_info({gen_event_EXIT, {config_listener, Module}, shutdown}, State) ->
couch_log:notice("config_listener(~p) stopped with reason: shutdown~n", [Module]),
{noreply, State};
handle_info({gen_event_EXIT, {config_listener, Module}, normal}, State) ->
couch_log:info("config_listener(~p) stopped with reason: shutdown~n", [Module]),
{noreply, State};
handle_info({gen_event_EXIT, {config_listener, Module}, Reason}, State) ->
couch_log:error("config_listener(~p) stopped with reason: ~p~n", [Module, Reason]),
{noreply, State};
handle_info(Info, State) ->
couch_log:error("config:handle_info Info: ~p~n", [Info]),
{noreply, State}.
@@ -13,7 +13,7 @@
-module(config_listener).

-behaviour(gen_event).
-vsn(1).
-vsn(2).

%% Public interface
-export([start/2]).
@@ -22,11 +22,12 @@
-export([behaviour_info/1]).

%% Required gen_event interface
-export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2,
-export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2,
code_change/3]).

behaviour_info(callbacks) ->
[{handle_config_change,5}];
[{handle_config_change,5},
{handle_config_terminate, 3}];
behaviour_info(_) ->
undefined.

@@ -39,10 +40,10 @@ start(Module, Id, State) ->
init({Module, State}) ->
{ok, {Module, State}}.

handle_event({config_change, Sec, Key, Value, Persist}, {Module, State}) ->
handle_event({config_change, Sec, Key, Value, Persist}, {Module, {From, State}}) ->
case Module:handle_config_change(Sec, Key, Value, Persist, State) of
{ok, NewState} ->
{ok, {Module, NewState}};
{ok, {Module, {From, NewState}}};
remove_handler ->
remove_handler
end.
@@ -53,8 +54,8 @@ handle_call(_Request, St) ->
handle_info(_Info, St) ->
{ok, St}.

terminate(_Reason, _St) ->
ok.
terminate(Reason, {Module, {Subscriber, State}}) ->
Module:handle_config_terminate(Subscriber, Reason, State).

code_change(_OldVsn, St, _Extra) ->
{ok, St}.
@@ -12,6 +12,10 @@

-module(config_tests).

-beahiour(config_listener).

-export([handle_config_change/5, handle_config_terminate/3]).

-include_lib("couch/include/couch_eunit.hrl").
-include_lib("couch/include/couch_db.hrl").

@@ -50,6 +54,12 @@ setup(Chain) ->
setup_empty() ->
setup([]).

setup_config_listener() ->
setup(),
{ok, Pid} = spawn_listener(),
config:listen_for_changes(?MODULE, {Pid, self(), []}),
{Pid, self()}.

teardown({Pid, _}) ->
stop_listener(Pid),
[application:stop(App) || App <- ?DEPS];
@@ -59,6 +69,25 @@ teardown(_) ->
teardown(_, _) ->
[application:stop(App) || App <- ?DEPS].

handle_config_change("remove_handler", _Key, _Value, _Persist, _State) ->
remove_handler;
handle_config_change("update_state", Key, _Value, _Persist, {Listener, Subscriber, Items}) ->
NewState = {Listener, Subscriber, [Key|Items]},
ok = reply(NewState, NewState),
{ok, NewState};
handle_config_change(Section, Key, Value, Persist, State) ->
ok = reply({{Section, Key, Value, Persist}, State}, State),
{ok, State}.
handle_config_terminate(Self, Reason, State) ->
ok = reply({stop, Self, Reason, State}, State),
ok.

reply(Reply, {Listener, _, _}) ->
call_sync(Listener, {set, Reply}).

wait_reply(Listener) ->
call_sync(Listener, get).

config_test_() ->
{
"CouchDB config tests",
@@ -68,7 +97,8 @@ config_test_() ->
config_del_tests(),
config_override_tests(),
config_persistent_changes_tests(),
config_no_files_tests()
config_no_files_tests(),
config_listener_behaviour_tests()
]
}.

@@ -170,6 +200,22 @@ config_no_files_tests() ->
}
}.

config_listener_behaviour_tests() ->
{
"Test config_listener behaviour",
{
foreach,
fun setup_config_listener/0, fun teardown/1,
[
fun should_handle_value_change/1,
fun should_pass_correct_state_to_handle_config_change/1,
fun should_pass_correct_state_to_handle_config_terminate/1,
fun should_pass_subscriber_pid_to_handle_config_terminate/1,
fun should_not_call_handle_config_after_related_process_death/1,
fun should_remove_handler_when_requested/1
]
}
}.

should_load_all_configs() ->
?_assert(length(config:all()) > 0).
@@ -316,3 +362,92 @@ should_create_persistent_option() ->
ok = config:set("httpd", "bind_address", "127.0.0.1"),
config:get("httpd", "bind_address")
end).

should_handle_value_change({Pid, _}) ->
?_test(begin
ok = config:set("httpd", "port", "80", false),
?assertMatch({{"httpd", "port", "80", false}, _}, wait_reply(Pid))
end).
should_pass_correct_state_to_handle_config_change({Pid, _}) ->
?_test(begin
ok = config:set("httpd", "port", "80", false),
?assertMatch({_, {Pid, _, []}}, wait_reply(Pid)),
ok = config:set("update_state", "foo", "any", false),
?assertMatch({Pid, _, ["foo"]}, wait_reply(Pid))
end).
should_pass_correct_state_to_handle_config_terminate({Pid, _}) ->
?_test(begin
%% prepare some state
ok = config:set("httpd", "port", "80", false),
?assertMatch({_, {Pid, _, []}}, wait_reply(Pid)),
ok = config:set("update_state", "foo", "any", false),
?assertMatch({Pid, _, ["foo"]}, wait_reply(Pid)),

%% remove handler
ok = config:set("remove_handler", "any", "any", false),
Reply = wait_reply(Pid),
?assertMatch({stop, _, remove_handler, _}, Reply),

{stop, Subscriber, _Reason, State} = Reply,
?assert(is_pid(Subscriber)),
?assertMatch({Pid, Subscriber, ["foo"]}, State)
end).
should_pass_subscriber_pid_to_handle_config_terminate({Pid, SubscriberPid}) ->
?_test(begin
ok = config:set("remove_handler", "any", "any", false),
Reply = wait_reply(Pid),
?assertMatch({stop, _, remove_handler, _}, Reply),

{stop, Subscriber, _Reason, _State} = Reply,
?assertMatch(SubscriberPid, Subscriber)
end).
should_not_call_handle_config_after_related_process_death({Pid, _}) ->
?_test(begin
ok = config:set("remove_handler", "any", "any", false),
Reply = wait_reply(Pid),
?assertMatch({stop, _, remove_handler, _}, Reply),

ok = config:set("httpd", "port", "80", false),
?assertMatch(undefined, wait_reply(Pid))
end).
should_remove_handler_when_requested({Pid, _}) ->
?_test(begin
?assertEqual(1, n_handlers()),

ok = config:set("remove_handler", "any", "any", false),
Reply = wait_reply(Pid),
?assertMatch({stop, _, remove_handler, _}, Reply),

?assertEqual(0, n_handlers())
end).

call_sync(Listener, Msg) ->
Ref = make_ref(),
Listener ! {Ref, self(), Msg},
receive
{ok, Ref, Reply} -> Reply
after ?TIMEOUT ->
throw({error, {timeout, call_sync}})
end.

spawn_listener() ->
{ok, spawn(fun() -> loop(undefined) end)}.

stop_listener(Listener) ->
call_sync(Listener, stop).

loop(State) ->
receive
{Ref, From, stop} ->
From ! {ok, Ref, ok},
ok;
{Ref, From, {set, Value}} ->
From ! {ok, Ref, ok},
loop(Value);
{Ref, From, get} ->
From ! {ok, Ref, State},
loop(undefined)
end.

n_handlers() ->
length(gen_event:which_handlers(config_event)).

0 comments on commit e151ae4

Please sign in to comment.