Skip to content

Commit

Permalink
micro-bench: add benchmark for run merging
Browse files Browse the repository at this point in the history
  • Loading branch information
mheinzel committed May 5, 2024
1 parent fad50a7 commit e4f4dd4
Show file tree
Hide file tree
Showing 5 changed files with 426 additions and 23 deletions.
393 changes: 393 additions & 0 deletions bench/micro/Bench/Database/LSMTree/Internal/Merge.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,393 @@
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE RecordWildCards #-}

module Bench.Database.LSMTree.Internal.Merge (benchmarks) where

import Control.Monad (zipWithM)
import Criterion.Main (Benchmark, bench, bgroup)
import qualified Criterion.Main as Cr
import Data.Bifunctor (first)
import qualified Data.BloomFilter.Hash as Hash
import qualified Data.ByteString as BS
import Data.Foldable (traverse_)
import qualified Data.List as List
import Data.Maybe (fromMaybe)
import Data.Word (Word64)
import Database.LSMTree.Extras.Orphans ()
import Database.LSMTree.Extras.Random (frequency, withReplacement,
withoutReplacement)
import Database.LSMTree.Extras.UTxO
import Database.LSMTree.Internal.Entry
import qualified Database.LSMTree.Internal.Merge as Merge
import Database.LSMTree.Internal.Run (Run)
import qualified Database.LSMTree.Internal.Run as Run
import Database.LSMTree.Internal.Serialise
import qualified Database.LSMTree.Internal.WriteBuffer as WB
import Prelude hiding (getContents)
import System.Directory (removeDirectoryRecursive)
import qualified System.FS.API as FS
import qualified System.FS.IO as FS
import System.IO.Temp
import qualified System.Random as R
import System.Random (StdGen, mkStdGen, uniform, uniformR)

benchmarks :: Benchmark
benchmarks = bgroup "Bench.Database.LSMTree.Internal.Merge" [
-- various numbers of runs
benchMerge configWord64
{ name = "word64-insert-x2"
, nentries = scaled $ replicate 2 1
, finserts = 1
}
, benchMerge configWord64
{ name = "word64-insert-x4"
, nentries = scaled $ replicate 4 1
, finserts = 1
}
, benchMerge configWord64
{ name = "word64-insert-x7"
, nentries = scaled $ replicate 7 1
, finserts = 1
}
, benchMerge configWord64
{ name = "word64-insert-x13"
, nentries = scaled $ replicate 13 1
, finserts = 1
}
-- different operations
, benchMerge configWord64
{ name = "word64-delete-x4"
, nentries = scaled $ replicate 4 1
, fdeletes = 1
}
, benchMerge configWord64
{ name = "word64-blob-x4"
, nentries = scaled $ replicate 4 1
, fblobinserts = 1
}
, benchMerge configWord64
{ name = "word64-mupsert-collisions-x4"
, nentries = scaled $ replicate 4 1
, fmupserts = 1
, randomKey = randomWord64WithCollisions
, mergeMappend = Just (onDeserialisedValues ((+) @Word64))
}
, benchMerge configWord64
{ name = "word64-mix-x4"
, nentries = scaled $ replicate 4 1
, finserts = 1
, fdeletes = 1
, fmupserts = 1
, mergeMappend = Just (onDeserialisedValues ((+) @Word64))
}
, benchMerge configWord64
{ name = "word64-mix-collisions-x4"
, nentries = scaled $ replicate 4 1
, finserts = 1
, fdeletes = 1
, fmupserts = 1
, randomKey = randomWord64WithCollisions
, mergeMappend = Just (onDeserialisedValues ((+) @Word64))
}
-- different key and value sizes
, benchMerge configWord64
{ name = "insert-mix-keys-x4" -- potentially long keys
, nentries = scaled $ replicate 4 1
, finserts = 1
, randomKey = first serialiseKey . randomByteStringR (0, 4000)
}
, benchMerge configWord64
{ name = "insert-mix-vals-x4" -- potentially long values
, nentries = scaled $ replicate 4 1
, finserts = 1
, randomValue = first serialiseValue . randomByteStringR (0, 4000)
}
, benchMerge configWord64
{ name = "insert-page-x4" -- 1 page
, nentries = scaled $ replicate 4 1
, finserts = 1
, randomValue = first serialiseValue . randomByteStringR (4056, 4056)
}
, benchMerge configWord64
{ name = "insert-page-plus-byte-x4" -- 1 page + 1 byte
, nentries = scaled $ replicate 4 1
, finserts = 1
, randomValue = first serialiseValue . randomByteStringR (4057, 4057)
}
, benchMerge configWord64
{ name = "insert-huge-x4" -- 3-5 pages
, nentries = scaled $ replicate 4 1
, finserts = 1
, randomValue = first serialiseValue . randomByteStringR (10_000, 20_000)
}
-- common UTxO scenarios
, benchMerge configUTxO
{ name = "utxo-x4" -- like tiering merge
, nentries = scaled $ replicate 4 1
, finserts = 1
, fdeletes = 1
}
, benchMerge configUTxO
{ name = "utxo-x4-uneven"
, nentries = scaled [1, 3, 1.5, 2.5]
, finserts = 1
, fdeletes = 1
}
, benchMerge configUTxO
{ name = "utxo-x4-lastlevel"
, nentries = scaled $ replicate 4 1
, finserts = 1
, fdeletes = 1
, mergeLevel = Merge.LastLevel
}
, benchMerge configUTxO
{ name = "utxo-x4+1-min-skewed-lastlevel" -- live levelling merge
, nentries = scaled [1, 1, 1, 1, 4]
, finserts = 1
, fdeletes = 1
, mergeLevel = Merge.LastLevel
}
, benchMerge configUTxO
{ name = "utxo-x4+1-max-skewed-lastlevel" -- live levelling merge
, nentries = scaled [1, 1, 1, 1, 16]
, finserts = 1
, fdeletes = 1
, mergeLevel = Merge.LastLevel
}
]
where
totalentries = 40_000 :: Word64

