Skip to content

Commit

Permalink
Fix a rather serious deadlock bug with pool sizes less than 2 where t…
Browse files Browse the repository at this point in the history
…emporary blocking primitives were used
  • Loading branch information
batterseapower committed Dec 5, 2010
1 parent 3662da0 commit 94f7fa7
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 5 deletions.
13 changes: 10 additions & 3 deletions Control/Concurrent/ParallelIO/Local.hs
Expand Up @@ -55,7 +55,9 @@ data Pool = Pool {
--
-- A better alternative is to see if you can use the 'withPool' variant.
startPool :: Int -> IO Pool
startPool threadcount = do
startPool threadcount
| threadcount < 1 = error $ "startPool: thread count must be strictly positive (was " ++ show threadcount ++ ")"
| otherwise = do
threadId <- myThreadId
queue <- CS.new
let pool = Pool {
Expand Down Expand Up @@ -137,7 +139,11 @@ killPoolWorkerFor pool = enqueueOnPool pool $ return True
-- action which is itself being executed by 'parallel_'.
parallel_ :: Pool -> [IO a] -> IO ()
parallel_ _ [] = return ()
parallel_ pool xs | pool_threadcount pool <= 1 = sequence_ xs
-- It is very important that we *don't* include this special case!
-- The reason is that even if there is only one worker thread in the pool, one of
-- the items we process might depend on the ability to use extraWorkerWhileBlocked
-- to allow processing to continue even before it has finished executing.
--parallel_ pool xs | pool_threadcount pool <= 1 = sequence_ xs
parallel_ _ [x] = x >> return ()
parallel_ pool (x1:xs) = do
count <- newMVar $ length xs
Expand Down Expand Up @@ -175,7 +181,8 @@ parallel_ pool (x1:xs) = do
-- action which is itself being executed by 'parallel'.
parallel :: Pool -> [IO a] -> IO [a]
parallel _ [] = return []
parallel pool xs | pool_threadcount pool <= 1 = sequence xs
-- It is important that we do not include this special case (see parallel_ for why)
--parallel pool xs | pool_threadcount pool <= 1 = sequence xs
parallel _ [x] = fmap return x
parallel pool (x1:xs) = do
count <- newMVar $ length xs
Expand Down
16 changes: 15 additions & 1 deletion Control/Concurrent/ParallelIO/Tests.hs
Expand Up @@ -12,6 +12,7 @@ import GHC.Conc

import Control.Monad

import Control.Concurrent.MVar
import qualified Control.Concurrent.ParallelIO.Global as Global
import Control.Concurrent.ParallelIO.Local

Expand All @@ -22,12 +23,15 @@ main = do
Global.stopGlobalPool

tests :: [Test]
tests = [ testCase "parallel_ executes correct number of actions" $ repeatTest parallel__execution_count_correct
tests = [
testCase "parallel_ executes correct number of actions" $ repeatTest parallel__execution_count_correct
, testCase "parallel_ doesn't spawn too many threads" $ repeatTest parallel__doesnt_spawn_too_many_threads
, testCase "parallel executes correct actions" $ repeatTest parallel_executes_correct_actions
, testCase "parallel doesn't spawn too many threads" $ repeatTest parallel_doesnt_spawn_too_many_threads
, testCase "parallelInterleaved executes correct actions" $ repeatTest parallelInterleaved_executes_correct_actions
, testCase "parallelInterleaved doesn't spawn too many threads" $ repeatTest parallelInterleaved_doesnt_spawn_too_many_threads
, testCase "parallel with one worker can be blocked" $ parallel_with_one_worker_can_be_blocked
, testCase "parallel_ with one worker can be blocked" $ parallel__with_one_worker_can_be_blocked
]

parallel__execution_count_correct n = do
Expand Down Expand Up @@ -70,6 +74,16 @@ doesnt_spawn_too_many_threads the_parallel n = do
then return True
else putStrLn ("Expected at most " ++ show expected_max_concurrent_threads ++ ", got " ++ show seenmax) >> return False

parallel_with_one_worker_can_be_blocked = with_one_worker_can_be_blocked parallel
parallel__with_one_worker_can_be_blocked = with_one_worker_can_be_blocked parallel_

-- This test is based on a specific bug I observed in the library. The problem was that I was special casing
-- pools with thread counts <= 1 to just use sequence/sequence_, but that doesn't give the right semantics if
-- the user is able to call extraWorkerWhileBlocked!
with_one_worker_can_be_blocked the_parallel = withPool 1 $ \pool -> do
wait <- newEmptyMVar
the_parallel pool [extraWorkerWhileBlocked pool (takeMVar wait), putMVar wait ()]
return ()

atomicModifyIORef_ :: IORef a -> (a -> a) -> IO a
atomicModifyIORef_ ref f = atomicModifyIORef ref (\x -> let x' = f x in x' `seq` (x', x'))
Expand Down
2 changes: 1 addition & 1 deletion parallel-io.cabal
@@ -1,5 +1,5 @@
Name: parallel-io
Version: 0.2.1
Version: 0.2.1.1
Cabal-Version: >= 1.2
Category: Concurrency
Synopsis: Combinators for executing IO actions in parallel on a thread pool.
Expand Down

0 comments on commit 94f7fa7

Please sign in to comment.