Skip to content

Commit

Permalink
rename ThreadFailed to Propagating and move it to its own module
Browse files Browse the repository at this point in the history
  • Loading branch information
mitchellwrosen committed Nov 28, 2023
1 parent cc554dd commit 6102681
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 49 deletions.
1 change: 1 addition & 0 deletions ki/ki.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ library
Ki.Internal.ByteCount
Ki.Internal.IO
Ki.Internal.NonblockingSTM
Ki.Internal.Propagating
Ki.Internal.Scope
Ki.Internal.Thread
Ki.Internal.ThreadAffinity
Expand Down
9 changes: 2 additions & 7 deletions ki/src/Ki.hs
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,8 @@ import Ki.Internal.Scope
fork_,
scoped,
)
import Ki.Internal.Thread
( Thread,
ThreadAffinity (..),
ThreadOptions (..),
await,
defaultThreadOptions,
)
import Ki.Internal.Thread (Thread, ThreadOptions (..), await, defaultThreadOptions)
import Ki.Internal.ThreadAffinity (ThreadAffinity (..))

-- $introduction
--
Expand Down
40 changes: 40 additions & 0 deletions ki/src/Ki/Internal/Propagating.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
module Ki.Internal.Propagating
( pattern PropagatingFrom,
Tid,
peelOffPropagating,
propagate,
)
where

import Control.Concurrent (ThreadId)
import Control.Exception (Exception (..), SomeException, asyncExceptionFromException, asyncExceptionToException, throwTo)

-- Internal exception type thrown by a child thread to its parent, if it fails unexpectedly.
data Propagating = Propagating
{ childId :: {-# UNPACK #-} !Tid,
exception :: !SomeException
}
deriving stock (Show)

instance Exception Propagating where
toException = asyncExceptionToException
fromException = asyncExceptionFromException

pattern PropagatingFrom :: Tid -> SomeException
pattern PropagatingFrom childId <- (fromException -> Just Propagating {childId})

-- A unique identifier for a thread within a scope. (Internal type alias)
type Tid =
Int

-- Peel an outer Propagating layer off of some exception, if there is one.
peelOffPropagating :: SomeException -> SomeException
peelOffPropagating e0 =
case fromException e0 of
Just (Propagating _ e1) -> e1
Nothing -> e0

-- @propagate exception child parent@ propagates @exception@ from @child@ to @parent@.
propagate :: SomeException -> Tid -> ThreadId -> IO ()
propagate exception childId parentThreadId =
throwTo parentThreadId Propagating {childId, exception}
17 changes: 9 additions & 8 deletions ki/src/Ki/Internal/Scope.hs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ import GHC.Conc.Sync (readTVarIO)
import GHC.IO (unsafeUnmask)
import IntSupply (IntSupply)
import qualified IntSupply
import Ki.Internal.ByteCount
import Ki.Internal.ByteCount (byteCountToInt64)
import Ki.Internal.IO
( IOResult (..),
UnexceptionalIO (..),
Expand All @@ -60,7 +60,8 @@ import Ki.Internal.IO
uninterruptiblyMasked,
)
import Ki.Internal.NonblockingSTM
import Ki.Internal.Thread
import Ki.Internal.Propagating (Tid, peelOffPropagating, propagate, pattern PropagatingFrom)
import Ki.Internal.Thread (Thread, ThreadOptions (..), defaultThreadOptions, forkWithAffinity, makeThread)

-- | A scope.
--
Expand Down Expand Up @@ -171,7 +172,7 @@ scoped action = do
-- If one of our children propagated an exception to us, then we know it's about to terminate, so we don't bother
-- throwing an exception to it.
pure case result of
Left (fromException -> Just ThreadFailed {childId}) -> IntMap.Lazy.delete childId livingChildren0
Left (PropagatingFrom childId) -> IntMap.Lazy.delete childId livingChildren0
_ -> livingChildren0

-- Deliver a ScopeClosing exception to every living child.
Expand All @@ -190,8 +191,8 @@ scoped action = do

-- By now there are three sources of exception:
--
-- 1) A sync or async exception thrown during the callback, captured in `result`. If applicable, we want to unwrap
-- the `ThreadFailed` off of this, which was only used to indicate it came from one of our children.
-- 1) A sync or async exception thrown during the callback, captured in `result`. If applicable, we want to peel
-- the `Propagating` off of this, which was only used to indicate it came from one of our children.
--
-- 2) A sync or async exception left for us in `childExceptionVar` by a child that tried to propagate it to us
-- directly, but failed (because we killed it concurrently).
Expand All @@ -201,7 +202,7 @@ scoped action = do
--
-- We cannot throw more than one, so throw them in that priority order.
case result of
Left exception -> throwIO (unwrapThreadFailed exception)
Left exception -> throwIO (peelOffPropagating exception)
Right value ->
tryTakeMVar childExceptionVar >>= \case
Nothing -> pure value
Expand Down Expand Up @@ -429,11 +430,11 @@ propagateException :: Scope -> Tid -> SomeException -> UnexceptionalIO ()
propagateException Scope {childExceptionVar, parentThreadId, statusVar} childId exception =
UnexceptionalIO (readTVarIO statusVar) >>= \case
Closing -> tryPutChildExceptionVar -- (A) / (B)
_ -> loop -- we know status is Open here
status -> assert (status >= 0) loop -- we know status is Open here
where
loop :: UnexceptionalIO ()
loop =
unexceptionalTry (throwTo parentThreadId ThreadFailed {childId, exception}) >>= \case
unexceptionalTry (propagate exception childId parentThreadId) >>= \case
Failure IsScopeClosingException -> tryPutChildExceptionVar -- (C)
Failure _ -> loop -- (D)
Success _ -> pure ()
Expand Down
35 changes: 1 addition & 34 deletions ki/src/Ki/Internal/Thread.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,14 @@ module Ki.Internal.Thread
( Thread,
makeThread,
await,
Tid,
ThreadAffinity (..),
forkWithAffinity,
ThreadOptions (..),
defaultThreadOptions,
ThreadFailed (..),
unwrapThreadFailed,
)
where