-- For each run, a significant part of all possible keys are in use, e.g.
-- half of them for 4 runs of equal size. This leads to many collisions.
-- Note that each input run still has the full number of elements due to the
-- use of 'sampleWithoutReplacement'.
-- Warning: Using this with a run of around half of all entries (or more)
-- leads to issues with generating distinct keys for all its entries!
randomWord64WithCollisions :: Rnd SerialisedKey
randomWord64WithCollisions =
first (serialiseKey . Hash.hash64)
. uniformR (0, totalentries `div` 2)

scaled weights =
let total = sum weights
in [ round (fromIntegral totalentries * w / total :: Double)
| w <- weights
]

benchMerge :: Config -> Benchmark
benchMerge conf@Config{name} =
withEnv $ \ ~(_dir, hasFS, hasBufFS, runs) ->
bgroup name [
bench "merge" $
-- We'd like to do: `whnfAppIO (runs' -> ...) runs`.
-- However, we also need per-run cleanup to avoid running out of
-- disk space. We use `perRunEnvWithCleanup`, which has two issues:
-- 1. Just as `whnfAppIO` etc., it takes an IO action and returns
-- `Benchmarkable`, which does not compose. As a workaround, we
-- thread `runs` through the environment, too.
-- 2. It forces the result to normal form, which would traverse the
-- whole run, so we force to WHNF ourselves and just return `()`.
Cr.perRunEnvWithCleanup
((,) runs <$> getPaths)
(cleanupPaths hasFS . snd) $ \(runs', p) -> do
!run <- merge hasFS hasBufFS conf p runs'
-- Make sure to immediately close resulting runs so we don't run
-- out of file handles. Ideally this would not be measured, but at
-- least it's pretty cheap.
Run.removeReference hasFS run
]
where
withEnv =
Cr.envWithCleanup
(writeBufferEnv conf)
writeBufferEnvCleanup

-- We'll remove the files on every run, so we can re-use the same run number.
getPaths :: IO Run.RunFsPaths
getPaths = pure outputRunNumber

-- Simply remove the whole active directory.
-- We need to keep the other runs, but remove the freshly created one.
cleanupPaths :: FS.HasFS IO FS.HandleIO -> Run.RunFsPaths -> IO ()
cleanupPaths hasFS paths = do
traverse_ (FS.removeFile hasFS) (Run.runFsPaths paths)
FS.removeFile hasFS (Run.runChecksumsPath paths)

merge :: FS.HasFS IO FS.HandleIO -> FS.HasBufFS IO FS.HandleIO -> Config -> Run.RunFsPaths -> [Run (FS.Handle FS.HandleIO)] -> IO (Run (FS.Handle (FS.HandleIO)))
merge fs bfs Config {..} targetPaths runs = do
let f = fromMaybe const mergeMappend
m <- fromMaybe (error "empty inputs, no merge created") <$>
Merge.new fs bfs mergeLevel f targetPaths runs
go m
where
go m =
Merge.steps fs bfs m stepSize >>= \case
Merge.MergeComplete run -> return run
Merge.MergeInProgress -> go m

outputRunNumber :: Run.RunFsPaths
outputRunNumber = Run.RunFsPaths 0

inputRunNumbers :: [Run.RunFsPaths]
inputRunNumbers = Run.RunFsPaths <$> [1..]

type InputRuns = [Run (FS.Handle FS.HandleIO)]

type Mappend = SerialisedValue -> SerialisedValue -> SerialisedValue

onDeserialisedValues :: SerialiseValue v => (v -> v -> v) -> Mappend
onDeserialisedValues f x y =
serialiseValue (f (deserialiseValue x) (deserialiseValue y))

type SerialisedKOp = (SerialisedKey, SerialisedEntry)
type SerialisedEntry = Entry SerialisedValue SerialisedBlob

{-------------------------------------------------------------------------------
Environments
-------------------------------------------------------------------------------}

-- | Config options describing a benchmarking scenario
data Config = Config {
-- | Name for the benchmark scenario described by this config.
name :: !String
-- | Number of key\/operation pairs for each run.
, nentries :: ![Int]
, finserts :: !Int
, fblobinserts :: !Int
, fdeletes :: !Int
, fmupserts :: !Int
, randomKey :: Rnd SerialisedKey
, randomValue :: Rnd SerialisedValue
, randomBlob :: Rnd SerialisedBlob
, mergeLevel :: !Merge.Level
, mergeMappend :: !(Maybe Mappend)
, stepSize :: !Int
}

type Rnd a = StdGen -> (a, StdGen)

defaultConfig :: Config
defaultConfig = Config {
name = "default"
, nentries = []
, finserts = 0
, fblobinserts = 0
, fdeletes = 0
, fmupserts = 0
, randomKey = error "randomKey not implemented"
, randomValue = error "randomValue not implemented"
, randomBlob = error "randomBlob not implemented"
, mergeLevel = Merge.MidLevel
, mergeMappend = Nothing
, stepSize = maxBound -- by default, just do in one go
}

configWord64 :: Config
configWord64 = defaultConfig {
randomKey = first serialiseKey . uniform @_ @Word64
, randomValue = first serialiseValue . uniform @_ @Word64
, randomBlob = first serialiseBlob . randomByteStringR (0, 0x2000) -- up to 8 kB
}

configUTxO :: Config
configUTxO = defaultConfig {
randomKey = first serialiseKey . uniform @_ @UTxOKey
, randomValue = first serialiseValue . uniform @_ @UTxOValue
}

writeBufferEnv ::
Config
-> IO ( FilePath -- ^ Temporary directory
, FS.HasFS IO FS.HandleIO
, FS.HasBufFS IO FS.HandleIO
, InputRuns
)
writeBufferEnv config = do
sysTmpDir <- getCanonicalTemporaryDirectory
benchTmpDir <- createTempDirectory sysTmpDir "writeBufferEnv"
let hasFS = FS.ioHasFS (FS.MountPoint benchTmpDir)
let hasBufFS = FS.ioHasBufFS (FS.MountPoint benchTmpDir)
runs <- randomRuns hasFS config (mkStdGen 17)
pure (benchTmpDir, hasFS, hasBufFS, runs)

writeBufferEnvCleanup ::
( FilePath -- ^ Temporary directory
, FS.HasFS IO FS.HandleIO
, FS.HasBufFS IO FS.HandleIO
, InputRuns
)
-> IO ()
writeBufferEnvCleanup (tmpDir, hasFS, _, runs) = do
traverse_ (Run.removeReference hasFS) runs
removeDirectoryRecursive tmpDir

-- | Generate keys and entries to insert into the write buffer.
-- They are already serialised to exclude the cost from the benchmark.
randomRuns ::
FS.HasFS IO h
-> Config
-> StdGen -- ^ RNG
-> IO [Run (FS.Handle h)]
randomRuns hasFS config@Config {..} =
zipWithM (createRun hasFS mergeMappend) inputRunNumbers
. zipWith (randomKOps config) nentries
. List.unfoldr (Just . R.split)

createRun ::
FS.HasFS IO h
-> Maybe Mappend
-> Run.RunFsPaths
-> [SerialisedKOp]
-> IO (Run (FS.Handle h))
createRun hasFS mMappend targetPath =
Run.fromWriteBuffer hasFS targetPath
. List.foldl insert WB.empty
where
insert wb (k, e) = case mMappend of
Nothing -> WB.addEntryNormal k (expectNormal e) wb
Just f -> WB.addEntryMonoidal f k (expectMonoidal e) wb

expectNormal e = fromMaybe (error ("invalid normal update: " <> show e))
(entryToUpdateNormal e)
expectMonoidal e = fromMaybe (error ("invalid monoidal update: " <> show e))
(entryToUpdateMonoidal e)

-- | Generate keys and entries to insert into the write buffer.
-- They are already serialised to exclude the cost from the benchmark.
randomKOps ::
Config
-> Int -- ^ number of entries
-> StdGen -- ^ RNG
-> [SerialisedKOp]
randomKOps Config {..} runentries g0 =
zip
(withoutReplacement g1 runentries randomKey)
(withReplacement g2 runentries randomEntry)
where
(g1, g2) = R.split g0

randomEntry :: Rnd SerialisedEntry
randomEntry = frequency
[ ( finserts
, \g -> let (!v, !g') = randomValue g
in (Insert v, g')
)
, ( fblobinserts
, \g -> let (!v, !g') = randomValue g
(!b, !g'') = randomBlob g'
in (InsertWithBlob v b, g'')
)
, ( fdeletes
, \g -> (Delete, g)
)
, ( fmupserts
, \g -> let (!v, !g') = randomValue g
in (Mupdate v, g')
)
]

randomByteStringR :: (Int, Int) -> Rnd BS.ByteString
randomByteStringR range g =
let (!l, !g') = uniformR range g
in R.genByteString l g'

0 comments on commit e4f4dd4

Please sign in to comment.