Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions src/Database/LSMTree.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ module Database.LSMTree (
, Common.TableClosedError (..)
, Common.TableCorruptedError (..)
, Common.TableTooLargeError (..)
, Common.TableNotCompatibleError (..)
, Common.TableUnionNotCompatibleError (..)
, Common.SnapshotExistsError (..)
, Common.SnapshotDoesNotExistError (..)
, Common.SnapshotCorruptedError (..)
Expand Down Expand Up @@ -129,7 +129,8 @@ import Data.Bifunctor (Bifunctor (..))
import Data.Coerce (coerce)
import Data.Kind (Type)
import Data.List.NonEmpty (NonEmpty (..))
import Data.Typeable (Proxy (..), Typeable, eqT, type (:~:) (Refl))
import Data.Typeable (Proxy (..), Typeable, eqT, type (:~:) (Refl),
typeRep)
import qualified Data.Vector as V
import Database.LSMTree.Common (BlobRef (BlobRef), IOLike, Range (..),
SerialiseKey, SerialiseValue, Session, UnionCredits (..),
Expand Down Expand Up @@ -570,7 +571,7 @@ unions (t :| ts) =
-> m (Internal.Table m h)
checkTableType _ i (Internal.Table' (t' :: Internal.Table m h'))
| Just Refl <- eqT @h @h' = pure t'
| otherwise = throwIO (Common.ErrTableTypeMismatch 0 i)
| otherwise = throwIO $ Common.ErrTableUnionHandleTypeMismatch 0 (typeRep $ Proxy @h) i (typeRep $ Proxy @h')

{-# SPECIALISE remainingUnionDebt :: Table IO k v b -> IO UnionDebt #-}
remainingUnionDebt :: IOLike m => Table m k v b -> m UnionDebt
Expand Down
2 changes: 1 addition & 1 deletion src/Database/LSMTree/Common.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ module Database.LSMTree.Common (
, Internal.TableClosedError (..)
, Internal.TableCorruptedError (..)
, Internal.TableTooLargeError (..)
, Internal.TableNotCompatibleError (..)
, Internal.TableUnionNotCompatibleError (..)
, Internal.SnapshotExistsError (..)
, Internal.SnapshotDoesNotExistError (..)
, Internal.SnapshotCorruptedError (..)
Expand Down
89 changes: 43 additions & 46 deletions src/Database/LSMTree/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ module Database.LSMTree.Internal (
, TableClosedError (..)
, TableCorruptedError (..)
, TableTooLargeError (..)
, TableNotCompatibleError (..)
, TableUnionNotCompatibleError (..)
, SnapshotExistsError (..)
, SnapshotDoesNotExistError (..)
, SnapshotCorruptedError (..)
Expand Down Expand Up @@ -1534,27 +1534,30 @@ duplicate t@Table{..} = do
tableArenaManager
content


{-------------------------------------------------------------------------------
Table union
-------------------------------------------------------------------------------}

-- | An operation was called with two tables that are not compatible.
data TableNotCompatibleError
= -- | An operation was called with two tables that are not of the same type.
--
-- TODO: This error is no longer used by 'unions'.
ErrTableTypeMismatch
-- | Vector index of table @t1@ involved in the mismatch
Int
-- | Vector index of table @t2@ involved in the mismatch
Int
| -- | An operation was called with two tables that are not in the same session.
ErrTableSessionMismatch
-- | Vector index of table @t1@ involved in the mismatch
Int
-- | Vector index of table @t2@ involved in the mismatch
Int
-- | A table union was constructed with two tables that are not compatible.
data TableUnionNotCompatibleError
= ErrTableUnionHandleTypeMismatch
-- | The index of the first table.
!Int
-- | The type of the filesystem handle of the first table.
!TypeRep
-- | The index of the second table.
!Int
-- | The type of the filesystem handle of the second table.
!TypeRep
| ErrTableUnionSessionMismatch
-- | The index of the first table.
!Int
-- | The session directory of the first table.
!FsErrorPath
-- | The index of the second table.
!Int
-- | The session directory of the second table.
!FsErrorPath
deriving stock (Show, Eq)
deriving anyclass (Exception)

Expand All @@ -1565,10 +1568,7 @@ unions ::
=> NonEmpty (Table m h)
-> m (Table m h)
unions ts = do
sesh <-
matchSessions ts >>= \case
Left (i, j) -> throwIO $ ErrTableSessionMismatch i j
Right sesh -> pure sesh
sesh <- ensureSessionsMatch ts

traceWith (sessionTracer sesh) $ TraceUnions (NE.map tableId ts)

Expand Down Expand Up @@ -1706,37 +1706,34 @@ writeBufferToNewRun SessionEnv {
tableWriteBuffer
tableWriteBufferBlobs

-- | Check that all tables in the session match. If so, return the matched
-- session. If there is a mismatch, return the list indices of the mismatching
-- tables.
--
-- TODO: compare LockFileHandle instead of SessionRoot (?). We can write an Eq
-- instance for LockFileHandle based on pointer equality, just like base does
-- for Handle.
matchSessions ::
{-# SPECIALISE ensureSessionsMatch ::
NonEmpty (Table IO h)
-> IO (Session IO h) #-}
-- | Check if all tables have the same session.
-- If so, return the session.
-- Otherwise, throw a 'TableUnionNotCompatibleError'.
ensureSessionsMatch ::
(MonadSTM m, MonadThrow m)
=> NonEmpty (Table m h)
-> m (Either (Int, Int) (Session m h))
matchSessions = \(t :| ts) ->
withSessionRoot t $ \root -> do
eith <- go root 1 ts
pure $ case eith of
Left i -> Left (0, i)
Right () -> Right (tableSession t)
where
-> m (Session m h)
ensureSessionsMatch (t :| ts) = do
let sesh = tableSession t
withOpenSession sesh $ \seshEnv -> do
let root = FS.mkFsErrorPath (sessionHasFS seshEnv) (getSessionRoot (sessionRoot seshEnv))
-- Check that the session roots for all tables are the same. There can only
-- be one *open/active* session per directory because of cooperative file
-- locks, so each unique *open* session has a unique session root. We check
-- that all the table's sessions are open at the same time while comparing
-- the session roots.
go _ _ [] = pure (Right ())
go root !i (t':ts') =
withSessionRoot t' $ \root' ->
if root == root'
then go root (i+1) ts'
else pure (Left i)

withSessionRoot t k = withOpenSession (tableSession t) $ k . sessionRoot
for_ (zip [1..] ts) $ \(i, t') -> do
let sesh' = tableSession t'
withOpenSession sesh' $ \seshEnv' -> do
let root' = FS.mkFsErrorPath (sessionHasFS seshEnv') (getSessionRoot (sessionRoot seshEnv'))
-- TODO: compare LockFileHandle instead of SessionRoot (?).
-- We can write an Eq instance for LockFileHandle based on pointer equality,
-- just like base does for Handle.
unless (root == root') $ throwIO $ ErrTableUnionSessionMismatch 0 root i root'
pure sesh

{-------------------------------------------------------------------------------
Table union: debt and credit
Expand Down
8 changes: 4 additions & 4 deletions src/Database/LSMTree/Monoidal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ module Database.LSMTree.Monoidal (
, Common.TableClosedError (..)
, Common.TableCorruptedError (..)
, Common.TableTooLargeError (..)
, Common.TableNotCompatibleError (..)
, Common.TableUnionNotCompatibleError (..)
, Common.SnapshotExistsError (..)
, Common.SnapshotDoesNotExistError (..)
, Common.SnapshotCorruptedError (..)
Expand Down Expand Up @@ -149,8 +149,8 @@ import Data.Coerce (coerce)
import Data.Kind (Type)
import Data.List.NonEmpty (NonEmpty (..))
import Data.Monoid (Sum (..))
import Data.Proxy (Proxy (Proxy))
import Data.Typeable (Typeable, eqT, type (:~:) (Refl))
import Data.Typeable (Proxy (..), Typeable, eqT, type (:~:) (Refl),
typeRep)
import qualified Data.Vector as V
import Database.LSMTree.Common (IOLike, Range (..), SerialiseKey,
SerialiseValue (..), Session, UnionCredits (..),
Expand Down Expand Up @@ -719,7 +719,7 @@ unions (t :| ts) =
-> m (Internal.Table m h)
checkTableType _ i (Internal.MonoidalTable (t' :: Internal.Table m h'))
| Just Refl <- eqT @h @h' = pure t'
| otherwise = throwIO (Common.ErrTableTypeMismatch 0 i)
| otherwise = throwIO $ Common.ErrTableUnionHandleTypeMismatch 0 (typeRep $ Proxy @h) i (typeRep $ Proxy @h')

{-# SPECIALISE remainingUnionDebt :: Table IO k v -> IO UnionDebt #-}
-- | Return the current union debt. This debt can be reduced until it is paid
Expand Down
7 changes: 4 additions & 3 deletions src/Database/LSMTree/Normal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ module Database.LSMTree.Normal (
, Common.TableClosedError (..)
, Common.TableCorruptedError (..)
, Common.TableTooLargeError (..)
, Common.TableNotCompatibleError (..)
, Common.TableUnionNotCompatibleError (..)
, Common.SnapshotExistsError (..)
, Common.SnapshotDoesNotExistError (..)
, Common.SnapshotCorruptedError (..)
Expand Down Expand Up @@ -140,7 +140,8 @@ import Control.Monad.Class.MonadThrow
import Data.Bifunctor (Bifunctor (..))
import Data.Kind (Type)
import Data.List.NonEmpty (NonEmpty (..))
import Data.Typeable (Typeable, eqT, type (:~:) (Refl))
import Data.Typeable (Proxy (..), Typeable, eqT, type (:~:) (Refl),
typeRep)
import qualified Data.Vector as V
import Database.LSMTree.Common (BlobRef (BlobRef), IOLike, Range (..),
SerialiseKey, SerialiseValue, Session, UnionCredits (..),
Expand Down Expand Up @@ -839,7 +840,7 @@ unions (t :| ts) =
-> m (Internal.Table m h)
checkTableType _ i (Internal.NormalTable (t' :: Internal.Table m h'))
| Just Refl <- eqT @h @h' = pure t'
| otherwise = throwIO (Common.ErrTableTypeMismatch 0 i)
| otherwise = throwIO $ Common.ErrTableUnionHandleTypeMismatch 0 (typeRep $ Proxy @h) i (typeRep $ Proxy @h')

{-# SPECIALISE remainingUnionDebt :: Table IO k v b -> IO UnionDebt #-}
-- | Return the current union debt. This debt can be reduced until it is paid
Expand Down
4 changes: 2 additions & 2 deletions test/Database/LSMTree/Model/Session.hs
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,8 @@ data Err
| ErrSessionClosed
| ErrTableClosed
| ErrTableCorrupted
| ErrTableTypeMismatch
| ErrTableSessionMismatch
| ErrTableUnionHandleTypeMismatch
| ErrTableUnionSessionMismatch
| ErrSnapshotExists !SnapshotName
| ErrSnapshotDoesNotExist !SnapshotName
| ErrSnapshotCorrupted !SnapshotName
Expand Down
15 changes: 8 additions & 7 deletions test/Test/Database/LSMTree/StateMachine.hs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ import Database.LSMTree.Common (BlobRefInvalidError (..),
SessionDirLockedError (..), SnapshotCorruptedError (..),
SnapshotDoesNotExistError (..), SnapshotExistsError (..),
SnapshotNotCompatibleError (..), TableClosedError (..),
TableCorruptedError (..), TableNotCompatibleError (..))
TableCorruptedError (..),
TableUnionNotCompatibleError (..))
import Database.LSMTree.Extras (showPowersOf)
import Database.LSMTree.Extras.Generators (KeyForIndexCompact)
import Database.LSMTree.Extras.NoThunks (propNoThunks)
Expand Down Expand Up @@ -483,7 +484,7 @@ handleSomeException e =
, handleSessionClosedError <$> fromException e
, handleTableClosedError <$> fromException e
, handleTableCorruptedError <$> fromException e
, handleTableNotCompatibleError <$> fromException e
, handleTableUnionNotCompatibleError <$> fromException e
, handleSnapshotExistsError <$> fromException e
, handleSnapshotDoesNotExistError <$> fromException e
, handleSnapshotCorruptedError <$> fromException e
Expand Down Expand Up @@ -528,12 +529,12 @@ handleTableClosedError = \case

handleTableCorruptedError :: TableCorruptedError -> Model.Err
handleTableCorruptedError = \case
ErrLookupByteCountDiscrepancy _ _ -> Model.ErrTableCorrupted
ErrLookupByteCountDiscrepancy{} -> Model.ErrTableCorrupted

handleTableNotCompatibleError :: TableNotCompatibleError -> Model.Err
handleTableNotCompatibleError = \case
ErrTableTypeMismatch _ _ -> Model.ErrTableTypeMismatch
ErrTableSessionMismatch _ _ -> Model.ErrTableSessionMismatch
handleTableUnionNotCompatibleError :: TableUnionNotCompatibleError -> Model.Err
handleTableUnionNotCompatibleError = \case
ErrTableUnionHandleTypeMismatch{} -> Model.ErrTableUnionHandleTypeMismatch
ErrTableUnionSessionMismatch{} -> Model.ErrTableUnionSessionMismatch

handleSnapshotExistsError :: SnapshotExistsError -> Model.Err
handleSnapshotExistsError = \case
Expand Down