From ba2098b8924f87d0589c110339148ab4e159b57d Mon Sep 17 00:00:00 2001 From: Javier Sagredo Date: Tue, 18 Jan 2022 16:41:45 +0100 Subject: [PATCH] Implement LedgerDB initialization --- .gitignore | 2 + ouroboros-consensus/.dir-locals.el | 5 - .../Consensus/Storage/ChainDB/Impl/Args.hs | 2 +- .../Consensus/Storage/ChainDB/Impl/LgrDB.hs | 34 +- .../Consensus/Storage/ImmutableDB/API.hs | 30 +- .../Consensus/Storage/LedgerDB/InMemory.hs | 93 ++-- .../Consensus/Storage/LedgerDB/OnDisk.hs | 418 +++++++++++++----- 7 files changed, 408 insertions(+), 176 deletions(-) delete mode 100644 ouroboros-consensus/.dir-locals.el diff --git a/.gitignore b/.gitignore index 669bc3ea24c..d390b8f6d11 100644 --- a/.gitignore +++ b/.gitignore @@ -79,3 +79,5 @@ haddocks/ # Ignore directory used by .github/bin/check-dependencies /tmp + +.dir-locals.el diff --git a/ouroboros-consensus/.dir-locals.el b/ouroboros-consensus/.dir-locals.el deleted file mode 100644 index 70f722ed223..00000000000 --- a/ouroboros-consensus/.dir-locals.el +++ /dev/null @@ -1,5 +0,0 @@ -;;; Directory Local Variables -;;; For more information see (info "(emacs) Directory Variables") - -((nil - (fill-column . 
80))) diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/Impl/Args.hs b/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/Impl/Args.hs index 548d8cdd722..7dc15a38ce0 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/Impl/Args.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/Impl/Args.hs @@ -61,7 +61,7 @@ data ChainDbArgs f m blk = ChainDbArgs { , cdbTopLevelConfig :: HKD f (TopLevelConfig blk) , cdbChunkInfo :: HKD f ChunkInfo , cdbCheckIntegrity :: HKD f (blk -> Bool) - , cdbGenesis :: HKD f (m (ExtLedgerState blk EmptyMK)) + , cdbGenesis :: HKD f (m (ExtLedgerState blk ValuesMK)) , cdbCheckInFuture :: HKD f (CheckInFuture m blk) , cdbImmutableDbCacheConfig :: ImmutableDB.CacheConfig diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/Impl/LgrDB.hs b/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/Impl/LgrDB.hs index 3d61ae1f9e5..1ee4587886c 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/Impl/LgrDB.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/Impl/LgrDB.hs @@ -124,7 +124,7 @@ data LgrDB m blk = LgrDB { -- When a garbage-collection is performed on the VolatileDB, the points -- of the blocks eligible for garbage-collection should be removed from -- this set. - , lgrOnDiskLedgerStDb :: !(LedgerDB.OnDiskLedgerStDb m (ExtLedgerState blk)) + , lgrOnDiskLedgerStDb :: !(LedgerDB.OnDiskLedgerStDb m (ExtLedgerState blk) blk) -- ^ -- -- TODO: align the other fields. 
@@ -149,6 +149,8 @@ type LgrDbSerialiseConstraints blk = ( Serialise (HeaderHash blk) , EncodeDisk blk (LedgerState blk EmptyMK) , DecodeDisk blk (LedgerState blk EmptyMK) + , EncodeDisk blk (LedgerState blk ValuesMK) + , DecodeDisk blk (LedgerState blk ValuesMK) , EncodeDisk blk (AnnTip blk) , DecodeDisk blk (AnnTip blk) , EncodeDisk blk (ChainDepState (BlockProtocol blk)) @@ -161,7 +163,7 @@ type LgrDbSerialiseConstraints blk = data LgrDbArgs f m blk = LgrDbArgs { lgrDiskPolicy :: DiskPolicy - , lgrGenesis :: HKD f (m (ExtLedgerState blk EmptyMK)) + , lgrGenesis :: HKD f (m (ExtLedgerState blk ValuesMK)) , lgrHasFS :: SomeHasFS m , lgrHasFSLedgerSt :: SomeHasFS m , lgrTopLevelConfig :: HKD f (TopLevelConfig blk) @@ -273,7 +275,7 @@ initFromDisk => LgrDbArgs Identity m blk -> Tracer m (ReplayGoal blk -> TraceReplayEvent blk) -> ImmutableDB m blk - -> m (LedgerDB' blk, Word64, OnDiskLedgerStDb m (ExtLedgerState blk)) + -> m (LedgerDB' blk, Word64, OnDiskLedgerStDb m (ExtLedgerState blk) blk) initFromDisk args replayTracer immutableDB = wrapFailure (Proxy @blk) $ do onDiskLedgerStDb <- LedgerDB.mkOnDiskLedgerStDb lgrHasFSLedgerSt -- TODO: is it correct that we pick a instance of the 'OnDiskLedgerStDb' here? @@ -282,28 +284,32 @@ initFromDisk args replayTracer immutableDB = wrapFailure (Proxy @blk) $ do replayTracer lgrTracer hasFS + decodeExtLedgerState' + lgrHasFSLedgerSt onDiskLedgerStDb decodeExtLedgerState' decode (configLedgerDb lgrTopLevelConfig) lgrGenesis (streamAPI immutableDB) + True return (db, replayed, onDiskLedgerStDb) where LgrDbArgs { lgrHasFS = hasFS, .. } = args ccfg = configCodec lgrTopLevelConfig - decodeExtLedgerState' :: forall s. Decoder s (ExtLedgerState blk EmptyMK) + decodeExtLedgerState' :: forall s mk. 
DecodeDisk blk (LedgerState blk mk) => Decoder s (ExtLedgerState blk mk) decodeExtLedgerState' = decodeExtLedgerState (decodeDisk ccfg) (decodeDisk ccfg) (decodeDisk ccfg) + -- | For testing purposes mkLgrDB :: StrictTVar m (LedgerDB' blk) -> StrictTVar m (Set (RealPoint blk)) - -> LedgerDB.OnDiskLedgerStDb m (ExtLedgerState blk) + -> LedgerDB.OnDiskLedgerStDb m (ExtLedgerState blk) blk -> FlushLock -> (RealPoint blk -> m blk) -> LgrDbArgs Identity m blk @@ -458,7 +464,7 @@ streamAPI :: forall m blk. (IOLike m, HasHeader blk) => ImmutableDB m blk -> StreamAPI m blk -streamAPI immutableDB = StreamAPI streamAfter +streamAPI immutableDB = StreamAPI streamAfter streamAfterBefore where streamAfter :: HasCallStack => Point blk @@ -481,6 +487,22 @@ streamAPI immutableDB = StreamAPI streamAfter ImmutableDB.IteratorExhausted -> return $ NoMoreBlocks ImmutableDB.IteratorResult blk -> return $ NextBlock blk + streamAfterBefore :: HasCallStack + => Point blk + -> Point blk + -> (Either (RealPoint blk) (m (NextBlock blk)) -> m a) + -> m a + streamAfterBefore from to k = withRegistry $ \registry -> do + eItr <- ImmutableDB.streamWithBounds + immutableDB + registry + GetBlock + from + to + case eItr of + Left err -> k $ Left $ ImmutableDB.missingBlockPoint err + Right itr -> k $ Right $ streamUsing itr + {------------------------------------------------------------------------------- Previously applied blocks -------------------------------------------------------------------------------} diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ImmutableDB/API.hs b/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ImmutableDB/API.hs index 97e328b2243..aadc28100a2 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ImmutableDB/API.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ImmutableDB/API.hs @@ -48,12 +48,12 @@ module Ouroboros.Consensus.Storage.ImmutableDB.API ( , streamAfterKnownPoint , streamAfterPoint , streamAll + , streamWithBounds , 
withDB ) where import qualified Codec.CBOR.Read as CBOR -import Control.Monad.Except (ExceptT (..), lift, runExceptT, - throwError) +import Control.Monad.Except (ExceptT (..), runExceptT, throwError) import qualified Data.ByteString.Lazy as Lazy import Data.Either (isRight) import Data.Function (on) @@ -491,10 +491,20 @@ streamAfterPoint :: -> BlockComponent blk b -> Point blk -> m (Either (MissingBlock blk) (Iterator m blk b)) -streamAfterPoint db registry blockComponent fromPt = runExceptT $ do - tipPt <- lift $ atomically $ getTipPoint db +streamAfterPoint db registry blockComponent fromPt = do + tipPt <- atomically $ getTipPoint db + streamWithBounds db registry blockComponent fromPt tipPt + +streamWithBounds :: (MonadSTM m, HasHeader blk, HasCallStack) + => ImmutableDB m blk + -> ResourceRegistry m + -> BlockComponent blk b + -> Point blk + -> Point blk + -> m (Either (MissingBlock blk) (Iterator m blk b)) +streamWithBounds db registry blockComponent fromPt toPt = runExceptT $ case (pointToWithOriginRealPoint fromPt, - pointToWithOriginRealPoint tipPt) of + pointToWithOriginRealPoint toPt) of (Origin, Origin) -> -- Nothing to stream @@ -504,22 +514,22 @@ streamAfterPoint db registry blockComponent fromPt = runExceptT $ do -- Asked to stream after a block while the ImmutableDB is empty throwError $ NewerThanTip fromPt' GenesisPoint - (NotOrigin fromPt', NotOrigin _) | pointSlot fromPt > pointSlot tipPt -> + (NotOrigin fromPt', NotOrigin _) | pointSlot fromPt > pointSlot toPt -> -- Lower bound is newer than the tip, nothing to stream - throwError $ NewerThanTip fromPt' tipPt + throwError $ NewerThanTip fromPt' toPt - (NotOrigin fromPt', NotOrigin tipPt') | fromPt' == tipPt' -> + (NotOrigin fromPt', NotOrigin toPt') | fromPt' == toPt' -> -- Nothing to stream after the tip return emptyIterator - (_, NotOrigin tipPt') -> + (_, NotOrigin toPt') -> -- Stream from the given point to the current tip (not genesis) ExceptT $ stream db registry blockComponent 
(StreamFromExclusive fromPt) - (StreamToInclusive tipPt') + (StreamToInclusive toPt') -- | Variant of 'streamAfterPoint' that throws a 'MissingBlockError' when the -- point is not in the ImmutableDB (or genesis). diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/Storage/LedgerDB/InMemory.hs b/ouroboros-consensus/src/Ouroboros/Consensus/Storage/LedgerDB/InMemory.hs index 9ba9a86d6b3..87c77b5c321 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/Storage/LedgerDB/InMemory.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/Storage/LedgerDB/InMemory.hs @@ -1,8 +1,10 @@ +{-# LANGUAGE AllowAmbiguousTypes #-} {-# LANGUAGE ConstraintKinds #-} {-# LANGUAGE DataKinds #-} {-# LANGUAGE DeriveAnyClass #-} {-# LANGUAGE DeriveFunctor #-} {-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE DerivingStrategies #-} {-# LANGUAGE EmptyDataDeriving #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE FlexibleInstances #-} @@ -21,13 +23,14 @@ {-# LANGUAGE TypeFamilies #-} {-# LANGUAGE UndecidableInstances #-} - module Ouroboros.Consensus.Storage.LedgerDB.InMemory ( -- * LedgerDB proper LedgerDbCfg (..) , ledgerDbWithAnchor -- ** opaque - , LedgerDB + , LedgerDB (..) + , NewLedgerDB + , OldLedgerDB -- * Ledger DB types (TODO: we might want to place this somewhere else) , DbChangelog , DbReader (..) @@ -70,6 +73,14 @@ module Ouroboros.Consensus.Storage.LedgerDB.InMemory ( , ledgerDbPush' , ledgerDbPushMany' , ledgerDbSwitch' + -- ** Javier + , Checkpoint (..) + , HasSeqNo (..) 
+ , dbChangelogStates + , extendDbChangelog + , initialDbChangelog + , seqLast + , withBlockReadSets ) where import Codec.Serialise.Decoding (Decoder) @@ -155,6 +166,9 @@ data LedgerDB (l :: LedgerStateKind) = LedgerDB { } deriving (Generic) +type OldLedgerDB l = AnchoredSeq (WithOrigin SlotNo) (Checkpoint (l ValuesMK)) (Checkpoint (l ValuesMK)) +type NewLedgerDB l = DbChangelog l + deriving instance (Eq (l EmptyMK), Eq (l ValuesMK)) => Eq (LedgerDB l) deriving instance (NoThunks (l EmptyMK), NoThunks (l ValuesMK)) => NoThunks (LedgerDB l) @@ -350,10 +364,10 @@ applyBlock cfg ap db = case ap of newApplyBlock :: Ap m l blk c -> m (l TrackingMK) newApplyBlock = \case - ReapplyVal b -> withBlockReadSets b $ \lh -> + ReapplyVal b -> withBlockReadSets b (ledgerDbChangelog db) $ \lh -> return $ tickThenReapply cfg b lh - ApplyVal b -> withBlockReadSets b $ \lh -> + ApplyVal b -> withBlockReadSets b (ledgerDbChangelog db) $ \lh -> ( either (throwLedgerError db (blockRealPoint b)) return $ runExcept $ tickThenApply cfg b lh) @@ -361,50 +375,54 @@ applyBlock cfg ap db = case ap of ReapplyRef r -> do b <- resolveBlock r -- TODO: ask: would it make sense to recursively call applyBlock using ReapplyVal? - withBlockReadSets b $ \lh -> + withBlockReadSets b (ledgerDbChangelog db) $ \lh -> return $ tickThenReapply cfg b lh ApplyRef r -> do b <- resolveBlock r - withBlockReadSets b $ \lh -> + withBlockReadSets b (ledgerDbChangelog db) $ \lh -> either (throwLedgerError db (blockRealPoint b)) return $ runExcept $ tickThenApply cfg b lh -- A value of @Weaken@ will not make it to this point, as @applyBlock@ will recurse until it fully unwraps. 
Weaken _ -> error "unreachable" - withBlockReadSets - :: ReadsKeySets m l - => blk - -> (l ValuesMK -> m (l TrackingMK)) - -> m (l TrackingMK) - withBlockReadSets b f = do - let ks = getBlockKeySets b :: TableKeySets l - let aks = rewindTableKeySets (ledgerDbChangelog db) ks :: RewoundTableKeySets l - urs <- readDb aks - case withHydratedLedgerState urs f of - Nothing -> - -- We performed the rewind;read;forward sequence in this function. So - -- the forward operation should not fail. If this is the case we're in - -- the presence of a problem that we cannot deal with at this level, - -- so we throw an error. - -- - -- When we introduce pipelining, if the forward operation fails it - -- could be because the DB handle was modified by a DB flush that took - -- place when __after__ we read the unforwarded keys-set from disk. - -- However, performing rewind;read;forward with the same __locked__ - -- changelog should always succeed. - error "Changelog rewind;read;forward sequence failed." - Just res -> res - - withHydratedLedgerState - :: UnforwardedReadSets l - -> (l ValuesMK -> a) - -> Maybe a - withHydratedLedgerState urs f = do - rs <- forwardTableKeySets (ledgerDbChangelog db) urs - return $ f $ withLedgerTables (ledgerDbCurrent db) rs +withBlockReadSets + :: forall m blk l. (ApplyBlock l blk, Monad m, TableStuff l, ReadsKeySets m l) + => blk + -> DbChangelog l + -> (l ValuesMK -> m (l TrackingMK)) + -> m (l TrackingMK) +withBlockReadSets b db f = do + let ks = getBlockKeySets b :: TableKeySets l + let aks = rewindTableKeySets db ks :: RewoundTableKeySets l + urs <- readDb aks + case withHydratedLedgerState urs db f of + Nothing -> + -- We performed the rewind;read;forward sequence in this function. So + -- the forward operation should not fail. If this is the case we're in + -- the presence of a problem that we cannot deal with at this level, + -- so we throw an error. 
+ -- + -- When we introduce pipelining, if the forward operation fails it + -- could be because the DB handle was modified by a DB flush that took + -- place __after__ we read the unforwarded keys-set from disk. + -- However, performing rewind;read;forward with the same __locked__ + -- changelog should always succeed. + error "Changelog rewind;read;forward sequence failed." + Just res -> res + +withHydratedLedgerState + :: forall l a. (TableStuff l) + => UnforwardedReadSets l + -> DbChangelog l + -> (l ValuesMK -> a) + -> Maybe a +withHydratedLedgerState urs db f = do + rs <- forwardTableKeySets db urs + return $ f $ withLedgerTables (seqLast . dbChangelogStates $ db) rs + {------------------------------------------------------------------------------- HD Interface that I need (Could be moved to Ouroboros.Consensus.Ledger.Basics ) @@ -610,7 +628,6 @@ ledgerDbPrune (SecurityParam k) db = db { -- 'LedgerDB' and thus a space leak. Alternatively, we could disable the -- @-fstrictness@ optimisation (enabled by default for -O1). See #2532. 
{-# INLINE ledgerDbPrune #-} -{-# LANGUAGE DerivingStrategies #-} {------------------------------------------------------------------------------- Internal updates diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/Storage/LedgerDB/OnDisk.hs b/ouroboros-consensus/src/Ouroboros/Consensus/Storage/LedgerDB/OnDisk.hs index 1d005c5dfd3..6160bfffd66 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/Storage/LedgerDB/OnDisk.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/Storage/LedgerDB/OnDisk.hs @@ -67,8 +67,6 @@ import Text.Read (readMaybe) import Ouroboros.Network.Block (Point (Point)) import Ouroboros.Consensus.Block -import Ouroboros.Consensus.HeaderValidation - (HeaderState (headerStateTip), annTipPoint) import Ouroboros.Consensus.Ledger.Abstract import Ouroboros.Consensus.Ledger.Extended import Ouroboros.Consensus.Ledger.Inspect @@ -82,6 +80,12 @@ import Ouroboros.Consensus.Storage.FS.API.Types import Ouroboros.Consensus.Storage.LedgerDB.DiskPolicy import Ouroboros.Consensus.Storage.LedgerDB.InMemory +import Ouroboros.Network.AnchoredSeq (AnchoredSeq (Empty)) +import Ouroboros.Network.Point (WithOrigin (At)) + +import Control.Exception +import Ouroboros.Consensus.Config.SecurityParam +import qualified Ouroboros.Network.AnchoredSeq as AS {------------------------------------------------------------------------------- Instantiate the in-memory DB to @blk@ @@ -121,6 +125,12 @@ data StreamAPI m blk = StreamAPI { -- 'RealPoint', not a 'Point', since it must always be possible to -- stream after genesis. -> m a + + , streamAfterBefore :: forall a. HasCallStack + => Point blk + -> Point blk + -> (Either (RealPoint blk) (m (NextBlock blk)) -> m a) + -> m a } -- | Stream all blocks @@ -144,6 +154,28 @@ streamAll StreamAPI{..} tip notFound e f = ExceptT $ NextBlock b -> go =<< f b a Right <$> go e +-- | Stream the blocks after the starting point, up to the ending point +streamUpTo :: + forall m blk e a. 
(Monad m, HasCallStack) + => StreamAPI m blk + -> Point blk -- ^ Starting point for streaming + -> Point blk -- ^ Ending point for streaming + -> (RealPoint blk -> e) -- ^ Error when tip not found + -> a -- ^ Starting point when tip /is/ found + -> (blk -> a -> m a) -- ^ Update function for each block + -> ExceptT e m a +streamUpTo StreamAPI{..} tip goal notFound e f = ExceptT $ + streamAfterBefore tip goal $ \case + Left tip' -> return $ Left (notFound tip') + + Right getNext -> do + let go :: a -> m a + go a = do mNext <- getNext + case mNext of + NoMoreBlocks -> return a + NextBlock b -> go =<< f b a + Right <$> go e + {------------------------------------------------------------------------------- Initialize the DB -------------------------------------------------------------------------------} @@ -202,74 +234,277 @@ initLedgerDB :: => Tracer m (ReplayGoal blk -> TraceReplayEvent blk) -> Tracer m (TraceEvent blk) -> SomeHasFS m - -> OnDiskLedgerStDb m (ExtLedgerState blk) + -> (forall s. Decoder s (ExtLedgerState blk ValuesMK)) + -> SomeHasFS m + -> OnDiskLedgerStDb m (ExtLedgerState blk) blk -> (forall s. Decoder s (ExtLedgerState blk EmptyMK)) -> (forall s. Decoder s (HeaderHash blk)) -> LedgerDbCfg (ExtLedgerState blk) - -> m (ExtLedgerState blk EmptyMK) -- ^ Genesis ledger state + -> m (ExtLedgerState blk ValuesMK) -- ^ Genesis ledger state -> StreamAPI m blk - -> m (InitLog blk, LedgerDB' blk, Word64) + -> Bool + -> m ((InitLog blk, InitLog blk), LedgerDB' blk, Word64) initLedgerDB replayTracer tracer hasFS - onDiskLedgerDbSt decLedger + newHasFS + onDiskLedgerDbSt + decNewLedger decHash cfg - _getGenesisLedger - streamAPI = do - snapshots <- listSnapshots hasFS - -- Here we'd get the dbhandle - -- - -- We can use this dbhandle to fetch the restore points. 
- tryNewestFirst id snapshots + getGenesisLedger + streamAPI + runDual = do + (initLog, ledgerDb, tipPoint) <- oldInitLedgerDB + getGenesisLedger + hasFS + decLedger + decHash + (initLog', ledgerDb', tipPoint', replayTracer') <- newInitLedgerDB + replayTracer + tracer + getGenesisLedger + newHasFS + decNewLedger + decHash + onDiskLedgerDbSt + + (lgrDB, w) <- case compare tipPoint tipPoint' of + LT -> bringUpOldLedgerDB runDual cfg streamAPI ledgerDb ledgerDb' tipPoint tipPoint' + GT -> bringUpNewLedgerDB runDual replayTracer' cfg streamAPI onDiskLedgerDbSt ledgerDb ledgerDb' tipPoint tipPoint' + EQ -> combineLedgerDBs runDual ledgerDb ledgerDb' + + ml <- runExceptT $ initStartingWith replayTracer' cfg onDiskLedgerDbSt streamAPI lgrDB + case ml of + Left err -> error $ "invariant violation: invalid current chain:" <> show err + Right (ledgerDB, w64) -> return ((initLog, initLog'), ledgerDB, w + w64) + +{------------------------------------------------------------------------------- + Load snapshots from the disk +-------------------------------------------------------------------------------} + +-- | Load a snapshot from the disk. Depending on the decoder, the snapshot is +-- expected to be of the @mk@ that the decoder knows how to decode. +loadSnapshot :: ( IOLike m + , LedgerSupportsProtocol blk + ) + => Maybe (Tracer m (ReplayGoal blk -> TraceReplayEvent blk)) -- ^ The replay tracer to be annotated with starting point. + -> SomeHasFS m -- ^ The filesystem with the snapshots + -> (forall s. Decoder s (ExtLedgerState blk mk)) -- ^ A decoder for the snapshots + -> (forall s. 
Decoder s (HeaderHash blk)) -- ^ A decoder for header hashes + -> DiskSnapshot -- ^ Which snapshot to load on the filesystem + -> ExceptT (InitFailure blk) m ( ExtLedgerState blk mk -- The resulting ledger state + , RealPoint blk -- The real point corresponding to the ledger state + , Maybe (Tracer m (ReplayStart blk -> ReplayGoal blk -> TraceReplayEvent blk)) -- The annotated tracer + ) +loadSnapshot tracer fs dec decHash ss = do + initSS <- withExceptT InitFailureRead $ readSnapshot fs dec decHash ss + let initialPoint = castPoint (getTip initSS) + case pointToWithOriginRealPoint initialPoint of + Origin -> throwError InitFailureGenesis + NotOrigin tip -> do + maybe (return ()) (\t -> lift $ traceWith t $ ReplayFromSnapshot ss tip (ReplayStart initialPoint)) tracer + return (initSS, tip, decorateReplayTracerWithStart initialPoint <$> tracer) + + +{------------------------------------------------------------------------------- + Old initialization of LedgerDB +-------------------------------------------------------------------------------} + +-- | Initialize the ledger DB in the old way from the most recent snapshot on disk +-- +-- If no such snapshot can be found, use the genesis ledger DB. Returns the +-- initialized old DB as well as the block reference corresponding to the snapshot +-- we found on disk (the latter primarily for testing/monitoring purposes). +-- +-- We only discard snapshots if we cannot deserialise them. +-- +-- It is possible that the Ledger DB will not be able to roll back @k@ blocks +-- after initialization if the chain has been truncated (data corruption). +-- +-- We do /not/ attempt to use multiple ledger states from disk to construct the +-- ledger DB. Instead we load only a /single/ ledger state from disk, and +-- /compute/ all subsequent ones. This is important, because the ledger states +-- obtained in this way will (hopefully) share much of their memory footprint +-- with their predecessors. 
+oldInitLedgerDB :: ( IOLike m + , LedgerSupportsProtocol blk + , HasCallStack + ) + => m (ExtLedgerState blk ValuesMK) -- ^ Action that gives the Genesis ledger state + -> SomeHasFS m -- ^ Filesystem containing the old ledger snapshots + -> (forall s. Decoder s (ExtLedgerState blk ValuesMK)) -- ^ A decoder for the old ledger snapshots + -> (forall s. Decoder s (HeaderHash blk)) -- ^ A decoder for header hashes + -> m ( InitLog blk -- The initialization log + , OldLedgerDB (ExtLedgerState blk) -- The resulting old ledgerDB + , Point blk -- The point corresponding to the adopted ledger state + ) +oldInitLedgerDB getGenesisLedger hasFS decLedger decHash = do + listSnapshots hasFS >>= tryNewestFirst id where - tryNewestFirst :: (InitLog blk -> InitLog blk) - -> [DiskSnapshot] - -> m (InitLog blk, LedgerDB' blk, Word64) tryNewestFirst acc [] = do - -- We're out of snapshots. Start at genesis - traceWith replayTracer ReplayFromGenesis - initDb <- ledgerDbWithAnchor <$> undefined --getGenesisLedger TODO: Commented to unlock further work. Implementing initialization will fix this. - let replayTracer' = decorateReplayTracerWithStart (Point Origin) replayTracer - ml <- runExceptT - -- TODO: here initStartingWith could and should probably flush! - -- - -- If we're replaying from genesis (or simply replaying a very - -- large number of blocks) we will need to flush from time to - -- time inside this function. - -- - -- Note that at the moment no LedgerDB snapshots are taken by - -- this function. - -- - -- Note that whatever restore point we take here, it will - -- belong to the immutable part of the chain. So the restore - -- point will not lie ahead of the immutable DB tip. - $ initStartingWith replayTracer' cfg onDiskLedgerDbSt streamAPI initDb - case ml of - Left _ -> error "invariant violation: invalid current chain" - Right (l, replayed) -> return (acc InitFromGenesis, l, replayed) + -- We're out of snapshots. 
Start at genesis + + initSS <- getGenesisLedger + return ( acc InitFromGenesis + , Empty (Checkpoint initSS) + , Point Origin + ) tryNewestFirst acc (s:ss) = do - -- If we fail to use this snapshot, delete it and try an older one - ml <- runExceptT $ initFromSnapshot - replayTracer - hasFS - decLedger - decHash - cfg - onDiskLedgerDbSt - streamAPI - s + ml <- runExceptT $ loadSnapshot Nothing hasFS decLedger decHash s + case ml of + Left err -> do + when (diskSnapshotIsTemporary s) $ + -- We don't delete permanent snapshots, even if we couldn't parse + -- them + deleteSnapshot hasFS s + tryNewestFirst (acc . InitFailure s err) ss + Right (ls, pt, _) -> do + return ( acc (InitFromSnapshot s pt) + , Empty (Checkpoint ls) + , Point (At $ undefined pt) + ) + +{------------------------------------------------------------------------------- + New initialization of LedgerDB +-------------------------------------------------------------------------------} + +-- | Initialize a DbChangelog from the snapshot in the disk. +-- +-- If there are multiple snapshots we can only be sure that the latest one +-- corresponds to what was flushed to the disk and therefore we can only try to +-- deserialize that one. If the point on said snapshot and the point on the disk +-- data is not the same one, this will throw an assertion failure. +-- +-- If we fail to deserialize the last snapshot, we cannot try previous ones +-- because they will be misaligned with the on disk data, so we can just revert +-- to Genesis. +newInitLedgerDB :: forall m blk. + ( IOLike m + , LedgerSupportsProtocol blk + , HasCallStack + ) + => Tracer m (ReplayGoal blk -> TraceReplayEvent blk) -- ^ A tracer for the replay events + -> Tracer m (TraceEvent blk) -- ^ A tracer for general events + -> m (ExtLedgerState blk ValuesMK) -- ^ An action to get the Genesis ledger state + -> SomeHasFS m -- ^ The filesystem with the new snapshots + -> (forall s. 
Decoder s (ExtLedgerState blk EmptyMK)) -- ^ A decoder for new snapshots + -> (forall s. Decoder s (HeaderHash blk)) -- ^ A decoder for header hashes + -> OnDiskLedgerStDb m (ExtLedgerState blk) blk + -> m ( InitLog blk -- The initialization log + , NewLedgerDB (ExtLedgerState blk) -- The new ledger database + , Point blk -- The point corresponding to the adopted ledger state + , Tracer m (ReplayStart blk -> ReplayGoal blk -> TraceReplayEvent blk) -- An annotated tracer for replay events + ) +newInitLedgerDB replayTracer tracer getGenesisLedger hasFS decLedger decHash onDiskLedgerDbSt = do + snapshots <- listSnapshots hasFS + case snapshots of + [] -> initUsingGenesis + s:_ -> do + ml <- runExceptT $ loadSnapshot (Just replayTracer) hasFS decLedger decHash s case ml of Left err -> do - when (diskSnapshotIsTemporary s) $ - -- We don't delete permanent snapshots, even if we couldn't parse - -- them - deleteSnapshot hasFS s + deleteSnapshot hasFS s traceWith tracer $ InvalidSnapshot s err - tryNewestFirst (acc . 
InitFailure s err) ss - Right (r, l, replayed) -> - return (acc (InitFromSnapshot s r), l, replayed) + (l, db, pt, tr) <- initUsingGenesis + return (InitFailure s err l, db, pt, tr) + Right (initSS, pt', replayTracer') -> do + pt'' <- odlsGetPt onDiskLedgerDbSt + + assert (realPointToPoint pt' == pt'') $ return ( InitFromSnapshot s pt' + , initialDbChangelogWithEmptyState (getTipSlot initSS) initSS + , Point (At $ undefined pt') + , maybe (error "unreachable as we provided a Just to loadSnapshot") id replayTracer' + ) + where initUsingGenesis = do + initSS <- getGenesisLedger + writeGenesisUTxO onDiskLedgerDbSt initSS + let replayTracer' = decorateReplayTracerWithStart (Point Origin) replayTracer + return ( InitFromGenesis + , initialDbChangelog (getTipSlot initSS) initSS + , Point Origin + , replayTracer' + ) + +initialDbChangelogWithEmptyState :: WithOrigin SlotNo -> (ExtLedgerState blk EmptyMK) -> NewLedgerDB (ExtLedgerState blk) +initialDbChangelogWithEmptyState = undefined + +{------------------------------------------------------------------------------- + Sync both ledger databases +-------------------------------------------------------------------------------} + +-- | The old ledger DB is behind the new ledger DB. +-- +-- Pushes blocks in the ImmutableDB up to the tip of the new ledger DB so that +-- they end up being in sync. +bringUpOldLedgerDB :: forall m blk. (Monad m, LedgerSupportsProtocol blk) + => Bool + -> LedgerDbCfg (ExtLedgerState blk) + -> StreamAPI m blk + -> OldLedgerDB (ExtLedgerState blk) + -> NewLedgerDB (ExtLedgerState blk) + -> Point blk + -> Point blk + -> m (LedgerDB' blk, Word64) +bringUpOldLedgerDB runDual cfg streamAPI old ledgerDbChangelog from to = do + either (error . ("invariant violation: invalid current chain:" <>) . 
show) (\(ledgerDbCheckpoints, w) -> return (LedgerDB{..}, w)) =<< + runExceptT (streamUpTo streamAPI from to InitFailureTooRecent (old, 0) push) + where + push :: blk -> (OldLedgerDB (ExtLedgerState blk), Word64) -> m (OldLedgerDB (ExtLedgerState blk), Word64) + push blk !(!db, !replayed) = do + let !db' = let ls = forgetLedgerStateTracking + $ tickThenReapply (ledgerDbCfg cfg) blk + $ either unCheckpoint unCheckpoint . AS.head + $ db + in AS.anchorNewest (maxRollbacks $ ledgerDbCfgSecParam cfg) (db AS.:> Checkpoint ls) + + !replayed' = replayed + 1 + + return (db', replayed') + +-- | The new ledger DB is behind the old ledger DB. +-- +-- Pushes blocks in the ImmutableDB up to the tip of the old ledger DB so that +-- they end up being in sync. +bringUpNewLedgerDB :: forall m blk. (Monad m, LedgerSupportsProtocol blk, InspectLedger blk) + => Bool + -> Tracer m (ReplayStart blk -> ReplayGoal blk -> TraceReplayEvent blk) + -> LedgerDbCfg (ExtLedgerState blk) + -> StreamAPI m blk + -> OnDiskLedgerStDb m (ExtLedgerState blk) blk + -> OldLedgerDB (ExtLedgerState blk) + -> NewLedgerDB (ExtLedgerState blk) + -> Point blk + -> Point blk + -> m (LedgerDB' blk, Word64) +bringUpNewLedgerDB runDual tracer cfg streamAPI onDiskLedgerDbSt ledgerDbCheckpoints new to from = + either (error . ("invariant violation: invalid current chain:" <>) . 
show) (\(ledgerDbChangelog, w) -> return (LedgerDB{..}, w)) =<< + runExceptT (streamUpTo streamAPI from to InitFailureTooRecent (new, 0) push) + where + push :: blk -> (NewLedgerDB (ExtLedgerState blk), Word64) -> m (NewLedgerDB (ExtLedgerState blk), Word64) + push blk !(db, replayed) = do + ls <- defaultReadKeySets (readKeySets onDiskLedgerDbSt) (withBlockReadSets blk db $ \lh -> return $ tickThenReapply (ledgerDbCfg cfg) blk lh) + db'' <- flushDb onDiskLedgerDbSt $ extendDbChangelog (stateSeqNo ls) (trackingTablesToDiffs ls) db + let replayed' :: Word64 + !replayed' = replayed + 1 + + events :: [LedgerEvent blk] + events = inspectLedger + (getExtLedgerCfg (ledgerDbCfg cfg)) + (ledgerState (seqLast . dbChangelogStates $ db)) + (ledgerState (seqLast . dbChangelogStates $ db'')) + + traceWith tracer (ReplayedBlock (blockRealPoint blk) events) + return (db'', replayed') + +-- | Both databases are already in sync, just combine them in the @LedgerDB@ datatype. +combineLedgerDBs :: Monad m + => Bool + -> OldLedgerDB (ExtLedgerState blk) + -> NewLedgerDB (ExtLedgerState blk) + -> m (LedgerDB' blk, Word64) +combineLedgerDBs runDual ledgerDbCheckpoints ledgerDbChangelog = return (LedgerDB {..}, 0) {------------------------------------------------------------------------------- Internal: initialize using the given snapshot @@ -289,47 +524,7 @@ data InitFailure blk = | InitFailureGenesis deriving (Show, Eq, Generic) --- | Attempt to initialize the ledger DB from the given snapshot --- --- If the chain DB or ledger layer reports an error, the whole thing is aborted --- and an error is returned. This should not throw any errors itself (ignoring --- unexpected exceptions such as asynchronous exceptions, of course). -initFromSnapshot :: - forall m blk . ( - IOLike m - , LedgerSupportsProtocol blk - , InspectLedger blk - , HasCallStack - ) - => Tracer m (ReplayGoal blk -> TraceReplayEvent blk) - -> SomeHasFS m - -> (forall s. 
Decoder s (ExtLedgerState blk EmptyMK)) - -> (forall s. Decoder s (HeaderHash blk)) - -> LedgerDbCfg (ExtLedgerState blk) - -> OnDiskLedgerStDb m (ExtLedgerState blk) - -> StreamAPI m blk - -> DiskSnapshot - -> ExceptT (InitFailure blk) m (RealPoint blk, LedgerDB' blk, Word64) -initFromSnapshot tracer hasFS decLedger decHash cfg readLedgerDb streamAPI ss = undefined -- TODO: Commented to unlock further work. Implementing initialization will fix this. - -- initSS <- withExceptT InitFailureRead $ - -- readSnapshot hasFS decLedger decHash ss - -- let initialPoint = withOrigin (Point Origin) annTipPoint $ headerStateTip $ headerState $ initSS - -- case pointToWithOriginRealPoint (castPoint (getTip initSS)) of - -- Origin -> throwError InitFailureGenesis - -- NotOrigin tip -> do - -- lift $ traceWith tracer $ ReplayFromSnapshot ss tip (ReplayStart initialPoint) - -- let tracer' = decorateReplayTracerWithStart initialPoint tracer - -- (initDB, replayed) <- - -- initStartingWith - -- tracer' - -- cfg - -- readLedgerDb - -- streamAPI - -- (ledgerDbWithAnchor initSS) - -- return (tip, initDB, replayed) - - -mkOnDiskLedgerStDb :: SomeHasFS m -> m (OnDiskLedgerStDb m l) +mkOnDiskLedgerStDb :: SomeHasFS m -> m (OnDiskLedgerStDb m l blk) mkOnDiskLedgerStDb = undefined -- \(SomeHasFS fs) -> do -- dbhandle <- hOpen fs "ledgerStateDb" @@ -344,7 +539,7 @@ mkOnDiskLedgerStDb = undefined -- | On disk ledger state API. -- -- -data OnDiskLedgerStDb m l = +data OnDiskLedgerStDb m l blk = OnDiskLedgerStDb { rewindTableKeySets :: () -- TODO: move the corresponding function from -- InMemory here. @@ -369,9 +564,12 @@ data OnDiskLedgerStDb m l = {- -* other restore point ops ... -} , closeDb :: m () -- ^ This closes the captured handle. 
+ , odlsGetPt :: m (Point blk) + -- ^ Get the point representing the latest ledger state flushed to the disk + , writeGenesisUTxO :: l ValuesMK -> m () + -- ^ Write the initial Genesis UTxO to the disk } - deriving NoThunks via OnlyCheckWhnfNamed "OnDiskLedgerStDb" (OnDiskLedgerStDb m l) - + deriving NoThunks via OnlyCheckWhnfNamed "OnDiskLedgerStDb" (OnDiskLedgerStDb m l blk) -- | Attempt to initialize the ledger DB starting from the given ledger DB initStartingWith :: @@ -383,7 +581,7 @@ initStartingWith :: ) => Tracer m (ReplayStart blk -> ReplayGoal blk -> TraceReplayEvent blk) -> LedgerDbCfg (ExtLedgerState blk) - -> OnDiskLedgerStDb m (ExtLedgerState blk) + -> OnDiskLedgerStDb m (ExtLedgerState blk) blk -> StreamAPI m blk -> LedgerDB' blk -> ExceptT (InitFailure blk) m (LedgerDB' blk, Word64) @@ -397,20 +595,8 @@ initStartingWith tracer cfg onDiskLedgerDbSt streamAPI initDb = do push blk !(!db, !replayed) = do !db' <- defaultReadKeySets (readKeySets onDiskLedgerDbSt) $ ledgerDbPush cfg (ReapplyVal blk) db - -- TODO: here it is important that we don't have a lock acquired. - - -- Alternatively, we could chose not to check for a lock when we're - -- flushing here since we know the `LgrDB` does not exist at this point - -- yet. db'' <- ledgerDbFlush (flushDb onDiskLedgerDbSt) db' - -- TODO: it seems we'd want: - -- - -- - flush - -- - -- - make a restore-point - -- - -- We can't make the flush in the levels above push since this function - -- consumes the whole stream of immutable DB blocks. + let replayed' :: Word64 !replayed' = replayed + 1 @@ -527,18 +713,18 @@ diskSnapshotIsTemporary = not . diskSnapshotIsPermanent -- | Read snapshot from disk readSnapshot :: - forall m blk. IOLike m + forall m blk mk. IOLike m => SomeHasFS m - -> (forall s. Decoder s (ExtLedgerState blk EmptyMK)) + -> (forall s. Decoder s (ExtLedgerState blk mk)) -> (forall s. 
Decoder s (HeaderHash blk)) -> DiskSnapshot - -> ExceptT ReadIncrementalErr m (ExtLedgerState blk EmptyMK) + -> ExceptT ReadIncrementalErr m (ExtLedgerState blk mk) readSnapshot hasFS decLedger decHash = ExceptT . readIncremental hasFS decoder . snapshotToPath where - decoder :: Decoder s (ExtLedgerState blk EmptyMK) + decoder :: Decoder s (ExtLedgerState blk mk) decoder = decodeSnapshotBackwardsCompatible (Proxy @blk) decLedger decHash -- | Write snapshot to disk