Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lsm-tree.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ library
Database.LSMTree.Internal
Database.LSMTree.Internal.Assertions
Database.LSMTree.Internal.BitMath
Database.LSMTree.Internal.BlobFile
Database.LSMTree.Internal.BlobRef
Database.LSMTree.Internal.BloomFilter
Database.LSMTree.Internal.BloomFilterQuery1
Expand Down
21 changes: 15 additions & 6 deletions src-extras/Database/LSMTree/Extras/NoThunks.hs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import qualified Data.Vector.Primitive as VP
import qualified Data.Vector.Unboxed.Mutable as VUM
import Data.Word
import Database.LSMTree.Internal as Internal
import Database.LSMTree.Internal.BlobFile
import Database.LSMTree.Internal.BlobRef
import Database.LSMTree.Internal.Config
import Database.LSMTree.Internal.CRC32C
Expand Down Expand Up @@ -394,9 +395,9 @@ deriving stock instance Generic (RunReader m h)
deriving anyclass instance (Typeable m, Typeable (PrimState m), Typeable h)
=> NoThunks (RunReader m h)

deriving stock instance Generic (Reader.Entry m (Handle h))
deriving stock instance Generic (Reader.Entry m h)
deriving anyclass instance (Typeable m, Typeable (PrimState m), Typeable h)
=> NoThunks (Reader.Entry m (Handle h))
=> NoThunks (Reader.Entry m h)

{-------------------------------------------------------------------------------
RawPage
Expand All @@ -416,13 +417,21 @@ deriving anyclass instance NoThunks RawOverflowPage
BlobRef
-------------------------------------------------------------------------------}

deriving stock instance Generic (BlobRef m h)
deriving anyclass instance (NoThunks h, Typeable (PrimState m))
=> NoThunks (BlobRef m h)

deriving stock instance Generic BlobSpan
deriving anyclass instance NoThunks BlobSpan

deriving stock instance Generic (BlobFile m h)
deriving anyclass instance (Typeable h, Typeable (PrimState m))
=> NoThunks (BlobFile m h)

deriving stock instance Generic (RawBlobRef m h)
deriving anyclass instance (Typeable h, Typeable (PrimState m))
=> NoThunks (RawBlobRef m h)

deriving stock instance Generic (WeakBlobRef m h)
deriving anyclass instance (Typeable h, Typeable (PrimState m))
=> NoThunks (WeakBlobRef m h)

{-------------------------------------------------------------------------------
Arena
-------------------------------------------------------------------------------}
Expand Down
4 changes: 2 additions & 2 deletions src/Database/LSMTree/Common.hs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ import qualified Database.LSMTree.Internal.MergeSchedule as Internal
import qualified Database.LSMTree.Internal.Paths as Internal
import qualified Database.LSMTree.Internal.Range as Internal
import Database.LSMTree.Internal.Serialise.Class
import System.FS.API (FsPath, Handle, HasFS)
import System.FS.API (FsPath, HasFS)
import System.FS.BlockIO.API (HasBlockIO)
import System.FS.IO (HandleIO)

Expand Down Expand Up @@ -268,7 +268,7 @@ type BlobRef :: (Type -> Type) -> Type -> Type
type role BlobRef nominal nominal
data BlobRef m blob where
BlobRef :: Typeable h
=> Internal.WeakBlobRef m (Handle h)
=> Internal.WeakBlobRef m h
-> BlobRef m blob

