Skip to content

Commit

Permalink
Add general-purpose chunk handling
Browse files Browse the repository at this point in the history
  • Loading branch information
jeltsch committed Jul 15, 2024
1 parent cbf1c1e commit bcafa21
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 0 deletions.
1 change: 1 addition & 0 deletions lsm-tree.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ library
Database.LSMTree.Internal.BlobRef
Database.LSMTree.Internal.BloomFilter
Database.LSMTree.Internal.ByteString
Database.LSMTree.Internal.Chunk
Database.LSMTree.Internal.CRC32C
Database.LSMTree.Internal.Entry
Database.LSMTree.Internal.IndexCompact
Expand Down
98 changes: 98 additions & 0 deletions src/Database/LSMTree/Internal/Chunk.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
module Database.LSMTree.Internal.Chunk
(
-- * Chunks
Chunk (Chunk),

-- * Balers
Baler,
createBaler,
feedBaler,
readBalerRemnant
)
where

import Prelude hiding (concat, length, splitAt)

import Control.Monad.ST.Strict (ST)
import Data.STRef.Strict (STRef, newSTRef, readSTRef, writeSTRef)
import Data.Vector.Primitive (Vector, concat, length, slice, splitAt)
import Data.Word (Word8)

-- * Chunks

-- | A chunk of bytes, typically output during incremental index serialisation.
newtype Chunk = Chunk (Vector Word8)

-- * Balers

-- Mutable state of a baler.
data BalerState = BalerState
![Vector Word8] -- Reverse list of queued blocks
!Int -- Total size of queued blocks

{-|
An object that receives blocks of bytes and repackages them into chunks of
preferably a given maximum size.
-}
data Baler s = Baler
!Int -- Maximum chunk size
!(STRef s BalerState) -- Reference to the mutable state

-- | Creates a new baler.
createBaler :: Int -- ^ Maximum chunk size
-> ST s (Baler s) -- ^ Creation of the baler
createBaler maxChunkSize = Baler maxChunkSize <$> newSTRef (BalerState [] 0)

{-|
Feeds a baler a block of bytes.
Bytes received by a bailer are generally queued for later output, but if
feeding new bytes makes the queued content exceed the maximum chunk size
then as many chunks of maximum size as possible are output.
-}
feedBaler :: Vector Word8 -> Baler s -> ST s [Chunk]
feedBaler block (Baler maxChunkSize stateRef) = do
BalerState remnantBlocksRev remnantSize <- readSTRef stateRef
let

totalSize :: Int
totalSize = remnantSize + length block

if totalSize < maxChunkSize
then do
writeSTRef stateRef
(BalerState (block : remnantBlocksRev) totalSize)
return []
else do
let

newRemnantSize :: Int
newRemnantSize = totalSize `mod` maxChunkSize

lastOutput, newRemnant :: Vector Word8
(lastOutput, newRemnant)
= splitAt (length block - newRemnantSize) block

output :: Vector Word8
output = concat (reverse (lastOutput : remnantBlocksRev))

chunks :: [Chunk]
chunks = Chunk <$>
(\ start -> slice start maxChunkSize output) <$>
init [0, maxChunkSize .. totalSize]

writeSTRef stateRef (BalerState [newRemnant] newRemnantSize)
return chunks

{-|
Reads the bytes queued in a bailer.
Note that this will not dispose of the bytes in the bailer. To actually get
rid of them, you have to get rid of the bailer itself.
-}
readBalerRemnant :: Baler s -> ST s (Maybe Chunk)
readBalerRemnant (Baler _ stateRef) = do
BalerState remnantBlocksRev remnantSize <- readSTRef stateRef
return $ if remnantSize == 0
then Nothing
else Just $ Chunk (concat (reverse remnantBlocksRev))

0 comments on commit bcafa21

Please sign in to comment.