From cfb9db94637cbaa07a1fd0ae56b6d792f00a0492 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Sun, 31 May 2026 22:59:47 +0530 Subject: [PATCH 1/8] Move unfoldrM from Stream and Unfold to Producer --- core/src/Streamly/Internal/Data/Producer.hs | 12 ++++++++++++ core/src/Streamly/Internal/Data/Stream/Generate.hs | 8 +++----- core/src/Streamly/Internal/Data/Unfold/Type.hs | 8 +------- 3 files changed, 16 insertions(+), 12 deletions(-) diff --git a/core/src/Streamly/Internal/Data/Producer.hs b/core/src/Streamly/Internal/Data/Producer.hs index a0ecfb9f25..fffa02416d 100644 --- a/core/src/Streamly/Internal/Data/Producer.hs +++ b/core/src/Streamly/Internal/Data/Producer.hs @@ -14,11 +14,13 @@ module Streamly.Internal.Data.Producer ( mapMaybeM , takeWhileM + , unfoldrM ) where #include "inline.hs" +import Data.Functor ((<&>)) import Streamly.Internal.Data.Stream.Step (Step(..)) -- | A stream transition: given the current state, produce the next 'Step'. @@ -51,3 +53,13 @@ takeWhileM f step1 st = do return $ if b then Yield x s else Stop Skip s -> return (Skip s) Stop -> return Stop + +-- | Build a 'Producer' from a /monadic/ step function that generates the next +-- element and the next seed value from the current seed value. It is invoked +-- until it returns 'Nothing'. +{-# INLINE_LATE unfoldrM #-} +unfoldrM :: Applicative m => (a -> m (Maybe (b, a))) -> Producer m a b +unfoldrM next a = + next a <&> \case + Just (b, a1) -> Yield b a1 + Nothing -> Stop diff --git a/core/src/Streamly/Internal/Data/Stream/Generate.hs b/core/src/Streamly/Internal/Data/Stream/Generate.hs index dea9da85f4..a56974e33a 100644 --- a/core/src/Streamly/Internal/Data/Stream/Generate.hs +++ b/core/src/Streamly/Internal/Data/Stream/Generate.hs @@ -119,6 +119,8 @@ import Streamly.Internal.System.IO (unsafeInlineIO) #ifdef USE_UNFOLDS_EVERYWHERE import qualified Streamly.Internal.Data.Unfold as Unfold +#else +import qualified Streamly.Internal.Data.Producer as Producer #endif import Data.Fixed @@ -224,11 +226,7 @@ unfoldrM next = unfold (Unfold.unfoldrM next) unfoldrM next = Stream step where {-# INLINE_LATE step #-} - step _ st = do - r <- next st - return $ case r of - Just (x, s) -> Yield x s - Nothing -> Stop + step _ = Producer.unfoldrM next #endif -- | Build a stream by unfolding a /pure/ step function @step@ starting from a diff --git a/core/src/Streamly/Internal/Data/Unfold/Type.hs b/core/src/Streamly/Internal/Data/Unfold/Type.hs index ea8ab2756d..d2db16191b 100644 --- a/core/src/Streamly/Internal/Data/Unfold/Type.hs +++ b/core/src/Streamly/Internal/Data/Unfold/Type.hs @@ -271,13 +271,7 @@ mkUnfoldrM step = Unfold step pure -- {-# INLINE unfoldrM #-} unfoldrM :: Applicative m => (a -> m (Maybe (b, a))) -> Unfold m a b -unfoldrM next = Unfold step pure - where - {-# INLINE_LATE step #-} - step st = - (\case - Just (x, s) -> Yield x s - Nothing -> Stop) <$> next st +unfoldrM next = Unfold (Producer.unfoldrM next) pure -- | Like 'unfoldrM' but uses a pure step function. -- From 0186872b3aee683fc9ec785b53c99ed3b5dba82f Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Sun, 31 May 2026 23:58:45 +0530 Subject: [PATCH 2/8] Move mapM to Producer module --- core/src/Streamly/Internal/Data/Producer.hs | 16 +++++++++++++++- core/src/Streamly/Internal/Data/Stream/Type.hs | 7 +------ core/src/Streamly/Internal/Data/Unfold/Type.hs | 10 +--------- 3 files changed, 17 insertions(+), 16 deletions(-) diff --git a/core/src/Streamly/Internal/Data/Producer.hs b/core/src/Streamly/Internal/Data/Producer.hs index fffa02416d..f3add0c9f2 100644 --- a/core/src/Streamly/Internal/Data/Producer.hs +++ b/core/src/Streamly/Internal/Data/Producer.hs @@ -12,7 +12,8 @@ module Streamly.Internal.Data.Producer ( - mapMaybeM + mapM + , mapMaybeM , takeWhileM , unfoldrM ) @@ -23,11 +24,24 @@ where import Data.Functor ((<&>)) import Streamly.Internal.Data.Stream.Step (Step(..)) +import Prelude hiding (mapM) + -- | A stream transition: given the current state, produce the next 'Step'. -- The state type @a@ is also the type carried inside 'Step', so a 'Yield' -- delivers a new value alongside the updated state. type Producer m a b = a -> m (Step a b) +{-# INLINE_LATE mapM #-} +mapM :: Monad m => (b -> m c) -> Producer m s b -> Producer m s c +mapM f step1 st = do + r <- step1 st + case r of + Yield x s -> do + b <- f x + return $ Yield b s + Skip s -> return (Skip s) + Stop -> return Stop + {-# INLINE_LATE mapMaybeM #-} mapMaybeM :: Monad m => (b -> m (Maybe c)) -> Producer m s b -> Producer m s c diff --git a/core/src/Streamly/Internal/Data/Stream/Type.hs b/core/src/Streamly/Internal/Data/Stream/Type.hs index bd436145cb..10a923f2ec 100644 --- a/core/src/Streamly/Internal/Data/Stream/Type.hs +++ b/core/src/Streamly/Internal/Data/Stream/Type.hs @@ -979,12 +979,7 @@ mapM f (Stream step state) = Stream step1 state where {-# INLINE_LATE step1 #-} - step1 gst st = do - r <- step (adaptState gst) st - case r of - Yield x s -> f x >>= \a -> return $ Yield a s - Skip s -> return $ Skip s - Stop -> return Stop + step1 gst = Producer.mapM f (step (adaptState gst)) {-# INLINE map #-} map :: Monad m => (a -> b) -> Stream m a -> Stream m b diff --git a/core/src/Streamly/Internal/Data/Unfold/Type.hs b/core/src/Streamly/Internal/Data/Unfold/Type.hs index d2db16191b..50a33e44cd 100644 --- a/core/src/Streamly/Internal/Data/Unfold/Type.hs +++ b/core/src/Streamly/Internal/Data/Unfold/Type.hs @@ -441,15 +441,7 @@ mapM2 f (Unfold ustep uinject) = Unfold step inject {-# INLINE_NORMAL mapM #-} mapM :: Monad m => (b -> m c) -> Unfold m a b -> Unfold m a c -- mapM f = mapM2 (const f) -mapM f (Unfold ustep uinject) = Unfold step uinject - where - {-# INLINE_LATE step #-} - step st = do - r <- ustep st - case r of - Yield x s -> f x >>= \a -> return $ Yield a s - Skip s -> return $ Skip s - Stop -> return Stop +mapM f (Unfold ustep uinject) = Unfold (Producer.mapM f ustep) uinject -- | Carry the input along with the output as the first element of the output -- tuple. From d67d4497ccaf66d53b9f0dc9951ca91a921adbe9 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Mon, 1 Jun 2026 00:19:58 +0530 Subject: [PATCH 3/8] Move fromEffect to Producer module --- core/src/Streamly/Internal/Data/Producer.hs | 10 +++++++++- core/src/Streamly/Internal/Data/Stream/Type.hs | 8 +------- core/src/Streamly/Internal/Data/Unfold/Type.hs | 9 +-------- 3 files changed, 11 insertions(+), 16 deletions(-) diff --git a/core/src/Streamly/Internal/Data/Producer.hs b/core/src/Streamly/Internal/Data/Producer.hs index f3add0c9f2..e5765bf95a 100644 --- a/core/src/Streamly/Internal/Data/Producer.hs +++ b/core/src/Streamly/Internal/Data/Producer.hs @@ -12,7 +12,8 @@ module Streamly.Internal.Data.Producer ( - mapM + fromEffect + , mapM , mapMaybeM , takeWhileM , unfoldrM @@ -31,6 +32,13 @@ import Prelude hiding (mapM) -- delivers a new value alongside the updated state. type Producer m a b = a -> m (Step a b) +-- | Build a single element 'Producer' from an effect. The 'Bool' state is +-- 'True' before the effect is run and 'False' after, when the producer stops. +{-# INLINE_LATE fromEffect #-} +fromEffect :: Applicative m => m b -> Producer m Bool b +fromEffect m True = (`Yield` False) <$> m +fromEffect _ False = pure Stop + {-# INLINE_LATE mapM #-} mapM :: Monad m => (b -> m c) -> Producer m s b -> Producer m s c mapM f step1 st = do diff --git a/core/src/Streamly/Internal/Data/Stream/Type.hs b/core/src/Streamly/Internal/Data/Stream/Type.hs index 10a923f2ec..7c9b7b3e65 100644 --- a/core/src/Streamly/Internal/Data/Stream/Type.hs +++ b/core/src/Streamly/Internal/Data/Stream/Type.hs @@ -399,13 +399,7 @@ fromPure x = Stream (\_ s -> pure $ step undefined s) True -- {-# INLINE_NORMAL fromEffect #-} fromEffect :: Applicative m => m a -> Stream m a -fromEffect m = Stream step True - - where - - {-# INLINE_LATE step #-} - step _ True = (`Yield` False) <$> m - step _ False = pure Stop +fromEffect m = Stream (const $ Producer.fromEffect m) True ------------------------------------------------------------------------------ -- From Containers diff --git a/core/src/Streamly/Internal/Data/Unfold/Type.hs b/core/src/Streamly/Internal/Data/Unfold/Type.hs index 50a33e44cd..fd1d6ed484 100644 --- a/core/src/Streamly/Internal/Data/Unfold/Type.hs +++ b/core/src/Streamly/Internal/Data/Unfold/Type.hs @@ -538,14 +538,7 @@ instance Functor m => Functor (Unfold m a) where -- /Pre-release/ {-# INLINE fromEffect #-} fromEffect :: Applicative m => m b -> Unfold m a b -fromEffect m = Unfold step inject - - where - - inject _ = pure False - - step False = (`Yield` True) <$> m - step True = pure Stop +fromEffect m = Unfold (Producer.fromEffect m) (const $ pure True) -- XXX Shouldn't this be Unfold m a a ? Which is identity. Should this function -- even exist for Unfolds. Should we have applicative/Monad for unfolds? From ff73b6ae1c4710805b33e11c17fbfb38f208b299 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Mon, 1 Jun 2026 00:30:12 +0530 Subject: [PATCH 4/8] Move fromTuple and fromList to Producer module --- core/src/Streamly/Internal/Data/Producer.hs | 18 ++++++++++++++++- .../src/Streamly/Internal/Data/Stream/Type.hs | 12 ++++++----- .../src/Streamly/Internal/Data/Unfold/Type.hs | 20 +++---------------- 3 files changed, 27 insertions(+), 23 deletions(-) diff --git a/core/src/Streamly/Internal/Data/Producer.hs b/core/src/Streamly/Internal/Data/Producer.hs index e5765bf95a..70814b0dd1 100644 --- a/core/src/Streamly/Internal/Data/Producer.hs +++ b/core/src/Streamly/Internal/Data/Producer.hs @@ -12,7 +12,10 @@ module Streamly.Internal.Data.Producer ( - fromEffect + TupleState(..) + , fromEffect + , fromList + , fromTuple , mapM , mapMaybeM , takeWhileM @@ -32,6 +35,8 @@ import Prelude hiding (mapM) -- delivers a new value alongside the updated state. type Producer m a b = a -> m (Step a b) +data TupleState a = TupleBoth a a | TupleOne a | TupleNone + -- | Build a single element 'Producer' from an effect. The 'Bool' state is -- 'True' before the effect is run and 'False' after, when the producer stops. {-# INLINE_LATE fromEffect #-} @@ -39,6 +44,17 @@ fromEffect :: Applicative m => m b -> Producer m Bool b fromEffect m True = (`Yield` False) <$> m fromEffect _ False = pure Stop +{-# INLINE_LATE fromTuple #-} +fromTuple :: Applicative m => Producer m (TupleState a) a +fromTuple (TupleBoth x y) = pure $ Yield x (TupleOne y) +fromTuple (TupleOne y) = pure $ Yield y TupleNone +fromTuple TupleNone = pure Stop + +{-# INLINE_LATE fromList #-} +fromList :: Applicative m => Producer m [a] a +fromList (x:xs) = pure $ Yield x xs +fromList [] = pure Stop + {-# INLINE_LATE mapM #-} mapM :: Monad m => (b -> m c) -> Producer m s b -> Producer m s c mapM f step1 st = do diff --git a/core/src/Streamly/Internal/Data/Stream/Type.hs b/core/src/Streamly/Internal/Data/Stream/Type.hs index 7c9b7b3e65..b4e1cc7f2e 100644 --- a/core/src/Streamly/Internal/Data/Stream/Type.hs +++ b/core/src/Streamly/Internal/Data/Stream/Type.hs @@ -48,6 +48,7 @@ module Streamly.Internal.Data.Stream.Type , fromEffect -- ** From Containers + , fromTuple , Streamly.Internal.Data.Stream.Type.fromList -- * Elimination @@ -407,17 +408,18 @@ fromEffect m = Stream (const $ Producer.fromEffect m) True -- Adapted from the vector package. +-- | Construct a stream from a tuple. +{-# INLINE_LATE fromTuple #-} +fromTuple :: Applicative m => (a, a) -> Stream m a +fromTuple (x, y) = Stream (const Producer.fromTuple) (Producer.TupleBoth x y) + -- | Construct a stream from a list of pure values. {-# INLINE_LATE fromList #-} fromList :: Applicative m => [a] -> Stream m a #ifdef USE_UNFOLDS_EVERYWHERE fromList = unfold Unfold.fromList #else -fromList = Stream step - where - {-# INLINE_LATE step #-} - step _ (x:xs) = pure $ Yield x xs - step _ [] = pure Stop +fromList = Stream (const Producer.fromList) #endif ------------------------------------------------------------------------------ diff --git a/core/src/Streamly/Internal/Data/Unfold/Type.hs b/core/src/Streamly/Internal/Data/Unfold/Type.hs index fd1d6ed484..47f1854af3 100644 --- a/core/src/Streamly/Internal/Data/Unfold/Type.hs +++ b/core/src/Streamly/Internal/Data/Unfold/Type.hs @@ -552,20 +552,12 @@ fromEffect m = Unfold (Producer.fromEffect m) (const $ pure True) fromPure :: Applicative m => b -> Unfold m a b fromPure = fromEffect . pure -data TupleState a = TupleBoth a a | TupleOne a | TupleNone - -- | Convert a tuple to a 'Stream'. -- {-# INLINE_LATE fromTuple #-} fromTuple :: Applicative m => Unfold m (a,a) a -fromTuple = Unfold step (\(x,y) -> pure $ TupleBoth x y) - - where - - {-# INLINE_LATE step #-} - step (TupleBoth x y) = pure $ Yield x (TupleOne y) - step (TupleOne y) = pure $ Yield y TupleNone - step TupleNone = pure Stop +fromTuple = + Unfold Producer.fromTuple (\(x,y) -> pure $ Producer.TupleBoth x y) -- XXX Check if "unfold (fromList [1..10])" fuses, if it doesn't we can use -- rewrite rules to rewrite list enumerations to unfold enumerations. @@ -574,13 +566,7 @@ fromTuple = Unfold step (\(x,y) -> pure $ TupleBoth x y) -- {-# INLINE_LATE fromList #-} fromList :: Applicative m => Unfold m [a] a -fromList = Unfold step pure - - where - - {-# INLINE_LATE step #-} - step (x:xs) = pure $ Yield x xs - step [] = pure Stop +fromList = Unfold Producer.fromList pure -- | Outer product discarding the first element. -- From de037510e309c154d342a1b13c7b2739c019e4dd Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Mon, 1 Jun 2026 00:48:20 +0530 Subject: [PATCH 5/8] Move crossApply to Producer module --- core/src/Streamly/Internal/Data/Producer.hs | 29 ++++++++++++++++++- .../src/Streamly/Internal/Data/Stream/Type.hs | 22 +++++--------- .../src/Streamly/Internal/Data/Unfold/Type.hs | 12 +++++++- 3 files changed, 46 insertions(+), 17 deletions(-) diff --git a/core/src/Streamly/Internal/Data/Producer.hs b/core/src/Streamly/Internal/Data/Producer.hs index 70814b0dd1..8fb89e85e7 100644 --- a/core/src/Streamly/Internal/Data/Producer.hs +++ b/core/src/Streamly/Internal/Data/Producer.hs @@ -12,7 +12,9 @@ module Streamly.Internal.Data.Producer ( - TupleState(..) + CrossApplyState(..) + , TupleState(..) + , crossApply , fromEffect , fromList , fromTuple @@ -35,6 +37,10 @@ import Prelude hiding (mapM) -- delivers a new value alongside the updated state. type Producer m a b = a -> m (Step a b) +data CrossApplyState m s1 s2 a b = + CrossApplyOuter (m s2) s1 + | CrossApplyInner (m s2) (a -> b) s1 s2 + data TupleState a = TupleBoth a a | TupleOne a | TupleNone -- | Build a single element 'Producer' from an effect. The 'Bool' state is @@ -55,6 +61,27 @@ fromList :: Applicative m => Producer m [a] a fromList (x:xs) = pure $ Yield x xs fromList [] = pure Stop +{-# INLINE_LATE crossApply #-} +crossApply + :: Monad m + => Producer m s1 (a -> b) + -> Producer m s2 a + -> Producer m (CrossApplyState m s1 s2 a b) b +crossApply step1 _ (CrossApplyOuter inject2 st) = do + r <- step1 st + case r of + Yield f s -> do + s2 <- inject2 + return $ Skip (CrossApplyInner inject2 f s s2) + Skip s -> return $ Skip (CrossApplyOuter inject2 s) + Stop -> return Stop +crossApply _ step2 (CrossApplyInner inject2 f os st) = do + r <- step2 st + return $ case r of + Yield a s -> Yield (f a) (CrossApplyInner inject2 f os s) + Skip s -> Skip (CrossApplyInner inject2 f os s) + Stop -> Skip (CrossApplyOuter inject2 os) + {-# INLINE_LATE mapM #-} mapM :: Monad m => (b -> m c) -> Producer m s b -> Producer m s c mapM f step1 st = do diff --git a/core/src/Streamly/Internal/Data/Stream/Type.hs b/core/src/Streamly/Internal/Data/Stream/Type.hs index b4e1cc7f2e..368c7ab6e7 100644 --- a/core/src/Streamly/Internal/Data/Stream/Type.hs +++ b/core/src/Streamly/Internal/Data/Stream/Type.hs @@ -1314,25 +1314,17 @@ zipWith f = zipWithM (\a b -> return (f a b)) -- >>> crossApply = Stream.crossWith id -- {-# INLINE_NORMAL crossApply #-} -crossApply :: Functor f => Stream f (a -> b) -> Stream f a -> Stream f b +crossApply :: Monad m => Stream m (a -> b) -> Stream m a -> Stream m b crossApply (Stream stepa statea) (Stream stepb stateb) = - Stream step' (Left statea) + Stream step (Producer.CrossApplyOuter (return stateb) statea) where - {-# INLINE_LATE step' #-} - step' gst (Left st) = fmap - (\case - Yield f s -> Skip (Right (f, s, stateb)) - Skip s -> Skip (Left s) - Stop -> Stop) - (stepa (adaptState gst) st) - step' gst (Right (f, os, st)) = fmap - (\case - Yield a s -> Yield (f a) (Right (f, os, s)) - Skip s -> Skip (Right (f,os, s)) - Stop -> Skip (Left os)) - (stepb (adaptState gst) st) + {-# INLINE_LATE step #-} + step gst = + Producer.crossApply + (stepa (adaptState gst)) + (stepb (adaptState gst)) -- This is shared by all fairUnfold, fairConcat combinators. data FairUnfoldState o i = diff --git a/core/src/Streamly/Internal/Data/Unfold/Type.hs b/core/src/Streamly/Internal/Data/Unfold/Type.hs index 47f1854af3..ed278a8164 100644 --- a/core/src/Streamly/Internal/Data/Unfold/Type.hs +++ b/core/src/Streamly/Internal/Data/Unfold/Type.hs @@ -759,7 +759,17 @@ fairCross = fairCrossWith (,) {-# INLINE crossApply #-} crossApply :: Monad m => Unfold m a (b -> c) -> Unfold m a b -> Unfold m a c -crossApply u1 u2 = fmap (\(a, b) -> a b) (cross u1 u2) +crossApply (Unfold step1 inject1) (Unfold step2 inject2) = Unfold step inject + + where + + {-# INLINE_LATE inject #-} + inject a = do + s1 <- inject1 a + return $ Producer.CrossApplyOuter (inject2 a) s1 + + {-# INLINE_LATE step #-} + step = Producer.crossApply step1 step2 -- XXX Applicative makes sense for unfolds, but monad does not. Use streams for -- monad. From 724f5805224270fd9d1f2a8a8aa4d07dbaeffa3a Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Mon, 1 Jun 2026 01:50:51 +0530 Subject: [PATCH 6/8] Move crossApplyFst and crossApplySnd to Producer module --- core/src/Streamly/Internal/Data/Producer.hs | 50 +++++++++++++++++++ .../src/Streamly/Internal/Data/Stream/Type.hs | 44 +++++----------- .../src/Streamly/Internal/Data/Unfold/Type.hs | 32 +++++++++--- 3 files changed, 86 insertions(+), 40 deletions(-) diff --git a/core/src/Streamly/Internal/Data/Producer.hs b/core/src/Streamly/Internal/Data/Producer.hs index 8fb89e85e7..7af061c98d 100644 --- a/core/src/Streamly/Internal/Data/Producer.hs +++ b/core/src/Streamly/Internal/Data/Producer.hs @@ -15,6 +15,8 @@ module Streamly.Internal.Data.Producer CrossApplyState(..) , TupleState(..) , crossApply + , crossApplyFst + , crossApplySnd , fromEffect , fromList , fromTuple @@ -82,6 +84,54 @@ crossApply _ step2 (CrossApplyInner inject2 f os st) = do Skip s -> Skip (CrossApplyInner inject2 f os s) Stop -> Skip (CrossApplyOuter inject2 os) +-- | Outer product discarding the second (inner) element. For each element of +-- the first producer the entire second producer is run, yielding the first +-- producer's element each time. +{-# INLINE_LATE crossApplyFst #-} +crossApplyFst + :: Monad m + => Producer m s1 b + -> Producer m s2 a + -> Producer m (CrossApplyState m s1 s2 a b) b +crossApplyFst step1 _ (CrossApplyOuter inject2 st) = do + r <- step1 st + case r of + Yield b s -> do + s2 <- inject2 + return $ Skip (CrossApplyInner inject2 (const b) s s2) + Skip s -> return $ Skip (CrossApplyOuter inject2 s) + Stop -> return Stop +crossApplyFst _ step2 (CrossApplyInner inject2 f os st) = do + r <- step2 st + return $ case r of + Yield a s -> Yield (f a) (CrossApplyInner inject2 f os s) + Skip s -> Skip (CrossApplyInner inject2 f os s) + Stop -> Skip (CrossApplyOuter inject2 os) + +-- | Outer product discarding the first (outer) element. For each element of +-- the first producer the entire second producer is run, yielding the second +-- producer's elements. +{-# INLINE_LATE crossApplySnd #-} +crossApplySnd + :: Monad m + => Producer m s1 a + -> Producer m s2 b + -> Producer m (CrossApplyState m s1 s2 b b) b +crossApplySnd step1 _ (CrossApplyOuter inject2 st) = do + r <- step1 st + case r of + Yield _ s -> do + s2 <- inject2 + return $ Skip (CrossApplyInner inject2 id s s2) + Skip s -> return $ Skip (CrossApplyOuter inject2 s) + Stop -> return Stop +crossApplySnd _ step2 (CrossApplyInner inject2 f os st) = do + r <- step2 st + return $ case r of + Yield a s -> Yield (f a) (CrossApplyInner inject2 f os s) + Skip s -> Skip (CrossApplyInner inject2 f os s) + Stop -> Skip (CrossApplyOuter inject2 os) + {-# INLINE_LATE mapM #-} mapM :: Monad m => (b -> m c) -> Producer m s b -> Producer m s c mapM f step1 st = do diff --git a/core/src/Streamly/Internal/Data/Stream/Type.hs b/core/src/Streamly/Internal/Data/Stream/Type.hs index 368c7ab6e7..451a570c16 100644 --- a/core/src/Streamly/Internal/Data/Stream/Type.hs +++ b/core/src/Streamly/Internal/Data/Stream/Type.hs @@ -1380,50 +1380,30 @@ fairCrossWithM f (Stream step1 state1) (Stream step2 state2) = Stop -> return $ Skip (FairUnfoldDrain ys ls) {-# INLINE_NORMAL crossApplySnd #-} -crossApplySnd :: Functor f => Stream f a -> Stream f b -> Stream f b +crossApplySnd :: Monad f => Stream f a -> Stream f b -> Stream f b crossApplySnd (Stream stepa statea) (Stream stepb stateb) = - Stream step (Left statea) + Stream step (Producer.CrossApplyOuter (return stateb) statea) where {-# INLINE_LATE step #-} - step gst (Left st) = - fmap - (\case - Yield _ s -> Skip (Right (s, stateb)) - Skip s -> Skip (Left s) - Stop -> Stop) - (stepa (adaptState gst) st) - step gst (Right (ostate, st)) = - fmap - (\case - Yield b s -> Yield b (Right (ostate, s)) - Skip s -> Skip (Right (ostate, s)) - Stop -> Skip (Left ostate)) - (stepb gst st) + step gst = + Producer.crossApplySnd + (stepa (adaptState gst)) + (stepb gst) {-# INLINE_NORMAL crossApplyFst #-} -crossApplyFst :: Functor f => Stream f a -> Stream f b -> Stream f a +crossApplyFst :: Monad f => Stream f a -> Stream f b -> Stream f a crossApplyFst (Stream stepa statea) (Stream stepb stateb) = - Stream step (Left statea) + Stream step (Producer.CrossApplyOuter (return stateb) statea) where {-# INLINE_LATE step #-} - step gst (Left st) = - fmap - (\case - Yield b s -> Skip (Right (s, stateb, b)) - Skip s -> Skip (Left s) - Stop -> Stop) - (stepa gst st) - step gst (Right (ostate, st, b)) = - fmap - (\case - Yield _ s -> Yield b (Right (ostate, s, b)) - Skip s -> Skip (Right (ostate, s, b)) - Stop -> Skip (Left ostate)) - (stepb (adaptState gst) st) + step gst = + Producer.crossApplyFst + (stepa gst) + (stepb (adaptState gst)) {- instance Applicative f => Applicative (Stream f) where diff --git a/core/src/Streamly/Internal/Data/Unfold/Type.hs b/core/src/Streamly/Internal/Data/Unfold/Type.hs index ed278a8164..6e0c656526 100644 --- a/core/src/Streamly/Internal/Data/Unfold/Type.hs +++ b/core/src/Streamly/Internal/Data/Unfold/Type.hs @@ -570,21 +570,37 @@ fromList = Unfold Producer.fromList pure -- | Outer product discarding the first element. -- --- /Unimplemented/ --- {-# INLINE_NORMAL crossApplySnd #-} -crossApplySnd :: -- Monad m => +crossApplySnd :: Monad m => Unfold m a b -> Unfold m a c -> Unfold m a c -crossApplySnd (Unfold _step1 _inject1) (Unfold _step2 _inject2) = undefined +crossApplySnd (Unfold step1 inject1) (Unfold step2 inject2) = Unfold step inject + + where + + {-# INLINE_LATE inject #-} + inject a = do + s1 <- inject1 a + return $ Producer.CrossApplyOuter (inject2 a) s1 + + {-# INLINE_LATE step #-} + step = Producer.crossApplySnd step1 step2 -- | Outer product discarding the second element. -- --- /Unimplemented/ --- {-# INLINE_NORMAL crossApplyFst #-} -crossApplyFst :: -- Monad m => +crossApplyFst :: Monad m => Unfold m a b -> Unfold m a c -> Unfold m a b -crossApplyFst (Unfold _step1 _inject1) (Unfold _step2 _inject2) = undefined +crossApplyFst (Unfold step1 inject1) (Unfold step2 inject2) = Unfold step inject + + where + + {-# INLINE_LATE inject #-} + inject a = do + s1 <- inject1 a + return $ Producer.CrossApplyOuter (inject2 a) s1 + + {-# INLINE_LATE step #-} + step = Producer.crossApplyFst step1 step2 {- {-# ANN type Many2State Fuse #-} From 6fb316480099673136249d7bf25f667edb76271d Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Mon, 1 Jun 2026 02:57:15 +0530 Subject: [PATCH 7/8] Fix crossApply* regressions --- core/src/Streamly/Internal/Data/Producer.hs | 85 ++++++++++++------- .../src/Streamly/Internal/Data/Stream/Type.hs | 9 +- .../src/Streamly/Internal/Data/Unfold/Type.hs | 12 +-- 3 files changed, 64 insertions(+), 42 deletions(-) diff --git a/core/src/Streamly/Internal/Data/Producer.hs b/core/src/Streamly/Internal/Data/Producer.hs index 7af061c98d..389a63d30e 100644 --- a/core/src/Streamly/Internal/Data/Producer.hs +++ b/core/src/Streamly/Internal/Data/Producer.hs @@ -13,6 +13,7 @@ module Streamly.Internal.Data.Producer ( CrossApplyState(..) + , CrossApplyFstState(..) , TupleState(..) , crossApply , crossApplyFst @@ -39,9 +40,24 @@ import Prelude hiding (mapM) -- delivers a new value alongside the updated state. type Producer m a b = a -> m (Step a b) -data CrossApplyState m s1 s2 a b = - CrossApplyOuter (m s2) s1 - | CrossApplyInner (m s2) (a -> b) s1 s2 +-- | State of a cross-apply style producer. @x@ is the seed from which the +-- inner producer's state is (re)injected for every element of the outer +-- producer. We store the first-order @x@ seed rather than the injection +-- action @m s2@ so that the inner injection stays a known, statically inlined +-- call and the loop state remains unboxable (storing @m s2@ here defeats +-- fusion and forces per-element allocation). +data CrossApplyState x s1 s2 a b = + CrossApplyOuter x s1 + | CrossApplyInner x (a -> b) s1 s2 + +-- | State for 'crossApplyFst'. The inner constructor stores the outer +-- producer's value @b@ /directly/ so that it can be re-yielded for each element +-- of the inner producer as a loop-invariant value. Storing a function +-- (@const b@) here instead would force a per-element PAP application in the hot +-- inner loop and defeat the hoisting the original yielded value gets. +data CrossApplyFstState x s1 s2 b = + CrossApplyFstOuter x s1 + | CrossApplyFstInner x b s1 s2 data TupleState a = TupleBoth a a | TupleOne a | TupleNone @@ -66,23 +82,24 @@ fromList [] = pure Stop {-# INLINE_LATE crossApply #-} crossApply :: Monad m - => Producer m s1 (a -> b) + => (x -> m s2) + -> Producer m s1 (a -> b) -> Producer m s2 a - -> Producer m (CrossApplyState m s1 s2 a b) b -crossApply step1 _ (CrossApplyOuter inject2 st) = do + -> Producer m (CrossApplyState x s1 s2 a b) b +crossApply inject2 step1 _ (CrossApplyOuter seed st) = do r <- step1 st case r of Yield f s -> do - s2 <- inject2 - return $ Skip (CrossApplyInner inject2 f s s2) - Skip s -> return $ Skip (CrossApplyOuter inject2 s) + s2 <- inject2 seed + return $ Skip (CrossApplyInner seed f s s2) + Skip s -> return $ Skip (CrossApplyOuter seed s) Stop -> return Stop -crossApply _ step2 (CrossApplyInner inject2 f os st) = do +crossApply _ _ step2 (CrossApplyInner seed f os st) = do r <- step2 st return $ case r of - Yield a s -> Yield (f a) (CrossApplyInner inject2 f os s) - Skip s -> Skip (CrossApplyInner inject2 f os s) - Stop -> Skip (CrossApplyOuter inject2 os) + Yield a s -> Yield (f a) (CrossApplyInner seed f os s) + Skip s -> Skip (CrossApplyInner seed f os s) + Stop -> Skip (CrossApplyOuter seed os) -- | Outer product discarding the second (inner) element. For each element of -- the first producer the entire second producer is run, yielding the first @@ -90,23 +107,24 @@ crossApply _ step2 (CrossApplyInner inject2 f os st) = do {-# INLINE_LATE crossApplyFst #-} crossApplyFst :: Monad m - => Producer m s1 b + => (x -> m s2) + -> Producer m s1 b -> Producer m s2 a - -> Producer m (CrossApplyState m s1 s2 a b) b -crossApplyFst step1 _ (CrossApplyOuter inject2 st) = do + -> Producer m (CrossApplyFstState x s1 s2 b) b +crossApplyFst inject2 step1 _ (CrossApplyFstOuter seed st) = do r <- step1 st case r of Yield b s -> do - s2 <- inject2 - return $ Skip (CrossApplyInner inject2 (const b) s s2) - Skip s -> return $ Skip (CrossApplyOuter inject2 s) + s2 <- inject2 seed + return $ Skip (CrossApplyFstInner seed b s s2) + Skip s -> return $ Skip (CrossApplyFstOuter seed s) Stop -> return Stop -crossApplyFst _ step2 (CrossApplyInner inject2 f os st) = do +crossApplyFst _ _ step2 (CrossApplyFstInner seed b os st) = do r <- step2 st return $ case r of - Yield a s -> Yield (f a) (CrossApplyInner inject2 f os s) - Skip s -> Skip (CrossApplyInner inject2 f os s) - Stop -> Skip (CrossApplyOuter inject2 os) + Yield _ s -> Yield b (CrossApplyFstInner seed b os s) + Skip s -> Skip (CrossApplyFstInner seed b os s) + Stop -> Skip (CrossApplyFstOuter seed os) -- | Outer product discarding the first (outer) element. For each element of -- the first producer the entire second producer is run, yielding the second @@ -114,23 +132,24 @@ crossApplyFst _ step2 (CrossApplyInner inject2 f os st) = do {-# INLINE_LATE crossApplySnd #-} crossApplySnd :: Monad m - => Producer m s1 a + => (x -> m s2) + -> Producer m s1 a -> Producer m s2 b - -> Producer m (CrossApplyState m s1 s2 b b) b -crossApplySnd step1 _ (CrossApplyOuter inject2 st) = do + -> Producer m (CrossApplyState x s1 s2 b b) b +crossApplySnd inject2 step1 _ (CrossApplyOuter seed st) = do r <- step1 st case r of Yield _ s -> do - s2 <- inject2 - return $ Skip (CrossApplyInner inject2 id s s2) - Skip s -> return $ Skip (CrossApplyOuter inject2 s) + s2 <- inject2 seed + return $ Skip (CrossApplyInner seed id s s2) + Skip s -> return $ Skip (CrossApplyOuter seed s) Stop -> return Stop -crossApplySnd _ step2 (CrossApplyInner inject2 f os st) = do +crossApplySnd _ _ step2 (CrossApplyInner seed f os st) = do r <- step2 st return $ case r of - Yield a s -> Yield (f a) (CrossApplyInner inject2 f os s) - Skip s -> Skip (CrossApplyInner inject2 f os s) - Stop -> Skip (CrossApplyOuter inject2 os) + Yield a s -> Yield (f a) (CrossApplyInner seed f os s) + Skip s -> Skip (CrossApplyInner seed f os s) + Stop -> Skip (CrossApplyOuter seed os) {-# INLINE_LATE mapM #-} mapM :: Monad m => (b -> m c) -> Producer m s b -> Producer m s c diff --git a/core/src/Streamly/Internal/Data/Stream/Type.hs b/core/src/Streamly/Internal/Data/Stream/Type.hs index 451a570c16..14d50b9c73 100644 --- a/core/src/Streamly/Internal/Data/Stream/Type.hs +++ b/core/src/Streamly/Internal/Data/Stream/Type.hs @@ -1316,13 +1316,14 @@ zipWith f = zipWithM (\a b -> return (f a b)) {-# INLINE_NORMAL crossApply #-} crossApply :: Monad m => Stream m (a -> b) -> Stream m a -> Stream m b crossApply (Stream stepa statea) (Stream stepb stateb) = - Stream step (Producer.CrossApplyOuter (return stateb) statea) + Stream step (Producer.CrossApplyOuter stateb statea) where {-# INLINE_LATE step #-} step gst = Producer.crossApply + return (stepa (adaptState gst)) (stepb (adaptState gst)) @@ -1382,26 +1383,28 @@ fairCrossWithM f (Stream step1 state1) (Stream step2 state2) = {-# INLINE_NORMAL crossApplySnd #-} crossApplySnd :: Monad f => Stream f a -> Stream f b -> Stream f b crossApplySnd (Stream stepa statea) (Stream stepb stateb) = - Stream step (Producer.CrossApplyOuter (return stateb) statea) + Stream step (Producer.CrossApplyOuter stateb statea) where {-# INLINE_LATE step #-} step gst = Producer.crossApplySnd + return (stepa (adaptState gst)) (stepb gst) {-# INLINE_NORMAL crossApplyFst #-} crossApplyFst :: Monad f => Stream f a -> Stream f b -> Stream f a crossApplyFst (Stream stepa statea) (Stream stepb stateb) = - Stream step (Producer.CrossApplyOuter (return stateb) statea) + Stream step (Producer.CrossApplyFstOuter stateb statea) where {-# INLINE_LATE step #-} step gst = Producer.crossApplyFst + return (stepa gst) (stepb (adaptState gst)) diff --git a/core/src/Streamly/Internal/Data/Unfold/Type.hs b/core/src/Streamly/Internal/Data/Unfold/Type.hs index 6e0c656526..c271cfd391 100644 --- a/core/src/Streamly/Internal/Data/Unfold/Type.hs +++ b/core/src/Streamly/Internal/Data/Unfold/Type.hs @@ -580,10 +580,10 @@ crossApplySnd (Unfold step1 inject1) (Unfold step2 inject2) = Unfold step inject {-# INLINE_LATE inject #-} inject a = do s1 <- inject1 a - return $ Producer.CrossApplyOuter (inject2 a) s1 + return $ Producer.CrossApplyOuter a s1 {-# INLINE_LATE step #-} - step = Producer.crossApplySnd step1 step2 + step = Producer.crossApplySnd inject2 step1 step2 -- | Outer product discarding the second element. -- @@ -597,10 +597,10 @@ crossApplyFst (Unfold step1 inject1) (Unfold step2 inject2) = Unfold step inject {-# INLINE_LATE inject #-} inject a = do s1 <- inject1 a - return $ Producer.CrossApplyOuter (inject2 a) s1 + return $ Producer.CrossApplyFstOuter a s1 {-# INLINE_LATE step #-} - step = Producer.crossApplyFst step1 step2 + step = Producer.crossApplyFst inject2 step1 step2 {- {-# ANN type Many2State Fuse #-} @@ -782,10 +782,10 @@ crossApply (Unfold step1 inject1) (Unfold step2 inject2) = Unfold step inject {-# INLINE_LATE inject #-} inject a = do s1 <- inject1 a - return $ Producer.CrossApplyOuter (inject2 a) s1 + return $ Producer.CrossApplyOuter a s1 {-# INLINE_LATE step #-} - step = Producer.crossApply step1 step2 + step = Producer.crossApply inject2 step1 step2 -- XXX Applicative makes sense for unfolds, but monad does not. Use streams for -- monad. From d7f6beb97d3082c1fe2cfe20e84e11d1e5485b93 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Thu, 4 Jun 2026 12:39:09 +0530 Subject: [PATCH 8/8] Fix iterated benchmark regressions due to eta-reduced step functions Eta-reducing step functions to arity 1 causes PAP allocations when the stream is composed at runtime (iterated/dynamic pipelines), since the step closure is then called through a dynamic function pointer where GHC cannot inline away the partial applications. Fix by making all arguments explicit on mapM, crossApply, crossApplySnd and crossApplyFst step functions. Document the guideline. --- .../Streamly/Internal/Data/Stream/Generate.hs | 3 +- .../Internal/Data/Stream/Transform.hs | 3 +- .../src/Streamly/Internal/Data/Stream/Type.hs | 18 ++++++--- .../src/Streamly/Internal/Data/Unfold/Type.hs | 9 +++-- docs/Developer/optimization-guidelines.md | 37 +++++++++++++++++++ 5 files changed, 60 insertions(+), 10 deletions(-) diff --git a/core/src/Streamly/Internal/Data/Stream/Generate.hs b/core/src/Streamly/Internal/Data/Stream/Generate.hs index a56974e33a..13df444652 100644 --- a/core/src/Streamly/Internal/Data/Stream/Generate.hs +++ b/core/src/Streamly/Internal/Data/Stream/Generate.hs @@ -225,8 +225,9 @@ unfoldrM next = unfold (Unfold.unfoldrM next) #else unfoldrM next = Stream step where + {- HLINT ignore "Eta reduce" -} {-# INLINE_LATE step #-} - step _ = Producer.unfoldrM next + step _ st = Producer.unfoldrM next st #endif -- | Build a stream by unfolding a /pure/ step function @step@ starting from a diff --git a/core/src/Streamly/Internal/Data/Stream/Transform.hs b/core/src/Streamly/Internal/Data/Stream/Transform.hs index 04d1696f79..c3e51a4bda 100644 --- a/core/src/Streamly/Internal/Data/Stream/Transform.hs +++ b/core/src/Streamly/Internal/Data/Stream/Transform.hs @@ -2171,8 +2171,9 @@ mapMaybeM f (Stream step1 state1) = Stream step state1 where + {- HLINT ignore "Eta reduce" -} {-# INLINE_LATE step #-} - step gst = Producer.mapMaybeM f (step1 (adaptState gst)) + step gst st = Producer.mapMaybeM f (step1 (adaptState gst)) st -- | In a stream of 'Maybe's, discard 'Nothing's and unwrap 'Just's. -- diff --git a/core/src/Streamly/Internal/Data/Stream/Type.hs b/core/src/Streamly/Internal/Data/Stream/Type.hs index 14d50b9c73..1cf912dde0 100644 --- a/core/src/Streamly/Internal/Data/Stream/Type.hs +++ b/core/src/Streamly/Internal/Data/Stream/Type.hs @@ -974,8 +974,9 @@ mapM f (Stream step state) = Stream step1 state where + {- HLINT ignore "Eta reduce" -} {-# INLINE_LATE step1 #-} - step1 gst = Producer.mapM f (step (adaptState gst)) + step1 gst st = Producer.mapM f (step (adaptState gst)) st {-# INLINE map #-} map :: Monad m => (a -> b) -> Stream m a -> Stream m b @@ -1158,8 +1159,9 @@ takeWhileM f (Stream step1 state1) = Stream step state1 where + {- HLINT ignore "Eta reduce" -} {-# INLINE_LATE step #-} - step gst = Producer.takeWhileM f (step1 gst) + step gst st = Producer.takeWhileM f (step1 gst) st -- | End the stream as soon as the predicate fails on an element. -- @@ -1320,12 +1322,14 @@ crossApply (Stream stepa statea) (Stream stepb stateb) = where + {- HLINT ignore "Eta reduce" -} {-# INLINE_LATE step #-} - step gst = + step gst st = Producer.crossApply return (stepa (adaptState gst)) (stepb (adaptState gst)) + st -- This is shared by all fairUnfold, fairConcat combinators. data FairUnfoldState o i = @@ -1387,12 +1391,14 @@ crossApplySnd (Stream stepa statea) (Stream stepb stateb) = where + {- HLINT ignore "Eta reduce" -} {-# INLINE_LATE step #-} - step gst = + step gst st = Producer.crossApplySnd return (stepa (adaptState gst)) (stepb gst) + st {-# INLINE_NORMAL crossApplyFst #-} crossApplyFst :: Monad f => Stream f a -> Stream f b -> Stream f a @@ -1401,12 +1407,14 @@ crossApplyFst (Stream stepa statea) (Stream stepb stateb) = where + {- HLINT ignore "Eta reduce" -} {-# INLINE_LATE step #-} - step gst = + step gst st = Producer.crossApplyFst return (stepa gst) (stepb (adaptState gst)) + st {- instance Applicative f => Applicative (Stream f) where diff --git a/core/src/Streamly/Internal/Data/Unfold/Type.hs b/core/src/Streamly/Internal/Data/Unfold/Type.hs index c271cfd391..7ac3231a3a 100644 --- a/core/src/Streamly/Internal/Data/Unfold/Type.hs +++ b/core/src/Streamly/Internal/Data/Unfold/Type.hs @@ -582,8 +582,9 @@ crossApplySnd (Unfold step1 inject1) (Unfold step2 inject2) = Unfold step inject s1 <- inject1 a return $ Producer.CrossApplyOuter a s1 + {- HLINT ignore "Eta reduce" -} {-# INLINE_LATE step #-} - step = Producer.crossApplySnd inject2 step1 step2 + step st = Producer.crossApplySnd inject2 step1 step2 st -- | Outer product discarding the second element. -- @@ -599,8 +600,9 @@ crossApplyFst (Unfold step1 inject1) (Unfold step2 inject2) = Unfold step inject s1 <- inject1 a return $ Producer.CrossApplyFstOuter a s1 + {- HLINT ignore "Eta reduce" -} {-# INLINE_LATE step #-} - step = Producer.crossApplyFst inject2 step1 step2 + step st = Producer.crossApplyFst inject2 step1 step2 st {- {-# ANN type Many2State Fuse #-} @@ -784,8 +786,9 @@ crossApply (Unfold step1 inject1) (Unfold step2 inject2) = Unfold step inject s1 <- inject1 a return $ Producer.CrossApplyOuter a s1 + {- HLINT ignore "Eta reduce" -} {-# INLINE_LATE step #-} - step = Producer.crossApply inject2 step1 step2 + step st = Producer.crossApply inject2 step1 step2 st -- XXX Applicative makes sense for unfolds, but monad does not. Use streams for -- monad. diff --git a/docs/Developer/optimization-guidelines.md b/docs/Developer/optimization-guidelines.md index 56f9d95dc7..45b23b928d 100644 --- a/docs/Developer/optimization-guidelines.md +++ b/docs/Developer/optimization-guidelines.md @@ -111,6 +111,43 @@ Step function of a stream or unfold: may help in such cases. * The step function must be annotated such that it gets inlined after the main combinator (`INLINE_LATE`). +* Always declare the step function with all arguments explicit, even when the + body delegates to a helper: + + ```haskell + -- Good: full arity visible to GHC from the source + {-# INLINE_LATE step #-} + step gst st = Helper.mapM f (innerStep (adaptState gst)) st + + -- Bad: eta-reduced, last argument missing + {-# INLINE_LATE step #-} + step gst = Helper.mapM f (innerStep (adaptState gst)) + ``` + + The eta-reduced form causes a performance problem. + In runtime-composed pipelines (iterated/dynamic composition):** When the + `Stream` object is constructed at runtime the step closure is invoked + through a dynamic function pointer rather than at a known call site, so + GHC's static inlining cannot eliminate the intermediate partial + applications at all. Each call to `step gst` allocates a PAP for + `innerStep (adaptState gst)` and another for `Helper.mapM f ` — + even if the helper carries `INLINE_LATE`. For example, eta-reducing the + `mapM`, `crossApply`, `crossApplySnd`, and `crossApplyFst` step functions + caused the following regressions in o-n-space (iterated, not fused) + benchmarks: + + ``` + o-n-space/iterated/(<$) : 14 MB → 24 MB (+71%) + o-n-space/iterated/fmap : 15 MB → 26 MB (+73%) + o-n-space/iterated/mapM : 3.6 MB → 9.1 MB (+153%) + ``` + + Making the arity explicit in the source prevents these allocations regardless + of inlining phase or order. + + Note: helpers referenced by value with no free variables + (e.g. `Stream (const Producer.fromList)`) are unaffected because returning a + top-level function reference does not allocate. Multiple yield points or single?: