Skip to content

Commit

Permalink
pool must now be started with a host and port before being used.
Browse files Browse the repository at this point in the history
  • Loading branch information
David Weldon committed Apr 12, 2011
1 parent 7e32657 commit a3a982a
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 39 deletions.
9 changes: 5 additions & 4 deletions README.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ Overview
--------
riakpool is an application for maintaining a dynamic pool of protocol buffer
client connections to a riak database. It ensures that a given connection can
only be in use by one external process at a time. Currently, riakpool will only
connect to port 8087 on 127.0.0.1.
only be in use by one external process at a time.

Installation
------------
Expand All @@ -22,7 +21,9 @@ the complete documentation by running `make doc`.

1> application:start(riakpool).
ok
2> riakpool:execute(fun(C) -> riakc_pb_socket:ping(C) end).
2> riakpool:start_pool("127.0.0.1", 8087).
ok
3> riakpool:execute(fun(C) -> riakc_pb_socket:ping(C) end).
{ok,pong}
3> riakpool:count().
4> riakpool:count().
1
117 changes: 85 additions & 32 deletions src/riakpool.erl
Original file line number Diff line number Diff line change
@@ -1,27 +1,31 @@
%% @author David Weldon
%% @doc riakpool implements a simple pool of riak protocol buffer clients. When
%% the server is started, a single client connection is established. In order to
%% use a connection, a call to {@link execute/1} must be made. This will check
%% out a connection from the pool, use it, and check it back in. This ensures
%% that a given connection can only be in use by one external process at a time.
%% If no existing connections are found, a new one will be established. Note
%% this means that the pool will always be the size of the last peak need. The
%% number of connections can be checked with {@link count/0}.
%%
%% Currently, riakpool will only connect to port 8087 on 127.0.0.1.
%% @doc riakpool implements a pool of riak protocol buffer clients. After the
%% server is started, the pool must be initialized with a call to
%% {@link start_pool/0} or {@link start_pool/2}. In order to use a connection, a
%% call to {@link execute/1} must be made. This will check out a connection from
%% the pool, use it, and check it back in. This ensures that a given connection
%% can only be in use by one external process at a time. If no existing
%% connections are found, a new one will be established. Note this means that
%% the pool will always be the size of the last peak need. The number of
%% connections can be checked with {@link count/0}.

-module(riakpool).
-behaviour(gen_server).
-export([count/0,
execute/1,
start_link/0,
start_pool/0,
start_pool/2,
stop/0]).
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-record(state, {host, port, pids}).

%% @type host() = string() | atom().

%% @spec count() -> integer()
%% @doc Returns the number of connections as seen by the supervisor.
Expand Down Expand Up @@ -51,34 +55,56 @@ execute(Fun) ->
%% @doc Starts the server.
start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

%% @spec start_pool() -> ok | {error, any()}
%% @doc Starts a connection pool to a server listening on {"127.0.0.1", 8087}.
%% Note that a pool can only be started once.
start_pool() -> start_pool("127.0.0.1", 8087).

