Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
93da14e
Modify the current Fold type to handle terminating Folds
adithyaov Apr 13, 2020
89bcf96
Implement groupsOf, lchunksOf in terms of "many"
harendra-kumar Aug 21, 2020
36b3d9e
Rewrite code removing fold helpers
adithyaov Aug 24, 2020
5d26e55
Add a few benchmarks to Fold
adithyaov Sep 5, 2020
159e6ff
fixup! Add a few benchmarks to Fold
adithyaov Sep 6, 2020
11934c1
Intermediate splitOn
adithyaov Sep 8, 2020
b9e30f8
Interim commit
adithyaov Sep 9, 2020
f491349
fixup! Interim commit
adithyaov Sep 9, 2020
7d26e53
Expose advance and add moveBy in Ring
adithyaov Sep 9, 2020
d4b97db
fixup! fixup! Interim commit
adithyaov Sep 9, 2020
d787b1b
splitOn Rabin Karp
adithyaov Sep 9, 2020
05a0172
fixup! splitOn Rabin Karp
adithyaov Sep 9, 2020
1b0fc46
fixup! fixup! splitOn Rabin Karp
adithyaov Sep 9, 2020
5666f1f
Fix splitOn
adithyaov Sep 10, 2020
caeb5ae
splitSuffixOn
adithyaov Sep 11, 2020
67de32b
fixup! splitSuffixOn
adithyaov Sep 11, 2020
f0c6f8f
fixup! fixup! splitSuffixOn
adithyaov Sep 11, 2020
ad21107
fixup! fixup! fixup! splitSuffixOn
adithyaov Sep 11, 2020
6d0e7bf
Indentation
adithyaov Sep 11, 2020
0d26fd1
Interim review
adithyaov Sep 13, 2020
86fce93
Another review commit
adithyaov Sep 13, 2020
84ffab4
fixup! Another review commit
adithyaov Sep 13, 2020
5340ed2
Test fix
adithyaov Sep 14, 2020
3f88a4e
Intermediate commit
adithyaov Sep 17, 2020
95d4c6f
Fix bug
adithyaov Sep 17, 2020
83d1e82
change IsStream groupsBy arg order
adithyaov Sep 17, 2020
1c833c1
Fix build for 822
adithyaov Sep 18, 2020
7bc1e67
fixup! Fix build for 822
adithyaov Sep 18, 2020
fac225e
Add tests for splitOnSeq and splitOnSuffixSeq
adithyaov Sep 18, 2020
9c8045a
Update the implementation of lchunksOf
adithyaov Oct 14, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 121 additions & 26 deletions benchmark/Streamly/Benchmark/Data/Fold.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,26 @@ import Control.DeepSeq (NFData(..))
import Data.Monoid (Last(..), Sum(..))

import System.Random (randomRIO)
import Prelude (IO, Int, Double, String, (>), (<*>), (<$>), (+), ($),
import Prelude (IO, Int, Double, String, (>), (<$>), (+), ($),
(<=), Monad(..), (==), Maybe(..), (.), fromIntegral,
compare, (>=), concat, seq)
compare, (>=), concat, seq, mod, fst, snd, const, Bool, Ord(..),
div, Num(..))

import Data.Map.Strict (Map)
import Streamly.Internal.Data.Fold (Fold(..))
import Streamly.Internal.Data.Stream.IsStream (SerialT)

import qualified Data.Map.Strict as Map

import qualified Streamly.Prelude as S
import qualified Streamly.Internal.Data.Fold as FL
import qualified Streamly.Internal.Data.Fold.Types as FL
import qualified Streamly.Internal.Data.Pipe as Pipe

import qualified Streamly.Internal.Data.Sink as Sink

import qualified Streamly.Data.Array.Storable.Foreign as A
import qualified Streamly.Internal.Data.Array.Storable.Foreign as IA
import qualified Streamly.Internal.Data.Fold as IFL
import qualified Streamly.Internal.Data.Stream.IsStream as IP

import Gauge
Expand Down Expand Up @@ -54,8 +61,84 @@ benchIOSink
=> Int -> String -> (t IO Int -> IO b) -> Benchmark
benchIOSink value name f = bench name $ nfIO $ randomRIO (1,1) >>= f . source value


