Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 147 additions & 1 deletion core/src/Streamly/Internal/Data/Producer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,156 @@

module Streamly.Internal.Data.Producer
(
mapMaybeM
CrossApplyState(..)
, CrossApplyFstState(..)
, TupleState(..)
, crossApply
, crossApplyFst
, crossApplySnd
, fromEffect
, fromList
, fromTuple
, mapM
, mapMaybeM
, takeWhileM
, unfoldrM
)
where

#include "inline.hs"

import Data.Functor ((<&>))
import Streamly.Internal.Data.Stream.Step (Step(..))

import Prelude hiding (mapM)

-- | A stream transition: given the current state, produce the next 'Step'.
-- The state type @a@ is also the type carried inside 'Step', so a 'Yield'
-- delivers a new value alongside the updated state.
type Producer m a b = a -> m (Step a b)

-- | State of a cross-apply style producer. @x@ is the seed from which the
-- inner producer's state is (re)injected for every element of the outer
-- producer. We store the first-order @x@ seed rather than the injection
-- action @m s2@ so that the inner injection stays a known, statically inlined
-- call and the loop state remains unboxable (storing @m s2@ here defeats
-- fusion and forces per-element allocation).
data CrossApplyState x s1 s2 a b =
CrossApplyOuter x s1
| CrossApplyInner x (a -> b) s1 s2

-- | State for 'crossApplyFst'. The inner constructor stores the outer
-- producer's value @b@ /directly/ so that it can be re-yielded for each element
-- of the inner producer as a loop-invariant value. Storing a function
-- (@const b@) here instead would force a per-element PAP application in the hot
-- inner loop and defeat the hoisting the original yielded value gets.
data CrossApplyFstState x s1 s2 b =
CrossApplyFstOuter x s1
| CrossApplyFstInner x b s1 s2

data TupleState a = TupleBoth a a | TupleOne a | TupleNone

