Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .hlint.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
- ignore: {name: "Use section"}
- ignore: {name: "Redundant $!"}
- ignore: {name: "Use shows"}
- ignore: {name: "Use fmap"}

# Specify additional command line arguments
#
Expand Down
2 changes: 1 addition & 1 deletion CODEOWNERS
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Note: later rules override earlier rules.

# Default
* @dcoutts @jorisdral @mheinzel @recursion-ninja
* @dcoutts @jorisdral @mheinzel @recursion-ninja @wenkokke
66 changes: 37 additions & 29 deletions src/Database/LSMTree/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ import Control.Concurrent.Class.MonadSTM (MonadSTM (..))
import Control.Concurrent.Class.MonadSTM.RWVar (RWVar)
import qualified Control.Concurrent.Class.MonadSTM.RWVar as RW
import Control.DeepSeq
import Control.Monad (unless, void, when)
import Control.Monad (unless)
import Control.Monad.Class.MonadST (MonadST (..))
import Control.Monad.Class.MonadThrow
import Control.Monad.Fix (MonadFix)
Expand Down Expand Up @@ -100,7 +100,8 @@ import Database.LSMTree.Internal.Lookup (ByteCountDiscrepancy,
ResolveSerialisedValue, lookupsIO)
import Database.LSMTree.Internal.MergeSchedule
import Database.LSMTree.Internal.Paths (SessionRoot (..),
SnapshotName)
SnapshotMetaDataChecksumFile (..),
SnapshotMetaDataFile (..), SnapshotName)
import qualified Database.LSMTree.Internal.Paths as Paths
import Database.LSMTree.Internal.Range (Range (..))
import Database.LSMTree.Internal.Run (Run)
Expand All @@ -115,7 +116,6 @@ import qualified Database.LSMTree.Internal.WriteBuffer as WB
import qualified Database.LSMTree.Internal.WriteBufferBlobs as WBB
import qualified System.FS.API as FS
import System.FS.API (FsError, FsErrorPath (..), FsPath, HasFS)
import qualified System.FS.API.Lazy as FS
import qualified System.FS.BlockIO.API as FS
import System.FS.BlockIO.API (HasBlockIO)

Expand Down Expand Up @@ -1077,9 +1077,17 @@ createSnapshot resolve snap label tableType t = do
let conf = tableConfig t
withOpenTable t $ \thEnv -> do
let hfs = tableHasFS thEnv
let snapPath = Paths.snapshot (tableSessionRoot thEnv) snap
FS.doesFileExist (tableHasFS thEnv) snapPath >>= \b ->
when b $ throwIO (ErrSnapshotExists snap)

-- Guard that the snapshot does not exist already
let snapDir = Paths.namedSnapshotDir (tableSessionRoot thEnv) snap
doesSnapshotExist <-
FS.doesDirectoryExist (tableHasFS thEnv) (Paths.getNamedSnapshotDir snapDir)
if doesSnapshotExist then
throwIO (ErrSnapshotExists snap)
else
-- we assume the snapshots directory already exists, so we just have to
-- create the directory for this specific snapshot.
FS.createDirectory hfs (Paths.getNamedSnapshotDir snapDir)

-- For the temporary implementation it is okay to just flush the buffer
-- before taking the snapshot.
Expand Down Expand Up @@ -1110,13 +1118,10 @@ createSnapshot resolve snap label tableType t = do
-- consistent.

snappedLevels <- snapLevels (tableLevels content)
let snapContents = encodeSnapshotMetaData (SnapshotMetaData label tableType (tableConfig t) snappedLevels)

FS.withFile
(tableHasFS thEnv)
snapPath
(FS.WriteMode FS.MustBeNew) $ \h ->
void $ FS.hPutAll (tableHasFS thEnv) h snapContents
let snapMetaData = SnapshotMetaData label tableType (tableConfig t) snappedLevels
SnapshotMetaDataFile contentPath = Paths.snapshotMetaDataFile snapDir
SnapshotMetaDataChecksumFile checksumPath = Paths.snapshotMetaDataChecksumFile snapDir
writeFileSnapshotMetaData hfs contentPath checksumPath snapMetaData

pure $! numSnapRuns snappedLevels

Expand All @@ -1142,20 +1147,20 @@ openSnapshot sesh label tableType override snap resolve = do
traceWith (sessionTracer sesh) $ TraceOpenSnapshot snap override
withOpenSession sesh $ \seshEnv -> do
withTempRegistry $ \reg -> do
let hfs = sessionHasFS seshEnv
hbio = sessionHasBlockIO seshEnv
snapPath = Paths.snapshot (sessionRoot seshEnv) snap
FS.doesFileExist hfs snapPath >>= \b ->
let hfs = sessionHasFS seshEnv
hbio = sessionHasBlockIO seshEnv

-- Guard that the snapshot exists
let snapDir = Paths.namedSnapshotDir (sessionRoot seshEnv) snap
FS.doesDirectoryExist hfs (Paths.getNamedSnapshotDir snapDir) >>= \b ->
unless b $ throwIO (ErrSnapshotNotExists snap)
bs <- FS.withFile
hfs
snapPath
FS.ReadMode $ \h ->
FS.hGetAll (sessionHasFS seshEnv) h

snapMetaData <- case decodeSnapshotMetaData bs of
let SnapshotMetaDataFile contentPath = Paths.snapshotMetaDataFile snapDir
SnapshotMetaDataChecksumFile checksumPath = Paths.snapshotMetaDataChecksumFile snapDir
snapMetaData <- readFileSnapshotMetaData hfs contentPath checksumPath >>= \case
Left e -> throwIO (ErrSnapshotDeserialiseFailure e snap)
Right x -> pure x

let SnapshotMetaData label' tableType' conf snappedLevels = snapMetaData

