Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port previous Concurrent Stream tests cases #2541

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/streamly-core.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,8 @@ library
, Streamly.Internal.Data.Stream.StreamD
, Streamly.Internal.Data.Stream.Common
, Streamly.Internal.Data.Stream
--, Streamly.Internal.Data.Stream.Cross
--, Streamly.Internal.Data.Stream.Type

, Streamly.Internal.Data.Parser.ParserD.Tee

Expand Down
85 changes: 77 additions & 8 deletions test/Streamly/Test/Data/Stream/Concurrent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,20 @@ import Control.Monad (replicateM)
import Control.Monad.Catch (throwM)
import Data.List (sort)
import Data.Word (Word8)
import Streamly.Data.Stream (Stream)
import Streamly.Internal.Data.Stream (Stream)
import Streamly.Internal.Data.Stream (mkCross, unCross)
import Test.Hspec.QuickCheck
import Test.QuickCheck (Testable, Property, choose, forAll, withMaxSuccess)
import Test.QuickCheck.Monadic (monadicIO, run)
import Test.Hspec as H

import qualified Streamly.Data.Fold as Fold ( toList )
import qualified Streamly.Data.Stream as Stream
( replicate, fromEffect, fromPure, fromList, fold, take, nil )
import qualified Streamly.Internal.Data.Stream.Concurrent as Async

import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import qualified Streamly.Internal.Data.Stream as IS
import qualified Streamly.Internal.Data.StreamK as K

import Streamly.Test.Common (listEquals)

moduleName :: String
Expand Down Expand Up @@ -218,6 +221,68 @@ sequenceReplicate cfg = constructWithLenM stream list
list = flip replicateM (return 1 :: IO Int)
stream = Async.parSequence cfg . flip Stream.replicate (return 1 :: IO Int)

constructWithCons ::
Word8
-> (Async.Config -> Async.Config)
-> Property
constructWithCons len cfg =
withMaxSuccess maxTestCount $
monadicIO $ do
strm <-
run
$ Stream.toList . Async.parEval cfg . Stream.take (fromIntegral len)
$ IS.fromStreamK
$ foldr K.cons K.nil (repeat (0::Int))
let list = replicate (fromIntegral len) 0
listEquals (==) strm list

constructWithConsM ::
Word8
-> (Async.Config -> Async.Config)
-> Property
constructWithConsM len cfg =
withMaxSuccess maxTestCount $
monadicIO $ do
strm <-
run
$ Stream.toList . Async.parEval cfg . Stream.take (fromIntegral len)
$ IS.fromStreamK
$ foldr K.consM K.nil (repeat (return (0::Int)))
let list = replicate (fromIntegral len) 0
listEquals (==) strm list

monadBind ::
([Int], [Int])
-> (Async.Config -> Async.Config)
-> Property
monadBind (a, b) cfg =
withMaxSuccess maxTestCount $
monadicIO $ do
let st = (mkCross $ Stream.fromList a)
>>= \x -> (+ x) <$> (mkCross $ Stream.fromList b)
strm <- run
((Stream.toList . Async.parEval cfg . unCross ) st)

let list = a >>= \x -> (+ x) <$> b
listEquals (==) strm list

monadThen ::
([Int], [Int])
-> (Async.Config -> Async.Config)
-> Property
monadThen (a, b) cfg =
withMaxSuccess maxTestCount $
monadicIO $ do

let st = (mkCross $ (Stream.fromList a :: IS.Stream IO Int))
>> (mkCross $ Stream.fromList b)
strm <- run
((Stream.toList . Async.parEval cfg . unCross ) st)

let list = a >> b
listEquals (==) strm list


main :: IO ()
main = hspec
$ H.parallel
Expand All @@ -227,10 +292,14 @@ main = hspec
$ describe moduleName $ do
let transform = transformCombineFromList Stream.fromList sortEq

prop "parEval" $
transform
(fmap (+2))
(fmap (+1) . Async.parEval id . fmap (+1))
asyncSpec $
let appWith cfg = (fmap (+1) . Async.parEval cfg . fmap (+1))
in prop "parEval" . transform (fmap (+2)) . appWith

asyncSpec $ prop "constructWithCons" . constructWithCons 100
asyncSpec $ prop "constructWithConsM" . constructWithConsM 100
asyncSpec $ prop "monadBind" . monadBind ([1..100] , [1..100])
asyncSpec $ prop "monadThen" . monadThen ([1..100] , [1..100])

asyncSpec $ prop "parSequence" . sequenceReplicate

Expand Down