Skip to content

Commit

Permalink
Revert "Fix zipWith(M) to work concurrently according to the stream t…
Browse files Browse the repository at this point in the history
…ype"

This reverts commit 1ddcfc4.

This requires MonadAsync constraint which breaks the existing
zipWith for pure streams e.g. 'SerialT Identity' (for example in
streaming-benchmarks package). We can possibly have different zipWith
APIs for concurrent zipping.
  • Loading branch information
harendra-kumar committed Jun 22, 2021
1 parent 6587585 commit e094283
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 53 deletions.
3 changes: 0 additions & 3 deletions Changelog.md
Expand Up @@ -15,9 +15,6 @@
* `encodeLatin1` now silently truncates any character beyond 255 to incorrect
characters in the input stream. Use `encodeLatin1'` to recover previous
functionality.
* The zipping functions `Streamly.Prelude.zipWith` and
`Streamly.Prelude.zipWithM` are now applied concurrently for concurrent
streams.

### Breaking type changes

Expand Down
2 changes: 1 addition & 1 deletion benchmark/Streamly/Benchmark/Data/Stream/StreamK.hs
Expand Up @@ -372,7 +372,7 @@ iterateDropWhileTrue streamLen iterStreamLen maxIters = iterateSource iterStream
-------------------------------------------------------------------------------

