diff --git a/core/src/Streamly/Internal/Data/Producer.hs b/core/src/Streamly/Internal/Data/Producer.hs index a0ecfb9f25..389a63d30e 100644 --- a/core/src/Streamly/Internal/Data/Producer.hs +++ b/core/src/Streamly/Internal/Data/Producer.hs @@ -12,20 +12,156 @@ module Streamly.Internal.Data.Producer ( - mapMaybeM + CrossApplyState(..) + , CrossApplyFstState(..) + , TupleState(..) + , crossApply + , crossApplyFst + , crossApplySnd + , fromEffect + , fromList + , fromTuple + , mapM + , mapMaybeM , takeWhileM + , unfoldrM ) where #include "inline.hs" +import Data.Functor ((<&>)) import Streamly.Internal.Data.Stream.Step (Step(..)) +import Prelude hiding (mapM) + -- | A stream transition: given the current state, produce the next 'Step'. -- The state type @a@ is also the type carried inside 'Step', so a 'Yield' -- delivers a new value alongside the updated state. type Producer m a b = a -> m (Step a b) +-- | State of a cross-apply style producer. @x@ is the seed from which the +-- inner producer's state is (re)injected for every element of the outer +-- producer. We store the first-order @x@ seed rather than the injection +-- action @m s2@ so that the inner injection stays a known, statically inlined +-- call and the loop state remains unboxable (storing @m s2@ here defeats +-- fusion and forces per-element allocation). +data CrossApplyState x s1 s2 a b = + CrossApplyOuter x s1 + | CrossApplyInner x (a -> b) s1 s2 + +-- | State for 'crossApplyFst'. The inner constructor stores the outer +-- producer's value @b@ /directly/ so that it can be re-yielded for each element +-- of the inner producer as a loop-invariant value. Storing a function +-- (@const b@) here instead would force a per-element PAP application in the hot +-- inner loop and defeat the hoisting the original yielded value gets. +data CrossApplyFstState x s1 s2 b = + CrossApplyFstOuter x s1 + | CrossApplyFstInner x b s1 s2 + +data TupleState a = TupleBoth a a | TupleOne a | TupleNone + +-- | Build a single element 'Producer' from an effect. The 'Bool' state is +-- 'True' before the effect is run and 'False' after, when the producer stops. +{-# INLINE_LATE fromEffect #-} +fromEffect :: Applicative m => m b -> Producer m Bool b +fromEffect m True = (`Yield` False) <$> m +fromEffect _ False = pure Stop + +{-# INLINE_LATE fromTuple #-} +fromTuple :: Applicative m => Producer m (TupleState a) a +fromTuple (TupleBoth x y) = pure $ Yield x (TupleOne y) +fromTuple (TupleOne y) = pure $ Yield y TupleNone +fromTuple TupleNone = pure Stop + +{-# INLINE_LATE fromList #-} +fromList :: Applicative m => Producer m [a] a +fromList (x:xs) = pure $ Yield x xs +fromList [] = pure Stop + +{-# INLINE_LATE crossApply #-} +crossApply + :: Monad m + => (x -> m s2) + -> Producer m s1 (a -> b) + -> Producer m s2 a + -> Producer m (CrossApplyState x s1 s2 a b) b +crossApply inject2 step1 _ (CrossApplyOuter seed st) = do + r <- step1 st + case r of + Yield f s -> do + s2 <- inject2 seed + return $ Skip (CrossApplyInner seed f s s2) + Skip s -> return $ Skip (CrossApplyOuter seed s) + Stop -> return Stop +crossApply _ _ step2 (CrossApplyInner seed f os st) = do + r <- step2 st + return $ case r of + Yield a s -> Yield (f a) (CrossApplyInner seed f os s) + Skip s -> Skip (CrossApplyInner seed f os s) + Stop -> Skip (CrossApplyOuter seed os) + +-- | Outer product discarding the second (inner) element. For each element of +-- the first producer the entire second producer is run, yielding the first +-- producer's element each time. +{-# INLINE_LATE crossApplyFst #-} +crossApplyFst + :: Monad m + => (x -> m s2) + -> Producer m s1 b + -> Producer m s2 a + -> Producer m (CrossApplyFstState x s1 s2 b) b +crossApplyFst inject2 step1 _ (CrossApplyFstOuter seed st) = do + r <- step1 st + case r of + Yield b s -> do + s2 <- inject2 seed + return $ Skip (CrossApplyFstInner seed b s s2) + Skip s -> return $ Skip (CrossApplyFstOuter seed s) + Stop -> return Stop +crossApplyFst _ _ step2 (CrossApplyFstInner seed b os st) = do + r <- step2 st + return $ case r of + Yield _ s -> Yield b (CrossApplyFstInner seed b os s) + Skip s -> Skip (CrossApplyFstInner seed b os s) + Stop -> Skip (CrossApplyFstOuter seed os) + +-- | Outer product discarding the first (outer) element. For each element of +-- the first producer the entire second producer is run, yielding the second +-- producer's elements. +{-# INLINE_LATE crossApplySnd #-} +crossApplySnd + :: Monad m + => (x -> m s2) + -> Producer m s1 a + -> Producer m s2 b + -> Producer m (CrossApplyState x s1 s2 b b) b +crossApplySnd inject2 step1 _ (CrossApplyOuter seed st) = do + r <- step1 st + case r of + Yield _ s -> do + s2 <- inject2 seed + return $ Skip (CrossApplyInner seed id s s2) + Skip s -> return $ Skip (CrossApplyOuter seed s) + Stop -> return Stop +crossApplySnd _ _ step2 (CrossApplyInner seed f os st) = do + r <- step2 st + return $ case r of + Yield a s -> Yield (f a) (CrossApplyInner seed f os s) + Skip s -> Skip (CrossApplyInner seed f os s) + Stop -> Skip (CrossApplyOuter seed os) + +{-# INLINE_LATE mapM #-} +mapM :: Monad m => (b -> m c) -> Producer m s b -> Producer m s c +mapM f step1 st = do + r <- step1 st + case r of + Yield x s -> do + b <- f x + return $ Yield b s + Skip s -> return (Skip s) + Stop -> return Stop + {-# INLINE_LATE mapMaybeM #-} mapMaybeM :: Monad m => (b -> m (Maybe c)) -> Producer m s b -> Producer m s c @@ -51,3 +187,13 @@ takeWhileM f step1 st = do return $ if b then Yield x s else Stop Skip s -> return (Skip s) Stop -> return Stop + +-- | Build a 'Producer' from a /monadic/ step function that generates the next +-- element and the next seed value from the current seed value. It is invoked +-- until it returns 'Nothing'. +{-# INLINE_LATE unfoldrM #-} +unfoldrM :: Applicative m => (a -> m (Maybe (b, a))) -> Producer m a b +unfoldrM next a = + next a <&> \case + Just (b, a1) -> Yield b a1 + Nothing -> Stop diff --git a/core/src/Streamly/Internal/Data/Stream/Generate.hs b/core/src/Streamly/Internal/Data/Stream/Generate.hs index dea9da85f4..13df444652 100644 --- a/core/src/Streamly/Internal/Data/Stream/Generate.hs +++ b/core/src/Streamly/Internal/Data/Stream/Generate.hs @@ -119,6 +119,8 @@ import Streamly.Internal.System.IO (unsafeInlineIO) #ifdef USE_UNFOLDS_EVERYWHERE import qualified Streamly.Internal.Data.Unfold as Unfold +#else +import qualified Streamly.Internal.Data.Producer as Producer #endif import Data.Fixed @@ -223,12 +225,9 @@ unfoldrM next = unfold (Unfold.unfoldrM next) #else unfoldrM next = Stream step where + {- HLINT ignore "Eta reduce" -} {-# INLINE_LATE step #-} - step _ st = do - r <- next st - return $ case r of - Just (x, s) -> Yield x s - Nothing -> Stop + step _ st = Producer.unfoldrM next st #endif -- | Build a stream by unfolding a /pure/ step function @step@ starting from a diff --git a/core/src/Streamly/Internal/Data/Stream/Transform.hs b/core/src/Streamly/Internal/Data/Stream/Transform.hs index 04d1696f79..c3e51a4bda 100644 --- a/core/src/Streamly/Internal/Data/Stream/Transform.hs +++ b/core/src/Streamly/Internal/Data/Stream/Transform.hs @@ -2171,8 +2171,9 @@ mapMaybeM f (Stream step1 state1) = Stream step state1 where + {- HLINT ignore "Eta reduce" -} {-# INLINE_LATE step #-} - step gst = Producer.mapMaybeM f (step1 (adaptState gst)) + step gst st = Producer.mapMaybeM f (step1 (adaptState gst)) st -- | In a stream of 'Maybe's, discard 'Nothing's and unwrap 'Just's. -- diff --git a/core/src/Streamly/Internal/Data/Stream/Type.hs b/core/src/Streamly/Internal/Data/Stream/Type.hs index bd436145cb..1cf912dde0 100644 --- a/core/src/Streamly/Internal/Data/Stream/Type.hs +++ b/core/src/Streamly/Internal/Data/Stream/Type.hs @@ -48,6 +48,7 @@ module Streamly.Internal.Data.Stream.Type , fromEffect -- ** From Containers + , fromTuple , Streamly.Internal.Data.Stream.Type.fromList -- * Elimination @@ -399,13 +400,7 @@ fromPure x = Stream (\_ s -> pure $ step undefined s) True -- {-# INLINE_NORMAL fromEffect #-} fromEffect :: Applicative m => m a -> Stream m a -fromEffect m = Stream step True - - where - - {-# INLINE_LATE step #-} - step _ True = (`Yield` False) <$> m - step _ False = pure Stop +fromEffect m = Stream (const $ Producer.fromEffect m) True ------------------------------------------------------------------------------ -- From Containers @@ -413,17 +408,18 @@ fromEffect m = Stream step True -- Adapted from the vector package. +-- | Construct a stream from a tuple. +{-# INLINE_LATE fromTuple #-} +fromTuple :: Applicative m => (a, a) -> Stream m a +fromTuple (x, y) = Stream (const Producer.fromTuple) (Producer.TupleBoth x y) + -- | Construct a stream from a list of pure values. {-# INLINE_LATE fromList #-} fromList :: Applicative m => [a] -> Stream m a #ifdef USE_UNFOLDS_EVERYWHERE fromList = unfold Unfold.fromList #else -fromList = Stream step - where - {-# INLINE_LATE step #-} - step _ (x:xs) = pure $ Yield x xs - step _ [] = pure Stop +fromList = Stream (const Producer.fromList) #endif ------------------------------------------------------------------------------ @@ -978,13 +974,9 @@ mapM f (Stream step state) = Stream step1 state where + {- HLINT ignore "Eta reduce" -} {-# INLINE_LATE step1 #-} - step1 gst st = do - r <- step (adaptState gst) st - case r of - Yield x s -> f x >>= \a -> return $ Yield a s - Skip s -> return $ Skip s - Stop -> return Stop + step1 gst st = Producer.mapM f (step (adaptState gst)) st {-# INLINE map #-} map :: Monad m => (a -> b) -> Stream m a -> Stream m b @@ -1167,8 +1159,9 @@ takeWhileM f (Stream step1 state1) = Stream step state1 where + {- HLINT ignore "Eta reduce" -} {-# INLINE_LATE step #-} - step gst = Producer.takeWhileM f (step1 gst) + step gst st = Producer.takeWhileM f (step1 gst) st -- | End the stream as soon as the predicate fails on an element. -- @@ -1323,25 +1316,20 @@ zipWith f = zipWithM (\a b -> return (f a b)) -- >>> crossApply = Stream.crossWith id -- {-# INLINE_NORMAL crossApply #-} -crossApply :: Functor f => Stream f (a -> b) -> Stream f a -> Stream f b +crossApply :: Monad m => Stream m (a -> b) -> Stream m a -> Stream m b crossApply (Stream stepa statea) (Stream stepb stateb) = - Stream step' (Left statea) + Stream step (Producer.CrossApplyOuter stateb statea) where - {-# INLINE_LATE step' #-} - step' gst (Left st) = fmap - (\case - Yield f s -> Skip (Right (f, s, stateb)) - Skip s -> Skip (Left s) - Stop -> Stop) - (stepa (adaptState gst) st) - step' gst (Right (f, os, st)) = fmap - (\case - Yield a s -> Yield (f a) (Right (f, os, s)) - Skip s -> Skip (Right (f,os, s)) - Stop -> Skip (Left os)) - (stepb (adaptState gst) st) + {- HLINT ignore "Eta reduce" -} + {-# INLINE_LATE step #-} + step gst st = + Producer.crossApply + return + (stepa (adaptState gst)) + (stepb (adaptState gst)) + st -- This is shared by all fairUnfold, fairConcat combinators. data FairUnfoldState o i = @@ -1397,50 +1385,36 @@ fairCrossWithM f (Stream step1 state1) (Stream step2 state2) = Stop -> return $ Skip (FairUnfoldDrain ys ls) {-# INLINE_NORMAL crossApplySnd #-} -crossApplySnd :: Functor f => Stream f a -> Stream f b -> Stream f b +crossApplySnd :: Monad f => Stream f a -> Stream f b -> Stream f b crossApplySnd (Stream stepa statea) (Stream stepb stateb) = - Stream step (Left statea) + Stream step (Producer.CrossApplyOuter stateb statea) where + {- HLINT ignore "Eta reduce" -} {-# INLINE_LATE step #-} - step gst (Left st) = - fmap - (\case - Yield _ s -> Skip (Right (s, stateb)) - Skip s -> Skip (Left s) - Stop -> Stop) - (stepa (adaptState gst) st) - step gst (Right (ostate, st)) = - fmap - (\case - Yield b s -> Yield b (Right (ostate, s)) - Skip s -> Skip (Right (ostate, s)) - Stop -> Skip (Left ostate)) - (stepb gst st) + step gst st = + Producer.crossApplySnd + return + (stepa (adaptState gst)) + (stepb gst) + st {-# INLINE_NORMAL crossApplyFst #-} -crossApplyFst :: Functor f => Stream f a -> Stream f b -> Stream f a +crossApplyFst :: Monad f => Stream f a -> Stream f b -> Stream f a crossApplyFst (Stream stepa statea) (Stream stepb stateb) = - Stream step (Left statea) + Stream step (Producer.CrossApplyFstOuter stateb statea) where + {- HLINT ignore "Eta reduce" -} {-# INLINE_LATE step #-} - step gst (Left st) = - fmap - (\case - Yield b s -> Skip (Right (s, stateb, b)) - Skip s -> Skip (Left s) - Stop -> Stop) - (stepa gst st) - step gst (Right (ostate, st, b)) = - fmap - (\case - Yield _ s -> Yield b (Right (ostate, s, b)) - Skip s -> Skip (Right (ostate, s, b)) - Stop -> Skip (Left ostate)) - (stepb (adaptState gst) st) + step gst st = + Producer.crossApplyFst + return + (stepa gst) + (stepb (adaptState gst)) + st {- instance Applicative f => Applicative (Stream f) where diff --git a/core/src/Streamly/Internal/Data/Unfold/Type.hs b/core/src/Streamly/Internal/Data/Unfold/Type.hs index ea8ab2756d..7ac3231a3a 100644 --- a/core/src/Streamly/Internal/Data/Unfold/Type.hs +++ b/core/src/Streamly/Internal/Data/Unfold/Type.hs @@ -271,13 +271,7 @@ mkUnfoldrM step = Unfold step pure -- {-# INLINE unfoldrM #-} unfoldrM :: Applicative m => (a -> m (Maybe (b, a))) -> Unfold m a b -unfoldrM next = Unfold step pure - where - {-# INLINE_LATE step #-} - step st = - (\case - Just (x, s) -> Yield x s - Nothing -> Stop) <$> next st +unfoldrM next = Unfold (Producer.unfoldrM next) pure -- | Like 'unfoldrM' but uses a pure step function. -- @@ -447,15 +441,7 @@ mapM2 f (Unfold ustep uinject) = Unfold step inject {-# INLINE_NORMAL mapM #-} mapM :: Monad m => (b -> m c) -> Unfold m a b -> Unfold m a c -- mapM f = mapM2 (const f) -mapM f (Unfold ustep uinject) = Unfold step uinject - where - {-# INLINE_LATE step #-} - step st = do - r <- ustep st - case r of - Yield x s -> f x >>= \a -> return $ Yield a s - Skip s -> return $ Skip s - Stop -> return Stop +mapM f (Unfold ustep uinject) = Unfold (Producer.mapM f ustep) uinject -- | Carry the input along with the output as the first element of the output -- tuple. @@ -552,14 +538,7 @@ instance Functor m => Functor (Unfold m a) where -- /Pre-release/ {-# INLINE fromEffect #-} fromEffect :: Applicative m => m b -> Unfold m a b -fromEffect m = Unfold step inject - - where - - inject _ = pure False - - step False = (`Yield` True) <$> m - step True = pure Stop +fromEffect m = Unfold (Producer.fromEffect m) (const $ pure True) -- XXX Shouldn't this be Unfold m a a ? Which is identity. Should this function -- even exist for Unfolds. Should we have applicative/Monad for unfolds? @@ -573,20 +552,12 @@ fromEffect m = Unfold step inject fromPure :: Applicative m => b -> Unfold m a b fromPure = fromEffect . pure -data TupleState a = TupleBoth a a | TupleOne a | TupleNone - -- | Convert a tuple to a 'Stream'. -- {-# INLINE_LATE fromTuple #-} fromTuple :: Applicative m => Unfold m (a,a) a -fromTuple = Unfold step (\(x,y) -> pure $ TupleBoth x y) - - where - - {-# INLINE_LATE step #-} - step (TupleBoth x y) = pure $ Yield x (TupleOne y) - step (TupleOne y) = pure $ Yield y TupleNone - step TupleNone = pure Stop +fromTuple = + Unfold Producer.fromTuple (\(x,y) -> pure $ Producer.TupleBoth x y) -- XXX Check if "unfold (fromList [1..10])" fuses, if it doesn't we can use -- rewrite rules to rewrite list enumerations to unfold enumerations. @@ -595,31 +566,43 @@ fromTuple = Unfold step (\(x,y) -> pure $ TupleBoth x y) -- {-# INLINE_LATE fromList #-} fromList :: Applicative m => Unfold m [a] a -fromList = Unfold step pure - - where - - {-# INLINE_LATE step #-} - step (x:xs) = pure $ Yield x xs - step [] = pure Stop +fromList = Unfold Producer.fromList pure -- | Outer product discarding the first element. -- --- /Unimplemented/ --- {-# INLINE_NORMAL crossApplySnd #-} -crossApplySnd :: -- Monad m => +crossApplySnd :: Monad m => Unfold m a b -> Unfold m a c -> Unfold m a c -crossApplySnd (Unfold _step1 _inject1) (Unfold _step2 _inject2) = undefined +crossApplySnd (Unfold step1 inject1) (Unfold step2 inject2) = Unfold step inject + + where + + {-# INLINE_LATE inject #-} + inject a = do + s1 <- inject1 a + return $ Producer.CrossApplyOuter a s1 + + {- HLINT ignore "Eta reduce" -} + {-# INLINE_LATE step #-} + step st = Producer.crossApplySnd inject2 step1 step2 st -- | Outer product discarding the second element. -- --- /Unimplemented/ --- {-# INLINE_NORMAL crossApplyFst #-} -crossApplyFst :: -- Monad m => +crossApplyFst :: Monad m => Unfold m a b -> Unfold m a c -> Unfold m a b -crossApplyFst (Unfold _step1 _inject1) (Unfold _step2 _inject2) = undefined +crossApplyFst (Unfold step1 inject1) (Unfold step2 inject2) = Unfold step inject + + where + + {-# INLINE_LATE inject #-} + inject a = do + s1 <- inject1 a + return $ Producer.CrossApplyFstOuter a s1 + + {- HLINT ignore "Eta reduce" -} + {-# INLINE_LATE step #-} + step st = Producer.crossApplyFst inject2 step1 step2 st {- {-# ANN type Many2State Fuse #-} @@ -794,7 +777,18 @@ fairCross = fairCrossWith (,) {-# INLINE crossApply #-} crossApply :: Monad m => Unfold m a (b -> c) -> Unfold m a b -> Unfold m a c -crossApply u1 u2 = fmap (\(a, b) -> a b) (cross u1 u2) +crossApply (Unfold step1 inject1) (Unfold step2 inject2) = Unfold step inject + + where + + {-# INLINE_LATE inject #-} + inject a = do + s1 <- inject1 a + return $ Producer.CrossApplyOuter a s1 + + {- HLINT ignore "Eta reduce" -} + {-# INLINE_LATE step #-} + step st = Producer.crossApply inject2 step1 step2 st -- XXX Applicative makes sense for unfolds, but monad does not. Use streams for -- monad. diff --git a/docs/Developer/optimization-guidelines.md b/docs/Developer/optimization-guidelines.md index 56f9d95dc7..45b23b928d 100644 --- a/docs/Developer/optimization-guidelines.md +++ b/docs/Developer/optimization-guidelines.md @@ -111,6 +111,43 @@ Step function of a stream or unfold: may help in such cases. * The step function must be annotated such that it gets inlined after the main combinator (`INLINE_LATE`). +* Always declare the step function with all arguments explicit, even when the + body delegates to a helper: + + ```haskell + -- Good: full arity visible to GHC from the source + {-# INLINE_LATE step #-} + step gst st = Helper.mapM f (innerStep (adaptState gst)) st + + -- Bad: eta-reduced, last argument missing + {-# INLINE_LATE step #-} + step gst = Helper.mapM f (innerStep (adaptState gst)) + ``` + + The eta-reduced form causes a performance problem. + In runtime-composed pipelines (iterated/dynamic composition):** When the + `Stream` object is constructed at runtime the step closure is invoked + through a dynamic function pointer rather than at a known call site, so + GHC's static inlining cannot eliminate the intermediate partial + applications at all. Each call to `step gst` allocates a PAP for + `innerStep (adaptState gst)` and another for `Helper.mapM f ` — + even if the helper carries `INLINE_LATE`. For example, eta-reducing the + `mapM`, `crossApply`, `crossApplySnd`, and `crossApplyFst` step functions + caused the following regressions in o-n-space (iterated, not fused) + benchmarks: + + ``` + o-n-space/iterated/(<$) : 14 MB → 24 MB (+71%) + o-n-space/iterated/fmap : 15 MB → 26 MB (+73%) + o-n-space/iterated/mapM : 3.6 MB → 9.1 MB (+153%) + ``` + + Making the arity explicit in the source prevents these allocations regardless + of inlining phase or order. + + Note: helpers referenced by value with no free variables + (e.g. `Stream (const Producer.fromList)`) are unaffected because returning a + top-level function reference does not allocate. Multiple yield points or single?: