Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor time related code in StreamD/Generate #911

Merged
merged 4 commits into from Feb 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
89 changes: 89 additions & 0 deletions src/Streamly/Internal/Control/Concurrent.hs
@@ -0,0 +1,89 @@
{-# LANGUAGE UnboxedTuples #-}

-- |
-- Module : Streamly.Internal.Control.Concurrent
-- Copyright : (c) 2017 Composewell Technologies
-- License : BSD-3-Clause
-- Maintainer : streamly@composewell.com
-- Stability : experimental
-- Portability : GHC

module Streamly.Internal.Control.Concurrent
(
MonadAsync
, RunInIO(..)
, doFork
, fork
, forkManaged
)
where

import Control.Concurrent (ThreadId, forkIO, killThread)
import Control.Exception (SomeException(..), catch, mask)
import Control.Monad.Catch (MonadThrow)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Trans.Control
(MonadBaseControl, control, StM, liftBaseDiscard)
import Data.Functor (void)
import GHC.Conc (ThreadId(..))
import GHC.Exts
import GHC.IO (IO(..))
import System.Mem.Weak (addFinalizer)

-- /Since: 0.8.0 ("Streamly.Prelude")/
--
-- | A monad that can perform concurrent or parallel IO operations. Streams
-- that can be composed concurrently require the underlying monad to be
-- 'MonadAsync'.
--
-- /Since: 0.1.0 ("Streamly")/
--
-- @since 0.8.0
type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m)

newtype RunInIO m = RunInIO { runInIO :: forall b. m b -> IO (StM m b) }

