Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

Change gen_fsm to gen_server behaviour #20

Merged
merged 2 commits into from

3 participants

@ddosia

Drop gen_fsm in favor of gen_server. 1/4 code from poolboy.erl is gone.
Is it worth to be merged into master?

@Vagabond
Collaborator

This even seems to pass the quickcheck tests, which is nice. I'll try to do some more review soon.

@devinus
Owner

I'll try to do some benchmarking this week.

@Vagabond
Collaborator

Actually, longer EQC runs are exposing some issues, I think there might be a bug lurking.

@ddosia

Any way to reproduce this tests?
Despite this issue, how about the idea? Currently i trying to add new functionality to poolboy, and it is much easier in gen_server version then in original.

@devinus
Owner

The idea is great. As long as performance doesn't suffer, who can argue against simpler code?

@Vagabond
Collaborator

Still trying to decide if the quickcheck problems are just line noise, sorry.

@Vagabond
Collaborator

But I think we should pursue this branch, just want to make sure this change is solid first.

@Vagabond
Collaborator

It looks like the EQC test had a timeout in it that was too short and it'd randomly timeout trying to do a checkout. I think we can ignore the failure and move forward with this.

@devinus devinus was assigned
@devinus
Owner

@Vagabond I don't think there's any performance difference. As long as there are no regressions I'm going to merge this. Any last thoughts?

@Vagabond
Collaborator

+1 from me.

@devinus devinus merged commit 6ddc61a into devinus:master
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Dec 1, 2012
  1. @ddosia
Commits on Dec 2, 2012
  1. @ddosia

    Fix wrong status

    ddosia authored
This page is out of date. Refresh to see the latest.
Showing with 132 additions and 212 deletions.
  1. +103 −207 src/poolboy.erl
  2. +3 −3 test/poolboy_eqc.erl
  3. +26 −2 test/poolboy_tests.erl
