Skip to content

Commit

Permalink
Added new buffer type 'New'
Browse files Browse the repository at this point in the history
Like 'Latest' but with a guaranteed write between any two reads.

Conflicts:
	src/Pipes/Concurrent.hs
  • Loading branch information
urv committed Jan 20, 2014
1 parent 6bf07e7 commit 6419d6d
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 1 deletion.
6 changes: 5 additions & 1 deletion src/Pipes/Concurrent.hs
Expand Up @@ -47,7 +47,7 @@ module Pipes.Concurrent (
) where

import Control.Applicative (
Alternative(empty, (<|>)), Applicative(pure, (<*>)), (<*), (<$>) )
Alternative(empty, (<|>)), Applicative(pure, (*>), (<*>)), (<*), (<$>) )
import Control.Concurrent (forkIO)
import Control.Concurrent.STM (atomically, STM)
import qualified Control.Concurrent.STM as S
Expand Down Expand Up @@ -184,6 +184,9 @@ spawn' buffer = do
Latest a -> do
t <- S.newTVarIO a
return (S.writeTVar t, S.readTVar t)
New -> do
m <- S.newEmptyTMVarIO
return (\x -> S.tryTakeTMVar m *> S.putTMVar m x, S.takeTMVar m)

sealed <- S.newTVarIO False
let seal = S.writeTVar sealed True
Expand Down Expand Up @@ -226,6 +229,7 @@ data Buffer a
'Latest' is never empty nor full.
-}
| Latest a
| New

{- $reexport
@Control.Concurrent@ re-exports 'forkIO', although I recommend using the
Expand Down
2 changes: 2 additions & 0 deletions tests/tests-main.hs
Expand Up @@ -114,9 +114,11 @@ main = do
runTest (testSenderClose $ Bounded 7) "BoundedNotFilledSenderClose"
runTest (testSenderClose Single) "SingleSenderClose"
runTestExpectTimeout (testSenderCloseDelayedSend $ Latest 42) "LatestSenderClose"
runTest (testSenderCloseDelayedSend New) "NewSenderClose"
--
runTest (testReceiverClose Unbounded) "UnboundedReceiverClose"
runTest (testReceiverClose $ Bounded 3) "BoundedFilledReceiverClose"
runTest (testReceiverClose $ Bounded 7) "BoundedNotFilledReceiverClose"
runTest (testReceiverClose Single) "SingleReceiverClose"
runTest (testReceiverCloseDelayedReceive $ Latest 42) "LatestReceiverClose"
runTest (testReceiverClose New) "NewReceiverClose"

0 comments on commit 6419d6d

Please sign in to comment.