Skip to content
This repository
  • 6 commits
  • 12 files changed
  • 13 comments
  • 2 contributors
5  ebin/riak_core.app
@@ -77,6 +77,11 @@
77 77
              riak_core_vnode_worker_pool,
78 78
              riak_core_web,
79 79
              riak_core_wm_urlmap,
  80
+             riak_core_connection_mgr,
  81
+             riak_core_connection_mgr_stats,
  82
+             riak_core_connection,
  83
+             riak_core_service_mgr,
  84
+             riak_core_tcp_mon,
80 85
              supervisor_pre_r14b04,
81 86
              vclock
82 87
             ]},
100  include/riak_core_connection.hrl
... ...
@@ -0,0 +1,100 @@
  1
+%% -------------------------------------------------------------------
  2
+%%
  3
+%% Riak Core Connection Manager
  4
+%%
  5
+%% Copyright (c) 2013 Basho Technologies, Inc.  All Rights Reserved.
  6
+%%
  7
+%% This file is provided to you under the Apache License,
  8
+%% Version 2.0 (the "License"); you may not use this file
  9
+%% except in compliance with the License.  You may obtain
  10
+%% a copy of the License at
  11
+%%
  12
+%%   http://www.apache.org/licenses/LICENSE-2.0
  13
+%%
  14
+%% Unless required by applicable law or agreed to in writing,
  15
+%% software distributed under the License is distributed on an
  16
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  17
+%% KIND, either express or implied.  See the License for the
  18
+%% specific language governing permissions and limitations
  19
+%% under the License.
  20
+%%
  21
+%% -------------------------------------------------------------------
  22
+
  23
+%% handshake messages to safely initiate a connection. Let's not accept
  24
+%% a connection to a telnet session by accident!
  25
+-define(CTRL_REV, {1,0}).
  26
+-define(CTRL_HELLO, <<"riak-ctrl:hello">>).
  27
+-define(CTRL_TELL_IP_ADDR, <<"riak-ctrl:ip_addr">>).
  28
+-define(CTRL_ACK, <<"riak-ctrl:ack">>).
  29
+-define(CTRL_ASK_NAME, <<"riak-ctrl:ask_name">>).
  30
+-define(CTRL_ASK_MEMBERS, <<"riak-ctrl:ask_members">>).
  31
+
  32
+
  33
+-define(CONNECTION_SETUP_TIMEOUT, 10000).
  34
+
  35
+
  36
+
  37
+-define(CTRL_OPTIONS, [binary,
  38
+                       {keepalive, true},
  39
+                       {nodelay, true},
  40
+                       {packet, 4},
  41
+                       {reuseaddr, true},
  42
+                       {active, false}]).
  43
+
  44
+%% Tcp options shared during the connection and negotiation phase
  45
+-define(CONNECT_OPTIONS, [binary,
  46
+                          {keepalive, true},
  47
+                          {nodelay, true},
  48
+                          {packet, 4},
  49
+                          {reuseaddr, true},
  50
+                          {active, false}]).
  51
+
  52
+-type(ip_addr_str() :: string()).
  53
+-type(ip_portnum() :: non_neg_integer()).
  54
+-type(ip_addr() :: {ip_addr_str(), ip_portnum()}).
  55
+-type(tcp_options() :: [any()]).
  56
+
  57
+-type(proto_id() :: atom()).
  58
+-type(rev() :: non_neg_integer()). %% major or minor revision number
  59
+-type(proto() :: {proto_id(), {rev(), rev()}}). %% e.g. {myproto, {1, 0}}
  60
+-type(protoprefs() :: {proto_id(), [{rev(), rev()}]}).
  61
+
  62
+
  63
+%% Function = fun(Socket, Transport, Protocol, Args) -> ok
  64
+%% Protocol :: proto()
  65
+-type(service_started_callback() :: fun((inet:socket(), module(), proto(), [any()]) -> no_return())).
  66
+
  67
+%% Host protocol spec
  68
+-type(hostspec() :: {protoprefs(), {tcp_options(), module(), service_started_callback(), [any()]}}).
  69
+
  70
+%% Client protocol spec
  71
+-type(clientspec() :: {protoprefs(), {tcp_options(), module(),[any()]}}).
  72
+
  73
+
  74
+%% Scheduler strategies tell the connection manager how to distribute the load.
  75
+%%
  76
+%% Client scheduler strategies
  77
+%% ---------------------------
  78
+%% default := service side decides how to choose node to connect to. limits number of
  79
+%%            accepted connections to max_nb_cons().
  80
+%% askme := the connection manager will call your client for a custom protocol strategy
  81
+%%          and likewise will expect that the service side has plugged the cluster
  82
+%%          manager with support for that custom strategy. UNSUPPORTED so far.
  83
+%%          TODO: add a behaviour for the client to implement that includes this callback.
  84
+%% Service scheduler strategies
  85
+%% ----------------------------
  86
+%% round_robin := choose the next available node on cluster to service request. limits
  87
+%%                the number of accepted connections to max_nb_cons().
  88
+%% custom := service must provide a strategy to the cluster manager for choosing nodes
  89
+%%           UNSUPPORTED so far. Should we use a behaviour for the service module?
  90
+-type(max_nb_cons() :: non_neg_integer()).
  91
+-type(client_scheduler_strategy() :: default | askme).
  92
+-type(service_scheduler_strategy() :: {round_robin, max_nb_cons()} | custom).
  93
+
  94
+%% service manager statistics, can maybe get shared by other layers too
  95
%% Record holding service manager statistics.
%% The field type must be introduced with `::` and the built-in type is
%% non_neg_integer(); the previous `: non_negative_integer()` did not compile.
-record(stats, {open_connections = 0 :: non_neg_integer()
                }).
  97
+
  98
+
  99
+-type(cluster_finder_fun() :: fun(() -> {ok,node()} | {error, term()})).
  100
+
4  rebar.config
@@ -12,5 +12,7 @@
12 12
   {riak_sysmon, ".*", {git, "git://github.com/basho/riak_sysmon", {branch, "master"}}},
13 13
   {webmachine, ".*", {git, "git://github.com/basho/webmachine",
14 14
                                 {tag, "64176ef9b"}}},
15  
-  {folsom, ".*", {git, "git://github.com/boundary/folsom.git", {branch, "master"}}}
  15
+  {folsom, ".*", {git, "git://github.com/boundary/folsom.git", {branch, "master"}}},
  16
+  {ranch, "0.4.0", {git, "git://github.com/extend/ranch.git", {tag, "0.4.0"}}}
  17
+
16 18
        ]}.
205  src/riak_core_connection.erl
... ...
@@ -0,0 +1,205 @@
  1
+%% -------------------------------------------------------------------
  2
+%%
  3
+%% Riak Subprotocol Server Dispatch and Client Connections
  4
+%%
  5
+%% Copyright (c) 2013 Basho Technologies, Inc.  All Rights Reserved.
  6
+%%
  7
+%% This file is provided to you under the Apache License,
  8
+%% Version 2.0 (the "License"); you may not use this file
  9
+%% except in compliance with the License.  You may obtain
  10
+%% a copy of the License at
  11
+%%
  12
+%%   http://www.apache.org/licenses/LICENSE-2.0
  13
+%%
  14
+%% Unless required by applicable law or agreed to in writing,
  15
+%% software distributed under the License is distributed on an
  16
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  17
+%% KIND, either express or implied.  See the License for the
  18
+%% specific language governing permissions and limitations
  19
+%% under the License.
  20
+%%
  21
+%% -------------------------------------------------------------------
  22
+-module(riak_core_connection).
  23
+-author("Chris Tilt").
  24
+
  25
+-include("riak_core_connection.hrl").
  26
+
  27
+-ifdef(TEST).
  28
+-include_lib("eunit/include/eunit.hrl").
  29
+-define(TRACE(Stmt),Stmt).
  30
+-else.
  31
+-define(TRACE(Stmt),ok).
  32
+-endif.
  33
+
  34
+
  35
