Skip to content

Commit

Permalink
Merge pull request #26 from dmcclean/generalize-mergesources
Browse files Browse the repository at this point in the history
Generalized type of mergeSources and (>=<) per issue #25.
  • Loading branch information
Clark Gaebel committed Jul 21, 2014
2 parents e32fe4f + f8946cb commit 5684c4a
Showing 1 changed file with 9 additions and 8 deletions.
17 changes: 9 additions & 8 deletions Data/Conduit/TMChan.hs
Expand Up @@ -56,6 +56,7 @@ module Data.Conduit.TMChan ( -- * Bounded Channel Connectors
import Control.Applicative
import Control.Monad
import Control.Monad.IO.Class ( liftIO, MonadIO )
import Control.Monad.Trans.Class
import Control.Monad.Trans.Resource
import Control.Concurrent.STM
import Control.Concurrent.STM.TBMChan
Expand Down Expand Up @@ -146,10 +147,10 @@ liftSTM = liftIO . atomically
--
-- The order of the new source's data is undefined, but it will be some
-- combination of the two given sources.
(>=<) :: (MonadIO mi, MonadIO mo, MonadBaseControl IO mi)
=> Source (ResourceT mi) a
-> Source (ResourceT mi) a
-> ResourceT mi (Source mo a)
(>=<) :: (MonadResource mi, MonadIO mo, MonadBaseControl IO mi)
=> Source mi a
-> Source mi a
-> mi (Source mo a)
sa >=< sb = mergeSources [ sa, sb ] 16
{-# INLINE (>=<) #-}

Expand All @@ -165,13 +166,13 @@ decRefcount tv chan = do n <- modifyTVar'' tv (subtract 1)
-- The order of the new source's data is undefined, but it will be some
-- combination of the given sources. The monad of the resultant source
-- (@mo@) is independent of the monads of the input sources (@mi@).
mergeSources :: (MonadIO mi, MonadIO mo, MonadBaseControl IO mi)
=> [Source (ResourceT mi) a] -- ^ The sources to merge.
mergeSources :: (MonadResource mi, MonadIO mo, MonadBaseControl IO mi)
=> [Source mi a] -- ^ The sources to merge.
-> Int -- ^ The bound of the intermediate channel.
-> ResourceT mi (Source mo a)
-> mi (Source mo a)
mergeSources sx bound = do c <- liftSTM $ newTBMChan bound
refcount <- liftSTM . newTVar $ length sx
mapM_ (\s -> resourceForkIO $ s $$ chanSink c writeTBMChan $ decRefcount refcount) sx
mapM_ (\s -> runResourceT $ resourceForkIO $ s $$ chanSink c writeTBMChan $ decRefcount refcount) (map (transPipe lift) sx)
return $ sourceTBMChan c

-- | Combines two conduits with unbounded channels, creating a new conduit
Expand Down

0 comments on commit 5684c4a

Please sign in to comment.