unless (tableType == tableType') $
Expand Down Expand Up @@ -1195,10 +1200,12 @@ deleteSnapshot sesh snap = do
traceWith (sessionTracer sesh) $ TraceDeleteSnapshot snap
withOpenSession sesh $ \seshEnv -> do
let hfs = sessionHasFS seshEnv
snapPath = Paths.snapshot (sessionRoot seshEnv) snap
FS.doesFileExist hfs snapPath >>= \b ->
unless b $ throwIO (ErrSnapshotNotExists snap)
FS.removeFile hfs snapPath

let snapDir = Paths.namedSnapshotDir (sessionRoot seshEnv) snap
doesSnapshotExist <-
FS.doesDirectoryExist (sessionHasFS seshEnv) (Paths.getNamedSnapshotDir snapDir)
unless doesSnapshotExist $ throwIO (ErrSnapshotNotExists snap)
FS.removeDirectoryRecursive hfs (Paths.getNamedSnapshotDir snapDir)

{-# SPECIALISE listSnapshots :: Session IO h -> IO [SnapshotName] #-}
-- | See 'Database.LSMTree.Common.listSnapshots'.
Expand All @@ -1219,8 +1226,9 @@ listSnapshots sesh = do
case Paths.mkSnapshotName s of
Nothing -> pure Nothing
Just snap -> do
-- check that it is a file
b <- FS.doesFileExist hfs (Paths.snapshot root snap)
-- check that it is a directory
b <- FS.doesDirectoryExist hfs
(Paths.getNamedSnapshotDir $ Paths.namedSnapshotDir root snap)
if b then pure $ Just snap
else pure $ Nothing

Expand Down
4 changes: 3 additions & 1 deletion src/Database/LSMTree/Internal/CRC32C.hs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ module Database.LSMTree.Internal.CRC32C (
readChecksumsFile,
writeChecksumsFile,
writeChecksumsFile',

hexdigitsToInt
) where

import Control.Monad
Expand Down Expand Up @@ -212,7 +214,7 @@ hGetAllCRC32C' ::
-> m CRC32C
hGetAllCRC32C' hfs h (ChunkSize !chunkSize) !crc0
| chunkSize <= 0
= error "hGetAllCRC32C_SBS: chunkSize must be >0"
= error "hGetAllCRC32C': chunkSize must be >0"
| otherwise
= do
buf@(MutableByteArray !mba#) <- newPinnedByteArray (fromIntegral chunkSize)
Expand Down
30 changes: 27 additions & 3 deletions src/Database/LSMTree/Internal/Paths.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@ module Database.LSMTree.Internal.Paths (
, activeDir
, runPath
, snapshotsDir
, snapshot
, NamedSnapshotDir (..)
, namedSnapshotDir
, SnapshotMetaDataFile (..)
, snapshotMetaDataFile
, SnapshotMetaDataChecksumFile (..)
, snapshotMetaDataChecksumFile
-- * Table paths
, tableBlobPath
-- * Snapshot name
Expand Down Expand Up @@ -59,8 +64,27 @@ runPath root = RunFsPaths (activeDir root)
-- | The directory named @snapshots@ directly under the session root.
snapshotsDir :: SessionRoot -> FsPath
snapshotsDir (SessionRoot sessionDir) =
    sessionDir </> mkFsPath ["snapshots"]

snapshot :: SessionRoot -> SnapshotName -> FsPath
snapshot root (MkSnapshotName name) = snapshotsDir root </> mkFsPath [name]
-- | The directory for a specific, /named/ snapshot.
--
-- Not to be confused with the snapshot/s/ directory, which holds all named
-- snapshot directories.
--
-- This is a thin wrapper around an 'FsPath'; use 'getNamedSnapshotDir' to
-- obtain the underlying path, and 'namedSnapshotDir' to build one from a
-- session root and a snapshot name.
newtype NamedSnapshotDir = NamedSnapshotDir { getNamedSnapshotDir :: FsPath }

-- | The directory of the snapshot with the given name: a sub-directory of
-- 'snapshotsDir' whose name is the snapshot name.
namedSnapshotDir :: SessionRoot -> SnapshotName -> NamedSnapshotDir
namedSnapshotDir root (MkSnapshotName name) = NamedSnapshotDir dir
  where
    dir = snapshotsDir root </> mkFsPath [name]

-- | Path of the file that holds a snapshot's metadata.
--
-- See 'snapshotMetaDataFile' for how the path is derived from a
-- 'NamedSnapshotDir'.
newtype SnapshotMetaDataFile = SnapshotMetaDataFile FsPath

-- | The file called @metadata@ inside the given named snapshot directory.
snapshotMetaDataFile :: NamedSnapshotDir -> SnapshotMetaDataFile
snapshotMetaDataFile (NamedSnapshotDir snapDir) =
    SnapshotMetaDataFile $ snapDir </> mkFsPath ["metadata"]

-- | Path of the file that holds the checksum of a snapshot's metadata file.
--
-- See 'snapshotMetaDataChecksumFile' for how the path is derived from a
-- 'NamedSnapshotDir'.
newtype SnapshotMetaDataChecksumFile = SnapshotMetaDataChecksumFile FsPath

-- | The file called @metadata.checksum@ inside the given named snapshot
-- directory.
snapshotMetaDataChecksumFile :: NamedSnapshotDir -> SnapshotMetaDataChecksumFile
snapshotMetaDataChecksumFile (NamedSnapshotDir snapDir) =
    SnapshotMetaDataChecksumFile $ snapDir </> mkFsPath ["metadata.checksum"]

{-------------------------------------------------------------------------------
Snapshot name
Expand Down
37 changes: 4 additions & 33 deletions src/Database/LSMTree/Internal/Run.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,38 +5,7 @@
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE RecursiveDo #-}

-- | Functionality related to LSM-Tree runs (sequences of LSM-Tree data).
--
-- === TODO
--
-- This is temporary module header documentation. The module will be
-- fleshed out more as we implement bits of it.
--
-- Related work packages: 5, 6
--
-- This module includes in-memory parts and I\/O parts for, amongst others,
--
-- * High-performance batch lookups
--
-- * Range lookups
--
-- * Incremental run construction
--
-- * Lookups in loaded disk pages, value resolution
--
-- * In-memory representation of a run
--
-- * Flushing a write buffer to a run
--
-- * Opening, deserialising, and verifying files for an LSM run.
--
-- * Closing runs (and removing files)
--
-- * high performance, incremental k-way merge
--
-- The above list is a sketch. Functionality may move around, and the list is
-- not exhaustive.
--
-- | Runs of sorted key\/value data.
module Database.LSMTree.Internal.Run (
-- * Run
Run (..)
Expand All @@ -50,8 +19,10 @@ module Database.LSMTree.Internal.Run (
-- ** Run creation
, fromMutable
, fromWriteBuffer
, openFromDisk
, RunDataCaching (..)
-- * Snapshot
, ChecksumError (..)
, openFromDisk
) where

import Control.DeepSeq (NFData (..), rwhnf)
Expand Down
85 changes: 74 additions & 11 deletions src/Database/LSMTree/Internal/Snapshot.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ module Database.LSMTree.Internal.Snapshot (
, SnapshotMetaData (..)
, SnapshotLabel (..)
, SnapshotTableType (..)
, writeFileSnapshotMetaData
, readFileSnapshotMetaData
, encodeSnapshotMetaData
, decodeSnapshotMetaData
-- * Snapshot format
, numSnapRuns
, SnapLevels
Expand All @@ -23,8 +27,6 @@ module Database.LSMTree.Internal.Snapshot (
, Encode (..)
, Decode (..)
, DecodeVersioned (..)
, encodeSnapshotMetaData
, decodeSnapshotMetaData
, Versioned (..)
) where

Expand All @@ -34,30 +36,38 @@ import Codec.CBOR.Read
import Codec.CBOR.Write
import Control.Concurrent.Class.MonadMVar.Strict
import Control.Concurrent.Class.MonadSTM (MonadSTM)
import Control.Monad (void, when)
import Control.Monad.Class.MonadST (MonadST)
import Control.Monad.Class.MonadThrow (MonadMask)
import Control.Monad.Class.MonadThrow (MonadMask, MonadThrow (..))
import Control.Monad.Fix (MonadFix)
import Control.Monad.Primitive (PrimMonad)
import Control.TempRegistry
import Data.Bifunctor
import qualified Data.ByteString.Builder as BSB
import qualified Data.ByteString.Char8 as BSC
import Data.ByteString.Lazy (ByteString)
import qualified Data.ByteString.Lazy as BSL
import Data.Primitive (newMutVar, readMutVar)
import Data.Primitive.PrimVar
import Data.Text (Text)
import qualified Data.Vector as V
import Database.LSMTree.Internal.Config
import Database.LSMTree.Internal.CRC32C
import qualified Database.LSMTree.Internal.CRC32C as FS
import Database.LSMTree.Internal.Entry
import Database.LSMTree.Internal.Lookup (ResolveSerialisedValue)
import qualified Database.LSMTree.Internal.Merge as Merge
import Database.LSMTree.Internal.MergeSchedule
import Database.LSMTree.Internal.Paths (SessionRoot)
import qualified Database.LSMTree.Internal.Paths as Paths
import Database.LSMTree.Internal.Run (Run)
import Database.LSMTree.Internal.Run (ChecksumError (..), Run)
import qualified Database.LSMTree.Internal.Run as Run
import Database.LSMTree.Internal.RunNumber
import Database.LSMTree.Internal.UniqCounter (UniqCounter,
incrUniqCounter, uniqueToRunNumber)
import System.FS.API (HasFS)
import qualified System.FS.API as FS
import System.FS.API (FsPath, HasFS)
import qualified System.FS.API.Lazy as FS
import System.FS.BlockIO.API (HasBlockIO)
import Text.Printf

Expand Down Expand Up @@ -133,6 +143,65 @@ data SnapshotMetaData = SnapshotMetaData {
}
deriving stock (Show, Eq)

-- | Serialise 'SnapshotMetaData' to disk together with its checksum.
--
-- The encoded metadata (see 'encodeSnapshotMetaData') is written to the first
-- path while a CRC32C is accumulated over the written bytes, starting from
-- 'initialCRC32C'. The resulting checksum is then written, in the format of
-- 'encodeChecksum', to the second path.
--
-- Both files are opened with @'FS.WriteMode' 'FS.MustBeNew'@, i.e. neither
-- file is expected to exist yet.
writeFileSnapshotMetaData ::
     MonadThrow m
  => HasFS m h
  -> FsPath -- ^ Target file for snapshot metadata
  -> FsPath -- ^ Target file for checksum
  -> SnapshotMetaData
  -> m ()
writeFileSnapshotMetaData hfs contentPath checksumPath snapMetaData = do
    -- Write the metadata first: the checksum is only known afterwards.
    (_, checksum) <- FS.withFile hfs contentPath newFileMode writeContent
    FS.withFile hfs checksumPath newFileMode $ \checksumHandle ->
      void $ FS.hPutAll hfs checksumHandle (encodeChecksum checksum)
  where
    newFileMode = FS.WriteMode FS.MustBeNew

    writeContent contentHandle =
      hPutAllChunksCRC32C
        hfs
        contentHandle
        (encodeSnapshotMetaData snapMetaData)
        initialCRC32C

-- | Load 'SnapshotMetaData' from disk, verifying it against its checksum file.
--
-- The checksum file is read and decoded first (see 'decodeChecksum'). The
-- metadata file is then read in full while a CRC32C is computed over its
-- contents, starting from 'initialCRC32C'.
--
-- Outcomes:
--
-- * 'Left' if the checksum file cannot be decoded, or if the metadata bytes
--   cannot be decoded by 'decodeSnapshotMetaData'.
--
-- * A thrown 'ChecksumError' if the checksum computed over the metadata file
--   differs from the one stored in the checksum file.
--
-- * 'Right' with the decoded metadata otherwise.
readFileSnapshotMetaData ::
     MonadThrow m
  => HasFS m h
  -> FsPath -- ^ Source file for snapshot metadata
  -> FsPath -- ^ Source file for checksum
  -> m (Either DeserialiseFailure SnapshotMetaData)
readFileSnapshotMetaData hfs contentPath checksumPath = do
    !rawChecksum <-
      FS.withFile hfs checksumPath FS.ReadMode $ \checksumHandle ->
        BSC.toStrict <$> FS.hGetAll hfs checksumHandle

    case decodeChecksum rawChecksum of
      Left failure -> pure (Left failure)
      Right expectedChecksum -> do
        -- Read exactly as many bytes as the file reports, checksumming
        -- them along the way.
        (contents, actualChecksum) <-
          FS.withFile hfs contentPath FS.ReadMode $ \contentHandle ->
            FS.hGetSize hfs contentHandle >>= \size ->
              FS.hGetExactlyCRC32C hfs contentHandle size initialCRC32C

        when (expectedChecksum /= actualChecksum) $
          throwIO $ ChecksumError contentPath expectedChecksum actualChecksum

        pure $ decodeSnapshotMetaData contents

-- | Render a checksum as fixed-width hexadecimal text, using
-- 'BSB.word32HexFixed' (8 hex digits for a 32-bit word).
encodeChecksum :: CRC32C -> BSL.ByteString
encodeChecksum (CRC32C crc) =
    BSB.toLazyByteString $ BSB.word32HexFixed crc

-- | Parse a checksum from its textual form.
--
-- The input must be exactly 8 bytes long, otherwise a 'DeserialiseFailure'
-- (at offset 0) is returned. The bytes are then converted with
-- 'hexdigitsToInt'.
--
-- NOTE(review): the bytes are not checked to be hexadecimal digits here; this
-- relies on whatever 'hexdigitsToInt' does for other characters -- confirm.
decodeChecksum :: BSC.ByteString -> Either DeserialiseFailure CRC32C
decodeChecksum bsc
  | len /= 8  = Left $ DeserialiseFailure 0 msg
  | otherwise =
      let !x = fromIntegral (hexdigitsToInt bsc)
      in  pure $! CRC32C x
  where
    len = BSC.length bsc
    msg = "decodeChecksum: expected 8 bytes, but found "
       <> (show len)

-- | Encode snapshot metadata to bytes: the value is wrapped in 'Versioned'
-- before being passed to 'encode'.
encodeSnapshotMetaData :: SnapshotMetaData -> ByteString
encodeSnapshotMetaData snapMetaData =
    toLazyByteString (encode (Versioned snapMetaData))

-- | Decode snapshot metadata from bytes: the inverse direction of
-- 'encodeSnapshotMetaData'.
--
-- On success, the first component of the decoder result is discarded and the
-- 'Versioned' wrapper is removed. On failure, the 'DeserialiseFailure' is
-- returned unchanged.
decodeSnapshotMetaData :: ByteString -> Either DeserialiseFailure SnapshotMetaData
decodeSnapshotMetaData bs =
    fmap (getVersioned . snd) (deserialiseFromBytes decode bs)

{-------------------------------------------------------------------------------
Levels snapshot format
-------------------------------------------------------------------------------}
Expand Down Expand Up @@ -343,12 +412,6 @@ class Decode a where
class DecodeVersioned a where
decodeVersioned :: SnapshotVersion -> Decoder s a

encodeSnapshotMetaData :: SnapshotMetaData -> ByteString
encodeSnapshotMetaData = toLazyByteString . encode . Versioned

decodeSnapshotMetaData :: ByteString -> Either DeserialiseFailure SnapshotMetaData
decodeSnapshotMetaData bs = second (getVersioned . snd) (deserialiseFromBytes decode bs)

newtype Versioned a = Versioned { getVersioned :: a }
deriving stock (Show, Eq)

Expand Down