Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge pull request #284 from basho/dip_conn_mgr

initial add of the Riak Core Connection Manager

We'll be circling back to fix the Ranch incompatibilities with R14B03|4 soon.
  • Loading branch information...
commit dcf8e52fbc867d9d32a897edd9cfbf381d412680 2 parents dd942b5 + 5b5f9f9
Dave Parfitt authored
5 ebin/riak_core.app
View
@@ -77,6 +77,11 @@
riak_core_vnode_worker_pool,
riak_core_web,
riak_core_wm_urlmap,
+ riak_core_connection_mgr,
+ riak_core_connection_mgr_stats,
+ riak_core_connection,
+ riak_core_service_mgr,
+ riak_core_tcp_mon,
supervisor_pre_r14b04,
vclock
]},
100 include/riak_core_connection.hrl
View
@@ -0,0 +1,100 @@
+%% -------------------------------------------------------------------
+%%
+%% Riak Core Connection Manager
+%%
+%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+%% handshake messages to safely initiate a connection. Let's not accept
+%% a connection to a telnet session by accident!
+-define(CTRL_REV, {1,0}).
+-define(CTRL_HELLO, <<"riak-ctrl:hello">>).
+-define(CTRL_TELL_IP_ADDR, <<"riak-ctrl:ip_addr">>).
+-define(CTRL_ACK, <<"riak-ctrl:ack">>).
+-define(CTRL_ASK_NAME, <<"riak-ctrl:ask_name">>).
+-define(CTRL_ASK_MEMBERS, <<"riak-ctrl:ask_members">>).
+
+
+-define(CONNECTION_SETUP_TIMEOUT, 10000).
+
+
+
+-define(CTRL_OPTIONS, [binary,
+ {keepalive, true},
+ {nodelay, true},
+ {packet, 4},
+ {reuseaddr, true},
+ {active, false}]).
+
+%% Tcp options shared during the connection and negotiation phase
+-define(CONNECT_OPTIONS, [binary,
+ {keepalive, true},
+ {nodelay, true},
+ {packet, 4},
+ {reuseaddr, true},
+ {active, false}]).
+
+-type(ip_addr_str() :: string()).
+-type(ip_portnum() :: non_neg_integer()).
+-type(ip_addr() :: {ip_addr_str(), ip_portnum()}).
+-type(tcp_options() :: [any()]).
+
+-type(proto_id() :: atom()).
+-type(rev() :: non_neg_integer()). %% major or minor revision number
+-type(proto() :: {proto_id(), {rev(), rev()}}). %% e.g. {myproto, 1, 0}
+-type(protoprefs() :: {proto_id(), [{rev(), rev()}]}).
+
+
+%% Function = fun(Socket, Transport, Protocol, Args) -> ok
+%% Protocol :: proto()
+-type(service_started_callback() :: fun((inet:socket(), module(), proto(), [any()]) -> no_return())).
+
+%% Host protocol spec
+-type(hostspec() :: {protoprefs(), {tcp_options(), module(), service_started_callback(), [any()]}}).
+
+%% Client protocol spec
+-type(clientspec() :: {protoprefs(), {tcp_options(), module(),[any()]}}).
+
+
+%% Scheduler strategies tell the connection manager how distribute the load.
+%%
+%% Client scheduler strategies
+%% ---------------------------
+%% default := service side decides how to choose node to connect to. limits number of
+%% accepted connections to max_nb_cons().
+%% askme := the connection manager will call your client for a custom protocol strategy
+%% and likewise will expect that the service side has plugged the cluster
+%% manager with support for that custom strategy. UNSUPPORTED so far.
+%% TODO: add a behaviour for the client to implement that includes this callback.
+%% Service scheduler strategies
+%% ----------------------------
+%% round_robin := choose the next available node on cluster to service request. limits
+%% the number of accepted connections to max_nb_cons().
+%% custom := service must provide a strategy to the cluster manager for choosing nodes
+%% UNSUPPORTED so far. Should we use a behaviour for the service module?
+-type(max_nb_cons() :: non_neg_integer()).
+-type(client_scheduler_strategy() :: default | askme).
+-type(service_scheduler_strategy() :: {round_robin, max_nb_cons()} | custom).
+
+%% service manager statistics, can maybe get shared by other layers too
+-record(stats, {open_connections = 0 : non_negative_integer()
+ }).
+
+
+-type(cluster_finder_fun() :: fun(() -> {ok,node()} | {error, term()})).
+
4 rebar.config
View
@@ -12,5 +12,7 @@
{riak_sysmon, ".*", {git, "git://github.com/basho/riak_sysmon", {branch, "master"}}},
{webmachine, ".*", {git, "git://github.com/basho/webmachine",
{tag, "64176ef9b"}}},
- {folsom, ".*", {git, "git://github.com/boundary/folsom.git", {branch, "master"}}}
+ {folsom, ".*", {git, "git://github.com/boundary/folsom.git", {branch, "master"}}},
+ {ranch, "0.4.0", {git, "git://github.com/extend/ranch.git", {tag, "0.4.0"}}}
+
]}.
205 src/riak_core_connection.erl
View
@@ -0,0 +1,205 @@
+%% -------------------------------------------------------------------
+%%
+%% Riak Subprotocol Server Dispatch and Client Connections
+%%
+%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+-module(riak_core_connection).
+-author("Chris Tilt").
+
+-include("riak_core_connection.hrl").
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+-define(TRACE(Stmt),Stmt).
+-else.
+-define(TRACE(Stmt),ok).
+-endif.
+
+
+%% public API
+-export([connect/2,
+ sync_connect/2]).
+
+%% internal functions
+-export([async_connect_proc/3]).
+
+%%TODO: move to riak_ring_core...symbolic cluster naming
+-export([set_symbolic_clustername/2, set_symbolic_clustername/1,
+ symbolic_clustername/1, symbolic_clustername/0]).
+
+%% @doc Sets the symbolic, human readable, name of this cluster.
+set_symbolic_clustername(Ring, ClusterName) ->
+ {new_ring,
+ riak_core_ring:update_meta(symbolic_clustername, ClusterName, Ring)}.
+
+set_symbolic_clustername(ClusterName) when is_list(ClusterName) ->
+ {error, "argument is not a string"};
+set_symbolic_clustername(ClusterName) ->
+ case riak_core_ring_manager:get_my_ring() of
+ {ok, _Ring} ->
+ riak_core_ring_manager:ring_trans(fun riak_core_connection:set_symbolic_clustername/2,
+ ClusterName);
+ {error, Reason} ->
+ lager:error("Can't set symbolic clustername because: ~p", [Reason])
+ end.
+
+symbolic_clustername(Ring) ->
+ case riak_core_ring:get_meta(symbolic_clustername, Ring) of
+ {ok, Name} -> Name;
+ undefined -> "undefined"
+ end.
+
+%% @doc Returns the symbolic, human readable, name of this cluster.
+symbolic_clustername() ->
+ case riak_core_ring_manager:get_my_ring() of
+ {ok, Ring} ->
+ symbolic_clustername(Ring);
+ {error, Reason} ->
+ lager:error("Can't read symbolic clustername because: ~p", [Reason]),
+ "undefined"
+ end.
+
+%% Make async connection request. The connection manager is responsible for retry/backoff
+%% and calls your module's functions on success or error (asynchrously):
+%% Module:connected(Socket, TransportModule, {IpAddress, Port}, {Proto, MyVer, RemoteVer}, Args)
+%% Module:connect_failed(Proto, {error, Reason}, Args)
+%% Reason could be 'protocol_version_not_supported"
+%%
+%%
+%% You can set options on the tcp connection, e.g.
+%% [{packet, 4}, {active, false}, {keepalive, true}, {nodelay, true}]
+%%
+%% ClientProtocol specifies the preferences of the client, in terms of what versions
+%% of a protocol it speaks. The host will choose the highest common major version and
+%% inform the client via the callback Module:connected() in the HostProtocol parameter.
+%%
+%% Note: that the connection will initially be setup with the `binary` option
+%% because protocol negotiation requires binary. TODO: should we allow non-binary
+%% options? Check that binary is not overwritten.
+%%
+%% connect returns the pid() of the asynchronous process that will attempt the connection.
+
+-spec(connect(ip_addr(), clientspec()) -> pid()).
+connect({IP,Port}, ClientSpec) ->
+ ?TRACE(?debugMsg("spawning async_connect link")),
+ %% start a process to handle the connection request asyncrhonously
+ proc_lib:spawn_link(?MODULE, async_connect_proc, [self(), {IP,Port}, ClientSpec]).
+
+sync_connect({IP,Port}, ClientSpec) ->
+ sync_connect_status(self(), {IP,Port}, ClientSpec).
+
+%% @private
+
+%% exchange brief handshake with client to ensure that we're supporting sub-protocols.
+%% client -> server : Hello {1,0} [Capabilities]
+%% server -> client : Ack {1,0} [Capabilities]
+exchange_handshakes_with(host, Socket, Transport, MyCaps) ->
+ Hello = term_to_binary({?CTRL_HELLO, ?CTRL_REV, MyCaps}),
+ case Transport:send(Socket, Hello) of
+ ok ->
+ ?TRACE(?debugFmt("exchange_handshakes: waiting for ~p from host", [?CTRL_ACK])),
+ case Transport:recv(Socket, 0, ?CONNECTION_SETUP_TIMEOUT) of
+ {ok, Ack} ->
+ case binary_to_term(Ack) of
+ {?CTRL_ACK, TheirRev, TheirCaps} ->
+ Props = [{local_revision, ?CTRL_REV}, {remote_revision, TheirRev} | TheirCaps],
+ {ok,Props};
+ {error, _Reason} = Error ->
+ Error;
+ Msg ->
+ {error, Msg}
+ end;
+ {error, Reason} ->
+ {error, Reason}
+ end;
+ Error ->
+ Error
+ end.
+
+async_connect_proc(Parent, {IP,Port}, ProtocolSpec) ->
+ sync_connect_status(Parent, {IP,Port}, ProtocolSpec).
+
+%% connect synchronously to remote addr/port and return status
+sync_connect_status(_Parent, {IP,Port}, {ClientProtocol, {Options, Module, Args}}) ->
+ Timeout = ?CONNECTION_SETUP_TIMEOUT,
+ Transport = ranch_tcp,
+ %% connect to host's {IP,Port}
+ ?TRACE(?debugFmt("sync_connect: connect to ~p", [{IP,Port}])),
+ case gen_tcp:connect(IP, Port, ?CONNECT_OPTIONS, Timeout) of
+ {ok, Socket} ->
+ ?TRACE(?debugFmt("Setting system options on client side: ~p", [?CONNECT_OPTIONS])),
+ Transport:setopts(Socket, ?CONNECT_OPTIONS),
+ %% handshake to make sure it's a riak sub-protocol dispatcher
+ MyName = symbolic_clustername(),
+ MyCaps = [{clustername, MyName}],
+ case exchange_handshakes_with(host, Socket, Transport, MyCaps) of
+ {ok,Props} ->
+ %% ask for protocol, see what host has
+ case negotiate_proto_with_server(Socket, Transport, ClientProtocol) of
+ {ok,HostProtocol} ->
+ %% set client's requested Tcp options
+ ?TRACE(?debugFmt("Setting user options on client side; ~p", [Options])),
+ Transport:setopts(Socket, Options),
+ %% notify requester of connection and negotiated protocol from host
+ %% pass back returned value in case problem detected on connection
+ %% by module. requestor is responsible for transferring control
+ %% of the socket.
+ Module:connected(Socket, Transport, {IP, Port}, HostProtocol, Args, Props);
+ {error, Reason} ->
+ ?TRACE(?debugFmt("negotiate_proto_with_server returned: ~p", [{error,Reason}])),
+ %% Module:connect_failed(ClientProtocol, {error, Reason}, Args),
+ {error, Reason}
+ end;
+ {error, closed} ->
+ %% socket got closed, don't report this
+ {error, closed};
+ {error, Reason} ->
+ lager:error("Failed to exchange handshake with host. Error = ~p", [Reason]),
+ {error, Reason};
+ Error ->
+ %% failed to exchange handshakes
+ lager:error("Failed to exchange handshake with host. Error = ~p", [Error]),
+ {error, Error}
+ end;
+ {error, Reason} ->
+ %% Module:connect_failed(ClientProtocol, {error, Reason}, Args),
+ {error, Reason}
+ end.
+
+%% Negotiate the highest common major protocol revisision with the connected server.
Pedram Nimreezi
DeadZen added a note

