Skip to content

Commit

Permalink
Fixes #189 -- include socket peer info in lager messages
Browse files Browse the repository at this point in the history
  • Loading branch information
slfritchie committed Sep 27, 2013
1 parent 679efc7 commit 05dd3ab
Showing 1 changed file with 23 additions and 9 deletions.
32 changes: 23 additions & 9 deletions src/riak_core_handoff_receiver.erl
Expand Up @@ -30,6 +30,7 @@
terminate/2, code_change/3]).

-record(state, {sock :: port(),
peer :: term(),
ssl_opts :: [] | list(),
tcp_mod :: atom(),
timeout_len :: non_neg_integer(),
Expand Down Expand Up @@ -62,27 +63,31 @@ handle_call({set_socket, Socket0}, _From, State = #state{ssl_opts = SslOpts}) ->
Socket = if SslOpts /= [] ->
{ok, Skt} = ssl:ssl_accept(Socket0, SslOpts, 30*1000),
ok = ssl:setopts(Skt, SockOpts),
Peer = safe_peername(Skt, ssl),
Skt;
true ->
ok = inet:setopts(Socket0, SockOpts),
Peer = safe_peername(Socket0, inet),
Socket0
end,
{reply, ok, State#state { sock = Socket }}.
{reply, ok, State#state { sock = Socket, peer = Peer }}.

handle_info({tcp_closed,_Socket},State=#state{partition=Partition,count=Count}) ->
handle_info({tcp_closed,_Socket},State=#state{partition=Partition,count=Count,
peer=Peer}) ->
lager:info("Handoff receiver for partition ~p exited after processing ~p"
" objects", [Partition, Count]),
" objects from ~p", [Partition, Count, Peer]),
{stop, normal, State};
handle_info({tcp_error, _Socket, _Reason}, State=#state{partition=Partition,count=Count}) ->
handle_info({tcp_error, _Socket, _Reason}, State=#state{partition=Partition,count=Count,
peer=Peer}) ->
lager:info("Handoff receiver for partition ~p exited after processing ~p"
" objects", [Partition, Count]),
" objects from ~p", [Partition, Count, Peer]),
{stop, normal, State};
handle_info({tcp, Socket, Data}, State) ->
[MsgType|MsgData] = Data,
case catch(process_message(MsgType, MsgData, State)) of
{'EXIT', Reason} ->
lager:error("Handoff receiver for partition ~p exited abnormally after "
"processing ~p objects: ~p", [State#state.partition, State#state.count, Reason]),
"processing ~p objects from ~p: ~p", [State#state.partition, State#state.count, State#state.peer, Reason]),
{stop, normal, State};
NewState when is_record(NewState, state) ->
InetMod = if NewState#state.ssl_opts /= [] -> ssl;
Expand All @@ -99,12 +104,13 @@ handle_info({ssl, Socket, Data}, State) ->
handle_info({tcp, Socket, Data}, State);
handle_info(timeout, State) ->
lager:error("Handoff receiver for partition ~p timed out after "
"processing ~p objects.", [State#state.partition, State#state.count]),
"processing ~p objects from ~p.", [State#state.partition, State#state.count, State#state.peer]),
{stop, normal, State}.

process_message(?PT_MSG_INIT, MsgData, State=#state{vnode_mod=VNodeMod}) ->
process_message(?PT_MSG_INIT, MsgData, State=#state{vnode_mod=VNodeMod,
peer=Peer}) ->
<<Partition:160/integer>> = MsgData,
lager:info("Receiving handoff data for partition ~p:~p", [VNodeMod, Partition]),
lager:info("Receiving handoff data for partition ~p:~p from ~p", [VNodeMod, Partition, Peer]),
{ok, VNode} = riak_core_vnode_master:get_vnode_pid(Partition, VNodeMod),
Data = [{mod_src_tgt, {VNodeMod, undefined, Partition}},
{vnode_pid, VNode}],
Expand Down Expand Up @@ -148,3 +154,11 @@ handle_cast(_Msg, State) -> {noreply, State}.
terminate(_Reason, _State) -> ok.

code_change(_OldVsn, State, _Extra) -> {ok, State}.

safe_peername(Skt, Mod) ->
case Mod:peername(Skt) of
{ok, Info} ->
Info;
_ ->
{unknown, unknown} % Real info is {Addr, Port}
end.

0 comments on commit 05dd3ab

Please sign in to comment.