Skip to content
Browse files

Add tests and fix ThreadPool laziness

  • Loading branch information...
1 parent abe668b commit ec71d143e1addbd22951738d7fdd8ee4a5784d66 @batterseapower committed
Showing with 129 additions and 13 deletions.
  1. +2 −2 Test/Framework/Runners/Core.hs
  2. +15 −0 Test/Framework/Tests.hs
  3. +47 −0 Test/Framework/Tests/ThreadPool.hs
  4. +38 −11 Test/Framework/ThreadPool.hs
  5. +27 −0 test-framework.cabal
View
4 Test/Framework/Runners/Core.hs
@@ -43,9 +43,9 @@ runTest' (TestGroup name tests) results = (RunTestGroup name run_tests, results'
runTest' _ _ = error "runTest': incoming results did not match outgoing ones"
runTests' :: [Test] -> [TestResult] -> ([RunTest], [TestResult], [PendingTest])
-runTests' initial_tests initial_results = (reverse final_run_tests, final_results, final_requested_runs)
+runTests' initial_tests initial_results = (final_run_tests, final_results, final_requested_runs)
where
(final_run_tests, final_results, final_requested_runs) = foldl' go ([], initial_results, []) initial_tests
- go (run_tests, results, requested_runs) test = (run_test : run_tests, results', requested_runs ++ requested_runs')
+ go (run_tests, results, requested_runs) test = (run_tests ++ [run_test], results', requested_runs ++ requested_runs')
where (run_test, results', requested_runs') = runTest' test results
View
15 Test/Framework/Tests.hs
@@ -0,0 +1,15 @@
+module Main where
+
+import qualified Test.Framework.Tests.ThreadPool as TP
+
+import Test.HUnit
+--import Test.QuickCheck
+
+
+-- I wish I could use my test framework to test my framework...
+main :: IO ()
+main = do
+ -- HUnit tests first
+ runTestTT $ TestList TP.tests
+ -- QuickCheck tests, if we add any
+ return ()
View
47 Test/Framework/Tests/ThreadPool.hs
@@ -0,0 +1,47 @@
+module Test.Framework.Tests.ThreadPool (tests) where
+
+import Test.Framework.ThreadPool
+
+import Test.HUnit
+
+import System.Random
+
+import Control.Concurrent
+import Control.Exception
+import Control.Monad
+
+import Prelude hiding (catch)
+
+
+expectBroken :: Assertion -> Assertion
+expectBroken assertion = do
+ did_succeed <- catch (assertion >> return True) (const $ return False)
+ when did_succeed $ assertFailure "Test unexpectedly succeeded"
+
+tests :: [Test]
+tests = [TestLabel "ThreadPool.executeOnPool preserves order" (TestCase test_execute_preserves_order),
+ TestLabel "ThreadPool.executeOnPool preserves order even with delays" (TestCase test_execute_preserves_order_even_with_delay),
+ TestLabel "ThreadPool.executeOnPool input list can depend on previous items" (TestCase test_execute_schedules_lazily)]
+
+test_execute_preserves_order :: Assertion
+test_execute_preserves_order = do
+ let input = [1..1000] :: [Int]
+ output <- executeOnPool 4 $ map return input
+ input @=? output
+
+test_execute_preserves_order_even_with_delay :: Assertion
+test_execute_preserves_order_even_with_delay = do
+ gen <- getStdGen
+ let -- Execute 100 actions with a random delay of up to 50ms each
+ input = [1..100] :: [Int]
+ actions = zipWith (\n delay -> threadDelay delay >> return n) input (randomRs (0, 50000) gen)
+ output <- executeOnPool 4 actions
+ input @=? output
+
+-- This /should/ work, but doesn't. I can't quite work out why: it appears to me that even though we
+-- add things to the internal thread-pool channel on another thread the channel demands that everything
+-- is added before it peels off the first action to be executed.
+test_execute_schedules_lazily :: Assertion
+test_execute_schedules_lazily = mdo
+ ~(first_output:rest) <- executeOnPool 4 $ return 10 : (return 20) : replicate first_output (return 99) :: IO [Int]
+ [10, 20] ++ (replicate 10 99) @=? (first_output:rest)
View
49 Test/Framework/ThreadPool.hs
@@ -8,39 +8,66 @@ import Control.Monad
import qualified Data.IntMap as IM
+
+data WorkerEvent token a = WorkerTermination
+ | WorkerItem token a
+
+-- | Execute IO actions on several threads and return their results in the original
+-- order. It is guaranteed that no action from the input list is executed unless all
+-- the items that precede it in the list have been executed or are executing at that
+-- moment.
executeOnPool :: Int -- ^ Number of threads to use
- -> [IO a] -- ^ Actions to execute, left to right as far possible
+ -> [IO a] -- ^ Actions to execute: these will be scheduled left to right
-> IO [a] -- ^ Ordered results of executing the given IO actions in parallel
executeOnPool n actions = do
-- Prepare the channels
input_chan <- newChan
output_chan <- newChan
- -- Write Just the actions to the channel followed by one Nothing per thread
+ -- Write the actions as items to the channel followed by one termination per thread
-- that indicates they should terminate. We do this on another thread for
-- maximum laziness (in case one the actions we are going to run depend on the
-- output from previous actions..)
- forkIO $ writeList2Chan input_chan (map Just ([0..] `zip` actions) ++ replicate n Nothing)
+ forkIO $ writeList2Chan input_chan (zipWith WorkerItem [0..] actions ++ replicate n WorkerTermination)
-- Spawn workers
forM_ [1..n] (const $ forkIO $ poolWorker input_chan output_chan)
-- Return the results generated by the worker threads lazily and in
-- the same order as we got the inputs
- fmap (reorderFrom 0) $ getChanContents output_chan
+ fmap (reorderFrom 0 . takeWhileWorkersExist n) $ getChanContents output_chan
-poolWorker :: Chan (Maybe (token, IO a)) -> Chan (token, a) -> IO ()
+poolWorker :: Chan (WorkerEvent token (IO a)) -> Chan (WorkerEvent token a) -> IO ()
poolWorker input_chan output_chan = do
-- Read an action and work out whether we should continue or stop
- mb_action <- readChan input_chan
- case mb_action of
- Nothing -> return () -- Must have run out of real actions to execute
- Just (token, action) -> do
+ action_item <- readChan input_chan
+ case action_item of
+ WorkerTermination -> writeChan output_chan WorkerTermination -- Must have run out of real actions to execute
+ WorkerItem token action -> do
-- Do the action then loop
result <- action
- writeChan output_chan (token, result)
+ writeChan output_chan (WorkerItem token result)
poolWorker input_chan output_chan
+-- | Keep grabbing items out of the infinite list of worker outputs until we have
+-- recieved word that all of the workers have shut down. This lets us turn a possibly
+-- infinite list of outputs into a certainly finite one suitable for use with reorderFrom.
+takeWhileWorkersExist :: Int -> [WorkerEvent token a] -> [(token, a)]
+takeWhileWorkersExist worker_count events
+ | worker_count <= 0 = []
+ | otherwise = case events of
+ (WorkerTermination:events') -> takeWhileWorkersExist (worker_count - 1) events'
+ (WorkerItem token x:events') -> (token, x) : takeWhileWorkersExist worker_count events'
+ [] -> []
+
+-- | This function carefully shuffles the input list so it in the total order
+-- defined by the integers paired with the elements. If the list is @xs@ and
+-- the supplied initial integer is @n@, it must be the case that:
+--
+-- > sort (map fst xs) == [n..n + (length xs - 1)]
+--
+-- This function returns items in the lazy result list as soon as it is sure
+-- it has the right item for that position.
reorderFrom :: Int -> [(Int, a)] -> [a]
reorderFrom from initial_things = go from initial_things IM.empty False
where go next [] buf _
@@ -53,4 +80,4 @@ reorderFrom from initial_things = go from initial_things IM.empty False
, (Just x', buf') <- IM.updateLookupWithKey (const $ const Nothing) next buf -- Delete the found item from the map (if we find it) to save space
= x' : go (next + 1) all_things buf' True -- Always worth checking the buffer now because the expected item has changed
| otherwise -- Token didn't match, buffer unhelpful: it must be in the tail of the list
- = go next things (IM.insert token x buf) False -- Since we've already checked the buffer, stop bothering to do so until something changes
+ = go next things (IM.insert token x buf) False -- Since we've already checked the buffer, stop bothering to do so until something changes -}
View
27 test-framework.cabal
@@ -16,6 +16,10 @@ Flag SplitBase
Description: Choose the new smaller, split-up base package
Default: True
+Flag Tests
+ Description: Build the tests
+ Default: False
+
Flag Example
Description: Build the example testsuite
Default: False
@@ -53,6 +57,29 @@ Library
if impl(ghc)
Cpp-Options: -DCOMPILER_GHC
+Executable test-framework-tests
+ Main-Is: Test/Framework/Tests.hs
+
+ Build-Depends: QuickCheck >= 1.1, HUnit >= 1.2
+ if flag(splitBase)
+ Build-Depends: base >= 3, random >= 1.0, containers >= 0.1
+ else
+ Build-Depends: base < 3
+
+ Extensions: CPP
+ PatternGuards
+ ExistentialQuantification
+ RecursiveDo
+ FlexibleInstances
+
+ Ghc-Options: -Wall -threaded
+
+ if impl(ghc)
+ Cpp-Options: -DCOMPILER_GHC
+
+ if !flag(tests)
+ Buildable: False
+
Executable test-framework-example
Main-Is: Test/Framework/Example.lhs

0 comments on commit ec71d14

Please sign in to comment.
Something went wrong with that request. Please try again.