Skip to content

Commit a5fe228

Browse files
committed
move Cursor read logic into separate module
1 parent 23939c1 commit a5fe228

File tree

3 files changed

+142
-116
lines changed

3 files changed

+142
-116
lines changed

lsm-tree.cabal

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ library
126126
Database.LSMTree.Internal.Chunk
127127
Database.LSMTree.Internal.Config
128128
Database.LSMTree.Internal.CRC32C
129+
Database.LSMTree.Internal.Cursor
129130
Database.LSMTree.Internal.Entry
130131
Database.LSMTree.Internal.IndexCompact
131132
Database.LSMTree.Internal.IndexCompactAcc

src/Database/LSMTree/Internal.hs

Lines changed: 9 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
{-# LANGUAGE DataKinds #-}
22

3+
-- | The internal API here is defined in terms of untyped serialised keys,
4+
-- values and blobs. It makes no distinction between normal and monoidal tables,
5+
-- accepting both blobs and mupserts.
6+
--
7+
-- This module mainly deals with concurrency- and exception-safe opening and
8+
-- closing of resources. Any other non-trivial logic should live somewhere else.
9+
--
310
module Database.LSMTree.Internal (
411
-- * Existentials
512
Session' (..)
@@ -84,8 +91,8 @@ import Data.Word (Word64)
8491
import Database.LSMTree.Internal.BlobRef (WeakBlobRef (..))
8592
import qualified Database.LSMTree.Internal.BlobRef as BlobRef
8693
import Database.LSMTree.Internal.Config
94+
import qualified Database.LSMTree.Internal.Cursor as Cursor
8795
import Database.LSMTree.Internal.Entry (Entry)
88-
import qualified Database.LSMTree.Internal.Entry as Entry
8996
import Database.LSMTree.Internal.Lookup (ByteCountDiscrepancy,
9097
ResolveSerialisedValue, lookupsIO)
9198
import Database.LSMTree.Internal.MergeSchedule
@@ -96,14 +103,12 @@ import Database.LSMTree.Internal.Range (Range (..))
96103
import qualified Database.LSMTree.Internal.RawBytes as RB
97104
import Database.LSMTree.Internal.Run (Run)
98105
import qualified Database.LSMTree.Internal.Run as Run
99-
import qualified Database.LSMTree.Internal.RunReader as Reader
100106
import Database.LSMTree.Internal.RunReaders (OffsetKey (..))
101107
import qualified Database.LSMTree.Internal.RunReaders as Readers
102108
import Database.LSMTree.Internal.Serialise (SerialisedBlob (..),
103109
SerialisedKey, SerialisedValue)
104110
import Database.LSMTree.Internal.Snapshot
105111
import Database.LSMTree.Internal.UniqCounter
106-
import qualified Database.LSMTree.Internal.Vector as V
107112
import qualified Database.LSMTree.Internal.WriteBuffer as WB
108113
import qualified Database.LSMTree.Internal.WriteBufferBlobs as WBB
109114
import qualified System.FS.API as FS
@@ -855,8 +860,6 @@ retrieveBlobs sesh wrefs =
855860
Cursors
856861
-------------------------------------------------------------------------------}
857862

858-
-- TODO: Move to a separate Cursors module
859-
860863
-- | A read-only view into the table state at the time of cursor creation.
861864
--
862865
-- For more information, see 'Database.LSMTree.Normal.Cursor'.
@@ -1059,123 +1062,13 @@ readCursorWhile resolve keyIsWanted n Cursor {..} fromEntry = do
10591062
-- a drained cursor will just return an empty vector
10601063
return (state, V.empty)
10611064
Just readers -> do
1062-
(vec, hasMore) <- readCursorEntriesWhile resolve keyIsWanted fromEntry readers n
1065+
(vec, hasMore) <- Cursor.readEntriesWhile resolve keyIsWanted fromEntry readers n
10631066
-- if we drained the readers, remove them from the state
10641067
let !state' = case hasMore of
10651068
Readers.HasMore -> state
10661069
Readers.Drained -> CursorOpen (cursorEnv {cursorReaders = Nothing})
10671070
return (state', vec)
10681071

1069-
{-# INLINE readCursorEntriesWhile #-}
1070-
{-# SPECIALISE readCursorEntriesWhile :: forall h res.
1071-
ResolveSerialisedValue
1072-
-> (SerialisedKey -> Bool)
1073-
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef IO (Handle h)) -> res)
1074-
-> Readers.Readers IO h
1075-
-> Int
1076-
-> IO (V.Vector res, Readers.HasMore) #-}
1077-
-- | General notes on the code below:
1078-
-- * it is quite similar to the one in Internal.Merge, but different enough
1079-
-- that it's probably easier to keep them separate
1080-
-- * any function that doesn't take a 'hasMore' argument assumes that the
1081-
-- readers have not been drained yet, so we must check before calling them
1082-
-- * there is probably opportunity for optimisations
1083-
readCursorEntriesWhile :: forall h m res.
1084-
(MonadFix m, MonadMask m, MonadST m, MonadSTM m)
1085-
=> ResolveSerialisedValue
1086-
-> (SerialisedKey -> Bool)
1087-
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m (Handle h)) -> res)
1088-
-> Readers.Readers m h
1089-
-> Int
1090-
-> m (V.Vector res, Readers.HasMore)
1091-
readCursorEntriesWhile resolve keyIsWanted fromEntry readers n =
1092-
flip (V.unfoldrNM' n) Readers.HasMore $ \case
1093-
Readers.Drained -> return (Nothing, Readers.Drained)
1094-
Readers.HasMore -> readEntryIfWanted
1095-
where
1096-
-- Produces a result unless the readers have been drained or 'keyIsWanted'
1097-
-- returned False.
1098-
readEntryIfWanted :: m (Maybe res, Readers.HasMore)
1099-
readEntryIfWanted = do
1100-
key <- Readers.peekKey readers
1101-
if keyIsWanted key then readEntry
1102-
else return (Nothing, Readers.HasMore)
1103-
1104-
readEntry :: m (Maybe res, Readers.HasMore)
1105-
readEntry = do
1106-
(key, readerEntry, hasMore) <- Readers.pop readers
1107-
let !entry = Reader.toFullEntry readerEntry
1108-
case hasMore of
1109-
Readers.Drained -> do
1110-
handleResolved key entry Readers.Drained
1111-
Readers.HasMore -> do
1112-
case entry of
1113-
Entry.Mupdate v ->
1114-
handleMupdate key v
1115-
_ -> do
1116-
-- Anything but Mupdate supersedes all previous entries of
1117-
-- the same key, so we can simply drop them and are done.
1118-
hasMore' <- dropRemaining key
1119-
handleResolved key entry hasMore'
1120-
1121-
dropRemaining :: SerialisedKey -> m Readers.HasMore
1122-
dropRemaining key = do
1123-
(_, hasMore) <- Readers.dropWhileKey readers key
1124-
return hasMore
1125-
1126-
-- Resolve an 'Entry.Mupdate' value with the other entries of the same key.
1127-
handleMupdate :: SerialisedKey
1128-
-> SerialisedValue
1129-
-> m (Maybe res, Readers.HasMore)
1130-
handleMupdate key v = do
1131-
nextKey <- Readers.peekKey readers
1132-
if nextKey /= key
1133-
then
1134-
-- No more entries for same key, done.
1135-
handleResolved key (Entry.Mupdate v) Readers.HasMore
1136-
else do
1137-
(_, nextEntry, hasMore) <- Readers.pop readers
1138-
let resolved = Entry.combine resolve (Entry.Mupdate v)
1139-
(Reader.toFullEntry nextEntry)
1140-
case hasMore of
1141-
Readers.HasMore -> case resolved of
1142-
Entry.Mupdate v' ->
1143-
-- Still a mupsert, keep resolving!
1144-
handleMupdate key v'
1145-
_ -> do
1146-
-- Done with this key, remaining entries are obsolete.
1147-
hasMore' <- dropRemaining key
1148-
handleResolved key resolved hasMore'
1149-
Readers.Drained -> do
1150-
handleResolved key resolved Readers.Drained
1151-
1152-
-- Once we have a resolved entry, we still have to make sure it's not
1153-
-- a 'Delete', since we only want to write values to the result vector.
1154-
handleResolved :: SerialisedKey
1155-
-> Entry SerialisedValue (BlobRef.BlobRef m (Handle h))
1156-
-> Readers.HasMore
1157-
-> m (Maybe res, Readers.HasMore)
1158-
handleResolved key entry hasMore =
1159-
case toResult key entry of
1160-
Just !res ->
1161-
-- Found one resolved value, done.
1162-
return (Just res, hasMore)
1163-
Nothing ->
1164-
-- Resolved value was a Delete, which we don't want to include.
1165-
-- So look for another one (unless there are no more entries!).
1166-
case hasMore of
1167-
Readers.HasMore -> readEntryIfWanted
1168-
Readers.Drained -> return (Nothing, Readers.Drained)
1169-
1170-
toResult :: SerialisedKey
1171-
-> Entry SerialisedValue (BlobRef.BlobRef m (Handle h))
1172-
-> Maybe res
1173-
toResult key = \case
1174-
Entry.Insert v -> Just $ fromEntry key v Nothing
1175-
Entry.InsertWithBlob v b -> Just $ fromEntry key v (Just (WeakBlobRef b))
1176-
Entry.Mupdate v -> Just $ fromEntry key v Nothing
1177-
Entry.Delete -> Nothing
1178-
11791072
{-------------------------------------------------------------------------------
11801073
Snapshots
11811074
-------------------------------------------------------------------------------}
src/Database/LSMTree/Internal/Cursor.hs

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
{-# LANGUAGE DataKinds #-}
2+
3+
module Database.LSMTree.Internal.Cursor (
4+
readEntriesWhile
5+
) where
6+
7+
import Control.Concurrent.Class.MonadSTM (MonadSTM (..))
8+
import Control.Monad.Class.MonadST (MonadST (..))
9+
import Control.Monad.Class.MonadThrow
10+
import Control.Monad.Fix (MonadFix)
11+
import qualified Data.Vector as V
12+
import Database.LSMTree.Internal.BlobRef (WeakBlobRef (..))
13+
import qualified Database.LSMTree.Internal.BlobRef as BlobRef
14+
import Database.LSMTree.Internal.Entry (Entry)
15+
import qualified Database.LSMTree.Internal.Entry as Entry
16+
import Database.LSMTree.Internal.Lookup (ResolveSerialisedValue)
17+
import qualified Database.LSMTree.Internal.RunReader as Reader
18+
import qualified Database.LSMTree.Internal.RunReaders as Readers
19+
import Database.LSMTree.Internal.Serialise (SerialisedKey,
20+
SerialisedValue)
21+
import qualified Database.LSMTree.Internal.Vector as V
22+
import System.FS.API (Handle)
23+
24+
{-# INLINE readEntriesWhile #-}
25+
{-# SPECIALISE readEntriesWhile :: forall h res.
26+
ResolveSerialisedValue
27+
-> (SerialisedKey -> Bool)
28+
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef IO (Handle h)) -> res)
29+
-> Readers.Readers IO h
30+
-> Int
31+
-> IO (V.Vector res, Readers.HasMore) #-}
32+
-- | General notes on the code below:
33+
-- * it is quite similar to the one in Internal.Merge, but different enough
34+
-- that it's probably easier to keep them separate
35+
-- * any function that doesn't take a 'hasMore' argument assumes that the
36+
-- readers have not been drained yet, so we must check before calling them
37+
-- * there is probably opportunity for optimisations
38+
readEntriesWhile :: forall h m res.
39+
(MonadFix m, MonadMask m, MonadST m, MonadSTM m)
40+
=> ResolveSerialisedValue
41+
-> (SerialisedKey -> Bool)
42+
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m (Handle h)) -> res)
43+
-> Readers.Readers m h
44+
-> Int
45+
-> m (V.Vector res, Readers.HasMore)
46+
readEntriesWhile resolve keyIsWanted fromEntry readers n =
47+
flip (V.unfoldrNM' n) Readers.HasMore $ \case
48+
Readers.Drained -> return (Nothing, Readers.Drained)
49+
Readers.HasMore -> readEntryIfWanted
50+
where
51+
-- Produces a result unless the readers have been drained or 'keyIsWanted'
52+
-- returned False.
53+
readEntryIfWanted :: m (Maybe res, Readers.HasMore)
54+
readEntryIfWanted = do
55+
key <- Readers.peekKey readers
56+
if keyIsWanted key then readEntry
57+
else return (Nothing, Readers.HasMore)
58+
59+
readEntry :: m (Maybe res, Readers.HasMore)
60+
readEntry = do
61+
(key, readerEntry, hasMore) <- Readers.pop readers
62+
let !entry = Reader.toFullEntry readerEntry
63+
case hasMore of
64+
Readers.Drained -> do
65+
handleResolved key entry Readers.Drained
66+
Readers.HasMore -> do
67+
case entry of
68+
Entry.Mupdate v ->
69+
handleMupdate key v
70+
_ -> do
71+
-- Anything but Mupdate supersedes all previous entries of
72+
-- the same key, so we can simply drop them and are done.
73+
hasMore' <- dropRemaining key
74+
handleResolved key entry hasMore'
75+
76+
dropRemaining :: SerialisedKey -> m Readers.HasMore
77+
dropRemaining key = do
78+
(_, hasMore) <- Readers.dropWhileKey readers key
79+
return hasMore
80+
81+
-- Resolve an 'Entry.Mupdate' value with the other entries of the same key.
82+
handleMupdate :: SerialisedKey
83+
-> SerialisedValue
84+
-> m (Maybe res, Readers.HasMore)
85+
handleMupdate key v = do
86+
nextKey <- Readers.peekKey readers
87+
if nextKey /= key
88+
then
89+
-- No more entries for same key, done.
90+
handleResolved key (Entry.Mupdate v) Readers.HasMore
91+
else do
92+
(_, nextEntry, hasMore) <- Readers.pop readers
93+
let resolved = Entry.combine resolve (Entry.Mupdate v)
94+
(Reader.toFullEntry nextEntry)
95+
case hasMore of
96+
Readers.HasMore -> case resolved of
97+
Entry.Mupdate v' ->
98+
-- Still a mupsert, keep resolving!
99+
handleMupdate key v'
100+
_ -> do
101+
-- Done with this key, remaining entries are obsolete.
102+
hasMore' <- dropRemaining key
103+
handleResolved key resolved hasMore'
104+
Readers.Drained -> do
105+
handleResolved key resolved Readers.Drained
106+
107+
-- Once we have a resolved entry, we still have to make sure it's not
108+
-- a 'Delete', since we only want to write values to the result vector.
109+
handleResolved :: SerialisedKey
110+
-> Entry SerialisedValue (BlobRef.BlobRef m (Handle h))
111+
-> Readers.HasMore
112+
-> m (Maybe res, Readers.HasMore)
113+
handleResolved key entry hasMore =
114+
case toResult key entry of
115+
Just !res ->
116+
-- Found one resolved value, done.
117+
return (Just res, hasMore)
118+
Nothing ->
119+
-- Resolved value was a Delete, which we don't want to include.
120+
-- So look for another one (unless there are no more entries!).
121+
case hasMore of
122+
Readers.HasMore -> readEntryIfWanted
123+
Readers.Drained -> return (Nothing, Readers.Drained)
124+
125+
toResult :: SerialisedKey
126+
-> Entry SerialisedValue (BlobRef.BlobRef m (Handle h))
127+
-> Maybe res
128+
toResult key = \case
129+
Entry.Insert v -> Just $ fromEntry key v Nothing
130+
Entry.InsertWithBlob v b -> Just $ fromEntry key v (Just (WeakBlobRef b))
131+
Entry.Mupdate v -> Just $ fromEntry key v Nothing
132+
Entry.Delete -> Nothing

0 commit comments

Comments
 (0)