Merge pull request #290 from IntersectMBO/jdral/cleanup-lookups
A bit of cleanup and tidying up of the lookups code.
jorisdral authored Jul 15, 2024
2 parents 636b68d + 1c26e75 commit afc3639
Showing 3 changed files with 59 additions and 67 deletions.
5 changes: 4 additions & 1 deletion src/Database/LSMTree/Internal.hs
@@ -77,7 +77,8 @@ import Database.LSMTree.Internal.BlobRef
import Database.LSMTree.Internal.Entry (Entry (..), NumEntries (..),
combineMaybe)
import Database.LSMTree.Internal.IndexCompact (IndexCompact)
import Database.LSMTree.Internal.Lookup (lookupsIO)
import Database.LSMTree.Internal.Lookup (ByteCountDiscrepancy,
lookupsIO)
import Database.LSMTree.Internal.Managed
import qualified Database.LSMTree.Internal.Merge as Merge
import qualified Database.LSMTree.Internal.Normal as Normal
@@ -123,6 +124,8 @@ data LSMTreeError =
| ErrSnapshotExists SnapshotName
| ErrSnapshotNotExists SnapshotName
| ErrSnapshotWrongType SnapshotName
-- | Something went wrong during batch lookups.
| ErrLookup ByteCountDiscrepancy
deriving stock (Show)
deriving anyclass (Exception)
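
The new ErrLookup constructor wraps the ByteCountDiscrepancy error from the lookup module. As a minimal caller-side sketch (illustrative only, not part of this commit), assuming ByteCountDiscrepancy has an Exception instance (which the MonadThrow constraint on lookupsIO suggests) and that LSMTreeError is exported with its constructors:

{-# LANGUAGE ScopedTypeVariables #-}

import Control.Exception (handle, throwIO)

import Database.LSMTree.Internal (LSMTreeError (..))
import Database.LSMTree.Internal.Lookup (ByteCountDiscrepancy)

-- Hypothetical helper: run an IO action (e.g. a batch of lookups) and
-- re-throw any ByteCountDiscrepancy it raises as the table-level ErrLookup.
wrapLookupError :: IO a -> IO a
wrapLookupError = handle (\(e :: ByteCountDiscrepancy) -> throwIO (ErrLookup e))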

101 changes: 36 additions & 65 deletions src/Database/LSMTree/Internal/Lookup.hs
@@ -8,16 +8,16 @@
{-# LANGUAGE TupleSections #-}

module Database.LSMTree.Internal.Lookup (
-- * Lookup preparation
RunIx
ResolveSerialisedValue
, ByteCountDiscrepancy (..)
, lookupsIO
-- * Internal: exposed for tests and benchmarks
, RunIx
, KeyIx
, prepLookups
, bloomQueries
, bloomQueriesDefault
, indexSearches
-- * Lookups in IO
, ResolveSerialisedValue
, lookupsIO
, intraPageLookups
) where

@@ -48,18 +48,17 @@ import Database.LSMTree.Internal.RawBytes (RawBytes (..))
import qualified Database.LSMTree.Internal.RawBytes as RB
import Database.LSMTree.Internal.RawPage
import Database.LSMTree.Internal.Run (Run)
import qualified Database.LSMTree.Internal.Run as Run
import Database.LSMTree.Internal.Serialise
import Database.LSMTree.Internal.Vector (mkPrimVector)
import qualified Database.LSMTree.Internal.Vector as V
import System.FS.API (BufferOffset (..), Handle)
import System.FS.BlockIO.API

-- | Prepare disk lookups by doing bloom filter queries, index searches and
-- creating 'IOOp's. The result is a vector of 'IOOp's and a vector of indexes,
-- both of which are the same length. The indexes record the run and key
-- associated with each 'IOOp'.
prepLookups
:: Arena s
prepLookups ::
Arena s
-> V.Vector (Bloom SerialisedKey)
-> V.Vector IndexCompact
-> V.Vector (Handle h)
@@ -76,9 +75,8 @@ type ResIx = Int -- Result index

-- | 'bloomQueries' with a default result vector size of @V.length ks * 2@.
--
-- The result vector can be of variable length, so we use a generous
-- estimate here, and we grow the vector if needed. TODO: tune the
-- starting estimate based on the expected true- and false-positives.
-- TODO: tune the starting estimate based on the expected true- and
-- false-positives.
bloomQueriesDefault ::
V.Vector (Bloom SerialisedKey)
-> V.Vector SerialisedKey
@@ -88,7 +86,8 @@ bloomQueriesDefault blooms ks = bloomQueries blooms ks (fromIntegral $ V.length
-- | Perform a batch of bloom queries. The result is a tuple of indexes into the
-- vector of runs and vector of keys respectively.
--
-- Note: the result vector should be ephemeral, so do not retain it.
-- The result vector can be of variable length. An estimate should be provided,
-- and the vector is grown if needed.
--
-- TODO: we consider it likely that we could implement an optimised, batched
-- version of bloom filter queries, which would largely replace this function.
@@ -133,9 +132,10 @@ bloomQueries !blooms !ks !resN
| kix == ksN = pure (res2, resix2)
| let !h = hs `VP.unsafeIndex` kix
, Bloom.elemHashes h b = do
-- Grows the vector if we've reached the end. TODO: tune how
-- much we grow the vector each time based on the
-- expected true- and false-positives.
-- Grows the vector if we've reached the end.
--
-- TODO: tune how much we grow the vector each time based on
-- the expected true- and false-positives.
res2' <- if resix2 == VUM.length res2
then VUM.unsafeGrow res2 ksN
else pure res2
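
As an aside, the grow-on-demand step above follows a common mutable-vector pattern. A standalone sketch of that pattern (illustrative only, not part of this commit): write elements into an unboxed mutable vector, grow it with VUM.unsafeGrow whenever the write index reaches the current capacity, and freeze only the filled prefix at the end.

{-# LANGUAGE BangPatterns #-}

import Control.Monad.ST (runST)
import qualified Data.Vector.Unboxed as VU
import qualified Data.Vector.Unboxed.Mutable as VUM

collectGrowing :: Int -> [Int] -> VU.Vector Int
collectGrowing initialCapacity xs = runST $ do
    mv0 <- VUM.unsafeNew (max 1 initialCapacity)
    (mv, n) <- go mv0 0 xs
    VU.unsafeFreeze (VUM.unsafeSlice 0 n mv)
  where
    go mv !i []       = pure (mv, i)
    go mv !i (y : ys) = do
      -- grow when full, mirroring the VUM.unsafeGrow step above;
      -- growing by the current length doubles the capacity
      mv' <- if i == VUM.length mv
               then VUM.unsafeGrow mv (VUM.length mv)
               else pure mv
      VUM.unsafeWrite mv' i y
      go mv' (i + 1) ys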
@@ -175,10 +175,6 @@ indexSearches !arena !indexes !kopsFiles !ks !rkixs = V.generateM n $ \i -> do
where
!n = VU.length rkixs

{-------------------------------------------------------------------------------
Lookups in IO
-------------------------------------------------------------------------------}

{-
Note [Batched lookups, buffer strategy and restrictions]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -231,28 +227,31 @@ data ByteCountDiscrepancy = ByteCountDiscrepancy {
--
-- See Note [Batched lookups, buffer strategy and restrictions]
--
-- TODO: optimise by reducing allocations, possibly looking at core.
-- PRECONDITION: the vectors of bloom filters, indexes and file handles
-- should pointwise match with the vectors of runs.
lookupsIO ::
forall m h. (PrimMonad m, MonadThrow m, MonadST m)
=> HasBlockIO m h
-> ArenaManager (PrimState m)
-> ResolveSerialisedValue
-> V.Vector (Run (Handle h))
-> V.Vector (Bloom SerialisedKey)
-> V.Vector IndexCompact
-> V.Vector (Handle h)
-> V.Vector (Run (Handle h)) -- ^ Runs @rs@
-> V.Vector (Bloom SerialisedKey) -- ^ The bloom filters inside @rs@
-> V.Vector IndexCompact -- ^ The indexes inside @rs@
-> V.Vector (Handle h) -- ^ The file handles to the key\/value files inside @rs@
-> V.Vector SerialisedKey
-> m (V.Vector (Maybe (Entry SerialisedValue (BlobRef (Run (Handle h))))))
lookupsIO !hbio !mgr !resolveV !rs !blooms !indexes !kopsFiles !ks = assert precondition $ withArena mgr $ \arena -> do
(rkixs, ioops) <- Class.stToIO $ prepLookups arena blooms indexes kopsFiles ks
ioress <- submitIO hbio ioops
intraPageLookups resolveV rs ks rkixs ioops ioress
where
precondition = and [
V.map Run.runFilter rs == blooms
, V.map Run.runIndex rs == indexes
, V.length rs == V.length kopsFiles
]
-- we check only that the lengths match, because checking the contents is
-- too expensive.
precondition =
assert (V.length rs == V.length blooms) $
assert (V.length rs == V.length indexes) $
assert (V.length rs == V.length kopsFiles) $
True
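
The precondition style above, chaining cheap assertions that each guard the rest of the check, can be mirrored in a standalone form (illustrative, not from this commit):

import Control.Exception (assert)

-- Chain cheap assertions and return True, so the whole check can guard a
-- function body with a single top-level 'assert', as lookupsIO does.
lengthsMatch :: [a] -> [b] -> [c] -> Bool
lengthsMatch xs ys zs =
    assert (length xs == length ys) $
    assert (length ys == length zs) $
    True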

{-# SPECIALIZE intraPageLookups ::
ResolveSerialisedValue
@@ -268,11 +267,6 @@ lookupsIO !hbio !mgr !resolveV !rs !blooms !indexes !kopsFiles !ks = assert prec
-- This function assumes that @rkixs@ is ordered such that newer runs are
-- handled first. The order matters for resolving cases where we find the same
-- key in multiple runs.
--
-- TODO: optimise by reducing allocations, possibly looking at core, using
-- unsafe vector operations.
--
-- PRECONDITION: @length rkixs == length ioops == length ioress@
intraPageLookups ::
forall m h. (PrimMonad m, MonadThrow m)
=> ResolveSerialisedValue
@@ -282,17 +276,11 @@ intraPageLookups ::
-> V.Vector (IOOp (PrimState m) h)
-> VU.Vector IOResult
-> m (V.Vector (Maybe (Entry SerialisedValue (BlobRef (Run (Handle h))))))
intraPageLookups !resolveV !rs !ks !rkixs !ioops !ioress =
assert precondition $ do
res <- VM.replicate (V.length ks) Nothing
loop res 0
V.unsafeFreeze res
intraPageLookups !resolveV !rs !ks !rkixs !ioops !ioress = do
res <- VM.replicate (V.length ks) Nothing
loop res 0
V.unsafeFreeze res
where
precondition = and [
VU.length rkixs == V.length ioops
, V.length ioops == VU.length ioress
]

!n = V.length ioops

loop ::
@@ -316,20 +304,18 @@ intraPageLookups !resolveV !rs !ks !rkixs !ioops !ioress =
-- the entry when the result is needed.
LookupEntry e -> do
let e' = bimap copySerialisedValue (BlobRef r) e
unsafeInsertWithMStrict res (combine resolveV) kix e'
V.unsafeInsertWithMStrict res (combine resolveV) kix e'
-- Laziness ensures that we only compute the appending of the prefix
-- and suffix when the result is needed. We do not use 'force' here,
-- since appending already creates a new primary vector.
--
-- TODO: verify if appending always computes a new primary vector
LookupEntryOverflow e m -> do
let v' (SerialisedValue v) = SerialisedValue $ v <>
RawBytes (mkPrimVector
RawBytes (V.mkPrimVector
(unBufferOffset (ioopBufferOffset ioop) + 4096)
(fromIntegral m)
buf)
e' = bimap v' (BlobRef r) e
unsafeInsertWithMStrict res (combine resolveV) kix e'
V.unsafeInsertWithMStrict res (combine resolveV) kix e'
loop res (ioopix + 1)

-- Check that the IOOp was performed successfully, and that it wrote/read
@@ -346,18 +332,3 @@ intraPageLookups !resolveV !rs !ks !rkixs !ioops !ioress =
copySerialisedValue :: SerialisedValue -> SerialisedValue
copySerialisedValue (SerialisedValue rb) =
SerialisedValue (RB.copy rb)

{-# INLINE unsafeInsertWithMStrict #-}
-- | Insert (in a broad sense) an entry in a mutable vector at a given index,
-- but if a @Just@ entry already exists at that index, combine the two entries
-- using @f@.
unsafeInsertWithMStrict ::
PrimMonad m
=> VM.MVector (PrimState m) (Maybe a)
-> (a -> a -> a) -- ^ function @f@, called as @f new old@
-> Int
-> a
-> m ()
unsafeInsertWithMStrict mvec f i y = VM.unsafeModifyM mvec g i
where
g x = pure $! Just $! maybe y (`f` y) x
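
The LookupEntryOverflow case above appends overflow bytes that live in the read buffer directly after the 4096-byte page on which the value starts. A standalone simplification of that arithmetic over plain primitive vectors (illustrative, not part of this commit; the real code wraps the buffer in RawBytes via V.mkPrimVector rather than slicing):

import qualified Data.Vector.Primitive as VP
import Data.Word (Word8)

pageSize :: Int
pageSize = 4096

-- Reassemble a value that did not fit in the page it starts on: take the
-- prefix found inside the page and append the @m@ overflow bytes that sit
-- in the read buffer immediately after that page.
appendOverflow
  :: VP.Vector Word8  -- ^ raw read buffer
  -> Int              -- ^ byte offset of the page inside the buffer
  -> VP.Vector Word8  -- ^ value prefix found inside the page
  -> Int              -- ^ number of overflow bytes
  -> VP.Vector Word8
appendOverflow buf off prefix m = prefix VP.++ VP.slice (off + pageSize) m buf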
20 changes: 19 additions & 1 deletion src/Database/LSMTree/Internal/Vector.hs
@@ -8,15 +8,18 @@ module Database.LSMTree.Internal.Vector (
mapMStrict,
imapMStrict,
zipWithStrict,
binarySearchL
binarySearchL,
unsafeInsertWithMStrict,
) where

import Control.Monad
import Control.Monad.Primitive (PrimMonad, PrimState)
import Data.Primitive.ByteArray (ByteArray, sizeofByteArray)
import Data.Primitive.Types (Prim (sizeOfType#))
import Data.Proxy (Proxy (..))
import qualified Data.Vector as V
import qualified Data.Vector.Algorithms.Search as VA
import qualified Data.Vector.Mutable as VM
import qualified Data.Vector.Primitive as VP
import Database.LSMTree.Internal.Assertions
import GHC.Exts (Int (..))
@@ -66,3 +69,18 @@ zipWithStrict f xs ys = runST (V.zipWithM (\x y -> pure $! f x y) xs ys)
-}
binarySearchL :: Ord a => V.Vector a -> a -> Int
binarySearchL vec val = runST $ V.unsafeThaw vec >>= flip VA.binarySearchL val

{-# INLINE unsafeInsertWithMStrict #-}
-- | Insert (in a broad sense) an entry in a mutable vector at a given index,
-- but if a @Just@ entry already exists at that index, combine the two entries
-- using @f@.
unsafeInsertWithMStrict ::
PrimMonad m
=> VM.MVector (PrimState m) (Maybe a)
-> (a -> a -> a) -- ^ function @f@, called as @f new old@
-> Int
-> a
-> m ()
unsafeInsertWithMStrict mvec f i y = VM.unsafeModifyM mvec g i
where
g x = pure $! Just $! maybe y (`f` y) x
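
A small standalone usage sketch for unsafeInsertWithMStrict (illustrative, not part of this commit): the combining function receives the element already stored in the vector as its first argument and the value being inserted as its second, which in intraPageLookups means the entry from the newer run comes first.

import Control.Monad.ST (runST)
import qualified Data.Vector as V
import qualified Data.Vector.Mutable as VM

import Database.LSMTree.Internal.Vector (unsafeInsertWithMStrict)

example :: V.Vector (Maybe Int)
example = runST $ do
    mvec <- VM.replicate 2 Nothing
    unsafeInsertWithMStrict mvec (-) 0 10  -- slot 0 was Nothing, becomes Just 10
    unsafeInsertWithMStrict mvec (-) 0 1   -- slot 0 was Just 10, combined to Just (10 - 1)
    V.unsafeFreeze mvec
-- example == V.fromList [Just 9, Nothing]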
