Skip to content

Commit

Permalink
Change blocking checkout interface to checkout(PoolName, Block)
Browse files Browse the repository at this point in the history
  • Loading branch information
Devin Torres committed Sep 29, 2011
1 parent 0514787 commit 9733a39
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 171 deletions.
4 changes: 2 additions & 2 deletions README.md
Expand Up @@ -154,5 +154,5 @@ Authors
License
-------
Poolboy is available in the public domain (see `UNLICENSE`).
Poolboy is also available under the Apache License (see `LICENSE`), meant
especially for jurisdictions that do not recognize public domain works.
Poolboy is also optionally available under the Apache License (see `LICENSE`),
meant especially for jurisdictions that do not recognize public domain works.
Binary file modified rebar
Binary file not shown.
2 changes: 1 addition & 1 deletion src/poolboy.app.src
Expand Up @@ -2,5 +2,5 @@
{description, "A hunky Erlang worker pool factory"},
{vsn, "0.3"},
{applications, [kernel, stdlib]},
{registered,[]}
{registered, []}
]}.
96 changes: 51 additions & 45 deletions src/poolboy.erl
Expand Up @@ -3,18 +3,22 @@
-module(poolboy).
-behaviour(gen_fsm).

-export([checkout/1, checkin/2, start_link/1, init/1, ready/2, ready/3,
overflow/2, overflow/3, full/2, full/3, handle_event/3,
handle_sync_event/4, handle_info/3, terminate/3, code_change/4]).
-export([start_link/1, checkout/1, checkout/2, checkin/2]).
-export([init/1, ready/2, ready/3, overflow/2, overflow/3, full/2, full/3,
handle_event/3, handle_sync_event/4, handle_info/3, terminate/3,
code_change/4]).

-record(state, {workers, worker_sup, waiting=queue:new(), monitors=[],
size=5, overflow=0, max_overflow=10, checkout_blocks=true}).
size=5, overflow=0, max_overflow=10}).

checkin(Pool, Worker) ->
gen_fsm:send_event(Pool, {checkin, Worker}).

checkout(Pool) ->
gen_fsm:sync_send_event(Pool, checkout).
checkout(Pool, true).

checkout(Pool, Block) ->
gen_fsm:sync_send_event(Pool, {checkout, Block}).

start_link(Args) ->
case proplists:get_value(name, Args) of
Expand All @@ -35,8 +39,6 @@ init([{size, PoolSize} | Rest], State) ->
init(Rest, State#state{size=PoolSize});
init([{max_overflow, MaxOverflow} | Rest], State) ->
init(Rest, State#state{max_overflow=MaxOverflow});
init([{checkout_blocks, CheckoutBlocks} | Rest], State) ->
init(Rest, State#state{checkout_blocks=CheckoutBlocks});
init([_ | Rest], State) ->
init(Rest, State);
init([], #state{size=Size, worker_sup=Sup}=State) ->
Expand All @@ -53,9 +55,10 @@ ready({checkin, Pid}, State) ->
ready(_Event, State) ->
{next_state, ready, State}.

ready(checkout, {FromPid, _} = From, #state{workers=Workers, worker_sup=Sup,
max_overflow=MaxOverflow,
checkout_blocks=Blocks}=State) ->
ready({checkout, Block}, {FromPid, _}=From, #state{workers=Workers,
worker_sup=Sup,
max_overflow=MaxOverflow
}=State) ->
case queue:out(Workers) of
{{value, Pid}, Left} ->
Ref = erlang:monitor(process, FromPid),
Expand All @@ -68,8 +71,7 @@ ready(checkout, {FromPid, _} = From, #state{workers=Workers, worker_sup=Sup,
{reply, Pid, overflow, State#state{workers=Empty,
monitors=Monitors,
overflow=1}};
{empty, Empty} when Blocks == false ->
%% don't block the calling process
{empty, Empty} when Block =:= false ->
{reply, full, full, State#state{workers=Empty}};
{empty, Empty} ->
Waiting = State#state.waiting,
Expand All @@ -88,18 +90,18 @@ overflow({checkin, Pid}, #state{overflow=Overflow}=State) ->
overflow(_Event, State) ->
{next_state, overflow, State}.

overflow(checkout, From, #state{overflow=Overflow,
max_overflow=MaxOverflow,
checkout_blocks=Blocks}=State) when Overflow >= MaxOverflow ->
case Blocks of
overflow({checkout, Block}, From, #state{overflow=Overflow,
max_overflow=MaxOverflow
}=State) when Overflow >= MaxOverflow ->
case Block of
false ->
{reply, full, full, State};
_ ->
Block ->
Waiting = State#state.waiting,
{next_state, full, State#state{waiting=queue:in(From, Waiting)}}
end;
overflow(checkout, {From, _}, #state{worker_sup=Sup,
overflow=Overflow}=State) ->
overflow({checkout, _Block}, {From, _},
#state{worker_sup=Sup, overflow=Overflow}=State) ->
{Pid, Ref} = new_worker(Sup, From),
Monitors = [{Pid, Ref} | State#state.monitors],
{reply, Pid, overflow, State#state{monitors=Monitors,
Expand All @@ -108,7 +110,7 @@ overflow(_Event, _From, State) ->
{reply, ok, overflow, State}.

full({checkin, Pid}, #state{waiting=Waiting, max_overflow=MaxOverflow,
overflow=Overflow}=State) ->
overflow=Overflow}=State) ->
case queue:out(Waiting) of
{{value, {FromPid, _}=From}, Left} ->
Ref = erlang:monitor(process, FromPid),
Expand All @@ -122,37 +124,38 @@ full({checkin, Pid}, #state{waiting=Waiting, max_overflow=MaxOverflow,
{value, {_, Ref}, Left} -> erlang:demonitor(Ref), Left;
false -> []
end,
{next_state, ready, State#state{workers=Workers, waiting=Empty,
{next_state, ready, State#state{workers=Workers,
waiting=Empty,
monitors=Monitors}};
{empty, Empty} ->
dismiss_worker(Pid),
{next_state, overflow, State#state{waiting=Empty,
overflow=Overflow-1}}
overflow=Overflow-1}}
end;
full(_Event, State) ->
{next_state, full, State}.

