Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ring events monitoring by riak_core_node_watcher #399

Closed
wants to merge 8 commits into from
1 change: 1 addition & 0 deletions ebin/riak_core.app
Expand Up @@ -54,6 +54,7 @@
riak_core_repair,
riak_core_ring,
riak_core_ring_events,
riak_core_ring_events_sup,
riak_core_ring_handler,
riak_core_ring_manager,
riak_core_ring_util,
Expand Down
62 changes: 58 additions & 4 deletions src/riak_core_ring_events.erl
Expand Up @@ -31,17 +31,23 @@
add_callback/1,
add_sup_callback/1,
add_guarded_callback/1,
delete_handler/2,
ring_update/1,
force_update/0,
ring_sync_update/1,
force_sync_update/0]).
force_sync_update/0,
get_pid/0]).

%% gen_event callbacks
-export([init/1, handle_event/2, handle_call/2,
handle_info/2, terminate/2, code_change/3]).

-record(state, { callback }).

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.

%% ===================================================================
%% API functions
%% ===================================================================
Expand All @@ -59,13 +65,22 @@ add_guarded_handler(Handler, Args) ->
riak_core:add_guarded_event_handler(?MODULE, Handler, Args).

add_callback(Fn) when is_function(Fn) ->
gen_event:add_handler(?MODULE, {?MODULE, make_ref()}, [Fn]).
HandlerName = {?MODULE, make_ref()},
gen_event:add_handler(?MODULE, HandlerName, [Fn]),
HandlerName.

add_sup_callback(Fn) when is_function(Fn) ->
gen_event:add_sup_handler(?MODULE, {?MODULE, make_ref()}, [Fn]).
HandlerName = {?MODULE, make_ref()},
gen_event:add_sup_handler(?MODULE, HandlerName, [Fn]),
HandlerName.

add_guarded_callback(Fn) when is_function(Fn) ->
riak_core:add_guarded_event_handler(?MODULE, {?MODULE, make_ref()}, [Fn]).
HandlerName = {?MODULE, make_ref()},
riak_core:add_guarded_event_handler(?MODULE, HandlerName, [Fn]),
HandlerName.

delete_handler(HandlerName, ReasonArgs) ->
gen_event:delete_handler(?MODULE, HandlerName, ReasonArgs).

force_update() ->
{ok, Ring} = riak_core_ring_manager:get_my_ring(),
Expand All @@ -81,6 +96,9 @@ force_sync_update() ->
ring_sync_update(Ring) ->
gen_event:sync_notify(?MODULE, {ring_update, Ring}).

get_pid() ->
whereis(?MODULE).

%% ===================================================================
%% gen_event callbacks
%% ===================================================================
Expand All @@ -106,3 +124,39 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

-ifdef(TEST).

delete_handler_test_() ->
RingMgr = riak_core_ring_manager,
{setup,
fun() ->
meck:new(RingMgr, [passthrough]),
meck:expect(RingMgr, get_my_ring,
fun() -> {ok, fake_ring_here} end),
?MODULE:start_link()
end,
fun(_) ->
meck:unload(RingMgr)
end,
[
fun () ->
Name = ?MODULE:add_sup_callback(
fun(Arg) -> RingMgr:test_dummy_func(Arg) end),

BogusRing = bogus_ring_stand_in,
?MODULE:ring_update(BogusRing),
ok = ?MODULE:delete_handler(Name, unused),
{error, _} = ?MODULE:delete_handler(Name, unused),

%% test_dummy_func is called twice: once by add_sup_callback()
%% and once by ring_update().
[
{_, {RingMgr, get_my_ring, _}, _},
{_, {RingMgr, test_dummy_func, _}, _},
{_, {RingMgr, test_dummy_func, [BogusRing]}, _}
] = meck:history(RingMgr),
ok
end
]}.

-endif. % TEST
60 changes: 60 additions & 0 deletions src/riak_core_ring_events_sup.erl
@@ -0,0 +1,60 @@
%% -------------------------------------------------------------------
%%
%% riak_core: Core Riak Application
%%
%% Copyright (c) 2007-2013 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------

-module(riak_core_ring_events_sup).

-behaviour(supervisor).

%% API
-export([start_link/0, start_riak_core_node_watcher/0]).

%% Supervisor callbacks
-export([init/1]).

%% Helper macro for declaring children of supervisor
-define(CHILD(I, Type, Timeout), {I, {I, start_link, []}, permanent, Timeout, Type, [I]}).
-define(CHILD(I, Type), ?CHILD(I, Type, 5000)).

%% ===================================================================
%% API functions
%% ===================================================================

start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).

riak_core_node_watcher_childspec() ->
?CHILD(riak_core_node_watcher, worker).

start_riak_core_node_watcher() ->
supervisor:start_child(?MODULE, riak_core_node_watcher_childspec()).

%% ===================================================================
%% Supervisor callbacks
%% ===================================================================

init([]) ->
Children = lists:flatten(
[
?CHILD(riak_core_ring_events, worker)
]),

{ok, {{one_for_all, 9999, 10}, Children}}.
4 changes: 4 additions & 0 deletions src/riak_core_ring_manager.erl
Expand Up @@ -97,6 +97,7 @@
}).

-export([setup_ets/1, cleanup_ets/1, set_ring_global/1]). %% For EUnit testing
-export([test_dummy_func/1]).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
Expand Down Expand Up @@ -622,6 +623,9 @@ prune_write_ring(Ring, State) ->
State2 = set_ring(Ring, State),
State2.

test_dummy_func(_Arg) ->
ok.

%% ===================================================================
%% Unit tests
%% ===================================================================
Expand Down
9 changes: 7 additions & 2 deletions src/riak_core_sup.erl
Expand Up @@ -50,14 +50,19 @@ init([]) ->
[?CHILD(riak_core_sysmon_minder, worker),
?CHILD(riak_core_vnode_sup, supervisor, 305000),
?CHILD(riak_core_eventhandler_sup, supervisor),
?CHILD(riak_core_ring_events, worker),
?CHILD(riak_core_ring_events_sup, supervisor),
?CHILD(riak_core_ring_manager, worker),
?CHILD(riak_core_metadata_manager, worker),
?CHILD(riak_core_metadata_hashtree, worker),
?CHILD(riak_core_broadcast, worker),
?CHILD(riak_core_vnode_proxy_sup, supervisor),
?CHILD(riak_core_node_watcher_events, worker),
?CHILD(riak_core_node_watcher, worker),
%% riak_core_node_watcher is no longer started here. It is
%% now a *dynamic* child of the riak_core_ring_events_sup
%% and started by riak_core_vnode_manager:init/1. If it is
%% started by riak_core_ring_events_sup during its startup,
%% it is too early, and Riak rings never settle after a ring
%% change.
?CHILD(riak_core_vnode_manager, worker),
?CHILD(riak_core_capability, worker),
?CHILD(riak_core_handoff_sup, supervisor),
Expand Down
3 changes: 3 additions & 0 deletions src/riak_core_vnode_manager.erl
Expand Up @@ -206,6 +206,9 @@ get_all_vnodes(Mod) ->

%% @private
init(_State) ->
%% See riak_core_sup:init/1 for why this call is here.
{ok, _Pid} = riak_core_ring_events_sup:start_riak_core_node_watcher(),

{ok, Ring, CHBin} = riak_core_ring_manager:get_raw_ring_chashbin(),
Mods = [Mod || {_, Mod} <- riak_core:vnode_modules()],
State = #state{forwarding=dict:new(), handoff=dict:new(),
Expand Down