import Control.Concurrent (ThreadId, forkOS)
import Control.Exception
( BlockedIndefinitelyOnSTM (..),
Exception (fromException, toException),
MaskingState (..),
SomeException,
asyncExceptionFromException,
asyncExceptionToException,
)
import Control.Exception (BlockedIndefinitelyOnSTM (..), MaskingState (..))
import GHC.Conc (STM)
import Ki.Internal.ByteCount (ByteCount)
import Ki.Internal.IO (forkIO, forkOn, tryEitherSTM)
Expand Down Expand Up @@ -62,10 +51,6 @@ makeThread threadId action =
await_ = tryEitherSTM (\BlockedIndefinitelyOnSTM -> action) pure action
}

-- A unique identifier for a thread within a scope. (Internal type alias)
type Tid =
Int

-- forkIO/forkOn/forkOS, switching on affinity
forkWithAffinity :: ThreadAffinity -> IO () -> IO ThreadId
forkWithAffinity = \case
Expand Down Expand Up @@ -129,24 +114,6 @@ defaultThreadOptions =
maskingState = Unmasked
}

-- Internal exception type thrown by a child thread to its parent, if it fails unexpectedly.
data ThreadFailed = ThreadFailed
{ childId :: {-# UNPACK #-} !Tid,
exception :: !SomeException
}
deriving stock (Show)

instance Exception ThreadFailed where
toException = asyncExceptionToException
fromException = asyncExceptionFromException

-- Peel an outer ThreadFailed layer off of some exception, if there is one.
unwrapThreadFailed :: SomeException -> SomeException
unwrapThreadFailed e0 =
case fromException e0 of
Just (ThreadFailed _ e1) -> e1
Nothing -> e0

-- | Wait for a thread to terminate.
await :: Thread a -> STM a
await =
Expand Down

0 comments on commit 6102681

Please sign in to comment.