instance Show (BlobRef m blob) where
Expand Down
60 changes: 16 additions & 44 deletions src/Database/LSMTree/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ import Data.Kind
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import Data.Maybe (catMaybes)
import qualified Data.Primitive.ByteArray as P
import qualified Data.Set as Set
import Data.Typeable
import qualified Data.Vector as V
Expand All @@ -104,7 +103,6 @@ import Database.LSMTree.Internal.Paths (SessionRoot (..),
SnapshotName)
import qualified Database.LSMTree.Internal.Paths as Paths
import Database.LSMTree.Internal.Range (Range (..))
import qualified Database.LSMTree.Internal.RawBytes as RB
import Database.LSMTree.Internal.Run (Run)
import qualified Database.LSMTree.Internal.Run as Run
import Database.LSMTree.Internal.RunReaders (OffsetKey (..))
Expand All @@ -116,8 +114,7 @@ import Database.LSMTree.Internal.UniqCounter
import qualified Database.LSMTree.Internal.WriteBuffer as WB
import qualified Database.LSMTree.Internal.WriteBufferBlobs as WBB
import qualified System.FS.API as FS
import System.FS.API (FsError, FsErrorPath (..), FsPath, Handle,
HasFS)
import System.FS.API (FsError, FsErrorPath (..), FsPath, HasFS)
import qualified System.FS.API.Lazy as FS
import qualified System.FS.BlockIO.API as FS
import System.FS.BlockIO.API (HasBlockIO)
Expand Down Expand Up @@ -724,14 +721,14 @@ close t = do
ResolveSerialisedValue
-> V.Vector SerialisedKey
-> Table IO h
-> IO (V.Vector (Maybe (Entry SerialisedValue (WeakBlobRef IO (Handle h))))) #-}
-> IO (V.Vector (Maybe (Entry SerialisedValue (WeakBlobRef IO h)))) #-}
-- | See 'Database.LSMTree.Normal.lookups'.
lookups ::
(MonadST m, MonadSTM m, MonadThrow m)
=> ResolveSerialisedValue
-> V.Vector SerialisedKey
-> Table m h
-> m (V.Vector (Maybe (Entry SerialisedValue (WeakBlobRef m (Handle h)))))
-> m (V.Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
lookups resolve ks t = do
traceWith (tableTracer t) $ TraceLookups (V.length ks)
withOpenTable t $ \thEnv ->
Expand All @@ -753,15 +750,15 @@ lookups resolve ks t = do
ResolveSerialisedValue
-> Range SerialisedKey
-> Table IO h
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef IO (Handle h)) -> res)
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef IO h) -> res)
-> IO (V.Vector res) #-}
-- | See 'Database.LSMTree.Normal.rangeLookup'.
rangeLookup ::
(MonadFix m, MonadMask m, MonadMVar m, MonadST m, MonadSTM m)
=> ResolveSerialisedValue
-> Range SerialisedKey
-> Table m h
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m (Handle h)) -> res)
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res)
-- ^ How to map to a query result, different for normal/monoidal
-> m (V.Vector res)
rangeLookup resolve range t fromEntry = do
Expand Down Expand Up @@ -828,44 +825,19 @@ updates resolve es t = do

{-# SPECIALISE retrieveBlobs ::
Session IO h
-> V.Vector (WeakBlobRef IO (FS.Handle h))
-> V.Vector (WeakBlobRef IO h)
-> IO (V.Vector SerialisedBlob) #-}
retrieveBlobs ::
(MonadFix m, MonadMask m, MonadST m, MonadSTM m)
(MonadMask m, MonadST m, MonadSTM m)
=> Session m h
-> V.Vector (WeakBlobRef m (FS.Handle h))
-> V.Vector (WeakBlobRef m h)
-> m (V.Vector SerialisedBlob)
retrieveBlobs sesh wrefs =
withOpenSession sesh $ \seshEnv ->
handle (\(BlobRef.WeakBlobRefInvalid i) -> throwIO (ErrBlobRefInvalid i)) $
BlobRef.withWeakBlobRefs wrefs $ \refs -> do

-- Prepare the IOOps:
-- We use a single large memory buffer, with appropriate offsets within
-- the buffer.
let bufSize :: Int
!bufSize = V.sum (V.map BlobRef.blobRefSpanSize refs)

{-# INLINE bufOffs #-}
bufOffs :: V.Vector Int
bufOffs = V.scanl (+) 0 (V.map BlobRef.blobRefSpanSize refs)
buf <- P.newPinnedByteArray bufSize
let ioops = V.zipWith (BlobRef.readBlobIOOp buf) bufOffs refs
hbio = sessionHasBlockIO seshEnv

-- Submit the IOOps all in one go:
_ <- FS.submitIO hbio ioops
-- We do not need to inspect the results because IO errors are
-- thrown as exceptions, and the result is just the read length
-- which is already known. Short reads can't happen here.

-- Construct the SerialisedBlobs results:
-- This is just the different offsets within the shared buffer.
ba <- P.unsafeFreezeByteArray buf
pure $! V.zipWith
(\off len -> SerialisedBlob (RB.fromByteArray off len ba))
bufOffs
(V.map BlobRef.blobRefSpanSize refs)
let hbio = sessionHasBlockIO seshEnv in
handle (\(BlobRef.WeakBlobRefInvalid i) ->
throwIO (ErrBlobRefInvalid i)) $
BlobRef.readWeakBlobRefs hbio wrefs

{-------------------------------------------------------------------------------
Cursors
Expand Down Expand Up @@ -1023,7 +995,7 @@ closeCursor Cursor {..} = do
ResolveSerialisedValue
-> Int
-> Cursor IO h
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef IO (Handle h)) -> res)
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef IO h) -> res)
-> IO (V.Vector res) #-}
-- | See 'Database.LSMTree.Normal.readCursor'.
readCursor ::
Expand All @@ -1032,7 +1004,7 @@ readCursor ::
=> ResolveSerialisedValue
-> Int -- ^ Maximum number of entries to read
-> Cursor m h
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m (Handle h)) -> res)
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res)
-- ^ How to map to a query result, different for normal/monoidal
-> m (V.Vector res)
readCursor resolve n cursor fromEntry =
Expand All @@ -1043,7 +1015,7 @@ readCursor resolve n cursor fromEntry =
-> (SerialisedKey -> Bool)
-> Int
-> Cursor IO h
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef IO (Handle h)) -> res)
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef IO h) -> res)
-> IO (V.Vector res) #-}
-- | @readCursorWhile _ p n cursor _@ reads elements until either:
--
Expand All @@ -1060,7 +1032,7 @@ readCursorWhile ::
-> (SerialisedKey -> Bool) -- ^ Only read as long as this predicate holds
-> Int -- ^ Maximum number of entries to read
-> Cursor m h
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m (Handle h)) -> res)
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res)
-- ^ How to map to a query result, different for normal/monoidal
-> m (V.Vector res)
readCursorWhile resolve keyIsWanted n Cursor {..} fromEntry = do
Expand Down
124 changes: 124 additions & 0 deletions src/Database/LSMTree/Internal/BlobFile.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
module Database.LSMTree.Internal.BlobFile (
BlobFile (..)
, BlobSpan (..)
, removeReference
, RemoveFileOnClose (..)
, openBlobFile
, readBlob
, writeBlob
) where