+%% public API
  36
+-export([connect/2,
  37
+         sync_connect/2]).
  38
+
  39
+%% internal functions
  40
+-export([async_connect_proc/3]).
  41
+
  42
+%%TODO: move to riak_ring_core...symbolic cluster naming
  43
+-export([set_symbolic_clustername/2, set_symbolic_clustername/1,
  44
+         symbolic_clustername/1, symbolic_clustername/0]).
  45
+
  46
%% @doc Sets the symbolic, human readable, name of this cluster.
%% Stores ClusterName under the `symbolic_clustername' metadata key of Ring
%% and returns {new_ring, UpdatedRing}. It is passed to
%% riak_core_ring_manager:ring_trans/2 by set_symbolic_clustername/1.
set_symbolic_clustername(Ring, ClusterName) ->
    UpdatedRing = riak_core_ring:update_meta(symbolic_clustername,
                                             ClusterName,
                                             Ring),
    {new_ring, UpdatedRing}.
  50
+
  51
%% @doc Sets the symbolic cluster name through a ring transaction.
%% ClusterName must be a string (a list); any other term is rejected with
%% {error, "argument is not a string"}.
%% When the local ring cannot be read, the reason is logged and the value
%% returned by the logging call is the result.
set_symbolic_clustername(ClusterName) when not is_list(ClusterName) ->
    %% The guard used to be `is_list(ClusterName)`, which rejected exactly
    %% the strings this function is meant to accept.
    {error, "argument is not a string"};
set_symbolic_clustername(ClusterName) ->
    case riak_core_ring_manager:get_my_ring() of
        {ok, _Ring} ->
            riak_core_ring_manager:ring_trans(
              fun riak_core_connection:set_symbolic_clustername/2,
              ClusterName);
        {error, Reason} ->
            lager:error("Can't set symbolic clustername because: ~p", [Reason])
    end.
  61
+
  62
%% @doc Returns the symbolic cluster name stored in the metadata of Ring,
%% or the string "undefined" when no name has been stored.
symbolic_clustername(Ring) ->
    Lookup = riak_core_ring:get_meta(symbolic_clustername, Ring),
    case Lookup of
        undefined ->
            "undefined";
        {ok, Name} ->
            Name
    end.
  67
+
  68
%% @doc Returns the symbolic, human readable, name of this cluster.
%% Reads the local ring from the ring manager; if that fails, the reason is
%% logged and the string "undefined" is returned instead.
symbolic_clustername() ->
    MyRing = riak_core_ring_manager:get_my_ring(),
    case MyRing of
        {error, Reason} ->
            lager:error("Can't read symbolic clustername because: ~p", [Reason]),
            "undefined";
        {ok, Ring} ->
            symbolic_clustername(Ring)
    end.
  77
+
  78
+%% Make async connection request. The connection manager is responsible for retry/backoff
  79
+%% and calls your module's functions on success or error (asynchronously):
  80
+%%   Module:connected(Socket, TransportModule, {IpAddress, Port}, {Proto, MyVer, RemoteVer}, Args, Props)
  81
+%%   Module:connect_failed(Proto, {error, Reason}, Args)
  82
+%%       Reason could be 'protocol_version_not_supported'
  83
+%%
  84
+%%
  85
+%% You can set options on the tcp connection, e.g.
  86
+%% [{packet, 4}, {active, false}, {keepalive, true}, {nodelay, true}]
  87
+%%
  88
+%% ClientProtocol specifies the preferences of the client, in terms of what versions
  89
+%% of a protocol it speaks. The host will choose the highest common major version and
  90
+%% inform the client via the callback Module:connected() in the HostProtocol parameter.
  91
+%%
  92
+%% Note: that the connection will initially be setup with the `binary` option
  93
+%% because protocol negotiation requires binary. TODO: should we allow non-binary
  94
+%% options? Check that binary is not overwritten.
  95
+%%
  96
+%% connect returns the pid() of the asynchronous process that will attempt the connection.
  97
+
  98
%% Spawns a linked helper process that runs async_connect_proc/3 with the
%% caller's pid, the target address and the client spec, and returns the
%% helper's pid immediately.
-spec connect(ip_addr(), clientspec()) -> pid().
connect({IP, Port}, ClientSpec) ->
    ?TRACE(?debugMsg("spawning async_connect link")),
    %% the connection request is handled asynchronously in its own process
    Caller = self(),
    HelperArgs = [Caller, {IP, Port}, ClientSpec],
    proc_lib:spawn_link(?MODULE, async_connect_proc, HelperArgs).
  103
+
  104
%% Connects to {IP,Port} synchronously, in the calling process, instead of
%% spawning a helper as connect/2 does.
%% Returns whatever Module:connected/6 returns on success, or
%% {error, Reason} when the connection, handshake or negotiation fails.
-spec sync_connect(ip_addr(), clientspec()) -> term().
sync_connect({IP,Port}, ClientSpec) ->
    sync_connect_status(self(), {IP,Port}, ClientSpec).
  106
+
  107
+%% @private
  108
+
  109
%% Exchange a brief handshake with the host to ensure that it supports
%% sub-protocols.
%% client -> server : Hello {1,0} [Capabilities]
%% server -> client : Ack {1,0} [Capabilities]
%%
%% Sends {?CTRL_HELLO, ?CTRL_REV, MyCaps} over Socket and waits up to
%% ?CONNECTION_SETUP_TIMEOUT milliseconds for the reply.
%%
%% Returns {ok, Props}, where Props is
%% [{local_revision, ?CTRL_REV}, {remote_revision, TheirRev} | TheirCaps],
%% or {error, Reason} when sending or receiving fails, or when the host
%% answers with anything other than an Ack (the unexpected term is then
%% the Reason).
exchange_handshakes_with(host, Socket, Transport, MyCaps) ->
    Hello = term_to_binary({?CTRL_HELLO, ?CTRL_REV, MyCaps}),
    case Transport:send(Socket, Hello) of
        ok ->
            ?TRACE(?debugFmt("exchange_handshakes: waiting for ~p from host", [?CTRL_ACK])),
            receive_handshake_ack(Socket, Transport);
        Error ->
            Error
    end.

%% @private Waits for the host's handshake reply and decodes it.
receive_handshake_ack(Socket, Transport) ->
    case Transport:recv(Socket, 0, ?CONNECTION_SETUP_TIMEOUT) of
        {ok, Ack} ->
            %% NOTE(review): binary_to_term/1 is applied to data read from
            %% the network. Consider binary_to_term(Ack, [safe]) once it is
            %% confirmed that peers never send atoms unknown to this node.
            decode_handshake_ack(binary_to_term(Ack));
        {error, Reason} ->
            {error, Reason}
    end.

%% @private Maps the decoded handshake reply onto the function's result.
decode_handshake_ack({?CTRL_ACK, TheirRev, TheirCaps}) ->
    Props = [{local_revision, ?CTRL_REV}, {remote_revision, TheirRev} | TheirCaps],
    {ok, Props};
decode_handshake_ack({error, _Reason} = Error) ->
    Error;
decode_handshake_ack(Msg) ->
    {error, Msg}.
  134
+
  135
%% @private Body of the helper process spawned by connect/2; it simply runs
%% the synchronous connection sequence in that process.
async_connect_proc(Parent, {_IP, _Port} = Addr, ProtocolSpec) ->
    sync_connect_status(Parent, Addr, ProtocolSpec).
  137
+
  138
%% Connect synchronously to the remote {IP,Port}, run the control handshake
%% and the protocol negotiation, and return the status.
%%
%% On success the client's requested Tcp Options are applied to the socket
%% and the value of
%%   Module:connected(Socket, Transport, {IP,Port}, HostProtocol, Args, Props)
%% is returned, so that a problem detected by the module is passed back.
%% The requester is responsible for transferring control of the socket.
%%
%% On failure {error, Reason} is returned. If the socket was already open it
%% is closed first, so a failed handshake or negotiation does not leak the
%% socket when this runs in a long-lived caller (see sync_connect/2).
sync_connect_status(_Parent, {IP,Port}, {ClientProtocol, {Options, Module, Args}}) ->
    Timeout = ?CONNECTION_SETUP_TIMEOUT,
    Transport = ranch_tcp,
    %% connect to host's {IP,Port}
    ?TRACE(?debugFmt("sync_connect: connect to ~p", [{IP,Port}])),
    case gen_tcp:connect(IP, Port, ?CONNECT_OPTIONS, Timeout) of
        {ok, Socket} ->
            ?TRACE(?debugFmt("Setting system options on client side: ~p", [?CONNECT_OPTIONS])),
            Transport:setopts(Socket, ?CONNECT_OPTIONS),
            case handshake_then_negotiate(Socket, Transport, ClientProtocol) of
                {ok, HostProtocol, Props} ->
                    %% set client's requested Tcp options
                    ?TRACE(?debugFmt("Setting user options on client side; ~p", [Options])),
                    Transport:setopts(Socket, Options),
                    %% notify requester of connection and negotiated protocol from host
                    Module:connected(Socket, Transport, {IP, Port}, HostProtocol, Args, Props);
                {error, _Reason} = Error ->
                    %% nobody will ever use this socket; release it
                    Transport:close(Socket),
                    Error
            end;
        {error, Reason} ->
            {error, Reason}
    end.

%% @private Performs the handshake that checks the peer is a riak
%% sub-protocol dispatcher, then asks which protocol version the host offers.
%% Returns {ok, HostProtocol, Props} or {error, Reason}.
handshake_then_negotiate(Socket, Transport, ClientProtocol) ->
    MyName = symbolic_clustername(),
    MyCaps = [{clustername, MyName}],
    case exchange_handshakes_with(host, Socket, Transport, MyCaps) of
        {ok, Props} ->
            case negotiate_proto_with_server(Socket, Transport, ClientProtocol) of
                {ok, HostProtocol} ->
                    {ok, HostProtocol, Props};
                {error, Reason} ->
                    ?TRACE(?debugFmt("negotiate_proto_with_server returned: ~p", [{error,Reason}])),
                    {error, Reason}
            end;
        {error, closed} ->
            %% socket got closed, don't report this
            {error, closed};
        {error, Reason} ->
            lager:error("Failed to exchange handshake with host. Error = ~p", [Reason]),
            {error, Reason};
        Error ->
            %% failed to exchange handshakes
            lager:error("Failed to exchange handshake with host. Error = ~p", [Error]),
            {error, Error}
    end.
  184
+
  185
%% Negotiate the highest common major protocol revision with the connected server.
%% client -> server : Prefs List = {SubProto, [{Major, Minor}]}
%% server -> client : {ok, {SubProto, {Major, HostMinor, ClientMinor}}}
%%                    or {error, Reason}
%%
%% returns {ok,{Proto,{Major,ClientMinor},{Major,HostMinor}}} | {error, Reason}
%%
%% A failure to send the request or to receive the answer is logged and
%% reported as {error, connection_failed}. The send result is checked so
%% that a failed send does not wait for the receive timeout to expire.
negotiate_proto_with_server(Socket, Transport, ClientProtocol) ->
    ?TRACE(?debugFmt("negotiate protocol with host, client proto = ~p", [ClientProtocol])),
    case Transport:send(Socket, erlang:term_to_binary(ClientProtocol)) of
        ok ->
            receive_negotiated_proto(Socket, Transport, ClientProtocol);
        {error, SendReason} ->
            lager:error("Failed to send protocol ~p request to server. Reason = ~p",
                        [ClientProtocol, SendReason]),
            {error, connection_failed}
    end.

%% @private Waits for the server's answer to the protocol request.
receive_negotiated_proto(Socket, Transport, ClientProtocol) ->
    case Transport:recv(Socket, 0, ?CONNECTION_SETUP_TIMEOUT) of
        {ok, NegotiatedProtocolBin} ->
            %% NOTE(review): binary_to_term/1 is applied to data read from
            %% the network; consider the [safe] option.
            case erlang:binary_to_term(NegotiatedProtocolBin) of
                {ok, {Proto,{CommonMajor,HMinor,CMinor}}} ->
                    {ok, {Proto,{CommonMajor,CMinor},{CommonMajor,HMinor}}};
                {error, Reason} ->
                    {error, Reason}
            end;
        {error, Reason} ->
            lager:error("Failed to receive protocol ~p response from server. Reason = ~p",
                        [ClientProtocol, Reason]),
            {error, connection_failed}
    end.
622  src/riak_core_connection_mgr.erl
... ...
@@ -0,0 +1,622 @@
  1
+%% -------------------------------------------------------------------
  2
+%%
  3
+%% Riak  Subprotocol Server Dispatch and Client Connections
  4
+%%
  5
+%% Copyright (c) 2013 Basho Technologies, Inc.  All Rights Reserved.
  6
+%%
  7
+%% This file is provided to you under the Apache License,
  8
+%% Version 2.0 (the "License"); you may not use this file
  9
+%% except in compliance with the License.  You may obtain
  10
+%% a copy of the License at
  11
+%%
  12
+%%   http://www.apache.org/licenses/LICENSE-2.0
  13
+%%
  14
+%% Unless required by applicable law or agreed to in writing,
  15
+%% software distributed under the License is distributed on an
  16
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  17
+%% KIND, either express or implied.  See the License for the
  18
+%% specific language governing permissions and limitations
  19
+%% under the License.
  20
+%%
  21
+%% -------------------------------------------------------------------
  22
+
  23
+-module(riak_core_connection_mgr).
  24
+-author("Chris Tilt").
  25
+-behaviour(gen_server).
  26
+
  27
+-include("riak_core_connection.hrl").
  28
+
  29
+-ifdef(TEST).
  30
+-include_lib("eunit/include/eunit.hrl").
  31
+-endif.
  32
+
  33
+%% controls retry and backoff.
  34
+-define(INITIAL_BACKOFF, 1 * 1000).  %% 1 second initial backoff per endpoint
  35
+-define(MAX_BACKOFF, 1 * 60 * 1000). %% 1 minute maximum backoff per endpoint
  36
+-define(EXHAUSTED_ENDPOINTS_RETRY_INTERVAL, 10 * 1000). %% 10 seconds until retrying the list again
  37
+
  38
+%% retry delay if locator returned empty list
  39
+-ifdef(TEST).
  40
+-define(TRACE(Stmt),Stmt).
  41
+%%-define(TRACE(Stmt),ok).
  42
+-define(DEFAULT_RETRY_NO_ENDPOINTS, 2 * 1000). %% short for testing to avoid timeout
  43
+-else.
  44
+-define(TRACE(Stmt),ok).
  45
+-define(DEFAULT_RETRY_NO_ENDPOINTS, 5 * 1000). %% 5 seconds
  46
+-endif.
  47
+
  48
+
  49
+-define(SERVER, riak_core_connection_manager).
  50
+-define(MAX_LISTENERS, 100).
  51
+
  52
+-type(counter() :: non_neg_integer()).
  53
+
  54
+%% Connection manager strategy (per Jon M.)
  55
+%% when a connection request comes in,
  56
+%% + call the locator service to get the list of {transport, {address, port}}
  57
+%% + create a linked helper process to call riak_core_connection (just once) on the next available
  58
+%%   connection (ignore blacklisted ones, they'll get picked up if a repeat is necessary)
  59
+%% + on connection it transfers control of the socket back to the connmgr, casts a success message back
  60
+%%   to the connection manager and exits normally.
  61
+%%   - on success, the connection manager increments successful connects, reset the backoff timeout on
  62
+%%     that connection.
  63
+%%   - on failure, casts a failure message back to the connection manager (error, timeout etc) the
  64
+%%     connection manager marks the {Transport, {Address, Port}} as blacklisted, increases the failure
  65
+%%     counter and starts a timer for the backoff time (and updates it for next time). The connection
  66
+%%     manager checks for the next non-blacklisted endpoint in the connection request list to launch
  67
+%%     a new connection, if the list is empty call the locator service again to get a new list. If all
  68
+%%     connections are blacklisted, use send_after message to wake up and retry (perhaps with backoff
  69
+%%     time too).
  70
+
  71
+%% End-point status state, updated for failed and successful connection attempts,
  72
+%% or by timers that fire to update the backoff time.
  73
+%% TODO: add folsom window'd stats
  74
+%% handle an EXIT from the helper process if it dies
  75
+-record(ep, {addr,                                 %% endpoint {IP, Port}
  76
+             nb_curr_connections = 0 :: counter(), %% number of current connections
  77
+             nb_success = 0 :: counter(),   %% total successful connects on this ep
  78
+             nb_failures = 0 :: counter(),  %% total failed connects on this ep
  79
+             is_black_listed = false :: boolean(), %% true after a failed connection attempt
  80
+             backoff_delay=0 :: counter(),  %% incremented on each failure, reset to zero on success
  81
+             failures = orddict:new() :: orddict:orddict(), %% failure reasons
  82
+             last_fail_time :: erlang:timestamp(),          %% time of last failure since 1970
  83
+             next_try_secs :: counter()     %% time in seconds to next retry attempt
  84
+             }).
  85
+
  86
+%% connection request record
  87
+-record(req, {ref,      % Unique reference for this connection request
  88
+              pid,      % Helper pid trying to make connection
  89
+              target,   % target to connect to {Type, Name}
  90
+              spec,     % client spec
  91
+              strategy, % connection strategy
  92
+              cur,      % current connection endpoint
  93
+              state = init,  % init | connecting | connected | cancelled
  94
+              status    % history of connection attempts
  95
+             }).
  96
+              
  97
+
  98
+%% connection manager state:
  99
+-record(state, {is_paused = false :: boolean(),
  100
+                pending = [] :: [#req{}], % pending requests
  101
+                %% endpoints :: {module(),ip_addr()} -> ep()
  102
+                endpoints = orddict:new() :: orddict:orddict(), %% known endpoints w/status
  103
+                locators = orddict:new() :: orddict:orddict(), %% connection locators
  104
+                nb_total_succeeded = 0 :: counter(),
  105
+                nb_total_failed = 0 :: counter()
  106
+               }).
  107
+
  108
+-export([start_link/0,
  109
+         resume/0,
  110
+         pause/0,
  111
+         is_paused/0,
  112
+         connect/2, connect/3,
  113
+         disconnect/1,
  114
+         register_locator/2,
  115
+         apply_locator/2,
  116
+         reset_backoff/0,
  117
+         stop/0
  118
+         ]).
  119
+
  120
+%% gen_server callbacks
  121
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
  122
+         terminate/2, code_change/3]).
  123
+
  124
+%% internal functions
  125
+-export([connection_helper/4, increase_backoff/1]).
  126
+
  127
+%%%===================================================================
  128
+%%% API
  129
+%%%===================================================================
  130
+
  131
%% @doc Starts the manager linked.
%% The gen_server is registered locally under ?SERVER and started with no
%% init arguments and no extra options.
-spec start_link() -> {ok, pid()}.
start_link() ->
    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
  137
+
  138
%% @doc Begins or resumes accepting and establishing new connections, in
%% order to maintain the protocols that have been (or continue to be)
%% registered and unregistered. pause() will not kill any existing
%% connections, but will cease accepting new requests or retrying lost
%% connections.
%% The request is an asynchronous cast to the manager.
-spec resume() -> ok.
resume() ->
    Request = resume,
    gen_server:cast(?SERVER, Request).
  145
+
  146
%% @doc Stop accepting / creating new connections; this does not terminate
%% existing ones.
%% The request is an asynchronous cast to the manager.
-spec pause() -> ok.
pause() ->
    Request = pause,
    gen_server:cast(?SERVER, Request).
  151
+
  152
+%% @doc Return paused state
  153
+-spec is_paused() -> boolean().
  154
+is_paused() ->
  155
+    gen_server:call(?SERVER, is_paused).
  156
+
  157
+%% @doc Reset all backoff delays to zero.
  158
+-spec reset_backoff() -> 'ok'.
  159
+reset_backoff() ->
  160
+    gen_server:cast(?SERVER, reset_backoff).
  161
+
  162
%% Register a locator under Type. A locator is a fun that, for the given
%% Name and strategy, returns {ok, [{IP,Port}]}: the list of endpoints to
%% connect to, in order. The list may be empty.
%% If the query can never be answered, the locator returns {error, Reason}.
%% i.e. fun(Name, Strategy) -> {ok, [{IP,Port}]} | {error, Reason}
%% The call waits for the manager without a timeout.
register_locator(Type, Fun) ->
    Request = {register_locator, Type, Fun},
    gen_server:call(?SERVER, Request, infinity).
  169
+
  170
+apply_locator(Name, Strategy) ->
  171
+    gen_server:call(?SERVER, {apply_locator, Name, Strategy}, infinity).
  172
+
  173
+%% @doc Establish a connection to the remote destination. be persistent about it,
  174
+%% but not too annoying to the remote end. Connect by name of cluster or
  175
+%% IP address. Use default strategy to find "best" peer for connection.
  176
+%%
  177
+%% Targets are found by applying a registered locator for it.
  178
+%% The identity locator is pre-installed, so if you want to connect to a list
  179
+%% of IP and Port addresses, supply a Target like this: {identity, [{IP, Port},...]},
  180
+%% where IP::string() and Port::integer(). You can also pass {identity, {IP, Port}}
  181
+%% and the locator will use just that one IP. With a list, it will rotate
  182
+%% trying them all until a connection is established.
  183
+%%
  184
+%% Other locator types must be registered with this connection manager
  185
+%% before calling connect().
  186
+%%
  187
+%% Supervision must be done by the calling process if desired. No supervision
  188
+%% is done here.
  189
+%%
  190
%% Target is a {LocatorType, Name} pair, e.g. {identity, [{IP, Port}]};
%% the previous spec declared it as string(), which no documented target
%% satisfies. Returns {ok, Ref}, where Ref identifies the request.
-spec connect(Target :: {atom(), term()},
              ClientSpec :: clientspec(),
              Strategy :: client_scheduler_strategy()) -> {'ok', reference()}.
connect(Target, ClientSpec, Strategy) ->
    gen_server:call(?SERVER, {connect, Target, ClientSpec, Strategy}).
  193
+
  194
+%% @doc same as connect(Target, ClientSpec, default).
  195
+%% @see connect/3
  196
%% Target is a {LocatorType, Name} pair, e.g. {identity, [{IP, Port}]};
%% the previous spec declared it as string(). Delegates to connect/3 with
%% the `default' strategy instead of repeating the gen_server call.
-spec connect(Target :: {atom(), term()},
              ClientSpec :: clientspec()) -> {'ok', reference()}.
