Permalink
Browse files

demonstrate the race between r_c_vnode_worker_pool and poolboy's fsm

As described in #298:

When a riak_core_vnode_worker finishes work, it sends checkin messages
to both poolboy and riak_core_vnode_worker_pool. The latter maintains a
queue of work to be handled when there's room in the pool. As soon as
RCVWP gets the checkin message, it asks poolboy if there is a worker
available (expecting that the worker just checked in will now be
available).

The problem is that poolboy may receive RCVWP's message before receiving
the worker's checkin message. If this happens, it will tell RCVWP that
the pool is full. RCVWP then sticks in the 'queueing' state until it
receives another checkin message from a worker. Since another checkin
may never arrive, the pool may become frozen.

Crashing workers create a similar race condition to the double-checking
case, because 'DOWN' messages are delivered to both
riak_core_vnode_worker_pool and poolboy. RCVWP again asks poolboy to
checkout a worker (effectively immediately), which might happen before
poolboy receives its 'DOWN' and starts a replacement.

The test defined by worker_pool_pulse.erl demonstrates these races.
Under PULSE execution, the test will fail with deadlock. If it fails for
another reason (like timeout) you may have missed one of the
requirements described below.

In order to run the test, you will need the pulse_otp beams from
https://github.com/Quviq/pulse_otp on your path. The riak_core and
poolboy applications, as well as the worker_pool_pulse module, must also
be compiled with the 'PULSE' macro defined. The newly-added 'pulse' make
target will do this for you (and also run the test), but you will need
to start with a clean checkout (no beams built), or recompilation will
be skipped.
  • Loading branch information...
1 parent b540782 commit 7a60b6e80a7556f2bfb49021340079bee8bd11a9 Bryan Fink committed Apr 11, 2013
Showing with 166 additions and 0 deletions.
  1. +7 −0 Makefile
  2. +7 −0 src/riak_core_vnode.erl
  3. +7 −0 src/riak_core_vnode_worker.erl
  4. +6 −0 src/riak_core_vnode_worker_pool.erl
  5. +139 −0 test/worker_pool_pulse.erl
