Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base fork: basho/riak_core
base: e6de7ca246
...
head fork: basho/riak_core
compare: c762dc709f
Checking mergeability… Don't worry, you can still create the pull request.
  • 7 commits
  • 3 files changed
  • 0 commit comments
  • 1 contributor
View
52 src/riak_core.erl
@@ -22,11 +22,16 @@
-module(riak_core).
-export([stop/0, stop/1, join/1, join/4, remove/1, down/1, leave/0,
remove_from_cluster/1]).
--export([register_vnode_module/1, vnode_modules/0]).
+-export([vnode_modules/0]).
-export([register/1, register/2, bucket_fixups/0]).
-export([add_guarded_event_handler/3, add_guarded_event_handler/4]).
-export([delete_guarded_event_handler/3]).
+-export([wait_for_application/1, wait_for_service/1]).
-compile({no_auto_import,[register/2]}).
+
+-define(WAIT_PRINT_INTERVAL, (60 * 1000)).
+-define(WAIT_POLL_INTERVAL, 100).
+
%% @spec stop() -> ok
%% @doc Stop the riak application and the calling process.
stop() -> stop("riak stop requested").
@@ -283,6 +288,7 @@ register(_App, []) ->
%% the ring.
{ok, _R} = riak_core_ring_manager:ring_trans(fun(R,_A) -> {new_ring, R} end,
undefined),
+ riak_core_ring_events:force_sync_update(),
ok;
register(App, [{bucket_fixup, FixupMod}|T]) ->
register_mod(get_app(App, FixupMod), FixupMod, bucket_fixups),
@@ -291,9 +297,6 @@ register(App, [{vnode_module, VNodeMod}|T]) ->
register_mod(get_app(App, VNodeMod), VNodeMod, vnode_modules),
register(App, T).
-register_vnode_module(VNodeMod) when is_atom(VNodeMod) ->
- register_mod(get_app(undefined, VNodeMod), VNodeMod, vnode_modules).
-
register_mod(App, Module, Type) when is_atom(Module), is_atom(Type) ->
case application:get_env(riak_core, Type) of
undefined ->
@@ -355,3 +358,44 @@ app_for_module([{App,_,_}|T], Mod) ->
true -> {ok, App};
false -> app_for_module(T, Mod)
end.
+
+
+wait_for_application(App) ->
+ wait_for_application(App, 0).
+wait_for_application(App, Elapsed) ->
+ case lists:keymember(App, 1, application:which_applications()) of
+ true when Elapsed == 0 ->
+ ok;
+ true when Elapsed > 0 ->
+ lager:info("Wait complete for application ~p (~p seconds)", [App, Elapsed div 1000]),
+ ok;
+ false ->
+ %% Possibly print a notice.
+ ShouldPrint = Elapsed rem ?WAIT_PRINT_INTERVAL == 0,
+ case ShouldPrint of
+ true -> lager:info("Waiting for application ~p to start (~p seconds).", [App, Elapsed div 1000]);
+ false -> skip
+ end,
+ timer:sleep(?WAIT_POLL_INTERVAL),
+ wait_for_application(App, Elapsed + ?WAIT_POLL_INTERVAL)
+ end.
+
+wait_for_service(Service) ->
+ wait_for_service(Service, 0).
+wait_for_service(Service, Elapsed) ->
+ case lists:member(Service, riak_core_node_watcher:services(node())) of
+ true when Elapsed == 0 ->
+ ok;
+ true when Elapsed > 0 ->
+ lager:info("Wait complete for service ~p (~p seconds)", [Service, Elapsed div 1000]),
+ ok;
+ false ->
+ %% Possibly print a notice.
+ ShouldPrint = Elapsed rem ?WAIT_PRINT_INTERVAL == 0,
+ case ShouldPrint of
+ true -> lager:info("Waiting for service ~p to start (~p seconds)", [Service, Elapsed div 1000]);
+ false -> skip
+ end,
+ timer:sleep(?WAIT_POLL_INTERVAL),
+ wait_for_service(Service, Elapsed + ?WAIT_POLL_INTERVAL)
+ end.
View
33 src/riak_core_ring_handler.erl
@@ -114,16 +114,30 @@ ensure_vnodes_started({App,Mod}, Ring) ->
RegName = list_to_atom(
"riak_core_ring_handler_ensure_"
++ atom_to_list(Mod)),
- try register(RegName, self())
+ try erlang:register(RegName, self())
catch error:badarg ->
exit(normal)
end,
- wait_for_app(App, 100, 100),
- [Mod:start_vnode(I) || I <- Startable],
- exit(normal)
+
+ %% Let the app finish starting...
+ case riak_core:wait_for_application(App) of
+ ok ->
+ %% Start the vnodes.
+ [Mod:start_vnode(I) || I <- Startable],
+
+ %% Mark the service as up.
+ SupName = list_to_atom(atom_to_list(App) ++ "_sup"),
+ SupPid = erlang:whereis(SupName),
+ riak_core_node_watcher:service_up(App, SupPid),
+ exit(normal);
+ {error, Reason} ->
+ lager:critical("Failed to start application: ~p", [App]),
+ throw({error, Reason})
+ end
end),
Startable.
+
startable_vnodes(Mod, Ring) ->
AllMembers = riak_core_ring:all_members(Ring),
case {length(AllMembers), hd(AllMembers) =:= node()} of
@@ -144,14 +158,3 @@ startable_vnodes(Mod, Ring) ->
[RO | riak_core_ring:my_indices(Ring)]
end
end.
-
-wait_for_app(_, 0, _) ->
- bummer;
-wait_for_app(App, Count, Sleep) ->
- case lists:keymember(App, 1, application:which_applications()) of
- true ->
- ok;
- false ->
- timer:sleep(Sleep),
- wait_for_app(App, Count - 1, Sleep)
- end.
View
5 src/riak_core_sup.erl
@@ -31,7 +31,8 @@
-export([init/1]).
%% Helper macro for declaring children of supervisor
--define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
+-define(CHILD(I, Type, Timeout), {I, {I, start_link, []}, permanent, Timeout, Type, [I]}).
+-define(CHILD(I, Type), ?CHILD(I, Type, 5000)).
-define (IF (Bool, A, B), if Bool -> A; true -> B end).
%% ===================================================================
@@ -59,7 +60,7 @@ init([]) ->
Children = lists:flatten(
[?CHILD(riak_core_sysmon_minder, worker),
?CHILD(riak_core_stat, worker),
- ?CHILD(riak_core_vnode_sup, supervisor),
+ ?CHILD(riak_core_vnode_sup, supervisor, 305000),
?CHILD(riak_core_eventhandler_sup, supervisor),
?CHILD(riak_core_handoff_manager, worker),
?CHILD(riak_core_handoff_listener, worker),

No commit comments for this range

Something went wrong with that request. Please try again.