diff --git a/Changelog.md b/Changelog.md index b233a9a433..7177aa0938 100644 --- a/Changelog.md +++ b/Changelog.md @@ -15,9 +15,6 @@ * `encodeLatin1` now silently truncates any character beyond 255 to incorrect characters in the input stream. Use `encodeLatin1'` to recover previous functionality. -* The zipping functions `Streamly.Prelude.zipWith` and - `Streamly.Prelude.zipWithM` are now applied concurrently for concurrent - streams. ### Breaking type changes diff --git a/benchmark/Streamly/Benchmark/Data/Stream/StreamK.hs b/benchmark/Streamly/Benchmark/Data/Stream/StreamK.hs index 06f10195ec..d4be31f194 100644 --- a/benchmark/Streamly/Benchmark/Data/Stream/StreamK.hs +++ b/benchmark/Streamly/Benchmark/Data/Stream/StreamK.hs @@ -372,7 +372,7 @@ iterateDropWhileTrue streamLen iterStreamLen maxIters = iterateSource iterStream ------------------------------------------------------------------------------- {-# INLINE zipWith #-} -zipWith :: S.MonadAsync m => Stream m Int -> m () +zipWith :: Monad m => Stream m Int -> m () zipWith src = drain $ S.zipWith (,) src src ------------------------------------------------------------------------------- diff --git a/src/Streamly/Internal/Data/Stream/StreamK.hs b/src/Streamly/Internal/Data/Stream/StreamK.hs index ccae866d17..d22bd65338 100644 --- a/src/Streamly/Internal/Data/Stream/StreamK.hs +++ b/src/Streamly/Internal/Data/Stream/StreamK.hs @@ -995,40 +995,34 @@ mapMaybe f m = go m -- Serial Zipping ------------------------------------------------------------------------------ --- XXX We can probably implement zipWith in terms of zipWithM --- | Zip two streams serially using a pure zipping function. The zipping --- function is applied concurrently for concurrent streams. +-- | Zip two streams serially using a pure zipping function. -- -- @since 0.1.0 {-# INLINABLE zipWith #-} -zipWith :: - (IsStream t, MonadAsync m) => (a -> b -> c) -> t m a -> t m b -> t m c +zipWith :: IsStream t => (a -> b -> c) -> t m a -> t m b -> t m c zipWith f = go where go mx my = mkStream $ \st yld sng stp -> do let merge a ra = let single2 b = sng (f a b) - runIt x = foldStream st yld sng stp x - yield2 b rb = runIt (return (f a b) `consM` go ra rb) + yield2 b rb = yld (f a b) (go ra rb) in foldStream (adaptState st) yield2 single2 stp my let single1 a = merge a nil yield1 = merge foldStream (adaptState st) yield1 single1 stp mx --- | Zip two streams serially using a monadic zipping function. The zipping --- function is applied concurrently for concurrent streams. +-- | Zip two streams serially using a monadic zipping function. -- -- @since 0.1.0 {-# INLINABLE zipWithM #-} -zipWithM :: - (IsStream t, MonadAsync m) => (a -> b -> m c) -> t m a -> t m b -> t m c +zipWithM :: (IsStream t, Monad m) => (a -> b -> m c) -> t m a -> t m b -> t m c zipWithM f m1 m2 = go m1 m2 where go mx my = mkStream $ \st yld sng stp -> do let merge a ra = let runIt x = foldStream st yld sng stp x single2 b = f a b >>= sng - yield2 b rb = runIt (f a b `consM` go ra rb) + yield2 b rb = f a b >>= \x -> runIt (x `cons` go ra rb) in foldStream (adaptState st) yield2 single2 stp my let single1 a = merge a nil yield1 = merge diff --git a/src/Streamly/Internal/Data/Stream/Zip.hs b/src/Streamly/Internal/Data/Stream/Zip.hs index f28315ad70..f3ebbce5ea 100644 --- a/src/Streamly/Internal/Data/Stream/Zip.hs +++ b/src/Streamly/Internal/Data/Stream/Zip.hs @@ -35,8 +35,6 @@ module Streamly.Internal.Data.Stream.Zip ) where -#include "inline.hs" - import Control.Applicative (liftA2) import Control.DeepSeq (NFData(..)) #if MIN_VERSION_deepseq(1,4,3) @@ -58,13 +56,18 @@ import Streamly.Internal.Data.Stream.StreamK (IsStream(..), Stream) import Streamly.Internal.Data.Maybe.Strict (Maybe'(..), toMaybe) import Streamly.Internal.Data.SVar (MonadAsync) -import Streamly.Internal.Data.Stream.Serial (SerialT) - import qualified Streamly.Internal.Data.Stream.Parallel as Par import qualified Streamly.Internal.Data.Stream.Prelude as P - (cmpBy, eqBy, foldl', foldr, fromList, toList) -import qualified Streamly.Internal.Data.Stream.StreamK as K -import qualified Streamly.Internal.Data.Stream.StreamD as D + (cmpBy, eqBy, foldl', foldr, fromList, toList, fromStreamS, toStreamS) +import qualified Streamly.Internal.Data.Stream.StreamK as K (repeat) +import qualified Streamly.Internal.Data.Stream.StreamK.Type as K +import qualified Streamly.Internal.Data.Stream.StreamD as D (zipWithM) +import qualified Streamly.Internal.Data.Stream.StreamD.Type as D +#ifdef USE_STREAMK_ONLY +import qualified Streamly.Internal.Data.Stream.StreamK as S (zipWith, zipWithM) +#else +import qualified Streamly.Internal.Data.Stream.StreamD as S (zipWith, zipWithM) +#endif import Prelude hiding (map, repeat, zipWith, errorWithoutStackTrace) @@ -83,21 +86,11 @@ import Prelude hiding (map, repeat, zipWith, errorWithoutStackTrace) -- | Like 'zipWith' but using a monadic zipping function. -- -- @since 0.4.0 -{-# INLINE_EARLY zipWithM #-} -zipWithM :: - (IsStream t, MonadAsync m) => (a -> b -> m c) -> t m a -> t m b -> t m c -zipWithM f m1 m2 = - K.fromStream $ K.zipWithM f (K.toStream m1) (K.toStream m2) - -{-# RULES "zipWithM serial" zipWithM = zipWithMSerial #-} -{-# INLINE zipWithMSerial #-} -zipWithMSerial :: - Monad m => (a -> b -> m c) -> SerialT m a -> SerialT m b -> SerialT m c -zipWithMSerial f m1 m2 = - D.fromStreamD $ D.zipWithM f (D.toStreamD m1) (D.toStreamD m2) - --- | Zip two streams serially using a pure zipping function. The zipping --- function is applied concurrently for concurrent streams. +{-# INLINABLE zipWithM #-} +zipWithM :: (IsStream t, Monad m) => (a -> b -> m c) -> t m a -> t m b -> t m c +zipWithM f m1 m2 = P.fromStreamS $ S.zipWithM f (P.toStreamS m1) (P.toStreamS m2) + +-- | Zip two streams serially using a pure zipping function. -- -- @ -- > S.toList $ S.zipWith (+) (S.fromList [1,2,3]) (S.fromList [4,5,6]) @@ -105,17 +98,9 @@ zipWithMSerial f m1 m2 = -- @ -- -- @since 0.1.0 -{-# INLINE_EARLY zipWith #-} -zipWith :: - (IsStream t, MonadAsync m) => (a -> b -> c) -> t m a -> t m b -> t m c -zipWith f m1 m2 = K.fromStream $ K.zipWith f (K.toStream m1) (K.toStream m2) - -{-# RULES "zipWith serial" zipWith = zipWithSerial #-} -{-# INLINE zipWithSerial #-} -zipWithSerial :: - Monad m => (a -> b -> c) -> SerialT m a -> SerialT m b -> SerialT m c -zipWithSerial f m1 m2 = - D.fromStreamD $ D.zipWith f (D.toStreamD m1) (D.toStreamD m2) +{-# INLINABLE zipWith #-} +zipWith :: (IsStream t, Monad m) => (a -> b -> c) -> t m a -> t m b -> t m c +zipWith f m1 m2 = P.fromStreamS $ S.zipWith f (P.toStreamS m1) (P.toStreamS m2) ------------------------------------------------------------------------------ -- Parallel Zipping @@ -220,10 +205,7 @@ instance Monad m => Functor (ZipSerialM m) where instance Monad m => Applicative (ZipSerialM m) where pure = ZipSerialM . K.repeat {-# INLINE (<*>) #-} - m1 <*> m2 = fromStream $ toStream $ zipWithSerial id m1_ m2_ - where - m1_ = fromStream (toStream m1) - m2_ = fromStream (toStream m2) + (<*>) = zipWith id FOLDABLE_INSTANCE(ZipSerialM) TRAVERSABLE_INSTANCE(ZipSerialM)