-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
micro-bench: add benchmark for run merging
- Loading branch information
Showing
4 changed files
with
422 additions
and
23 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,391 @@ | ||
{-# LANGUAGE LambdaCase #-} | ||
{-# LANGUAGE RecordWildCards #-} | ||
|
||
module Bench.Database.LSMTree.Internal.Merge (benchmarks) where | ||
|
||
import Control.Monad (when, zipWithM) | ||
import Criterion.Main (Benchmark, bench, bgroup) | ||
import qualified Criterion.Main as Cr | ||
import Data.Bifunctor (first) | ||
import qualified Data.BloomFilter.Hash as Hash | ||
import qualified Data.ByteString as BS | ||
import Data.Foldable (traverse_) | ||
import qualified Data.List as List | ||
import Data.Maybe (fromMaybe) | ||
import Data.Word (Word64) | ||
import Database.LSMTree.Extras.Orphans () | ||
import Database.LSMTree.Extras.Random (frequency, withReplacement, | ||
withoutReplacement) | ||
import Database.LSMTree.Extras.UTxO | ||
import Database.LSMTree.Internal.Entry | ||
import qualified Database.LSMTree.Internal.Merge as Merge | ||
import Database.LSMTree.Internal.Run (Run) | ||
import qualified Database.LSMTree.Internal.Run as Run | ||
import Database.LSMTree.Internal.Serialise | ||
import qualified Database.LSMTree.Internal.WriteBuffer as WB | ||
import Prelude hiding (getContents) | ||
import System.Directory (removeDirectoryRecursive) | ||
import qualified System.FS.API as FS | ||
import qualified System.FS.IO as FS | ||
import System.IO.Temp | ||
import qualified System.Random as R | ||
import System.Random (StdGen, mkStdGen, uniform, uniformR) | ||
|
||
benchmarks :: Benchmark | ||
benchmarks = bgroup "Bench.Database.LSMTree.Internal.Merge" [ | ||
-- various numbers of runs | ||
benchMerge configWord64 | ||
{ name = "word64-insert-x2" | ||
, nentries = scaled $ replicate 2 1 | ||
, finserts = 1 | ||
} | ||
, benchMerge configWord64 | ||
{ name = "word64-insert-x4" | ||
, nentries = scaled $ replicate 4 1 | ||
, finserts = 1 | ||
} | ||
, benchMerge configWord64 | ||
{ name = "word64-insert-x7" | ||
, nentries = scaled $ replicate 7 1 | ||
, finserts = 1 | ||
} | ||
, benchMerge configWord64 | ||
{ name = "word64-insert-x13" | ||
, nentries = scaled $ replicate 13 1 | ||
, finserts = 1 | ||
} | ||
-- different operations | ||
, benchMerge configWord64 | ||
{ name = "word64-delete-x4" | ||
, nentries = scaled $ replicate 4 1 | ||
, fdeletes = 1 | ||
} | ||
, benchMerge configWord64 | ||
{ name = "word64-blob-x4" | ||
, nentries = scaled $ replicate 4 1 | ||
, fblobinserts = 1 | ||
} | ||
, benchMerge configWord64 | ||
{ name = "word64-mupsert-collisions-x4" | ||
, nentries = scaled $ replicate 4 1 | ||
, fmupserts = 1 | ||
, randomKey = randomWord64WithCollisions | ||
, mergeMappend = Just (onDeserialisedValues ((+) @Word64)) | ||
} | ||
, benchMerge configWord64 | ||
{ name = "word64-mix-x4" | ||
, nentries = scaled $ replicate 4 1 | ||
, finserts = 1 | ||
, fdeletes = 1 | ||
, fmupserts = 1 | ||
, mergeMappend = Just (onDeserialisedValues ((+) @Word64)) | ||
} | ||
, benchMerge configWord64 | ||
{ name = "word64-mix-collisions-x4" | ||
, nentries = scaled $ replicate 4 1 | ||
, finserts = 1 | ||
, fdeletes = 1 | ||
, fmupserts = 1 | ||
, randomKey = randomWord64WithCollisions | ||
, mergeMappend = Just (onDeserialisedValues ((+) @Word64)) | ||
} | ||
-- different key and value sizes | ||
, benchMerge configWord64 | ||
{ name = "insert-mix-keys-x4" -- potentially long keys | ||
, nentries = scaled $ replicate 4 1 | ||
, finserts = 1 | ||
, randomKey = first serialiseKey . randomByteStringR (6, 4000) | ||
} | ||
, benchMerge configWord64 | ||
{ name = "insert-mix-vals-x4" -- potentially long values | ||
, nentries = scaled $ replicate 4 1 | ||
, finserts = 1 | ||
, randomValue = first serialiseValue . randomByteStringR (0, 4000) | ||
} | ||
, benchMerge configWord64 | ||
{ name = "insert-page-x4" -- 1 page | ||
, nentries = scaled $ replicate 4 1 | ||
, finserts = 1 | ||
, randomValue = first serialiseValue . randomByteStringR (4056, 4056) | ||
} | ||
, benchMerge configWord64 | ||
{ name = "insert-page-plus-byte-x4" -- 1 page + 1 byte | ||
, nentries = scaled $ replicate 4 1 | ||
, finserts = 1 | ||
, randomValue = first serialiseValue . randomByteStringR (4057, 4057) | ||
} | ||
, benchMerge configWord64 | ||
{ name = "insert-huge-x4" -- 3-5 pages | ||
, nentries = scaled $ replicate 4 1 | ||
, finserts = 1 | ||
, randomValue = first serialiseValue . randomByteStringR (10_000, 20_000) | ||
} | ||
-- common UTxO scenarios | ||
, benchMerge configUTxO | ||
{ name = "utxo-x4" -- like tiering merge | ||
, nentries = scaled $ replicate 4 1 | ||
, finserts = 1 | ||
, fdeletes = 1 | ||
} | ||
, benchMerge configUTxO | ||
{ name = "utxo-x4-uneven" | ||
, nentries = scaled [1, 3, 1.5, 2.5] | ||
, finserts = 1 | ||
, fdeletes = 1 | ||
} | ||
, benchMerge configUTxO | ||
{ name = "utxo-x4-lastlevel" | ||
, nentries = scaled $ replicate 4 1 | ||
, finserts = 1 | ||
, fdeletes = 1 | ||
, mergeLevel = Merge.LastLevel | ||
} | ||
, benchMerge configUTxO | ||
{ name = "utxo-x4+1-min-skewed-lastlevel" -- live levelling merge | ||
, nentries = scaled [1, 1, 1, 1, 4] | ||
, finserts = 1 | ||
, fdeletes = 1 | ||
, mergeLevel = Merge.LastLevel | ||
} | ||
, benchMerge configUTxO | ||
{ name = "utxo-x4+1-max-skewed-lastlevel" -- live levelling merge | ||
, nentries = scaled [1, 1, 1, 1, 16] | ||
, finserts = 1 | ||
, fdeletes = 1 | ||
, mergeLevel = Merge.LastLevel | ||
} | ||
] | ||
where | ||
totalentries = 50_000 :: Word64 | ||
|
||
-- For each run, a significant part of all possible keys are in use, e.g. | ||
-- half of them for 4 runs of equal size. This leads to many collisions. | ||
-- Note that each input run still has the full number of elements due to the | ||
-- use of 'sampleWithoutReplacement'. | ||
-- Warning: Using this with a run of around half of all entries (or more) | ||
-- leads to issues with generating distinct keys for all its entries! | ||
randomWord64WithCollisions :: Rnd SerialisedKey | ||
randomWord64WithCollisions = | ||
first (serialiseKey . Hash.hash64) | ||
. uniformR (0, totalentries `div` 2) | ||
|
||
scaled weights = | ||
let total = sum weights | ||
in [ round (fromIntegral totalentries * w / total :: Double) | ||
| w <- weights | ||
] | ||
|
||
benchMerge :: Config -> Benchmark | ||
benchMerge conf@Config{name} = | ||
withEnv $ \ ~(_dir, hasFS, runs) -> | ||
bgroup name [ | ||
bench "merge" $ | ||
-- We'd like to do: `whnfAppIO (runs' -> ...) runs`. | ||
-- However, we also need per-run cleanup to avoid running out of | ||
-- disk space. We use `perRunEnvWithCleanup`, which has two issues: | ||
-- 1. Just as `whnfAppIO` etc., it takes an IO action and returns | ||
-- `Benchmarkable`, which does not compose. As a workaround, we | ||
-- thread `runs` through the environment, too. | ||
-- 2. It forces the result to normal form, which would traverse the | ||
-- whole run, so we force to WHNF ourselves and just return `()`. | ||
Cr.perRunEnvWithCleanup | ||
((,) runs <$> getPaths) | ||
(cleanupPaths hasFS . snd) $ \(runs', p) -> do | ||
!run <- merge hasFS conf p runs' | ||
-- Make sure to immediately close resulting runs so we don't run | ||
-- out of file handles. Ideally this would not be measured, but at | ||
-- least it's pretty cheap. | ||
Run.removeReference hasFS run | ||
] | ||
where | ||
withEnv = | ||
Cr.envWithCleanup | ||
(writeBufferEnv conf) | ||
writeBufferEnvCleanup | ||
|
||
-- We'll remove the files on every run, so we can re-use the same run number. | ||
getPaths :: IO Run.RunFsPaths | ||
getPaths = pure outputRunNumber | ||
|
||
-- We need to keep the other runs, but remove the freshly created one. | ||
cleanupPaths :: FS.HasFS IO FS.HandleIO -> Run.RunFsPaths -> IO () | ||
cleanupPaths hasFS paths = do | ||
traverse_ (FS.removeFile hasFS) (Run.runFsPaths paths) | ||
exists <- FS.doesFileExist hasFS (Run.runChecksumsPath paths) | ||
when exists $ | ||
FS.removeFile hasFS (Run.runChecksumsPath paths) | ||
|
||
merge :: FS.HasFS IO FS.HandleIO -> Config -> Run.RunFsPaths -> [Run (FS.Handle FS.HandleIO)] -> IO (Run (FS.Handle (FS.HandleIO))) | ||
merge fs Config {..} targetPaths runs = do | ||
let f = fromMaybe const mergeMappend | ||
m <- fromMaybe (error "empty inputs, no merge created") <$> | ||
Merge.new fs mergeLevel f targetPaths runs | ||
go m | ||
where | ||
go m = | ||
Merge.steps fs m stepSize >>= \case | ||
Merge.MergeComplete run -> return run | ||
Merge.MergeInProgress -> go m | ||
|
||
outputRunNumber :: Run.RunFsPaths | ||
outputRunNumber = Run.RunFsPaths 0 | ||
|
||
inputRunNumbers :: [Run.RunFsPaths] | ||
inputRunNumbers = Run.RunFsPaths <$> [1..] | ||
|
||
type InputRuns = [Run (FS.Handle FS.HandleIO)] | ||
|
||
type Mappend = SerialisedValue -> SerialisedValue -> SerialisedValue | ||
|
||
onDeserialisedValues :: SerialiseValue v => (v -> v -> v) -> Mappend | ||
onDeserialisedValues f x y = | ||
serialiseValue (f (deserialiseValue x) (deserialiseValue y)) | ||
|
||
type SerialisedKOp = (SerialisedKey, SerialisedEntry) | ||
type SerialisedEntry = Entry SerialisedValue SerialisedBlob | ||
|
||
{------------------------------------------------------------------------------- | ||
Environments | ||
-------------------------------------------------------------------------------} | ||
|
||
-- | Config options describing a benchmarking scenario | ||
data Config = Config { | ||
-- | Name for the benchmark scenario described by this config. | ||
name :: !String | ||
-- | Number of key\/operation pairs for each run. | ||
, nentries :: ![Int] | ||
, finserts :: !Int | ||
, fblobinserts :: !Int | ||
, fdeletes :: !Int | ||
, fmupserts :: !Int | ||
, randomKey :: Rnd SerialisedKey | ||
, randomValue :: Rnd SerialisedValue | ||
, randomBlob :: Rnd SerialisedBlob | ||
, mergeLevel :: !Merge.Level | ||
, mergeMappend :: !(Maybe Mappend) | ||
, stepSize :: !Int | ||
} | ||
|
||
type Rnd a = StdGen -> (a, StdGen) | ||
|
||
defaultConfig :: Config | ||
defaultConfig = Config { | ||
name = "default" | ||
, nentries = [] | ||
, finserts = 0 | ||
, fblobinserts = 0 | ||
, fdeletes = 0 | ||
, fmupserts = 0 | ||
, randomKey = error "randomKey not implemented" | ||
, randomValue = error "randomValue not implemented" | ||
, randomBlob = error "randomBlob not implemented" | ||
, mergeLevel = Merge.MidLevel | ||
, mergeMappend = Nothing | ||
, stepSize = maxBound -- by default, just do in one go | ||
} | ||
|
||
configWord64 :: Config | ||
configWord64 = defaultConfig { | ||
randomKey = first serialiseKey . uniform @_ @Word64 | ||
, randomValue = first serialiseValue . uniform @_ @Word64 | ||
, randomBlob = first serialiseBlob . randomByteStringR (0, 0x2000) -- up to 8 kB | ||
} | ||
|
||
configUTxO :: Config | ||
configUTxO = defaultConfig { | ||
randomKey = first serialiseKey . uniform @_ @UTxOKey | ||
, randomValue = first serialiseValue . uniform @_ @UTxOValue | ||
} | ||
|
||
writeBufferEnv :: | ||
Config | ||
-> IO ( FilePath -- ^ Temporary directory | ||
, FS.HasFS IO FS.HandleIO | ||
, InputRuns | ||
) | ||
writeBufferEnv config = do | ||
sysTmpDir <- getCanonicalTemporaryDirectory | ||
benchTmpDir <- createTempDirectory sysTmpDir "writeBufferEnv" | ||
let hasFS = FS.ioHasFS (FS.MountPoint benchTmpDir) | ||
runs <- randomRuns hasFS config (mkStdGen 17) | ||
pure (benchTmpDir, hasFS, runs) | ||
|
||
writeBufferEnvCleanup :: | ||
( FilePath -- ^ Temporary directory | ||
, FS.HasFS IO FS.HandleIO | ||
, InputRuns | ||
) | ||
-> IO () | ||
writeBufferEnvCleanup (tmpDir, hasFS, runs) = do | ||
traverse_ (Run.removeReference hasFS) runs | ||
removeDirectoryRecursive tmpDir | ||
|
||
-- | Generate keys and entries to insert into the write buffer. | ||
-- They are already serialised to exclude the cost from the benchmark. | ||
randomRuns :: | ||
FS.HasFS IO h | ||
-> Config | ||
-> StdGen -- ^ RNG | ||
-> IO [Run (FS.Handle h)] | ||
randomRuns hasFS config@Config {..} = | ||
zipWithM (createRun hasFS mergeMappend) inputRunNumbers | ||
. zipWith (randomKOps config) nentries | ||
. List.unfoldr (Just . R.split) | ||
|
||
createRun :: | ||
FS.HasFS IO h | ||
-> Maybe Mappend | ||
-> Run.RunFsPaths | ||
-> [SerialisedKOp] | ||
-> IO (Run (FS.Handle h)) | ||
createRun hasFS mMappend targetPath = | ||
Run.fromWriteBuffer hasFS targetPath | ||
. List.foldl insert WB.empty | ||
where | ||
insert wb (k, e) = case mMappend of | ||
Nothing -> WB.addEntryNormal k (expectNormal e) wb | ||
Just f -> WB.addEntryMonoidal f k (expectMonoidal e) wb | ||
|
||
expectNormal e = fromMaybe (error ("invalid normal update: " <> show e)) | ||
(entryToUpdateNormal e) | ||
expectMonoidal e = fromMaybe (error ("invalid monoidal update: " <> show e)) | ||
(entryToUpdateMonoidal e) | ||
|
||
-- | Generate keys and entries to insert into the write buffer. | ||
-- They are already serialised to exclude the cost from the benchmark. | ||
randomKOps :: | ||
Config | ||
-> Int -- ^ number of entries | ||
-> StdGen -- ^ RNG | ||
-> [SerialisedKOp] | ||
randomKOps Config {..} runentries g0 = | ||
zip | ||
(withoutReplacement g1 runentries randomKey) | ||
(withReplacement g2 runentries randomEntry) | ||
where | ||
(g1, g2) = R.split g0 | ||
|
||
randomEntry :: Rnd SerialisedEntry | ||
randomEntry = frequency | ||
[ ( finserts | ||
, \g -> let (!v, !g') = randomValue g | ||
in (Insert v, g') | ||
) | ||
, ( fblobinserts | ||
, \g -> let (!v, !g') = randomValue g | ||
(!b, !g'') = randomBlob g' | ||
in (InsertWithBlob v b, g'') | ||
) | ||
, ( fdeletes | ||
, \g -> (Delete, g) | ||
) | ||
, ( fmupserts | ||
, \g -> let (!v, !g') = randomValue g | ||
in (Mupdate v, g') | ||
) | ||
] | ||
|
||
randomByteStringR :: (Int, Int) -> Rnd BS.ByteString | ||
randomByteStringR range g = | ||
let (!l, !g') = uniformR range g | ||
in R.genByteString l g' |
Oops, something went wrong.