From 64e9677cabb418515370af3d73ce046b836bd016 Mon Sep 17 00:00:00 2001 From: Cmdv Date: Thu, 16 Mar 2023 22:11:06 +0000 Subject: [PATCH] Seperating ledgerEnv into HasLedger and NoLedger --- .../test/Test/Cardano/Db/Mock/Config.hs | 2 +- cardano-db-sync/app/cardano-db-sync.hs | 28 ++- cardano-db-sync/src/Cardano/DbSync.hs | 2 +- cardano-db-sync/src/Cardano/DbSync/Api.hs | 153 ++++++++------ .../src/Cardano/DbSync/Config/Types.hs | 2 +- .../src/Cardano/DbSync/Database.hs | 75 ++++--- cardano-db-sync/src/Cardano/DbSync/Default.hs | 102 ++++----- .../src/Cardano/DbSync/Era/Byron/Insert.hs | 5 +- .../src/Cardano/DbSync/Era/Shelley/Insert.hs | 15 +- .../DbSync/Era/Shelley/Insert/Epoch.hs | 16 +- .../src/Cardano/DbSync/LedgerState.hs | 84 ++++---- .../src/Cardano/DbSync/LocalStateQuery.hs | 42 ++-- .../src/Cardano/DbSync/Rollback.hs | 11 +- cardano-db-sync/src/Cardano/DbSync/Sync.hs | 199 ++++++++++-------- 14 files changed, 393 insertions(+), 343 deletions(-) diff --git a/cardano-chain-gen/test/Test/Cardano/Db/Mock/Config.hs b/cardano-chain-gen/test/Test/Cardano/Db/Mock/Config.hs index 816b69bf3..201983047 100644 --- a/cardano-chain-gen/test/Test/Cardano/Db/Mock/Config.hs +++ b/cardano-chain-gen/test/Test/Cardano/Db/Mock/Config.hs @@ -224,7 +224,7 @@ mkSyncNodeParams staticDir mutableDir = do , enpPGPassSource = Db.PGPassCached pgconfig , enpExtended = True , enpHasCache = True - , enpHasLedger = True + , enpShouldUseLedger = True , enpSkipFix = True , enpOnlyFix = False , enpForceIndexes = False diff --git a/cardano-db-sync/app/cardano-db-sync.hs b/cardano-db-sync/app/cardano-db-sync.hs index 1a1183cae..51adeb90c 100644 --- a/cardano-db-sync/app/cardano-db-sync.hs +++ b/cardano-db-sync/app/cardano-db-sync.hs @@ -9,6 +9,7 @@ import Cardano.Prelude import Cardano.Slotting.Slot (SlotNo (..)) import Data.String (String) import qualified Data.Text as Text +import qualified Data.Text.IO as Text import Data.Version (showVersion) import MigrationValidations (KnownMigration (..), knownMigrations) import Options.Applicative (Parser, ParserInfo) @@ -22,14 +23,27 @@ main = do case cmd of CmdVersion -> runVersionCommand CmdRun params -> do - prometheusPort <- dncPrometheusPort <$> readSyncNodeConfig (enpConfigFile params) - - withMetricSetters prometheusPort $ \metricsSetters -> - runDbSyncNode metricsSetters knownMigrationsPlain params + let maybeLedgerStateDir = enpMaybeLedgerStateDir params + case (maybeLedgerStateDir, enpShouldUseLedger params) of + (Just _, True ) -> run params + (Nothing, False ) -> run params + (Just _, False ) -> Text.putStrLn $ + "Error: Using `--dissable-ledger` doesn't require having a --state-dir. " <> moreDetails + (Nothing, True) -> Text.putStrLn $ + "Error: If not using --state-dir then make sure to have --dissable-ledger. " <> moreDetails where knownMigrationsPlain :: [(Text, Text)] knownMigrationsPlain = (\x -> (hash x, filepath x)) <$> knownMigrations + moreDetails :: Text + moreDetails = "For more details view https://github.com/input-output-hk/cardano-db-sync/blob/master/doc/configuration.md#--disable-ledger" + + run :: SyncNodeParams -> IO () + run prms = do + prometheusPort <- dncPrometheusPort <$> readSyncNodeConfig (enpConfigFile prms) + withMetricSetters prometheusPort $ \metricsSetters -> + runDbSyncNode metricsSetters knownMigrationsPlain prms + -- ------------------------------------------------------------------------------------------------- opts :: ParserInfo SyncCommand @@ -57,7 +71,7 @@ pRunDbSyncNode = <*> pPGPassSource <*> pExtended <*> pHasCache - <*> pHasLedger + <*> pUseLedger <*> pSkipFix <*> pOnlyFix <*> pForceIndexes @@ -148,8 +162,8 @@ pHasCache = <> Opt.help "Disables the db-sync caches. Reduces memory usage but it takes longer to sync." ) -pHasLedger :: Parser Bool -pHasLedger = +pUseLedger :: Parser Bool +pUseLedger = Opt.flag True False diff --git a/cardano-db-sync/src/Cardano/DbSync.hs b/cardano-db-sync/src/Cardano/DbSync.hs index 1524be865..bcf0c3552 100644 --- a/cardano-db-sync/src/Cardano/DbSync.hs +++ b/cardano-db-sync/src/Cardano/DbSync.hs @@ -141,7 +141,7 @@ startupReport :: Trace IO Text -> Bool -> SyncNodeParams -> IO () startupReport trce aop params = do logInfo trce $ mconcat ["Version number: ", Text.pack (showVersion version)] logInfo trce $ mconcat ["Git hash: ", Db.gitRev] - logInfo trce $ mconcat ["Option disable-ledger: ", textShow (not $ enpHasLedger params)] + logInfo trce $ mconcat ["Option disable-ledger: ", textShow (not $ enpShouldUseLedger params)] logInfo trce $ mconcat ["Option disable-cache: ", textShow (not $ enpHasCache params)] logInfo trce $ mconcat ["Option disable-epoch: ", textShow (not $ enpExtended params)] logInfo trce $ mconcat ["Option skip-plutus-data-fix: ", textShow (enpSkipFix params)] diff --git a/cardano-db-sync/src/Cardano/DbSync/Api.hs b/cardano-db-sync/src/Cardano/DbSync/Api.hs index 9114b8802..b02ed3ac4 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Api.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Api.hs @@ -1,4 +1,5 @@ {-# LANGUAGE BangPatterns #-} +{-# LANGUAGE GADTs #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TupleSections #-} @@ -22,6 +23,9 @@ module Cardano.DbSync.Api ( replaceConnection, verifySnapshotPoint, getBackend, + getTrace, + getTopLevelConfig, + getNetwork, hasLedgerState, getLatestPoints, getSlotHash, @@ -66,10 +70,12 @@ import Database.Persist.Postgresql (ConnectionString) import Database.Persist.Sql (SqlBackend) import Ouroboros.Consensus.Block.Abstract (HeaderHash, Point (..), fromRawHash) import Ouroboros.Consensus.BlockchainTime.WallClock.Types (SystemStart (..)) +import Ouroboros.Consensus.Config (TopLevelConfig) import Ouroboros.Consensus.Node.ProtocolInfo (ProtocolInfo) import Ouroboros.Network.Block (BlockNo (..), Point (..)) import Ouroboros.Network.Magic (NetworkMagic (..)) import qualified Ouroboros.Network.Point as Point +import qualified Ouroboros.Consensus.Node.ProtocolInfo as Consensus data SyncEnv = SyncEnv { envProtocol :: !SyncProtocol @@ -87,18 +93,25 @@ data SyncEnv = SyncEnv , envOfflineResultQueue :: !(StrictTBQueue IO FetchResult) , envEpochState :: !(StrictTVar IO EpochState) , envEpochSyncTime :: !(StrictTVar IO UTCTime) - , envNoLedgerEnv :: !NoLedgerStateEnv -- only used when configured without ledger state. - , envLedger :: !(Maybe LedgerEnv) + , envLedgerEnv :: !LedgerEnv } +-- A representation of if we are using a ledger or not given CLI options +data LedgerEnv where + HasLedger :: HasLedgerEnv -> LedgerEnv + NoLedger :: NoLedgerEnv -> LedgerEnv + +-- topLevelConfig :: SyncEnv -> TopLevelConfig CardanoBlock +-- topLevelConfig = Consensus.pInfoConfig . envProtocolInfo + type RunMigration = DB.MigrationToRun -> IO () data ConsistentLevel = Consistent | DBAheadOfLedger | Unchecked deriving (Show, Eq) -setConsistentLevel :: SyncEnv -> LedgerEnv -> ConsistentLevel -> IO () -setConsistentLevel env ledgerEnv cst = do - logInfo (leTrace ledgerEnv) $ "Setting ConsistencyLevel to " <> textShow cst +setConsistentLevel :: SyncEnv -> ConsistentLevel -> IO () +setConsistentLevel env cst = do + logInfo (getTrace env) $ "Setting ConsistencyLevel to " <> textShow cst atomically $ writeTVar (envConsistentLevel env) cst getConsistentLevel :: SyncEnv -> IO ConsistentLevel @@ -124,19 +137,19 @@ getRanIndexes :: SyncEnv -> IO Bool getRanIndexes env = do readTVarIO $ envIndexes env -runIndexMigrations :: SyncEnv -> LedgerEnv -> IO () -runIndexMigrations env ledgerEnv = do +runIndexMigrations :: SyncEnv -> IO () +runIndexMigrations env = do haveRan <- readTVarIO $ envIndexes env unless haveRan $ do envRunDelayedMigration env DB.Indexes - logInfo (leTrace ledgerEnv) "Indexes were created" + logInfo (getTrace env) "Indexes were created" atomically $ writeTVar (envIndexes env) True data SyncOptions = SyncOptions { soptExtended :: !Bool , soptAbortOnInvalid :: !Bool , soptCache :: !Bool - , soptLedger :: !Bool + , soptUseLedger :: !Bool , soptSkipFix :: !Bool , soptOnlyFix :: !Bool , snapshotEveryFollowing :: !Word64 @@ -184,6 +197,24 @@ generateNewEpochEvents env details = do , esEpochNo = Strict.Just currentEpochNo } +getTopLevelConfig :: SyncEnv -> TopLevelConfig CardanoBlock +getTopLevelConfig syncEnv = + case envLedgerEnv syncEnv of + HasLedger hasLedgerEnv -> Consensus.pInfoConfig $ leProtocolInfo hasLedgerEnv + NoLedger noLedgerEnv -> Consensus.pInfoConfig $ nleProtocolInfo noLedgerEnv + +getTrace :: SyncEnv -> Trace IO Text +getTrace sEnv = + case envLedgerEnv sEnv of + HasLedger hasLedgerEnv -> leTrace hasLedgerEnv + NoLedger noLedgerEnv -> nleTracer noLedgerEnv + +getNetwork :: SyncEnv -> Ledger.Network +getNetwork sEnv = + case envLedgerEnv sEnv of + HasLedger hasLedgerEnv -> leNetwork hasLedgerEnv + NoLedger noLedgerEnv -> nleNetwork noLedgerEnv + getSlotHash :: SqlBackend -> SlotNo -> IO [(SlotNo, ByteString)] getSlotHash backend = DB.runDbIohkNoLogging backend . DB.querySlotHash @@ -195,7 +226,7 @@ getBackend env = do Strict.Nothing -> panic "sql connection not initiated" hasLedgerState :: SyncEnv -> Bool -hasLedgerState = soptLedger . envOptions +hasLedgerState = soptUseLedger . envOptions getDbLatestBlockInfo :: SqlBackend -> IO (Maybe TipInfo) getDbLatestBlockInfo backend = do @@ -217,9 +248,9 @@ getDbTipBlockNo env = >>= getDbLatestBlockInfo <&> maybe Point.Origin (Point.At . bBlockNo) -logDbState :: SyncEnv -> LedgerEnv -> IO () -logDbState env ledgerEnv= do - let tracer = leTrace ledgerEnv +logDbState :: SyncEnv -> IO () +logDbState env = do + let tracer = getTrace env backend <- getBackend env mblk <- getDbLatestBlockInfo backend case mblk of @@ -257,19 +288,6 @@ mkSyncEnv :: RunMigration -> IO SyncEnv mkSyncEnv trce connSring syncOptions protoInfo nw nwMagic systemStart maybeLedgerDir ranAll forcedIndexes runMigration = do - maybeLedgerEnv <- - case maybeLedgerDir of - Nothing -> pure Nothing - Just dir -> do - Just <$> mkLedgerEnv - trce - protoInfo - dir - nw - systemStart - (soptAbortOnInvalid syncOptions) - (snapshotEveryFollowing syncOptions) - (snapshotEveryLagging syncOptions) cache <- if soptCache syncOptions then newEmptyCache 250000 else pure uninitiatedCache backendVar <- newTVarIO Strict.Nothing consistentLevelVar <- newTVarIO Unchecked @@ -279,7 +297,23 @@ mkSyncEnv trce connSring syncOptions protoInfo nw nwMagic systemStart maybeLedge orq <- newTBQueueIO 100 epochVar <- newTVarIO initEpochState epochSyncTime <- newTVarIO =<< getCurrentTime - noLegdState <- mkNoLedgerStateEnv trce systemStart + ledgerEnvType <- + case (maybeLedgerDir, soptUseLedger syncOptions) of + (Just dir, True) -> + HasLedger + <$> mkHasLedgerEnv + trce + protoInfo + dir + nw + systemStart + (soptAbortOnInvalid syncOptions) + (snapshotEveryFollowing syncOptions) + (snapshotEveryLagging syncOptions) + (_, False) -> NoLedger <$> mkNoLedgerEnv trce protoInfo nw systemStart + -- This won't ever call because we error out this combination at parse time + (Nothing, True) -> NoLedger <$> mkNoLedgerEnv trce protoInfo nw systemStart + pure $ SyncEnv { envProtocol = SyncProtocolCardano @@ -297,8 +331,7 @@ mkSyncEnv trce connSring syncOptions protoInfo nw nwMagic systemStart maybeLedge , envOfflineResultQueue = orq , envEpochState = epochVar , envEpochSyncTime = epochSyncTime - , envNoLedgerEnv = noLegdState - , envLedger = maybeLedgerEnv + , envLedgerEnv = ledgerEnvType } mkSyncEnvFromConfig :: @@ -315,44 +348,44 @@ mkSyncEnvFromConfig trce connSring syncOptions maybeLedgerDir genCfg ranAll forc case genCfg of GenesisCardano _ bCfg sCfg _ | unProtocolMagicId (Byron.configProtocolMagicId bCfg) /= Shelley.sgNetworkMagic (scConfig sCfg) -> - pure . Left . NECardanoConfig $ - mconcat - [ "ProtocolMagicId " - , DB.textShow (unProtocolMagicId $ Byron.configProtocolMagicId bCfg) - , " /= " - , DB.textShow (Shelley.sgNetworkMagic $ scConfig sCfg) - ] + pure . Left . NECardanoConfig $ + mconcat + [ "ProtocolMagicId " + , DB.textShow (unProtocolMagicId $ Byron.configProtocolMagicId bCfg) + , " /= " + , DB.textShow (Shelley.sgNetworkMagic $ scConfig sCfg) + ] | Byron.gdStartTime (Byron.configGenesisData bCfg) /= Shelley.sgSystemStart (scConfig sCfg) -> - pure . Left . NECardanoConfig $ - mconcat - [ "SystemStart " - , DB.textShow (Byron.gdStartTime $ Byron.configGenesisData bCfg) - , " /= " - , DB.textShow (Shelley.sgSystemStart $ scConfig sCfg) - ] + pure . Left . NECardanoConfig $ + mconcat + [ "SystemStart " + , DB.textShow (Byron.gdStartTime $ Byron.configGenesisData bCfg) + , " /= " + , DB.textShow (Shelley.sgSystemStart $ scConfig sCfg) + ] | otherwise -> - Right - <$> mkSyncEnv - trce - connSring - syncOptions - (mkProtocolInfoCardano genCfg []) - (Shelley.sgNetworkId $ scConfig sCfg) - (NetworkMagic . unProtocolMagicId $ Byron.configProtocolMagicId bCfg) - (SystemStart . Byron.gdStartTime $ Byron.configGenesisData bCfg) - maybeLedgerDir - ranAll - forcedIndexes - runMigration + Right + <$> mkSyncEnv + trce + connSring + syncOptions + (mkProtocolInfoCardano genCfg []) + (Shelley.sgNetworkId $ scConfig sCfg) + (NetworkMagic . unProtocolMagicId $ Byron.configProtocolMagicId bCfg) + (SystemStart . Byron.gdStartTime $ Byron.configGenesisData bCfg) + maybeLedgerDir + ranAll + forcedIndexes + runMigration -- | 'True' is for in memory points and 'False' for on disk getLatestPoints :: SyncEnv -> IO [(CardanoPoint, Bool)] getLatestPoints env = do - case envLedger env of - Just ledgerEnv -> do - snapshotPoints <- listKnownSnapshots ledgerEnv + case envLedgerEnv env of + HasLedger hasLedgerEnv -> do + snapshotPoints <- listKnownSnapshots hasLedgerEnv verifySnapshotPoint env snapshotPoints - Nothing -> do + NoLedger _ -> do -- Brings the 5 latest. dbBackend <- getBackend env lastPoints <- DB.runDbIohkNoLogging dbBackend DB.queryLatestPoints diff --git a/cardano-db-sync/src/Cardano/DbSync/Config/Types.hs b/cardano-db-sync/src/Cardano/DbSync/Config/Types.hs index f8cb814a1..6bf559351 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Config/Types.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Config/Types.hs @@ -61,7 +61,7 @@ data SyncNodeParams = SyncNodeParams , enpPGPassSource :: !PGPassSource , enpExtended :: !Bool , enpHasCache :: !Bool - , enpHasLedger :: !Bool + , enpShouldUseLedger :: !Bool , enpSkipFix :: !Bool , enpOnlyFix :: !Bool , enpForceIndexes :: !Bool diff --git a/cardano-db-sync/src/Cardano/DbSync/Database.hs b/cardano-db-sync/src/Cardano/DbSync/Database.hs index 1f9d83d38..46d4b79ce 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Database.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Database.hs @@ -40,23 +40,22 @@ data NextState runDbThread :: SyncEnv -> - LedgerEnv -> MetricSetters -> DbActionQueue -> IO () -runDbThread syncEnv ledgerEnv metricsSetters queue = do +runDbThread syncEnv metricsSetters queue = do logInfo trce "Running DB thread" logException trce "runDBThread: " loop logInfo trce "Shutting down DB thread" where - trce = leTrace ledgerEnv + trce = getTrace syncEnv loop = do xs <- blockingFlushDbActionQueue queue when (length xs > 1) $ do logDebug trce $ "runDbThread: " <> textShow (length xs) <> " blocks" - eNextState <- runExceptT $ runActions syncEnv ledgerEnv xs + eNextState <- runExceptT $ runActions syncEnv xs backend <- getBackend syncEnv mBlock <- getDbLatestBlockInfo backend @@ -73,10 +72,9 @@ runDbThread syncEnv ledgerEnv metricsSetters queue = do -- and other operations are applied one-by-one. runActions :: SyncEnv -> - LedgerEnv -> [DbAction] -> ExceptT SyncNodeError IO NextState -runActions env ledgerEnv actions = do +runActions env actions = do dbAction Continue actions where dbAction :: NextState -> [DbAction] -> ExceptT SyncNodeError IO NextState @@ -87,21 +85,24 @@ runActions env ledgerEnv actions = do ([], DbFinish : _) -> do pure Done ([], DbRollBackToPoint chainSyncPoint serverTip resultVar : ys) -> do - deletedAllBlocks <- newExceptT $ prepareRollback env ledgerEnv chainSyncPoint serverTip - points <- liftIO $ rollbackLedger env ledgerEnv chainSyncPoint + deletedAllBlocks <- newExceptT $ prepareRollback env chainSyncPoint serverTip + points <- + if hasLedgerState env + then lift $ rollbackLedger env chainSyncPoint + else pure Nothing -- Ledger state always rollbacks at least back to the 'point' given by the Node. -- It needs to rollback even further, if 'points' is not 'Nothing'. -- The db may not rollback to the Node point. case (deletedAllBlocks, points) of (True, Nothing) -> do - liftIO $ setConsistentLevel env ledgerEnv Consistent - liftIO $ validateConsistentLevel env ledgerEnv chainSyncPoint + liftIO $ setConsistentLevel env Consistent + liftIO $ validateConsistentLevel env chainSyncPoint (False, Nothing) -> do - liftIO $ setConsistentLevel env ledgerEnv DBAheadOfLedger - liftIO $ validateConsistentLevel env ledgerEnv chainSyncPoint + liftIO $ setConsistentLevel env DBAheadOfLedger + liftIO $ validateConsistentLevel env chainSyncPoint _ -> -- No need to validate here - liftIO $ setConsistentLevel env ledgerEnv DBAheadOfLedger + liftIO $ setConsistentLevel env DBAheadOfLedger blockNo <- lift $ getDbTipBlockNo env lift $ atomically $ putTMVar resultVar (points, blockNo) dbAction Continue ys @@ -111,30 +112,34 @@ runActions env ledgerEnv actions = do then pure Continue else dbAction Continue zs -rollbackLedger :: SyncEnv -> LedgerEnv ->CardanoPoint -> IO (Maybe [CardanoPoint]) -rollbackLedger syncEnv ledgerEnv point = do - mst <- loadLedgerAtPoint ledgerEnv point - case mst of - Right st -> do - let statePoint = headerStatePoint $ headerState $ clsState st - -- This is an extra validation that should always succeed. - unless (point == statePoint) $ - logAndPanic (leTrace ledgerEnv) $ - mconcat - [ "Ledger " - , textShow statePoint - , " and ChainSync " - , textShow point - , " don't match." - ] - pure Nothing - Left lsfs -> - Just . fmap fst <$> verifySnapshotPoint syncEnv (OnDisk <$> lsfs) +rollbackLedger :: SyncEnv -> CardanoPoint -> IO (Maybe [CardanoPoint]) +rollbackLedger syncEnv point = + case envLedgerEnv syncEnv of + HasLedger hle -> do + mst <- loadLedgerAtPoint hle point + case mst of + Right st -> do + let statePoint = headerStatePoint $ headerState $ clsState st + -- This is an extra validation that should always succeed. + unless (point == statePoint) $ + logAndPanic (getTrace syncEnv) $ + mconcat + [ "Ledger " + , textShow statePoint + , " and ChainSync " + , textShow point + , " don't match." + ] + pure Nothing + Left lsfs -> + Just . fmap fst <$> verifySnapshotPoint syncEnv (OnDisk <$> lsfs) + -- TODO: Vince + NoLedger _ -> pure Nothing -- | This not only checks that the ledger and ChainSync points are equal, but also that the -- 'Consistent' Level is correct based on the db tip. -validateConsistentLevel :: SyncEnv -> LedgerEnv -> CardanoPoint -> IO () -validateConsistentLevel syncEnv ledgerEnv stPoint = do +validateConsistentLevel :: SyncEnv -> CardanoPoint -> IO () +validateConsistentLevel syncEnv stPoint = do backend <- getBackend syncEnv dbTipInfo <- getDbLatestBlockInfo backend cLevel <- getConsistentLevel syncEnv @@ -155,7 +160,7 @@ validateConsistentLevel syncEnv ledgerEnv stPoint = do logAndPanic tracer $ "Unexpected Consistent Level. " <> showContext dbTip cLevel - tracer = leTrace ledgerEnv + tracer = getTrace syncEnv showContext dbTip cLevel = mconcat [ "Ledger state point is " diff --git a/cardano-db-sync/src/Cardano/DbSync/Default.hs b/cardano-db-sync/src/Cardano/DbSync/Default.hs index 25c518e53..da6ca0c95 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Default.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Default.hs @@ -54,81 +54,67 @@ insertListBlocks :: SyncEnv -> [CardanoBlock] -> IO (Either SyncNodeError ()) -insertListBlocks env blocks = - case envLedger env of - Just envL -> do - backend <- getBackend env - DB.runDbIohkLogging backend (leTrace envL) - . runExceptT - $ do - traverse_ (applyAndInsertBlockMaybe env) blocks - -- TODO: Vince: is this right??? - Nothing -> pure $ Left $ NEError "" +insertListBlocks env blocks = do + backend <- getBackend env + DB.runDbIohkLogging backend tracer + . runExceptT + $ do + traverse_ (applyAndInsertBlockMaybe env) blocks + where + tracer = getTrace env applyAndInsertBlockMaybe :: - SyncEnv -> - CardanoBlock -> - ExceptT SyncNodeError (ReaderT SqlBackend (LoggingT IO)) () -applyAndInsertBlockMaybe env cblk = do + SyncEnv -> CardanoBlock -> ExceptT SyncNodeError (ReaderT SqlBackend (LoggingT IO)) () +applyAndInsertBlockMaybe syncEnv cblk = do (!applyRes, !tookSnapshot) <- liftIO mkApplyResult - bl <- liftIO $ isConsistent env - - -- TODO: VINCE: not sure this is right but `logInfo (leTrace envL)` was needed and it can only be access - -- when there is a LedgerEnv. - case envLedger env of - Nothing -> pure () - Just envL -> - if bl - then -- In the usual case it will be consistent so we don't need to do any queries. Just insert the block - insertBlock env envL cblk applyRes False tookSnapshot - else do - blockIsInDbAlready <- lift (isRight <$> DB.queryBlockId (SBS.fromShort . Consensus.getOneEraHash $ blockHash cblk)) - -- If the block is already in db, do nothing. If not, delete all blocks with greater 'BlockNo' or - -- equal, insert the block and restore consistency between ledger and db. - unless blockIsInDbAlready $ do - liftIO . logInfo (leTrace envL) $ - mconcat - [ "Received block which is not in the db with " - , textShow (getHeaderFields cblk) - , ". Time to restore consistency." - ] - rollbackFromBlockNo env envL (blockNo cblk) - insertBlock env envL cblk applyRes True tookSnapshot - liftIO $ setConsistentLevel env envL Consistent - - + bl <- liftIO $ isConsistent syncEnv + if bl + then -- In the usual case it will be consistent so we don't need to do any queries. Just insert the block + insertBlock syncEnv cblk applyRes False tookSnapshot + else do + blockIsInDbAlready <- lift (isRight <$> DB.queryBlockId (SBS.fromShort . Consensus.getOneEraHash $ blockHash cblk)) + -- If the block is already in db, do nothing. If not, delete all blocks with greater 'BlockNo' or + -- equal, insert the block and restore consistency between ledger and db. + unless blockIsInDbAlready $ do + liftIO . logInfo tracer $ + mconcat + [ "Received block which is not in the db with " + , textShow (getHeaderFields cblk) + , ". Time to restore consistency." + ] + rollbackFromBlockNo syncEnv (blockNo cblk) + insertBlock syncEnv cblk applyRes True tookSnapshot + liftIO $ setConsistentLevel syncEnv Consistent where - -- TODO: VINCE: here used to be ` if hasLedgerState env ...` - -- but in the context above it's not clear if this is right or not + tracer = getTrace syncEnv + mkApplyResult :: IO (ApplyResult, Bool) mkApplyResult = do - case envLedger env of - Just envL -> applyBlockAndSnapshot envL cblk - Nothing -> do - slotDetails <- getSlotDetailsNode (envNoLedgerEnv env) (cardanoBlockSlotNo cblk) + case envLedgerEnv syncEnv of + HasLedger hle -> applyBlockAndSnapshot hle cblk + NoLedger nle -> do + slotDetails <- getSlotDetailsNode nle (cardanoBlockSlotNo cblk) pure (defaultApplyResult slotDetails, False) insertBlock :: SyncEnv -> - LedgerEnv -> CardanoBlock -> ApplyResult -> Bool -> Bool -> ExceptT SyncNodeError (ReaderT SqlBackend (LoggingT IO)) () -insertBlock syncEnv ledgerEnv cblk applyRes firstAfterRollback tookSnapshot = do +insertBlock syncEnv cblk applyRes firstAfterRollback tookSnapshot = do !epochEvents <- liftIO $ atomically $ generateNewEpochEvents syncEnv (apSlotDetails applyRes) let !applyResult = applyRes {apEvents = sort $ epochEvents <> apEvents applyRes} let !details = apSlotDetails applyResult let !withinTwoMin = isWithinTwoMin details let !withinHalfHour = isWithinHalfHour details - insertLedgerEvents syncEnv ledgerEnv (sdEpochNo details) (apEvents applyResult) + insertLedgerEvents syncEnv (sdEpochNo details) (apEvents applyResult) let shouldLog = hasEpochStartEvent (apEvents applyResult) || firstAfterRollback let isMember poolId = Set.member poolId (apPoolsRegistered applyResult) let insertShelley blk = insertShelleyBlock syncEnv - ledgerEnv shouldLog withinTwoMin withinHalfHour @@ -140,7 +126,7 @@ insertBlock syncEnv ledgerEnv cblk applyRes firstAfterRollback tookSnapshot = do case cblk of BlockByron blk -> newExceptT $ - insertByronBlock syncEnv ledgerEnv shouldLog blk details + insertByronBlock syncEnv shouldLog blk details BlockShelley blk -> newExceptT $ insertShelley $ @@ -163,8 +149,9 @@ insertBlock syncEnv ledgerEnv cblk applyRes firstAfterRollback tookSnapshot = do Generic.fromBabbageBlock (getPrices applyResult) blk insertEpoch details lift $ commitOrIndexes withinTwoMin withinHalfHour + where - tracer = leTrace ledgerEnv + tracer = getTrace syncEnv insertEpoch details = when (soptExtended $ envOptions syncEnv) @@ -189,7 +176,7 @@ insertBlock syncEnv ledgerEnv cblk applyRes firstAfterRollback tookSnapshot = do ranIndexes <- liftIO $ getRanIndexes syncEnv unless ranIndexes $ do unless commited DB.transactionCommit - liftIO $ runIndexMigrations syncEnv ledgerEnv + liftIO $ runIndexMigrations syncEnv isWithinTwoMin :: SlotDetails -> Bool isWithinTwoMin sd = isSyncedWithinSeconds sd 120 == SyncFollowing @@ -202,16 +189,15 @@ insertBlock syncEnv ledgerEnv cblk applyRes firstAfterRollback tookSnapshot = do insertLedgerEvents :: (MonadBaseControl IO m, MonadIO m) => SyncEnv -> - LedgerEnv -> EpochNo -> [LedgerEvent] -> ExceptT SyncNodeError (ReaderT SqlBackend m) () -insertLedgerEvents syncEnv ledgerEnv currentEpochNo@(EpochNo curEpoch) = +insertLedgerEvents syncEnv currentEpochNo@(EpochNo curEpoch) = mapM_ handler where - tracer = leTrace ledgerEnv + tracer = getTrace syncEnv cache = envCache syncEnv - ntw = leNetwork ledgerEnv + ntw = getNetwork syncEnv subFromCurrentEpoch :: Word64 -> EpochNo subFromCurrentEpoch m = @@ -261,7 +247,7 @@ insertLedgerEvents syncEnv ledgerEnv currentEpochNo@(EpochNo curEpoch) = liftIO . logInfo tracer $ "Inserted " <> show (length rewards) <> " Mir rewards" LedgerPoolReap en drs -> do unless (Map.null $ Generic.unRewards drs) $ do - insertPoolDepositRefunds syncEnv ledgerEnv en drs + insertPoolDepositRefunds syncEnv en drs hasEpochStartEvent :: [LedgerEvent] -> Bool hasEpochStartEvent = any isNewEpoch diff --git a/cardano-db-sync/src/Cardano/DbSync/Era/Byron/Insert.hs b/cardano-db-sync/src/Cardano/DbSync/Era/Byron/Insert.hs index 0be494f91..026e358ff 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Era/Byron/Insert.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Era/Byron/Insert.hs @@ -50,12 +50,11 @@ data ValueFee = ValueFee insertByronBlock :: (MonadBaseControl IO m, MonadIO m) => SyncEnv -> - LedgerEnv -> Bool -> ByronBlock -> SlotDetails -> ReaderT SqlBackend m (Either SyncNodeError ()) -insertByronBlock syncEnv ledgerEnv firstBlockOfEpoch blk details = do +insertByronBlock syncEnv firstBlockOfEpoch blk details = do res <- runExceptT $ case byronBlockRaw blk of Byron.ABOBBlock ablk -> insertABlock tracer cache firstBlockOfEpoch ablk details @@ -68,7 +67,7 @@ insertByronBlock syncEnv ledgerEnv firstBlockOfEpoch blk details = do pure res where tracer :: Trace IO Text - tracer = leTrace ledgerEnv + tracer = getTrace syncEnv cache :: Cache cache = envCache syncEnv diff --git a/cardano-db-sync/src/Cardano/DbSync/Era/Shelley/Insert.hs b/cardano-db-sync/src/Cardano/DbSync/Era/Shelley/Insert.hs index 639f3295f..505268067 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Era/Shelley/Insert.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Era/Shelley/Insert.hs @@ -74,7 +74,6 @@ type IsPoolMember = PoolKeyHash -> Bool insertShelleyBlock :: (MonadBaseControl IO m, MonadIO m) => SyncEnv -> - LedgerEnv -> Bool -> Bool -> Bool -> @@ -84,7 +83,7 @@ insertShelleyBlock :: Strict.Maybe Generic.NewEpoch -> Generic.StakeSliceRes -> ReaderT SqlBackend m (Either SyncNodeError ()) -insertShelleyBlock env ledgerEnv shouldLog withinTwoMins withinHalfHour blk details isMember mNewEpoch stakeSlice = do +insertShelleyBlock syncEnv shouldLog withinTwoMins withinHalfHour blk details isMember mNewEpoch stakeSlice = do runExceptT $ do pbid <- case Generic.blkPreviousHash blk of Nothing -> liftLookupFail (renderErrorMessage (Generic.blkEra blk)) DB.queryGenesis -- this is for networks that fork from Byron on epoch 0. @@ -114,7 +113,7 @@ insertShelleyBlock env ledgerEnv shouldLog withinTwoMins withinHalfHour blk deta } let zippedTx = zip [0 ..] (Generic.blkTxs blk) - let txInserter = insertTx tracer cache (leNetwork ledgerEnv) isMember blkId (sdEpochNo details) (Generic.blkSlotNo blk) + let txInserter = insertTx tracer cache (getNetwork syncEnv) isMember blkId (sdEpochNo details) (Generic.blkSlotNo blk) grouped <- foldM (\grouped (idx, tx) -> txInserter idx tx grouped) mempty zippedTx minIds <- insertBlockGroupedData tracer grouped when withinHalfHour $ @@ -152,13 +151,13 @@ insertShelleyBlock env ledgerEnv shouldLog withinTwoMins withinHalfHour blk deta whenStrictJust mNewEpoch $ \newEpoch -> do insertOnNewEpoch tracer blkId (Generic.blkSlotNo blk) (sdEpochNo details) newEpoch - insertStakeSlice env ledgerEnv stakeSlice + insertStakeSlice syncEnv stakeSlice when (unBlockNo (Generic.blkBlockNo blk) `mod` offlineModBase == 0) . lift $ do - insertOfflineResults tracer (envOfflineResultQueue env) - loadOfflineWorkQueue tracer (envOfflineWorkQueue env) + insertOfflineResults tracer (envOfflineResultQueue syncEnv) + loadOfflineWorkQueue tracer (envOfflineWorkQueue syncEnv) where logger :: Trace IO a -> a -> IO () logger @@ -181,10 +180,10 @@ insertShelleyBlock env ledgerEnv shouldLog withinTwoMins withinHalfHour blk deta offlineModBase = if withinTwoMins then 10 else 2000 tracer :: Trace IO Text - tracer = leTrace ledgerEnv + tracer = getTrace syncEnv cache :: Cache - cache = envCache env + cache = envCache syncEnv -- ----------------------------------------------------------------------------- diff --git a/cardano-db-sync/src/Cardano/DbSync/Era/Shelley/Insert/Epoch.hs b/cardano-db-sync/src/Cardano/DbSync/Era/Shelley/Insert/Epoch.hs index 0ecc779d6..769f6d505 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Era/Shelley/Insert/Epoch.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Era/Shelley/Insert/Epoch.hs @@ -39,21 +39,20 @@ import Database.Persist.Sql (SqlBackend) insertStakeSlice :: (MonadBaseControl IO m, MonadIO m) => SyncEnv -> - LedgerEnv -> Generic.StakeSliceRes -> ExceptT SyncNodeError (ReaderT SqlBackend m) () -insertStakeSlice _ _ Generic.NoSlices = pure () -insertStakeSlice syncEnv ledgerEnv (Generic.Slice slice finalSlice) = do +insertStakeSlice _ Generic.NoSlices = pure () +insertStakeSlice syncEnv (Generic.Slice slice finalSlice) = do insertEpochStake (envCache syncEnv) network (Generic.sliceEpochNo slice) (Map.toList $ Generic.sliceDistr slice) when finalSlice $ do size <- lift $ DB.queryEpochStakeCount (unEpochNo $ Generic.sliceEpochNo slice) liftIO . logInfo tracer $ mconcat ["Inserted ", show size, " EpochStake for ", show (Generic.sliceEpochNo slice)] where tracer :: Trace IO Text - tracer = leTrace ledgerEnv + tracer = getTrace syncEnv network :: Network - network = leNetwork ledgerEnv + network = getNetwork syncEnv insertEpochStake :: (MonadBaseControl IO m, MonadIO m) => @@ -137,17 +136,16 @@ insertRewards nw earnedEpoch spendableEpoch cache rewardsChunk = do insertPoolDepositRefunds :: (MonadBaseControl IO m, MonadIO m) => SyncEnv -> - LedgerEnv -> EpochNo -> Generic.Rewards -> ExceptT SyncNodeError (ReaderT SqlBackend m) () -insertPoolDepositRefunds syncEnv ledgerEnv epochNo refunds = do +insertPoolDepositRefunds syncEnv epochNo refunds = do insertRewards nw epochNo epochNo (envCache syncEnv) (Map.toList rwds) liftIO . logInfo tracer $ "Inserted " <> show (Generic.rewardsCount refunds) <> " deposit refund rewards" where - tracer = leTrace ledgerEnv + tracer = getTrace syncEnv rwds = Generic.unRewards refunds - nw = leNetwork ledgerEnv + nw = getNetwork syncEnv sumRewardTotal :: Map StakeCred (Set Generic.Reward) -> Shelley.Coin sumRewardTotal = diff --git a/cardano-db-sync/src/Cardano/DbSync/LedgerState.hs b/cardano-db-sync/src/Cardano/DbSync/LedgerState.hs index 44f8496cc..ec6f0e086 100644 --- a/cardano-db-sync/src/Cardano/DbSync/LedgerState.hs +++ b/cardano-db-sync/src/Cardano/DbSync/LedgerState.hs @@ -11,14 +11,14 @@ module Cardano.DbSync.LedgerState ( CardanoLedgerState (..), - LedgerEnv (..), + HasLedgerEnv (..), LedgerEvent (..), ApplyResult (..), LedgerStateFile (..), SnapshotPoint (..), applyBlock, defaultApplyResult, - mkLedgerEnv, + mkHasLedgerEnv, applyBlockAndSnapshot, listLedgerStateFilesOrdered, listKnownSnapshots, @@ -138,7 +138,7 @@ import Prelude (String, fail, id) {- HLINT ignore "Reduce duplication" -} {- HLINT ignore "Use readTVarIO" -} -data LedgerEnv = LedgerEnv +data HasLedgerEnv = HasLedgerEnv { leTrace :: Trace IO Text , leProtocolInfo :: !(Consensus.ProtocolInfo IO CardanoBlock) , leDir :: !LedgerStateDir @@ -151,9 +151,6 @@ data LedgerEnv = LedgerEnv , leStateVar :: !(StrictTVar IO (Strict.Maybe LedgerDB)) } -topLevelConfig :: LedgerEnv -> TopLevelConfig CardanoBlock -topLevelConfig = Consensus.pInfoConfig . leProtocolInfo - data CardanoLedgerState = CardanoLedgerState { clsState :: !(ExtLedgerState CardanoBlock) , clsEpochBlockNo :: !EpochBlockNo @@ -251,7 +248,7 @@ instance Anchorable (WithOrigin SlotNo) CardanoLedgerState CardanoLedgerState wh ledgerDbCurrent :: LedgerDB -> CardanoLedgerState ledgerDbCurrent = either id id . AS.head . ledgerDbCheckpoints -mkLedgerEnv :: +mkHasLedgerEnv :: Trace IO Text -> Consensus.ProtocolInfo IO CardanoBlock -> LedgerStateDir -> @@ -260,14 +257,14 @@ mkLedgerEnv :: Bool -> Word64 -> Word64 -> - IO LedgerEnv -mkLedgerEnv trce protocolInfo dir nw systemStart aop snapshotEveryFollowing snapshotEveryLagging = do + IO HasLedgerEnv +mkHasLedgerEnv trce protoInfo dir nw systemStart aop snapshotEveryFollowing snapshotEveryLagging = do svar <- newTVarIO Strict.Nothing intervar <- newTVarIO Strict.Nothing pure - LedgerEnv + HasLedgerEnv { leTrace = trce - , leProtocolInfo = protocolInfo + , leProtocolInfo = protoInfo , leDir = dir , leNetwork = nw , leSystemStart = systemStart @@ -285,16 +282,19 @@ initCardanoLedgerState pInfo = , clsEpochBlockNo = GenesisEpochBlockNo } +getTopLevelconfigHasLedger :: HasLedgerEnv -> TopLevelConfig CardanoBlock +getTopLevelconfigHasLedger = Consensus.pInfoConfig . leProtocolInfo + -- TODO make this type safe. We make the assumption here that the first message of -- the chainsync protocol is 'RollbackTo'. -readStateUnsafe :: LedgerEnv -> STM LedgerDB +readStateUnsafe :: HasLedgerEnv -> STM LedgerDB readStateUnsafe env = do mState <- readTVar $ leStateVar env case mState of Strict.Nothing -> panic "LedgerState.readStateUnsafe: Ledger state is not found" Strict.Just st -> pure st -applyBlockAndSnapshot :: LedgerEnv -> CardanoBlock -> IO (ApplyResult, Bool) +applyBlockAndSnapshot :: HasLedgerEnv -> CardanoBlock -> IO (ApplyResult, Bool) applyBlockAndSnapshot ledgerEnv blk = do (oldState, appResult) <- applyBlock ledgerEnv blk tookSnapshot <- storeSnapshotAndCleanupMaybe ledgerEnv oldState appResult (blockNo blk) (isSyncedWithinSeconds (apSlotDetails appResult) 600) @@ -303,13 +303,13 @@ applyBlockAndSnapshot ledgerEnv blk = do -- The function 'tickThenReapply' does zero validation, so add minimal validation ('blockPrevHash' -- matches the tip hash of the 'LedgerState'). This was originally for debugging but the check is -- cheap enough to keep. -applyBlock :: LedgerEnv -> CardanoBlock -> IO (CardanoLedgerState, ApplyResult) +applyBlock :: HasLedgerEnv -> CardanoBlock -> IO (CardanoLedgerState, ApplyResult) applyBlock env blk = do time <- getCurrentTime atomically $ do !ledgerDB <- readStateUnsafe env let oldState = ledgerDbCurrent ledgerDB - let !result = applyBlk (ExtLedgerCfg (topLevelConfig env)) blk (clsState oldState) + let !result = applyBlk (ExtLedgerCfg (getTopLevelconfigHasLedger env)) blk (clsState oldState) let !newLedgerState = lrResult result !details <- getSlotDetails env (ledgerState newLedgerState) time (cardanoBlockSlotNo blk) let !newEpoch = mkNewEpoch (clsState oldState) newLedgerState @@ -375,7 +375,7 @@ applyBlock env blk = do _ -> Generic.NoSlices storeSnapshotAndCleanupMaybe :: - LedgerEnv -> + HasLedgerEnv -> CardanoLedgerState -> ApplyResult -> BlockNo -> @@ -401,7 +401,7 @@ storeSnapshotAndCleanupMaybe env oldState appResult blkNo syncState = (SyncFollowing, bno) -> bno `mod` leSnapshotEveryFollowing env == 0 (SyncLagging, bno) -> bno `mod` leSnapshotEveryLagging env == 0 -saveCurrentLedgerState :: LedgerEnv -> CardanoLedgerState -> Maybe EpochNo -> IO () +saveCurrentLedgerState :: HasLedgerEnv -> CardanoLedgerState -> Maybe EpochNo -> IO () saveCurrentLedgerState env ledger mEpochNo = do case mkLedgerStateFilename (leDir env) (clsState ledger) mEpochNo of Origin -> pure () -- we don't store genesis @@ -436,14 +436,14 @@ saveCurrentLedgerState env ledger mEpochNo = do ] where codecConfig :: CodecConfig CardanoBlock - codecConfig = configCodec (topLevelConfig env) + codecConfig = configCodec (getTopLevelconfigHasLedger env) mkLedgerStateFilename :: LedgerStateDir -> ExtLedgerState CardanoBlock -> Maybe EpochNo -> WithOrigin FilePath mkLedgerStateFilename dir ledger mEpochNo = lsfFilePath . dbPointToFileName dir mEpochNo <$> getPoint (ledgerTipPoint (Proxy @CardanoBlock) (ledgerState ledger)) -saveCleanupState :: LedgerEnv -> CardanoLedgerState -> Maybe EpochNo -> IO () +saveCleanupState :: HasLedgerEnv -> CardanoLedgerState -> Maybe EpochNo -> IO () saveCleanupState env ledger mEpochNo = do let st = clsState ledger saveCurrentLedgerState env ledger mEpochNo @@ -508,7 +508,7 @@ parseLedgerStateFileName (LedgerStateDir stateDir) fp = -- ------------------------------------------------------------------------------------------------- -cleanupLedgerStateFiles :: LedgerEnv -> SlotNo -> IO () +cleanupLedgerStateFiles :: HasLedgerEnv -> SlotNo -> IO () cleanupLedgerStateFiles env slotNo = do files <- listLedgerStateFilesOrdered (leDir env) let (epochBoundary, valid, invalid) = foldr groupFiles ([], [], []) files @@ -531,9 +531,9 @@ cleanupLedgerStateFiles env slotNo = do | otherwise = (epochBoundary, lFile : regularFile, invalid) -loadLedgerAtPoint :: LedgerEnv -> CardanoPoint -> IO (Either [LedgerStateFile] CardanoLedgerState) -loadLedgerAtPoint env point = do - mLedgerDB <- atomically $ readTVar $ leStateVar env +loadLedgerAtPoint :: HasLedgerEnv -> CardanoPoint -> IO (Either [LedgerStateFile] CardanoLedgerState) +loadLedgerAtPoint hasLedgerEnv point = do + mLedgerDB <- atomically $ readTVar $ leStateVar hasLedgerEnv -- First try to find the ledger in memory let mAnchoredSeq = rollbackLedger mLedgerDB case mAnchoredSeq of @@ -541,21 +541,21 @@ loadLedgerAtPoint env point = do -- Ledger states are growing to become very big in memory. -- Before parsing the new ledger state we need to make sure the old states -- are or can be garbage collected. - writeLedgerState env Strict.Nothing + writeLedgerState hasLedgerEnv Strict.Nothing performMajorGC - mst <- findStateFromPoint env point + mst <- findStateFromPoint hasLedgerEnv point case mst of Right st -> do - writeLedgerState env (Strict.Just . LedgerDB $ AS.Empty st) - logInfo (leTrace env) $ mconcat ["Found snapshot file for ", renderPoint point] + writeLedgerState hasLedgerEnv (Strict.Just . LedgerDB $ AS.Empty st) + logInfo (leTrace hasLedgerEnv) $ mconcat ["Found snapshot file for ", renderPoint point] pure $ Right st Left lsfs -> pure $ Left lsfs Just anchoredSeq' -> do - logInfo (leTrace env) $ mconcat ["Found in memory ledger snapshot at ", renderPoint point] + logInfo (leTrace hasLedgerEnv) $ mconcat ["Found in memory ledger snapshot at ", renderPoint point] let ledgerDB' = LedgerDB anchoredSeq' let st = ledgerDbCurrent ledgerDB' - deleteNewerFiles env point - writeLedgerState env $ Strict.Just ledgerDB' + deleteNewerFiles hasLedgerEnv point + writeLedgerState hasLedgerEnv $ Strict.Just ledgerDB' pure $ Right st where rollbackLedger :: @@ -566,7 +566,7 @@ loadLedgerAtPoint env point = do Strict.Just ledgerDB -> AS.rollback (pointSlot point) (const True) (ledgerDbCheckpoints ledgerDB) -deleteNewerFiles :: LedgerEnv -> CardanoPoint -> IO () +deleteNewerFiles :: HasLedgerEnv -> CardanoPoint -> IO () deleteNewerFiles env point = do files <- listLedgerStateFilesOrdered (leDir env) -- Genesis can be reproduced from configuration. @@ -579,7 +579,7 @@ deleteNewerFiles env point = do findLedgerStateFile files (Point.blockPointSlot blk, mkRawHash $ Point.blockPointHash blk) deleteAndLogStateFile env "newer" newerFiles -deleteAndLogFiles :: LedgerEnv -> Text -> [FilePath] -> IO () +deleteAndLogFiles :: HasLedgerEnv -> Text -> [FilePath] -> IO () deleteAndLogFiles env descr files = case files of [] -> pure () @@ -590,10 +590,10 @@ deleteAndLogFiles env descr files = logInfo (leTrace env) $ mconcat ["Removing ", descr, " files ", textShow files] mapM_ safeRemoveFile files -deleteAndLogStateFile :: LedgerEnv -> Text -> [LedgerStateFile] -> IO () +deleteAndLogStateFile :: HasLedgerEnv -> Text -> [LedgerStateFile] -> IO () deleteAndLogStateFile env descr lsfs = deleteAndLogFiles env descr (lsfFilePath <$> lsfs) -findStateFromPoint :: LedgerEnv -> CardanoPoint -> IO (Either [LedgerStateFile] CardanoLedgerState) +findStateFromPoint :: HasLedgerEnv -> CardanoPoint -> IO (Either [LedgerStateFile] CardanoLedgerState) findStateFromPoint env point = do files <- listLedgerStateFilesOrdered (leDir env) -- Genesis can be reproduced from configuration. @@ -608,7 +608,7 @@ findStateFromPoint env point = do deleteAndLogStateFile env "newer" newerFiles case found of Just lsf -> do - mState <- loadLedgerStateFromFile (leTrace env) (topLevelConfig env) False point lsf + mState <- loadLedgerStateFromFile (leTrace env) (getTopLevelconfigHasLedger env) False point lsf case mState of Left err -> do deleteLedgerFile err lsf @@ -725,13 +725,13 @@ getSlotNoSnapshot :: SnapshotPoint -> WithOrigin SlotNo getSlotNoSnapshot (OnDisk lsf) = at $ lsfSlotNo lsf getSlotNoSnapshot (InMemory cp) = pointSlot cp -listKnownSnapshots :: LedgerEnv -> IO [SnapshotPoint] +listKnownSnapshots :: HasLedgerEnv -> IO [SnapshotPoint] listKnownSnapshots env = do inMem <- fmap InMemory <$> listMemorySnapshots env onDisk <- fmap OnDisk <$> listLedgerStateFilesOrdered (leDir env) pure $ reverse $ List.sortOn getSlotNoSnapshot $ inMem <> onDisk -listMemorySnapshots :: LedgerEnv -> IO [CardanoPoint] +listMemorySnapshots :: HasLedgerEnv -> IO [CardanoPoint] listMemorySnapshots env = do mState <- atomically $ readTVar $ leStateVar env case mState of @@ -762,7 +762,7 @@ listLedgerStateFilesOrdered dir = do revSlotNoOrder :: LedgerStateFile -> LedgerStateFile -> Ordering revSlotNoOrder a b = compare (lsfSlotNo b) (lsfSlotNo a) -writeLedgerState :: LedgerEnv -> Strict.Maybe LedgerDB -> IO () +writeLedgerState :: HasLedgerEnv -> Strict.Maybe LedgerDB -> IO () writeLedgerState env mLedgerDb = atomically $ writeTVar (leStateVar env) mLedgerDb -- | Remove given file path and ignore any IOEXceptions. @@ -805,7 +805,7 @@ getAdaPots st = LedgerStateAlonzo sta -> Just $ totalAdaPots sta LedgerStateBabbage stb -> Just $ totalAdaPots stb -ledgerEpochNo :: LedgerEnv -> ExtLedgerState CardanoBlock -> EpochNo +ledgerEpochNo :: HasLedgerEnv -> ExtLedgerState CardanoBlock -> EpochNo ledgerEpochNo env cls = case ledgerTipSlot (ledgerState cls) of Origin -> 0 -- An empty chain is in epoch 0 @@ -815,7 +815,7 @@ ledgerEpochNo env cls = Right en -> en where epochInfo :: EpochInfo (Except Consensus.PastHorizonException) - epochInfo = epochInfoLedger (configLedger $ topLevelConfig env) (hardForkLedgerStatePerEra $ ledgerState cls) + epochInfo = epochInfoLedger (configLedger $ getTopLevelconfigHasLedger env) (hardForkLedgerStatePerEra $ ledgerState cls) -- Like 'Consensus.tickThenReapply' but also checks that the previous hash from the block matches -- the head hash of the ledger state. @@ -851,7 +851,7 @@ totalAdaPots = Shelley.totalAdaPotsES . Shelley.nesEs . Consensus.shelleyLedgerS getHeaderHash :: HeaderHash CardanoBlock -> ByteString getHeaderHash bh = SBS.fromShort (Consensus.getOneEraHash bh) -getSlotDetails :: LedgerEnv -> LedgerState CardanoBlock -> UTCTime -> SlotNo -> STM SlotDetails +getSlotDetails :: HasLedgerEnv -> LedgerState CardanoBlock -> UTCTime -> SlotNo -> STM SlotDetails getSlotDetails env st time slot = do minter <- readTVar $ leInterpreter env details <- case minter of @@ -861,7 +861,7 @@ getSlotDetails env st time slot = do Strict.Nothing -> queryNewInterpreter pure $ details {sdCurrentTime = time} where - hfConfig = configLedger $ Consensus.pInfoConfig (leProtocolInfo env) + hfConfig = configLedger $ getTopLevelconfigHasLedger env queryNewInterpreter :: STM SlotDetails queryNewInterpreter = diff --git a/cardano-db-sync/src/Cardano/DbSync/LocalStateQuery.hs b/cardano-db-sync/src/Cardano/DbSync/LocalStateQuery.hs index 0f33d89e3..78e10db01 100644 --- a/cardano-db-sync/src/Cardano/DbSync/LocalStateQuery.hs +++ b/cardano-db-sync/src/Cardano/DbSync/LocalStateQuery.hs @@ -3,8 +3,8 @@ {-# LANGUAGE TypeFamilies #-} module Cardano.DbSync.LocalStateQuery ( - NoLedgerStateEnv (..), - mkNoLedgerStateEnv, + NoLedgerEnv (..), + mkNoLedgerEnv, getSlotDetailsNode, localStateQueryHandler, newStateQueryTMVar, @@ -52,12 +52,16 @@ import Ouroboros.Network.Protocol.LocalStateQuery.Client ( ) import qualified Ouroboros.Network.Protocol.LocalStateQuery.Client as StateQuery import Ouroboros.Network.Protocol.LocalStateQuery.Type (AcquireFailure) +import qualified Cardano.Ledger.BaseTypes as Ledger +import qualified Ouroboros.Consensus.Node.ProtocolInfo as Consensus -data NoLedgerStateEnv = NoLedgerStateEnv - { nlsTracer :: Trace IO Text - , nlsSystemStart :: !SystemStart - , nlsQueryVar :: StateQueryTMVar CardanoBlock CardanoInterpreter - , nlsHistoryInterpreterVar :: StrictTVar IO (Strict.Maybe CardanoInterpreter) +data NoLedgerEnv = NoLedgerEnv + { nleTracer :: Trace IO Text + , nleSystemStart :: !SystemStart + , nleQueryVar :: StateQueryTMVar CardanoBlock CardanoInterpreter + , nleHistoryInterpreterVar :: StrictTVar IO (Strict.Maybe CardanoInterpreter) + , nleNetwork :: !Ledger.Network + , nleProtocolInfo :: !(Consensus.ProtocolInfo IO CardanoBlock) } newtype StateQueryTMVar blk result = StateQueryTMVar @@ -69,11 +73,11 @@ newtype StateQueryTMVar blk result = StateQueryTMVar ) } -mkNoLedgerStateEnv :: Trace IO Text -> SystemStart -> IO NoLedgerStateEnv -mkNoLedgerStateEnv trce systemStart = do +mkNoLedgerEnv :: Trace IO Text -> Consensus.ProtocolInfo IO CardanoBlock -> Ledger.Network -> SystemStart -> IO NoLedgerEnv +mkNoLedgerEnv trce protoInfo network systemStart = do qVar <- newStateQueryTMVar interVar <- newTVarIO Strict.Nothing - pure $ NoLedgerStateEnv trce systemStart qVar interVar + pure $ NoLedgerEnv trce systemStart qVar interVar network protoInfo newStateQueryTMVar :: IO (StateQueryTMVar blk result) newStateQueryTMVar = StateQueryTMVar <$> newEmptyTMVarIO @@ -82,7 +86,7 @@ newStateQueryTMVar = StateQueryTMVar <$> newEmptyTMVarIO -- If the history interpreter does not exist, get one. -- If the existing history interpreter returns an error, get a new one and try again. getSlotDetailsNode :: - NoLedgerStateEnv -> + NoLedgerEnv -> SlotNo -> IO SlotDetails getSlotDetailsNode env slot = do @@ -95,11 +99,11 @@ getSlotDetailsNode env slot = do Left err -> panic $ "getSlotDetailsNode: " <> textShow err Right sd -> insertCurrentTime sd where - interVar = nlsHistoryInterpreterVar env + interVar = nleHistoryInterpreterVar env evalSlotDetails :: Interpreter (CardanoEras StandardCrypto) -> Either PastHorizonException SlotDetails evalSlotDetails interp = - interpretQuery interp (querySlotDetails (nlsSystemStart env) slot) + interpretQuery interp (querySlotDetails (nleSystemStart env) slot) insertCurrentTime :: SlotDetails -> IO SlotDetails insertCurrentTime sd = do @@ -111,7 +115,7 @@ getSlotDetailsNode env slot = do fromStrictMaybe Strict.Nothing = Nothing getHistoryInterpreter :: - NoLedgerStateEnv -> + NoLedgerEnv -> IO CardanoInterpreter getHistoryInterpreter env = do respVar <- newEmptyTMVarIO @@ -125,15 +129,15 @@ getHistoryInterpreter env = do atomically $ writeTVar interVar $ Strict.Just interp pure interp where - reqVar = unStateQueryTMVar $ nlsQueryVar env - interVar = nlsHistoryInterpreterVar env - tracer = nlsTracer env + reqVar = unStateQueryTMVar $ nleQueryVar env + interVar = nleHistoryInterpreterVar env + tracer = nleTracer env -- This is called during the ChainSync setup and loops forever. Queries can be posted to -- it and responses retrieved via a TVar. localStateQueryHandler :: forall a. - NoLedgerStateEnv -> + NoLedgerEnv -> LocalStateQueryClient CardanoBlock (Point CardanoBlock) (Query CardanoBlock) IO a localStateQueryHandler env = LocalStateQueryClient idleState @@ -156,4 +160,4 @@ localStateQueryHandler env = idleState } - reqVar = unStateQueryTMVar $ nlsQueryVar env + reqVar = unStateQueryTMVar $ nleQueryVar env diff --git a/cardano-db-sync/src/Cardano/DbSync/Rollback.hs b/cardano-db-sync/src/Cardano/DbSync/Rollback.hs index 8d971af8a..d50ecfd85 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Rollback.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Rollback.hs @@ -29,10 +29,9 @@ import Ouroboros.Network.Point rollbackFromBlockNo :: MonadIO m => SyncEnv -> - LedgerEnv -> BlockNo -> ExceptT SyncNodeError (ReaderT SqlBackend m) () -rollbackFromBlockNo syncEnv ledgerEnv blkNo = do +rollbackFromBlockNo syncEnv blkNo = do lift $ rollbackCache cache nBlocks <- lift $ DB.queryBlockCountAfterBlockNo (unBlockNo blkNo) True mres <- lift $ DB.queryBlockNoAndEpoch (unBlockNo blkNo) @@ -49,15 +48,15 @@ rollbackFromBlockNo syncEnv ledgerEnv blkNo = do DB.deleteEpochRows epochNo liftIO . logInfo trce $ "Blocks deleted" where - trce = leTrace ledgerEnv + trce = getTrace syncEnv cache = envCache syncEnv -prepareRollback :: SyncEnv -> LedgerEnv -> CardanoPoint -> Tip CardanoBlock -> IO (Either SyncNodeError Bool) -prepareRollback syncEnv ledgerEnv point serverTip = do +prepareRollback :: SyncEnv -> CardanoPoint -> Tip CardanoBlock -> IO (Either SyncNodeError Bool) +prepareRollback syncEnv point serverTip = do backend <- getBackend syncEnv DB.runDbIohkNoLogging backend $ runExceptT action where - trce = leTrace ledgerEnv + trce = getTrace syncEnv action :: MonadIO m => ExceptT SyncNodeError (ReaderT SqlBackend m) Bool action = do diff --git a/cardano-db-sync/src/Cardano/DbSync/Sync.hs b/cardano-db-sync/src/Cardano/DbSync/Sync.hs index ab9368f4e..2b4fff473 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Sync.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Sync.hs @@ -47,6 +47,7 @@ import Cardano.DbSync.Util import Cardano.Prelude hiding (Meta, Nat, option, (%)) import Cardano.Slotting.Slot (EpochNo (..), WithOrigin (..)) import qualified Codec.CBOR.Term as CBOR +import Control.Monad.Extra (whenJust) import Control.Monad.Trans.Except.Exit (orDie) import Control.Tracer (Tracer) import qualified Data.ByteString.Lazy as BSL @@ -70,7 +71,6 @@ import Ouroboros.Consensus.Network.NodeToClient ( cTxSubmissionCodec, ) import Ouroboros.Consensus.Node.ErrorPolicy (consensusErrorPolicy) -import qualified Ouroboros.Consensus.Node.ProtocolInfo as Consensus import Ouroboros.Network.Block ( BlockNo (..), Point (..), @@ -124,7 +124,6 @@ import Ouroboros.Network.Protocol.LocalStateQuery.Client (localStateQueryClientP import qualified Ouroboros.Network.Snocket as Snocket import Ouroboros.Network.Subscription (SubscriptionTrace) import System.Directory (createDirectoryIfMissing) -import Control.Monad.Extra (whenJust) runSyncNode :: MetricSetters -> @@ -158,15 +157,16 @@ runSyncNode metricsSetters trce iomgr aop snEveryFollowing snEveryLagging dbConn mkSyncEnvFromConfig trce dbConnString - (SyncOptions - (enpExtended syncNodeParams) - aop - (enpHasCache syncNodeParams) - (enpHasLedger syncNodeParams) - (enpSkipFix syncNodeParams) - (enpOnlyFix syncNodeParams) - snEveryFollowing - snEveryLagging) + ( SyncOptions + (enpExtended syncNodeParams) + aop + (enpHasCache syncNodeParams) + (enpShouldUseLedger syncNodeParams) + (enpSkipFix syncNodeParams) + (enpOnlyFix syncNodeParams) + snEveryFollowing + snEveryLagging + ) (enpMaybeLedgerStateDir syncNodeParams) genCfg ranAll @@ -175,17 +175,19 @@ runSyncNode metricsSetters trce iomgr aop snEveryFollowing snEveryLagging dbConn -- If the DB is empty it will be inserted, otherwise it will be validated (to make -- sure we are on the right chain). - lift $ Db.runIohkLogging trce $ withPostgresqlConn dbConnString $ \backend -> do - liftIO $ unless (enpHasLedger syncNodeParams) $ do - logInfo trce "Migrating to a no ledger schema" - Db.noLedgerMigrations backend trce - lift $ orDie renderSyncNodeError $ insertValidateGenesisDist trce backend (dncNetworkName syncNodeConfig) genCfg (useShelleyInit syncNodeConfig) - liftIO $ epochStartup (enpExtended syncNodeParams) trce backend - -- TODO: Vince: a lot of the downstream functions require LedgerEnv but is this right? - whenJust (envLedger syncEnv) $ \ledgerEnv -> - case genCfg of - GenesisCardano {} -> do - liftIO $ runSyncNodeClient metricsSetters syncEnv ledgerEnv iomgr trce (enpSocketPath syncNodeParams) + lift $ + Db.runIohkLogging trce $ + withPostgresqlConn dbConnString $ \backend -> do + liftIO $ + unless (enpShouldUseLedger syncNodeParams) $ do + logInfo trce "Migrating to a no ledger schema" + Db.noLedgerMigrations backend trce + lift $ orDie renderSyncNodeError $ insertValidateGenesisDist trce backend (dncNetworkName syncNodeConfig) genCfg (useShelleyInit syncNodeConfig) + liftIO $ epochStartup (enpExtended syncNodeParams) trce backend + -- TODO: Vince: a lot of the downstream functions require HasLedgerEnv but is this right? + case genCfg of + GenesisCardano {} -> do + liftIO $ runSyncNodeClient metricsSetters syncEnv iomgr trce (enpSocketPath syncNodeParams) where useShelleyInit :: SyncNodeConfig -> Bool useShelleyInit cfg = @@ -198,12 +200,11 @@ runSyncNode metricsSetters trce iomgr aop snEveryFollowing snEveryLagging dbConn runSyncNodeClient :: MetricSetters -> SyncEnv -> - LedgerEnv -> IOManager -> Trace IO Text -> SocketPath -> IO () -runSyncNodeClient metricsSetters env ledgerEnv iomgr trce (SocketPath socketPath) = do +runSyncNodeClient metricsSetters env iomgr trce (SocketPath socketPath) = do logInfo trce $ "localInitiatorNetworkApplication: connecting to node via " <> textShow socketPath void $ subscribe @@ -212,10 +213,10 @@ runSyncNodeClient metricsSetters env ledgerEnv iomgr trce (SocketPath socketPath (envNetworkMagic env) networkSubscriptionTracers clientSubscriptionParams - (dbSyncProtocols trce env ledgerEnv metricsSetters) + (dbSyncProtocols trce env metricsSetters) where codecConfig :: CodecConfig CardanoBlock - codecConfig = configCodec $ Consensus.pInfoConfig (leProtocolInfo ledgerEnv) + codecConfig = configCodec $ getTopLevelConfig env clientSubscriptionParams = ClientSubscriptionParams @@ -253,13 +254,12 @@ runSyncNodeClient metricsSetters env ledgerEnv iomgr trce (SocketPath socketPath dbSyncProtocols :: Trace IO Text -> SyncEnv -> - LedgerEnv -> MetricSetters -> Network.NodeToClientVersion -> ClientCodecs CardanoBlock IO -> ConnectionId LocalAddress -> NodeToClientProtocols 'InitiatorMode BSL.ByteString IO () Void -dbSyncProtocols trce env ledgerEnv metricsSetters _version codecs _connectionId = +dbSyncProtocols trce env metricsSetters _version codecs _connectionId = NodeToClientProtocols { localChainSyncProtocol = localChainSyncPtcl , localTxSubmissionProtocol = dummylocalTxSubmit @@ -271,70 +271,75 @@ dbSyncProtocols trce env ledgerEnv metricsSetters _version codecs _connectionId localChainSyncTracer :: Tracer IO (TraceSendRecv (ChainSync CardanoBlock (Point CardanoBlock) (Tip CardanoBlock))) localChainSyncTracer = toLogObject $ appendName "ChainSync" trce + tracer :: Trace IO Text + tracer = getTrace env + localChainSyncPtcl :: RunMiniProtocol 'InitiatorMode BSL.ByteString IO () Void - localChainSyncPtcl = InitiatorProtocolOnly $ MuxPeerRaw $ \channel -> - liftIO . logException trce "ChainSyncWithBlocksPtcl: " $ do - Db.runIohkLogging trce $ withPostgresqlConn (envConnString env) $ \backend -> liftIO $ do - replaceConnection env backend - setConsistentLevel env ledgerEnv Unchecked - - isFixed <- getIsSyncFixed env - let skipFix = soptSkipFix $ envOptions env - let onlyFix = soptOnlyFix $ envOptions env - if onlyFix || (not isFixed && not skipFix) - then do - fd <- runDbIohkLogging backend (leTrace ledgerEnv) $ getWrongPlutusData (leTrace ledgerEnv) - unless (nullData fd) $ - void $ - runPeer - localChainSyncTracer - (cChainSyncCodec codecs) - channel - ( Client.chainSyncClientPeer $ - chainSyncClientFix backend (leTrace ledgerEnv) fd + localChainSyncPtcl = InitiatorProtocolOnly $ + MuxPeerRaw $ \channel -> + liftIO . logException trce "ChainSyncWithBlocksPtcl: " $ do + Db.runIohkLogging trce $ + withPostgresqlConn (envConnString env) $ \backend -> liftIO $ do + replaceConnection env backend + setConsistentLevel env Unchecked + + isFixed <- getIsSyncFixed env + let skipFix = soptSkipFix $ envOptions env + let onlyFix = soptOnlyFix $ envOptions env + if onlyFix || (not isFixed && not skipFix) + then do + fd <- runDbIohkLogging backend tracer $ getWrongPlutusData tracer + unless (nullData fd) $ + void $ + runPeer + localChainSyncTracer + (cChainSyncCodec codecs) + channel + ( Client.chainSyncClientPeer $ + chainSyncClientFix backend tracer fd + ) + setIsFixedAndMigrate env + when onlyFix $ panic "All Good! This error is only thrown to exit db-sync." -- TODO fix. + else do + when skipFix $ setIsFixedAndMigrate env + -- The Db thread is not forked at this point, so we can use + -- the connection here. A connection cannot be used concurrently by many + -- threads + logInfo trce "Starting chainSyncClient" + latestPoints <- getLatestPoints env + let (inMemory, onDisk) = List.span snd latestPoints + logInfo trce $ + mconcat + [ "Suggesting intersection points from memory: " + , textShow (fst <$> inMemory) + , " and from disk: " + , textShow (fst <$> onDisk) + ] + currentTip <- getCurrentTipBlockNo env + logDbState env + -- communication channel between datalayer thread and chainsync-client thread + actionQueue <- newDbActionQueue + + race_ + ( race + (runDbThread env metricsSetters actionQueue) + (runOfflineFetchThread trce env) ) - setIsFixedAndMigrate env - when onlyFix $ panic "All Good! This error is only thrown to exit db-sync." -- TODO fix. - else do - when skipFix $ setIsFixedAndMigrate env - -- The Db thread is not forked at this point, so we can use - -- the connection here. A connection cannot be used concurrently by many - -- threads - logInfo trce "Starting chainSyncClient" - latestPoints <- getLatestPoints env - let (inMemory, onDisk) = List.span snd latestPoints - logInfo trce $ - mconcat - [ "Suggesting intersection points from memory: " - , textShow (fst <$> inMemory) - , " and from disk: " - , textShow (fst <$> onDisk) - ] - currentTip <- getCurrentTipBlockNo env - logDbState env ledgerEnv - -- communication channel between datalayer thread and chainsync-client thread - actionQueue <- newDbActionQueue - - race_ - ( race - (runDbThread env ledgerEnv metricsSetters actionQueue) - (runOfflineFetchThread trce env) - ) - ( runPipelinedPeer - localChainSyncTracer - (cChainSyncCodec codecs) - channel - ( chainSyncClientPeerPipelined $ - chainSyncClient metricsSetters trce (fst <$> latestPoints) currentTip actionQueue + ( runPipelinedPeer + localChainSyncTracer + (cChainSyncCodec codecs) + channel + ( chainSyncClientPeerPipelined $ + chainSyncClient metricsSetters trce (fst <$> latestPoints) currentTip actionQueue + ) ) - ) - atomically $ writeDbActionQueue actionQueue DbFinish - -- We should return leftover bytes returned by 'runPipelinedPeer', but - -- client application do not care about them (it's only important if one - -- would like to restart a protocol on the same mux and thus bearer). - pure () - pure ((), Nothing) + atomically $ writeDbActionQueue actionQueue DbFinish + -- We should return leftover bytes returned by 'runPipelinedPeer', but + -- client application do not care about them (it's only important if one + -- would like to restart a protocol on the same mux and thus bearer). + pure () + pure ((), Nothing) dummylocalTxSubmit :: RunMiniProtocol 'InitiatorMode BSL.ByteString IO () Void dummylocalTxSubmit = @@ -346,11 +351,19 @@ dbSyncProtocols trce env ledgerEnv metricsSetters _version codecs _connectionId localStateQuery :: RunMiniProtocol 'InitiatorMode BSL.ByteString IO () Void localStateQuery = - InitiatorProtocolOnly $ - MuxPeer - (if isJust $ envLedger env then Logging.nullTracer else contramap (Text.pack . show) . toLogObject $ appendName "local-state-query" trce) - (cStateQueryCodec codecs) - (if isJust $ envLedger env then localStateQueryPeerNull else localStateQueryClientPeer (localStateQueryHandler (envNoLedgerEnv env))) + case envLedgerEnv env of + HasLedger _ -> + InitiatorProtocolOnly $ + MuxPeer + Logging.nullTracer + (cStateQueryCodec codecs) + localStateQueryPeerNull + NoLedger nle -> + InitiatorProtocolOnly $ + MuxPeer + (contramap (Text.pack . show) . toLogObject $ appendName "local-state-query" trce) + (cStateQueryCodec codecs) + (localStateQueryClientPeer $ localStateQueryHandler nle) -- | 'ChainSyncClient' which traces received blocks and ignores when it