Skip to content

Commit

Permalink
Extract control api component of the local-cluster http service
Browse files Browse the repository at this point in the history
  • Loading branch information
paolino committed May 3, 2024
1 parent 9d74fed commit 6b163b4
Show file tree
Hide file tree
Showing 12 changed files with 391 additions and 288 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,141 +2,25 @@
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE UndecidableInstances #-}
{-# OPTIONS_GHC -Wno-orphans #-}

module Cardano.Wallet.Launch.Cluster.Monitoring.Http.API
( API
, ApiT (..)
, ReadyAPI
, StepAPI
, SwitchAPI
, ObserveAPI
, ControlAPI
)
where

import Prelude

import Cardano.Wallet.Launch.Cluster.Monitoring.Http.Application.API
( ApplicationAPI
)
import Cardano.Wallet.Launch.Cluster.Monitoring.Http.OpenApi
( monitorStateSchema
, observationSchema
)
import Cardano.Wallet.Launch.Cluster.Monitoring.Phase
( History (..)
)
import Control.Monitoring.Tracing
( MonitorState (..)
)
import Data.Aeson.Types
( FromJSON (..)
, Parser
, ToJSON (..)
, Value (..)
, object
, withArray
, withObject
, (.:)
, (.=)
)
import Data.Foldable
( toList
)
import Data.OpenApi
( NamedSchema (..)
, ToSchema (..)
)
import GHC.Generics
( Generic (..)
)
import Servant
( Post
, PostNoContent
import Cardano.Wallet.Launch.Cluster.Monitoring.Http.Control.API
( ControlAPI
)
import Servant.API
( Get
, JSON
, (:<|>)
, (:>)
( (:<|>)
)

type ReadyAPI = "ready" :> Get '[JSON] Bool
type StepAPI = "control" :> "step" :> PostNoContent
type SwitchAPI = "control" :> "switch" :> Post '[JSON] (ApiT MonitorState)
type ObserveAPI = "control" :> "observe" :> Get '[JSON] (ApiT (History, MonitorState))

-- | The API to control the monitoring server
type ControlAPI = ReadyAPI :<|> StepAPI :<|> SwitchAPI :<|> ObserveAPI

-- | The API for the monitoring server and the query cluster application
type API n = ControlAPI :<|> ApplicationAPI n

-- | A newtype wrapper to avoid orphan instances
newtype ApiT a = ApiT {unApiT :: a}
deriving newtype (Eq, Show, Generic)

renderHistory :: History -> Value
renderHistory History{history} = toJSON $ do
(time, phase) <- history
pure
$ object
[ "time" .= time
, "phase" .= phase
]

parseHistory :: Value -> Parser History
parseHistory = withArray "History" $ \arr -> do
history <- traverse parsePhase (toList arr)
pure $ History{history}
where
parsePhase = withObject "Phase" $ \o -> do
time <- o .: "time"
phase <- o .: "phase"
pure (time, phase)

instance ToJSON (ApiT MonitorState) where
toJSON = \case
ApiT Wait -> String "waiting"
ApiT Step -> String "stepping"
ApiT Run -> String "running"

instance ToSchema (ApiT MonitorState) where
declareNamedSchema _ = do
pure
$ NamedSchema
(Just "ApiT MonitorState")
monitorStateSchema

instance FromJSON (ApiT MonitorState) where
parseJSON = \case
String "waiting" -> pure $ ApiT Wait
String "stepping" -> pure $ ApiT Step
String "running" -> pure $ ApiT Run
_ -> fail "Invalid state"

instance ToJSON (ApiT (History, MonitorState)) where
toJSON (ApiT (history, state)) =
object
[ "phases" .= renderHistory history
, "state" .= ApiT state
]

instance ToSchema (ApiT (History, MonitorState)) where
declareNamedSchema _ =
pure
$ NamedSchema
(Just "ApiT (History, MonitorState)")
observationSchema

instance FromJSON (ApiT (History, MonitorState)) where
parseJSON = withObject "ApiT (History, MonitorState)" $ \o -> do
history <- o .: "phases" >>= parseHistory
ApiT state <- o .: "state"
pure $ ApiT (history, state)
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}

module Cardano.Wallet.Launch.Cluster.Monitoring.Http.ApiT
( ApiT (..)
)
where

import Prelude

import GHC.Generics
( Generic
)

-- | A newtype wrapper to avoid orphan instances
newtype ApiT a = ApiT {unApiT :: a}
deriving newtype (Eq, Show, Generic)
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
Expand All @@ -11,6 +12,7 @@ module Cardano.Wallet.Launch.Cluster.Monitoring.Http.Application.Client
, AnyApplicationQ (..)
, RunApplicationQ (..)
, Application
, MsgApplication (..)
)
where

Expand Down Expand Up @@ -73,6 +75,9 @@ mkApplication _ =
{ sendFaucetAssets = client (Proxy @(SendFaucetAssetsAPI n))
}

newtype MsgApplication = MsgApplicationRequest AnyApplicationQ
deriving stock (Show)

-- | Run any query against the monitoring server.
newtype RunApplicationQ m
= RunApplicationQ (forall a. Show a => ApplicationQ a -> m a)
Expand All @@ -81,13 +86,13 @@ newtype RunApplicationQ m
newApplicationQ
:: MonadUnliftIO m
=> (forall a. ClientM a -> IO a)
-> Tracer m AnyApplicationQ
-> Tracer m MsgApplication
-> Application n
-> m (RunApplicationQ m)
newApplicationQ query tr Application{..} = pure
$ RunApplicationQ
$ \request -> do
traceWith tr (AnyApplicationQ request)
traceWith tr (MsgApplicationRequest $ AnyApplicationQ request)
case request of
SendFaucetAssetsQ assets ->
liftIO
Expand Down
Original file line number Diff line number Diff line change
@@ -1,70 +1,43 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}

