19 changes: 14 additions & 5 deletions src/Database/LSMTree/Internal/Chunk.hs
@@ -5,7 +5,8 @@ module Database.LSMTree.Internal.Chunk
(
-- * Chunks
Chunk (Chunk),
fromChunk,
toByteVector,
toByteString,

-- * Balers
Baler,
@@ -19,23 +20,31 @@ import Prelude hiding (length)

import Control.Exception (assert)
import Control.Monad.ST.Strict (ST)
import Data.ByteString (ByteString)
import Data.List (scanl')
import Data.Primitive.PrimVar (PrimVar, newPrimVar, readPrimVar,
writePrimVar)
import Data.Vector.Primitive (Vector, length, unsafeCopy,
import Data.Vector.Primitive (Vector (Vector), length, unsafeCopy,
unsafeFreeze)
import Data.Vector.Primitive.Mutable (MVector)
import qualified Data.Vector.Primitive.Mutable as Mutable (drop, length, slice,
take, unsafeCopy, unsafeNew)
import Data.Word (Word8)
import Database.LSMTree.Internal.ByteString (byteArrayToByteString)

-- * Chunks

-- | A chunk of bytes, typically output during incremental index serialisation.
newtype Chunk = Chunk (Vector Word8)
newtype Chunk = Chunk (Vector Word8) deriving stock (Eq, Show)

fromChunk :: Chunk -> Vector Word8
fromChunk (Chunk content) = content
-- | Yields the contents of a chunk as a byte vector.
toByteVector :: Chunk -> Vector Word8
toByteVector (Chunk byteVector) = byteVector

-- | Yields the contents of a chunk as a (strict) byte string.
toByteString :: Chunk -> ByteString
toByteString (Chunk (Vector vecOffset vecLength byteArray))
= byteArrayToByteString vecOffset vecLength byteArray

-- * Balers

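Note: the hunk above replaces fromChunk with two accessors, toByteVector and toByteString. A minimal usage sketch (not part of this PR), assuming only the exports shown in this hunk, namely the Chunk constructor and the two accessors:

-- Sketch only (not part of this diff): exercising the new Chunk accessors.
module ChunkAccessorsDemo where

import qualified Data.ByteString as BS
import qualified Data.Vector.Primitive as VP
import Data.Word (Word8)
import Database.LSMTree.Internal.Chunk (Chunk (Chunk), toByteString, toByteVector)

demoChunk :: Chunk
demoChunk = Chunk (VP.fromList [0x01, 0x02, 0x03])

-- Both accessors expose the same bytes, just in different containers.
demoBytes :: (VP.Vector Word8, BS.ByteString)
demoBytes = (toByteVector demoChunk, toByteString demoChunk)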
31 changes: 13 additions & 18 deletions src/Database/LSMTree/Internal/IndexCompact.hs
@@ -14,10 +14,9 @@ module Database.LSMTree.Internal.IndexCompact (
, toLBS
-- * Incremental serialisation
-- $incremental-serialisation
, Chunk (..)
, headerLBS
, chunkToBS
, finalLBS
, word64VectorToChunk
-- * Deserialisation
, fromSBS
) where
@@ -27,7 +26,6 @@ import Control.Monad (when)
import Control.Monad.ST
import Data.Bit hiding (flipBit)
import Data.Bits (unsafeShiftR, (.&.))
import qualified Data.ByteString as BS
import qualified Data.ByteString.Builder as BB
import qualified Data.ByteString.Builder.Extra as BB
import qualified Data.ByteString.Lazy as LBS
@@ -46,8 +44,9 @@ import qualified Data.Vector.Unboxed as VU
import qualified Data.Vector.Unboxed.Base as VU
import Data.Word
import Database.LSMTree.Internal.BitMath
import Database.LSMTree.Internal.ByteString (byteArrayFromTo,
byteArrayToByteString)
import Database.LSMTree.Internal.ByteString (byteArrayFromTo)
import Database.LSMTree.Internal.Chunk (Chunk (Chunk))
import qualified Database.LSMTree.Internal.Chunk as Chunk (toByteString)
import Database.LSMTree.Internal.Entry (NumEntries (..))
import Database.LSMTree.Internal.Page
import Database.LSMTree.Internal.Serialise
@@ -456,7 +455,7 @@ sizeInPages = NumPages . toEnum . VU.length . icPrimary
toLBS :: NumEntries -> IndexCompact -> LBS.ByteString
toLBS numEntries index =
headerLBS
<> LBS.fromStrict (chunkToBS (Chunk (icPrimary index)))
<> LBS.fromStrict (Chunk.toByteString (word64VectorToChunk (icPrimary index)))
<> finalLBS numEntries index

{-------------------------------------------------------------------------------
@@ -467,8 +466,8 @@ toLBS numEntries index =

To incrementally serialise a compact index as it is being constructed, start
by using 'headerLBS'. Each yielded chunk can then be written using
'chunkToBS'. Once construction is completed, 'finalLBS' will serialise
the remaining parts of the compact index.
'Chunk.toByteString'. Once construction is completed, 'finalLBS' will
serialise the remaining parts of the compact index.
Also see module "Database.LSMTree.Internal.IndexCompactAcc".
-}
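
Note: a hedged sketch of the workflow described above, writing an incrementally constructed index to a file. It assumes the chunks have already been obtained from the incremental construction (see the accumulator module referenced above); the function name, parameter names, and file handling are illustrative, not part of this PR:

-- Sketch only: header first, then each yielded chunk, then the final parts.
import qualified Data.ByteString.Lazy as LBS
import Database.LSMTree.Internal.Chunk (Chunk)
import qualified Database.LSMTree.Internal.Chunk as Chunk (toByteString)
import Database.LSMTree.Internal.Entry (NumEntries)
import Database.LSMTree.Internal.IndexCompact (IndexCompact, finalLBS, headerLBS)

serialiseIncrementally :: FilePath -> [Chunk] -> NumEntries -> IndexCompact -> IO ()
serialiseIncrementally path chunks numEntries index = do
  LBS.writeFile path headerLBS
  mapM_ (LBS.appendFile path . LBS.fromStrict . Chunk.toByteString) chunks
  LBS.appendFile path (finalLBS numEntries index)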

@@ -485,17 +484,8 @@ headerLBS =
BB.toLazyByteStringWith (BB.safeStrategy 4 BB.smallChunkSize) mempty $
BB.word32Host supportedTypeAndVersion <> BB.word32Host 0

-- | A chunk of the primary array, which can be constructed incrementally.
data Chunk = Chunk { cPrimary :: !(VU.Vector Word64) }
deriving stock (Show, Eq)

-- | 64 bit aligned.
chunkToBS :: Chunk -> BS.ByteString
chunkToBS (Chunk (VU.V_Word64 (VP.Vector off len ba))) =
byteArrayToByteString (mul8 off) (mul8 len) ba

-- | Writes everything after the primary array, which is assumed to have already
-- been written using 'chunkToBS'.
-- been written using 'Chunk.toByteString'.
finalLBS :: NumEntries -> IndexCompact -> LBS.ByteString
finalLBS (NumEntries numEntries) IndexCompact {..} =
-- use a builder, since it is all relatively small
@@ -508,6 +498,11 @@ finalLBS (NumEntries numEntries) IndexCompact {..} =
where
numPages = VU.length icPrimary

-- | Constructs a chunk containing the contents of a vector of 64-bit words.
word64VectorToChunk :: VU.Vector Word64 -> Chunk
word64VectorToChunk (VU.V_Word64 (VP.Vector off len ba)) =
Chunk (mkPrimVector (mul8 off) (mul8 len) ba)

-- | Padded to 64 bit.
--
-- Assumes that the bitvector has a byte-aligned offset.
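Note on word64VectorToChunk above: the offset and length of the unboxed Word64 vector are element counts, so both are scaled by 8 (mul8) to form a byte-level chunk over the same underlying byte array, without copying. A small sketch of that relationship, assuming only the exports shown in this diff; bytesOfPrimary is an illustrative name:

-- Sketch only: a primary array of n words becomes an 8*n byte chunk
-- (host byte order) backed by the same underlying array.
import qualified Data.Vector.Primitive as VP
import qualified Data.Vector.Unboxed as VU
import Data.Word (Word64, Word8)
import Database.LSMTree.Internal.Chunk (toByteVector)
import Database.LSMTree.Internal.IndexCompact (word64VectorToChunk)

bytesOfPrimary :: VU.Vector Word64 -> VP.Vector Word8
bytesOfPrimary = toByteVector . word64VectorToChunk
-- e.g. VP.length (bytesOfPrimary (VU.fromList [1, 2, 3])) == 24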
6 changes: 3 additions & 3 deletions src/Database/LSMTree/Internal/IndexCompactAcc.hs
@@ -15,7 +15,6 @@ module Database.LSMTree.Internal.IndexCompactAcc (
IndexCompactAcc (..)
, new
, Append (..)
, Chunk (..)
, append
, appendSingle
, appendMulti
@@ -49,6 +48,7 @@ import qualified Data.Vector.Unboxed as VU
import qualified Data.Vector.Unboxed.Mutable as VUM
import Data.Word
import Database.LSMTree.Internal.BitMath
import Database.LSMTree.Internal.Chunk (Chunk)
import Database.LSMTree.Internal.IndexCompact
import Database.LSMTree.Internal.Page
import Database.LSMTree.Internal.Serialise
@@ -243,7 +243,7 @@ yield IndexCompactAcc{..} = do
modifySTRef' icaPrimary . NE.cons =<< newPinnedMVec64 icaMaxChunkSize
modifySTRef' icaClashes . NE.cons =<< VUM.new icaMaxChunkSize
modifySTRef' icaLargerThanPage . NE.cons =<< VUM.new icaMaxChunkSize
pure $ Just (Chunk primaryChunk)
pure $ Just (word64VectorToChunk primaryChunk)
else -- the current chunk is not yet full
pure Nothing

@@ -268,7 +268,7 @@ unsafeEnd IndexCompactAcc{..} = do
-- Only slice out a chunk if there are entries in the chunk
let mchunk = if ix == 0
then Nothing
else Just (Chunk (head chunksPrimary))
else Just (word64VectorToChunk (head chunksPrimary))

let icPrimary = VU.concat . reverse $ chunksPrimary
let icClashes = VU.concat . reverse $ chunksClashes
30 changes: 15 additions & 15 deletions src/Database/LSMTree/Internal/RunAcc.hs
@@ -42,9 +42,9 @@ import Data.Primitive.PrimVar (PrimVar, modifyPrimVar, newPrimVar,
import Data.Word (Word64)
import Database.LSMTree.Internal.Assertions (fromIntegralChecked)
import Database.LSMTree.Internal.BlobRef (BlobSpan (..))
import Database.LSMTree.Internal.Chunk (Chunk)
import Database.LSMTree.Internal.Entry (Entry (..), NumEntries (..))
import Database.LSMTree.Internal.IndexCompact (IndexCompact)
import qualified Database.LSMTree.Internal.IndexCompact as Index
import Database.LSMTree.Internal.IndexCompactAcc (IndexCompactAcc)
import qualified Database.LSMTree.Internal.IndexCompactAcc as Index
import Database.LSMTree.Internal.PageAcc (PageAcc)
@@ -116,7 +116,7 @@ new (NumEntries nentries) alloc = do
unsafeFinalise ::
RunAcc s
-> ST s ( Maybe RawPage
, Maybe Index.Chunk
, Maybe Chunk
, Bloom SerialisedKey
, IndexCompact
, NumEntries
@@ -130,9 +130,9 @@ unsafeFinalise racc@RunAcc {..} = do
!mchunk = selectChunk mpagemchunk mchunk'
pure (mpage, mchunk, bloom, index, NumEntries numEntries)
where
selectChunk :: Maybe (RawPage, Maybe Index.Chunk)
-> Maybe Index.Chunk
-> Maybe Index.Chunk
selectChunk :: Maybe (RawPage, Maybe Chunk)
-> Maybe Chunk
-> Maybe Chunk
selectChunk (Just (_page, Just _chunk)) (Just _chunk') =
-- If flushing the page accumulator gives us an index chunk then
-- the index can't have any more chunks when we finalise the index.
@@ -154,13 +154,13 @@ addKeyOp
:: RunAcc s
-> SerialisedKey
-> Entry SerialisedValue BlobSpan -- ^ the full value, not just a prefix
-> ST s ([RawPage], [RawOverflowPage], [Index.Chunk])
-> ST s ([RawPage], [RawOverflowPage], [Chunk])
addKeyOp racc k e
| PageAcc.entryWouldFitInPage k e = smallToLarge <$> addSmallKeyOp racc k e
| otherwise = addLargeKeyOp racc k e
where
smallToLarge :: Maybe (RawPage, Maybe Index.Chunk)
-> ([RawPage], [RawOverflowPage], [Index.Chunk])
smallToLarge :: Maybe (RawPage, Maybe Chunk)
-> ([RawPage], [RawOverflowPage], [Chunk])
smallToLarge Nothing = ([], [], [])
smallToLarge (Just (page, Nothing)) = ([page], [], [])
smallToLarge (Just (page, Just chunk)) = ([page], [], [chunk])
@@ -179,7 +179,7 @@ addSmallKeyOp
:: RunAcc s
-> SerialisedKey
-> Entry SerialisedValue BlobSpan
-> ST s (Maybe (RawPage, Maybe Index.Chunk))
-> ST s (Maybe (RawPage, Maybe Chunk))
addSmallKeyOp racc@RunAcc{..} k e =
assert (PageAcc.entryWouldFitInPage k e) $ do
modifyPrimVar entryCount (+1)
@@ -225,7 +225,7 @@ addLargeKeyOp
:: RunAcc s
-> SerialisedKey
-> Entry SerialisedValue BlobSpan -- ^ the full value, not just a prefix
-> ST s ([RawPage], [RawOverflowPage], [Index.Chunk])
-> ST s ([RawPage], [RawOverflowPage], [Chunk])
addLargeKeyOp racc@RunAcc{..} k e =
assert (not (PageAcc.entryWouldFitInPage k e)) $ do
modifyPrimVar entryCount (+1)
@@ -276,7 +276,7 @@ addLargeSerialisedKeyOp
-- first page of a multi-page representation of a single
-- key\/op /without/ a 'BlobSpan'.
-> [RawOverflowPage] -- ^ The overflow pages for this key\/op
-> ST s ([RawPage], [RawOverflowPage], [Index.Chunk])
-> ST s ([RawPage], [RawOverflowPage], [Chunk])
addLargeSerialisedKeyOp racc@RunAcc{..} k page overflowPages =
assert (RawPage.rawPageNumKeys page == 1) $
assert (RawPage.rawPageHasBlobSpanAt page 0 == 0) $
@@ -299,7 +299,7 @@ addLargeSerialisedKeyOp racc@RunAcc{..} k page overflowPages =
--
-- Returns @Nothing@ if the page accumulator was empty.
--
flushPageIfNonEmpty :: RunAcc s -> ST s (Maybe (RawPage, Maybe Index.Chunk))
flushPageIfNonEmpty :: RunAcc s -> ST s (Maybe (RawPage, Maybe Chunk))
flushPageIfNonEmpty RunAcc{mpageacc, mindex} = do
nkeys <- PageAcc.keysCountPageAcc mpageacc
if nkeys > 0
@@ -320,10 +320,10 @@ flushPageIfNonEmpty RunAcc{mpageacc, mindex} = do
-- Combine the result of 'flushPageIfNonEmpty' with extra pages and index
-- chunks.
--
selectPagesAndChunks :: Maybe (RawPage, Maybe Index.Chunk)
selectPagesAndChunks :: Maybe (RawPage, Maybe Chunk)
-> RawPage
-> [Index.Chunk]
-> ([RawPage], [Index.Chunk])
-> [Chunk]
-> ([RawPage], [Chunk])
selectPagesAndChunks mpagemchunkPre page chunks =
case mpagemchunkPre of
Nothing -> ( [page], chunks)
8 changes: 5 additions & 3 deletions src/Database/LSMTree/Internal/RunBuilder.hs
@@ -25,6 +25,8 @@ import Data.Word (Word64)
import Database.LSMTree.Internal.BlobRef (BlobSpan (..), RawBlobRef)
import qualified Database.LSMTree.Internal.BlobRef as BlobRef
import Database.LSMTree.Internal.BloomFilter (bloomFilterToLBS)
import Database.LSMTree.Internal.Chunk (Chunk)
import qualified Database.LSMTree.Internal.Chunk as Chunk (toByteString)
import Database.LSMTree.Internal.CRC32C (CRC32C)
import qualified Database.LSMTree.Internal.CRC32C as CRC
import Database.LSMTree.Internal.Entry
@@ -313,16 +315,16 @@ writeIndexHeader RunBuilder {..} =

{-# SPECIALISE writeIndexChunk ::
RunBuilder IO h
-> Index.Chunk
-> Chunk
-> IO () #-}
writeIndexChunk ::
(MonadSTM m, PrimMonad m)
=> RunBuilder m h
-> Index.Chunk
-> Chunk
-> m ()
writeIndexChunk RunBuilder {..} chunk =
writeToHandle runBuilderHasFS (forRunIndex runBuilderHandles) $
BSL.fromStrict $ Index.chunkToBS chunk
BSL.fromStrict $ Chunk.toByteString chunk

{-# SPECIALISE writeIndexFinal ::
RunBuilder IO h
10 changes: 6 additions & 4 deletions test/Test/Database/LSMTree/Internal/Chunk.hs
@@ -11,7 +11,7 @@ import Data.Word (Word8)
import Database.LSMTree.Extras.Generators ()
-- for @Arbitrary@ instantiation of @Vector@
import Database.LSMTree.Internal.Chunk (Chunk, createBaler, feedBaler,
fromChunk, unsafeEndBaler)
toByteVector, unsafeEndBaler)
import Test.QuickCheck (Arbitrary (arbitrary, shrink),
NonEmptyList (NonEmpty), Positive (Positive, getPositive),
Property, Small (Small, getSmall), Testable, scale,
@@ -94,7 +94,8 @@ prop_contentIsPreserved (MinChunkSize minChunkSize) food
input = concat (List.concat food)

output :: Vector Word8
output = concat (fromChunk <$> catMaybes (commonChunks ++ [remnant]))
output = concat $
toByteVector <$> catMaybes (commonChunks ++ [remnant])

in input === output

@@ -108,12 +109,13 @@ prop_noRemnantAfterOutput (MinChunkSize minChunkSize) (NonEmpty food)
prop_commonChunksAreLarge :: MinChunkSize -> [[Vector Word8]] -> Property
prop_commonChunksAreLarge (MinChunkSize minChunkSize) food
= withBalingOutput minChunkSize food $ \ commonChunks _ ->
all (fromChunk >>> length >>> (>= minChunkSize)) (catMaybes commonChunks)
all (toByteVector >>> length >>> (>= minChunkSize)) $
catMaybes commonChunks

remnantChunkSizeIs :: (Int -> Bool) -> Int -> [[Vector Word8]] -> Property
remnantChunkSizeIs constraint minChunkSize food
= withBalingOutput minChunkSize food $ \ _ remnant ->
isJust remnant ==> constraint (length (fromChunk (fromJust remnant)))
isJust remnant ==> constraint (length (toByteVector (fromJust remnant)))

prop_remnantChunkIsNonEmpty :: MinChunkSize -> [[Vector Word8]] -> Property
prop_remnantChunkIsNonEmpty (MinChunkSize minChunkSize)