Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lsm-tree.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ library
Database.LSMTree.Internal.Chunk
Database.LSMTree.Internal.Config
Database.LSMTree.Internal.CRC32C
Database.LSMTree.Internal.Cursor
Database.LSMTree.Internal.Entry
Database.LSMTree.Internal.IndexCompact
Database.LSMTree.Internal.IndexCompactAcc
Expand Down
129 changes: 13 additions & 116 deletions src/Database/LSMTree/Internal.hs
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
{-# LANGUAGE DataKinds #-}

-- | This module brings together the internal parts to provide an API in terms
-- of untyped serialised keys, values and blobs. It makes no distinction between
-- normal and monoidal tables, accepting both blobs and mupserts.
-- The typed [normal]("Database.LSMTree.Normal") and
-- [monoidal]("Database.LSMTree.Monoidal") APIs then provide more type-safe
-- wrappers and handle serialisation.
--
-- Apart from defining the API, this module mainly deals with concurrency- and
-- exception-safe opening and closing of resources. Any other non-trivial logic
-- should live somewhere else.
--
module Database.LSMTree.Internal (
-- * Existentials
Session' (..)
Expand Down Expand Up @@ -84,8 +95,8 @@ import Data.Word (Word64)
import Database.LSMTree.Internal.BlobRef (WeakBlobRef (..))
import qualified Database.LSMTree.Internal.BlobRef as BlobRef
import Database.LSMTree.Internal.Config
import qualified Database.LSMTree.Internal.Cursor as Cursor
import Database.LSMTree.Internal.Entry (Entry)
import qualified Database.LSMTree.Internal.Entry as Entry
import Database.LSMTree.Internal.Lookup (ByteCountDiscrepancy,
ResolveSerialisedValue, lookupsIO)
import Database.LSMTree.Internal.MergeSchedule
Expand All @@ -96,14 +107,12 @@ import Database.LSMTree.Internal.Range (Range (..))
import qualified Database.LSMTree.Internal.RawBytes as RB
import Database.LSMTree.Internal.Run (Run)
import qualified Database.LSMTree.Internal.Run as Run
import qualified Database.LSMTree.Internal.RunReader as Reader
import Database.LSMTree.Internal.RunReaders (OffsetKey (..))
import qualified Database.LSMTree.Internal.RunReaders as Readers
import Database.LSMTree.Internal.Serialise (SerialisedBlob (..),
SerialisedKey, SerialisedValue)
import Database.LSMTree.Internal.Snapshot
import Database.LSMTree.Internal.UniqCounter
import qualified Database.LSMTree.Internal.Vector as V
import qualified Database.LSMTree.Internal.WriteBuffer as WB
import qualified Database.LSMTree.Internal.WriteBufferBlobs as WBB
import qualified System.FS.API as FS
Expand Down Expand Up @@ -855,8 +864,6 @@ retrieveBlobs sesh wrefs =
Cursors
-------------------------------------------------------------------------------}

-- TODO: Move to a separate Cursors module

-- | A read-only view into the table state at the time of cursor creation.
--
-- For more information, see 'Database.LSMTree.Normal.Cursor'.
Expand Down Expand Up @@ -1059,123 +1066,13 @@ readCursorWhile resolve keyIsWanted n Cursor {..} fromEntry = do
-- a drained cursor will just return an empty vector
return (state, V.empty)
Just readers -> do
(vec, hasMore) <- readCursorEntriesWhile resolve keyIsWanted fromEntry readers n
(vec, hasMore) <- Cursor.readEntriesWhile resolve keyIsWanted fromEntry readers n
-- if we drained the readers, remove them from the state
let !state' = case hasMore of
Readers.HasMore -> state
Readers.Drained -> CursorOpen (cursorEnv {cursorReaders = Nothing})
return (state', vec)

{-# INLINE readCursorEntriesWhile #-}
{-# SPECIALISE readCursorEntriesWhile :: forall h res.
ResolveSerialisedValue
-> (SerialisedKey -> Bool)
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef IO (Handle h)) -> res)
-> Readers.Readers IO h
-> Int
-> IO (V.Vector res, Readers.HasMore) #-}
-- | General notes on the code below:
-- * it is quite similar to the one in Internal.Merge, but different enough
-- that it's probably easier to keep them separate
-- * any function that doesn't take a 'hasMore' argument assumes that the
-- readers have not been drained yet, so we must check before calling them
-- * there is probably opportunity for optimisations
readCursorEntriesWhile :: forall h m res.
(MonadFix m, MonadMask m, MonadST m, MonadSTM m)
=> ResolveSerialisedValue
-> (SerialisedKey -> Bool)
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m (Handle h)) -> res)
-> Readers.Readers m h
-> Int
-> m (V.Vector res, Readers.HasMore)
readCursorEntriesWhile resolve keyIsWanted fromEntry readers n =
flip (V.unfoldrNM' n) Readers.HasMore $ \case
Readers.Drained -> return (Nothing, Readers.Drained)
Readers.HasMore -> readEntryIfWanted
where
-- Produces a result unless the readers have been drained or 'keyIsWanted'
-- returned False.
readEntryIfWanted :: m (Maybe res, Readers.HasMore)
readEntryIfWanted = do
key <- Readers.peekKey readers
if keyIsWanted key then readEntry
else return (Nothing, Readers.HasMore)

readEntry :: m (Maybe res, Readers.HasMore)
readEntry = do
(key, readerEntry, hasMore) <- Readers.pop readers
let !entry = Reader.toFullEntry readerEntry
case hasMore of
Readers.Drained -> do
handleResolved key entry Readers.Drained
Readers.HasMore -> do
case entry of
Entry.Mupdate v ->
handleMupdate key v
_ -> do
-- Anything but Mupdate supersedes all previous entries of
-- the same key, so we can simply drop them and are done.
hasMore' <- dropRemaining key
handleResolved key entry hasMore'

dropRemaining :: SerialisedKey -> m Readers.HasMore
dropRemaining key = do
(_, hasMore) <- Readers.dropWhileKey readers key
return hasMore

-- Resolve a 'Mupsert' value with the other entries of the same key.
handleMupdate :: SerialisedKey
-> SerialisedValue
-> m (Maybe res, Readers.HasMore)
handleMupdate key v = do
nextKey <- Readers.peekKey readers
if nextKey /= key
then
-- No more entries for same key, done.
handleResolved key (Entry.Mupdate v) Readers.HasMore
else do
(_, nextEntry, hasMore) <- Readers.pop readers
let resolved = Entry.combine resolve (Entry.Mupdate v)
(Reader.toFullEntry nextEntry)
case hasMore of
Readers.HasMore -> case resolved of
Entry.Mupdate v' ->
-- Still a mupsert, keep resolving!
handleMupdate key v'
_ -> do
-- Done with this key, remaining entries are obsolete.
hasMore' <- dropRemaining key
handleResolved key resolved hasMore'
Readers.Drained -> do
handleResolved key resolved Readers.Drained

-- Once we have a resolved entry, we still have to make sure it's not
-- a 'Delete', since we only want to write values to the result vector.
handleResolved :: SerialisedKey
-> Entry SerialisedValue (BlobRef.BlobRef m (Handle h))
-> Readers.HasMore
-> m (Maybe res, Readers.HasMore)
handleResolved key entry hasMore =
case toResult key entry of
Just !res ->
-- Found one resolved value, done.
return (Just res, hasMore)
Nothing ->
-- Resolved value was a Delete, which we don't want to include.
-- So look for another one (unless there are no more entries!).
case hasMore of
Readers.HasMore -> readEntryIfWanted
Readers.Drained -> return (Nothing, Readers.Drained)

toResult :: SerialisedKey
-> Entry SerialisedValue (BlobRef.BlobRef m (Handle h))
-> Maybe res
toResult key = \case
Entry.Insert v -> Just $ fromEntry key v Nothing
Entry.InsertWithBlob v b -> Just $ fromEntry key v (Just (WeakBlobRef b))
Entry.Mupdate v -> Just $ fromEntry key v Nothing
Entry.Delete -> Nothing

{-------------------------------------------------------------------------------
Snapshots
-------------------------------------------------------------------------------}
Expand Down
132 changes: 132 additions & 0 deletions src/Database/LSMTree/Internal/Cursor.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
{-# LANGUAGE DataKinds #-}

module Database.LSMTree.Internal.Cursor (
readEntriesWhile
) where

import Control.Concurrent.Class.MonadSTM (MonadSTM (..))
import Control.Monad.Class.MonadST (MonadST (..))
import Control.Monad.Class.MonadThrow
import Control.Monad.Fix (MonadFix)
import qualified Data.Vector as V
import Database.LSMTree.Internal.BlobRef (WeakBlobRef (..))
import qualified Database.LSMTree.Internal.BlobRef as BlobRef
import Database.LSMTree.Internal.Entry (Entry)
import qualified Database.LSMTree.Internal.Entry as Entry
import Database.LSMTree.Internal.Lookup (ResolveSerialisedValue)
import qualified Database.LSMTree.Internal.RunReader as Reader
import qualified Database.LSMTree.Internal.RunReaders as Readers
import Database.LSMTree.Internal.Serialise (SerialisedKey,
SerialisedValue)
import qualified Database.LSMTree.Internal.Vector as V
import System.FS.API (Handle)

{-# INLINE readEntriesWhile #-}
{-# SPECIALISE readEntriesWhile :: forall h res.
ResolveSerialisedValue
-> (SerialisedKey -> Bool)
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef IO (Handle h)) -> res)
-> Readers.Readers IO h
-> Int
-> IO (V.Vector res, Readers.HasMore) #-}
-- | General notes on the code below:
-- * it is quite similar to the one in Internal.Merge, but different enough
-- that it's probably easier to keep them separate
-- * any function that doesn't take a 'hasMore' argument assumes that the
-- readers have not been drained yet, so we must check before calling them
-- * there is probably opportunity for optimisations
readEntriesWhile :: forall h m res.
(MonadFix m, MonadMask m, MonadST m, MonadSTM m)
=> ResolveSerialisedValue
-> (SerialisedKey -> Bool)
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m (Handle h)) -> res)
-> Readers.Readers m h
-> Int
-> m (V.Vector res, Readers.HasMore)
readEntriesWhile resolve keyIsWanted fromEntry readers n =
flip (V.unfoldrNM' n) Readers.HasMore $ \case
Readers.Drained -> return (Nothing, Readers.Drained)
Readers.HasMore -> readEntryIfWanted
where
-- Produces a result unless the readers have been drained or 'keyIsWanted'
-- returned False.
readEntryIfWanted :: m (Maybe res, Readers.HasMore)
readEntryIfWanted = do
key <- Readers.peekKey readers
if keyIsWanted key then readEntry
else return (Nothing, Readers.HasMore)

readEntry :: m (Maybe res, Readers.HasMore)
readEntry = do
(key, readerEntry, hasMore) <- Readers.pop readers
let !entry = Reader.toFullEntry readerEntry
case hasMore of
Readers.Drained -> do
handleResolved key entry Readers.Drained
Readers.HasMore -> do
case entry of
Entry.Mupdate v ->
handleMupdate key v
_ -> do
-- Anything but Mupdate supersedes all previous entries of
-- the same key, so we can simply drop them and are done.
hasMore' <- dropRemaining key
handleResolved key entry hasMore'

dropRemaining :: SerialisedKey -> m Readers.HasMore
dropRemaining key = do
(_, hasMore) <- Readers.dropWhileKey readers key
return hasMore

-- Resolve a 'Mupsert' value with the other entries of the same key.
handleMupdate :: SerialisedKey
-> SerialisedValue
-> m (Maybe res, Readers.HasMore)
handleMupdate key v = do
nextKey <- Readers.peekKey readers
if nextKey /= key
then
-- No more entries for same key, done.
handleResolved key (Entry.Mupdate v) Readers.HasMore
else do
(_, nextEntry, hasMore) <- Readers.pop readers
let resolved = Entry.combine resolve (Entry.Mupdate v)
(Reader.toFullEntry nextEntry)
case hasMore of
Readers.HasMore -> case resolved of
Entry.Mupdate v' ->
-- Still a mupsert, keep resolving!
handleMupdate key v'
_ -> do
-- Done with this key, remaining entries are obsolete.
hasMore' <- dropRemaining key
handleResolved key resolved hasMore'
Readers.Drained -> do
handleResolved key resolved Readers.Drained

-- Once we have a resolved entry, we still have to make sure it's not
-- a 'Delete', since we only want to write values to the result vector.
handleResolved :: SerialisedKey
-> Entry SerialisedValue (BlobRef.BlobRef m (Handle h))
-> Readers.HasMore
-> m (Maybe res, Readers.HasMore)
handleResolved key entry hasMore =
case toResult key entry of
Just !res ->
-- Found one resolved value, done.
return (Just res, hasMore)
Nothing ->
-- Resolved value was a Delete, which we don't want to include.
-- So look for another one (unless there are no more entries!).
case hasMore of
Readers.HasMore -> readEntryIfWanted
Readers.Drained -> return (Nothing, Readers.Drained)

toResult :: SerialisedKey
-> Entry SerialisedValue (BlobRef.BlobRef m (Handle h))
-> Maybe res
toResult key = \case
Entry.Insert v -> Just $ fromEntry key v Nothing
Entry.InsertWithBlob v b -> Just $ fromEntry key v (Just (WeakBlobRef b))
Entry.Mupdate v -> Just $ fromEntry key v Nothing
Entry.Delete -> Nothing