Permalink
Browse files

Merge pull request #7 from basho/adt-quickcheck

QuickCheck Poolboy
  • Loading branch information...
2 parents 5b6c058 + fce0d9f commit 75572c2f535e23f5234d6d09abcc5705f9985a99 Devin Torres committed Jan 26, 2012
Showing with 405 additions and 102 deletions.
  1. +181 −102 src/poolboy.erl
  2. +224 −0 test/poolboy_eqc.erl
View
@@ -35,7 +35,7 @@ checkout(Pool, Block) ->
-spec checkout(Pool :: node(), Block :: boolean(), Timeout :: timeout())
-> pid() | full.
checkout(Pool, Block, Timeout) ->
- gen_fsm:sync_send_event(Pool, {checkout, Block}, Timeout).
+ gen_fsm:sync_send_event(Pool, {checkout, Block, Timeout}, Timeout).
-spec checkin(Pool :: node(), Worker :: pid()) -> ok.
checkin(Pool, Worker) ->
@@ -66,21 +66,31 @@ init([{stop_fun, StopFun} | Rest], State) when is_function(StopFun) ->
init(Rest, State#state{worker_stop=StopFun});
init([_ | Rest], State) ->
init(Rest, State);
-init([], #state{size=Size, worker_sup=Sup, worker_init=InitFun}=State) ->
+init([], #state{size=Size, worker_sup=Sup, worker_init=InitFun,
+ max_overflow=MaxOverflow}=State) ->
Workers = prepopulate(Size, Sup, InitFun),
- {ok, ready, State#state{workers=Workers}}.
+ StartState = case queue:len(Workers) of
+ 0 when MaxOverflow ==0 -> full;
+ 0 -> overflow;
+ _ -> ready
+ end,
+ {ok, StartState, State#state{workers=Workers}}.
ready({checkin, Pid}, State) ->
- Workers = queue:in(Pid, State#state.workers),
- Monitors = case lists:keytake(Pid, 1, State#state.monitors) of
- {value, {_, Ref}, Left} -> erlang:demonitor(Ref), Left;
- false -> State#state.monitors
- end,
- {next_state, ready, State#state{workers=Workers, monitors=Monitors}};
+ case lists:keytake(Pid, 1, State#state.monitors) of
+ {value, {_, Ref}, Monitors} ->
+ erlang:demonitor(Ref),
+ Workers = queue:in(Pid, State#state.workers),
+ {next_state, ready, State#state{workers=Workers,
+ monitors=Monitors}};
+ false ->
+ %% unknown process checked in, ignore it
+ {next_state, ready, State}
+ end;
ready(_Event, State) ->
{next_state, ready, State}.
-ready({checkout, Block}, {FromPid, _}=From, State) ->
+ready({checkout, Block, Timeout}, {FromPid, _}=From, State) ->
#state{workers = Workers,
worker_sup = Sup,
max_overflow = MaxOverflow,
@@ -89,8 +99,16 @@ ready({checkout, Block}, {FromPid, _}=From, State) ->
{{value, Pid}, Left} ->
Ref = erlang:monitor(process, FromPid),
Monitors = [{Pid, Ref} | State#state.monitors],
- {reply, Pid, ready, State#state{workers=Left,
- monitors=Monitors}};
+ NextState = case queue:len(Left) of
+ 0 when MaxOverflow == 0 ->
+ full;
+ 0 ->
+ overflow;
+ _ ->
+ ready
+ end,
+ {reply, Pid, NextState, State#state{workers=Left,
+ monitors=Monitors}};
{empty, Empty} when MaxOverflow > 0 ->
{Pid, Ref} = new_worker(Sup, FromPid, InitFun),
Monitors = [{Pid, Ref} | State#state.monitors],
@@ -102,81 +120,105 @@ ready({checkout, Block}, {FromPid, _}=From, State) ->
{empty, Empty} ->
Waiting = State#state.waiting,
{next_state, full, State#state{workers=Empty,
- waiting=queue:in(From, Waiting)}}
+ waiting=add_waiting(From, Timeout, Waiting)}}
end;
ready(_Event, _From, State) ->
{reply, ok, ready, State}.
-overflow({checkin, Pid}, #state{overflow=1}=State) ->
- StopFun = State#state.worker_stop,
- dismiss_worker(Pid, StopFun),
- Monitors = case lists:keytake(Pid, 1, State#state.monitors) of
- {value, {_, Ref}, Left} -> erlang:demonitor(Ref), Left;
- false -> []
- end,
- {next_state, ready, State#state{overflow=0, monitors=Monitors}};
+overflow({checkin, Pid}, #state{overflow=0}=State) ->
+ case lists:keytake(Pid, 1, State#state.monitors) of
+ {value, {_, Ref}, Monitors} ->
+ erlang:demonitor(Ref),
+ NextState = case State#state.size > 0 of
+ true -> ready;
+ _ -> overflow
+ end,
+ {next_state, NextState, State#state{overflow=0, monitors=Monitors,
+ workers=queue:in(Pid, State#state.workers)}};
+ false ->
+ %% unknown process checked in, ignore it
+ {next_state, overflow, State}
+ end;
overflow({checkin, Pid}, State) ->
#state{overflow=Overflow, worker_stop=StopFun} = State,
- dismiss_worker(Pid, StopFun),
- Monitors = case lists:keytake(Pid, 1, State#state.monitors) of
- {value, {_, Ref}, Left} -> erlang:demonitor(Ref), Left;
- false -> State#state.monitors
- end,
- {next_state, overflow, State#state{overflow=Overflow-1,
- monitors=Monitors}};
+ case lists:keytake(Pid, 1, State#state.monitors) of
+ {value, {_, Ref}, Monitors} ->
+ dismiss_worker(Pid, StopFun),
+ erlang:demonitor(Ref),
+ {next_state, overflow, State#state{overflow=Overflow-1,
+ monitors=Monitors}};
+ _ ->
+ %% unknown process checked in, ignore it
+ {next_state, overflow, State}
+ end;
overflow(_Event, State) ->
{next_state, overflow, State}.
-overflow({checkout, Block}, From, #state{overflow=Overflow,
+overflow({checkout, Block, Timeout}, From, #state{overflow=Overflow,
max_overflow=MaxOverflow
}=State) when Overflow >= MaxOverflow ->
case Block of
false ->
{reply, full, full, State};
Block ->
Waiting = State#state.waiting,
- {next_state, full, State#state{waiting=queue:in(From, Waiting)}}
+ {next_state, full, State#state{waiting=add_waiting(From, Timeout, Waiting)}}
end;
-overflow({checkout, _Block}, {From, _}, State) ->
- #state{worker_sup=Sup, overflow=Overflow, worker_init=InitFun} = State,
+overflow({checkout, _Block, _Timeout}, {From, _}, State) ->
+ #state{worker_sup=Sup, overflow=Overflow,
+ worker_init=InitFun, max_overflow=MaxOverflow} = State,
{Pid, Ref} = new_worker(Sup, From, InitFun),
Monitors = [{Pid, Ref} | State#state.monitors],
- {reply, Pid, overflow, State#state{monitors=Monitors,
- overflow=Overflow+1}};
+ NewOverflow = Overflow + 1,
+ Next = case NewOverflow >= MaxOverflow of
+ true -> full;
+ _ -> overflow
+ end,
+ {reply, Pid, Next, State#state{monitors=Monitors,
+ overflow=NewOverflow}};
overflow(_Event, _From, State) ->
{reply, ok, overflow, State}.
full({checkin, Pid}, State) ->
#state{waiting = Waiting, max_overflow = MaxOverflow,
overflow = Overflow, worker_stop = StopFun} = State,
- Monitors = case lists:keytake(Pid, 1, State#state.monitors) of
- {value, {_, Ref0}, Left0} -> erlang:demonitor(Ref0), Left0;
- false -> State#state.monitors
- end,
- case queue:out(Waiting) of
- {{value, {FromPid, _}=From}, Left} ->
- Ref = erlang:monitor(process, FromPid),
- Monitors1 = [{Pid, Ref} | Monitors],
- gen_fsm:reply(From, Pid),
- {next_state, full, State#state{waiting=Left,
- monitors=Monitors1}};
- {empty, Empty} when MaxOverflow < 1 ->
- Workers = queue:in(Pid, State#state.workers),
- {next_state, ready, State#state{workers=Workers, waiting=Empty,
- monitors=Monitors}};
- {empty, Empty} ->
- dismiss_worker(Pid, StopFun),
- {next_state, overflow, State#state{waiting=Empty,
- monitors=Monitors,
- overflow=Overflow-1}}
+ case lists:keytake(Pid, 1, State#state.monitors) of
+ {value, {_, Ref0}, Monitors} ->
+ erlang:demonitor(Ref0),
+ case queue:out(Waiting) of
+ {{value, {{FromPid, _}=From, Timeout, StartTime}}, Left} ->
+ case wait_valid(StartTime, Timeout) of
+ true ->
+ Ref = erlang:monitor(process, FromPid),
+ Monitors1 = [{Pid, Ref} | Monitors],
+ gen_fsm:reply(From, Pid),
+ {next_state, full, State#state{waiting=Left,
+ monitors=Monitors1}};
+ _ ->
+ %% replay this event with cleaned up waiting queue
+ full({checkin, Pid}, State#state{waiting=Left})
+ end;
+ {empty, Empty} when MaxOverflow < 1 ->
+ Workers = queue:in(Pid, State#state.workers),
+ {next_state, ready, State#state{workers=Workers, waiting=Empty,
+ monitors=Monitors}};
+ {empty, Empty} ->
+ dismiss_worker(Pid, StopFun),
+ {next_state, overflow, State#state{waiting=Empty,
+ monitors=Monitors,
+ overflow=Overflow-1}}
+ end;
+ false ->
+ %% unknown process checked in, ignore it
+ {next_state, full, State}
end;
full(_Event, State) ->
{next_state, full, State}.
-full({checkout, false}, _From, State) ->
+full({checkout, false, _Timeout}, _From, State) ->
{reply, full, full, State};
-full({checkout, _Block}, From, #state{waiting=Waiting}=State) ->
- {next_state, full, State#state{waiting=queue:in(From, Waiting)}};
+full({checkout, _Block, Timeout}, From, #state{waiting=Waiting}=State) ->
+ {next_state, full, State#state{waiting=add_waiting(From, Timeout, Waiting)}};
full(_Event, _From, State) ->
{reply, ok, full, State}.
@@ -198,6 +240,10 @@ handle_sync_event(stop, _From, _StateName,State) ->
Sup = State#state.worker_sup,
exit(Sup, shutdown),
{stop, normal, ok, State};
+handle_sync_event(status, _From, StateName, State) ->
+ {reply, {StateName, queue:len(State#state.workers),
+ State#state.overflow, length(State#state.monitors)},
+ StateName, State};
handle_sync_event(_Event, _From, StateName, State) ->
Reply = {error, invalid_message},
{reply, Reply, StateName, State}.
@@ -216,51 +262,75 @@ handle_info({'EXIT', Pid, _}, StateName, State) ->
waiting = Waiting,
max_overflow = MaxOverflow,
worker_init = InitFun} = State,
- Monitors = case lists:keytake(Pid, 1, State#state.monitors) of
- {value, {_, Ref}, Left} -> erlang:demonitor(Ref), Left;
- false -> []
- end,
- case StateName of
- ready ->
- W = queue:filter(fun (P) -> P =/= Pid end, State#state.workers),
- {next_state, ready, State#state{workers=queue:in(new_worker(Sup, InitFun), W),
- monitors=Monitors}};
- overflow when Overflow =< 1 ->
- {next_state, ready, State#state{monitors=Monitors, overflow=0}};
- overflow ->
- {next_state, overflow, State#state{monitors=Monitors,
- overflow=Overflow-1}};
- full when MaxOverflow < 1 ->
- case queue:out(Waiting) of
- {{value, {FromPid, _}=From}, LeftWaiting} ->
- MonitorRef = erlang:monitor(process, FromPid),
- Monitors2 = [{FromPid, MonitorRef} | Monitors],
- gen_fsm:reply(From, new_worker(Sup, InitFun)),
- {next_state, full, State#state{waiting=LeftWaiting,
- monitors=Monitors2}};
- {empty, Empty} ->
- Workers2 = queue:in(new_worker(Sup, InitFun), State#state.workers),
- {next_state, ready, State#state{monitors=Monitors,
- waiting=Empty,
- workers=Workers2}}
- end;
- full when Overflow =< MaxOverflow ->
- case queue:out(Waiting) of
- {{value, {FromPid, _}=From}, LeftWaiting} ->
- MonitorRef = erlang:monitor(process, FromPid),
- Monitors2 = [{FromPid, MonitorRef} | Monitors],
- NewWorker = new_worker(Sup, InitFun),
- gen_fsm:reply(From, NewWorker),
- {next_state, full, State#state{waiting=LeftWaiting,
- monitors=Monitors2}};
- {empty, Empty} ->
- {next_state, overflow, State#state{monitors=Monitors,
- overflow=Overflow-1,
- waiting=Empty}}
- end;
- full ->
- {next_state, full, State#state{monitors=Monitors,
- overflow=Overflow-1}}
+ case lists:keytake(Pid, 1, State#state.monitors) of
+ {value, {_, Ref}, Monitors} -> erlang:demonitor(Ref),
+ case StateName of
+ ready ->
+ W = queue:filter(fun (P) -> P =/= Pid end, State#state.workers),
+ {next_state, ready, State#state{workers=queue:in(new_worker(Sup, InitFun), W),
+ monitors=Monitors}};
+ overflow when Overflow == 0 ->
+ W = queue:filter(fun (P) -> P =/= Pid end, State#state.workers),
+ {next_state, ready, State#state{workers=queue:in(new_worker(Sup, InitFun), W),
+ monitors=Monitors}};
+ overflow ->
+ {next_state, overflow, State#state{monitors=Monitors,
+ overflow=Overflow-1}};
+ full when MaxOverflow < 1 ->
+ case queue:out(Waiting) of
+ {{value, {{FromPid, _}=From, Timeout, StartTime}}, LeftWaiting} ->
+ case wait_valid(StartTime, Timeout) of
+ true ->
+ MonitorRef = erlang:monitor(process, FromPid),
+ Monitors2 = [{FromPid, MonitorRef} | Monitors],
+ gen_fsm:reply(From, new_worker(Sup, InitFun)),
+ {next_state, full, State#state{waiting=LeftWaiting,
+ monitors=Monitors2}};
+ _ ->
+ %% replay it
+ handle_info({'EXIT', Pid, foo}, StateName, State#state{waiting=LeftWaiting})
+ end;
+ {empty, Empty} ->
+ Workers2 = queue:in(new_worker(Sup, InitFun), State#state.workers),
+ {next_state, ready, State#state{monitors=Monitors,
+ waiting=Empty,
+ workers=Workers2}}
+ end;
+ full when Overflow =< MaxOverflow ->
+ case queue:out(Waiting) of
+ {{value, {{FromPid, _}=From, Timeout, StartTime}}, LeftWaiting} ->
+ case wait_valid(StartTime, Timeout) of
+ true ->
+ MonitorRef = erlang:monitor(process, FromPid),
+ Monitors2 = [{FromPid, MonitorRef} | Monitors],
+ NewWorker = new_worker(Sup, InitFun),
+ gen_fsm:reply(From, NewWorker),
+ {next_state, full, State#state{waiting=LeftWaiting,
+ monitors=Monitors2}};
+ _ ->
+ %% replay it
+ handle_info({'EXIT', Pid, foo}, StateName, State#state{waiting=LeftWaiting})
+ end;
+ {empty, Empty} ->
+ {next_state, overflow, State#state{monitors=Monitors,
+ overflow=Overflow-1,
+ waiting=Empty}}
+ end;
+ full ->
+ {next_state, full, State#state{monitors=Monitors,
+ overflow=Overflow-1}}
+ end;
+ _ ->
+ %% not a monitored pid, is it in the worker queue?
+ case queue:member(Pid, State#state.workers) of
+ true ->
+ %% a checked in worker died
+ W = queue:filter(fun (P) -> P =/= Pid end, State#state.workers),
+ {next_state, StateName, State#state{workers=queue:in(new_worker(Sup, InitFun), W)}};
+ _ ->
+ %% completely irrelevant pid exited, don't change anything
+ {next_state, StateName, State}
+ end
end;
handle_info(_Info, StateName, State) ->
{next_state, StateName, State}.
@@ -294,3 +364,12 @@ prepopulate(0, _Sup, Workers, _InitFun) ->
Workers;
prepopulate(N, Sup, Workers, InitFun) ->
prepopulate(N-1, Sup, queue:in(new_worker(Sup, InitFun), Workers), InitFun).
+
+add_waiting(From, Timeout, Queue) ->
+ queue:in({From, Timeout, os:timestamp()}, Queue).
+
+wait_valid(infinity, _) ->
+ true;
+wait_valid(StartTime, Timeout) ->
+ Waited = timer:now_diff(os:timestamp(), StartTime),
+ (Waited div 1000) < Timeout.
Oops, something went wrong.

0 comments on commit 75572c2

Please sign in to comment.