Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Initial migration of connection manager to core. Really this time.

  • Loading branch information...
commit 844d700b7079af847f3495e91208236ce76af580 1 parent 22309ac
@buddhisthead buddhisthead authored
View
72 include/riak_core_connection.hrl
@@ -0,0 +1,72 @@
+%% Riak Core Connection Manager
+%% Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
+
+%% handshake messages to safely initiate a connection. Let's not accept
+%% a connection to a telnet session by accident!
+-define(CTRL_REV, {1,0}).
+-define(CTRL_HELLO, <<"riak-ctrl:hello">>).
+-define(CTRL_TELL_IP_ADDR, <<"riak-ctrl:ip_addr">>).
+-define(CTRL_ACK, <<"riak-ctrl:ack">>).
+
+-define(CONNECTION_SETUP_TIMEOUT, 10000).
+
+-define(CTRL_OPTIONS, [binary,
+ {keepalive, true},
+ {nodelay, true},
+ {packet, 4},
+ {reuseaddr, true},
+ {active, false}]).
+
+%% Tcp options shared during the connection and negotiation phase
+-define(CONNECT_OPTIONS, [binary,
+ {keepalive, true},
+ {nodelay, true},
+ {packet, 4},
+ {reuseaddr, true},
+ {active, false}]).
+
+-type(ip_addr_str() :: string()).
+-type(ip_portnum() :: non_neg_integer()).
+-type(ip_addr() :: {ip_addr_str(), ip_portnum()}).
+-type(tcp_options() :: [any()]).
+
+-type(proto_id() :: atom()).
+-type(rev() :: non_neg_integer()). %% major or minor revision number
+-type(proto() :: {proto_id(), {rev(), rev()}}). %% e.g. {realtime_repl, {1,0}}
+-type(protoprefs() :: {proto_id(), [{rev(), rev()}]}).
+
+-type(cluster_finder_fun() :: fun(() -> {ok,node()} | {error, term()})).
+
+%% Function = fun(Socket, Transport, Protocol, Args) -> ok
+%% Protocol :: proto()
+-type(service_started_callback() :: fun((inet:socket(), module(), proto(), [any()]) -> no_return())).
+
+%% Host protocol spec
+-type(hostspec() :: {protoprefs(), {tcp_options(), module(), service_started_callback(), [any()]}}).
+
+%% Client protocol spec
+-type(clientspec() :: {protoprefs(), {tcp_options(), module(),[any()]}}).
+
+
+%% Scheduler strategies tell the connection manager how to distribute the load.
+%%
+%% Client scheduler strategies
+%% ---------------------------
+%% default := service side decides how to choose node to connect to. limits number of
+%% accepted connections to max_nb_cons().
+%% askme := the connection manager will call your client for a custom protocol strategy
+%% and likewise will expect that the service side has plugged the cluster
+%% manager with support for that custom strategy. UNSUPPORTED so far.
+%% TODO: add a behaviour for the client to implement that includes this callback.
+%% Service scheduler strategies
+%% ----------------------------
+%% round_robin := choose the next available node on cluster to service request. limits
+%% the number of accepted connections to max_nb_cons().
+%% custom := service must provide a strategy to the cluster manager for choosing nodes
+%% UNSUPPORTED so far. Should we use a behaviour for the service module?
+-type(max_nb_cons() :: non_neg_integer()).
+-type(client_scheduler_strategy() :: default | askme).
+-type(service_scheduler_strategy() :: {round_robin, max_nb_cons()} | custom).
+
+%% service manager statistics, can maybe get shared by other layers too
%% Service-manager statistics; counts the connections currently open.
%% Fixed the field type annotation: `::` (not `:`) and the real builtin
%% type non_neg_integer() (non_negative_integer() does not exist).
-record(stats, {open_connections = 0 :: non_neg_integer()}).
View
187 src/riak_core_connection.erl
@@ -0,0 +1,187 @@
+%% Riak Replication Subprotocol Server Dispatch and Client Connections
+%%
+%% Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
+%%
+
+-module(riak_core_connection).
+
+-include("riak_core_connection.hrl").
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+-endif.
+
+%%-define(TRACE(Stmt),Stmt).
+-define(TRACE(Stmt),ok).
+
+%% public API
+-export([connect/2,
+ sync_connect/2]).
+
+%% internal functions
+-export([async_connect_proc/3]).
+
+%%TODO: move to riak_ring_core...symbolic cluster naming
+-export([set_symbolic_clustername/2, set_symbolic_clustername/1,
+ symbolic_clustername/1, symbolic_clustername/0]).
+
+%% @doc Sets the symbolic, human readable, name of this cluster.
+set_symbolic_clustername(Ring, ClusterName) ->
+ {new_ring,
+ riak_core_ring:update_meta(symbolic_clustername, ClusterName, Ring)}.
+
%% @doc Set the symbolic clustername via a ring transaction.
%% An Erlang string IS a list, so the argument must be rejected when it is
%% NOT a list; the original guard was inverted and refused every valid
%% string while passing non-strings through to the ring transaction.
set_symbolic_clustername(ClusterName) when not is_list(ClusterName) ->
    {error, "argument is not a string"};
set_symbolic_clustername(ClusterName) ->
    case riak_core_ring_manager:get_my_ring() of
        {ok, _Ring} ->
            riak_core_ring_manager:ring_trans(fun riak_core_connection:set_symbolic_clustername/2,
                                              ClusterName);
        {error, Reason} ->
            lager:error("Can't set symbolic clustername because: ~p", [Reason])
    end.
