Skip to content

Commit

Permalink
ImmutableDB: truncate blocks from the future
Browse files Browse the repository at this point in the history
During validation, truncate any blocks >= the current (wall clock) slot.
  • Loading branch information
mrBliss committed Feb 4, 2020
1 parent 46e700b commit f7d9d8c
Show file tree
Hide file tree
Showing 12 changed files with 228 additions and 69 deletions.
Expand Up @@ -177,6 +177,7 @@ fromChainDbArgs ChainDbArgs{..} = (
, immAddHdrEnv = cdbAddHdrEnv
, immCacheConfig = cdbImmDbCacheConfig
, immRegistry = cdbRegistry
, immBlockchainTime = cdbBlockchainTime
}
, VolDB.VolDbArgs {
volHasFS = cdbHasFSVolDb
Expand Down
Expand Up @@ -78,6 +78,7 @@ import Ouroboros.Network.Block (pattern BlockPoint, ChainHash (..),
import Ouroboros.Network.Point (WithOrigin (..))

import Ouroboros.Consensus.Block (GetHeader (..), IsEBB (..))
import Ouroboros.Consensus.BlockchainTime (BlockchainTime)
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.Orphans ()
import Ouroboros.Consensus.Util.ResourceRegistry (ResourceRegistry)
Expand Down Expand Up @@ -157,6 +158,7 @@ data ImmDbArgs m blk = forall h. ImmDbArgs {
, immTracer :: Tracer m (TraceEvent blk)
, immCacheConfig :: Index.CacheConfig
, immRegistry :: ResourceRegistry m
, immBlockchainTime :: BlockchainTime m
}

-- | Default arguments when using the 'IO' monad
Expand All @@ -175,6 +177,7 @@ data ImmDbArgs m blk = forall h. ImmDbArgs {
-- * 'immCheckIntegrity'
-- * 'immAddHdrEnv'
-- * 'immRegistry'
-- * 'immBlockchainTime'
defaultArgs :: FilePath -> ImmDbArgs IO blk
defaultArgs fp = ImmDbArgs{
immErr = EH.exceptions
Expand All @@ -194,6 +197,7 @@ defaultArgs fp = ImmDbArgs{
, immCheckIntegrity = error "no default for immCheckIntegrity"
, immAddHdrEnv = error "no default for immAddHdrEnv"
, immRegistry = error "no default for immRegistry"
, immBlockchainTime = error "no default for immBlockchainTime"
}
where
-- Cache 250 past epochs by default. This will take roughly 250 MB of RAM.
Expand Down Expand Up @@ -228,6 +232,7 @@ openDB ImmDbArgs{..} = do
parser
immTracer
immCacheConfig
immBlockchainTime
return ImmDB
{ immDB = immDB
, decHeader = immDecodeHeader
Expand Down
15 changes: 12 additions & 3 deletions ouroboros-consensus/src/Ouroboros/Storage/ImmutableDB/Impl.hs
Expand Up @@ -110,6 +110,8 @@ import Control.Monad.Class.MonadThrow (bracket, bracketOnError,
finally)

import Ouroboros.Consensus.Block (IsEBB (..))
import Ouroboros.Consensus.BlockchainTime (BlockchainTime,
getCurrentSlot)
import Ouroboros.Consensus.Util (SomePair (..))
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.ResourceRegistry (ResourceRegistry)
Expand Down Expand Up @@ -175,14 +177,15 @@ withDB
-> EpochFileParser e m (Secondary.Entry hash) hash
-> Tracer m (TraceEvent e hash)
-> Index.CacheConfig
-> BlockchainTime m
-> (ImmutableDB hash m -> m a)
-> m a
withDB registry hasFS err epochInfo hashInfo valPol parser tracer cacheConfig =
withDB registry hasFS err epochInfo hashInfo valPol parser tracer cacheConfig btime =
bracket open closeDB
where
open = fst <$>
openDBInternal registry hasFS err epochInfo hashInfo valPol parser tracer
cacheConfig
cacheConfig btime

{------------------------------------------------------------------------------
Exposed internals and/or extra functionality for testing purposes
Expand Down Expand Up @@ -239,9 +242,11 @@ openDBInternal
-> EpochFileParser e m (Secondary.Entry hash) hash
-> Tracer m (TraceEvent e hash)
-> Index.CacheConfig
-> BlockchainTime m
-> m (ImmutableDB hash m, Internal hash m)
openDBInternal registry hasFS@HasFS{..} err epochInfo hashInfo valPol parser
tracer cacheConfig = do
tracer cacheConfig btime = do
currentSlot <- atomically $ getCurrentSlot btime
let validateEnv = ValidateEnv
{ hasFS
, err
Expand All @@ -251,6 +256,7 @@ openDBInternal registry hasFS@HasFS{..} err epochInfo hashInfo valPol parser
, tracer
, registry
, cacheConfig
, currentSlot
}
!ost <- validateAndReopen validateEnv valPol

Expand All @@ -266,6 +272,7 @@ openDBInternal registry hasFS@HasFS{..} err epochInfo hashInfo valPol parser
, _dbTracer = tracer
, _dbRegistry = registry
, _dbCacheConfig = cacheConfig
, _dbBlockchainTime = btime
}
db = mkDBRecord dbEnv
internal = Internal
Expand Down Expand Up @@ -312,6 +319,7 @@ reopenImpl ImmutableDBEnv {..} valPol = bracketOnError

-- Closed, so we can try to reopen
DbClosed -> do
currentSlot <- atomically $ getCurrentSlot _dbBlockchainTime
let validateEnv = ValidateEnv
{ hasFS = _dbHasFS
, err = _dbErr
Expand All @@ -321,6 +329,7 @@ reopenImpl ImmutableDBEnv {..} valPol = bracketOnError
, tracer = _dbTracer
, registry = _dbRegistry
, cacheConfig = _dbCacheConfig
, currentSlot = currentSlot
}
ost <- validateAndReopen validateEnv valPol
putMVar _dbInternalState (DbOpen ost)
Expand Down
Expand Up @@ -31,6 +31,7 @@ import Cardano.Prelude (NoUnexpectedThunks (..))

import Control.Monad.Class.MonadThrow hiding (onException)

import Ouroboros.Consensus.BlockchainTime (BlockchainTime)
import Ouroboros.Consensus.Util (SomePair (..))
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.ResourceRegistry (ResourceRegistry,
Expand Down Expand Up @@ -67,6 +68,7 @@ data ImmutableDBEnv m hash = forall h e. ImmutableDBEnv
, _dbTracer :: !(Tracer m (TraceEvent e hash))
, _dbRegistry :: !(ResourceRegistry m)
, _dbCacheConfig :: !Index.CacheConfig
, _dbBlockchainTime :: !(BlockchainTime m)
}

data InternalState m hash h =
Expand Down
Expand Up @@ -64,6 +64,7 @@ data ValidateEnv m hash h e = ValidateEnv
, tracer :: !(Tracer m (TraceEvent e hash))
, registry :: !(ResourceRegistry m)
, cacheConfig :: !Index.CacheConfig
, currentSlot :: !SlotNo
}

-- | Perform validation as per the 'ValidationPolicy' using 'validate' and
Expand Down Expand Up @@ -296,7 +297,7 @@ validateEpoch ValidateEnv{..} shouldBeFinalised epoch mbPrevHash = do
-- expensive integrity check of a block.
let expectedChecksums = map Secondary.checksum entriesFromSecondaryIndex
(entriesWithPrevHashes, mbErr) <- lift $
runEpochFileParser parser epochFile expectedChecksums $ \stream ->
runEpochFileParser parser epochFile currentSlot expectedChecksums $ \stream ->
(\(es :> mbErr) -> (es, mbErr)) <$> S.toList stream

-- Check whether the first block of this epoch fits onto the last block of
Expand Down
155 changes: 111 additions & 44 deletions ouroboros-consensus/src/Ouroboros/Storage/ImmutableDB/Parser.hs
Expand Up @@ -4,6 +4,7 @@
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
-- | The ImmutableDB doesn't care about the serialisation format, but in
-- practice we use CBOR. If we were to change the serialisation format, we
-- would have to write a new 'EpochFileParser' implementation, but the rest of
Expand All @@ -25,7 +26,7 @@ import qualified Streaming as S
import qualified Streaming.Prelude as S

import Ouroboros.Network.Block (ChainHash (..), HasHeader (..),
HeaderHash)
HeaderHash, SlotNo)
import Ouroboros.Network.Point (WithOrigin (..))

import qualified Ouroboros.Consensus.Util.CBOR as Util.CBOR
Expand Down Expand Up @@ -55,6 +56,12 @@ data EpochFileError hash =
-- 'BlockOrEBB' number returned 'False', indicating that the block got
-- corrupted.
| EpochErrCorrupt hash BlockOrEBB

-- | The block has a slot number equal to or greater than the current slot
-- (wall clock). This block is in the future, so we must truncate it.
| EpochErrFutureBlock
SlotNo -- ^ Current slot (wall clock)
SlotNo -- ^ Slot number of the block
deriving (Eq, Show)

epochFileParser'
Expand All @@ -75,16 +82,36 @@ epochFileParser'
hash
epochFileParser' getSlotNo getHash getPrevHash hasFS decodeBlock isEBB
getBinaryInfo isNotCorrupt =
EpochFileParser $ \fsPath expectedChecksums k ->
EpochFileParser $ \fsPath currentSlotNo expectedChecksums k ->
Util.CBOR.withStreamIncrementalOffsets hasFS decoder fsPath
(k . checkIfHashesLineUp . checkEntries expectedChecksums)
( k
. checkIfHashesLineUp
. checkEntries expectedChecksums
. checkFutureSlot currentSlotNo
. fmap (fmap (first EpochErrRead))
)
where
decoder :: forall s. Decoder s (BL.ByteString -> (blk, CRC))
decoder = decodeBlock <&> \mkBlk bs ->
let !blk = mkBlk bs
!checksum = computeCRC bs
in (blk, checksum)

-- | Stop when a block has slot number >= the current slot, return
-- 'EpochErrFutureBlock'.
checkFutureSlot
:: SlotNo -- ^ Current slot (wall clock).
-> Stream (Of (Word64, (Word64, (blk, CRC))))
m
(Maybe (EpochFileError hash, Word64))
-> Stream (Of (Word64, (Word64, (blk, CRC))))
m
(Maybe (EpochFileError hash, Word64))
checkFutureSlot currentSlotNo = mapS $ \x@(offset, (_, (blk, _))) ->
if getSlotNo blk >= currentSlotNo
then Left $ Just (EpochErrFutureBlock currentSlotNo (getSlotNo blk), offset)
else Right x

-- | Go over the expected checksums and blocks in parallel. Stop with an
-- error when a block is corrupt. Yield correct entries along the way.
--
Expand All @@ -100,33 +127,36 @@ epochFileParser' getSlotNo getHash getPrevHash hasFS decodeBlock isEBB
-- ^ Expected checksums
-> Stream (Of (Word64, (Word64, (blk, CRC))))
m
(Maybe (Util.CBOR.ReadIncrementalErr, Word64))
(Maybe (EpochFileError hash, Word64))
-- ^ Input stream of blocks (with additional info)
-> Stream (Of (Secondary.Entry hash, WithOrigin hash))
m
(Maybe (EpochFileError hash, Word64))
checkEntries = go
checkEntries = \expected -> mapAccumS expected handle
where
go expected blkAndInfos = S.lift (S.next blkAndInfos) >>= \case
-- No more blocks, but maybe some expected entries. We ignore them.
Left mbErr -> return $ first EpochErrRead <$> mbErr
-- A block
Right (blkAndInfo@(offset, (_, (blk, checksum))), blkAndInfos') ->
case expected of
expectedChecksum:expected'
| expectedChecksum == checksum
-> S.yield entryAndPrevHash *> go expected' blkAndInfos'
-- No expected entry or a mismatch
_ | isNotCorrupt blk
-- The (expensive) integrity check passed, so continue
-> S.yield entryAndPrevHash *> go (drop 1 expected) blkAndInfos'
| otherwise
-- The block is corrupt, stop
-> return $ Just (EpochErrCorrupt headerHash blockOrEBB, offset)
where
entryAndPrevHash@(actualEntry, _) =
entryForBlockAndInfo blkAndInfo
Secondary.Entry { headerHash, blockOrEBB } = actualEntry
handle
:: [CRC]
-> (Word64, (Word64, (blk, CRC)))
-> Either (Maybe (EpochFileError hash, Word64))
( (Secondary.Entry hash, WithOrigin hash)
, [CRC]
)
handle expected blkAndInfo@(offset, (_, (blk, checksum))) =
case expected of
expectedChecksum:expected'
| expectedChecksum == checksum
-> Right (entryAndPrevHash, expected')
-- No expected entry or a mismatch
_ | isNotCorrupt blk
-- The (expensive) integrity check passed, so continue
-> Right (entryAndPrevHash, drop 1 expected)
| otherwise
-- The block is corrupt, stop
-> Left $ Just (EpochErrCorrupt headerHash blockOrEBB, offset)
where
entryAndPrevHash@(actualEntry, _) =
entryForBlockAndInfo blkAndInfo
Secondary.Entry { headerHash, blockOrEBB } = actualEntry

entryForBlockAndInfo
:: (Word64, (Word64, (blk, CRC)))
Expand Down Expand Up @@ -154,26 +184,19 @@ epochFileParser' getSlotNo getHash getPrevHash hasFS decodeBlock isEBB
-> Stream (Of (Secondary.Entry hash, WithOrigin hash))
m
(Maybe (EpochFileError hash, Word64))
checkIfHashesLineUp = \input -> S.lift (S.next input) >>= \case
Left mbErr ->
return mbErr
Right ((entry, prevHash), input') ->
S.yield (entry, prevHash) *>
go (At (Secondary.headerHash entry)) input'
checkIfHashesLineUp = mapAccumS0 checkFirst checkNext
where
-- Loop invariant: the @hashOfPrevBlock@ is the hash of the most
-- recently checked block.
go hashOfPrevBlock input = S.lift (S.next input) >>= \case
Left mbErr
-> return mbErr
Right ((entry, prevHash), input')
| prevHash == hashOfPrevBlock
-> S.yield (entry, prevHash) *>
go (At (Secondary.headerHash entry)) input'
| otherwise
-> let err = EpochErrHashMismatch hashOfPrevBlock prevHash
offset = Secondary.unBlockOffset $ Secondary.blockOffset entry
in return $ Just (err, offset)
-- We pass the hash of the previous block around as the state (@s@).
checkFirst x@(entry, _) = Right (x, Secondary.headerHash entry)

checkNext hashOfPrevBlock x@(entry, prevHash)
| prevHash == At hashOfPrevBlock
= Right (x, Secondary.headerHash entry)
| otherwise
= Left (Just (err, offset))
where
err = EpochErrHashMismatch (At hashOfPrevBlock) prevHash
offset = Secondary.unBlockOffset $ Secondary.blockOffset entry

-- | A version of 'epochFileParser'' for blocks that implement 'HasHeader'.
epochFileParser
Expand All @@ -195,3 +218,47 @@ epochFileParser =
convertPrevHash :: ChainHash blk -> WithOrigin (HeaderHash blk)
convertPrevHash GenesisHash = Origin
convertPrevHash (BlockHash h) = At h

{-------------------------------------------------------------------------------
Streaming utilities
-------------------------------------------------------------------------------}

-- | Thread some state through a 'Stream'. An early return is possible by
-- returning 'Left'.
mapAccumS
:: Monad m
=> s -- ^ Initial state
-> (s -> a -> Either r (b, s))
-> Stream (Of a) m r
-> Stream (Of b) m r
mapAccumS st0 handle = go st0
where
go st input = S.lift (S.next input) >>= \case
Left r -> return r
Right (a, input') -> case handle st a of
Left r -> return r
Right (b, st') -> S.yield b *> go st' input'

-- | Variant of 'mapAccumS' that calls the first function argument on the
-- first element in the stream to construct the initial state. For all
-- elements in the stream after the first one, the second function argument is
-- used.
mapAccumS0
:: forall m a b r s. Monad m
=> (a -> Either r (b, s))
-> (s -> a -> Either r (b, s))
-> Stream (Of a) m r
-> Stream (Of b) m r
mapAccumS0 handleFirst handleNext = mapAccumS Nothing handle
where
handle :: Maybe s -> a -> Either r (b, Maybe s)
handle mbSt = fmap (fmap Just) . maybe handleFirst handleNext mbSt

-- | Map over elements of a stream, allowing an early return by returning
-- 'Left'.
mapS
:: Monad m
=> (a -> Either r b)
-> Stream (Of a) m r
-> Stream (Of b) m r
mapS handle = mapAccumS () (\() a -> (, ()) <$> handle a)
14 changes: 8 additions & 6 deletions ouroboros-consensus/src/Ouroboros/Storage/ImmutableDB/Types.hs
Expand Up @@ -107,13 +107,15 @@ newtype EpochFileParser e m entry hash = EpochFileParser
{ runEpochFileParser
:: forall r.
FsPath
-> [CRC]
-- The expected checksums are given as input. This list can be empty
-- when the secondary index file is missing. If the expected
-- checksum matches the actual checksum, we can avoid the expensive
-- integrity check of the block.
-> SlotNo
-- Current slot (wall clock)
-> [CRC]
-- The expected checksums are given as input. This list can be empty
-- when the secondary index file is missing. If the expected checksum
-- matches the actual checksum, we can avoid the expensive integrity
-- check of the block.
-> (Stream (Of (entry, WithOrigin hash)) m (Maybe (e, Word64)) -> m r)
-- Continuation to ensure the file is closed
-- Continuation to ensure the file is closed
-> m r
}

Expand Down

0 comments on commit f7d9d8c

Please sign in to comment.