
Manage connections with Poolboy

Instead of single named processes for BossCache and BossDB, use worker
pools managed by Poolboy. This should increase throughput in high-load
systems. Still needs to be tested, and options should be added for
configuring the number of processes in the pools.
Evan Miller committed Feb 11, 2012
1 parent 863ba63 commit fd2df1d00432e5dd1f84495a735fb2285df960b3
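
Note: the calls below are routed through a boss_pool helper module that is not itself shown in this commit view. As a rough, hypothetical sketch of what such a helper could look like on top of Poolboy's checkout/checkin API (the module name, arities, and default timeout here are assumptions, not taken from this commit):

%% Hypothetical sketch only -- the real boss_pool module is not part of
%% this commit view and may differ. Checks a worker process out of a
%% Poolboy pool, forwards the gen_server call, and always returns the
%% worker to the pool afterwards.
-module(boss_pool).
-export([call/2, call/3]).

call(Pool, Msg) ->
    call(Pool, Msg, 5000).

call(Pool, Msg, Timeout) ->
    Worker = poolboy:checkout(Pool),
    try
        gen_server:call(Worker, Msg, Timeout)
    after
        poolboy:checkin(Pool, Worker)
    end.

With a helper along these lines, boss_db and boss_cache no longer depend on a single registered process name; any pooled boss_db_controller or boss_cache_controller worker can serve a request.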
@@ -3,10 +3,10 @@
{aleppo, ".*", {git, "git://github.com/evanmiller/aleppo.git", {tag, "1caff84da4"}}},
{bson, ".*", {git, "git://github.com/mongodb/bson-erlang.git", {tag, "adce0e94ab"}}},
{epgsql, ".*", {git, "git://github.com/wg/epgsql.git", {tag, "1.4"}}},
- {erlmc, ".*", {git, "git://github.com/bipthelin/erlmc.git", {tag, "0.4"}}},
+ {erlmc, ".*", {git, "git://github.com/bipthelin/erlmc.git", {tag, "HEAD"}}},
{medici, ".*", {git, "git://github.com/evanmiller/medici.git", {branch, "rebarify"}}},
{mongodb, ".*", {git, "git://github.com/mongodb/mongodb-erlang.git", {tag, "e5e20a0cbd"}}},
{mysql, ".*", {git, "git://github.com/dizzyd/erlang-mysql-driver.git", {tag, "16cae84b5e"}}},
- {riakc, "1.1.*", {git, "git://github.com/basho/riak-erlang-client", {tag, "aa5c64a6a04192662d9c"}}},
- {riakpool, "0.1", {git, "git://github.com/dweldon/riakpool", {tag, "HEAD"}}}
+ {poolboy, ".*", {git, "git://github.com/devinus/poolboy.git", {tag, "855802e0cc"}}},
+ {riakc, "1.1.*", {git, "git://github.com/basho/riak-erlang-client", {tag, "aa5c64a6a04192662d9c"}}}
]}.
@@ -3,20 +3,25 @@
-export([stop/0]).
-export([get/2, set/4, delete/2]).
+-define(POOLNAME, boss_cache_pool).
+
start() ->
- start([{adapter, boss_cache_adapter_memcached_bin}, {cache_servers, [{"127.0.0.1", 11211, 1}]}]).
+ Adapter = boss_cache_adapter_memcached_bin,
+ start([{adapter, Adapter}, {cache_servers, [{"127.0.0.1", 11211, 1}]}]).
start(Options) ->
+ Adapter = proplists:get_value(adapter, Options, boss_cache_adapter_memcached_bin),
+ Adapter:init(Options),
boss_cache_sup:start_link(Options).
stop() ->
ok.
set(Prefix, Key, Val, TTL) ->
- gen_server:call(?MODULE, {set, Prefix, Key, Val, TTL}).
+ boss_pool:call(?POOLNAME, {set, Prefix, Key, Val, TTL}).
get(Prefix, Key) ->
- gen_server:call(?MODULE, {get, Prefix, Key}).
+ boss_pool:call(?POOLNAME, {get, Prefix, Key}).
delete(Prefix, Key) ->
- gen_server:call(?MODULE, {delete, Prefix, Key}).
+ boss_pool:call(?POOLNAME, {delete, Prefix, Key}).
@@ -4,7 +4,7 @@
%% @spec behaviour_info( atom() ) -> [ {Function::atom(), Arity::integer()} ] | undefined
behaviour_info(callbacks) ->
[
- {start, 0}, {start, 1}, {stop, 1},
+ {start, 0}, {start, 1}, {stop, 1}, {init, 1},
{get, 3}, {set, 5}, {delete, 3}
];
behaviour_info(_Other) ->
@@ -15,7 +15,7 @@ start_link() ->
start_link([]).
start_link(Args) ->
- gen_server:start_link({local, boss_cache}, ?MODULE, Args, []).
+ gen_server:start_link(?MODULE, Args, []).
init(Options) ->
AdapterName = proplists:get_value(adapter, Options, memcached_bin),
@@ -14,10 +14,8 @@ start_link(StartArgs) ->
supervisor:start_link({local, ?MODULE}, ?MODULE, StartArgs).
init(StartArgs) ->
- {ok, {{one_for_one, 10, 10}, [
- {cache_controller, {boss_cache_controller, start_link, [StartArgs]},
- permanent,
- 2000,
- worker,
- [boss_cache_controller]}
- ]}}.
+ Args = [{name, {local, boss_cache_pool}},
+ {worker_module, boss_cache_controller},
+ {size, 20}, {max_overflow, 40}|StartArgs],
+ PoolSpec = {cache_controller, {poolboy, start_link, [Args]}, permanent, 2000, worker, [poolboy]},
+ {ok, {{one_for_one, 10, 10}, [PoolSpec]}}.
@@ -29,8 +29,24 @@
data_type/2]).
-define(DEFAULT_TIMEOUT, (30 * 1000)).
+-define(POOLNAME, boss_db_pool).
start(Options) ->
+ AdapterName = proplists:get_value(adapter, Options, mock),
+ Adapter = list_to_atom(lists:concat(["boss_db_adapter_", AdapterName])),
+ Adapter:init(Options),
+ lists:foldr(fun(ShardOptions, Acc) ->
+ case proplists:get_value(db_shard_models, ShardOptions, []) of
+ [] -> Acc;
+ _ ->
+ ShardAdapter = case proplists:get_value(db_adapter, ShardOptions) of
+ undefined -> Adapter;
+ ShortName -> list_to_atom(lists:concat(["boss_db_adapter_", ShortName]))
+ end,
+ ShardAdapter:init(ShardOptions ++ Options),
+ Acc
+ end
+ end, [], proplists:get_value(shards, Options, [])),
boss_db_sup:start_link(Options).
stop() ->
@@ -40,7 +56,7 @@ stop() ->
%% @doc Find a BossRecord with the specified `Id'.
find("") -> undefined;
find(Key) when is_list(Key) ->
- gen_server:call(boss_db, {find, Key}, ?DEFAULT_TIMEOUT);
+ boss_pool:call(?POOLNAME, {find, Key}, ?DEFAULT_TIMEOUT);
find(_) ->
{error, invalid_id}.
@@ -82,8 +98,8 @@ find(Type, Conditions, Max, Skip, Sort) ->
%% sort them numerically.
find(Type, Conditions, Max, Skip, Sort, SortOrder) ->
- gen_server:call(boss_db, {find, Type, normalize_conditions(Conditions), Max, Skip, Sort, SortOrder},
- ?DEFAULT_TIMEOUT).
+ boss_pool:call(?POOLNAME, {find, Type, normalize_conditions(Conditions), Max, Skip, Sort, SortOrder},
+ ?DEFAULT_TIMEOUT).
%% @spec count( Type::atom() ) -> integer()
%% @doc Count the number of BossRecords of type `Type' in the database.
@@ -94,14 +110,14 @@ count(Type) ->
%% @doc Count the number of BossRecords of type `Type' in the database matching
%% all of the given `Conditions'.
count(Type, Conditions) ->
- gen_server:call(boss_db, {count, Type, normalize_conditions(Conditions)}, ?DEFAULT_TIMEOUT).
+ boss_pool:call(?POOLNAME, {count, Type, normalize_conditions(Conditions)}, ?DEFAULT_TIMEOUT).
%% @spec counter( Id::string() ) -> integer()
%% @doc Treat the record associated with `Id' as a counter and return its value.
%% Returns 0 if the record does not exist, so to reset a counter just use
%% "delete".
counter(Key) ->
- gen_server:call(boss_db, {counter, Key}, ?DEFAULT_TIMEOUT).
+ boss_pool:call(?POOLNAME, {counter, Key}, ?DEFAULT_TIMEOUT).
%% @spec incr( Id::string() ) -> integer()
%% @doc Treat the record associated with `Id' as a counter and atomically increment its value by 1.
@@ -111,46 +127,47 @@ incr(Key) ->
%% @spec incr( Id::string(), Increment::integer() ) -> integer()
%% @doc Treat the record associated with `Id' as a counter and atomically increment its value by `Increment'.
incr(Key, Count) ->
- gen_server:call(boss_db, {incr, Key, Count}, ?DEFAULT_TIMEOUT).
+ boss_pool:call(?POOLNAME, {incr, Key, Count}, ?DEFAULT_TIMEOUT).
%% @spec delete( Id::string() ) -> ok | {error, Reason}
%% @doc Delete the BossRecord with the given `Id'.
delete(Key) ->
AboutToDelete = boss_db:find(Key),
case boss_record_lib:run_before_delete_hooks(AboutToDelete) of
ok ->
- case gen_server:call(boss_db, {delete, Key}, ?DEFAULT_TIMEOUT) of
+ Result = boss_pool:call(?POOLNAME, {delete, Key}, ?DEFAULT_TIMEOUT),
+ case Result of
ok ->
boss_news:deleted(Key, AboutToDelete:attributes()),
ok;
- RetVal ->
- RetVal
+ _ ->
+ Result
end;
{error, Reason} ->
{error, Reason}
end.
push() ->
- gen_server:call(boss_db, push, ?DEFAULT_TIMEOUT).
+ boss_pool:call(?POOLNAME, push, ?DEFAULT_TIMEOUT).
pop() ->
- gen_server:call(boss_db, pop, ?DEFAULT_TIMEOUT).
+ boss_pool:call(?POOLNAME, pop, ?DEFAULT_TIMEOUT).
depth() ->
- gen_server:call(boss_db, depth, ?DEFAULT_TIMEOUT).
+ boss_pool:call(?POOLNAME, depth, ?DEFAULT_TIMEOUT).
dump() ->
- gen_server:call(boss_db, depth, ?DEFAULT_TIMEOUT).
+ boss_pool:call(?POOLNAME, dump, ?DEFAULT_TIMEOUT).
%% @spec execute( Commands::iolist() ) -> RetVal
%% @doc Execute raw database commands on SQL databases
execute(Commands) ->
- gen_server:call(boss_db, {execute, Commands}, ?DEFAULT_TIMEOUT).
+ boss_pool:call(?POOLNAME, {execute, Commands}, ?DEFAULT_TIMEOUT).
%% @spec transaction( TransactionFun::function() ) -> {atomic, Result} | {aborted, Reason}
%% @doc Execute a fun inside a transaction.
transaction(TransactionFun) ->
- gen_server:call(boss_db, {transaction, TransactionFun}, ?DEFAULT_TIMEOUT).
+ boss_pool:call(?POOLNAME, {transaction, TransactionFun}, ?DEFAULT_TIMEOUT).
%% @spec save_record( BossRecord ) -> {ok, SavedBossRecord} | {error, [ErrorMessages]}
%% @doc Save (that is, create or update) the given BossRecord in the database.
@@ -176,7 +193,7 @@ save_record(Record) ->
end,
case HookResult of
{ok, PossiblyModifiedRecord} ->
- case gen_server:call(boss_db, {save_record, PossiblyModifiedRecord}, ?DEFAULT_TIMEOUT) of
+ case boss_pool:call(?POOLNAME, {save_record, PossiblyModifiedRecord}, ?DEFAULT_TIMEOUT) of
{ok, SavedRecord} ->
boss_record_lib:run_after_hooks(OldRecord, SavedRecord, IsNew),
{ok, SavedRecord};
@@ -4,7 +4,7 @@
%% @spec behaviour_info( atom() ) -> [ {Function::atom(), Arity::integer()} ] | undefined
behaviour_info(callbacks) ->
[
- {start, 0}, {start, 1}, {stop, 1},
+ {start, 0}, {start, 1}, {stop, 1}, {init, 1},
{find, 2}, {find, 7}, {count, 3},
{delete, 2}, {counter, 2}, {incr, 3}, {save_record, 2}
];
@@ -20,7 +20,7 @@ start_link() ->
start_link([]).
start_link(Args) ->
- gen_server:start_link({local, boss_db}, ?MODULE, Args, []).
+ gen_server:start_link(?MODULE, Args, []).
init(Options) ->
AdapterName = proplists:get_value(adapter, Options, mock),
@@ -51,13 +51,11 @@ init(Options) ->
handle_call({find, Key}, From, #state{ cache_enable = true, cache_prefix = Prefix } = State) ->
case boss_cache:get(Prefix, Key) of
undefined ->
- io:format("Not cached: ~p~n", [Key]),
{reply, Res, _} = handle_call({find, Key}, From, State#state{ cache_enable = false }),
boss_cache:set(Prefix, Key, Res, State#state.cache_ttl),
boss_news:set_watch(Key, lists:concat([Key, ", ", Key, ".*"]), fun boss_db_cache:handle_record_news/3, {Prefix, Key}, State#state.cache_ttl),
{reply, Res, State};
CachedValue ->
- io:format("Cached! ~p~n", [CachedValue]),
boss_news:extend_watch(Key),
{reply, CachedValue, State}
end;
@@ -70,14 +68,12 @@ handle_call({find, Type, Conditions, Max, Skip, Sort, SortOrder} = Cmd, From,
Key = {Type, Conditions, Max, Skip, Sort, SortOrder},
case boss_cache:get(Prefix, Key) of
undefined ->
- io:format("Not cached: ~p~n", [Key]),
{reply, Res, _} = handle_call(Cmd, From, State#state{ cache_enable = false }),
boss_cache:set(Prefix, Key, Res, State#state.cache_ttl),
boss_news:set_watch(Key, lists:concat([inflector:pluralize(atom_to_list(Type)), ", ", Type, "-*.*"]),
fun boss_db_cache:handle_collection_news/3, {Prefix, Key}, State#state.cache_ttl),
{reply, Res, State};
CachedValue ->
- io:format("Cached! ~p~n", [CachedValue]),
boss_news:extend_watch(Key),
{reply, CachedValue, State}
end;
@@ -14,10 +14,8 @@ start_link(StartArgs) ->
supervisor:start_link({local, ?MODULE}, ?MODULE, StartArgs).
init(StartArgs) ->
- {ok, {{one_for_one, 10, 10}, [
- {db_controller, {boss_db_controller, start_link, [StartArgs]},
- permanent,
- 2000,
- worker,
- [boss_db_controller]}
- ]}}.
+ Args = [{name, {local, boss_db_pool}},
+ {worker_module, boss_db_controller},
+ {size, 5}, {max_overflow, 10}|StartArgs],
+ PoolSpec = {db_controller, {poolboy, start_link, [Args]}, permanent, 2000, worker, [poolboy]},
+ {ok, {{one_for_one, 10, 10}, [PoolSpec]}}.
@@ -1,20 +1,23 @@
-module(boss_cache_adapter_memcached_bin).
-behaviour(boss_cache_adapter).
--export([start/0, start/1, stop/1]).
+-export([init/1, start/0, start/1, stop/1]).
-export([get/3, set/5, delete/3]).
start() ->
start([]).
-start(Options) ->
- CacheServers = proplists:get_value(cache_servers, Options, [{"localhost", 11211, 1}]),
- ok = erlmc:start(CacheServers),
+start(_Options) ->
{ok, undefined}.
stop(_Conn) ->
erlmc:quit().
+init(Options) ->
+ CacheServers = proplists:get_value(cache_servers, Options, [{"localhost", 11211, 1}]),
+ ok = erlmc:start(CacheServers),
+ ok.
+
get(_Conn, Prefix, Key) ->
case erlmc:get(term_to_key(Prefix, Key)) of
<<>> ->
@@ -1,24 +1,23 @@
-module(boss_db_adapter_mnesia).
-behaviour(boss_db_adapter).
--export([start/0, start/1, stop/1, find/2, find/7]).
+-export([init/1, start/0, start/1, stop/1, find/2, find/7]).
-export([count/3, counter/2, incr/3, delete/2, save_record/2]).
-export([transaction/2]).
%-define(TRILLION, (1000 * 1000 * 1000 * 1000)).
+init(_Options) ->
+ application:start(mnesia).
+
% -----
start() ->
-%io:format("==> Start/0 Called~n"),
start([]).
start(_Options) ->
-%io:format("==> Start/1 Called~n"),
- application:start(mnesia),
{ok, undefined}.
% -----
stop(_) ->
-%io:format("==> Stop/0 Called~n"),
application:stop(mnesia).
% -----
@@ -1,22 +1,24 @@
% In-memory database for fast tests and easy setup
-module(boss_db_adapter_mock).
-behaviour(boss_db_adapter).
--export([start/0, start/1, stop/1]).
+-export([init/1, start/0, start/1, stop/1]).
-export([find/2, find/7, count/3, counter/2, incr/3, delete/2, save_record/2]).
-export([push/2, pop/2, dump/1, transaction/2]).
-start() ->
- start([]).
-
-start(Options) ->
+init(Options) ->
case proplists:get_value(is_master_node, Options, true) of
true ->
- {ok, MockSup} = boss_db_mock_sup:start_link(),
- {ok, MockSup};
+ boss_db_mock_sup:start_link();
false ->
- {ok, undefined}
+ ok
end.
+start() ->
+ start([]).
+
+start(_Options) ->
+ {ok, undefined}.
+
stop(undefined) ->
ok;
stop(MockSup) ->
@@ -1,6 +1,6 @@
-module(boss_db_adapter_mongodb).
-behaviour(boss_db_adapter).
--export([start/0, start/1, stop/1, find/2, find/7]).
+-export([init/1, start/0, start/1, stop/1, find/2, find/7]).
-export([count/3, counter/2, incr/2, incr/3, delete/2, save_record/2]).
-export([execute/2]).
-export([push/2, pop/2]).
@@ -14,6 +14,8 @@
-define(CONTAINS_FORMAT, "this.~s.indexOf('~s') != -1").
-define(NOT_CONTAINS_FORMAT, "this.~s.indexOf('~s') == -1").
+init(_Options) ->
+ application:start(mongodb).
start() ->
start([]).
@@ -24,7 +26,6 @@ start(Options) ->
Database = proplists:get_value(db_database, Options, test),
WriteMode = proplists:get_value(db_write_mode, Options, safe),
ReadMode = proplists:get_value(db_read_mode, Options, master),
- application:start(mongodb),
{ok, Connection} = mongo:connect({Host, Port}),
% We pass around arguments required by mongo:do/5
{ok, {WriteMode, ReadMode, Connection, Database}}.