Skip to content

Commit

Permalink
Refactor APIs in the service_manager module.
Browse files Browse the repository at this point in the history
Callers of service_manager:spawn_monitor_rebalance and
service_manager:spawn_monitor_failover trap exits and handle
completions/failures in exactly the same way — absorb the common bits of
the code into the respective APIs.

Change-Id: I157c8f71bb3e3e7b67ec1446c2fb08d0c1e3f016
Reviewed-on: https://review.couchbase.org/c/ns_server/+/183880
Well-Formed: Build Bot <build@couchbase.com>
Reviewed-by: Artem Stemkovski <artem@couchbase.com>
Tested-by: Build Bot <build@couchbase.com>
  • Loading branch information
hareen-kancharla committed Dec 14, 2022
1 parent d14dd7f commit a1e2c9d
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 92 deletions.
26 changes: 2 additions & 24 deletions src/ns_rebalancer.erl
Expand Up @@ -430,30 +430,8 @@ rebalance_topology_aware_service(Service, KeepNodes, EjectNodes, DeltaNodes) ->
ns_rebalance_observer:update_progress(Service, Progress)
end,

misc:with_trap_exit(
fun () ->
{Pid, MRef} = service_manager:spawn_monitor_rebalance(
Service, KeepNodes,
EjectNodes, DeltaNodes, ProgressCallback),

receive
{'EXIT', _Pid, Reason} = Exit ->
?log_debug("Got an exit signal while waiting "
"for the service rebalance to complete. "
"Service: ~p. Exit message: ~p",
[Service, Exit]),

misc:terminate_and_wait(Pid, Reason),
exit(Reason);
{'DOWN', MRef, _, _, Reason} ->
case Reason of
normal ->
ok;
_ ->
exit({service_rebalance_failed, Service, Reason})
end
end
end).
service_manager:with_trap_exit_spawn_monitor_rebalance(
Service, KeepNodes, EjectNodes, DeltaNodes, ProgressCallback, #{}).

get_service_eject_delay(Service) ->
Default =
Expand Down
58 changes: 13 additions & 45 deletions src/service_janitor.erl
Expand Up @@ -107,31 +107,17 @@ do_orchestrate_initial_rebalance(Service) ->
EjectNodes = [],
DeltaNodes = [],

{Pid, MRef} = service_manager:spawn_monitor_rebalance(Service, KeepNodes,
EjectNodes,
DeltaNodes,
ProgressCallback),
receive
{'DOWN', MRef, _, Pid, Reason} ->
case Reason of
normal ->
ok = ns_cluster_membership:set_service_map(
Service, KeepNodes),
ok;
_ ->
{error, {initial_rebalance_failed, Service, Reason}}
end;
{'EXIT', _, Reason} = Exit ->
?log_debug("Received exit message ~p. Terminating initial rebalance",
[Exit]),
misc:terminate_and_wait(Pid, Reason),
exit(Reason)
after
?INITIAL_REBALANCE_TIMEOUT ->
?log_error("Initial rebalance of service `~p` takes too long (timeout ~p)",
[Service, ?INITIAL_REBALANCE_TIMEOUT]),
misc:terminate_and_wait(Pid, shutdown),
{error, {initial_rebalance_timeout, Service}}
Result =
service_manager:with_trap_exit_spawn_monitor_rebalance(
Service, KeepNodes, EjectNodes, DeltaNodes, ProgressCallback,
#{timeout => ?INITIAL_REBALANCE_TIMEOUT}),

case Result of
{error, {service_rebalance_timeout, Service}} ->
?log_error("Initial rebalance of service `~p` takes too long "
"(timeout ~p)", [Service, ?INITIAL_REBALANCE_TIMEOUT]);
_ ->
ok
end.

maybe_complete_pending_failover(Snapshot, Service) ->
Expand Down Expand Up @@ -216,26 +202,8 @@ complete_topology_aware_service_failover(Snapshot, Service) ->
end.

%% Fail over Service on Nodes.
%%
%% Delegates entirely to service_manager, which traps exits, spawns and
%% monitors the worker, and uniformly converts completion/failure (and,
%% when requested via the options map, timeout) into a return value or
%% exit. No extra options are needed here, hence the empty map.
orchestrate_service_failover(Service, Nodes) ->
    service_manager:with_trap_exit_spawn_monitor_failover(
      Service, Nodes, #{}).

handle_results(RVs) ->
NotOKs = [R || R <- RVs, R =/= ok],
Expand Down
82 changes: 59 additions & 23 deletions src/service_manager.erl
Expand Up @@ -12,7 +12,8 @@
-include("cut.hrl").
-include("ns_common.hrl").

-export([spawn_monitor_rebalance/5, spawn_monitor_failover/2]).
-export([with_trap_exit_spawn_monitor_rebalance/6,
with_trap_exit_spawn_monitor_failover/3]).

-record(state, { parent :: pid(),
service_manager :: pid(),
Expand All @@ -25,35 +26,70 @@
progress_callback :: fun ((dict:dict()) -> any()),
op_body :: fun ((#state{}, any()) -> any())}).

spawn_monitor_rebalance(Service, KeepNodes,
EjectNodes, DeltaNodes, ProgressCallback) ->
spawn_monitor(Service, rebalance, KeepNodes,
EjectNodes, DeltaNodes, ProgressCallback,
fun rebalance_op/2).
%% Run a rebalance of Service under misc:with_trap_exit/1.
%%
%% Thin wrapper over with_trap_exit_spawn_monitor/8: spawns and monitors
%% the rebalance worker and absorbs the common completion/failure/exit
%% handling that callers previously duplicated. ProgressCallback is
%% invoked with progress updates by the worker. Opts is a map; a
%% timeout => Millis entry bounds the wait (defaults to infinity
%% downstream).
with_trap_exit_spawn_monitor_rebalance(Service, KeepNodes,
EjectNodes, DeltaNodes, ProgressCallback, Opts) ->
with_trap_exit_spawn_monitor(
Service, rebalance, KeepNodes,
EjectNodes, DeltaNodes, ProgressCallback,
fun rebalance_op/2, Opts).

%% Run a failover of Service on KeepNodes under misc:with_trap_exit/1.
%%
%% Thin wrapper over with_trap_exit_spawn_monitor/8; failover reports no
%% progress, hence the no-op progress callback, and ejects no nodes.
%% Opts is a map; a timeout => Millis entry bounds the wait (defaults to
%% infinity downstream).
with_trap_exit_spawn_monitor_failover(Service, KeepNodes, Opts) ->
    ProgressCallback = fun (_) -> ok end,

    with_trap_exit_spawn_monitor(
      Service, failover, KeepNodes, [], [], ProgressCallback,
      fun rebalance_op/2, Opts).

%% Common driver for service rebalance/failover operations.
%%
%% Traps exits for the duration (misc:with_trap_exit/1), spawns a
%% monitored worker that runs OpBody via run_op/1, then waits for one of:
%%   - an 'EXIT' signal delivered to us: terminate the worker and
%%     propagate the exit reason;
%%   - the worker's 'DOWN': normal completion returns ok, any other
%%     reason exits with {service_<Type>_failed, Service, Reason};
%%   - expiry of the optional timeout (Opts map, timeout => Millis,
%%     default infinity): terminate the worker and return
%%     {error, {service_<Type>_timeout, Service}}.
%%
%% NOTE(review): list_to_atom/1 on a dynamic string is normally unsafe,
%% but Type here is one of the fixed atoms rebalance | failover, so the
%% set of atoms created is bounded.
with_trap_exit_spawn_monitor(
  Service, Type, KeepNodes, EjectNodes, DeltaNodes,
  ProgressCallback, OpBody, Opts) ->
    Parent = self(),

    Timeout = maps:get(timeout, Opts, infinity),

    misc:with_trap_exit(
      fun () ->
              {Pid, MRef} =
                  misc:spawn_monitor(
                    fun () ->
                            State = #state{parent = Parent,
                                           service_manager = self(),
                                           service = Service,
                                           op_type = Type,
                                           keep_nodes = KeepNodes,
                                           eject_nodes = EjectNodes,
                                           delta_nodes = DeltaNodes,
                                           all_nodes = KeepNodes ++ EjectNodes,
                                           progress_callback = ProgressCallback,
                                           op_body = OpBody},
                            run_op(State)
                    end),

              receive
                  {'EXIT', _Pid, Reason} = Exit ->
                      ?log_debug("Got an exit signal while running op: ~p "
                                 "for service: ~p. Exit message: ~p",
                                 [Type, Service, Exit]),
                      misc:terminate_and_wait(Pid, Reason),
                      exit(Reason);
                  {'DOWN', MRef, _, _, Reason} ->
                      case Reason of
                          normal ->
                              ok;
                          _ ->
                              FailedAtom =
                                  list_to_atom("service_" ++
                                                   atom_to_list(Type) ++
                                                   "_failed"),
                              exit({FailedAtom, Service, Reason})
                      end
              after
                  Timeout ->
                      misc:terminate_and_wait(Pid, shutdown),
                      TimeoutAtom = list_to_atom("service_" ++
                                                     atom_to_list(Type) ++
                                                     "_timeout"),
                      {error, {TimeoutAtom, Service}}
              end
      end).

run_op(#state{parent = Parent} = State) ->
Expand Down

0 comments on commit a1e2c9d

Please sign in to comment.