typo on revision

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+%% client -> server : Prefs List = {SubProto, [{Major, Minor}]}
+%% server -> client : selected version = {SubProto, {Major, HostMinor, ClientMinor}}
+%%
+%% returns {ok,{Proto,{Major,ClientMinor},{Major,HostMinor}}} | {error, Reason}
+negotiate_proto_with_server(Socket, Transport, ClientProtocol) ->
+ ?TRACE(?debugFmt("negotiate protocol with host, client proto = ~p", [ClientProtocol])),
+ Transport:send(Socket, erlang:term_to_binary(ClientProtocol)),
+ case Transport:recv(Socket, 0, ?CONNECTION_SETUP_TIMEOUT) of
+ {ok, NegotiatedProtocolBin} ->
+ case erlang:binary_to_term(NegotiatedProtocolBin) of
+ {ok, {Proto,{CommonMajor,HMinor,CMinor}}} ->
+ {ok, {Proto,{CommonMajor,CMinor},{CommonMajor,HMinor}}};
+ {error, Reason} ->
+ {error, Reason}
+ end;
+ {error, Reason} ->
+ lager:error("Failed to receive protocol ~p response from server. Reason = ~p",
+ [ClientProtocol, Reason]),
+ {error, connection_failed}
+ end.
622 src/riak_core_connection_mgr.erl
View
@@ -0,0 +1,622 @@
+%% -------------------------------------------------------------------
+%%
+%% Riak Subprotocol Server Dispatch and Client Connections
+%%
+%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+-module(riak_core_connection_mgr).
+-author("Chris Tilt").
+-behaviour(gen_server).
+
+-include("riak_core_connection.hrl").
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+-endif.
+
+%% controls retry and backoff.
+-define(INITIAL_BACKOFF, 1 * 1000). %% 1 second initial backoff per endpoint
+-define(MAX_BACKOFF, 1 * 60 * 1000). %% 1 minute maximum backoff per endpoint
+-define(EXHAUSTED_ENDPOINTS_RETRY_INTERVAL, 10 * 1000). %% 10 seconds until retrying the list again
+
+%% retry delay if locator returned empty list
+-ifdef(TEST).
+-define(TRACE(Stmt),Stmt).
+%%-define(TRACE(Stmt),ok).
+-define(DEFAULT_RETRY_NO_ENDPOINTS, 2 * 1000). %% short for testing to avoid timeout
+-else.
+-define(TRACE(Stmt),ok).
+-define(DEFAULT_RETRY_NO_ENDPOINTS, 5 * 1000). %% 5 seconds
+-endif.
+
+
+-define(SERVER, riak_core_connection_manager).
+-define(MAX_LISTENERS, 100).
+
+-type(counter() :: non_neg_integer()).
+
+%% Connection manager strategy (per Jon M.)
+%% when a connection request comes in,
+%% + call the locator service to get the list of {transport, {address, port}}
+%% + create a linked helper process to call riak_core_connection (just once) on the next available
+%% connection (ignore blacklisted ones, they'll get picked up if a repeat is necessary)
+%% + on connection it transfers control of the socket back to the connmgr, casts a success message back
+%% to the connection manager and exits normally.
+%% - on success, the connection manager increments successful connects, reset the backoff timeout on
+%% that connection.
+%% - on failure, casts a failure message back to the connection manager (error, timeout etc) the
+%% connection manager marks the {Transport, {Address, Port}} as blacklisted, increases the failure
+%% counter and starts a timer for the backoff time (and updates it for next time). The connection
+%% manager checks for the next non--blacklisted endpoint in the connection request list to launch
+%% a new connection, if the list is empty call the locator service again to get a new list. If all
+%% connections are blacklisted, use send_after message to wake up and retry (perhaps with backoff
+%% time too).
+
+%% End-point status state, updated for failed and successful connection attempts,
+%% or by timers that fire to update the backoff time.
+%% TODO: add folsom window'd stats
+%% handle an EXIT from the helper process if it dies
+-record(ep, {addr, %% endpoint {IP, Port}
+ nb_curr_connections = 0 :: counter(), %% number of current connections
+ nb_success = 0 :: counter(), %% total successfull connects on this ep
+ nb_failures = 0 :: counter(), %% total failed connects on this ep
+ is_black_listed = false :: boolean(), %% true after a failed connection attempt
+ backoff_delay=0 :: counter(), %% incremented on each failure, reset to zero on success
+ failures = orddict:new() :: orddict:orddict(), %% failure reasons
+ last_fail_time :: erlang:timestamp(), %% time of last failure since 1970
+ next_try_secs :: counter() %% time in seconds to next retry attempt
+ }).
+
+%% connection request record
+-record(req, {ref, % Unique reference for this connection request
+ pid, % Helper pid trying to make connection
+ target, % target to connect to {Type, Name}
+ spec, % client spec
+ strategy, % connection strategy
+ cur, % current connection endpoint
+ state = init, % init | connecting | connected | cancelled
+ status % history of connection attempts
+ }).
+
+
+%% connection manager state:
+-record(state, {is_paused = false :: boolean(),
+ pending = [] :: [#req{}], % pending requests
+ %% endpoints :: {module(),ip_addr()} -> ep()
+ endpoints = orddict:new() :: orddict:orddict(), %% known endpoints w/status
+ locators = orddict:new() :: orddict:orddict(), %% connection locators
+ nb_total_succeeded = 0 :: counter(),
+ nb_total_failed = 0 :: counter()
+ }).
+
+-export([start_link/0,
+ resume/0,
+ pause/0,
+ is_paused/0,
+ connect/2, connect/3,
+ disconnect/1,
+ register_locator/2,
+ apply_locator/2,
+ reset_backoff/0,
+ stop/0
+ ]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+%% internal functions
+-export([connection_helper/4, increase_backoff/1]).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+
+%% @doc Starts the manager linked.
+-spec(start_link() -> {ok, pid()}).
+start_link() ->
+ Args = [],
+ Options = [],
+ gen_server:start_link({local, ?SERVER}, ?MODULE, Args, Options).
+
+%% @doc Begins or resumes accepting and establishing new connections, in
+%% order to maintain the protocols that have been (or continue to be) registered
+%% and unregistered. pause() will not kill any existing connections, but will
+%% cease accepting new requests or retrying lost connections.
+-spec(resume() -> ok).
+resume() ->
+ gen_server:cast(?SERVER, resume).
+
+%% @doc Stop accepting / creating new connections; this does not terminated
+%% existing ones.
+-spec(pause() -> ok).
+pause() ->
+ gen_server:cast(?SERVER, pause).
+
+%% @doc Return paused state
+-spec is_paused() -> boolean().
+is_paused() ->
+ gen_server:call(?SERVER, is_paused).
+
+%% @doc Reset all backoff delays to zero.
+-spec reset_backoff() -> 'ok'.
+reset_backoff() ->
+ gen_server:cast(?SERVER, reset_backoff).
+
+%% Register a locator - for the given Name and strategy it returns {ok, [{IP,Port}]}
+%% list of endpoints to connect to, in order. The list may be empty.
+%% If the query can never be answered
+%% return {error, Reason}.
+%% fun(Name
+register_locator(Type, Fun) ->
+ gen_server:call(?SERVER, {register_locator, Type, Fun}, infinity).
+
+apply_locator(Name, Strategy) ->
+ gen_server:call(?SERVER, {apply_locator, Name, Strategy}, infinity).
+
+%% @doc Establish a connection to the remote destination. be persistent about it,
+%% but not too annoying to the remote end. Connect by name of cluster or
+%% IP address. Use default strategy to find "best" peer for connection.
+%%
+%% Targets are found by applying a registered locator for it.
+%% The identity locator is pre-installed, so if you want to connect to a list
+%% of IP and Port addresses, supply a Target like this: {identity, [{IP, Port},...]},
+%% where IP::string() and Port::integer(). You can also pass {identity, {IP, Port}}
+%% and the locator will use just that one IP. With a list, it will rotate
+%% trying them all until a connection is established.
+%%
+%% Other locator types must be registered with this connection manager
+%% before calling connect().
+%%
+%% Supervision must be done by the calling process if desired. No supervision
+%% is done here.
+%%
+-spec connect(Target :: string(), ClientSpec :: clientspec(), Strategy :: client_scheduler_strategy()) -> {'ok', reference()}.
+connect(Target, ClientSpec, Strategy) ->
+ gen_server:call(?SERVER, {connect, Target, ClientSpec, Strategy}).
+
+%% @doc same as connect(Target, ClientSpec, default).
+%% @see connect/3
+-spec connect(Target :: string(), ClientSpec :: clientspec()) -> {'ok', reference()}.
+connect(Target, ClientSpec) ->
+ gen_server:call(?SERVER, {connect, Target, ClientSpec, default}).
+
+%% @doc Disconnect from the remote side.
+-spec disconnect(Target :: string()) -> 'ok'.
+disconnect(Target) ->
+ gen_server:cast(?SERVER, {disconnect, Target}).
+
+%% doc Stop the server and sever all connections.
+-spec stop() -> 'ok'.
+stop() ->
+ gen_server:call(?SERVER, stop).
+
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+
+init([]) ->
+ process_flag(trap_exit, true),
+ %% install default "identity" locator
+ Locator = fun identity_locator/2,
+ ?TRACE(?debugMsg("Starting")),
+ {ok, #state{is_paused = false,
+ locators = orddict:store(identity, Locator, orddict:new())
+ }}.
+
+handle_call(is_paused, _From, State) ->
+ {reply, State#state.is_paused, State};
+
+%% connect based on address. Return process id of helper
+handle_call({connect, Target, ClientSpec, Strategy}, _From, State) ->
+ Reference = make_ref(),
+ Request = #req{ref = Reference,
+ target = Target,
+ pid = undefined,
+ spec = ClientSpec,
+ state = init,
+ strategy = Strategy},
+ %% add request to pending queue so it may be found in restarts
+ State2 = State#state{pending = lists:keystore(Reference, #req.ref,
+ State#state.pending,
+ Request)},
+ ?TRACE(?debugFmt("Starting connect request to ~p, ref is ~p", [Target, Reference])),
+ {reply, {ok, Reference}, start_request(Request, State2)};
+
+handle_call({get_endpoint_backoff, Addr}, _From, State) ->
+ ?TRACE(?debugFmt("backing off ~p", [Addr])),
+ {reply, {ok, get_endpoint_backoff(Addr, State#state.endpoints)}, State};
+
+handle_call({register_locator, Type, Fun}, _From,
+ State = #state{locators = Locators}) ->
+ {reply, ok, State#state{locators = orddict:store(Type, Fun, Locators)}};
+
+handle_call({apply_locator, Target, Strategy}, _From,
+ State = #state{locators = Locators}) ->
+ AddrsOrError = locate_endpoints(Target, Strategy, Locators),
+ {reply, AddrsOrError, State};
+
+handle_call(stop, _From, State) ->
+ %% TODO do we need to cleanup helper pids here?
+ {stop, normal, ok, State};
+
+handle_call({should_try_endpoint, Ref, Addr}, _From, State = #state{pending=Pending}) ->
+ case lists:keyfind(Ref, #req.ref, Pending) of
+ false ->
+ %% This should never happen
+ {reply, false, State};
+ Req ->
+ {Answer, ReqState} =
+ case Req#req.state of
+ cancelled ->
+ %% helper process hasn't cancelled itself yet.
+ {false, cancelled};
+ _ ->
+ {true, connecting}
+ end,
+ {reply, Answer, State#state{pending = lists:keystore(Ref, #req.ref, Pending,
+ Req#req{cur = Addr,
+ state = ReqState})}}
+ end;
+
+handle_call(_Unhandled, _From, State) ->
+ ?TRACE(?debugFmt("Unhandled gen_server call: ~p", [_Unhandled])),
+ {reply, {error, unhandled}, State}.
+
+handle_cast(pause, State) ->
+ {noreply, State#state{is_paused = true}};
+
+handle_cast(resume, State) ->
+ {noreply, State#state{is_paused = false}};
+
+handle_cast({disconnect, Target}, State) ->
+ {noreply, disconnect_from_target(Target, State)};
+
+%% reset all backoff delays to zero.
+%% TODO: restart stalled connections.
+handle_cast(reset_backoff, State) ->
+ NewEps = reset_backoff(State#state.endpoints),
+ {noreply, State#state{endpoints = NewEps}};
+
+%% helper process says no endpoints were returned by the locators.
+%% helper process will schedule a retry.
+handle_cast({conmgr_no_endpoints, _Ref}, State) ->
+ %% mark connection as black-listed and start timer for reset
+ {noreply, State};
+
+%% helper process says it failed to reach an address.
+handle_cast({endpoint_failed, Addr, Reason, ProtocolId}, State) ->
+ ?TRACE(?debugFmt("Failing endpoint ~p for protocol ~p with reason ~p", [Addr, ProtocolId, Reason])),
+ %% mark connection as black-listed and start timer for reset
+ {noreply, fail_endpoint(Addr, Reason, ProtocolId, State)}.
+
+%% it is time to remove Addr from the black-listed addresses
+handle_info({backoff_timer, Addr}, State = #state{endpoints = EPs}) ->
+ case orddict:find(Addr, EPs) of
+ {ok, EP} ->
+ EP2 = EP#ep{is_black_listed = false},
+ {noreply, State#state{endpoints = orddict:store(Addr,EP2,EPs)}};
+ error ->
+ %% TODO: Should never happen because the Addr came from the EP list.
+ {norepy, State}
+ end;
+handle_info({retry_req, Ref}, State = #state{pending = Pending}) ->
+ case lists:keyfind(Ref, #req.ref, Pending) of
+ false ->
+ %% TODO: should never happen
+ {noreply, State};
+ Req ->
+ {noreply, start_request(Req, State)}
+ end;
+
+%%% All of the connection helpers end here
+%% cases:
+%% helper succeeded -> update EP stats: BL<-false, backoff_delay<-0
+%% helper failed -> updates EP stats: failures++, backoff_delay++
+%% other Pid failed -> pass on linked error
+handle_info({'EXIT', From, Reason}, State = #state{pending = Pending}) ->
+ %% Work out which endpoint it was
+ case lists:keytake(From, #req.pid, Pending) of
+ false ->
+ %% Must have been something we were linked to, or linked to us
+ ?TRACE(?debugFmt("Connection Manager exiting because linked process ~p exited for reason: ~p",
+ [From, Reason])),
+ lager:error("Connection Manager exiting because linked process ~p exited for reason: ~p",
+ [From, Reason]),
+ exit({linked, From, Reason});
+ {value, #req{cur = Cur, ref = Ref}=Req, Pending2} ->
+ {{ProtocolId, _Foo},_Bar} = Req#req.spec,
+ case Reason of
+ ok ->
+ %% update the stats module
+ Stat = conn_success,
+ ?TRACE(?debugMsg("Trying for stats update, the connect_endpoint")),
+ riak_core_connection_mgr_stats:update(Stat, Cur, ProtocolId),
+ %% riak_core_connection set up and handlers called
+ {noreply, connect_endpoint(Cur, State#state{pending = Pending2})};
+
+ {ok, cancelled} ->
+ %% helper process has been cancelled and has exited nicely.
+ %% update the stats module
+ Stat = conn_cancelled,
+ ?TRACE(?debugMsg("Trying for stats update")),
+ riak_core_connection_mgr_stats:update(Stat, Cur, ProtocolId),
+ %% toss out the cancelled request from pending.
+ {noreply, State#state{pending = Pending2}};
+
+ {error, endpoints_exhausted, Ref} ->
+ %% tried all known endpoints. schedule a retry.
+ %% reuse the existing request Reference, Ref.
+ case Req#req.state of
+ cancelled ->
+ %% oops. that request was cancelled. No retry
+ {noreply, State#state{pending = Pending2}};
+ _ ->
+ ?TRACE(?debugMsg("Scheduling retry")),
+ {noreply, schedule_retry(?EXHAUSTED_ENDPOINTS_RETRY_INTERVAL, Ref, State)}
+ end;
+
+ Reason -> % something bad happened to the connection, reuse the request
+ ?TRACE(?debugFmt("handle_info: EP failed on ~p for ~p. removed Ref ~p",
+ [Cur, Reason, Ref])),
+ lager:warning("handle_info: endpoint ~p failed: ~p. removed Ref ~p",
+ [Cur, Reason, Ref]),
+ State2 = fail_endpoint(Cur, Reason, ProtocolId, State),
+ %% the connection helper will not retry. It's up the caller.
+ State3 = fail_request(Reason, Req, State2),
+ {noreply, State3}
+ end
+ end;
+handle_info(_Unhandled, State) ->
+ ?TRACE(?debugFmt("Unhandled gen_server info: ~p", [_Unhandled])),
+ lager:error("Unhandled gen_server info: ~p", [_Unhandled]),
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%%===================================================================
+%%% Private
+%%%===================================================================
+
+identity_locator({IP,Port}, _Policy) ->
+ {ok, [{IP,Port}]};
+identity_locator([Ips], _Policy) ->
+ {ok, Ips}.
+
+%% close the pending connection and cancel the request
+disconnect_from_target(Target, State = #state{pending = Pending}) ->
+ lager:debug("Disconnecting from: ~p", [Target]),
+ case lists:keyfind(Target, #req.target, Pending) of
+ false ->
+ %% already gone!
+ State;
+ Req ->
+ %% The helper process will discover the cancellation when it asks if it
+ %% should connect to an endpoint.
+ State#state{pending = lists:keystore(Req#req.ref, #req.ref, Pending,
+ Req#req{state = cancelled})}
+ end.
+
+%% schedule a retry to occur after Interval milliseconds.
+%% do not clear the pid from pending. the exit handler will do that.
+schedule_retry(Interval, Ref, State = #state{pending = Pending}) ->
+ case lists:keyfind(Ref, #req.ref, Pending) of
+ false ->
+ %% this should never happen
+ lager:error("ConnectionManager: failed to find connection ref while scheduling retry."),
+ State;
+ Req ->
+ case Req#req.state of
+ cancelled ->
+ %% the request was cancelled, so no rescheduling wanted.
+ State;
+ _ ->
+ %% reschedule request to happen in the future
+ erlang:send_after(Interval, self(), {retry_req, Ref}),
+ State#state{pending = lists:keystore(Req#req.ref, #req.ref, Pending,
+ Req#req{cur = undefined})}
+ end
+ end.
+
+%% Start process to make connection to available endpoints. Return a reference for
+%% this connection attempt.
+start_request(#req{state=cancelled}, State) ->
+ State;
+start_request(Req = #req{ref=Ref, target=Target, spec=ClientSpec, strategy=Strategy},
+ State) ->
+ case locate_endpoints(Target, Strategy, State#state.locators) of
+ {ok, []} ->
+ %% locators provided no addresses
+ gen_server:cast(?SERVER, {conmgr_no_endpoints, Ref}),
+ Interval = app_helper:get_env(riak_core, connmgr_no_endpoint_retry,
+ ?DEFAULT_RETRY_NO_ENDPOINTS),
+ lager:debug("Connection Manager located no endpoints for: ~p. Will retry.", [Target]),
+ %% schedule a retry and exit
+ schedule_retry(Interval, Ref, State);
+ {ok, EpAddrs } ->
+ lager:debug("Connection Manager located endpoints: ~p", [EpAddrs]),
+ AllEps = update_endpoints(EpAddrs, State#state.endpoints),
+ TryAddrs = filter_blacklisted_endpoints(EpAddrs, AllEps),
+ lager:debug("Connection Manager trying endpoints: ~p", [TryAddrs]),
+ Pid = spawn_link(
+ fun() -> exit(try connection_helper(Ref, ClientSpec, Strategy, TryAddrs)
+ catch T:R -> {exception, {T, R}}
+ end)
+ end),
+ State#state{endpoints = AllEps,
+ pending = lists:keystore(Ref, #req.ref, State#state.pending,
+ Req#req{pid = Pid,
+ state = connecting,
+ cur = undefined})};
+ {error, Reason} ->
+ fail_request(Reason, Req, State)
+ end.
+
+%% reset the backoff delay to zero for all endpoints
+reset_backoff(Endpoints) ->
+ orddict:map(fun(_Addr,EP) -> EP#ep{backoff_delay = 0} end,Endpoints).
+
+%% increase the backoff delay, but cap at a maximum
+increase_backoff(0) ->
+ ?INITIAL_BACKOFF;
+increase_backoff(Delay) when Delay > ?MAX_BACKOFF ->
+ ?MAX_BACKOFF;
+increase_backoff(Delay) ->
+ 2 * Delay.
+
+%% Convert an inet:address to a string if needed.
+string_of_ip(IP) when is_tuple(IP) ->
+ inet_parse:ntoa(IP);
+string_of_ip(IP) ->
+ IP.
+
+string_of_ipport({IP,Port}) ->
+ string_of_ip(IP) ++ ":" ++ erlang:integer_to_list(Port).
+
+%% A spawned process that will walk down the list of endpoints and try them
+%% all until exhausting the list. This process is responsible for waiting for
+%% the backoff delay for each endpoint.
+connection_helper(Ref, _Protocol, _Strategy, []) ->
+ %% exhausted the list of endpoints. let server start new helper process
+ {error, endpoints_exhausted, Ref};
+connection_helper(Ref, Protocol, Strategy, [Addr|Addrs]) ->
+ {{ProtocolId, _Foo},_Bar} = Protocol,
+ %% delay by the backoff_delay for this endpoint.
+ {ok, BackoffDelay} = gen_server:call(?SERVER, {get_endpoint_backoff, Addr}),
+ lager:debug("Holding off ~p seconds before trying ~p at ~p",
+ [(BackoffDelay/1000), ProtocolId, string_of_ipport(Addr)]),
+ timer:sleep(BackoffDelay),
+ case gen_server:call(?SERVER, {should_try_endpoint, Ref, Addr}) of
+ true ->
+ lager:debug("Trying connection to: ~p at ~p", [ProtocolId, string_of_ipport(Addr)]),
+ ?TRACE(?debugMsg("Attempting riak_core_connection:sync_connect/2")),
+ case riak_core_connection:sync_connect(Addr, Protocol) of
+ ok ->
+ ok;
+ {error, Reason} ->
+ %% notify connection manager this EP failed and try next one
+ gen_server:cast(?SERVER, {endpoint_failed, Addr, Reason, ProtocolId}),
+ connection_helper(Ref, Protocol, Strategy, Addrs)
+ end;
+ _ ->
+ %% connection request has been cancelled
+ lager:debug("Ignoring connection to: ~p at ~p because it was cancelled",
+ [ProtocolId, string_of_ipport(Addr)]),
+ {ok, cancelled}
+ end.
+
+locate_endpoints({Type, Name}, Strategy, Locators) ->
+ case orddict:find(Type, Locators) of
+ {ok, Locate} ->
+ case Locate(Name, Strategy) of
+ error ->
+ {error, {bad_target_name_args, Type, Name}};
+ Addrs ->
+ Addrs
+ end;
+ error ->
+ {error, {unknown_target_type, Type}}
+ end.
+
+%% Make note of the failed connection attempt and update
+%% our book keeping for that endpoint. Black-list it, and
+%% adjust a backoff timer so that we wait a while before
+%% trying this endpoint again.
+fail_endpoint(Addr, Reason, ProtocolId, State) ->
+ %% update the stats module
+ Stat = {conn_error, Reason},
+ riak_core_connection_mgr_stats:update(Stat, Addr, ProtocolId),
+ %% update the endpoint
+ Fun = fun(EP=#ep{backoff_delay = Backoff, failures = Failures}) ->
+ erlang:send_after(Backoff, self(), {backoff_timer, Addr}),
+ EP#ep{failures = orddict:update_counter(Reason, 1, Failures),
+ nb_failures = EP#ep.nb_failures + 1,
+ backoff_delay = increase_backoff(Backoff),
+ last_fail_time = os:timestamp(),
+ next_try_secs = Backoff/1000,
+ is_black_listed = true}
+ end,
+ update_endpoint(Addr, Fun, State).
+
+connect_endpoint(Addr, State) ->
+ update_endpoint(Addr, fun(EP) ->
+ EP#ep{is_black_listed = false,
+ nb_success = EP#ep.nb_success + 1,
+ next_try_secs = 0,
+ backoff_delay = 0}
+ end, State).
+
+%% Return the current backoff delay for the named Address,
+%% or if we can't find that address in the endpoints - the
+%% initial backoff.
+get_endpoint_backoff(Addr, EPs) ->
+ case orddict:find(Addr, EPs) of
+ error ->
+ 0;
+ {ok, EP} ->
+ EP#ep.backoff_delay
+ end.
+
+update_endpoint(Addr, Fun, State = #state{endpoints = EPs}) ->
+ case orddict:find(Addr, EPs) of
+ error ->
+ EP2 = Fun(#ep{addr = Addr}),
+ State#state{endpoints = orddict:store(Addr,EP2,EPs)};
+ {ok, EP} ->
+ EP2 = Fun(EP),
+ State#state{endpoints = orddict:store(Addr,EP2,EPs)}
+ end.
+
+fail_request(Reason, #req{ref = Ref, spec = Spec},
+ State = #state{pending = Pending}) ->
+ %% Tell the module it failed
+ {Proto, {_TcpOptions, Module,Args}} = Spec,
+ ?TRACE(?debugFmt("module ~p getting connect_failed", [Module])),
+ Module:connect_failed(Proto, {error, Reason}, Args),
+ %% Remove the request from the pending list
+ State#state{pending = lists:keydelete(Ref, #req.ref, Pending)}.
+
+update_endpoints(Addrs, Endpoints) ->
+ %% add addr to Endpoints if not already there
+ Fun = (fun(Addr, EPs) ->
+ case orddict:is_key(Addr, Endpoints) of
+ true -> EPs;
+ false ->
+ EP = #ep{addr=Addr},
+ orddict:store(Addr, EP, EPs)
+ end
+ end),
+ lists:foldl(Fun, Endpoints, Addrs).
+
+%% Return the addresses of non-blacklisted endpoints that are also
+%% members of the list EpAddrs.
+filter_blacklisted_endpoints(EpAddrs, AllEps) ->
+ PredicateFun = (fun(Addr) ->
+ case orddict:find(Addr, AllEps) of
+ {ok, EP} ->
+ EP#ep.is_black_listed == false;
+ error ->
+ false
+ end
+ end),
+ lists:filter(PredicateFun, EpAddrs).
273 src/riak_core_connection_mgr_stats.erl
View
@@ -0,0 +1,273 @@
+%% -------------------------------------------------------------------
+%%
+%% Riak Core Connection Manager Statistics
+%% collect, aggregate, and provide stats for connections made by
+%% the connection manager
+%%
+%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+-module(riak_core_connection_mgr_stats).
+-author("Chris Tilt").
+
+-behaviour(gen_server).
+
+%% API
+-export([start_link/0,
+ get_stats/0,
+ get_stats_by_ip/1,
+ get_stats_by_protocol/1,
+ get_consolidated_stats/0,
+ update/3,
+ register_stats/0,
+ produce_stats/0]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-define(SERVER, ?MODULE).
+-define(APP, riak_conn_mgr_stats).
+
+start_link() ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+register_stats() ->
+ [(catch folsom_metrics:delete_metric(Stat)) || Stat <- folsom_metrics:get_metrics(),
+ is_tuple(Stat), element(1, Stat) == ?APP],
+ [register_stat({?APP, Name}, Type) || {Name, Type} <- stats()],
+ riak_core_stat_cache:register_app(?APP, {?MODULE, produce_stats, []}).
+
+%% @spec get_stats() -> proplist()
+%% @doc Return all stats from the cached value. This will refresh
+%% the cache if it's been over 5 seconds since the last query.
+%% When the cache needs to get the latest values, it will call our
+%% produce_stats() function.
+get_stats() ->
+ case riak_core_stat_cache:get_stats(?APP) of
+ {ok, Stats, _TS} ->
+ Stats;
+ Error -> Error
+ end.
+
+get_consolidated_stats() ->
+ Strings = [format_stat(Stat) || Stat <- get_stats()],
+ stats_as_atoms(Strings).
+
+%% Sort and convert {K,V} stats to atoms
+stats_as_atoms(StringStats) ->
+ lists:map(fun({S,V}) -> {list_to_atom(S), V} end, lists:sort(StringStats)).
+
+format_stat({{?APP, conn_error, StatName}, [{count,N},{one,_W}]}) ->
+ {"conn_error_" ++ atom_to_list(StatName), N};
+format_stat({{?APP, conn_error, StatName, total}, N}) ->
+ {"conn_error_" ++ atom_to_list(StatName) ++ "_total", N};
+format_stat({{?APP, conn_error, StatName, Addr, total}, N}) when not is_atom(Addr) ->
+ {string_of_ipaddr(Addr) ++ "_"
+ ++ "conn_error_"
+ ++ atom_to_list(StatName)
+ ++ "_total", N};
+format_stat({{?APP, conn_error, StatName, Addr, ProtocolId, total},N}) ->
+ {string_of_ipaddr(Addr) ++ "_"
+ ++ "conn_error_"
+ ++ atom_to_list(ProtocolId) ++ "_"
+ ++ atom_to_list(StatName)
+ ++ "_total" , N};
+format_stat({{?APP, conn_error, StatName, ProtocolId, total},N}) ->
+ {atom_to_list(ProtocolId) ++ "_"
+ ++ "conn_error_"
+ ++ atom_to_list(StatName)
+ ++ "_total", N};
+format_stat({{?APP, conn_error, StatName, Addr, ProtocolId},[{count,N},{one,_W}]}) ->
+ {string_of_ipaddr(Addr) ++ "_"
+ ++ "conn_error_"
+ ++ atom_to_list(ProtocolId) ++ "_"
+ ++ atom_to_list(StatName), N};
+format_stat({{?APP, conn_error, StatName, Addr},[{count,N},{one,_W}]}) when not is_atom(Addr) ->
+ {string_of_ipaddr(Addr) ++ "_"
+ ++ "conn_error_"
+ ++ atom_to_list(StatName), N};
+format_stat({{?APP, conn_error, StatName, ProtocolId},[{count,N},{one,_W}]}) ->
+ {"conn_error_" ++ atom_to_list(ProtocolId) ++ "_" ++ atom_to_list(StatName), N};
+
+format_stat({{?APP, StatName},[{count,N},{one,_W}]}) ->
+ {atom_to_list(StatName), N};
+format_stat({{?APP, StatName, total},N}) ->
+ {atom_to_list(StatName) ++ "_total", N};
+format_stat({{?APP, StatName, Addr, total},N}) when not is_atom(Addr) ->
+ {string_of_ipaddr(Addr)
+ ++ "_" ++ atom_to_list(StatName) ++ "_total", N};
+format_stat({{?APP, StatName, Addr},[{count,N},{one,_W}]}) when not is_atom(Addr) ->
+ {string_of_ipaddr(Addr) ++ "_" ++ atom_to_list(StatName),N};
+format_stat({{?APP, StatName, ProtocolId, total},N}) when is_atom(ProtocolId) ->
+ {atom_to_list(ProtocolId) ++ "_" ++ atom_to_list(StatName) ++ "_total", N};
+format_stat({{?APP, StatName, ProtocolId},[{count,N},{one,_W}]}) when is_atom(ProtocolId) ->
+ {atom_to_list(ProtocolId) ++ "_" ++ atom_to_list(StatName), N};
+format_stat({{?APP, StatName, Addr, ProtocolId, total},N}) when is_atom(ProtocolId) ->
+ {string_of_ipaddr(Addr)
+ ++ "_" ++ atom_to_list(ProtocolId)
+ ++ "_" ++ atom_to_list(StatName)
+ ++ "_total", N};
+format_stat({{?APP, StatName, Addr, ProtocolId},[{count,N},{one,_W}]}) when is_atom(ProtocolId) ->
+ {string_of_ipaddr(Addr)
+ ++ "_" ++ atom_to_list(ProtocolId)
+ ++ "_" ++ atom_to_list(StatName), N}.
+
+string_of_ipaddr({IP, Port}) when is_list(IP) ->
+ lists:flatten(io_lib:format("~s:~p", [IP, Port]));
+string_of_ipaddr({IP, Port}) when is_tuple(IP) ->
+ lists:flatten(io_lib:format("~s:~p", [inet_parse:ntoa(IP), Port])).
+
+%% Get stats filtered by given IP address
+get_stats_by_ip({_IP, _Port}=Addr) ->
+ AllStats = get_stats(),
+ Stats = lists:filter(fun(S) -> predicate_by_ip(S,Addr) end, AllStats),
+ stats_as_atoms([format_stat(Stat) || Stat <- Stats]).
+
+predicate_by_ip({{_App, conn_error, _StatName, MatchAddr, total},_Value}, MatchAddr) ->
+ true;
+predicate_by_ip({{_App, conn_error, _StatName, MatchAddr, _ProtocolId, total},_Value}, MatchAddr) ->
+ true;
+predicate_by_ip({{_App, conn_error, _StatName, MatchAddr, _ProtocolId},_Value}, MatchAddr) ->
+ true;
+predicate_by_ip({{_App, conn_error, _StatName, MatchAddr},_Value}, MatchAddr) ->
+ true;
+predicate_by_ip({{_App, conn_error, _StatName, _ProtocolId},_Value}, _MatchAddr) ->
+ false;
+predicate_by_ip({{_App, conn_error, _StatName, _ProtocolId, total},_Value}, _MatchAddr) ->
+ false;
+predicate_by_ip({{_App, _StatName, MatchAddr},_Value}, MatchAddr) ->
+ true;
+predicate_by_ip({{_App, _StatName, MatchAddr, total},_Value}, MatchAddr) ->
+ true;
+predicate_by_ip({{_App, _StatName, MatchAddr, _ProtocolId},_Value}, MatchAddr) ->
+ true;
+predicate_by_ip({{_App, _StatName, MatchAddr, _ProtocolId, total},_Value}, MatchAddr) ->
+ true;
+predicate_by_ip(_X, _MatchAddr) ->
+ false.
+
+%% Get stats filtered by given protocol-id (e.g. rt_repl)
+get_stats_by_protocol(ProtocolId) ->
+ AllStats = get_stats(),
+ Stats = lists:filter(fun(S) -> predicate_by_protocol(S,ProtocolId) end, AllStats),
+ stats_as_atoms([format_stat(Stat) || Stat <- Stats]).
+
+predicate_by_protocol({{_App, conn_error, _StatName, _Addr, MatchId},_Value}, MatchId) ->
+ true;
+predicate_by_protocol({{_App, conn_error, _StatName, _Addr, MatchId, total},_Value}, MatchId) ->
+ true;
+predicate_by_protocol({{_App, conn_error, _StatName, MatchId},_Value}, MatchId) ->
+ true;
+predicate_by_protocol({{_App, conn_error, _StatName, MatchId, total},_Value}, MatchId) ->
+ true;
+predicate_by_protocol({{_App, conn_error, _StatName, _Addr},_Value}, _MatchId) ->
+ false;
+predicate_by_protocol({{_App, conn_error, _StatName, _Addr, total},_Value}, _MatchId) ->
+ false;
+predicate_by_protocol({{_App, _StatName, MatchId},_Value}, MatchId) ->
+ true;
+predicate_by_protocol({{_App, _StatName, MatchId, total},_Value}, MatchId) ->
+ true;
+predicate_by_protocol({{_App, _StatName, _Addr, MatchId},_Value}, MatchId) ->
+ true;
+predicate_by_protocol({{_App, _StatName, _Addr, MatchId, total},_Value}, MatchId) ->
+ true;
+predicate_by_protocol(_X, _MatchId) ->
+ false.
+
+%% Public interface to accumulate stats
+update(Stat, Addr, ProtocolId) ->
+ gen_server:cast(?SERVER, {update, Stat, Addr, ProtocolId}).
+
+%% gen_server
+
+init([]) ->
+ register_stats(),
+ {ok, ok}.
+
+handle_call(_Req, _From, State) ->
+ {reply, ok, State}.
+
+handle_cast({update, Stat, Addr, ProtocolId}, State) ->
+ do_update(Stat, Addr, ProtocolId),
+ {noreply, State};
+handle_cast(_Req, State) ->
+ {noreply, State}.
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%% Update a stat for given IP-Address, Cluster, and Protocol-id
+do_update({conn_error, Error}, IPAddr, Protocol) ->
+ create_or_update({?APP, conn_error, Error, total}, {inc, 1}, counter),
+ create_or_update({?APP, conn_error, Error}, 1, spiral),
+ create_or_update({?APP, conn_error, Error, IPAddr, total}, {inc, 1}, counter),
+ create_or_update({?APP, conn_error, Error, IPAddr}, 1, spiral),
+ create_or_update({?APP, conn_error, Error, Protocol, total}, {inc, 1}, counter),
+ create_or_update({?APP, conn_error, Error, Protocol}, 1, spiral),
+ create_or_update({?APP, conn_error, Error, IPAddr, Protocol, total}, {inc, 1}, counter),
+ create_or_update({?APP, conn_error, Error, IPAddr, Protocol}, 1, spiral);
+
+do_update(Stat, IPAddr, Protocol) ->
+ create_or_update({?APP, Stat, total}, {inc, 1}, counter),
+ create_or_update({?APP, Stat}, 1, spiral),
+ create_or_update({?APP, Stat, Protocol, total}, {inc, 1}, counter),
+ create_or_update({?APP, Stat, Protocol}, 1, spiral),
+ create_or_update({?APP, Stat, IPAddr, total}, {inc, 1}, counter),
+ create_or_update({?APP, Stat, IPAddr }, 1, spiral),
+ create_or_update({?APP, Stat, IPAddr, Protocol, total}, {inc, 1}, counter),
+ create_or_update({?APP, Stat, IPAddr, Protocol}, 1, spiral).
+
+%% private
+
+%% dynamically update (and create if needed) a stat
+create_or_update(Name, UpdateVal, Type) ->
+ case (catch folsom_metrics:notify_existing_metric(Name, UpdateVal, Type)) of
+ ok ->
+ ok;
+ {'EXIT', _} ->
+ register_stat(Name, Type),
+ create_or_update(Name, UpdateVal, Type)
+ end.
+
+register_stat(Name, spiral) ->
+ folsom_metrics:new_spiral(Name);
+register_stat(Name, counter) ->
+ folsom_metrics:new_counter(Name).
+
+%% @spec produce_stats() -> proplist()
+%% @doc Produce a proplist-formatted view of the current aggregation
+%% of stats.
+produce_stats() ->
+ Stats = [Stat || Stat <- folsom_metrics:get_metrics(), is_tuple(Stat), element(1, Stat) == ?APP],
+ lists:flatten([{Stat, get_stat(Stat)} || Stat <- Stats]).
+
+%% Get the value of the named stats metric
+%% NOTE: won't work for Histograms
+get_stat(Name) ->
+ folsom_metrics:get_metric_value(Name).
+
+%% Return list of static stat names and types to register
+stats() -> []. %% no static stats to register
453 src/riak_core_service_mgr.erl
View
@@ -0,0 +1,453 @@
+%% -------------------------------------------------------------------
+%%
+%% Riak Subprotocol Server Dispatcher
+%%
+%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+%% @doc Listens on a single TCP port and negotiates which protocol to start
+%% on a new connection. Ranch is used to create a connection pool and accept
+%% new socket connections. When a connection is accepted, the client supplies
+%% a hello with thier revision and capabilities. The server replies in kind.
Pedram Nimreezi
DeadZen added a note

typo on their

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+%% The client then sends the service they wish to use, and which versions of
+%% the service they support. The server will find the highest major version in
+%% common, and highest major version in common. If there is no major version in
+%% common, the connectin fails. Minor versions do not need to match. On a
Pedram Nimreezi
DeadZen added a note

typo on connection

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+%% success, the server sends the Major version, Client minor version, and
+%% Host minor version to the client. After that, the registered
+%% module:function/5 is called and control of the socket passed to it.
+
+
+-module(riak_core_service_mgr).
+-author("Chris Tilt").
+-behaviour(gen_server).
+
+-include("riak_core_connection.hrl").
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+-define(TRACE(Stmt),Stmt).
+%%-define(TRACE(Stmt),ok).
+-else.
+-define(TRACE(Stmt),ok).
+-endif.
+
+-define(SERVER, riak_core_service_manager).
+-define(MAX_LISTENERS, 100).
+
+%% services := registered protocols, key :: proto_id()
+-record(state, {dispatch_addr = {"localhost", 9000} :: ip_addr(),
+ services = orddict:new() :: orddict:orddict(),
+ dispatcher_pid = undefined :: pid(),
+ status_notifiers = [],
+ service_stats = orddict:new() :: orddict:orddict(), % proto-id -> stats()
+ refs = []
+ }).
+
+-export([start_link/0, start_link/1,
+ register_service/2,
+ unregister_service/1,
+ is_registered/1,
+ register_stats_fun/1,
+ get_stats/0,
+ stop/0
+ ]).
+
+%% ranch callbacks
+-export([start_link/4]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+%% internal
+-export([dispatch_service/4]).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+
+%% @doc Start the Service Manager on the default/configured Ip Address and Port.
+%% All sub-protocols will be dispatched from there.
+-spec(start_link() -> {ok, pid()}).
+start_link() ->
+ ServiceAddr = case app_helper:get_env(riak_core, cluster_mgr) of
+ undefined ->
+ lager:error("cluster_mgr is not configured for riak_core in
+ app.config, defaulting to {\"127.0.0.1\", 0}."),
+ {"127.0.0.1", 0};
+ Res ->
+ Res
+ end,
+ start_link(ServiceAddr).
+
+%% @doc Start the Service Manager on the given Ip Address and Port.
+%% All sub-protocols will be dispatched from there.
+-spec(start_link(ip_addr()) -> {ok, pid()}).
+start_link({IP,Port}) when is_integer(Port), Port >= 0 ->
+ case valid_host_ip(IP) of
+ false ->
+ erlang:error({badarg, invalid_ip});
+ true ->
+ ?TRACE(?debugFmt("Starting Core Service Manager at ~p", [{IP,Port}])),
+ lager:info("Starting Core Service Manager at ~p", [{IP,Port}]),
+ Args = [{IP,Port}],
+ Options = [],
+ gen_server:start_link({local, ?SERVER}, ?MODULE, Args, Options)
+ end.
+
+%% @doc Once a protocol specification is registered, it will be kept available
+%% by the Service Manager. Note that the callee is responsible for taking
+%% ownership of the socket via Transport:controlling_process(Socket, Pid).
+%% Only the strategy of `round_robin' is supported; it's arg is ignored.
+-spec(register_service(hostspec(), service_scheduler_strategy()) -> ok).
+register_service(HostProtocol, Strategy) ->
+ %% only one strategy is supported as yet
+ {round_robin, _NB} = Strategy,
+ gen_server:cast(?SERVER, {register_service, HostProtocol, Strategy}).
+
+%% @doc Unregister the given protocol-id. Existing connections for this
+%% protocol are not killed. New connections for this protocol will not be
+%% accepted until re-registered.
+-spec(unregister_service(proto_id()) -> ok).
+unregister_service(ProtocolId) ->
+ gen_server:cast(?SERVER, {unregister_service, ProtocolId}).
+
+%% @doc True if the given protocal id is registered.
+-spec(is_registered(proto_id()) -> boolean()).
+is_registered(ProtocolId) ->
+ gen_server:call(?SERVER, {is_registered, service, ProtocolId}).
+
+%% @doc Register a callback function that will get called periodically or
+%% when the connection status of services changes. The function will
+%% receive a list of tuples: {<protocol-id>, <stats>} where stats
+%% holds the number of open connections that have been accepted for that
+%% protocol type. This can be used to report load, in the form of
+%% connected-ness, for each protocol type, to remote clusters, e.g.,
+%% making it possible for schedulers to balance the number of
+%% connections across a cluster.
+-spec register_stats_fun(Fun :: fun(([{proto_id(), non_neg_integer()}]) -> any())) -> 'ok'.
+register_stats_fun(Fun) ->
+ gen_server:cast(?SERVER, {register_stats_fun, Fun}).
+
+%% @doc Number of open connections for each protocol id.
+-spec get_stats() -> [{proto_id(), non_neg_integer()}].
+get_stats() ->
+ gen_server:call(?SERVER, get_stats).
+
+%% @doc Stop the ranch listener, and then exit the server normally.
+-spec stop() -> 'ok'.
+stop() ->
+ gen_server:call(?SERVER, stop).
+
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+
+init([IpAddr]) ->
+ {ok, Pid} = start_dispatcher(IpAddr, ?MAX_LISTENERS, []),
+ {ok, #state{dispatch_addr = IpAddr, dispatcher_pid=Pid}}.
+
+handle_call({is_registered, service, ProtocolId}, _From, State) ->
+ Found = orddict:is_key(ProtocolId, State#state.services),
+ {reply, Found, State};
+
+handle_call(get_services, _From, State) ->
+ {reply, orddict:to_list(State#state.services), State};
+
+handle_call(stop, _From, State) ->
+ ranch:stop_listener(State#state.dispatch_addr),
+ {stop, normal, ok, State};
+
+handle_call(get_stats, _From, State) ->
+ Stats = orddict:to_list(State#state.service_stats),
+ PStats = [{Protocol, Count} || {Protocol,{_Stats,Count}} <- Stats],
+ {reply, PStats, State};
+
+handle_call(_Unhandled, _From, State) ->
+ ?TRACE(?debugFmt("Unhandled gen_server call: ~p", [_Unhandled])),
+ {reply, {error, unhandled}, State}.
+
+handle_cast({register_service, Protocol, Strategy}, State) ->
+ {{ProtocolId,_Revs},_Rest} = Protocol,
+ NewDict = orddict:store(ProtocolId, {Protocol, Strategy}, State#state.services),
+ {noreply, State#state{services=NewDict}};
+
+handle_cast({unregister_service, ProtocolId}, State) ->
+ NewDict = orddict:erase(ProtocolId, State#state.services),
+ {noreply, State#state{services=NewDict}};
+
+handle_cast({register_stats_fun, Fun}, State) ->
+ Notifiers = [Fun | State#state.status_notifiers],
+ erlang:send_after(500, self(), status_update_timer),
+ {noreply, State#state{status_notifiers=Notifiers}};
+
+%% TODO: unregister support for notifiers?
+%% handle_cast({unregister_node_status_fun, Fun}, State) ->
+%% Notifiers = [Fun | State#state.notifiers],
+%% {noreply, State#state{status_notifiers=Notifiers}};
+
+handle_cast({service_up_event, Pid, ProtocolId}, State) ->
+ ?TRACE(?debugFmt("Service up event: ~p", [ProtocolId])),
+ erlang:send_after(500, self(), status_update_timer),
+ Ref = erlang:monitor(process, Pid), %% arrange for us to receive 'DOWN' when Pid terminates
+ ServiceStats = incr_count_for_protocol_id(ProtocolId, 1, State#state.service_stats),
+ Refs = [{Ref,ProtocolId} | State#state.refs],
+ {noreply, State#state{service_stats=ServiceStats, refs=Refs}};
+
+handle_cast({service_down_event, _Pid, ProtocolId}, State) ->
+ ?TRACE(?debugFmt("Service down event: ~p", [ProtocolId])),
+ erlang:send_after(500, self(), status_update_timer),
+ ServiceStats = incr_count_for_protocol_id(ProtocolId, -1, State#state.service_stats),
+ {noreply, State#state{service_stats = ServiceStats}};
+
+handle_cast(_Unhandled, _State) ->
+ ?TRACE(?debugFmt("Unhandled gen_server cast: ~p", [_Unhandled])),
+ {error, unhandled}. %% this will crash the server
+
+handle_info(status_update_timer, State) ->
+ %% notify all registered parties of this node's services counts
+ Stats = orddict:to_list(State#state.service_stats),
+ PStats = [ {Protocol, Count} || {Protocol,{_Stats,Count}} <- Stats],
+ [NotifyFun(PStats) || NotifyFun <- State#state.status_notifiers],
+ {noreply, State};
+
+%% Get notified of a service that went down.
+%% Remove it's Ref and pass the event on to get counted by ProtocolId
+handle_info({'DOWN', Ref, process, Pid, _Reason}, State) ->
+ Refs = State#state.refs,
+ Refs2 = case lists:keytake(Ref, 1, Refs) of
+ {value, {Ref, ProtocolId}, Rest} ->
+ gen_server:cast(?SERVER, {service_down_event, Pid, ProtocolId}),
+ Rest;
+ error ->
+ Refs
+ end,
+ {noreply, State#state{refs=Refs2}};
+
+handle_info(_Unhandled, State) ->
+ ?TRACE(?debugFmt("Unhandled gen_server info: ~p", [_Unhandled])),
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%%===================================================================
+%%% Private
+%%%===================================================================
+
+incr_count_for_protocol_id(ProtocolId, Incr, ServiceStatus) ->
+ Stats2 = case orddict:find(ProtocolId, ServiceStatus) of
+ {ok, Stats} ->
+ Count = Stats#stats.open_connections,
+ Stats#stats{open_connections = Count + Incr};
+ error ->
+ #stats{open_connections = Incr}
+ end,
+ orddict:store(ProtocolId, Stats2, ServiceStatus).
+
+%% @private
+%% Host callback function, called by ranch for each accepted connection by way of
+%% of the ranch:start_listener() call above, specifying this module.
+start_link(Listener, Socket, Transport, SubProtocols) ->
+ ?TRACE(?debugMsg("Start_link dispatch_service")),
+ {ok, spawn_link(?MODULE, dispatch_service, [Listener, Socket, Transport, SubProtocols])}.
+
+%% Body of the main dispatch loop. This is instantiated once for each connection
+%% we accept because it transforms itself into the SubProtocol once it receives
+%% the sub protocol and version, negotiated with the client.
+dispatch_service(Listener, Socket, Transport, _Args) ->
+ %% tell ranch "we've got it. thanks pardner"
+ ok = ranch:accept_ack(Listener),
+ %% set some starting options for the channel; these should match the client
+ ?TRACE(?debugFmt("setting system options on service side: ~p", [?CONNECT_OPTIONS])),
+ ok = Transport:setopts(Socket, ?CONNECT_OPTIONS),
+ %% Version 1.0 capabilities just passes our clustername
+ MyName = riak_core_connection:symbolic_clustername(),
+ MyCaps = [{clustername, MyName}],
+ case exchange_handshakes_with(client, Socket, Transport, MyCaps) of
+ {ok,Props} ->
+ %% get latest set of registered services from gen_server and do negotiation
+ Services = gen_server:call(?SERVER, get_services),
+ SubProtocols = [Protocol || {_Key,{Protocol,_Strategy}} <- Services],
+ ?TRACE(?debugFmt("started dispatch_service with protocols: ~p",
+ [SubProtocols])),
+ Negotiated = negotiate_proto_with_client(Socket, Transport, SubProtocols),
+ ?TRACE(?debugFmt("negotiated = ~p", [Negotiated])),
+ start_negotiated_service(Socket, Transport, Negotiated, Props);
+ Error ->
+ Error
+ end.
+
+%% start user's module:function and transfer socket to it's process.
+start_negotiated_service(_Socket, _Transport, {error, Reason}, _Props) ->
+ ?TRACE(?debugFmt("service dispatch failed with ~p", [{error, Reason}])),
+ lager:error("service dispatch failed with ~p", [{error, Reason}]),
+ {error, Reason};
+%% Note that the callee is responsible for taking ownership of the socket via
+%% Transport:controlling_process(Socket, Pid),
+start_negotiated_service(Socket, Transport,
+ {NegotiatedProtocols, {Options, Module, Function, Args}},
+ Props) ->
+ %% Set requested Tcp socket options now that we've finished handshake phase
+ ?TRACE(?debugFmt("Setting user options on service side; ~p", [Options])),
+ ?TRACE(?debugFmt("negotiated protocols: ~p", [NegotiatedProtocols])),
+ Transport:setopts(Socket, Options),
+ %% call service body function for matching protocol. The callee should start
+ %% a process or gen_server or such, and return {ok, pid()}.
+ case Module:Function(Socket, Transport, NegotiatedProtocols, Args, Props) of
+ {ok, Pid} ->
+ {ok,{ClientProto,_Client,_Host}} = NegotiatedProtocols,
+ gen_server:cast(?SERVER, {service_up_event, Pid, ClientProto}),
+ {ok, Pid};
+ Error ->
+ ?TRACE(?debugFmt("service dispatch of ~p:~p failed with ~p",
+ [Module, Function, Error])),
+ lager:error("service dispatch of ~p:~p failed with ~p",
+ [Module, Function, Error]),
+ Error
+ end.
+
+%% Negotiate the highest common major protocol revisision with the connected client.
+%% client -> server : Prefs List = {SubProto, [{Major, Minor}]} as binary
+%% server -> client : selected version = {SubProto, {Major, HostMinor}} as binary
+%%
+%% returns {ok,{{Proto,MyVer,RemoteVer},Options,Module,Function,Args}} | Error
+negotiate_proto_with_client(Socket, Transport, HostProtocols) ->
+ case Transport:recv(Socket, 0, ?CONNECTION_SETUP_TIMEOUT) of
+ {ok, PrefsBin} ->
+ {ClientProto,Versions} = erlang:binary_to_term(PrefsBin),
+ case choose_version({ClientProto,Versions}, HostProtocols) of
+ {error, Reason} ->
+ lager:error("Failed to negotiate protocol ~p from client because ~p",
+ [ClientProto, Reason]),
+ Transport:send(Socket, erlang:term_to_binary({error,Reason})),
+ {error, Reason};
+ {ok,{ClientProto,Major,CN,HN}, Rest} ->
+ Transport:send(Socket, erlang:term_to_binary({ok,{ClientProto,{Major,HN,CN}}})),
+ {{ok,{ClientProto,{Major,HN},{Major,CN}}}, Rest};
+ {error, Reason, Rest} ->
+ lager:error("Failed to negotiate protocol ~p from client because ~p",
+ [ClientProto, Reason]),
+ %% notify client it failed to negotiate
+ Transport:send(Socket, erlang:term_to_binary({error,Reason})),
+ {{error, Reason}, Rest}
+ end;
+ {error, Reason} ->
+ lager:error("Failed to receive protocol request from client. Error = ~p",
+ [Reason]),
+ {error, connection_failed}
+ end.
+
+choose_version({ClientProto,ClientVersions}=_CProtocol, HostProtocols) ->
+ ?TRACE(?debugFmt("choose_version: client proto = ~p, HostProtocols = ~p",
+ [_CProtocol, HostProtocols])),
+ %% first, see if the host supports the subprotocol
+ case [H || {{HostProto,_Versions},_Rest}=H <- HostProtocols, ClientProto == HostProto] of
+ [] ->
+ %% oops! The host does not support this sub protocol type
+ lager:error("Failed to find host support for protocol: ~p", [ClientProto]),
+ ?TRACE(?debugMsg("choose_version: no common protocols")),
+ {error,protocol_not_supported};
+ [{{_HostProto,HostVersions},Rest}=_Matched | _DuplicatesIgnored] ->
+ ?TRACE(?debugFmt("choose_version: unsorted = ~p clientversions = ~p",
+ [_Matched, ClientVersions])),
+ CommonVers = [{CM,CN,HN} || {CM,CN} <- ClientVersions, {HM,HN} <- HostVersions, CM == HM],
+ ?TRACE(?debugFmt("common versions = ~p", [CommonVers])),
+ %% sort by major version, highest to lowest, and grab the top one.
+ case lists:reverse(lists:keysort(1,CommonVers)) of
+ [] ->
+ %% oops! No common major versions for Proto.
+ ?TRACE(?debugFmt("Failed to find a common major version for protocol: ~p",
+ [ClientProto])),
+ lager:error("Failed to find a common major version for protocol: ~p", [ClientProto]),
+ {error,protocol_version_not_supported,Rest};
+ [{Major,CN,HN}] ->
+ {ok, {ClientProto,Major,CN,HN},Rest};
+ [{Major,CN,HN} | _] ->
+ {ok, {ClientProto,Major,CN,HN},Rest}
+ end
+ end.
+
+%% exchange brief handshake with client to ensure that we're supporting sub-protocols.
+%% client -> server : Hello {1,0} [Capabilities]
+%% server -> client : Ack {1,0} [Capabilities]
+exchange_handshakes_with(client, Socket, Transport, MyCaps) ->
+ ?TRACE(?debugFmt("exchange_handshakes: waiting for ~p from client", [?CTRL_HELLO])),
+ case Transport:recv(Socket, 0, ?CONNECTION_SETUP_TIMEOUT) of
+ {ok, Hello} ->
+ %% read their hello
+ case binary_to_term(Hello) of
+ {?CTRL_HELLO, TheirRev, TheirCaps} ->
+ Ack = term_to_binary({?CTRL_ACK, ?CTRL_REV, MyCaps}),
+ Transport:send(Socket, Ack),
+ %% make some props to hand dispatched service
+ Props = [{local_revision, ?CTRL_REV}, {remote_revision, TheirRev} | TheirCaps],
+ {ok,Props};
+ Msg ->
+ %% tell other side we are failing them
+ Error = {error, bad_handshake},
+ Transport:send(Socket, erlang:term_to_binary(Error)),
+ lager:error("Control protocol handshake with client got unexpected hello: ~p",
+ [Msg]),
+ Error
+ end;
+ {error, Reason} ->
+ lager:error("Failed to exchange handshake with client. Error = ~p", [Reason]),
+ {error, Reason}
+ end.
+
+%% Returns true if the IP address given is a valid host IP address.
+valid_host_ip("0.0.0.0") ->
+ true;
+valid_host_ip(IP) ->
+ {ok, IFs} = inet:getifaddrs(),
+ {ok, NormIP} = normalize_ip(IP),
+ lists:foldl(
+ fun({_IF, Attrs}, Match) ->
+ case lists:member({addr, NormIP}, Attrs) of
+ true ->
+ true;
+ _ ->
+ Match
+ end
+ end, false, IFs).
+
+%% Convert IP address the tuple form
+normalize_ip(IP) when is_list(IP) ->
+ inet_parse:address(IP);
+normalize_ip(IP) when is_tuple(IP) ->
+ {ok, IP}.
+
+%% @doc Start the connection dispatcher with a limit of MaxListeners
+%% listener connections and supported sub-protocols. When a connection
+%% request arrives, it is mapped via the associated Protocol atom to an
+%% acceptor function called as Module:Function(Listener, Socket, Transport, Args),
+%% which must create it's own process and return {ok, pid()}
+
+-spec(start_dispatcher(ip_addr(), non_neg_integer(), [hostspec()]) -> {ok, pid()}).
+start_dispatcher({IP,Port}, MaxListeners, SubProtocols) ->
+ {ok, RawAddress} = inet_parse:address(IP),
+ {ok, Pid} = ranch:start_listener({IP,Port}, MaxListeners, ranch_tcp,
+ [{ip, RawAddress}, {port, Port}],
+ ?MODULE, SubProtocols),
+ lager:info("Service manager: listening on ~s:~p", [IP, Port]),
+ {ok, Pid}.
319 src/riak_core_tcp_mon.erl
View
@@ -0,0 +1,319 @@
+%% -------------------------------------------------------------------
+%%
+%% TCP Connection Monitor
+%%
+%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+
+-module(riak_core_tcp_mon).
+
+-export([start_link/0, start_link/1, monitor/3, status/0, status/1, format/0, format/2]).
+-export([default_status_funs/0, raw/2, diff/2, rate/2, kbps/2,
+ socket_status/1, format_socket_stats/2 ]).
+
+%% gen_server callbacks
+-behavior(gen_server).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+%% Keep 6 x 10s worth of data plus an extra sample for working out rates
+-define(DEFAULT_LIMIT, 7).
+-define(DEFAULT_INTERVAL, timer:seconds(10)).
+-define(DEFAULT_CLEAR, timer:seconds(60)).
+
+-define(INET_STATS, [recv_oct,recv_cnt,recv_max,recv_avg,recv_dvi,
+ send_oct,send_cnt,send_max,send_avg,send_pend]).
+-define(INET_OPTS, [sndbuf,recbuf,active,buffer]).
+
+-define(STATUS_FUNS, [{recv_oct, {recv_kbps, fun kbps/2}}, {recv_cnt, fun diff/2},
+ {recv_max, fun raw/2}, {recv_avg, fun raw/2}, {recv_dvi, fun raw/2},
+ {send_oct, {send_kbps, fun kbps/2}}, {send_cnt, fun diff/2},
+ {send_max, fun raw/2}, {send_avg, fun raw/2}, {send_pend, fun raw/2},
+ {sndbuf, fun raw/2}, {recbuf, fun raw/2}, {active, fun raw/2},
+ {buffer, fun raw/2}]).
+
+-record(state, {conns = gb_trees:empty(), % conn records keyed by Socket
+ tags = gb_trees:empty(), % tags to ports
+ interval = ?DEFAULT_INTERVAL, % how often to get stats
+ limit = ?DEFAULT_LIMIT, %
+ clear_after = ?DEFAULT_CLEAR, % how long to leave errored sockets in status
+ stats = ?INET_STATS, % Stats to read
+ opts = ?INET_OPTS, % Opts to read
+ status_funs = dict:from_list(default_status_funs()) % Status reporting functions
+ }).
+
+-record(conn, {tag, %% Tag used to find socket
+ transport,
+ type, %% Type - normal, dist, error
+ ts_hist = [], %% History of timestamps for readings
+ hist = []}). %% History of readings
+
+
+start_link() ->
+ start_link([]).
+
+start_link(Props) ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, Props, []).
+
+monitor(Socket, Tag, Transport) ->
+ gen_server:call(?MODULE, {monitor, Socket, Tag, Transport}).
+
+status() ->
+ gen_server:call(?MODULE, status).
+
+status(Timeout) ->
+ gen_server:call(?MODULE, status, Timeout).
+
+socket_status(Socket) ->
+ gen_server:call(?MODULE, {socket_status, Socket}).
+
+format() ->
+ Status = status(),
+ io:fwrite([format(Status, recv_kbps),
+ format(Status, send_kbps)]).
+
+format(Status, Stat) ->
+ [format_header(Stat),
+ [format_entry(Entry, Stat) || Entry <- Status]].
+
+format_header(Stat) ->
+ io_lib:format("~40w Value\n", [Stat]).
+
+format_entry({_Socket, Status}, Stat) ->
+ Tag = proplists:get_value(tag, Status),
+ Value = proplists:get_value(Stat, Status),
+ case Value of
+ Value when is_list(Value) ->
+ [io_lib:format("~40s [", [Tag]),
+ format_list(Value),
+ "]\n"];
+ _ ->
+ [io_lib:format("~40s", [Tag]),
+ format_value(Value),
+ "\n"]
+ end.
+
+format_value(Val) when is_float(Val) ->
+ io_lib:format("~7.1f", [Val]);
+format_value(Val) ->
+ io_lib:format("~w", [Val]).
+
+format_list(Value) ->
+ [$[, string:join([format_value(Item) || Item <- Value], ", "), $]].
+
+%% Provide a way to get to the default status fun
+default_status_funs() ->
+ ?STATUS_FUNS.
+
+%% Return raw readings, ignore timestamps
+raw(_TS, Hist) ->
+ Hist.
+
+diff(TS, Hist) ->
+ RevTS = lists:reverse(TS),
+ RevHist = lists:reverse(Hist),
+ diff(RevTS, RevHist, []).
+
+diff([_TS], [_C], Acc) ->
+ Acc;
+diff([_TS1 | TSRest], [C1 | CRest], Acc) ->
+ Diff = hd(CRest) - C1,
+ diff(TSRest, CRest, [Diff | Acc]).
+
+%% Convert byte rate to bit rate
+kbps(TS, Hist) ->
+ [trunc(R / 128.0) || R <- rate(TS, Hist)]. % *8 bits / 1024 bytes
+
+%% Work out the rate of something per second
+rate(TS, Hist) ->
+ RevTS = lists:reverse(TS),
+ RevHist = lists:reverse(Hist),
+ rate(RevTS, RevHist, []).
+
+rate([_TS], [_C], Acc) ->
+ Acc;
+rate([TS1 | TSRest], [C1 | CRest], Acc) ->
+ Secs = timer:now_diff(hd(TSRest), TS1) / 1.0e6,
+ Rate = (hd(CRest) - C1) / Secs,
+ rate(TSRest, CRest, [Rate | Acc]).
+
+init(Props) ->
+ lager:info("Starting TCP Monitor"),
+ ok = net_kernel:monitor_nodes(true, [{node_type, visible}, nodedown_reason]),
+ State0 = #state{interval = proplists:get_value(interval, Props, ?DEFAULT_INTERVAL),
+ limit = proplists:get_value(limit, Props, ?DEFAULT_LIMIT),
+ clear_after = proplists:get_value(clear_after, Props, ?DEFAULT_LIMIT)},
+ DistCtrl = erlang:system_info(dist_ctrl),
+ State = lists:foldl(fun({Node,Port}, DatState) ->
+ add_dist_conn(Node, Port, DatState)
+ end, State0, DistCtrl),
+ {ok, schedule_tick(State)}.
+
+handle_call(status, _From, State = #state{conns = Conns,
+ status_funs = StatusFuns}) ->
+ Out = [ [{socket,P} | conn_status(P, Conn, StatusFuns)]
+ || {P,Conn} <- gb_trees:to_list(Conns)],
+ {reply, Out , State};
+
+handle_call({socket_status, Socket}, _From, State = #state{conns = Conns,
+ status_funs = StatusFuns}) ->
+ Stats =
+ case gb_trees:lookup(Socket, Conns) of
+ none -> [];
+ {value, Conn} -> conn_status(Socket, Conn, StatusFuns)
+ end,
+ {reply, Stats, State};
+
+handle_call({monitor, Socket, Tag, Transport}, _From, State) ->
+ {reply, ok, add_conn(Socket, #conn{tag = Tag, type = normal,
+ transport = Transport}, State)}.
+
+handle_cast(Msg, State) ->
+ lager:warning("unknown message received: ~p", [Msg]),
+ {noreply, State}.
+
+handle_info({nodeup, Node, _InfoList}, State) ->
+ DistCtrl = erlang:system_info(dist_ctrl),
+ case proplists:get_value(Node, DistCtrl) of
+ undefined ->
+ lager:error("Could not get dist for ~p\n~p\n", [Node, DistCtrl]),
+ {noreply, State};
+ Port ->
+ {noreply, add_dist_conn(Port, Node, State)}
+ end;
+
+
+handle_info({nodedown, _Node, _InfoList}, _State) ->
+ {noreply, #state{}};
+handle_info(measurement_tick, State = #state{limit = Limit, stats = Stats,
+ opts = Opts, conns = Conns}) ->
+ schedule_tick(State),
+ Fun = fun(Socket, Conn = #conn{type = Type, ts_hist = TSHist, hist = Hist}) when Type /= error ->
+ try
+ {ok, StatVals} = inet:getstat(Socket, Stats),
+ TS = os:timestamp(), % read between the two split the difference
+ {ok, OptVals} = inet:getopts(Socket, Opts),
+ Hist2 = update_hist(OptVals, Limit,
+ update_hist(StatVals, Limit, Hist)),
+ Conn#conn{ts_hist = prepend_trunc(TS, TSHist, Limit),
+ hist = Hist2}
+ catch
+ _E:_R ->
+ %io:format("Error ~p: ~p\n", [E, R]),
+ %% Any problems with getstat/getopts mark in error
+ erlang:send_after(State#state.clear_after,
+ self(),
+ {clear, Socket}),
+ Conn#conn{type = error}
+ end;
+ (_Socket, Conn) ->
+ Conn
+ end,
+ {noreply, State#state{conns = gb_trees:map(Fun, Conns)}};
+handle_info({clear, Socket}, State = #state{conns = Conns}) ->
+ {noreply, State#state{conns = gb_trees:delete_any(Socket, Conns)}}.
+
+terminate(_Reason, _State) ->
+ lager:info("Shutting down TCP Monitor"),
+ %% TODO: Consider trying to do something graceful with poolboy?
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%% Add a distributed connection to the state
+add_dist_conn(Node, Port, State) ->
+ add_conn(Port, #conn{tag = {node, Node}, type = dist}, State).
+
+%% Add connection to the state
+add_conn(Socket, Conn, State = #state{conns = Conns}) ->
+ State#state{conns = gb_trees:enter(Socket, Conn, Conns)}.
+
+%% Update the histogram with the list of name/values
+update_hist(Readings, Limit, Histories) ->
+ %% For all the readings of {Stat, Val} pairs
+ lists:foldl(
+ %% Prepend newest reading and truncate
+ fun ({Stat, Val}, Histories0) ->
+ orddict:update(Stat,
+ fun(Hist) ->
+ prepend_trunc(Val, Hist, Limit)
+ end,
+ [Val],
+ Histories0)
+ end, Histories, Readings).
+
+prepend_trunc(Val, List, Limit) ->
+ lists:sublist([Val | List], Limit).
+
+conn_status(Socket, #conn{tag = Tag, type = Type,
+ ts_hist = TsHist, hist = Histories,
+ transport = Transport}, StatusFuns) ->
+ Fun = fun({Stat, Hist}, Acc) ->
+ case dict:find(Stat, StatusFuns) of
+ {ok, {Alias, StatusFun}} ->
+ [{Alias, StatusFun(TsHist, Hist)} | Acc];
+ {ok, StatusFun} ->
+ [{Stat, StatusFun(TsHist, Hist)} | Acc];
+ _ ->
+ Acc
+ end
+ end,
+ Stats = lists:sort(lists:foldl(Fun, [], Histories)),
+ Conn = try % Socket could be dead, don't kill the TCP mon finding out
+ Peername = riak_core_util:peername(Socket, Transport),
+ Sockname = riak_core_util:sockname(Socket, Transport),
+ [{peername, Peername}, {sockname, Sockname}]
+ catch
+ _:_ ->
+ [{peername, "error"}, {sockname, "error"}]
+ end,
+ [{tag, Tag}, {type, Type}] ++ Conn ++ Stats.
+
+schedule_tick(State = #state{interval = Interval}) ->
+ erlang:send_after(Interval, self(), measurement_tick),
+ State.
+
+format_socket_stats([], Buf) -> lists:reverse(Buf);
+%format_socket_stats([{K,V}|T], Buf) when K == tag ->
+ %format_socket_stats(T, [{tag, V} | Buf]);
+format_socket_stats([{K,_V}|T], Buf) when
+ K == tag;
+ K == sndbuf;
+ K == recbuf;
+ K == buffer;
+ K == active;
+ K == type;
+ K == send_max;
+ K == send_avg ->
+ %% skip these
+ format_socket_stats(T, Buf);
+format_socket_stats([{K,V}|T], Buf) when
+ K == recv_avg;
+ K == recv_cnt;
+ K == recv_dvi;
+ K == recv_kbps;
+ K == recv_max;
+ K == send_kbps;
+ K == send_pend;
+ K == send_cnt ->
+ format_socket_stats(T, [{K, lists:flatten(format_list(V))} | Buf]);
+format_socket_stats([{K,V}|T], Buf) ->
+ format_socket_stats(T, [{K, V} | Buf]).
+
31 src/riak_core_util.erl
View
@@ -2,7 +2,7 @@
%%
%% riak_core: Core Riak Application
%%
-%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
+%% Copyright (c) 2007-2013 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
@@ -48,7 +48,11 @@
multi_rpc_ann/5,
multicall_ann/4,
multicall_ann/5,
- is_arch/1]).
+ is_arch/1,
+ format_ip_and_port/2,
+ peername/2,
+ sockname/2
+ ]).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
@@ -472,6 +476,29 @@ is_arch (osx) -> is_arch(darwin);
is_arch (solaris) -> is_arch(sunos);
is_arch (Arch) -> throw({unsupported_architecture,Arch}).
+format_ip_and_port(Ip, Port) when is_list(Ip) ->
+ lists:flatten(io_lib:format("~s:~p",[Ip,Port]));
+format_ip_and_port(Ip, Port) when is_tuple(Ip) ->
+ lists:flatten(io_lib:format("~s:~p",[inet_parse:ntoa(Ip),
+ Port])).
+peername(Socket, Transport) ->
+ case Transport:peername(Socket) of
+ {ok, {Ip, Port}} ->
+ format_ip_and_port(Ip, Port);
+ {error, Reason} ->
+ %% just return a string so JSON doesn't blow up
+ lists:flatten(io_lib:format("error:~p", [Reason]))
+ end.
+
+sockname(Socket, Transport) ->
+ case Transport:sockname(Socket) of
+ {ok, {Ip, Port}} ->
+ format_ip_and_port(Ip, Port);
+ {error, Reason} ->
+ %% just return a string so JSON doesn't blow up
+ lists:flatten(io_lib:format("error:~p", [Reason]))
+ end.
+
%% ===================================================================
%% EUnit tests
%% ===================================================================
249 test/riak_core_connection_mgr_tests.erl
View
@@ -0,0 +1,249 @@
+%% -------------------------------------------------------------------
+%%
+%% Eunit test cases for the Connection Manager
+%%
+%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+-module(riak_core_connection_mgr_tests).
+-author("Chris Tilt").
+-include_lib("eunit/include/eunit.hrl").
+
+-define(TRACE(Stmt),Stmt).
+%%-define(TRACE(Stmt),ok).
+
+%% internal functions
+-export([testService/5,
+ connected/6, connect_failed/3
+ ]).
+
+%% Locator selector types
+-define(REMOTE_LOCATOR_TYPE, remote).
+-define(ADDR_LOCATOR_TYPE, addr).
+
+%% My cluster
+-define(MY_CLUSTER_NAME, "bob").
+-define(MY_CLUSTER_ADDR, {"127.0.0.1", 4097}).
+
+%% Remote cluster
+-define(REMOTE_CLUSTER_NAME, "betty").
+-define(REMOTE_CLUSTER_ADDR, {"127.0.0.1", 4096}).
+-define(REMOTE_ADDRS, [{"127.0.0.1",5001}, {"127.0.0.1",5002}, {"127.0.0.1",5003},
+ ?REMOTE_CLUSTER_ADDR]).
+
+-define(MAX_CONS, 2).
+-define(TCP_OPTIONS, [{keepalive, true},
+ {nodelay, true},
+ {packet, 4},
+ {reuseaddr, true},
+ {active, false}]).
+
+%% Tell the connection manager how to find out "remote" end points.
+%% For testing, we just make up some local addresses.
+register_remote_locator() ->
+ Remotes = orddict:from_list([{?REMOTE_CLUSTER_NAME, ?REMOTE_ADDRS}]),
+ Locator = fun(Name, _Policy) ->
+ case orddict:find(Name, Remotes) of
+ false ->
+ {error, {unknown, Name}};
+ OKEndpoints ->
+ OKEndpoints
+ end
+ end,
+ ok = riak_core_connection_mgr:register_locator(?REMOTE_LOCATOR_TYPE, Locator).
+
+register_addr_locator() ->
+ Locator = fun(Name, _Policy) -> {ok, [Name]} end,
+ ok = riak_core_connection_mgr:register_locator(?ADDR_LOCATOR_TYPE, Locator).
+
+register_empty_locator() ->
+ Locator = fun(_Name, _Policy) -> {ok, []} end,
+ ok = riak_core_connection_mgr:register_locator(?REMOTE_LOCATOR_TYPE, Locator).
+
+connections_test_() ->
+ {timeout, 6000, {setup, fun() ->
+ ok = application:start(ranch),
+ {ok, _} = riak_core_service_mgr:start_link(?REMOTE_CLUSTER_ADDR),
+ {ok, _} = riak_core_connection_mgr:start_link()
+ end,
+ fun(_) ->
+ riak_core_connection_mgr:stop(),
+ riak_core_service_mgr:stop(),
+ application:stop(ranch)
+ end,
+ fun(_) -> [
+
+ {"regsiter remote locator", fun() ->
Pedram Nimreezi
DeadZen added a note

typo on register

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+ register_remote_locator(),
+ Target = {?REMOTE_LOCATOR_TYPE, ?REMOTE_CLUSTER_NAME},
+ Strategy = default,
+ Got = riak_core_connection_mgr:apply_locator(Target, Strategy),
+ ?assertEqual({ok, ?REMOTE_ADDRS}, Got)
+ end},
+
+ {"register locator addr", fun() ->
+ register_addr_locator(),
+ Target = {?ADDR_LOCATOR_TYPE, ?REMOTE_CLUSTER_ADDR},
+ Strategy = default,
+ Got = riak_core_connection_mgr:apply_locator(Target, Strategy),
+ ?assertEqual({ok, [?REMOTE_CLUSTER_ADDR]}, Got)
+ end},
+
+ {"bad locator args", fun() ->
+ register_addr_locator(),
+ %% bad args for 'addr'
+ Target = {?REMOTE_LOCATOR_TYPE, ?REMOTE_CLUSTER_ADDR},
+ Strategy = default,
+ Expected = {error, {bad_target_name_args, remote, ?REMOTE_CLUSTER_ADDR}},
+ Got = riak_core_connection_mgr:apply_locator(Target, Strategy),
+ ?assertEqual(Expected, Got)
+ end},
+
+ {"is paused", ?_assertNot(riak_core_connection_mgr:is_paused())},
+
+ {"pause", fun() ->
+ riak_core_connection_mgr:pause(),
+ ?assert(riak_core_connection_mgr:is_paused())
+ end},
+
+ {"resume", fun() ->
+ riak_core_connection_mgr:resume(),
+ ?assertNot(riak_core_connection_mgr:is_paused())
+ end},
+
+ {"client connection", fun() ->
+ %% start a test service
+ start_service(),
+ %% do async connect via connection_mgr
+ ExpectedArgs = {expectedToPass, [{1,0}, {1,0}]},
+ Target = {?ADDR_LOCATOR_TYPE, ?REMOTE_CLUSTER_ADDR},
+ Strategy = default,
+ riak_core_connection_mgr:connect(Target,
+ {{testproto, [{1,0}]},
+ {?TCP_OPTIONS, ?MODULE, ExpectedArgs}},
+ Strategy),
+ timer:sleep(1000)
+ end},
+
+ {"client connect via cluster name", fun() ->
+ start_service(),
+ %% do async connect via connection_mgr
+ ExpectedArgs = {expectedToPass, [{1,0}, {1,0}]},
+ Target = {?REMOTE_LOCATOR_TYPE, ?REMOTE_CLUSTER_NAME},
+ Strategy = default,
+ riak_core_connection_mgr:connect(Target,
+ {{testproto, [{1,0}]},
+ {?TCP_OPTIONS, ?MODULE, ExpectedArgs}},
+ Strategy),
+ timer:sleep(1000)
+ end},
+
+ {"client retries", fun() ->
+ %% do async connect via connection_mgr
+ ExpectedArgs = {retry_test, [{1,0}, {1,0}]},
+ Target = {?REMOTE_LOCATOR_TYPE, ?REMOTE_CLUSTER_NAME},
+ Strategy = default,
+
+ riak_core_connection_mgr:connect(Target,
+ {{testproto, [{1,0}]},
+ {?TCP_OPTIONS, ?MODULE, ExpectedArgs}},
+ Strategy),
+ %% delay so the client will keep trying
+ ?TRACE(?debugMsg(" ------ sleeping 3 sec")),
+ timer:sleep(3000),
+ %% resume and confirm not paused, which should cause service to start and connection :-)
+ ?TRACE(?debugMsg(" ------ resuming services")),
+ start_service(),
+ %% allow connection to setup
+ ?TRACE(?debugMsg(" ------ sleeping 2 sec")),
+ timer:sleep(1000)
+ end},
+
+ {"empty locator", fun() ->
+ register_empty_locator(), %% replace remote locator with one that returns empty list
+ start_service(),
+ %% do async connect via connection_mgr
+ ExpectedArgs = {expectedToPass, [{1,0}, {1,0}]},
+ Target = {?REMOTE_LOCATOR_TYPE, ?REMOTE_CLUSTER_NAME},
+ Strategy = default,
+ riak_core_connection_mgr:connect(Target,
+ {{testproto, [{1,0}]},
+ {?TCP_OPTIONS, ?MODULE, ExpectedArgs}},
+ Strategy),
+ %% allow conn manager to try and schedule a few retries
+ timer:sleep(1000),
+
+ %% restore the remote locator that gives a good endpoint
+ register_remote_locator(),
+ Got = riak_core_connection_mgr:apply_locator(Target, Strategy),
+ ?assertEqual({ok, ?REMOTE_ADDRS}, Got),
+
+ %% allow enough time for retry mechanism to kick in
+ timer:sleep(2000)
+ %% we should get a connection
+ end}
+
+ ] end} }.
+
+%%------------------------
+%% Helper functions
+%%------------------------
+
+start_service() ->
+ %% start dispatcher
+ ExpectedRevs = [{1,0}, {1,0}],
+ TestProtocol = {{testproto, [{1,0}]}, {?TCP_OPTIONS, ?MODULE, testService, ExpectedRevs}},
+ riak_core_service_mgr:register_service(TestProtocol, {round_robin,10}),
+ ?assert(riak_core_service_mgr:is_registered(testproto) == true).
+
+%% Protocol Service functions
+testService(_Socket, _Transport, {error, _Reason}, _Args, _Props) ->
+ ?assert(false);
+testService(_Socket, _Transport, {ok, {Proto, MyVer, RemoteVer}}, Args, _Props) ->
+ ?TRACE(?debugMsg("testService started")),
+ [ExpectedMyVer, ExpectedRemoteVer] = Args,
+ ?assert(ExpectedMyVer == MyVer),
+ ?assert(ExpectedRemoteVer == RemoteVer),
+ ?assert(Proto == testproto),
+ timer:sleep(2000),
+ {ok, self()}.
+
+%% Client side protocol callbacks
+connected(_Socket, _Transport, {_IP, _Port}, {Proto, MyVer, RemoteVer}, Args, _Props) ->
+ ?TRACE(?debugFmt("testClient started, connected to ~p:~p", [_IP,_Port])),
+ {_TestType, [ExpectedMyVer, ExpectedRemoteVer]} = Args,
+ ?assert(Proto == testproto),
+ ?assert(ExpectedMyVer == MyVer),
+ ?assert(ExpectedRemoteVer == RemoteVer).
+
+connect_failed({_Proto,_Vers}, {error, Reason}, Args) ->
+
+ case Args of
+ expectedToFail ->
+ ?TRACE(?debugFmt("connect_failed: (EXPECTED) when expected to fail: ~p with ~p",
+ [Reason, Args])),
+ ?assert(Reason == econnrefused);
+ {retry_test, _Stuff} ->
+ ?TRACE(?debugFmt("connect_failed: (EXPECTED) during retry test: ~p",
+ [Reason])),
+ ok;
+ Other ->
+ ?TRACE(?debugFmt("connect_failed: (UNEXPECTED) ~p with args = ~p",
+ [Reason, Other])),
+ ?assert(false == Other)
+ end.
133 test/riak_core_connection_tests.erl
View
@@ -0,0 +1,133 @@
+%% -------------------------------------------------------------------
+%%
+%% Eunit test cases for the Conn Manager Connection
+%%
+%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the