Skip to content

Commit

Permalink
Add Monad instance for the Producer type
Browse files Browse the repository at this point in the history
  • Loading branch information
harendra-kumar committed Feb 24, 2021
1 parent 8a2aae0 commit 14168d9
Showing 1 changed file with 85 additions and 1 deletion.
86 changes: 85 additions & 1 deletion src/Streamly/Internal/Data/Producer/Type.hs
Expand Up @@ -28,14 +28,16 @@ module Streamly.Internal.Data.Producer.Type
-- * Nesting
, cross
, NestedLoop (..)
, concatMapM
, concatMap
, concat
)
where

#include "inline.hs"

import Fusion.Plugin.Types (Fuse(..))
import Prelude hiding (concat, map, const)
import Prelude hiding (concat, map, const, concatMap)

-- $setup
-- >>> :m
Expand Down Expand Up @@ -265,6 +267,88 @@ instance Monad m => Applicative (Producer m a) where
-- {-# INLINE (<*) #-}
-- (<*) = apDiscardSnd

------------------------------------------------------------------------------
-- Monad
------------------------------------------------------------------------------

data ConcatMapState m a b s1 =
ConcatMapOuter s1
| forall s2. ConcatMapInner s1 s2 (s2 -> m (Step s2 a b)) (s2 -> m (Maybe a))

-- | Map a producer generating action to each element of a producer and
-- flatten the results into a single stream. Each producer consumes from the
-- same shared state.
--
{-# INLINE_NORMAL concatMapM #-}
concatMapM :: Monad m
=> (b -> m (Producer m a c)) -> Producer m a b -> Producer m a c
concatMapM f (Producer step1 inject1 extract1) = Producer step inject extract

where

inject a = do
s1 <- inject1 a
return $ ConcatMapOuter s1

{-# INLINE_LATE step #-}
step (ConcatMapOuter st) = do
r <- step1 st
case r of
Yield x s -> do
res <- extract1 s
case res of
-- XXX We should probably undo what step1 did since we are
-- discarding "b" here. The state type must support
-- something like "unread". Or should this be considered as
-- an error?
Nothing -> return $ Stop Nothing
Just a -> do
Producer step2 inject2 extract2 <- f x
s2 <- inject2 a
return $ Skip (ConcatMapInner s s2 step2 extract2)
Skip s -> return $ Skip (ConcatMapOuter s)
Stop a -> return $ Stop a

step (ConcatMapInner s1 s2 step2 extract2) = do
r <- step2 s2
case r of
Yield x s -> return $ Yield x (ConcatMapInner s1 s step2 extract2)
Skip s -> return $ Skip (ConcatMapInner s1 s step2 extract2)
Stop res -> do
case res of
Nothing -> return $ Stop Nothing
Just a -> do
s <- inject1 a
return $ Skip (ConcatMapOuter s)

extract (ConcatMapOuter s1) = extract1 s1
extract (ConcatMapInner _ s2 _ extract2) = extract2 s2

{-# INLINE concatMap #-}
concatMap :: Monad m =>
(b -> Producer m a c) -> Producer m a b -> Producer m a c
concatMap f = concatMapM (return Prelude.. f)

-- Note: concatMap and Monad instance for producers have performance comparable
-- to Stream. In fact, concatMap is slower than Stream, that may be some
-- optimization issue though.
--
-- | Example:
--
-- >>> u = do { x <- Producer.fromList; y <- Producer.fromList; return (x,y); }
-- >>> Stream.toList $ Stream.unfold (Producer.simplify u) ([1,2,3,4])
-- [(1,2),(1,3),(1,4)]
--
instance Monad m => Monad (Producer m a) where
{-# INLINE return #-}
return = pure

{-# INLINE (>>=) #-}
(>>=) = flip concatMap

-- {-# INLINE (>>) #-}
-- (>>) = (*>)

------------------------------------------------------------------------------
-- Nesting
------------------------------------------------------------------------------
Expand Down

0 comments on commit 14168d9

Please sign in to comment.