Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 154 additions & 36 deletions bench/macro/lsm-tree-bench-wp8.hs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{-# LANGUAGE CPP #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE OverloadedStrings #-}
{-# OPTIONS_GHC -Wno-orphans #-}
Expand Down Expand Up @@ -40,17 +41,18 @@ import Control.Concurrent (getNumCapabilities)
import Control.Concurrent.Async
import Control.Concurrent.MVar
import Control.DeepSeq (force)
import Control.Exception (evaluate)
import Control.Exception
import Control.Monad (forM_, unless, void, when)
import Control.Monad.Trans.State.Strict (runState, state)
import Control.Tracer
import qualified Data.ByteString.Short as BS
import qualified Data.Foldable as Fold
import qualified Data.IntSet as IS
import Data.IORef (modifyIORef', newIORef, readIORef, writeIORef)
import Data.IORef
import qualified Data.List.NonEmpty as NE
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import Data.Monoid
import qualified Data.Primitive as P
import qualified Data.Vector as V
import Data.Void (Void)
Expand Down Expand Up @@ -565,30 +567,31 @@ doRun gopts opts = do
name <- maybe (fail "invalid snapshot name") return $
LSM.mkSnapshotName "bench"

LSM.withSession (mkTracer gopts) hasFS hasBlockIO (FS.mkFsPath []) $ \session -> do
LSM.withSession (mkTracer gopts) hasFS hasBlockIO (FS.mkFsPath []) $ \session ->
withLatencyHandle $ \h -> do
-- open snapshot
-- In checking mode we start with an empty table, since our pure
-- reference version starts with empty (as it's not practical or
-- necessary for testing to load the whole snapshot).
tbl <- if check opts
then LSM.new @IO @K @V @B session (mkTableConfigRun gopts LSM.defaultTableConfig)
else LSM.open @IO @K @V @B session (mkTableConfigOverride gopts) name
then LSM.new @IO @K @V @B session (mkTableConfigRun gopts LSM.defaultTableConfig)
else LSM.open @IO @K @V @B session (mkTableConfigOverride gopts) name

-- In checking mode, compare each output against a pure reference.
checkvar <- newIORef $ pureReference
(initialSize gopts) (batchSize opts)
(batchCount opts) (seed opts)
(initialSize gopts) (batchSize opts)
(batchCount opts) (seed opts)
let fcheck | not (check opts) = \_ _ -> return ()
| otherwise = \b y -> do
| otherwise = \b y -> do
(x:xs) <- readIORef checkvar
unless (x == y) $
fail $ "lookup result mismatch in batch " ++ show b
writeIORef checkvar xs

let benchmarkIterations
| pipelined opts = pipelinedIterations
| pipelined opts = pipelinedIterations h
| lookuponly opts= sequentialIterationsLO
| otherwise = sequentialIterations
| otherwise = sequentialIterations h
!progressInterval = max 1 ((batchCount opts) `div` 100)
madeProgress b = b `mod` progressInterval == 0
(time, _, _) <- timed_ $ do
Expand All @@ -611,33 +614,39 @@ doRun gopts opts = do
type LookupResults = V.Vector (K, LSM.LookupResult V ())

{-# INLINE sequentialIteration #-}
sequentialIteration :: (Int -> LookupResults -> IO ())
sequentialIteration :: LatencyHandle
-> (Int -> LookupResults -> IO ())
-> Int
-> Int
-> LSM.Table IO K V B
-> Int
-> MCG.MCG
-> IO MCG.MCG
sequentialIteration output !initialSize !batchSize !tbl !b !g = do
sequentialIteration h output !initialSize !batchSize !tbl !b !g =
withTimedBatch h b $ \tref -> do
let (!g', ls, is) = generateBatch initialSize batchSize g b

-- lookups
results <- LSM.lookups ls tbl
results <- timeLatency tref $ LSM.lookups ls tbl
output b (V.zip ls (fmap (fmap (const ())) results))

-- deletes and inserts
LSM.updates is tbl
_ <- timeLatency tref $ LSM.updates is tbl

-- continue to the next batch
return g'

sequentialIterations :: (Int -> LookupResults -> IO ())

sequentialIterations :: LatencyHandle
-> (Int -> LookupResults -> IO ())
-> Int -> Int -> Int -> Word64
-> LSM.Table IO K V B
-> IO ()
sequentialIterations output !initialSize !batchSize !batchCount !seed !tbl =
sequentialIterations h output !initialSize !batchSize !batchCount !seed !tbl = do
createGnuplotExampleFileSequential
hPutHeaderSequential h
void $ forFoldM_ g0 [ 0 .. batchCount - 1 ] $ \b g ->
sequentialIteration output initialSize batchSize tbl b g
sequentialIteration h output initialSize batchSize tbl b g
where
g0 = initGen initialSize batchSize batchCount seed

Expand Down Expand Up @@ -718,7 +727,8 @@ And the initial setup looks like this:
Updates (db_3) tx_2
4. Sync ! (db_3, updates) 2. Sync ? (db_3, updates)
-}
pipelinedIteration :: (Int -> LookupResults -> IO ())
pipelinedIteration :: LatencyHandle
-> (Int -> LookupResults -> IO ())
-> Int
-> Int
-> MVar (LSM.Table IO K V B, Map K (LSM.Update V B))
Expand All @@ -728,33 +738,39 @@ pipelinedIteration :: (Int -> LookupResults -> IO ())
-> LSM.Table IO K V B
-> Int
-> IO (LSM.Table IO K V B)
pipelinedIteration output !initialSize !batchSize
pipelinedIteration h output !initialSize !batchSize
!syncTblIn !syncTblOut
!syncRngIn !syncRngOut
!tbl_n !b = do
!tbl_n !b =
withTimedBatch h b $ \tref -> do
g <- takeMVar syncRngIn
let (!g', !ls, !is) = generateBatch initialSize batchSize g b

-- 1: perform the lookups
lrs <- LSM.lookups ls tbl_n
lrs <- timeLatency tref $ LSM.lookups ls tbl_n

-- 2. sync: receive updates and new table
putMVar syncRngOut g'
(tbl_n1, delta) <- takeMVar syncTblIn
tbl_n1 <- timeLatency tref $ do
putMVar syncRngOut g'
(tbl_n1, delta) <- takeMVar syncTblIn

-- At this point, after syncing, our peer is guaranteed to no longer be
-- using tbl_n. They used it to generate tbl_n+1 (which they gave us).
LSM.close tbl_n
output b $! applyUpdates delta (V.zip ls lrs)
-- At this point, after syncing, our peer is guaranteed to no longer be
-- using tbl_n. They used it to generate tbl_n+1 (which they gave us).
LSM.close tbl_n
output b $! applyUpdates delta (V.zip ls lrs)
pure tbl_n1

-- 3. perform the inserts and report outputs (in any order)
tbl_n2 <- LSM.duplicate tbl_n1
LSM.updates is tbl_n2
tbl_n2 <- timeLatency tref $ do
tbl_n2 <- LSM.duplicate tbl_n1
LSM.updates is tbl_n2
pure tbl_n2

-- 4. sync: send the updates and new table
let delta' :: Map K (LSM.Update V B)
!delta' = Map.fromList (V.toList is)
putMVar syncTblOut (tbl_n2, delta')
timeLatency tref $ do
let delta' :: Map K (LSM.Update V B)
!delta' = Map.fromList (V.toList is)
putMVar syncTblOut (tbl_n2, delta')

return tbl_n2
where
Expand All @@ -767,11 +783,14 @@ pipelinedIteration output !initialSize !batchSize
Nothing -> (k, fmap (const ()) lr)
Just u -> (k, updateToLookupResult u)

pipelinedIterations :: (Int -> LookupResults -> IO ())
pipelinedIterations :: LatencyHandle
-> (Int -> LookupResults -> IO ())
-> Int -> Int -> Int -> Word64
-> LSM.Table IO K V B
-> IO ()
pipelinedIterations output !initialSize !batchSize !batchCount !seed tbl_0 = do
pipelinedIterations h output !initialSize !batchSize !batchCount !seed tbl_0 = do
createGnuplotExampleFilePipelined
hPutHeaderPipelined h
n <- getNumCapabilities
printf "INFO: the pipelined benchmark is running with %d capabilities.\n" n

Expand All @@ -794,14 +813,14 @@ pipelinedIterations output !initialSize !batchSize !batchCount !seed tbl_0 = do

threadA =
forFoldM_ tbl_1 [ 2, 4 .. batchCount - 1 ] $ \b tbl_n ->
pipelinedIteration output initialSize batchSize
pipelinedIteration h output initialSize batchSize
syncTblB2A syncTblA2B -- in, out
syncRngB2A syncRngA2B -- in, out
tbl_n b

threadB =
forFoldM_ tbl_0 [ 1, 3 .. batchCount - 1 ] $ \b tbl_n ->
pipelinedIteration output initialSize batchSize
pipelinedIteration h output initialSize batchSize
syncTblA2B syncTblB2A -- in, out
syncRngA2B syncRngB2A -- in, out
tbl_n b
Expand Down Expand Up @@ -862,6 +881,105 @@ batchOverlaps initialSize batchSize batchCount seed =

g0 = initGen initialSize batchSize batchCount seed

-------------------------------------------------------------------------------
-- measure batch latency
-------------------------------------------------------------------------------

_mEASURE_BATCH_LATENCY :: Bool
#ifdef MEASURE_BATCH_LATENCY
_mEASURE_BATCH_LATENCY = True
#else
_mEASURE_BATCH_LATENCY = False
#endif

type LatencyHandle = Handle

type TimeRef = IORef [Integer]

withLatencyHandle :: (LatencyHandle -> IO a) -> IO a
withLatencyHandle k
| _mEASURE_BATCH_LATENCY = withFile "latency.dat" WriteMode k
| otherwise = k (error "LatencyHandle: do not use")

{-# INLINE hPutHeaderSequential #-}
hPutHeaderSequential :: LatencyHandle -> IO ()
hPutHeaderSequential h
| _mEASURE_BATCH_LATENCY = do
hPutStrLn h "# batch number \t lookup time (ns) \t update time (ns)"
| otherwise = pure ()

{-# INLINE createGnuplotExampleFileSequential #-}
createGnuplotExampleFileSequential :: IO ()
createGnuplotExampleFileSequential
| _mEASURE_BATCH_LATENCY = do
withFile "latency.gp" WriteMode $ \h -> do
mapM_ (hPutStrLn h) [
"set title \"Latency (sequential)\""
, ""
, "set xlabel \"Batch number\""
, ""
, "set logscale y"
, "set ylabel \"Time (nanoseconds)\""
, ""
, "plot \"latency.dat\" using 1:2 title 'lookups' axis x1y1, \\"
, " \"latency.dat\" using 1:3 title 'updates' axis x1y1"
]
| otherwise = pure ()

{-# INLINE hPutHeaderPipelined #-}
hPutHeaderPipelined :: LatencyHandle -> IO ()
hPutHeaderPipelined h
| _mEASURE_BATCH_LATENCY = do
hPutStr h "# batch number"
hPutStr h "\t lookup time (ns) \t sync receive time (ns) "
hPutStrLn h "\t update time (ns) \t sync send time (ns)"
| otherwise = pure ()

{-# INLINE createGnuplotExampleFilePipelined #-}
createGnuplotExampleFilePipelined :: IO ()
createGnuplotExampleFilePipelined
| _mEASURE_BATCH_LATENCY =
withFile "latency.gp" WriteMode $ \h -> do
mapM_ (hPutStrLn h) [
"set title \"Latency (pipelined)\""
, ""
, "set xlabel \"Batch number\""
, ""
, "set logscale y"
, "set ylabel \"Time (nanoseconds)\""
, ""
, "plot \"latency.dat\" using 1:2 title 'lookups' axis x1y1, \\"
, " \"latency.dat\" using 1:3 title 'sync receive' axis x1y1, \\"
, " \"latency.dat\" using 1:4 title 'updates' axis x1y1, \\"
, " \"latency.dat\" using 1:5 title 'sync send' axis x1y1"
]
| otherwise = pure ()

{-# INLINE withTimedBatch #-}
withTimedBatch :: LatencyHandle -> Int -> (TimeRef -> IO a) -> IO a
withTimedBatch h b k
| _mEASURE_BATCH_LATENCY = do
tref <- newIORef []
x <- k tref
ts <- readIORef tref
let s = shows b
. getDual (foldMap (\t -> Dual (showString "\t" <> shows t)) ts)
hPutStrLn h (s "")
pure x
| otherwise = k (error "TimeRef: do not use")

{-# INLINE timeLatency #-}
timeLatency :: TimeRef -> IO a -> IO a
timeLatency tref k
| _mEASURE_BATCH_LATENCY = do
t1 <- Clock.getTime Clock.Monotonic
x <- k
t2 <- Clock.getTime Clock.Monotonic
let !t = Clock.toNanoSecs (Clock.diffTimeSpec t2 t1)
modifyIORef tref (t :)
pure x
| otherwise = k

-------------------------------------------------------------------------------
-- main
-------------------------------------------------------------------------------
Expand Down
13 changes: 12 additions & 1 deletion lsm-tree.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -520,8 +520,19 @@ library mcg
, base
, primes

flag measure-batch-latency
description:
Measure the latency for individual batches of updates and lookups

default: False
manual: True

common measure-batch-latency
if flag(measure-batch-latency)
cpp-options: -DMEASURE_BATCH_LATENCY

benchmark lsm-tree-bench-wp8
import: language, warnings, wno-x-partial
import: language, warnings, wno-x-partial, measure-batch-latency
type: exitcode-stdio-1.0
hs-source-dirs: bench/macro
main-is: lsm-tree-bench-wp8.hs
Expand Down