Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lsm-tree.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ library
Database.LSMTree.Internal.RunReaders
Database.LSMTree.Internal.Serialise
Database.LSMTree.Internal.Serialise.Class
Database.LSMTree.Internal.Snapshot
Database.LSMTree.Internal.UniqCounter
Database.LSMTree.Internal.Unsliced
Database.LSMTree.Internal.Vector
Expand Down
84 changes: 20 additions & 64 deletions src/Database/LSMTree/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ import Control.Monad.Primitive
import Control.TempRegistry
import Control.Tracer
import Data.Arena (ArenaManager, newArenaManager)
import Data.Bifunctor (Bifunctor (..))
import qualified Data.ByteString.Char8 as BSC
import Data.Char (isNumber)
import Data.Foldable
Expand All @@ -90,19 +89,19 @@ import qualified Database.LSMTree.Internal.Entry as Entry
import Database.LSMTree.Internal.Lookup (ByteCountDiscrepancy,
ResolveSerialisedValue, lookupsIO)
import Database.LSMTree.Internal.MergeSchedule
import Database.LSMTree.Internal.Paths (RunFsPaths (..),
SessionRoot (..), SnapshotName)
import Database.LSMTree.Internal.Paths (SessionRoot (..),
SnapshotName)
import qualified Database.LSMTree.Internal.Paths as Paths
import Database.LSMTree.Internal.Range (Range (..))
import qualified Database.LSMTree.Internal.RawBytes as RB
import Database.LSMTree.Internal.Run (Run)
import qualified Database.LSMTree.Internal.Run as Run
import Database.LSMTree.Internal.RunNumber
import qualified Database.LSMTree.Internal.RunReader as Reader
import Database.LSMTree.Internal.RunReaders (OffsetKey (..))
import qualified Database.LSMTree.Internal.RunReaders as Readers
import Database.LSMTree.Internal.Serialise (SerialisedBlob (..),
SerialisedKey, SerialisedValue)
import Database.LSMTree.Internal.Snapshot
import Database.LSMTree.Internal.UniqCounter
import qualified Database.LSMTree.Internal.Vector as V
import qualified Database.LSMTree.Internal.WriteBuffer as WB
Expand Down Expand Up @@ -1201,14 +1200,18 @@ snapshot resolve snap label th = do
traceWith (tableTracer th) $ TraceSnapshot snap
let conf = tableConfig th
withOpenTable th $ \thEnv -> do
let hfs = tableHasFS thEnv
let snapPath = Paths.snapshot (tableSessionRoot thEnv) snap
FS.doesFileExist (tableHasFS thEnv) snapPath >>= \b ->
when b $ throwIO (ErrSnapshotExists snap)

-- For the temporary implementation it is okay to just flush the buffer
-- before taking the snapshot.
let hfs = tableHasFS thEnv
content <- modifyWithTempRegistry
(RW.unsafeAcquireWriteAccess (tableContent thEnv))
(atomically . RW.unsafeReleaseWriteAccess (tableContent thEnv))
$ \reg content -> do
r <- flushWriteBuffer
content' <- flushWriteBuffer
(TraceMerge `contramap` tableTracer th)
conf
resolve
Expand All @@ -1218,29 +1221,22 @@ snapshot resolve snap label th = do
(tableSessionUniqCounter thEnv)
reg
content
pure (r, r)
pure (content', content')
-- At this point, we've flushed the write buffer but we haven't created the
-- snapshot file yet. If an asynchronous exception happens beyond this
-- point, we'll take that loss, as the inner state of the table is still
-- consistent.
runNumbers <- V.forM (tableLevels content) $ \(Level mr rs) -> do
(,V.map (runNumber . Run.runRunFsPaths) rs) <$>
case mr of
SingleRun r -> pure (True, runNumber (Run.runRunFsPaths r))
MergingRun var -> do
withMVar var $ \case
CompletedMerge r -> pure (False, runNumber (Run.runRunFsPaths r))
OngoingMerge{} -> error "snapshot: OngoingMerge not yet supported" -- TODO: implement
let snapPath = Paths.snapshot (tableSessionRoot thEnv) snap
FS.doesFileExist (tableHasFS thEnv) snapPath >>= \b ->
when b $ throwIO (ErrSnapshotExists snap)

snappedLevels <- snapLevels (tableLevels content)
let snapContents = BSC.pack $ show (label, snappedLevels, tableConfig th)

FS.withFile
(tableHasFS thEnv)
snapPath
(FS.WriteMode FS.MustBeNew) $ \h ->
void $ FS.hPutAllStrict (tableHasFS thEnv) h
(BSC.pack $ show (label, runNumbers, tableConfig th))
pure $! V.sum (V.map (\((_ :: (Bool, RunNumber)), rs) -> 1 + V.length rs) runNumbers)
void $ FS.hPutAllStrict (tableHasFS thEnv) h snapContents

pure $! numSnapRuns snappedLevels