View
310 src/poolboy.erl
@@ -1,14 +1,13 @@
%% Poolboy - A hunky Erlang worker pool factory
-module(poolboy).
--behaviour(gen_fsm).
+-behaviour(gen_server).
-export([checkout/1, checkout/2, checkout/3, checkin/2, transaction/2,
child_spec/2, child_spec/3, start/1, start/2, start_link/1,
start_link/2, stop/1, status/1]).
--export([init/1, ready/2, ready/3, overflow/2, overflow/3, full/2, full/3,
- handle_event/3, handle_sync_event/4, handle_info/3, terminate/3,
- code_change/4]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
-define(TIMEOUT, 5000).
@@ -33,11 +32,11 @@ checkout(Pool, Block) ->
-spec checkout(Pool :: node(), Block :: boolean(), Timeout :: timeout())
-> pid() | full.
checkout(Pool, Block, Timeout) ->
- gen_fsm:sync_send_event(Pool, {checkout, Block, Timeout}, Timeout).
+ gen_server:call(Pool, {checkout, Block, Timeout}, Timeout).
-spec checkin(Pool :: node(), Worker :: pid()) -> ok.
checkin(Pool, Worker) when is_pid(Worker) ->
- gen_fsm:send_event(Pool, {checkin, Worker}).
+ gen_server:cast(Pool, {checkin, Worker}).
-spec transaction(Pool :: node(), Fun :: fun((Worker :: pid()) -> any()))
-> any().
@@ -87,180 +86,93 @@ start_link(PoolArgs, WorkerArgs) ->
-spec stop(Pool :: node()) -> ok.
stop(Pool) ->
- gen_fsm:sync_send_all_state_event(Pool, stop).
+ gen_server:call(Pool, stop).
-spec status(Pool :: node()) -> {atom(), integer(), integer(), integer()}.
status(Pool) ->
- gen_fsm:sync_send_all_state_event(Pool, status).
+ gen_server:call(Pool, status).
init({PoolArgs, WorkerArgs}) ->
process_flag(trap_exit, true),
Waiting = queue:new(),
Monitors = ets:new(monitors, [private]),
- init(PoolArgs, WorkerArgs, #state{waiting=Waiting, monitors=Monitors}).
+ init(PoolArgs, WorkerArgs, #state{waiting = Waiting, monitors = Monitors}).
init([{worker_module, Mod} | Rest], WorkerArgs, State) when is_atom(Mod) ->
{ok, Sup} = poolboy_sup:start_link(Mod, WorkerArgs),
- init(Rest, WorkerArgs, State#state{supervisor=Sup});
+ init(Rest, WorkerArgs, State#state{supervisor = Sup});
init([{size, Size} | Rest], WorkerArgs, State) when is_integer(Size) ->
- init(Rest, WorkerArgs, State#state{size=Size});
+ init(Rest, WorkerArgs, State#state{size = Size});
init([{max_overflow, MaxOverflow} | Rest], WorkerArgs, State) when is_integer(MaxOverflow) ->
- init(Rest, WorkerArgs, State#state{max_overflow=MaxOverflow});
+ init(Rest, WorkerArgs, State#state{max_overflow = MaxOverflow});
init([_ | Rest], WorkerArgs, State) ->
init(Rest, WorkerArgs, State);
-init([], _WorkerArgs, #state{size=Size, supervisor=Sup, max_overflow=MaxOverflow}=State) ->
+init([], _WorkerArgs, #state{size = Size, supervisor = Sup} = State) ->
Workers = prepopulate(Size, Sup),
- StartState = case Size of
- Size when Size < 1, MaxOverflow < 1 -> full;
- Size when Size < 1 -> overflow;
- Size -> ready
- end,
- {ok, StartState, State#state{workers=Workers}}.
-
-ready({checkin, Pid}, State) ->
- Monitors = State#state.monitors,
+ {ok, State#state{workers = Workers}}.
+
+handle_cast({checkin, Pid}, State = #state{monitors = Monitors}) ->
case ets:lookup(Monitors, Pid) of
[{Pid, Ref}] ->
true = erlang:demonitor(Ref),
true = ets:delete(Monitors, Pid),
- Workers = queue:in(Pid, State#state.workers),
- {next_state, ready, State#state{workers=Workers}};
+ NewState = handle_checkin(Pid, State),
+ {noreply, NewState};
[] ->
- {next_state, ready, State}
+ {noreply, State}
end;
-ready(_Event, State) ->
- {next_state, ready, State}.
-ready({checkout, Block, Timeout}, {FromPid, _}=From, State) ->
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+handle_call({checkout, Block, Timeout}, {FromPid, _} = From, State) ->
#state{supervisor = Sup,
workers = Workers,
monitors = Monitors,
+ overflow = Overflow,
max_overflow = MaxOverflow} = State,
case queue:out(Workers) of
{{value, Pid}, Left} ->
Ref = erlang:monitor(process, FromPid),
true = ets:insert(Monitors, {Pid, Ref}),
- NextState = case queue:is_empty(Left) of
- true when MaxOverflow < 1 -> full;
- true -> overflow;
- false -> ready
- end,
- {reply, Pid, NextState, State#state{workers=Left}};
- {empty, Empty} when MaxOverflow > 0 ->
+ {reply, Pid, State#state{workers = Left}};
+ {empty, Empty} when MaxOverflow > 0, Overflow < MaxOverflow ->
{Pid, Ref} = new_worker(Sup, FromPid),
true = ets:insert(Monitors, {Pid, Ref}),
- {reply, Pid, overflow, State#state{workers=Empty, overflow=1}};
+ {reply, Pid, State#state{workers = Empty, overflow = Overflow + 1}};
{empty, Empty} when Block =:= false ->
- {reply, full, full, State#state{workers=Empty}};
+ {reply, full, State#state{workers = Empty}};
{empty, Empty} ->
Waiting = add_waiting(From, Timeout, State#state.waiting),
- {next_state, full, State#state{workers=Empty, waiting=Waiting}}
+ {noreply, State#state{workers = Empty, waiting = Waiting}}
end;
-ready(_Event, _From, State) ->
- {reply, ok, ready, State}.
-overflow({checkin, Pid}, #state{overflow=0}=State) ->
- Monitors = State#state.monitors,
- case ets:lookup(Monitors, Pid) of
- [{Pid, Ref}] ->
- true = erlang:demonitor(Ref),
- true = ets:delete(Monitors, Pid),
- NextState = case State#state.size > 0 of
- true -> ready;
- false -> overflow
- end,
- Workers = queue:in(Pid, State#state.workers),
- {next_state, NextState, State#state{overflow=0, workers=Workers}};
- [] ->
- {next_state, overflow, State}
- end;
-overflow({checkin, Pid}, State) ->
- #state{supervisor=Sup, monitors=Monitors, overflow=Overflow} = State,
- case ets:lookup(Monitors, Pid) of
- [{Pid, Ref}] ->
- ok = dismiss_worker(Sup, Pid),
- true = erlang:demonitor(Ref),
- true = ets:delete(Monitors, Pid),
- {next_state, overflow, State#state{overflow=Overflow-1}};
- [] ->
- {next_state, overflow, State}
- end;
-overflow(_Event, State) ->
- {next_state, overflow, State}.
-
-overflow({checkout, Block, Timeout}, From,
- #state{overflow=Overflow,
- max_overflow=MaxOverflow}=State) when Overflow >= MaxOverflow ->
- case Block of
- true ->
- Waiting = add_waiting(From, Timeout, State#state.waiting),
- {next_state, full, State#state{waiting=Waiting}};
- false ->
- {reply, full, full, State}
- end;
-overflow({checkout, _Block, _Timeout}, {From, _}, State) ->
- #state{supervisor = Sup,
- overflow = Overflow,
- max_overflow = MaxOverflow} = State,
- {Pid, Ref} = new_worker(Sup, From),
- true = ets:insert(State#state.monitors, {Pid, Ref}),
- NewOverflow = Overflow + 1,
- NextState = case NewOverflow >= MaxOverflow of
- true -> full;
- false -> overflow
- end,
- {reply, Pid, NextState, State#state{overflow=NewOverflow}};
-overflow(_Event, _From, State) ->
- {reply, ok, overflow, State}.
-
-full({checkin, Pid}, State) ->
- #state{monitors = Monitors} = State,
- case ets:lookup(Monitors, Pid) of
- [{Pid, Ref}] ->
- true = erlang:demonitor(Ref),
- true = ets:delete(Monitors, Pid),
- checkin_while_full(Pid, State);
- [] ->
- {next_state, full, State}
- end;
-full(_Event, State) ->
- {next_state, full, State}.
-
-full({checkout, true, Timeout}, From, State) ->
- Waiting = add_waiting(From, Timeout, State#state.waiting),
- {next_state, full, State#state{waiting=Waiting}};
-full({checkout, false, _Timeout}, _From, State) ->
- {reply, full, full, State};
-full(_Event, _From, State) ->
- {reply, ok, full, State}.
-
-handle_event(_Event, StateName, State) ->
- {next_state, StateName, State}.
-
-handle_sync_event(status, _From, StateName, State) ->
- {reply, {StateName, queue:len(State#state.workers), State#state.overflow,
- ets:info(State#state.monitors, size)},
- StateName, State};
-handle_sync_event(get_avail_workers, _From, StateName, State) ->
+handle_call(status, _From, State) ->
+ #state{workers = Workers,
+ monitors = Monitors,
+ overflow = Overflow} = State,
+ StateName = state_name(State),
+ {reply, {StateName, queue:len(Workers), Overflow, ets:info(Monitors, size)}, State};
+handle_call(get_avail_workers, _From, State) ->
Workers = State#state.workers,
WorkerList = queue:to_list(Workers),
- {reply, WorkerList, StateName, State};
-handle_sync_event(get_all_workers, _From, StateName, State) ->
+ {reply, WorkerList, State};
+handle_call(get_all_workers, _From, State) ->
Sup = State#state.supervisor,
WorkerList = supervisor:which_children(Sup),
- {reply, WorkerList, StateName, State};
-handle_sync_event(get_all_monitors, _From, StateName, State) ->
+ {reply, WorkerList, State};
+handle_call(get_all_monitors, _From, State) ->
Monitors = ets:tab2list(State#state.monitors),
- {reply, Monitors, StateName, State};
-handle_sync_event(stop, _From, _StateName, State) ->
+ {reply, Monitors, State};
+handle_call(stop, _From, State) ->
Sup = State#state.supervisor,
true = exit(Sup, shutdown),
{stop, normal, ok, State};
-handle_sync_event(_Event, _From, StateName, State) ->
+handle_call(_Msg, _From, State) ->
Reply = {error, invalid_message},
- {reply, Reply, StateName, State}.
+ {reply, Reply, State}.
-handle_info({'DOWN', Ref, _, _, _}, StateName, State) ->
+handle_info({'DOWN', Ref, _, _, _}, State) ->
case ets:match(State#state.monitors, {'$1', Ref}) of
[[Pid]] ->
Sup = State#state.supervisor,
@@ -270,42 +182,45 @@ handle_info({'DOWN', Ref, _, _, _}, StateName, State) ->
%% a race condition with messages waiting in the
%% mailbox.
true = ets:delete(State#state.monitors, Pid),
- handle_worker_exit(Pid, StateName, State);
+ NewState = handle_worker_exit(Pid, State),
+ {noreply, NewState};
[] ->
- {next_state, StateName, State}
+ {noreply, State}
end;
-handle_info({'EXIT', Pid, _Reason}, StateName, State) ->
+handle_info({'EXIT', Pid, _Reason}, State) ->
#state{supervisor = Sup,
monitors = Monitors} = State,
case ets:lookup(Monitors, Pid) of
[{Pid, Ref}] ->
true = erlang:demonitor(Ref),
true = ets:delete(Monitors, Pid),
- handle_worker_exit(Pid, StateName, State);
+ NewState = handle_worker_exit(Pid, State),
+ {noreply, NewState};
[] ->
case queue:member(Pid, State#state.workers) of
true ->
W = queue:filter(fun (P) -> P =/= Pid end, State#state.workers),
- {next_state, StateName, State#state{workers=queue:in(new_worker(Sup), W)}};
+ {noreply, State#state{workers = queue:in(new_worker(Sup), W)}};
false ->
- {next_state, StateName, State}
+ {noreply, State}
end
end;
-handle_info(_Info, StateName, State) ->
- {next_state, StateName, State}.
-terminate(_Reason, _StateName, _State) ->
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
ok.
-code_change(_OldVsn, StateName, State, _Extra) ->
- {ok, StateName, State}.
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
start_pool(StartFun, PoolArgs, WorkerArgs) ->
case proplists:get_value(name, PoolArgs) of
undefined ->
- gen_fsm:StartFun(?MODULE, {PoolArgs, WorkerArgs}, []);
+ gen_server:StartFun(?MODULE, {PoolArgs, WorkerArgs}, []);
Name ->
- gen_fsm:StartFun(Name, ?MODULE, {PoolArgs, WorkerArgs}, [])
+ gen_server:StartFun(Name, ?MODULE, {PoolArgs, WorkerArgs}, [])
end.
new_worker(Sup) ->
@@ -341,83 +256,64 @@ wait_valid(StartTime, Timeout) ->
Waited = timer:now_diff(os:timestamp(), StartTime),
(Waited div 1000) < Timeout.
-checkin_while_full(Pid, State) ->
+handle_checkin(Pid, State) ->
#state{supervisor = Sup,
waiting = Waiting,
monitors = Monitors,
- max_overflow = MaxOverflow,
overflow = Overflow} = State,
case queue:out(Waiting) of
- {{value, {{FromPid, _}=From, Timeout, StartTime}}, Left} ->
+ {{value, {{FromPid, _} = From, Timeout, StartTime}}, Left} ->
case wait_valid(StartTime, Timeout) of
true ->
Ref1 = erlang:monitor(process, FromPid),
true = ets:insert(Monitors, {Pid, Ref1}),
- gen_fsm:reply(From, Pid),
- {next_state, full, State#state{waiting=Left}};
+ gen_server:reply(From, Pid),
+ State#state{waiting = Left};
false ->
- checkin_while_full(Pid, State#state{waiting=Left})
+ handle_checkin(Pid, State#state{waiting = Left})
end;
- {empty, Empty} when MaxOverflow < 1 ->
- Workers = queue:in(Pid, State#state.workers),
- {next_state, ready, State#state{workers=Workers,
- waiting=Empty}};
- {empty, Empty} ->
+ {empty, Empty} when Overflow > 0 ->
ok = dismiss_worker(Sup, Pid),
- {next_state, overflow, State#state{waiting=Empty,
- overflow=Overflow-1}}
+ State#state{waiting = Empty, overflow = Overflow - 1};
+ {empty, Empty} ->
+ Workers = queue:in(Pid, State#state.workers),
+ State#state{workers = Workers, waiting = Empty, overflow = 0}
end.
-handle_worker_exit(Pid, StateName, State) ->
+handle_worker_exit(Pid, State) ->
#state{supervisor = Sup,
- overflow = Overflow,
- waiting = Waiting,
monitors = Monitors,
- max_overflow = MaxOverflow} = State,
- case StateName of
- ready ->
- W = queue:filter(fun (P) -> P =/= Pid end, State#state.workers),
- {next_state, ready, State#state{workers=queue:in(new_worker(Sup), W)}};
- overflow when Overflow =:= 0 ->
- W = queue:filter(fun (P) -> P =/= Pid end, State#state.workers),
- {next_state, ready, State#state{workers=queue:in(new_worker(Sup), W)}};
- overflow ->
- {next_state, overflow, State#state{overflow=Overflow-1}};
- full when MaxOverflow < 1 ->
- case queue:out(Waiting) of
- {{value, {{FromPid, _}=From, Timeout, StartTime}}, LeftWaiting} ->
- case wait_valid(StartTime, Timeout) of
- true ->
- MonitorRef = erlang:monitor(process, FromPid),
- NewWorker = new_worker(Sup),
- true = ets:insert(Monitors, {NewWorker, MonitorRef}),
- gen_fsm:reply(From, NewWorker),
- {next_state, full, State#state{waiting=LeftWaiting}};
- false ->
- handle_worker_exit(Pid, StateName, State#state{waiting=LeftWaiting})
- end;
- {empty, Empty} ->
- Workers2 = queue:in(new_worker(Sup), State#state.workers),
- {next_state, ready, State#state{waiting=Empty,
- workers=Workers2}}
- end;
- full when Overflow =< MaxOverflow ->
- case queue:out(Waiting) of
- {{value, {{FromPid, _}=From, Timeout, StartTime}}, LeftWaiting} ->
- case wait_valid(StartTime, Timeout) of
- true ->
- MonitorRef = erlang:monitor(process, FromPid),
- NewWorker = new_worker(Sup),
- true = ets:insert(Monitors, {NewWorker, MonitorRef}),
- gen_fsm:reply(From, NewWorker),
- {next_state, full, State#state{waiting=LeftWaiting}};
- _ ->
- handle_worker_exit(Pid, StateName, State#state{waiting=LeftWaiting})
- end;
- {empty, Empty} ->
- {next_state, overflow, State#state{overflow=Overflow-1,
- waiting=Empty}}
+ overflow = Overflow} = State,
+ case queue:out(State#state.waiting) of
+ {{value, {{FromPid, _} = From, Timeout, StartTime}}, LeftWaiting} ->
+ case wait_valid(StartTime, Timeout) of
+ true ->
+ MonitorRef = erlang:monitor(process, FromPid),
+ NewWorker = new_worker(State#state.supervisor),
+ true = ets:insert(Monitors, {NewWorker, MonitorRef}),
+ gen_server:reply(From, NewWorker),
+ State#state{waiting = LeftWaiting};
+ _ ->
+ handle_worker_exit(Pid, State#state{waiting = LeftWaiting})
end;
- full ->
- {next_state, full, State#state{overflow=Overflow-1}}
+ {empty, Empty} when Overflow > 0 ->
+ State#state{overflow = Overflow - 1, waiting = Empty};
+ {empty, Empty} ->
+ Workers = queue:in(
+ new_worker(Sup),
+ queue:filter(fun (P) -> P =/= Pid end, State#state.workers)
+ ),
+ State#state{workers = Workers, waiting = Empty}
end.
+
+state_name(State = #state{overflow = Overflow}) when Overflow < 1 ->
+ #state{max_overflow = MaxOverflow, workers = Workers} = State,
+ case queue:len(Workers) == 0 of
+ true when MaxOverflow < 1 -> full;
+ true -> overflow;
+ false -> ready
+ end;
+state_name(#state{overflow = MaxOverflow, max_overflow = MaxOverflow}) ->
+ full;
+state_name(_State) ->
+ overflow.
View
6 test/poolboy_eqc.erl
@@ -67,7 +67,7 @@ start_poolboy(Args) ->
Pid.
stop_poolboy(Pid) ->
- gen_fsm:sync_send_all_state_event(Pid, stop),
+ gen_server:call(Pid, stop),
timer:sleep(1).
checkout_nonblock(Pool) ->
@@ -78,7 +78,7 @@ checkout_block(Pool) ->
checkin(Pool, {Worker, _}) ->
Res = poolboy:checkin(Pool, Worker),
- gen_fsm:sync_send_all_state_event(Pool, get_avail_workers),
+ gen_server:call(Pool, get_avail_workers),
Res.
kill_worker({Worker, _}) ->
@@ -130,7 +130,7 @@ invariant(S = #state{pid=Pid},_) when Pid /= undefined ->
OverFlow = max(0, length(S#state.checked_out) - S#state.size),
Monitors = length(S#state.checked_out),
- RealStatus = gen_fsm:sync_send_all_state_event(Pid, status),
+ RealStatus = gen_server:call(Pid, status),
case RealStatus == {State, Workers, OverFlow, Monitors} of
true ->
true;
View
28 test/poolboy_tests.erl
@@ -3,7 +3,7 @@
-include_lib("eunit/include/eunit.hrl").
-define(sync(Pid, Event),
- gen_fsm:sync_send_all_state_event(Pid, Event)).
+ gen_server:call(Pid, Event)).
pool_test_() ->
{foreach,
@@ -369,7 +369,31 @@ checkin_after_exception_in_transaction() ->
pool_returns_status() ->
{ok, Pool} = new_pool(2, 0),
?assertEqual({ready, 2, 0, 0}, poolboy:status(Pool)),
- ok = ?sync(Pool, stop).
+ poolboy:checkout(Pool),
+ ?assertEqual({ready, 1, 0, 1}, poolboy:status(Pool)),
+ poolboy:checkout(Pool),
+ ?assertEqual({full, 0, 0, 2}, poolboy:status(Pool)),
+ ok = ?sync(Pool, stop),
+
+ {ok, Pool2} = new_pool(1, 1),
+ ?assertEqual({ready, 1, 0, 0}, poolboy:status(Pool2)),
+ poolboy:checkout(Pool2),
+ ?assertEqual({overflow, 0, 0, 1}, poolboy:status(Pool2)),
+ poolboy:checkout(Pool2),
+ ?assertEqual({full, 0, 1, 2}, poolboy:status(Pool2)),
+ ok = ?sync(Pool2, stop),
+
+ {ok, Pool3} = new_pool(0, 2),
+ ?assertEqual({overflow, 0, 0, 0}, poolboy:status(Pool3)),
+ poolboy:checkout(Pool3),
+ ?assertEqual({overflow, 0, 1, 1}, poolboy:status(Pool3)),
+ poolboy:checkout(Pool3),
+ ?assertEqual({full, 0, 2, 2}, poolboy:status(Pool3)),
+ ok = ?sync(Pool3, stop),
+
+ {ok, Pool4} = new_pool(0, 0),
+ ?assertEqual({full, 0, 0, 0}, poolboy:status(Pool4)),
+ ok = ?sync(Pool4, stop).
new_pool(Size, MaxOverflow) ->
poolboy:start_link([{name, {local, poolboy_test}},
Something went wrong with that request. Please try again.