From 1b3158bebfa4122de1491ef115d50b0d6ccce5ea Mon Sep 17 00:00:00 2001 From: Wolfgang Jeltsch Date: Wed, 17 Jul 2024 20:11:10 +0300 Subject: [PATCH] Add incremental functionality for the ordinary index --- lsm-tree.cabal | 1 + src/Database/LSMTree/Internal/Chunk.hs | 32 ++++- .../LSMTree/Internal/IndexOrdinaryAcc.hs | 124 ++++++++++++++++++ 3 files changed, 150 insertions(+), 7 deletions(-) create mode 100644 src/Database/LSMTree/Internal/IndexOrdinaryAcc.hs diff --git a/lsm-tree.cabal b/lsm-tree.cabal index 5397ea521..7a722d27b 100644 --- a/lsm-tree.cabal +++ b/lsm-tree.cabal @@ -119,6 +119,7 @@ library Database.LSMTree.Internal.IndexCompact Database.LSMTree.Internal.IndexCompactAcc Database.LSMTree.Internal.IndexOrdinary + Database.LSMTree.Internal.IndexOrdinaryAcc Database.LSMTree.Internal.Lookup Database.LSMTree.Internal.Managed Database.LSMTree.Internal.Merge diff --git a/src/Database/LSMTree/Internal/Chunk.hs b/src/Database/LSMTree/Internal/Chunk.hs index 1fe874969..cf8d131c9 100644 --- a/src/Database/LSMTree/Internal/Chunk.hs +++ b/src/Database/LSMTree/Internal/Chunk.hs @@ -18,6 +18,7 @@ import Prelude hiding (length) import Control.Exception (assert) import Control.Monad.ST.Strict (ST) +import Data.List (scanl') import Data.STRef.Strict (STRef, newSTRef, readSTRef, writeSTRef) import Data.Vector.Primitive (Vector, length, unsafeCopy, unsafeFreeze) @@ -54,34 +55,51 @@ createBaler minChunkSize = assert (minChunkSize > 0) $ newSTRef 0 {-| - Feeds a baler a block of bytes. + Feeds a baler blocks of bytes. Bytes received by a baler are generally queued for later output, but if feeding new bytes makes the accumulated content exceed the minimum chunk size then a chunk containing all the accumulated content is output. -} -feedBaler :: Vector Word8 -> Baler s -> ST s (Maybe Chunk) -feedBaler block (Baler buffer remnantSizeRef) = do +feedBaler :: forall s . [Vector Word8] -> Baler s -> ST s (Maybe Chunk) +feedBaler blocks (Baler buffer remnantSizeRef) = do remnantSize <- readSTRef remnantSizeRef let + inputSize :: Int + !inputSize = sum (map length blocks) + totalSize :: Int - !totalSize = remnantSize + length block + !totalSize = remnantSize + inputSize if totalSize <= Mutable.length buffer then do - unsafeCopy (Mutable.slice remnantSize (length block) buffer) - block + unsafeCopyBlocks (Mutable.drop remnantSize buffer) writeSTRef remnantSizeRef totalSize return Nothing else do protoChunk <- Mutable.unsafeNew totalSize Mutable.unsafeCopy (Mutable.take remnantSize protoChunk) (Mutable.take remnantSize buffer) - unsafeCopy (Mutable.drop remnantSize protoChunk) block + unsafeCopyBlocks (Mutable.drop remnantSize protoChunk) writeSTRef remnantSizeRef 0 chunk <- Chunk <$> unsafeFreeze protoChunk return (Just chunk) + where + + unsafeCopyBlocks :: MVector s Word8 -> ST s () + unsafeCopyBlocks vec + = sequence_ $ + zipWith3 (\ start size block -> unsafeCopy + (Mutable.slice start size vec) + block) + (scanl' (+) 0 blockSizes) + blockSizes + blocks + where + + blockSizes :: [Int] + blockSizes = map length blocks {-| Returns the bytes still queued in a baler, if any, thereby invalidating the diff --git a/src/Database/LSMTree/Internal/IndexOrdinaryAcc.hs b/src/Database/LSMTree/Internal/IndexOrdinaryAcc.hs new file mode 100644 index 000000000..919c60dce --- /dev/null +++ b/src/Database/LSMTree/Internal/IndexOrdinaryAcc.hs @@ -0,0 +1,124 @@ +{- HLINT ignore "Avoid restricted alias" -} + +{-| + Incremental construction functionality for the general-purpose fence pointer + index. +-} +module Database.LSMTree.Internal.IndexOrdinaryAcc +( + IndexOrdinaryAcc, + new, + append, + unsafeEnd +) +where + +import Prelude hiding (take) + +import Control.Exception (assert) +import Control.Monad.ST.Strict (ST, runST) +import Data.Primitive.ByteArray (newByteArray, unsafeFreezeByteArray, + writeByteArray) +import Data.STRef.Strict (STRef, newSTRef, readSTRef, writeSTRef) +import Data.Vector (force, take, unsafeFreeze) +import Data.Vector.Mutable (MVector) +import qualified Data.Vector.Mutable as Mutable (unsafeNew, write) +import qualified Data.Vector.Primitive as Primitive (Vector, length) +import Data.Word (Word16, Word8) +import Database.LSMTree.Internal.Chunk (Baler, Chunk, createBaler, + feedBaler, unsafeEndBaler) +import Database.LSMTree.Internal.IndexCompactAcc + (Append (AppendMultiPage, AppendSinglePage)) +import Database.LSMTree.Internal.IndexOrdinary + (IndexOrdinary (IndexOrdinary)) +import Database.LSMTree.Internal.Serialise + (SerialisedKey (SerialisedKey')) +import Database.LSMTree.Internal.Vector (mkPrimVector) + +{-| + A general-purpose fence pointer index under incremental construction. + + A value @IndexOrdinaryAcc buffer keyCountRef baler@ denotes a partially + constructed index with the following properties: + + * The keys that the index assigns to pages are stored as a prefix of the + mutable vector @buffer@. + * The reference @keyCountRef@ points to the number of those keys. + * The @baler@ object is used by the index for incremental output of the + serialised key list. +-} +data IndexOrdinaryAcc s = IndexOrdinaryAcc + !(MVector s SerialisedKey) + !(STRef s Int) + !(Baler s) + +-- | Creates a new, initially empty, index. +new :: Int -- ^ Maximum number of keys + -> Int -- ^ Minimum chunk size in bytes + -> ST s (IndexOrdinaryAcc s) -- ^ Construction of the index +new maxKeyCount minChunkSize = assert (maxKeyCount >= 0) $ + IndexOrdinaryAcc <$> + Mutable.unsafeNew maxKeyCount <*> + newSTRef 0 <*> + createBaler minChunkSize + +{-| + Appends keys to the key list of an index and outputs newly available chunks + of the serialised key list. + + __Warning:__ Appending keys whose length cannot be represented by a 16-bit + word may result in a corrupted serialised key list. +-} +append :: Append -> IndexOrdinaryAcc s -> ST s (Maybe Chunk) +append instruction (IndexOrdinaryAcc buffer keyCountRef baler) + = case instruction of + AppendSinglePage _ key -> do + keyCount <- readSTRef keyCountRef + Mutable.write buffer keyCount key + writeSTRef keyCountRef $! succ keyCount + feedBaler (keyListElem key) baler + AppendMultiPage key overflowPageCount -> do + keyCount <- readSTRef keyCountRef + let + + pageCount :: Int + !pageCount = succ (fromIntegral overflowPageCount) + + keyCount' :: Int + !keyCount' = keyCount + pageCount + + mapM_ (flip (Mutable.write buffer) key) + [keyCount .. pred keyCount'] + writeSTRef keyCountRef $! keyCount' + feedBaler (concat (replicate pageCount (keyListElem key))) baler + where + + keyListElem :: SerialisedKey -> [Primitive.Vector Word8] + keyListElem (SerialisedKey' keyBytes) = [keySizeBytes, keyBytes] where + + keySize :: Int + !keySize = Primitive.length keyBytes + + keySizeAsWord16 :: Word16 + !keySizeAsWord16 = assert (keySize <= fromIntegral (maxBound :: Word16)) $ + fromIntegral keySize + + keySizeBytes :: Primitive.Vector Word8 + !keySizeBytes = mkPrimVector 0 2 $ + runST $ do + rep <- newByteArray 2 + writeByteArray rep 0 keySizeAsWord16 + unsafeFreezeByteArray rep + +{-| + Returns the constructed index, along with a final chunk in case the + serialised key list has not been fully output yet, thereby invalidating the + index under construction. Executing @unsafeEnd index@ is only safe when + @index@ is not used afterwards. +-} +unsafeEnd :: IndexOrdinaryAcc s -> ST s (Maybe Chunk, IndexOrdinary) +unsafeEnd (IndexOrdinaryAcc buffer keyCountRef baler) = do + keyCount <- readSTRef keyCountRef + keys <- force <$> take keyCount <$> unsafeFreeze buffer + remnant <- unsafeEndBaler baler + return (remnant, IndexOrdinary keys)