From a5244146bd76b6618cfeb3a5b1e554dbe3655423 Mon Sep 17 00:00:00 2001 From: Javier Sagredo Date: Thu, 11 Apr 2024 13:16:40 +0200 Subject: [PATCH] Move `StreamAPI` to `ImmutableDB` --- ouroboros-consensus/ouroboros-consensus.cabal | 2 +- .../Consensus/Storage/ChainDB/Impl/LgrDB.hs | 35 +----- .../Consensus/Storage/ImmutableDB/Stream.hs | 113 ++++++++++++++++++ .../Ouroboros/Consensus/Storage/LedgerDB.hs | 6 - .../Consensus/Storage/LedgerDB/Init.hs | 22 ++-- .../Consensus/Storage/LedgerDB/Stream.hs | 68 ----------- .../Test/Ouroboros/Storage/LedgerDB/OnDisk.hs | 15 +-- 7 files changed, 134 insertions(+), 127 deletions(-) create mode 100644 ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ImmutableDB/Stream.hs delete mode 100644 ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/Stream.hs diff --git a/ouroboros-consensus/ouroboros-consensus.cabal b/ouroboros-consensus/ouroboros-consensus.cabal index be08df3cd8..a869fcec9a 100644 --- a/ouroboros-consensus/ouroboros-consensus.cabal +++ b/ouroboros-consensus/ouroboros-consensus.cabal @@ -218,13 +218,13 @@ library Ouroboros.Consensus.Storage.ImmutableDB.Impl.Types Ouroboros.Consensus.Storage.ImmutableDB.Impl.Util Ouroboros.Consensus.Storage.ImmutableDB.Impl.Validation + Ouroboros.Consensus.Storage.ImmutableDB.Stream Ouroboros.Consensus.Storage.LedgerDB Ouroboros.Consensus.Storage.LedgerDB.DiskPolicy Ouroboros.Consensus.Storage.LedgerDB.Init Ouroboros.Consensus.Storage.LedgerDB.LedgerDB Ouroboros.Consensus.Storage.LedgerDB.Query Ouroboros.Consensus.Storage.LedgerDB.Snapshots - Ouroboros.Consensus.Storage.LedgerDB.Stream Ouroboros.Consensus.Storage.LedgerDB.Update Ouroboros.Consensus.Storage.Serialisation Ouroboros.Consensus.Storage.VolatileDB diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/LgrDB.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/LgrDB.hs index 4463c3789b..9c897020ed 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/LgrDB.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/LgrDB.hs @@ -69,15 +69,13 @@ import Ouroboros.Consensus.Storage.ChainDB.API (ChainDbFailure (..)) import Ouroboros.Consensus.Storage.ChainDB.Impl.BlockCache (BlockCache) import qualified Ouroboros.Consensus.Storage.ChainDB.Impl.BlockCache as BlockCache -import Ouroboros.Consensus.Storage.Common import Ouroboros.Consensus.Storage.ImmutableDB (ImmutableDB) -import qualified Ouroboros.Consensus.Storage.ImmutableDB as ImmutableDB +import Ouroboros.Consensus.Storage.ImmutableDB.Stream import Ouroboros.Consensus.Storage.LedgerDB (LedgerDB') import qualified Ouroboros.Consensus.Storage.LedgerDB as LedgerDB import Ouroboros.Consensus.Storage.Serialisation import Ouroboros.Consensus.Util.Args import Ouroboros.Consensus.Util.IOLike -import Ouroboros.Consensus.Util.ResourceRegistry import System.FS.API (SomeHasFS (..), createDirectoryIfMissing) import System.FS.API.Types (FsError, mkFsPath) @@ -374,37 +372,6 @@ validate LgrDB{..} ledgerDB blockCache numRollbacks trace = \hdrs -> do -> Set (RealPoint blk) -> Set (RealPoint blk) addPoints hs set = foldl' (flip Set.insert) set hs -{------------------------------------------------------------------------------- - Stream API to the immutable DB --------------------------------------------------------------------------------} - -streamAPI :: - forall m blk. - (IOLike m, HasHeader blk) - => ImmutableDB m blk -> LedgerDB.StreamAPI m blk -streamAPI immutableDB = LedgerDB.StreamAPI streamAfter - where - streamAfter :: HasCallStack - => Point blk - -> (Either (RealPoint blk) (m (LedgerDB.NextBlock blk)) -> m a) - -> m a - streamAfter tip k = withRegistry $ \registry -> do - eItr <- - ImmutableDB.streamAfterPoint - immutableDB - registry - GetBlock - tip - case eItr of - -- Snapshot is too recent - Left err -> k $ Left $ ImmutableDB.missingBlockPoint err - Right itr -> k $ Right $ streamUsing itr - - streamUsing :: ImmutableDB.Iterator m blk blk -> m (LedgerDB.NextBlock blk) - streamUsing itr = ImmutableDB.iteratorNext itr >>= \case - ImmutableDB.IteratorExhausted -> return $ LedgerDB.NoMoreBlocks - ImmutableDB.IteratorResult blk -> return $ LedgerDB.NextBlock blk - {------------------------------------------------------------------------------- Previously applied blocks -------------------------------------------------------------------------------} diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ImmutableDB/Stream.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ImmutableDB/Stream.hs new file mode 100644 index 0000000000..f290033acb --- /dev/null +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ImmutableDB/Stream.hs @@ -0,0 +1,113 @@ +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE ScopedTypeVariables #-} + +module Ouroboros.Consensus.Storage.ImmutableDB.Stream ( + NextItem (..) + , StreamAPI (..) + , streamAPI + , streamAPI' + , streamAll + ) where + +import Control.Monad.Except +import GHC.Stack +import Ouroboros.Consensus.Block +import Ouroboros.Consensus.Storage.Common +import Ouroboros.Consensus.Storage.ImmutableDB hiding (streamAll) +import qualified Ouroboros.Consensus.Storage.ImmutableDB.API as ImmutableDB +import Ouroboros.Consensus.Util.IOLike +import Ouroboros.Consensus.Util.ResourceRegistry + +{------------------------------------------------------------------------------- + Abstraction over the streaming API provided by the Chain DB +-------------------------------------------------------------------------------} + +-- | Next item returned during streaming +data NextItem blk = NoMoreItems | NextItem blk + +-- | Stream items from the immutable DB +-- +-- When we initialize the ledger DB, we try to find a snapshot close to the +-- tip of the immutable DB, and then stream blocks from the immutable DB to its +-- tip to bring the ledger up to date with the tip of the immutable DB. +-- +-- In CPS form to enable the use of 'withXYZ' style iterator init functions. +newtype StreamAPI m blk a = StreamAPI { + -- | Start streaming after the specified block + streamAfter :: forall b. HasCallStack + => Point blk + -- Reference to the block corresponding to the snapshot we found + -- (or 'GenesisPoint' if we didn't find any) + + -> (Either (RealPoint blk) (m (NextItem a)) -> m b) + -- Get the next item + -- + -- Should be @Left pt@ if the snapshot we found is more recent than the + -- tip of the immutable DB. Since we only store snapshots to disk for + -- blocks in the immutable DB, this can only happen if the immutable DB + -- got truncated due to disk corruption. The returned @pt@ is a + -- 'RealPoint', not a 'Point', since it must always be possible to + -- stream after genesis. + -> m b + } + +-- | Stream all items +streamAll :: + forall m blk e b a. (Monad m, HasCallStack) + => StreamAPI m blk b + -> Point blk -- ^ Starting point for streaming + -> (RealPoint blk -> e) -- ^ Error when tip not found + -> a -- ^ Starting point when tip /is/ found + -> (b -> a -> m a) -- ^ Update function for each item + -> ExceptT e m a +streamAll StreamAPI{..} tip notFound e f = ExceptT $ + streamAfter tip $ \case + Left tip' -> return $ Left (notFound tip') + + Right getNext -> do + let go :: a -> m a + go a = do mNext <- getNext + case mNext of + NoMoreItems -> return a + NextItem b -> go =<< f b a + Right <$> go e + + +streamAPI :: + (IOLike m, HasHeader blk) + => ImmutableDB m blk -> StreamAPI m blk blk +streamAPI = streamAPI' (return . NextItem) GetBlock + +streamAPI' :: + forall m blk a. + (IOLike m, HasHeader blk) + => (a -> m (NextItem a)) -- ^ Stop condition + -> BlockComponent blk a + -> ImmutableDB m blk + -> StreamAPI m blk a +streamAPI' shouldStop blockComponent immutableDB = StreamAPI streamAfter + where + streamAfter :: Point blk + -> (Either (RealPoint blk) (m (NextItem a)) -> m b) + -> m b + streamAfter tip k = withRegistry $ \registry -> do + eItr <- + ImmutableDB.streamAfterPoint + immutableDB + registry + blockComponent + tip + case eItr of + -- Snapshot is too recent + Left err -> k $ Left $ ImmutableDB.missingBlockPoint err + Right itr -> k $ Right $ streamUsing itr + + streamUsing :: ImmutableDB.Iterator m blk a + -> m (NextItem a) + streamUsing itr = do + itrResult <- ImmutableDB.iteratorNext itr + case itrResult of + ImmutableDB.IteratorExhausted -> return NoMoreItems + ImmutableDB.IteratorResult b -> shouldStop b diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB.hs index 104a412a0f..7e970703d9 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB.hs @@ -128,10 +128,6 @@ module Ouroboros.Consensus.Storage.LedgerDB ( , PushStart (..) , Pushing (..) , UpdateLedgerDbTraceEvent (..) - -- * Streaming - , NextBlock (..) - , StreamAPI (..) - , streamAll -- * Snapshots , DiskSnapshot (..) -- ** Read from disk @@ -180,8 +176,6 @@ import Ouroboros.Consensus.Storage.LedgerDB.Snapshots deleteSnapshot, diskSnapshotIsTemporary, encodeSnapshot, listSnapshots, readSnapshot, snapshotToFileName, snapshotToPath, takeSnapshot, trimSnapshots, writeSnapshot) -import Ouroboros.Consensus.Storage.LedgerDB.Stream (NextBlock (..), - StreamAPI (..), streamAll) import Ouroboros.Consensus.Storage.LedgerDB.Update (AnnLedgerError (..), AnnLedgerError', Ap (..), ExceededRollback (..), PushGoal (..), PushStart (..), diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/Init.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/Init.hs index 273289ca5a..ba6d0f08e4 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/Init.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/Init.hs @@ -32,10 +32,10 @@ import Ouroboros.Consensus.Ledger.Abstract import Ouroboros.Consensus.Ledger.Extended import Ouroboros.Consensus.Ledger.Inspect import Ouroboros.Consensus.Ledger.SupportsProtocol +import Ouroboros.Consensus.Storage.ImmutableDB.Stream import Ouroboros.Consensus.Storage.LedgerDB.LedgerDB import Ouroboros.Consensus.Storage.LedgerDB.Query import Ouroboros.Consensus.Storage.LedgerDB.Snapshots -import Ouroboros.Consensus.Storage.LedgerDB.Stream import Ouroboros.Consensus.Storage.LedgerDB.Update import Ouroboros.Consensus.Util.IOLike import Ouroboros.Network.Block (Point (Point)) @@ -103,7 +103,7 @@ initLedgerDB :: -> (forall s. Decoder s (HeaderHash blk)) -> LedgerDbCfg (ExtLedgerState blk) -> m (ExtLedgerState blk) -- ^ Genesis ledger state - -> StreamAPI m blk + -> StreamAPI m blk blk -> m (InitLog blk, LedgerDB' blk, Word64) initLedgerDB replayTracer tracer @@ -112,7 +112,7 @@ initLedgerDB replayTracer decHash cfg getGenesisLedger - streamAPI = do + stream = do snapshots <- listSnapshots hasFS tryNewestFirst id snapshots where @@ -124,7 +124,7 @@ initLedgerDB replayTracer traceWith replayTracer ReplayFromGenesis initDb <- ledgerDbWithAnchor <$> getGenesisLedger let replayTracer' = decorateReplayTracerWithStart (Point Origin) replayTracer - ml <- runExceptT $ initStartingWith replayTracer' cfg streamAPI initDb + ml <- runExceptT $ initStartingWith replayTracer' cfg stream initDb case ml of Left _ -> error "invariant violation: invalid current chain" Right (l, replayed) -> return (acc InitFromGenesis, l, replayed) @@ -136,7 +136,7 @@ initLedgerDB replayTracer decLedger decHash cfg - streamAPI + stream s case ml of Left err -> do @@ -170,10 +170,10 @@ initFromSnapshot :: -> (forall s. Decoder s (ExtLedgerState blk)) -> (forall s. Decoder s (HeaderHash blk)) -> LedgerDbCfg (ExtLedgerState blk) - -> StreamAPI m blk + -> StreamAPI m blk blk -> DiskSnapshot -> ExceptT (SnapshotFailure blk) m (RealPoint blk, LedgerDB' blk, Word64) -initFromSnapshot tracer hasFS decLedger decHash cfg streamAPI ss = do +initFromSnapshot tracer hasFS decLedger decHash cfg stream ss = do initSS <- withExceptT InitFailureRead $ readSnapshot hasFS decLedger decHash ss let initialPoint = withOrigin (Point Origin) annTipPoint $ headerStateTip $ headerState $ initSS @@ -186,7 +186,7 @@ initFromSnapshot tracer hasFS decLedger decHash cfg streamAPI ss = do initStartingWith tracer' cfg - streamAPI + stream (ledgerDbWithAnchor initSS) return (tip, initDB, replayed) @@ -200,11 +200,11 @@ initStartingWith :: ) => Tracer m (ReplayStart blk -> ReplayGoal blk -> TraceReplayEvent blk) -> LedgerDbCfg (ExtLedgerState blk) - -> StreamAPI m blk + -> StreamAPI m blk blk -> LedgerDB' blk -> ExceptT (SnapshotFailure blk) m (LedgerDB' blk, Word64) -initStartingWith tracer cfg streamAPI initDb = do - streamAll streamAPI (castPoint (ledgerDbTip initDb)) +initStartingWith tracer cfg stream initDb = do + streamAll stream (castPoint (ledgerDbTip initDb)) InitFailureTooRecent (initDb, 0) push diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/Stream.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/Stream.hs deleted file mode 100644 index c40a95da03..0000000000 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/Stream.hs +++ /dev/null @@ -1,68 +0,0 @@ -{-# LANGUAGE LambdaCase #-} -{-# LANGUAGE RankNTypes #-} -{-# LANGUAGE RecordWildCards #-} -{-# LANGUAGE ScopedTypeVariables #-} - -module Ouroboros.Consensus.Storage.LedgerDB.Stream ( - NextBlock (..) - , StreamAPI (..) - , streamAll - ) where - -import Control.Monad.Except -import GHC.Stack -import Ouroboros.Consensus.Block - -{------------------------------------------------------------------------------- - Abstraction over the streaming API provided by the Chain DB --------------------------------------------------------------------------------} - --- | Next block returned during streaming -data NextBlock blk = NoMoreBlocks | NextBlock blk - --- | Stream blocks from the immutable DB --- --- When we initialize the ledger DB, we try to find a snapshot close to the --- tip of the immutable DB, and then stream blocks from the immutable DB to its --- tip to bring the ledger up to date with the tip of the immutable DB. --- --- In CPS form to enable the use of 'withXYZ' style iterator init functions. -data StreamAPI m blk = StreamAPI { - -- | Start streaming after the specified block - streamAfter :: forall a. HasCallStack - => Point blk - -- Reference to the block corresponding to the snapshot we found - -- (or 'GenesisPoint' if we didn't find any) - - -> (Either (RealPoint blk) (m (NextBlock blk)) -> m a) - -- Get the next block (by value) - -- - -- Should be @Left pt@ if the snapshot we found is more recent than the - -- tip of the immutable DB. Since we only store snapshots to disk for - -- blocks in the immutable DB, this can only happen if the immutable DB - -- got truncated due to disk corruption. The returned @pt@ is a - -- 'RealPoint', not a 'Point', since it must always be possible to - -- stream after genesis. - -> m a - } - --- | Stream all blocks -streamAll :: - forall m blk e a. (Monad m, HasCallStack) - => StreamAPI m blk - -> Point blk -- ^ Starting point for streaming - -> (RealPoint blk -> e) -- ^ Error when tip not found - -> a -- ^ Starting point when tip /is/ found - -> (blk -> a -> m a) -- ^ Update function for each block - -> ExceptT e m a -streamAll StreamAPI{..} tip notFound e f = ExceptT $ - streamAfter tip $ \case - Left tip' -> return $ Left (notFound tip') - - Right getNext -> do - let go :: a -> m a - go a = do mNext <- getNext - case mNext of - NoMoreBlocks -> return a - NextBlock b -> go =<< f b a - Right <$> go e diff --git a/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/LedgerDB/OnDisk.hs b/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/LedgerDB/OnDisk.hs index 8c32f69a0b..ea472ad665 100644 --- a/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/LedgerDB/OnDisk.hs +++ b/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/LedgerDB/OnDisk.hs @@ -68,6 +68,7 @@ import Ouroboros.Consensus.Block import Ouroboros.Consensus.Config import Ouroboros.Consensus.Ledger.Abstract import Ouroboros.Consensus.Ledger.Extended +import Ouroboros.Consensus.Storage.ImmutableDB.Stream import Ouroboros.Consensus.Storage.LedgerDB import Ouroboros.Consensus.Util import Ouroboros.Consensus.Util.IOLike @@ -665,12 +666,12 @@ initStandaloneDB dbEnv@DbEnv{..} = do , "or LedgerDB not re-initialized after chain truncation" ] -dbStreamAPI :: forall m. IOLike m => StandaloneDB m -> StreamAPI m TestBlock +dbStreamAPI :: forall m. IOLike m => StandaloneDB m -> StreamAPI m TestBlock TestBlock dbStreamAPI DB{..} = StreamAPI {..} where streamAfter :: Point TestBlock - -> (Either (RealPoint TestBlock) (m (NextBlock TestBlock)) -> m a) + -> (Either (RealPoint TestBlock) (m (NextItem TestBlock)) -> m a) -> m a streamAfter tip k = do pts <- atomically $ reverse . fst <$> readTVar dbState @@ -693,7 +694,7 @@ dbStreamAPI DB{..} = StreamAPI {..} blocksToStream Origin = id blocksToStream (NotOrigin r) = tail . dropWhile (/= r) - getNext :: StrictTVar m [RealPoint TestBlock] -> m (NextBlock TestBlock) + getNext :: StrictTVar m [RealPoint TestBlock] -> m (NextItem TestBlock) getNext toStream = do mr <- atomically $ do rs <- readTVar toStream @@ -701,10 +702,10 @@ dbStreamAPI DB{..} = StreamAPI {..} [] -> return Nothing r:rs' -> writeTVar toStream rs' >> return (Just r) case mr of - Nothing -> return NoMoreBlocks + Nothing -> return NoMoreItems Just r -> do mb <- atomically $ Map.lookup r <$> readTVar dbBlocks case mb of - Just b -> return $ NextBlock b + Just b -> return $ NextItem b Nothing -> error blockNotFound blockNotFound :: String @@ -721,7 +722,7 @@ runDB standalone@DB{..} cmd = case dbEnv of DbEnv{dbHasFS} -> Resp <$> go dbHasFS cmd where - streamAPI = dbStreamAPI standalone + stream = dbStreamAPI standalone annLedgerErr' :: AnnLedgerError (ExtLedgerState TestBlock) TestBlock @@ -771,7 +772,7 @@ runDB standalone@DB{..} cmd = S.decode dbLedgerDbCfg (return (testInitExtLedgerWithState initialTestLedgerState)) - streamAPI + stream atomically $ modifyTVar dbState (\(rs, _) -> (rs, db)) return $ Restored (fromInitLog initLog, ledgerDbCurrent db) go hasFS (Corrupt c ss) =