Skip to content

Commit

Permalink
Merge branch 'master' of github.com:fastip/gen_lb
Browse files Browse the repository at this point in the history
  • Loading branch information
joewilliams committed Apr 4, 2011
2 parents 94b5279 + 66115ab commit 39c2740
Showing 1 changed file with 11 additions and 10 deletions.
21 changes: 11 additions & 10 deletions src/gen_lb.erl
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ round_robin(Nodes, Request, [Node|Context]=C) ->
%% @end
%%--------------------------------------------------------------------
init({Seeds, RemoteService, SelectNode, Context}) ->
Self = self(),
process_flag(trap_exit, true),
net_kernel:monitor_nodes(true, [{node_type,all}]),
KnownNodes = query_cluster(Seeds),
Expand All @@ -147,14 +146,14 @@ handle_call(state, _From, State) ->
{reply, State, State};
handle_call(nodes, _From, State) ->
{reply, State#state.nodes, State};
handle_call({call, Request, Timeout}, From = {_, Ref}, State = #state{context=Context,remote_service=RemoteService,nodes=Nodes,seeds=Seeds,pending=Pending,handlers=Handlers,select_node=SelectNode}) ->
handle_call({call, Request, Timeout}, From = {_, Ref}, State = #state{context=Context,remote_service=RemoteService,nodes=Nodes,pending=Pending,handlers=Handlers,select_node=SelectNode}) ->
case sets:size(Nodes) of
0 ->
error_logger:error_msg("Cluster is down. Queueing request.~n"),
Pend = #pending{type=call,request=Request,ref=Ref,from=From,time=now(),timeout=Timeout},
{noreply, State#state{pending=[Pend|Pending]}};
_ ->
{Pid, Context2} = call_handler(From, self(), RemoteService, Nodes, Ref, Request, Context, SelectNode, Timeout),
{Pid, Context2} = call_handler(From, RemoteService, Nodes, Ref, Request, Context, SelectNode, Timeout),
{noreply, State#state{handlers=dict:store(Pid,Ref,Handlers),context=Context2}}
end;
handle_call(stop, _From, State) ->
Expand All @@ -169,7 +168,7 @@ handle_call(stop, _From, State) ->
%%--------------------------------------------------------------------
handle_cast(stop, State) ->
{stop, normal, State};
handle_cast({cast, Request}, State = #state{context=Context,remote_service=RemoteService,nodes=Nodes,seeds=Seeds,pending=Pending,handlers=Handlers,select_node=SelectNode}) ->
handle_cast({cast, Request}, State = #state{context=Context,remote_service=RemoteService,nodes=Nodes,pending=Pending,select_node=SelectNode}) ->
case sets:size(Nodes) of
0 ->
error_logger:error_msg("Cluster is down. Queueing request.~n"),
Expand All @@ -187,14 +186,14 @@ handle_cast({cast, Request}, State = #state{context=Context,remote_service=Remot
%% @doc Handling all non call/cast messages
%% @end
%%--------------------------------------------------------------------
handle_info(heartbeat, State=#state{seeds=Seeds,handlers=Handlers,nodes=Nodes,beats_up=Beats}) ->
handle_info(heartbeat, State=#state{seeds=Seeds,nodes=Nodes,beats_up=Beats}) ->
KnownNodes = query_cluster([random_set_element(Nodes, Seeds)]),
spawn_connect_nodes(sets:to_list(KnownNodes)),
BeatsUp = case sets:size(Nodes) of
0 -> 0;
_ -> Beats+1
end,
{noreply, State#state{beats_up=Beats+1}};
{noreply, State#state{beats_up=BeatsUp}};
handle_info({'EXIT', Handler, Reason}, State=#state{handlers=Handlers}) ->
case Reason of
normal -> ok;
Expand All @@ -206,6 +205,8 @@ handle_info({nodeup,Node,_}, State) ->
{noreply, nodeup(Node,State)};
handle_info({nodedown,Node,_}, State) ->
{noreply, nodedown(Node,State)};
handle_info({cluster,_,Nodes}, State = #state{nodes=NodeSet}) ->
{noreply, State#state{nodes=sets:union(NodeSet, sets:from_list(Nodes))}};
handle_info(_Info, State) ->
error_logger:info_msg("Load balancer did not understand: ~p~n", [_Info]),
{noreply, State}.
Expand All @@ -232,7 +233,7 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
nodeup(Node, State=#state{nodes=Nodes,pending=Pending,handlers=Handlers,remote_service=RemoteService}) ->
nodeup(Node, State=#state{nodes=Nodes,pending=Pending,remote_service=RemoteService}) ->
error_logger:info_msg("~p: nodeup ~p~n", [?MODULE,Node]),
case verify_membership(Node, RemoteService) of
ok -> case length(Pending) of
Expand Down Expand Up @@ -299,7 +300,7 @@ cast_handler(RemoteService, Nodes, Request, Context, SelectNode) ->
{RemoteService,Node} ! Request,
Context2.

call_handler(From, Proxy, RemoteService, Nodes, Ref, Request, Context, SelectNode, Timeout) ->
call_handler(From, RemoteService, Nodes, Ref, Request, Context, SelectNode, Timeout) ->
{Node,Context2} = SelectNode(Nodes,Request,Context),
Pid = spawn_link(fun() ->
error_logger:info_msg("sending call to ~p~n", [{RemoteService, Node}]),
Expand All @@ -324,10 +325,10 @@ flush_pending(State = #state{pending=[#pending{type=call,request=Request,ref=Ref
error_logger:info_msg("Request ~p thrown out due to age.", [Ref]),
flush_pending(State#state{pending=Pending});
_ ->
{Pid, Context2} = call_handler(From, self(), RemoteService, Nodes, Ref, Request, Context, SelectNode, Timeout),
{Pid, Context2} = call_handler(From, RemoteService, Nodes, Ref, Request, Context, SelectNode, Timeout),
flush_pending(State#state{pending=Pending,handlers=dict:store(Pid,Ref,Handlers),context=Context2})
end;
flush_pending(State = #state{pending=[#pending{type=cast,request=Request}|Pending],context=Context,remote_service=RemoteService,nodes=Nodes,select_node=SelectNode}) ->
Context2 = cast_handler(RemoteService, Nodes, Request, Context, SelectNode),
flush_pending(State#state{pending=Pending}).
flush_pending(State#state{pending=Pending,context=Context2}).

0 comments on commit 39c2740

Please sign in to comment.