Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

Checkin non-worker processes 10% of the time and fix resulting bugs

Checking in an invalid process corrupts the internal state similarly to
receiving EXITs from non-worker pids.
  • Loading branch information...
commit e6af0b6a65cc8405e17b71626cfd81fe3311882f 1 parent e964cc5
Andrew Thompson authored January 22, 2012
111  src/poolboy.erl
@@ -77,12 +77,16 @@ init([], #state{size=Size, worker_sup=Sup, worker_init=InitFun,
77 77
     {ok, StartState, State#state{workers=Workers}}.
78 78
 
79 79
 ready({checkin, Pid}, State) ->
80  
-    Workers = queue:in(Pid, State#state.workers),
81  
-    Monitors = case lists:keytake(Pid, 1, State#state.monitors) of
82  
-        {value, {_, Ref}, Left} -> erlang:demonitor(Ref), Left;
83  
-        false -> State#state.monitors
84  
-    end,
85  
-    {next_state, ready, State#state{workers=Workers, monitors=Monitors}};
  80
+    case lists:keytake(Pid, 1, State#state.monitors) of
  81
+        {value, {_, Ref}, Monitors} ->
  82
+            erlang:demonitor(Ref),
  83
+            Workers = queue:in(Pid, State#state.workers),
  84
+            {next_state, ready, State#state{workers=Workers,
  85
+                                            monitors=Monitors}};
  86
+        false ->
  87
+            %% unknown process checked in, ignore it
  88
+            {next_state, ready, State}
  89
+    end;
86 90
 ready(_Event, State) ->
87 91
     {next_state, ready, State}.
88 92
 
@@ -122,27 +126,31 @@ ready(_Event, _From, State) ->
122 126
     {reply, ok, ready, State}.
123 127
 
124 128
 overflow({checkin, Pid}, #state{overflow=0}=State) ->
125  
-    %StopFun = State#state.worker_stop,
126  
-    %dismiss_worker(Pid, StopFun),
127  
-    Monitors = case lists:keytake(Pid, 1, State#state.monitors) of
128  
-        {value, {_, Ref}, Left} -> erlang:demonitor(Ref), Left;
129  
-        false -> []
130  
-    end,
131  
-    NextState = case State#state.size > 0 of
132  
-        true -> ready;
133  
-        _ -> overflow
134  
-    end,
135  
-    {next_state, NextState, State#state{overflow=0, monitors=Monitors,
136  
-            workers=queue:in(Pid, State#state.workers)}};
  129
+    case lists:keytake(Pid, 1, State#state.monitors) of
  130
+        {value, {_, Ref}, Monitors} ->
  131
+            erlang:demonitor(Ref),
  132
+            NextState = case State#state.size > 0 of
  133
+                true -> ready;
  134
+                _ -> overflow
  135
+            end,
  136
+            {next_state, NextState, State#state{overflow=0, monitors=Monitors,
  137
+                                                workers=queue:in(Pid, State#state.workers)}};
  138
+        false ->
  139
+            %% unknown process checked in, ignore it
  140
+            {next_state, overflow, State}
  141
+    end;
137 142
 overflow({checkin, Pid}, State) ->
138 143
     #state{overflow=Overflow, worker_stop=StopFun} = State,
139  
-    dismiss_worker(Pid, StopFun),
140  
-    Monitors = case lists:keytake(Pid, 1, State#state.monitors) of
141  
-        {value, {_, Ref}, Left} -> erlang:demonitor(Ref), Left;
142  
-        false -> State#state.monitors
143  
-    end,
144  
-    {next_state, overflow, State#state{overflow=Overflow-1,
145  
-                                       monitors=Monitors}};
  144
+    case lists:keytake(Pid, 1, State#state.monitors) of
  145
+        {value, {_, Ref}, Monitors} ->
  146
+            dismiss_worker(Pid, StopFun),
  147
+            erlang:demonitor(Ref),
  148
+            {next_state, overflow, State#state{overflow=Overflow-1,
  149
+                                               monitors=Monitors}};
  150
+        _ ->
  151
+            %% unknown process checked in, ignore it
  152
+            {next_state, overflow, State}
  153
+    end;
146 154
 overflow(_Event, State) ->
147 155
     {next_state, overflow, State}.
148 156
 
@@ -174,32 +182,35 @@ overflow(_Event, _From, State) ->
174 182
 full({checkin, Pid}, State) ->
175 183
     #state{waiting = Waiting, max_overflow = MaxOverflow,
176 184
            overflow = Overflow, worker_stop = StopFun} = State,
177  
-    Monitors = case lists:keytake(Pid, 1, State#state.monitors) of
178  
-        {value, {_, Ref0}, Left0} -> erlang:demonitor(Ref0), Left0;
179  
-        false -> State#state.monitors
180  
-    end,
181  
-    case queue:out(Waiting) of
182  
-        {{value, {{FromPid, _}=From, Timeout, StartTime}}, Left} ->
183  
-            case wait_valid(StartTime, Timeout) of
184  
-                true ->
185  
-                    Ref = erlang:monitor(process, FromPid),
186  
-                    Monitors1 = [{Pid, Ref} | Monitors],
187  
-                    gen_fsm:reply(From, Pid),
188  
-                    {next_state, full, State#state{waiting=Left,
189  
-                                                   monitors=Monitors1}};
190  
-                _ ->
191  
-                    %% replay this event with cleaned up waiting queue
192  
-                    full({checkin, Pid}, State#state{waiting=Left})
  185
+    case lists:keytake(Pid, 1, State#state.monitors) of
  186
+        {value, {_, Ref0}, Monitors} ->
  187
+            erlang:demonitor(Ref0),
  188
+            case queue:out(Waiting) of
  189
+                {{value, {{FromPid, _}=From, Timeout, StartTime}}, Left} ->
  190
+                    case wait_valid(StartTime, Timeout) of
  191
+                        true ->
  192
+                            Ref = erlang:monitor(process, FromPid),
  193
+                            Monitors1 = [{Pid, Ref} | Monitors],
  194
+                            gen_fsm:reply(From, Pid),
  195
+                            {next_state, full, State#state{waiting=Left,
  196
+                                                           monitors=Monitors1}};
  197
+                        _ ->
  198
+                            %% replay this event with cleaned up waiting queue
  199
+                            full({checkin, Pid}, State#state{waiting=Left})
  200
+                    end;
  201
+                {empty, Empty} when MaxOverflow < 1 ->
  202
+                    Workers = queue:in(Pid, State#state.workers),
  203
+                    {next_state, ready, State#state{workers=Workers, waiting=Empty,
  204
+                                                    monitors=Monitors}};
  205
+                {empty, Empty} ->
  206
+                    dismiss_worker(Pid, StopFun),
  207
+                    {next_state, overflow, State#state{waiting=Empty,
  208
+                                                       monitors=Monitors,
  209
+                                                       overflow=Overflow-1}}
193 210
             end;
194  
-        {empty, Empty} when MaxOverflow < 1 ->
195  
-            Workers = queue:in(Pid, State#state.workers),
196  
-            {next_state, ready, State#state{workers=Workers, waiting=Empty,
197  
-                                            monitors=Monitors}};
198  
-        {empty, Empty} ->
199  
-            dismiss_worker(Pid, StopFun),
200  
-            {next_state, overflow, State#state{waiting=Empty,
201  
-                                               monitors=Monitors,
202  
-                                               overflow=Overflow-1}}
  211
+        false ->
  212
+            %% unknown process checked in, ignore it
  213
+            {next_state, full, State}
203 214
     end;
204 215
 full(_Event, State) ->
205 216
     {next_state, full, State}.
12  test/poolboy_eqc.erl
@@ -33,7 +33,7 @@ command(S) ->
33 33
 			[{call, ?MODULE, checkout_nonblock, [S#state.pid]} || S#state.pid /= undefined] ++
34 34
 			%% checkout shrinks to checkout_nonblock so we can simplify counterexamples
35 35
 			[{call, ?MODULE, ?SHRINK(checkout_block, [checkout_nonblock]), [S#state.pid]} || S#state.pid /= undefined] ++
36  
-			[{call, ?MODULE, checkin, [S#state.pid, elements(S#state.checked_out)]} || S#state.pid /= undefined, S#state.checked_out /= []] ++
  36
+			[{call, ?MODULE, checkin, [S#state.pid, fault({call, ?MODULE, spawn_process, []}, elements(S#state.checked_out))]} || S#state.pid /= undefined, S#state.checked_out /= []] ++
37 37
 			[{call, ?MODULE, kill_worker, [elements(S#state.checked_out)]} || S#state.pid /= undefined, S#state.checked_out /= []] ++
38 38
 			[{call, ?MODULE, kill_idle_worker, [S#state.pid]} || S#state.pid /= undefined] ++
39 39
 			[{call, ?MODULE, spurious_exit, [S#state.pid]} || S#state.pid /= undefined]
@@ -42,6 +42,11 @@ command(S) ->
42 42
 make_args(_S, Size, Overflow) ->
43 43
 	[[{size, Size}, {max_overflow, Overflow}, {worker_module, poolboy_test_worker}, {name, {local, poolboy_eqc}}]].
44 44
 
  45
+spawn_process() ->
  46
+	spawn(fun() ->
  47
+				timer:sleep(5000)
  48
+		end).
  49
+
45 50
 spawn_linked_process(Pool) ->
46 51
 	Parent = self(),
47 52
 	Pid = spawn(fun() ->
@@ -98,8 +103,6 @@ precondition(S,{call,_,start_poolboy,_}) ->
98 103
 precondition(S,_) when S#state.pid == undefined ->
99 104
 	%% all other states need a running pool
100 105
 	false;
101  
-precondition(S, {call, _, checkin, [_Pool, Pid]}) ->
102  
-	lists:member(Pid, S#state.checked_out);
103 106
 precondition(S, {call, _, kill_worker, [Pid]}) ->
104 107
 	lists:member(Pid, S#state.checked_out);
105 108
 precondition(S,{call,_,kill_idle_worker,[_Pool]}) ->
@@ -183,6 +186,7 @@ next_state(S,_V,{call, _, spurious_exit, [_Pool]}) ->
183 186
 	S.
184 187
 
185 188
 prop_sequential() ->
  189
+	fault_rate(1, 10,
186 190
 		?FORALL(Cmds,commands(?MODULE),
187 191
 			?TRAPEXIT(
188 192
 				aggregate(command_names(Cmds), 
@@ -192,7 +196,7 @@ prop_sequential() ->
192 196
 							?WHENFAIL(io:format("History: ~p\nState: ~p\nRes: ~p\n~p\n",
193 197
 									[H,S,Res, zip(tl(Cmds), [Y || {_, Y} <- H])]),
194 198
 								Res == ok)
195  
-					end))).
  199
+					end)))).
196 200
 
197 201
 checkout_ok(S) ->
198 202
 	length(S#state.checked_out) < S#state.size + S#state.max_overflow.

0 notes on commit e6af0b6

Please sign in to comment.
Something went wrong with that request. Please try again.