Skip to content

Commit

Permalink
put a bound on the output queue size
Browse files Browse the repository at this point in the history
  • Loading branch information
harendra-kumar committed May 10, 2018
1 parent bac26fc commit 5c975de
Showing 1 changed file with 51 additions and 13 deletions.
64 changes: 51 additions & 13 deletions src/Streamly/Core.hs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ where

import Control.Concurrent (ThreadId, myThreadId, threadDelay)
import Control.Concurrent.MVar (MVar, newEmptyMVar, tryTakeMVar,
tryPutMVar, takeMVar)
tryPutMVar, takeMVar, readMVar)
import Control.Exception (SomeException (..))
import qualified Control.Exception.Lifted as EL
import Control.Monad (when)
Expand Down Expand Up @@ -148,8 +148,9 @@ data SVarStyle = SVarStyle SVarTag SVarSched deriving Eq
-- already enqueued computations get evaluated. See 'joinStreamVar2'.
--
data SVar m a =
SVar { outputQueue :: IORef [ChildEvent a]
, doorBell :: MVar () -- wakeup mechanism for outQ
SVar { outputQueue :: IORef ([ChildEvent a], Int)
, doorBell :: MVar () -- signal the consumer about output
, siren :: MVar () -- hooter for workers to begin work
, enqueue :: Stream m a -> IO ()
, runqueue :: m ()
, runningThreads :: IORef (Set ThreadId)
Expand Down Expand Up @@ -304,15 +305,45 @@ doFork action exHandler =

-- XXX exception safety of all atomic/MVar operations

-- TBD Each worker can have their own queue and the consumer can empty one
-- queue at a time, that way contention can be reduced.
{-# INLINE send #-}
send :: MonadIO m => SVar m a -> ChildEvent a -> m ()
send sv msg = liftIO $ do
old <- atomicModifyIORefCAS (outputQueue sv) $ \es -> (msg : es, es)
if null old
len <- atomicModifyIORefCAS (outputQueue sv) $ \(es, n) ->
((msg : es, n + 1), n)
if (len <= 0) then do
-- XXX need a memory barrier? The wake up must happen only after the
-- store has finished otherwise we can have lost wakeup problems.
then void $ tryPutMVar (doorBell sv) ()
else return ()
--
-- Since multiple workers can try this at the same time, it is possible
-- that we may put a spurious MVar after the consumer has already seen
-- the output. But that's harmless, at worst it may cause the consumer
-- to read the queue again and find it empty.
-- The important point is that the consumer is guaranteed to receive a
-- doorbell if something was added to the queue after it empties it.
void $ tryPutMVar (doorBell sv) ()
-- The first worker who notices the output queue was emptied puts the
-- siren off.
void $ tryTakeMVar (siren sv)
else if (len + 1 >= 1500) then do
-- We are guaranteed to receive the siren if the consumer reads the
-- queue because the consumer puts the siren on before reading the
-- queue.
--
-- We may get the siren between the siren being swicthed on and the
-- queue getting read but that's harmless, at amost everyone will go
-- back to work and will have to sleep again if queue was still not
-- emptied.
--
-- If even before a worker could read the MVar, the queue gets emptied
-- and another worker queuing on it switches off the siren, then we may
-- sleep here. In that case we are guaranteed to be woken up on the
-- next siren. The next siren is guaranteed as we send a doorbell
-- before switching off the siren, and the consumer switches on the
-- siren after receiving the doorbell.
readMVar (siren sv)
else return ()

{-# INLINE sendStop #-}
sendStop :: MonadIO m => SVar m a -> m ()
Expand Down Expand Up @@ -451,8 +482,8 @@ sendWorkerWait sv = do
-- sent then we block, so that we do not keep looping fruitlessly.

liftIO $ threadDelay 200
output <- liftIO $ readIORef (outputQueue sv)
when (null output) $ do
(_, n) <- liftIO $ readIORef (outputQueue sv)
when (n <= 0) $ do
done <- queueEmpty sv
if not done
then pushWorker sv >> sendWorkerWait sv
Expand All @@ -469,7 +500,8 @@ fromStreamVar sv = Stream $ \_ stp sng yld -> do
res <- liftIO $ tryTakeMVar (doorBell sv)
when (isNothing res) $ sendWorkerWait sv

list <- liftIO $ atomicModifyIORefCAS (outputQueue sv) $ \x -> ([], x)
void $ liftIO $ tryPutMVar (siren sv) ()
(list, _) <- liftIO $ atomicModifyIORefCAS (outputQueue sv) $ \x -> (([],0), x)
-- Reversing the output is important to guarantee that we process the
-- outputs in the same order as they were generated by the constituent
-- streams.
Expand Down Expand Up @@ -507,14 +539,16 @@ fromStreamVar sv = Stream $ \_ stp sng yld -> do

getFifoSVar :: MonadIO m => SVarStyle -> IO (SVar m a)
getFifoSVar ctype = do
outQ <- newIORef []
outQ <- newIORef ([], 0)
outQMv <- newEmptyMVar
hooter <- newEmptyMVar
active <- newIORef 0
running <- newIORef S.empty
q <- newQ
let sv =
SVar { outputQueue = outQ
, doorBell = outQMv
, siren = hooter
, runningThreads = running
, runqueue = runqueueFIFO sv q
, enqueue = pushL q
Expand All @@ -526,15 +560,17 @@ getFifoSVar ctype = do

getLifoSVar :: MonadIO m => SVarStyle -> IO (SVar m a)
getLifoSVar ctype = do
outQ <- newIORef []
outQ <- newIORef ([], 0)
outQMv <- newEmptyMVar
hooter <- newEmptyMVar
active <- newIORef 0
running <- newIORef S.empty
q <- newIORef []
let checkEmpty = null <$> liftIO (readIORef q)
let sv =
SVar { outputQueue = outQ
, doorBell = outQMv
, siren = hooter
, runningThreads = running
, runqueue = runqueueLIFO sv q
, enqueue = enqueueLIFO q
Expand All @@ -546,13 +582,15 @@ getLifoSVar ctype = do

getParSVar :: SVarStyle -> IO (SVar m a)
getParSVar style = do
outQ <- newIORef []
outQ <- newIORef ([], 0)
outQMv <- newEmptyMVar
hooter <- newEmptyMVar
active <- newIORef 0
running <- newIORef S.empty
let sv =
SVar { outputQueue = outQ
, doorBell = outQMv
, siren = hooter
, runningThreads = running
, runqueue = undefined
, enqueue = undefined
Expand Down

0 comments on commit 5c975de

Please sign in to comment.