connect(Target, ClientSpec) ->
    connect(Target, ClientSpec, default).
  199
+
  200
%% @doc Disconnect from the remote side.
%% Target is the same {LocatorType, Name} term that was given to connect/2,3
%% (the spec previously declared string()). The request is an asynchronous
%% cast, so `ok' is returned before the manager has acted on it.
-spec disconnect(Target :: {atom(), term()}) -> 'ok'.
disconnect(Target) ->
    gen_server:cast(?SERVER, {disconnect, Target}).
  204
+
  205
%% @doc Stop the server and sever all connections.
%% Synchronous: returns once the manager has replied to the `stop' call.
-spec stop() -> 'ok'.
stop() ->
    Request = stop,
    gen_server:call(?SERVER, Request).
  209
+
  210
+%%%===================================================================
  211
+%%% gen_server callbacks
  212
+%%%===================================================================
  213
+
  214
%% gen_server init callback. Traps exits so that 'EXIT' messages from linked
%% processes arrive in handle_info/2, and starts un-paused with only the
%% default "identity" locator installed.
init([]) ->
    process_flag(trap_exit, true),
    ?TRACE(?debugMsg("Starting")),
    Locators = orddict:store(identity, fun identity_locator/2, orddict:new()),
    {ok, #state{is_paused = false,
                locators = Locators}}.
  222
