diff --git a/ebin/riak_core.app b/ebin/riak_core.app index 56dca41a6..160ff2ca6 100644 --- a/ebin/riak_core.app +++ b/ebin/riak_core.app @@ -77,6 +77,11 @@ riak_core_vnode_worker_pool, riak_core_web, riak_core_wm_urlmap, + riak_core_connection_mgr, + riak_core_connection_mgr_stats, + riak_core_connection, + riak_core_service_mgr, + riak_core_tcp_mon, supervisor_pre_r14b04, vclock ]}, diff --git a/include/riak_core_connection.hrl b/include/riak_core_connection.hrl new file mode 100644 index 000000000..e150e172e --- /dev/null +++ b/include/riak_core_connection.hrl @@ -0,0 +1,100 @@ +%% ------------------------------------------------------------------- +%% +%% Riak Core Connection Manager +%% +%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +%% handshake messages to safely initiate a connection. Let's not accept +%% a connection to a telnet session by accident! +-define(CTRL_REV, {1,0}). +-define(CTRL_HELLO, <<"riak-ctrl:hello">>). +-define(CTRL_TELL_IP_ADDR, <<"riak-ctrl:ip_addr">>). +-define(CTRL_ACK, <<"riak-ctrl:ack">>). +-define(CTRL_ASK_NAME, <<"riak-ctrl:ask_name">>). +-define(CTRL_ASK_MEMBERS, <<"riak-ctrl:ask_members">>). + + +-define(CONNECTION_SETUP_TIMEOUT, 10000). + + + +-define(CTRL_OPTIONS, [binary, + {keepalive, true}, + {nodelay, true}, + {packet, 4}, + {reuseaddr, true}, + {active, false}]). + +%% Tcp options shared during the connection and negotiation phase +-define(CONNECT_OPTIONS, [binary, + {keepalive, true}, + {nodelay, true}, + {packet, 4}, + {reuseaddr, true}, + {active, false}]). + +-type(ip_addr_str() :: string()). +-type(ip_portnum() :: non_neg_integer()). +-type(ip_addr() :: {ip_addr_str(), ip_portnum()}). +-type(tcp_options() :: [any()]). + +-type(proto_id() :: atom()). +-type(rev() :: non_neg_integer()). %% major or minor revision number +-type(proto() :: {proto_id(), {rev(), rev()}}). %% e.g. {myproto, 1, 0} +-type(protoprefs() :: {proto_id(), [{rev(), rev()}]}). + + +%% Function = fun(Socket, Transport, Protocol, Args) -> ok +%% Protocol :: proto() +-type(service_started_callback() :: fun((inet:socket(), module(), proto(), [any()]) -> no_return())). + +%% Host protocol spec +-type(hostspec() :: {protoprefs(), {tcp_options(), module(), service_started_callback(), [any()]}}). + +%% Client protocol spec +-type(clientspec() :: {protoprefs(), {tcp_options(), module(),[any()]}}). + + +%% Scheduler strategies tell the connection manager how distribute the load. +%% +%% Client scheduler strategies +%% --------------------------- +%% default := service side decides how to choose node to connect to. limits number of +%% accepted connections to max_nb_cons(). +%% askme := the connection manager will call your client for a custom protocol strategy +%% and likewise will expect that the service side has plugged the cluster +%% manager with support for that custom strategy. UNSUPPORTED so far. 
+%% TODO: add a behaviour for the client to implement that includes this callback.
+%% Service scheduler strategies
+%% ----------------------------
+%% round_robin := choose the next available node on the cluster to service the request. Limits
+%%                the number of accepted connections to max_nb_cons().
+%% custom := service must provide a strategy to the cluster manager for choosing nodes.
+%%           UNSUPPORTED so far. Should we use a behaviour for the service module?
+-type(max_nb_cons() :: non_neg_integer()).
+-type(client_scheduler_strategy() :: default | askme).
+-type(service_scheduler_strategy() :: {round_robin, max_nb_cons()} | custom).
+
+%% service manager statistics, can maybe get shared by other layers too
+-record(stats, {open_connections = 0 :: non_neg_integer()
+               }).
+
+
+-type(cluster_finder_fun() :: fun(() -> {ok,node()} | {error, term()})).
+
diff --git a/rebar.config b/rebar.config
index d6922e7f7..3389c13e3 100644
--- a/rebar.config
+++ b/rebar.config
@@ -12,5 +12,7 @@
 {riak_sysmon, ".*", {git, "git://github.com/basho/riak_sysmon", {branch, "master"}}},
 {webmachine, ".*", {git, "git://github.com/basho/webmachine", {tag, "64176ef9b"}}},
-{folsom, ".*", {git, "git://github.com/boundary/folsom.git", {branch, "master"}}}
+{folsom, ".*", {git, "git://github.com/boundary/folsom.git", {branch, "master"}}},
+{ranch, "0.4.0", {git, "git://github.com/extend/ranch.git", {tag, "0.4.0"}}}
+
 ]}.
diff --git a/src/riak_core_connection.erl b/src/riak_core_connection.erl
new file mode 100644
index 000000000..9868812c7
--- /dev/null
+++ b/src/riak_core_connection.erl
@@ -0,0 +1,205 @@
+%% -------------------------------------------------------------------
+%%
+%% Riak Subprotocol Server Dispatch and Client Connections
+%%
+%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%%   http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+-module(riak_core_connection).
+-author("Chris Tilt").
+
+-include("riak_core_connection.hrl").
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+-define(TRACE(Stmt),Stmt).
+-else.
+-define(TRACE(Stmt),ok).
+-endif.
+
+
+%% public API
+-export([connect/2,
+         sync_connect/2]).
+
+%% internal functions
+-export([async_connect_proc/3]).
+
+%%TODO: move to riak_ring_core...symbolic cluster naming
+-export([set_symbolic_clustername/2, set_symbolic_clustername/1,
+         symbolic_clustername/1, symbolic_clustername/0]).
+
+%% @doc Sets the symbolic, human readable, name of this cluster.
+set_symbolic_clustername(Ring, ClusterName) ->
+    {new_ring,
+     riak_core_ring:update_meta(symbolic_clustername, ClusterName, Ring)}.
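To make the clientspec() and hostspec() types from riak_core_connection.hrl above concrete, here is a hedged sketch of the terms in practice. The my_proto, my_client, and my_service names, the version lists, and the TCP options are made-up examples; only the tuple shapes come from the type definitions.

```erlang
%% Hedged sketch only: names, versions, and options below are hypothetical.
-module(my_proto_specs).
-export([client_spec/0, host_spec/0]).

%% clientspec() :: {protoprefs(), {tcp_options(), module(), [any()]}}
client_spec() ->
    {{my_proto, [{2,0}, {1,1}]},           %% speaks protocol majors 2 and 1
     {[{packet, 4}, {active, false}],      %% options applied after negotiation
      my_client,                           %% exports connected/6 and connect_failed/3
      []}}.                                %% Args handed back to those callbacks

%% hostspec() :: {protoprefs(), {tcp_options(), module(), service_started_callback(), [any()]}}
%% (the service manager consumes the second element as {Options, Module, Function, Args}
%%  and calls my_service:start_service/5 once negotiation succeeds)
host_spec() ->
    {{my_proto, [{1,0}]},
     {[{packet, 4}, {active, false}],
      my_service, start_service,
      []}}.
```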
+ +set_symbolic_clustername(ClusterName) when is_list(ClusterName) -> + {error, "argument is not a string"}; +set_symbolic_clustername(ClusterName) -> + case riak_core_ring_manager:get_my_ring() of + {ok, _Ring} -> + riak_core_ring_manager:ring_trans(fun riak_core_connection:set_symbolic_clustername/2, + ClusterName); + {error, Reason} -> + lager:error("Can't set symbolic clustername because: ~p", [Reason]) + end. + +symbolic_clustername(Ring) -> + case riak_core_ring:get_meta(symbolic_clustername, Ring) of + {ok, Name} -> Name; + undefined -> "undefined" + end. + +%% @doc Returns the symbolic, human readable, name of this cluster. +symbolic_clustername() -> + case riak_core_ring_manager:get_my_ring() of + {ok, Ring} -> + symbolic_clustername(Ring); + {error, Reason} -> + lager:error("Can't read symbolic clustername because: ~p", [Reason]), + "undefined" + end. + +%% Make async connection request. The connection manager is responsible for retry/backoff +%% and calls your module's functions on success or error (asynchrously): +%% Module:connected(Socket, TransportModule, {IpAddress, Port}, {Proto, MyVer, RemoteVer}, Args) +%% Module:connect_failed(Proto, {error, Reason}, Args) +%% Reason could be 'protocol_version_not_supported" +%% +%% +%% You can set options on the tcp connection, e.g. +%% [{packet, 4}, {active, false}, {keepalive, true}, {nodelay, true}] +%% +%% ClientProtocol specifies the preferences of the client, in terms of what versions +%% of a protocol it speaks. The host will choose the highest common major version and +%% inform the client via the callback Module:connected() in the HostProtocol parameter. +%% +%% Note: that the connection will initially be setup with the `binary` option +%% because protocol negotiation requires binary. TODO: should we allow non-binary +%% options? Check that binary is not overwritten. +%% +%% connect returns the pid() of the asynchronous process that will attempt the connection. + +-spec(connect(ip_addr(), clientspec()) -> pid()). +connect({IP,Port}, ClientSpec) -> + ?TRACE(?debugMsg("spawning async_connect link")), + %% start a process to handle the connection request asyncrhonously + proc_lib:spawn_link(?MODULE, async_connect_proc, [self(), {IP,Port}, ClientSpec]). + +sync_connect({IP,Port}, ClientSpec) -> + sync_connect_status(self(), {IP,Port}, ClientSpec). + +%% @private + +%% exchange brief handshake with client to ensure that we're supporting sub-protocols. +%% client -> server : Hello {1,0} [Capabilities] +%% server -> client : Ack {1,0} [Capabilities] +exchange_handshakes_with(host, Socket, Transport, MyCaps) -> + Hello = term_to_binary({?CTRL_HELLO, ?CTRL_REV, MyCaps}), + case Transport:send(Socket, Hello) of + ok -> + ?TRACE(?debugFmt("exchange_handshakes: waiting for ~p from host", [?CTRL_ACK])), + case Transport:recv(Socket, 0, ?CONNECTION_SETUP_TIMEOUT) of + {ok, Ack} -> + case binary_to_term(Ack) of + {?CTRL_ACK, TheirRev, TheirCaps} -> + Props = [{local_revision, ?CTRL_REV}, {remote_revision, TheirRev} | TheirCaps], + {ok,Props}; + {error, _Reason} = Error -> + Error; + Msg -> + {error, Msg} + end; + {error, Reason} -> + {error, Reason} + end; + Error -> + Error + end. + +async_connect_proc(Parent, {IP,Port}, ProtocolSpec) -> + sync_connect_status(Parent, {IP,Port}, ProtocolSpec). 
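A minimal client callback module might look like the following sketch. The my_client and my_proto names are hypothetical; connected/6 matches the call made by sync_connect_status/3 below, and connect_failed/3 is the callback the connection manager's fail_request/3 invokes when a request finally gives up.

```erlang
%% Hedged sketch; `my_client' and `my_proto' are made-up names.
-module(my_client).
-export([start/2, connected/6, connect_failed/3]).

start(IP, Port) ->
    ClientSpec = {{my_proto, [{1,0}]},
                  {[{packet, 4}, {active, false}], ?MODULE, []}},
    %% asynchronous: returns the pid of the connecting process,
    %% and one of the callbacks below fires later
    riak_core_connection:connect({IP, Port}, ClientSpec).

connected(Socket, Transport, {_IP, _Port}, {my_proto, _MyVer, _RemoteVer}, _Args, _Props) ->
    %% the calling process owns the socket here; a real client would hand it
    %% off to a long-lived process with Transport:controlling_process/2
    Transport:send(Socket, <<"hello">>),
    ok.

connect_failed({my_proto, _Revs}, {error, Reason}, _Args) ->
    lager:warning("my_proto connection failed: ~p", [Reason]),
    ok.
```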
+ +%% connect synchronously to remote addr/port and return status +sync_connect_status(_Parent, {IP,Port}, {ClientProtocol, {Options, Module, Args}}) -> + Timeout = ?CONNECTION_SETUP_TIMEOUT, + Transport = ranch_tcp, + %% connect to host's {IP,Port} + ?TRACE(?debugFmt("sync_connect: connect to ~p", [{IP,Port}])), + case gen_tcp:connect(IP, Port, ?CONNECT_OPTIONS, Timeout) of + {ok, Socket} -> + ?TRACE(?debugFmt("Setting system options on client side: ~p", [?CONNECT_OPTIONS])), + Transport:setopts(Socket, ?CONNECT_OPTIONS), + %% handshake to make sure it's a riak sub-protocol dispatcher + MyName = symbolic_clustername(), + MyCaps = [{clustername, MyName}], + case exchange_handshakes_with(host, Socket, Transport, MyCaps) of + {ok,Props} -> + %% ask for protocol, see what host has + case negotiate_proto_with_server(Socket, Transport, ClientProtocol) of + {ok,HostProtocol} -> + %% set client's requested Tcp options + ?TRACE(?debugFmt("Setting user options on client side; ~p", [Options])), + Transport:setopts(Socket, Options), + %% notify requester of connection and negotiated protocol from host + %% pass back returned value in case problem detected on connection + %% by module. requestor is responsible for transferring control + %% of the socket. + Module:connected(Socket, Transport, {IP, Port}, HostProtocol, Args, Props); + {error, Reason} -> + ?TRACE(?debugFmt("negotiate_proto_with_server returned: ~p", [{error,Reason}])), + %% Module:connect_failed(ClientProtocol, {error, Reason}, Args), + {error, Reason} + end; + {error, closed} -> + %% socket got closed, don't report this + {error, closed}; + {error, Reason} -> + lager:error("Failed to exchange handshake with host. Error = ~p", [Reason]), + {error, Reason}; + Error -> + %% failed to exchange handshakes + lager:error("Failed to exchange handshake with host. Error = ~p", [Error]), + {error, Error} + end; + {error, Reason} -> + %% Module:connect_failed(ClientProtocol, {error, Reason}, Args), + {error, Reason} + end. + +%% Negotiate the highest common major protocol revisision with the connected server. +%% client -> server : Prefs List = {SubProto, [{Major, Minor}]} +%% server -> client : selected version = {SubProto, {Major, HostMinor, ClientMinor}} +%% +%% returns {ok,{Proto,{Major,ClientMinor},{Major,HostMinor}}} | {error, Reason} +negotiate_proto_with_server(Socket, Transport, ClientProtocol) -> + ?TRACE(?debugFmt("negotiate protocol with host, client proto = ~p", [ClientProtocol])), + Transport:send(Socket, erlang:term_to_binary(ClientProtocol)), + case Transport:recv(Socket, 0, ?CONNECTION_SETUP_TIMEOUT) of + {ok, NegotiatedProtocolBin} -> + case erlang:binary_to_term(NegotiatedProtocolBin) of + {ok, {Proto,{CommonMajor,HMinor,CMinor}}} -> + {ok, {Proto,{CommonMajor,CMinor},{CommonMajor,HMinor}}}; + {error, Reason} -> + {error, Reason} + end; + {error, Reason} -> + lager:error("Failed to receive protocol ~p response from server. Reason = ~p", + [ClientProtocol, Reason]), + {error, connection_failed} + end. diff --git a/src/riak_core_connection_mgr.erl b/src/riak_core_connection_mgr.erl new file mode 100644 index 000000000..2d900b834 --- /dev/null +++ b/src/riak_core_connection_mgr.erl @@ -0,0 +1,622 @@ +%% ------------------------------------------------------------------- +%% +%% Riak Subprotocol Server Dispatch and Client Connections +%% +%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved. 
+%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(riak_core_connection_mgr). +-author("Chris Tilt"). +-behaviour(gen_server). + +-include("riak_core_connection.hrl"). + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). +-endif. + +%% controls retry and backoff. +-define(INITIAL_BACKOFF, 1 * 1000). %% 1 second initial backoff per endpoint +-define(MAX_BACKOFF, 1 * 60 * 1000). %% 1 minute maximum backoff per endpoint +-define(EXHAUSTED_ENDPOINTS_RETRY_INTERVAL, 10 * 1000). %% 10 seconds until retrying the list again + +%% retry delay if locator returned empty list +-ifdef(TEST). +-define(TRACE(Stmt),Stmt). +%%-define(TRACE(Stmt),ok). +-define(DEFAULT_RETRY_NO_ENDPOINTS, 2 * 1000). %% short for testing to avoid timeout +-else. +-define(TRACE(Stmt),ok). +-define(DEFAULT_RETRY_NO_ENDPOINTS, 5 * 1000). %% 5 seconds +-endif. + + +-define(SERVER, riak_core_connection_manager). +-define(MAX_LISTENERS, 100). + +-type(counter() :: non_neg_integer()). + +%% Connection manager strategy (per Jon M.) +%% when a connection request comes in, +%% + call the locator service to get the list of {transport, {address, port}} +%% + create a linked helper process to call riak_core_connection (just once) on the next available +%% connection (ignore blacklisted ones, they'll get picked up if a repeat is necessary) +%% + on connection it transfers control of the socket back to the connmgr, casts a success message back +%% to the connection manager and exits normally. +%% - on success, the connection manager increments successful connects, reset the backoff timeout on +%% that connection. +%% - on failure, casts a failure message back to the connection manager (error, timeout etc) the +%% connection manager marks the {Transport, {Address, Port}} as blacklisted, increases the failure +%% counter and starts a timer for the backoff time (and updates it for next time). The connection +%% manager checks for the next non--blacklisted endpoint in the connection request list to launch +%% a new connection, if the list is empty call the locator service again to get a new list. If all +%% connections are blacklisted, use send_after message to wake up and retry (perhaps with backoff +%% time too). + +%% End-point status state, updated for failed and successful connection attempts, +%% or by timers that fire to update the backoff time. 
+%% TODO: add folsom window'd stats +%% handle an EXIT from the helper process if it dies +-record(ep, {addr, %% endpoint {IP, Port} + nb_curr_connections = 0 :: counter(), %% number of current connections + nb_success = 0 :: counter(), %% total successfull connects on this ep + nb_failures = 0 :: counter(), %% total failed connects on this ep + is_black_listed = false :: boolean(), %% true after a failed connection attempt + backoff_delay=0 :: counter(), %% incremented on each failure, reset to zero on success + failures = orddict:new() :: orddict:orddict(), %% failure reasons + last_fail_time :: erlang:timestamp(), %% time of last failure since 1970 + next_try_secs :: counter() %% time in seconds to next retry attempt + }). + +%% connection request record +-record(req, {ref, % Unique reference for this connection request + pid, % Helper pid trying to make connection + target, % target to connect to {Type, Name} + spec, % client spec + strategy, % connection strategy + cur, % current connection endpoint + state = init, % init | connecting | connected | cancelled + status % history of connection attempts + }). + + +%% connection manager state: +-record(state, {is_paused = false :: boolean(), + pending = [] :: [#req{}], % pending requests + %% endpoints :: {module(),ip_addr()} -> ep() + endpoints = orddict:new() :: orddict:orddict(), %% known endpoints w/status + locators = orddict:new() :: orddict:orddict(), %% connection locators + nb_total_succeeded = 0 :: counter(), + nb_total_failed = 0 :: counter() + }). + +-export([start_link/0, + resume/0, + pause/0, + is_paused/0, + connect/2, connect/3, + disconnect/1, + register_locator/2, + apply_locator/2, + reset_backoff/0, + stop/0 + ]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +%% internal functions +-export([connection_helper/4, increase_backoff/1]). + +%%%=================================================================== +%%% API +%%%=================================================================== + +%% @doc Starts the manager linked. +-spec(start_link() -> {ok, pid()}). +start_link() -> + Args = [], + Options = [], + gen_server:start_link({local, ?SERVER}, ?MODULE, Args, Options). + +%% @doc Begins or resumes accepting and establishing new connections, in +%% order to maintain the protocols that have been (or continue to be) registered +%% and unregistered. pause() will not kill any existing connections, but will +%% cease accepting new requests or retrying lost connections. +-spec(resume() -> ok). +resume() -> + gen_server:cast(?SERVER, resume). + +%% @doc Stop accepting / creating new connections; this does not terminated +%% existing ones. +-spec(pause() -> ok). +pause() -> + gen_server:cast(?SERVER, pause). + +%% @doc Return paused state +-spec is_paused() -> boolean(). +is_paused() -> + gen_server:call(?SERVER, is_paused). + +%% @doc Reset all backoff delays to zero. +-spec reset_backoff() -> 'ok'. +reset_backoff() -> + gen_server:cast(?SERVER, reset_backoff). + +%% Register a locator - for the given Name and strategy it returns {ok, [{IP,Port}]} +%% list of endpoints to connect to, in order. The list may be empty. +%% If the query can never be answered +%% return {error, Reason}. +%% fun(Name +register_locator(Type, Fun) -> + gen_server:call(?SERVER, {register_locator, Type, Fun}, infinity). + +apply_locator(Name, Strategy) -> + gen_server:call(?SERVER, {apply_locator, Name, Strategy}, infinity). 
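The identity locator is installed by init/1 below; other locator types follow the same contract. A hedged sketch of registering a custom locator and connecting through it; the my_cluster type, the my_cluster_addrs app-env key, and the addresses are made up, while the {ok, [{IP,Port}]} | {error, Reason} contract is the one register_locator/2 expects.

```erlang
%% Hedged sketch: locator type and app-env key below are hypothetical.
-module(my_cluster_locator).
-export([install/0, connect_examples/1]).

install() ->
    Locator = fun(ClusterName, _Strategy) ->
                      Clusters = app_helper:get_env(riak_core, my_cluster_addrs, []),
                      case proplists:get_value(ClusterName, Clusters) of
                          undefined -> {error, {unknown_cluster, ClusterName}};
                          Addrs     -> {ok, Addrs}     %% [{IP, Port}, ...] tried in order
                      end
              end,
    ok = riak_core_connection_mgr:register_locator(my_cluster, Locator).

%% A caller can then target {my_cluster, "east"}, or fall back to the
%% pre-installed identity locator with a literal address:
connect_examples(ClientSpec) ->
    {ok, _Ref1} = riak_core_connection_mgr:connect({my_cluster, "east"}, ClientSpec),
    {ok, _Ref2} = riak_core_connection_mgr:connect({identity, {"127.0.0.1", 9085}}, ClientSpec).
```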
+ +%% @doc Establish a connection to the remote destination. be persistent about it, +%% but not too annoying to the remote end. Connect by name of cluster or +%% IP address. Use default strategy to find "best" peer for connection. +%% +%% Targets are found by applying a registered locator for it. +%% The identity locator is pre-installed, so if you want to connect to a list +%% of IP and Port addresses, supply a Target like this: {identity, [{IP, Port},...]}, +%% where IP::string() and Port::integer(). You can also pass {identity, {IP, Port}} +%% and the locator will use just that one IP. With a list, it will rotate +%% trying them all until a connection is established. +%% +%% Other locator types must be registered with this connection manager +%% before calling connect(). +%% +%% Supervision must be done by the calling process if desired. No supervision +%% is done here. +%% +-spec connect(Target :: string(), ClientSpec :: clientspec(), Strategy :: client_scheduler_strategy()) -> {'ok', reference()}. +connect(Target, ClientSpec, Strategy) -> + gen_server:call(?SERVER, {connect, Target, ClientSpec, Strategy}). + +%% @doc same as connect(Target, ClientSpec, default). +%% @see connect/3 +-spec connect(Target :: string(), ClientSpec :: clientspec()) -> {'ok', reference()}. +connect(Target, ClientSpec) -> + gen_server:call(?SERVER, {connect, Target, ClientSpec, default}). + +%% @doc Disconnect from the remote side. +-spec disconnect(Target :: string()) -> 'ok'. +disconnect(Target) -> + gen_server:cast(?SERVER, {disconnect, Target}). + +%% doc Stop the server and sever all connections. +-spec stop() -> 'ok'. +stop() -> + gen_server:call(?SERVER, stop). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +init([]) -> + process_flag(trap_exit, true), + %% install default "identity" locator + Locator = fun identity_locator/2, + ?TRACE(?debugMsg("Starting")), + {ok, #state{is_paused = false, + locators = orddict:store(identity, Locator, orddict:new()) + }}. + +handle_call(is_paused, _From, State) -> + {reply, State#state.is_paused, State}; + +%% connect based on address. Return process id of helper +handle_call({connect, Target, ClientSpec, Strategy}, _From, State) -> + Reference = make_ref(), + Request = #req{ref = Reference, + target = Target, + pid = undefined, + spec = ClientSpec, + state = init, + strategy = Strategy}, + %% add request to pending queue so it may be found in restarts + State2 = State#state{pending = lists:keystore(Reference, #req.ref, + State#state.pending, + Request)}, + ?TRACE(?debugFmt("Starting connect request to ~p, ref is ~p", [Target, Reference])), + {reply, {ok, Reference}, start_request(Request, State2)}; + +handle_call({get_endpoint_backoff, Addr}, _From, State) -> + ?TRACE(?debugFmt("backing off ~p", [Addr])), + {reply, {ok, get_endpoint_backoff(Addr, State#state.endpoints)}, State}; + +handle_call({register_locator, Type, Fun}, _From, + State = #state{locators = Locators}) -> + {reply, ok, State#state{locators = orddict:store(Type, Fun, Locators)}}; + +handle_call({apply_locator, Target, Strategy}, _From, + State = #state{locators = Locators}) -> + AddrsOrError = locate_endpoints(Target, Strategy, Locators), + {reply, AddrsOrError, State}; + +handle_call(stop, _From, State) -> + %% TODO do we need to cleanup helper pids here? 
+ {stop, normal, ok, State}; + +handle_call({should_try_endpoint, Ref, Addr}, _From, State = #state{pending=Pending}) -> + case lists:keyfind(Ref, #req.ref, Pending) of + false -> + %% This should never happen + {reply, false, State}; + Req -> + {Answer, ReqState} = + case Req#req.state of + cancelled -> + %% helper process hasn't cancelled itself yet. + {false, cancelled}; + _ -> + {true, connecting} + end, + {reply, Answer, State#state{pending = lists:keystore(Ref, #req.ref, Pending, + Req#req{cur = Addr, + state = ReqState})}} + end; + +handle_call(_Unhandled, _From, State) -> + ?TRACE(?debugFmt("Unhandled gen_server call: ~p", [_Unhandled])), + {reply, {error, unhandled}, State}. + +handle_cast(pause, State) -> + {noreply, State#state{is_paused = true}}; + +handle_cast(resume, State) -> + {noreply, State#state{is_paused = false}}; + +handle_cast({disconnect, Target}, State) -> + {noreply, disconnect_from_target(Target, State)}; + +%% reset all backoff delays to zero. +%% TODO: restart stalled connections. +handle_cast(reset_backoff, State) -> + NewEps = reset_backoff(State#state.endpoints), + {noreply, State#state{endpoints = NewEps}}; + +%% helper process says no endpoints were returned by the locators. +%% helper process will schedule a retry. +handle_cast({conmgr_no_endpoints, _Ref}, State) -> + %% mark connection as black-listed and start timer for reset + {noreply, State}; + +%% helper process says it failed to reach an address. +handle_cast({endpoint_failed, Addr, Reason, ProtocolId}, State) -> + ?TRACE(?debugFmt("Failing endpoint ~p for protocol ~p with reason ~p", [Addr, ProtocolId, Reason])), + %% mark connection as black-listed and start timer for reset + {noreply, fail_endpoint(Addr, Reason, ProtocolId, State)}. + +%% it is time to remove Addr from the black-listed addresses +handle_info({backoff_timer, Addr}, State = #state{endpoints = EPs}) -> + case orddict:find(Addr, EPs) of + {ok, EP} -> + EP2 = EP#ep{is_black_listed = false}, + {noreply, State#state{endpoints = orddict:store(Addr,EP2,EPs)}}; + error -> + %% TODO: Should never happen because the Addr came from the EP list. 
+            {noreply, State}
+    end;
+handle_info({retry_req, Ref}, State = #state{pending = Pending}) ->
+    case lists:keyfind(Ref, #req.ref, Pending) of
+        false ->
+            %% TODO: should never happen
+            {noreply, State};
+        Req ->
+            {noreply, start_request(Req, State)}
+    end;
+
+%%% All of the connection helpers end here
+%% cases:
+%% helper succeeded -> update EP stats: BL<-false, backoff_delay<-0
+%% helper failed -> updates EP stats: failures++, backoff_delay++
+%% other Pid failed -> pass on linked error
+handle_info({'EXIT', From, Reason}, State = #state{pending = Pending}) ->
+    %% Work out which endpoint it was
+    case lists:keytake(From, #req.pid, Pending) of
+        false ->
+            %% Must have been something we were linked to, or linked to us
+            ?TRACE(?debugFmt("Connection Manager exiting because linked process ~p exited for reason: ~p",
+                             [From, Reason])),
+            lager:error("Connection Manager exiting because linked process ~p exited for reason: ~p",
+                        [From, Reason]),
+            exit({linked, From, Reason});
+        {value, #req{cur = Cur, ref = Ref}=Req, Pending2} ->
+            {{ProtocolId, _Foo},_Bar} = Req#req.spec,
+            case Reason of
+                ok ->
+                    %% update the stats module
+                    Stat = conn_success,
+                    ?TRACE(?debugMsg("Trying for stats update, the connect_endpoint")),
+                    riak_core_connection_mgr_stats:update(Stat, Cur, ProtocolId),
+                    %% riak_core_connection set up and handlers called
+                    {noreply, connect_endpoint(Cur, State#state{pending = Pending2})};
+
+                {ok, cancelled} ->
+                    %% helper process has been cancelled and has exited nicely.
+                    %% update the stats module
+                    Stat = conn_cancelled,
+                    ?TRACE(?debugMsg("Trying for stats update")),
+                    riak_core_connection_mgr_stats:update(Stat, Cur, ProtocolId),
+                    %% toss out the cancelled request from pending.
+                    {noreply, State#state{pending = Pending2}};
+
+                {error, endpoints_exhausted, Ref} ->
+                    %% tried all known endpoints. schedule a retry.
+                    %% reuse the existing request Reference, Ref.
+                    case Req#req.state of
+                        cancelled ->
+                            %% oops. that request was cancelled. No retry
+                            {noreply, State#state{pending = Pending2}};
+                        _ ->
+                            ?TRACE(?debugMsg("Scheduling retry")),
+                            {noreply, schedule_retry(?EXHAUSTED_ENDPOINTS_RETRY_INTERVAL, Ref, State)}
+                    end;
+
+                Reason -> % something bad happened to the connection, reuse the request
+                    ?TRACE(?debugFmt("handle_info: EP failed on ~p for ~p. removed Ref ~p",
+                                     [Cur, Reason, Ref])),
+                    lager:warning("handle_info: endpoint ~p failed: ~p. removed Ref ~p",
+                                  [Cur, Reason, Ref]),
+                    State2 = fail_endpoint(Cur, Reason, ProtocolId, State),
+                    %% the connection helper will not retry. It's up to the caller.
+                    State3 = fail_request(Reason, Req, State2),
+                    {noreply, State3}
+            end
+    end;
+handle_info(_Unhandled, State) ->
+    ?TRACE(?debugFmt("Unhandled gen_server info: ~p", [_Unhandled])),
+    lager:error("Unhandled gen_server info: ~p", [_Unhandled]),
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%%%===================================================================
+%%% Private
+%%%===================================================================
+
+identity_locator({IP,Port}, _Policy) ->
+    {ok, [{IP,Port}]};
+identity_locator([Ips], _Policy) ->
+    {ok, Ips}.
+
+%% close the pending connection and cancel the request
+disconnect_from_target(Target, State = #state{pending = Pending}) ->
+    lager:debug("Disconnecting from: ~p", [Target]),
+    case lists:keyfind(Target, #req.target, Pending) of
+        false ->
+            %% already gone!
+ State; + Req -> + %% The helper process will discover the cancellation when it asks if it + %% should connect to an endpoint. + State#state{pending = lists:keystore(Req#req.ref, #req.ref, Pending, + Req#req{state = cancelled})} + end. + +%% schedule a retry to occur after Interval milliseconds. +%% do not clear the pid from pending. the exit handler will do that. +schedule_retry(Interval, Ref, State = #state{pending = Pending}) -> + case lists:keyfind(Ref, #req.ref, Pending) of + false -> + %% this should never happen + lager:error("ConnectionManager: failed to find connection ref while scheduling retry."), + State; + Req -> + case Req#req.state of + cancelled -> + %% the request was cancelled, so no rescheduling wanted. + State; + _ -> + %% reschedule request to happen in the future + erlang:send_after(Interval, self(), {retry_req, Ref}), + State#state{pending = lists:keystore(Req#req.ref, #req.ref, Pending, + Req#req{cur = undefined})} + end + end. + +%% Start process to make connection to available endpoints. Return a reference for +%% this connection attempt. +start_request(#req{state=cancelled}, State) -> + State; +start_request(Req = #req{ref=Ref, target=Target, spec=ClientSpec, strategy=Strategy}, + State) -> + case locate_endpoints(Target, Strategy, State#state.locators) of + {ok, []} -> + %% locators provided no addresses + gen_server:cast(?SERVER, {conmgr_no_endpoints, Ref}), + Interval = app_helper:get_env(riak_core, connmgr_no_endpoint_retry, + ?DEFAULT_RETRY_NO_ENDPOINTS), + lager:debug("Connection Manager located no endpoints for: ~p. Will retry.", [Target]), + %% schedule a retry and exit + schedule_retry(Interval, Ref, State); + {ok, EpAddrs } -> + lager:debug("Connection Manager located endpoints: ~p", [EpAddrs]), + AllEps = update_endpoints(EpAddrs, State#state.endpoints), + TryAddrs = filter_blacklisted_endpoints(EpAddrs, AllEps), + lager:debug("Connection Manager trying endpoints: ~p", [TryAddrs]), + Pid = spawn_link( + fun() -> exit(try connection_helper(Ref, ClientSpec, Strategy, TryAddrs) + catch T:R -> {exception, {T, R}} + end) + end), + State#state{endpoints = AllEps, + pending = lists:keystore(Ref, #req.ref, State#state.pending, + Req#req{pid = Pid, + state = connecting, + cur = undefined})}; + {error, Reason} -> + fail_request(Reason, Req, State) + end. + +%% reset the backoff delay to zero for all endpoints +reset_backoff(Endpoints) -> + orddict:map(fun(_Addr,EP) -> EP#ep{backoff_delay = 0} end,Endpoints). + +%% increase the backoff delay, but cap at a maximum +increase_backoff(0) -> + ?INITIAL_BACKOFF; +increase_backoff(Delay) when Delay > ?MAX_BACKOFF -> + ?MAX_BACKOFF; +increase_backoff(Delay) -> + 2 * Delay. + +%% Convert an inet:address to a string if needed. +string_of_ip(IP) when is_tuple(IP) -> + inet_parse:ntoa(IP); +string_of_ip(IP) -> + IP. + +string_of_ipport({IP,Port}) -> + string_of_ip(IP) ++ ":" ++ erlang:integer_to_list(Port). + +%% A spawned process that will walk down the list of endpoints and try them +%% all until exhausting the list. This process is responsible for waiting for +%% the backoff delay for each endpoint. +connection_helper(Ref, _Protocol, _Strategy, []) -> + %% exhausted the list of endpoints. let server start new helper process + {error, endpoints_exhausted, Ref}; +connection_helper(Ref, Protocol, Strategy, [Addr|Addrs]) -> + {{ProtocolId, _Foo},_Bar} = Protocol, + %% delay by the backoff_delay for this endpoint. 
+ {ok, BackoffDelay} = gen_server:call(?SERVER, {get_endpoint_backoff, Addr}), + lager:debug("Holding off ~p seconds before trying ~p at ~p", + [(BackoffDelay/1000), ProtocolId, string_of_ipport(Addr)]), + timer:sleep(BackoffDelay), + case gen_server:call(?SERVER, {should_try_endpoint, Ref, Addr}) of + true -> + lager:debug("Trying connection to: ~p at ~p", [ProtocolId, string_of_ipport(Addr)]), + ?TRACE(?debugMsg("Attempting riak_core_connection:sync_connect/2")), + case riak_core_connection:sync_connect(Addr, Protocol) of + ok -> + ok; + {error, Reason} -> + %% notify connection manager this EP failed and try next one + gen_server:cast(?SERVER, {endpoint_failed, Addr, Reason, ProtocolId}), + connection_helper(Ref, Protocol, Strategy, Addrs) + end; + _ -> + %% connection request has been cancelled + lager:debug("Ignoring connection to: ~p at ~p because it was cancelled", + [ProtocolId, string_of_ipport(Addr)]), + {ok, cancelled} + end. + +locate_endpoints({Type, Name}, Strategy, Locators) -> + case orddict:find(Type, Locators) of + {ok, Locate} -> + case Locate(Name, Strategy) of + error -> + {error, {bad_target_name_args, Type, Name}}; + Addrs -> + Addrs + end; + error -> + {error, {unknown_target_type, Type}} + end. + +%% Make note of the failed connection attempt and update +%% our book keeping for that endpoint. Black-list it, and +%% adjust a backoff timer so that we wait a while before +%% trying this endpoint again. +fail_endpoint(Addr, Reason, ProtocolId, State) -> + %% update the stats module + Stat = {conn_error, Reason}, + riak_core_connection_mgr_stats:update(Stat, Addr, ProtocolId), + %% update the endpoint + Fun = fun(EP=#ep{backoff_delay = Backoff, failures = Failures}) -> + erlang:send_after(Backoff, self(), {backoff_timer, Addr}), + EP#ep{failures = orddict:update_counter(Reason, 1, Failures), + nb_failures = EP#ep.nb_failures + 1, + backoff_delay = increase_backoff(Backoff), + last_fail_time = os:timestamp(), + next_try_secs = Backoff/1000, + is_black_listed = true} + end, + update_endpoint(Addr, Fun, State). + +connect_endpoint(Addr, State) -> + update_endpoint(Addr, fun(EP) -> + EP#ep{is_black_listed = false, + nb_success = EP#ep.nb_success + 1, + next_try_secs = 0, + backoff_delay = 0} + end, State). + +%% Return the current backoff delay for the named Address, +%% or if we can't find that address in the endpoints - the +%% initial backoff. +get_endpoint_backoff(Addr, EPs) -> + case orddict:find(Addr, EPs) of + error -> + 0; + {ok, EP} -> + EP#ep.backoff_delay + end. + +update_endpoint(Addr, Fun, State = #state{endpoints = EPs}) -> + case orddict:find(Addr, EPs) of + error -> + EP2 = Fun(#ep{addr = Addr}), + State#state{endpoints = orddict:store(Addr,EP2,EPs)}; + {ok, EP} -> + EP2 = Fun(EP), + State#state{endpoints = orddict:store(Addr,EP2,EPs)} + end. + +fail_request(Reason, #req{ref = Ref, spec = Spec}, + State = #state{pending = Pending}) -> + %% Tell the module it failed + {Proto, {_TcpOptions, Module,Args}} = Spec, + ?TRACE(?debugFmt("module ~p getting connect_failed", [Module])), + Module:connect_failed(Proto, {error, Reason}, Args), + %% Remove the request from the pending list + State#state{pending = lists:keydelete(Ref, #req.ref, Pending)}. + +update_endpoints(Addrs, Endpoints) -> + %% add addr to Endpoints if not already there + Fun = (fun(Addr, EPs) -> + case orddict:is_key(Addr, Endpoints) of + true -> EPs; + false -> + EP = #ep{addr=Addr}, + orddict:store(Addr, EP, EPs) + end + end), + lists:foldl(Fun, Endpoints, Addrs). 
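The backoff arithmetic is easy to pin down with a test. A hedged eunit sketch in a hypothetical separate test module, assuming the ?INITIAL_BACKOFF = 1000 and ?MAX_BACKOFF = 60000 values defined at the top of this module: delays double per failure and are clamped once they pass the maximum.

```erlang
%% Hedged sketch: hypothetical test module exercising the exported
%% riak_core_connection_mgr:increase_backoff/1.
-module(riak_core_connection_mgr_backoff_tests).
-include_lib("eunit/include/eunit.hrl").

doubling_and_cap_test() ->
    ?assertEqual(1000,  riak_core_connection_mgr:increase_backoff(0)),       %% first failure
    ?assertEqual(2000,  riak_core_connection_mgr:increase_backoff(1000)),    %% doubles
    ?assertEqual(8000,  riak_core_connection_mgr:increase_backoff(4000)),
    ?assertEqual(60000, riak_core_connection_mgr:increase_backoff(100000)).  %% clamped at max
```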
+ +%% Return the addresses of non-blacklisted endpoints that are also +%% members of the list EpAddrs. +filter_blacklisted_endpoints(EpAddrs, AllEps) -> + PredicateFun = (fun(Addr) -> + case orddict:find(Addr, AllEps) of + {ok, EP} -> + EP#ep.is_black_listed == false; + error -> + false + end + end), + lists:filter(PredicateFun, EpAddrs). diff --git a/src/riak_core_connection_mgr_stats.erl b/src/riak_core_connection_mgr_stats.erl new file mode 100644 index 000000000..dab964939 --- /dev/null +++ b/src/riak_core_connection_mgr_stats.erl @@ -0,0 +1,273 @@ +%% ------------------------------------------------------------------- +%% +%% Riak Core Connection Manager Statistics +%% collect, aggregate, and provide stats for connections made by +%% the connection manager +%% +%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(riak_core_connection_mgr_stats). +-author("Chris Tilt"). + +-behaviour(gen_server). + +%% API +-export([start_link/0, + get_stats/0, + get_stats_by_ip/1, + get_stats_by_protocol/1, + get_consolidated_stats/0, + update/3, + register_stats/0, + produce_stats/0]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). +-define(APP, riak_conn_mgr_stats). + +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +register_stats() -> + [(catch folsom_metrics:delete_metric(Stat)) || Stat <- folsom_metrics:get_metrics(), + is_tuple(Stat), element(1, Stat) == ?APP], + [register_stat({?APP, Name}, Type) || {Name, Type} <- stats()], + riak_core_stat_cache:register_app(?APP, {?MODULE, produce_stats, []}). + +%% @spec get_stats() -> proplist() +%% @doc Return all stats from the cached value. This will refresh +%% the cache if it's been over 5 seconds since the last query. +%% When the cache needs to get the latest values, it will call our +%% produce_stats() function. +get_stats() -> + case riak_core_stat_cache:get_stats(?APP) of + {ok, Stats, _TS} -> + Stats; + Error -> Error + end. + +get_consolidated_stats() -> + Strings = [format_stat(Stat) || Stat <- get_stats()], + stats_as_atoms(Strings). + +%% Sort and convert {K,V} stats to atoms +stats_as_atoms(StringStats) -> + lists:map(fun({S,V}) -> {list_to_atom(S), V} end, lists:sort(StringStats)). 
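As an illustration of the flattened names, here is a hedged sketch with a made-up endpoint and protocol id; the flat keys are what the format_stat/1 clauses below produce from the folsom metric tuples registered by do_update/3.

```erlang
%% Hedged illustration only: the 10.0.0.5:9085 endpoint and the rt_repl
%% protocol id are made up. After one successful connect and one refused
%% attempt, folsom holds keys such as
%%   {riak_conn_mgr_stats, conn_success, rt_repl, total}                 (counter)
%%   {riak_conn_mgr_stats, conn_error, econnrefused, {"10.0.0.5",9085}}  (spiral)
%% and the consolidated view flattens them through format_stat/1:
1> riak_core_connection_mgr_stats:get_consolidated_stats().
[{'10.0.0.5:9085_conn_error_econnrefused', 1},
 {rt_repl_conn_success_total, 1}]
%% (abridged; each update fans out to several related metrics)
```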
+ +format_stat({{?APP, conn_error, StatName}, [{count,N},{one,_W}]}) -> + {"conn_error_" ++ atom_to_list(StatName), N}; +format_stat({{?APP, conn_error, StatName, total}, N}) -> + {"conn_error_" ++ atom_to_list(StatName) ++ "_total", N}; +format_stat({{?APP, conn_error, StatName, Addr, total}, N}) when not is_atom(Addr) -> + {string_of_ipaddr(Addr) ++ "_" + ++ "conn_error_" + ++ atom_to_list(StatName) + ++ "_total", N}; +format_stat({{?APP, conn_error, StatName, Addr, ProtocolId, total},N}) -> + {string_of_ipaddr(Addr) ++ "_" + ++ "conn_error_" + ++ atom_to_list(ProtocolId) ++ "_" + ++ atom_to_list(StatName) + ++ "_total" , N}; +format_stat({{?APP, conn_error, StatName, ProtocolId, total},N}) -> + {atom_to_list(ProtocolId) ++ "_" + ++ "conn_error_" + ++ atom_to_list(StatName) + ++ "_total", N}; +format_stat({{?APP, conn_error, StatName, Addr, ProtocolId},[{count,N},{one,_W}]}) -> + {string_of_ipaddr(Addr) ++ "_" + ++ "conn_error_" + ++ atom_to_list(ProtocolId) ++ "_" + ++ atom_to_list(StatName), N}; +format_stat({{?APP, conn_error, StatName, Addr},[{count,N},{one,_W}]}) when not is_atom(Addr) -> + {string_of_ipaddr(Addr) ++ "_" + ++ "conn_error_" + ++ atom_to_list(StatName), N}; +format_stat({{?APP, conn_error, StatName, ProtocolId},[{count,N},{one,_W}]}) -> + {"conn_error_" ++ atom_to_list(ProtocolId) ++ "_" ++ atom_to_list(StatName), N}; + +format_stat({{?APP, StatName},[{count,N},{one,_W}]}) -> + {atom_to_list(StatName), N}; +format_stat({{?APP, StatName, total},N}) -> + {atom_to_list(StatName) ++ "_total", N}; +format_stat({{?APP, StatName, Addr, total},N}) when not is_atom(Addr) -> + {string_of_ipaddr(Addr) + ++ "_" ++ atom_to_list(StatName) ++ "_total", N}; +format_stat({{?APP, StatName, Addr},[{count,N},{one,_W}]}) when not is_atom(Addr) -> + {string_of_ipaddr(Addr) ++ "_" ++ atom_to_list(StatName),N}; +format_stat({{?APP, StatName, ProtocolId, total},N}) when is_atom(ProtocolId) -> + {atom_to_list(ProtocolId) ++ "_" ++ atom_to_list(StatName) ++ "_total", N}; +format_stat({{?APP, StatName, ProtocolId},[{count,N},{one,_W}]}) when is_atom(ProtocolId) -> + {atom_to_list(ProtocolId) ++ "_" ++ atom_to_list(StatName), N}; +format_stat({{?APP, StatName, Addr, ProtocolId, total},N}) when is_atom(ProtocolId) -> + {string_of_ipaddr(Addr) + ++ "_" ++ atom_to_list(ProtocolId) + ++ "_" ++ atom_to_list(StatName) + ++ "_total", N}; +format_stat({{?APP, StatName, Addr, ProtocolId},[{count,N},{one,_W}]}) when is_atom(ProtocolId) -> + {string_of_ipaddr(Addr) + ++ "_" ++ atom_to_list(ProtocolId) + ++ "_" ++ atom_to_list(StatName), N}. + +string_of_ipaddr({IP, Port}) when is_list(IP) -> + lists:flatten(io_lib:format("~s:~p", [IP, Port])); +string_of_ipaddr({IP, Port}) when is_tuple(IP) -> + lists:flatten(io_lib:format("~s:~p", [inet_parse:ntoa(IP), Port])). + +%% Get stats filtered by given IP address +get_stats_by_ip({_IP, _Port}=Addr) -> + AllStats = get_stats(), + Stats = lists:filter(fun(S) -> predicate_by_ip(S,Addr) end, AllStats), + stats_as_atoms([format_stat(Stat) || Stat <- Stats]). 
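A short usage sketch of the two filtered views, continuing the made-up endpoint and protocol id from above; get_stats_by_protocol/1 is defined just below, and the output is abridged.

```erlang
%% Hedged usage sketch with hypothetical values (output abridged).
2> riak_core_connection_mgr_stats:get_stats_by_ip({"10.0.0.5", 9085}).
[{'10.0.0.5:9085_conn_error_econnrefused', 1}]
3> riak_core_connection_mgr_stats:get_stats_by_protocol(rt_repl).
[{rt_repl_conn_success_total, 1}]
```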
+ +predicate_by_ip({{_App, conn_error, _StatName, MatchAddr, total},_Value}, MatchAddr) -> + true; +predicate_by_ip({{_App, conn_error, _StatName, MatchAddr, _ProtocolId, total},_Value}, MatchAddr) -> + true; +predicate_by_ip({{_App, conn_error, _StatName, MatchAddr, _ProtocolId},_Value}, MatchAddr) -> + true; +predicate_by_ip({{_App, conn_error, _StatName, MatchAddr},_Value}, MatchAddr) -> + true; +predicate_by_ip({{_App, conn_error, _StatName, _ProtocolId},_Value}, _MatchAddr) -> + false; +predicate_by_ip({{_App, conn_error, _StatName, _ProtocolId, total},_Value}, _MatchAddr) -> + false; +predicate_by_ip({{_App, _StatName, MatchAddr},_Value}, MatchAddr) -> + true; +predicate_by_ip({{_App, _StatName, MatchAddr, total},_Value}, MatchAddr) -> + true; +predicate_by_ip({{_App, _StatName, MatchAddr, _ProtocolId},_Value}, MatchAddr) -> + true; +predicate_by_ip({{_App, _StatName, MatchAddr, _ProtocolId, total},_Value}, MatchAddr) -> + true; +predicate_by_ip(_X, _MatchAddr) -> + false. + +%% Get stats filtered by given protocol-id (e.g. rt_repl) +get_stats_by_protocol(ProtocolId) -> + AllStats = get_stats(), + Stats = lists:filter(fun(S) -> predicate_by_protocol(S,ProtocolId) end, AllStats), + stats_as_atoms([format_stat(Stat) || Stat <- Stats]). + +predicate_by_protocol({{_App, conn_error, _StatName, _Addr, MatchId},_Value}, MatchId) -> + true; +predicate_by_protocol({{_App, conn_error, _StatName, _Addr, MatchId, total},_Value}, MatchId) -> + true; +predicate_by_protocol({{_App, conn_error, _StatName, MatchId},_Value}, MatchId) -> + true; +predicate_by_protocol({{_App, conn_error, _StatName, MatchId, total},_Value}, MatchId) -> + true; +predicate_by_protocol({{_App, conn_error, _StatName, _Addr},_Value}, _MatchId) -> + false; +predicate_by_protocol({{_App, conn_error, _StatName, _Addr, total},_Value}, _MatchId) -> + false; +predicate_by_protocol({{_App, _StatName, MatchId},_Value}, MatchId) -> + true; +predicate_by_protocol({{_App, _StatName, MatchId, total},_Value}, MatchId) -> + true; +predicate_by_protocol({{_App, _StatName, _Addr, MatchId},_Value}, MatchId) -> + true; +predicate_by_protocol({{_App, _StatName, _Addr, MatchId, total},_Value}, MatchId) -> + true; +predicate_by_protocol(_X, _MatchId) -> + false. + +%% Public interface to accumulate stats +update(Stat, Addr, ProtocolId) -> + gen_server:cast(?SERVER, {update, Stat, Addr, ProtocolId}). + +%% gen_server + +init([]) -> + register_stats(), + {ok, ok}. + +handle_call(_Req, _From, State) -> + {reply, ok, State}. + +handle_cast({update, Stat, Addr, ProtocolId}, State) -> + do_update(Stat, Addr, ProtocolId), + {noreply, State}; +handle_cast(_Req, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. 
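One update/3 call fans out to a family of folsom metrics through the do_update/3 clauses that follow. A hedged sketch of the keys touched by a single successful connect, again with a made-up endpoint and protocol id:

```erlang
%% Hedged sketch: endpoint and protocol id below are hypothetical.
%% A single call such as
4> riak_core_connection_mgr_stats:update(conn_success, {"10.0.0.5",9085}, rt_repl).
ok
%% is cast to this server and, via do_update/3 below, bumps a running counter
%% and a spiral for each total/address/protocol combination:
%%   {riak_conn_mgr_stats, conn_success, total}                              counter
%%   {riak_conn_mgr_stats, conn_success}                                     spiral
%%   {riak_conn_mgr_stats, conn_success, rt_repl, total}                     counter
%%   {riak_conn_mgr_stats, conn_success, rt_repl}                            spiral
%%   {riak_conn_mgr_stats, conn_success, {"10.0.0.5",9085}, total}           counter
%%   {riak_conn_mgr_stats, conn_success, {"10.0.0.5",9085}}                  spiral
%%   {riak_conn_mgr_stats, conn_success, {"10.0.0.5",9085}, rt_repl, total}  counter
%%   {riak_conn_mgr_stats, conn_success, {"10.0.0.5",9085}, rt_repl}         spiral
```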
+ +%% Update a stat for given IP-Address, Cluster, and Protocol-id +do_update({conn_error, Error}, IPAddr, Protocol) -> + create_or_update({?APP, conn_error, Error, total}, {inc, 1}, counter), + create_or_update({?APP, conn_error, Error}, 1, spiral), + create_or_update({?APP, conn_error, Error, IPAddr, total}, {inc, 1}, counter), + create_or_update({?APP, conn_error, Error, IPAddr}, 1, spiral), + create_or_update({?APP, conn_error, Error, Protocol, total}, {inc, 1}, counter), + create_or_update({?APP, conn_error, Error, Protocol}, 1, spiral), + create_or_update({?APP, conn_error, Error, IPAddr, Protocol, total}, {inc, 1}, counter), + create_or_update({?APP, conn_error, Error, IPAddr, Protocol}, 1, spiral); + +do_update(Stat, IPAddr, Protocol) -> + create_or_update({?APP, Stat, total}, {inc, 1}, counter), + create_or_update({?APP, Stat}, 1, spiral), + create_or_update({?APP, Stat, Protocol, total}, {inc, 1}, counter), + create_or_update({?APP, Stat, Protocol}, 1, spiral), + create_or_update({?APP, Stat, IPAddr, total}, {inc, 1}, counter), + create_or_update({?APP, Stat, IPAddr }, 1, spiral), + create_or_update({?APP, Stat, IPAddr, Protocol, total}, {inc, 1}, counter), + create_or_update({?APP, Stat, IPAddr, Protocol}, 1, spiral). + +%% private + +%% dynamically update (and create if needed) a stat +create_or_update(Name, UpdateVal, Type) -> + case (catch folsom_metrics:notify_existing_metric(Name, UpdateVal, Type)) of + ok -> + ok; + {'EXIT', _} -> + register_stat(Name, Type), + create_or_update(Name, UpdateVal, Type) + end. + +register_stat(Name, spiral) -> + folsom_metrics:new_spiral(Name); +register_stat(Name, counter) -> + folsom_metrics:new_counter(Name). + +%% @spec produce_stats() -> proplist() +%% @doc Produce a proplist-formatted view of the current aggregation +%% of stats. +produce_stats() -> + Stats = [Stat || Stat <- folsom_metrics:get_metrics(), is_tuple(Stat), element(1, Stat) == ?APP], + lists:flatten([{Stat, get_stat(Stat)} || Stat <- Stats]). + +%% Get the value of the named stats metric +%% NOTE: won't work for Histograms +get_stat(Name) -> + folsom_metrics:get_metric_value(Name). + +%% Return list of static stat names and types to register +stats() -> []. %% no static stats to register diff --git a/src/riak_core_service_mgr.erl b/src/riak_core_service_mgr.erl new file mode 100644 index 000000000..9966e6b55 --- /dev/null +++ b/src/riak_core_service_mgr.erl @@ -0,0 +1,453 @@ +%% ------------------------------------------------------------------- +%% +%% Riak Subprotocol Server Dispatcher +%% +%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +%% @doc Listens on a single TCP port and negotiates which protocol to start +%% on a new connection. Ranch is used to create a connection pool and accept +%% new socket connections. 
When a connection is accepted, the client supplies +%% a hello with thier revision and capabilities. The server replies in kind. +%% The client then sends the service they wish to use, and which versions of +%% the service they support. The server will find the highest major version in +%% common, and highest major version in common. If there is no major version in +%% common, the connectin fails. Minor versions do not need to match. On a +%% success, the server sends the Major version, Client minor version, and +%% Host minor version to the client. After that, the registered +%% module:function/5 is called and control of the socket passed to it. + + +-module(riak_core_service_mgr). +-author("Chris Tilt"). +-behaviour(gen_server). + +-include("riak_core_connection.hrl"). + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). +-define(TRACE(Stmt),Stmt). +%%-define(TRACE(Stmt),ok). +-else. +-define(TRACE(Stmt),ok). +-endif. + +-define(SERVER, riak_core_service_manager). +-define(MAX_LISTENERS, 100). + +%% services := registered protocols, key :: proto_id() +-record(state, {dispatch_addr = {"localhost", 9000} :: ip_addr(), + services = orddict:new() :: orddict:orddict(), + dispatcher_pid = undefined :: pid(), + status_notifiers = [], + service_stats = orddict:new() :: orddict:orddict(), % proto-id -> stats() + refs = [] + }). + +-export([start_link/0, start_link/1, + register_service/2, + unregister_service/1, + is_registered/1, + register_stats_fun/1, + get_stats/0, + stop/0 + ]). + +%% ranch callbacks +-export([start_link/4]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +%% internal +-export([dispatch_service/4]). + +%%%=================================================================== +%%% API +%%%=================================================================== + +%% @doc Start the Service Manager on the default/configured Ip Address and Port. +%% All sub-protocols will be dispatched from there. +-spec(start_link() -> {ok, pid()}). +start_link() -> + ServiceAddr = case app_helper:get_env(riak_core, cluster_mgr) of + undefined -> + lager:error("cluster_mgr is not configured for riak_core in + app.config, defaulting to {\"127.0.0.1\", 0}."), + {"127.0.0.1", 0}; + Res -> + Res + end, + start_link(ServiceAddr). + +%% @doc Start the Service Manager on the given Ip Address and Port. +%% All sub-protocols will be dispatched from there. +-spec(start_link(ip_addr()) -> {ok, pid()}). +start_link({IP,Port}) when is_integer(Port), Port >= 0 -> + case valid_host_ip(IP) of + false -> + erlang:error({badarg, invalid_ip}); + true -> + ?TRACE(?debugFmt("Starting Core Service Manager at ~p", [{IP,Port}])), + lager:info("Starting Core Service Manager at ~p", [{IP,Port}]), + Args = [{IP,Port}], + Options = [], + gen_server:start_link({local, ?SERVER}, ?MODULE, Args, Options) + end. + +%% @doc Once a protocol specification is registered, it will be kept available +%% by the Service Manager. Note that the callee is responsible for taking +%% ownership of the socket via Transport:controlling_process(Socket, Pid). +%% Only the strategy of `round_robin' is supported; it's arg is ignored. +-spec(register_service(hostspec(), service_scheduler_strategy()) -> ok). +register_service(HostProtocol, Strategy) -> + %% only one strategy is supported as yet + {round_robin, _NB} = Strategy, + gen_server:cast(?SERVER, {register_service, HostProtocol, Strategy}). + +%% @doc Unregister the given protocol-id. 
Existing connections for this +%% protocol are not killed. New connections for this protocol will not be +%% accepted until re-registered. +-spec(unregister_service(proto_id()) -> ok). +unregister_service(ProtocolId) -> + gen_server:cast(?SERVER, {unregister_service, ProtocolId}). + +%% @doc True if the given protocal id is registered. +-spec(is_registered(proto_id()) -> boolean()). +is_registered(ProtocolId) -> + gen_server:call(?SERVER, {is_registered, service, ProtocolId}). + +%% @doc Register a callback function that will get called periodically or +%% when the connection status of services changes. The function will +%% receive a list of tuples: {, } where stats +%% holds the number of open connections that have been accepted for that +%% protocol type. This can be used to report load, in the form of +%% connected-ness, for each protocol type, to remote clusters, e.g., +%% making it possible for schedulers to balance the number of +%% connections across a cluster. +-spec register_stats_fun(Fun :: fun(([{proto_id(), non_neg_integer()}]) -> any())) -> 'ok'. +register_stats_fun(Fun) -> + gen_server:cast(?SERVER, {register_stats_fun, Fun}). + +%% @doc Number of open connections for each protocol id. +-spec get_stats() -> [{proto_id(), non_neg_integer()}]. +get_stats() -> + gen_server:call(?SERVER, get_stats). + +%% @doc Stop the ranch listener, and then exit the server normally. +-spec stop() -> 'ok'. +stop() -> + gen_server:call(?SERVER, stop). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +init([IpAddr]) -> + {ok, Pid} = start_dispatcher(IpAddr, ?MAX_LISTENERS, []), + {ok, #state{dispatch_addr = IpAddr, dispatcher_pid=Pid}}. + +handle_call({is_registered, service, ProtocolId}, _From, State) -> + Found = orddict:is_key(ProtocolId, State#state.services), + {reply, Found, State}; + +handle_call(get_services, _From, State) -> + {reply, orddict:to_list(State#state.services), State}; + +handle_call(stop, _From, State) -> + ranch:stop_listener(State#state.dispatch_addr), + {stop, normal, ok, State}; + +handle_call(get_stats, _From, State) -> + Stats = orddict:to_list(State#state.service_stats), + PStats = [{Protocol, Count} || {Protocol,{_Stats,Count}} <- Stats], + {reply, PStats, State}; + +handle_call(_Unhandled, _From, State) -> + ?TRACE(?debugFmt("Unhandled gen_server call: ~p", [_Unhandled])), + {reply, {error, unhandled}, State}. + +handle_cast({register_service, Protocol, Strategy}, State) -> + {{ProtocolId,_Revs},_Rest} = Protocol, + NewDict = orddict:store(ProtocolId, {Protocol, Strategy}, State#state.services), + {noreply, State#state{services=NewDict}}; + +handle_cast({unregister_service, ProtocolId}, State) -> + NewDict = orddict:erase(ProtocolId, State#state.services), + {noreply, State#state{services=NewDict}}; + +handle_cast({register_stats_fun, Fun}, State) -> + Notifiers = [Fun | State#state.status_notifiers], + erlang:send_after(500, self(), status_update_timer), + {noreply, State#state{status_notifiers=Notifiers}}; + +%% TODO: unregister support for notifiers? 
+%% handle_cast({unregister_node_status_fun, Fun}, State) -> +%% Notifiers = [Fun | State#state.notifiers], +%% {noreply, State#state{status_notifiers=Notifiers}}; + +handle_cast({service_up_event, Pid, ProtocolId}, State) -> + ?TRACE(?debugFmt("Service up event: ~p", [ProtocolId])), + erlang:send_after(500, self(), status_update_timer), + Ref = erlang:monitor(process, Pid), %% arrange for us to receive 'DOWN' when Pid terminates + ServiceStats = incr_count_for_protocol_id(ProtocolId, 1, State#state.service_stats), + Refs = [{Ref,ProtocolId} | State#state.refs], + {noreply, State#state{service_stats=ServiceStats, refs=Refs}}; + +handle_cast({service_down_event, _Pid, ProtocolId}, State) -> + ?TRACE(?debugFmt("Service down event: ~p", [ProtocolId])), + erlang:send_after(500, self(), status_update_timer), + ServiceStats = incr_count_for_protocol_id(ProtocolId, -1, State#state.service_stats), + {noreply, State#state{service_stats = ServiceStats}}; + +handle_cast(_Unhandled, _State) -> + ?TRACE(?debugFmt("Unhandled gen_server cast: ~p", [_Unhandled])), + {error, unhandled}. %% this will crash the server + +handle_info(status_update_timer, State) -> + %% notify all registered parties of this node's services counts + Stats = orddict:to_list(State#state.service_stats), + PStats = [ {Protocol, Count} || {Protocol,{_Stats,Count}} <- Stats], + [NotifyFun(PStats) || NotifyFun <- State#state.status_notifiers], + {noreply, State}; + +%% Get notified of a service that went down. +%% Remove it's Ref and pass the event on to get counted by ProtocolId +handle_info({'DOWN', Ref, process, Pid, _Reason}, State) -> + Refs = State#state.refs, + Refs2 = case lists:keytake(Ref, 1, Refs) of + {value, {Ref, ProtocolId}, Rest} -> + gen_server:cast(?SERVER, {service_down_event, Pid, ProtocolId}), + Rest; + error -> + Refs + end, + {noreply, State#state{refs=Refs2}}; + +handle_info(_Unhandled, State) -> + ?TRACE(?debugFmt("Unhandled gen_server info: ~p", [_Unhandled])), + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Private +%%%=================================================================== + +incr_count_for_protocol_id(ProtocolId, Incr, ServiceStatus) -> + Stats2 = case orddict:find(ProtocolId, ServiceStatus) of + {ok, Stats} -> + Count = Stats#stats.open_connections, + Stats#stats{open_connections = Count + Incr}; + error -> + #stats{open_connections = Incr} + end, + orddict:store(ProtocolId, Stats2, ServiceStatus). + +%% @private +%% Host callback function, called by ranch for each accepted connection by way of +%% of the ranch:start_listener() call above, specifying this module. +start_link(Listener, Socket, Transport, SubProtocols) -> + ?TRACE(?debugMsg("Start_link dispatch_service")), + {ok, spawn_link(?MODULE, dispatch_service, [Listener, Socket, Transport, SubProtocols])}. + +%% Body of the main dispatch loop. This is instantiated once for each connection +%% we accept because it transforms itself into the SubProtocol once it receives +%% the sub protocol and version, negotiated with the client. +dispatch_service(Listener, Socket, Transport, _Args) -> + %% tell ranch "we've got it. 
thanks pardner" + ok = ranch:accept_ack(Listener), + %% set some starting options for the channel; these should match the client + ?TRACE(?debugFmt("setting system options on service side: ~p", [?CONNECT_OPTIONS])), + ok = Transport:setopts(Socket, ?CONNECT_OPTIONS), + %% Version 1.0 capabilities just passes our clustername + MyName = riak_core_connection:symbolic_clustername(), + MyCaps = [{clustername, MyName}], + case exchange_handshakes_with(client, Socket, Transport, MyCaps) of + {ok,Props} -> + %% get latest set of registered services from gen_server and do negotiation + Services = gen_server:call(?SERVER, get_services), + SubProtocols = [Protocol || {_Key,{Protocol,_Strategy}} <- Services], + ?TRACE(?debugFmt("started dispatch_service with protocols: ~p", + [SubProtocols])), + Negotiated = negotiate_proto_with_client(Socket, Transport, SubProtocols), + ?TRACE(?debugFmt("negotiated = ~p", [Negotiated])), + start_negotiated_service(Socket, Transport, Negotiated, Props); + Error -> + Error + end. + +%% start user's module:function and transfer socket to it's process. +start_negotiated_service(_Socket, _Transport, {error, Reason}, _Props) -> + ?TRACE(?debugFmt("service dispatch failed with ~p", [{error, Reason}])), + lager:error("service dispatch failed with ~p", [{error, Reason}]), + {error, Reason}; +%% Note that the callee is responsible for taking ownership of the socket via +%% Transport:controlling_process(Socket, Pid), +start_negotiated_service(Socket, Transport, + {NegotiatedProtocols, {Options, Module, Function, Args}}, + Props) -> + %% Set requested Tcp socket options now that we've finished handshake phase + ?TRACE(?debugFmt("Setting user options on service side; ~p", [Options])), + ?TRACE(?debugFmt("negotiated protocols: ~p", [NegotiatedProtocols])), + Transport:setopts(Socket, Options), + %% call service body function for matching protocol. The callee should start + %% a process or gen_server or such, and return {ok, pid()}. + case Module:Function(Socket, Transport, NegotiatedProtocols, Args, Props) of + {ok, Pid} -> + {ok,{ClientProto,_Client,_Host}} = NegotiatedProtocols, + gen_server:cast(?SERVER, {service_up_event, Pid, ClientProto}), + {ok, Pid}; + Error -> + ?TRACE(?debugFmt("service dispatch of ~p:~p failed with ~p", + [Module, Function, Error])), + lager:error("service dispatch of ~p:~p failed with ~p", + [Module, Function, Error]), + Error + end. + +%% Negotiate the highest common major protocol revisision with the connected client. 
+%% client -> server : Prefs List = {SubProto, [{Major, Minor}]} as binary +%% server -> client : selected version = {SubProto, {Major, HostMinor}} as binary +%% +%% returns {ok,{{Proto,MyVer,RemoteVer},Options,Module,Function,Args}} | Error +negotiate_proto_with_client(Socket, Transport, HostProtocols) -> + case Transport:recv(Socket, 0, ?CONNECTION_SETUP_TIMEOUT) of + {ok, PrefsBin} -> + {ClientProto,Versions} = erlang:binary_to_term(PrefsBin), + case choose_version({ClientProto,Versions}, HostProtocols) of + {error, Reason} -> + lager:error("Failed to negotiate protocol ~p from client because ~p", + [ClientProto, Reason]), + Transport:send(Socket, erlang:term_to_binary({error,Reason})), + {error, Reason}; + {ok,{ClientProto,Major,CN,HN}, Rest} -> + Transport:send(Socket, erlang:term_to_binary({ok,{ClientProto,{Major,HN,CN}}})), + {{ok,{ClientProto,{Major,HN},{Major,CN}}}, Rest}; + {error, Reason, Rest} -> + lager:error("Failed to negotiate protocol ~p from client because ~p", + [ClientProto, Reason]), + %% notify client it failed to negotiate + Transport:send(Socket, erlang:term_to_binary({error,Reason})), + {{error, Reason}, Rest} + end; + {error, Reason} -> + lager:error("Failed to receive protocol request from client. Error = ~p", + [Reason]), + {error, connection_failed} + end. + +choose_version({ClientProto,ClientVersions}=_CProtocol, HostProtocols) -> + ?TRACE(?debugFmt("choose_version: client proto = ~p, HostProtocols = ~p", + [_CProtocol, HostProtocols])), + %% first, see if the host supports the subprotocol + case [H || {{HostProto,_Versions},_Rest}=H <- HostProtocols, ClientProto == HostProto] of + [] -> + %% oops! The host does not support this sub protocol type + lager:error("Failed to find host support for protocol: ~p", [ClientProto]), + ?TRACE(?debugMsg("choose_version: no common protocols")), + {error,protocol_not_supported}; + [{{_HostProto,HostVersions},Rest}=_Matched | _DuplicatesIgnored] -> + ?TRACE(?debugFmt("choose_version: unsorted = ~p clientversions = ~p", + [_Matched, ClientVersions])), + CommonVers = [{CM,CN,HN} || {CM,CN} <- ClientVersions, {HM,HN} <- HostVersions, CM == HM], + ?TRACE(?debugFmt("common versions = ~p", [CommonVers])), + %% sort by major version, highest to lowest, and grab the top one. + case lists:reverse(lists:keysort(1,CommonVers)) of + [] -> + %% oops! No common major versions for Proto. + ?TRACE(?debugFmt("Failed to find a common major version for protocol: ~p", + [ClientProto])), + lager:error("Failed to find a common major version for protocol: ~p", [ClientProto]), + {error,protocol_version_not_supported,Rest}; + [{Major,CN,HN}] -> + {ok, {ClientProto,Major,CN,HN},Rest}; + [{Major,CN,HN} | _] -> + {ok, {ClientProto,Major,CN,HN},Rest} + end + end. + +%% exchange brief handshake with client to ensure that we're supporting sub-protocols. 
+%% client -> server : Hello {1,0} [Capabilities] +%% server -> client : Ack {1,0} [Capabilities] +exchange_handshakes_with(client, Socket, Transport, MyCaps) -> + ?TRACE(?debugFmt("exchange_handshakes: waiting for ~p from client", [?CTRL_HELLO])), + case Transport:recv(Socket, 0, ?CONNECTION_SETUP_TIMEOUT) of + {ok, Hello} -> + %% read their hello + case binary_to_term(Hello) of + {?CTRL_HELLO, TheirRev, TheirCaps} -> + Ack = term_to_binary({?CTRL_ACK, ?CTRL_REV, MyCaps}), + Transport:send(Socket, Ack), + %% make some props to hand dispatched service + Props = [{local_revision, ?CTRL_REV}, {remote_revision, TheirRev} | TheirCaps], + {ok,Props}; + Msg -> + %% tell other side we are failing them + Error = {error, bad_handshake}, + Transport:send(Socket, erlang:term_to_binary(Error)), + lager:error("Control protocol handshake with client got unexpected hello: ~p", + [Msg]), + Error + end; + {error, Reason} -> + lager:error("Failed to exchange handshake with client. Error = ~p", [Reason]), + {error, Reason} + end. + +%% Returns true if the IP address given is a valid host IP address. +valid_host_ip("0.0.0.0") -> + true; +valid_host_ip(IP) -> + {ok, IFs} = inet:getifaddrs(), + {ok, NormIP} = normalize_ip(IP), + lists:foldl( + fun({_IF, Attrs}, Match) -> + case lists:member({addr, NormIP}, Attrs) of + true -> + true; + _ -> + Match + end + end, false, IFs). + +%% Convert IP address the tuple form +normalize_ip(IP) when is_list(IP) -> + inet_parse:address(IP); +normalize_ip(IP) when is_tuple(IP) -> + {ok, IP}. + +%% @doc Start the connection dispatcher with a limit of MaxListeners +%% listener connections and supported sub-protocols. When a connection +%% request arrives, it is mapped via the associated Protocol atom to an +%% acceptor function called as Module:Function(Listener, Socket, Transport, Args), +%% which must create it's own process and return {ok, pid()} + +-spec(start_dispatcher(ip_addr(), non_neg_integer(), [hostspec()]) -> {ok, pid()}). +start_dispatcher({IP,Port}, MaxListeners, SubProtocols) -> + {ok, RawAddress} = inet_parse:address(IP), + {ok, Pid} = ranch:start_listener({IP,Port}, MaxListeners, ranch_tcp, + [{ip, RawAddress}, {port, Port}], + ?MODULE, SubProtocols), + lager:info("Service manager: listening on ~s:~p", [IP, Port]), + {ok, Pid}. diff --git a/src/riak_core_tcp_mon.erl b/src/riak_core_tcp_mon.erl new file mode 100644 index 000000000..9d2053732 --- /dev/null +++ b/src/riak_core_tcp_mon.erl @@ -0,0 +1,319 @@ +%% ------------------------------------------------------------------- +%% +%% TCP Connection Monitor +%% +%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + + +-module(riak_core_tcp_mon). + +-export([start_link/0, start_link/1, monitor/3, status/0, status/1, format/0, format/2]). 
+-export([default_status_funs/0, raw/2, diff/2, rate/2, kbps/2, + socket_status/1, format_socket_stats/2 ]). + +%% gen_server callbacks +-behavior(gen_server). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +%% Keep 6 x 10s worth of data plus an extra sample for working out rates +-define(DEFAULT_LIMIT, 7). +-define(DEFAULT_INTERVAL, timer:seconds(10)). +-define(DEFAULT_CLEAR, timer:seconds(60)). + +-define(INET_STATS, [recv_oct,recv_cnt,recv_max,recv_avg,recv_dvi, + send_oct,send_cnt,send_max,send_avg,send_pend]). +-define(INET_OPTS, [sndbuf,recbuf,active,buffer]). + +-define(STATUS_FUNS, [{recv_oct, {recv_kbps, fun kbps/2}}, {recv_cnt, fun diff/2}, + {recv_max, fun raw/2}, {recv_avg, fun raw/2}, {recv_dvi, fun raw/2}, + {send_oct, {send_kbps, fun kbps/2}}, {send_cnt, fun diff/2}, + {send_max, fun raw/2}, {send_avg, fun raw/2}, {send_pend, fun raw/2}, + {sndbuf, fun raw/2}, {recbuf, fun raw/2}, {active, fun raw/2}, + {buffer, fun raw/2}]). + +-record(state, {conns = gb_trees:empty(), % conn records keyed by Socket + tags = gb_trees:empty(), % tags to ports + interval = ?DEFAULT_INTERVAL, % how often to get stats + limit = ?DEFAULT_LIMIT, % + clear_after = ?DEFAULT_CLEAR, % how long to leave errored sockets in status + stats = ?INET_STATS, % Stats to read + opts = ?INET_OPTS, % Opts to read + status_funs = dict:from_list(default_status_funs()) % Status reporting functions + }). + +-record(conn, {tag, %% Tag used to find socket + transport, + type, %% Type - normal, dist, error + ts_hist = [], %% History of timestamps for readings + hist = []}). %% History of readings + + +start_link() -> + start_link([]). + +start_link(Props) -> + gen_server:start_link({local, ?MODULE}, ?MODULE, Props, []). + +monitor(Socket, Tag, Transport) -> + gen_server:call(?MODULE, {monitor, Socket, Tag, Transport}). + +status() -> + gen_server:call(?MODULE, status). + +status(Timeout) -> + gen_server:call(?MODULE, status, Timeout). + +socket_status(Socket) -> + gen_server:call(?MODULE, {socket_status, Socket}). + +format() -> + Status = status(), + io:fwrite([format(Status, recv_kbps), + format(Status, send_kbps)]). + +format(Status, Stat) -> + [format_header(Stat), + [format_entry(Entry, Stat) || Entry <- Status]]. + +format_header(Stat) -> + io_lib:format("~40w Value\n", [Stat]). + +format_entry({_Socket, Status}, Stat) -> + Tag = proplists:get_value(tag, Status), + Value = proplists:get_value(Stat, Status), + case Value of + Value when is_list(Value) -> + [io_lib:format("~40s [", [Tag]), + format_list(Value), + "]\n"]; + _ -> + [io_lib:format("~40s", [Tag]), + format_value(Value), + "\n"] + end. + +format_value(Val) when is_float(Val) -> + io_lib:format("~7.1f", [Val]); +format_value(Val) -> + io_lib:format("~w", [Val]). + +format_list(Value) -> + [$[, string:join([format_value(Item) || Item <- Value], ", "), $]]. + +%% Provide a way to get to the default status fun +default_status_funs() -> + ?STATUS_FUNS. + +%% Return raw readings, ignore timestamps +raw(_TS, Hist) -> + Hist. + +diff(TS, Hist) -> + RevTS = lists:reverse(TS), + RevHist = lists:reverse(Hist), + diff(RevTS, RevHist, []). + +diff([_TS], [_C], Acc) -> + Acc; +diff([_TS1 | TSRest], [C1 | CRest], Acc) -> + Diff = hd(CRest) - C1, + diff(TSRest, CRest, [Diff | Acc]). + +%% Convert byte rate to bit rate +kbps(TS, Hist) -> + [trunc(R / 128.0) || R <- rate(TS, Hist)]. 
% *8 bits / 1024 bytes
+
+%% Work out the rate of something per second
+rate(TS, Hist) ->
+    RevTS = lists:reverse(TS),
+    RevHist = lists:reverse(Hist),
+    rate(RevTS, RevHist, []).
+
+rate([_TS], [_C], Acc) ->
+    Acc;
+rate([TS1 | TSRest], [C1 | CRest], Acc) ->
+    Secs = timer:now_diff(hd(TSRest), TS1) / 1.0e6,
+    Rate = (hd(CRest) - C1) / Secs,
+    rate(TSRest, CRest, [Rate | Acc]).
+
+init(Props) ->
+    lager:info("Starting TCP Monitor"),
+    ok = net_kernel:monitor_nodes(true, [{node_type, visible}, nodedown_reason]),
+    State0 = #state{interval = proplists:get_value(interval, Props, ?DEFAULT_INTERVAL),
+                    limit = proplists:get_value(limit, Props, ?DEFAULT_LIMIT),
+                    clear_after = proplists:get_value(clear_after, Props, ?DEFAULT_CLEAR)},
+    DistCtrl = erlang:system_info(dist_ctrl),
+    State = lists:foldl(fun({Node,Port}, DatState) ->
+                                add_dist_conn(Node, Port, DatState)
+                        end, State0, DistCtrl),
+    {ok, schedule_tick(State)}.
+
+handle_call(status, _From, State = #state{conns = Conns,
+                                          status_funs = StatusFuns}) ->
+    Out = [ [{socket,P} | conn_status(P, Conn, StatusFuns)]
+            || {P,Conn} <- gb_trees:to_list(Conns)],
+    {reply, Out, State};
+
+handle_call({socket_status, Socket}, _From, State = #state{conns = Conns,
+                                                           status_funs = StatusFuns}) ->
+    Stats =
+        case gb_trees:lookup(Socket, Conns) of
+            none -> [];
+            {value, Conn} -> conn_status(Socket, Conn, StatusFuns)
+        end,
+    {reply, Stats, State};
+
+handle_call({monitor, Socket, Tag, Transport}, _From, State) ->
+    {reply, ok, add_conn(Socket, #conn{tag = Tag, type = normal,
+                                       transport = Transport}, State)}.
+
+handle_cast(Msg, State) ->
+    lager:warning("unknown message received: ~p", [Msg]),
+    {noreply, State}.
+
+handle_info({nodeup, Node, _InfoList}, State) ->
+    DistCtrl = erlang:system_info(dist_ctrl),
+    case proplists:get_value(Node, DistCtrl) of
+        undefined ->
+            lager:error("Could not get dist for ~p\n~p\n", [Node, DistCtrl]),
+            {noreply, State};
+        Port ->
+            {noreply, add_dist_conn(Node, Port, State)}
+    end;
+
+handle_info({nodedown, _Node, _InfoList}, State) ->
+    %% The dist connection for the downed node will fail on the next
+    %% measurement tick, be marked in error and then cleared; keep the rest
+    %% of the monitored connections intact.
+    {noreply, State};
+handle_info(measurement_tick, State = #state{limit = Limit, stats = Stats,
+                                             opts = Opts, conns = Conns}) ->
+    schedule_tick(State),
+    Fun = fun(Socket, Conn = #conn{type = Type, ts_hist = TSHist, hist = Hist}) when Type /= error ->
+                  try
+                      {ok, StatVals} = inet:getstat(Socket, Stats),
+                      TS = os:timestamp(), % taken between the two reads to split the difference
+                      {ok, OptVals} = inet:getopts(Socket, Opts),
+                      Hist2 = update_hist(OptVals, Limit,
+                                          update_hist(StatVals, Limit, Hist)),
+                      Conn#conn{ts_hist = prepend_trunc(TS, TSHist, Limit),
+                                hist = Hist2}
+                  catch
+                      _E:_R ->
+                          %% Any problem with getstat/getopts marks the
+                          %% connection in error and schedules it to be cleared.
+                          erlang:send_after(State#state.clear_after,
+                                            self(),
+                                            {clear, Socket}),
+                          Conn#conn{type = error}
+                  end;
+             (_Socket, Conn) ->
+                  Conn
+          end,
+    {noreply, State#state{conns = gb_trees:map(Fun, Conns)}};
+handle_info({clear, Socket}, State = #state{conns = Conns}) ->
+    {noreply, State#state{conns = gb_trees:delete_any(Socket, Conns)}}.
+
+terminate(_Reason, _State) ->
+    lager:info("Shutting down TCP Monitor"),
+    %% TODO: Consider trying to do something graceful with poolboy?
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%% Add a distributed connection to the state
+add_dist_conn(Node, Port, State) ->
+    add_conn(Port, #conn{tag = {node, Node}, type = dist}, State).
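For orientation, a small worked example of the history helpers above. It is not part of the patch, and the timestamps and byte counts are invented. Histories are kept newest-first, rate/2 turns a pair of readings into a per-second rate, and kbps/2 divides the byte rate by 128 (times 8 bits, divided by 1024):

    %% Two recv_oct readings taken 10 seconds apart, newest first,
    %% in os:timestamp() format ({MegaSecs, Secs, MicroSecs}).
    TS   = [{0, 20, 0}, {0, 10, 0}],
    Hist = [2621440, 1310720],          %% 1,310,720 bytes moved in those 10s
    riak_core_tcp_mon:rate(TS, Hist),   %% -> [131072.0]  bytes per second
    riak_core_tcp_mon:kbps(TS, Hist).   %% -> [1024]      kilobits per second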
+ +%% Add connection to the state +add_conn(Socket, Conn, State = #state{conns = Conns}) -> + State#state{conns = gb_trees:enter(Socket, Conn, Conns)}. + +%% Update the histogram with the list of name/values +update_hist(Readings, Limit, Histories) -> + %% For all the readings of {Stat, Val} pairs + lists:foldl( + %% Prepend newest reading and truncate + fun ({Stat, Val}, Histories0) -> + orddict:update(Stat, + fun(Hist) -> + prepend_trunc(Val, Hist, Limit) + end, + [Val], + Histories0) + end, Histories, Readings). + +prepend_trunc(Val, List, Limit) -> + lists:sublist([Val | List], Limit). + +conn_status(Socket, #conn{tag = Tag, type = Type, + ts_hist = TsHist, hist = Histories, + transport = Transport}, StatusFuns) -> + Fun = fun({Stat, Hist}, Acc) -> + case dict:find(Stat, StatusFuns) of + {ok, {Alias, StatusFun}} -> + [{Alias, StatusFun(TsHist, Hist)} | Acc]; + {ok, StatusFun} -> + [{Stat, StatusFun(TsHist, Hist)} | Acc]; + _ -> + Acc + end + end, + Stats = lists:sort(lists:foldl(Fun, [], Histories)), + Conn = try % Socket could be dead, don't kill the TCP mon finding out + Peername = riak_core_util:peername(Socket, Transport), + Sockname = riak_core_util:sockname(Socket, Transport), + [{peername, Peername}, {sockname, Sockname}] + catch + _:_ -> + [{peername, "error"}, {sockname, "error"}] + end, + [{tag, Tag}, {type, Type}] ++ Conn ++ Stats. + +schedule_tick(State = #state{interval = Interval}) -> + erlang:send_after(Interval, self(), measurement_tick), + State. + +format_socket_stats([], Buf) -> lists:reverse(Buf); +%format_socket_stats([{K,V}|T], Buf) when K == tag -> + %format_socket_stats(T, [{tag, V} | Buf]); +format_socket_stats([{K,_V}|T], Buf) when + K == tag; + K == sndbuf; + K == recbuf; + K == buffer; + K == active; + K == type; + K == send_max; + K == send_avg -> + %% skip these + format_socket_stats(T, Buf); +format_socket_stats([{K,V}|T], Buf) when + K == recv_avg; + K == recv_cnt; + K == recv_dvi; + K == recv_kbps; + K == recv_max; + K == send_kbps; + K == send_pend; + K == send_cnt -> + format_socket_stats(T, [{K, lists:flatten(format_list(V))} | Buf]); +format_socket_stats([{K,V}|T], Buf) -> + format_socket_stats(T, [{K, V} | Buf]). + diff --git a/src/riak_core_util.erl b/src/riak_core_util.erl index b7882ac45..63fc27e49 100644 --- a/src/riak_core_util.erl +++ b/src/riak_core_util.erl @@ -2,7 +2,7 @@ %% %% riak_core: Core Riak Application %% -%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved. +%% Copyright (c) 2007-2013 Basho Technologies, Inc. All Rights Reserved. %% %% This file is provided to you under the Apache License, %% Version 2.0 (the "License"); you may not use this file @@ -48,7 +48,11 @@ multi_rpc_ann/5, multicall_ann/4, multicall_ann/5, - is_arch/1]). + is_arch/1, + format_ip_and_port/2, + peername/2, + sockname/2 + ]). -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). @@ -472,6 +476,29 @@ is_arch (osx) -> is_arch(darwin); is_arch (solaris) -> is_arch(sunos); is_arch (Arch) -> throw({unsupported_architecture,Arch}). +format_ip_and_port(Ip, Port) when is_list(Ip) -> + lists:flatten(io_lib:format("~s:~p",[Ip,Port])); +format_ip_and_port(Ip, Port) when is_tuple(Ip) -> + lists:flatten(io_lib:format("~s:~p",[inet_parse:ntoa(Ip), + Port])). +peername(Socket, Transport) -> + case Transport:peername(Socket) of + {ok, {Ip, Port}} -> + format_ip_and_port(Ip, Port); + {error, Reason} -> + %% just return a string so JSON doesn't blow up + lists:flatten(io_lib:format("error:~p", [Reason])) + end. 
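A quick illustration of the riak_core_util helpers added here (a sketch, not part of the patch). format_ip_and_port/2 accepts either a string or a tuple address, and peername/2, like sockname/2 just below, returns the same "IP:Port" form:

    riak_core_util:format_ip_and_port("127.0.0.1", 8087).  %% -> "127.0.0.1:8087"
    riak_core_util:format_ip_and_port({10,0,0,5}, 9080).   %% -> "10.0.0.5:9080"
    %% peername(Socket, ranch_tcp) and sockname(Socket, ranch_tcp) produce the
    %% same "IP:Port" form, or an "error:Reason" string when the socket is
    %% already closed.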
+ +sockname(Socket, Transport) -> + case Transport:sockname(Socket) of + {ok, {Ip, Port}} -> + format_ip_and_port(Ip, Port); + {error, Reason} -> + %% just return a string so JSON doesn't blow up + lists:flatten(io_lib:format("error:~p", [Reason])) + end. + %% =================================================================== %% EUnit tests %% =================================================================== diff --git a/test/riak_core_connection_mgr_tests.erl b/test/riak_core_connection_mgr_tests.erl new file mode 100644 index 000000000..fa63e1333 --- /dev/null +++ b/test/riak_core_connection_mgr_tests.erl @@ -0,0 +1,249 @@ +%% ------------------------------------------------------------------- +%% +%% Eunit test cases for the Connection Manager +%% +%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(riak_core_connection_mgr_tests). +-author("Chris Tilt"). +-include_lib("eunit/include/eunit.hrl"). + +-define(TRACE(Stmt),Stmt). +%%-define(TRACE(Stmt),ok). + +%% internal functions +-export([testService/5, + connected/6, connect_failed/3 + ]). + +%% Locator selector types +-define(REMOTE_LOCATOR_TYPE, remote). +-define(ADDR_LOCATOR_TYPE, addr). + +%% My cluster +-define(MY_CLUSTER_NAME, "bob"). +-define(MY_CLUSTER_ADDR, {"127.0.0.1", 4097}). + +%% Remote cluster +-define(REMOTE_CLUSTER_NAME, "betty"). +-define(REMOTE_CLUSTER_ADDR, {"127.0.0.1", 4096}). +-define(REMOTE_ADDRS, [{"127.0.0.1",5001}, {"127.0.0.1",5002}, {"127.0.0.1",5003}, + ?REMOTE_CLUSTER_ADDR]). + +-define(MAX_CONS, 2). +-define(TCP_OPTIONS, [{keepalive, true}, + {nodelay, true}, + {packet, 4}, + {reuseaddr, true}, + {active, false}]). + +%% Tell the connection manager how to find out "remote" end points. +%% For testing, we just make up some local addresses. +register_remote_locator() -> + Remotes = orddict:from_list([{?REMOTE_CLUSTER_NAME, ?REMOTE_ADDRS}]), + Locator = fun(Name, _Policy) -> + case orddict:find(Name, Remotes) of + false -> + {error, {unknown, Name}}; + OKEndpoints -> + OKEndpoints + end + end, + ok = riak_core_connection_mgr:register_locator(?REMOTE_LOCATOR_TYPE, Locator). + +register_addr_locator() -> + Locator = fun(Name, _Policy) -> {ok, [Name]} end, + ok = riak_core_connection_mgr:register_locator(?ADDR_LOCATOR_TYPE, Locator). + +register_empty_locator() -> + Locator = fun(_Name, _Policy) -> {ok, []} end, + ok = riak_core_connection_mgr:register_locator(?REMOTE_LOCATOR_TYPE, Locator). 
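To make the locator contract concrete, here is a minimal sketch of how an application might register its own locator outside of these tests. The cluster_by_name type and the LookupFun argument are hypothetical; the fixed parts are the fun(Name, Policy) shape and the {ok, Endpoints} / {error, Reason} returns that the locators in these tests use with riak_core_connection_mgr:register_locator/2:

    %% LookupFun(ClusterName) is assumed to return a list of {IP, Port} pairs.
    register_cluster_locator(LookupFun) ->
        Locator = fun(ClusterName, _Policy) ->
                          case LookupFun(ClusterName) of
                              []        -> {error, {unknown_cluster, ClusterName}};
                              Endpoints -> {ok, Endpoints}
                          end
                  end,
        ok = riak_core_connection_mgr:register_locator(cluster_by_name, Locator).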
+ +connections_test_() -> + {timeout, 6000, {setup, fun() -> + ok = application:start(ranch), + {ok, _} = riak_core_service_mgr:start_link(?REMOTE_CLUSTER_ADDR), + {ok, _} = riak_core_connection_mgr:start_link() + end, + fun(_) -> + riak_core_connection_mgr:stop(), + riak_core_service_mgr:stop(), + application:stop(ranch) + end, + fun(_) -> [ + + {"regsiter remote locator", fun() -> + register_remote_locator(), + Target = {?REMOTE_LOCATOR_TYPE, ?REMOTE_CLUSTER_NAME}, + Strategy = default, + Got = riak_core_connection_mgr:apply_locator(Target, Strategy), + ?assertEqual({ok, ?REMOTE_ADDRS}, Got) + end}, + + {"register locator addr", fun() -> + register_addr_locator(), + Target = {?ADDR_LOCATOR_TYPE, ?REMOTE_CLUSTER_ADDR}, + Strategy = default, + Got = riak_core_connection_mgr:apply_locator(Target, Strategy), + ?assertEqual({ok, [?REMOTE_CLUSTER_ADDR]}, Got) + end}, + + {"bad locator args", fun() -> + register_addr_locator(), + %% bad args for 'addr' + Target = {?REMOTE_LOCATOR_TYPE, ?REMOTE_CLUSTER_ADDR}, + Strategy = default, + Expected = {error, {bad_target_name_args, remote, ?REMOTE_CLUSTER_ADDR}}, + Got = riak_core_connection_mgr:apply_locator(Target, Strategy), + ?assertEqual(Expected, Got) + end}, + + {"is paused", ?_assertNot(riak_core_connection_mgr:is_paused())}, + + {"pause", fun() -> + riak_core_connection_mgr:pause(), + ?assert(riak_core_connection_mgr:is_paused()) + end}, + + {"resume", fun() -> + riak_core_connection_mgr:resume(), + ?assertNot(riak_core_connection_mgr:is_paused()) + end}, + + {"client connection", fun() -> + %% start a test service + start_service(), + %% do async connect via connection_mgr + ExpectedArgs = {expectedToPass, [{1,0}, {1,0}]}, + Target = {?ADDR_LOCATOR_TYPE, ?REMOTE_CLUSTER_ADDR}, + Strategy = default, + riak_core_connection_mgr:connect(Target, + {{testproto, [{1,0}]}, + {?TCP_OPTIONS, ?MODULE, ExpectedArgs}}, + Strategy), + timer:sleep(1000) + end}, + + {"client connect via cluster name", fun() -> + start_service(), + %% do async connect via connection_mgr + ExpectedArgs = {expectedToPass, [{1,0}, {1,0}]}, + Target = {?REMOTE_LOCATOR_TYPE, ?REMOTE_CLUSTER_NAME}, + Strategy = default, + riak_core_connection_mgr:connect(Target, + {{testproto, [{1,0}]}, + {?TCP_OPTIONS, ?MODULE, ExpectedArgs}}, + Strategy), + timer:sleep(1000) + end}, + + {"client retries", fun() -> + %% do async connect via connection_mgr + ExpectedArgs = {retry_test, [{1,0}, {1,0}]}, + Target = {?REMOTE_LOCATOR_TYPE, ?REMOTE_CLUSTER_NAME}, + Strategy = default, + + riak_core_connection_mgr:connect(Target, + {{testproto, [{1,0}]}, + {?TCP_OPTIONS, ?MODULE, ExpectedArgs}}, + Strategy), + %% delay so the client will keep trying + ?TRACE(?debugMsg(" ------ sleeping 3 sec")), + timer:sleep(3000), + %% resume and confirm not paused, which should cause service to start and connection :-) + ?TRACE(?debugMsg(" ------ resuming services")), + start_service(), + %% allow connection to setup + ?TRACE(?debugMsg(" ------ sleeping 2 sec")), + timer:sleep(1000) + end}, + + {"empty locator", fun() -> + register_empty_locator(), %% replace remote locator with one that returns empty list + start_service(), + %% do async connect via connection_mgr + ExpectedArgs = {expectedToPass, [{1,0}, {1,0}]}, + Target = {?REMOTE_LOCATOR_TYPE, ?REMOTE_CLUSTER_NAME}, + Strategy = default, + riak_core_connection_mgr:connect(Target, + {{testproto, [{1,0}]}, + {?TCP_OPTIONS, ?MODULE, ExpectedArgs}}, + Strategy), + %% allow conn manager to try and schedule a few retries + timer:sleep(1000), + + %% restore the remote 
locator that gives a good endpoint + register_remote_locator(), + Got = riak_core_connection_mgr:apply_locator(Target, Strategy), + ?assertEqual({ok, ?REMOTE_ADDRS}, Got), + + %% allow enough time for retry mechanism to kick in + timer:sleep(2000) + %% we should get a connection + end} + + ] end} }. + +%%------------------------ +%% Helper functions +%%------------------------ + +start_service() -> + %% start dispatcher + ExpectedRevs = [{1,0}, {1,0}], + TestProtocol = {{testproto, [{1,0}]}, {?TCP_OPTIONS, ?MODULE, testService, ExpectedRevs}}, + riak_core_service_mgr:register_service(TestProtocol, {round_robin,10}), + ?assert(riak_core_service_mgr:is_registered(testproto) == true). + +%% Protocol Service functions +testService(_Socket, _Transport, {error, _Reason}, _Args, _Props) -> + ?assert(false); +testService(_Socket, _Transport, {ok, {Proto, MyVer, RemoteVer}}, Args, _Props) -> + ?TRACE(?debugMsg("testService started")), + [ExpectedMyVer, ExpectedRemoteVer] = Args, + ?assert(ExpectedMyVer == MyVer), + ?assert(ExpectedRemoteVer == RemoteVer), + ?assert(Proto == testproto), + timer:sleep(2000), + {ok, self()}. + +%% Client side protocol callbacks +connected(_Socket, _Transport, {_IP, _Port}, {Proto, MyVer, RemoteVer}, Args, _Props) -> + ?TRACE(?debugFmt("testClient started, connected to ~p:~p", [_IP,_Port])), + {_TestType, [ExpectedMyVer, ExpectedRemoteVer]} = Args, + ?assert(Proto == testproto), + ?assert(ExpectedMyVer == MyVer), + ?assert(ExpectedRemoteVer == RemoteVer). + +connect_failed({_Proto,_Vers}, {error, Reason}, Args) -> + + case Args of + expectedToFail -> + ?TRACE(?debugFmt("connect_failed: (EXPECTED) when expected to fail: ~p with ~p", + [Reason, Args])), + ?assert(Reason == econnrefused); + {retry_test, _Stuff} -> + ?TRACE(?debugFmt("connect_failed: (EXPECTED) during retry test: ~p", + [Reason])), + ok; + Other -> + ?TRACE(?debugFmt("connect_failed: (UNEXPECTED) ~p with args = ~p", + [Reason, Other])), + ?assert(false == Other) + end. diff --git a/test/riak_core_connection_tests.erl b/test/riak_core_connection_tests.erl new file mode 100644 index 000000000..07498d5a6 --- /dev/null +++ b/test/riak_core_connection_tests.erl @@ -0,0 +1,133 @@ +%% ------------------------------------------------------------------- +%% +%% Eunit test cases for the Conn Manager Connection +%% +%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + + +-module(riak_core_connection_tests). +-author("Chris Tilt"). +-include_lib("eunit/include/eunit.hrl"). + +-export([test1service/5, connected/6, connect_failed/3]). + +-define(TEST_ADDR, { "127.0.0.1", 4097}). +-define(MAX_CONS, 2). +-define(TCP_OPTIONS, [{keepalive, true}, + {nodelay, true}, + {packet, 4}, + {reuseaddr, true}, + {active, false}]). 
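Before the test cases themselves, a short note on what the version negotiation exercised below should produce, reasoning from the choose_version code added earlier (the values mirror the "protocol match" test):

    %% Host offers  test1proto versions [{2,1}, {1,0}]
    %% Client asks  test1proto versions [{0,1}, {1,1}]
    %% The only common major version is 1 (host minor 0, client minor 1), so:
    %%   the service callback is started with {ok, {test1proto, {1,0}, {1,1}}}
    %%   the client's connected/6 callback gets {test1proto, {1,1}, {1,0}}
    %% which is why the host side expects [{1,0}, {1,1}] and the client side
    %% passes [{1,1}, {1,0}] as its expected version pair.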
+ +%% host service functions +test1service(_Socket, _Transport, {error, Reason}, Args, _Props) -> + ?debugFmt("test1service failed with {error, ~p}", [Reason]), + ?assert(Args == failed_host_args), + ?assert(Reason == protocol_version_not_supported), + {error, Reason}; +test1service(_Socket, _Transport, {ok, {Proto, MyVer, RemoteVer}}, Args, Props) -> + [ExpectedMyVer, ExpectedRemoteVer] = Args, + RemoteClusterName = proplists:get_value(clustername, Props), + ?debugFmt("test1service started with Args ~p Props ~p", [Args, Props]), + ?assert(RemoteClusterName == "undefined"), + ?assert(ExpectedMyVer == MyVer), + ?assert(ExpectedRemoteVer == RemoteVer), + ?assert(Proto == test1proto), + timer:sleep(2000), + {ok, self()}. + +%% client connection callbacks +connected(_Socket, _Transport, {_IP, _Port}, {Proto, MyVer, RemoteVer}, Args, Props) -> + [ExpectedMyVer, ExpectedRemoteVer] = Args, + RemoteClusterName = proplists:get_value(clustername, Props), + ?debugFmt("connected with Args ~p Props ~p", [Args, Props]), + ?assert(RemoteClusterName == "undefined"), + ?assert(Proto == test1proto), + ?assert(ExpectedMyVer == MyVer), + ?assert(ExpectedRemoteVer == RemoteVer), + timer:sleep(2000). + +connect_failed({Proto,_Vers}, {error, Reason}, Args) -> + ?debugFmt("connect_failed: Reason = ~p Args = ~p", [Reason, Args]), + ?assert(Args == failed_client_args), + ?assert(Reason == protocol_version_not_supported), + ?assert(Proto == test1protoFailed). + +conection_test_() -> + {timeout, 60000, {setup, fun() -> + ok = application:start(ranch), + {ok, _} = riak_core_service_mgr:start_link(?TEST_ADDR) + end, + fun(_) -> + riak_core_service_mgr:stop(), + application:stop(ranch) + end, + fun(_) -> [ + + {"started", ?_assert(is_pid(whereis(riak_core_service_manager)))}, + + {"set and get name", fun() -> + riak_core_connection:set_symbolic_clustername("undefined"), + Got = riak_core_connection:symbolic_clustername(), + ?assertEqual("undefined", Got) + end}, + + {"register service", fun() -> + ExpectedRevs = [{1,0}, {1,1}], + ServiceProto = {test1proto, [{2,1}, {1,0}]}, + ServiceSpec = {ServiceProto, {?TCP_OPTIONS, ?MODULE, test1service, ExpectedRevs}}, + riak_core_service_mgr:register_service(ServiceSpec, {round_robin,?MAX_CONS}), + ?assert(riak_core_service_mgr:is_registered(test1proto)) + end}, + + {"unregister service", fun() -> + TestProtocolId = test1proto, + riak_core_service_mgr:unregister_service(TestProtocolId), + ?assertNot(riak_core_service_mgr:is_registered(TestProtocolId)) + end}, + + {"protocol match", fun() -> + ExpectedRevs = [{1,0}, {1,1}], + ServiceProto = {test1proto, [{2,1}, {1,0}]}, + ServiceSpec = {ServiceProto, {?TCP_OPTIONS, ?MODULE, test1service, ExpectedRevs}}, + riak_core_service_mgr:register_service(ServiceSpec, {round_robin,?MAX_CONS}), + + % test protocal match + ClientProtocol = {test1proto, [{0,1},{1,1}]}, + ClientSpec = {ClientProtocol, {?TCP_OPTIONS, ?MODULE, [{1,1},{1,0}]}}, + riak_core_connection:connect(?TEST_ADDR, ClientSpec), + timer:sleep(1000) + end}, + + {"failed protocal match", fun() -> + %% start service + SubProtocol = {{test1protoFailed, [{2,1}, {1,0}]}, + {?TCP_OPTIONS, ?MODULE, test1service, failed_host_args}}, + riak_core_service_mgr:register_service(SubProtocol, {round_robin,?MAX_CONS}), + ?assert(riak_core_service_mgr:is_registered(test1protoFailed) == true), + + %% try to connect via a client that speaks 0.1 and 3.1. No Match with host! 
+ ClientProtocol = {test1protoFailed, [{0,1},{3,1}]}, + ClientSpec = {ClientProtocol, {?TCP_OPTIONS, ?MODULE, failed_client_args}}, + riak_core_connection:connect(?TEST_ADDR, ClientSpec), + + timer:sleep(2000) + end} + + ] end } }. diff --git a/test/riak_core_service_mgr_tests.erl b/test/riak_core_service_mgr_tests.erl new file mode 100644 index 000000000..5da950c8a --- /dev/null +++ b/test/riak_core_service_mgr_tests.erl @@ -0,0 +1,168 @@ +%% ------------------------------------------------------------------- +%% +%% Eunit test cases for the Service Manager +%% +%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(riak_core_service_mgr_tests). +-author("Chris Tilt"). +-include_lib("eunit/include/eunit.hrl"). + +%%-define(TRACE(Stmt),Stmt). +-define(TRACE(Stmt),ok). + +%% internal functions +-export([testService/5, + connected/6, connect_failed/3 + ]). + +%% my name and remote same because I need to talk to myself for testing +-define(MY_CLUSTER_NAME, "bob"). +-define(REMOTE_CLUSTER_NAME, "bob"). + +-define(REMOTE_CLUSTER_ADDR, {"127.0.0.1", 4096}). +-define(TEST_ADDR, {"127.0.0.1", 4097}). +-define(MAX_CONS, 2). +-define(TCP_OPTIONS, [{keepalive, true}, + {nodelay, true}, + {packet, 4}, + {reuseaddr, true}, + {active, false}]). + +service_test_() -> + {timeout, 60000, {setup, fun() -> + ok = application:start(ranch), + {ok, _Pid} = riak_core_service_mgr:start_link(?TEST_ADDR) + end, + fun(_) -> + case whereis(riak_core_service_manager) of + Pid when is_pid(Pid) -> + riak_core_service_mgr:stop(), + {ok, _Mon} = erlang:monitor(process, Pid), + receive + {'DOWN', _, _, _, _} -> + ok + after + 1000 -> + ok + end; + undefined -> + ok + end + end, + fun(_) -> [ + + {"started", ?_assert(is_pid(whereis(riak_core_service_manager)))}, + + {"get services", ?_assertEqual([], gen_server:call(riak_core_service_manager, get_services))}, + + {"register service", fun() -> + ExpectedRevs = [{1,0}, {1,0}], + TestProtocol = {{testproto, [{1,0}]}, {?TCP_OPTIONS, ?MODULE, testService, ExpectedRevs}}, + riak_core_service_mgr:register_service(TestProtocol, {round_robin,?MAX_CONS}), + ?assert(riak_core_service_mgr:is_registered(testproto)) + end}, + + {"unregister service", fun() -> + TestProtocolId = testproto, + riak_core_service_mgr:unregister_service(TestProtocolId), + ?assertNot(riak_core_service_mgr:is_registered(testproto)) + end}, + + {"register stats fun", fun() -> + Self = self(), + Fun = fun(Stats) -> + Self ! 
Stats + end, + riak_core_service_mgr:register_stats_fun(Fun), + GotStats = receive + Term -> + Term + after 5500 -> + timeout + end, + ?assertEqual([], GotStats) + end}, + + {"start service test", fun() -> + %% re-register the test protocol and confirm registered + TestProtocol = {{testproto, [{1,0}]}, {?TCP_OPTIONS, ?MODULE, testService, [{1,0}, {1,0}]}}, + riak_core_service_mgr:register_service(TestProtocol, {round_robin, ?MAX_CONS}), + ?assert(riak_core_service_mgr:is_registered(testproto)), + %register_service_test_d(), + %% try to connect via a client that speaks our test protocol + ExpectedRevs = {expectedToPass, [{1,0}, {1,0}]}, + riak_core_connection:connect(?TEST_ADDR, {{testproto, [{1,0}]}, + {?TCP_OPTIONS, ?MODULE, ExpectedRevs}}), + %% allow client and server to connect and make assertions of success/failure + timer:sleep(1000), + Stats = riak_core_service_mgr:get_stats(), + ?assertEqual([{testproto,0}], Stats) + end}, + + {"pause existing services", fun() -> + riak_core_service_mgr:stop(), + %% there should be no services running now. + %% now start a client and confirm failure to connect + ExpectedArgs = expectedToFail, + riak_core_connection:connect(?TEST_ADDR, {{testproto, [{1,0}]}, + {?TCP_OPTIONS, ?MODULE, ExpectedArgs}}), + %% allow client and server to connect and make assertions of success/failure + timer:sleep(1000) + end} + + ] end} }. + +%%------------------------ +%% Helper functions +%%------------------------ + +%% Protocol Service functions +testService(_Socket, _Transport, {error, _Reason}, _Args, _Props) -> + ?assert(false); +testService(_Socket, _Transport, {ok, {Proto, MyVer, RemoteVer}}, Args, _Props) -> + ?TRACE(?debugMsg("testService started")), + [ExpectedMyVer, ExpectedRemoteVer] = Args, + ?assertEqual(ExpectedMyVer, MyVer), + ?assertEqual(ExpectedRemoteVer, RemoteVer), + ?assertEqual(Proto, testproto), +%% timer:sleep(2000), + {ok, self()}. + +%% Client side protocol callbacks +connected(_Socket, _Transport, {_IP, _Port}, {Proto, MyVer, RemoteVer}, Args, _Props) -> + ?TRACE(?debugMsg("testClient started")), + {_TestType, [ExpectedMyVer, ExpectedRemoteVer]} = Args, + ?assertEqual(Proto, testproto), + ?assertEqual(ExpectedMyVer, MyVer), + ?assertEqual(ExpectedRemoteVer, RemoteVer), + timer:sleep(2000). + +connect_failed({_Proto,_Vers}, {error, Reason}, Args) -> + case Args of + expectedToFail -> + ?assertEqual(Reason, econnrefused); + {retry_test, _Stuff} -> + ?TRACE(?debugFmt("connect_failed: during retry test: ~p", [Reason])), + ok; + _Other -> + ?TRACE(?debugFmt("connect_failed: ~p with args = ~p", [Reason, _Other])), + ?assert(false) + end, + timer:sleep(1000).