-- Stolen from the async package. The perf improvement is modest, 2% on a
-- thread heavy benchmark (parallel composition using noop computations).
-- A version of forkIO that does not include the outer exception
-- handler: saves a bit of time when we will be installing our own
-- exception handler.
{-# INLINE rawForkIO #-}
rawForkIO :: IO () -> IO ThreadId
rawForkIO action = IO $ \ s ->
case fork# action s of (# s1, tid #) -> (# s1, ThreadId tid #)

-- | Fork a thread to run the given computation, installing the provided
-- exception handler. Lifted to any monad with 'MonadBaseControl IO m'
-- capability.
--
-- TODO: the RunInIO argument can be removed, we can directly pass the action
-- as "mrun action" instead.
{-# INLINE doFork #-}
doFork :: MonadBaseControl IO m
=> m ()
-> RunInIO m
-> (SomeException -> IO ())
-> m ThreadId
doFork action (RunInIO mrun) exHandler =
control $ \run ->
mask $ \restore -> do
tid <- rawForkIO $ catch (restore $ void $ mrun action)
exHandler
run (return tid)

-- | 'fork' lifted to any monad with 'MonadBaseControl IO m' capability.
--
{-# INLINABLE fork #-}
fork :: MonadBaseControl IO m => m () -> m ThreadId
fork = liftBaseDiscard forkIO

-- | Fork a thread that is automatically killed as soon as the reference to the
-- returned threadId is garbage collected.
--
{-# INLINABLE forkManaged #-}
forkManaged :: (MonadIO m, MonadBaseControl IO m) => m () -> m ThreadId
forkManaged action = do
tid <- fork action
liftIO $ addFinalizer tid (killThread tid)
return tid
71 changes: 8 additions & 63 deletions src/Streamly/Internal/Data/SVar.hs
Expand Up @@ -105,42 +105,37 @@ module Streamly.Internal.Data.SVar
)
where

import Control.Concurrent
(ThreadId, myThreadId, threadDelay, throwTo, forkIO, killThread)
import Control.Concurrent (ThreadId, myThreadId, threadDelay, throwTo)
import Control.Concurrent.MVar
(MVar, newEmptyMVar, tryPutMVar, takeMVar, tryTakeMVar, newMVar,
tryReadMVar)
import Control.Exception
(SomeException(..), catch, mask, assert, Exception, catches,
(SomeException(..), assert, Exception, catches,
throwIO, Handler(..), BlockedIndefinitelyOnMVar(..),
BlockedIndefinitelyOnSTM(..))
import Control.Monad (when)
import Control.Monad.Catch (MonadThrow)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Trans.Control
(MonadBaseControl, control, StM, liftBaseDiscard)
import Control.Monad.Trans.Control (MonadBaseControl, control)
import Streamly.Internal.Data.Atomics
(atomicModifyIORefCAS, atomicModifyIORefCAS_, writeBarrier,
storeLoadBarrier)
import Data.Concurrent.Queue.MichaelScott (LinkedQueue, pushL)
import Data.Functor (void)
import Data.Heap (Heap, Entry(..))
import Data.Int (Int64)
import Data.Kind (Type)
import Data.IORef
(IORef, modifyIORef, newIORef, readIORef, writeIORef, atomicModifyIORef)
import Data.Kind (Type)
import Data.Maybe (fromJust, fromMaybe)
#if __GLASGOW_HASKELL__ < 808
import Data.Semigroup ((<>))
#endif
import Data.Set (Set)
import GHC.Conc (ThreadId(..))
import GHC.Exts
import GHC.IO (IO(..))
import System.IO (hPutStrLn, stderr)
import System.Mem.Weak (addFinalizer)

import Streamly.Internal.Data.Time.Clock (Clock(..), getTime)
import Streamly.Internal.Control.Concurrent
(MonadAsync, RunInIO(..), doFork, fork, forkManaged)
import Streamly.Internal.Data.Time.Clock.Type (Clock(..), getTime)
import Streamly.Internal.Data.Time.Units
(AbsTime, NanoSecond64(..), MicroSecond64(..), diffAbsTime64,
fromRelTime64, toRelTime64, showNanoSecond64, showRelTime64)
Expand Down Expand Up @@ -898,66 +893,16 @@ withDiagMVar sv label action =
-- Spawning threads
------------------------------------------------------------------------------

-- /Since: 0.8.0 ("Streamly.Prelude")/
--
-- | A monad that can perform concurrent or parallel IO operations. Streams
-- that can be composed concurrently require the underlying monad to be
-- 'MonadAsync'.
--
-- /Since: 0.1.0 ("Streamly")/
--
-- @since 0.8.0
type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m)

-- When we run computations concurrently, we completely isolate the state of
-- | When we run computations concurrently, we completely isolate the state of
-- the concurrent computations from the parent computation. The invariant is
-- that we should never be running two concurrent computations in the same
-- thread without using the runInIO function. Also, we should never be running
-- a concurrent computation in the parent thread, otherwise it may affect the
-- state of the parent which is against the defined semantics of concurrent
-- execution.
newtype RunInIO m = RunInIO { runInIO :: forall b. m b -> IO (StM m b) }

captureMonadState :: MonadBaseControl IO m => m (RunInIO m)
captureMonadState = control $ \run -> run (return $ RunInIO run)

-- Stolen from the async package. The perf improvement is modest, 2% on a
-- thread heavy benchmark (parallel composition using noop computations).
-- A version of forkIO that does not include the outer exception
-- handler: saves a bit of time when we will be installing our own
-- exception handler.
{-# INLINE rawForkIO #-}
rawForkIO :: IO () -> IO ThreadId
rawForkIO action = IO $ \ s ->
case fork# action s of (# s1, tid #) -> (# s1, ThreadId tid #)

{-# INLINE doFork #-}
doFork :: MonadBaseControl IO m
=> m ()
-> RunInIO m
-> (SomeException -> IO ())
-> m ThreadId
doFork action (RunInIO mrun) exHandler =
control $ \run ->
mask $ \restore -> do
tid <- rawForkIO $ catch (restore $ void $ mrun action)
exHandler
run (return tid)

{-# INLINABLE fork #-}
fork :: MonadBaseControl IO m => m () -> m ThreadId
fork = liftBaseDiscard forkIO

-- | Fork a thread that is automatically killed as soon as the reference to the
-- returned threadId is garbage collected.
--
{-# INLINABLE forkManaged #-}
forkManaged :: (MonadIO m, MonadBaseControl IO m) => m () -> m ThreadId
forkManaged action = do
tid <- fork action
liftIO $ addFinalizer tid (killThread tid)
return tid

------------------------------------------------------------------------------
-- Collecting results from child workers in a streamed fashion
------------------------------------------------------------------------------
Expand Down
56 changes: 10 additions & 46 deletions src/Streamly/Internal/Data/Stream/StreamD/Generate.hs
Expand Up @@ -89,16 +89,11 @@ where

#include "inline.hs"

import Control.Concurrent (threadDelay)
import Control.Monad (forever)
import Control.Monad.IO.Class (MonadIO(..))
import Data.Int (Int64)
import Streamly.Internal.Data.Time.Units (toRelTime64, RelTime64)
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import Streamly.Internal.Data.Time.Clock
(Clock(Monotonic), asyncClock, readClock)
import Streamly.Internal.Data.Time.Units
(MicroSecond64(..), fromAbsTime, toAbsTime, AbsTime)

import qualified Streamly.Internal.Data.IORef.Prim as Prim
(toAbsTime, AbsTime, toRelTime64, RelTime64)

import Prelude hiding (iterate, repeat, replicate, takeWhile)
import Streamly.Internal.Data.Stream.StreamD.Type
Expand Down Expand Up @@ -325,31 +320,6 @@ numFromThen from next = enumerateFromStepNum from (next - from)
-- Time Enumeration
------------------------------------------------------------------------------

{-# INLINE updateTimeVar #-}
updateTimeVar :: Prim.IORef Int64 -> IO ()
updateTimeVar timeVar = do
MicroSecond64 t <- fromAbsTime <$> getTime Monotonic
Prim.modifyIORef' timeVar (const t)

{-# INLINE updateWithDelay #-}
updateWithDelay :: RealFrac a => a -> Prim.IORef Int64 -> IO ()
updateWithDelay precision timeVar = do
threadDelay (delayTime precision)
updateTimeVar timeVar

where

-- Keep the minimum at least a millisecond to avoid high CPU usage
{-# INLINE delayTime #-}
delayTime g
| g' >= fromIntegral (maxBound :: Int) = maxBound
| g' < 1000 = 1000
| otherwise = round g'

where

g' = g * 10 ^ (6 :: Int)

{-# INLINE_NORMAL times #-}
times :: MonadAsync m => Double -> Stream m (AbsTime, RelTime64)
times g = Stream step Nothing
Expand All @@ -358,22 +328,16 @@ times g = Stream step Nothing

{-# INLINE_LATE step #-}
step _ Nothing = do
-- XXX note that this is safe only on a 64-bit machine. On a 32-bit
-- machine a 64-bit 'Var' cannot be read consistently without a lock
-- while another thread is writing to it.
timeVar <- liftIO $ Prim.newIORef (0 :: Int64)
liftIO $ updateTimeVar timeVar
tid <- forkManaged $ liftIO $ forever (updateWithDelay g timeVar)
a <- liftIO $ Prim.readIORef timeVar
return $ Skip $ Just (timeVar, tid, a)

step _ s@(Just (timeVar, _, t0)) = do
a <- liftIO $ Prim.readIORef timeVar
clock <- liftIO $ asyncClock Monotonic g
a <- liftIO $ readClock clock
return $ Skip $ Just (clock, a)

step _ s@(Just (clock, t0)) = do
a <- liftIO $ readClock clock
-- XXX we can perhaps use an AbsTime64 using a 64 bit Int for
-- efficiency. or maybe we can use a representation using Double for
-- floating precision time
return $ Yield (toAbsTime (MicroSecond64 t0),
(toRelTime64 (MicroSecond64 (a - t0)))) s
return $ Yield (toAbsTime t0, toRelTime64 (a - t0)) s

-------------------------------------------------------------------------------
-- From Generators
Expand Down
80 changes: 80 additions & 0 deletions src/Streamly/Internal/Data/Time/Clock.hs
@@ -0,0 +1,80 @@
-- |
-- Module : Streamly.Internal.Data.Time.Clock
-- Copyright : (c) 2021 Composewell Technologies
-- License : BSD-3-Clause
-- Maintainer : streamly@composewell.com
-- Stability : experimental
-- Portability : GHC

module Streamly.Internal.Data.Time.Clock
(
-- * System clock
Clock(..)
, getTime

-- * Async clock
, asyncClock
, readClock
)
where

import Control.Concurrent (threadDelay, ThreadId)
import Control.Monad (forever)
import Streamly.Internal.Data.Time.Clock.Type (Clock(..), getTime)
import Streamly.Internal.Data.Time.Units (MicroSecond64(..), fromAbsTime)
import Streamly.Internal.Control.Concurrent (forkManaged)

import qualified Streamly.Internal.Data.IORef.Prim as Prim


------------------------------------------------------------------------------
-- Async clock
------------------------------------------------------------------------------

{-# INLINE updateTimeVar #-}
updateTimeVar :: Clock -> Prim.IORef MicroSecond64 -> IO ()
updateTimeVar clock timeVar = do
t <- fromAbsTime <$> getTime clock
Prim.modifyIORef' timeVar (const t)

{-# INLINE updateWithDelay #-}
updateWithDelay :: RealFrac a =>
Clock -> a -> Prim.IORef MicroSecond64 -> IO ()
updateWithDelay clock precision timeVar = do
threadDelay (delayTime precision)
updateTimeVar clock timeVar

where

-- Keep the minimum at least a millisecond to avoid high CPU usage
{-# INLINE delayTime #-}
delayTime g
| g' >= fromIntegral (maxBound :: Int) = maxBound
| g' < 1000 = 1000
| otherwise = round g'

where

g' = g * 10 ^ (6 :: Int)

-- | @asyncClock g@ starts a clock thread that updates an IORef with current
-- time as a 64-bit value in microseconds, every 'g' seconds. The IORef can be
-- read asynchronously. The thread exits automatically when the reference to
-- the returned 'ThreadId' is lost.
--
-- Minimum granularity of clock update is 1 ms. Higher is better for
-- performance.
--
-- CAUTION! This is safe only on a 64-bit machine. On a 32-bit machine a 64-bit
-- 'Var' cannot be read consistently without a lock while another thread is
-- writing to it.
asyncClock :: Clock -> Double -> IO (ThreadId, Prim.IORef MicroSecond64)
asyncClock clock g = do
timeVar <- Prim.newIORef undefined
updateTimeVar clock timeVar
tid <- forkManaged $ forever (updateWithDelay clock g timeVar)
return (tid, timeVar)

{-# INLINE readClock #-}
readClock :: (ThreadId, Prim.IORef MicroSecond64) -> IO MicroSecond64
readClock (_, timeVar) = Prim.readIORef timeVar
Expand Up @@ -5,10 +5,10 @@
#include "config.h"
#endif

#include "Streamly/Internal/Data/Time/config-clock.h"
#include "Streamly/Internal/Data/Time/Clock/config-clock.h"

-- |
-- Module : Streamly.Internal.Data.Time.Clock
-- Module : Streamly.Internal.Data.Time.Clock.Type
-- Copyright : (c) 2019 Composewell Technologies
-- (c) 2009-2012, Cetin Sert
-- (c) 2010, Eugene Kirpichov
Expand All @@ -19,7 +19,7 @@

-- A majority of the code below has been stolen from the "clock" package.

module Streamly.Internal.Data.Time.Clock
module Streamly.Internal.Data.Time.Clock.Type
(
-- * get time from the system clock
Clock(..)
Expand Down