From 2701b341d2c081966ee81e60f51dd9fcd441b462 Mon Sep 17 00:00:00 2001 From: Dominic Mayhew Date: Tue, 28 May 2024 16:18:34 -0700 Subject: [PATCH] Provide DB-specific locations through conditional compilation Use a single data structure, `databaseLocation`, to describe either a work directory or in-memory when compiled for SQLite or a remote URL when compiled for PostgreSQL. This provides strong type safety but comes at the cost of having to include a argument parser in the DB modules, which is not really where it belongs. Alternatively, conditional compilation could be added to the argument parsing module, but that also seems out of place. Ultimately, including all 3 alternatives in `databaseLocation` and validating at runtime may be a cleaner solution, despite the loss of type safety. --- src/Kupo.hs | 8 +- src/Kupo/App/Database.hs | 9 +- src/Kupo/App/Database/Postgres.hs | 533 +++++++--------------------- src/Kupo/App/Database/SQLite.hs | 58 ++- src/Kupo/App/Database/Types.hs | 8 +- src/Kupo/Data/Configuration.hs | 25 +- src/Kupo/Options.hs | 20 +- test/Test/Kupo/AppSpec.hs | 4 +- test/Test/Kupo/Data/DatabaseSpec.hs | 65 ++-- test/Test/Kupo/OptionsSpec.hs | 10 +- test/Test/KupoSpec.hs | 16 +- 11 files changed, 250 insertions(+), 506 deletions(-) diff --git a/src/Kupo.hs b/src/Kupo.hs index 98d5f19..981c796 100644 --- a/src/Kupo.hs +++ b/src/Kupo.hs @@ -64,6 +64,7 @@ import Kupo.App.Database ( ConnectionType (..) , DBPool (..) , copyDatabase + , mkDBPool ) import Kupo.App.Health ( connectionStatusToggle @@ -132,9 +133,6 @@ import System.Exit ( ExitCode (..) ) -import Kupo.App.Database.SQLite - ( mkDBPool - ) import qualified Kupo.Data.Health as Health -- @@ -190,7 +188,7 @@ kupoWith tr withProducer withFetchBlock = , configuration = config@Configuration { serverHost , serverPort - , workDir + , databaseLocation , inputManagement , longestRollback , deferIndexes @@ -205,7 +203,7 @@ kupoWith tr withProducer withFetchBlock = -- , maxConcurrentWriters = if isReadOnlyReplica config then 0 else maxConcurrentWriters -- } - dbPool <- liftIO $ mkDBPool (isReadOnlyReplica config) (tracerDatabase tr) workDir longestRollback + dbPool <- liftIO $ mkDBPool (isReadOnlyReplica config) (tracerDatabase tr) databaseLocation longestRollback let run action | isReadOnlyReplica config = diff --git a/src/Kupo/App/Database.hs b/src/Kupo/App/Database.hs index e3e738b..8952839 100644 --- a/src/Kupo/App/Database.hs +++ b/src/Kupo/App/Database.hs @@ -34,9 +34,13 @@ module Kupo.App.Database , rollbackQryDeleteCheckpoints -- * Setup - , copyDatabase , ConnectionType (..) , DBPool (..) + , mkDBPool + , copyDatabase + + -- ** Option Parser + , databaseLocationOptionParser -- * Internal , installIndexes @@ -50,10 +54,11 @@ module Kupo.App.Database import Kupo.App.Database.Postgres #else import Kupo.App.Database.SQLite +#endif + import Kupo.App.Database.Types ( ConnectionType (..) , DBPool (..) , DBTransaction , Database (..) ) -#endif diff --git a/src/Kupo/App/Database/Postgres.hs b/src/Kupo/App/Database/Postgres.hs index d16e2e9..1fabb42 100644 --- a/src/Kupo/App/Database/Postgres.hs +++ b/src/Kupo/App/Database/Postgres.hs @@ -8,14 +8,12 @@ {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE TemplateHaskell #-} -module Kupo.App.Database.SQLite - ( -- * Database DSL - Database (..) - , DBTransaction +module Kupo.App.Database.Postgres + ( -- ** Queries -- *** Inputs - , deleteInputsQry + deleteInputsQry , markInputsQry , pruneInputsQry , foldInputsQry @@ -36,34 +34,22 @@ module Kupo.App.Database.SQLite , rollbackQryDeleteCheckpoints -- * Setup - , newDatabaseFile - , createShortLivedConnection - , withLongLivedConnection - , withShortLivedConnection + , mkDBPool , copyDatabase - , Connection - , ConnectionType (..) - , DatabaseFile (..) -- * Internal , installIndexes , installIndex - -- ** Lock - , DBLock - , newLock - -- * Tracer , TraceDatabase (..) - , mkDBPool + , databaseLocationOptionParser ) where import Kupo.Prelude import Control.Exception - ( IOException - , handle - , mask + ( mask , onException , throwIO ) @@ -74,17 +60,10 @@ import Control.Tracer import Data.FileEmbed ( embedFile ) -import Data.Scientific - ( scientific - ) import Data.Severity ( HasSeverityAnnotation (..) , Severity (..) ) -import Data.Text.Lazy.Builder.Scientific - ( FPFormat (Fixed) - , formatScientificBuilder - ) import Database.SQLite.Simple ( Connection , Error (..) @@ -92,7 +71,6 @@ import Database.SQLite.Simple , Query (..) , SQLData (..) , SQLError (..) - , SQLOpenFlag (..) , ToRow (..) , changes , execute @@ -101,16 +79,12 @@ import Database.SQLite.Simple , nextRow , query_ , totalChanges - , withConnection' , withStatement ) import GHC.TypeLits ( KnownSymbol , symbolVal ) -import Kupo.Control.MonadAsync - ( concurrently_ - ) import Kupo.Control.MonadCatch ( catch ) @@ -119,14 +93,6 @@ import Kupo.Control.MonadDelay ) import Kupo.Control.MonadLog ( TraceProgress (..) - , nullTracer - ) -import Kupo.Control.MonadSTM - ( MonadSTM (..) - ) -import Kupo.Control.MonadThrow - ( bracket - , bracket_ ) import Kupo.Control.MonadTime ( DiffTime @@ -138,7 +104,6 @@ import Kupo.Data.Cardano import Kupo.Data.Configuration ( DeferIndexesInstallation (..) , LongestRollback (..) - , WorkDir , pruneInputsMaxIncrement ) import Kupo.Data.Database @@ -159,281 +124,40 @@ import Kupo.Data.Pattern import Numeric ( Floating (..) ) -import System.Directory - ( Permissions (..) - , copyFile - , createDirectoryIfMissing - , doesFileExist - , getCurrentDirectory - , getPermissions - , removePathForcibly - ) -import System.FilePath - ( () - ) -import System.IO.Error - ( isAlreadyExistsError - ) -import Control.Concurrent - ( getNumCapabilities - ) import qualified Data.Char as Char -import Data.Pool - ( Pool - , defaultPoolConfig - , destroyAllResources - , newPool - , tryWithResource - , withResource - ) import qualified Data.Text as T -import qualified Data.Text.Lazy.Builder as T import qualified Data.Text.Lazy.Builder as TL import qualified Database.SQLite.Simple as Sqlite import Kupo.App.Database.Types + ( ConnectionType (..) + , Database (..) + , MakeDatabasePool + ) import qualified Kupo.Data.Configuration as Configuration + ( DatabaseLocation (..) + ) import qualified Kupo.Data.Database as DB +import Options.Applicative + ( Parser + , help + , long + , metavar + , option + , str + ) -data DatabaseFile = OnDisk !FilePath | InMemory !(Maybe FilePath) - deriving (Generic, Eq, Show) - -data NewDatabaseFileException - = FailedToAccessOrCreateDatabaseFile { reason :: FailedToCreateDatabaseFileReason } - deriving (Show) - -instance Exception NewDatabaseFileException - -data FailedToCreateDatabaseFileReason - = SpecifiedPathIsAFile { path :: !FilePath } - | SpecifiedPathIsReadOnly { path :: !FilePath } - | SomeUnexpectedErrorOccured { error :: !IOException } - deriving (Show) - --- | Create a new 'DatabaseFile' in the expected workding directory. Create the target --- directory (recursively) if it doesn't exist. -newDatabaseFile - :: (MonadIO m) - => Tracer IO TraceDatabase - -> Configuration.WorkDir - -> m DatabaseFile -newDatabaseFile tr = \case - Configuration.InMemory -> do - return $ InMemory Nothing - Configuration.Dir dir -> - OnDisk <$> newDatabaseOnDiskFile tr (traceWith tr . DatabaseCreateNew) dir - -newDatabaseOnDiskFile - :: (MonadIO m) - => Tracer IO TraceDatabase - -> (FilePath -> IO ()) - -> FilePath - -> m FilePath -newDatabaseOnDiskFile tr onFileMissing dir = liftIO $ do - absoluteDir <- ( dir) <$> getCurrentDirectory - handle (onAlreadyExistsError absoluteDir) $ createDirectoryIfMissing True dir - permissions <- getPermissions absoluteDir - unless (writable permissions) $ bail (SpecifiedPathIsReadOnly absoluteDir) - let dbFile = absoluteDir "kupo.sqlite3" - unlessM (doesFileExist dbFile) $ onFileMissing dbFile - return dbFile - where - bail absoluteDir = do - traceWith tr $ DatabasePathMustBeDirectory - { hint = "The path you've specified as working directory is a file; you probably meant to \ - \point to the parent directory instead. Don't worry about the database file, \ - \I'll manage it myself." - } - throwIO (FailedToAccessOrCreateDatabaseFile absoluteDir) - onAlreadyExistsError absoluteDir e - | isAlreadyExistsError e = do - bail (SpecifiedPathIsAFile absoluteDir) - | otherwise = - bail (SomeUnexpectedErrorOccured e) - --- | Construct a connection string for the SQLite database. This utilizes (and assumes) the URI --- recognition from SQLite to choose between read-only or read-write database. By default also, when --- no filepath is provided, the database is created in-memory with a shared cache. --- --- For testing purpose however, it is also possible to create a in-memory database in isolation by --- simply passing `:memory:` as a filepath. -mkConnectionString - :: DatabaseFile - -> ConnectionType - -> (String, [SQLOpenFlag]) -mkConnectionString filePath mode = - case filePath of - OnDisk fp -> - ("file:" <> fp, SQLOpenNoMutex : openFlags) - InMemory Nothing -> - ("file::kupo:?mode=memory&cache=shared", openFlags) - InMemory (Just fp) -> - (fp, SQLOpenMemory : openFlags) - where - openFlags = case mode of - ReadOnly -> [SQLOpenReadOnly] - ReadWrite -> [SQLOpenReadWrite, SQLOpenCreate] - WriteOnly -> [SQLOpenReadWrite, SQLOpenCreate] - --- | A short-lived connection meant to be used in a resource-pool. These connections can be opened --- either as read-only connection or read-write; depending on the client needs. Read-only connections --- are non-blocking and can access data even when the database is being written concurrently. -createShortLivedConnection - :: Tracer IO TraceDatabase - -> ConnectionType - -> DBLock IO - -> LongestRollback - -> DatabaseFile - -> IO (Database IO) -createShortLivedConnection tr mode (DBLock shortLived longLived) k file = do - traceWith tr $ DatabaseConnection ConnectionCreateShortLived{mode} - - let (str, flags) = mkConnectionString file mode - - !conn <- Sqlite.open' str flags - - forM_ ["PRAGMA cache_size = 1024"] $ \pragma -> - handle - (\(_ :: SomeException) -> traceWith trConn ConnectionFailedPragma{pragma}) - (execute_ conn (Query pragma)) - - return $ mkDatabase trConn mode k (bracketConnection conn) - where - trConn :: Tracer IO TraceConnection - trConn = contramap DatabaseConnection tr - - bracketConnection :: Connection -> (forall a. ((Connection -> IO a) -> IO a)) - bracketConnection conn between = - case mode of - WriteOnly -> - between conn - ReadOnly -> - between conn - ReadWrite -> - bracket_ - -- read-write connections only run when the longLived isn't busy working. Multiple - -- short-lived read-write connections may still conflict with one another, but - -- since they mostly are one-off requests, we simply retry them when busy/locked. - (atomically $ do - readTVar longLived >>= check . not - modifyTVar' shortLived next - ) - (atomically (modifyTVar' shortLived prev)) - (between conn) - -withShortLivedConnection - :: Tracer IO TraceDatabase - -> ConnectionType - -> DBLock IO - -> LongestRollback - -> DatabaseFile - -> (Database IO -> IO a) - -> IO a -withShortLivedConnection tr mode lock k file action = do - bracket - (createShortLivedConnection tr mode lock k file) - (\Database{close} -> close) - action - --- | A resource acquisition bracket for a single long-lived connection. The system is assumed to use --- with only once, at the application start-up and provide this connection to a privileged one which --- takes priority over any other connections. --- --- It is therefore also the connection from which we check for and run database migrations when --- needed. Note that this bracket will also create the database if it doesn't exist. -withLongLivedConnection - :: Tracer IO TraceDatabase - -> DBLock IO - -> LongestRollback - -> DatabaseFile - -> DeferIndexesInstallation - -> (Database IO -> IO a) - -> IO a -withLongLivedConnection tr (DBLock shortLived longLived) k file deferIndexes action = do - let (str, flags) = mkConnectionString file ReadWrite - withConnection' str flags $ \conn -> do - execute_ conn "PRAGMA page_size = 32768" - execute_ conn "PRAGMA cache_size = 1024" - execute_ conn "PRAGMA synchronous = NORMAL" - execute_ conn "PRAGMA journal_mode = WAL" - execute_ conn "PRAGMA optimize" - databaseVersion conn >>= runMigrations tr conn - installIndexes tr conn deferIndexes - execute_ conn "PRAGMA foreign_keys = ON" - action (mkDatabase (contramap DatabaseConnection tr) ReadWrite k (bracketConnection conn)) - where - bracketConnection :: Connection -> (forall a. ((Connection -> IO a) -> IO a)) - bracketConnection conn between = - bracket_ - (do - atomically (writeTVar longLived True) -- acquire - atomically (readTVar shortLived >>= check . (== 0)) -- wait for read-write short-lived - ) - (atomically $ writeTVar longLived False) - (between conn) +-- | --remote=URL +databaseLocationOptionParser :: Parser Configuration.DatabaseLocation +databaseLocationOptionParser = fmap Configuration.Remote $ option str $ mempty + <> long "remote" + <> metavar "URL" + <> help "URL of a PostgreSQL server, in the form [user[:password]@][netloc][:port][/dbname][?param1=value1&...]" -- | Create a Database pool that uses separate pools for `ReadOnly` and `ReadWrite` connections. -mkDBPool :: Bool -> (Tracer IO TraceDatabase) -> WorkDir -> LongestRollback -> IO (DBPool IO) -mkDBPool isReadOnly tr workDir longestRollback = do - dbFile <- newDatabaseFile tr workDir - - lock <- liftIO newLock - - (maxConcurrentWriters, maxConcurrentReaders) <- liftIO getNumCapabilities <&> \n -> (n, 5 * n) - - readOnlyPool <- liftIO $ newPool $ defaultPoolConfig - (createShortLivedConnection tr ReadOnly lock longestRollback dbFile) - (\Database{close} -> close) - 600 - maxConcurrentReaders - - readWritePool <- liftIO $ newPool $ defaultPoolConfig - (createShortLivedConnection tr ReadWrite lock longestRollback dbFile) - (\Database{close} -> close) - 30 - maxConcurrentWriters - - let - withDB :: forall a b. (Pool (Database IO) -> (Database IO -> IO a) -> IO b) -> ConnectionType -> (Database IO -> IO a) -> IO b - withDB withRes connType dbAction = - case connType of - ReadOnly -> withRes readOnlyPool dbAction - ReadWrite | isReadOnly -> fail "Cannot acquire a read/write connection on read-only replica" - ReadWrite -> withRes readWritePool dbAction - WriteOnly -> fail "Impossible: tried to acquire a WriteOnly database?" - - withDatabaseExclusiveWriter :: DeferIndexesInstallation -> (Database IO -> IO a) -> (IO a) - withDatabaseExclusiveWriter = - withLongLivedConnection tr lock longestRollback dbFile - - destroyResources = do - destroyAllResources readOnlyPool - destroyAllResources readWritePool - - return DBPool { tryWithDatabase = withDB tryWithResource, withDatabase = withDB withResource, withDatabaseExclusiveWriter, destroyResources } - - --- It is therefore also the connection from which we check for and run database migrations when --- needed. Note that this bracket will also create the database if it doesn't exist. -withWriteOnlyConnection - :: DatabaseFile - -> (Sqlite.Connection -> Database IO -> IO a) - -> IO a -withWriteOnlyConnection file action = do - let (str, flags) = mkConnectionString file WriteOnly - withConnection' str flags $ \conn -> do - databaseVersion conn >>= runMigrations nullTracer conn - installIndexes nullTracer conn SkipNonEssentialIndexes - execute_ conn "PRAGMA synchronous = OFF" - execute_ conn "PRAGMA journal_mode = OFF" - execute_ conn "PRAGMA locking_mode = EXCLUSIVE" - action conn (mkDatabase nullTracer ReadWrite k (bracketConnection conn)) - where - k = LongestRollback maxBound - - bracketConnection :: Connection -> (forall a. ((Connection -> IO a) -> IO a)) - bracketConnection conn between = - between conn +-- This function creates a database file if it does not already exist. +mkDBPool :: MakeDatabasePool (Tracer IO TraceDatabase) +mkDBPool isReadOnly tr workDir longestRollback = undefined data CopyException = ErrCopyEmptyPatterns { hint :: Text } @@ -456,108 +180,103 @@ copyDatabase -> FilePath -> Set Pattern -> IO () -copyDatabase (tr, progress) fromDir intoDir patterns = do - when (null patterns) $ do - throwIO ErrCopyEmptyPatterns - { hint = "No patterns provided for copy. At least one is required." } - - fromFile <- newDatabaseOnDiskFile tr (throwIO . ErrMissingSourceDatabase) fromDir - intoFile <- newDatabaseOnDiskFile tr (traceWith tr . DatabaseCreateNew) intoDir - - cleanupFile <- newCleanupAction intoFile - - handle cleanupFile $ do - traceWith tr DatabaseCloneSourceDatabase - copyFile fromFile intoFile - lock <- newLock - withShortLivedConnection tr ReadOnly lock longestRollback (OnDisk fromFile) $ \from -> do - withWriteOnlyConnection (OnDisk intoFile) $ \conn into -> do - execute_ conn "PRAGMA foreign_keys = OFF" - mapM_ (cleanup conn) ["inputs", "policies", "patterns"] - runTransaction into (insertPatterns into patterns) - forM_ patterns $ \pattern_ -> do - traceWith tr $ DatabaseImportTable { table = "inputs", pattern = patternToText pattern_ } - copyTable - (runTransaction from $ countInputs from pattern_) - (runTransaction from . foldInputs from pattern_ Whole NoStatusFlag Asc) - (runTransaction into . insertInputs into) - DB.resultToRow - traceWith tr $ DatabaseImportTable { table = "policies", pattern = patternToText pattern_ } - copyTable - (runTransaction from $ countPolicies from pattern_) - (runTransaction from . foldPolicies from pattern_) - (runTransaction into . insertPolicies into . fromList) - identity - traceWith tr DatabaseCopyFinalize - execute_ conn "VACUUM" - execute_ conn "PRAGMA optimize" - where - longestRollback :: LongestRollback - longestRollback = - LongestRollback maxBound - - cleanup :: Connection -> Text -> IO () - cleanup conn table = do - traceWith tr $ DatabaseCleanupOldData { table } - execute_ conn $ Query $ "DELETE FROM " <> table - - newCleanupAction :: FilePath -> IO (SomeException -> IO a) - newCleanupAction filePath = do - whenM (doesFileExist filePath) (throwIO $ ErrTargetAlreadyExists { target = filePath }) - return $ \(e :: SomeException) -> do - traceWith tr DatabaseRemoveIncompleteCopy { filePath } - removePathForcibly filePath - throwIO e - - copyTable - :: IO Integer - -> ((result -> IO ()) -> IO ()) - -> ([row] -> IO ()) - -> (result -> row) - -> IO () - copyTable countTable foldTable insertTable mkRow = do - queue <- newTBQueueIO 10_000 - done <- newTVarIO False - total <- countTable - concurrently_ - (do - foldTable $ \result -> - atomically $ writeTBQueue queue (mkRow result) - atomically $ writeTVar done True - ) - ( let loop n = do - results <- atomically $ do - isDone <- readTVar done - isEmpty <- isEmptyTBQueue queue - check (not isEmpty || isDone) - flushTBQueue queue - insertTable results - let len = toInteger (length results) - unless (len == 0) $ do - traceWith progress $ ProgressStep (mkProgress total (n + len)) - loop (n + len) - in loop 0 >> do - traceWith progress ProgressDone - traceWith tr $ DatabaseImported { rows = total } - ) - - mkProgress :: Integer -> Integer -> Text - mkProgress total n = - scientific (round (double (n * 10000) / double total)) (-2) - & formatScientificBuilder Fixed (Just 2) - & T.toLazyText - & toStrict - & (<> "%") - where - double :: Integer -> Double - double = fromIntegral - --- ** Lock - -data DBLock (m :: Type -> Type) = DBLock !(TVar m Word) !(TVar m Bool) - -newLock :: MonadSTM m => m (DBLock m) -newLock = DBLock <$> newTVarIO 0 <*> newTVarIO True +copyDatabase (tr, progress) fromDir intoDir patterns = + undefined + -- do + -- when (null patterns) $ do + -- throwIO ErrCopyEmptyPatterns + -- { hint = "No patterns provided for copy. At least one is required." } + + -- fromFile <- newDatabaseOnDiskFile tr (throwIO . ErrMissingSourceDatabase) fromDir + -- intoFile <- newDatabaseOnDiskFile tr (traceWith tr . DatabaseCreateNew) intoDir + + -- cleanupFile <- newCleanupAction intoFile + + -- handle cleanupFile $ do + -- traceWith tr DatabaseCloneSourceDatabase + -- copyFile fromFile intoFile + -- lock <- newLock + -- withShortLivedConnection tr ReadOnly lock longestRollback (OnDisk fromFile) $ \from -> do + -- withWriteOnlyConnection (OnDisk intoFile) $ \conn into -> do + -- execute_ conn "PRAGMA foreign_keys = OFF" + -- mapM_ (cleanup conn) ["inputs", "policies", "patterns"] + -- runTransaction into (insertPatterns into patterns) + -- forM_ patterns $ \pattern_ -> do + -- traceWith tr $ DatabaseImportTable { table = "inputs", pattern = patternToText pattern_ } + -- copyTable + -- (runTransaction from $ countInputs from pattern_) + -- (runTransaction from . foldInputs from pattern_ Whole NoStatusFlag Asc) + -- (runTransaction into . insertInputs into) + -- DB.resultToRow + -- traceWith tr $ DatabaseImportTable { table = "policies", pattern = patternToText pattern_ } + -- copyTable + -- (runTransaction from $ countPolicies from pattern_) + -- (runTransaction from . foldPolicies from pattern_) + -- (runTransaction into . insertPolicies into . fromList) + -- identity + -- traceWith tr DatabaseCopyFinalize + -- execute_ conn "VACUUM" + -- execute_ conn "PRAGMA optimize" + -- where + -- longestRollback :: LongestRollback + -- longestRollback = + -- LongestRollback maxBound + + -- cleanup :: Connection -> Text -> IO () + -- cleanup conn table = do + -- traceWith tr $ DatabaseCleanupOldData { table } + -- execute_ conn $ Query $ "DELETE FROM " <> table + + -- newCleanupAction :: FilePath -> IO (SomeException -> IO a) + -- newCleanupAction filePath = do + -- whenM (doesFileExist filePath) (throwIO $ ErrTargetAlreadyExists { target = filePath }) + -- return $ \(e :: SomeException) -> do + -- traceWith tr DatabaseRemoveIncompleteCopy { filePath } + -- removePathForcibly filePath + -- throwIO e + + -- copyTable + -- :: IO Integer + -- -> ((result -> IO ()) -> IO ()) + -- -> ([row] -> IO ()) + -- -> (result -> row) + -- -> IO () + -- copyTable countTable foldTable insertTable mkRow = do + -- queue <- newTBQueueIO 10_000 + -- done <- newTVarIO False + -- total <- countTable + -- concurrently_ + -- (do + -- foldTable $ \result -> + -- atomically $ writeTBQueue queue (mkRow result) + -- atomically $ writeTVar done True + -- ) + -- ( let loop n = do + -- results <- atomically $ do + -- isDone <- readTVar done + -- isEmpty <- isEmptyTBQueue queue + -- check (not isEmpty || isDone) + -- flushTBQueue queue + -- insertTable results + -- let len = toInteger (length results) + -- unless (len == 0) $ do + -- traceWith progress $ ProgressStep (mkProgress total (n + len)) + -- loop (n + len) + -- in loop 0 >> do + -- traceWith progress ProgressDone + -- traceWith tr $ DatabaseImported { rows = total } + -- ) + + -- mkProgress :: Integer -> Integer -> Text + -- mkProgress total n = + -- scientific (round (double (n * 10000) / double total)) (-2) + -- & formatScientificBuilder Fixed (Just 2) + -- & T.toLazyText + -- & toStrict + -- & (<> "%") + -- where + -- double :: Integer -> Double + -- double = fromIntegral -- -- IO diff --git a/src/Kupo/App/Database/SQLite.hs b/src/Kupo/App/Database/SQLite.hs index 7a54c4d..2814746 100644 --- a/src/Kupo/App/Database/SQLite.hs +++ b/src/Kupo/App/Database/SQLite.hs @@ -43,6 +43,7 @@ module Kupo.App.Database.SQLite -- * Tracer , TraceDatabase (..) + , databaseLocationOptionParser ) where import Kupo.Prelude @@ -125,7 +126,6 @@ import Kupo.Data.Cardano import Kupo.Data.Configuration ( DeferIndexesInstallation (..) , LongestRollback (..) - , WorkDir , pruneInputsMaxIncrement ) import Kupo.Data.Database @@ -182,10 +182,39 @@ import Kupo.App.Database.Types ( ConnectionType (..) , DBPool (..) , Database (..) + , MakeDatabasePool ) import qualified Kupo.Data.Configuration as Configuration + ( DatabaseLocation (..) + ) import qualified Kupo.Data.Database as DB +import Options.Applicative + ( Parser + , bashCompleter + , completer + , flag' + , help + , long + , metavar + , option + , str + ) +-- | --workdir=DIR | --in-memory +databaseLocationOptionParser :: Parser Configuration.DatabaseLocation +databaseLocationOptionParser = + dirOption <|> inMemoryFlag + where + dirOption = fmap Configuration.OnDisk $ option str $ mempty + <> long "workdir" + <> metavar "DIRECTORY" + <> help "Path to a working directory, where the database is stored." + <> completer (bashCompleter "directory") + + inMemoryFlag = flag' Configuration.InMemory $ mempty + <> long "in-memory" + <> help "Run fully in-memory, data is short-lived and lost when the process exits." + data DatabaseFile = OnDisk !FilePath | InMemory !(Maybe FilePath) deriving (Generic, Eq, Show) @@ -198,20 +227,19 @@ instance Exception NewDatabaseFileException data FailedToCreateDatabaseFileReason = SpecifiedPathIsAFile { path :: !FilePath } | SpecifiedPathIsReadOnly { path :: !FilePath } + | RemoteURLSpecified { url :: !String } -- // TODO: Change to URL | SomeUnexpectedErrorOccured { error :: !IOException } deriving (Show) --- | Create a new 'DatabaseFile' in the expected workding directory. Create the target +-- | Create a new database file in the expected workding directory. Create the target -- directory (recursively) if it doesn't exist. -newDatabaseFile - :: (MonadIO m) - => Tracer IO TraceDatabase - -> Configuration.WorkDir - -> m DatabaseFile +newDatabaseFile :: + Tracer IO TraceDatabase + -> Configuration.DatabaseLocation + -> IO DatabaseFile newDatabaseFile tr = \case - Configuration.InMemory -> do - return $ InMemory Nothing - Configuration.Dir dir -> + Configuration.InMemory -> return $ InMemory Nothing + Configuration.OnDisk dir -> OnDisk <$> newDatabaseOnDiskFile tr (traceWith tr . DatabaseCreateNew) dir newDatabaseOnDiskFile @@ -365,9 +393,9 @@ withLongLivedConnection tr (DBLock shortLived longLived) k file deferIndexes act -- | Create a Database pool that uses separate pools for `ReadOnly` and `ReadWrite` connections. -- This function creates a database file if it does not already exist. -mkDBPool :: Bool -> (Tracer IO TraceDatabase) -> WorkDir -> LongestRollback -> IO (DBPool IO) -mkDBPool isReadOnly tr workDir longestRollback = do - dbFile <- newDatabaseFile tr workDir +mkDBPool :: MakeDatabasePool (Tracer IO TraceDatabase) +mkDBPool isReadOnly tr dbLocation longestRollback = do + dbFile <- newDatabaseFile tr dbLocation lock <- liftIO newLock @@ -1316,6 +1344,9 @@ data TraceDatabase where DatabaseDebug :: Text -> TraceDatabase + DatabaseMustBeLocal + :: Text + -> TraceDatabase deriving stock (Generic, Show) instance ToJSON TraceDatabase where @@ -1341,6 +1372,7 @@ instance HasSeverityAnnotation TraceDatabase where DatabaseRemoveIncompleteCopy{} -> Notice DatabaseCopyFinalize{} -> Notice DatabaseDebug{} -> Warning + DatabaseMustBeLocal{} -> Error data TraceConnection where ConnectionCreateShortLived diff --git a/src/Kupo/App/Database/Types.hs b/src/Kupo/App/Database/Types.hs index c750289..06ed0b4 100644 --- a/src/Kupo/App/Database/Types.hs +++ b/src/Kupo/App/Database/Types.hs @@ -9,6 +9,7 @@ module Kupo.App.Database.Types -- * Setup , ConnectionType (..) , DBPool (..) + , MakeDatabasePool ) where @@ -26,7 +27,8 @@ import Kupo.Data.Cardano , SlotNo (..) ) import Kupo.Data.Configuration - ( DeferIndexesInstallation + ( DatabaseLocation + , DeferIndexesInstallation , LongestRollback (..) ) import Kupo.Data.Database @@ -169,3 +171,7 @@ data DBPool m = DBPool { , withDatabaseExclusiveWriter :: forall a. DeferIndexesInstallation -> (Database m -> m a) -> m a , destroyResources :: m () } + +type MakeDatabasePool tr = IsReadOnlyReplica -> tr -> DatabaseLocation -> LongestRollback -> IO (DBPool IO) + +type IsReadOnlyReplica = Bool diff --git a/src/Kupo/Data/Configuration.hs b/src/Kupo/Data/Configuration.hs index 8ea590b..a7458f9 100644 --- a/src/Kupo/Data/Configuration.hs +++ b/src/Kupo/Data/Configuration.hs @@ -2,6 +2,7 @@ -- License, v. 2.0. If a copy of the MPL was not distributed with this -- file, You can obtain one at http://mozilla.org/MPL/2.0/. +{-# LANGUAGE CPP #-} {-# LANGUAGE DeriveAnyClass #-} {-# LANGUAGE DuplicateRecordFields #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} @@ -12,7 +13,7 @@ module Kupo.Data.Configuration ( -- * Configuration Configuration (..) - , WorkDir (..) + , DatabaseLocation (..) , InputManagement (..) , ChainProducer (..) , LongestRollback (..) @@ -80,7 +81,7 @@ data Configuration = Configuration -- This is slightly hacky, but the alternative would be to either split -- this type in two or to introduce some higher-kinded type parameter to -- the record. Both seems overly complicated given the benefits. - , workDir :: !WorkDir + , databaseLocation :: !DatabaseLocation -- ^ Where to store the data: in-memory vs specific location on-disk , serverHost :: !String -- ^ Hostname for the API HTTP server @@ -138,12 +139,16 @@ data ChainProducer -- bounded by the CPU capabilities and the I/O read access. deriving (Generic, Eq, Show) --- | Database working directory. 'in-memory' runs the database in hot memory, +-- | Database location. For SQLite, 'in-memory' runs the database in hot memory, -- only suitable for non-permissive patterns or testing. -data WorkDir - = Dir !FilePath - | InMemory +-- For PostgreSQL, a URL and DB name must be specified +#if postgres +data DatabaseLocation = Remote String deriving (Generic, Eq, Show) +#else +data DatabaseLocation = OnDisk !FilePath | InMemory + deriving (Generic, Eq, Show) +#endif -- | What to do with inputs that are spent. There are two options: -- @@ -197,10 +202,10 @@ data UnableToFetchBlockFromReadOnlyReplicaException = UnableToFetchBlockFromRead deriving (Generic, Eq, Show) instance Exception UnableToFetchBlockFromReadOnlyReplicaException where displayException _ = - "An attempt to fetch a block from a read-only replica has occured likely \ - \caused by a request to access transaction metadata. This is, unfortunately, \ - \not a possible operation from a read-only replica. Only the master server \ - \can do that." + "An attempt to fetch a block from a read-only replica has occured likely \n" ++ + "caused by a request to access transaction metadata. This is, unfortunately, \n" ++ + "not a possible operation from a read-only replica. Only the master server \n" ++ + "can do that." -- Mailbox's capacity, or how many messages can be enqueued in the queue between -- the consumer and the producer workers. More means faster synchronization (up diff --git a/src/Kupo/Options.hs b/src/Kupo/Options.hs index 93f8f92..6a4c56b 100644 --- a/src/Kupo/Options.hs +++ b/src/Kupo/Options.hs @@ -42,6 +42,7 @@ import Kupo.App.Configuration ) import Kupo.App.Database ( TraceDatabase + , databaseLocationOptionParser ) import Kupo.App.Http ( TraceHttpServer @@ -65,9 +66,9 @@ import Kupo.Data.Cardano import Kupo.Data.Configuration ( ChainProducer (..) , Configuration (..) + , DatabaseLocation (..) , DeferIndexesInstallation (..) , InputManagement (..) - , WorkDir (..) ) import Kupo.Data.Pattern ( Pattern @@ -125,7 +126,7 @@ parserInfo = info (helper <*> parser) $ mempty ( Run <$> ( Configuration <$> chainProducerOption - <*> workDirOption + <*> databaseLocationOptionParser <*> serverHostOption <*> serverPortOption <*> optional sinceOption @@ -186,21 +187,6 @@ nodeConfigOption = option str $ mempty <> help "Path to the node configuration file." <> completer (bashCompleter "file") --- | --workdir=DIR | --in-memory -workDirOption :: Parser WorkDir -workDirOption = - dirOption <|> inMemoryFlag - where - dirOption = fmap Dir $ option str $ mempty - <> long "workdir" - <> metavar "DIRECTORY" - <> help "Path to a working directory, where the database is stored." - <> completer (bashCompleter "directory") - - inMemoryFlag = flag' InMemory $ mempty - <> long "in-memory" - <> help "Run fully in-memory, data is short-lived and lost when the process exits." - -- | [--host=IPv4], default: 127.0.0.1 serverHostOption :: Parser String serverHostOption = option str $ mempty diff --git a/test/Test/Kupo/AppSpec.hs b/test/Test/Kupo/AppSpec.hs index 1ee6d6e..8610ba4 100644 --- a/test/Test/Kupo/AppSpec.hs +++ b/test/Test/Kupo/AppSpec.hs @@ -111,10 +111,10 @@ import Kupo.Data.ChainSync import Kupo.Data.Configuration ( ChainProducer (..) , Configuration (..) + , DatabaseLocation (..) , DeferIndexesInstallation (..) , InputManagement (..) , LongestRollback (..) - , WorkDir (..) , mailboxCapacity ) import Kupo.Data.FetchBlock @@ -253,7 +253,7 @@ spec = do { nodeSocket = "/dev/null" , nodeConfig = "/dev/null" } - , workDir = InMemory + , databaseLocation = InMemory , serverHost , serverPort , since = Just GenesisPoint diff --git a/test/Test/Kupo/Data/DatabaseSpec.hs b/test/Test/Kupo/Data/DatabaseSpec.hs index 8c5d394..96f34fd 100644 --- a/test/Test/Kupo/Data/DatabaseSpec.hs +++ b/test/Test/Kupo/Data/DatabaseSpec.hs @@ -25,10 +25,8 @@ import Database.SQLite.Simple ) import Kupo.App.Database ( ConnectionType (..) - , DBLock + , DBPool (..) , Database (..) - , DatabaseFile (..) - , createShortLivedConnection , deleteInputsQry , foldInputsQry , foldPoliciesQry @@ -38,14 +36,13 @@ import Kupo.App.Database , listAncestorQry , listCheckpointsQry , markInputsQry - , newLock + , mkDBPool , pruneBinaryDataQry , pruneInputsQry , rollbackQryDeleteCheckpoints , rollbackQryDeleteInputs , rollbackQryUpdateInputs , selectMaxCheckpointQry - , withLongLivedConnection ) import Kupo.Control.MonadAsync ( mapConcurrently_ @@ -83,8 +80,10 @@ import Kupo.Data.Cardano , slotNoToText ) import Kupo.Data.Configuration - ( DeferIndexesInstallation (..) + ( DatabaseLocation (..) + , DeferIndexesInstallation (..) , LongestRollback (..) + , getLongestRollback ) import Kupo.Data.Database ( SortDirection (..) @@ -328,28 +327,30 @@ spec = parallel $ do monitor (.&&. pAsc .&&. pDesc) context "concurrent read / write" $ do + let k = LongestRollback { getLongestRollback = 42 } mapM_ - (\(title, withDatabaseFile) -> do + (\(title, withDatabasePool) -> do specify ("1 long-lived worker vs 2 short-lived workers (" <> title <> ")") $ do - withDatabaseFile $ \file -> do - lock <- newLock + withDatabasePool $ \pool -> do waitGroup <- newTVarIO False let allow = atomically (writeTVar waitGroup True) let await = atomically (readTVar waitGroup >>= check) mapConcurrently_ identity - [ longLivedWorker file lock allow - , await >> shortLivedWorker file ReadOnly lock - , await >> shortLivedWorker file ReadWrite lock + [ longLivedWorker pool allow + , await >> shortLivedWorker pool ReadOnly + , await >> shortLivedWorker pool ReadWrite ] ) [ ( "in-memory" - , \test -> - test (InMemory (Just "file::concurrent-read-write:?cache=shared&mode=memory")) + , \test -> do + test =<< mkDBPool False nullTracer InMemory k + -- // TODO: Previously this used a specific filepath for the in-memory DB. Will using the standard path ruin things? + -- I think if there is a running instance of Kupo and someone runs this test ) , ( "on-disk" - , \test -> - withSystemTempDirectory "kupo-database-concurrent" $ \dir -> - test (OnDisk (dir "db.sqlite3")) + , \test -> + withSystemTempDirectory "kupo-database-concurrent" $ \dir -> do + test =<< mkDBPool False nullTracer (OnDisk (dir "db.sqlite3")) k ) ] @@ -1173,9 +1174,9 @@ loudly e = do print e throwIO e -longLivedWorker :: DatabaseFile -> DBLock IO -> IO () -> IO () -longLivedWorker fp lock allow = - handle loudly $ withLongLivedConnection nullTracer lock 42 fp InstallIndexesIfNotExist $ \db -> do +longLivedWorker :: DBPool IO -> IO () -> IO () +longLivedWorker dbPool allow = + handle loudly $ (withDatabaseExclusiveWriter dbPool) InstallIndexesIfNotExist $ \db -> do allow loop db 0 where @@ -1189,12 +1190,10 @@ longLivedWorker fp lock allow = threadDelay ms loop db (next n) -shortLivedWorker :: DatabaseFile -> ConnectionType -> DBLock IO -> IO () -shortLivedWorker fp mode lock = do - handle loudly $ bracket - (createShortLivedConnection nullTracer mode lock 42 fp) - (\Database{close} -> close) - (`loop` 0) +shortLivedWorker :: DBPool IO -> ConnectionType -> IO () +shortLivedWorker dbPool mode = do + handle loudly $ + (withDatabase dbPool) mode (`loop` 0) where loop :: Database IO -> Int -> IO () loop db@Database{..} = \case @@ -1225,8 +1224,7 @@ shortLivedWorker fp mode lock = do pure $ runTransaction $ insertPatterns (fromList [pattern_]) ) , (1, do - pattern_ <- genPattern - pure $ void $ runTransaction $ deletePattern pattern_ + void . runTransaction . deletePattern <$> genPattern ) ] ms <- millisecondsToDiffTime <$> generate (choose (15, 50)) @@ -1363,6 +1361,7 @@ withInMemoryDatabase withInMemoryDatabase = withInMemoryDatabase' run InstallIndexesIfNotExist +-- // TODO: Check this withInMemoryDatabase' :: forall (m :: Type -> Type) b. (Monad m) => (forall a. IO a -> m a) @@ -1371,14 +1370,8 @@ withInMemoryDatabase' -> (Database IO -> IO b) -> m b withInMemoryDatabase' runInIO deferIndexes k action = do - lock <- runInIO newLock - runInIO $ withLongLivedConnection - nullTracer - lock - (LongestRollback k) - (InMemory (Just ":memory:")) - deferIndexes - action + pool <- runInIO $ mkDBPool False nullTracer InMemory (LongestRollback { getLongestRollback = k }) + runInIO $ (withDatabase pool) ReadWrite action forAllCheckpoints :: Testable prop diff --git a/test/Test/Kupo/OptionsSpec.hs b/test/Test/Kupo/OptionsSpec.hs index 39c5bca..f2c51d9 100644 --- a/test/Test/Kupo/OptionsSpec.hs +++ b/test/Test/Kupo/OptionsSpec.hs @@ -29,8 +29,8 @@ import Kupo.Data.Cardano import Kupo.Data.Configuration ( ChainProducer (..) , Configuration (..) + , DatabaseLocation (..) , InputManagement (..) - , WorkDir (..) ) import Kupo.Data.Pattern ( MatchBootstrap (..) @@ -100,7 +100,7 @@ spec = parallel $ do { nodeSocket = "./node.socket" , nodeConfig = "./node.config" } - , workDir = InMemory + , databaseLocation = InMemory } ) , ( defaultArgs' @@ -109,7 +109,7 @@ spec = parallel $ do { ogmiosHost = "localhost" , ogmiosPort = 1337 } - , workDir = InMemory + , databaseLocation = InMemory } ) , ( defaultArgs'' @@ -118,14 +118,14 @@ spec = parallel $ do { hydraHost = "localhost" , hydraPort = 4001 } - , workDir = InMemory + , databaseLocation = InMemory } ) , ( filter (/= "--in-memory") defaultArgs ++ [ "--workdir", "./workdir" ] , shouldParseAppConfiguration $ defaultConfiguration - { workDir = Dir "./workdir" + { databaseLocation = OnDisk "./workdir" } ) , ( defaultArgs ++ [ "--host", "0.0.0.0" ] diff --git a/test/Test/KupoSpec.hs b/test/Test/KupoSpec.hs index 9ef37e3..b60a12c 100644 --- a/test/Test/KupoSpec.hs +++ b/test/Test/KupoSpec.hs @@ -82,9 +82,9 @@ import Kupo.Data.ChainSync import Kupo.Data.Configuration ( ChainProducer (..) , Configuration (..) + , DatabaseLocation (..) , DeferIndexesInstallation (..) , InputManagement (..) - , WorkDir (..) ) import Kupo.Data.Http.GetCheckpointMode ( GetCheckpointMode (..) @@ -217,7 +217,7 @@ spec :: Spec spec = skippableContext "End-to-end" $ do endToEnd "can connect" $ \(configure, runSpec, HttpClient{..}) -> do (_cfg, env) <- configure $ \defaultCfg -> defaultCfg - { workDir = InMemory + { databaseLocation = InMemory , since = Just GenesisPoint , patterns = fromList [MatchAny OnlyShelley] } @@ -228,7 +228,7 @@ spec = skippableContext "End-to-end" $ do endToEnd "in-memory" $ \(configure, runSpec, HttpClient{..}) -> do (cfg, env) <- configure $ \defaultCfg -> defaultCfg - { workDir = InMemory + { databaseLocation = InMemory , since = Just lastByronPoint , patterns = fromList [MatchAny IncludingBootstrap] , deferIndexes = SkipNonEssentialIndexes @@ -419,7 +419,7 @@ spec = skippableContext "End-to-end" $ do (xs, ys) <- readIORef ref withSystemTempDirectory "kupo-end-to-end" $ \tmp' -> do (_, env') <- configure $ \defaultCfg -> defaultCfg - { workDir = Dir tmp' + { databaseLocation = OnDisk tmp' , since = Just lastByronPoint , patterns = fromList [MatchDelegation someOtherStakeKey] } @@ -572,7 +572,7 @@ skippableContext prefix skippableSpec = do manager <- runIO $ newManager defaultManagerSettings let defaultCfg = Configuration { chainProducer = CardanoNode { nodeSocket, nodeConfig } - , workDir = InMemory + , databaseLocation = InMemory , serverHost = "127.0.0.1" , serverPort = 0 , since = Nothing @@ -593,7 +593,7 @@ skippableContext prefix skippableSpec = do defaultManagerSettings { managerResponseTimeout = responseTimeoutNone } let defaultCfg = Configuration { chainProducer = Ogmios { ogmiosHost, ogmiosPort } - , workDir = InMemory + , databaseLocation = InMemory , serverHost = "127.0.0.1" , serverPort = 0 , since = Nothing @@ -614,7 +614,7 @@ skippableContext prefix skippableSpec = do defaultManagerSettings { managerResponseTimeout = responseTimeoutNone } let defaultCfg = Configuration { chainProducer = Hydra {hydraHost, hydraPort} - , workDir = InMemory + , databaseLocation = InMemory , serverHost = "127.0.0.1" , serverPort = 0 , since = Nothing @@ -642,7 +642,7 @@ skippableContext prefix skippableSpec = do withSystemTempDirectory "kupo-end-to-end" $ \dir -> do action ( \mkConfig -> do - let cfg = mkConfig (defaultCfg { serverPort, workDir = Dir dir }) + let cfg = mkConfig (defaultCfg { serverPort, databaseLocation = OnDisk dir }) (cfg,) <$> newEnvironmentWith throwIO cfg , \env t test -> do withTempFile dir "traces" $ \fp h -> do