Skip to content

Commit

Permalink
Use ContT in withCluster
Browse files Browse the repository at this point in the history
  • Loading branch information
paolino committed May 3, 2024
1 parent 53c4d1c commit 0b08671
Showing 1 changed file with 107 additions and 95 deletions.
202 changes: 107 additions & 95 deletions lib/local-cluster/lib/Cardano/Wallet/Launch/Cluster/Cluster.hs
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,13 @@ import Control.Monad
, replicateM
, replicateM_
)
import Control.Monad.Cont
( ContT (..)
, evalContT
)
import Control.Monad.Reader
( MonadIO (..)
, MonadTrans (..)
)
import Control.Tracer
( traceWith
Expand Down Expand Up @@ -172,24 +177,25 @@ withCluster
withCluster config@Config{..} faucetFunds onClusterStart = runClusterM config
$ bracketTracer' "withCluster"
$ do
liftIO resetGlobals

let clusterDir = absDirOf cfgClusterDir
liftIO $ createDirectoryIfMissing True clusterDir

traceClusterLog $ MsgStartingCluster cfgClusterDir
traceClusterLog $ MsgHardFork cfgLastHardFork
withPoolMetadataServer $ \metadataServer -> do
liftIO $ createDirectoryIfMissing True clusterDir
traceClusterLog $ MsgStartingCluster cfgClusterDir
liftIO resetGlobals

configuredPools <- configurePools metadataServer cfgStakePools
evalContT $ do
configuredPools <- do
metadataServer <- ContT withPoolMetadataServer
lift $ configurePools metadataServer cfgStakePools

addGenesisPools <- do
genesisFiles <- lift $ do
genesisDeltas <- mapM registerViaShelleyGenesis configuredPools
pure $ foldr (.) id genesisDeltas
-- TODO (yura): Use Faucet API isntead of these fixed addresses
faucetAddresses <-
map (,Coin 1_000_000_000_000_000)
<$> readFaucetAddresses

genesisFiles <-
let addGenesisPools = foldr (.) id genesisDeltas
faucetAddresses <-
map (,Coin 1_000_000_000_000_000)
<$> readFaucetAddresses
generateGenesis
(pureAdaFunds <> faucetAddresses <> massiveWalletFunds)
(addGenesisPools : cfgShelleyGenesisMods)
Expand All @@ -201,45 +207,47 @@ withCluster config@Config{..} faucetFunds onClusterStart = runClusterM config
let pool0port :| poolPorts = NE.fromList (rotate poolsTcpPorts)
let pool0 :| otherPools = configuredPools

let pool0Cfg =
NodeParams
runningPool0 <-
ContT
$ operatePool pool0
$ NodeParams
genesisFiles
cfgLastHardFork
pool0port
cfgNodeLogging
cfgNodeOutputFile
operatePool pool0 pool0Cfg $ \runningPool0 ->
do
extraClusterSetupUsingNode configuredPools runningPool0
case NE.nonEmpty otherPools of
Nothing -> liftIO $ onClusterStart runningPool0
Just others -> do
let relayNodeParams =
NodeParams
{ nodeGenesisFiles = genesisFiles
, nodeHardForks = cfgLastHardFork
, nodePeers = (extraPort, poolsTcpPorts)
, nodeLogConfig =
LogFileConfig
{ minSeverityTerminal = Info
, extraLogDir = Nothing
, minSeverityFile = Info
}
, nodeParamsOutputFile
= cfgNodeOutputFile

lift $ extraClusterSetupUsingNode configuredPools runningPool0

case NE.nonEmpty otherPools of
Nothing -> liftIO $ onClusterStart runningPool0
Just others -> do
let relayNodeParams =
NodeParams
{ nodeGenesisFiles = genesisFiles
, nodeHardForks = cfgLastHardFork
, nodePeers = (extraPort, poolsTcpPorts)
, nodeLogConfig =
LogFileConfig
{ minSeverityTerminal = Info
, extraLogDir = Nothing
, minSeverityFile = Info
}
launchPools
, nodeParamsOutputFile =
cfgNodeOutputFile
}
_ <-
ContT
$ launchPools
others
genesisFiles
poolPorts
runningPool0
$ \_poolNode ->
withRelayNode
relayNodeParams
$ liftIO . onClusterStart
runningNode <- ContT $ withRelayNode relayNodeParams
liftIO $ onClusterStart runningNode
where
FaucetFunds pureAdaFunds maryAllegraFunds massiveWalletFunds
= faucetFunds
FaucetFunds pureAdaFunds maryAllegraFunds massiveWalletFunds =
faucetFunds
-- Important cluster setup to run without rollbacks
extraClusterSetupUsingNode
:: NonEmpty ConfiguredPool -> RunningNode -> ClusterM ()
Expand All @@ -265,8 +273,8 @@ withCluster config@Config{..} faucetFunds onClusterStart = runClusterM config
conn
(changeFileOf @"reg-tx" @"tx-body" rawTx)
[ changeFileOf @"faucet-prv" @"signing-key" faucetPrv
, changeFileOf @"stake-prv" @"signing-key" stakePrv
]
, changeFileOf @"stake-prv" @"signing-key" stakePrv
]
"pre-registered stake key"

-- Give the above txs a chance of getting included into the chain
Expand All @@ -287,63 +295,67 @@ withCluster config@Config{..} faucetFunds onClusterStart = runClusterM config
-- \^ Action to run once when the stake pools are setup.
-> ClusterM a
launchPools
configuredPools genesisFiles ports fallbackNode action = do
waitGroup <- newChan
doneGroup <- newChan
configuredPools
genesisFiles
ports
fallbackNode
action = do
waitGroup <- newChan
doneGroup <- newChan

let poolCount = length configuredPools
let poolCount = length configuredPools

let waitAll = do
traceClusterLog
$ MsgDebug "waiting for stake pools to register"
replicateM poolCount (readChan waitGroup)
let waitAll = do
traceClusterLog
$ MsgDebug "waiting for stake pools to register"
replicateM poolCount (readChan waitGroup)

let onException :: SomeException -> ClusterM ()
onException e = do
traceClusterLog
$ MsgDebug
$ "exception while starting pool: "
<> T.pack (show e)
writeChan waitGroup (Left e)
let onException :: SomeException -> ClusterM ()
onException e = do
traceClusterLog
$ MsgDebug
$ "exception while starting pool: "
<> T.pack (show e)
writeChan waitGroup (Left e)

let mkConfig (port, peers) =
NodeParams
genesisFiles
cfgLastHardFork
(port, peers)
cfgNodeLogging
cfgNodeOutputFile
asyncs <- forM (zip (NE.toList configuredPools) ports)
$ \(configuredPool, (port, peers)) -> do
async $ handle onException $ do
let cfg = mkConfig (port, peers)
operatePool configuredPool cfg $ \runningPool -> do
writeChan waitGroup $ Right runningPool
readChan doneGroup
mapM_ link asyncs
let cancelAll = do
traceWith cfgTracer $ MsgDebug "stopping all stake pools"
replicateM_ poolCount (writeChan doneGroup ())
mapM_ wait asyncs
let mkConfig (port, peers) =
NodeParams
genesisFiles
cfgLastHardFork
(port, peers)
cfgNodeLogging
cfgNodeOutputFile
asyncs <- forM (zip (NE.toList configuredPools) ports)
$ \(configuredPool, (port, peers)) -> do
async $ handle onException $ do
let cfg = mkConfig (port, peers)
operatePool configuredPool cfg $ \runningPool -> do
writeChan waitGroup $ Right runningPool
readChan doneGroup
mapM_ link asyncs
let cancelAll = do
traceWith cfgTracer $ MsgDebug "stopping all stake pools"
replicateM_ poolCount (writeChan doneGroup ())
mapM_ wait asyncs

traceClusterLog $ MsgRegisteringStakePools poolCount
group <- waitAll
if length (filter isRight group) /= poolCount
then do
liftIO cancelAll
let errors = show (filter isLeft group)
throwIO
$ ProcessHasExited
("cluster didn't start correctly: " <> errors)
(ExitFailure 1)
else do
-- Run the action using the connection to the first pool,
-- or the fallback.
let node = case group of
[] -> fallbackNode
Right firstPool : _ -> firstPool
Left e : _ -> error $ show e
action node `finally` liftIO cancelAll
traceClusterLog $ MsgRegisteringStakePools poolCount
group <- waitAll
if length (filter isRight group) /= poolCount
then do
liftIO cancelAll
let errors = show (filter isLeft group)
throwIO
$ ProcessHasExited
("cluster didn't start correctly: " <> errors)
(ExitFailure 1)
else do
-- Run the action using the connection to the first pool,
-- or the fallback.
let node = case group of
[] -> fallbackNode
Right firstPool : _ -> firstPool
Left e : _ -> error $ show e
action node `finally` liftIO cancelAll

-- \| Get permutations of the size (n-1) for a list of n elements, alongside
-- with the element left aside. `[a]` is really expected to be `Set a`.
Expand Down

0 comments on commit 0b08671

Please sign in to comment.