Skip to content

Commit

Permalink
Add a thread limit for async ahead types
Browse files Browse the repository at this point in the history
currently hardcoded to 1500 threads
  • Loading branch information
harendra-kumar committed May 10, 2018
1 parent 5c975de commit ce79079
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 2 deletions.
8 changes: 7 additions & 1 deletion src/Streamly/Core.hs
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,13 @@ sendWorkerWait sv = do
when (n <= 0) $ do
done <- queueEmpty sv
if not done
then pushWorker sv >> sendWorkerWait sv
then do
cnt <- liftIO $ readIORef $ activeWorkers sv
if (cnt < 1500)
then do
pushWorker sv
sendWorkerWait sv
else liftIO $ takeMVar (doorBell sv)
else liftIO $ takeMVar (doorBell sv)

-- | Pull a stream from an SVar.
Expand Down
7 changes: 6 additions & 1 deletion test/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import Control.Exception (Exception, try)
import Control.Monad.Catch (throwM, MonadThrow)
import Control.Monad.Error.Class (throwError, MonadError)
import Control.Monad.Trans.Except (runExceptT, ExceptT)
import Data.Foldable (forM_)
import Data.Foldable (forM_, fold)
import Data.List (sort)
import Test.Hspec

Expand Down Expand Up @@ -334,6 +334,11 @@ main = hspec $ do
describe "Composed MonadThrow asCoparAhead" $ composeWithMonadThrow asCoparAhead
describe "Composed MonadThrow asParallel" $ composeWithMonadThrow asParallel

it "Crosses thread limit (2000 threads)" $
runStream (asParAhead $ fold $
replicate 2000 $ A.once $ threadDelay 1000000)
`shouldReturn` ()

-- XXX need to test that we have promptly cleaned up everything after the error
-- XXX We can also check the output that we are expected to get before the
-- error occurs.
Expand Down

0 comments on commit ce79079

Please sign in to comment.