-- | Build a single element 'Producer' from an effect. The 'Bool' state is
-- 'True' before the effect is run and 'False' after, when the producer stops.
{-# INLINE_LATE fromEffect #-}
fromEffect :: Applicative m => m b -> Producer m Bool b
fromEffect m True = (`Yield` False) <$> m
fromEffect _ False = pure Stop

{-# INLINE_LATE fromTuple #-}
fromTuple :: Applicative m => Producer m (TupleState a) a
fromTuple (TupleBoth x y) = pure $ Yield x (TupleOne y)
fromTuple (TupleOne y) = pure $ Yield y TupleNone
fromTuple TupleNone = pure Stop

{-# INLINE_LATE fromList #-}
fromList :: Applicative m => Producer m [a] a
fromList (x:xs) = pure $ Yield x xs
fromList [] = pure Stop

{-# INLINE_LATE crossApply #-}
crossApply
:: Monad m
=> (x -> m s2)
-> Producer m s1 (a -> b)
-> Producer m s2 a
-> Producer m (CrossApplyState x s1 s2 a b) b
crossApply inject2 step1 _ (CrossApplyOuter seed st) = do
r <- step1 st
case r of
Yield f s -> do
s2 <- inject2 seed
return $ Skip (CrossApplyInner seed f s s2)
Skip s -> return $ Skip (CrossApplyOuter seed s)
Stop -> return Stop
crossApply _ _ step2 (CrossApplyInner seed f os st) = do
r <- step2 st
return $ case r of
Yield a s -> Yield (f a) (CrossApplyInner seed f os s)
Skip s -> Skip (CrossApplyInner seed f os s)
Stop -> Skip (CrossApplyOuter seed os)

-- | Outer product discarding the second (inner) element. For each element of
-- the first producer the entire second producer is run, yielding the first
-- producer's element each time.
{-# INLINE_LATE crossApplyFst #-}
crossApplyFst
:: Monad m
=> (x -> m s2)
-> Producer m s1 b
-> Producer m s2 a
-> Producer m (CrossApplyFstState x s1 s2 b) b
crossApplyFst inject2 step1 _ (CrossApplyFstOuter seed st) = do
r <- step1 st
case r of
Yield b s -> do
s2 <- inject2 seed
return $ Skip (CrossApplyFstInner seed b s s2)
Skip s -> return $ Skip (CrossApplyFstOuter seed s)
Stop -> return Stop
crossApplyFst _ _ step2 (CrossApplyFstInner seed b os st) = do
r <- step2 st
return $ case r of
Yield _ s -> Yield b (CrossApplyFstInner seed b os s)
Skip s -> Skip (CrossApplyFstInner seed b os s)
Stop -> Skip (CrossApplyFstOuter seed os)

-- | Outer product discarding the first (outer) element. For each element of
-- the first producer the entire second producer is run, yielding the second
-- producer's elements.
{-# INLINE_LATE crossApplySnd #-}
crossApplySnd
:: Monad m
=> (x -> m s2)
-> Producer m s1 a
-> Producer m s2 b
-> Producer m (CrossApplyState x s1 s2 b b) b
crossApplySnd inject2 step1 _ (CrossApplyOuter seed st) = do
r <- step1 st
case r of
Yield _ s -> do
s2 <- inject2 seed
return $ Skip (CrossApplyInner seed id s s2)
Skip s -> return $ Skip (CrossApplyOuter seed s)
Stop -> return Stop
crossApplySnd _ _ step2 (CrossApplyInner seed f os st) = do
r <- step2 st
return $ case r of
Yield a s -> Yield (f a) (CrossApplyInner seed f os s)
Skip s -> Skip (CrossApplyInner seed f os s)
Stop -> Skip (CrossApplyOuter seed os)

{-# INLINE_LATE mapM #-}
mapM :: Monad m => (b -> m c) -> Producer m s b -> Producer m s c
mapM f step1 st = do
r <- step1 st
case r of
Yield x s -> do
b <- f x
return $ Yield b s
Skip s -> return (Skip s)
Stop -> return Stop

{-# INLINE_LATE mapMaybeM #-}
mapMaybeM :: Monad m
=> (b -> m (Maybe c)) -> Producer m s b -> Producer m s c
Expand All @@ -51,3 +187,13 @@ takeWhileM f step1 st = do
return $ if b then Yield x s else Stop
Skip s -> return (Skip s)
Stop -> return Stop

-- | Build a 'Producer' from a /monadic/ step function that generates the next
-- element and the next seed value from the current seed value. It is invoked
-- until it returns 'Nothing'.
{-# INLINE_LATE unfoldrM #-}
unfoldrM :: Applicative m => (a -> m (Maybe (b, a))) -> Producer m a b
unfoldrM next a =
next a <&> \case
Just (b, a1) -> Yield b a1
Nothing -> Stop
9 changes: 4 additions & 5 deletions core/src/Streamly/Internal/Data/Stream/Generate.hs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ import Streamly.Internal.System.IO (unsafeInlineIO)

#ifdef USE_UNFOLDS_EVERYWHERE
import qualified Streamly.Internal.Data.Unfold as Unfold
#else
import qualified Streamly.Internal.Data.Producer as Producer
#endif

import Data.Fixed
Expand Down Expand Up @@ -223,12 +225,9 @@ unfoldrM next = unfold (Unfold.unfoldrM next)
#else
unfoldrM next = Stream step
where
{- HLINT ignore "Eta reduce" -}
{-# INLINE_LATE step #-}
step _ st = do
r <- next st
return $ case r of
Just (x, s) -> Yield x s
Nothing -> Stop
step _ st = Producer.unfoldrM next st
#endif

-- | Build a stream by unfolding a /pure/ step function @step@ starting from a
Expand Down
3 changes: 2 additions & 1 deletion core/src/Streamly/Internal/Data/Stream/Transform.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2171,8 +2171,9 @@ mapMaybeM f (Stream step1 state1) = Stream step state1

where

{- HLINT ignore "Eta reduce" -}
{-# INLINE_LATE step #-}
step gst = Producer.mapMaybeM f (step1 (adaptState gst))
step gst st = Producer.mapMaybeM f (step1 (adaptState gst)) st

-- | In a stream of 'Maybe's, discard 'Nothing's and unwrap 'Just's.
--
Expand Down
106 changes: 40 additions & 66 deletions core/src/Streamly/Internal/Data/Stream/Type.hs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ module Streamly.Internal.Data.Stream.Type
, fromEffect

-- ** From Containers
, fromTuple
, Streamly.Internal.Data.Stream.Type.fromList

-- * Elimination
Expand Down Expand Up @@ -399,31 +400,26 @@ fromPure x = Stream (\_ s -> pure $ step undefined s) True
--
{-# INLINE_NORMAL fromEffect #-}
fromEffect :: Applicative m => m a -> Stream m a
fromEffect m = Stream step True

where

{-# INLINE_LATE step #-}
step _ True = (`Yield` False) <$> m
step _ False = pure Stop
fromEffect m = Stream (const $ Producer.fromEffect m) True

------------------------------------------------------------------------------
-- From Containers
------------------------------------------------------------------------------

-- Adapted from the vector package.

-- | Construct a stream from a tuple.
{-# INLINE_LATE fromTuple #-}
fromTuple :: Applicative m => (a, a) -> Stream m a
fromTuple (x, y) = Stream (const Producer.fromTuple) (Producer.TupleBoth x y)

-- | Construct a stream from a list of pure values.
{-# INLINE_LATE fromList #-}
fromList :: Applicative m => [a] -> Stream m a
#ifdef USE_UNFOLDS_EVERYWHERE
fromList = unfold Unfold.fromList
#else
fromList = Stream step
where
{-# INLINE_LATE step #-}
step _ (x:xs) = pure $ Yield x xs
step _ [] = pure Stop
fromList = Stream (const Producer.fromList)
#endif

------------------------------------------------------------------------------
Expand Down Expand Up @@ -978,13 +974,9 @@ mapM f (Stream step state) = Stream step1 state

where

{- HLINT ignore "Eta reduce" -}
{-# INLINE_LATE step1 #-}
step1 gst st = do
r <- step (adaptState gst) st
case r of
Yield x s -> f x >>= \a -> return $ Yield a s
Skip s -> return $ Skip s
Stop -> return Stop
step1 gst st = Producer.mapM f (step (adaptState gst)) st

{-# INLINE map #-}
map :: Monad m => (a -> b) -> Stream m a -> Stream m b
Expand Down Expand Up @@ -1167,8 +1159,9 @@ takeWhileM f (Stream step1 state1) = Stream step state1

where

{- HLINT ignore "Eta reduce" -}
{-# INLINE_LATE step #-}
step gst = Producer.takeWhileM f (step1 gst)
step gst st = Producer.takeWhileM f (step1 gst) st

-- | End the stream as soon as the predicate fails on an element.
--
Expand Down Expand Up @@ -1323,25 +1316,20 @@ zipWith f = zipWithM (\a b -> return (f a b))
-- >>> crossApply = Stream.crossWith id
--
{-# INLINE_NORMAL crossApply #-}
crossApply :: Functor f => Stream f (a -> b) -> Stream f a -> Stream f b
crossApply :: Monad m => Stream m (a -> b) -> Stream m a -> Stream m b
crossApply (Stream stepa statea) (Stream stepb stateb) =
Stream step' (Left statea)
Stream step (Producer.CrossApplyOuter stateb statea)

where

{-# INLINE_LATE step' #-}
step' gst (Left st) = fmap
(\case
Yield f s -> Skip (Right (f, s, stateb))
Skip s -> Skip (Left s)
Stop -> Stop)
(stepa (adaptState gst) st)
step' gst (Right (f, os, st)) = fmap
(\case
Yield a s -> Yield (f a) (Right (f, os, s))
Skip s -> Skip (Right (f,os, s))
Stop -> Skip (Left os))
(stepb (adaptState gst) st)
{- HLINT ignore "Eta reduce" -}
{-# INLINE_LATE step #-}
step gst st =
Producer.crossApply
return
(stepa (adaptState gst))
(stepb (adaptState gst))
st

-- This is shared by all fairUnfold, fairConcat combinators.
data FairUnfoldState o i =
Expand Down Expand Up @@ -1397,50 +1385,36 @@ fairCrossWithM f (Stream step1 state1) (Stream step2 state2) =
Stop -> return $ Skip (FairUnfoldDrain ys ls)

{-# INLINE_NORMAL crossApplySnd #-}
crossApplySnd :: Functor f => Stream f a -> Stream f b -> Stream f b
crossApplySnd :: Monad f => Stream f a -> Stream f b -> Stream f b
crossApplySnd (Stream stepa statea) (Stream stepb stateb) =
Stream step (Left statea)
Stream step (Producer.CrossApplyOuter stateb statea)

where

{- HLINT ignore "Eta reduce" -}
{-# INLINE_LATE step #-}
step gst (Left st) =
fmap
(\case
Yield _ s -> Skip (Right (s, stateb))
Skip s -> Skip (Left s)
Stop -> Stop)
(stepa (adaptState gst) st)
step gst (Right (ostate, st)) =
fmap
(\case
Yield b s -> Yield b (Right (ostate, s))
Skip s -> Skip (Right (ostate, s))
Stop -> Skip (Left ostate))
(stepb gst st)
step gst st =
Producer.crossApplySnd
return
(stepa (adaptState gst))
(stepb gst)
st

{-# INLINE_NORMAL crossApplyFst #-}
crossApplyFst :: Functor f => Stream f a -> Stream f b -> Stream f a
crossApplyFst :: Monad f => Stream f a -> Stream f b -> Stream f a
crossApplyFst (Stream stepa statea) (Stream stepb stateb) =
Stream step (Left statea)
Stream step (Producer.CrossApplyFstOuter stateb statea)

where

{- HLINT ignore "Eta reduce" -}
{-# INLINE_LATE step #-}
step gst (Left st) =
fmap
(\case
Yield b s -> Skip (Right (s, stateb, b))
Skip s -> Skip (Left s)
Stop -> Stop)
(stepa gst st)
step gst (Right (ostate, st, b)) =
fmap
(\case
Yield _ s -> Yield b (Right (ostate, s, b))
Skip s -> Skip (Right (ostate, s, b))
Stop -> Skip (Left ostate))
(stepb (adaptState gst) st)
step gst st =
Producer.crossApplyFst
return
(stepa gst)
(stepb (adaptState gst))
st

{-
instance Applicative f => Applicative (Stream f) where
Expand Down
Loading
Loading