Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ImmutableDB: truncate blocks from the future #1563

Merged
merged 4 commits into from Feb 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -7,6 +7,8 @@
module Ouroboros.Consensus.BlockchainTime.Mock (
-- * Fixed time
fixedBlockchainTime
-- * Settable time
, settableBlockchainTime
-- * Testing time
, NumSlots(..)
, TestClock(..)
Expand Down Expand Up @@ -43,6 +45,19 @@ fixedBlockchainTime slot = BlockchainTime {
, onSlotChange_ = const (return (return ()))
}

{-------------------------------------------------------------------------------
Settable time
-------------------------------------------------------------------------------}

-- | The current slot can be changed by modifying the given 'StrictTVar'.
--
-- 'onSlotChange_' is not implemented and will return an 'error'.
settableBlockchainTime :: MonadSTM m => StrictTVar m SlotNo -> BlockchainTime m
settableBlockchainTime varCurSlot = BlockchainTime {
getCurrentSlot = readTVar varCurSlot
, onSlotChange_ = error "unimplemented onSlotChange_"
}

{-------------------------------------------------------------------------------
Testing time
-------------------------------------------------------------------------------}
Expand Down
Expand Up @@ -177,6 +177,7 @@ fromChainDbArgs ChainDbArgs{..} = (
, immAddHdrEnv = cdbAddHdrEnv
, immCacheConfig = cdbImmDbCacheConfig
, immRegistry = cdbRegistry
, immBlockchainTime = cdbBlockchainTime
}
, VolDB.VolDbArgs {
volHasFS = cdbHasFSVolDb
Expand Down
Expand Up @@ -78,6 +78,7 @@ import Ouroboros.Network.Block (pattern BlockPoint, ChainHash (..),
import Ouroboros.Network.Point (WithOrigin (..))

import Ouroboros.Consensus.Block (GetHeader (..), IsEBB (..))
import Ouroboros.Consensus.BlockchainTime (BlockchainTime)
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.Orphans ()
import Ouroboros.Consensus.Util.ResourceRegistry (ResourceRegistry)
Expand Down Expand Up @@ -157,6 +158,7 @@ data ImmDbArgs m blk = forall h. ImmDbArgs {
, immTracer :: Tracer m (TraceEvent blk)
, immCacheConfig :: Index.CacheConfig
, immRegistry :: ResourceRegistry m
, immBlockchainTime :: BlockchainTime m
}

-- | Default arguments when using the 'IO' monad
Expand All @@ -175,6 +177,7 @@ data ImmDbArgs m blk = forall h. ImmDbArgs {
-- * 'immCheckIntegrity'
-- * 'immAddHdrEnv'
-- * 'immRegistry'
-- * 'immBlockchainTime'
defaultArgs :: FilePath -> ImmDbArgs IO blk
defaultArgs fp = ImmDbArgs{
immErr = EH.exceptions
Expand All @@ -194,6 +197,7 @@ defaultArgs fp = ImmDbArgs{
, immCheckIntegrity = error "no default for immCheckIntegrity"
, immAddHdrEnv = error "no default for immAddHdrEnv"
, immRegistry = error "no default for immRegistry"
, immBlockchainTime = error "no default for immBlockchainTime"
}
where
-- Cache 250 past epochs by default. This will take roughly 250 MB of RAM.
Expand Down Expand Up @@ -228,6 +232,7 @@ openDB ImmDbArgs{..} = do
parser
immTracer
immCacheConfig
immBlockchainTime
return ImmDB
{ immDB = immDB
, decHeader = immDecodeHeader
Expand Down
15 changes: 12 additions & 3 deletions ouroboros-consensus/src/Ouroboros/Storage/ImmutableDB/Impl.hs
Expand Up @@ -110,6 +110,8 @@ import Control.Monad.Class.MonadThrow (bracket, bracketOnError,
finally)

import Ouroboros.Consensus.Block (IsEBB (..))
import Ouroboros.Consensus.BlockchainTime (BlockchainTime,
getCurrentSlot)
import Ouroboros.Consensus.Util (SomePair (..))
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.ResourceRegistry (ResourceRegistry)
Expand Down Expand Up @@ -175,14 +177,15 @@ withDB
-> EpochFileParser e m (Secondary.Entry hash) hash
-> Tracer m (TraceEvent e hash)
-> Index.CacheConfig
-> BlockchainTime m
-> (ImmutableDB hash m -> m a)
-> m a
withDB registry hasFS err epochInfo hashInfo valPol parser tracer cacheConfig =
withDB registry hasFS err epochInfo hashInfo valPol parser tracer cacheConfig btime =
bracket open closeDB
where
open = fst <$>
openDBInternal registry hasFS err epochInfo hashInfo valPol parser tracer
cacheConfig
cacheConfig btime

{------------------------------------------------------------------------------
Exposed internals and/or extra functionality for testing purposes
Expand Down Expand Up @@ -239,9 +242,11 @@ openDBInternal
-> EpochFileParser e m (Secondary.Entry hash) hash
-> Tracer m (TraceEvent e hash)
-> Index.CacheConfig
-> BlockchainTime m
-> m (ImmutableDB hash m, Internal hash m)
openDBInternal registry hasFS@HasFS{..} err epochInfo hashInfo valPol parser
tracer cacheConfig = do
tracer cacheConfig btime = do
currentSlot <- atomically $ getCurrentSlot btime
let validateEnv = ValidateEnv
{ hasFS
, err
Expand All @@ -251,6 +256,7 @@ openDBInternal registry hasFS@HasFS{..} err epochInfo hashInfo valPol parser
, tracer
, registry
, cacheConfig
, currentSlot
}
!ost <- validateAndReopen validateEnv valPol

Expand All @@ -266,6 +272,7 @@ openDBInternal registry hasFS@HasFS{..} err epochInfo hashInfo valPol parser
, _dbTracer = tracer
, _dbRegistry = registry
, _dbCacheConfig = cacheConfig
, _dbBlockchainTime = btime
}
db = mkDBRecord dbEnv
internal = Internal
Expand Down Expand Up @@ -312,6 +319,7 @@ reopenImpl ImmutableDBEnv {..} valPol = bracketOnError

-- Closed, so we can try to reopen
DbClosed -> do
currentSlot <- atomically $ getCurrentSlot _dbBlockchainTime
let validateEnv = ValidateEnv
{ hasFS = _dbHasFS
, err = _dbErr
Expand All @@ -321,6 +329,7 @@ reopenImpl ImmutableDBEnv {..} valPol = bracketOnError
, tracer = _dbTracer
, registry = _dbRegistry
, cacheConfig = _dbCacheConfig
, currentSlot = currentSlot
}
ost <- validateAndReopen validateEnv valPol
putMVar _dbInternalState (DbOpen ost)
Expand Down
Expand Up @@ -31,6 +31,7 @@ import Cardano.Prelude (NoUnexpectedThunks (..))

import Control.Monad.Class.MonadThrow hiding (onException)

import Ouroboros.Consensus.BlockchainTime (BlockchainTime)
import Ouroboros.Consensus.Util (SomePair (..))
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.ResourceRegistry (ResourceRegistry,
Expand Down Expand Up @@ -67,6 +68,7 @@ data ImmutableDBEnv m hash = forall h e. ImmutableDBEnv
, _dbTracer :: !(Tracer m (TraceEvent e hash))
, _dbRegistry :: !(ResourceRegistry m)
, _dbCacheConfig :: !Index.CacheConfig
, _dbBlockchainTime :: !(BlockchainTime m)
}

data InternalState m hash h =
Expand Down
Expand Up @@ -64,6 +64,7 @@ data ValidateEnv m hash h e = ValidateEnv
, tracer :: !(Tracer m (TraceEvent e hash))
, registry :: !(ResourceRegistry m)
, cacheConfig :: !Index.CacheConfig
, currentSlot :: !SlotNo
}

-- | Perform validation as per the 'ValidationPolicy' using 'validate' and
Expand Down Expand Up @@ -296,7 +297,7 @@ validateEpoch ValidateEnv{..} shouldBeFinalised epoch mbPrevHash = do
-- expensive integrity check of a block.
let expectedChecksums = map Secondary.checksum entriesFromSecondaryIndex
(entriesWithPrevHashes, mbErr) <- lift $
runEpochFileParser parser epochFile expectedChecksums $ \stream ->
runEpochFileParser parser epochFile currentSlot expectedChecksums $ \stream ->
(\(es :> mbErr) -> (es, mbErr)) <$> S.toList stream

-- Check whether the first block of this epoch fits onto the last block of
Expand Down
155 changes: 111 additions & 44 deletions ouroboros-consensus/src/Ouroboros/Storage/ImmutableDB/Parser.hs
Expand Up @@ -4,6 +4,7 @@
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
-- | The ImmutableDB doesn't care about the serialisation format, but in
-- practice we use CBOR. If we were to change the serialisation format, we
-- would have to write a new 'EpochFileParser' implementation, but the rest of
Expand All @@ -25,7 +26,7 @@ import qualified Streaming as S
import qualified Streaming.Prelude as S

import Ouroboros.Network.Block (ChainHash (..), HasHeader (..),
HeaderHash)
HeaderHash, SlotNo)
import Ouroboros.Network.Point (WithOrigin (..))

import qualified Ouroboros.Consensus.Util.CBOR as Util.CBOR
Expand Down Expand Up @@ -55,6 +56,12 @@ data EpochFileError hash =
-- 'BlockOrEBB' number returned 'False', indicating that the block got
-- corrupted.
| EpochErrCorrupt hash BlockOrEBB

-- | The block has a slot number greater than the current slot (wall
-- clock). This block is in the future, so we must truncate it.
| EpochErrFutureBlock
SlotNo -- ^ Current slot (wall clock)
SlotNo -- ^ Slot number of the block
deriving (Eq, Show)

epochFileParser'
Expand All @@ -75,16 +82,36 @@ epochFileParser'
hash
epochFileParser' getSlotNo getHash getPrevHash hasFS decodeBlock isEBB
getBinaryInfo isNotCorrupt =
EpochFileParser $ \fsPath expectedChecksums k ->
EpochFileParser $ \fsPath currentSlotNo expectedChecksums k ->
Util.CBOR.withStreamIncrementalOffsets hasFS decoder fsPath
(k . checkIfHashesLineUp . checkEntries expectedChecksums)
( k
. checkIfHashesLineUp
. checkEntries expectedChecksums
. checkFutureSlot currentSlotNo
. fmap (fmap (first EpochErrRead))
)
where
decoder :: forall s. Decoder s (BL.ByteString -> (blk, CRC))
decoder = decodeBlock <&> \mkBlk bs ->
let !blk = mkBlk bs
!checksum = computeCRC bs
in (blk, checksum)

-- | Stop when a block has slot number > the current slot, return
-- 'EpochErrFutureBlock'.
checkFutureSlot
:: SlotNo -- ^ Current slot (wall clock).
-> Stream (Of (Word64, (Word64, (blk, CRC))))
m
(Maybe (EpochFileError hash, Word64))
-> Stream (Of (Word64, (Word64, (blk, CRC))))
m
(Maybe (EpochFileError hash, Word64))
checkFutureSlot currentSlotNo = mapS $ \x@(offset, (_, (blk, _))) ->
if getSlotNo blk > currentSlotNo
then Left $ Just (EpochErrFutureBlock currentSlotNo (getSlotNo blk), offset)
else Right x

-- | Go over the expected checksums and blocks in parallel. Stop with an
-- error when a block is corrupt. Yield correct entries along the way.
--
Expand All @@ -100,33 +127,36 @@ epochFileParser' getSlotNo getHash getPrevHash hasFS decodeBlock isEBB
-- ^ Expected checksums
-> Stream (Of (Word64, (Word64, (blk, CRC))))
m
(Maybe (Util.CBOR.ReadIncrementalErr, Word64))
(Maybe (EpochFileError hash, Word64))
-- ^ Input stream of blocks (with additional info)
-> Stream (Of (Secondary.Entry hash, WithOrigin hash))
m
(Maybe (EpochFileError hash, Word64))
checkEntries = go
checkEntries = \expected -> mapAccumS expected handle
where
go expected blkAndInfos = S.lift (S.next blkAndInfos) >>= \case
-- No more blocks, but maybe some expected entries. We ignore them.
Left mbErr -> return $ first EpochErrRead <$> mbErr
-- A block
Right (blkAndInfo@(offset, (_, (blk, checksum))), blkAndInfos') ->
case expected of
expectedChecksum:expected'
| expectedChecksum == checksum
-> S.yield entryAndPrevHash *> go expected' blkAndInfos'
-- No expected entry or a mismatch
_ | isNotCorrupt blk
-- The (expensive) integrity check passed, so continue
-> S.yield entryAndPrevHash *> go (drop 1 expected) blkAndInfos'
| otherwise
-- The block is corrupt, stop
-> return $ Just (EpochErrCorrupt headerHash blockOrEBB, offset)
where
entryAndPrevHash@(actualEntry, _) =
entryForBlockAndInfo blkAndInfo
Secondary.Entry { headerHash, blockOrEBB } = actualEntry
handle
:: [CRC]
-> (Word64, (Word64, (blk, CRC)))
-> Either (Maybe (EpochFileError hash, Word64))
( (Secondary.Entry hash, WithOrigin hash)
, [CRC]
)
handle expected blkAndInfo@(offset, (_, (blk, checksum))) =
case expected of
expectedChecksum:expected'
| expectedChecksum == checksum
-> Right (entryAndPrevHash, expected')
-- No expected entry or a mismatch
_ | isNotCorrupt blk
-- The (expensive) integrity check passed, so continue
-> Right (entryAndPrevHash, drop 1 expected)
| otherwise
-- The block is corrupt, stop
-> Left $ Just (EpochErrCorrupt headerHash blockOrEBB, offset)
where
entryAndPrevHash@(actualEntry, _) =
entryForBlockAndInfo blkAndInfo
Secondary.Entry { headerHash, blockOrEBB } = actualEntry

entryForBlockAndInfo
:: (Word64, (Word64, (blk, CRC)))
Expand Down Expand Up @@ -154,26 +184,19 @@ epochFileParser' getSlotNo getHash getPrevHash hasFS decodeBlock isEBB
-> Stream (Of (Secondary.Entry hash, WithOrigin hash))
m
(Maybe (EpochFileError hash, Word64))
checkIfHashesLineUp = \input -> S.lift (S.next input) >>= \case
Left mbErr ->
return mbErr
Right ((entry, prevHash), input') ->
S.yield (entry, prevHash) *>
go (At (Secondary.headerHash entry)) input'
checkIfHashesLineUp = mapAccumS0 checkFirst checkNext
where
-- Loop invariant: the @hashOfPrevBlock@ is the hash of the most
-- recently checked block.
go hashOfPrevBlock input = S.lift (S.next input) >>= \case
Left mbErr
-> return mbErr
Right ((entry, prevHash), input')
| prevHash == hashOfPrevBlock
-> S.yield (entry, prevHash) *>
go (At (Secondary.headerHash entry)) input'
| otherwise
-> let err = EpochErrHashMismatch hashOfPrevBlock prevHash
offset = Secondary.unBlockOffset $ Secondary.blockOffset entry
in return $ Just (err, offset)
-- We pass the hash of the previous block around as the state (@s@).
checkFirst x@(entry, _) = Right (x, Secondary.headerHash entry)

checkNext hashOfPrevBlock x@(entry, prevHash)
| prevHash == At hashOfPrevBlock
= Right (x, Secondary.headerHash entry)
| otherwise
= Left (Just (err, offset))
where
err = EpochErrHashMismatch (At hashOfPrevBlock) prevHash
offset = Secondary.unBlockOffset $ Secondary.blockOffset entry

-- | A version of 'epochFileParser'' for blocks that implement 'HasHeader'.
epochFileParser
Expand All @@ -195,3 +218,47 @@ epochFileParser =
convertPrevHash :: ChainHash blk -> WithOrigin (HeaderHash blk)
convertPrevHash GenesisHash = Origin
convertPrevHash (BlockHash h) = At h

{-------------------------------------------------------------------------------
Streaming utilities
-------------------------------------------------------------------------------}

-- | Thread some state through a 'Stream'. An early return is possible by
-- returning 'Left'.
mapAccumS
:: Monad m
=> s -- ^ Initial state
-> (s -> a -> Either r (b, s))
-> Stream (Of a) m r
-> Stream (Of b) m r
mapAccumS st0 handle = go st0
where
go st input = S.lift (S.next input) >>= \case
Left r -> return r
Right (a, input') -> case handle st a of
Left r -> return r
Right (b, st') -> S.yield b *> go st' input'

-- | Variant of 'mapAccumS' that calls the first function argument on the
-- first element in the stream to construct the initial state. For all
-- elements in the stream after the first one, the second function argument is
-- used.
mapAccumS0
:: forall m a b r s. Monad m
=> (a -> Either r (b, s))
-> (s -> a -> Either r (b, s))
-> Stream (Of a) m r
-> Stream (Of b) m r
mapAccumS0 handleFirst handleNext = mapAccumS Nothing handle
where
handle :: Maybe s -> a -> Either r (b, Maybe s)
handle mbSt = fmap (fmap Just) . maybe handleFirst handleNext mbSt

-- | Map over elements of a stream, allowing an early return by returning
-- 'Left'.
mapS
:: Monad m
=> (a -> Either r b)
-> Stream (Of a) m r
-> Stream (Of b) m r
mapS handle = mapAccumS () (\() a -> (, ()) <$> handle a)