From 3a1e1a6ddb44fb806b951c22920eff41aff0ca6a Mon Sep 17 00:00:00 2001 From: Ranjeet Kumar Ranjan Date: Thu, 27 Jul 2023 11:34:02 +0530 Subject: [PATCH 1/4] Port previous test cases --- test/Streamly/Test/Data/Stream/Concurrent.hs | 40 +++++++++++++++++--- 1 file changed, 35 insertions(+), 5 deletions(-) diff --git a/test/Streamly/Test/Data/Stream/Concurrent.hs b/test/Streamly/Test/Data/Stream/Concurrent.hs index 9ed9237ea3..d7f9d673c8 100644 --- a/test/Streamly/Test/Data/Stream/Concurrent.hs +++ b/test/Streamly/Test/Data/Stream/Concurrent.hs @@ -25,7 +25,7 @@ import Test.Hspec as H import qualified Streamly.Data.Fold as Fold ( toList ) import qualified Streamly.Data.Stream as Stream - ( replicate, fromEffect, fromPure, fromList, fold, take, nil ) + ( replicate, fromEffect, fromPure, fromList, fold, take, nil, toList, cons, consM ) import qualified Streamly.Internal.Data.Stream.Concurrent as Async import Streamly.Test.Common (listEquals) @@ -218,6 +218,34 @@ sequenceReplicate cfg = constructWithLenM stream list list = flip replicateM (return 1 :: IO Int) stream = Async.parSequence cfg . flip Stream.replicate (return 1 :: IO Int) +constructWithCons :: + Word8 + -> (Async.Config -> Async.Config) + -> Property +constructWithCons len cfg = + withMaxSuccess maxTestCount $ + monadicIO $ do + strm <- + run + $ Stream.toList . Async.parEval cfg . Stream.take (fromIntegral len) + $ foldr Stream.cons Stream.nil (repeat (0::Int)) + let list = replicate (fromIntegral len) 0 + listEquals (==) strm list + +constructWithConsM :: + Word8 + -> (Async.Config -> Async.Config) + -> Property +constructWithConsM len cfg = + withMaxSuccess maxTestCount $ + monadicIO $ do + strm <- + run + $ Stream.toList . Async.parEval cfg . Stream.take (fromIntegral len) + $ foldr Stream.consM Stream.nil (repeat (return (0::Int))) + let list = replicate (fromIntegral len) 0 + listEquals (==) strm list + main :: IO () main = hspec $ H.parallel @@ -227,10 +255,12 @@ main = hspec $ describe moduleName $ do let transform = transformCombineFromList Stream.fromList sortEq - prop "parEval" $ - transform - (fmap (+2)) - (fmap (+1) . Async.parEval id . fmap (+1)) + asyncSpec $ + let appWith cfg = (fmap (+1) . Async.parEval cfg . fmap (+1)) + in prop "parEval" . transform (fmap (+2)) . appWith + + asyncSpec $ prop "constructWithCons" . constructWithCons 100 + asyncSpec $ prop "constructWithConsM" . constructWithConsM 100 asyncSpec $ prop "parSequence" . sequenceReplicate From 6682e4bd568adc0f3bd17053b08cb991bd0b1812 Mon Sep 17 00:00:00 2001 From: Ranjeet Kumar Ranjan Date: Thu, 27 Jul 2023 16:56:53 +0530 Subject: [PATCH 2/4] Fix up --- test/Streamly/Test/Data/Stream/Concurrent.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/Streamly/Test/Data/Stream/Concurrent.hs b/test/Streamly/Test/Data/Stream/Concurrent.hs index d7f9d673c8..abba90e03c 100644 --- a/test/Streamly/Test/Data/Stream/Concurrent.hs +++ b/test/Streamly/Test/Data/Stream/Concurrent.hs @@ -260,7 +260,7 @@ main = hspec in prop "parEval" . transform (fmap (+2)) . appWith asyncSpec $ prop "constructWithCons" . constructWithCons 100 - asyncSpec $ prop "constructWithConsM" . constructWithConsM 100 + --asyncSpec $ prop "constructWithConsM" . constructWithConsM 100 asyncSpec $ prop "parSequence" . sequenceReplicate From d2af1a4a003a39d41c1b6a670e81a8889aa286cb Mon Sep 17 00:00:00 2001 From: Ranjeet Kumar Ranjan Date: Tue, 1 Aug 2023 08:17:52 +0530 Subject: [PATCH 3/4] Fix up --- core/streamly-core.cabal | 2 + test/Streamly/Test/Data/Stream/Concurrent.hs | 45 ++++++++++++++++++-- 2 files changed, 44 insertions(+), 3 deletions(-) diff --git a/core/streamly-core.cabal b/core/streamly-core.cabal index 8b4350f48d..d78088abba 100644 --- a/core/streamly-core.cabal +++ b/core/streamly-core.cabal @@ -312,6 +312,8 @@ library , Streamly.Internal.Data.Stream.StreamD , Streamly.Internal.Data.Stream.Common , Streamly.Internal.Data.Stream + , Streamly.Internal.Data.Stream.Cross + , Streamly.Internal.Data.Stream.Type , Streamly.Internal.Data.Parser.ParserD.Tee diff --git a/test/Streamly/Test/Data/Stream/Concurrent.hs b/test/Streamly/Test/Data/Stream/Concurrent.hs index abba90e03c..2b00c7e9f4 100644 --- a/test/Streamly/Test/Data/Stream/Concurrent.hs +++ b/test/Streamly/Test/Data/Stream/Concurrent.hs @@ -17,16 +17,20 @@ import Control.Monad (replicateM) import Control.Monad.Catch (throwM) import Data.List (sort) import Data.Word (Word8) -import Streamly.Data.Stream (Stream) +import Streamly.Internal.Data.Stream ( Stream ) import Test.Hspec.QuickCheck import Test.QuickCheck (Testable, Property, choose, forAll, withMaxSuccess) import Test.QuickCheck.Monadic (monadicIO, run) import Test.Hspec as H import qualified Streamly.Data.Fold as Fold ( toList ) -import qualified Streamly.Data.Stream as Stream - ( replicate, fromEffect, fromPure, fromList, fold, take, nil, toList, cons, consM ) +import qualified Streamly.Internal.Data.Stream as Stream + ( replicate, fromEffect, fromPure, fromList, fold, take, nil, toList, cons, consM, toStreamK, fromStreamK ) import qualified Streamly.Internal.Data.Stream.Concurrent as Async +import Streamly.Internal.Data.Stream.Cross (CrossStream(..)) +import qualified Streamly.Data.Fold as Fold +import qualified Streamly.Data.Stream as Stream +import qualified Streamly.Internal.Data.Stream.Type as IS import Streamly.Test.Common (listEquals) @@ -246,6 +250,39 @@ constructWithConsM len cfg = let list = replicate (fromIntegral len) 0 listEquals (==) strm list +monadBind :: + ([Int], [Int]) + -> (Async.Config -> Async.Config) + -> Property +monadBind (a, b) cfg = + withMaxSuccess maxTestCount $ + monadicIO $ do + + let st = (CrossStream $ (IS.fromStream (Stream.fromList a) :: IS.Stream IO Int)) + >>= \x -> (+ x) <$> (CrossStream $ (IS.fromStream (Stream.fromList b))) + strm <- run + ((Stream.toList . Async.parEval cfg . IS.toStreamD . unCrossStream ) st) + + let list = a >>= \x -> (+ x) <$> b + listEquals (==) strm list + +monadThen :: + ([Int], [Int]) + -> (Async.Config -> Async.Config) + -> Property +monadThen (a, b) cfg = + withMaxSuccess maxTestCount $ + monadicIO $ do + + let st = (CrossStream $ (IS.fromStream (Stream.fromList a) :: IS.Stream IO Int)) + >> (CrossStream $ (IS.fromStream (Stream.fromList b))) + strm <- run + ((Stream.toList . Async.parEval cfg . IS.toStreamD . unCrossStream ) st) + + let list = a >> b + listEquals (==) strm list + + main :: IO () main = hspec $ H.parallel @@ -261,6 +298,8 @@ main = hspec asyncSpec $ prop "constructWithCons" . constructWithCons 100 --asyncSpec $ prop "constructWithConsM" . constructWithConsM 100 + asyncSpec $ prop "monadBind" . monadBind ([1..100] , [1..100]) + asyncSpec $ prop "monadThen" . monadThen ([1..100] , [1..100]) asyncSpec $ prop "parSequence" . sequenceReplicate From 7bf08d688cd2113dd37b8bce09a5fa20c6462183 Mon Sep 17 00:00:00 2001 From: Ranjeet Kumar Ranjan Date: Thu, 17 Aug 2023 17:52:12 +0530 Subject: [PATCH 4/4] Port previous Prelude concurrent Streams test cases --- core/streamly-core.cabal | 4 +-- test/Streamly/Test/Data/Stream/Concurrent.hs | 32 ++++++++++---------- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/core/streamly-core.cabal b/core/streamly-core.cabal index d78088abba..a93f26926c 100644 --- a/core/streamly-core.cabal +++ b/core/streamly-core.cabal @@ -312,8 +312,8 @@ library , Streamly.Internal.Data.Stream.StreamD , Streamly.Internal.Data.Stream.Common , Streamly.Internal.Data.Stream - , Streamly.Internal.Data.Stream.Cross - , Streamly.Internal.Data.Stream.Type + --, Streamly.Internal.Data.Stream.Cross + --, Streamly.Internal.Data.Stream.Type , Streamly.Internal.Data.Parser.ParserD.Tee diff --git a/test/Streamly/Test/Data/Stream/Concurrent.hs b/test/Streamly/Test/Data/Stream/Concurrent.hs index 2b00c7e9f4..7fcbabb95c 100644 --- a/test/Streamly/Test/Data/Stream/Concurrent.hs +++ b/test/Streamly/Test/Data/Stream/Concurrent.hs @@ -17,20 +17,19 @@ import Control.Monad (replicateM) import Control.Monad.Catch (throwM) import Data.List (sort) import Data.Word (Word8) -import Streamly.Internal.Data.Stream ( Stream ) +import Streamly.Internal.Data.Stream (Stream) +import Streamly.Internal.Data.Stream (mkCross, unCross) import Test.Hspec.QuickCheck import Test.QuickCheck (Testable, Property, choose, forAll, withMaxSuccess) import Test.QuickCheck.Monadic (monadicIO, run) import Test.Hspec as H -import qualified Streamly.Data.Fold as Fold ( toList ) -import qualified Streamly.Internal.Data.Stream as Stream - ( replicate, fromEffect, fromPure, fromList, fold, take, nil, toList, cons, consM, toStreamK, fromStreamK ) import qualified Streamly.Internal.Data.Stream.Concurrent as Async -import Streamly.Internal.Data.Stream.Cross (CrossStream(..)) + import qualified Streamly.Data.Fold as Fold import qualified Streamly.Data.Stream as Stream -import qualified Streamly.Internal.Data.Stream.Type as IS +import qualified Streamly.Internal.Data.Stream as IS +import qualified Streamly.Internal.Data.StreamK as K import Streamly.Test.Common (listEquals) @@ -232,7 +231,8 @@ constructWithCons len cfg = strm <- run $ Stream.toList . Async.parEval cfg . Stream.take (fromIntegral len) - $ foldr Stream.cons Stream.nil (repeat (0::Int)) + $ IS.fromStreamK + $ foldr K.cons K.nil (repeat (0::Int)) let list = replicate (fromIntegral len) 0 listEquals (==) strm list @@ -246,7 +246,8 @@ constructWithConsM len cfg = strm <- run $ Stream.toList . Async.parEval cfg . Stream.take (fromIntegral len) - $ foldr Stream.consM Stream.nil (repeat (return (0::Int))) + $ IS.fromStreamK + $ foldr K.consM K.nil (repeat (return (0::Int))) let list = replicate (fromIntegral len) 0 listEquals (==) strm list @@ -257,11 +258,10 @@ monadBind :: monadBind (a, b) cfg = withMaxSuccess maxTestCount $ monadicIO $ do - - let st = (CrossStream $ (IS.fromStream (Stream.fromList a) :: IS.Stream IO Int)) - >>= \x -> (+ x) <$> (CrossStream $ (IS.fromStream (Stream.fromList b))) + let st = (mkCross $ Stream.fromList a) + >>= \x -> (+ x) <$> (mkCross $ Stream.fromList b) strm <- run - ((Stream.toList . Async.parEval cfg . IS.toStreamD . unCrossStream ) st) + ((Stream.toList . Async.parEval cfg . unCross ) st) let list = a >>= \x -> (+ x) <$> b listEquals (==) strm list @@ -274,10 +274,10 @@ monadThen (a, b) cfg = withMaxSuccess maxTestCount $ monadicIO $ do - let st = (CrossStream $ (IS.fromStream (Stream.fromList a) :: IS.Stream IO Int)) - >> (CrossStream $ (IS.fromStream (Stream.fromList b))) + let st = (mkCross $ (Stream.fromList a :: IS.Stream IO Int)) + >> (mkCross $ Stream.fromList b) strm <- run - ((Stream.toList . Async.parEval cfg . IS.toStreamD . unCrossStream ) st) + ((Stream.toList . Async.parEval cfg . unCross ) st) let list = a >> b listEquals (==) strm list @@ -297,7 +297,7 @@ main = hspec in prop "parEval" . transform (fmap (+2)) . appWith asyncSpec $ prop "constructWithCons" . constructWithCons 100 - --asyncSpec $ prop "constructWithConsM" . constructWithConsM 100 + asyncSpec $ prop "constructWithConsM" . constructWithConsM 100 asyncSpec $ prop "monadBind" . monadBind ([1..100] , [1..100]) asyncSpec $ prop "monadThen" . monadThen ([1..100] , [1..100])