Skip to content
This repository has been archived by the owner on Aug 9, 2021. It is now read-only.

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
dergraf committed Oct 21, 2015
1 parent e229b5a commit 48363e0
Showing 1 changed file with 10 additions and 32 deletions.
42 changes: 10 additions & 32 deletions src/vmq_cluster_node.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
bytes_dropped={os:timestamp(), 0},
bytes_send={os:timestamp(), 0}}).

-define(REMONITOR, 5000).
-define(RECONNECT, 1000).

%%%===================================================================
Expand Down Expand Up @@ -140,23 +139,7 @@ handle_message({NetEv, _}, #state{reconnect_tref=TRef} = State)
%% we're already reconnecting
TRef
end,
State#state{reconnect_tref=NewTRef, reachable=false};
handle_message({nodedown, Node}, #state{node=Node} = State) ->
erlang:send_after(?REMONITOR, self(), remonitor),
State#state{reachable=false};
handle_message(remonitor, #state{node=Node, reachable=Reachable} = State) ->
case Reachable of
true ->
ignore;
false ->
erlang:monitor_node(Node, true)
end,
case net_adm:ping(Node) of
pong ->
State#state{reachable=true};
_ ->
State#state{reachable=false}
end;
State#state{reachable=false, reconnect_tref=NewTRef};
handle_message(reconnect, #state{reachable=false} = State) ->
connect(State#state{reconnect_tref=undefined});
handle_message(Msg, #state{node=Node, reachable=Reachable} = State) ->
Expand All @@ -173,9 +156,10 @@ maybe_flush(#state{pending=Pending} = State) ->
State
end.

internal_flush(#state{reachable=false} = State) -> State;
internal_flush(#state{pending=[]} = State) -> State;
internal_flush(#state{pending=Pending, node=Node, transport=Transport,
reconnect_tref=TRef, socket=Socket, bytes_send={{M, S, _}, V}} = State) ->
socket=Socket, bytes_send={{M, S, _}, V}} = State) ->
L = iolist_size(Pending),
Msg = [<<"vmq-send", L:32>>|lists:reverse(Pending)],
case Transport:send(Socket, Msg) of
Expand All @@ -190,15 +174,9 @@ internal_flush(#state{pending=Pending, node=Node, transport=Transport,
end,
State#state{pending=[], bytes_send=NewBytesSend};
{error, Reason} ->
NewTRef =
case TRef of
undefined ->
lager:warning("can't send ~p bytes to ~p due to ~p, reconnect!",
lager:warning("can't send ~p bytes to ~p due to ~p, reconnect!",
[iolist_size(Pending), Node, Reason]),
reconnect_timer();
_ -> TRef
end,
State#state{reachable=false, reconnect_tref=NewTRef}
State#state{reachable=false, reconnect_tref=reconnect_timer()}
end.

connect(#state{node=RemoteNode} = State) ->
Expand All @@ -213,16 +191,16 @@ connect(#state{node=RemoteNode} = State) ->
Msg = [<<"vmq-connect">>, <<L:32, NodeName/binary>>],
case Transport:send(Socket, Msg) of
ok ->
self() ! remonitor, %% let remonitor decide if we are reachable
State#state{socket=Socket, transport=Transport};
State#state{socket=Socket, transport=Transport,
%% !!! remote node is reachable
reachable=true};
{error, Reason} ->
lager:warning("can't initiate connect to cluster node ~p due to ~p", [RemoteNode, Reason]),
State#state{reachable=false}
State#state{reachable=false, reconnect_tref=reconnect_timer()}
end;
{error, Reason} ->
lager:warning("can't connect to cluster node ~p due to ~p", [RemoteNode, Reason]),
TRef = reconnect_timer(),
State#state{reachable=false, reconnect_tref=TRef}
State#state{reachable=false, reconnect_tref=reconnect_timer()}
end;
{badrpc, nodedown} ->
%% we don't scream.. vmq_cluster_mon screams
Expand Down

0 comments on commit 48363e0

Please sign in to comment.