-------------------------------------------------------------------------------
-- Folds
-------------------------------------------------------------------------------

-- | Benchmark helper: fold the stream with 'FL.any', using the predicate
-- @(> value)@.
{-# INLINE any #-}
any :: (Monad m, Ord a) => a -> SerialT m a -> m Bool
any value = IP.fold anyAbove

    where

    anyAbove = FL.any (> value)

-- | Benchmark helper: fold the stream with 'FL.all', using the predicate
-- @(<= value)@.
{-# INLINE all #-}
all :: (Monad m, Ord a) => a -> SerialT m a -> m Bool
all value = IP.fold allWithin

    where

    allWithin = FL.all (<= value)

-- | Benchmark helper: fold the stream with 'FL.drain' wrapped in
-- @FL.ltake value@.
{-# INLINE take #-}
take :: Monad m => Int -> SerialT m a -> m ()
take value = IP.fold $ FL.ltake value FL.drain

-- | Benchmark helper: fold the stream with 'FL.drain' wrapped in
-- 'FL.ltakeWhile', using the predicate @(<= value)@.
{-# INLINE takeWhile #-}
takeWhile :: Monad m => Int -> SerialT m Int -> m ()
takeWhile value = IP.fold guardedDrain

    where

    guardedDrain = FL.ltakeWhile (<= value) FL.drain

-- | Benchmark helper: fold the stream with 'FL.many' applied to 'FL.length'
-- and 'FL.sum' (in that argument order).
{-# INLINE many #-}
many :: Monad m => SerialT m Int -> m Int
many = IP.fold nested

    where

    nested = FL.many FL.length FL.sum

-- | Benchmark helper: fold the stream with 'FL.splitWith', pairing the result
-- of @FL.all (<= value `div` 2)@ with the result of @FL.any (> value)@.
{-# INLINE splitAllAny #-}
splitAllAny :: Monad m => Int -> SerialT m Int -> m (Bool, Bool)
splitAllAny value = IP.fold (FL.splitWith (,) allLowerHalf anyAbove)

    where

    halfway = value `div` 2

    allLowerHalf = FL.all (<= halfway)

    anyAbove = FL.any (> value)

-- | Benchmark helper: fold the stream with 'FL.teeWith', pairing the result of
-- @FL.all (<= value)@ with the result of @FL.any (> value)@.
{-# INLINE teeAllAny #-}
teeAllAny :: (Monad m, Ord a) => a -> SerialT m a -> m (Bool, Bool)
teeAllAny value = IP.fold (FL.teeWith (,) allWithin anyAbove)

    where

    allWithin = FL.all (<= value)

    anyAbove = FL.any (> value)

-- | Benchmark helper: fold the stream with 'FL.teeWith', pairing the result of
-- 'FL.sum' with the result of 'FL.length'.
{-# INLINE teeSumLength #-}
teeSumLength :: Monad m => SerialT m Int -> m (Int, Int)
teeSumLength = IP.fold $ FL.teeWith (,) FL.sum FL.length

-- | Benchmark helper: fold the stream with 'FL.distribute_' over a two-element
-- list of folds ('FL.sum' and 'FL.length'), each with its result replaced by
-- @()@.
{-# INLINE distribute_ #-}
distribute_ :: Monad m => SerialT m Int -> m ()
distribute_ = IP.fold (FL.distribute_ sinks)

    where

    sinks =
        [ const () <$> FL.sum
        , const () <$> FL.length
        ]

-- | Benchmark helper: fold the stream with 'FL.demuxWith', passing the given
-- key-extracting function and map of per-key folds straight through.
{-# INLINE demuxWith #-}
demuxWith
    :: (Monad m, Ord k)
    => (a -> (k, a'))
    -> Map k (Fold m a' b)
    -> SerialT m a
    -> m (Map k b)
demuxWith toKeyed folds = S.fold $ FL.demuxWith toKeyed folds

-- | Benchmark helper: fold the stream with 'FL.demuxWith_', passing the given
-- key-extracting function and map of per-key folds straight through.
{-# INLINE demuxWith_ #-}
demuxWith_
    :: (Monad m, Ord k)
    => (a -> (k, a'))
    -> Map k (Fold m a' b)
    -> SerialT m a
    -> m ()
demuxWith_ toKeyed folds = S.fold $ FL.demuxWith_ toKeyed folds

-- | Benchmark helper: fold the stream with 'FL.demuxWithDefault_', passing the
-- given key-extracting function and map of per-key folds through, and
-- supplying @FL.lmap snd FL.sum@ as the third (default) fold argument.
{-# INLINE demuxWithDefault_ #-}
demuxWithDefault_
    :: (Monad m, Ord k, Num b)
    => (a -> (k, b))
    -> Map k (Fold m b b)
    -> SerialT m a
    -> m ()
demuxWithDefault_ toKeyed folds =
    S.fold (FL.demuxWithDefault_ toKeyed folds fallback)

    where

    fallback = FL.lmap snd FL.sum

-- | Benchmark helper: fold the stream with 'FL.classifyWith', using the given
-- key function and 'FL.sum' as the fold argument.
{-# INLINE classifyWith #-}
classifyWith
    :: (Monad m, Ord k, Num a)
    => (a -> k)
    -> SerialT m a
    -> m (Map k a)
classifyWith toKey = S.fold $ FL.classifyWith toKey FL.sum

-------------------------------------------------------------------------------
-- Stream folds
-- Benchmarks
-------------------------------------------------------------------------------

moduleName :: String
Expand All @@ -66,15 +149,15 @@ o_1_space_serial_elimination value =
[ bgroup "serially"
[ bgroup "elimination"
[ benchIOSink value "drain" (S.fold FL.drain)
, benchIOSink value "drainN" (S.fold (IFL.drainN value))
, benchIOSink value "drainN" (S.fold (FL.drainN value))
, benchIOSink
value
"drainWhileTrue"
(S.fold (IFL.drainWhile $ (<=) (value + 1)))
(S.fold (FL.drainWhile $ (<=) (value + 1)))
, benchIOSink
value
"drainWhileFalse"
(S.fold (IFL.drainWhile $ (>=) (value + 1)))
(S.fold (FL.drainWhile $ (>=) (value + 1)))
, benchIOSink value "sink" (S.fold $ Sink.toFold Sink.drain)
, benchIOSink value "last" (S.fold FL.last)
, benchIOSink value "lastN.1" (S.fold (IA.lastN 1))
Expand Down Expand Up @@ -130,8 +213,10 @@ o_1_space_serial_elimination value =
, benchIOSink value "null" (S.fold FL.null)
, benchIOSink value "elem" (S.fold (FL.elem (value + 1)))
, benchIOSink value "notElem" (S.fold (FL.notElem (value + 1)))
, benchIOSink value "all" (S.fold (FL.all (<= (value + 1))))
, benchIOSink value "any" (S.fold (FL.any (> (value + 1))))
, benchIOSink value "all" $ all value
, benchIOSink value "any" $ any value
, benchIOSink value "take" $ take value
, benchIOSink value "takeWhile" $ takeWhile value
, benchIOSink
value
"and"
Expand All @@ -149,12 +234,12 @@ o_1_space_serial_transformation :: Int -> [Benchmark]
o_1_space_serial_transformation value =
[ bgroup "serially"
[ bgroup "transformation"
[ benchIOSink value "lmap" (S.fold (IFL.lmap (+ 1) FL.drain))
[ benchIOSink value "lmap" (S.fold (FL.lmap (+ 1) FL.drain))
, benchIOSink
value
"pipe-mapM"
(S.fold
(IFL.transform
(FL.transform
(Pipe.mapM (\x -> return $ x + 1))
FL.drain))
]
Expand All @@ -163,22 +248,32 @@ o_1_space_serial_transformation value =

o_1_space_serial_composition :: Int -> [Benchmark]
o_1_space_serial_composition value =
[ bgroup "serially"
[ bgroup "composition" -- Applicative
[ benchIOSink
value
"all,any"
(S.fold
((,) <$> FL.all (<= (value + 1)) <*>
FL.any (> (value + 1))))
, benchIOSink
value
"sum,length"
(S.fold ((,) <$> FL.sum <*> FL.length))
]
]
[ bgroup
"serially"
[ bgroup
"composition"
-- Applicative
[ benchIOSink value "tee (all, any)" $ teeAllAny value
, benchIOSink value "tee (sum, length)" $ teeSumLength
, benchIOSink value "distribute_ [sum, length]" $ distribute_
, benchIOSink value "demuxWith [sum, length]" $ demuxWith fn mp
, benchIOSink value "demuxWith_ [sum, length]"
$ demuxWith_ fn mp
, benchIOSink value "demuxWithDefault_ [sum, length] sum"
$ demuxWithDefault_ fn mp
, benchIOSink value "classifyWith sum" $ classifyWith (fst . fn)
, benchIOSink value "many length sum" many
, benchIOSink value "split (all, any)" $ splitAllAny value
]
]
]

where

fn x = (x `mod` 3, x)

mp = Map.fromList [(0, FL.sum), (1, FL.length)]

o_n_heap_serial :: Int -> [Benchmark]
o_n_heap_serial value =
[ bgroup "serially"
Expand All @@ -188,7 +283,7 @@ o_n_heap_serial value =
[ benchIOSink value "toStream" (S.fold IP.toStream)
, benchIOSink value "toStreamRev" (S.fold IP.toStreamRev)
, benchIOSink value "toList" (S.fold FL.toList)
, benchIOSink value "toListRevF" (S.fold IFL.toListRevF)
, benchIOSink value "toListRevF" (S.fold FL.toListRevF)
-- Converting the stream to an array
, benchIOSink value "lastN.Max" (S.fold (IA.lastN (value + 1)))
, benchIOSink value "writeN" (S.fold (A.writeN value))
Expand Down
12 changes: 6 additions & 6 deletions benchmark/Streamly/Benchmark/FileSystem/Handle/Read.hs
Original file line number Diff line number Diff line change
Expand Up @@ -472,18 +472,18 @@ o_1_space_reduce_read_split env =
splitOnSuffix inh
, mkBench "S.splitOnSeq \"\" FL.drain" env $ \inh _ ->
splitOnSeq "" inh
, mkBench "S.splitOnSuffixSeq \"\" FL.drain" env $ \inh _ ->
splitOnSuffixSeq "" inh
-- , mkBench "S.splitOnSuffixSeq \"\" FL.drain" env $ \inh _ ->
-- splitOnSuffixSeq "" inh
, mkBench "S.splitOnSeq \"\\n\" FL.drain" env $ \inh _ ->
splitOnSeq "\n" inh
, mkBench "S.splitOnSuffixSeq \"\\n\" FL.drain" env $ \inh _ ->
splitOnSuffixSeq "\n" inh
-- , mkBench "S.splitOnSuffixSeq \"\\n\" FL.drain" env $ \inh _ ->
-- splitOnSuffixSeq "\n" inh
, mkBench "S.splitOnSeq \"a\" FL.drain" env $ \inh _ ->
splitOnSeq "a" inh
, mkBench "S.splitOnSeq \"\\r\\n\" FL.drain" env $ \inh _ ->
splitOnSeq "\r\n" inh
, mkBench "S.splitOnSuffixSeq \"\\r\\n\" FL.drain" env $ \inh _ ->
splitOnSuffixSeq "\r\n" inh
-- , mkBench "S.splitOnSuffixSeq \"\\r\\n\" FL.drain" env $ \inh _ ->
-- splitOnSuffixSeq "\r\n" inh
, mkBench "S.splitOnSeq \"aa\" FL.drain" env $ \inh _ ->
splitOnSeq "aa" inh
, mkBench "S.splitOnSeq \"aaaa\" FL.drain" env $ \inh _ ->
Expand Down
1 change: 1 addition & 0 deletions benchmark/streamly-benchmarks.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ benchmark Data.Fold
type: exitcode-stdio-1.0
hs-source-dirs: Streamly/Benchmark/Data
main-is: Fold.hs
build-depends: containers >= 0.5 && < 0.7
if impl(ghcjs)
buildable: False
else
Expand Down
2 changes: 1 addition & 1 deletion src/Streamly.hs
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ import qualified Streamly.Internal.Data.Stream.Async as Async
-- reducers of streams. Reducers can be combined to consume a stream source in
-- many ways. The simplest is to reduce a stream source using a fold e.g.:
--
-- > S.runFold FL.length $ S.enumerateTo 100
-- > S.foldOnce FL.length $ S.enumerateTo 100
--
-- Folds are consumers of streams and can be used to split a stream into
-- multiple independent flows. Grouping transforms a stream by applying a fold
Expand Down
26 changes: 14 additions & 12 deletions src/Streamly/Internal/Data/Array.hs
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,14 @@ import Data.Functor.Identity (runIdentity)
import Data.Primitive.Array hiding (fromList, fromListN)
import qualified GHC.Exts as Exts

import Streamly.Internal.Data.Tuple.Strict (Tuple'(..), Tuple3'(..))
import Streamly.Internal.Data.Unfold.Types (Unfold(..))
import Streamly.Internal.Data.Fold.Types (Fold(..))
import Streamly.Internal.Data.Stream.StreamK.Type (IsStream)
import Streamly.Internal.Data.Stream.Serial (SerialT)

import qualified Streamly.Internal.Data.Stream.StreamD as D
import qualified Streamly.Internal.Data.Fold.Types as FL

{-# NOINLINE bottomElement #-}
bottomElement :: a
Expand Down Expand Up @@ -109,32 +111,32 @@ writeN limit = Fold step initial extract
where
initial = do
marr <- liftIO $ newArray limit bottomElement
return (marr, 0)
step (marr, i) x
| i == limit = return (marr, i)
return (Tuple' marr 0)
step st@(Tuple' marr i) x
| i == limit = fmap FL.Done $ extract st
| otherwise = do
liftIO $ writeArray marr i x
return (marr, i + 1)
extract (marr, len) = liftIO $ freezeArray marr 0 len
return $ FL.Partial $ Tuple' marr (i + 1)
extract (Tuple' marr len) = liftIO $ freezeArray marr 0 len

{-# INLINE_NORMAL write #-}
write :: MonadIO m => Fold m a (Array a)
write = Fold step initial extract
where
initial = do
marr <- liftIO $ newArray 0 bottomElement
return (marr, 0, 0)
step (marr, i, capacity) x
return (Tuple3' marr 0 0)
step (Tuple3' marr i capacity) x
| i == capacity =
let newCapacity = max (capacity * 2) 1
in do newMarr <- liftIO $ newArray newCapacity bottomElement
liftIO $ copyMutableArray newMarr 0 marr 0 i
liftIO $ writeArray newMarr i x
return (newMarr, i + 1, newCapacity)
return $ FL.Partial $ Tuple3' newMarr (i + 1) newCapacity
| otherwise = do
liftIO $ writeArray marr i x
return (marr, i + 1, capacity)
extract (marr, len, _) = liftIO $ freezeArray marr 0 len
return $ FL.Partial $ Tuple3' marr (i + 1) capacity
extract (Tuple3' marr len _) = liftIO $ freezeArray marr 0 len

{-# INLINE_NORMAL fromStreamDN #-}
fromStreamDN :: MonadIO m => Int -> D.Stream m a -> m (Array a)
Expand All @@ -149,7 +151,7 @@ fromStreamDN limit str = do

{-# INLINE fromStreamD #-}
fromStreamD :: MonadIO m => D.Stream m a -> m (Array a)
fromStreamD = D.runFold write
fromStreamD = D.foldOnce write

{-# INLINABLE fromListN #-}
fromListN :: Int -> [a] -> Array a
Expand Down Expand Up @@ -185,7 +187,7 @@ toStreamRev = D.fromStreamD . toStreamDRev

{-# INLINE fold #-}
fold :: Monad m => Fold m a b -> Array a -> m b
fold f arr = D.runFold f (toStreamD arr)
fold f arr = D.foldOnce f (toStreamD arr)

{-# INLINE streamFold #-}
streamFold :: Monad m => (SerialT m a -> m b) -> Array a -> m b
Expand Down
Loading