Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Synchronous VNode Startup (AZ1011) #121

Merged
merged 6 commits into from

3 participants

@rustyio

Fixes regression where a service was declared "up" before all vnodes were started.

This pull request modifies riak_core_ring_handler:ensure_vnodes_started/N so that it waits longer for the application to start up, raises an error if the application fails to start, starts the vnodes for the application, and only then declares the service up.

Depends upon the following pull requests:

p.s. - Yes, this is a dirty, dirty hack. In my opinion, there are issues of complexity and unclear responsibilities in riak_core_node_watcher, riak_core_node_watcher_events, riak_core_ring_events, and riak_core_ring_handler. This is due for refactoring.

@jonmeredith
Owner

Thanks Rusty. I'd like to think through consequences of where we mark services up before we merge - I'll look when I can.

@rustyio rustyio closed this
@rustyio rustyio reopened this
@jonmeredith
Owner

After Rusty humored me over IM, there are a couple of items to resolve:
1) Move the riak_core_ring_events:force_sync_update() call into riak_core:register so apps using core don't need to see its dirty laundry
2) Convert kv/pipe to use the newer riak_core:register() function instead of register_vnode_module.

This change does couple the names of services to the names of their applications, which is undesirable; however, pragmatism wins out until we can refactor.

@beerriot beerriot commented on the diff
src/riak_core_ring_handler.erl
((6 lines not shown))
catch error:badarg ->
exit(normal)
end,
- wait_for_app(App, 100, 100),
- [Mod:start_vnode(I) || I <- Startable],
- exit(normal)
+
+ %% Let the app finish starting...
+ case riak_core:wait_for_application(App) of
+ ok ->
+ %% Start the vnodes.
+ [Mod:start_vnode(I) || I <- Startable],
+
+ %% Mark the service as up.
+ SupName = list_to_atom(atom_to_list(App) ++ "_sup"),
+ SupPid = erlang:whereis(SupName),

I think the _sup suffix standard is a good practice, but another option is to use two undocumented functions:

{SupPid, _AppMod} = application_master:get_child(application_controller:get_master(riak_kv)).

Probably better to stick with how you have it already, but I found it too odd that OTP didn't expose something as simple as application:whereis(AppName) to resist digging.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@jonmeredith
Owner

+1 merge. Tested under load and discovered issues with WM routes being registered before sup started. Will submit a separate pull request for that.

@rustyio rustyio merged commit c762dc7 into 1.0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Dec 14, 2011
  1. @rustyio

    Change startup so that the application and vnodes start before the se…

    rustyio authored
    …rvice is declared as 'up'.
    
    AZ1011
Commits on Dec 15, 2011
  1. @rustyio

    Update riak_core:register/N to trigger a ring_update and start vnodes…

    rustyio authored
    …; move wait_for_app/N into riak_core, add wait_for_service/N, add logging to both.
    
    AZ1011
  2. @rustyio
  3. @rustyio
  4. @rustyio
  5. @rustyio
