diff --git a/lsm-tree.cabal b/lsm-tree.cabal index 558f22c1e..9365d708d 100644 --- a/lsm-tree.cabal +++ b/lsm-tree.cabal @@ -119,6 +119,7 @@ library Database.LSMTree.Internal Database.LSMTree.Internal.Assertions Database.LSMTree.Internal.BitMath + Database.LSMTree.Internal.BlobFile Database.LSMTree.Internal.BlobRef Database.LSMTree.Internal.BloomFilter Database.LSMTree.Internal.BloomFilterQuery1 diff --git a/src-extras/Database/LSMTree/Extras/NoThunks.hs b/src-extras/Database/LSMTree/Extras/NoThunks.hs index 7c23952b6..796b9d925 100644 --- a/src-extras/Database/LSMTree/Extras/NoThunks.hs +++ b/src-extras/Database/LSMTree/Extras/NoThunks.hs @@ -34,6 +34,7 @@ import qualified Data.Vector.Primitive as VP import qualified Data.Vector.Unboxed.Mutable as VUM import Data.Word import Database.LSMTree.Internal as Internal +import Database.LSMTree.Internal.BlobFile import Database.LSMTree.Internal.BlobRef import Database.LSMTree.Internal.Config import Database.LSMTree.Internal.CRC32C @@ -394,9 +395,9 @@ deriving stock instance Generic (RunReader m h) deriving anyclass instance (Typeable m, Typeable (PrimState m), Typeable h) => NoThunks (RunReader m h) -deriving stock instance Generic (Reader.Entry m (Handle h)) +deriving stock instance Generic (Reader.Entry m h) deriving anyclass instance (Typeable m, Typeable (PrimState m), Typeable h) - => NoThunks (Reader.Entry m (Handle h)) + => NoThunks (Reader.Entry m h) {------------------------------------------------------------------------------- RawPage @@ -416,13 +417,21 @@ deriving anyclass instance NoThunks RawOverflowPage BlobRef -------------------------------------------------------------------------------} -deriving stock instance Generic (BlobRef m h) -deriving anyclass instance (NoThunks h, Typeable (PrimState m)) - => NoThunks (BlobRef m h) - deriving stock instance Generic BlobSpan deriving anyclass instance NoThunks BlobSpan +deriving stock instance Generic (BlobFile m h) +deriving anyclass instance (Typeable h, Typeable (PrimState m)) + => NoThunks (BlobFile m h) + +deriving stock instance Generic (RawBlobRef m h) +deriving anyclass instance (Typeable h, Typeable (PrimState m)) + => NoThunks (RawBlobRef m h) + +deriving stock instance Generic (WeakBlobRef m h) +deriving anyclass instance (Typeable h, Typeable (PrimState m)) + => NoThunks (WeakBlobRef m h) + {------------------------------------------------------------------------------- Arena -------------------------------------------------------------------------------} diff --git a/src/Database/LSMTree/Common.hs b/src/Database/LSMTree/Common.hs index 4606bad56..63edd2f11 100644 --- a/src/Database/LSMTree/Common.hs +++ b/src/Database/LSMTree/Common.hs @@ -63,7 +63,7 @@ import qualified Database.LSMTree.Internal.MergeSchedule as Internal import qualified Database.LSMTree.Internal.Paths as Internal import qualified Database.LSMTree.Internal.Range as Internal import Database.LSMTree.Internal.Serialise.Class -import System.FS.API (FsPath, Handle, HasFS) +import System.FS.API (FsPath, HasFS) import System.FS.BlockIO.API (HasBlockIO) import System.FS.IO (HandleIO) @@ -268,7 +268,7 @@ type BlobRef :: (Type -> Type) -> Type -> Type type role BlobRef nominal nominal data BlobRef m blob where BlobRef :: Typeable h - => Internal.WeakBlobRef m (Handle h) + => Internal.WeakBlobRef m h -> BlobRef m blob instance Show (BlobRef m blob) where diff --git a/src/Database/LSMTree/Internal.hs b/src/Database/LSMTree/Internal.hs index 8aaf59bd4..dd87cefa4 100644 --- a/src/Database/LSMTree/Internal.hs +++ b/src/Database/LSMTree/Internal.hs @@ -87,7 +87,6 @@ import Data.Kind import Data.Map.Strict (Map) import qualified Data.Map.Strict as Map import Data.Maybe (catMaybes) -import qualified Data.Primitive.ByteArray as P import qualified Data.Set as Set import Data.Typeable import qualified Data.Vector as V @@ -104,7 +103,6 @@ import Database.LSMTree.Internal.Paths (SessionRoot (..), SnapshotName) import qualified Database.LSMTree.Internal.Paths as Paths import Database.LSMTree.Internal.Range (Range (..)) -import qualified Database.LSMTree.Internal.RawBytes as RB import Database.LSMTree.Internal.Run (Run) import qualified Database.LSMTree.Internal.Run as Run import Database.LSMTree.Internal.RunReaders (OffsetKey (..)) @@ -116,8 +114,7 @@ import Database.LSMTree.Internal.UniqCounter import qualified Database.LSMTree.Internal.WriteBuffer as WB import qualified Database.LSMTree.Internal.WriteBufferBlobs as WBB import qualified System.FS.API as FS -import System.FS.API (FsError, FsErrorPath (..), FsPath, Handle, - HasFS) +import System.FS.API (FsError, FsErrorPath (..), FsPath, HasFS) import qualified System.FS.API.Lazy as FS import qualified System.FS.BlockIO.API as FS import System.FS.BlockIO.API (HasBlockIO) @@ -724,14 +721,14 @@ close t = do ResolveSerialisedValue -> V.Vector SerialisedKey -> Table IO h - -> IO (V.Vector (Maybe (Entry SerialisedValue (WeakBlobRef IO (Handle h))))) #-} + -> IO (V.Vector (Maybe (Entry SerialisedValue (WeakBlobRef IO h)))) #-} -- | See 'Database.LSMTree.Normal.lookups'. lookups :: (MonadST m, MonadSTM m, MonadThrow m) => ResolveSerialisedValue -> V.Vector SerialisedKey -> Table m h - -> m (V.Vector (Maybe (Entry SerialisedValue (WeakBlobRef m (Handle h))))) + -> m (V.Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))) lookups resolve ks t = do traceWith (tableTracer t) $ TraceLookups (V.length ks) withOpenTable t $ \thEnv -> @@ -753,7 +750,7 @@ lookups resolve ks t = do ResolveSerialisedValue -> Range SerialisedKey -> Table IO h - -> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef IO (Handle h)) -> res) + -> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef IO h) -> res) -> IO (V.Vector res) #-} -- | See 'Database.LSMTree.Normal.rangeLookup'. rangeLookup :: @@ -761,7 +758,7 @@ rangeLookup :: => ResolveSerialisedValue -> Range SerialisedKey -> Table m h - -> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m (Handle h)) -> res) + -> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res) -- ^ How to map to a query result, different for normal/monoidal -> m (V.Vector res) rangeLookup resolve range t fromEntry = do @@ -828,44 +825,19 @@ updates resolve es t = do {-# SPECIALISE retrieveBlobs :: Session IO h - -> V.Vector (WeakBlobRef IO (FS.Handle h)) + -> V.Vector (WeakBlobRef IO h) -> IO (V.Vector SerialisedBlob) #-} retrieveBlobs :: - (MonadFix m, MonadMask m, MonadST m, MonadSTM m) + (MonadMask m, MonadST m, MonadSTM m) => Session m h - -> V.Vector (WeakBlobRef m (FS.Handle h)) + -> V.Vector (WeakBlobRef m h) -> m (V.Vector SerialisedBlob) retrieveBlobs sesh wrefs = withOpenSession sesh $ \seshEnv -> - handle (\(BlobRef.WeakBlobRefInvalid i) -> throwIO (ErrBlobRefInvalid i)) $ - BlobRef.withWeakBlobRefs wrefs $ \refs -> do - - -- Prepare the IOOps: - -- We use a single large memory buffer, with appropriate offsets within - -- the buffer. - let bufSize :: Int - !bufSize = V.sum (V.map BlobRef.blobRefSpanSize refs) - - {-# INLINE bufOffs #-} - bufOffs :: V.Vector Int - bufOffs = V.scanl (+) 0 (V.map BlobRef.blobRefSpanSize refs) - buf <- P.newPinnedByteArray bufSize - let ioops = V.zipWith (BlobRef.readBlobIOOp buf) bufOffs refs - hbio = sessionHasBlockIO seshEnv - - -- Submit the IOOps all in one go: - _ <- FS.submitIO hbio ioops - -- We do not need to inspect the results because IO errors are - -- thrown as exceptions, and the result is just the read length - -- which is already known. Short reads can't happen here. - - -- Construct the SerialisedBlobs results: - -- This is just the different offsets within the shared buffer. - ba <- P.unsafeFreezeByteArray buf - pure $! V.zipWith - (\off len -> SerialisedBlob (RB.fromByteArray off len ba)) - bufOffs - (V.map BlobRef.blobRefSpanSize refs) + let hbio = sessionHasBlockIO seshEnv in + handle (\(BlobRef.WeakBlobRefInvalid i) -> + throwIO (ErrBlobRefInvalid i)) $ + BlobRef.readWeakBlobRefs hbio wrefs {------------------------------------------------------------------------------- Cursors @@ -1023,7 +995,7 @@ closeCursor Cursor {..} = do ResolveSerialisedValue -> Int -> Cursor IO h - -> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef IO (Handle h)) -> res) + -> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef IO h) -> res) -> IO (V.Vector res) #-} -- | See 'Database.LSMTree.Normal.readCursor'. readCursor :: @@ -1032,7 +1004,7 @@ readCursor :: => ResolveSerialisedValue -> Int -- ^ Maximum number of entries to read -> Cursor m h - -> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m (Handle h)) -> res) + -> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res) -- ^ How to map to a query result, different for normal/monoidal -> m (V.Vector res) readCursor resolve n cursor fromEntry = @@ -1043,7 +1015,7 @@ readCursor resolve n cursor fromEntry = -> (SerialisedKey -> Bool) -> Int -> Cursor IO h - -> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef IO (Handle h)) -> res) + -> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef IO h) -> res) -> IO (V.Vector res) #-} -- | @readCursorWhile _ p n cursor _@ reads elements until either: -- @@ -1060,7 +1032,7 @@ readCursorWhile :: -> (SerialisedKey -> Bool) -- ^ Only read as long as this predicate holds -> Int -- ^ Maximum number of entries to read -> Cursor m h - -> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m (Handle h)) -> res) + -> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res) -- ^ How to map to a query result, different for normal/monoidal -> m (V.Vector res) readCursorWhile resolve keyIsWanted n Cursor {..} fromEntry = do diff --git a/src/Database/LSMTree/Internal/BlobFile.hs b/src/Database/LSMTree/Internal/BlobFile.hs new file mode 100644 index 000000000..304096e7e --- /dev/null +++ b/src/Database/LSMTree/Internal/BlobFile.hs @@ -0,0 +1,124 @@ +module Database.LSMTree.Internal.BlobFile ( + BlobFile (..) + , BlobSpan (..) + , removeReference + , RemoveFileOnClose (..) + , openBlobFile + , readBlob + , writeBlob + ) where + +import Control.DeepSeq (NFData (..)) +import Control.Monad (unless) +import Control.Monad.Class.MonadThrow (MonadMask, MonadThrow) +import Control.Monad.Primitive (PrimMonad) +import Control.RefCount (RefCounter) +import qualified Control.RefCount as RC +import qualified Data.Primitive.ByteArray as P +import qualified Data.Vector.Primitive as VP +import Data.Word (Word32, Word64) +import qualified Database.LSMTree.Internal.RawBytes as RB +import Database.LSMTree.Internal.Serialise (SerialisedBlob (..)) +import qualified System.FS.API as FS +import System.FS.API (HasFS) +import qualified System.FS.BlockIO.API as FS + +-- | A handle to a file containing blobs. +-- +-- This is a reference counted object. Upon finalisation, the file is closed +-- and deleted. +-- +data BlobFile m h = BlobFile { + blobFileHandle :: {-# UNPACK #-} !(FS.Handle h), + blobFileRefCounter :: {-# UNPACK #-} !(RefCounter m) + } + deriving stock (Show) + +instance NFData h => NFData (BlobFile m h) where + rnf (BlobFile a b) = rnf a `seq` rnf b + +-- | The location of a blob inside a blob file. +data BlobSpan = BlobSpan { + blobSpanOffset :: {-# UNPACK #-} !Word64 + , blobSpanSize :: {-# UNPACK #-} !Word32 + } + deriving stock (Show, Eq) + +instance NFData BlobSpan where + rnf (BlobSpan a b) = rnf a `seq` rnf b + +{-# INLINE removeReference #-} +removeReference :: + (MonadMask m, PrimMonad m) + => BlobFile m h + -> m () +removeReference BlobFile{blobFileRefCounter} = + RC.removeReference blobFileRefCounter + +-- | TODO: this hack can be removed once snapshots are done properly and so +-- runs can delete their files on close. +data RemoveFileOnClose = RemoveFileOnClose | DoNotRemoveFileOnClose + deriving stock Eq + +-- | Open the given file to make a 'BlobFile'. The finaliser will close and +-- delete the file. +-- +-- TODO: Temporarily we have a 'RemoveFileOnClose' flag, which can be removed +-- once 'Run' no longer needs it, when snapshots are implemented. +-- +{-# SPECIALISE openBlobFile :: HasFS IO h -> FS.FsPath -> FS.OpenMode -> RemoveFileOnClose -> IO (BlobFile IO h) #-} +openBlobFile :: + PrimMonad m + => HasFS m h + -> FS.FsPath + -> FS.OpenMode + -> RemoveFileOnClose + -> m (BlobFile m h) +openBlobFile fs path mode remove = do + blobFileHandle <- FS.hOpen fs path mode + let finaliser = do + FS.hClose fs blobFileHandle + unless (remove == DoNotRemoveFileOnClose) $ + FS.removeFile fs (FS.handlePath blobFileHandle) + blobFileRefCounter <- RC.mkRefCounter1 (Just finaliser) + return BlobFile { + blobFileHandle, + blobFileRefCounter + } + +{-# SPECIALISE readBlob :: HasFS IO h -> BlobFile IO h -> BlobSpan -> IO SerialisedBlob #-} +readBlob :: + (MonadThrow m, PrimMonad m) + => HasFS m h + -> BlobFile m h + -> BlobSpan + -> m SerialisedBlob +readBlob fs BlobFile {blobFileHandle} + BlobSpan {blobSpanOffset, blobSpanSize} = do + let off = FS.AbsOffset blobSpanOffset + len :: Int + len = fromIntegral blobSpanSize + mba <- P.newPinnedByteArray len + _ <- FS.hGetBufExactlyAt fs blobFileHandle mba 0 + (fromIntegral len :: FS.ByteCount) off + ba <- P.unsafeFreezeByteArray mba + let !rb = RB.fromByteArray 0 len ba + return (SerialisedBlob rb) + +{-# SPECIALISE writeBlob :: HasFS IO h -> BlobFile IO h -> SerialisedBlob -> Word64 -> IO () #-} +writeBlob :: + (MonadThrow m, PrimMonad m) + => HasFS m h + -> BlobFile m h + -> SerialisedBlob + -> Word64 + -> m () +writeBlob fs BlobFile {blobFileHandle} + (SerialisedBlob' (VP.Vector boff blen ba)) off = do + mba <- P.unsafeThawByteArray ba + _ <- FS.hPutBufExactlyAt + fs blobFileHandle mba + (FS.BufferOffset boff) + (fromIntegral blen :: FS.ByteCount) + (FS.AbsOffset off) + return () diff --git a/src/Database/LSMTree/Internal/BlobRef.hs b/src/Database/LSMTree/Internal/BlobRef.hs index 2bc2e2d5f..0cf3850c8 100644 --- a/src/Database/LSMTree/Internal/BlobRef.hs +++ b/src/Database/LSMTree/Internal/BlobRef.hs @@ -4,185 +4,199 @@ {- HLINT ignore "Use unless" -} module Database.LSMTree.Internal.BlobRef ( - BlobRef (..) - , BlobSpan (..) - , blobRefSpanSize + BlobSpan (..) + , RawBlobRef (..) , WeakBlobRef (..) - , withWeakBlobRef - , withWeakBlobRefs - , deRefWeakBlobRef - , deRefWeakBlobRefs , WeakBlobRefInvalid (..) - , removeReference - , removeReferences - , readBlob - , readBlobIOOp + , rawToWeakBlobRef + , readRawBlobRef + , readWeakBlobRef + , readWeakBlobRefs ) where -import Control.DeepSeq (NFData (..)) import Control.Monad (when) import Control.Monad.Class.MonadThrow (Exception, MonadMask, MonadThrow (..), bracket, throwIO) import Control.Monad.Primitive -import Control.RefCount (RefCounter) import qualified Control.RefCount as RC -import Data.Coerce (coerce) -import qualified Data.Primitive.ByteArray as P (MutableByteArray, - newPinnedByteArray, unsafeFreezeByteArray) +import qualified Data.Primitive.ByteArray as P (newPinnedByteArray, + unsafeFreezeByteArray) import qualified Data.Vector as V -import Data.Word (Word32, Word64) +import qualified Data.Vector.Mutable as VM +import Database.LSMTree.Internal.BlobFile (BlobFile (..), + BlobSpan (..)) +import qualified Database.LSMTree.Internal.BlobFile as BlobFile import qualified Database.LSMTree.Internal.RawBytes as RB import Database.LSMTree.Internal.Serialise (SerialisedBlob (..)) import qualified System.FS.API as FS import System.FS.API (HasFS) import qualified System.FS.BlockIO.API as FS +import System.FS.BlockIO.API (HasBlockIO) --- | A handle-like reference to an on-disk blob. The blob can be retrieved based --- on the reference. + +-- | A raw blob reference is a reference to a blob within a blob file. -- --- See 'Database.LSMTree.Common.BlobRef' for more info. -data BlobRef m h = BlobRef { - blobRefFile :: !h - , blobRefCount :: {-# UNPACK #-} !(RefCounter m) - , blobRefSpan :: {-# UNPACK #-} !BlobSpan +-- The \"raw\" means that it does no reference counting, so does not maintain +-- ownership of the 'BlobFile'. Thus these are only safe to use in the context +-- of code that already (directly or indirectly) owns the blob file that the +-- blob ref uses (such as within run merging). +-- +-- Thus these cannot be handed out via the API. Use 'WeakBlobRef' for that. +-- +data RawBlobRef m h = RawBlobRef { + rawBlobRefFile :: {-# NOUNPACK #-} !(BlobFile m h) + , rawBlobRefSpan :: {-# UNPACK #-} !BlobSpan } deriving stock (Show) -instance NFData h => NFData (BlobRef m h) where - rnf (BlobRef a b c) = rnf a `seq` rnf b `seq` rnf c - --- | Location of a blob inside a blob file. -data BlobSpan = BlobSpan { - blobSpanOffset :: {-# UNPACK #-} !Word64 - , blobSpanSize :: {-# UNPACK #-} !Word32 - } - deriving stock (Show, Eq) - -instance NFData BlobSpan where - rnf (BlobSpan a b) = rnf a `seq` rnf b - -blobRefSpanSize :: BlobRef m h -> Int -blobRefSpanSize = fromIntegral . blobSpanSize . blobRefSpan +-- | A \"weak\" reference to a blob within a blob file. These are the ones we +-- can return in the public API and can outlive their parent table. +-- +-- They are weak references in that they do not keep the file open using a +-- reference count. So when we want to use our weak reference we have to +-- dereference them to obtain a normal strong reference while we do the I\/O +-- to read the blob. This ensures the file is not closed under our feet. +-- +-- See 'Database.LSMTree.Common.BlobRef' for more info. +-- +data WeakBlobRef m h = WeakBlobRef { + weakBlobRefFile :: {-# NOUNPACK #-} !(BlobFile m h) + , weakBlobRefSpan :: {-# UNPACK #-} !BlobSpan + } + deriving stock (Show) --- | A 'WeakBlobRef' is a weak reference to a blob file. These are the ones we --- can return in the public API and can outlive their parent table. They do not --- keep the file open using a reference count. So when we want to use our weak --- reference we have to dereference them to obtain a normal strong reference --- while we do the I\/O to read the blob. This ensures the file is not closed --- under our feet. +-- | A \"strong\" reference to a blob within a blob file. The blob file remains +-- open while the strong reference is live. Thus it is safe to do I\/O to +-- retrieve the blob based on the reference. Strong references must be released +-- using 'releaseBlobRef' when no longer in use (e.g. after completing I\/O). -- -newtype WeakBlobRef m h = WeakBlobRef (BlobRef m h) - deriving newtype (Show, NFData) +data StrongBlobRef m h = StrongBlobRef { + strongBlobRefFile :: {-# NOUNPACK #-} !(BlobFile m h) + , strongBlobRefSpan :: {-# UNPACK #-} !BlobSpan + } + deriving stock (Show) + +-- | Convert a 'RawBlobRef' to a 'WeakBlobRef'. +rawToWeakBlobRef :: RawBlobRef m h -> WeakBlobRef m h +rawToWeakBlobRef RawBlobRef {rawBlobRefFile, rawBlobRefSpan} = + -- This doesn't need to really do anything, becuase the raw version + -- does not maintain an independent ref count, and the weak one does + -- not either. + WeakBlobRef { + weakBlobRefFile = rawBlobRefFile, + weakBlobRefSpan = rawBlobRefSpan + } -- | The 'WeakBlobRef' now points to a blob that is no longer available. newtype WeakBlobRefInvalid = WeakBlobRefInvalid Int deriving stock (Show) deriving anyclass (Exception) -{-# SPECIALISE withWeakBlobRef :: - WeakBlobRef IO h - -> (BlobRef IO h -> IO a) - -> IO a #-} --- | 'WeakBlobRef's are weak references. They do not keep the blob file open. --- Dereference a 'WeakBlobRef' to a strong 'BlobRef' to allow I\/O using --- 'readBlob' or 'readBlobIOOp'. Use 'removeReference' when the 'BlobRef' is --- no longer needed. --- --- Throws 'WeakBlobRefInvalid' if the weak reference has become invalid. --- -withWeakBlobRef :: - (MonadMask m, PrimMonad m) - => WeakBlobRef m h - -> (BlobRef m h -> m a) - -> m a -withWeakBlobRef wref = bracket (deRefWeakBlobRef wref) removeReference - -{-# SPECIALISE withWeakBlobRefs :: - V.Vector (WeakBlobRef IO h) - -> (V.Vector (BlobRef IO h) -> IO a) - -> IO a #-} --- | The same as 'withWeakBlobRef' but for many references in one go. --- -withWeakBlobRefs :: - (MonadMask m, PrimMonad m) - => V.Vector (WeakBlobRef m h) - -> (V.Vector (BlobRef m h) -> m a) - -> m a -withWeakBlobRefs wrefs = bracket (deRefWeakBlobRefs wrefs) removeReferences - {-# SPECIALISE deRefWeakBlobRef :: WeakBlobRef IO h - -> IO (BlobRef IO h) #-} + -> IO (StrongBlobRef IO h) #-} deRefWeakBlobRef :: (MonadThrow m, PrimMonad m) => WeakBlobRef m h - -> m (BlobRef m h) -deRefWeakBlobRef (WeakBlobRef ref) = do - ok <- RC.upgradeWeakReference (blobRefCount ref) + -> m (StrongBlobRef m h) +deRefWeakBlobRef WeakBlobRef{weakBlobRefFile, weakBlobRefSpan} = do + ok <- RC.upgradeWeakReference (blobFileRefCounter weakBlobRefFile) when (not ok) $ throwIO (WeakBlobRefInvalid 0) - pure ref + return StrongBlobRef{ + strongBlobRefFile = weakBlobRefFile, + strongBlobRefSpan = weakBlobRefSpan + } {-# SPECIALISE deRefWeakBlobRefs :: V.Vector (WeakBlobRef IO h) - -> IO (V.Vector (BlobRef IO h)) #-} + -> IO (V.Vector (StrongBlobRef IO h)) #-} deRefWeakBlobRefs :: forall m h. (MonadMask m, PrimMonad m) => V.Vector (WeakBlobRef m h) - -> m (V.Vector (BlobRef m h)) + -> m (V.Vector (StrongBlobRef m h)) deRefWeakBlobRefs wrefs = do - let refs :: V.Vector (BlobRef m h) - refs = coerce wrefs -- safely coerce away the newtype wrappers - V.iforM_ wrefs $ \i (WeakBlobRef ref) -> do - ok <- RC.upgradeWeakReference (blobRefCount ref) - when (not ok) $ do - -- drop refs on the previous ones taken successfully so far - V.mapM_ removeReference (V.take i refs) - throwIO (WeakBlobRefInvalid i) - pure refs - -{-# SPECIALISE removeReference :: BlobRef IO h -> IO () #-} -removeReference :: (MonadMask m, PrimMonad m) => BlobRef m h -> m () -removeReference = RC.removeReference . blobRefCount - -{-# SPECIALISE removeReferences :: V.Vector (BlobRef IO h) -> IO () #-} -removeReferences :: (MonadMask m, PrimMonad m) => V.Vector (BlobRef m h) -> m () -removeReferences = V.mapM_ removeReference - -{-# SPECIALISE readBlob :: - HasFS IO h - -> BlobRef IO (FS.Handle h) - -> IO SerialisedBlob #-} -readBlob :: + refs <- VM.new (V.length wrefs) + V.iforM_ wrefs $ \i WeakBlobRef {weakBlobRefFile, weakBlobRefSpan} -> do + ok <- RC.upgradeWeakReference (blobFileRefCounter weakBlobRefFile) + if ok + then VM.write refs i StrongBlobRef { + strongBlobRefFile = weakBlobRefFile, + strongBlobRefSpan = weakBlobRefSpan + } + else do + -- drop refs on the previous ones taken successfully so far + VM.mapM_ removeReference (VM.take i refs) + throwIO (WeakBlobRefInvalid i) + V.unsafeFreeze refs + +{-# SPECIALISE removeReference :: StrongBlobRef IO h -> IO () #-} +removeReference :: (MonadMask m, PrimMonad m) => StrongBlobRef m h -> m () +removeReference = BlobFile.removeReference . strongBlobRefFile + +{-# INLINE readRawBlobRef #-} +readRawBlobRef :: (MonadThrow m, PrimMonad m) => HasFS m h - -> BlobRef m (FS.Handle h) + -> RawBlobRef m h + -> m SerialisedBlob +readRawBlobRef fs RawBlobRef {rawBlobRefFile, rawBlobRefSpan} = + BlobFile.readBlob fs rawBlobRefFile rawBlobRefSpan + +{-# SPECIALISE readWeakBlobRef :: HasFS IO h -> WeakBlobRef IO h -> IO SerialisedBlob #-} +readWeakBlobRef :: + (MonadMask m, PrimMonad m) + => HasFS m h + -> WeakBlobRef m h -> m SerialisedBlob -readBlob fs BlobRef { - blobRefFile, - blobRefSpan = BlobSpan {blobSpanOffset, blobSpanSize} - } = do - let off = FS.AbsOffset blobSpanOffset - len :: Int - len = fromIntegral blobSpanSize - mba <- P.newPinnedByteArray len - _ <- FS.hGetBufExactlyAt fs blobRefFile mba 0 - (fromIntegral len :: FS.ByteCount) off - ba <- P.unsafeFreezeByteArray mba - let !rb = RB.fromByteArray 0 len ba - return (SerialisedBlob rb) - -readBlobIOOp :: - P.MutableByteArray s -> Int - -> BlobRef m (FS.Handle h) - -> FS.IOOp s h -readBlobIOOp buf bufoff - BlobRef { - blobRefFile, - blobRefSpan = BlobSpan {blobSpanOffset, blobSpanSize} - } = - FS.IOOpRead - blobRefFile - (fromIntegral blobSpanOffset :: FS.FileOffset) - buf (FS.BufferOffset bufoff) - (fromIntegral blobSpanSize :: FS.ByteCount) +readWeakBlobRef fs wref = + bracket (deRefWeakBlobRef wref) removeReference $ + \StrongBlobRef {strongBlobRefFile, strongBlobRefSpan} -> + BlobFile.readBlob fs strongBlobRefFile strongBlobRefSpan + +{-# SPECIALISE readWeakBlobRefs :: HasBlockIO IO h -> V.Vector (WeakBlobRef IO h) -> IO (V.Vector SerialisedBlob) #-} +readWeakBlobRefs :: + (MonadMask m, PrimMonad m) + => HasBlockIO m h + -> V.Vector (WeakBlobRef m h) + -> m (V.Vector SerialisedBlob) +readWeakBlobRefs hbio wrefs = + bracket (deRefWeakBlobRefs wrefs) (V.mapM_ removeReference) $ \refs -> do + -- Prepare the IOOps: + -- We use a single large memory buffer, with appropriate offsets within + -- the buffer. + let bufSize :: Int + !bufSize = V.sum (V.map blobRefSpanSize refs) + + {-# INLINE bufOffs #-} + bufOffs :: V.Vector Int + bufOffs = V.scanl (+) 0 (V.map blobRefSpanSize refs) + buf <- P.newPinnedByteArray bufSize + + -- Submit the IOOps all in one go: + _ <- FS.submitIO hbio $ + V.zipWith + (\bufoff + StrongBlobRef { + strongBlobRefFile = BlobFile {blobFileHandle}, + strongBlobRefSpan = BlobSpan {blobSpanOffset, blobSpanSize} + } -> + FS.IOOpRead + blobFileHandle + (fromIntegral blobSpanOffset :: FS.FileOffset) + buf (FS.BufferOffset bufoff) + (fromIntegral blobSpanSize :: FS.ByteCount)) + bufOffs refs + -- We do not need to inspect the results because IO errors are + -- thrown as exceptions, and the result is just the read length + -- which is already known. Short reads can't happen here. + + -- Construct the SerialisedBlobs results: + -- This is just the different offsets within the shared buffer. + ba <- P.unsafeFreezeByteArray buf + pure $! V.zipWith + (\off len -> SerialisedBlob (RB.fromByteArray off len ba)) + bufOffs + (V.map blobRefSpanSize refs) + where + blobRefSpanSize = fromIntegral . blobSpanSize . strongBlobRefSpan diff --git a/src/Database/LSMTree/Internal/CRC32C.hs b/src/Database/LSMTree/Internal/CRC32C.hs index c4d44de91..f113d36c2 100644 --- a/src/Database/LSMTree/Internal/CRC32C.hs +++ b/src/Database/LSMTree/Internal/CRC32C.hs @@ -79,7 +79,7 @@ initialCRC32C = CRC32C 0 -- same as crc32c BS.empty updateCRC32C :: BS.ByteString -> CRC32C -> CRC32C updateCRC32C bs (CRC32C crc) = CRC32C (CRC.crc32c_update crc bs) - +{-# SPECIALISE hGetSomeCRC32C :: HasFS IO h -> Handle h -> Word64 -> CRC32C -> IO (BS.ByteString, CRC32C) #-} hGetSomeCRC32C :: Monad m => HasFS m h -> Handle h @@ -100,6 +100,7 @@ hGetSomeCRC32C fs h n crc = do -- TODO: To reliably return a strict bytestring without additional copying, -- @fs-api@ needs to support directly reading into a buffer, which is currently -- work in progress: +{-# SPECIALISE hGetExactlyCRC32C :: HasFS IO h -> Handle h -> Word64 -> CRC32C -> IO (BSL.ByteString, CRC32C) #-} hGetExactlyCRC32C :: MonadThrow m => HasFS m h -> Handle h @@ -111,6 +112,7 @@ hGetExactlyCRC32C fs h n crc = do return (lbs, crc') +{-# SPECIALISE hPutSomeCRC32C :: HasFS IO h -> Handle h -> BS.ByteString -> CRC32C -> IO (Word64, CRC32C) #-} hPutSomeCRC32C :: Monad m => HasFS m h -> Handle h @@ -123,6 +125,7 @@ hPutSomeCRC32C fs h bs crc = do -- | This function makes sure that the whole 'BS.ByteString' is written. +{-# SPECIALISE hPutAllCRC32C :: HasFS IO h -> Handle h -> BS.ByteString -> CRC32C -> IO (Word64, CRC32C) #-} hPutAllCRC32C :: forall m h . Monad m => HasFS m h @@ -141,6 +144,7 @@ hPutAllCRC32C fs h = go 0 else go written' bs' crc' -- | This function makes sure that the whole /lazy/ 'BSL.ByteString' is written. +{-# SPECIALISE hPutAllChunksCRC32C :: HasFS IO h -> Handle h -> BSL.ByteString -> CRC32C -> IO (Word64, CRC32C) #-} hPutAllChunksCRC32C :: forall m h . Monad m => HasFS m h @@ -155,6 +159,7 @@ hPutAllChunksCRC32C fs h = \lbs crc -> (n, crc') <- hPutAllCRC32C fs h bs crc return (written + n, crc') +{-# SPECIALISE readFileCRC32C :: HasFS IO h -> FsPath -> IO CRC32C #-} readFileCRC32C :: forall m h. MonadThrow m => HasFS m h -> FsPath -> m CRC32C readFileCRC32C fs file = withFile fs file ReadMode (\h -> go h initialCRC32C) @@ -266,6 +271,7 @@ data ChecksumsFileFormatError = ChecksumsFileFormatError FsPath BSC.ByteString instance Exception ChecksumsFileFormatError +{-# SPECIALISE readChecksumsFile :: HasFS IO h -> FsPath -> IO ChecksumsFile #-} readChecksumsFile :: MonadThrow m => HasFS m h -> FsPath -> m ChecksumsFile readChecksumsFile fs path = do @@ -274,6 +280,7 @@ readChecksumsFile fs path = do Left badline -> throwIO (ChecksumsFileFormatError path badline) Right checksums -> return checksums +{-# SPECIALISE writeChecksumsFile :: HasFS IO h -> FsPath -> ChecksumsFile -> IO () #-} writeChecksumsFile :: MonadThrow m => HasFS m h -> FsPath -> ChecksumsFile -> m () writeChecksumsFile fs path checksums = @@ -281,6 +288,7 @@ writeChecksumsFile fs path checksums = _ <- hPutAll fs h (formatChecksumsFile checksums) return () +{-# SPECIALISE writeChecksumsFile' :: HasFS IO h -> Handle h -> ChecksumsFile -> IO () #-} writeChecksumsFile' :: MonadThrow m => HasFS m h -> Handle h -> ChecksumsFile -> m () writeChecksumsFile' fs h checksums = void $ hPutAll fs h (formatChecksumsFile checksums) diff --git a/src/Database/LSMTree/Internal/Cursor.hs b/src/Database/LSMTree/Internal/Cursor.hs index b1bc8b815..e8451c603 100644 --- a/src/Database/LSMTree/Internal/Cursor.hs +++ b/src/Database/LSMTree/Internal/Cursor.hs @@ -9,7 +9,8 @@ import Control.Monad.Class.MonadST (MonadST (..)) import Control.Monad.Class.MonadThrow import Control.Monad.Fix (MonadFix) import qualified Data.Vector as V -import Database.LSMTree.Internal.BlobRef (WeakBlobRef (..)) +import Database.LSMTree.Internal.BlobRef (RawBlobRef, + WeakBlobRef (..)) import qualified Database.LSMTree.Internal.BlobRef as BlobRef import Database.LSMTree.Internal.Entry (Entry) import qualified Database.LSMTree.Internal.Entry as Entry @@ -19,13 +20,12 @@ import qualified Database.LSMTree.Internal.RunReaders as Readers import Database.LSMTree.Internal.Serialise (SerialisedKey, SerialisedValue) import qualified Database.LSMTree.Internal.Vector as V -import System.FS.API (Handle) {-# INLINE readEntriesWhile #-} {-# SPECIALISE readEntriesWhile :: forall h res. ResolveSerialisedValue -> (SerialisedKey -> Bool) - -> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef IO (Handle h)) -> res) + -> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef IO h) -> res) -> Readers.Readers IO h -> Int -> IO (V.Vector res, Readers.HasMore) #-} @@ -39,7 +39,7 @@ readEntriesWhile :: forall h m res. (MonadFix m, MonadMask m, MonadST m, MonadSTM m) => ResolveSerialisedValue -> (SerialisedKey -> Bool) - -> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m (Handle h)) -> res) + -> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res) -> Readers.Readers m h -> Int -> m (V.Vector res, Readers.HasMore) @@ -107,7 +107,7 @@ readEntriesWhile resolve keyIsWanted fromEntry readers n = -- Once we have a resolved entry, we still have to make sure it's not -- a 'Delete', since we only want to write values to the result vector. handleResolved :: SerialisedKey - -> Entry SerialisedValue (BlobRef.BlobRef m (Handle h)) + -> Entry SerialisedValue (RawBlobRef m h) -> Readers.HasMore -> m (Maybe res, Readers.HasMore) handleResolved key entry hasMore = @@ -123,10 +123,10 @@ readEntriesWhile resolve keyIsWanted fromEntry readers n = Readers.Drained -> return (Nothing, Readers.Drained) toResult :: SerialisedKey - -> Entry SerialisedValue (BlobRef.BlobRef m (Handle h)) + -> Entry SerialisedValue (RawBlobRef m h) -> Maybe res toResult key = \case Entry.Insert v -> Just $ fromEntry key v Nothing - Entry.InsertWithBlob v b -> Just $ fromEntry key v (Just (WeakBlobRef b)) + Entry.InsertWithBlob v b -> Just $ fromEntry key v (Just (BlobRef.rawToWeakBlobRef b)) Entry.Mupdate v -> Just $ fromEntry key v Nothing Entry.Delete -> Nothing diff --git a/src/Database/LSMTree/Internal/Lookup.hs b/src/Database/LSMTree/Internal/Lookup.hs index ca17bbf6c..e0cadade8 100644 --- a/src/Database/LSMTree/Internal/Lookup.hs +++ b/src/Database/LSMTree/Internal/Lookup.hs @@ -47,7 +47,8 @@ import Database.LSMTree.Internal.Page (PageSpan (..), getNumPages, import Database.LSMTree.Internal.RawBytes (RawBytes (..)) import qualified Database.LSMTree.Internal.RawBytes as RB import Database.LSMTree.Internal.RawPage -import Database.LSMTree.Internal.Run (Run, mkBlobRefForRun) +import Database.LSMTree.Internal.Run (Run) +import qualified Database.LSMTree.Internal.Run as Run import Database.LSMTree.Internal.Serialise import qualified Database.LSMTree.Internal.Vector as V import qualified Database.LSMTree.Internal.WriteBuffer as WB @@ -163,7 +164,7 @@ data ByteCountDiscrepancy = ByteCountDiscrepancy { -> V.Vector IndexCompact -> V.Vector (Handle h) -> V.Vector SerialisedKey - -> IO (V.Vector (Maybe (Entry SerialisedValue (WeakBlobRef IO (Handle h))))) + -> IO (V.Vector (Maybe (Entry SerialisedValue (WeakBlobRef IO h)))) #-} -- | Batched lookups in I\/O. -- @@ -183,7 +184,7 @@ lookupsIO :: -> V.Vector IndexCompact -- ^ The indexes inside @rs@ -> V.Vector (Handle h) -- ^ The file handles to the key\/value files inside @rs@ -> V.Vector SerialisedKey - -> m (V.Vector (Maybe (Entry SerialisedValue (WeakBlobRef m (Handle h))))) + -> m (V.Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))) lookupsIO !hbio !mgr !resolveV !wb !wbblobs !rs !blooms !indexes !kopsFiles !ks = assert precondition $ withArena mgr $ \arena -> do @@ -208,7 +209,7 @@ lookupsIO !hbio !mgr !resolveV !wb !wbblobs !rs !blooms !indexes !kopsFiles !ks -> VP.Vector RunIxKeyIx -> V.Vector (IOOp RealWorld h) -> VU.Vector IOResult - -> IO (V.Vector (Maybe (Entry SerialisedValue (WeakBlobRef IO (Handle h))))) + -> IO (V.Vector (Maybe (Entry SerialisedValue (WeakBlobRef IO h)))) #-} -- | Intra-page lookups, and combining lookup results from multiple runs and -- the write buffer. @@ -227,7 +228,7 @@ intraPageLookups :: -> VP.Vector RunIxKeyIx -> V.Vector (IOOp (PrimState m) h) -> VU.Vector IOResult - -> m (V.Vector (Maybe (Entry SerialisedValue (WeakBlobRef m (Handle h))))) + -> m (V.Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))) intraPageLookups !resolveV !wb !wbblobs !rs !ks !rkixs !ioops !ioress = do -- We accumulate results into the 'res' vector. When there are several -- lookup hits for the same key then we combine the results. The combining @@ -245,8 +246,7 @@ intraPageLookups !resolveV !wb !wbblobs !rs !ks !rkixs !ioops !ioress = do res <- VM.generateM (V.length ks) $ \ki -> case WB.lookup wb (V.unsafeIndex ks ki) of Nothing -> pure Nothing - Just e -> pure $! Just $! - fmap (WeakBlobRef . WBB.mkBlobRef wbblobs) e + Just e -> pure $! Just $! fmap (WBB.mkWeakBlobRef wbblobs) e -- TODO: ^^ we should be able to avoid this allocation by -- combining the conversion with other later conversions. loop res 0 @@ -256,7 +256,7 @@ intraPageLookups !resolveV !wb !wbblobs !rs !ks !rkixs !ioops !ioress = do loop :: VM.MVector (PrimState m) - (Maybe (Entry SerialisedValue (WeakBlobRef m (Handle h)))) + (Maybe (Entry SerialisedValue (WeakBlobRef m h))) -> Int -> m () loop !res !ioopix @@ -275,8 +275,7 @@ intraPageLookups !resolveV !wb !wbblobs !rs !ks !rkixs !ioops !ioress = do -- Laziness ensures that we only compute the forcing of the value in -- the entry when the result is needed. LookupEntry e -> do - let e' = bimap copySerialisedValue - (WeakBlobRef . mkBlobRefForRun r) e + let e' = bimap copySerialisedValue (Run.mkWeakBlobRef r) e -- TODO: ^^ we should be able to avoid this allocation by -- combining the conversion with other later conversions. V.unsafeInsertWithMStrict res (combine resolveV) kix e' @@ -289,7 +288,7 @@ intraPageLookups !resolveV !wb !wbblobs !rs !ks !rkixs !ioops !ioress = do (unBufferOffset (ioopBufferOffset ioop) + 4096) (fromIntegral m) buf) - e' = bimap v' (WeakBlobRef . mkBlobRefForRun r) e + e' = bimap v' (Run.mkWeakBlobRef r) e V.unsafeInsertWithMStrict res (combine resolveV) kix e' loop res (ioopix + 1) diff --git a/src/Database/LSMTree/Internal/Merge.hs b/src/Database/LSMTree/Internal/Merge.hs index 2f4900f95..f68f95387 100644 --- a/src/Database/LSMTree/Internal/Merge.hs +++ b/src/Database/LSMTree/Internal/Merge.hs @@ -33,7 +33,7 @@ import Data.Primitive.MutVar import Data.Traversable (for) import qualified Data.Vector as V import Data.Word -import Database.LSMTree.Internal.BlobRef (BlobRef) +import Database.LSMTree.Internal.BlobRef (RawBlobRef) import Database.LSMTree.Internal.Entry import Database.LSMTree.Internal.Run (Run, RunDataCaching) import qualified Database.LSMTree.Internal.Run as Run @@ -45,7 +45,6 @@ import Database.LSMTree.Internal.RunReaders (Readers) import qualified Database.LSMTree.Internal.RunReaders as Readers import Database.LSMTree.Internal.Serialise import GHC.Stack (HasCallStack) -import qualified System.FS.API as FS import System.FS.API (HasFS) import System.FS.BlockIO.API (HasBlockIO) @@ -190,7 +189,7 @@ finaliser var b rs = do -- this function with async exceptions masked. Otherwise, these resources can -- leak. complete :: - (MonadFix m, MonadSTM m, MonadST m, MonadThrow m) + (MonadFix m, MonadSTM m, MonadST m, MonadMask m) => Merge m h -> m (Run m h) complete Merge{..} = do @@ -218,7 +217,7 @@ complete Merge{..} = do -- -- Note: run with async exceptions masked. See 'complete'. stepsToCompletion :: - (MonadCatch m, MonadFix m, MonadSTM m, MonadST m) + (MonadMask m, MonadFix m, MonadSTM m, MonadST m) => Merge m h -> Int -> m (Run m h) @@ -237,7 +236,7 @@ stepsToCompletion m stepBatchSize = go -- -- Note: run with async exceptions masked. See 'complete'. stepsToCompletionCounted :: - (MonadCatch m, MonadFix m, MonadSTM m, MonadST m) + (MonadMask m, MonadFix m, MonadSTM m, MonadST m) => Merge m h -> Int -> m (Int, Run m h) @@ -357,14 +356,14 @@ steps Merge {..} requestedSteps = assertStepsInvariant <$> do Level -> RunBuilder IO h -> SerialisedKey - -> Reader.Entry IO (FS.Handle h) + -> Reader.Entry IO h -> IO () #-} writeReaderEntry :: (MonadSTM m, MonadST m, MonadThrow m) => Level -> RunBuilder m h -> SerialisedKey - -> Reader.Entry m (FS.Handle h) + -> Reader.Entry m h -> m () writeReaderEntry level builder key (Reader.Entry entryFull) = -- Small entry. @@ -397,14 +396,14 @@ writeReaderEntry level builder key entry@(Reader.EntryOverflow prefix page _ ove Level -> RunBuilder IO h -> SerialisedKey - -> Entry SerialisedValue (BlobRef IO (FS.Handle h)) + -> Entry SerialisedValue (RawBlobRef IO h) -> IO () #-} writeSerialisedEntry :: (MonadSTM m, MonadST m, MonadThrow m) => Level -> RunBuilder m h -> SerialisedKey - -> Entry SerialisedValue (BlobRef m (FS.Handle h)) + -> Entry SerialisedValue (RawBlobRef m h) -> m () writeSerialisedEntry level builder key entry = when (shouldWriteEntry level entry) $ diff --git a/src/Database/LSMTree/Internal/MergeSchedule.hs b/src/Database/LSMTree/Internal/MergeSchedule.hs index 5e604ad7e..01763a86e 100644 --- a/src/Database/LSMTree/Internal/MergeSchedule.hs +++ b/src/Database/LSMTree/Internal/MergeSchedule.hs @@ -869,7 +869,7 @@ levelIsFull sr rs = V.length rs + 1 >= (sizeRatioInt sr) {-# SPECIALISE mergeRuns :: ResolveSerialisedValue -> HasFS IO h -> HasBlockIO IO h -> RunDataCaching -> RunBloomFilterAlloc -> RunFsPaths -> Merge.Level -> V.Vector (Run IO h) -> IO (Run IO h) #-} mergeRuns :: - (MonadCatch m, MonadFix m, MonadST m, MonadSTM m) + (MonadMask m, MonadFix m, MonadST m, MonadSTM m) => ResolveSerialisedValue -> HasFS m h -> HasBlockIO m h diff --git a/src/Database/LSMTree/Internal/Run.hs b/src/Database/LSMTree/Internal/Run.hs index 09dc67970..fd82348bc 100644 --- a/src/Database/LSMTree/Internal/Run.hs +++ b/src/Database/LSMTree/Internal/Run.hs @@ -45,7 +45,8 @@ module Database.LSMTree.Internal.Run ( , addReference , removeReference , removeReferenceN - , mkBlobRefForRun + , mkRawBlobRef + , mkWeakBlobRef -- ** Run creation , fromMutable , fromWriteBuffer @@ -66,7 +67,10 @@ import Data.BloomFilter (Bloom) import qualified Data.ByteString.Short as SBS import Data.Foldable (for_) import Data.Word (Word64) -import Database.LSMTree.Internal.BlobRef (BlobRef (..), BlobSpan (..)) +import Database.LSMTree.Internal.BlobFile hiding (removeReference) +import qualified Database.LSMTree.Internal.BlobFile as BlobFile +import Database.LSMTree.Internal.BlobRef (RawBlobRef (..), + WeakBlobRef (..)) import Database.LSMTree.Internal.BloomFilter (bloomFilterFromSBS) import qualified Database.LSMTree.Internal.CRC32C as CRC import Database.LSMTree.Internal.Entry (NumEntries (..)) @@ -111,7 +115,7 @@ data Run m h = Run { -- | The file handle for the BLOBs file. This file is opened -- read-only and is accessed in a normal style using buffered -- I\/O, reading arbitrary file offset and length spans. - , runBlobFile :: !(FS.Handle h) + , runBlobFile :: !(BlobFile m h) , runRunDataCaching :: !RunDataCaching , runHasFS :: !(HasFS m h) , runHasBlockIO :: !(HasBlockIO m h) @@ -141,13 +145,20 @@ removeReference r = RC.removeReference (runRefCounter r) removeReferenceN :: (PrimMonad m, MonadMask m) => Run m h -> Word64 -> m () removeReferenceN r = RC.removeReferenceN (runRefCounter r) --- | Helper function to make a 'BlobRef' that points into a 'Run'. -mkBlobRefForRun :: Run m h -> BlobSpan -> BlobRef m (FS.Handle h) -mkBlobRefForRun Run{runBlobFile, runRefCounter} blobRefSpan = - BlobRef { - blobRefFile = runBlobFile, - blobRefCount = runRefCounter, - blobRefSpan +-- | Helper function to make a 'WeakBlobRef' that points into a 'Run'. +mkRawBlobRef :: Run m h -> BlobSpan -> RawBlobRef m h +mkRawBlobRef Run{runBlobFile} blobspan = + RawBlobRef { + rawBlobRefFile = runBlobFile, + rawBlobRefSpan = blobspan + } + +-- | Helper function to make a 'WeakBlobRef' that points into a 'Run'. +mkWeakBlobRef :: Run m h -> BlobSpan -> WeakBlobRef m h +mkWeakBlobRef Run{runBlobFile} blobspan = + WeakBlobRef { + weakBlobRefFile = runBlobFile, + weakBlobRefSpan = blobspan } {-# SPECIALISE close :: @@ -158,18 +169,20 @@ mkBlobRefForRun Run{runBlobFile, runRefCounter} blobRefSpan = -- -- TODO: Once snapshots are implemented, files should get removed, but for now -- we want to be able to re-open closed runs from disk. -close :: (MonadSTM m, MonadThrow m) => Run m h -> m () +-- TODO: see openBlobFile DoNotRemoveFileOnClose. This can be dropped at the +-- same once when snapshots are implemented. +close :: (MonadSTM m, MonadMask m, PrimMonad m) => Run m h -> m () close Run {..} = do -- TODO: removing files should drop them from the page cache, but until we -- have proper snapshotting we are keeping the files around. Because of -- this, we instruct the OS to drop all run-related files from the page -- cache FS.hDropCacheAll runHasBlockIO runKOpsFile - FS.hDropCacheAll runHasBlockIO runBlobFile + FS.hDropCacheAll runHasBlockIO (blobFileHandle runBlobFile) FS.hClose runHasFS runKOpsFile `finally` - FS.hClose runHasFS runBlobFile + BlobFile.removeReference runBlobFile -- | Should this run cache key\/ops data in memory? data RunDataCaching = CacheRunData | NoCacheRunData @@ -205,7 +218,7 @@ setRunDataCaching hbio runKOpsFile NoCacheRunData = do -> RunBuilder IO h -> IO (Run IO h) #-} fromMutable :: - (MonadFix m, MonadST m, MonadSTM m, MonadThrow m) + (MonadFix m, MonadST m, MonadSTM m, MonadMask m) => RunDataCaching -> RefCount -> RunBuilder m h @@ -214,7 +227,8 @@ fromMutable runRunDataCaching refCount builder = do (runHasFS, runHasBlockIO, runRunFsPaths, runFilter, runIndex, runNumEntries) <- Builder.unsafeFinalise (runRunDataCaching == NoCacheRunData) builder runKOpsFile <- FS.hOpen runHasFS (runKOpsPath runRunFsPaths) FS.ReadMode - runBlobFile <- FS.hOpen runHasFS (runBlobPath runRunFsPaths) FS.ReadMode + runBlobFile <- openBlobFile runHasFS (runBlobPath runRunFsPaths) FS.ReadMode + DoNotRemoveFileOnClose setRunDataCaching runHasBlockIO runKOpsFile runRunDataCaching rec runRefCounter <- RC.unsafeMkRefCounterN refCount (Just $ close r) let !r = Run { .. } @@ -236,7 +250,7 @@ fromMutable runRunDataCaching refCount builder = do -- immediately when they are added to the write buffer, avoiding the need to do -- it here. fromWriteBuffer :: - (MonadFix m, MonadST m, MonadSTM m, MonadThrow m) + (MonadFix m, MonadST m, MonadSTM m, MonadMask m) => HasFS m h -> HasBlockIO m h -> RunDataCaching @@ -248,7 +262,7 @@ fromWriteBuffer :: fromWriteBuffer fs hbio caching alloc fsPaths buffer blobs = do builder <- Builder.new fs hbio fsPaths (WB.numEntries buffer) alloc for_ (WB.toList buffer) $ \(k, e) -> - Builder.addKeyOp builder k (fmap (WBB.mkBlobRef blobs) e) + Builder.addKeyOp builder k (fmap (WBB.mkRawBlobRef blobs) e) --TODO: the fmap entry here reallocates even when there are no blobs fromMutable caching (RefCount 1) builder @@ -274,7 +288,7 @@ data FileFormatError = FileFormatError FS.FsPath String -- checksum ('ChecksumError') or can't be parsed ('FileFormatError'). openFromDisk :: forall m h. - (MonadFix m, MonadSTM m, MonadThrow m, PrimMonad m) + (MonadFix m, MonadSTM m, MonadMask m, PrimMonad m) => HasFS m h -> HasBlockIO m h -> RunDataCaching @@ -299,7 +313,8 @@ openFromDisk fs hbio runRunDataCaching runRunFsPaths = do =<< readCRC (forRunIndex expectedChecksums) (forRunIndex paths) runKOpsFile <- FS.hOpen fs (runKOpsPath runRunFsPaths) FS.ReadMode - runBlobFile <- FS.hOpen fs (runBlobPath runRunFsPaths) FS.ReadMode + runBlobFile <- openBlobFile fs (runBlobPath runRunFsPaths) FS.ReadMode + DoNotRemoveFileOnClose setRunDataCaching hbio runKOpsFile runRunDataCaching rec runRefCounter <- RC.unsafeMkRefCounterN (RefCount 1) (Just $ close r) let !r = Run diff --git a/src/Database/LSMTree/Internal/RunBuilder.hs b/src/Database/LSMTree/Internal/RunBuilder.hs index 06971868e..659360b52 100644 --- a/src/Database/LSMTree/Internal/RunBuilder.hs +++ b/src/Database/LSMTree/Internal/RunBuilder.hs @@ -22,7 +22,7 @@ import qualified Data.ByteString.Lazy as BSL import Data.Foldable (for_, traverse_) import Data.Primitive.PrimVar import Data.Word (Word64) -import Database.LSMTree.Internal.BlobRef (BlobRef, BlobSpan (..)) +import Database.LSMTree.Internal.BlobRef (BlobSpan (..), RawBlobRef) import qualified Database.LSMTree.Internal.BlobRef as BlobRef import Database.LSMTree.Internal.BloomFilter (bloomFilterToLBS) import Database.LSMTree.Internal.CRC32C (CRC32C) @@ -106,11 +106,11 @@ new fs hbio runBuilderFsPaths numEntries alloc = do {-# SPECIALISE addKeyOp :: RunBuilder IO h -> SerialisedKey - -> Entry SerialisedValue (BlobRef IO (FS.Handle h)) + -> Entry SerialisedValue (RawBlobRef IO h) -> IO () #-} -- | Add a key\/op pair. -- --- In the 'InsertWithBlob' case, the 'BlobRef' identifies where the blob can be +-- In the 'InsertWithBlob' case, the 'RawBlobRef' identifies where the blob can be -- found (which is either from a write buffer or another run). The blobs will -- be copied from their existing blob file into the new run's blob file. -- @@ -125,7 +125,7 @@ addKeyOp :: (MonadST m, MonadSTM m, MonadThrow m) => RunBuilder m h -> SerialisedKey - -> Entry SerialisedValue (BlobRef m (FS.Handle h)) + -> Entry SerialisedValue (RawBlobRef m h) -> m () addKeyOp builder@RunBuilder{runBuilderAcc} key op = do -- TODO: the fmap entry here reallocates even when there are no blobs. @@ -275,16 +275,18 @@ writeBlob RunBuilder{..} blob = do {-# SPECIALISE copyBlob :: RunBuilder IO h - -> BlobRef IO (FS.Handle h) - -> IO BlobRef.BlobSpan #-} + -> RawBlobRef IO h + -> IO BlobSpan #-} copyBlob :: (MonadSTM m, MonadThrow m, PrimMonad m) => RunBuilder m h - -> BlobRef m (FS.Handle h) - -> m BlobRef.BlobSpan + -> RawBlobRef m h + -> m BlobSpan copyBlob builder@RunBuilder {..} blobref = do - blob <- BlobRef.readBlob runBuilderHasFS blobref + blob <- BlobRef.readRawBlobRef runBuilderHasFS blobref writeBlob builder blob + --TODO: can't easily switch this to use BlobFile.writeBlob because + -- RunBuilder currently does everything uniformly with ChecksumHandle. {-# SPECIALISE writeFilter :: RunBuilder IO h diff --git a/src/Database/LSMTree/Internal/RunReader.hs b/src/Database/LSMTree/Internal/RunReader.hs index fba341ac5..74decdef8 100644 --- a/src/Database/LSMTree/Internal/RunReader.hs +++ b/src/Database/LSMTree/Internal/RunReader.hs @@ -29,7 +29,7 @@ import Data.Primitive.PrimVar import Data.Word (Word16, Word32) import Database.LSMTree.Internal.BitMath (ceilDivPageSize, mulPageSize, roundUpToPageSize) -import Database.LSMTree.Internal.BlobRef (BlobRef (..)) +import Database.LSMTree.Internal.BlobRef (RawBlobRef (..)) import qualified Database.LSMTree.Internal.Entry as E import qualified Database.LSMTree.Internal.IndexCompact as Index import Database.LSMTree.Internal.Page (PageNo (..), PageSpan (..), @@ -167,12 +167,12 @@ data Result m h data Entry m h = Entry - !(E.Entry SerialisedValue (BlobRef m h)) + !(E.Entry SerialisedValue (RawBlobRef m h)) | -- | A large entry. The caller might be interested in various different -- (redundant) representation, so we return all of them. EntryOverflow -- | The value is just a prefix, with the remainder in the overflow pages. - !(E.Entry SerialisedValue (BlobRef m h)) + !(E.Entry SerialisedValue (RawBlobRef m h)) -- | A page containing the single entry (or rather its prefix). !RawPage -- | Non-zero length of the overflow in bytes. @@ -186,7 +186,7 @@ data Entry m h = ![RawOverflowPage] mkEntryOverflow :: - E.Entry SerialisedValue (BlobRef m h) + E.Entry SerialisedValue (RawBlobRef m h) -> RawPage -> Word32 -> [RawOverflowPage] @@ -198,7 +198,7 @@ mkEntryOverflow entryPrefix page len overflowPages = EntryOverflow entryPrefix page len overflowPages {-# INLINE toFullEntry #-} -toFullEntry :: Entry m h -> E.Entry SerialisedValue (BlobRef m h) +toFullEntry :: Entry m h -> E.Entry SerialisedValue (RawBlobRef m h) toFullEntry = \case Entry e -> e @@ -214,13 +214,13 @@ appendOverflow len overflowPages (SerialisedValue prefix) = {-# SPECIALISE next :: RunReader IO h - -> IO (Result IO (FS.Handle h)) #-} + -> IO (Result IO h) #-} -- | Stop using the 'RunReader' after getting 'Empty', because the 'Reader' is -- automatically closed! next :: forall m h. (MonadCatch m, MonadSTM m, MonadST m) => RunReader m h - -> m (Result m (FS.Handle h)) + -> m (Result m h) next reader@RunReader {..} = do readMutVar readerCurrentPage >>= \case Nothing -> @@ -229,7 +229,7 @@ next reader@RunReader {..} = do entryNo <- readPrimVar readerCurrentEntryNo go entryNo page where - go :: Word16 -> RawPage -> m (Result m (FS.Handle h)) + go :: Word16 -> RawPage -> m (Result m h) go !entryNo !page = -- take entry from current page (resolve blob if necessary) case rawPageIndex page entryNo of @@ -246,14 +246,14 @@ next reader@RunReader {..} = do go 0 p -- try again on the new page IndexEntry key entry -> do modifyPrimVar readerCurrentEntryNo (+1) - let entry' = fmap (Run.mkBlobRefForRun readerRun) entry + let entry' = fmap (Run.mkRawBlobRef readerRun) entry let rawEntry = Entry entry' return (ReadEntry key rawEntry) IndexEntryOverflow key entry lenSuffix -> do -- TODO: we know that we need the next page, could already load? modifyPrimVar readerCurrentEntryNo (+1) - let entry' :: E.Entry SerialisedValue (BlobRef m (FS.Handle h)) - entry' = fmap (Run.mkBlobRefForRun readerRun) entry + let entry' :: E.Entry SerialisedValue (RawBlobRef m h) + entry' = fmap (Run.mkRawBlobRef readerRun) entry overflowPages <- readOverflowPages readerHasFS readerKOpsHandle lenSuffix let rawEntry = mkEntryOverflow entry' page lenSuffix overflowPages return (ReadEntry key rawEntry) diff --git a/src/Database/LSMTree/Internal/RunReaders.hs b/src/Database/LSMTree/Internal/RunReaders.hs index e9d33d0da..978533a0a 100644 --- a/src/Database/LSMTree/Internal/RunReaders.hs +++ b/src/Database/LSMTree/Internal/RunReaders.hs @@ -26,7 +26,7 @@ import Data.Maybe (catMaybes) import Data.Primitive.MutVar import Data.Traversable (for) import qualified Data.Vector as V -import Database.LSMTree.Internal.BlobRef (BlobRef) +import Database.LSMTree.Internal.BlobRef (RawBlobRef) import Database.LSMTree.Internal.Entry (Entry (..)) import Database.LSMTree.Internal.Run (Run) import Database.LSMTree.Internal.RunReader (OffsetKey (..), @@ -73,7 +73,7 @@ data ReadCtx m h = ReadCtx { -- Using an 'STRef' could avoid reallocating the record for every entry, -- but that might not be straightforward to integrate with the heap. readCtxHeadKey :: !SerialisedKey - , readCtxHeadEntry :: !(Reader.Entry m (FS.Handle h)) + , readCtxHeadEntry :: !(Reader.Entry m h) -- We could get rid of this by making 'LoserTree' stable (for which there -- is a prototype already). -- Alternatively, if we decide to have an invariant that the number in @@ -103,9 +103,9 @@ data Reader m h = -- having to find the next entry in the Map again (requiring key -- comparisons) or having to copy out all entries. -- TODO: more efficient representation? benchmark! - | ReadBuffer !(MutVar (PrimState m) [KOp m (FS.Handle h)]) + | ReadBuffer !(MutVar (PrimState m) [KOp m h]) -type KOp m h = (SerialisedKey, Entry SerialisedValue (BlobRef m h)) +type KOp m h = (SerialisedKey, Entry SerialisedValue (RawBlobRef m h)) {-# SPECIALISE new :: OffsetKey @@ -132,7 +132,7 @@ new !offsetKey wbs runs = do -> m (Maybe (ReadCtx m h)) fromWB wb wbblobs = do --TODO: this BlobSpan to BlobRef conversion involves quite a lot of allocation - kops <- newMutVar $ map (fmap (fmap (WB.mkBlobRef wbblobs))) $ + kops <- newMutVar $ map (fmap (fmap (WB.mkRawBlobRef wbblobs))) $ Map.toList $ filterWB $ WB.toMap wb nextReadCtx (ReaderNumber 0) (ReadBuffer kops) where @@ -184,11 +184,11 @@ data HasMore = HasMore | Drained {-# SPECIALISE pop :: Readers IO h - -> IO (SerialisedKey, Reader.Entry IO (FS.Handle h), HasMore) #-} + -> IO (SerialisedKey, Reader.Entry IO h, HasMore) #-} pop :: (MonadCatch m, MonadSTM m, MonadST m) => Readers m h - -> m (SerialisedKey, Reader.Entry m (FS.Handle h), HasMore) + -> m (SerialisedKey, Reader.Entry m h, HasMore) pop r@Readers {..} = do ReadCtx {..} <- readMutVar readersNext hasMore <- dropOne r readCtxNumber readCtxReader diff --git a/src/Database/LSMTree/Internal/WriteBufferBlobs.hs b/src/Database/LSMTree/Internal/WriteBufferBlobs.hs index 33c3b24f2..8137235e8 100644 --- a/src/Database/LSMTree/Internal/WriteBufferBlobs.hs +++ b/src/Database/LSMTree/Internal/WriteBufferBlobs.hs @@ -27,8 +27,8 @@ module Database.LSMTree.Internal.WriteBufferBlobs ( addReference, removeReference, addBlob, - readBlob, - mkBlobRef, + mkRawBlobRef, + mkWeakBlobRef, -- * For tests FilePointer (..) ) where @@ -37,16 +37,15 @@ import Control.DeepSeq (NFData (..)) import Control.Monad.Class.MonadThrow import Control.Monad.Primitive (PrimMonad, PrimState) import qualified Control.RefCount as RC -import Data.Primitive.ByteArray as P import Data.Primitive.PrimVar as P -import qualified Data.Vector.Primitive as VP import Data.Word (Word64) -import Database.LSMTree.Internal.BlobRef (BlobRef (..), BlobSpan (..)) -import Database.LSMTree.Internal.RawBytes as RB +import Database.LSMTree.Internal.BlobFile hiding (removeReference) +import qualified Database.LSMTree.Internal.BlobFile as BlobFile +import Database.LSMTree.Internal.BlobRef (RawBlobRef (..), + WeakBlobRef (..)) import Database.LSMTree.Internal.Serialise import qualified System.FS.API as FS import System.FS.API (HasFS) -import qualified System.Posix.Types as FS (ByteCount) -- | A single 'WriteBufferBlobs' may be shared between multiple tables. -- As a consequence of being shared, the management of the shared state has to @@ -102,17 +101,14 @@ import qualified System.Posix.Types as FS (ByteCount) -- data WriteBufferBlobs m h = WriteBufferBlobs { - blobFileHandle :: {-# UNPACK #-} !(FS.Handle h) + blobFile :: !(BlobFile m h) -- | The manually tracked file pointer. - , blobFilePointer :: !(FilePointer m) - - -- | The reference counter for the blob file. - , blobFileRefCounter :: {-# UNPACK #-} !(RC.RefCounter m) + , blobFilePointer :: !(FilePointer m) } instance NFData h => NFData (WriteBufferBlobs m h) where - rnf (WriteBufferBlobs a b c) = rnf a `seq` rnf b `seq` rnf c + rnf (WriteBufferBlobs a b) = rnf a `seq` rnf b {-# SPECIALISE new :: HasFS IO h -> FS.FsPath -> IO (WriteBufferBlobs IO h) #-} new :: PrimMonad m @@ -122,33 +118,23 @@ new :: PrimMonad m new fs blobFileName = do -- Must use read/write mode because we write blobs when adding, but -- we can also be asked to retrieve blobs at any time. - blobFileHandle <- FS.hOpen fs blobFileName (FS.ReadWriteMode FS.MustBeNew) + blobFile <- openBlobFile fs blobFileName (FS.ReadWriteMode FS.MustBeNew) + RemoveFileOnClose blobFilePointer <- newFilePointer - blobFileRefCounter <- RC.mkRefCounter1 (Just (finaliser fs blobFileHandle)) return WriteBufferBlobs { - blobFileHandle, - blobFilePointer, - blobFileRefCounter + blobFile, + blobFilePointer } -{-# SPECIALISE finaliser :: HasFS IO h -> FS.Handle h -> IO () #-} -finaliser :: PrimMonad m - => HasFS m h - -> FS.Handle h - -> m () -finaliser fs h = do - FS.hClose fs h - FS.removeFile fs (FS.handlePath h) - {-# SPECIALISE addReference :: WriteBufferBlobs IO h -> IO () #-} addReference :: PrimMonad m => WriteBufferBlobs m h -> m () -addReference WriteBufferBlobs {blobFileRefCounter} = - RC.addReference blobFileRefCounter +addReference WriteBufferBlobs {blobFile} = + RC.addReference (blobFileRefCounter blobFile) {-# SPECIALISE removeReference :: WriteBufferBlobs IO h -> IO () #-} removeReference :: (PrimMonad m, MonadMask m) => WriteBufferBlobs m h -> m () -removeReference WriteBufferBlobs {blobFileRefCounter} = - RC.removeReference blobFileRefCounter +removeReference WriteBufferBlobs {blobFile} = + BlobFile.removeReference blobFile {-# SPECIALISE addBlob :: HasFS IO h -> WriteBufferBlobs IO h -> SerialisedBlob -> IO BlobSpan #-} addBlob :: (PrimMonad m, MonadThrow m) @@ -156,59 +142,35 @@ addBlob :: (PrimMonad m, MonadThrow m) -> WriteBufferBlobs m h -> SerialisedBlob -> m BlobSpan -addBlob fs WriteBufferBlobs {blobFileHandle, blobFilePointer} blob = do +addBlob fs WriteBufferBlobs {blobFile, blobFilePointer} blob = do let blobsize = sizeofBlob blob bloboffset <- updateFilePointer blobFilePointer blobsize - writeBlobAtOffset fs blobFileHandle blob bloboffset + BlobFile.writeBlob fs blobFile blob bloboffset return BlobSpan { blobSpanOffset = bloboffset, blobSpanSize = fromIntegral blobsize } -{-# SPECIALISE writeBlobAtOffset :: HasFS IO h -> FS.Handle h -> SerialisedBlob -> Word64 -> IO () #-} -writeBlobAtOffset :: (PrimMonad m, MonadThrow m) - => HasFS m h - -> FS.Handle h - -> SerialisedBlob - -> Word64 - -> m () -writeBlobAtOffset fs h (SerialisedBlob' (VP.Vector boff blen ba)) off = do - mba <- P.unsafeThawByteArray ba - _ <- FS.hPutBufExactlyAt - fs h mba - (FS.BufferOffset boff) - (fromIntegral blen :: FS.ByteCount) - (FS.AbsOffset off) - return () - -{-# SPECIALISE readBlob :: HasFS IO h -> WriteBufferBlobs IO h -> BlobSpan -> IO SerialisedBlob #-} -readBlob :: (PrimMonad m, MonadThrow m) - => HasFS m h - -> WriteBufferBlobs m h - -> BlobSpan - -> m SerialisedBlob -readBlob fs WriteBufferBlobs {blobFileHandle} - BlobSpan {blobSpanOffset, blobSpanSize} = do - let off = FS.AbsOffset blobSpanOffset - len :: Int - len = fromIntegral blobSpanSize - mba <- P.newPinnedByteArray len - _ <- FS.hGetBufExactlyAt fs blobFileHandle mba 0 - (fromIntegral len :: FS.ByteCount) off - ba <- P.unsafeFreezeByteArray mba - let !rb = RB.fromByteArray 0 len ba - return (SerialisedBlob rb) - - --- | Helper function to make a 'BlobRef' that points into a 'WriteBufferBlobs'. -mkBlobRef :: WriteBufferBlobs m h - -> BlobSpan - -> BlobRef m (FS.Handle h) -mkBlobRef WriteBufferBlobs {blobFileHandle, blobFileRefCounter} blobRefSpan = - BlobRef { - blobRefFile = blobFileHandle, - blobRefCount = blobFileRefCounter, - blobRefSpan +-- | Helper function to make a 'RawBlobRef' that points into a +-- 'WriteBufferBlobs'. +mkRawBlobRef :: WriteBufferBlobs m h + -> BlobSpan + -> RawBlobRef m h +mkRawBlobRef WriteBufferBlobs {blobFile} blobspan = + RawBlobRef { + rawBlobRefFile = blobFile, + rawBlobRefSpan = blobspan + } + +-- | Helper function to make a 'WeakBlobRef' that points into a +-- 'WriteBufferBlobs'. +mkWeakBlobRef :: WriteBufferBlobs m h + -> BlobSpan + -> WeakBlobRef m h +mkWeakBlobRef WriteBufferBlobs {blobFile} blobspan = + WeakBlobRef { + weakBlobRefFile = blobFile, + weakBlobRefSpan = blobspan } diff --git a/src/Database/LSMTree/Normal.hs b/src/Database/LSMTree/Normal.hs index d24bd858d..0c285061f 100644 --- a/src/Database/LSMTree/Normal.hs +++ b/src/Database/LSMTree/Normal.hs @@ -130,7 +130,6 @@ import qualified Database.LSMTree.Internal.Entry as Entry import qualified Database.LSMTree.Internal.Serialise as Internal import qualified Database.LSMTree.Internal.Snapshot as Internal import qualified Database.LSMTree.Internal.Vector as V -import qualified System.FS.API as FS -- $resource-management -- Sessions, tables and cursors use resources and as such need to be @@ -629,8 +628,8 @@ retrieveBlobs (Internal.Session' (sesh :: Internal.Session m h)) refs = V.map Internal.deserialiseBlob <$> Internal.retrieveBlobs sesh (V.imap checkBlobRefType refs) where - checkBlobRefType _ (BlobRef (ref :: Internal.WeakBlobRef m (FS.Handle h'))) - | Just Refl <- eqT @(FS.Handle h) @(FS.Handle h') = ref + checkBlobRefType _ (BlobRef (ref :: Internal.WeakBlobRef m h')) + | Just Refl <- eqT @h @h' = ref checkBlobRefType i _ = throw (Internal.ErrBlobRefInvalid i) {------------------------------------------------------------------------------- diff --git a/test/Test/Database/LSMTree/Internal.hs b/test/Test/Database/LSMTree/Internal.hs index e192827a5..5115ebc58 100644 --- a/test/Test/Database/LSMTree/Internal.hs +++ b/test/Test/Database/LSMTree/Internal.hs @@ -195,9 +195,9 @@ prop_interimOpenTable dat = ioProperty $ conf = testTableConfig fetchBlobs :: FS.HasFS IO h - -> (V.Vector (Maybe (Entry v (WeakBlobRef IO (FS.Handle h))))) + -> (V.Vector (Maybe (Entry v (WeakBlobRef IO h)))) -> IO (V.Vector (Maybe (Entry v SerialisedBlob))) - fetchBlobs hfs = traverse (traverse (traverse (fetchBlob hfs))) + fetchBlobs hfs = traverse (traverse (traverse (readWeakBlobRef hfs))) Test.InMemLookupData { runData, lookups = keysToLookup } = dat ks = V.map serialiseKey (V.fromList keysToLookup) @@ -240,9 +240,9 @@ prop_roundtripCursor lb ub kops = ioProperty $ conf = testTableConfig fetchBlobs :: FS.HasFS IO h - -> V.Vector (k, (v, Maybe (WeakBlobRef IO (FS.Handle h)))) + -> V.Vector (k, (v, Maybe (WeakBlobRef IO h))) -> IO (V.Vector (k, (v, Maybe SerialisedBlob))) - fetchBlobs hfs = traverse (traverse (traverse (traverse (fetchBlob hfs)))) + fetchBlobs hfs = traverse (traverse (traverse (traverse (readWeakBlobRef hfs)))) toOffsetKey = maybe NoOffsetKey (OffsetKey . coerce) @@ -272,7 +272,7 @@ readCursorUntil :: -> Cursor IO h -> IO (V.Vector (KeyForIndexCompact, (SerialisedValue, - Maybe (WeakBlobRef IO (FS.Handle h))))) + Maybe (WeakBlobRef IO h)))) readCursorUntil resolve ub cursor = go V.empty where chunkSize = 50 @@ -288,6 +288,3 @@ readCursorUntil resolve ub cursor = go V.empty appendSerialisedValue :: ResolveSerialisedValue appendSerialisedValue (SerialisedValue x) (SerialisedValue y) = SerialisedValue (x <> y) - -fetchBlob :: FS.HasFS IO h -> WeakBlobRef IO (FS.Handle h) -> IO SerialisedBlob -fetchBlob hfs bref = withWeakBlobRef bref (readBlob hfs) diff --git a/test/Test/Database/LSMTree/Internal/Lookup.hs b/test/Test/Database/LSMTree/Internal/Lookup.hs index 031197c7f..186f7307a 100644 --- a/test/Test/Database/LSMTree/Internal/Lookup.hs +++ b/test/Test/Database/LSMTree/Internal/Lookup.hs @@ -44,8 +44,7 @@ import Database.LSMTree.Extras.Generators import Database.LSMTree.Extras.RunData (RunData (..), liftArbitrary2Map, liftShrink2Map, unsafeFlushAsWriteBuffer) -import Database.LSMTree.Internal.BlobRef (BlobSpan, WeakBlobRef, - readBlob, withWeakBlobRef) +import Database.LSMTree.Internal.BlobRef import Database.LSMTree.Internal.Entry as Entry import Database.LSMTree.Internal.IndexCompact as Index import Database.LSMTree.Internal.Lookup @@ -323,11 +322,9 @@ prop_roundtripFromWriteBufferLookupIO (SmallList dats) = resolveV = \(SerialisedValue v1) (SerialisedValue v2) -> SerialisedValue (v1 <> v2) fetchBlobs :: FS.HasFS IO h - -> (V.Vector (Maybe (Entry v (WeakBlobRef IO (FS.Handle h))))) + -> (V.Vector (Maybe (Entry v (WeakBlobRef IO h)))) -> IO (V.Vector (Maybe (Entry v SerialisedBlob))) - fetchBlobs hfs = traverse (traverse (traverse fetchBlob)) - where - fetchBlob bref = withWeakBlobRef bref (readBlob hfs) + fetchBlobs hfs = traverse (traverse (traverse (readWeakBlobRef hfs))) -- | Given a bunch of 'InMemLookupData', prepare the data into the form needed -- for 'lookupsIO': a write buffer (and blobs) and a vector of on-disk runs. diff --git a/test/Test/Database/LSMTree/Internal/Merge.hs b/test/Test/Database/LSMTree/Internal/Merge.hs index bf4f4d94a..8b2ff5d6f 100644 --- a/test/Test/Database/LSMTree/Internal/Merge.hs +++ b/test/Test/Database/LSMTree/Internal/Merge.hs @@ -12,6 +12,7 @@ import qualified Data.Vector as V import Database.LSMTree.Extras import Database.LSMTree.Extras.Generators (KeyForIndexCompact) import Database.LSMTree.Extras.RunData +import qualified Database.LSMTree.Internal.BlobFile as BlobFile import qualified Database.LSMTree.Internal.Entry as Entry import qualified Database.LSMTree.Internal.Merge as Merge import Database.LSMTree.Internal.PageAcc (entryWouldFitInPage) @@ -73,9 +74,9 @@ prop_MergeDistributes fs hbio level stepSize (SmallList rds) = withRun fs hbio (simplePath 1) (RunData $ mergeWriteBuffers level $ fmap unRunData rds') $ \rhs -> do lhsKOpsFile <- FS.hGetAll fs (Run.runKOpsFile lhs) - lhsBlobFile <- FS.hGetAll fs (Run.runBlobFile lhs) + lhsBlobFile <- FS.hGetAll fs (BlobFile.blobFileHandle (Run.runBlobFile lhs)) rhsKOpsFile <- FS.hGetAll fs (Run.runKOpsFile rhs) - rhsBlobFile <- FS.hGetAll fs (Run.runBlobFile rhs) + rhsBlobFile <- FS.hGetAll fs (BlobFile.blobFileHandle (Run.runBlobFile rhs)) lhsKOps <- readKOps Nothing lhs rhsKOps <- readKOps Nothing rhs diff --git a/test/Test/Database/LSMTree/Internal/Run.hs b/test/Test/Database/LSMTree/Internal/Run.hs index d989d89f2..fe7df6b32 100644 --- a/test/Test/Database/LSMTree/Internal/Run.hs +++ b/test/Test/Database/LSMTree/Internal/Run.hs @@ -25,6 +25,7 @@ import Test.Tasty.QuickCheck import Control.RefCount (RefCount (..), readRefCount) import Database.LSMTree.Extras.Generators (KeyForIndexCompact (..)) import Database.LSMTree.Extras.RunData +import Database.LSMTree.Internal.BlobFile (BlobFile (..)) import Database.LSMTree.Internal.BlobRef (BlobSpan (..)) import qualified Database.LSMTree.Internal.CRC32C as CRC import Database.LSMTree.Internal.Entry @@ -195,8 +196,8 @@ prop_WriteAndOpen fs hbio wb = (FS.handlePath (runKOpsFile written)) (FS.handlePath (runKOpsFile loaded)) assertEqual "blob file" - (FS.handlePath (runBlobFile written)) - (FS.handlePath (runBlobFile loaded)) + (FS.handlePath (blobFileHandle (runBlobFile written))) + (FS.handlePath (blobFileHandle (runBlobFile loaded))) -- make sure runs get closed again removeReference loaded diff --git a/test/Test/Database/LSMTree/Internal/RunReader.hs b/test/Test/Database/LSMTree/Internal/RunReader.hs index 4fb7db498..368f39ae1 100644 --- a/test/Test/Database/LSMTree/Internal/RunReader.hs +++ b/test/Test/Database/LSMTree/Internal/RunReader.hs @@ -9,7 +9,7 @@ import Data.Coerce (coerce) import qualified Data.Map as Map import Database.LSMTree.Extras.Generators (KeyForIndexCompact (..)) import Database.LSMTree.Extras.RunData -import Database.LSMTree.Internal.BlobRef (readBlob) +import Database.LSMTree.Internal.BlobRef import Database.LSMTree.Internal.Entry (Entry) import Database.LSMTree.Internal.Run (Run) import qualified Database.LSMTree.Internal.RunReader as Reader @@ -179,6 +179,6 @@ readKOps offset run = do Reader.Empty -> return [] Reader.ReadEntry key e -> do let fs = Reader.readerHasFS reader - e' <- traverse (readBlob fs) $ Reader.toFullEntry e + e' <- traverse (readRawBlobRef fs) $ Reader.toFullEntry e ((key, e') :) <$> go reader diff --git a/test/Test/Database/LSMTree/Internal/RunReaders.hs b/test/Test/Database/LSMTree/Internal/RunReaders.hs index 169714e1f..67bd0e731 100644 --- a/test/Test/Database/LSMTree/Internal/RunReaders.hs +++ b/test/Test/Database/LSMTree/Internal/RunReaders.hs @@ -18,7 +18,7 @@ import Data.Word (Word64) import Database.LSMTree.Extras (showPowersOf) import Database.LSMTree.Extras.Generators (KeyForIndexCompact (..)) import Database.LSMTree.Extras.RunData -import Database.LSMTree.Internal.BlobRef (readBlob) +import Database.LSMTree.Internal.BlobRef import Database.LSMTree.Internal.Entry import qualified Database.LSMTree.Internal.Paths as Paths import qualified Database.LSMTree.Internal.Run as Run @@ -390,5 +390,5 @@ runIO act lu = case act of put (RealState n Nothing) return (Right x) - toMockEntry :: FS.HasFS IO MockFS.HandleMock -> Reader.Entry IO (FS.Handle MockFS.HandleMock) -> IO SerialisedEntry - toMockEntry hfs = traverse (readBlob hfs) . Reader.toFullEntry + toMockEntry :: FS.HasFS IO MockFS.HandleMock -> Reader.Entry IO MockFS.HandleMock -> IO SerialisedEntry + toMockEntry hfs = traverse (readRawBlobRef hfs) . Reader.toFullEntry