diff --git a/lsm-tree.cabal b/lsm-tree.cabal index 24fb8abc6..1f28051b1 100644 --- a/lsm-tree.cabal +++ b/lsm-tree.cabal @@ -126,6 +126,7 @@ library Database.LSMTree.Internal.Chunk Database.LSMTree.Internal.Config Database.LSMTree.Internal.CRC32C + Database.LSMTree.Internal.Cursor Database.LSMTree.Internal.Entry Database.LSMTree.Internal.IndexCompact Database.LSMTree.Internal.IndexCompactAcc diff --git a/src/Database/LSMTree/Internal.hs b/src/Database/LSMTree/Internal.hs index b8a37ca89..e3c7334d5 100644 --- a/src/Database/LSMTree/Internal.hs +++ b/src/Database/LSMTree/Internal.hs @@ -1,5 +1,16 @@ {-# LANGUAGE DataKinds #-} +-- | This module brings together the internal parts to provide an API in terms +-- of untyped serialised keys, values and blobs. It makes no distinction between +-- normal and monoidal tables, accepting both blobs and mupserts. +-- The typed [normal]("Database.LSMTree.Normal") and +-- [monoidal]("Database.LSMTree.Monoidal") APIs then provide more type-safe +-- wrappers and handle serialisation. +-- +-- Apart from defining the API, this module mainly deals with concurrency- and +-- exception-safe opening and closing of resources. Any other non-trivial logic +-- should live somewhere else. +-- module Database.LSMTree.Internal ( -- * Existentials Session' (..) @@ -84,8 +95,8 @@ import Data.Word (Word64) import Database.LSMTree.Internal.BlobRef (WeakBlobRef (..)) import qualified Database.LSMTree.Internal.BlobRef as BlobRef import Database.LSMTree.Internal.Config +import qualified Database.LSMTree.Internal.Cursor as Cursor import Database.LSMTree.Internal.Entry (Entry) -import qualified Database.LSMTree.Internal.Entry as Entry import Database.LSMTree.Internal.Lookup (ByteCountDiscrepancy, ResolveSerialisedValue, lookupsIO) import Database.LSMTree.Internal.MergeSchedule @@ -96,14 +107,12 @@ import Database.LSMTree.Internal.Range (Range (..)) import qualified Database.LSMTree.Internal.RawBytes as RB import Database.LSMTree.Internal.Run (Run) import qualified Database.LSMTree.Internal.Run as Run -import qualified Database.LSMTree.Internal.RunReader as Reader import Database.LSMTree.Internal.RunReaders (OffsetKey (..)) import qualified Database.LSMTree.Internal.RunReaders as Readers import Database.LSMTree.Internal.Serialise (SerialisedBlob (..), SerialisedKey, SerialisedValue) import Database.LSMTree.Internal.Snapshot import Database.LSMTree.Internal.UniqCounter -import qualified Database.LSMTree.Internal.Vector as V import qualified Database.LSMTree.Internal.WriteBuffer as WB import qualified Database.LSMTree.Internal.WriteBufferBlobs as WBB import qualified System.FS.API as FS @@ -855,8 +864,6 @@ retrieveBlobs sesh wrefs = Cursors -------------------------------------------------------------------------------} --- TODO: Move to a separate Cursors module - -- | A read-only view into the table state at the time of cursor creation. -- -- For more information, see 'Database.LSMTree.Normal.Cursor'. @@ -1059,123 +1066,13 @@ readCursorWhile resolve keyIsWanted n Cursor {..} fromEntry = do -- a drained cursor will just return an empty vector return (state, V.empty) Just readers -> do - (vec, hasMore) <- readCursorEntriesWhile resolve keyIsWanted fromEntry readers n + (vec, hasMore) <- Cursor.readEntriesWhile resolve keyIsWanted fromEntry readers n -- if we drained the readers, remove them from the state let !state' = case hasMore of Readers.HasMore -> state Readers.Drained -> CursorOpen (cursorEnv {cursorReaders = Nothing}) return (state', vec) -{-# INLINE readCursorEntriesWhile #-} -{-# SPECIALISE readCursorEntriesWhile :: forall h res. - ResolveSerialisedValue - -> (SerialisedKey -> Bool) - -> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef IO (Handle h)) -> res) - -> Readers.Readers IO h - -> Int - -> IO (V.Vector res, Readers.HasMore) #-} --- | General notes on the code below: --- * it is quite similar to the one in Internal.Merge, but different enough --- that it's probably easier to keep them separate --- * any function that doesn't take a 'hasMore' argument assumes that the --- readers have not been drained yet, so we must check before calling them --- * there is probably opportunity for optimisations -readCursorEntriesWhile :: forall h m res. - (MonadFix m, MonadMask m, MonadST m, MonadSTM m) - => ResolveSerialisedValue - -> (SerialisedKey -> Bool) - -> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m (Handle h)) -> res) - -> Readers.Readers m h - -> Int - -> m (V.Vector res, Readers.HasMore) -readCursorEntriesWhile resolve keyIsWanted fromEntry readers n = - flip (V.unfoldrNM' n) Readers.HasMore $ \case - Readers.Drained -> return (Nothing, Readers.Drained) - Readers.HasMore -> readEntryIfWanted - where - -- Produces a result unless the readers have been drained or 'keyIsWanted' - -- returned False. - readEntryIfWanted :: m (Maybe res, Readers.HasMore) - readEntryIfWanted = do - key <- Readers.peekKey readers - if keyIsWanted key then readEntry - else return (Nothing, Readers.HasMore) - - readEntry :: m (Maybe res, Readers.HasMore) - readEntry = do - (key, readerEntry, hasMore) <- Readers.pop readers - let !entry = Reader.toFullEntry readerEntry - case hasMore of - Readers.Drained -> do - handleResolved key entry Readers.Drained - Readers.HasMore -> do - case entry of - Entry.Mupdate v -> - handleMupdate key v - _ -> do - -- Anything but Mupdate supersedes all previous entries of - -- the same key, so we can simply drop them and are done. - hasMore' <- dropRemaining key - handleResolved key entry hasMore' - - dropRemaining :: SerialisedKey -> m Readers.HasMore - dropRemaining key = do - (_, hasMore) <- Readers.dropWhileKey readers key - return hasMore - - -- Resolve a 'Mupsert' value with the other entries of the same key. - handleMupdate :: SerialisedKey - -> SerialisedValue - -> m (Maybe res, Readers.HasMore) - handleMupdate key v = do - nextKey <- Readers.peekKey readers - if nextKey /= key - then - -- No more entries for same key, done. - handleResolved key (Entry.Mupdate v) Readers.HasMore - else do - (_, nextEntry, hasMore) <- Readers.pop readers - let resolved = Entry.combine resolve (Entry.Mupdate v) - (Reader.toFullEntry nextEntry) - case hasMore of - Readers.HasMore -> case resolved of - Entry.Mupdate v' -> - -- Still a mupsert, keep resolving! - handleMupdate key v' - _ -> do - -- Done with this key, remaining entries are obsolete. - hasMore' <- dropRemaining key - handleResolved key resolved hasMore' - Readers.Drained -> do - handleResolved key resolved Readers.Drained - - -- Once we have a resolved entry, we still have to make sure it's not - -- a 'Delete', since we only want to write values to the result vector. - handleResolved :: SerialisedKey - -> Entry SerialisedValue (BlobRef.BlobRef m (Handle h)) - -> Readers.HasMore - -> m (Maybe res, Readers.HasMore) - handleResolved key entry hasMore = - case toResult key entry of - Just !res -> - -- Found one resolved value, done. - return (Just res, hasMore) - Nothing -> - -- Resolved value was a Delete, which we don't want to include. - -- So look for another one (unless there are no more entries!). - case hasMore of - Readers.HasMore -> readEntryIfWanted - Readers.Drained -> return (Nothing, Readers.Drained) - - toResult :: SerialisedKey - -> Entry SerialisedValue (BlobRef.BlobRef m (Handle h)) - -> Maybe res - toResult key = \case - Entry.Insert v -> Just $ fromEntry key v Nothing - Entry.InsertWithBlob v b -> Just $ fromEntry key v (Just (WeakBlobRef b)) - Entry.Mupdate v -> Just $ fromEntry key v Nothing - Entry.Delete -> Nothing - {------------------------------------------------------------------------------- Snapshots -------------------------------------------------------------------------------} diff --git a/src/Database/LSMTree/Internal/Cursor.hs b/src/Database/LSMTree/Internal/Cursor.hs new file mode 100644 index 000000000..b1bc8b815 --- /dev/null +++ b/src/Database/LSMTree/Internal/Cursor.hs @@ -0,0 +1,132 @@ +{-# LANGUAGE DataKinds #-} + +module Database.LSMTree.Internal.Cursor ( + readEntriesWhile + ) where + +import Control.Concurrent.Class.MonadSTM (MonadSTM (..)) +import Control.Monad.Class.MonadST (MonadST (..)) +import Control.Monad.Class.MonadThrow +import Control.Monad.Fix (MonadFix) +import qualified Data.Vector as V +import Database.LSMTree.Internal.BlobRef (WeakBlobRef (..)) +import qualified Database.LSMTree.Internal.BlobRef as BlobRef +import Database.LSMTree.Internal.Entry (Entry) +import qualified Database.LSMTree.Internal.Entry as Entry +import Database.LSMTree.Internal.Lookup (ResolveSerialisedValue) +import qualified Database.LSMTree.Internal.RunReader as Reader +import qualified Database.LSMTree.Internal.RunReaders as Readers +import Database.LSMTree.Internal.Serialise (SerialisedKey, + SerialisedValue) +import qualified Database.LSMTree.Internal.Vector as V +import System.FS.API (Handle) + +{-# INLINE readEntriesWhile #-} +{-# SPECIALISE readEntriesWhile :: forall h res. + ResolveSerialisedValue + -> (SerialisedKey -> Bool) + -> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef IO (Handle h)) -> res) + -> Readers.Readers IO h + -> Int + -> IO (V.Vector res, Readers.HasMore) #-} +-- | General notes on the code below: +-- * it is quite similar to the one in Internal.Merge, but different enough +-- that it's probably easier to keep them separate +-- * any function that doesn't take a 'hasMore' argument assumes that the +-- readers have not been drained yet, so we must check before calling them +-- * there is probably opportunity for optimisations +readEntriesWhile :: forall h m res. + (MonadFix m, MonadMask m, MonadST m, MonadSTM m) + => ResolveSerialisedValue + -> (SerialisedKey -> Bool) + -> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m (Handle h)) -> res) + -> Readers.Readers m h + -> Int + -> m (V.Vector res, Readers.HasMore) +readEntriesWhile resolve keyIsWanted fromEntry readers n = + flip (V.unfoldrNM' n) Readers.HasMore $ \case + Readers.Drained -> return (Nothing, Readers.Drained) + Readers.HasMore -> readEntryIfWanted + where + -- Produces a result unless the readers have been drained or 'keyIsWanted' + -- returned False. + readEntryIfWanted :: m (Maybe res, Readers.HasMore) + readEntryIfWanted = do + key <- Readers.peekKey readers + if keyIsWanted key then readEntry + else return (Nothing, Readers.HasMore) + + readEntry :: m (Maybe res, Readers.HasMore) + readEntry = do + (key, readerEntry, hasMore) <- Readers.pop readers + let !entry = Reader.toFullEntry readerEntry + case hasMore of + Readers.Drained -> do + handleResolved key entry Readers.Drained + Readers.HasMore -> do + case entry of + Entry.Mupdate v -> + handleMupdate key v + _ -> do + -- Anything but Mupdate supersedes all previous entries of + -- the same key, so we can simply drop them and are done. + hasMore' <- dropRemaining key + handleResolved key entry hasMore' + + dropRemaining :: SerialisedKey -> m Readers.HasMore + dropRemaining key = do + (_, hasMore) <- Readers.dropWhileKey readers key + return hasMore + + -- Resolve a 'Mupsert' value with the other entries of the same key. + handleMupdate :: SerialisedKey + -> SerialisedValue + -> m (Maybe res, Readers.HasMore) + handleMupdate key v = do + nextKey <- Readers.peekKey readers + if nextKey /= key + then + -- No more entries for same key, done. + handleResolved key (Entry.Mupdate v) Readers.HasMore + else do + (_, nextEntry, hasMore) <- Readers.pop readers + let resolved = Entry.combine resolve (Entry.Mupdate v) + (Reader.toFullEntry nextEntry) + case hasMore of + Readers.HasMore -> case resolved of + Entry.Mupdate v' -> + -- Still a mupsert, keep resolving! + handleMupdate key v' + _ -> do + -- Done with this key, remaining entries are obsolete. + hasMore' <- dropRemaining key + handleResolved key resolved hasMore' + Readers.Drained -> do + handleResolved key resolved Readers.Drained + + -- Once we have a resolved entry, we still have to make sure it's not + -- a 'Delete', since we only want to write values to the result vector. + handleResolved :: SerialisedKey + -> Entry SerialisedValue (BlobRef.BlobRef m (Handle h)) + -> Readers.HasMore + -> m (Maybe res, Readers.HasMore) + handleResolved key entry hasMore = + case toResult key entry of + Just !res -> + -- Found one resolved value, done. + return (Just res, hasMore) + Nothing -> + -- Resolved value was a Delete, which we don't want to include. + -- So look for another one (unless there are no more entries!). + case hasMore of + Readers.HasMore -> readEntryIfWanted + Readers.Drained -> return (Nothing, Readers.Drained) + + toResult :: SerialisedKey + -> Entry SerialisedValue (BlobRef.BlobRef m (Handle h)) + -> Maybe res + toResult key = \case + Entry.Insert v -> Just $ fromEntry key v Nothing + Entry.InsertWithBlob v b -> Just $ fromEntry key v (Just (WeakBlobRef b)) + Entry.Mupdate v -> Just $ fromEntry key v Nothing + Entry.Delete -> Nothing