Skip to content

Commit

Permalink
Monomorphise write buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
jorisdral committed Apr 30, 2024
1 parent 72c4c20 commit bc217ef
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 46 deletions.
45 changes: 34 additions & 11 deletions src-extras/Database/LSMTree/Extras/Generators.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@

module Database.LSMTree.Extras.Generators (
-- * WriteBuffer
genWriteBuffer
TypedWriteBuffer (..)
, genWriteBuffer
, shrinkWriteBuffer
-- * WithSerialised
, WithSerialised (..)
Expand Down Expand Up @@ -174,11 +175,33 @@ instance Arbitrary2 Entry where
WriteBuffer
-------------------------------------------------------------------------------}

instance (Arbitrary k, Arbitrary v, Arbitrary blob,
SerialiseKey k, SerialiseValue v, SerialiseValue blob)
=> Arbitrary (WriteBuffer k v blob) where
arbitrary = genWriteBuffer arbitrary arbitrary arbitrary
shrink = shrinkWriteBuffer shrink shrink shrink
type role TypedWriteBuffer nominal nominal nominal
newtype TypedWriteBuffer k v blob = TypedWriteBuffer {
unTypedWriteBuffer :: WriteBuffer
}
deriving stock Show

instance Arbitrary WriteBuffer where
arbitrary = genWriteBuffer
(arbitrary @SerialisedKey)
(arbitrary @SerialisedValue)
(arbitrary @SerialisedBlob)
shrink = shrinkWriteBuffer
(shrink @SerialisedKey)
(shrink @SerialisedValue)
(shrink @SerialisedBlob)

instance ( Arbitrary k, Arbitrary v, Arbitrary blob
, SerialiseKey k, SerialiseValue v, SerialiseValue blob
)=> Arbitrary (TypedWriteBuffer k v blob) where
arbitrary = TypedWriteBuffer <$> genWriteBuffer
(arbitrary @k)
(arbitrary @v)
(arbitrary @blob)
shrink = coerce $ shrinkWriteBuffer
(shrink @k)
(shrink @v)
(shrink @blob)

-- | We cannot implement 'Arbitrary2' since we have constraints on the type
-- parameters.
Expand All @@ -187,7 +210,7 @@ genWriteBuffer ::
=> Gen k
-> Gen v
-> Gen blob
-> Gen (WriteBuffer k v blob)
-> Gen WriteBuffer
genWriteBuffer genKey genVal genBlob =
fromKOps <$> QC.listOf (liftArbitrary2 genKey (liftArbitrary2 genVal genBlob))

Expand All @@ -196,8 +219,8 @@ shrinkWriteBuffer ::
=> (k -> [k])
-> (v -> [v])
-> (blob -> [blob])
-> WriteBuffer k v blob
-> [WriteBuffer k v blob]
-> WriteBuffer
-> [WriteBuffer]
shrinkWriteBuffer shrinkKey shrinkVal shrinkBlob =
map fromKOps
. liftShrink (liftShrink2 shrinkKey (liftShrink2 shrinkVal shrinkBlob))
Expand All @@ -206,14 +229,14 @@ shrinkWriteBuffer shrinkKey shrinkVal shrinkBlob =
fromKOps ::
(SerialiseKey k, SerialiseValue v, SerialiseValue blob)
=> [(k, Entry v blob)]
-> WriteBuffer k v blob
-> WriteBuffer
fromKOps = WB . Map.fromList . map serialiseKOp
where
serialiseKOp = bimap serialiseKey (bimap serialiseValue serialiseBlob)

toKOps ::
(SerialiseKey k, SerialiseValue v, SerialiseValue blob)
=> WriteBuffer k v blob
=> WriteBuffer
-> [(k, Entry v blob)]
toKOps = map deserialiseKOp . Map.assocs . WB.unWB
where
Expand Down
2 changes: 1 addition & 1 deletion src/Database/LSMTree/Internal/Run.hs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ fromMutable fs refCount builder = do
-- immediately when they are added to the write buffer, avoiding the need to do
-- it here.
fromWriteBuffer ::
HasFS IO h -> RunFsPaths -> WriteBuffer k v b
HasFS IO h -> RunFsPaths -> WriteBuffer
-> IO (Run (FS.Handle h))
fromWriteBuffer fs fsPaths buffer = do
-- We just estimate the number of pages to be one, as the write buffer is
Expand Down
45 changes: 23 additions & 22 deletions src/Database/LSMTree/Internal/WriteBuffer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ module Database.LSMTree.Internal.WriteBuffer (
rangeLookups,
) where