%% @spec start_pool(host(), integer()) -> ok | {error, any()}
%% @doc Starts a connection pool to a server listening on {`Host', `Port'}.
%% Note that a pool can only be started once.
start_pool(Host, Port) when is_integer(Port) ->
gen_server:call(?MODULE, {start_pool, Host, Port}).

%% @spec stop() -> ok
%% @doc Stops the server.
stop() -> gen_server:cast(?MODULE, stop).

%% @hidden
init([]) ->
{ok, Pid} = new_connection(),
Pids = queue:in(Pid, queue:new()),
{ok, Pids}.
init([]) -> {ok, undefined}.

%% @hidden
handle_call(check_out, _From, Pids) ->
case next_pid(Pids) of
{ok, Pid, NewPids} -> {reply, {ok, Pid}, NewPids};
{error, NewPids} -> {reply, error, NewPids}
handle_call({start_pool, Host, Port}, _From, undefined) ->
case new_connection(Host, Port) of
{ok, Pid} ->
Pids = queue:in(Pid, queue:new()),
State = #state{host=Host, port=Port, pids=Pids},
{reply, ok, State};
error -> {reply, {error, connection_error}, undefined}
end;
handle_call({start_pool, _Host, _Port}, _From, State=#state{}) ->
{reply, {error, pool_already_started}, State};
handle_call(check_out, _From, undefined) -> {reply, error, undefined};
handle_call(check_out, _From, State=#state{host=Host, port=Port, pids=Pids}) ->
case next_pid(Host, Port, Pids) of
{ok, Pid, NewPids} -> {reply, {ok, Pid}, State#state{pids=NewPids}};
{error, NewPids} -> {reply, error, State#state{pids=NewPids}}
end;
handle_call(_Request, _From, State) -> {reply, ok, State}.

%% @hidden
handle_cast({check_in, Pid}, Pids) -> {noreply, queue:in(Pid, Pids)};
handle_cast({check_in, Pid}, State=#state{pids=Pids}) ->
NewPids = queue:in(Pid, Pids),
{noreply, State#state{pids=NewPids}};
handle_cast(stop, State) -> {stop, normal, State};
handle_cast(_Msg, State) -> {noreply, State}.

%% @hidden
handle_info(_Info, State) -> {noreply, State}.

%% @hidden
terminate(_Reason, Pids) ->
terminate(_Reason, undefined) -> ok;
terminate(_Reason, #state{pids=Pids}) ->
StopFun =
fun(Pid) ->
case is_process_alive(Pid) of
Expand All @@ -91,31 +117,32 @@ terminate(_Reason, Pids) ->
%% @hidden
code_change(_OldVsn, State, _Extra) -> {ok, State}.

%% @spec new_connection() -> {ok, Pid} | error
%% @spec new_connection(host(), integer()) -> {ok, Pid} | error
%% @doc Returns {ok, Pid} if a new connection was established and added to the
%% supervisor, otherwise returns error.
new_connection() ->
case supervisor:start_child(riakpool_connection_sup, []) of
new_connection(Host, Port) ->
case supervisor:start_child(riakpool_connection_sup, [Host, Port]) of
{ok, Pid} when is_pid(Pid) -> {ok, Pid};
{ok, Pid, _} when is_pid(Pid) -> {ok, Pid};
_ -> error
end.

%% @spec next_pid(queue()) -> {ok, pid(), queue()} | {error, queue()}
%% @spec next_pid(host(), integer(), queue()) -> {ok, pid(), queue()} |
%% {error, queue()}
%% @doc Recursively dequeues Pids in search of a live connection. Dead
%% connections are removed from the queue as it is searched. If no connection
%% pid could be found, a new one will be established. Returns {ok, Pid, NewPids}
%% where NewPids is the queue after any necessary dequeues. Returns error if no
%% live connection could be found and no new connection could be established.
next_pid(Pids) ->
next_pid(Host, Port, Pids) ->
case queue:out(Pids) of
{{value, Pid}, NewPids} ->
case is_process_alive(Pid) of
true -> {ok, Pid, NewPids};
false -> next_pid(NewPids)
false -> next_pid(Host, Port, NewPids)
end;
{empty, _} ->
case new_connection() of
case new_connection(Host, Port) of
{ok, Pid} -> {ok, Pid, Pids};
error -> {error, Pids}
end
Expand All @@ -127,6 +154,7 @@ next_pid(Pids) ->
execute_test() ->
riakpool_connection_sup:start_link(),
riakpool:start_link(),
riakpool:start_pool(),
?assertEqual(1, count()),
Fun1 = fun(C) -> riakc_pb_socket:ping(C) end,
Fun2 = fun(_) -> riakc_pb_socket:ping(1) end,
Expand All @@ -137,23 +165,48 @@ next_pid(Pids) ->
timer:sleep(10),
?assertEqual(0, count()).

execute_error_test() ->
riakpool:start_link(),
Fun = fun(C) -> riakc_pb_socket:ping(C) end,
?assertEqual(error, execute(Fun)),
riakpool:stop(),
timer:sleep(10),
?assertEqual(0, count()).

start_pool_test() ->
riakpool_connection_sup:start_link(),
riakpool:start_link(),
{H, P} = {"localhost", 8000},
?assertEqual({error, connection_error}, riakpool:start_pool(H, P)),
?assertEqual(ok, riakpool:start_pool()),
?assertEqual({error, pool_already_started}, riakpool:start_pool()),
riakpool:stop(),
timer:sleep(10),
?assertEqual(0, count()).

next_pid_test() ->
riakpool_connection_sup:start_link(),
{H, P} = {"localhost", 8087},
?assertEqual(0, count()),
{ok, P1} = new_connection(),
{ok, P2} = new_connection(),
{ok, P3} = new_connection(),
{ok, P1} = new_connection(H, P),
{ok, P2} = new_connection(H, P),
{ok, P3} = new_connection(H, P),
?assertEqual(3, count()),
riakc_pb_socket:stop(P1),
riakc_pb_socket:stop(P2),
?assertEqual(1, count()),
Q0 = queue:new(),
Q = queue:from_list([P1, P2, P3]),
?assertMatch({ok, P3, Q0}, next_pid(Q)),
?assertMatch({ok, P3, Q0}, next_pid(H, P, Q)),
riakc_pb_socket:stop(P3),
{ok, P4, Q0} = next_pid(Q0),
{ok, P4, Q0} = next_pid(H, P, Q0),
?assertEqual(1, count()),
riakc_pb_socket:stop(P4),
?assertEqual(0, count()).

next_pid_error_test() ->
{H, P} = {"localhost", 8000},
Q0 = queue:new(),
?assertMatch({error, Q0}, next_pid(H, P, Q0)).

-endif.
12 changes: 12 additions & 0 deletions src/riakpool_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,15 @@ start(_StartType, _StartArgs) ->

stop(_State) ->
ok.

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").

app_test() ->
application:start(riakpool),
riakpool:start_pool(),
Fun = fun(C) -> riakc_pb_socket:ping(C) end,
?assertEqual({ok, pong}, riakpool:execute(Fun)),
application:stop(riakpool).

-endif.
4 changes: 1 addition & 3 deletions src/riakpool_connection_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@
-behaviour(supervisor).
-export([start_link/0]).
-export([init/1]).
-define(ADDRESS, "127.0.0.1").
-define(PORT, 8087).

start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).

init([]) ->
{ok, {{simple_one_for_one, 0, 1},
[{connections, {riakc_pb_socket, start_link, [?ADDRESS, ?PORT]},
[{connections, {riakc_pb_socket, start_link, []},
temporary, brutal_kill, worker, [riakc_pb_socket]}]}}.

0 comments on commit a3a982a

Please sign in to comment.