Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lsm-tree.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ library
Database.LSMTree.Internal.IndexCompact
Database.LSMTree.Internal.IndexCompactAcc
Database.LSMTree.Internal.IndexOrdinary
Database.LSMTree.Internal.IndexOrdinaryAcc
Database.LSMTree.Internal.Lookup
Database.LSMTree.Internal.Managed
Database.LSMTree.Internal.Merge
Expand Down
32 changes: 25 additions & 7 deletions src/Database/LSMTree/Internal/Chunk.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import Prelude hiding (length)

import Control.Exception (assert)
import Control.Monad.ST.Strict (ST)
import Data.List (scanl')
import Data.STRef.Strict (STRef, newSTRef, readSTRef, writeSTRef)
import Data.Vector.Primitive (Vector, length, unsafeCopy,
unsafeFreeze)
Expand Down Expand Up @@ -54,34 +55,51 @@ createBaler minChunkSize = assert (minChunkSize > 0) $
newSTRef 0

{-|
Feeds a baler a block of bytes.
Feeds a baler blocks of bytes.

Bytes received by a baler are generally queued for later output, but if
feeding new bytes makes the accumulated content exceed the minimum chunk
size then a chunk containing all the accumulated content is output.
-}
feedBaler :: Vector Word8 -> Baler s -> ST s (Maybe Chunk)
feedBaler block (Baler buffer remnantSizeRef) = do
feedBaler :: forall s . [Vector Word8] -> Baler s -> ST s (Maybe Chunk)
feedBaler blocks (Baler buffer remnantSizeRef) = do
remnantSize <- readSTRef remnantSizeRef
let

inputSize :: Int
!inputSize = sum (map length blocks)

totalSize :: Int
!totalSize = remnantSize + length block
!totalSize = remnantSize + inputSize

if totalSize <= Mutable.length buffer
then do
unsafeCopy (Mutable.slice remnantSize (length block) buffer)
block
unsafeCopyBlocks (Mutable.drop remnantSize buffer)
writeSTRef remnantSizeRef totalSize
return Nothing
else do
protoChunk <- Mutable.unsafeNew totalSize
Mutable.unsafeCopy (Mutable.take remnantSize protoChunk)
(Mutable.take remnantSize buffer)
unsafeCopy (Mutable.drop remnantSize protoChunk) block
unsafeCopyBlocks (Mutable.drop remnantSize protoChunk)
writeSTRef remnantSizeRef 0
chunk <- Chunk <$> unsafeFreeze protoChunk
return (Just chunk)
where

unsafeCopyBlocks :: MVector s Word8 -> ST s ()
unsafeCopyBlocks vec
= sequence_ $
zipWith3 (\ start size block -> unsafeCopy
(Mutable.slice start size vec)
block)
(scanl' (+) 0 blockSizes)
blockSizes
blocks
where

blockSizes :: [Int]
blockSizes = map length blocks

{-|
Returns the bytes still queued in a baler, if any, thereby invalidating the
Expand Down
124 changes: 124 additions & 0 deletions src/Database/LSMTree/Internal/IndexOrdinaryAcc.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
{- HLINT ignore "Avoid restricted alias" -}

{-|
Incremental construction functionality for the general-purpose fence pointer
index.
-}
module Database.LSMTree.Internal.IndexOrdinaryAcc
(
IndexOrdinaryAcc,
new,
append,
unsafeEnd
)
where

import Prelude hiding (take)

import Control.Exception (assert)
import Control.Monad.ST.Strict (ST, runST)
import Data.Primitive.ByteArray (newByteArray, unsafeFreezeByteArray,
writeByteArray)
import Data.STRef.Strict (STRef, newSTRef, readSTRef, writeSTRef)
import Data.Vector (force, take, unsafeFreeze)
import Data.Vector.Mutable (MVector)
import qualified Data.Vector.Mutable as Mutable (unsafeNew, write)
import qualified Data.Vector.Primitive as Primitive (Vector, length)
import Data.Word (Word16, Word8)
import Database.LSMTree.Internal.Chunk (Baler, Chunk, createBaler,
feedBaler, unsafeEndBaler)
import Database.LSMTree.Internal.IndexCompactAcc
(Append (AppendMultiPage, AppendSinglePage))
import Database.LSMTree.Internal.IndexOrdinary
(IndexOrdinary (IndexOrdinary))
import Database.LSMTree.Internal.Serialise
(SerialisedKey (SerialisedKey'))
import Database.LSMTree.Internal.Vector (mkPrimVector)

{-|
A general-purpose fence pointer index under incremental construction.

A value @IndexOrdinaryAcc buffer keyCountRef baler@ denotes a partially
constructed index with the following properties:

* The keys that the index assigns to pages are stored as a prefix of the
mutable vector @buffer@.
* The reference @keyCountRef@ points to the number of those keys.
* The @baler@ object is used by the index for incremental output of the
serialised key list.
-}
data IndexOrdinaryAcc s = IndexOrdinaryAcc
!(MVector s SerialisedKey)
!(STRef s Int)
!(Baler s)

-- | Creates a new, initially empty, index.
new :: Int -- ^ Maximum number of keys
-> Int -- ^ Minimum chunk size in bytes
-> ST s (IndexOrdinaryAcc s) -- ^ Construction of the index
new maxKeyCount minChunkSize = assert (maxKeyCount >= 0) $
IndexOrdinaryAcc <$>
Mutable.unsafeNew maxKeyCount <*>
newSTRef 0 <*>
createBaler minChunkSize

{-|
Appends keys to the key list of an index and outputs newly available chunks
of the serialised key list.

__Warning:__ Appending keys whose length cannot be represented by a 16-bit
word may result in a corrupted serialised key list.
-}
append :: Append -> IndexOrdinaryAcc s -> ST s (Maybe Chunk)
append instruction (IndexOrdinaryAcc buffer keyCountRef baler)
= case instruction of
AppendSinglePage _ key -> do
keyCount <- readSTRef keyCountRef
Mutable.write buffer keyCount key
writeSTRef keyCountRef $! succ keyCount
feedBaler (keyListElem key) baler
AppendMultiPage key overflowPageCount -> do
keyCount <- readSTRef keyCountRef
let

pageCount :: Int
!pageCount = succ (fromIntegral overflowPageCount)

keyCount' :: Int
!keyCount' = keyCount + pageCount

mapM_ (flip (Mutable.write buffer) key)
[keyCount .. pred keyCount']
writeSTRef keyCountRef $! keyCount'
feedBaler (concat (replicate pageCount (keyListElem key))) baler
where

keyListElem :: SerialisedKey -> [Primitive.Vector Word8]
keyListElem (SerialisedKey' keyBytes) = [keySizeBytes, keyBytes] where

keySize :: Int
!keySize = Primitive.length keyBytes

keySizeAsWord16 :: Word16
!keySizeAsWord16 = assert (keySize <= fromIntegral (maxBound :: Word16)) $
fromIntegral keySize

keySizeBytes :: Primitive.Vector Word8
!keySizeBytes = mkPrimVector 0 2 $
runST $ do
rep <- newByteArray 2
writeByteArray rep 0 keySizeAsWord16
unsafeFreezeByteArray rep

{-|
Returns the constructed index, along with a final chunk in case the
serialised key list has not been fully output yet, thereby invalidating the
index under construction. Executing @unsafeEnd index@ is only safe when
@index@ is not used afterwards.
-}
unsafeEnd :: IndexOrdinaryAcc s -> ST s (Maybe Chunk, IndexOrdinary)
unsafeEnd (IndexOrdinaryAcc buffer keyCountRef baler) = do
keyCount <- readSTRef keyCountRef
keys <- force <$> take keyCount <$> unsafeFreeze buffer
remnant <- unsafeEndBaler baler
return (remnant, IndexOrdinary keys)