Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add lazier versions of the TQueue and TBQueue reading functions #62

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 30 additions & 12 deletions Control/Concurrent/STM/TBQueue.hs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@ module Control.Concurrent.STM.TBQueue (
newTBQueue,
newTBQueueIO,
readTBQueue,
lazyReadTBQueue,
tryReadTBQueue,
flushTBQueue,
peekTBQueue,
lazyPeekTBQueue,
tryPeekTBQueue,
writeTBQueue,
unGetTBQueue,
Expand All @@ -47,6 +49,7 @@ module Control.Concurrent.STM.TBQueue (

import Control.Monad (unless)
import Data.Typeable (Typeable)
import Data.Tuple (Solo(..))
import GHC.Conc (STM, TVar, newTVar, newTVarIO, orElse,
readTVar, retry, writeTVar)
import Numeric.Natural (Natural)
Expand Down Expand Up @@ -117,29 +120,36 @@ writeTBQueue (TBQueue rsize _read wsize write _size) a = do
listend <- readTVar write
writeTVar write (a:listend)

-- |Read the next value from the 'TBQueue'.
readTBQueue :: TBQueue a -> STM a
readTBQueue (TBQueue rsize read _wsize write _size) = do
-- |Read the next value from the 'TBQueue', retrying if the channel is empty.
-- Separates reversing the write-end from evaluating the next value.
lazyReadTBQueue :: TBQueue a -> STM (Solo a)
lazyReadTBQueue (TBQueue rsize read _wsize write _size) = do
xs <- readTVar read
r <- readTVar rsize
writeTVar rsize $! r + 1
case xs of
(x:xs') -> do
writeTVar read xs'
return x
return (Solo x)
[] -> do
ys <- readTVar write
case ys of
[] -> retry
_ -> do
-- NB. lazy: we want the transaction to be
-- short, otherwise it will conflict
let ~(z,zs) = case reverse ys of
let (z,zs) = case reverse ys of
z':zs' -> (z',zs')
_ -> error "readTBQueue: impossible"
writeTVar write []
writeTVar read zs
return z
return (zs `seq` Solo z)

-- |Read the next value from the 'TBQueue'.
readTBQueue :: TBQueue a -> STM a
readTBQueue q = do
r <- lazyReadTBQueue q
return (case r of Solo z -> z)

-- | A version of 'readTBQueue' which does not retry. Instead it
-- returns @Nothing@ if no value is available.
Expand All @@ -163,13 +173,14 @@ flushTBQueue (TBQueue rsize read wsize write size) = do
writeTVar wsize size
return (xs ++ reverse ys)

-- | Get the next value from the @TBQueue@ without removing it,
-- retrying if the channel is empty.
peekTBQueue :: TBQueue a -> STM a
peekTBQueue (TBQueue _ read _ write _) = do
-- | Get the next value from the @TBQueue@ without removing it, retrying if the
-- channel is empty. Separates reversing the write-end from evaluating the next
-- value.
lazyPeekTBQueue :: TBQueue a -> STM (Solo a)
lazyPeekTBQueue (TBQueue _ read _ write _) = do
xs <- readTVar read
case xs of
(x:_) -> return x
(x:_) -> return (Solo x)
[] -> do
ys <- readTVar write
case ys of
Expand All @@ -179,7 +190,14 @@ peekTBQueue (TBQueue _ read _ write _) = do
-- short, otherwise it will conflict
writeTVar write []
writeTVar read (z:zs)
return z
return (zs `seq` Solo z)

-- | Get the next value from the @TBQueue@ without removing it,
-- retrying if the channel is empty.
peekTBQueue :: TBQueue a -> STM a
peekTBQueue q = do
r <- lazyPeekTBQueue q
return (case r of Solo z -> z)

-- | A version of 'peekTBQueue' which does not retry. Instead it
-- returns @Nothing@ if no value is available.
Expand Down
40 changes: 29 additions & 11 deletions Control/Concurrent/STM/TQueue.hs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@ module Control.Concurrent.STM.TQueue (
newTQueue,
newTQueueIO,
readTQueue,
lazyReadTQueue,
tryReadTQueue,
flushTQueue,
peekTQueue,
lazyPeekTQueue,
tryPeekTQueue,
writeTQueue,
unGetTQueue,
Expand All @@ -49,6 +51,7 @@ module Control.Concurrent.STM.TQueue (

import GHC.Conc
import Control.Monad (unless)
import Data.Tuple (Solo(..))
import Data.Typeable (Typeable)

-- | 'TQueue' is an abstract type representing an unbounded FIFO channel.
Expand Down Expand Up @@ -84,14 +87,15 @@ writeTQueue (TQueue _read write) a = do
listend <- readTVar write
writeTVar write (a:listend)

-- |Read the next value from the 'TQueue'.
readTQueue :: TQueue a -> STM a
readTQueue (TQueue read write) = do
-- |Read the next value from the 'TQueue', retrying if the channel is empty.
-- Separates reversing the write-end from evaluating the next value.
lazyReadTQueue :: TQueue a -> STM (Solo a)
lazyReadTQueue (TQueue read write) = do
xs <- readTVar read
case xs of
(x:xs') -> do
writeTVar read xs'
return x
return (Solo x)
[] -> do
ys <- readTVar write
case ys of
Expand All @@ -101,7 +105,13 @@ readTQueue (TQueue read write) = do
-- short, otherwise it will conflict
writeTVar write []
writeTVar read zs
return z
return (zs `seq` Solo z)

-- |Read the next value from the 'TQueue'.
readTQueue :: TQueue a -> STM a
readTQueue q = do
r <- lazyReadTQueue q
return (case r of Solo z -> z)

-- | A version of 'readTQueue' which does not retry. Instead it
-- returns @Nothing@ if no value is available.
Expand All @@ -120,13 +130,14 @@ flushTQueue (TQueue read write) = do
unless (null ys) $ writeTVar write []
return (xs ++ reverse ys)

-- | Get the next value from the @TQueue@ without removing it,
-- retrying if the channel is empty.
peekTQueue :: TQueue a -> STM a
peekTQueue (TQueue read write) = do
-- | Get the next value from the @TQueue@ without removing it, retrying if the
-- channel is empty. Separates reversing the write-end from evaluating the next
-- value.
lazyPeekTQueue :: TQueue a -> STM (Solo a)
lazyPeekTQueue (TQueue read write) = do
xs <- readTVar read
case xs of
(x:_) -> return x
(x:_) -> return (Solo x)
[] -> do
ys <- readTVar write
case ys of
Expand All @@ -136,7 +147,14 @@ peekTQueue (TQueue read write) = do
-- short, otherwise it will conflict
writeTVar write []
writeTVar read (z:zs)
return z
return (zs `seq` Solo z)

-- | Get the next value from the @TQueue@ without removing it,
-- retrying if the channel is empty.
peekTQueue :: TQueue a -> STM a
peekTQueue q = do
r <- lazyPeekTQueue q
return (case r of Solo z -> z)

-- | A version of 'peekTQueue' which does not retry. Instead it
-- returns @Nothing@ if no value is available.
Expand Down