{-# SPECIALISE open ::
Session IO h
Expand Down Expand Up @@ -1270,26 +1266,17 @@ open sesh label override snap = do
snapPath
FS.ReadMode $ \h ->
FS.hGetAll (sessionHasFS seshEnv) h
let (label', runNumbers, conf) =
-- why we are using read for this?
-- apparently this is a temporary solution, to be done properly in WP15
read @(SnapshotLabel, V.Vector ((Bool, RunNumber), V.Vector RunNumber), TableConfig) $
BSC.unpack $ BSC.toStrict $ bs

let conf' = applyOverride override conf
let (label', snappedLevels, conf) = read $ BSC.unpack $ BSC.toStrict $ bs
unless (label == label') $ throwIO (ErrSnapshotWrongType snap)
let runPaths = V.map (bimap (second $ RunFsPaths (Paths.activeDir $ sessionRoot seshEnv))
(V.map (RunFsPaths (Paths.activeDir $ sessionRoot seshEnv))))
runNumbers

let conf' = applyOverride override conf
am <- newArenaManager
blobpath <- Paths.tableBlobPath (sessionRoot seshEnv) <$>
incrUniqCounter (sessionUniqCounter seshEnv)
tableWriteBufferBlobs
<- allocateTemp reg
(WBB.new hfs blobpath)
WBB.removeReference
tableLevels <- openLevels reg hfs hbio (confDiskCachePolicy conf') runPaths
tableLevels <- openLevels reg hfs hbio conf (sessionRoot seshEnv) snappedLevels
tableCache <- mkLevelsCache reg tableLevels
newWith reg sesh seshEnv conf' am $! TableContent {
tableWriteBuffer = WB.empty
Expand All @@ -1298,37 +1285,6 @@ open sesh label override snap = do
, tableCache
}

{-# SPECIALISE openLevels ::
TempRegistry IO
-> HasFS IO h
-> HasBlockIO IO h
-> DiskCachePolicy
-> V.Vector ((Bool, RunFsPaths), V.Vector RunFsPaths)
-> IO (Levels IO h) #-}
-- | Open multiple levels.
openLevels ::
(MonadFix m, MonadMask m, MonadMVar m, MonadSTM m, PrimMonad m)
=> TempRegistry m
-> HasFS m h
-> HasBlockIO m h
-> DiskCachePolicy
-> V.Vector ((Bool, RunFsPaths), V.Vector RunFsPaths)
-> m (Levels m h)
openLevels reg hfs hbio diskCachePolicy levels =
flip V.imapMStrict levels $ \i (mrPath, rsPaths) -> do
let ln = LevelNo (i+1) -- level 0 is the write buffer
caching = diskCachePolicyForLevel diskCachePolicy ln
!r <- allocateTemp reg
(Run.openFromDisk hfs hbio caching (snd mrPath))
Run.removeReference
!rs <- flip V.mapMStrict rsPaths $ \run ->
allocateTemp reg
(Run.openFromDisk hfs hbio caching run)
Run.removeReference
var <- newMVar (CompletedMerge r)
let !mr = if fst mrPath then SingleRun r else MergingRun var
pure $! Level mr rs

{-# SPECIALISE deleteSnapshot ::
Session IO h
-> SnapshotName
Expand Down
28 changes: 0 additions & 28 deletions src/Database/LSMTree/Internal/Config.hs
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,6 @@ instance NFData TableConfig where
rnf (TableConfig a b c d e f g) =
rnf a `seq` rnf b `seq` rnf c `seq` rnf d `seq` rnf e `seq` rnf f `seq` rnf g

-- | TODO: this should be removed once we have proper snapshotting with proper
-- persistence of the config to disk.
deriving stock instance Read TableConfig

-- | A reasonable default 'TableConfig'.
--
-- This uses a write buffer with up to 20,000 elements and a generous amount of
Expand Down Expand Up @@ -169,10 +165,6 @@ data MergePolicy =
instance NFData MergePolicy where
rnf MergePolicyLazyLevelling = ()

-- | TODO: this should be removed once we have proper snapshotting with proper
-- persistence of the config to disk.
deriving stock instance Read MergePolicy

{-------------------------------------------------------------------------------
Size ratio
-------------------------------------------------------------------------------}
Expand All @@ -183,10 +175,6 @@ data SizeRatio = Four
instance NFData SizeRatio where
rnf Four = ()

-- | TODO: this should be removed once we have proper snapshotting with proper
-- persistence of the config to disk.
deriving stock instance Read SizeRatio

sizeRatioInt :: SizeRatio -> Int
sizeRatioInt = \case Four -> 4

Expand Down Expand Up @@ -214,14 +202,6 @@ data WriteBufferAlloc =
instance NFData WriteBufferAlloc where
rnf (AllocNumEntries n) = rnf n

-- | TODO: this should be removed once we have proper snapshotting with proper
-- persistence of the config to disk.
deriving stock instance Read WriteBufferAlloc

-- | TODO: this should be removed once we have proper snapshotting with proper
-- persistence of the config to disk.
deriving stock instance Read NumEntries

{-------------------------------------------------------------------------------
Bloom filter allocation
-------------------------------------------------------------------------------}
Expand Down Expand Up @@ -263,10 +243,6 @@ instance NFData BloomFilterAlloc where
rnf (AllocRequestFPR fpr) = rnf fpr
rnf (AllocMonkey a b) = rnf a `seq` rnf b

-- | TODO: this should be removed once we have proper snapshotting with proper
-- persistence of the config to disk.
deriving stock instance Read BloomFilterAlloc

defaultBloomFilterAlloc :: BloomFilterAlloc
defaultBloomFilterAlloc = AllocFixed 10

Expand Down Expand Up @@ -334,10 +310,6 @@ instance NFData FencePointerIndex where
rnf CompactIndex = ()
rnf OrdinaryIndex = ()

-- | TODO: this should be removed once we have proper snapshotting with proper
-- persistence of the config to disk.
deriving stock instance Read FencePointerIndex

{-------------------------------------------------------------------------------
Disk cache policy
-------------------------------------------------------------------------------}
Expand Down
4 changes: 0 additions & 4 deletions src/Database/LSMTree/Internal/RunNumber.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,3 @@ import Data.Word (Word64)

newtype RunNumber = RunNumber Word64
deriving newtype (Eq, Ord, Show, NFData)

-- read as Word64
-- the Read instance is used in Internal.open ?!?
deriving newtype instance Read RunNumber
Loading