module Cardano.Wallet.Launch.Cluster.Monitoring.Http.Client
( withHttpClient
, RunQuery (..)
, Query (..)
, MsgClient (..)
, AnyQuery (..)
)
where

import Prelude

import Cardano.Wallet.Launch.Cluster.Monitoring.Http.API
( ApiT (..)
, ObserveAPI
, ReadyAPI
, StepAPI
, SwitchAPI
)
import Cardano.Wallet.Launch.Cluster.Monitoring.Http.Application.Client
( AnyApplicationQ
( MsgApplication
, RunApplicationQ
, mkApplication
, newApplicationQ
)
import Cardano.Wallet.Launch.Cluster.Monitoring.Phase
( History
import Cardano.Wallet.Launch.Cluster.Monitoring.Http.Control.Client
( MsgControl
, RunQuery
, mkControl
, newRunQuery
)
import Cardano.Wallet.Primitive.NetworkId
( HasSNetworkId
, SNetworkId
)
import Control.Monad
( unless
)
import Control.Monad.Cont
( ContT (..)
)
import Control.Monad.IO.Class
( liftIO
)
import Control.Monitoring.Tracing
( MonitorState
)
import Control.Retry
( RetryPolicyM
, RetryStatus (..)
, capDelay
, exponentialBackoff
, recoverAll
)
import Control.Tracer
( Tracer
, traceWith
)
import Data.Functor
( ($>)
)
import Data.Functor.Contravariant
( (>$<)
)
Expand All @@ -77,93 +50,25 @@ import Network.HTTP.Client
import Network.Socket
( PortNumber
)
import Servant
( NoContent
, Proxy (..)
)
import Servant.Client
( BaseUrl (..)
, ClientM
, Scheme (..)
, client
, mkClientEnv
, runClientM
)
import UnliftIO
( MonadUnliftIO
, UnliftIO (..)
, askUnliftIO
, throwIO
)

-- | Queries that can be sent to the monitoring server via HTTP.
data Query a where
ReadyQ :: Query Bool
ObserveQ :: Query (History, MonitorState)
StepQ :: Query ()
SwitchQ :: Query MonitorState

data Client = Client
{ ready :: ClientM Bool
, observe :: ClientM (ApiT (History, MonitorState))
, step :: ClientM NoContent
, switch :: ClientM (ApiT MonitorState)
}

mkClient
:: Client
mkClient =
let ready = client (Proxy @ReadyAPI)
observe = client (Proxy @ObserveAPI)
step = client (Proxy @StepAPI)
switch = client (Proxy @SwitchAPI)
in Client{..}

-- | A showable existential wrapper around a 'Query' value, for logging purposes.
data AnyQuery = forall a. Show a => AnyQuery (Query a)

instance Show AnyQuery where
show (AnyQuery ReadyQ) = "Ready"
show (AnyQuery ObserveQ) = "Observe"
show (AnyQuery StepQ) = "Step"
show (AnyQuery SwitchQ) = "Switch"

-- | Run any query against the monitoring server.
newtype RunQuery m = RunQuery (forall a. Show a => Query a -> m a)

-- | Messages that can be logged by the http client.
data MsgClient
= MsgClientStart
| MsgClientReq AnyQuery
| MsgClientRetry AnyQuery
| MsgClienAppReq AnyApplicationQ
= MsgControl MsgControl
| MsgApplication MsgApplication
| MsgClientStart
| MsgClientDone
deriving stock (Show)

newRunQuery
:: MonadUnliftIO m
=> (forall a. ClientM a -> IO a)
-> Tracer m MsgClient
-> Client
-> m (RunQuery m)
newRunQuery query tr Client{ready, observe, step, switch} =
do
UnliftIO unlift <- askUnliftIO
pure $ RunQuery $ \request -> do
traceWith tr $ MsgClientReq $ AnyQuery request
liftIO $ case request of
ReadyQ -> recoverAll retryPolicy
$ \rt -> do
unless (firstTry rt)
$ unlift
$ traceWith tr
$ MsgClientRetry
$ AnyQuery request
query ready
ObserveQ -> unApiT <$> query observe
StepQ -> query step $> ()
SwitchQ -> unApiT <$> query switch

-- | Produce a closure over the http client of an http monitoring server that
-- can be used to query the server.
withHttpClient
Expand All @@ -189,21 +94,12 @@ withHttpClient networkId tracer httpPort = ContT $ \continue -> do
query f = do
r <- runClientM f $ mkClientEnv manager url
either throwIO pure r
runQuery <- newRunQuery query tracer mkClient
runQuery <- newRunQuery query (MsgControl >$< tracer) mkControl
runApplication <-
newApplicationQ
query
(MsgClienAppReq >$< tracer)
(MsgApplication >$< tracer)
$ mkApplication networkId
continue (runQuery, runApplication)

tr MsgClientDone

retryPolicy :: RetryPolicyM IO
retryPolicy = capDelay (60 * oneSecond) $ exponentialBackoff oneSecond
where
oneSecond = 1_000_000 :: Int

firstTry :: RetryStatus -> Bool
firstTry (RetryStatus 0 _ _) = True
firstTry _ = False

0 comments on commit 6b163b4

Please sign in to comment.