import Control.DeepSeq (NFData (..))
import Control.Monad (unless)
import Control.Monad.Class.MonadThrow (MonadMask, MonadThrow)
import Control.Monad.Primitive (PrimMonad)
import Control.RefCount (RefCounter)
import qualified Control.RefCount as RC
import qualified Data.Primitive.ByteArray as P
import qualified Data.Vector.Primitive as VP
import Data.Word (Word32, Word64)
import qualified Database.LSMTree.Internal.RawBytes as RB
import Database.LSMTree.Internal.Serialise (SerialisedBlob (..))
import qualified System.FS.API as FS
import System.FS.API (HasFS)
import qualified System.FS.BlockIO.API as FS

-- | A handle to a file containing blobs.
--
-- This is a reference counted object. The reference counter is set up by
-- 'openBlobFile' with a finaliser that closes the file and, unless
-- 'DoNotRemoveFileOnClose' was requested, also removes it.
--
data BlobFile m h = BlobFile
  { -- | Handle of the open blob file.
    blobFileHandle     :: {-# UNPACK #-} !(FS.Handle h)
    -- | Reference counter for this blob file.
  , blobFileRefCounter :: {-# UNPACK #-} !(RefCounter m)
  }
  deriving stock (Show)

-- | Fully evaluates both the file handle and the reference counter.
instance NFData h => NFData (BlobFile m h) where
  rnf BlobFile {blobFileHandle, blobFileRefCounter} =
    rnf blobFileHandle `seq` rnf blobFileRefCounter

-- | The location of a blob inside a blob file: where it starts and how many
-- bytes it occupies.
data BlobSpan = BlobSpan
  { -- | Absolute offset of the first byte of the blob within the file.
    blobSpanOffset :: {-# UNPACK #-} !Word64
    -- | Length of the blob in bytes.
  , blobSpanSize   :: {-# UNPACK #-} !Word32
  }
  deriving stock (Show, Eq)

-- | Fully evaluates the offset and the size.
instance NFData BlobSpan where
  rnf BlobSpan {blobSpanOffset, blobSpanSize} =
    rnf blobSpanOffset `seq` rnf blobSpanSize

-- | Remove one reference from the blob file, by calling
-- 'RC.removeReference' on the file's reference counter.
{-# INLINE removeReference #-}
removeReference ::
     (MonadMask m, PrimMonad m)
  => BlobFile m h
  -> m ()
removeReference blobFile =
    RC.removeReference (blobFileRefCounter blobFile)

-- | Whether the finaliser installed by 'openBlobFile' removes the file after
-- closing it.
--
-- TODO: this hack can be removed once snapshots are done properly and so
-- runs can delete their files on close.
data RemoveFileOnClose
  = -- | Close the file and then remove it.
    RemoveFileOnClose
    -- | Only close the file; leave it on disk.
  | DoNotRemoveFileOnClose
  deriving stock Eq

-- | Open the file at the given path in the given mode and wrap the resulting
-- handle in a 'BlobFile' with a fresh reference counter.
--
-- The reference counter's finaliser closes the handle and then, unless
-- 'DoNotRemoveFileOnClose' was passed, removes the file.
--
-- TODO: Temporarily we have a 'RemoveFileOnClose' flag, which can be removed
-- once 'Run' no longer needs it, when snapshots are implemented.
--
{-# SPECIALISE openBlobFile :: HasFS IO h -> FS.FsPath -> FS.OpenMode -> RemoveFileOnClose -> IO (BlobFile IO h) #-}
openBlobFile ::
     PrimMonad m
  => HasFS m h
  -> FS.FsPath
  -> FS.OpenMode
  -> RemoveFileOnClose
  -> m (BlobFile m h)
openBlobFile fs path mode remove = do
    h  <- FS.hOpen fs path mode
    rc <- RC.mkRefCounter1 (Just (closeAndMaybeRemove h))
    return (BlobFile h rc)
  where
    -- Finaliser: close first, then remove the file the handle points to
    -- (unless the caller asked us to keep it).
    closeAndMaybeRemove h = do
      FS.hClose fs h
      unless (remove == DoNotRemoveFileOnClose) $
        FS.removeFile fs (FS.handlePath h)

-- | Read the blob described by a 'BlobSpan' from a 'BlobFile'.
--
-- A fresh pinned buffer of exactly 'blobSpanSize' bytes is allocated, filled
-- from the file starting at 'blobSpanOffset', and returned as a
-- 'SerialisedBlob'.
--
{-# SPECIALISE readBlob :: HasFS IO h -> BlobFile IO h -> BlobSpan -> IO SerialisedBlob #-}
readBlob ::
     (MonadThrow m, PrimMonad m)
  => HasFS m h
  -> BlobFile m h
  -> BlobSpan
  -> m SerialisedBlob
readBlob fs blobFile blobSpan = do
    buf <- P.newPinnedByteArray nbytes
    _ <- FS.hGetBufExactlyAt
           fs
           (blobFileHandle blobFile)
           buf
           0
           (fromIntegral nbytes :: FS.ByteCount)
           fileOff
    -- The mutable buffer is not touched again after this point.
    frozen <- P.unsafeFreezeByteArray buf
    let !bytes = RB.fromByteArray 0 nbytes frozen
    return (SerialisedBlob bytes)
  where
    nbytes :: Int
    nbytes = fromIntegral (blobSpanSize blobSpan)

    fileOff = FS.AbsOffset (blobSpanOffset blobSpan)

-- | Write a 'SerialisedBlob' into a 'BlobFile' at the given absolute file
-- offset.
--
-- The blob's bytes are written straight from its underlying byte array,
-- using the offset and length of the primitive vector that backs it.
--
{-# SPECIALISE writeBlob :: HasFS IO h -> BlobFile IO h -> SerialisedBlob -> Word64 -> IO () #-}
writeBlob ::
     (MonadThrow m, PrimMonad m)
  => HasFS m h
  -> BlobFile m h
  -> SerialisedBlob
  -> Word64
  -> m ()
writeBlob fs blobFile (SerialisedBlob' (VP.Vector srcOff srcLen bytes)) fileOff = do
    -- The immutable array is thawed only so that it can be passed as the
    -- source buffer. NOTE(review): presumably the write never mutates the
    -- buffer -- confirm against the fs-api documentation.
    buf <- P.unsafeThawByteArray bytes
    () <$ FS.hPutBufExactlyAt
            fs
            (blobFileHandle blobFile)
            buf
            (FS.BufferOffset srcOff)
            (fromIntegral srcLen :: FS.ByteCount)
            (FS.AbsOffset fileOff)
Loading