This page is out of date. Refresh to see the latest.
View
52 src/riak_core.erl
@@ -22,11 +22,16 @@
-module(riak_core).
-export([stop/0, stop/1, join/1, join/4, remove/1, down/1, leave/0,
remove_from_cluster/1]).
--export([register_vnode_module/1, vnode_modules/0]).
+-export([vnode_modules/0]).
-export([register/1, register/2, bucket_fixups/0]).
-export([add_guarded_event_handler/3, add_guarded_event_handler/4]).
-export([delete_guarded_event_handler/3]).
+-export([wait_for_application/1, wait_for_service/1]).
-compile({no_auto_import,[register/2]}).
+
+-define(WAIT_PRINT_INTERVAL, (60 * 1000)).
+-define(WAIT_POLL_INTERVAL, 100).
+
%% @spec stop() -> ok
%% @doc Stop the riak application and the calling process.
stop() -> stop("riak stop requested").
@@ -283,6 +288,7 @@ register(_App, []) ->
%% the ring.
{ok, _R} = riak_core_ring_manager:ring_trans(fun(R,_A) -> {new_ring, R} end,
undefined),
+ riak_core_ring_events:force_sync_update(),
ok;
register(App, [{bucket_fixup, FixupMod}|T]) ->
register_mod(get_app(App, FixupMod), FixupMod, bucket_fixups),
@@ -291,9 +297,6 @@ register(App, [{vnode_module, VNodeMod}|T]) ->
register_mod(get_app(App, VNodeMod), VNodeMod, vnode_modules),
register(App, T).
-register_vnode_module(VNodeMod) when is_atom(VNodeMod) ->
- register_mod(get_app(undefined, VNodeMod), VNodeMod, vnode_modules).
-
register_mod(App, Module, Type) when is_atom(Module), is_atom(Type) ->
case application:get_env(riak_core, Type) of
undefined ->
@@ -355,3 +358,44 @@ app_for_module([{App,_,_}|T], Mod) ->
true -> {ok, App};
false -> app_for_module(T, Mod)
end.
+
+
+wait_for_application(App) ->
+ wait_for_application(App, 0).
+wait_for_application(App, Elapsed) ->
+ case lists:keymember(App, 1, application:which_applications()) of
+ true when Elapsed == 0 ->
+ ok;
+ true when Elapsed > 0 ->
+ lager:info("Wait complete for application ~p (~p seconds)", [App, Elapsed div 1000]),
+ ok;
+ false ->
+ %% Possibly print a notice.
+ ShouldPrint = Elapsed rem ?WAIT_PRINT_INTERVAL == 0,
+ case ShouldPrint of
+ true -> lager:info("Waiting for application ~p to start (~p seconds).", [App, Elapsed div 1000]);
+ false -> skip
+ end,
+ timer:sleep(?WAIT_POLL_INTERVAL),
+ wait_for_application(App, Elapsed + ?WAIT_POLL_INTERVAL)
+ end.
+
+wait_for_service(Service) ->
+ wait_for_service(Service, 0).
+wait_for_service(Service, Elapsed) ->
+ case lists:member(Service, riak_core_node_watcher:services(node())) of
+ true when Elapsed == 0 ->
+ ok;
+ true when Elapsed > 0 ->
+ lager:info("Wait complete for service ~p (~p seconds)", [Service, Elapsed div 1000]),
+ ok;
+ false ->
+ %% Possibly print a notice.
+ ShouldPrint = Elapsed rem ?WAIT_PRINT_INTERVAL == 0,
+ case ShouldPrint of
+ true -> lager:info("Waiting for service ~p to start (~p seconds)", [Service, Elapsed div 1000]);
+ false -> skip
+ end,
+ timer:sleep(?WAIT_POLL_INTERVAL),
+ wait_for_service(Service, Elapsed + ?WAIT_POLL_INTERVAL)
+ end.
View
33 src/riak_core_ring_handler.erl
@@ -114,16 +114,30 @@ ensure_vnodes_started({App,Mod}, Ring) ->
RegName = list_to_atom(
"riak_core_ring_handler_ensure_"
++ atom_to_list(Mod)),
- try register(RegName, self())
+ try erlang:register(RegName, self())
catch error:badarg ->
exit(normal)
end,
- wait_for_app(App, 100, 100),
- [Mod:start_vnode(I) || I <- Startable],
- exit(normal)
+
+ %% Let the app finish starting...
+ case riak_core:wait_for_application(App) of
+ ok ->
+ %% Start the vnodes.
+ [Mod:start_vnode(I) || I <- Startable],
+
+ %% Mark the service as up.
+ SupName = list_to_atom(atom_to_list(App) ++ "_sup"),
+ SupPid = erlang:whereis(SupName),

I think the _sup suffix standard is a good practice, but another option is to use two undocumented functions:

{SupPid, _AppMod} = application_master:get_child(application_controller:get_master(riak_kv)).

Probably better to stick with how you have it already, but I found it too odd that OTP didn't expose something as simple as application:whereis(AppName) to resist digging.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+ riak_core_node_watcher:service_up(App, SupPid),
+ exit(normal);
+ {error, Reason} ->
+ lager:critical("Failed to start application: ~p", [App]),
+ throw({error, Reason})
+ end
end),
Startable.
+
startable_vnodes(Mod, Ring) ->
AllMembers = riak_core_ring:all_members(Ring),
case {length(AllMembers), hd(AllMembers) =:= node()} of
@@ -144,14 +158,3 @@ startable_vnodes(Mod, Ring) ->
[RO | riak_core_ring:my_indices(Ring)]
end
end.
-
-wait_for_app(_, 0, _) ->
- bummer;
-wait_for_app(App, Count, Sleep) ->
- case lists:keymember(App, 1, application:which_applications()) of
- true ->
- ok;
- false ->
- timer:sleep(Sleep),
- wait_for_app(App, Count - 1, Sleep)
- end.
View
5 src/riak_core_sup.erl
@@ -31,7 +31,8 @@
-export([init/1]).
%% Helper macro for declaring children of supervisor
--define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
+-define(CHILD(I, Type, Timeout), {I, {I, start_link, []}, permanent, Timeout, Type, [I]}).
+-define(CHILD(I, Type), ?CHILD(I, Type, 5000)).
-define (IF (Bool, A, B), if Bool -> A; true -> B end).
%% ===================================================================
@@ -59,7 +60,7 @@ init([]) ->
Children = lists:flatten(
[?CHILD(riak_core_sysmon_minder, worker),
?CHILD(riak_core_stat, worker),
- ?CHILD(riak_core_vnode_sup, supervisor),
+ ?CHILD(riak_core_vnode_sup, supervisor, 305000),
?CHILD(riak_core_eventhandler_sup, supervisor),
?CHILD(riak_core_handoff_manager, worker),
?CHILD(riak_core_handoff_listener, worker),
Something went wrong with that request. Please try again.