10 changes: 7 additions & 3 deletions bench/macro/lsm-tree-bench-lookups.hs
@@ -32,6 +32,7 @@ import Database.LSMTree.Internal.Paths (RunFsPaths (RunFsPaths))
import Database.LSMTree.Internal.Run (Run)
import qualified Database.LSMTree.Internal.Run as Run
import Database.LSMTree.Internal.RunAcc (RunBloomFilterAlloc (..))
import Database.LSMTree.Internal.RunBuilder (RunParams (..))
import qualified Database.LSMTree.Internal.RunBuilder as RunBuilder
import Database.LSMTree.Internal.RunNumber
import Database.LSMTree.Internal.Serialise (SerialisedKey,
@@ -349,10 +350,13 @@ lookupsEnv runSizes keyRng0 hfs hbio caching = do
-- create the runs
rbs <- sequence
[ RunBuilder.new hfs hbio
RunParams {
runParamCaching = caching,
runParamAlloc = RunAllocFixed benchmarkNumBitsPerEntry,
runParamIndex = Index.Compact
}
(RunFsPaths (FS.mkFsPath []) (RunNumber i))
(NumEntries numEntries)
(RunAllocFixed benchmarkNumBitsPerEntry)
Index.Compact
| ((numEntries, _), i) <- zip runSizes [0..] ]

-- fill the runs
@@ -373,7 +377,7 @@ lookupsEnv runSizes keyRng0 hfs hbio caching = do
putStr "DONE"

-- return runs
runs <- V.fromList <$> mapM (Run.fromMutable caching) rbs
runs <- V.fromList <$> mapM Run.fromMutable rbs
let blooms = V.map (\(DeRef r) -> Run.runFilter r) runs
indexes = V.map (\(DeRef r) -> Run.runIndex r) runs
handles = V.map (\(DeRef r) -> Run.runKOpsFile r) runs
5 changes: 2 additions & 3 deletions bench/micro/Bench/Database/LSMTree/Internal/Lookup.hs
@@ -19,16 +19,15 @@ import qualified Data.Vector as V
import Database.LSMTree.Extras.Orphans ()
import Database.LSMTree.Extras.Random (frequency, randomByteStringR,
sampleUniformWithReplacement, uniformWithoutReplacement)
import Database.LSMTree.Extras.RunData (defaultRunParams)
import Database.LSMTree.Extras.UTxO
import Database.LSMTree.Internal.Entry (Entry (..), NumEntries (..))
import qualified Database.LSMTree.Internal.Index as Index (IndexType (Compact))
import Database.LSMTree.Internal.Lookup (bloomQueries, indexSearches,
intraPageLookups, lookupsIO, prepLookups)
import Database.LSMTree.Internal.Page (getNumPages)
import Database.LSMTree.Internal.Paths (RunFsPaths (..))
import Database.LSMTree.Internal.Run (Run)
import qualified Database.LSMTree.Internal.Run as Run
import Database.LSMTree.Internal.RunAcc (RunBloomFilterAlloc (..))
import Database.LSMTree.Internal.RunNumber
import Database.LSMTree.Internal.Serialise
import qualified Database.LSMTree.Internal.WriteBuffer as WB
@@ -192,7 +191,7 @@ lookupsInBatchesEnv Config {..} = do
wbblobs <- WBB.new hasFS (FS.mkFsPath ["0.wbblobs"])
wb <- WB.fromMap <$> traverse (traverse (WBB.addBlob hasFS wbblobs)) storedKeys
let fsps = RunFsPaths (FS.mkFsPath []) (RunNumber 0)
r <- Run.fromWriteBuffer hasFS hasBlockIO caching (RunAllocFixed 10) Index.Compact fsps wb wbblobs
r <- Run.fromWriteBuffer hasFS hasBlockIO defaultRunParams fsps wb wbblobs
let NumEntries nentriesReal = Run.size r
assertEqual nentriesReal nentries $ pure ()
-- 42 to 43 entries per page
4 changes: 1 addition & 3 deletions bench/micro/Bench/Database/LSMTree/Internal/Merge.hs
@@ -23,7 +23,6 @@ import qualified Database.LSMTree.Internal.Merge as Merge
import Database.LSMTree.Internal.Paths (RunFsPaths (..))
import Database.LSMTree.Internal.Run (Run)
import qualified Database.LSMTree.Internal.Run as Run
import Database.LSMTree.Internal.RunAcc (RunBloomFilterAlloc (..))
import Database.LSMTree.Internal.RunNumber
import Database.LSMTree.Internal.Serialise
import Database.LSMTree.Internal.UniqCounter
@@ -264,8 +263,7 @@ merge ::
merge fs hbio Config {..} targetPaths runs = do
let f = fromMaybe const mergeMappend
m <- fromMaybe (error "empty inputs, no merge created") <$>
Merge.new fs hbio Run.CacheRunData (RunAllocFixed 10) Index.Compact
mergeType f targetPaths runs
Merge.new fs hbio defaultRunParams mergeType f targetPaths runs
Merge.stepsToCompletion m stepSize

fsPath :: FS.FsPath
1 change: 1 addition & 0 deletions lsm-tree.cabal
@@ -131,6 +131,7 @@ library
Database.LSMTree.Internal.CRC32C
Database.LSMTree.Internal.Cursor
Database.LSMTree.Internal.Entry
Database.LSMTree.Internal.IncomingRun
Database.LSMTree.Internal.Index
Database.LSMTree.Internal.Index.Compact
Database.LSMTree.Internal.Index.CompactAcc
6 changes: 2 additions & 4 deletions src-extras/Database/LSMTree/Extras/MergingRunData.hs
@@ -28,9 +28,7 @@ import Database.LSMTree.Internal.Lookup (ResolveSerialisedValue)
import Database.LSMTree.Internal.MergingRun (MergingRun)
import qualified Database.LSMTree.Internal.MergingRun as MR
import Database.LSMTree.Internal.Paths
import Database.LSMTree.Internal.Run (RunDataCaching (..))
import qualified Database.LSMTree.Internal.Run as Run
import Database.LSMTree.Internal.RunAcc (RunBloomFilterAlloc (..))
import Database.LSMTree.Internal.RunNumber
import Database.LSMTree.Internal.Serialise
import Database.LSMTree.Internal.UniqCounter
@@ -88,8 +86,8 @@ unsafeCreateMergingRun hfs hbio resolve indexType path counter = \case
$ \runs -> do
n <- incrUniqCounter counter
let fsPaths = RunFsPaths path (RunNumber (uniqueToInt n))
MR.new hfs hbio resolve CacheRunData (RunAllocFixed 10) indexType
mergeType fsPaths (V.fromList runs)
MR.new hfs hbio resolve defaultRunParams mergeType
fsPaths (V.fromList runs)

{-------------------------------------------------------------------------------
MergingRunData
12 changes: 6 additions & 6 deletions src-extras/Database/LSMTree/Extras/MergingTreeData.hs
@@ -74,17 +74,17 @@ unsafeCreateMergingTree ::
unsafeCreateMergingTree hfs hbio resolve indexType path counter = go
where
go = \case
CompletedTreeMergeData rd -> do
CompletedTreeMergeData rd ->
withRun hfs hbio indexType path counter rd $ \run ->
MT.mkMergingTree . MT.CompletedTreeMerge =<< dupRef run
OngoingTreeMergeData mrd -> do
MT.newCompletedMerge run
OngoingTreeMergeData mrd ->
withMergingRun hfs hbio resolve indexType path counter mrd $ \mr ->
MT.mkMergingTree . MT.OngoingTreeMerge =<< dupRef mr
PendingLevelMergeData prds mtd -> do
MT.newOngoingMerge mr
PendingLevelMergeData prds mtd ->
withPreExistingRuns prds $ \prs ->
withMaybeTree mtd $ \mt ->
MT.newPendingLevelMerge prs mt
PendingUnionMergeData mtds -> do
PendingUnionMergeData mtds ->
withTrees mtds $ \mts ->
MT.newPendingUnionMerge mts

9 changes: 9 additions & 0 deletions src-extras/Database/LSMTree/Extras/NoThunks.hs
@@ -214,9 +214,18 @@ deriving stock instance Generic (Run m h)
deriving anyclass instance (Typeable m, Typeable (PrimState m), Typeable h)
=> NoThunks (Run m h)

deriving stock instance Generic RunParams
deriving anyclass instance NoThunks RunParams

deriving stock instance Generic RunBloomFilterAlloc
deriving anyclass instance NoThunks RunBloomFilterAlloc

deriving stock instance Generic RunDataCaching
deriving anyclass instance NoThunks RunDataCaching

deriving stock instance Generic IndexType
deriving anyclass instance NoThunks IndexType

{-------------------------------------------------------------------------------
Paths
-------------------------------------------------------------------------------}
21 changes: 17 additions & 4 deletions src-extras/Database/LSMTree/Extras/RunData.hs
@@ -2,8 +2,10 @@
-- from them. Tests and benchmarks should preferably use these utilities instead
-- of (re-)defining their own.
module Database.LSMTree.Extras.RunData (
-- * RunParams
defaultRunParams
-- * Create runs
withRun
, withRun
, withRunAt
, withRuns
, unsafeCreateRun
@@ -48,12 +50,13 @@ import qualified Data.Vector as V
import Database.LSMTree.Extras (showPowersOf10)
import Database.LSMTree.Extras.Generators ()
import Database.LSMTree.Internal.Entry
import Database.LSMTree.Internal.Index (IndexType)
import Database.LSMTree.Internal.Index (IndexType (..))
import Database.LSMTree.Internal.Lookup (ResolveSerialisedValue)
import Database.LSMTree.Internal.MergeSchedule (addWriteBufferEntries)
import Database.LSMTree.Internal.Paths
import qualified Database.LSMTree.Internal.Paths as Paths
import Database.LSMTree.Internal.Run (Run, RunDataCaching (..))
import Database.LSMTree.Internal.Run (Run, RunDataCaching (..),
RunParams (..))
import qualified Database.LSMTree.Internal.Run as Run
import Database.LSMTree.Internal.RunAcc (RunBloomFilterAlloc (..),
entryWouldFitInPage)
@@ -69,6 +72,15 @@ import qualified System.FS.BlockIO.API as FS
import System.FS.BlockIO.API (HasBlockIO)
import Test.QuickCheck


defaultRunParams :: RunParams
defaultRunParams =
RunParams {
runParamCaching = CacheRunData,
runParamAlloc = RunAllocFixed 10,
runParamIndex = Compact
}

{-------------------------------------------------------------------------------
Create runs
-------------------------------------------------------------------------------}
@@ -153,7 +165,8 @@ unsafeCreateRunAt fs hbio indexType fsPaths (RunData m) = do
let blobpath = FS.addExtension (runBlobPath fsPaths) ".wb"
bracket (WBB.new fs blobpath) releaseRef $ \wbblobs -> do
wb <- WB.fromMap <$> traverse (traverse (WBB.addBlob fs wbblobs)) m
Run.fromWriteBuffer fs hbio CacheRunData (RunAllocFixed 10) indexType
Run.fromWriteBuffer fs hbio
defaultRunParams { runParamIndex = indexType }
fsPaths wb wbblobs

-- | Create a 'RunFsPaths' using an empty 'FsPath'. The empty path corresponds
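A minimal usage sketch of the new defaultRunParams, not part of the diff: it only re-combines calls that already appear in the hunks above, and assumes hfs, hbio, fsPaths, wb, wbblobs and indexType are in scope as they are at the existing call sites.

-- Sketch only; setup of hfs, hbio, fsPaths, wb, wbblobs and indexType is assumed.
exampleRuns = do
    -- use the shared defaults unchanged, as the lookup benchmark now does
    r  <- Run.fromWriteBuffer hfs hbio defaultRunParams fsPaths wb wbblobs
    -- or override a single field via record update, as unsafeCreateRunAt does
    r' <- Run.fromWriteBuffer hfs hbio
            defaultRunParams { runParamIndex = indexType }
            fsPaths wb wbblobs
    pure (r, r')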
66 changes: 38 additions & 28 deletions src/Database/LSMTree/Internal.hs
@@ -64,7 +64,7 @@ module Database.LSMTree.Internal (
, openSnapshot
, deleteSnapshot
, listSnapshots
-- * Mutiple writable tables
-- * Multiple writable tables
, duplicate
-- * Table union
, unions
@@ -323,7 +323,7 @@ data SessionState m h =
| SessionClosed

data SessionEnv m h = SessionEnv {
-- | The path to the directory in which this sesion is live. This is a path
-- | The path to the directory in which this session is live. This is a path
-- relative to root of the 'HasFS' instance.
--
-- INVARIANT: the session root is never changed during the lifetime of a
@@ -1234,18 +1234,30 @@ createSnapshot snap label tableType t = do
let wb = tableWriteBuffer content
let wbb = tableWriteBufferBlobs content
snapWriteBufferNumber <- Paths.writeBufferNumber <$>
snapshotWriteBuffer reg hfs hbio activeUc snapUc activeDir snapDir wb wbb
snapshotWriteBuffer hfs hbio activeUc snapUc reg activeDir snapDir wb wbb

-- Convert to snapshot format
snapLevels <- toSnapLevels (tableLevels content)

-- Hard link runs into the named snapshot directory
snapLevels' <- snapshotRuns reg snapUc snapDir snapLevels
snapLevels' <- traverse (snapshotRun hfs hbio snapUc reg snapDir) snapLevels

-- If a merging tree exists, do the same hard-linking for the runs within
mTreeOpt <- case tableUnionLevel content of
NoUnion -> pure Nothing
Union mTreeRef -> do
mTree <- toSnapMergingTree mTreeRef
Just <$> traverse (snapshotRun hfs hbio snapUc reg snapDir) mTree

-- Release the table content
releaseTableContent reg content

let snapMetaData = SnapshotMetaData label tableType (tableConfig t) snapWriteBufferNumber snapLevels'
let snapMetaData = SnapshotMetaData
label
tableType
(tableConfig t)
snapWriteBufferNumber
snapLevels'
mTreeOpt
SnapshotMetaDataFile contentPath = Paths.snapshotMetaDataFile snapDir
SnapshotMetaDataChecksumFile checksumPath = Paths.snapshotMetaDataChecksumFile snapDir
writeFileSnapshotMetaData hfs contentPath checksumPath snapMetaData
@@ -1290,7 +1302,7 @@ openSnapshot sesh label tableType override snap resolve = do
Left e -> throwIO (ErrSnapshotDeserialiseFailure e snap)
Right x -> pure x

let SnapshotMetaData label' tableType' conf snapWriteBuffer snapLevels = snapMetaData
let SnapshotMetaData label' tableType' conf snapWriteBuffer snapLevels mTreeOpt = snapMetaData

unless (tableType == tableType') $
throwIO (ErrSnapshotWrongTableType snap tableType tableType')
@@ -1308,19 +1320,26 @@
(tableWriteBuffer, tableWriteBufferBlobs) <- openWriteBuffer reg resolve hfs hbio uc activeDir snapWriteBufferPaths

-- Hard link runs into the active directory,
snapLevels' <- openRuns reg hfs hbio conf (sessionUniqCounter seshEnv) snapDir activeDir snapLevels
snapLevels' <- traverse (openRun hfs hbio uc reg snapDir activeDir) snapLevels
unionLevel <- case mTreeOpt of
Nothing -> pure NoUnion
Just mTree -> do
snapTree <- traverse (openRun hfs hbio uc reg snapDir activeDir) mTree
mt <- fromSnapMergingTree hfs hbio uc resolve activeDir reg snapTree
traverse_ (delayedCommit reg . releaseRef) snapTree
pure (Union mt)

-- Convert from the snapshot format, restoring merge progress in the process
tableLevels <- fromSnapLevels reg hfs hbio conf (sessionUniqCounter seshEnv) resolve activeDir snapLevels'
releaseRuns reg snapLevels'
tableLevels <- fromSnapLevels hfs hbio uc conf resolve reg activeDir snapLevels'
traverse_ (delayedCommit reg . releaseRef) snapLevels'

tableCache <- mkLevelsCache reg tableLevels
newWith reg sesh seshEnv conf' am $! TableContent {
tableWriteBuffer
, tableWriteBufferBlobs
, tableLevels
, tableCache
, tableUnionLevel = NoUnion -- TODO: at some point also load union level from snapshot
, tableUnionLevel = unionLevel
}

{-# SPECIALISE deleteSnapshot ::
@@ -1370,7 +1389,7 @@ listSnapshots sesh = do
else pure $ Nothing

{-------------------------------------------------------------------------------
Mutiple writable tables
Multiple writable tables
-------------------------------------------------------------------------------}

{-# SPECIALISE duplicate :: Table IO h -> IO (Table IO h) #-}
@@ -1534,28 +1553,19 @@ writeBufferToNewRun SessionEnv {
sessionHasBlockIO = hbio,
sessionUniqCounter = uc
}
conf@TableConfig {
confDiskCachePolicy,
confFencePointerIndex
}
conf
TableContent{
tableWriteBuffer,
tableWriteBufferBlobs
}
| WB.null tableWriteBuffer = pure Nothing
| otherwise = Just <$> do
!n <- incrUniqCounter uc
let !ln = LevelNo 1
!cache = diskCachePolicyForLevel confDiskCachePolicy ln
!alloc = bloomFilterAllocForLevel conf ln
!indexType = indexTypeForRun confFencePointerIndex
!path = Paths.runPath (Paths.activeDir root)
(uniqueToRunNumber n)
Run.fromWriteBuffer hfs hbio
cache
alloc
indexType
path
!uniq <- incrUniqCounter uc
let (!runParams, !runPaths) = mergingRunParamsForLevel
(Paths.activeDir root) conf uniq (LevelNo 1)
Run.fromWriteBuffer
hfs hbio
runParams runPaths
tableWriteBuffer
tableWriteBufferBlobs

2 changes: 1 addition & 1 deletion src/Database/LSMTree/Internal/Config.hs
@@ -46,7 +46,7 @@ import qualified Monkey

newtype LevelNo = LevelNo Int
deriving stock (Show, Eq)
deriving newtype Enum
deriving newtype (Enum, NFData)

{-------------------------------------------------------------------------------
Table configuration