{-# INLINE zipWith #-}
zipWith :: S.MonadAsync m => Stream m Int -> m ()
zipWith :: Monad m => Stream m Int -> m ()
zipWith src = drain $ S.zipWith (,) src src

-------------------------------------------------------------------------------
Expand Down
18 changes: 6 additions & 12 deletions src/Streamly/Internal/Data/Stream/StreamK.hs
Expand Up @@ -995,40 +995,34 @@ mapMaybe f m = go m
-- Serial Zipping
------------------------------------------------------------------------------

-- XXX We can probably implement zipWith in terms of zipWithM
-- | Zip two streams serially using a pure zipping function. The zipping
-- function is applied concurrently for concurrent streams.
-- | Zip two streams serially using a pure zipping function.
--
-- @since 0.1.0
{-# INLINABLE zipWith #-}
zipWith ::
(IsStream t, MonadAsync m) => (a -> b -> c) -> t m a -> t m b -> t m c
zipWith :: IsStream t => (a -> b -> c) -> t m a -> t m b -> t m c
zipWith f = go
where
go mx my = mkStream $ \st yld sng stp -> do
let merge a ra =
let single2 b = sng (f a b)
runIt x = foldStream st yld sng stp x
yield2 b rb = runIt (return (f a b) `consM` go ra rb)
yield2 b rb = yld (f a b) (go ra rb)
in foldStream (adaptState st) yield2 single2 stp my
let single1 a = merge a nil
yield1 = merge
foldStream (adaptState st) yield1 single1 stp mx

-- | Zip two streams serially using a monadic zipping function. The zipping
-- function is applied concurrently for concurrent streams.
-- | Zip two streams serially using a monadic zipping function.
--
-- @since 0.1.0
{-# INLINABLE zipWithM #-}
zipWithM ::
(IsStream t, MonadAsync m) => (a -> b -> m c) -> t m a -> t m b -> t m c
zipWithM :: (IsStream t, Monad m) => (a -> b -> m c) -> t m a -> t m b -> t m c
zipWithM f m1 m2 = go m1 m2
where
go mx my = mkStream $ \st yld sng stp -> do
let merge a ra =
let runIt x = foldStream st yld sng stp x
single2 b = f a b >>= sng
yield2 b rb = runIt (f a b `consM` go ra rb)
yield2 b rb = f a b >>= \x -> runIt (x `cons` go ra rb)
in foldStream (adaptState st) yield2 single2 stp my
let single1 a = merge a nil
yield1 = merge
Expand Down
56 changes: 19 additions & 37 deletions src/Streamly/Internal/Data/Stream/Zip.hs
Expand Up @@ -35,8 +35,6 @@ module Streamly.Internal.Data.Stream.Zip
)
where

#include "inline.hs"

import Control.Applicative (liftA2)
import Control.DeepSeq (NFData(..))
#if MIN_VERSION_deepseq(1,4,3)
Expand All @@ -58,13 +56,18 @@ import Streamly.Internal.Data.Stream.StreamK (IsStream(..), Stream)
import Streamly.Internal.Data.Maybe.Strict (Maybe'(..), toMaybe)
import Streamly.Internal.Data.SVar (MonadAsync)

import Streamly.Internal.Data.Stream.Serial (SerialT)

import qualified Streamly.Internal.Data.Stream.Parallel as Par
import qualified Streamly.Internal.Data.Stream.Prelude as P
(cmpBy, eqBy, foldl', foldr, fromList, toList)
import qualified Streamly.Internal.Data.Stream.StreamK as K
import qualified Streamly.Internal.Data.Stream.StreamD as D
(cmpBy, eqBy, foldl', foldr, fromList, toList, fromStreamS, toStreamS)
import qualified Streamly.Internal.Data.Stream.StreamK as K (repeat)
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K
import qualified Streamly.Internal.Data.Stream.StreamD as D (zipWithM)
import qualified Streamly.Internal.Data.Stream.StreamD.Type as D
#ifdef USE_STREAMK_ONLY
import qualified Streamly.Internal.Data.Stream.StreamK as S (zipWith, zipWithM)
#else
import qualified Streamly.Internal.Data.Stream.StreamD as S (zipWith, zipWithM)
#endif

import Prelude hiding (map, repeat, zipWith, errorWithoutStackTrace)

Expand All @@ -83,39 +86,21 @@ import Prelude hiding (map, repeat, zipWith, errorWithoutStackTrace)
-- | Like 'zipWith' but using a monadic zipping function.
--
-- @since 0.4.0
{-# INLINE_EARLY zipWithM #-}
zipWithM ::
(IsStream t, MonadAsync m) => (a -> b -> m c) -> t m a -> t m b -> t m c
zipWithM f m1 m2 =
K.fromStream $ K.zipWithM f (K.toStream m1) (K.toStream m2)

{-# RULES "zipWithM serial" zipWithM = zipWithMSerial #-}
{-# INLINE zipWithMSerial #-}
zipWithMSerial ::
Monad m => (a -> b -> m c) -> SerialT m a -> SerialT m b -> SerialT m c
zipWithMSerial f m1 m2 =
D.fromStreamD $ D.zipWithM f (D.toStreamD m1) (D.toStreamD m2)

-- | Zip two streams serially using a pure zipping function. The zipping
-- function is applied concurrently for concurrent streams.
{-# INLINABLE zipWithM #-}
zipWithM :: (IsStream t, Monad m) => (a -> b -> m c) -> t m a -> t m b -> t m c
zipWithM f m1 m2 = P.fromStreamS $ S.zipWithM f (P.toStreamS m1) (P.toStreamS m2)

-- | Zip two streams serially using a pure zipping function.
--
-- @
-- > S.toList $ S.zipWith (+) (S.fromList [1,2,3]) (S.fromList [4,5,6])
-- [5,7,9]
-- @
--
-- @since 0.1.0
{-# INLINE_EARLY zipWith #-}
zipWith ::
(IsStream t, MonadAsync m) => (a -> b -> c) -> t m a -> t m b -> t m c
zipWith f m1 m2 = K.fromStream $ K.zipWith f (K.toStream m1) (K.toStream m2)

{-# RULES "zipWith serial" zipWith = zipWithSerial #-}
{-# INLINE zipWithSerial #-}
zipWithSerial ::
Monad m => (a -> b -> c) -> SerialT m a -> SerialT m b -> SerialT m c
zipWithSerial f m1 m2 =
D.fromStreamD $ D.zipWith f (D.toStreamD m1) (D.toStreamD m2)
{-# INLINABLE zipWith #-}
zipWith :: (IsStream t, Monad m) => (a -> b -> c) -> t m a -> t m b -> t m c
zipWith f m1 m2 = P.fromStreamS $ S.zipWith f (P.toStreamS m1) (P.toStreamS m2)

------------------------------------------------------------------------------
-- Parallel Zipping
Expand Down Expand Up @@ -220,10 +205,7 @@ instance Monad m => Functor (ZipSerialM m) where
instance Monad m => Applicative (ZipSerialM m) where
pure = ZipSerialM . K.repeat
{-# INLINE (<*>) #-}
m1 <*> m2 = fromStream $ toStream $ zipWithSerial id m1_ m2_
where
m1_ = fromStream (toStream m1)
m2_ = fromStream (toStream m2)
(<*>) = zipWith id

FOLDABLE_INSTANCE(ZipSerialM)
TRAVERSABLE_INSTANCE(ZipSerialM)
Expand Down

0 comments on commit e094283

Please sign in to comment.