full(checkout, _From, #state{checkout_blocks=false}=State) ->
full({checkout, false}, _From, State) ->
{reply, full, full, State};
full(checkout, From, #state{waiting=Waiting}=State) ->
full({checkout, _Block}, From, #state{waiting=Waiting}=State) ->
{next_state, full, State#state{waiting=queue:in(From, Waiting)}};
full(_Event, _From, State) ->
{reply, ok, full, State}.

handle_event(_Event, StateName, State) ->
{next_state, StateName, State}.

handle_sync_event(get_avail_workers, _From, StateName, #state{workers=Workers}=State) ->
WorkerList = queue:to_list(Workers),
{reply, WorkerList, StateName, State};
handle_sync_event(get_all_workers, _From, StateName, #state{worker_sup=Sup}=State) ->
WorkerList = supervisor:which_children(Sup),
{reply, WorkerList, StateName, State};
{next_state, StateName, State}.

handle_sync_event(get_avail_workers, _From, StateName,
#state{workers=Workers}=State) ->
{reply, queue:to_list(Workers), StateName, State};
handle_sync_event(get_all_workers, _From, StateName,
#state{worker_sup=Sup}=State) ->
{reply, supervisor:which_children(Sup), StateName, State};
handle_sync_event(stop, _From, _StateName, State) ->
{stop, normal, ok, State};
{stop, normal, ok, State};
handle_sync_event(_Event, _From, StateName, State) ->
Reply = {error, invalid_message},
{reply, Reply, StateName, State}.
Reply = {error, invalid_message},
{reply, Reply, StateName, State}.

handle_info({'DOWN', Ref, _, _, _}, StateName, State) ->
case lists:keytake(Ref, 2, State#state.monitors) of
Expand All @@ -163,7 +166,8 @@ handle_info({'DOWN', Ref, _, _, _}, StateName, State) ->
handle_info({'EXIT', Pid, _}, StateName, #state{worker_sup=Sup,
overflow=Overflow,
waiting=Waiting,
max_overflow=MaxOverflow}=State) ->
max_overflow=MaxOverflow
}=State) ->
Monitors = case lists:keytake(Pid, 1, State#state.monitors) of
{value, {_, Ref}, Left} -> erlang:demonitor(Ref), Left;
false -> []
Expand All @@ -180,29 +184,31 @@ handle_info({'EXIT', Pid, _}, StateName, #state{worker_sup=Sup,
overflow=Overflow-1}};
full when MaxOverflow < 1 ->
case queue:out(Waiting) of
{{value, {FromPid, _} = From}, LeftWaiting} ->
{{value, {FromPid, _}=From}, LeftWaiting} ->
MonitorRef = erlang:monitor(process, FromPid),
Monitors2 = [{FromPid, MonitorRef} | Monitors],
NewWorker = new_worker(Sup),
gen_fsm:reply(From, NewWorker),
gen_fsm:reply(From, new_worker(Sup)),
{next_state, full, State#state{waiting=LeftWaiting,
monitors=Monitors2}};
monitors=Monitors2}};
{empty, Empty} ->
{next_state, ready, State#state{monitors=Monitors,waiting=Empty,
workers=queue:in(new_worker(Sup), State#state.workers)}}
Workers2 = queue:in(new_worker(Sup), State#state.workers),
{next_state, ready, State#state{monitors=Monitors,
waiting=Empty,
workers=Workers2}}
end;
full when Overflow =< MaxOverflow ->
case queue:out(Waiting) of
{{value, {FromPid, _} = From}, LeftWaiting} ->
{{value, {FromPid, _}=From}, LeftWaiting} ->
MonitorRef = erlang:monitor(process, FromPid),
Monitors2 = [{FromPid, MonitorRef} | Monitors],
NewWorker = new_worker(Sup),
gen_fsm:reply(From, NewWorker),
{next_state, full, State#state{waiting=LeftWaiting,
monitors=Monitors2}};
monitors=Monitors2}};
{empty, Empty} ->
{next_state, overflow, State#state{monitors=Monitors,
overflow=Overflow-1, waiting=Empty}}
overflow=Overflow-1,
waiting=Empty}}
end;
full ->
{next_state, full, State#state{monitors=Monitors,
Expand Down
9 changes: 3 additions & 6 deletions test/poolboy_test_worker.erl
Expand Up @@ -2,14 +2,11 @@

-behaviour(gen_server).

-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-export([start_link/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).

-record(state, {
}).

-include_lib("eunit/include/eunit.hrl").
-record(state, {}).

start_link(_Args) ->
gen_server:start_link(?MODULE, [], []).
Expand Down

0 comments on commit 9733a39

Please sign in to comment.