diff --git a/lsm-tree.cabal b/lsm-tree.cabal index 21fe77910..6601f96d1 100644 --- a/lsm-tree.cabal +++ b/lsm-tree.cabal @@ -848,16 +848,13 @@ library control Control.ActionRegistry Control.Concurrent.Class.MonadSTM.RWVar Control.RefCount - Control.TempRegistry build-depends: - , base >=4.14 && <4.22 - , containers ^>=0.6 || ^>=0.7 - , deepseq ^>=1.4 || ^>=1.5 - , io-classes ^>=1.6 || ^>=1.7 - , io-classes:strict-mvar + , base >=4.14 && <4.22 + , deepseq ^>=1.4 || ^>=1.5 + , io-classes ^>=1.6 || ^>=1.7 , io-classes:strict-stm - , primitive ^>=0.9 + , primitive ^>=0.9 test-suite control-test import: language, warnings diff --git a/src-control/Control/ActionRegistry.hs b/src-control/Control/ActionRegistry.hs index 7819c2400..2d7c92f51 100644 --- a/src-control/Control/ActionRegistry.hs +++ b/src-control/Control/ActionRegistry.hs @@ -34,8 +34,6 @@ import Data.List.NonEmpty (NonEmpty (..)) import qualified Data.List.NonEmpty as NE import Data.Primitive.MutVar --- TODO: replace TempRegistry by ActionRegistry - -- TODO: add tests using fs-sim/io-sim to make sure exception safety is -- guaranteed. @@ -144,9 +142,12 @@ modifyWithActionRegistry_ getSt putSt action = can register delayed (commit) and rollback actions. The delayed actions are all executed at the end if the transaction scope is exited successfully, but if an exception is thrown (sync or async) then the rollback actions are - executed instead, and the exception is propagated. Delay or rollback actions - are executed in the reverse order in which they were registered, which is the - natural nesting order when considered as bracketing. + executed instead, and the exception is propagated. + + * Rollback actions are executed in the reverse order in which they were + registered, which is the natural nesting order when considered as bracketing. + + * Delayed actions are executed in the same order in which they are registered. -} -- | Registry of monadic actions supporting rollback actions and delayed actions @@ -249,8 +250,8 @@ unsafeFinaliseActionRegistry reg ec = case ec of unsafeCommitActionRegistry :: (PrimMonad m, MonadCatch m) => ActionRegistry m -> m () unsafeCommitActionRegistry reg = do as <- readMutVar (registryDelay reg) - -- Run actions in LIFO order - r <- runActions as + -- Run actions in FIFO order + r <- runActions (reverse as) case NE.nonEmpty r of Nothing -> pure () Just exceptions -> throwIO (CommitActionRegistryError exceptions) diff --git a/src-control/Control/TempRegistry.hs b/src-control/Control/TempRegistry.hs deleted file mode 100644 index a70012eb0..000000000 --- a/src-control/Control/TempRegistry.hs +++ /dev/null @@ -1,226 +0,0 @@ --- TODO: we are starting to use the TempRegistry for more than just resource --- allocation/release, we are more generally using it for /actions that can be --- rolled back/ and /actions that are delayed/. Maybe we should reframe the use --- cases for the 'TempRegistry', and do some renaming: --- * Rename @'allocateTemp'*@ to @'withRollback'*@ --- * Rename @'freeTemp'@ to @'delayUntilEnd'@ -module Control.TempRegistry ( - TempRegistry - , withTempRegistry - , allocateTemp - , allocateMaybeTemp - , allocateEitherTemp - , freeTemp - , modifyWithTempRegistry - , modifyWithTempRegistry_ - ) where - -import Control.Concurrent.Class.MonadMVar.Strict -import Control.Monad -import Control.Monad.Class.MonadThrow -import Data.Map.Strict (Map) -import qualified Data.Map.Strict as Map -import Data.Void - --- | A temporary registry for resources that are bound to end up in some final --- state, after which they /should/ be guaranteed to be released correctly. --- --- It is the responsibility of the user to guarantee that this final state is --- released correctly in the presence of async exceptions. --- --- NOTE: this is based on [the @ResourceRegistry@ module from @ouroboros-consensus@](https://github.com/IntersectMBO/ouroboros-consensus/blob/main/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/ResourceRegistry.hs). --- --- There are some differences between @WithTempRegistry@ from --- @ouroboros-consensus@ and our 'TempRegistry'. For one, 'TempRegistry' allows --- for the temporary /freeing/ of resources, which @WithTempRegistry@ does not. --- However, @WithTempRegistry@ can check whether newly allocated resources --- actually end up in the final state. --- --- TODO: make 'TempRegistry' more sophisticated. Ideas: --- --- * Use a similar approach (like in 'WithTempRegistry@) for checking that --- temporarily allocated resources end up in the final state, and that --- temporarily freed resources are removed from the final state. --- --- * Statically disallow using a resource after @freeTemp@, for example through --- data abstraction. --- --- TODO: could https://hackage.haskell.org/package/resourcet be a suitable --- abstraction instead of 'TempRegistry'? -newtype TempRegistry m = TempRegistry { - tempRegistryState :: StrictMVar m (TempRegistryState m) - } - -data TempRegistryState m = TempRegistryState { - tempAllocated :: !(Map ResourceId (Resource m)) - , tempFreed :: !(Map ResourceId (Resource m)) - , nextId :: !ResourceId - } - -newtype ResourceId = ResourceId Int - deriving stock (Eq, Ord) - deriving newtype (Num) - -newtype Resource m = Resource { - resourceRelease :: (m ()) - } - -{-# SPECIALISE withTempRegistry :: (TempRegistry IO -> IO a) -> IO a #-} -withTempRegistry :: - (MonadMVar m, MonadCatch m) - => (TempRegistry m -> m a) - -> m a -withTempRegistry k = fst <$> generalBracket acquire release k - where - acquire = unsafeNewTempRegistry - release reg ec = unsafeReleaseTempRegistry reg ec - -{-# SPECIALISE unsafeNewTempRegistry :: IO (TempRegistry IO) #-} --- | This is considered unsafe, because one should properly 'bracket' this --- function. Example: --- --- @ --- generalBracket unsafeNewTempRegistry unsafeReleaseTempRegistry --- @ -unsafeNewTempRegistry :: MonadMVar m => m (TempRegistry m) -unsafeNewTempRegistry = TempRegistry <$> newMVar (TempRegistryState Map.empty Map.empty (ResourceId 0)) - -{-# SPECIALISE unsafeReleaseTempRegistry :: TempRegistry IO -> ExitCase a -> IO () #-} --- | See 'unsafeNewTempRegistry'. -unsafeReleaseTempRegistry :: MonadMVar m => TempRegistry m -> ExitCase a -> m () -unsafeReleaseTempRegistry reg ec = case ec of - ExitCaseSuccess{} -> mapM_ resourceRelease . tempFreed =<< takeMVar (tempRegistryState reg) - _ -> mapM_ resourceRelease . tempAllocated =<< takeMVar (tempRegistryState reg) - - -{-# SPECIALISE allocateTemp :: TempRegistry IO -> IO a -> (a -> IO ()) -> IO a #-} --- | Temporarily allocate a resource. --- --- This runs the @acquire@ function with async exceptions masked to ensure that --- acquired resources are always put into the registry. However, note that in --- general the following two expressions are not equivalent: --- --- @ --- allocateTemp reg acquire free --- acquire >>= \x -> allocateTemp reg free (pure x) --- @ --- --- Assuming that @acquire@ is not already exception safe, it is /not/ --- exception-safe to pass the result of @acquire@ to @allocateTemp@: an async --- exception could be thrown in between @acquire@ and @allocateTemp@, which --- leaks resources. -allocateTemp :: (MonadMask m, MonadMVar m) => - TempRegistry m - -> m a - -> (a -> m ()) - -> m a -allocateTemp reg acquire free = - mustBeRight <$!> allocateEitherTemp reg (fmap Right acquire) free - where - mustBeRight :: Either Void a -> a - mustBeRight (Left v) = absurd v - mustBeRight (Right a) = a - -{-# SPECIALISE allocateMaybeTemp :: TempRegistry IO -> IO (Maybe a) -> (a -> IO ()) -> IO (Maybe a) #-} --- | Like 'allocateTemp', but for resources that might fail to be acquired. -allocateMaybeTemp :: - (MonadMask m, MonadMVar m) - => TempRegistry m - -> m (Maybe a) - -> (a -> m ()) - -> m (Maybe a) -allocateMaybeTemp reg acquire free = - fromEither <$!> allocateEitherTemp reg (toEither <$> acquire) free - where - toEither :: Maybe a -> Either () a - toEither Nothing = Left () - toEither (Just x) = Right x - - fromEither :: Either () a -> Maybe a - fromEither (Left ()) = Nothing - fromEither (Right x) = Just $! x - -{-# SPECIALISE allocateEitherTemp :: TempRegistry IO -> IO (Either e a) -> (a -> IO ()) -> IO (Either e a) #-} --- | Like 'allocateTemp', but for resources that might fail to be acquired. -allocateEitherTemp :: - (MonadMask m, MonadMVar m) - => TempRegistry m - -> m (Either e a) - -> (a -> m ()) - -> m (Either e a) -allocateEitherTemp reg acquire free = - mask_ $ do - eith <- acquire - case eith of - Left e -> pure $ Left e - Right x -> do - modifyMVar_ (tempRegistryState reg) $ \st -> do - let rid = nextId st - rid' = rid + 1 - pure TempRegistryState { - tempAllocated = Map.insert rid (Resource (free x)) (tempAllocated st) - , tempFreed = tempFreed st - , nextId = rid' - } - pure $ Right x - -{-# SPECIALISE freeTemp :: TempRegistry IO -> IO () -> IO () #-} --- | Temporarily free a resource. --- --- NOTE: the resource is not actually released until the 'TempRegistry' is --- released. This makes rolling back simple, but it means that /use after free/ --- within the scope of a 'TempRegistry' will work just as if there had been no --- free at all. As such, though it is not recommended to rely on this --- peculiarity, the following is safe: --- --- @ --- allocateTemp reg free acquire >>= \x -> --- freeTemp reg (free x) >>= \_ -> {- do something with x -} --- @ -freeTemp :: MonadMVar m => TempRegistry m -> m () -> m () -freeTemp reg free = modifyMVarMasked_ (tempRegistryState reg) $ \st -> do - let rid = nextId st - rid' = rid + 1 - pure TempRegistryState { - tempAllocated = tempAllocated st - , tempFreed = Map.insert rid (Resource free) (tempFreed st) - , nextId = rid' - } - -{-# SPECIALISE modifyWithTempRegistry :: IO st -> (st -> IO ()) -> (TempRegistry IO -> st -> IO (st, a)) -> IO a #-} --- | Exception-safe modification of state with a temporary registry. --- --- [Example:] When we modify a table's content (stored in a mutable variable), --- we might add new runs to the levels, or remove old runs from the level. If an --- exception is thrown before putting the updated table contents into the --- variable, then any resources that were acquired or released in the meantime --- should be rolled back. The 'TempRegistry' can be used to "temporarily" --- allocate or free resources, the effects of which are rolled back in case of --- an exception, or put into the final state when no exceptions were raised. -modifyWithTempRegistry :: - (MonadMVar m, MonadCatch m) - => m st -- ^ Get the state - -> (st -> m ()) -- ^ Store a state - -> (TempRegistry m -> st -> m (st, a)) -- ^ Modify the state - -> m a -modifyWithTempRegistry getSt putSt action = - snd . fst <$> generalBracket acquire release (uncurry action) - where - acquire = (,) <$> unsafeNewTempRegistry <*> getSt - release (reg, oldSt) ec = do - case ec of - ExitCaseSuccess (newSt, _) -> putSt newSt - ExitCaseException _ -> putSt oldSt - ExitCaseAbort -> putSt oldSt - unsafeReleaseTempRegistry reg ec - -{-# SPECIALISE modifyWithTempRegistry_ :: IO st -> (st -> IO ()) -> (TempRegistry IO -> st -> IO st) -> IO () #-} --- | Like 'modifyWithTempRegistry', but without a return value. -modifyWithTempRegistry_ :: - (MonadMVar m, MonadCatch m) - => m st -- ^ Get the state - -> (st -> m ()) -- ^ Store a state - -> (TempRegistry m -> st -> m st) - -> m () -modifyWithTempRegistry_ getSt putSt action = - modifyWithTempRegistry getSt putSt (\reg content -> (,()) <$> action reg content) diff --git a/src/Database/LSMTree/Internal.hs b/src/Database/LSMTree/Internal.hs index 19ca2f344..1600af2ef 100644 --- a/src/Database/LSMTree/Internal.hs +++ b/src/Database/LSMTree/Internal.hs @@ -83,7 +83,6 @@ import Control.Monad.Class.MonadST (MonadST (..)) import Control.Monad.Class.MonadThrow import Control.Monad.Primitive import Control.RefCount -import Control.TempRegistry import Control.Tracer import Data.Arena (ArenaManager, newArenaManager) import Data.Either (fromRight) @@ -702,12 +701,12 @@ new :: new sesh conf = do traceWith (sessionTracer sesh) TraceNewTable withOpenSession sesh $ \seshEnv -> - withTempRegistry $ \reg -> do + withActionRegistry $ \reg -> do am <- newArenaManager blobpath <- Paths.tableBlobPath (sessionRoot seshEnv) <$> incrUniqCounter (sessionUniqCounter seshEnv) tableWriteBufferBlobs - <- allocateTemp reg + <- withRollback reg (WBB.new (sessionHasFS seshEnv) blobpath) releaseRef let tableWriteBuffer = WB.empty @@ -722,7 +721,7 @@ new sesh conf = do newWith reg sesh seshEnv conf am tc {-# SPECIALISE newWith :: - TempRegistry IO + ActionRegistry IO -> Session IO h -> SessionEnv IO h -> TableConfig @@ -730,8 +729,8 @@ new sesh conf = do -> TableContent IO h -> IO (Table IO h) #-} newWith :: - (MonadSTM m, MonadMVar m) - => TempRegistry m + (MonadSTM m, MonadMVar m, PrimMonad m) + => ActionRegistry m -> Session m h -> SessionEnv m h -> TableConfig @@ -754,8 +753,9 @@ newWith reg sesh seshEnv conf !am !tc = do let !tid = uniqueToWord64 tableId !t = Table conf tableVar am tr tid sesh -- Track the current table - freeTemp reg $ modifyMVar_ (sessionOpenTables seshEnv) - $ pure . Map.insert (uniqueToWord64 tableId) t + delayedCommit reg $ + modifyMVar_ (sessionOpenTables seshEnv) $ + pure . Map.insert (uniqueToWord64 tableId) t pure $! t {-# SPECIALISE close :: Table IO h -> IO () #-} @@ -766,7 +766,7 @@ close :: -> m () close t = do traceWith (tableTracer t) TraceCloseTable - modifyWithTempRegistry_ + modifyWithActionRegistry_ (RW.unsafeAcquireWriteAccess (tableState t)) (atomically . RW.unsafeReleaseWriteAccess (tableState t)) $ \reg -> \case TableClosed -> pure TableClosed @@ -774,7 +774,7 @@ close t = do -- Since we have a write lock on the table state, we know that we are the -- only thread currently closing the table. We can safely make the session -- forget about this table. - freeTemp reg (tableSessionUntrackTable (tableId t) thEnv) + delayedCommit reg (tableSessionUntrackTable (tableId t) thEnv) RW.withWriteAccess_ (tableContent thEnv) $ \tc -> do releaseTableContent reg tc pure tc @@ -868,7 +868,7 @@ updates resolve es t = do let conf = tableConfig t withOpenTable t $ \thEnv -> do let hfs = tableHasFS thEnv - modifyWithTempRegistry_ + modifyWithActionRegistry_ (RW.unsafeAcquireWriteAccess (tableContent thEnv)) (atomically . RW.unsafeReleaseWriteAccess (tableContent thEnv)) $ \reg -> do updatesWithInterleavedFlushes @@ -1005,10 +1005,10 @@ newCursor !offsetKey t = withOpenTable t $ \thEnv -> do -- We acquire a read-lock on the session open-state to prevent races, see -- 'sessionOpenTables'. withOpenSession cursorSession $ \_ -> do - withTempRegistry $ \reg -> do + withActionRegistry $ \reg -> do (wb, wbblobs, cursorRuns) <- dupTableContent reg (tableContent thEnv) cursorReaders <- - allocateMaybeTemp reg + withRollbackMaybe reg (Readers.new offsetKey (Just (wb, wbblobs)) cursorRuns) Readers.close let cursorWBB = wbblobs @@ -1017,9 +1017,9 @@ newCursor !offsetKey t = withOpenTable t $ \thEnv -> do -- Track cursor, but careful: If now an exception is raised, all -- resources get freed by the registry, so if the session still -- tracks 'cursor' (which is 'CursorOpen'), it later double frees. - -- Therefore, we only track the cursor if 'withTempRegistry' exits - -- successfully, i.e. using 'freeTemp'. - freeTemp reg $ + -- Therefore, we only track the cursor if 'withActionRegistry' exits + -- successfully, i.e. using 'delayedCommit'. + delayedCommit reg $ modifyMVar_ (sessionOpenCursors cursorSessionEnv) $ pure . Map.insert cursorId cursor pure $! cursor @@ -1030,10 +1030,10 @@ newCursor !offsetKey t = withOpenTable t $ \thEnv -> do RW.withReadAccess contentVar $ \content -> do let !wb = tableWriteBuffer content !wbblobs = tableWriteBufferBlobs content - wbblobs' <- allocateTemp reg (dupRef wbblobs) releaseRef + wbblobs' <- withRollback reg (dupRef wbblobs) releaseRef let runs = cachedRuns (tableCache content) runs' <- V.forM runs $ \r -> - allocateTemp reg (dupRef r) releaseRef + withRollback reg (dupRef r) releaseRef pure (wb, wbblobs', runs') {-# SPECIALISE closeCursor :: Cursor IO h -> IO () #-} @@ -1044,20 +1044,20 @@ closeCursor :: -> m () closeCursor Cursor {..} = do traceWith cursorTracer $ TraceCloseCursor - modifyWithTempRegistry_ (takeMVar cursorState) (putMVar cursorState) $ \reg -> \case + modifyWithActionRegistry_ (takeMVar cursorState) (putMVar cursorState) $ \reg -> \case CursorClosed -> return CursorClosed CursorOpen CursorEnv {..} -> do -- This should be safe-ish, but it's still not ideal, because it doesn't -- rule out sync exceptions in the cleanup operations. -- In that case, the cursor ends up closed, but resources might not have -- been freed. Probably better than the other way around, though. - freeTemp reg $ + delayedCommit reg $ modifyMVar_ (sessionOpenCursors cursorSessionEnv) $ pure . Map.delete cursorId - forM_ cursorReaders $ freeTemp reg . Readers.close - V.forM_ cursorRuns $ freeTemp reg . releaseRef - freeTemp reg (releaseRef cursorWBB) + forM_ cursorReaders $ delayedCommit reg . Readers.close + V.forM_ cursorRuns $ delayedCommit reg . releaseRef + delayedCommit reg (releaseRef cursorWBB) return CursorClosed {-# SPECIALISE readCursor :: @@ -1142,7 +1142,7 @@ createSnapshot :: createSnapshot snap label tableType t = do traceWith (tableTracer t) $ TraceSnapshot snap withOpenTable t $ \thEnv -> - withTempRegistry $ \reg -> do -- TODO: use the temp registry for all side effects + withActionRegistry $ \reg -> do -- TODO: use the action registry for all side effects let hfs = tableHasFS thEnv hbio = tableHasBlockIO thEnv uc = tableSessionUniqCounter thEnv @@ -1156,9 +1156,9 @@ createSnapshot snap label tableType t = do else -- we assume the snapshots directory already exists, so we just have -- to create the directory for this specific snapshot. - allocateTemp reg + withRollback_ reg (FS.createDirectory hfs (Paths.getNamedSnapshotDir snapDir)) - (\_ -> FS.removeDirectoryRecursive hfs (Paths.getNamedSnapshotDir snapDir)) + (FS.removeDirectoryRecursive hfs (Paths.getNamedSnapshotDir snapDir)) -- Duplicate references to the table content, so that resources do not disappear -- from under our feet while taking a snapshot. These references are released @@ -1206,7 +1206,7 @@ openSnapshot :: openSnapshot sesh label tableType override snap resolve = do traceWith (sessionTracer sesh) $ TraceOpenSnapshot snap override withOpenSession sesh $ \seshEnv -> do - withTempRegistry $ \reg -> do + withActionRegistry $ \reg -> do let hfs = sessionHasFS seshEnv hbio = sessionHasBlockIO seshEnv uc = sessionUniqCounter seshEnv @@ -1316,7 +1316,7 @@ duplicate t@Table{..} = do -- We acquire a read-lock on the session open-state to prevent races, see -- 'sessionOpenTables'. withOpenSession tableSession $ \_ -> do - withTempRegistry $ \reg -> do + withActionRegistry $ \reg -> do -- The table contents escape the read access, but we just added references -- to each run so it is safe. content <- RW.withReadAccess tableContent (duplicateTableContent reg) @@ -1369,7 +1369,7 @@ unions ts = do -- We acquire a read-lock on the session open-state to prevent races, see -- 'sessionOpenTables'. - modifyWithTempRegistry + modifyWithActionRegistry (atomically $ RW.unsafeAcquireReadAccess (sessionState sesh)) (\_ -> atomically $ RW.unsafeReleaseReadAccess (sessionState sesh)) $ \reg -> \case SessionClosed -> throwIO ErrSessionClosed diff --git a/src/Database/LSMTree/Internal/MergeSchedule.hs b/src/Database/LSMTree/Internal/MergeSchedule.hs index 2718a13e0..f20d23dd2 100644 --- a/src/Database/LSMTree/Internal/MergeSchedule.hs +++ b/src/Database/LSMTree/Internal/MergeSchedule.hs @@ -29,13 +29,13 @@ module Database.LSMTree.Internal.MergeSchedule ( , addWriteBufferEntries ) where +import Control.ActionRegistry import Control.Concurrent.Class.MonadMVar.Strict import Control.Monad.Class.MonadST (MonadST) import Control.Monad.Class.MonadSTM (MonadSTM (..)) import Control.Monad.Class.MonadThrow (MonadMask, MonadThrow (..)) import Control.Monad.Primitive import Control.RefCount -import Control.TempRegistry import Control.Tracer import Data.BloomFilter (Bloom) import Data.Foldable (fold) @@ -60,7 +60,7 @@ import Database.LSMTree.Internal.RunNumber import Database.LSMTree.Internal.Serialise (SerialisedBlob, SerialisedKey, SerialisedValue) import Database.LSMTree.Internal.UniqCounter -import Database.LSMTree.Internal.Vector (mapStrict) +import Database.LSMTree.Internal.Vector (forMStrict, mapStrict) import Database.LSMTree.Internal.WriteBuffer (WriteBuffer) import qualified Database.LSMTree.Internal.WriteBuffer as WB import Database.LSMTree.Internal.WriteBufferBlobs (WriteBufferBlobs) @@ -119,26 +119,26 @@ data TableContent m h = TableContent { , tableCache :: !(LevelsCache m h) } -{-# SPECIALISE duplicateTableContent :: TempRegistry IO -> TableContent IO h -> IO (TableContent IO h) #-} +{-# SPECIALISE duplicateTableContent :: ActionRegistry IO -> TableContent IO h -> IO (TableContent IO h) #-} duplicateTableContent :: - (PrimMonad m, MonadMask m, MonadMVar m) - => TempRegistry m + (PrimMonad m, MonadMask m) + => ActionRegistry m -> TableContent m h -> m (TableContent m h) duplicateTableContent reg (TableContent wb wbb levels cache) = do - wbb' <- allocateTemp reg (dupRef wbb) releaseRef + wbb' <- withRollback reg (dupRef wbb) releaseRef levels' <- duplicateLevels reg levels cache' <- duplicateLevelsCache reg cache return $! TableContent wb wbb' levels' cache' -{-# SPECIALISE releaseTableContent :: TempRegistry IO -> TableContent IO h -> IO () #-} +{-# SPECIALISE releaseTableContent :: ActionRegistry IO -> TableContent IO h -> IO () #-} releaseTableContent :: - (PrimMonad m, MonadMask m, MonadMVar m) - => TempRegistry m + (PrimMonad m, MonadMask m) + => ActionRegistry m -> TableContent m h -> m () releaseTableContent reg (TableContent _wb wbb levels cache) = do - freeTemp reg (releaseRef wbb) + delayedCommit reg (releaseRef wbb) releaseLevels reg levels releaseLevelsCache reg cache @@ -168,7 +168,7 @@ data LevelsCache m h = LevelsCache_ { } {-# SPECIALISE mkLevelsCache :: - TempRegistry IO + ActionRegistry IO -> Levels IO h -> IO (LevelsCache IO h) #-} -- | Flatten the argument 'Level's into a single vector of runs, including all @@ -176,13 +176,13 @@ data LevelsCache m h = LevelsCache_ { -- 'LevelsCache'. The cache will take a reference for each of its runs. mkLevelsCache :: forall m h. (PrimMonad m, MonadMVar m, MonadMask m) - => TempRegistry m + => ActionRegistry m -> Levels m h -> m (LevelsCache m h) mkLevelsCache reg lvls = do rs <- foldRunAndMergeM (fmap V.singleton . dupRun) - (\mr -> allocateTemp reg (MR.duplicateRuns mr) (V.mapM_ releaseRef)) + (\mr -> withRollback reg (MR.duplicateRuns mr) (V.mapM_ releaseRef)) lvls pure $! LevelsCache_ { cachedRuns = rs @@ -191,7 +191,7 @@ mkLevelsCache reg lvls = do , cachedKOpsFiles = mapStrict (\(DeRef r) -> Run.runKOpsFile r) rs } where - dupRun r = allocateTemp reg (dupRef r) releaseRef + dupRun r = withRollback reg (dupRef r) releaseRef -- TODO: this is not terribly performant, but it is also not sure if we are -- going to need this in the end. We might get rid of the LevelsCache. @@ -202,14 +202,14 @@ mkLevelsCache reg lvls = do -> Levels m h -> m a foldRunAndMergeM k1 k2 ls = - fmap fold $ V.forM ls $ \(Level ir rs) -> do + fmap fold $ forMStrict ls $ \(Level ir rs) -> do incoming <- case ir of Single r -> k1 r Merging _ mr -> k2 mr (incoming <>) . fold <$> V.forM rs k1 {-# SPECIALISE rebuildCache :: - TempRegistry IO + ActionRegistry IO -> LevelsCache IO h -> Levels IO h -> IO (LevelsCache IO h) #-} @@ -235,7 +235,7 @@ mkLevelsCache reg lvls = do -- Lookups should no invalidate blob erferences. rebuildCache :: (PrimMonad m, MonadMVar m, MonadMask m) - => TempRegistry m + => ActionRegistry m -> LevelsCache m h -- ^ old cache -> Levels m h -- ^ new levels -> m (LevelsCache m h) -- ^ new cache @@ -244,31 +244,31 @@ rebuildCache reg oldCache newLevels = do mkLevelsCache reg newLevels {-# SPECIALISE duplicateLevelsCache :: - TempRegistry IO + ActionRegistry IO -> LevelsCache IO h -> IO (LevelsCache IO h) #-} duplicateLevelsCache :: - (PrimMonad m, MonadMask m, MonadMVar m) - => TempRegistry m + (PrimMonad m, MonadMask m) + => ActionRegistry m -> LevelsCache m h -> m (LevelsCache m h) duplicateLevelsCache reg cache = do - rs' <- V.forM (cachedRuns cache) $ \r -> - allocateTemp reg (dupRef r) releaseRef + rs' <- forMStrict (cachedRuns cache) $ \r -> + withRollback reg (dupRef r) releaseRef return cache { cachedRuns = rs' } {-# SPECIALISE releaseLevelsCache :: - TempRegistry IO + ActionRegistry IO -> LevelsCache IO h -> IO () #-} releaseLevelsCache :: - (PrimMonad m, MonadMVar m, MonadMask m) - => TempRegistry m + (PrimMonad m, MonadMask m) + => ActionRegistry m -> LevelsCache m h -> m () releaseLevelsCache reg cache = V.forM_ (cachedRuns cache) $ \r -> - freeTemp reg (releaseRef r) + delayedCommit reg (releaseRef r) {------------------------------------------------------------------------------- Levels, runs and ongoing merges @@ -295,52 +295,52 @@ mergePolicyForLevel MergePolicyLazyLevelling (LevelNo n) nextLevels | V.null nextLevels = LevelLevelling -- levelling on last level | otherwise = LevelTiering -{-# SPECIALISE duplicateLevels :: TempRegistry IO -> Levels IO h -> IO (Levels IO h) #-} +{-# SPECIALISE duplicateLevels :: ActionRegistry IO -> Levels IO h -> IO (Levels IO h) #-} duplicateLevels :: - (PrimMonad m, MonadMVar m, MonadMask m) - => TempRegistry m + (PrimMonad m, MonadMask m) + => ActionRegistry m -> Levels m h -> m (Levels m h) duplicateLevels reg levels = - V.forM levels $ \Level {incomingRun, residentRuns} -> do + forMStrict levels $ \Level {incomingRun, residentRuns} -> do incomingRun' <- duplicateIncomingRun reg incomingRun - residentRuns' <- V.forM residentRuns $ \r -> - allocateTemp reg (dupRef r) releaseRef + residentRuns' <- forMStrict residentRuns $ \r -> + withRollback reg (dupRef r) releaseRef return $! Level { incomingRun = incomingRun', residentRuns = residentRuns' } -{-# SPECIALISE releaseLevels :: TempRegistry IO -> Levels IO h -> IO () #-} +{-# SPECIALISE releaseLevels :: ActionRegistry IO -> Levels IO h -> IO () #-} releaseLevels :: - (PrimMonad m, MonadMVar m, MonadMask m) - => TempRegistry m + (PrimMonad m, MonadMask m) + => ActionRegistry m -> Levels m h -> m () releaseLevels reg levels = V.forM_ levels $ \Level {incomingRun, residentRuns} -> do releaseIncomingRun reg incomingRun - V.mapM_ (freeTemp reg . releaseRef) residentRuns + V.mapM_ (delayedCommit reg . releaseRef) residentRuns -{-# SPECIALISE duplicateIncomingRun :: TempRegistry IO -> IncomingRun IO h -> IO (IncomingRun IO h) #-} +{-# SPECIALISE duplicateIncomingRun :: ActionRegistry IO -> IncomingRun IO h -> IO (IncomingRun IO h) #-} duplicateIncomingRun :: - (PrimMonad m, MonadMask m, MonadMVar m) - => TempRegistry m + (PrimMonad m, MonadMask m) + => ActionRegistry m -> IncomingRun m h -> m (IncomingRun m h) duplicateIncomingRun reg (Single r) = - Single <$> allocateTemp reg (dupRef r) releaseRef + Single <$> withRollback reg (dupRef r) releaseRef duplicateIncomingRun reg (Merging mp mr) = - Merging mp <$> allocateTemp reg (dupRef mr) releaseRef + Merging mp <$> withRollback reg (dupRef mr) releaseRef -{-# SPECIALISE releaseIncomingRun :: TempRegistry IO -> IncomingRun IO h -> IO () #-} +{-# SPECIALISE releaseIncomingRun :: ActionRegistry IO -> IncomingRun IO h -> IO () #-} releaseIncomingRun :: - (PrimMonad m, MonadMask m, MonadMVar m) - => TempRegistry m + (PrimMonad m, MonadMask m) + => ActionRegistry m -> IncomingRun m h -> m () -releaseIncomingRun reg (Single r) = freeTemp reg (releaseRef r) -releaseIncomingRun reg (Merging _ mr) = freeTemp reg (releaseRef mr) +releaseIncomingRun reg (Single r) = delayedCommit reg (releaseRef r) +releaseIncomingRun reg (Merging _ mr) = delayedCommit reg (releaseRef mr) {-# SPECIALISE iforLevelM_ :: Levels IO h -> (LevelNo -> Level IO h -> IO ()) -> IO () #-} iforLevelM_ :: Monad m => Levels m h -> (LevelNo -> Level m h -> m ()) -> m () @@ -359,7 +359,7 @@ iforLevelM_ lvls k = V.iforM_ lvls $ \i lvl -> k (LevelNo (i + 1)) lvl -> SessionRoot -> UniqCounter IO -> V.Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob) - -> TempRegistry IO + -> ActionRegistry IO -> TableContent IO h -> IO (TableContent IO h) #-} -- | A single batch of updates can fill up the write buffer multiple times. We @@ -397,7 +397,7 @@ updatesWithInterleavedFlushes :: -> SessionRoot -> UniqCounter m -> V.Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob) - -> TempRegistry m + -> ActionRegistry m -> TableContent m h -> m (TableContent m h) updatesWithInterleavedFlushes tr conf resolve hfs hbio root uc es reg tc = do @@ -476,7 +476,7 @@ addWriteBufferEntries hfs f wbblobs maxn = -> HasBlockIO IO h -> SessionRoot -> UniqCounter IO - -> TempRegistry IO + -> ActionRegistry IO -> TableContent IO h -> IO (TableContent IO h) #-} -- | Flush the write buffer to disk, regardless of whether it is full or not. @@ -492,7 +492,7 @@ flushWriteBuffer :: -> HasBlockIO m h -> SessionRoot -> UniqCounter m - -> TempRegistry m + -> ActionRegistry m -> TableContent m h -> m (TableContent m h) flushWriteBuffer tr conf@TableConfig{confDiskCachePolicy} @@ -506,7 +506,7 @@ flushWriteBuffer tr conf@TableConfig{confDiskCachePolicy} !alloc = bloomFilterAllocForLevel conf l !path = Paths.runPath root (uniqueToRunNumber n) traceWith tr $ AtLevel l $ TraceFlushWriteBuffer size (runNumber path) cache alloc - r <- allocateTemp reg + r <- withRollback reg (Run.fromWriteBuffer hfs hbio cache alloc @@ -514,8 +514,8 @@ flushWriteBuffer tr conf@TableConfig{confDiskCachePolicy} (tableWriteBuffer tc) (tableWriteBufferBlobs tc)) releaseRef - freeTemp reg (releaseRef (tableWriteBufferBlobs tc)) - wbblobs' <- allocateTemp reg (WBB.new hfs (Paths.tableBlobPath root n)) + delayedCommit reg (releaseRef (tableWriteBufferBlobs tc)) + wbblobs' <- withRollback reg (WBB.new hfs (Paths.tableBlobPath root n)) releaseRef levels' <- addRunToLevels tr conf resolve hfs hbio root uc r reg (tableLevels tc) tableCache' <- rebuildCache reg (tableCache tc) levels' @@ -535,7 +535,7 @@ flushWriteBuffer tr conf@TableConfig{confDiskCachePolicy} -> SessionRoot -> UniqCounter IO -> Ref (Run IO h) - -> TempRegistry IO + -> ActionRegistry IO -> Levels IO h -> IO (Levels IO h) #-} -- | Add a run to the levels, and propagate merges. @@ -553,7 +553,7 @@ addRunToLevels :: -> SessionRoot -> UniqCounter m -> Ref (Run m h) - -> TempRegistry m + -> ActionRegistry m -> Levels m h -> m (Levels m h) addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels = do @@ -622,8 +622,8 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels = r <- case ir of Single r -> pure r Merging _ mr -> do - r <- allocateTemp reg (MR.expectCompleted mr) releaseRef - freeTemp reg (releaseRef mr) + r <- withRollback reg (MR.expectCompleted mr) releaseRef + delayedCommit reg (releaseRef mr) pure r traceWith tr $ AtLevel ln $ TraceExpectCompletedMerge (Run.runFsPathsNumber r) @@ -643,8 +643,8 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels = (Run.runFsPathsNumber r) -- We create a fresh reference and release the original one. -- This will also make it easier to trace back where it was allocated. - ir <- Single <$> allocateTemp reg (dupRef r) releaseRef - freeTemp reg (releaseRef r) + ir <- Single <$> withRollback reg (dupRef r) releaseRef + delayedCommit reg (releaseRef r) pure ir | otherwise = do @@ -657,10 +657,10 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels = TraceNewMerge (V.map Run.size rs) (runNumber runPaths) caching alloc mergePolicy mergeLevel -- The runs will end up inside the merging run, with fresh references. -- The original references can be released (but only on the happy path). - mr <- allocateTemp reg + mr <- withRollback reg (MR.new hfs hbio resolve caching alloc mergeLevel runPaths rs) releaseRef - V.forM_ rs $ \r -> freeTemp reg (releaseRef r) + V.forM_ rs $ \r -> delayedCommit reg (releaseRef r) case confMergeSchedule of Incremental -> pure () OneShot -> do diff --git a/src/Database/LSMTree/Internal/MergingRun.hs b/src/Database/LSMTree/Internal/MergingRun.hs index cf95aff51..6e4dfac45 100644 --- a/src/Database/LSMTree/Internal/MergingRun.hs +++ b/src/Database/LSMTree/Internal/MergingRun.hs @@ -23,6 +23,7 @@ module Database.LSMTree.Internal.MergingRun ( , MergeKnownCompleted (..) ) where +import Control.ActionRegistry import Control.Concurrent.Class.MonadMVar.Strict import Control.DeepSeq (NFData (..)) import Control.Monad (void, when) @@ -32,7 +33,6 @@ import Control.Monad.Class.MonadThrow (MonadCatch (bracketOnError), MonadMask) import Control.Monad.Primitive import Control.RefCount -import Control.TempRegistry import Data.Maybe (fromMaybe) import Data.Primitive.MutVar import Data.Primitive.PrimVar @@ -140,8 +140,8 @@ new :: -> m (Ref (MergingRun m h)) new hfs hbio resolve caching alloc mergeLevel runPaths inputRuns = -- If creating the Merge fails, we must release the references again. - withTempRegistry $ \reg -> do - runs <- V.mapM (\r -> allocateTemp reg (dupRef r) releaseRef) inputRuns + withActionRegistry $ \reg -> do + runs <- V.mapM (\r -> withRollback reg (dupRef r) releaseRef) inputRuns merge <- fromMaybe (error "newMerge: merges can not be empty") <$> Merge.new hfs hbio caching alloc mergeLevel resolve runPaths runs let numInputRuns = NumRuns $ V.length runs @@ -219,8 +219,8 @@ duplicateRuns (DeRef mr) = -- does not get completed concurrently before we are done. withMVar (mergeState mr) $ \case CompletedMerge r -> V.singleton <$> dupRef r - OngoingMerge rs _ _ -> withTempRegistry $ \reg -> - V.mapM (\r -> allocateTemp reg (dupRef r) releaseRef) rs + OngoingMerge rs _ _ -> withActionRegistry $ \reg -> + V.mapM (\r -> withRollback reg (dupRef r) releaseRef) rs {------------------------------------------------------------------------------- Credits diff --git a/src/Database/LSMTree/Internal/Snapshot.hs b/src/Database/LSMTree/Internal/Snapshot.hs index 8e10b5967..1f68221db 100644 --- a/src/Database/LSMTree/Internal/Snapshot.hs +++ b/src/Database/LSMTree/Internal/Snapshot.hs @@ -26,6 +26,7 @@ module Database.LSMTree.Internal.Snapshot ( , hardLinkRunFiles ) where +import Control.ActionRegistry import Control.Concurrent.Class.MonadMVar.Strict import Control.Concurrent.Class.MonadSTM (MonadSTM) import Control.DeepSeq (NFData (..)) @@ -34,7 +35,6 @@ import Control.Monad.Class.MonadST (MonadST) import Control.Monad.Class.MonadThrow (MonadMask) import Control.Monad.Primitive (PrimMonad) import Control.RefCount -import Control.TempRegistry import Data.Foldable (sequenceA_, traverse_) import Data.Primitive.PrimVar import Data.Text (Text) @@ -229,7 +229,7 @@ toSnapMergingRunState (MR.OngoingMerge rs (MR.SpentCreditsVar spentCreditsVar) m {-# SPECIALISE snapshotWriteBuffer :: - TempRegistry IO + ActionRegistry IO -> HasFS IO h -> HasBlockIO IO h -> UniqCounter IO @@ -241,7 +241,7 @@ toSnapMergingRunState (MR.OngoingMerge rs (MR.SpentCreditsVar spentCreditsVar) m #-} snapshotWriteBuffer :: (MonadMVar m, MonadSTM m, MonadST m, MonadMask m) - => TempRegistry m + => ActionRegistry m -> HasFS m h -> HasBlockIO m h -> UniqCounter m @@ -254,11 +254,11 @@ snapshotWriteBuffer reg hfs hbio uc activeDir snapDir wb wbb = do -- Write the write buffer and write buffer blobs to the active directory. activeWriteBufferNumber <- uniqueToRunNumber <$> incrUniqCounter uc let activeWriteBufferPaths = WriteBufferFsPaths (getActiveDir activeDir) activeWriteBufferNumber - allocateTemp reg + withRollback_ reg (WBW.writeWriteBuffer hfs hbio activeWriteBufferPaths wb wbb) -- TODO: it should probably be the responsibility of writeWriteBuffer to do -- cleanup - $ \() -> do + $ do -- TODO: check files exist before removing them FS.removeFile hfs (writeBufferKOpsPath activeWriteBufferPaths) FS.removeFile hfs (writeBufferBlobPath activeWriteBufferPaths) @@ -277,18 +277,18 @@ snapshotWriteBuffer reg hfs hbio uc activeDir snapDir wb wbb = do {-# SPECIALISE openWriteBuffer :: - TempRegistry IO - -> ResolveSerialisedValue - -> HasFS IO h - -> HasBlockIO IO h - -> UniqCounter IO - -> ActiveDir - -> WriteBufferFsPaths - -> IO (WriteBuffer, Ref (WriteBufferBlobs IO h)) + ActionRegistry IO + -> ResolveSerialisedValue + -> HasFS IO h + -> HasBlockIO IO h + -> UniqCounter IO + -> ActiveDir + -> WriteBufferFsPaths + -> IO (WriteBuffer, Ref (WriteBufferBlobs IO h)) #-} openWriteBuffer :: (MonadMVar m, MonadMask m, MonadSTM m, MonadST m) - => TempRegistry m + => ActionRegistry m -> ResolveSerialisedValue -> HasFS m h -> HasBlockIO m h @@ -303,7 +303,7 @@ openWriteBuffer reg resolve hfs hbio uc activeDir snapWriteBufferPaths = do getActiveDir activeDir FS.mkFsPath [show activeWriteBufferNumber] <.> "wbblobs" copyFile reg hfs hbio (writeBufferBlobPath snapWriteBufferPaths) activeWriteBufferBlobPath writeBufferBlobs <- - allocateTemp reg + withRollback reg (WBB.open hfs activeWriteBufferBlobPath FS.AllowExisting) releaseRef -- Read write buffer key/ops @@ -318,7 +318,7 @@ openWriteBuffer reg resolve hfs hbio uc activeDir snapWriteBufferPaths = do -------------------------------------------------------------------------------} {-# SPECIALISE snapshotRuns :: - TempRegistry IO + ActionRegistry IO -> HasBlockIO IO h -> NamedSnapshotDir -> SnapLevels (Ref (Run IO h)) @@ -328,8 +328,8 @@ openWriteBuffer reg resolve hfs hbio uc activeDir snapWriteBufferPaths = do -- the @targetDir@ directory. The hard links and the @targetDir@ are made -- durable on disk. snapshotRuns :: - (MonadMask m, MonadMVar m) - => TempRegistry m + (MonadMask m, PrimMonad m) + => ActionRegistry m -> HasBlockIO m h -> NamedSnapshotDir -> SnapLevels (Ref (Run m h)) @@ -348,7 +348,7 @@ snapshotRuns reg hbio0 (NamedSnapshotDir targetDir) levels = do pure levels' {-# SPECIALISE openRuns :: - TempRegistry IO + ActionRegistry IO -> HasFS IO h -> HasBlockIO IO h -> TableConfig @@ -366,7 +366,7 @@ snapshotRuns reg hbio0 (NamedSnapshotDir targetDir) levels = do -- The result must ultimately be released using 'releaseRuns'. openRuns :: (MonadMask m, MonadSTM m, MonadST m, MonadMVar m) - => TempRegistry m + => ActionRegistry m -> HasFS m h -> HasBlockIO m h -> TableConfig @@ -388,25 +388,25 @@ openRuns let targetPaths = RunFsPaths targetDir runNum' hardLinkRunFiles reg hfs hbio NoHardLinkDurable sourcePaths targetPaths - allocateTemp reg + withRollback reg (Run.openFromDisk hfs hbio caching targetPaths) releaseRef pure (SnapLevels levels') {-# SPECIALISE releaseRuns :: - TempRegistry IO -> SnapLevels (Ref (Run IO h)) -> IO () + ActionRegistry IO -> SnapLevels (Ref (Run IO h)) -> IO () #-} releaseRuns :: - (MonadMask m, MonadST m, MonadMVar m) - => TempRegistry m -> SnapLevels (Ref (Run m h)) -> m () -releaseRuns reg = traverse_ $ \r -> freeTemp reg (releaseRef r) + (MonadMask m, MonadST m) + => ActionRegistry m -> SnapLevels (Ref (Run m h)) -> m () +releaseRuns reg = traverse_ $ \r -> delayedCommit reg (releaseRef r) {------------------------------------------------------------------------------- Opening from levels snapshot format -------------------------------------------------------------------------------} {-# SPECIALISE fromSnapLevels :: - TempRegistry IO + ActionRegistry IO -> HasFS IO h -> HasBlockIO IO h -> TableConfig @@ -419,7 +419,7 @@ releaseRuns reg = traverse_ $ \r -> freeTemp reg (releaseRef r) -- | Duplicates runs and re-creates merging runs. fromSnapLevels :: forall m h. (MonadMask m, MonadMVar m, MonadSTM m, MonadST m) - => TempRegistry m + => ActionRegistry m -> HasFS m h -> HasBlockIO m h -> TableConfig @@ -450,11 +450,11 @@ fromSnapLevels reg hfs hbio conf@TableConfig{..} uc resolve dir (SnapLevels leve fromSnapIncomingRun (SnapMergingRun mpfl nr ne unspentCredits smrs) = do Merging mpfl <$> case smrs of SnapCompletedMerge run -> - allocateTemp reg (MR.newCompleted nr ne run) releaseRef + withRollback reg (MR.newCompleted nr ne run) releaseRef SnapOngoingMerge runs spentCredits lvl -> do rn <- uniqueToRunNumber <$> incrUniqCounter uc - mr <- allocateTemp reg + mr <- withRollback reg (MR.new hfs hbio resolve caching alloc lvl (mkPath rn) runs) releaseRef -- When a snapshot is created, merge progress is lost, so we @@ -466,7 +466,7 @@ fromSnapLevels reg hfs hbio conf@TableConfig{..} uc resolve dir (SnapLevels leve MR.supplyCredits (MR.Credits c) (creditThresholdForLevel conf ln) mr return mr - dupRun r = allocateTemp reg (dupRef r) releaseRef + dupRun r = withRollback reg (dupRef r) releaseRef {------------------------------------------------------------------------------- Hard links @@ -476,7 +476,7 @@ data HardLinkDurable = HardLinkDurable | NoHardLinkDurable deriving stock Eq {-# SPECIALISE hardLinkRunFiles :: - TempRegistry IO + ActionRegistry IO -> HasFS IO h -> HasBlockIO IO h -> HardLinkDurable @@ -488,8 +488,8 @@ data HardLinkDurable = HardLinkDurable | NoHardLinkDurable -- name for the new directory entry. If @dur == HardLinkDurabl@, the links will -- also be made durable on disk. hardLinkRunFiles :: - (MonadMask m, MonadMVar m) - => TempRegistry m + (MonadMask m, PrimMonad m) + => ActionRegistry m -> HasFS m h -> HasBlockIO m h -> HardLinkDurable @@ -504,7 +504,7 @@ hardLinkRunFiles reg hfs hbio dur sourceRunFsPaths targetRunFsPaths = do {-# SPECIALISE hardLink :: - TempRegistry IO + ActionRegistry IO -> HasFS IO h -> HasBlockIO IO h -> HardLinkDurable @@ -515,8 +515,8 @@ hardLinkRunFiles reg hfs hbio dur sourceRunFsPaths targetRunFsPaths = do -- | @'hardLink' reg hfs hbio dur sourcePath targetPath@ creates a hard link -- from @sourcePath@ to @targetPath@. hardLink :: - (MonadMask m, MonadMVar m) - => TempRegistry m + (MonadMask m, PrimMonad m) + => ActionRegistry m -> HasFS m h -> HasBlockIO m h -> HardLinkDurable @@ -524,19 +524,20 @@ hardLink :: -> FS.FsPath -> m () hardLink reg hfs hbio dur sourcePath targetPath = do - allocateTemp reg + withRollback_ reg (FS.createHardLink hbio sourcePath targetPath) - (\_ -> FS.removeFile hfs targetPath) + (FS.removeFile hfs targetPath) when (dur == HardLinkDurable) $ FS.synchroniseFile hfs hbio targetPath + {------------------------------------------------------------------------------- Copy file -------------------------------------------------------------------------------} {-# SPECIALISE copyFile :: - TempRegistry IO + ActionRegistry IO -> HasFS IO h -> HasBlockIO IO h -> FS.FsPath @@ -545,15 +546,15 @@ hardLink reg hfs hbio dur sourcePath targetPath = do #-} -- | @'copyFile' reg hfs hbio source target@ copies the @source@ path to the @target@ path. copyFile :: - (MonadMask m, MonadMVar m) - => TempRegistry m + (MonadMask m, PrimMonad m) + => ActionRegistry m -> HasFS m h -> HasBlockIO m h -> FS.FsPath -> FS.FsPath -> m () copyFile reg hfs _hbio sourcePath targetPath = - flip (allocateTemp reg) (\_ -> FS.removeFile hfs targetPath) $ + flip (withRollback_ reg) (FS.removeFile hfs targetPath) $ FS.withFile hfs sourcePath FS.ReadMode $ \sourceHandle -> FS.withFile hfs targetPath (FS.WriteMode FS.MustBeNew) $ \targetHandle -> do bs <- FSL.hGetAll hfs sourceHandle diff --git a/src/Database/LSMTree/Internal/Vector.hs b/src/Database/LSMTree/Internal/Vector.hs index 618eb7f7f..e59639c6d 100644 --- a/src/Database/LSMTree/Internal/Vector.hs +++ b/src/Database/LSMTree/Internal/Vector.hs @@ -9,6 +9,7 @@ module Database.LSMTree.Internal.Vector ( mapStrict, mapMStrict, imapMStrict, + forMStrict, zipWithStrict, binarySearchL, unsafeInsertWithMStrict, @@ -79,6 +80,11 @@ imapMStrict f v = V.imapM (\i -> f i >=> (pure $!)) v zipWithStrict :: forall a b c. (a -> b -> c) -> V.Vector a -> V.Vector b -> V.Vector c zipWithStrict f xs ys = runST (V.zipWithM (\x y -> pure $! f x y) xs ys) +-- | /( O(n) /) Like 'V.forM', but strict in the produced elements of type @b@. +{-# INLINE forMStrict #-} +forMStrict :: Monad m => V.Vector a -> (a -> m b) -> m (V.Vector b) +forMStrict xs f = V.forM xs (f >=> (pure $!)) + {-| Finds the lowest index in a given sorted vector at which the given element could be inserted while maintaining the sortedness. diff --git a/test/Test/Database/LSMTree/Internal/Run.hs b/test/Test/Database/LSMTree/Internal/Run.hs index b5db368b9..e991e37e8 100644 --- a/test/Test/Database/LSMTree/Internal/Run.hs +++ b/test/Test/Database/LSMTree/Internal/Run.hs @@ -5,8 +5,8 @@ module Test.Database.LSMTree.Internal.Run ( tests, ) where +import Control.ActionRegistry (withActionRegistry) import Control.RefCount -import Control.TempRegistry (withTempRegistry) import Data.ByteString (ByteString) import qualified Data.ByteString as BS import qualified Data.ByteString.Short as SBS @@ -196,7 +196,7 @@ prop_WriteAndOpen :: -> IO Property prop_WriteAndOpen fs hbio wb = withRun fs hbio (simplePath 1337) (serialiseRunData wb) $ \written -> - withTempRegistry $ \reg -> do + withActionRegistry $ \reg -> do let paths = Run.runFsPaths written paths' = paths { runNumber = RunNumber 17} hardLinkRunFiles reg fs hbio NoHardLinkDurable paths paths'