diff --git a/bench/macro/lsm-tree-bench-wp8.hs b/bench/macro/lsm-tree-bench-wp8.hs
index 2edff9e0f..5adea97bb 100644
--- a/bench/macro/lsm-tree-bench-wp8.hs
+++ b/bench/macro/lsm-tree-bench-wp8.hs
@@ -1,3 +1,4 @@
+{-# LANGUAGE CPP #-}
 {-# LANGUAGE DuplicateRecordFields #-}
 {-# LANGUAGE OverloadedStrings #-}
 {-# OPTIONS_GHC -Wno-orphans #-}
@@ -40,17 +41,18 @@ import Control.Concurrent (getNumCapabilities)
 import Control.Concurrent.Async
 import Control.Concurrent.MVar
 import Control.DeepSeq (force)
-import Control.Exception (evaluate)
+import Control.Exception
 import Control.Monad (forM_, unless, void, when)
 import Control.Monad.Trans.State.Strict (runState, state)
 import Control.Tracer
 import qualified Data.ByteString.Short as BS
 import qualified Data.Foldable as Fold
 import qualified Data.IntSet as IS
-import Data.IORef (modifyIORef', newIORef, readIORef, writeIORef)
+import Data.IORef
 import qualified Data.List.NonEmpty as NE
 import Data.Map.Strict (Map)
 import qualified Data.Map.Strict as Map
+import Data.Monoid
 import qualified Data.Primitive as P
 import qualified Data.Vector as V
 import Data.Void (Void)
@@ -565,30 +567,31 @@ doRun gopts opts = do
     name <- maybe (fail "invalid snapshot name") return $
               LSM.mkSnapshotName "bench"
 
-    LSM.withSession (mkTracer gopts) hasFS hasBlockIO (FS.mkFsPath []) $ \session -> do
+    LSM.withSession (mkTracer gopts) hasFS hasBlockIO (FS.mkFsPath []) $ \session ->
+      withLatencyHandle $ \h -> do
       -- open snapshot
       -- In checking mode we start with an empty table, since our pure
       -- reference version starts with empty (as it's not practical or
       -- necessary for testing to load the whole snapshot).
      tbl <- if check opts
-        then LSM.new @IO @K @V @B session (mkTableConfigRun gopts LSM.defaultTableConfig)
-        else LSM.open @IO @K @V @B session (mkTableConfigOverride gopts) name
+             then LSM.new @IO @K @V @B session (mkTableConfigRun gopts LSM.defaultTableConfig)
+             else LSM.open @IO @K @V @B session (mkTableConfigOverride gopts) name
 
       -- In checking mode, compare each output against a pure reference.
      checkvar <- newIORef $ pureReference
-        (initialSize gopts) (batchSize opts)
-        (batchCount opts) (seed opts)
+                   (initialSize gopts) (batchSize opts)
+                   (batchCount opts) (seed opts)
 
      let fcheck | not (check opts) = \_ _ -> return ()
-                 | otherwise = \b y -> do
+                 | otherwise        = \b y -> do
            (x:xs) <- readIORef checkvar
            unless (x == y) $ fail $ "lookup result mismatch in batch " ++ show b
            writeIORef checkvar xs
 
      let benchmarkIterations
-            | pipelined opts = pipelinedIterations
+            | pipelined opts = pipelinedIterations h
             | lookuponly opts= sequentialIterationsLO
-            | otherwise = sequentialIterations
+            | otherwise = sequentialIterations h
          !progressInterval = max 1 ((batchCount opts) `div` 100)
          madeProgress b = b `mod` progressInterval == 0
      (time, _, _) <- timed_ $ do
@@ -611,33 +614,39 @@ doRun gopts opts = do
 type LookupResults = V.Vector (K, LSM.LookupResult V ())
 
 {-# INLINE sequentialIteration #-}
-sequentialIteration :: (Int -> LookupResults -> IO ())
+sequentialIteration :: LatencyHandle
+                    -> (Int -> LookupResults -> IO ())
                     -> Int
                     -> Int
                     -> LSM.Table IO K V B
                     -> Int
                     -> MCG.MCG
                     -> IO MCG.MCG
-sequentialIteration output !initialSize !batchSize !tbl !b !g = do
+sequentialIteration h output !initialSize !batchSize !tbl !b !g =
+    withTimedBatch h b $ \tref -> do
    let (!g', ls, is) = generateBatch initialSize batchSize g b
 
    -- lookups
-   results <- LSM.lookups ls tbl
+   results <- timeLatency tref $ LSM.lookups ls tbl
    output b (V.zip ls (fmap (fmap (const ())) results))
 
    -- deletes and inserts
-   LSM.updates is tbl
+   _ <- timeLatency tref $ LSM.updates is tbl
 
    -- continue to the next batch
    return g'
 
-sequentialIterations :: (Int -> LookupResults -> IO ())
+
+sequentialIterations :: LatencyHandle
+                     -> (Int -> LookupResults -> IO ())
                      -> Int -> Int -> Int -> Word64
                      -> LSM.Table IO K V B
                      -> IO ()
-sequentialIterations output !initialSize !batchSize !batchCount !seed !tbl =
+sequentialIterations h output !initialSize !batchSize !batchCount !seed !tbl = do
+    createGnuplotExampleFileSequential
+    hPutHeaderSequential h
    void $ forFoldM_ g0 [ 0 .. batchCount - 1 ] $ \b g ->
-      sequentialIteration output initialSize batchSize tbl b g
+      sequentialIteration h output initialSize batchSize tbl b g
  where
    g0 = initGen initialSize batchSize batchCount seed
 
@@ -718,7 +727,8 @@ And the initial setup looks like this:
      Updates (db_3)                  tx_2
   4. Sync ! (db_3, updates)       2. Sync ? (db_3, updates)
 -}
-pipelinedIteration :: (Int -> LookupResults -> IO ())
+pipelinedIteration :: LatencyHandle
+                   -> (Int -> LookupResults -> IO ())
                    -> Int
                    -> Int
                    -> MVar (LSM.Table IO K V B, Map K (LSM.Update V B))
@@ -728,33 +738,39 @@ pipelinedIteration :: (Int -> LookupResults -> IO ())
                    -> LSM.Table IO K V B
                    -> Int
                    -> IO (LSM.Table IO K V B)
-pipelinedIteration output !initialSize !batchSize
+pipelinedIteration h output !initialSize !batchSize
    !syncTblIn !syncTblOut
    !syncRngIn !syncRngOut
-    !tbl_n !b = do
+    !tbl_n !b =
+    withTimedBatch h b $ \tref -> do
    g <- takeMVar syncRngIn
    let (!g', !ls, !is) = generateBatch initialSize batchSize g b
 
    -- 1: perform the lookups
-    lrs <- LSM.lookups ls tbl_n
+    lrs <- timeLatency tref $ LSM.lookups ls tbl_n
 
    -- 2. sync: receive updates and new table
-    putMVar syncRngOut g'
-    (tbl_n1, delta) <- takeMVar syncTblIn
+    tbl_n1 <- timeLatency tref $ do
+      putMVar syncRngOut g'
+      (tbl_n1, delta) <- takeMVar syncTblIn
 
-    -- At this point, after syncing, our peer is guaranteed to no longer be
-    -- using tbl_n. They used it to generate tbl_n+1 (which they gave us).
-    LSM.close tbl_n
-    output b $! applyUpdates delta (V.zip ls lrs)
+      -- At this point, after syncing, our peer is guaranteed to no longer be
+      -- using tbl_n. They used it to generate tbl_n+1 (which they gave us).
+      LSM.close tbl_n
+      output b $! applyUpdates delta (V.zip ls lrs)
+      pure tbl_n1
 
    -- 3. perform the inserts and report outputs (in any order)
-    tbl_n2 <- LSM.duplicate tbl_n1
-    LSM.updates is tbl_n2
+    tbl_n2 <- timeLatency tref $ do
+      tbl_n2 <- LSM.duplicate tbl_n1
+      LSM.updates is tbl_n2
+      pure tbl_n2
 
    -- 4. sync: send the updates and new table
-    let delta' :: Map K (LSM.Update V B)
-        !delta' = Map.fromList (V.toList is)
-    putMVar syncTblOut (tbl_n2, delta')
+    timeLatency tref $ do
+      let delta' :: Map K (LSM.Update V B)
+          !delta' = Map.fromList (V.toList is)
+      putMVar syncTblOut (tbl_n2, delta')
 
    return tbl_n2
  where
@@ -767,11 +783,14 @@ pipelinedIteration output !initialSize !batchSize
        Nothing -> (k, fmap (const ()) lr)
        Just u  -> (k, updateToLookupResult u)
 
-pipelinedIterations :: (Int -> LookupResults -> IO ())
+pipelinedIterations :: LatencyHandle
+                    -> (Int -> LookupResults -> IO ())
                     -> Int -> Int -> Int -> Word64
                     -> LSM.Table IO K V B
                     -> IO ()
-pipelinedIterations output !initialSize !batchSize !batchCount !seed tbl_0 = do
+pipelinedIterations h output !initialSize !batchSize !batchCount !seed tbl_0 = do
+    createGnuplotExampleFilePipelined
+    hPutHeaderPipelined h
    n <- getNumCapabilities
    printf "INFO: the pipelined benchmark is running with %d capabilities.\n" n
 
@@ -794,14 +813,14 @@ pipelinedIterations output !initialSize !batchSize !batchCount !seed tbl_0 = do
 
      threadA =
        forFoldM_ tbl_1 [ 2, 4 .. batchCount - 1 ] $ \b tbl_n ->
-          pipelinedIteration output initialSize batchSize
+          pipelinedIteration h output initialSize batchSize
                             syncTblB2A syncTblA2B -- in, out
                             syncRngB2A syncRngA2B -- in, out
                             tbl_n b
 
      threadB =
        forFoldM_ tbl_0 [ 1, 3 .. batchCount - 1 ] $ \b tbl_n ->
-          pipelinedIteration output initialSize batchSize
+          pipelinedIteration h output initialSize batchSize
                             syncTblA2B syncTblB2A -- in, out
                             syncRngA2B syncRngB2A -- in, out
                             tbl_n b
@@ -862,6 +881,105 @@ batchOverlaps initialSize batchSize batchCount seed =
 
    g0 = initGen initialSize batchSize batchCount seed
 
+-------------------------------------------------------------------------------
+-- measure batch latency
+-------------------------------------------------------------------------------
+
+_mEASURE_BATCH_LATENCY :: Bool
+#ifdef MEASURE_BATCH_LATENCY
+_mEASURE_BATCH_LATENCY = True
+#else
+_mEASURE_BATCH_LATENCY = False
+#endif
+
+type LatencyHandle = Handle
+
+type TimeRef = IORef [Integer]
+
+withLatencyHandle :: (LatencyHandle -> IO a) -> IO a
+withLatencyHandle k
+  | _mEASURE_BATCH_LATENCY = withFile "latency.dat" WriteMode k
+  | otherwise = k (error "LatencyHandle: do not use")
+
+{-# INLINE hPutHeaderSequential #-}
+hPutHeaderSequential :: LatencyHandle -> IO ()
+hPutHeaderSequential h
+  | _mEASURE_BATCH_LATENCY = do
+      hPutStrLn h "# batch number \t lookup time (ns) \t update time (ns)"
+  | otherwise = pure ()
+
+{-# INLINE createGnuplotExampleFileSequential #-}
+createGnuplotExampleFileSequential :: IO ()
+createGnuplotExampleFileSequential
+  | _mEASURE_BATCH_LATENCY = do
+      withFile "latency.gp" WriteMode $ \h -> do
+        mapM_ (hPutStrLn h) [
+            "set title \"Latency (sequential)\""
+          , ""
+          , "set xlabel \"Batch number\""
+          , ""
+          , "set logscale y"
+          , "set ylabel \"Time (nanoseconds)\""
+          , ""
+          , "plot \"latency.dat\" using 1:2 title 'lookups' axis x1y1, \\"
+          , "     \"latency.dat\" using 1:3 title 'updates' axis x1y1"
+          ]
+  | otherwise = pure ()
+
+{-# INLINE hPutHeaderPipelined #-}
+hPutHeaderPipelined :: LatencyHandle -> IO ()
+hPutHeaderPipelined h
+  | _mEASURE_BATCH_LATENCY = do
+      hPutStr h "# batch number"
+      hPutStr h "\t lookup time (ns) \t sync receive time (ns) "
+      hPutStrLn h "\t update time (ns) \t sync send time (ns)"
+  | otherwise = pure ()
+
+{-# INLINE createGnuplotExampleFilePipelined #-}
+createGnuplotExampleFilePipelined :: IO ()
+createGnuplotExampleFilePipelined
+  | _mEASURE_BATCH_LATENCY =
+      withFile "latency.gp" WriteMode $ \h -> do
+        mapM_ (hPutStrLn h) [
+            "set title \"Latency (pipelined)\""
+          , ""
+          , "set xlabel \"Batch number\""
+          , ""
+          , "set logscale y"
+          , "set ylabel \"Time (nanoseconds)\""
+          , ""
+          , "plot \"latency.dat\" using 1:2 title 'lookups' axis x1y1, \\"
+          , "     \"latency.dat\" using 1:3 title 'sync receive' axis x1y1, \\"
+          , "     \"latency.dat\" using 1:4 title 'updates' axis x1y1, \\"
+          , "     \"latency.dat\" using 1:5 title 'sync send' axis x1y1"
+          ]
+  | otherwise = pure ()
+
+{-# INLINE withTimedBatch #-}
+withTimedBatch :: LatencyHandle -> Int -> (TimeRef -> IO a) -> IO a
+withTimedBatch h b k
+  | _mEASURE_BATCH_LATENCY = do
+      tref <- newIORef []
+      x <- k tref
+      ts <- readIORef tref
+      let s = shows b
+            . getDual (foldMap (\t -> Dual (showString "\t" <> shows t)) ts)
+      hPutStrLn h (s "")
+      pure x
+  | otherwise = k (error "TimeRef: do not use")
+
+{-# INLINE timeLatency #-}
+timeLatency :: TimeRef -> IO a -> IO a
+timeLatency tref k
+  | _mEASURE_BATCH_LATENCY = do
+      t1 <- Clock.getTime Clock.Monotonic
+      x <- k
+      t2 <- Clock.getTime Clock.Monotonic
+      let !t = Clock.toNanoSecs (Clock.diffTimeSpec t2 t1)
+      modifyIORef tref (t :)
+      pure x
+  | otherwise = k
+
 -------------------------------------------------------------------------------
 -- main
 -------------------------------------------------------------------------------
diff --git a/lsm-tree.cabal b/lsm-tree.cabal
index ca13efd7a..ed3d96ae5 100644
--- a/lsm-tree.cabal
+++ b/lsm-tree.cabal
@@ -520,8 +520,19 @@ library mcg
    , base
    , primes
 
+flag measure-batch-latency
+  description:
+    Measure the latency for individual batches of updates and lookups
+
+  default: False
+  manual: True
+
+common measure-batch-latency
+  if flag(measure-batch-latency)
+    cpp-options: -DMEASURE_BATCH_LATENCY
+
 benchmark lsm-tree-bench-wp8
-  import: language, warnings, wno-x-partial
+  import: language, warnings, wno-x-partial, measure-batch-latency
  type: exitcode-stdio-1.0
  hs-source-dirs: bench/macro
  main-is: lsm-tree-bench-wp8.hs
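
A possible way to exercise the new instrumentation; the cabal.project.local override and the gnuplot invocation below are illustrative assumptions, not part of this patch (only the measure-batch-latency flag itself is defined above):

    -- cabal.project.local (hypothetical local override): enable the CPP-gated
    -- latency measurement for the lsm-tree package; the wp8 macro benchmark
    -- picks it up through the measure-batch-latency common stanza.
    package lsm-tree
      flags: +measure-batch-latency

With the flag enabled, a benchmark run writes per-batch timings to latency.dat and a matching latency.gp script in the working directory; something like `gnuplot -persist latency.gp` should then plot the lookup and update latencies (plus the sync phases in pipelined mode) on a logarithmic y axis, as configured by createGnuplotExampleFileSequential and createGnuplotExampleFilePipelined. Without the flag, _mEASURE_BATCH_LATENCY is False and the timing code compiles down to the plain benchmark path.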