import Data.Bifunctor (Bifunctor (..))
import qualified Data.Map.Range as Map.R
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
Expand All @@ -52,36 +51,38 @@ import Database.LSMTree.Internal.Serialise
-- It would be consistent with other internal APIs (e.g. for @Run@ and
-- @IndexCompact@ to remove the type parameters here and move the responsibility
-- for these constraints and (de)serialisation to the layer above.
newtype WriteBuffer k v blob =
newtype WriteBuffer =
WB { unWB :: Map SerialisedKey (Entry SerialisedValue SerialisedBlob) }
deriving (Show)
type role WriteBuffer nominal nominal nominal
deriving Show

empty :: WriteBuffer k v blob
empty :: WriteBuffer
empty = WB Map.empty

numEntries :: WriteBuffer k v blob -> NumEntries
numEntries :: WriteBuffer -> NumEntries
numEntries (WB m) = NumEntries (Map.size m)

-- | \( O(n) \)
content :: WriteBuffer k v blob ->
content :: WriteBuffer ->
[(SerialisedKey, Entry SerialisedValue SerialisedBlob)]
content (WB m) = Map.assocs m

{-------------------------------------------------------------------------------
Updates
-------------------------------------------------------------------------------}

addEntryMonoidal :: (SerialiseKey k, SerialiseValue v)
=> (SerialisedValue -> SerialisedValue -> SerialisedValue) -- ^ merge function
-> k -> Monoidal.Update v -> WriteBuffer k v blob -> WriteBuffer k v blob
addEntryMonoidal ::
(SerialisedValue -> SerialisedValue -> SerialisedValue) -- ^ merge function
-> SerialisedKey -> Monoidal.Update SerialisedValue -> WriteBuffer -> WriteBuffer
addEntryMonoidal f k e (WB wb) =
WB (Map.insertWith (combine f) (serialiseKey k) (first serialiseValue (updateToEntryMonoidal e)) wb)
WB (Map.insertWith (combine f) k (updateToEntryMonoidal e) wb)

addEntryNormal :: (SerialiseKey k, SerialiseValue v, SerialiseValue blob)
=> k -> Normal.Update v blob -> WriteBuffer k v blob -> WriteBuffer k v blob
addEntryNormal ::
SerialisedKey
-> Normal.Update SerialisedValue SerialisedBlob
-> WriteBuffer
-> WriteBuffer
addEntryNormal k e (WB wb) =
WB (Map.insert (serialiseKey k) (bimap serialiseValue serialiseBlob (updateToEntryNormal e)) wb)
WB (Map.insert k (updateToEntryNormal e) wb)

{-------------------------------------------------------------------------------
Querying
Expand All @@ -92,11 +93,11 @@ addEntryNormal k e (WB wb) =
--
-- Note: the entry may be 'Delete'.
--
lookups :: SerialiseKey k
=> WriteBuffer k v blob
-> [k]
lookups ::
WriteBuffer
-> [SerialisedKey]
-> [(SerialisedKey, Maybe (Entry SerialisedValue SerialisedBlob))]
lookups (WB m) = fmap (f . serialiseKey)
lookups (WB m) = fmap f
where
f k = (k, Map.lookup k m)

Expand All @@ -109,13 +110,13 @@ lookups (WB m) = fmap (f . serialiseKey)
--
-- Note: 'Delete's are not filtered out.
--
rangeLookups :: SerialiseKey k
=> WriteBuffer k v blob
-> Range k
rangeLookups ::
WriteBuffer
-> Range SerialisedKey
-> [(SerialisedKey, Entry SerialisedValue SerialisedBlob)]
rangeLookups (WB m) r =
[ (k, e)
| let (lb, ub) = convertRange (fmap serialiseKey r)
| let (lb, ub) = convertRange r
, (k, e) <- Map.R.rangeLookup lb ub m
]

Expand Down
13 changes: 7 additions & 6 deletions test/Test/Database/LSMTree/Internal/Merge.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import Data.Foldable (traverse_)
import qualified Data.Map.Strict as Map
import Data.Maybe (isJust)
import Database.LSMTree.Extras
import Database.LSMTree.Extras.Generators (KeyForIndexCompact)
import Database.LSMTree.Extras.Generators (KeyForIndexCompact,
TypedWriteBuffer (..))
import qualified Database.LSMTree.Internal.Entry as Entry
import qualified Database.LSMTree.Internal.Merge as Merge
import qualified Database.LSMTree.Internal.Run as Run
Expand Down Expand Up @@ -52,9 +53,9 @@ prop_MergeDistributes ::
FS.HasFS IO h -> FS.HasBufFS IO h ->
Merge.Level ->
StepSize ->
[WriteBuffer KeyForIndexCompact SerialisedValue SerialisedBlob] ->
[TypedWriteBuffer KeyForIndexCompact SerialisedValue SerialisedBlob] ->
IO Property
prop_MergeDistributes fs bfs level stepSize wbs = do
prop_MergeDistributes fs bfs level stepSize (fmap unTypedWriteBuffer -> wbs) = do
runs <- sequenceA $ zipWith flush [10..] wbs
lhs <- mergeRuns fs bfs level 0 runs stepSize

Expand Down Expand Up @@ -94,9 +95,9 @@ prop_CloseMerge ::
FS.HasFS IO h -> FS.HasBufFS IO h ->
Merge.Level ->
StepSize ->
[WriteBuffer KeyForIndexCompact SerialisedValue SerialisedBlob] ->
[TypedWriteBuffer KeyForIndexCompact SerialisedValue SerialisedBlob] ->
IO Property
prop_CloseMerge fs bfs level (Positive stepSize) wbs = do
prop_CloseMerge fs bfs level (Positive stepSize) (fmap unTypedWriteBuffer -> wbs) = do
let path0 = Run.RunFsPaths 0
runs <- sequenceA $ zipWith flush [10..] wbs
mergeToClose <- makeInProgressMerge path0 runs
Expand Down Expand Up @@ -148,7 +149,7 @@ mergeRuns fs bfs level n runs (Positive stepSize) = do
Merge.MergeComplete run -> return run
Merge.MergeInProgress -> go m

mergeWriteBuffers :: Merge.Level -> [WriteBuffer k v b] -> WriteBuffer k v b
mergeWriteBuffers :: Merge.Level -> [WriteBuffer] -> WriteBuffer
mergeWriteBuffers level =
WB.WB
. (if level == Merge.LastLevel then Map.filter (not . isDelete) else id)
Expand Down
12 changes: 6 additions & 6 deletions test/Test/Database/LSMTree/Internal/Run.hs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ import Test.Tasty.HUnit (assertEqual, testCase, (@=?), (@?))
import Test.Tasty.QuickCheck

import Database.LSMTree.Extras (showPowersOf10)
import Database.LSMTree.Extras.Generators (KeyForIndexCompact (..))
import Database.LSMTree.Extras.Generators (KeyForIndexCompact (..),
TypedWriteBuffer (..))
import Database.LSMTree.Internal.BitMath
import Database.LSMTree.Internal.BlobRef (BlobRef (..), BlobSpan (..))
import qualified Database.LSMTree.Internal.CRC32C as CRC
Expand All @@ -44,7 +45,6 @@ import Database.LSMTree.Internal.RawPage
import Database.LSMTree.Internal.Run
import qualified Database.LSMTree.Internal.RunReader as Reader
import Database.LSMTree.Internal.Serialise
import Database.LSMTree.Internal.WriteBuffer (WriteBuffer)
import qualified Database.LSMTree.Internal.WriteBuffer as WB

import qualified FormatPage as Proto
Expand Down Expand Up @@ -191,9 +191,9 @@ readBlobFromBS bs (BlobSpan offset size) =
-- TODO: @id === readEntries . flush . toWriteBuffer@ ?
prop_WriteAndRead ::
FS.HasFS IO h -> FS.HasBufFS IO h
-> WriteBuffer KeyForIndexCompact SerialisedValue SerialisedBlob
-> TypedWriteBuffer KeyForIndexCompact SerialisedValue SerialisedBlob
-> IO Property
prop_WriteAndRead fs bfs wb = do
prop_WriteAndRead fs bfs (TypedWriteBuffer wb) = do
run <- flush 42 wb
rhs <- readKOps fs bfs run

Expand All @@ -218,9 +218,9 @@ prop_WriteAndRead fs bfs wb = do
-- @openFromDisk . flush === flush@
prop_WriteAndOpen ::
FS.HasFS IO h
-> WriteBuffer KeyForIndexCompact SerialisedValue SerialisedBlob
-> TypedWriteBuffer KeyForIndexCompact SerialisedValue SerialisedBlob
-> IO ()
prop_WriteAndOpen fs wb = do
prop_WriteAndOpen fs (TypedWriteBuffer wb) = do
-- flush write buffer
let fsPaths = RunFsPaths 1337
written <- fromWriteBuffer fs fsPaths wb
Expand Down

0 comments on commit bc217ef

Please sign in to comment.