From 6419d6d3907cc34a971fe18b652cc5495cab2bb7 Mon Sep 17 00:00:00 2001 From: Arvin Moezzi Date: Wed, 15 Jan 2014 05:14:02 +0100 Subject: [PATCH] Added new buffer type 'New' Like 'Latest' but with a guaranteed write between any two reads. Conflicts: src/Pipes/Concurrent.hs --- src/Pipes/Concurrent.hs | 6 +++++- tests/tests-main.hs | 2 ++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/src/Pipes/Concurrent.hs b/src/Pipes/Concurrent.hs index dafa3b1..698482d 100644 --- a/src/Pipes/Concurrent.hs +++ b/src/Pipes/Concurrent.hs @@ -47,7 +47,7 @@ module Pipes.Concurrent ( ) where import Control.Applicative ( - Alternative(empty, (<|>)), Applicative(pure, (<*>)), (<*), (<$>) ) + Alternative(empty, (<|>)), Applicative(pure, (*>), (<*>)), (<*), (<$>) ) import Control.Concurrent (forkIO) import Control.Concurrent.STM (atomically, STM) import qualified Control.Concurrent.STM as S @@ -184,6 +184,9 @@ spawn' buffer = do Latest a -> do t <- S.newTVarIO a return (S.writeTVar t, S.readTVar t) + New -> do + m <- S.newEmptyTMVarIO + return (\x -> S.tryTakeTMVar m *> S.putTMVar m x, S.takeTMVar m) sealed <- S.newTVarIO False let seal = S.writeTVar sealed True @@ -226,6 +229,7 @@ data Buffer a 'Latest' is never empty nor full. -} | Latest a + | New {- $reexport @Control.Concurrent@ re-exports 'forkIO', although I recommend using the diff --git a/tests/tests-main.hs b/tests/tests-main.hs index d8ebbf3..5032eb8 100644 --- a/tests/tests-main.hs +++ b/tests/tests-main.hs @@ -114,9 +114,11 @@ main = do runTest (testSenderClose $ Bounded 7) "BoundedNotFilledSenderClose" runTest (testSenderClose Single) "SingleSenderClose" runTestExpectTimeout (testSenderCloseDelayedSend $ Latest 42) "LatestSenderClose" + runTest (testSenderCloseDelayedSend New) "NewSenderClose" -- runTest (testReceiverClose Unbounded) "UnboundedReceiverClose" runTest (testReceiverClose $ Bounded 3) "BoundedFilledReceiverClose" runTest (testReceiverClose $ Bounded 7) "BoundedNotFilledReceiverClose" runTest (testReceiverClose Single) "SingleReceiverClose" runTest (testReceiverCloseDelayedReceive $ Latest 42) "LatestReceiverClose" + runTest (testReceiverClose New) "NewReceiverClose"