Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benchmark/Streamly/Benchmark/Prelude/Serial/O_n_Space.hs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ main = do

allBenchmarks size =
concat
[ o_n_space_serial_toList size -- < 2MB
[ o_n_space_serial_toContainers size -- < 2MB
, o_n_space_serial_outerProductStreams size
, o_n_space_wSerial_outerProductStreams size
, o_n_space_serial_traversable size -- < 2MB
Expand Down
22 changes: 17 additions & 5 deletions benchmark/lib/Streamly/Benchmark/Prelude.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TupleSections #-}

#ifdef __HADDOCK_VERSION__
#undef INSPECTION
Expand Down Expand Up @@ -48,7 +49,7 @@ module Streamly.Benchmark.Prelude

, o_1_space_zipSerial_transformation

, o_n_space_serial_toList
, o_n_space_serial_toContainers
, o_n_space_serial_outerProductStreams

, o_n_space_wSerial_outerProductStreams
Expand Down Expand Up @@ -101,9 +102,9 @@ import GHC.Generics (Generic)
import System.Random (randomRIO)
import Prelude
(Monad, String, Int, (+), ($), (.), return, even, (>), (<=), (==), (>=),
subtract, undefined, Maybe(..), Bool, not, (>>=), curry,
subtract, undefined, Maybe(..), Bool(..), not, (>>=), curry,
maxBound, div, IO, compare, Double, fromIntegral, Integer, (<$>),
(<*>), flip, sqrt, round, (*), seq)
(<*>), flip, sqrt, round, (*), seq, const, Either(..))
import qualified Prelude as P
import qualified Data.Foldable as F
import qualified GHC.Exts as GHC
Expand Down Expand Up @@ -549,6 +550,15 @@ tapAsync n = composeN n $ Internal.tapAsync FL.sum
timestamped :: (S.MonadAsync m) => Stream m Int -> m ()
timestamped = transform . Internal.timestamped

{-# INLINE classifySessionsOf #-}
classifySessionsOf :: (S.MonadAsync m) => Stream m Int -> m ()
classifySessionsOf =
transform
. IP.classifySessionsOf 3 (const (return False)) (P.fmap Right FL.drain)
. S.map (\(ts,(k,a)) -> (k, a, ts))
. IP.timestamped
. S.concatMap (\x -> S.map (x,) (S.enumerateFromTo 1 (10 :: Int)))

{-# INLINE mapMaybe #-}
mapMaybe :: MonadIO m => Int -> Stream m Int -> m ()
mapMaybe n =
Expand Down Expand Up @@ -2128,8 +2138,8 @@ o_1_space_zipSerial_transformation value =
-- Serial : O(n) Space
-------------------------------------------------------------------------------

o_n_space_serial_toList :: Int -> [Benchmark]
o_n_space_serial_toList value =
o_n_space_serial_toContainers :: Int -> [Benchmark]
o_n_space_serial_toContainers value =
[ bgroup
"serially"
[ bgroup
Expand All @@ -2141,6 +2151,8 @@ o_n_space_serial_toList value =
-- , benchIOSink value "toPure" toPure
-- , benchIOSink value "toPureRev" toPureRev
]
, benchIOSink (value `div` 10) "classifySessionsOf"
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change classifySessionsOf to classifySessionsOf/10 maybe to indicate that you're using something other than the default value. It's not a big deal though.

classifySessionsOf
]
]

Expand Down
34 changes: 34 additions & 0 deletions src/Streamly/Internal/Data/Stream/StreamD.hs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ module Streamly.Internal.Data.Stream.StreamD

-- ** By folding (scans)
, scanlM'
, scanlMAfter'
, scanl'
, scanlM
, scanl
Expand All @@ -241,6 +242,7 @@ module Streamly.Internal.Data.Stream.StreamD
, postscanlM
, postscanl'
, postscanlM'
, postscanlMAfter'

, postscanlx'
, postscanlMx'
Expand Down Expand Up @@ -3609,6 +3611,31 @@ postscanlM' fstep begin (Stream step state) =
postscanl' :: Monad m => (a -> b -> a) -> a -> Stream m b -> Stream m a
postscanl' f = postscanlM' (\a b -> return (f a b))

-- We can possibly have the "done" function as a Maybe to provide an option to
-- emit or not emit the accumulator when the stream stops.
--
-- TBD: use a single Yield point
--
{-# INLINE_NORMAL postscanlMAfter' #-}
postscanlMAfter' :: Monad m
=> (b -> a -> m b) -> m b -> (b -> m b) -> Stream m a -> Stream m b
postscanlMAfter' fstep initial done (Stream step1 state1) = do
Stream step (Just (state1, initial))

where

{-# INLINE_LATE step #-}
step gst (Just (st, acc)) = do
r <- step1 (adaptState gst) st
case r of
Yield x s -> do
old <- acc
y <- fstep old x
y `seq` return (Yield y (Just (s, return y)))
Skip s -> return $ Skip $ Just (s, acc)
Stop -> acc >>= done >>= \res -> return (Yield res Nothing)
step _ Nothing = return Stop

{-# INLINE_NORMAL postscanlM #-}
postscanlM :: Monad m => (b -> a -> m b) -> b -> Stream m a -> Stream m b
postscanlM fstep begin (Stream step state) = Stream step' (state, begin)
Expand All @@ -3631,6 +3658,13 @@ postscanl f = postscanlM (\a b -> return (f a b))
scanlM' :: Monad m => (b -> a -> m b) -> b -> Stream m a -> Stream m b
scanlM' fstep begin s = begin `seq` (begin `cons` postscanlM' fstep begin s)

{-# INLINE scanlMAfter' #-}
scanlMAfter' :: Monad m
=> (b -> a -> m b) -> m b -> (b -> m b) -> Stream m a -> Stream m b
scanlMAfter' fstep initial done s =
(initial >>= \x -> x `seq` return x) `consM`
postscanlMAfter' fstep initial done s

{-# INLINE scanl' #-}
scanl' :: Monad m => (b -> a -> b) -> b -> Stream m a -> Stream m b
scanl' f = scanlM' (\a b -> return (f a b))
Expand Down
60 changes: 57 additions & 3 deletions src/Streamly/Internal/Prelude.hs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ module Streamly.Internal.Prelude
-- ** Left scans
, scanl'
, scanlM'
, scanlMAfter'
, postscanl'
, postscanlM'
, prescanl'
Expand Down Expand Up @@ -2125,6 +2126,23 @@ scanx = P.scanlx'
scanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> b -> t m a -> t m b
scanlM' step begin m = fromStreamD $ D.scanlM' step begin $ toStreamD m

-- | @scanlMAfter' accumulate initial done stream@ is like 'scanlM'' except
-- that it provides an additional @done@ function to be applied on the
-- accumulator when the stream stops. The result of @done@ is also emitted in
-- the stream.
--
-- This function can be used to allocate a resource in the beginning of the
-- scan and release it when the stream ends or to flush the internal state of
-- the scan at the end.
--
-- /Internal/
--
{-# INLINE scanlMAfter' #-}
scanlMAfter' :: (IsStream t, Monad m)
=> (b -> a -> m b) -> m b -> (b -> m b) -> t m a -> t m b
scanlMAfter' step initial done stream =
fromStreamD $ D.scanlMAfter' step initial done $ toStreamD stream

-- | Strict left scan. Like 'map', 'scanl'' too is a one to one transformation,
-- however it adds an extra element.
--
Expand Down Expand Up @@ -4507,8 +4525,8 @@ classifySessionsBy
-> t m (k, b) -- ^ session key, fold result
classifySessionsBy tick tmout reset ejectPred
(Fold step initial extract) str =
concatMap sessionOutputStream
$ scanlM' sstep szero stream
concatMap sessionOutputStream $
scanlMAfter' sstep (return szero) flush stream

where

Expand Down Expand Up @@ -4613,6 +4631,21 @@ classifySessionsBy tick tmout reset ejectPred
let curTime = addToAbsTime sessionCurTime tickMs
in ejectExpired sessionState curTime

flush session@SessionState{..} = do
(hp', mp', out, count) <-
ejectAll
( sessionTimerHeap
, sessionKeyValueMap
, K.nil
, sessionCount
)
return $ session
{ sessionCount = count
, sessionTimerHeap = hp'
, sessionKeyValueMap = mp'
, sessionOutputStream = out
}

fromEither e =
case e of
Left x -> x
Expand All @@ -4625,6 +4658,18 @@ classifySessionsBy tick tmout reset ejectPred
let mp1 = Map.delete key mp
return (hp, mp1, out1, cnt - 1)

ejectAll (hp, mp, out, !cnt) = do
let hres = H.uncons hp
case hres of
Just (Entry _ key, hp1) -> do
r <- case Map.lookup key mp of
Nothing -> return (hp1, mp, out, cnt)
Just (Tuple' _ acc) -> ejectEntry hp1 mp out cnt acc key
ejectAll r
Nothing -> do
assert (Map.null mp) (return ())
return (hp, mp, out, cnt)

ejectOne (hp, mp, out, !cnt) = do
let hres = H.uncons hp
case hres of
Expand Down Expand Up @@ -4687,7 +4732,7 @@ classifySessionsBy tick tmout reset ejectPred
return (hp, mp, out, cnt)

-- merge timer events in the stream
stream = Serial.map Just str `Par.parallel` repeatM timer
stream = Serial.map Just str `Par.parallelFst` repeatM timer
timer = do
liftIO $ threadDelay (round $ tick * 1000000)
return Nothing
Expand Down Expand Up @@ -4761,6 +4806,15 @@ classifyChunksOf wsize = classifyChunksBy wsize False
-- classifySessionsOf interval pred = classifySessionsBy 1 interval False pred
-- @
--
-- @
-- >>> S.mapM_ print
-- $ S.classifySessionsOf 3 (const (return False)) (fmap Right FL.toList)
-- $ S.map (\(ts,(k,a)) -> (k, a, ts))
-- $ S.timestamped
-- $ S.delay 1
-- $ (,) <$> S.fromList [1,2,3] <*> S.fromList [1,2,3]
-- @
--
-- /Internal/
--
{-# INLINABLE classifySessionsOf #-}
Expand Down