Skip to content

Commit

Permalink
Add control monitoring library to local-cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
paolino committed Apr 30, 2024
1 parent 6012a6b commit b347daa
Show file tree
Hide file tree
Showing 5 changed files with 432 additions and 18 deletions.
46 changes: 46 additions & 0 deletions lib/local-cluster/lib/Control/Monitoring/Concurrent.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE RankNTypes #-}

module Control.Monitoring.Concurrent
( Register (..)
, newRegister
)
where

import Prelude

import UnliftIO
( MonadIO
, atomically
, newTVarIO
, readTVar
, readTVarIO
, writeTVar
)

import Control.Monad.STM
( retry
)

-- | A thread-safe register that can be read, be blocked on changing
data Register m a b = Register
{ readRegister :: m a
-- ^ Read the register
, changeRegister :: (a -> Maybe a) -> m ()
-- ^ Block on the register until a new `a` is ready
}

-- | Create a new `Register` with an initial value
newRegister :: MonadIO m => a -> m (Register m a b)
newRegister a = do
var <- newTVarIO a
pure
$ Register
{ readRegister = readTVarIO var
, changeRegister = \f -> atomically $ do
v <- readTVar var
case f v of
Just v' -> writeTVar var v'
Nothing -> retry
}
70 changes: 70 additions & 0 deletions lib/local-cluster/lib/Control/Monitoring/Folder.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
{-# LANGUAGE GADTs #-}
{-# LANGUAGE RankNTypes #-}

module Control.Monitoring.Folder
( mkTracingFromFold
, mkTracingFromMoore
, mooreToFold
, foldToMoore
)
where

import Prelude

import Control.Comonad
( Comonad (duplicate, extract)
)
import Control.Foldl
( Fold (..)
)
import Control.Monitoring.Tracing
( State (..)
, StateS (..)
, Tracing (..)
)
import Data.Machine
( Moore (..)
)

import qualified Control.Foldl as L

consume :: Fold a b -> a -> Fold a b
consume f a = L.fold (duplicate f) [a]

-- | Create a machine from a `Fold` in a given initial state
mkTracingFromFold :: forall w a b. Fold a b -> StateS w -> Tracing w a b
mkTracingFromFold = go
where
go :: Fold a b -> StateS w' -> Tracing w' a b
go f w =
Tracing
{ observation = extract f
, state = case w of
WaitS ->
Waiting
{ traceW = \a -> go (consume f a) StepS
, switchW = go f RunS
}
StepS ->
Stepping
{ stepS = go f WaitS
, switchS = go f RunS
}
RunS ->
Running
{ traceR = \a -> go (consume f a) RunS
, switchR = go f StepS
}
}

-- | Convert a `Moore` machine to a `Fold`
mooreToFold :: Moore a b -> Fold a b
mooreToFold m = Fold (\(Moore _ f) -> f) m (\(Moore b _) -> b)

-- | Convert a `Fold` to a `Moore` machine
foldToMoore :: Fold a b -> Moore a b
foldToMoore f = Moore (extract f) $ foldToMoore . consume f

-- | Create a tracing from a `Moore` machine in a given initial state
mkTracingFromMoore :: forall w a b. Moore a b -> StateS w -> Tracing w a b
mkTracingFromMoore = mkTracingFromFold . mooreToFold
121 changes: 121 additions & 0 deletions lib/local-cluster/lib/Control/Monitoring/Monitor.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}

-- | A monitor that can switch between pausing and running
module Control.Monitoring.Monitor
( Monitor (..)
, hoistMonitor
, mkMonitor
, monitorTracer
)
where

import Prelude

import Control.Monad
( when
)
import Data.Bifunctor
( Bifunctor (..)
)
import Data.Profunctor
( Profunctor (..)
, dimap
, rmap
)
import UnliftIO
( MonadIO
)

import Control.Monitoring.Concurrent
( Register (..)
, newRegister
)
import Control.Monitoring.Tracing
( AnyTracing
, MonitorState (..)
)
import Control.Tracer
( Tracer (..)
)

import qualified Control.Monitoring.Tracing as Tracing

-- | The state of the monitor
-- | A trace monitor that can switch between pausing and running
-- * `trace` is tracing in both states
-- * `switch` switches between pausing and not pausing and vice versa
-- * `observe` observes the current state which is kept however we switch
-- * `pull` is a no-op when not pausing
-- * `kill` tries to kill the program by placing a bomb on the next trace
data Monitor m a b = Monitor
{ trace :: a -> m ()
-- ^ Trace a value
, switch :: m ()
-- ^ Switch between pausing and running
, observe :: m (b, MonitorState)
-- ^ Observe the current state
, step :: m ()
-- ^ Pull the next trace, when in pausing state
, kill :: m ()
-- ^ Try to kill the program
}

-- | Natural transformation of monitors
hoistMonitor :: (forall x. m x -> n x) -> Monitor m a b -> Monitor n a b
hoistMonitor nat Monitor{..} =
Monitor
{ trace = nat . trace
, switch = nat switch
, observe = nat observe
, step = nat step
, kill = nat kill
}

instance Monad m => Functor (Monitor m a) where
fmap = rmap

instance Monad m => Profunctor (Monitor m) where
dimap f g Monitor{..} =
Monitor
{ trace = trace . f
, switch
, observe = fmap (first g) observe
, step
, kill
}

-- | Create a monitor from a tracer in either pausing or running state
mkMonitor
:: MonadIO m
=> AnyTracing c b
-- ^ The initial state of the monitor
-> (a -> m c)
-- ^ Contextualize the tracing
-> m (Monitor m a b)
mkMonitor anyTracing addCtx = do
Register readTracing changeTracing <- newRegister anyTracing
let block e = changeTracing $ \s -> case Tracing.tracingState s of
Step -> Nothing
_ -> Just $ Tracing.trace e s
Register readKill changeKill <- newRegister False
let pull' = changeTracing $ pure . Tracing.step
pure
$ Monitor
{ trace = \event -> do
kill <- readKill
when kill $ error "Killed"
ctxed <- addCtx event
block ctxed
, switch = changeTracing $ pure . Tracing.switch
, observe = do
m <- readTracing
pure (Tracing.observe m, Tracing.tracingState m)
, step = pull'
, kill = changeKill (const $ Just True) >> pull'
}

-- | Extract the `Tracer` from a `Monitor`
monitorTracer :: Monitor m a b -> Tracer m a
monitorTracer = Tracer . trace

0 comments on commit b347daa

Please sign in to comment.