Skip to content

Commit

Permalink
Removed code duplication for service add/drop
Browse files Browse the repository at this point in the history
  • Loading branch information
lordnull committed Nov 8, 2012
1 parent 530d0aa commit 2a0907c
Showing 1 changed file with 58 additions and 85 deletions.
143 changes: 58 additions & 85 deletions src/riak_core_node_watcher.erl
Original file line number Diff line number Diff line change
Expand Up @@ -175,42 +175,18 @@ handle_call(get_avsn, _From, State) ->
{reply, State#state.avsn, State};

handle_call({service_up, Id, Pid}, _From, State) ->
%% Update the set of active services locally
Services = ordsets:add_element(Id, State#state.services),
S2 = State#state { services = Services },

%% remove any existing health checks
Healths = case orddict:find(Id, State#state.health_checks) of
error ->
State#state.health_checks;
{ok, Check} ->
health_fsm(remove, Id, Check),
orddict:erase(Id, State#state.health_checks)
end,
S2 = remove_health_check(Id, State),

%% Remove any existing mrefs for this service
delete_service_mref(Id),
S3 = add_service(Id, Pid, S2),

%% Setup a monitor for the Pid representing this service
Mref = erlang:monitor(process, Pid),
erlang:put(Mref, Id),
erlang:put(Id, Mref),

%% Update our local ETS table and broadcast
S3 = local_update(S2),
{reply, ok, update_avsn(S3#state{health_checks = Healths})};
{reply, ok, S3};

handle_call({service_up, Id, Pid, MFA, Options}, From, State) ->
%% update the active set of services if needed.
{reply, _, State1} = handle_call({service_up, Id, Pid}, From, State),

%% uninstall old health check
case orddict:find(Id, State#state.health_checks) of
{ok, OldCheck} ->
health_fsm(remove, Id, OldCheck);
error ->
ok
end,
State2 = remove_health_check(Id, State1),

%% install the health check
CheckInterval = proplists:get_value(check_interval, Options, 60),
Expand All @@ -226,30 +202,16 @@ handle_call({service_up, Id, Pid, MFA, Options}, From, State) ->
max_callback_failures = proplists:get_value(max_callback_failures, Options, 3),
interval_tref = IntervalTref
},
Healths = orddict:store(Id, CheckRec, State1#state.health_checks),
{reply, ok, State1#state{health_checks = Healths}};
Healths = orddict:store(Id, CheckRec, State2#state.health_checks),
{reply, ok, State2#state{health_checks = Healths}};

handle_call({service_down, Id}, _From, State) ->
%% Update the set of active services locally
Services = ordsets:del_element(Id, State#state.services),
S2 = State#state { services = Services },

%% Remove any existing mrefs for this service
delete_service_mref(Id),

%% Update local ETS table and broadcast
S3 = local_update(S2),

%% Remove health check if any
case orddict:find(Id, State#state.health_checks) of
error ->
ok;
{ok, Check} ->
health_fsm(remove, Id, Check)
end,
S2 = remove_health_check(Id, State),

Healths = orddict:erase(Id, S3#state.health_checks),
{reply, ok, update_avsn(S3#state{health_checks = Healths})};
S3 = drop_service(Id, S2),

{reply, ok, S3};

handle_call({node_status, Status}, _From, State) ->
Transition = {State#state.status, Status},
Expand Down Expand Up @@ -316,19 +278,13 @@ handle_info({'DOWN', Mref, _, _Pid, _Info}, State) ->
delete_service_mref(Id),

%% remove any health checks in place
case orddict:find(Id, State#state.health_checks) of
error ->
ok;
{ok, Health} ->
health_fsm(remove, Id, Health)
end,
Healths = orddict:erase(Id, State#state.health_checks),
S2 = remove_health_check(Id, State),

%% Update our list of active services and ETS table
Services = ordsets:del_element(Id, State#state.services),
S2 = State#state { services = Services },
local_update(S2),
{noreply, update_avsn(S2#state{health_checks = Healths})}
S3 = S2#state { services = Services },
local_update(S3),
{noreply, update_avsn(S3)}
end;

handle_info({'EXIT', Pid, _Cause} = Msg, State) ->
Expand Down Expand Up @@ -561,6 +517,36 @@ internal_get_nodes(Service) ->
[]
end.

add_service(ServiceId, Pid, State) ->
%% Update the set of active services locally
Services = ordsets:add_element(ServiceId, State#state.services),
S2 = State#state { services = Services },

%% Remove any existing mrefs for this service
delete_service_mref(ServiceId),

%% Setup a monitor for the Pid representing this service
Mref = erlang:monitor(process, Pid),
erlang:put(Mref, ServiceId),
erlang:put(ServiceId, Mref),

%% Update our local ETS table and broadcast
S3 = local_update(S2),
update_avsn(S3).

drop_service(ServiceId, State) ->
%% Update the set of active services locally
Services = ordsets:del_element(ServiceId, State#state.services),
S2 = State#state { services = Services },

%% Remove any existing mrefs for this service
delete_service_mref(ServiceId),

%% Update local ETS table and broadcast
S3 = local_update(S2),

update_avsn(S3).

handle_check_msg(_Msg, undefined, State) ->
State;
handle_check_msg(_Msg, _ServiceId, #state{status = down} = State) ->
Expand All @@ -584,36 +570,23 @@ handle_check_return({ok, Check}, ServiceId, State) ->
handle_check_return({up, Check}, ServiceId, State) ->
#health_check{service_pid = Pid} = Check,
Healths = orddict:store(ServiceId, Check, State#state.health_checks),

%% Update the set of active services locally
Services = ordsets:add_element(ServiceId, State#state.services),
S2 = State#state { services = Services },

%% Remove any existing mrefs for this service
delete_service_mref(ServiceId),

%% Setup a monitor for the Pid representing this service
Mref = erlang:monitor(process, Pid),
erlang:put(Mref, ServiceId),
erlang:put(ServiceId, Mref),

%% Update our local ETS table and broadcast
S3 = local_update(S2),
update_avsn(S3#state{health_checks = Healths});
S2 = State#state{health_checks = Healths},
add_service(ServiceId, Pid, S2);
handle_check_return({down, Check}, ServiceId, State) ->
Healths = orddict:store(ServiceId, Check, State#state.health_checks),
S2 = State#state{health_checks = Healths},
drop_service(ServiceId, S2).

%% Update the set of active services locally
Services = ordsets:del_element(ServiceId, State#state.services),
S2 = State#state { services = Services },

%% Remove any existing mrefs for this service
delete_service_mref(ServiceId),

%% Update local ETS table and broadcast
S3 = local_update(S2),

update_avsn(S3#state{health_checks = Healths}).
remove_health_check(ServiceId, State) ->
#state{health_checks = Healths} = State,
Healths2 = case orddict:find(ServiceId, Healths) of
error ->
Healths;
{ok, Check} ->
health_fsm(remove, ServiceId, Check),
orddict:erase(ServiceId, Healths)
end,
State#state{health_checks = Healths2}.

%% health checks are an fsm to make mental modeling easier.
%% There are X states:
Expand Down

0 comments on commit 2a0907c

Please sign in to comment.