diff --git a/core/streamly-core.cabal b/core/streamly-core.cabal index 8b4350f48d..a93f26926c 100644 --- a/core/streamly-core.cabal +++ b/core/streamly-core.cabal @@ -312,6 +312,8 @@ library , Streamly.Internal.Data.Stream.StreamD , Streamly.Internal.Data.Stream.Common , Streamly.Internal.Data.Stream + --, Streamly.Internal.Data.Stream.Cross + --, Streamly.Internal.Data.Stream.Type , Streamly.Internal.Data.Parser.ParserD.Tee diff --git a/test/Streamly/Test/Data/Stream/Concurrent.hs b/test/Streamly/Test/Data/Stream/Concurrent.hs index 9ed9237ea3..7fcbabb95c 100644 --- a/test/Streamly/Test/Data/Stream/Concurrent.hs +++ b/test/Streamly/Test/Data/Stream/Concurrent.hs @@ -17,17 +17,20 @@ import Control.Monad (replicateM) import Control.Monad.Catch (throwM) import Data.List (sort) import Data.Word (Word8) -import Streamly.Data.Stream (Stream) +import Streamly.Internal.Data.Stream (Stream) +import Streamly.Internal.Data.Stream (mkCross, unCross) import Test.Hspec.QuickCheck import Test.QuickCheck (Testable, Property, choose, forAll, withMaxSuccess) import Test.QuickCheck.Monadic (monadicIO, run) import Test.Hspec as H -import qualified Streamly.Data.Fold as Fold ( toList ) -import qualified Streamly.Data.Stream as Stream - ( replicate, fromEffect, fromPure, fromList, fold, take, nil ) import qualified Streamly.Internal.Data.Stream.Concurrent as Async +import qualified Streamly.Data.Fold as Fold +import qualified Streamly.Data.Stream as Stream +import qualified Streamly.Internal.Data.Stream as IS +import qualified Streamly.Internal.Data.StreamK as K + import Streamly.Test.Common (listEquals) moduleName :: String @@ -218,6 +221,68 @@ sequenceReplicate cfg = constructWithLenM stream list list = flip replicateM (return 1 :: IO Int) stream = Async.parSequence cfg . flip Stream.replicate (return 1 :: IO Int) +constructWithCons :: + Word8 + -> (Async.Config -> Async.Config) + -> Property +constructWithCons len cfg = + withMaxSuccess maxTestCount $ + monadicIO $ do + strm <- + run + $ Stream.toList . Async.parEval cfg . Stream.take (fromIntegral len) + $ IS.fromStreamK + $ foldr K.cons K.nil (repeat (0::Int)) + let list = replicate (fromIntegral len) 0 + listEquals (==) strm list + +constructWithConsM :: + Word8 + -> (Async.Config -> Async.Config) + -> Property +constructWithConsM len cfg = + withMaxSuccess maxTestCount $ + monadicIO $ do + strm <- + run + $ Stream.toList . Async.parEval cfg . Stream.take (fromIntegral len) + $ IS.fromStreamK + $ foldr K.consM K.nil (repeat (return (0::Int))) + let list = replicate (fromIntegral len) 0 + listEquals (==) strm list + +monadBind :: + ([Int], [Int]) + -> (Async.Config -> Async.Config) + -> Property +monadBind (a, b) cfg = + withMaxSuccess maxTestCount $ + monadicIO $ do + let st = (mkCross $ Stream.fromList a) + >>= \x -> (+ x) <$> (mkCross $ Stream.fromList b) + strm <- run + ((Stream.toList . Async.parEval cfg . unCross ) st) + + let list = a >>= \x -> (+ x) <$> b + listEquals (==) strm list + +monadThen :: + ([Int], [Int]) + -> (Async.Config -> Async.Config) + -> Property +monadThen (a, b) cfg = + withMaxSuccess maxTestCount $ + monadicIO $ do + + let st = (mkCross $ (Stream.fromList a :: IS.Stream IO Int)) + >> (mkCross $ Stream.fromList b) + strm <- run + ((Stream.toList . Async.parEval cfg . unCross ) st) + + let list = a >> b + listEquals (==) strm list + + main :: IO () main = hspec $ H.parallel @@ -227,10 +292,14 @@ main = hspec $ describe moduleName $ do let transform = transformCombineFromList Stream.fromList sortEq - prop "parEval" $ - transform - (fmap (+2)) - (fmap (+1) . Async.parEval id . fmap (+1)) + asyncSpec $ + let appWith cfg = (fmap (+1) . Async.parEval cfg . fmap (+1)) + in prop "parEval" . transform (fmap (+2)) . appWith + + asyncSpec $ prop "constructWithCons" . constructWithCons 100 + asyncSpec $ prop "constructWithConsM" . constructWithConsM 100 + asyncSpec $ prop "monadBind" . monadBind ([1..100] , [1..100]) + asyncSpec $ prop "monadThen" . monadThen ([1..100] , [1..100]) asyncSpec $ prop "parSequence" . sequenceReplicate