+
+%% @doc Return the symbolic clustername stored in Ring's metadata,
+%% or the string "undefined" when no name has been set.
+symbolic_clustername(Ring) ->
+ case riak_core_ring:get_meta(symbolic_clustername, Ring) of
+ {ok, Name} -> Name;
+ undefined -> "undefined"
+ end.
+
+%% @doc Returns the symbolic, human readable, name of this cluster.
+symbolic_clustername() ->
+ case riak_core_ring_manager:get_my_ring() of
+ {ok, Ring} ->
+ symbolic_clustername(Ring);
+ {error, Reason} ->
+ lager:error("Can't read symbolic clustername because: ~p", [Reason]),
+ "undefined"
+ end.
+
+%% Make async connection request. The connection manager is responsible for retry/backoff
+%% and calls your module's functions on success or error (asynchrously):
+%% Module:connected(Socket, TransportModule, {IpAddress, Port}, {Proto, MyVer, RemoteVer}, Args)
+%% Module:connect_failed(Proto, {error, Reason}, Args)
+%% Reason could be 'protocol_version_not_supported'
+%%
+%%
+%% You can set options on the tcp connection, e.g.
+%% [{packet, 4}, {active, false}, {keepalive, true}, {nodelay, true}]
+%%
+%% ClientProtocol specifies the preferences of the client, in terms of what versions
+%% of a protocol it speaks. The host will choose the highest common major version and
+%% inform the client via the callback Module:connected() in the HostProtocol parameter.
+%%
+%% Note: that the connection will initially be setup with the `binary` option
+%% because protocol negotiation requires binary. TODO: should we allow non-binary
+%% options? Check that binary is not overwritten.
+%%
+%% connect returns the pid() of the asynchronous process that will attempt the connection.
+
+%% @doc Make an asynchronous connection request to {IP, Port} using the
+%% given client spec. Returns the pid of the linked helper process that
+%% attempts the connection and invokes the ClientSpec module's callbacks.
+-spec(connect(ip_addr(), clientspec()) -> pid()).
+connect({IP,Port}, ClientSpec) ->
+ ?TRACE(?debugMsg("spawning async_connect link")),
+ %% start a process to handle the connection request asynchronously
+ proc_lib:spawn_link(?MODULE, async_connect_proc, [self(), {IP,Port}, ClientSpec]).
+
+sync_connect({IP,Port}, ClientSpec) ->
+ sync_connect_status(self(), {IP,Port}, ClientSpec).
+
+%% @private
+
+%% exchange brief handshake with client to ensure that we're supporting sub-protocols.
+%% client -> server : Hello {1,0} [Capabilities]
+%% server -> client : Ack {1,0} [Capabilities]
+exchange_handshakes_with(host, Socket, Transport, MyCaps) ->
+ Hello = term_to_binary({?CTRL_HELLO, ?CTRL_REV, MyCaps}),
+ case Transport:send(Socket, Hello) of
+ ok ->
+ ?TRACE(?debugFmt("exchange_handshakes: waiting for ~p from host", [?CTRL_ACK])),
+ case Transport:recv(Socket, 0, ?CONNECTION_SETUP_TIMEOUT) of
+ {ok, Ack} ->
+ case binary_to_term(Ack) of
+ {?CTRL_ACK, TheirRev, TheirCaps} ->
+ Props = [{local_revision, ?CTRL_REV}, {remote_revision, TheirRev} | TheirCaps],
+ {ok,Props};
+ {error, _Reason} = Error ->
+ Error;
+ Msg ->
+ {error, Msg}
+ end;
+ {error, Reason} ->
+ {error, Reason}
+ end;
+ Error ->
+ Error
+ end.
+
+async_connect_proc(Parent, {IP,Port}, ProtocolSpec) ->
+ sync_connect_status(Parent, {IP,Port}, ProtocolSpec).
+
+%% connect synchronously to remote addr/port and return status
+sync_connect_status(_Parent, {IP,Port}, {ClientProtocol, {Options, Module, Args}}) ->
+ Timeout = ?CONNECTION_SETUP_TIMEOUT,
+ Transport = ranch_tcp,
+ %% connect to host's {IP,Port}
+ ?TRACE(?debugFmt("sync_connect: connect to ~p", [{IP,Port}])),
+ case gen_tcp:connect(IP, Port, ?CONNECT_OPTIONS, Timeout) of
+ {ok, Socket} ->
+ ?TRACE(?debugFmt("Setting system options on client side: ~p", [?CONNECT_OPTIONS])),
+ Transport:setopts(Socket, ?CONNECT_OPTIONS),
+ %% handshake to make sure it's a riak sub-protocol dispatcher
+ MyName = symbolic_clustername(),
+ MyCaps = [{clustername, MyName}],
+ case exchange_handshakes_with(host, Socket, Transport, MyCaps) of
+ {ok,Props} ->
+ %% ask for protocol, see what host has
+ case negotiate_proto_with_server(Socket, Transport, ClientProtocol) of
+ {ok,HostProtocol} ->
+ %% set client's requested Tcp options
+ ?TRACE(?debugFmt("Setting user options on client side; ~p", [Options])),
+ Transport:setopts(Socket, Options),
+ %% notify requester of connection and negotiated protocol from host
+ %% pass back returned value in case problem detected on connection
+ %% by module. requestor is responsible for transferring control
+ %% of the socket.
+ Module:connected(Socket, Transport, {IP, Port}, HostProtocol, Args, Props);
+ {error, Reason} ->
+ ?TRACE(?debugFmt("negotiate_proto_with_server returned: ~p", [{error,Reason}])),
+ %% Module:connect_failed(ClientProtocol, {error, Reason}, Args),
+ {error, Reason}
+ end;
+ {error, closed} ->
+ %% socket got closed, don't report this
+ {error, closed};
+ {error, Reason} ->
+ lager:error("Failed to exchange handshake with host. Error = ~p", [Reason]),
+ {error, Reason};
+ Error ->
+ %% failed to exchange handshakes
+ lager:error("Failed to exchange handshake with host. Error = ~p", [Error]),
+ {error, Error}
+ end;
+ {error, Reason} ->
+ %% Module:connect_failed(ClientProtocol, {error, Reason}, Args),
+ {error, Reason}
+ end.
+
+%% Negotiate the highest common major protocol revision with the connected server.
+%% client -> server : Prefs List = {SubProto, [{Major, Minor}]}
+%% server -> client : selected version = {SubProto, {Major, HostMinor, ClientMinor}}
+%%
+%% returns {ok,{Proto,{Major,ClientMinor},{Major,HostMinor}}} | {error, Reason}
+negotiate_proto_with_server(Socket, Transport, ClientProtocol) ->
+ ?TRACE(?debugFmt("negotiate protocol with host, client proto = ~p", [ClientProtocol])),
+ Transport:send(Socket, erlang:term_to_binary(ClientProtocol)),
+ case Transport:recv(Socket, 0, ?CONNECTION_SETUP_TIMEOUT) of
+ {ok, NegotiatedProtocolBin} ->
+ case erlang:binary_to_term(NegotiatedProtocolBin) of
+ {ok, {Proto,{CommonMajor,HMinor,CMinor}}} ->
+ {ok, {Proto,{CommonMajor,CMinor},{CommonMajor,HMinor}}};
+ {error, Reason} ->
+ {error, Reason}
+ end;
+ {error, Reason} ->
+ lager:error("Failed to receive protocol ~p response from server. Reason = ~p",
+ [ClientProtocol, Reason]),
+ {error, connection_failed}
+ end.
View
600 src/riak_core_connection_mgr.erl
@@ -0,0 +1,600 @@
+%% Riak Replication Subprotocol Server Dispatch and Client Connections
+%% Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
+%%
+
+-module(riak_core_connection_mgr).
+-behaviour(gen_server).
+
+-include("riak_core_connection.hrl").
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+-endif.
+
+%% controls retry and backoff.
+-define(INITIAL_BACKOFF, 1 * 1000). %% 1 second initial backoff per endpoint
+-define(MAX_BACKOFF, 1 * 60 * 1000). %% 1 minute maximum backoff per endpoint
+-define(EXHAUSTED_ENDPOINTS_RETRY_INTERVAL, 10 * 1000). %% 10 seconds until retrying the list again
+
+%% retry delay if locator returned empty list
+-ifdef(TEST).
+-define(TRACE(Stmt),Stmt).
+%%-define(TRACE(Stmt),ok).
+-define(DEFAULT_RETRY_NO_ENDPOINTS, 2 * 1000). %% short for testing to avoid timeout
+-else.
+-define(TRACE(Stmt),ok).
+-define(DEFAULT_RETRY_NO_ENDPOINTS, 5 * 1000). %% 5 seconds
+-endif.
+
+
+-define(SERVER, riak_core_connection_manager).
+-define(MAX_LISTENERS, 100).
+
+-type(counter() :: non_neg_integer()).
+
+%% Connection manager strategy (per Jon M.)
+%% when a connection request comes in,
+%% + call the locator service to get the list of {transport, {address, port}}
+%% + create a linked helper process to call riak_core_connection (just once) on the next available
+%% connection (ignore blacklisted ones, they'll get picked up if a repeat is necessary)
+%% + on connection it transfers control of the socket back to the connmgr, casts a success message back
+%% to the connection manager and exits normally.
+%% - on success, the connection manager increments successful connects, reset the backoff timeout on
+%% that connection.
+%% - on failure, casts a failure message back to the connection manager (error, timeout etc) the
+%% connection manager marks the {Transport, {Address, Port}} as blacklisted, increases the failure
+%% counter and starts a timer for the backoff time (and updates it for next time). The connection
+%% manager checks for the next non--blacklisted endpoint in the connection request list to launch
+%% a new connection, if the list is empty call the locator service again to get a new list. If all
+%% connections are blacklisted, use send_after message to wake up and retry (perhaps with backoff
+%% time too).
+
+%% End-point status state, updated for failed and successful connection attempts,
+%% or by timers that fire to update the backoff time.
+%% TODO: add folsom window'd stats
+%% handle an EXIT from the helper process if it dies
+-record(ep, {addr, %% endpoint {IP, Port}
+ nb_curr_connections = 0 :: counter(), %% number of current connections
+ nb_success = 0 :: counter(), %% total successfull connects on this ep
+ nb_failures = 0 :: counter(), %% total failed connects on this ep
+ is_black_listed = false :: boolean(), %% true after a failed connection attempt
+ backoff_delay=0 :: counter(), %% incremented on each failure, reset to zero on success
+ failures = orddict:new() :: orddict:orddict(), %% failure reasons
+ last_fail_time :: erlang:timestamp(), %% time of last failure since 1970
+ next_try_secs :: counter() %% time in seconds to next retry attempt
+ }).
+
+%% connection request record
+-record(req, {ref, % Unique reference for this connection request
+ pid, % Helper pid trying to make connection
+ target, % target to connect to {Type, Name}
+ spec, % client spec
+ strategy, % connection strategy
+ cur, % current connection endpoint
+ state = init, % init | connecting | connected | cancelled
+ status % history of connection attempts
+ }).
+
+
+%% connection manager state:
+%% cluster_finder := function that returns the ip address
+-record(state, {is_paused = false :: boolean(),
+ cluster_finder = fun() -> {error, undefined} end :: cluster_finder_fun(),
+ pending = [] :: [#req{}], % pending requests
+ %% endpoints :: {module(),ip_addr()} -> ep()
+ endpoints = orddict:new() :: orddict:orddict(), %% known endpoints w/status
+ locators = orddict:new() :: orddict:orddict(), %% connection locators
+ nb_total_succeeded = 0 :: counter(),
+ nb_total_failed = 0 :: counter()
+ }).
+
+-export([start_link/0,
+ resume/0,
+ pause/0,
+ is_paused/0,
+ set_cluster_finder/1,
+ get_cluster_finder/0,
+ connect/2, connect/3,
+ disconnect/1,
+ register_locator/2,
+ apply_locator/2,
+ reset_backoff/0,
+ stop/0
+ ]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+%% internal functions
+-export([connection_helper/4, increase_backoff/1]).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+
+-spec(start_link() -> {ok, pid()}).
+start_link() ->
+ Args = [],
+ Options = [],
+ gen_server:start_link({local, ?SERVER}, ?MODULE, Args, Options).
+
+%% resume() will begin/resume accepting and establishing new connections, in
+%% order to maintain the protocols that have been (or continue to be) registered
+%% and unregistered. pause() will not kill any existing connections, but will
+%% cease accepting new requests or retrying lost connections.
+-spec(resume() -> ok).
+resume() ->
+ gen_server:cast(?SERVER, resume).
+
+-spec(pause() -> ok).
+pause() ->
+ gen_server:cast(?SERVER, pause).
+
+%% return paused state
+is_paused() ->
+ gen_server:call(?SERVER, is_paused).
+
+%% reset all backoff delays to zero
+reset_backoff() ->
+ gen_server:cast(?SERVER, reset_backoff).
+
+%% Specify a function that will return the IP/Port of our Cluster Manager.
+%% Connection Manager will call this function each time it wants to find the
+%% current ClusterManager
+-spec(set_cluster_finder(cluster_finder_fun()) -> ok).
+set_cluster_finder(Fun) ->
+ gen_server:cast(?SERVER, {set_cluster_finder, Fun}).
+
+%% Return the current function that finds the Cluster Manager
+get_cluster_finder() ->
+ gen_server:call(?SERVER, get_cluster_finder).
+
+%% Register a locator - for the given Name and strategy it returns {ok, [{IP,Port}]}
+%% list of endpoints to connect to, in order. The list may be empty.
+%% If the query can never be answered
+%% return {error, Reason}.
+%% fun(Name
+register_locator(Type, Fun) ->
+ gen_server:call(?SERVER, {register_locator, Type, Fun}, infinity).
+
+apply_locator(Name, Strategy) ->
+ gen_server:call(?SERVER, {apply_locator, Name, Strategy}, infinity).
+
+%% Establish a connection to the remote destination. be persistent about it,
+%% but not too annoying to the remote end. Connect by name of cluster or
+%% IP address. Use default strategy to find "best" peer for connection.
+%%
+%% Targets are found by applying a registered locator for it.
+%% The identity locator is pre-installed, so if you want to connect to a list
+%% of IP and Port addresses, supply a Target like this: {identity, [{IP, Port},...]},
+%% where IP::string() and Port::integer(). You can also pass {identity, {IP, Port}}
+%% and the locator will use just that one IP. With a list, it will rotate
+%% trying them all until a connection is established.
+%%
+%% Other locator types must be registered with this connection manager
+%% before calling connect().
+%%
+%% Supervision must be done by the calling process if desired. No supervision
+%% is done here.
+%%
+connect(Target, ClientSpec) ->
+ gen_server:call(?SERVER, {connect, Target, ClientSpec, default}).
+
+connect(Target, ClientSpec, Strategy) ->
+ gen_server:call(?SERVER, {connect, Target, ClientSpec, Strategy}).
+
+disconnect(Target) ->
+ gen_server:cast(?SERVER, {disconnect, Target}).
+
+stop() ->
+ gen_server:call(?SERVER, stop).
+
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+
+init([]) ->
+ process_flag(trap_exit, true),
+ %% install default "identity" locator
+ Locator = fun identity_locator/2,
+ {ok, #state{is_paused = false,
+ locators = orddict:store(identity, Locator, orddict:new())
+ }}.
+
+handle_call(is_paused, _From, State) ->
+ {reply, State#state.is_paused, State};
+
+handle_call(get_cluster_finder, _From, State) ->
+ {reply, State#state.cluster_finder, State};
+
+%% connect based on address. Return process id of helper
+handle_call({connect, Target, ClientSpec, Strategy}, _From, State) ->
+ Reference = make_ref(),
+ Request = #req{ref = Reference,
+ target = Target,
+ pid = undefined,
+ spec = ClientSpec,
+ state = init,
+ strategy = Strategy},
+ %% add request to pending queue so it may be found in restarts
+ State2 = State#state{pending = lists:keystore(Reference, #req.ref,
+ State#state.pending,
+ Request)},
+ {reply, {ok, Reference}, start_request(Request, State2)};
+
+handle_call({get_endpoint_backoff, Addr}, _From, State) ->
+ {reply, {ok, get_endpoint_backoff(Addr, State#state.endpoints)}, State};
+
+handle_call({register_locator, Type, Fun}, _From,
+ State = #state{locators = Locators}) ->
+ {reply, ok, State#state{locators = orddict:store(Type, Fun, Locators)}};
+
+handle_call({apply_locator, Target, Strategy}, _From,
+ State = #state{locators = Locators}) ->
+ AddrsOrError = locate_endpoints(Target, Strategy, Locators),
+ {reply, AddrsOrError, State};
+
+handle_call(stop, _From, State) ->
+ %% TODO do we need to cleanup helper pids here?
+ {stop, normal, ok, State};
+
+handle_call({should_try_endpoint, Ref, Addr}, _From, State = #state{pending=Pending}) ->
+ case lists:keyfind(Ref, #req.ref, Pending) of
+ false ->
+ %% This should never happen
+ {reply, false, State};
+ Req ->
+ {Answer, ReqState} =
+ case Req#req.state of
+ cancelled ->
+ %% helper process hasn't cancelled itself yet.
+ {false, cancelled};
+ _ ->
+ {true, connecting}
+ end,
+ {reply, Answer, State#state{pending = lists:keystore(Ref, #req.ref, Pending,
+ Req#req{cur = Addr,
+ state = ReqState})}}
+ end;
+
+handle_call(_Unhandled, _From, State) ->
+ ?TRACE(?debugFmt("Unhandled gen_server call: ~p", [_Unhandled])),
+ {reply, {error, unhandled}, State}.
+
+handle_cast(pause, State) ->
+ {noreply, State#state{is_paused = true}};
+
+handle_cast(resume, State) ->
+ {noreply, State#state{is_paused = false}};
+
+handle_cast({disconnect, Target}, State) ->
+ {noreply, disconnect_from_target(Target, State)};
+
+%% reset all backoff delays to zero.
+%% TODO: restart stalled connections.
+handle_cast(reset_backoff, State) ->
+ NewEps = reset_backoff(State#state.endpoints),
+ {noreply, State#state{endpoints = NewEps}};
+
+handle_cast({set_cluster_finder, FinderFun}, State) ->
+ {noreply, State#state{cluster_finder=FinderFun}};
+
+%% helper process says no endpoints were returned by the locators.
+%% helper process will schedule a retry.
+handle_cast({conmgr_no_endpoints, _Ref}, State) ->
+ %% mark connection as black-listed and start timer for reset
+ {noreply, State};
+
+%% helper process says it failed to reach an address.
+handle_cast({endpoint_failed, Addr, Reason, ProtocolId}, State) ->
+ %% mark connection as black-listed and start timer for reset
+ {noreply, fail_endpoint(Addr, Reason, ProtocolId, State)}.
+
+%% it is time to remove Addr from the black-listed addresses
%% A backoff timer fired: the endpoint's penalty period is over, so take it
%% off the black list and allow new connection attempts to reach it.
handle_info({backoff_timer, Addr}, State = #state{endpoints = EPs}) ->
    case orddict:find(Addr, EPs) of
        {ok, EP} ->
            EP2 = EP#ep{is_black_listed = false},
            {noreply, State#state{endpoints = orddict:store(Addr, EP2, EPs)}};
        error ->
            %% Should never happen: Addr originally came from the EP list.
            %% (Was {norepy, State} — an invalid gen_server return that would
            %% have crashed the server.)
            {noreply, State}
    end;

%% A scheduled retry fired: restart the pending request, if it still exists.
handle_info({retry_req, Ref}, State = #state{pending = Pending}) ->
    case lists:keyfind(Ref, #req.ref, Pending) of
        false ->
            %% TODO: should never happen
            {noreply, State};
        Req ->
            {noreply, start_request(Req, State)}
    end;

%%% All of the connection helper processes end here.
%% cases:
%%   helper succeeded -> update EP stats: black-listed <- false, backoff_delay <- 0
%%   helper failed    -> update EP stats: failures++, backoff_delay++
%%   other Pid failed -> propagate the linked error
handle_info({'EXIT', From, Reason}, State = #state{pending = Pending}) ->
    %% Work out which pending request (and endpoint) the exiting pid owned.
    case lists:keytake(From, #req.pid, Pending) of
        false ->
            %% Must have been something we were linked to, or linked to us
            lager:error("Connection Manager exiting because linked process ~p exited for reason: ~p",
                        [From, Reason]),
            exit({linked, From, Reason});
        {value, #req{cur = Cur, ref = Ref} = Req, Pending2} ->
            {{ProtocolId, _Foo}, _Bar} = Req#req.spec,
            case Reason of
                ok ->
                    %% helper exited normally after a successful connection;
                    %% update the stats module
                    Stat = conn_success,
                    riak_core_connection_mgr_stats:update(Stat, Cur, ProtocolId),
                    %% riak_core_connection set up and handlers called
                    {noreply, connect_endpoint(Cur, State#state{pending = Pending2})};

                {ok, cancelled} ->
                    %% helper process has been cancelled and has exited nicely.
                    %% update the stats module
                    Stat = conn_cancelled,
                    riak_core_connection_mgr_stats:update(Stat, Cur, ProtocolId),
                    %% toss out the cancelled request from pending.
                    {noreply, State#state{pending = Pending2}};

                {error, endpoints_exhausted, Ref} ->
                    %% tried all known endpoints. schedule a retry,
                    %% reusing the existing request reference, Ref.
                    case Req#req.state of
                        cancelled ->
                            %% oops. that request was cancelled. No retry.
                            {noreply, State#state{pending = Pending2}};
                        _ ->
                            {noreply, schedule_retry(?EXHAUSTED_ENDPOINTS_RETRY_INTERVAL, Ref, State)}
                    end;

                Reason -> % something bad happened to the connection, reuse the request
                    ?TRACE(?debugFmt("handle_info: EP failed on ~p for ~p. removed Ref ~p",
                                     [Cur, Reason, Ref])),
                    lager:warning("handle_info: endpoint ~p failed: ~p. removed Ref ~p",
                                  [Cur, Reason, Ref]),
                    State2 = fail_endpoint(Cur, Reason, ProtocolId, State),
                    %% the connection helper will not retry. It's up to the caller.
                    State3 = fail_request(Reason, Req, State2),
                    {noreply, State3}
            end
    end;

%% Drain anything unexpected so stray messages cannot pile up in the mailbox.
handle_info(_Unhandled, State) ->
    ?TRACE(?debugFmt("Unhandled gen_server info: ~p", [_Unhandled])),
    lager:error("Unhandled gen_server info: ~p", [_Unhandled]),
    {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%%===================================================================
+%%% Private
+%%%===================================================================
+
%% Default locator: resolve an identity target to a list of endpoints.
%% Accepts either a single {IP, Port} endpoint or a list of them, matching
%% the documented {identity, {IP,Port}} / {identity, [{IP,Port},...]} targets.
%% The original second clause matched only one-element lists ([Ips]) and
%% returned the bare endpoint rather than a list; longer lists crashed.
identity_locator({IP, Port}, _Policy) ->
    {ok, [{IP, Port}]};
identity_locator(Ips, _Policy) when is_list(Ips) ->
    {ok, Ips}.
+
+%% close the pending connection and cancel the request
+disconnect_from_target(Target, State = #state{pending = Pending}) ->
+ lager:debug("Disconnecting from: ~p", [Target]),
+ case lists:keyfind(Target, #req.target, Pending) of
+ false ->
+ %% already gone!
+ State;
+ Req ->
+ %% The helper process will discover the cancellation when it asks if it
+ %% should connect to an endpoint.
+ State#state{pending = lists:keystore(Req#req.ref, #req.ref, Pending,
+ Req#req{state = cancelled})}
+ end.
+
+%% schedule a retry to occur after Interval milliseconds.
+%% do not clear the pid from pending. the exit handler will do that.
+schedule_retry(Interval, Ref, State = #state{pending = Pending}) ->
+ case lists:keyfind(Ref, #req.ref, Pending) of
+ false ->
+ %% this should never happen
+ lager:error("ConnectionManager: failed to find connection ref while scheduling retry."),
+ State;
+ Req ->
+ case Req#req.state of
+ cancelled ->
+ %% the request was cancelled, so no rescheduling wanted.
+ State;
+ _ ->
+ %% reschedule request to happen in the future
+ erlang:send_after(Interval, self(), {retry_req, Ref}),
+ State#state{pending = lists:keystore(Req#req.ref, #req.ref, Pending,
+ Req#req{cur = undefined})}
+ end
+ end.
+
+%% Start process to make connection to available endpoints. Return a reference for
+%% this connection attempt.
+start_request(#req{state=cancelled}, State) ->
+ State;
+start_request(Req = #req{ref=Ref, target=Target, spec=ClientSpec, strategy=Strategy},
+ State) ->
+ case locate_endpoints(Target, Strategy, State#state.locators) of
+ {ok, []} ->
+ %% locators provided no addresses
+ gen_server:cast(?SERVER, {conmgr_no_endpoints, Ref}),
+ Interval = app_helper:get_env(riak_core, connmgr_no_endpoint_retry,
+ ?DEFAULT_RETRY_NO_ENDPOINTS),
+ lager:debug("Connection Manager located no endpoints for: ~p. Will retry.", [Target]),
+ %% schedule a retry and exit
+ schedule_retry(Interval, Ref, State);
+ {ok, EpAddrs } ->
+ lager:debug("Connection Manager located endpoints: ~p", [EpAddrs]),
+ AllEps = update_endpoints(EpAddrs, State#state.endpoints),
+ TryAddrs = filter_blacklisted_endpoints(EpAddrs, AllEps),
+ lager:debug("Connection Manager trying endpoints: ~p", [TryAddrs]),
+ Pid = spawn_link(
+ fun() -> exit(try connection_helper(Ref, ClientSpec, Strategy, TryAddrs)
+ catch T:R -> {exception, {T, R}}
+ end)
+ end),
+ State#state{endpoints = AllEps,
+ pending = lists:keystore(Ref, #req.ref, State#state.pending,
+ Req#req{pid = Pid,
+ state = connecting,
+ cur = undefined})};
+ {error, Reason} ->
+ fail_request(Reason, Req, State)
+ end.
+
+%% reset the backoff delay to zero for all endpoints
+reset_backoff(Endpoints) ->
+ orddict:map(fun(_Addr,EP) -> EP#ep{backoff_delay = 0} end,Endpoints).
+
%% Increase the backoff delay by doubling it, starting from ?INITIAL_BACKOFF
%% and never exceeding ?MAX_BACKOFF. Capping with min/2 after doubling keeps
%% the invariant strict; the previous version tested the cap before doubling
%% and so could transiently return up to 2 * ?MAX_BACKOFF.
increase_backoff(0) ->
    ?INITIAL_BACKOFF;
increase_backoff(Delay) ->
    min(2 * Delay, ?MAX_BACKOFF).
+
+%% Convert an inet address tuple to its dotted string form via
+%% inet_parse:ntoa/1; anything else (already a string) passes through.
+string_of_ip(IP) when is_tuple(IP) ->
+ inet_parse:ntoa(IP);
+string_of_ip(IP) ->
+ IP.
+
+%% Render an {IP, Port} endpoint as an "IP:Port" string (used in log messages).
+string_of_ipport({IP,Port}) ->
+ string_of_ip(IP) ++ ":" ++ erlang:integer_to_list(Port).
+
+%% A spawned process that will walk down the list of endpoints and try them
+%% all until exhausting the list. This process is responsible for waiting for
+%% the backoff delay for each endpoint.
+connection_helper(Ref, _Protocol, _Strategy, []) ->
+ %% exhausted the list of endpoints. let server start new helper process
+ {error, endpoints_exhausted, Ref};
+connection_helper(Ref, Protocol, Strategy, [Addr|Addrs]) ->
+ {{ProtocolId, _Foo},_Bar} = Protocol,
+ %% delay by the backoff_delay for this endpoint.
+ {ok, BackoffDelay} = gen_server:call(?SERVER, {get_endpoint_backoff, Addr}),
+ lager:debug("Holding off ~p seconds before trying ~p at ~p",
+ [(BackoffDelay/1000), ProtocolId, string_of_ipport(Addr)]),
+ timer:sleep(BackoffDelay),
+ case gen_server:call(?SERVER, {should_try_endpoint, Ref, Addr}) of
+ true ->
+ lager:debug("Trying connection to: ~p at ~p", [ProtocolId, string_of_ipport(Addr)]),
+ case riak_core_connection:sync_connect(Addr, Protocol) of
+ ok ->
+ ok;
+ {error, Reason} ->
+ %% notify connection manager this EP failed and try next one
+ gen_server:cast(?SERVER, {endpoint_failed, Addr, Reason, ProtocolId}),
+ connection_helper(Ref, Protocol, Strategy, Addrs)
+ end;
+ _ ->
+ %% connection request has been cancelled
+ lager:debug("Ignoring connection to: ~p at ~p because it was cancelled",
+ [ProtocolId, string_of_ipport(Addr)]),
+ {ok, cancelled}
+ end.
+
+locate_endpoints({Type, Name}, Strategy, Locators) ->
+ case orddict:find(Type, Locators) of
+ {ok, Locate} ->
+ case Locate(Name, Strategy) of
+ error ->
+ {error, {bad_target_name_args, Type, Name}};
+ Addrs ->
+ Addrs
+ end;
+ error ->
+ {error, {unknown_target_type, Type}}
+ end.
+
+%% Make note of the failed connection attempt and update
+%% our book keeping for that endpoint. Black-list it, and
+%% adjust a backoff timer so that we wait a while before
+%% trying this endpoint again.
+fail_endpoint(Addr, Reason, ProtocolId, State) ->
+ %% update the stats module
+ Stat = {conn_error, Reason},
+ riak_core_connection_mgr_stats:update(Stat, Addr, ProtocolId),
+ %% update the endpoint
+ Fun = fun(EP=#ep{backoff_delay = Backoff, failures = Failures}) ->
+ erlang:send_after(Backoff, self(), {backoff_timer, Addr}),
+ EP#ep{failures = orddict:update_counter(Reason, 1, Failures),
+ nb_failures = EP#ep.nb_failures + 1,
+ backoff_delay = increase_backoff(Backoff),
+ last_fail_time = os:timestamp(),
+ next_try_secs = Backoff/1000,
+ is_black_listed = true}
+ end,
+ update_endpoint(Addr, Fun, State).
+
+connect_endpoint(Addr, State) ->
+ update_endpoint(Addr, fun(EP) ->
+ EP#ep{is_black_listed = false,
+ nb_success = EP#ep.nb_success + 1,
+ next_try_secs = 0,
+ backoff_delay = 0}
+ end, State).
+
+%% Return the current backoff delay for the named Address,
+%% or if we can't find that address in the endpoints - the
+%% initial backoff.
+get_endpoint_backoff(Addr, EPs) ->
+ case orddict:find(Addr, EPs) of
+ error ->
+ 0;
+ {ok, EP} ->
+ EP#ep.backoff_delay
+ end.
+
+update_endpoint(Addr, Fun, State = #state{endpoints = EPs}) ->
+ case orddict:find(Addr, EPs) of
+ error ->
+ EP2 = Fun(#ep{addr = Addr}),
+ State#state{endpoints = orddict:store(Addr,EP2,EPs)};
+ {ok, EP} ->
+ EP2 = Fun(EP),
+ State#state{endpoints = orddict:store(Addr,EP2,EPs)}
+ end.
+
+fail_request(Reason, #req{ref = Ref, spec = Spec},
+ State = #state{pending = Pending}) ->
+ %% Tell the module it failed
+ {Proto, {_TcpOptions, Module,Args}} = Spec,
+ Module:connect_failed(Proto, {error, Reason}, Args),
+ %% Remove the request from the pending list
+ State#state{pending = lists:keydelete(Ref, #req.ref, Pending)}.
+
+update_endpoints(Addrs, Endpoints) ->
+ %% add addr to Endpoints if not already there
+ Fun = (fun(Addr, EPs) ->
+ case orddict:is_key(Addr, Endpoints) of
+ true -> EPs;
+ false ->
+ EP = #ep{addr=Addr},
+ orddict:store(Addr, EP, EPs)
+ end
+ end),
+ lists:foldl(Fun, Endpoints, Addrs).
+
+%% Return the addresses of non-blacklisted endpoints that are also
+%% members of the list EpAddrs.
+filter_blacklisted_endpoints(EpAddrs, AllEps) ->
+ PredicateFun = (fun(Addr) ->
+ case orddict:find(Addr, AllEps) of
+ {ok, EP} ->
+ EP#ep.is_black_listed == false;
+ error ->
+ false
+ end
+ end),
+ lists:filter(PredicateFun, EpAddrs).
View
258 src/riak_core_connection_mgr_stats.erl
@@ -0,0 +1,258 @@
+%% -------------------------------------------------------------------
+%%
+%% riak_core_connection_mgr_stats: collect, aggregate, and provide stats for
+%% connections made by the connection manager
+%%
+%% Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% -------------------------------------------------------------------
+%%
+
+-module(riak_core_connection_mgr_stats).
+
+-behaviour(gen_server).
+
+%% API
+-export([start_link/0,
+ get_stats/0,
+ get_stats_by_ip/1,
+ get_stats_by_protocol/1,
+ get_consolidated_stats/0,
+ update/3,
+ register_stats/0,
+ produce_stats/0]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-define(SERVER, ?MODULE).
+-define(APP, riak_conn_mgr_stats). %% registered in riak_repl_app:start/2
+
+start_link() ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+%% Delete any stale folsom metrics previously registered under ?APP,
+%% register the static stats (currently none — see stats/0), and
+%% register this module as the stats producer with the riak_core stat
+%% cache. The catch tolerates metrics that vanish mid-iteration.
+register_stats() ->
+    [(catch folsom_metrics:delete_metric(Stat)) || Stat <- folsom_metrics:get_metrics(),
+                                                   is_tuple(Stat), element(1, Stat) == ?APP],
+    [register_stat({?APP, Name}, Type) || {Name, Type} <- stats()],
+    riak_core_stat_cache:register_app(?APP, {?MODULE, produce_stats, []}).
+
+%% @spec get_stats() -> proplist()
+%% @doc Return all stats from the cached value. This will refresh
+%%      the cache if it's been over 5 seconds since the last query.
+%%      When the cache needs to get the latest values, it will call our
+%%      produce_stats() function.
+get_stats() ->
+    case riak_core_stat_cache:get_stats(?APP) of
+        {ok, Stats, _TS} ->
+            Stats;
+        %% Pass the cache's error term straight through to the caller.
+        Error -> Error
+    end.
+
+%% Return all cached stats as a sorted proplist with atom keys,
+%% flattening each folsom key tuple into a readable name string first.
+get_consolidated_stats() ->
+    Strings = [format_stat(Stat) || Stat <- get_stats()],
+    stats_as_atoms(Strings).
+
+%% Sort the {NameString, Value} stats by name and convert each string
+%% key to an atom.
+stats_as_atoms(StringStats) ->
+    [{list_to_atom(Key), Value} || {Key, Value} <- lists:sort(StringStats)].
+
+%% Render one folsom stat key/value into a {NameString, Value} pair.
+%% Key shapes mirror those created in do_update/3: spiral metrics carry
+%% [{count,N},{one,_}] values, while `total' keys carry a plain counter
+%% value. Clause order matters: the conn_error clauses must precede the
+%% generic {?APP, StatName, ...} clauses so connection-error keys get
+%% the "conn_error_" prefix rather than being treated as plain stats.
+format_stat({{?APP, conn_error, StatName}, [{count,N},{one,_W}]}) ->
+    {"conn_error_" ++ atom_to_list(StatName), N};
+format_stat({{?APP, conn_error, StatName, total}, N}) ->
+    {"conn_error_" ++ atom_to_list(StatName) ++ "_total", N};
+format_stat({{?APP, conn_error, StatName, Addr, total}, N}) when not is_atom(Addr) ->
+    {string_of_ipaddr(Addr) ++ "_"
+     ++ "conn_error_"
+     ++ atom_to_list(StatName)
+     ++ "_total", N};
+format_stat({{?APP, conn_error, StatName, Addr, ProtocolId, total},N}) ->
+    {string_of_ipaddr(Addr) ++ "_"
+     ++ "conn_error_"
+     ++ atom_to_list(ProtocolId) ++ "_"
+     ++ atom_to_list(StatName)
+     ++ "_total" , N};
+format_stat({{?APP, conn_error, StatName, ProtocolId, total},N}) ->
+    {atom_to_list(ProtocolId) ++ "_"
+     ++ "conn_error_"
+     ++ atom_to_list(StatName)
+     ++ "_total", N};
+format_stat({{?APP, conn_error, StatName, Addr, ProtocolId},[{count,N},{one,_W}]}) ->
+    {string_of_ipaddr(Addr) ++ "_"
+     ++ "conn_error_"
+     ++ atom_to_list(ProtocolId) ++ "_"
+     ++ atom_to_list(StatName), N};
+format_stat({{?APP, conn_error, StatName, Addr},[{count,N},{one,_W}]}) when not is_atom(Addr) ->
+    {string_of_ipaddr(Addr) ++ "_"
+     ++ "conn_error_"
+     ++ atom_to_list(StatName), N};
+format_stat({{?APP, conn_error, StatName, ProtocolId},[{count,N},{one,_W}]}) ->
+    {"conn_error_" ++ atom_to_list(ProtocolId) ++ "_" ++ atom_to_list(StatName), N};
+
+%% Generic (non-conn_error) stat shapes below.
+format_stat({{?APP, StatName},[{count,N},{one,_W}]}) ->
+    {atom_to_list(StatName), N};
+format_stat({{?APP, StatName, total},N}) ->
+    {atom_to_list(StatName) ++ "_total", N};
+format_stat({{?APP, StatName, Addr, total},N}) when not is_atom(Addr) ->
+    {string_of_ipaddr(Addr)
+     ++ "_" ++ atom_to_list(StatName) ++ "_total", N};
+format_stat({{?APP, StatName, Addr},[{count,N},{one,_W}]}) when not is_atom(Addr) ->
+    {string_of_ipaddr(Addr) ++ "_" ++ atom_to_list(StatName),N};
+format_stat({{?APP, StatName, ProtocolId, total},N}) when is_atom(ProtocolId) ->
+    {atom_to_list(ProtocolId) ++ "_" ++ atom_to_list(StatName) ++ "_total", N};
+format_stat({{?APP, StatName, ProtocolId},[{count,N},{one,_W}]}) when is_atom(ProtocolId) ->
+    {atom_to_list(ProtocolId) ++ "_" ++ atom_to_list(StatName), N};
+format_stat({{?APP, StatName, Addr, ProtocolId, total},N}) when is_atom(ProtocolId) ->
+    {string_of_ipaddr(Addr)
+     ++ "_" ++ atom_to_list(ProtocolId)
+     ++ "_" ++ atom_to_list(StatName)
+     ++ "_total", N};
+format_stat({{?APP, StatName, Addr, ProtocolId},[{count,N},{one,_W}]}) when is_atom(ProtocolId) ->
+    {string_of_ipaddr(Addr)
+     ++ "_" ++ atom_to_list(ProtocolId)
+     ++ "_" ++ atom_to_list(StatName), N}.
+
+%% Render an {IP, Port} endpoint as "IP:Port". The IP may be given
+%% either as a string or as an inet address tuple; tuples are converted
+%% to their string form first.
+string_of_ipaddr({IP, Port}) when is_tuple(IP) ->
+    string_of_ipaddr({inet_parse:ntoa(IP), Port});
+string_of_ipaddr({IP, Port}) when is_list(IP) ->
+    lists:flatten(io_lib:format("~s:~p", [IP, Port])).
+
+%% Get stats filtered by given IP address
+%% Returns the subset of all cached stats whose key mentions Addr,
+%% formatted as a sorted proplist with atom keys.
+get_stats_by_ip({_IP, _Port}=Addr) ->
+    AllStats = get_stats(),
+    Stats = lists:filter(fun(S) -> predicate_by_ip(S,Addr) end, AllStats),
+    stats_as_atoms([format_stat(Stat) || Stat <- Stats]).
+
+%% True when the stat key refers to the given {IP, Port} address.
+%% Clause order matters: conn_error keys carry the address in a
+%% different tuple position than plain stat keys, so they are matched
+%% by their own clauses first, and the explicit `false' clauses stop
+%% protocol-only conn_error keys from falling through to the generic
+%% shapes. The final catch-all rejects anything unrecognized.
+predicate_by_ip({{_App, conn_error, _StatName, MatchAddr, total},_Value}, MatchAddr) ->
+    true;
+predicate_by_ip({{_App, conn_error, _StatName, MatchAddr, _ProtocolId, total},_Value}, MatchAddr) ->
+    true;
+predicate_by_ip({{_App, conn_error, _StatName, MatchAddr, _ProtocolId},_Value}, MatchAddr) ->
+    true;
+predicate_by_ip({{_App, conn_error, _StatName, MatchAddr},_Value}, MatchAddr) ->
+    true;
+predicate_by_ip({{_App, conn_error, _StatName, _ProtocolId},_Value}, _MatchAddr) ->
+    false;
+predicate_by_ip({{_App, conn_error, _StatName, _ProtocolId, total},_Value}, _MatchAddr) ->
+    false;
+predicate_by_ip({{_App, _StatName, MatchAddr},_Value}, MatchAddr) ->
+    true;
+predicate_by_ip({{_App, _StatName, MatchAddr, total},_Value}, MatchAddr) ->
+    true;
+predicate_by_ip({{_App, _StatName, MatchAddr, _ProtocolId},_Value}, MatchAddr) ->
+    true;
+predicate_by_ip({{_App, _StatName, MatchAddr, _ProtocolId, total},_Value}, MatchAddr) ->
+    true;
+predicate_by_ip(_X, _MatchAddr) ->
+    false.
+
+%% Get stats filtered by given protocol-id (e.g. rt_repl)
+%% Returns the subset of all cached stats whose key mentions the
+%% protocol, formatted as a sorted proplist with atom keys.
+get_stats_by_protocol(ProtocolId) ->
+    AllStats = get_stats(),
+    Stats = lists:filter(fun(S) -> predicate_by_protocol(S,ProtocolId) end, AllStats),
+    stats_as_atoms([format_stat(Stat) || Stat <- Stats]).
+
+%% True when the stat key refers to the given protocol id. Mirror of
+%% predicate_by_ip/2: the conn_error clauses match first, the explicit
+%% `false' clauses reject address-only conn_error keys, and generic
+%% shapes are tried last with a final catch-all reject.
+predicate_by_protocol({{_App, conn_error, _StatName, _Addr, MatchId},_Value}, MatchId) ->
+    true;
+predicate_by_protocol({{_App, conn_error, _StatName, _Addr, MatchId, total},_Value}, MatchId) ->
+    true;
+predicate_by_protocol({{_App, conn_error, _StatName, MatchId},_Value}, MatchId) ->
+    true;
+predicate_by_protocol({{_App, conn_error, _StatName, MatchId, total},_Value}, MatchId) ->
+    true;
+predicate_by_protocol({{_App, conn_error, _StatName, _Addr},_Value}, _MatchId) ->
+    false;
+predicate_by_protocol({{_App, conn_error, _StatName, _Addr, total},_Value}, _MatchId) ->
+    false;
+predicate_by_protocol({{_App, _StatName, MatchId},_Value}, MatchId) ->
+    true;
+predicate_by_protocol({{_App, _StatName, MatchId, total},_Value}, MatchId) ->
+    true;
+predicate_by_protocol({{_App, _StatName, _Addr, MatchId},_Value}, MatchId) ->
+    true;
+predicate_by_protocol({{_App, _StatName, _Addr, MatchId, total},_Value}, MatchId) ->
+    true;
+predicate_by_protocol(_X, _MatchId) ->
+    false.
+
+%% Public interface to accumulate stats
+%% Asynchronously record Stat (an atom or {conn_error, Reason}) for the
+%% given endpoint address and protocol id; never blocks the caller.
+update(Stat, Addr, ProtocolId) ->
+    gen_server:cast(?SERVER, {update, Stat, Addr, ProtocolId}).
+
+%% gen_server
+
+%% Register this module's stats on startup. The server itself is
+%% stateless (state is the atom `ok'); all counters live in folsom.
+init([]) ->
+    register_stats(),
+    {ok, ok}.
+
+%% No synchronous API is exposed; reply `ok' to any call.
+handle_call(_Req, _From, State) ->
+    {reply, ok, State}.
+
+%% Stat updates arrive as casts so callers never block on stats.
+handle_cast({update, Stat, Addr, ProtocolId}, State) ->
+    do_update(Stat, Addr, ProtocolId),
+    {noreply, State};
+handle_cast(_Req, State) ->
+    {noreply, State}.
+
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%% Update a stat for given IP-Address, Cluster, and Protocol-id
+%% Each call fans out to eight folsom metrics: a lifetime counter
+%% (`total' key) plus a rolling spiral, for the bare stat and for the
+%% stat keyed by address, by protocol, and by {address, protocol}.
+do_update({conn_error, Error}, IPAddr, Protocol) ->
+    create_or_update({?APP, conn_error, Error, total}, {inc, 1}, counter),
+    create_or_update({?APP, conn_error, Error}, 1, spiral),
+    create_or_update({?APP, conn_error, Error, IPAddr, total}, {inc, 1}, counter),
+    create_or_update({?APP, conn_error, Error, IPAddr}, 1, spiral),
+    create_or_update({?APP, conn_error, Error, Protocol, total}, {inc, 1}, counter),
+    create_or_update({?APP, conn_error, Error, Protocol}, 1, spiral),
+    create_or_update({?APP, conn_error, Error, IPAddr, Protocol, total}, {inc, 1}, counter),
+    create_or_update({?APP, conn_error, Error, IPAddr, Protocol}, 1, spiral);
+
+do_update(Stat, IPAddr, Protocol) ->
+    create_or_update({?APP, Stat, total}, {inc, 1}, counter),
+    create_or_update({?APP, Stat}, 1, spiral),
+    create_or_update({?APP, Stat, Protocol, total}, {inc, 1}, counter),
+    create_or_update({?APP, Stat, Protocol}, 1, spiral),
+    create_or_update({?APP, Stat, IPAddr, total}, {inc, 1}, counter),
+    create_or_update({?APP, Stat, IPAddr }, 1, spiral),
+    create_or_update({?APP, Stat, IPAddr, Protocol, total}, {inc, 1}, counter),
+    create_or_update({?APP, Stat, IPAddr, Protocol}, 1, spiral).
+
+%% private
+
+%% dynamically update (and create if needed) a stat
+%% folsom's notify_existing_metric exits when the metric has not been
+%% registered yet; the old-style `catch' detects that exit, after which
+%% we register the metric and retry the notification once.
+create_or_update(Name, UpdateVal, Type) ->
+    case (catch folsom_metrics:notify_existing_metric(Name, UpdateVal, Type)) of
+        ok ->
+            ok;
+        {'EXIT', _} ->
+            register_stat(Name, Type),
+            create_or_update(Name, UpdateVal, Type)
+    end.
+
+%% Create the folsom metric backing a stat; only the two metric types
+%% used by this module (spiral, counter) are supported.
+register_stat(Name, spiral) ->
+    folsom_metrics:new_spiral(Name);
+register_stat(Name, counter) ->
+    folsom_metrics:new_counter(Name).
+
+%% @spec produce_stats() -> proplist()
+%% @doc Produce a proplist-formatted view of the current aggregation
+%%      of stats. Called by the stat cache when it refreshes.
+produce_stats() ->
+    Stats = [Stat || Stat <- folsom_metrics:get_metrics(), is_tuple(Stat), element(1, Stat) == ?APP],
+    lists:flatten([{Stat, get_stat(Stat)} || Stat <- Stats]).
+
+%% Get the value of the named stats metric
+%% NOTE: won't work for Histograms
+get_stat(Name) ->
+    folsom_metrics:get_metric_value(Name).
+
+%% Return list of static stat names and types to register
+stats() -> []. %% no static stats to register
View
417 src/riak_core_service_mgr.erl
@@ -0,0 +1,417 @@
+%% Riak Replication Subprotocol Server Dispatcher
+%% Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
+%%
+
+-module(riak_core_service_mgr).
+-behaviour(gen_server).
+
+-include("riak_core_connection.hrl").
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+-define(TRACE(Stmt),Stmt).
+%%-define(TRACE(Stmt),ok).
+-else.
+-define(TRACE(Stmt),ok).
+-endif.
+
+-define(SERVER, riak_core_service_manager).
+-define(MAX_LISTENERS, 100).
+
+%% services := registered protocols, key :: proto_id()
+-record(state, {dispatch_addr = {"localhost", 9000} :: ip_addr(),
+ services = orddict:new() :: orddict:orddict(),
+ dispatcher_pid = undefined :: pid(),
+ status_notifiers = [],
+ service_stats = orddict:new() :: orddict:orddict(), % proto-id -> stats()
+ refs = []
+ }).
+
+-export([start_link/0, start_link/1,
+ register_service/2,
+ unregister_service/1,
+ is_registered/1,
+ register_stats_fun/1,
+ get_stats/0,
+ stop/0
+ ]).
+
+%% ranch callbacks
+-export([start_link/4]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+%% internal
+-export([dispatch_service/4]).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+
+%% start the Service Manager on the default/configured Ip Address and Port.
+%% All sub-protocols will be dispatched from there.
+-spec(start_link() -> {ok, pid()}).
+start_link() ->
+ ServiceAddr = case app_helper:get_env(riak_core, cluster_mgr) of
+ undefined ->
+ lager:error("cluster_mgr is not configured for riak_core in
+ app.config, defaulting to {\"127.0.0.1\", 0}."),
+ {"127.0.0.1", 0};
+ Res ->
+ Res
+ end,
+ start_link(ServiceAddr).
+
+%% start the Service Manager on the given Ip Address and Port.
+%% All sub-protocols will be dispatched from there.
+-spec(start_link(ip_addr()) -> {ok, pid()}).
+start_link({IP,Port}) ->
+ ?TRACE(?debugFmt("Starting Core Service Manager at ~p", [{IP,Port}])),
+ lager:info("Starting Core Service Manager at ~p", [{IP,Port}]),
+ Args = [{IP,Port}],
+ Options = [],
+ gen_server:start_link({local, ?SERVER}, ?MODULE, Args, Options).
+
+%% Once a protocol specification is registered, it will be kept available by the
+%% Service Manager.
+%% Note that the callee is responsible for taking ownership of the socket via
+%% Transport:controlling_process(Socket, Pid)
+%% The assertive match below deliberately crashes the caller when an
+%% unsupported strategy is passed.
+%% NOTE(review): service_scheduler_strategy() is not defined in the
+%% visible portion of riak_core_connection.hrl — confirm where this
+%% type comes from.
+-spec(register_service(hostspec(), service_scheduler_strategy()) -> ok).
+register_service(HostProtocol, Strategy) ->
+    %% only one strategy is supported as yet
+    {round_robin, _NB} = Strategy,
+    gen_server:cast(?SERVER, {register_service, HostProtocol, Strategy}).
+
+%% Unregister the given protocol-id.
+%% Existing connections for this protocol are not killed. New connections
+%% for this protocol will not be accepted until re-registered.
+-spec(unregister_service(proto_id()) -> ok).
+unregister_service(ProtocolId) ->
+ gen_server:cast(?SERVER, {unregister_service, ProtocolId}).
+
+-spec(is_registered(proto_id()) -> boolean()).
+is_registered(ProtocolId) ->
+ gen_server:call(?SERVER, {is_registered, service, ProtocolId}).
+
+%% Register a callback function that will get called periodically or
+%% when the connection status of services changes. The function will
+%% receive a list of tuples: {<protocol-id>, <stats>} where stats
+%% holds the number of open connections that have been accepted for that
+%% protocol type. This can be used to report load, in the form of
+%% connected-ness, for each protocol type, to remote clusters, e.g.,
+%% making it possible for schedulers to balance the number of
+%% connections across a cluster.
+register_stats_fun(Fun) ->
+ gen_server:cast(?SERVER, {register_stats_fun, Fun}).
+
+get_stats() ->
+ gen_server:call(?SERVER, get_stats).
+
+%% Abruptly kill all connections and stop dispatching services.
+%% Synchronous: returns once the server has stopped its listener.
+stop() ->
+    gen_server:call(?SERVER, stop).
+
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+
+init([IpAddr]) ->
+ {ok, Pid} = start_dispatcher(IpAddr, ?MAX_LISTENERS, []),
+ {ok, #state{dispatch_addr = IpAddr, dispatcher_pid=Pid}}.
+
+handle_call({is_registered, service, ProtocolId}, _From, State) ->
+ Found = orddict:is_key(ProtocolId, State#state.services),
+ {reply, Found, State};
+
+handle_call(get_services, _From, State) ->
+ {reply, orddict:to_list(State#state.services), State};
+
+handle_call(stop, _From, State) ->
+ ranch:stop_listener(State#state.dispatch_addr),
+ {stop, normal, ok, State};
+
+handle_call(get_stats, _From, State) ->
+ Stats = orddict:to_list(State#state.service_stats),
+ PStats = [{Protocol, Count} || {Protocol,{_Stats,Count}} <- Stats],
+ {reply, PStats, State};
+
+handle_call(_Unhandled, _From, State) ->
+ ?TRACE(?debugFmt("Unhandled gen_server call: ~p", [_Unhandled])),
+ {reply, {error, unhandled}, State}.
+
+handle_cast({register_service, Protocol, Strategy}, State) ->
+ {{ProtocolId,_Revs},_Rest} = Protocol,
+ NewDict = orddict:store(ProtocolId, {Protocol, Strategy}, State#state.services),
+ {noreply, State#state{services=NewDict}};
+
+handle_cast({unregister_service, ProtocolId}, State) ->
+ NewDict = orddict:erase(ProtocolId, State#state.services),
+ {noreply, State#state{services=NewDict}};
+
+handle_cast({register_stats_fun, Fun}, State) ->
+ Notifiers = [Fun | State#state.status_notifiers],
+ erlang:send_after(500, self(), status_update_timer),
+ {noreply, State#state{status_notifiers=Notifiers}};
+
+%% TODO: unregister support for notifiers?
+%% handle_cast({unregister_node_status_fun, Fun}, State) ->
+%% Notifiers = [Fun | State#state.notifiers],
+%% {noreply, State#state{status_notifiers=Notifiers}};
+
+handle_cast({service_up_event, Pid, ProtocolId}, State) ->
+ ?TRACE(?debugFmt("Service up event: ~p", [ProtocolId])),
+ erlang:send_after(500, self(), status_update_timer),
+ Ref = erlang:monitor(process, Pid), %% arrange for us to receive 'DOWN' when Pid terminates
+ ServiceStats = incr_count_for_protocol_id(ProtocolId, 1, State#state.service_stats),
+ Refs = [{Ref,ProtocolId} | State#state.refs],
+ {noreply, State#state{service_stats=ServiceStats, refs=Refs}};
+
+handle_cast({service_down_event, _Pid, ProtocolId}, State) ->
+ ?TRACE(?debugFmt("Service down event: ~p", [ProtocolId])),
+ erlang:send_after(500, self(), status_update_timer),
+ ServiceStats = incr_count_for_protocol_id(ProtocolId, -1, State#state.service_stats),
+ {noreply, State#state{service_stats = ServiceStats}};
+
+handle_cast(_Unhandled, _State) ->
+ ?TRACE(?debugFmt("Unhandled gen_server cast: ~p", [_Unhandled])),
+ {error, unhandled}. %% this will crash the server
+
+handle_info(status_update_timer, State) ->
+ %% notify all registered parties of this node's services counts
+ Stats = orddict:to_list(State#state.service_stats),
+ PStats = [ {Protocol, Count} || {Protocol,{_Stats,Count}} <- Stats],
+ [NotifyFun(PStats) || NotifyFun <- State#state.status_notifiers],
+ {noreply, State};
+
+%% Get notified of a service that went down.
+%% Remove it's Ref and pass the event on to get counted by ProtocolId
+handle_info({'DOWN', Ref, process, Pid, _Reason}, State) ->
+ Refs = State#state.refs,
+ Refs2 = case lists:keytake(Ref, 1, Refs) of
+ {value, {Ref, ProtocolId}, Rest} ->
+ gen_server:cast(?SERVER, {service_down_event, Pid, ProtocolId}),
+ Rest;
+ error ->
+ Refs
+ end,
+ {noreply, State#state{refs=Refs2}};
+
+handle_info(_Unhandled, State) ->
+ ?TRACE(?debugFmt("Unhandled gen_server info: ~p", [_Unhandled])),
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%%===================================================================
+%%% Private
+%%%===================================================================
+
+%% Adjust the open-connection count for ProtocolId by Incr (+1 on a
+%% service_up_event, -1 on a service_down_event), creating the stats
+%% entry on first use.
+%% NOTE(review): the #stats record is not defined in this file's
+%% visible portion — presumably it comes from an included header;
+%% confirm.
+incr_count_for_protocol_id(ProtocolId, Incr, ServiceStatus) ->
+    Stats2 = case orddict:find(ProtocolId, ServiceStatus) of
+                 {ok, Stats} ->
+                     Count = Stats#stats.open_connections,
+                     Stats#stats{open_connections = Count + Incr};
+                 error ->
+                     #stats{open_connections = Incr}
+             end,
+    orddict:store(ProtocolId, Stats2, ServiceStatus).
+
+%% Host callback function, called by ranch for each accepted connection by way of
+%% of the ranch:start_listener() call above, specifying this module.
+start_link(Listener, Socket, Transport, SubProtocols) ->
+ ?TRACE(?debugMsg("Start_link dispatch_service")),
+ {ok, spawn_link(?MODULE, dispatch_service, [Listener, Socket, Transport, SubProtocols])}.
+
+%% Body of the main dispatch loop. This is instantiated once for each connection
+%% we accept because it transforms itself into the SubProtocol once it receives
+%% the sub protocol and version, negotiated with the client.
+dispatch_service(Listener, Socket, Transport, _Args) ->
+ %% tell ranch "we've got it. thanks pardner"
+ ok = ranch:accept_ack(Listener),
+ %% set some starting options for the channel; these should match the client
+ ?TRACE(?debugFmt("setting system options on service side: ~p", [?CONNECT_OPTIONS])),
+ ok = Transport:setopts(Socket, ?CONNECT_OPTIONS),
+ %% Version 1.0 capabilities just passes our clustername
+ MyName = riak_core_connection:symbolic_clustername(),
+ MyCaps = [{clustername, MyName}],
+ case exchange_handshakes_with(client, Socket, Transport, MyCaps) of
+ {ok,Props} ->
+ %% get latest set of registered services from gen_server and do negotiation
+ Services = gen_server:call(?SERVER, get_services),
+ SubProtocols = [Protocol || {_Key,{Protocol,_Strategy}} <- Services],
+ ?TRACE(?debugFmt("started dispatch_service with protocols: ~p",
+ [SubProtocols])),
+ Negotiated = negotiate_proto_with_client(Socket, Transport, SubProtocols),
+ ?TRACE(?debugFmt("negotiated = ~p", [Negotiated])),
+ start_negotiated_service(Socket, Transport, Negotiated, Props);
+ Error ->
+ Error
+ end.
+
+%% start user's module:function and transfer socket to it's process.
+start_negotiated_service(_Socket, _Transport, {error, Reason}, _Props) ->
+ ?TRACE(?debugFmt("service dispatch failed with ~p", [{error, Reason}])),
+ lager:error("service dispatch failed with ~p", [{error, Reason}]),
+ {error, Reason};
+%% Note that the callee is responsible for taking ownership of the socket via
+%% Transport:controlling_process(Socket, Pid),
+start_negotiated_service(Socket, Transport,
+ {NegotiatedProtocols, {Options, Module, Function, Args}},
+ Props) ->
+ %% Set requested Tcp socket options now that we've finished handshake phase
+ ?TRACE(?debugFmt("Setting user options on service side; ~p", [Options])),
+ ?TRACE(?debugFmt("negotiated protocols: ~p", [NegotiatedProtocols])),
+ Transport:setopts(Socket, Options),
+ %% call service body function for matching protocol. The callee should start
+ %% a process or gen_server or such, and return {ok, pid()}.
+ case Module:Function(Socket, Transport, NegotiatedProtocols, Args, Props) of
+ {ok, Pid} ->
+ {ok,{ClientProto,_Client,_Host}} = NegotiatedProtocols,
+ gen_server:cast(?SERVER, {service_up_event, Pid, ClientProto}),
+ {ok, Pid};
+ Error ->
+ ?TRACE(?debugFmt("service dispatch of ~p:~p failed with ~p",
+ [Module, Function, Error])),
+ lager:error("service dispatch of ~p:~p failed with ~p",
+ [Module, Function, Error]),
+ Error
+ end.
+
+%% Negotiate the highest common major protocol revisision with the connected client.
+%% client -> server : Prefs List = {SubProto, [{Major, Minor}]} as binary
+%% server -> client : selected version = {SubProto, {Major, HostMinor}} as binary
+%%
+%% returns {ok,{{Proto,MyVer,RemoteVer},Options,Module,Function,Args}} | Error
+negotiate_proto_with_client(Socket, Transport, HostProtocols) ->
+ case Transport:recv(Socket, 0, ?CONNECTION_SETUP_TIMEOUT) of
+ {ok, PrefsBin} ->
+ {ClientProto,Versions} = erlang:binary_to_term(PrefsBin),
+ case choose_version({ClientProto,Versions}, HostProtocols) of
+ {error, Reason} ->
+ lager:error("Failed to negotiate protocol ~p from client because ~p",
+ [ClientProto, Reason]),
+ Transport:send(Socket, erlang:term_to_binary({error,Reason})),
+ {error, Reason};
+ {ok,{ClientProto,Major,CN,HN}, Rest} ->
+ Transport:send(Socket, erlang:term_to_binary({ok,{ClientProto,{Major,HN,CN}}})),
+ {{ok,{ClientProto,{Major,HN},{Major,CN}}}, Rest};
+ {error, Reason, Rest} ->
+ lager:error("Failed to negotiate protocol ~p from client because ~p",
+ [ClientProto, Reason]),
+ %% notify client it failed to negotiate
+ Transport:send(Socket, erlang:term_to_binary({error,Reason})),
+ {{error, Reason}, Rest}
+ end;
+ {error, Reason} ->
+ lager:error("Failed to receive protocol request from client. Error = ~p",
+ [Reason]),
+ {error, connection_failed}
+ end.
+
+%% Find the highest common major protocol version between the client's
+%% preference list and the first matching host protocol spec.
+%% Returns {ok, {Proto, Major, ClientMinor, HostMinor}, Rest} on
+%% success, {error, protocol_not_supported} when the host does not
+%% offer the protocol at all, or {error, protocol_version_not_supported,
+%% Rest} when no major version is shared.
+choose_version({ClientProto,ClientVersions}=_CProtocol, HostProtocols) ->
+    ?TRACE(?debugFmt("choose_version: client proto = ~p, HostProtocols = ~p",
+                     [_CProtocol, HostProtocols])),
+    %% first, see if the host supports the subprotocol
+    case [H || {{HostProto,_Versions},_Rest}=H <- HostProtocols, ClientProto == HostProto] of
+        [] ->
+            %% oops! The host does not support this sub protocol type
+            lager:error("Failed to find host support for protocol: ~p", [ClientProto]),
+            ?TRACE(?debugMsg("choose_version: no common protocols")),
+            {error,protocol_not_supported};
+        [{{_HostProto,HostVersions},Rest}=_Matched | _DuplicatesIgnored] ->
+            ?TRACE(?debugFmt("choose_version: unsorted = ~p clientversions = ~p",
+                             [_Matched, ClientVersions])),
+            CommonVers = [{CM,CN,HN} || {CM,CN} <- ClientVersions, {HM,HN} <- HostVersions, CM == HM],
+            ?TRACE(?debugFmt("common versions = ~p", [CommonVers])),
+            %% sort by major version, highest to lowest, and grab the top one.
+            %% BUG FIX: the previous code only matched result lists of
+            %% length one or two, so three or more shared major versions
+            %% crashed with a case_clause. Match the head of any
+            %% non-empty list instead.
+            case lists:reverse(lists:keysort(1,CommonVers)) of
+                [] ->
+                    %% oops! No common major versions for Proto.
+                    ?TRACE(?debugFmt("Failed to find a common major version for protocol: ~p",
+                                     [ClientProto])),
+                    lager:error("Failed to find a common major version for protocol: ~p", [ClientProto]),
+                    {error,protocol_version_not_supported,Rest};
+                [{Major,CN,HN} | _LowerMajors] ->
+                    {ok, {ClientProto,Major,CN,HN},Rest}
+            end
+    end.
+
+%% exchange brief handshake with client to ensure that we're supporting sub-protocols.
+%% client -> server : Hello {1,0} [Capabilities]
+%% server -> client : Ack {1,0} [Capabilities]
+%% Returns {ok, Props} where Props carries both sides' control-protocol
+%% revisions plus the client's capability proplist, or {error, Reason}
+%% after sending the error term to the client on a bad hello.
+exchange_handshakes_with(client, Socket, Transport, MyCaps) ->
+    ?TRACE(?debugFmt("exchange_handshakes: waiting for ~p from client", [?CTRL_HELLO])),
+    case Transport:recv(Socket, 0, ?CONNECTION_SETUP_TIMEOUT) of
+        {ok, Hello} ->
+            %% read their hello
+            %% NOTE(review): binary_to_term/1 on bytes straight off the
+            %% socket can construct arbitrary terms (including new
+            %% atoms) — confirm peers are trusted, or consider the
+            %% `safe' decode option.
+            case binary_to_term(Hello) of
+                {?CTRL_HELLO, TheirRev, TheirCaps} ->
+                    Ack = term_to_binary({?CTRL_ACK, ?CTRL_REV, MyCaps}),
+                    Transport:send(Socket, Ack),
+                    %% make some props to hand dispatched service
+                    Props = [{local_revision, ?CTRL_REV}, {remote_revision, TheirRev} | TheirCaps],
+                    {ok,Props};
+                Msg ->
+                    %% tell other side we are failing them
+                    Error = {error, bad_handshake},
+                    Transport:send(Socket, erlang:term_to_binary(Error)),
+                    lager:error("Control protocol handshake with client got unexpected hello: ~p",
+                                [Msg]),
+                    Error
+            end;
+        {error, Reason} ->
+            lager:error("Failed to exchange handshake with client. Error = ~p", [Reason]),
+            {error, Reason}
+    end.
+
+%% Returns true if the IP address given is a valid host IP address.
+%% "0.0.0.0" (bind-all) is always accepted; any other address must
+%% appear on one of this host's interfaces.
+%% (adapted from riak_repl_util.erl)
+valid_host_ip("0.0.0.0") ->
+    true;
+valid_host_ip(IP) ->
+    {ok, IFs} = inet:getifaddrs(),
+    {ok, NormIP} = normalize_ip(IP),
+    lists:any(fun({_IF, Attrs}) ->
+                      lists:member({addr, NormIP}, Attrs)
+              end,
+              IFs).
+
+%% Normalize an IP address, given as either a dotted string or an inet
+%% tuple, to {ok, tuple()} form.
+normalize_ip(IP) when is_tuple(IP) ->
+    {ok, IP};
+normalize_ip(IP) when is_list(IP) ->
+    inet_parse:address(IP).
+
+%% @doc Start the connection dispatcher with a limit of MaxListeners
+%% listener connections and supported sub-protocols. When a connection
+%% request arrives, it is mapped via the associated Protocol atom to an
+%% acceptor function called as Module:Function(Listener, Socket, Transport, Args),
+%% which must create it's own process and return {ok, pid()}
+
+-spec(start_dispatcher(ip_addr(), non_neg_integer(), [hostspec()]) -> {ok, pid()}).
+start_dispatcher({IP,Port}, MaxListeners, SubProtocols) ->
+    case valid_host_ip(IP) of
+        true ->
+            {ok, RawAddress} = inet_parse:address(IP),
+            %% The ranch listener is named by its {IP, Port} pair and
+            %% uses this module as the protocol callback (start_link/4).
+            {ok, Pid} = ranch:start_listener({IP,Port}, MaxListeners, ranch_tcp,
+                                             [{ip, RawAddress}, {port, Port}],
+                                             ?MODULE, SubProtocols),
+            lager:info("Service manager: listening on ~s:~p", [IP, Port]),
+            {ok, Pid};
+        _ ->
+            %% NOTE(review): this branch returns lager:error/2's result
+            %% (`ok'), not {ok, pid()} as the spec claims, so init/1's
+            %% {ok, Pid} match will crash the server — confirm whether a
+            %% crash is the intended outcome for a bad bind address.
+            lager:error("Service Mananger: failed to start on ~s:~p - invalid address.",
+                        [IP, Port])
+    end.
View
247 test/riak_core_connection_mgr_tests.erl
@@ -0,0 +1,247 @@
+%% Eunit test cases for the Connection Manager
+%% Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
+
+-module(riak_core_connection_mgr_tests).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-define(TRACE(Stmt),Stmt).
+%%-define(TRACE(Stmt),ok).
+
+%% internal functions
+-export([testService/5,
+ connected/6, connect_failed/3
+ ]).
+
+%% Locator selector types
+-define(REMOTE_LOCATOR_TYPE, remote).
+-define(ADDR_LOCATOR_TYPE, addr).
+
+%% My cluster
+-define(MY_CLUSTER_NAME, "bob").
+-define(MY_CLUSTER_ADDR, {"127.0.0.1", 4097}).
+
+%% Remote cluster
+-define(REMOTE_CLUSTER_NAME, "betty").
+-define(REMOTE_CLUSTER_ADDR, {"127.0.0.1", 4096}).
+-define(REMOTE_ADDRS, [{"127.0.0.1",5001}, {"127.0.0.1",5002}, {"127.0.0.1",5003},
+ ?REMOTE_CLUSTER_ADDR]).
+
+-define(MAX_CONS, 2).
+-define(TCP_OPTIONS, [{keepalive, true},
+ {nodelay, true},
+ {packet, 4},
+ {reuseaddr, true},
+ {active, false}]).
+
+%% Tell the connection manager how to find out "remote" end points.
+%% For testing, we just make up some local addresses.
+register_remote_locator() ->
+    Remotes = orddict:from_list([{?REMOTE_CLUSTER_NAME, ?REMOTE_ADDRS}]),
+    Locator = fun(Name, _Policy) ->
+                      case orddict:find(Name, Remotes) of
+                          %% BUG FIX: orddict:find/2 returns `error' on
+                          %% a miss, never `false'; the old `false'
+                          %% clause was dead, so unknown cluster names
+                          %% leaked the bare atom `error' instead of the
+                          %% intended {error, {unknown, Name}}.
+                          error ->
+                              {error, {unknown, Name}};
+                          {ok, Endpoints} ->
+                              {ok, Endpoints}
+                      end
+              end,
+    ok = riak_core_connection_mgr:register_locator(?REMOTE_LOCATOR_TYPE, Locator).
+
+register_addr_locator() ->
+ Locator = fun(Name, _Policy) -> {ok, [Name]} end,
+ ok = riak_core_connection_mgr:register_locator(?ADDR_LOCATOR_TYPE, Locator).
+
+register_empty_locator() ->
+ Locator = fun(_Name, _Policy) -> {ok, []} end,
+ ok = riak_core_connection_mgr:register_locator(?REMOTE_LOCATOR_TYPE, Locator).
+
+%% this test runs first and leaves the server running for other tests
+start_link_test() ->
+ %% normally, ranch would be started as part of a supervisor tree, but we
+ %% need to start it here so that a supervision tree will be created.
+ ok = application:start(ranch),
+ {ok, _Pid1} = riak_core_service_mgr:start_link(?REMOTE_CLUSTER_ADDR),
+ {ok, _Pid2} = riak_core_connection_mgr:start_link().
+
+register_locator_remote_test() ->
+ register_remote_locator(),
+ Target = {?REMOTE_LOCATOR_TYPE, ?REMOTE_CLUSTER_NAME},
+ Strategy = default,
+ case riak_core_connection_mgr:apply_locator(Target, Strategy) of
+ {ok, Addrs} ->
+ ?assert(Addrs == ?REMOTE_ADDRS);
+ Error ->
+ ?debugFmt("register_locator_remote_test: unexpected error ~p", [Error]),
+ ?assert(false)
+ end.
+
+register_locator_addr_test() ->
+ register_addr_locator(),
+ Target = {?ADDR_LOCATOR_TYPE, ?REMOTE_CLUSTER_ADDR},
+ Strategy = default,
+ case riak_core_connection_mgr:apply_locator(Target, Strategy) of
+ {ok, Addrs} ->
+ ?assert(Addrs == [?REMOTE_CLUSTER_ADDR]);
+ Error ->
+ ?debugFmt("register_locator_addrs_test: unexpected error ~p", [Error]),
+ ?assert(false)
+ end.
+
+bad_locator_args_test() ->
+ register_addr_locator(),
+ Target = {?REMOTE_LOCATOR_TYPE, ?REMOTE_CLUSTER_ADDR}, %% bad args for 'addr'
+ Strategy = default,
+ case riak_core_connection_mgr:apply_locator(Target, Strategy) of
+ {ok, Addrs} ->
+ ?debugFmt("bad_locator_args_test: unexpected match ~p", [Addrs]),
+ ?assert(false);
+ {error, {bad_target_name_args, remote, ?REMOTE_CLUSTER_ADDR}} ->
+ ?assert(true);
+ Error ->
+ ?debugFmt("bad_locator_args_test: unexpected error ~p", [Error]),
+ ?assert(false)
+ end.
+
+%% conn_mgr should start up running!
+is_paused_test() ->
+ ?assert(riak_core_connection_mgr:is_paused() == false).
+
+%% pause and check that it's paused
+pause_test() ->
+ riak_core_connection_mgr:pause(),
+ ?assert(riak_core_connection_mgr:is_paused() == true).
+
+%% resume and check that it's not paused
+resume_test() ->
+ riak_core_connection_mgr:resume(),
+ ?assert(riak_core_connection_mgr:is_paused() == false).
+
+%% set/get the cluster manager finding function
+set_get_finder_function_test() ->
+ FinderFun = fun() -> {ok, node()} end,
+ riak_core_connection_mgr:set_cluster_finder(FinderFun),
+ FoundFun = riak_core_connection_mgr:get_cluster_finder(),
+ ?assert(FinderFun == FoundFun).
+
+start_service() ->
+ %% start dispatcher
+ ExpectedRevs = [{1,0}, {1,0}],
+ TestProtocol = {{testproto, [{1,0}]}, {?TCP_OPTIONS, ?MODULE, testService, ExpectedRevs}},
+ riak_core_service_mgr:register_service(TestProtocol, {round_robin,10}),
+ ?assert(riak_core_service_mgr:is_registered(testproto) == true).
+
+client_connection_test() ->
+ %% start a test service
+ start_service(),
+ %% do async connect via connection_mgr
+ ExpectedArgs = {expectedToPass, [{1,0}, {1,0}]},
+ Target = {?ADDR_LOCATOR_TYPE, ?REMOTE_CLUSTER_ADDR},
+ Strategy = default,
+ riak_core_connection_mgr:connect(Target,
+ {{testproto, [{1,0}]},
+ {?TCP_OPTIONS, ?MODULE, ExpectedArgs}},
+ Strategy),
+ timer:sleep(1000).
+
+client_connect_via_cluster_name_test() ->
+ start_service(),
+ %% do async connect via connection_mgr
+ ExpectedArgs = {expectedToPass, [{1,0}, {1,0}]},
+ Target = {?REMOTE_LOCATOR_TYPE, ?REMOTE_CLUSTER_NAME},
+ Strategy = default,
+ riak_core_connection_mgr:connect(Target,
+ {{testproto, [{1,0}]},
+ {?TCP_OPTIONS, ?MODULE, ExpectedArgs}},
+ Strategy),
+ timer:sleep(1000).
+
+client_retries_test() ->
+ ?TRACE(?debugMsg(" --------------- retry test ------------- ")),
+ %% start the service a while after the client has been started so the client
+ %% will do retries.
+
+ %% do async connect via connection_mgr
+ ExpectedArgs = {retry_test, [{1,0}, {1,0}]},
+ Target = {?REMOTE_LOCATOR_TYPE, ?REMOTE_CLUSTER_NAME},
+ Strategy = default,
+
+ riak_core_connection_mgr:connect(Target,
+ {{testproto, [{1,0}]},
+ {?TCP_OPTIONS, ?MODULE, ExpectedArgs}},
+ Strategy),
+ %% delay so the client will keep trying
+ ?TRACE(?debugMsg(" ------ sleeping 3 sec")),
+ timer:sleep(3000),
+ %% resume and confirm not paused, which should cause service to start and connection :-)
+ ?TRACE(?debugMsg(" ------ resuming services")),
+ start_service(),
+ %% allow connection to setup
+ ?TRACE(?debugMsg(" ------ sleeping 2 sec")),
+ timer:sleep(1000).
+
+empty_locator_test() ->
+ ?TRACE(?debugMsg(" --------------- empty locator test ------------- ")),
+ register_empty_locator(), %% replace remote locator with one that returns empty list
+ start_service(),
+ %% do async connect via connection_mgr
+ ExpectedArgs = {expectedToPass, [{1,0}, {1,0}]},
+ Target = {?REMOTE_LOCATOR_TYPE, ?REMOTE_CLUSTER_NAME},
+ Strategy = default,
+ riak_core_connection_mgr:connect(Target,
+ {{testproto, [{1,0}]},
+ {?TCP_OPTIONS, ?MODULE, ExpectedArgs}},
+ Strategy),
+ %% allow conn manager to try and schedule a few retries
+ timer:sleep(1000),
+ %% restore the remote locator that gives a good endpoint
+ register_locator_remote_test(),
+ %% allow enough time for retry mechanism to kick in
+ timer:sleep(2000).
+ %% we should get a connection
+
+cleanup_test() ->
+ riak_core_service_mgr:stop(),
+ riak_core_connection_mgr:stop(),
+ application:stop(ranch).
+
+%%------------------------
+%% Helper functions
+%%------------------------
+
+%% Protocol Service functions
+testService(_Socket, _Transport, {error, _Reason}, _Args, _Props) ->
+ ?assert(false);
+testService(_Socket, _Transport, {ok, {Proto, MyVer, RemoteVer}}, Args, _Props) ->
+ ?TRACE(?debugMsg("testService started")),
+ [ExpectedMyVer, ExpectedRemoteVer] = Args,
+ ?assert(ExpectedMyVer == MyVer),
+ ?assert(ExpectedRemoteVer == RemoteVer),
+ ?assert(Proto == testproto),
+ timer:sleep(2000),
+ {ok, self()}.
+
+%% Client side protocol callbacks
+connected(_Socket, _Transport, {_IP, _Port}, {Proto, MyVer, RemoteVer}, Args, _Props) ->
+ ?TRACE(?debugFmt("testClient started, connected to ~p:~p", [_IP,_Port])),
+ {_TestType, [ExpectedMyVer, ExpectedRemoteVer]} = Args,
+ ?assert(Proto == testproto),
+ ?assert(ExpectedMyVer == MyVer),
+ ?assert(ExpectedRemoteVer == RemoteVer).
+
+connect_failed({_Proto,_Vers}, {error, Reason}, Args) ->
+
+ case Args of
+ expectedToFail ->
+ ?TRACE(?debugFmt("connect_failed: (EXPECTED) when expected to fail: ~p with ~p",
+ [Reason, Args])),
+ ?assert(Reason == econnrefused);
+ {retry_test, _Stuff} ->
+ ?TRACE(?debugFmt("connect_failed: (EXPECTED) during retry test: ~p",
+ [Reason])),
+ ok;
+ Other ->
+ ?TRACE(?debugFmt("connect_failed: (UNEXPECTED) ~p with args = ~p",
+ [Reason, Other])),
+ ?assert(false == Other)
+ end.
View
106 test/riak_core_connection_tests.erl
@@ -0,0 +1,106 @@
+-module(riak_core_connection_tests).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-export([test1service/5, connected/6, connect_failed/3]).
+
+-define(TEST_ADDR, { "127.0.0.1", 4097}).
+-define(MAX_CONS, 2).
+-define(TCP_OPTIONS, [{keepalive, true},
+ {nodelay, true},
+ {packet, 4},
+ {reuseaddr, true},
+ {active, false}]).
+
+%% host service functions
+test1service(_Socket, _Transport, {error, Reason}, Args, _Props) ->
+    %% Negotiation error path: only the deliberate version-mismatch test
+    %% should land here, carrying the args it registered with.
+    ?debugFmt("test1service failed with {error, ~p}", [Reason]),
+    ?assert(failed_host_args == Args),
+    ?assert(protocol_version_not_supported == Reason),
+    {error, Reason};
+test1service(_Socket, _Transport, {ok, {Proto, MyVer, RemoteVer}},
+             [WantMyVer, WantRemoteVer] = Args, Props) ->
+    %% Happy path: verify the negotiated protocol/versions and the
+    %% clustername property handed over by the service manager, then hold
+    %% the connection open briefly so the client side can run its checks.
+    ?debugFmt("test1service started with Args ~p Props ~p", [Args, Props]),
+    ?assert("undefined" == proplists:get_value(clustername, Props)),
+    ?assert(MyVer == WantMyVer),
+    ?assert(RemoteVer == WantRemoteVer),
+    ?assert(test1proto == Proto),
+    timer:sleep(2000),
+    {ok, self()}.
+
+%% client connection callbacks
+connected(_Socket, _Transport, {_IP, _Port}, {Proto, MyVer, RemoteVer},
+          [WantMyVer, WantRemoteVer] = Args, Props) ->
+    %% Client-side success callback: same checks as the host side, then
+    %% keep the socket open briefly so the service side can run too.
+    ?debugFmt("connected with Args ~p Props ~p", [Args, Props]),
+    ?assert("undefined" == proplists:get_value(clustername, Props)),
+    ?assert(test1proto == Proto),
+    ?assert(MyVer == WantMyVer),
+    ?assert(RemoteVer == WantRemoteVer),
+    timer:sleep(2000).
+
+connect_failed({Proto, _Vers}, {error, Reason}, Args) ->
+    %% Only the deliberately mismatched protocol may fail to connect, and
+    %% it must report an unsupported version back to the client.
+    ?debugFmt("connect_failed: Reason = ~p Args = ~p", [Reason, Args]),
+    ?assert(test1protoFailed == Proto),
+    ?assert(protocol_version_not_supported == Reason),
+    ?assert(failed_client_args == Args).
+
+%% this test runs first and leaves the server running for other tests
+%% Starts the service manager listening on ?TEST_ADDR; it is left
+%% running for the tests that follow.
+start_link_test() ->
+    %% normally, ranch would be started as part of a supervisor tree, but we
+    %% need to start it here so that a supervision tree will be created.
+    ok = application:start(ranch),
+    %% Assert on the start result directly instead of binding the tag to a
+    %% variable confusingly named 'Ok' and comparing it afterwards; on
+    %% failure ?assertMatch also reports the actual value.
+    ?assertMatch({ok, _Pid}, riak_core_service_mgr:start_link(?TEST_ADDR)).
+
+%% set/get the local cluster's name
+set_get_name_test() ->
+    %% Round-trip the symbolic cluster name through the connection module.
+    riak_core_connection:set_symbolic_clustername("undefined"),
+    ?assert(riak_core_connection:symbolic_clustername() == "undefined").
+
+%% register a service and confirm added
+register_service_test() ->
+    %% Register test1proto (host speaks revs 2.1 and 1.0) and confirm the
+    %% service manager reports it as registered.
+    HostArgs = [{1,0}, {1,1}],
+    Spec = {{test1proto, [{2,1}, {1,0}]},
+            {?TCP_OPTIONS, ?MODULE, test1service, HostArgs}},
+    riak_core_service_mgr:register_service(Spec, {round_robin, ?MAX_CONS}),
+    ?assert(true == riak_core_service_mgr:is_registered(test1proto)).
+
+%% unregister and confirm removed
+unregister_service_test() ->
+    %% Remove the protocol registered above and confirm it is gone.
+    riak_core_service_mgr:unregister_service(test1proto),
+    ?assert(false == riak_core_service_mgr:is_registered(test1proto)).
+
+%% Host speaks {2,1}/{1,0}, client speaks {0,1}/{1,1}; the args
+%% [{1,1},{1,0}] are presumably the negotiated versions connected/6 is
+%% expected to see -- the actual success asserts live in the connected/6
+%% and test1service/5 callbacks, not here.
+protocol_match_test() ->
+    %% re-register the test protocol and confirm registered
+    register_service_test(),
+    %% try to connect via a client that speaks 0.1 and 1.1
+    ClientProtocol = {test1proto, [{0,1},{1,1}]},
+    ClientSpec = {ClientProtocol, {?TCP_OPTIONS, ?MODULE, [{1,1},{1,0}]}},
+    riak_core_connection:connect(?TEST_ADDR, ClientSpec),
+
+    %% give the async client/host callbacks time to run their asserts
+    timer:sleep(1000).
+
+%% test that a mismatch of client and host args will notify both host and client
+%% of a failed negotiation.
+failed_protocol_match_test() ->
+    %% Register a host that only speaks {2,1}/{1,0} under a dedicated
+    %% protocol id so the failure callbacks can recognize this test.
+    HostSpec = {{test1protoFailed, [{2,1}, {1,0}]},
+                {?TCP_OPTIONS, ?MODULE, test1service, failed_host_args}},
+    riak_core_service_mgr:register_service(HostSpec, {round_robin, ?MAX_CONS}),
+    ?assert(true == riak_core_service_mgr:is_registered(test1protoFailed)),
+
+    %% A client speaking only {0,1}/{3,1} cannot agree on a version; both
+    %% sides should be notified of the failed negotiation (asserted in
+    %% test1service/5 and connect_failed/3).
+    ClientSpec = {{test1protoFailed, [{0,1}, {3,1}]},
+                  {?TCP_OPTIONS, ?MODULE, failed_client_args}},
+    riak_core_connection:connect(?TEST_ADDR, ClientSpec),
+
+    %% allow time for the async callbacks to fire
+    timer:sleep(2000),
+    ok.
+
+%% Runs last: stop the service manager and ranch so a repeated eunit
+%% run starts from a clean slate.
+cleanup_test() ->
+    riak_core_service_mgr:stop(),
+    application:stop(ranch).
View
122 test/riak_core_service_mgr_tests.erl
@@ -0,0 +1,122 @@
+%% Eunit test cases for the Connection Manager
+%% Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
+
+-module(riak_core_service_mgr_tests).
+
+-include_lib("eunit/include/eunit.hrl").
+
+%%-define(TRACE(Stmt),Stmt).
+-define(TRACE(Stmt),ok).
+
+%% internal functions
+-export([testService/5,
+ connected/6, connect_failed/3
+ ]).
+
+%% my name and remote same because I need to talk to myself for testing
+-define(MY_CLUSTER_NAME, "bob").
+-define(REMOTE_CLUSTER_NAME, "bob").
+
+-define(REMOTE_CLUSTER_ADDR, {"127.0.0.1", 4096}).
+-define(TEST_ADDR, {"127.0.0.1", 4097}).
+-define(MAX_CONS, 2).
+-define(TCP_OPTIONS, [{keepalive, true},
+ {nodelay, true},
+ {packet, 4},
+ {reuseaddr, true},
+ {active, false}]).
+
+%% this test runs first and leaves the server running for other tests
+%% Starts the service manager listening on ?TEST_ADDR; it is left
+%% running for the tests that follow.
+start_link_test() ->
+    %% normally, ranch would be started as part of a supervisor tree, but we
+    %% need to start it here so that a supervision tree will be created.
+    ok = application:start(ranch),
+    %% Assert on the start result directly instead of binding the tag to a
+    %% variable confusingly named 'Ok' and comparing it afterwards; on
+    %% failure ?assertMatch also reports the actual value.
+    ?assertMatch({ok, _Pid}, riak_core_service_mgr:start_link(?TEST_ADDR)).
+
+%% A freshly started service manager should expose no services yet.
+%% NOTE(review): this peeks at internal state via a raw gen_server call
+%% to the registered name riak_core_service_manager -- confirm that name
+%% matches what riak_core_service_mgr:start_link/1 actually registers.
+get_services_test() ->
+    Services = gen_server:call(riak_core_service_manager, get_services),
+    ?assert([] == Services).
+
+%% register a service and confirm added
+register_service_test() ->
+    %% Register testproto at rev 1.0; the service callback expects both
+    %% negotiated versions to come out as {1,0}.
+    HostArgs = [{1,0}, {1,0}],
+    Spec = {{testproto, [{1,0}]},
+            {?TCP_OPTIONS, ?MODULE, testService, HostArgs}},
+    riak_core_service_mgr:register_service(Spec, {round_robin, ?MAX_CONS}),
+    ?assert(true == riak_core_service_mgr:is_registered(testproto)).
+
+%% unregister and confirm removed
+unregister_service_test() ->
+    %% Drop the registration and confirm the service manager forgot it.
+    riak_core_service_mgr:unregister_service(testproto),
+    ?assert(false == riak_core_service_mgr:is_registered(testproto)).
+
+%% Install a stats callback with the service manager; when it is later
+%% invoked it asserts the expected per-protocol connection counts
+%% (cross-checked by start_service_test/0 via get_stats/0).
+register_stats_fun_test() ->
+    Fun = fun(Stats) ->
+                  ?assert(Stats == [{testproto,0}]) end,
+    riak_core_service_mgr:register_stats_fun(Fun).
+
+%% start a service via normal sequence
+%% End-to-end happy path: register the service, connect a 1.0 client,
+%% and check the stats once the connection has had time to complete.
+%% The handshake assertions themselves live in testService/5 and
+%% connected/6; this function only drives the sequence.
+start_service_test() ->
+    %% re-register the test protocol and confirm registered
+    register_service_test(),
+    %% try to connect via a client that speaks our test protocol
+    ExpectedRevs = {expectedToPass, [{1,0}, {1,0}]},
+    riak_core_connection:connect(?TEST_ADDR, {{testproto, [{1,0}]},
+                                              {?TCP_OPTIONS, ?MODULE, ExpectedRevs}}),
+    %% allow client and server to connect and make assertions of success/failure
+    timer:sleep(1000),
+    Stats = riak_core_service_mgr:get_stats(),
+    ?assert(Stats == [{testproto,0}]).
+
+%% After stopping the service manager no listener remains, so a client
+%% connect must fail; connect_failed/3 asserts econnrefused when called
+%% with the expectedToFail args.
+pause_existing_services_test() ->
+    riak_core_service_mgr:stop(),
+    %% there should be no services running now.
+    %% now start a client and confirm failure to connect
+    ExpectedArgs = expectedToFail,
+    riak_core_connection:connect(?TEST_ADDR, {{testproto, [{1,0}]},
+                                              {?TCP_OPTIONS, ?MODULE, ExpectedArgs}}),
+    %% allow client and server to connect and make assertions of success/failure
+    timer:sleep(1000).
+
+%% Runs last: only ranch is left to stop -- the service manager was
+%% already stopped by pause_existing_services_test/0, hence the
+%% deliberately disabled stop call below.
+cleanup_test() ->
+    %riak_core_service_mgr:stop(),
+    application:stop(ranch).
+
+%%------------------------
+%% Helper functions
+%%------------------------
+
+%% Protocol Service functions
+testService(_Socket, _Transport, {error, _Reason}, _Args, _Props) ->
+    %% The service should never see a failed negotiation in these tests.
+    ?assert(false);
+testService(_Socket, _Transport, {ok, {Proto, MyVer, RemoteVer}},
+            [WantMyVer, WantRemoteVer], _Props) ->
+    %% Check the negotiated protocol and versions against the args the
+    %% service was registered with.
+    ?TRACE(?debugMsg("testService started")),
+    ?assert(testproto == Proto),
+    ?assert(MyVer == WantMyVer),
+    ?assert(RemoteVer == WantRemoteVer),
+    {ok, self()}.
+
+%% Client side protocol callbacks
+connected(_Socket, _Transport, {_IP, _Port}, {Proto, MyVer, RemoteVer},
+          {_TestType, [WantMyVer, WantRemoteVer]}, _Props) ->
+    %% Client-side success callback: verify the negotiated versions, then
+    %% keep the socket open briefly so the service side can run too.
+    ?TRACE(?debugMsg("testClient started")),
+    ?assert(testproto == Proto),
+    ?assert(MyVer == WantMyVer),
+    ?assert(RemoteVer == WantRemoteVer),
+    timer:sleep(2000).
+
+connect_failed({_Proto, _Vers}, {error, Reason}, Args) ->
+    %% Decide from the client args whether this failure was planned.
+    check_failure(Args, Reason),
+    timer:sleep(1000).
+
+%% Planned refusal: no listener is running, so econnrefused is expected.
+check_failure(expectedToFail, Reason) ->
+    ?assert(Reason == econnrefused);
+%% Retry tests are expected to fail repeatedly; just note it.
+check_failure({retry_test, _Stuff}, Reason) ->
+    ?TRACE(?debugFmt("connect_failed: during retry test: ~p", [Reason])),
+    ok;
+%% Any other failure is a test bug.
+check_failure(_Other, Reason) ->
+    ?TRACE(?debugFmt("connect_failed: ~p with args = ~p", [Reason, _Other])),
+    ?assert(false).
Please sign in to comment.
Something went wrong with that request. Please try again.