Permalink
Browse files

Initial commit

  • Loading branch information...
0 parents commit a1e1853fddbf8fea8f3e2e606d4d37119b8b8a82 Neutrality committed Feb 24, 2013
Showing with 107 additions and 0 deletions.
  1. +7 −0 .gitignore
  2. +79 −0 Control/Proxy/TQueue.hs
  3. 0 LICENSE
  4. +21 −0 pipes-stm.cabal
@@ -0,0 +1,7 @@
+dist
+cabal-dev
+*.o
+*.hi
+*.chi
+*.chs.h
+.virthualenv
@@ -0,0 +1,79 @@
+module Control.Proxy.TQueue where
+
+import Control.Monad
+import Control.Proxy
+import Control.Monad.STM
+import Control.Concurrent.STM.TBQueue
+import Control.Concurrent.STM.TQueue
+import Control.Proxy.Safe
+import Data.Maybe
+
+chanProducer
+ :: (Proxy p)
+ => chan -- ^ The channel.
+ -> (chan -> IO (Maybe a)) -- ^ The 'read' function.
+ -> (chan -> IO ()) -- ^ The 'close' function.
+ -> () -> Producer (ExceptionP p) a SafeIO r
+chanProducer ch readfn closefn () = finally id close produce
+ where
+ produce = forever $ do
+ item <- tryIO $ readfn ch
+ unless (isNothing item) $ respond (fromJust item)
+
+ close = closefn ch
+{-# INLINE chanProducer #-}
+
+
+chanConsumer
+ :: (Proxy p)
+ => chan -- ^ The channel.
+ -> (chan -> a -> IO ()) -- ^ The 'write' function.
+ -> (chan -> IO ()) -- ^ The 'close' function.
+ -> () -> Consumer (ExceptionP p) a SafeIO r
+chanConsumer ch writfn closefn () = finally id close consume
+ where
+ consume = forever $ do
+ input <- request ()
+ tryIO $ writfn ch input
+ close = closefn ch
+{-# INLINE chanConsumer #-}
+
+
+-- | A simple wrapper around a TBQueue. As data is pushed into the channel, the
+-- source will read it and pass it down the pipeline. When the
+-- channel is closed, the source will close also.
+--
+-- If the channel fills up, the pipeline will stall until values are read.
+sourceTBQueue :: (Proxy p) => TBQueue a -> () -> Producer (ExceptionP p) a SafeIO r
+sourceTBQueue ch = chanProducer ch (atomically . tryReadTBQueue) doNothing
+{-# INLINE sourceTBQueue #-}
+
+
+-- | A simple wrapper around a TQueue. As data is pushed into the channel, the
+-- source will read it and pass it down the pipeline. When the
+-- channel is closed, the source will close also.
+sourceTQueue :: (Proxy p) => TQueue a -> () -> Producer (ExceptionP p) a SafeIO r
+sourceTQueue ch = chanProducer ch (atomically . tryReadTQueue) doNothing
+{-# INLINE sourceTQueue #-}
+
+
+-- | A simple wrapper around a TBQueue. As data is pushed into the sink, it
+-- will magically begin to appear in the channel. If the channel is full,
+-- the sink will block until space frees up. When the sink is closed, the
+-- channel will close too.
+sinkTBQueue :: (Proxy p) => TBQueue a -> () -> Consumer (ExceptionP p) a SafeIO ()
+sinkTBQueue ch = chanConsumer ch (\a -> atomically . writeTBQueue a) doNothing
+{-# INLINE sinkTBQueue #-}
+
+
+-- | A simple wrapper around a TQueue. As data is pushed into this sink, it
+-- will magically begin to appear in the channel. When the sink is closed,
+-- the channel will close too.
+sinkTQueue :: (Proxy p) => TQueue a -> () -> Consumer (ExceptionP p) a SafeIO ()
+sinkTQueue ch = chanConsumer ch (\a -> atomically . writeTQueue a) doNothing
+{-# INLINE sinkTQueue #-}
+
+
+doNothing :: b -> IO ()
+doNothing = const $ return ()
+{-# INLINE doNothing #-}
No changes.
@@ -0,0 +1,21 @@
+name: async-pipes
+version: 0.1.0.0
+synopsis: Run Proxies asynchronously.
+description:
+
+license: BSD3
+license-file: LICENSE
+author:
+maintainer:
+
+category: Development
+build-type: Simple
+cabal-version: >=1.8
+
+library
+
+ exposed-modules: Control.Proxy.TQueue
+ build-depends: base ==4.5.*,
+ pipes ==3.1.*,
+ pipes-safe ==1.0.*,
+ stm ==2.4.*

0 comments on commit a1e1853

Please sign in to comment.