Skip to content

Commit

Permalink
Move StreamAPI to ImmutableDB
Browse files Browse the repository at this point in the history
  • Loading branch information
jasagredo committed Apr 16, 2024
1 parent 9aafb24 commit a524414
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 127 deletions.
2 changes: 1 addition & 1 deletion ouroboros-consensus/ouroboros-consensus.cabal
Expand Up @@ -218,13 +218,13 @@ library
Ouroboros.Consensus.Storage.ImmutableDB.Impl.Types
Ouroboros.Consensus.Storage.ImmutableDB.Impl.Util
Ouroboros.Consensus.Storage.ImmutableDB.Impl.Validation
Ouroboros.Consensus.Storage.ImmutableDB.Stream
Ouroboros.Consensus.Storage.LedgerDB
Ouroboros.Consensus.Storage.LedgerDB.DiskPolicy
Ouroboros.Consensus.Storage.LedgerDB.Init
Ouroboros.Consensus.Storage.LedgerDB.LedgerDB
Ouroboros.Consensus.Storage.LedgerDB.Query
Ouroboros.Consensus.Storage.LedgerDB.Snapshots
Ouroboros.Consensus.Storage.LedgerDB.Stream
Ouroboros.Consensus.Storage.LedgerDB.Update
Ouroboros.Consensus.Storage.Serialisation
Ouroboros.Consensus.Storage.VolatileDB
Expand Down
Expand Up @@ -69,15 +69,13 @@ import Ouroboros.Consensus.Storage.ChainDB.API (ChainDbFailure (..))
import Ouroboros.Consensus.Storage.ChainDB.Impl.BlockCache
(BlockCache)
import qualified Ouroboros.Consensus.Storage.ChainDB.Impl.BlockCache as BlockCache
import Ouroboros.Consensus.Storage.Common
import Ouroboros.Consensus.Storage.ImmutableDB (ImmutableDB)
import qualified Ouroboros.Consensus.Storage.ImmutableDB as ImmutableDB
import Ouroboros.Consensus.Storage.ImmutableDB.Stream
import Ouroboros.Consensus.Storage.LedgerDB (LedgerDB')
import qualified Ouroboros.Consensus.Storage.LedgerDB as LedgerDB
import Ouroboros.Consensus.Storage.Serialisation
import Ouroboros.Consensus.Util.Args
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.ResourceRegistry
import System.FS.API (SomeHasFS (..), createDirectoryIfMissing)
import System.FS.API.Types (FsError, mkFsPath)

Expand Down Expand Up @@ -374,37 +372,6 @@ validate LgrDB{..} ledgerDB blockCache numRollbacks trace = \hdrs -> do
-> Set (RealPoint blk) -> Set (RealPoint blk)
addPoints hs set = foldl' (flip Set.insert) set hs

{-------------------------------------------------------------------------------
Stream API to the immutable DB
-------------------------------------------------------------------------------}

streamAPI ::
forall m blk.
(IOLike m, HasHeader blk)
=> ImmutableDB m blk -> LedgerDB.StreamAPI m blk
streamAPI immutableDB = LedgerDB.StreamAPI streamAfter
where
streamAfter :: HasCallStack
=> Point blk
-> (Either (RealPoint blk) (m (LedgerDB.NextBlock blk)) -> m a)
-> m a
streamAfter tip k = withRegistry $ \registry -> do
eItr <-
ImmutableDB.streamAfterPoint
immutableDB
registry
GetBlock
tip
case eItr of
-- Snapshot is too recent
Left err -> k $ Left $ ImmutableDB.missingBlockPoint err
Right itr -> k $ Right $ streamUsing itr

streamUsing :: ImmutableDB.Iterator m blk blk -> m (LedgerDB.NextBlock blk)
streamUsing itr = ImmutableDB.iteratorNext itr >>= \case
ImmutableDB.IteratorExhausted -> return $ LedgerDB.NoMoreBlocks
ImmutableDB.IteratorResult blk -> return $ LedgerDB.NextBlock blk

{-------------------------------------------------------------------------------
Previously applied blocks
-------------------------------------------------------------------------------}
Expand Down
@@ -0,0 +1,113 @@
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Ouroboros.Consensus.Storage.ImmutableDB.Stream (
NextItem (..)
, StreamAPI (..)
, streamAPI
, streamAPI'
, streamAll
) where

import Control.Monad.Except
import GHC.Stack
import Ouroboros.Consensus.Block
import Ouroboros.Consensus.Storage.Common
import Ouroboros.Consensus.Storage.ImmutableDB hiding (streamAll)
import qualified Ouroboros.Consensus.Storage.ImmutableDB.API as ImmutableDB
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.ResourceRegistry

{-------------------------------------------------------------------------------
Abstraction over the streaming API provided by the Chain DB
-------------------------------------------------------------------------------}

-- | Next item returned during streaming
data NextItem blk = NoMoreItems | NextItem blk

-- | Stream items from the immutable DB
--
-- When we initialize the ledger DB, we try to find a snapshot close to the
-- tip of the immutable DB, and then stream blocks from the immutable DB to its
-- tip to bring the ledger up to date with the tip of the immutable DB.
--
-- In CPS form to enable the use of 'withXYZ' style iterator init functions.
newtype StreamAPI m blk a = StreamAPI {
-- | Start streaming after the specified block
streamAfter :: forall b. HasCallStack
=> Point blk
-- Reference to the block corresponding to the snapshot we found
-- (or 'GenesisPoint' if we didn't find any)

-> (Either (RealPoint blk) (m (NextItem a)) -> m b)
-- Get the next item
--
-- Should be @Left pt@ if the snapshot we found is more recent than the
-- tip of the immutable DB. Since we only store snapshots to disk for
-- blocks in the immutable DB, this can only happen if the immutable DB
-- got truncated due to disk corruption. The returned @pt@ is a
-- 'RealPoint', not a 'Point', since it must always be possible to
-- stream after genesis.
-> m b
}

-- | Stream all items
streamAll ::
forall m blk e b a. (Monad m, HasCallStack)
=> StreamAPI m blk b
-> Point blk -- ^ Starting point for streaming
-> (RealPoint blk -> e) -- ^ Error when tip not found
-> a -- ^ Starting point when tip /is/ found
-> (b -> a -> m a) -- ^ Update function for each item
-> ExceptT e m a
streamAll StreamAPI{..} tip notFound e f = ExceptT $
streamAfter tip $ \case
Left tip' -> return $ Left (notFound tip')

Right getNext -> do
let go :: a -> m a
go a = do mNext <- getNext
case mNext of
NoMoreItems -> return a
NextItem b -> go =<< f b a
Right <$> go e


streamAPI ::
(IOLike m, HasHeader blk)
=> ImmutableDB m blk -> StreamAPI m blk blk
streamAPI = streamAPI' (return . NextItem) GetBlock

streamAPI' ::
forall m blk a.
(IOLike m, HasHeader blk)
=> (a -> m (NextItem a)) -- ^ Stop condition
-> BlockComponent blk a
-> ImmutableDB m blk
-> StreamAPI m blk a
streamAPI' shouldStop blockComponent immutableDB = StreamAPI streamAfter
where
streamAfter :: Point blk
-> (Either (RealPoint blk) (m (NextItem a)) -> m b)
-> m b
streamAfter tip k = withRegistry $ \registry -> do
eItr <-
ImmutableDB.streamAfterPoint
immutableDB
registry
blockComponent
tip
case eItr of
-- Snapshot is too recent
Left err -> k $ Left $ ImmutableDB.missingBlockPoint err
Right itr -> k $ Right $ streamUsing itr

streamUsing :: ImmutableDB.Iterator m blk a
-> m (NextItem a)
streamUsing itr = do
itrResult <- ImmutableDB.iteratorNext itr
case itrResult of
ImmutableDB.IteratorExhausted -> return NoMoreItems
ImmutableDB.IteratorResult b -> shouldStop b
Expand Up @@ -128,10 +128,6 @@ module Ouroboros.Consensus.Storage.LedgerDB (
, PushStart (..)
, Pushing (..)
, UpdateLedgerDbTraceEvent (..)
-- * Streaming
, NextBlock (..)
, StreamAPI (..)
, streamAll
-- * Snapshots
, DiskSnapshot (..)
-- ** Read from disk
Expand Down Expand Up @@ -180,8 +176,6 @@ import Ouroboros.Consensus.Storage.LedgerDB.Snapshots
deleteSnapshot, diskSnapshotIsTemporary, encodeSnapshot,
listSnapshots, readSnapshot, snapshotToFileName,
snapshotToPath, takeSnapshot, trimSnapshots, writeSnapshot)
import Ouroboros.Consensus.Storage.LedgerDB.Stream (NextBlock (..),
StreamAPI (..), streamAll)
import Ouroboros.Consensus.Storage.LedgerDB.Update
(AnnLedgerError (..), AnnLedgerError', Ap (..),
ExceededRollback (..), PushGoal (..), PushStart (..),
Expand Down
Expand Up @@ -32,10 +32,10 @@ import Ouroboros.Consensus.Ledger.Abstract
import Ouroboros.Consensus.Ledger.Extended
import Ouroboros.Consensus.Ledger.Inspect
import Ouroboros.Consensus.Ledger.SupportsProtocol
import Ouroboros.Consensus.Storage.ImmutableDB.Stream
import Ouroboros.Consensus.Storage.LedgerDB.LedgerDB
import Ouroboros.Consensus.Storage.LedgerDB.Query
import Ouroboros.Consensus.Storage.LedgerDB.Snapshots
import Ouroboros.Consensus.Storage.LedgerDB.Stream
import Ouroboros.Consensus.Storage.LedgerDB.Update
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Network.Block (Point (Point))
Expand Down Expand Up @@ -103,7 +103,7 @@ initLedgerDB ::
-> (forall s. Decoder s (HeaderHash blk))
-> LedgerDbCfg (ExtLedgerState blk)
-> m (ExtLedgerState blk) -- ^ Genesis ledger state
-> StreamAPI m blk
-> StreamAPI m blk blk
-> m (InitLog blk, LedgerDB' blk, Word64)
initLedgerDB replayTracer
tracer
Expand All @@ -112,7 +112,7 @@ initLedgerDB replayTracer
decHash
cfg
getGenesisLedger
streamAPI = do
stream = do
snapshots <- listSnapshots hasFS
tryNewestFirst id snapshots
where
Expand All @@ -124,7 +124,7 @@ initLedgerDB replayTracer
traceWith replayTracer ReplayFromGenesis
initDb <- ledgerDbWithAnchor <$> getGenesisLedger
let replayTracer' = decorateReplayTracerWithStart (Point Origin) replayTracer
ml <- runExceptT $ initStartingWith replayTracer' cfg streamAPI initDb
ml <- runExceptT $ initStartingWith replayTracer' cfg stream initDb
case ml of
Left _ -> error "invariant violation: invalid current chain"
Right (l, replayed) -> return (acc InitFromGenesis, l, replayed)
Expand All @@ -136,7 +136,7 @@ initLedgerDB replayTracer
decLedger
decHash
cfg
streamAPI
stream
s
case ml of
Left err -> do
Expand Down Expand Up @@ -170,10 +170,10 @@ initFromSnapshot ::
-> (forall s. Decoder s (ExtLedgerState blk))
-> (forall s. Decoder s (HeaderHash blk))
-> LedgerDbCfg (ExtLedgerState blk)
-> StreamAPI m blk
-> StreamAPI m blk blk
-> DiskSnapshot
-> ExceptT (SnapshotFailure blk) m (RealPoint blk, LedgerDB' blk, Word64)
initFromSnapshot tracer hasFS decLedger decHash cfg streamAPI ss = do
initFromSnapshot tracer hasFS decLedger decHash cfg stream ss = do
initSS <- withExceptT InitFailureRead $
readSnapshot hasFS decLedger decHash ss
let initialPoint = withOrigin (Point Origin) annTipPoint $ headerStateTip $ headerState $ initSS
Expand All @@ -186,7 +186,7 @@ initFromSnapshot tracer hasFS decLedger decHash cfg streamAPI ss = do
initStartingWith
tracer'
cfg
streamAPI
stream
(ledgerDbWithAnchor initSS)
return (tip, initDB, replayed)

Expand All @@ -200,11 +200,11 @@ initStartingWith ::
)
=> Tracer m (ReplayStart blk -> ReplayGoal blk -> TraceReplayEvent blk)
-> LedgerDbCfg (ExtLedgerState blk)
-> StreamAPI m blk
-> StreamAPI m blk blk
-> LedgerDB' blk
-> ExceptT (SnapshotFailure blk) m (LedgerDB' blk, Word64)
initStartingWith tracer cfg streamAPI initDb = do
streamAll streamAPI (castPoint (ledgerDbTip initDb))
initStartingWith tracer cfg stream initDb = do
streamAll stream (castPoint (ledgerDbTip initDb))
InitFailureTooRecent
(initDb, 0)
push
Expand Down

This file was deleted.

0 comments on commit a524414

Please sign in to comment.