Skip to content

Commit

Permalink
Add internal monitoring library
Browse files Browse the repository at this point in the history
  • Loading branch information
paolino committed Apr 23, 2024
1 parent 18fc069 commit 3fcd64f
Show file tree
Hide file tree
Showing 7 changed files with 716 additions and 20 deletions.
35 changes: 15 additions & 20 deletions lib/local-cluster/exe/local-cluster.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedLabels #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}

import Prelude

Expand All @@ -30,13 +28,6 @@ import Cardano.Wallet.Launch.Cluster.CommandLine
( CommandLineOptions (..)
, parseCommandLineOptions
)
import Cardano.Wallet.Launch.Cluster.Control.Server
( server
)
import Cardano.Wallet.Launch.Cluster.Control.State
( changePhase
, withControlLayer
)
import Cardano.Wallet.Launch.Cluster.FileOf
( DirOf (..)
, FileOf (..)
Expand All @@ -53,7 +44,6 @@ import Control.Lens
)
import Control.Monad
( void
, (<=<)
)
import Control.Monad.Cont
( ContT (..)
Expand All @@ -62,12 +52,16 @@ import Control.Monad.Trans
( MonadIO (..)
, MonadTrans (..)
)
import Control.Monitoring
import Data.Foldable
( toList
)
import Data.Time
( getCurrentTime
)
import Main.Utf8
( withUtf8
)
import Network.Wai.Handler.Warp
( run
)
import System.Environment.Extended
( isEnvSet
)
Expand All @@ -85,14 +79,11 @@ import System.Path
import System.Path.Directory
( createDirectoryIfMissing
)
import UnliftIO
( async
, link
)

import qualified Cardano.Node.Cli.Launcher as NC
import qualified Cardano.Wallet.Cli.Launcher as WC
import qualified Cardano.Wallet.Launch.Cluster as Cluster
import qualified Control.Foldl as F

-- |
-- # OVERVIEW
Expand Down Expand Up @@ -172,8 +163,12 @@ main = withUtf8 $ do
parseCommandLineOptions
funds <- retrieveFunds faucetFundsFile
flip runContT pure $ do
monitoring <- withControlLayer
liftIO $ link <=< async $ server monitoring >>= run monitoringPort
-- a non-pulling tracer that traces integers and the time they were traced
-- and accumulates them in a set
c <- lift $
mkFoldingMonitor (liftIO getCurrentTime) F.set PullingState
>>= mkMonitor
trace <- ContT $ runMonitor monitoringPort (fmap show . toList) c
clusterPath <-
case clusterDir of
Just path -> pure path
Expand All @@ -198,7 +193,7 @@ main = withUtf8 $ do
lift $ createDirectoryIfMissing True walletDir
node <-
ContT
$ Cluster.withCluster (changePhase monitoring) clusterCfg funds
$ Cluster.withCluster trace clusterCfg funds
nodeSocket <-
case parse . nodeSocketFile $ Cluster.runningNodeSocketPath node of
Left e -> error e
Expand Down
293 changes: 293 additions & 0 deletions lib/local-cluster/lib/Control/Monitoring.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,293 @@
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE ImportQualifiedPost #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TupleSections #-}

module Control.Monitoring where

import Prelude

import Control.Comonad
( Comonad (duplicate, extract)
, ($>)
)
import Control.Foldl
( Fold
)
import qualified Control.Foldl as F
import Control.Monad
( forever
, when
, (<=<)
)
import Data.Bifunctor
( Bifunctor (..)
)
import Data.Profunctor
( Profunctor (..)
, dimap
, lmap
, rmap
)
import GHC.IO.Handle
( hGetLine
)
import Network.Simple.TCP
( HostPreference (HostAny)
, serve
)
import Network.Socket
( socketToHandle
)
import System.IO
( hPutStrLn
)
import Text.Read
( readMaybe
)
import UnliftIO
( IOMode (..)
, MonadIO
, MonadUnliftIO
, UnliftIO (..)
, askUnliftIO
, async
, atomically
, liftIO
, link
, modifyTVar
, newEmptyTMVarIO
, newTVarIO
, putTMVar
, readTVarIO
, takeTMVar
, writeTVar
)

data Protocol = Pull | Switch | Observe | Kill
deriving stock (Show, Read)

runMonitor
:: MonadUnliftIO m
=> Int
-- ^ The port to listen on
-> (b -> [String])
-- ^ How to render the output
-> Monitor m a b
-- ^ The controller
-> ((a -> m ()) -> m c)
-- ^ The action to run with the tracer action
-> m c
runMonitor port renderOuptut c action = do
UnliftIO run <- askUnliftIO
liftIO
$ link <=< async
$ serve HostAny (show port)
$ \(socket, _) -> do
handle <- socketToHandle socket ReadWriteMode
forever $ do
l <- hGetLine handle
let p = do
(output, state) <- run (observe c)
mapM_ (hPutStrLn handle) $ renderOuptut output
hPutStrLn handle $ "State: " <> show state

case readMaybe l of
Just Pull -> run (pull c) >> p
Just Switch -> run (switch c)
Just Observe -> p
Just Kill -> run (kill c)
Nothing -> hPutStrLn handle "Invalid command"
action $ trace c

-- | A tracing controller. It accumulates the trace of `a` in `b` and can
-- control the program via `step`.
data Tracing m a b = Tracing
{ observeTracing :: m b
-- ^ Observe the current trace
, traceTracing :: a -> m ()
-- ^ The tracer
}

instance Monad m => Functor (Tracing m a) where
fmap = rmap

instance Monad m => Profunctor (Tracing m) where
dimap f g Tracing{..} =
Tracing
{ observeTracing = fmap g observeTracing
, traceTracing = lmap f traceTracing
}

data Pulling m a b = Pulling
{ tracingPulling :: Tracing m a b
, notPulling :: m (NotPulling m a b)
, pullPulling :: m ()
}

instance Monad m => Profunctor (Pulling m) where
dimap f g Pulling{..} =
Pulling
{ tracingPulling = dimap f g tracingPulling
, notPulling = dimap f g <$> notPulling
, pullPulling
}

data NotPulling m a b = NotPulling
{ tracingNotPulling :: Tracing m a b
, pulling :: Pulling m a b
}

instance Monad m => Profunctor (NotPulling m) where
dimap f g NotPulling{..} =
NotPulling
{ tracingNotPulling = dimap f g tracingNotPulling
, pulling = dimap f g pulling
}

data MonitorState = PullingState | NotPullingState
deriving stock (Show, Read)

-- | A controller that can switch between pulling and not-pulling
-- * `trace` is tracing in both states
-- * `switch` switches between pulling and not pulling and vice versa
-- * `observe` observes the current state which is kept however we switch
-- * `pull` is a no-op when not pulling
data Monitor m a b = Monitor
{ trace :: a -> m ()
, switch :: m ()
, observe :: m (b, MonitorState)
, pull :: m ()
, kill :: m ()
}

natMonitor :: (forall x. m x -> n x) -> Monitor m a b -> Monitor n a b
natMonitor nat Monitor{..} =
Monitor
{ trace = nat . trace
, switch = nat switch
, observe = nat observe
, pull = nat pull
, kill = nat kill
}

instance Monad m => Functor (Monitor m a) where
fmap = rmap

instance Monad m => Profunctor (Monitor m) where
dimap f g Monitor{..} =
Monitor
{ trace = trace . f
, switch
, observe = fmap (first g) observe
, pull
, kill
}

-- | Create an STM thread safe controller from a tracer
-- in either pulling or not-pulling state
mkMonitor
:: MonadIO m
=> PullingOrNotPulling m a b
-> m (Monitor m a b)
mkMonitor tracer = do
tracerVar <- liftIO $ newTVarIO tracer
let r = liftIO $ readTVarIO tracerVar
w = liftIO . atomically . writeTVar tracerVar
k <- liftIO $ newTVarIO False
let pull' = do
t <- r
eitherPulling pullPulling (const $ pure ()) t
pure
$ Monitor
{ trace = \a -> do
t <- r
kill <- liftIO $ readTVarIO k
when kill $ error "Killed"
eitherPulling
(traceTracing . tracingPulling)
(traceTracing . tracingNotPulling)
t
a
, switch = do
t <- r
case t of
MkPulling Pulling{notPulling} ->
notPulling >>= w . MkNotPulling
MkNotPulling NotPulling{pulling} ->
w $ MkPulling pulling
, observe = do
t <- r
eitherPulling
(fmap (,PullingState) . observeTracing . tracingPulling)
(fmap (,NotPullingState) . observeTracing . tracingNotPulling)
t
, pull = pull'
, kill = do
liftIO $ atomically $ writeTVar k True
pull'
}

data PullingOrNotPulling m a b
= MkPulling (Pulling m a b)
| MkNotPulling (NotPulling m a b)

eitherPulling
:: (Pulling m a b -> c)
-> (NotPulling m a b -> c)
-> PullingOrNotPulling m a b
-> c
eitherPulling f _ (MkPulling p) = f p
eitherPulling _ g (MkNotPulling p) = g p

-- | Create a NotPulling tracer from a folding function
mkFoldingMonitor
:: MonadIO m
=> m ctx
-- ^ Get the context after each trace
-> Fold (ctx, a) b
-- ^ How to fold the context and the value
-> MonitorState
-> m (PullingOrNotPulling m a b)
mkFoldingMonitor getContext how state = do
resultVar <- liftIO $ newTVarIO how
let updateResult a =
modifyTVar resultVar $ \how' -> F.fold (duplicate how') [a]
block <- liftIO newEmptyTMVarIO
let
observeTracing = liftIO $ extract <$> readTVarIO resultVar
step =
liftIO
$ atomically
$ takeTMVar block >>= updateResult
tracePulling a = do
ctx <- getContext
liftIO $ atomically $ putTMVar block (ctx, a)
traceNotPulling a = do
ctx <- getContext
liftIO $ atomically $ updateResult (ctx, a)
let notPulling =
NotPulling
{ tracingNotPulling =
Tracing
{ traceTracing = traceNotPulling
, observeTracing
}
, pulling = pulling
}
pulling =
Pulling
{ tracingPulling =
Tracing
{ traceTracing = tracePulling
, observeTracing
}
, pullPulling = step
, notPulling = step $> notPulling
}
pure $ case state of
PullingState -> MkNotPulling notPulling
NotPullingState -> MkPulling pulling

0 comments on commit 3fcd64f

Please sign in to comment.