View
7 Makefile
@@ -1,6 +1,7 @@
APPS = kernel stdlib sasl erts ssl tools os_mon runtime_tools crypto inets \
public_key mnesia syntax_tools compiler
COMBO_PLT = $(HOME)/.riak_core_combo_dialyzer_plt
+PULSE_TESTS = worker_pool_pulse
.PHONY: deps test
@@ -21,6 +22,12 @@ distclean: clean
test: all
./rebar skip_deps=true eunit
+# You should 'clean' before your first run of this target
+# so that deps get built with PULSE where needed.
+pulse:
+ ./rebar compile -D PULSE
+ ./rebar eunit -D PULSE skip_deps=true suite=$(PULSE_TESTS)
+
docs: deps
./rebar skip_deps=true doc
View
7 src/riak_core_vnode.erl
@@ -51,6 +51,13 @@
current_state/1]).
-endif.
+-ifdef(PULSE).
+-compile(export_all).
+-compile({parse_transform, pulse_instrument}).
+-compile({pulse_replace_module, [{gen_fsm, pulse_gen_fsm},
+ {gen_server, pulse_gen_server}]}).
+-endif.
+
-define(normal_reason(R),
(R == normal orelse R == shutdown orelse
(is_tuple(R) andalso element(1,R) == shutdown))).
View
7 src/riak_core_vnode_worker.erl
@@ -25,6 +25,13 @@
code_change/3]).
-export([start_link/1, handle_work/4, handle_work/5]).
+-ifdef(PULSE).
+-compile(export_all).
+-compile({parse_transform, pulse_instrument}).
+-compile({pulse_replace_module, [{gen_fsm, pulse_gen_fsm},
+ {gen_server, pulse_gen_server}]}).
+-endif.
+
-record(state, {
module :: atom(),
modstate :: any()
View
6 src/riak_core_vnode_worker_pool.erl
@@ -30,6 +30,12 @@
%% API
-export([start_link/5, stop/2, shutdown_pool/2, handle_work/3]).
+-ifdef(PULSE).
+-compile(export_all).
+-compile({parse_transform, pulse_instrument}).
+-compile({pulse_replace_module, [{gen_fsm, pulse_gen_fsm}]}).
+-endif.
+
-record(state, {
queue = queue:new(),
pool :: pid(),
View
139 test/worker_pool_pulse.erl
@@ -0,0 +1,139 @@
+%% -------------------------------------------------------------------
+%%
+%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+%% @doc Test riak_core_vnode_worker_pool's interaction with poolboy
+%% under PULSE. This requires that riak_core, poolboy, and this module
+%% be compiled with the 'PULSE' macro defined.
+-module(worker_pool_pulse).
+
+-behaviour(riak_core_vnode_worker).
+-ifdef(EQC).
+-include_lib("eqc/include/eqc.hrl").
+-endif.
+-include_lib("eunit/include/eunit.hrl").
+
+%% riak_core_vnode_worker behavior
+-export([init_worker/3, handle_work/3]).
+%% console debugging convenience
+-compile(export_all).
+
+-ifdef(PULSE).
+-include_lib("pulse/include/pulse.hrl").
+%% have to transform the 'receive' of the work results
+-compile({parse_transform, pulse_instrument}).
+%% don't trasnform toplevel test functions
+-compile({pulse_skip,[{prop_any_pool,0},
+ {prop_small_pool,0},
+ {pool_test_,0}]}).
+-endif.
+
+%%% Worker Definition - does nothing but reply with what it is given
+
+init_worker(_VnodeIndex, Noreply, _WorkerProps) ->
+ {ok, Noreply}.
+
+handle_work(die, _From, _State) ->
+ exit(test_die);
+handle_work(Work, _From, State) ->
+ {reply, Work, State}.
+
+%% none of these tests make sense if PULSE is not used
+-ifdef(PULSE).
+
+%% @doc Any amount of work should complete through any size pool.
+prop_any_pool() ->
+ ?FORALL({Seed, ExtraWork, WorkList},
+ {pulse:seed(),
+ frequency([{10,true},{1,false}]),
+ list(frequency([{10,nat()}, {1,die}]))},
+ aggregate([{{extra_work,
+ pool_size(ExtraWork, WorkList) < length(WorkList) },
+ {deaths, lists:member(die, WorkList)}}],
+ ?WHENFAIL(
+ io:format(user,
+ "PoolSize: ~b~n"
+ "WorkList: ~p~n"
+ "Schedule: ~p~n",
+ [pool_size(ExtraWork, WorkList),
+ WorkList,
+ pulse:get_schedule()]),
+ begin
+ PoolSize = pool_size(ExtraWork, WorkList),
+ true == all_work_gets_done(Seed, PoolSize, WorkList)
+ end))).
+
+pool_size(false, WorkList) ->
+ length(WorkList);
+pool_size(true, WorkList) ->
+ case length(WorkList) div 2 of
+ 0 ->
+ 1;
+ Size ->
+ Size
+ end.
+
+%% @doc Minimal case for the issue this test was created to probe: one
+%% worker, two inputs.
+prop_small_pool() ->
+ ?PULSE(Result, all_work_gets_done(1, [1,2]), true == Result).
+
+all_work_gets_done(Seed, PoolSize, WorkList) ->
+ pulse:run_with_seed(
+ fun() -> all_work_gets_done(PoolSize, WorkList) end,
+ Seed).
+
+all_work_gets_done(PoolSize, WorkList) ->
+ %% get the pool up
+ {ok, Pool} = riak_core_vnode_worker_pool:start_link(
+ ?MODULE, PoolSize, 10, false, []),
+
+ %% send all the work
+ [ riak_core_vnode_worker_pool:handle_work(
+ Pool, W, {raw, N, self()})
+ || {N, W} <- lists:zip(lists:seq(1, length(WorkList)), WorkList) ],
+
+ %% wait for all the work
+ Results = [ receive {N, _} -> ok end
+ || N <- lists:seq(1, length(WorkList)) ],
+ riak_core_vnode_worker_pool:stop(Pool, normal),
+
+ %% check that we got a response for every piece of work
+ %% TODO: actually not needed, since the bug is deadlock, and will
+ %% thus never get here
+ length(Results) == length(WorkList).
+
+pool_test_() ->
+ {setup,
+ fun() ->
+ error_logger:tty(false),
+ pulse:start()
+ end,
+ fun(_) ->
+ pulse:stop(),
+ error_logger:tty(true)
+ end,
+ [
+ %% not necessary to run both tests here, but why not anyway?
+ ?_assert(eqc:quickcheck(prop_small_pool())),
+ ?_assert(eqc:quickcheck(prop_any_pool()))
+ ]
+ }.
+
+-endif.

0 comments on commit 7a60b6e

Please sign in to comment.