+
  223
%% gen_server call handler.

%% report whether the manager is paused
handle_call(is_paused, _From, State) ->
    IsPaused = State#state.is_paused,
    {reply, IsPaused, State};

%% connect based on address. Replies with the unique reference of the
%% new request and starts working on it.
handle_call({connect, Target, ClientSpec, Strategy}, _From, State) ->
    Ref = make_ref(),
    Req = #req{ref = Ref,
               pid = undefined,
               target = Target,
               spec = ClientSpec,
               strategy = Strategy,
               state = init},
    %% add request to pending queue so it may be found in restarts
    Pending = lists:keystore(Ref, #req.ref, State#state.pending, Req),
    Queued = State#state{pending = Pending},
    ?TRACE(?debugFmt("Starting connect request to ~p, ref is ~p", [Target, Ref])),
    Started = start_request(Req, Queued),
    {reply, {ok, Ref}, Started};

%% look up the current backoff of an endpoint
handle_call({get_endpoint_backoff, Addr}, _From, State) ->
    ?TRACE(?debugFmt("backing off ~p", [Addr])),
    Backoff = get_endpoint_backoff(Addr, State#state.endpoints),
    {reply, {ok, Backoff}, State};

%% install (or replace) the locator fun for Type
handle_call({register_locator, Type, Fun}, _From,
            State = #state{locators = Locators}) ->
    Locators2 = orddict:store(Type, Fun, Locators),
    {reply, ok, State#state{locators = Locators2}};

%% run the registered locator for Target and reply with its result
handle_call({apply_locator, Target, Strategy}, _From,
            State = #state{locators = Locators}) ->
    {reply, locate_endpoints(Target, Strategy, Locators), State};

handle_call(stop, _From, State) ->
    %% TODO do we need to cleanup helper pids here?
    {stop, normal, ok, State};

%% a helper asks whether it should still try Addr for request Ref
handle_call({should_try_endpoint, Ref, Addr}, _From, State = #state{pending=Pending}) ->
    case lists:keyfind(Ref, #req.ref, Pending) of
        false ->
            %% This should never happen
            {reply, false, State};
        Req ->
            %% a cancelled request stays cancelled (the helper process
            %% hasn't cancelled itself yet); anything else is now connecting
            ShouldTry = Req#req.state =/= cancelled,
            NextReqState = case ShouldTry of
                               true -> connecting;
                               false -> cancelled
                           end,
            Updated = Req#req{cur = Addr, state = NextReqState},
            Pending2 = lists:keystore(Ref, #req.ref, Pending, Updated),
            {reply, ShouldTry, State#state{pending = Pending2}}
    end;

handle_call(_Unhandled, _From, State) ->
    ?TRACE(?debugFmt("Unhandled gen_server call: ~p", [_Unhandled])),
    {reply, {error, unhandled}, State}.
  281
+
  282
%% gen_server cast handler.

%% stop accepting new work
handle_cast(pause, State) ->
    Paused = State#state{is_paused = true},
    {noreply, Paused};

%% accept new work again
handle_cast(resume, State) ->
    Resumed = State#state{is_paused = false},
    {noreply, Resumed};

handle_cast({disconnect, Target}, State) ->
    State2 = disconnect_from_target(Target, State),
    {noreply, State2};

%% reset all backoff delays to zero.
%% TODO: restart stalled connections.
handle_cast(reset_backoff, State) ->
    {noreply, State#state{endpoints = reset_backoff(State#state.endpoints)}};

%% helper process says no endpoints were returned by the locators.
%% helper process will schedule a retry, so the state is left untouched.
handle_cast({conmgr_no_endpoints, _Ref}, State) ->
    {noreply, State};

%% helper process says it failed to reach an address.
handle_cast({endpoint_failed, Addr, Reason, ProtocolId}, State) ->
    ?TRACE(?debugFmt("Failing endpoint ~p for protocol ~p with reason ~p", [Addr, ProtocolId, Reason])),
    %% mark connection as black-listed and start timer for reset
    Failed = fail_endpoint(Addr, Reason, ProtocolId, State),
    {noreply, Failed}.
  308
+
  309
%% it is time to remove Addr from the black-listed addresses
%% The endpoint's is_black_listed flag is cleared; an unknown Addr leaves
%% the state unchanged.
handle_info({backoff_timer, Addr}, State = #state{endpoints = EPs}) ->
    case orddict:find(Addr, EPs) of
        {ok, EP} ->
            EP2 = EP#ep{is_black_listed = false},
            {noreply, State#state{endpoints = orddict:store(Addr,EP2,EPs)}};
        error ->
            %% TODO: Should never happen because the Addr came from the EP list.
            %% This used to return the misspelled {norepy, State}, which is
            %% not a valid gen_server return value and would have stopped
            %% the server.
            {noreply, State}
    end;
  319
+handle_info({retry_req, Ref}, State = #state{pending = Pending}) ->
  320
+    case lists:keyfind(Ref, #req.ref, Pending) of
  321
+        false ->
  322
+            %% TODO: should never happen
  323
+            {noreply, State};
  324
+        Req ->
  325
+            {noreply, start_request(Req, State)}
  326
+    end;
  327
+    
  328
+%%% All of the connection helpers end here
  329
+%% cases:
  330
+%% helper succeeded -> update EP stats: BL<-false, backoff_delay<-0
  331
+%% helper failed -> updates EP stats: failures++, backoff_delay++
  332
+%% other Pid failed -> pass on linked error
  333
+handle_info({'EXIT', From, Reason}, State = #state{pending = Pending}) ->
  334
+    %% Work out which endpoint it was
  335
+    case lists:keytake(From, #req.pid, Pending) of
  336
+        false ->
  337
+            %% Must have been something we were linked to, or linked to us
  338
+            ?TRACE(?debugFmt("Connection Manager exiting because linked process ~p exited for reason: ~p",
  339
+                        [From, Reason])),
  340
+            lager:error("Connection Manager exiting because linked process ~p exited for reason: ~p",
  341
+                        [From, Reason]),
  342
+            exit({linked, From, Reason});
  343
+        {value, #req{cur = Cur, ref = Ref}=Req, Pending2} ->
  344
+            {{ProtocolId, _Foo},_Bar} = Req#req.spec,
  345
+            case Reason of
  346
+                ok ->
  347
+                    %% update the stats module
  348
+                    Stat = conn_success,
  349
+                    ?TRACE(?debugMsg("Trying for stats update, the connect_endpoint")),
  350
+                    riak_core_connection_mgr_stats:update(Stat, Cur, ProtocolId),
  351
+                    %% riak_core_connection set up and handlers called
  352
+                    {noreply, connect_endpoint(Cur, State#state{pending = Pending2})};
  353
+
  354
+                {ok, cancelled} ->
  355
+                    %% helper process has been cancelled and has exited nicely.
  356
+                    %% update the stats module
  357
+                    Stat = conn_cancelled,
  358
+                    ?TRACE(?debugMsg("Trying for stats update")),
  359
+                    riak_core_connection_mgr_stats:update(Stat, Cur, ProtocolId),
  360
+                    %% toss out the cancelled request from pending.
  361
+                    {noreply, State#state{pending = Pending2}};
  362
+                
  363
+                {error, endpoints_exhausted, Ref} ->
  364
+                    %% tried all known endpoints. schedule a retry.
  365
+                    %% reuse the existing request Reference, Ref.
  366
+                    case Req#req.state of
  367
+                        cancelled ->
  368
+                            %% oops. that request was cancelled. No retry
  369
+                            {noreply, State#state{pending = Pending2}};
  370
+                        _ ->
  371
+                            ?TRACE(?debugMsg("Scheduling retry")),
  372
+                            {noreply, schedule_retry(?EXHAUSTED_ENDPOINTS_RETRY_INTERVAL, Ref, State)}
  373
+                    end;
  374
+
  375
+                Reason -> % something bad happened to the connection, reuse the request
  376
+                    ?TRACE(?debugFmt("handle_info: EP failed on ~p for ~p. removed Ref ~p",
  377
+                                     [Cur, Reason, Ref])),
  378
+                    lager:warning("handle_info: endpoint ~p failed: ~p. removed Ref ~p",
  379
+                                  [Cur, Reason, Ref]),
  380
+                    State2 = fail_endpoint(Cur, Reason, ProtocolId, State),
  381
+                    %% the connection helper will not retry. It's up the caller.
  382
+                    State3 = fail_request(Reason, Req, State2),
  383
+                    {noreply, State3}
  384
+            end
  385
+    end;
  386
+handle_info(_Unhandled, State) ->
  387
+    ?TRACE(?debugFmt("Unhandled gen_server info: ~p", [_Unhandled])),
  388
+    lager:error("Unhandled gen_server info: ~p", [_Unhandled]),
  389
+    {noreply, State}.
  390
+
  391
%% gen_server callback: nothing to clean up on shutdown.
terminate(_Why, _FinalState) ->
    ok.
  393
+
  394
%% gen_server callback: the state is carried over unchanged on code upgrade.
code_change(_FromVsn, CurrentState, _Extra) ->
    {ok, CurrentState}.
  396
+
  397
+%%%===================================================================
  398
+%%% Private
  399
+%%%===================================================================
  400
+
  401
%% Locator that answers with the address(es) it was given; the policy
%% argument is ignored.
%%   {IP, Port} -> {ok, [{IP, Port}]}
%%   [Addrs]    -> {ok, Addrs}
%% NOTE(review): the list clause only matches a ONE-element list and returns
%% that sole element as the address list - confirm callers wrap their list.
identity_locator({_IP, _Port} = Addr, _Policy) ->
    {ok, [Addr]};
identity_locator([Addrs], _Policy) ->
    {ok, Addrs}.
  405
+
  406
%% Cancel the pending request for Target, if there is one, by flagging it
%% as cancelled in the pending list. Returns the (possibly updated) state.
disconnect_from_target(Target, State = #state{pending = Pending}) ->
    lager:debug("Disconnecting from: ~p", [Target]),
    Found = lists:keyfind(Target, #req.target, Pending),
    case Found of
        false ->
            %% already gone!
            State;
        Req ->
            %% No process is stopped here: the helper finds out about the
            %% cancellation when it asks whether it should try an endpoint.
            Cancelled = Req#req{state = cancelled},
            NewPending = lists:keystore(Req#req.ref, #req.ref, Pending, Cancelled),
            State#state{pending = NewPending}
    end.
  419
+
  420
%% Arrange for the request identified by Ref to be retried after Interval
%% milliseconds by sending {retry_req, Ref} to this process.
%% The request stays in pending (its pid is not cleared here); only its
%% `cur' field is reset. Cancelled or unknown requests are left untouched.
schedule_retry(Interval, Ref, State = #state{pending = Pending}) ->
    case lists:keyfind(Ref, #req.ref, Pending) of
        false ->
            %% this should never happen
            lager:error("ConnectionManager: failed to find connection ref while scheduling retry."),
            State;
        Req when Req#req.state =:= cancelled ->
            %% the request was cancelled, so no rescheduling wanted.
            State;
        Req ->
            %% reschedule request to happen in the future
            erlang:send_after(Interval, self(), {retry_req, Ref}),
            Waiting = Req#req{cur = undefined},
            NewPending = lists:keystore(Req#req.ref, #req.ref, Pending, Waiting),
            State#state{pending = NewPending}
    end.
  440
+
  441
%% Start working on a connection request and return the updated state.
%%  * cancelled request           -> state unchanged
%%  * locators return an error    -> the request is failed (fail_request/3)
%%  * locators return no address  -> a retry is scheduled
%%  * otherwise                   -> a linked helper process is spawned to walk
%%                                   the non-black-listed addresses, and the
%%                                   request is stored as `connecting'.
start_request(#req{state=cancelled}, State) ->
    State;
start_request(Req = #req{ref=Ref, target=Target, spec=ClientSpec, strategy=Strategy},
              State) ->
    Located = locate_endpoints(Target, Strategy, State#state.locators),
    case Located of
        {error, Reason} ->
            fail_request(Reason, Req, State);
        {ok, []} ->
            %% locators provided no addresses
            gen_server:cast(?SERVER, {conmgr_no_endpoints, Ref}),
            RetryAfter = app_helper:get_env(riak_core, connmgr_no_endpoint_retry,
                                            ?DEFAULT_RETRY_NO_ENDPOINTS),
            lager:debug("Connection Manager located no endpoints for: ~p. Will retry.", [Target]),
            %% schedule a retry and exit
            schedule_retry(RetryAfter, Ref, State);
        {ok, EpAddrs} ->
            lager:debug("Connection Manager located endpoints: ~p", [EpAddrs]),
            KnownEps = update_endpoints(EpAddrs, State#state.endpoints),
            Candidates = filter_blacklisted_endpoints(EpAddrs, KnownEps),
            lager:debug("Connection Manager trying endpoints: ~p", [Candidates]),
            %% The helper always terminates via exit/1, so its result (or the
            %% exception it raised) reaches this server as an 'EXIT' message.
            Helper = fun() ->
                             Outcome = try connection_helper(Ref, ClientSpec, Strategy, Candidates)
                                       catch T:R -> {exception, {T, R}}
                                       end,
                             exit(Outcome)
                     end,
            Pid = spawn_link(Helper),
            Connecting = Req#req{pid = Pid,
                                 state = connecting,
                                 cur = undefined},
            NewPending = lists:keystore(Ref, #req.ref, State#state.pending, Connecting),
            State#state{endpoints = KnownEps, pending = NewPending}
    end.
  474
+
  475
%% Return Endpoints with the backoff_delay of every entry set to zero.
reset_backoff(Endpoints) ->
    ClearDelay = fun(_Addr, EP) -> EP#ep{backoff_delay = 0} end,
    orddict:map(ClearDelay, Endpoints).
  478
+
  479
%% Compute the next backoff delay from the current one.
%%   0     -> ?INITIAL_BACKOFF
%%   Delay -> twice the delay, never more than ?MAX_BACKOFF
%% The doubled value is clamped directly. Checking only the incoming Delay
%% against the cap would let the result overshoot ?MAX_BACKOFF (by up to 2x)
%% for one round before being pulled back.
increase_backoff(0) ->
    ?INITIAL_BACKOFF;
increase_backoff(Delay) ->
    erlang:min(2 * Delay, ?MAX_BACKOFF).
  486
+
  487
%% Render an IP as a string: tuples go through inet_parse:ntoa/1, anything
%% else is returned untouched.
string_of_ip(IP) ->
    case is_tuple(IP) of
        true  -> inet_parse:ntoa(IP);
        false -> IP
    end.
  492
+
  493
%% Render an {IP, Port} pair as "IP:Port".
string_of_ipport({IP, Port}) ->
    Host = string_of_ip(IP),
    PortStr = erlang:integer_to_list(Port),
    Host ++ ":" ++ PortStr.
  495
+
  496
%% Body of the helper process spawned by start_request/2. It walks the
%% address list in order; for each address it sleeps for that endpoint's
%% backoff delay, asks the server whether the endpoint should still be
%% tried, and then attempts a synchronous connect.
%% Result:
%%   ok                                - a connection was made
%%   {ok, cancelled}                   - the server said not to try the endpoint
%%   {error, endpoints_exhausted, Ref} - every address failed (or none given)
connection_helper(Ref, _Protocol, _Strategy, []) ->
    %% exhausted the list of endpoints. let server start new helper process
    {error, endpoints_exhausted, Ref};
connection_helper(Ref, Protocol, Strategy, [Addr|Remaining]) ->
    {{ProtocolId, _}, _} = Protocol,
    %% delay by the backoff_delay for this endpoint.
    {ok, Delay} = gen_server:call(?SERVER, {get_endpoint_backoff, Addr}),
    lager:debug("Holding off ~p seconds before trying ~p at ~p",
               [(Delay/1000), ProtocolId, string_of_ipport(Addr)]),
    timer:sleep(Delay),
    ShouldTry = gen_server:call(?SERVER, {should_try_endpoint, Ref, Addr}),
    case ShouldTry of
        true ->
            lager:debug("Trying connection to: ~p at ~p", [ProtocolId, string_of_ipport(Addr)]),
            ?TRACE(?debugMsg("Attempting riak_core_connection:sync_connect/2")),
            Connected = riak_core_connection:sync_connect(Addr, Protocol),
            case Connected of
                ok ->
                    ok;
                {error, Reason} ->
                    %% notify connection manager this EP failed and try next one
                    gen_server:cast(?SERVER, {endpoint_failed, Addr, Reason, ProtocolId}),
                    connection_helper(Ref, Protocol, Strategy, Remaining)
            end;
        _ ->
            %% connection request has been cancelled
            lager:debug("Ignoring connection to: ~p at ~p because it was cancelled",
                       [ProtocolId, string_of_ipport(Addr)]),
            {ok, cancelled}
    end.
  527
+
  528
%% Resolve a {Type, Name} target through the locator fun registered for
%% Type in the Locators orddict.
%%   no locator for Type    -> {error, {unknown_target_type, Type}}
%%   locator returns error  -> {error, {bad_target_name_args, Type, Name}}
%%   anything else          -> the locator's result, passed through as is
locate_endpoints({Type, Name}, Strategy, Locators) ->
    case orddict:find(Type, Locators) of
        error ->
            {error, {unknown_target_type, Type}};
        {ok, LocatorFun} ->
            Found = LocatorFun(Name, Strategy),
            case Found =:= error of
                true  -> {error, {bad_target_name_args, Type, Name}};
                false -> Found
            end
    end.
  540
+
  541
%% Record a failed connection attempt to Addr and return the new state.
%% Reports a {conn_error, Reason} stat, then updates the endpoint entry:
%% it is black-listed, its failure counters are bumped, its backoff delay is
%% increased, and a {backoff_timer, Addr} message is scheduled to arrive
%% after the endpoint's current (pre-increase) backoff delay.
fail_endpoint(Addr, Reason, ProtocolId, State) ->
    %% update the stats module
    riak_core_connection_mgr_stats:update({conn_error, Reason}, Addr, ProtocolId),
    %% update the endpoint
    MarkFailed =
        fun(#ep{backoff_delay = Delay, failures = Failures} = EP) ->
                erlang:send_after(Delay, self(), {backoff_timer, Addr}),
                EP#ep{failures = orddict:update_counter(Reason, 1, Failures),
                      nb_failures = EP#ep.nb_failures + 1,
                      backoff_delay = increase_backoff(Delay),
                      last_fail_time = os:timestamp(),
                      next_try_secs = Delay/1000,
                      is_black_listed = true}
        end,
    update_endpoint(Addr, MarkFailed, State).
  560
+
  561
%% Record a successful connection to Addr: clear the black-list flag, count
%% the success, and zero both next_try_secs and the backoff delay.
connect_endpoint(Addr, State) ->
    MarkConnected =
        fun(EP) ->
                Successes = EP#ep.nb_success + 1,
                EP#ep{is_black_listed = false,
                      nb_success = Successes,
                      next_try_secs = 0,
                      backoff_delay = 0}
        end,
    update_endpoint(Addr, MarkConnected, State).
  568
+
  569
%% Look up the current backoff delay recorded for Addr.
%% An address that is not in the endpoint dictionary has a delay of 0.
get_endpoint_backoff(Addr, EPs) ->
    Lookup = orddict:find(Addr, EPs),
    case Lookup of
        {ok, EP} -> EP#ep.backoff_delay;
        error    -> 0
    end.
  579
+
  580
%% Apply Fun to the endpoint record stored under Addr and store the result.
%% If Addr is not known yet, Fun is applied to a fresh #ep{addr = Addr}.
update_endpoint(Addr, Fun, State = #state{endpoints = EPs}) ->
    Current = case orddict:find(Addr, EPs) of
                  {ok, Existing} -> Existing;
                  error          -> #ep{addr = Addr}
              end,
    Updated = Fun(Current),
    State#state{endpoints = orddict:store(Addr, Updated, EPs)}.
  589
+
  590
%% Give up on a request: notify the client module through
%% Module:connect_failed/3 and drop the request from the pending list.
fail_request(Reason, #req{ref = Ref, spec = Spec},
             State = #state{pending = Pending}) ->
    {Proto, {_TcpOptions, Module, Args}} = Spec,
    ?TRACE(?debugFmt("module ~p getting connect_failed", [Module])),
    %% Tell the module it failed
    Failure = {error, Reason},
    Module:connect_failed(Proto, Failure, Args),
    %% Remove the request from the pending list
    StillPending = lists:keydelete(Ref, #req.ref, Pending),
    State#state{pending = StillPending}.
  598
+
  599
%% Make sure every address in Addrs has an entry in Endpoints: addresses not
%% present in the incoming dictionary get a fresh #ep{} record, existing
%% entries are kept as they are.
update_endpoints(Addrs, Endpoints) ->
    Unknown = [Addr || Addr <- Addrs, not orddict:is_key(Addr, Endpoints)],
    AddFresh = fun(Addr, Acc) -> orddict:store(Addr, #ep{addr = Addr}, Acc) end,
    lists:foldl(AddFresh, Endpoints, Unknown).
  610
+
  611
%% Keep, in their original order, the addresses of EpAddrs that are known in
%% AllEps and are not black-listed. Unknown addresses are dropped.
filter_blacklisted_endpoints(EpAddrs, AllEps) ->
    IsUsable = fun(Addr) ->
                       Lookup = orddict:find(Addr, AllEps),
                       case Lookup of
                           error    -> false;
                           {ok, EP} -> EP#ep.is_black_listed == false
                       end
               end,
    lists:filter(IsUsable, EpAddrs).
273  src/riak_core_connection_mgr_stats.erl
... ...
@@ -0,0 +1,273 @@
  1
+%% -------------------------------------------------------------------
  2
+%%
  3
+%% Riak Core Connection Manager Statistics
  4
+%%    collect, aggregate, and provide stats for connections made by 
  5
+%%    the connection manager
  6
+%%
  7
+%% Copyright (c) 2013 Basho Technologies, Inc.  All Rights Reserved.
  8
+%%
  9
+%% This file is provided to you under the Apache License,
  10
+%% Version 2.0 (the "License"); you may not use this file
  11
+%% except in compliance with the License.  You may obtain
  12
+%% a copy of the License at
  13
+%%
  14
+%%   http://www.apache.org/licenses/LICENSE-2.0
  15
+%%
  16
+%% Unless required by applicable law or agreed to in writing,
  17
+%% software distributed under the License is distributed on an
  18
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  19
+%% KIND, either express or implied.  See the License for the
  20
+%% specific language governing permissions and limitations
  21
+%% under the License.
  22
+%%
  23
+%% -------------------------------------------------------------------
  24
+
  25
+-module(riak_core_connection_mgr_stats).
  26
+-author("Chris Tilt").
  27
+
  28
+-behaviour(gen_server).
  29
+
  30
+%% API
  31
+-export([start_link/0,
  32
+         get_stats/0,
  33
+         get_stats_by_ip/1,
  34
+         get_stats_by_protocol/1,
  35
+         get_consolidated_stats/0,
  36
+         update/3,
  37
+         register_stats/0,
  38
+         produce_stats/0]).
  39
+
  40
+%% gen_server callbacks
  41
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
  42
+         terminate/2, code_change/3]).
  43
+
  44
+-define(SERVER, ?MODULE).
  45
+-define(APP, riak_conn_mgr_stats).
  46
+
  47
%% Start the stats server, registered locally under ?SERVER.
start_link() ->
    ServerName = {local, ?SERVER},
    gen_server:start_link(ServerName, ?MODULE, [], []).
  49
+
  50
%% (Re)register this module's stats:
%%  1. delete every folsom metric whose name is a tuple tagged with ?APP
%%     (failures while deleting are ignored),
%%  2. register each {Name, Type} from stats() under {?APP, Name},
%%  3. register produce_stats/0 with the stat cache for ?APP.
register_stats() ->
    Stale = [Stat || Stat <- folsom_metrics:get_metrics(),
                     is_tuple(Stat), element(1, Stat) == ?APP],
    lists:foreach(fun(Stat) -> (catch folsom_metrics:delete_metric(Stat)) end, Stale),
    _ = [register_stat({?APP, Name}, Type) || {Name, Type} <- stats()],
    Producer = {?MODULE, produce_stats, []},
    riak_core_stat_cache:register_app(?APP, Producer).
  55
+
  56
%% @spec get_stats() -> proplist()
%% @doc Fetch the stats for ?APP from riak_core_stat_cache. On success the
%% stats alone are returned (the timestamp is dropped); any other reply
%% from the cache is handed back to the caller unchanged.
get_stats() ->
    Reply = riak_core_stat_cache:get_stats(?APP),
    case Reply of
        {ok, Stats, _TS} -> Stats;
        _                -> Reply
    end.
  67
+
  68
%% All stats as a sorted list of {AtomName, Value} pairs.
%% NOTE(review): get_stats/0 can hand back a non-list error term, which
%% makes the comprehension below fail - confirm callers expect that.
get_consolidated_stats() ->
    RawStats = get_stats(),
    Formatted = [format_stat(Stat) || Stat <- RawStats],
    stats_as_atoms(Formatted).
  71
+
  72
%% Sort {NameString, Value} stats and turn each name into an atom.
stats_as_atoms(StringStats) ->
    Sorted = lists:sort(StringStats),
    ToAtomKey = fun({Name, Value}) -> {list_to_atom(Name), Value} end,
    lists:map(ToAtomKey, Sorted).
  75
+
  76
%% Turn one raw stat {Key, Value} into {NameString, Count}.
%% The name is the key's components joined with "_": address (as
%% "IP:Port"), protocol id, "conn_error", stat name and a trailing "total",
%% in the order given by each clause below. Keys come in many shapes and
%% several of them overlap, so the ORDER of the clauses is significant.

%% ---- conn_error stats ----
format_stat({{?APP, conn_error, StatName}, [{count,N},{one,_W}]}) ->
    {string:join(["conn_error", atom_to_list(StatName)], "_"), N};
format_stat({{?APP, conn_error, StatName, total}, N}) ->
    {string:join(["conn_error", atom_to_list(StatName), "total"], "_"), N};
format_stat({{?APP, conn_error, StatName, Addr, total}, N}) when not is_atom(Addr) ->
    {string:join([string_of_ipaddr(Addr),
                  "conn_error",
                  atom_to_list(StatName),
                  "total"], "_"), N};
format_stat({{?APP, conn_error, StatName, Addr, ProtocolId, total},N}) ->
    {string:join([string_of_ipaddr(Addr),
                  "conn_error",
                  atom_to_list(ProtocolId),
                  atom_to_list(StatName),
                  "total"], "_"), N};
format_stat({{?APP, conn_error, StatName, ProtocolId, total},N}) ->
    {string:join([atom_to_list(ProtocolId),
                  "conn_error",
                  atom_to_list(StatName),
                  "total"], "_"), N};
format_stat({{?APP, conn_error, StatName, Addr, ProtocolId},[{count,N},{one,_W}]}) ->
    {string:join([string_of_ipaddr(Addr),
                  "conn_error",
                  atom_to_list(ProtocolId),
                  atom_to_list(StatName)], "_"), N};
format_stat({{?APP, conn_error, StatName, Addr},[{count,N},{one,_W}]}) when not is_atom(Addr) ->
    {string:join([string_of_ipaddr(Addr),
                  "conn_error",
                  atom_to_list(StatName)], "_"), N};
format_stat({{?APP, conn_error, StatName, ProtocolId},[{count,N},{one,_W}]}) ->
    {string:join(["conn_error",
                  atom_to_list(ProtocolId),
                  atom_to_list(StatName)], "_"), N};

%% ---- all other stats ----
format_stat({{?APP, StatName},[{count,N},{one,_W}]}) ->
    {atom_to_list(StatName), N};
format_stat({{?APP, StatName, total},N}) ->
    {string:join([atom_to_list(StatName), "total"], "_"), N};
format_stat({{?APP, StatName, Addr, total},N}) when not is_atom(Addr) ->
    {string:join([string_of_ipaddr(Addr),
                  atom_to_list(StatName),
                  "total"], "_"), N};
format_stat({{?APP, StatName, Addr},[{count,N},{one,_W}]}) when not is_atom(Addr) ->
    {string:join([string_of_ipaddr(Addr), atom_to_list(StatName)], "_"), N};
format_stat({{?APP, StatName, ProtocolId, total},N}) when is_atom(ProtocolId) ->
    {string:join([atom_to_list(ProtocolId),
                  atom_to_list(StatName),
                  "total"], "_"), N};
format_stat({{?APP, StatName, ProtocolId},[{count,N},{one,_W}]}) when is_atom(ProtocolId) ->
    {string:join([atom_to_list(ProtocolId), atom_to_list(StatName)], "_"), N};
format_stat({{?APP, StatName, Addr, ProtocolId, total},N}) when is_atom(ProtocolId) ->
    {string:join([string_of_ipaddr(Addr),
                  atom_to_list(ProtocolId),
                  atom_to_list(StatName),
                  "total"], "_"), N};
format_stat({{?APP, StatName, Addr, ProtocolId},[{count,N},{one,_W}]}) when is_atom(ProtocolId) ->
    {string:join([string_of_ipaddr(Addr),
                  atom_to_list(ProtocolId),
                  atom_to_list(StatName)], "_"), N}.
  130
+
  131
%% Render {IP, Port} as the flat string "IP:Port". IP may already be a
%% string, or an address tuple that is converted with inet_parse:ntoa/1.
string_of_ipaddr({IP, Port}) when is_list(IP); is_tuple(IP) ->
    Host = case is_list(IP) of
               true  -> IP;
               false -> inet_parse:ntoa(IP)
           end,
    lists:flatten(io_lib:format("~s:~p", [Host, Port])).
  135
+
  136
%% Stats that predicate_by_ip/2 accepts for the given {IP, Port} address,
%% formatted and returned as a sorted list of {AtomName, Value} pairs.
get_stats_by_ip({_IP, _Port}=Addr) ->
    MatchesAddr = fun(Stat) -> predicate_by_ip(Stat, Addr) end,
    Matching = lists:filter(MatchesAddr, get_stats()),
    Formatted = [format_stat(Stat) || Stat <- Matching],
    stats_as_atoms(Formatted).
  141
+
  142
+predicate_by_ip({{_App, conn_error, _StatName, MatchAddr, total},_Value}, MatchAddr) ->
  143
+    true;
  144
+predicate_by_ip({{_App, conn_error, _StatName, MatchAddr, _ProtocolId, total},_Value}, MatchAddr) ->
  145
+    true;
  146
+predicate_by_ip({{_App, conn_error, _StatName, MatchAddr, _ProtocolId},_Value}, MatchAddr) ->
  147
+    true;
  148
+predicate_by_ip({{_App, conn_error, _StatName, MatchAddr},_Value}, MatchAddr) ->
  149
+    true;
  150
+predicate_by_ip({{_App, conn_error, _StatName, _ProtocolId},_Value}, _MatchAddr) ->
  151
+    false;
  152
+predicate_by_ip({{_App, conn_error, _StatName, _ProtocolId, total},_Value}, _MatchAddr) ->
  153
+    false;
  154
+predicate_by_ip({{_App, _StatName, MatchAddr},_Value}, MatchAddr) ->