Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 25 additions & 7 deletions src/Database/LSMTree/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ module Database.LSMTree.Internal (
, openSnapshot
, deleteSnapshot
, listSnapshots
-- * Mutiple writable tables
-- * Multiple writable tables
, duplicate
-- * Table union
, unions
Expand Down Expand Up @@ -318,7 +318,7 @@ data SessionState m h =
| SessionClosed

data SessionEnv m h = SessionEnv {
-- | The path to the directory in which this sesion is live. This is a path
-- | The path to the directory in which this session is live. This is a path
-- relative to root of the 'HasFS' instance.
--
-- INVARIANT: the session root is never changed during the lifetime of a
Expand Down Expand Up @@ -1194,10 +1194,22 @@ createSnapshot snap label tableType t = do
-- Hard link runs into the named snapshot directory
snapLevels' <- traverse (snapshotRun hfs hbio snapUc reg snapDir) snapLevels

-- Release the table content
-- If a merging tree exists, do the same hard-linking for the runs within
mTreeOpt <- case tableUnionLevel content of
NoUnion -> pure Nothing
Union mTreeRef -> do
mTree <- toSnapMergingTree mTreeRef
Just <$> traverse (snapshotRun hfs hbio snapUc reg snapDir) mTree

releaseTableContent reg content

let snapMetaData = SnapshotMetaData label tableType (tableConfig t) snapWriteBufferNumber snapLevels'
let snapMetaData = SnapshotMetaData
label
tableType
(tableConfig t)
snapWriteBufferNumber
snapLevels'
mTreeOpt
SnapshotMetaDataFile contentPath = Paths.snapshotMetaDataFile snapDir
SnapshotMetaDataChecksumFile checksumPath = Paths.snapshotMetaDataChecksumFile snapDir
writeFileSnapshotMetaData hfs contentPath checksumPath snapMetaData
Expand Down Expand Up @@ -1242,7 +1254,7 @@ openSnapshot sesh label tableType override snap resolve = do
Left e -> throwIO (ErrSnapshotDeserialiseFailure e snap)
Right x -> pure x

let SnapshotMetaData label' tableType' conf snapWriteBuffer snapLevels = snapMetaData
let SnapshotMetaData label' tableType' conf snapWriteBuffer snapLevels mTreeOpt = snapMetaData

unless (tableType == tableType') $
throwIO (ErrSnapshotWrongTableType snap tableType tableType')
Expand All @@ -1261,18 +1273,24 @@ openSnapshot sesh label tableType override snap resolve = do

-- Hard link runs into the active directory,
snapLevels' <- traverse (openRun hfs hbio uc reg snapDir activeDir) snapLevels
unionLevel <- case mTreeOpt of
Nothing -> pure NoUnion
Just mTree -> do
snapTree <- traverse (openRun hfs hbio uc reg snapDir activeDir) mTree
Union <$> fromSnapMergingTree reg hfs hbio conf uc resolve activeDir snapTree

-- Convert from the snapshot format, restoring merge progress in the process
tableLevels <- fromSnapLevels hfs hbio uc conf resolve reg activeDir snapLevels'
traverse_ (delayedCommit reg . releaseRef) snapLevels'
--TODO: also delayedCommit unionLevel

tableCache <- mkLevelsCache reg tableLevels
newWith reg sesh seshEnv conf' am $! TableContent {
tableWriteBuffer
, tableWriteBufferBlobs
, tableLevels
, tableCache
, tableUnionLevel = NoUnion -- TODO: at some point also load union level from snapshot
, tableUnionLevel = unionLevel
}

{-# SPECIALISE deleteSnapshot ::
Expand Down Expand Up @@ -1322,7 +1340,7 @@ listSnapshots sesh = do
else pure $ Nothing

{-------------------------------------------------------------------------------
Mutiple writable tables
Multiple writable tables
-------------------------------------------------------------------------------}

{-# SPECIALISE duplicate :: Table IO h -> IO (Table IO h) #-}
Expand Down
12 changes: 11 additions & 1 deletion src/Database/LSMTree/Internal/MergingTree.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
module Database.LSMTree.Internal.MergingTree (
-- $mergingtrees
MergingTree (..)
, newOngoingMerge
, PreExistingRun (..)
, newPendingLevelMerge
, newPendingUnionMerge
Expand Down Expand Up @@ -102,6 +103,15 @@ data PreExistingRun m h =
PreExistingRun !(Ref (Run m h))
| PreExistingMergingRun !(Ref (MergingRun MR.LevelMergeType m h))

-- | Wrap a tree-type 'MergingRun' in a 'MergingTree' whose state is
-- 'OngoingTreeMerge'.
--
-- This exists mainly so that an ongoing merge can be re-established when a
-- persistent snapshot is loaded again.
newOngoingMerge ::
     (MonadMVar m, PrimMonad m, MonadMask m)
  => Ref (MergingRun MR.TreeMergeType m h)
  -> m (Ref (MergingTree m h))
newOngoingMerge mergingRun = mkMergingTree (OngoingTreeMerge mergingRun)

-- | Create a new 'MergingTree' representing the merge of a sequence of
-- pre-existing runs (completed or ongoing, plus an optional final tree).
-- This is for merging the entire contents of a table down to a single run
Expand Down Expand Up @@ -183,7 +193,7 @@ newPendingUnionMerge ::
-> m (Ref (MergingTree m h))
newPendingUnionMerge mts = do
mts' <- V.filterM (fmap not . isStructurallyEmpty) (V.fromList mts)
-- isStructurallyEmpty is interruptable even with async exceptions masked,
-- isStructurallyEmpty is interruptible even with async exceptions masked,
-- but we use it before allocating new references.
mts'' <- V.mapM dupRef mts'
case V.uncons mts'' of
Expand Down
168 changes: 164 additions & 4 deletions src/Database/LSMTree/Internal/Snapshot.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,26 @@ module Database.LSMTree.Internal.Snapshot (
, SnapLevel (..)
, SnapIncomingRun (..)
, SnapMergingRunState (..)
-- * MergeTree snapshot format
, SnapMergingTree(..)
, SnapMergingTreeState(..)
, SnapPendingMerge(..)
, SnapPreExistingRun(..)
-- * Conversion to levels snapshot format
, toSnapLevels
-- * Conversion to merging tree snapshot format
, toSnapMergingTree
-- * Write buffer
, snapshotWriteBuffer
, openWriteBuffer
-- * Run
, snapshotRun
, openRun
-- * Opening from levels snapshot format
-- * Opening snapshot formats
-- ** Levels format
, fromSnapLevels
-- ** Merging Tree format
, fromSnapMergingTree
-- * Hard links
, hardLinkRunFiles
) where
Expand All @@ -43,6 +53,7 @@ import qualified Database.LSMTree.Internal.Merge as Merge
import Database.LSMTree.Internal.MergeSchedule
import Database.LSMTree.Internal.MergingRun (NumRuns (..))
import qualified Database.LSMTree.Internal.MergingRun as MR
import qualified Database.LSMTree.Internal.MergingTree as MT
import Database.LSMTree.Internal.Paths (ActiveDir (..), ForBlob (..),
ForKOps (..), NamedSnapshotDir (..), RunFsPaths (..),
WriteBufferFsPaths (..),
Expand Down Expand Up @@ -93,7 +104,7 @@ data SnapshotMetaData = SnapshotMetaData {
--
-- One could argue that the 'SnapshotName' could be used to hold this
-- type information, but the file name of snapshot metadata is not guarded
-- by a checksum, wherease the contents of the file are. Therefore using the
-- by a checksum, whereas the contents of the file are. Therefore using the
-- 'SnapshotLabel' is safer.
snapMetaLabel :: !SnapshotLabel
-- | Whether a table is normal or monoidal.
Expand All @@ -110,11 +121,15 @@ data SnapshotMetaData = SnapshotMetaData {
, snapWriteBuffer :: !RunNumber
-- | The shape of the levels of the LSM tree.
, snapMetaLevels :: !(SnapLevels SnapshotRun)
-- | The state of tree merging of the LSM tree.
, snapMergingTree :: !(Maybe (SnapMergingTree SnapshotRun))
}
deriving stock Eq

instance NFData SnapshotMetaData where
rnf (SnapshotMetaData a b c d e) = rnf a `seq` rnf b `seq` rnf c `seq` rnf d `seq` rnf e
rnf (SnapshotMetaData a b c d e f) =
rnf a `seq` rnf b `seq` rnf c `seq`
rnf d `seq` rnf e `seq` rnf f

-- | Information needed to open a 'Run' from disk, see 'Run.openFromDisk'.
--
Expand Down Expand Up @@ -197,6 +212,151 @@ instance (NFData t, NFData r) => NFData (SnapMergingRunState t r) where
rnf (SnapCompletedMerge a b c) = rnf a `seq` rnf b `seq` rnf c
rnf (SnapOngoingMerge a b c d) = rnf a `seq` rnf b `seq` rnf c `seq` rnf d

{-------------------------------------------------------------------------------
Snapshot MergingTree
-------------------------------------------------------------------------------}

-- | Snapshot-format representation of a merging tree: a thin wrapper around
-- the tree's 'SnapMergingTreeState'.
--
-- The parameter @r@ is the type used to refer to runs, which makes the
-- structure traversable over its runs (see the derived 'Traversable').
newtype SnapMergingTree r = SnapMergingTree (SnapMergingTreeState r)
  deriving stock (Eq, Functor, Foldable, Traversable)
  deriving newtype NFData

-- | Snapshot-format counterpart of the state of a merging tree. Each
-- constructor mirrors the identically named (minus the @Snap@ prefix)
-- constructor of the in-memory merging tree state.
data SnapMergingTreeState r =
    -- | A completed tree merge, carrying a single run.
    SnapCompletedTreeMerge !r
    -- | A pending tree merge, described by a 'SnapPendingMerge'.
  | SnapPendingTreeMerge !(SnapPendingMerge r)
    -- | An ongoing tree merge, carrying the snapshot state of a merging run
    -- of tree merge type.
  | SnapOngoingTreeMerge
      !(SnapMergingRunState MR.TreeMergeType r)
  deriving stock (Eq, Functor, Foldable, Traversable)

-- | Fully evaluates the payload of whichever constructor is present.
instance NFData r => NFData (SnapMergingTreeState r) where
  rnf treeState = case treeState of
    SnapCompletedTreeMerge run    -> rnf run
    SnapPendingTreeMerge pending  -> rnf pending
    SnapOngoingTreeMerge ongoing  -> rnf ongoing

-- | Snapshot-format description of a merge that is pending within a merging
-- tree.
data SnapPendingMerge r =
    -- | A pending level merge: a list of pre-existing runs, plus an optional
    -- merging tree.
    SnapPendingLevelMerge
      ![SnapPreExistingRun r]
      !(Maybe (SnapMergingTree r))
    -- | A pending union merge over a list of merging trees.
  | SnapPendingUnionMerge
      ![SnapMergingTree r]
  deriving stock (Eq, Functor, Foldable, Traversable)

-- | Fully evaluates every field of whichever constructor is present.
instance NFData r => NFData (SnapPendingMerge r) where
  rnf pending = case pending of
    SnapPendingLevelMerge preExisting mTree ->
      rnf preExisting `seq` rnf mTree
    SnapPendingUnionMerge trees ->
      rnf trees

-- | Snapshot-format description of an input to a pending level merge: either
-- a plain run or a merging run.
data SnapPreExistingRun r =
    -- | A plain run.
    SnapPreExistingRun !r
    -- | The snapshot state of a merging run of level merge type.
  | SnapPreExistingMergingRun
      !(SnapMergingRunState MR.LevelMergeType r)
  deriving stock (Eq, Functor, Foldable, Traversable)

-- | Fully evaluates the payload of whichever constructor is present.
instance NFData r => NFData (SnapPreExistingRun r) where
  rnf preExisting = case preExisting of
    SnapPreExistingRun run              -> rnf run
    SnapPreExistingMergingRun mergingSt -> rnf mergingSt

{-------------------------------------------------------------------------------
Opening from merging tree snapshot format
-------------------------------------------------------------------------------}

{-# SPECIALISE fromSnapMergingTree ::
     ActionRegistry IO
  -> HasFS IO h
  -> HasBlockIO IO h
  -> TableConfig
  -> UniqCounter IO
  -> ResolveSerialisedValue
  -> ActiveDir
  -> SnapMergingTree (Ref (Run IO h))
  -> IO (Ref (MT.MergingTree IO h))
  #-}
-- | Rebuild an in-memory merging tree from its snapshot representation.
--
-- The snapshot tree is walked recursively:
--
-- * a completed tree merge is restored as a pending level merge that holds
--   the single completed run and no child tree;
--
-- * pending level and union merges are restored by first restoring their
--   inputs (pre-existing runs, merging runs and child trees) and then
--   creating the corresponding pending merge;
--
-- * an ongoing tree merge is restored by re-creating its merging run and
--   wrapping it with 'MT.newOngoingMerge'.
--
-- NOTE(review): the 'ActionRegistry' argument is only threaded through the
-- recursive calls here; none of the references created along the way are
-- registered with it in this function. Confirm whether clean-up on failure
-- is handled elsewhere.
fromSnapMergingTree ::
     forall m h. (MonadMask m, MonadMVar m, MonadSTM m, MonadST m)
  => ActionRegistry m
  -> HasFS m h
  -> HasBlockIO m h
  -> TableConfig
  -> UniqCounter m
  -> ResolveSerialisedValue
  -> ActiveDir
  -> SnapMergingTree (Ref (Run m h))
  -> m (Ref (MT.MergingTree m h))
fromSnapMergingTree reg hfs hbio conf uc resolve dir (SnapMergingTree treeState) =
    case treeState of
      SnapCompletedTreeMerge run ->
        MT.newPendingLevelMerge [MT.PreExistingRun run] Nothing

      SnapPendingTreeMerge (SnapPendingLevelMerge preExisting mChild) -> do
        preExisting' <- traverse restorePreExisting preExisting
        mChild'      <- traverse restoreSubTree mChild
        MT.newPendingLevelMerge preExisting' mChild'

      SnapPendingTreeMerge (SnapPendingUnionMerge children) ->
        traverse restoreSubTree children >>= MT.newPendingUnionMerge

      SnapOngoingTreeMerge mergingRunState ->
        restoreMergingRun mergingRunState >>= MT.newOngoingMerge
  where
    -- Recursive call with all the environment arguments already supplied.
    restoreSubTree ::
         SnapMergingTree (Ref (Run m h))
      -> m (Ref (MT.MergingTree m h))
    restoreSubTree = fromSnapMergingTree reg hfs hbio conf uc resolve dir

    -- Re-create a merging run (of any merge type) from its snapshot state.
    restoreMergingRun ::
         forall t. MR.IsMergeType t
      => SnapMergingRunState t (Ref (Run m h))
      -> m (Ref (MR.MergingRun t m h))
    restoreMergingRun = fromSnapMergingRunState hfs hbio uc resolve dir

    -- Convert one input of a pending level merge back to its in-memory form.
    restorePreExisting ::
         SnapPreExistingRun (Ref (Run m h))
      -> m (MT.PreExistingRun m h)
    restorePreExisting snapRun = case snapRun of
      SnapPreExistingRun run ->
        pure (MT.PreExistingRun run)
      SnapPreExistingMergingRun mergingRunState ->
        MT.PreExistingMergingRun <$> restoreMergingRun mergingRunState

{-------------------------------------------------------------------------------
Conversion to merge tree snapshot format
-------------------------------------------------------------------------------}

{-# SPECIALISE toSnapMergingTree :: Ref (MT.MergingTree IO h) -> IO (SnapMergingTree (Ref (Run IO h))) #-}
-- | Convert a merging tree to its snapshot representation.
--
-- The tree's state variable is accessed through 'withMVar' for the duration
-- of the conversion of that node's state.
toSnapMergingTree ::
     (PrimMonad m, MonadMVar m)
  => Ref (MT.MergingTree m h)
  -> m (SnapMergingTree (Ref (Run m h)))
toSnapMergingTree (DeRef (MT.MergingTree stateVar _)) =
    withMVar stateVar (fmap SnapMergingTree . toSnapMergingTreeState)

{-# SPECIALISE toSnapMergingTreeState :: MT.MergingTreeState IO h -> IO (SnapMergingTreeState (Ref (Run IO h))) #-}
-- | Convert the state of a merging tree to its snapshot representation,
-- constructor by constructor.
toSnapMergingTreeState ::
     (PrimMonad m, MonadMVar m)
  => MT.MergingTreeState m h
  -> m (SnapMergingTreeState (Ref (Run m h)))
toSnapMergingTreeState treeState = case treeState of
    MT.CompletedTreeMerge run ->
      pure (SnapCompletedTreeMerge run)
    MT.PendingTreeMerge pending ->
      fmap SnapPendingTreeMerge (toSnapPendingMerge pending)
    MT.OngoingTreeMerge mergingRun ->
      fmap SnapOngoingTreeMerge (toSnapMergingRunState mergingRun)

{-# SPECIALISE toSnapPendingMerge :: MT.PendingMerge IO h -> IO (SnapPendingMerge (Ref (Run IO h))) #-}
-- | Convert a pending merge to its snapshot representation.
--
-- For a level merge the pre-existing runs are converted first and the
-- optional child tree second; for a union merge the child trees are
-- converted in order.
toSnapPendingMerge ::
     (PrimMonad m, MonadMVar m)
  => MT.PendingMerge m h
  -> m (SnapPendingMerge (Ref (Run m h)))
toSnapPendingMerge pending = case pending of
    MT.PendingLevelMerge preExisting mTree ->
      SnapPendingLevelMerge
        <$> (V.toList <$> traverse toSnapPreExistingRun preExisting)
        <*> traverse toSnapMergingTree mTree
    MT.PendingUnionMerge trees ->
      SnapPendingUnionMerge <$> traverse toSnapMergingTree (V.toList trees)

{-# SPECIALISE toSnapPreExistingRun :: MT.PreExistingRun IO h -> IO (SnapPreExistingRun (Ref (Run IO h))) #-}
-- | Convert one input of a pending level merge to its snapshot
-- representation: a plain run is passed through unchanged, while a merging
-- run is converted with 'toSnapMergingRunState'.
toSnapPreExistingRun ::
     (PrimMonad m, MonadMVar m)
  => MT.PreExistingRun m h
  -> m (SnapPreExistingRun (Ref (Run m h)))
toSnapPreExistingRun preExisting = case preExisting of
    MT.PreExistingRun run ->
      pure (SnapPreExistingRun run)
    MT.PreExistingMergingRun mergingRun ->
      fmap SnapPreExistingMergingRun (toSnapMergingRunState mergingRun)

{-------------------------------------------------------------------------------
Conversion to levels snapshot format
-------------------------------------------------------------------------------}
Expand Down Expand Up @@ -247,7 +407,7 @@ toSnapMergingRunState ::
-> m (SnapMergingRunState t (Ref (Run m h)))
toSnapMergingRunState !mr = do
-- TODO: MR.snapshot needs to return duplicated run references, and we
-- need to arrange to release them when the snapshoting is done.
-- need to arrange to release them when the snapshotting is done.
(numRuns, mergeDebt, mergeCredits, state) <- MR.snapshot mr
case state of
MR.CompletedMerge r ->
Expand Down
Loading
Loading