diff --git a/src/riak_repl2_fscoordinator.erl b/src/riak_repl2_fscoordinator.erl index 33ff033d..a6f9f045 100644 --- a/src/riak_repl2_fscoordinator.erl +++ b/src/riak_repl2_fscoordinator.erl @@ -372,12 +372,14 @@ handle_cast(start_fullsync, State) -> handle_cast(stop_fullsync, State) -> % exit all running, cancel all timers, and reset the state. - [erlang:cancel_timer(Tref) || #partition_info{whereis_tref = Tref} <- State#state.whereis_waiting], - [begin - unlink(Pid), - riak_repl2_fssource:stop_fullsync(Pid), - riak_repl2_fssource_sup:disable(node(Pid), Part) - end || #partition_info{index = Part, running_source = Pid} <- State#state.running_sources], + _ = [erlang:cancel_timer(Tref) || #partition_info{whereis_tref = Tref} + <- State#state.whereis_waiting], + _ = [begin + unlink(Pid), + riak_repl2_fssource:stop_fullsync(Pid), + riak_repl2_fssource_sup:disable(node(Pid), Part) + end || #partition_info{index = Part, running_source = Pid} + <- State#state.running_sources], State2 = State#state{ largest_n = undefined, owners = [], @@ -591,7 +593,7 @@ handle_socket_msg({location, Partition, {Node, Ip, Port}}, #state{whereis_waitin State; {value, PartitionInfo, Waiting2} -> Tref = PartitionInfo#partition_info.whereis_tref, - erlang:cancel_timer(Tref), + _ = erlang:cancel_timer(Tref), % we don't know for sure it's no longer busy until we get a busy reply NewBusies = sets:del_element(Node, State#state.busy_nodes), State2 = State#state{whereis_waiting = Waiting2, busy_nodes = NewBusies}, @@ -608,7 +610,7 @@ handle_socket_msg({location_busy, Partition}, #state{whereis_waiting = Waiting} lager:info("anya Partition ~p is too busy on cluster ~p at node ~p", [Partition, State#state.other_cluster, PartitionInfo#partition_info.node]), Tref = PartitionInfo#partition_info.whereis_tref, - erlang:cancel_timer(Tref), + _ = erlang:cancel_timer(Tref), State2 = State#state{whereis_waiting = Waiting2}, Partition2 = PartitionInfo#partition_info{whereis_tref = undefined}, PQueue = State2#state.partition_queue, @@ -624,7 +626,7 @@ handle_socket_msg({location_busy, Partition, Node}, #state{whereis_waiting = Wai {value, PartitionInfo, Waiting2} -> lager:info("Partition ~p is too busy on cluster ~p at node ~p", [Partition, State#state.other_cluster, Node]), Tref = PartitionInfo#partition_info.whereis_tref, - erlang:cancel_timer(Tref), + _ = erlang:cancel_timer(Tref), State2 = State#state{whereis_waiting = Waiting2}, @@ -645,7 +647,7 @@ handle_socket_msg({location_down, Partition}, #state{whereis_waiting=Waiting} = lager:info("Partition ~p is unavailable on cluster ~p", [Partition, State#state.other_cluster]), Tref = PartitionInfo#partition_info.whereis_tref, - erlang:cancel_timer(Tref), + _ = erlang:cancel_timer(Tref), Dropped = [Partition | State#state.dropped], #state{retry_exits = RetryExits, error_exits = ErrorExits} = State, State2 = State#state{whereis_waiting = Waiting2, dropped = Dropped, @@ -659,7 +661,7 @@ handle_socket_msg({location_down, Partition, _Node}, #state{whereis_waiting=Wait State; {value, PartitionInfo, Waiting2} -> Tref = PartitionInfo#partition_info.whereis_tref, - erlang:cancel_timer(Tref), + _ = erlang:cancel_timer(Tref), RetryLimit = app_helper:get_env(riak_repl, max_reserve_retries, ?DEFAULT_RESERVE_RETRIES), lager:info("Partition ~p is unavailable on cluster ~p", [Partition, State#state.other_cluster]), State2 = State#state{whereis_waiting = Waiting2}, diff --git a/src/riak_repl2_fssource.erl b/src/riak_repl2_fssource.erl index 9e80c42b..d193f8af 100644 --- a/src/riak_repl2_fssource.erl +++ b/src/riak_repl2_fssource.erl @@ -101,14 +101,12 @@ init([Partition, IP, Owner]) -> case connect(IP, SupportedStrategy, Partition) of {error, Reason} -> {stop, Reason}; - Result -> - Result + {ok, State}-> + {ok, State#state{owner = Owner}} end; {error, Reason} -> %% the vnode is probably busy. Try again later. - {stop, Reason}; - {ok, State}-> - {ok, State#state{owner = Owner}} + {stop, Reason} end. handle_call({connected, Socket, Transport, _Endpoint, Proto, Props},