Skip to content

Commit

Permalink
ImmutableDB.Iterator: allocate handles in the ResourceRegistry
Browse files Browse the repository at this point in the history
Fixes #1543.

The ChainDB.Iterators and ChainDB.Readers use ImmutableDB.Iterators
internally. Both take a ResourceRegistry to allocate the ImmutableDB.Iterator
in so that the ImmutableDB.Iterator and the open handle it contains is closed
when an exception occurs.

If an exception occurs at the wrong time while opening an
ImmutableDB.Iterator, after opening the handle, we might leak the handle. Fix
this by directly allocating the handle in the ResourceRegistry instead of the
ImmutableDB.Iterator that refers to the handle.
  • Loading branch information
mrBliss committed Jan 31, 2020
1 parent ac8a58e commit 4919c4a
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 121 deletions.
45 changes: 14 additions & 31 deletions ouroboros-consensus/src/Ouroboros/Storage/ChainDB/Impl/ImmDB.hs
Expand Up @@ -80,8 +80,7 @@ import Ouroboros.Network.Point (WithOrigin (..))
import Ouroboros.Consensus.Block (GetHeader (..), IsEBB (..))
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.Orphans ()
import Ouroboros.Consensus.Util.ResourceRegistry (ResourceRegistry,
allocateEither, unsafeRelease)
import Ouroboros.Consensus.Util.ResourceRegistry (ResourceRegistry)

import Ouroboros.Storage.ChainDB.API (ChainDB)
import Ouroboros.Storage.ChainDB.API hiding (ChainDB (..),
Expand Down Expand Up @@ -417,14 +416,10 @@ appendBlock db@ImmDB{..} b = withDB db $ \imm -> case isEBB (getHeader b) of
Streaming
-------------------------------------------------------------------------------}

-- | Wrapper around 'ImmDB.stream' that 'allocate's the iterator in the
-- 'ResourceRegistry' so that 'ImmDB.iteratorClose' is registered as the
-- clean-up action. Translates the requested 'BlockComponent' into the
-- 'ImmDB.BlockComponent' the ImmutableDB understands.
--
-- When the returned iterator is closed, it will be 'release'd from the
-- 'ResourceRegistry'.
registeredStream
-- | Wrapper around 'ImmDB.stream' that translates the requested
-- 'BlockComponent' into the 'ImmDB.BlockComponent' the ImmutableDB
-- understands.
openIterator
:: forall m blk b. (IOLike m, HasHeader blk)
=> ImmDB m blk
-> ResourceRegistry m
Expand All @@ -433,20 +428,8 @@ registeredStream
-> Maybe (SlotNo, HeaderHash blk)
-> m (Either (ImmDB.WrongBoundError (HeaderHash blk))
(ImmDB.Iterator (HeaderHash blk) m b))
registeredStream db registry blockComponent start end = do
errOrKeyAndIt <- allocateEither registry
(\_key -> withDB db $ \imm ->
ImmDB.stream imm blockComponent' start end)
(iteratorClose db)
return $ case errOrKeyAndIt of
Left e -> Left e
-- The iterator will be used by a thread that is unknown to the registry
-- (which, after all, is entirely internal to the chain DB). This means
-- that the registry cannot guarantee that the iterator will be live for
-- the duration of that thread, and indeed, it may not be: the chain DB
-- might be closed before that thread terminates. We will deal with this
-- in the chain DB itself (throw ClosedDBError exception).
Right (key, it) -> Right it { ImmDB.iteratorClose = unsafeRelease key }
openIterator db registry blockComponent start end =
withDB db $ \imm -> ImmDB.stream imm registry blockComponent' start end
where
blockComponent' = translateToRawDB (parse db) (addHdrEnv db) blockComponent

Expand Down Expand Up @@ -489,29 +472,29 @@ stream db registry blockComponent from to = runExceptT $ do
case from of
StreamFromExclusive pt@BlockPoint { atSlot = slot, withHash = hash } -> do
when (pointSlot pt > slotNoAtTip) $ throwError $ MissingBlock pt
it <- openRegisteredStream (Just (slot, hash)) end
it <- openStream (Just (slot, hash)) end
-- Skip the first block, as the bound is exclusive
void $ lift $ iteratorNext db it
return it
StreamFromExclusive GenesisPoint ->
openRegisteredStream Nothing end
openStream Nothing end
StreamFromInclusive pt@BlockPoint { atSlot = slot, withHash = hash } -> do
when (pointSlot pt > slotNoAtTip) $ throwError $ MissingBlock pt
openRegisteredStream (Just (slot, hash)) end
openStream (Just (slot, hash)) end
StreamFromInclusive GenesisPoint ->
throwM NoGenesisBlock
where
openRegisteredStream
openStream
:: Maybe (SlotNo, HeaderHash blk)
-> Maybe (SlotNo, HeaderHash blk)
-> ExceptT (UnknownRange blk)
m
(ImmDB.Iterator (HeaderHash blk) m b)
openRegisteredStream start end = ExceptT $
openStream start end = ExceptT $
bimap toUnknownRange (fmap snd . stopAt to) <$>
-- 'stopAt' needs to know the hash of each streamed block, so we \"Get\"
-- it in addition to @b@, but we drop it afterwards.
registeredStream db registry ((,) <$> GetHash <*> blockComponent) start end
openIterator db registry ((,) <$> GetHash <*> blockComponent) start end
where
toUnknownRange :: ImmDB.WrongBoundError (HeaderHash blk) -> UnknownRange blk
toUnknownRange e
Expand Down Expand Up @@ -570,7 +553,7 @@ streamAfter
(ImmDB.WrongBoundError (HeaderHash blk))
(ImmDB.Iterator (HeaderHash blk) m b))
streamAfter db registry blockComponent low =
registeredStream db registry blockComponent low' Nothing >>= \case
openIterator db registry blockComponent low' Nothing >>= \case
Left err -> return $ Left err
Right itr -> do
case low of
Expand Down
5 changes: 4 additions & 1 deletion ouroboros-consensus/src/Ouroboros/Storage/ImmutableDB/API.hs
Expand Up @@ -22,6 +22,8 @@ import Data.ByteString.Builder (Builder)
import GHC.Generics (Generic)
import GHC.Stack (HasCallStack)

import Ouroboros.Consensus.Util.ResourceRegistry (ResourceRegistry)

import Ouroboros.Storage.Common
import Ouroboros.Storage.ImmutableDB.Types
import Ouroboros.Storage.Util.ErrorHandling (ErrorHandling)
Expand Down Expand Up @@ -197,7 +199,8 @@ data ImmutableDB hash m = ImmutableDB
-- prematurely closed with 'iteratorClose'.
, stream
:: forall b. HasCallStack
=> BlockComponent (ImmutableDB hash m) b
=> ResourceRegistry m
-> BlockComponent (ImmutableDB hash m) b
-> Maybe (SlotNo, hash)
-> Maybe (SlotNo, hash)
-> m (Either (WrongBoundError hash)
Expand Down

0 comments on commit 4919c4a

Please sign in to comment.