Skip to content

Commit

Permalink
Properly handle 'SQLITE_BUSY' in the context of a connection pool
Browse files Browse the repository at this point in the history
  I ran into quite a few issues with the integration tests since the
  unliftio merge and rebase (I think, as I am pretty sure I did observe
  unit and integration tests doing just fine with the resource pool at
  least once). I've been investigating this for most of the day, and found
  a few interesting cases:

  (a) SQLite may return 'SQLITE_BUSY' on pretty much any request if two
      concurrent write queries hit the engine. However, we currently only
      catch this kind of exception when we try closing the database, so I
      generalized our error handling a bit here.

  (b) It seems that calling destroyAllResources from resource-pool does not
      prevent new threads from acquiring new resources. And there's no way
      with the resource-pool library itself to prevent the creation of
      new resources after a certain point. So it may happen that while the
      database layer is being destroyed, new database connections are created
      and start conflicting with each other.
  • Loading branch information
KtorZ authored and Anviking committed Jan 25, 2021
1 parent 77ab00a commit 2f37250
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 102 deletions.
1 change: 1 addition & 0 deletions lib/core/cardano-wallet-core.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ library
, servant-server
, split
, statistics
, stm
, streaming-commons
, strict-non-empty-containers
, string-interpolate
Expand Down
181 changes: 126 additions & 55 deletions lib/core/src/Cardano/DB/Sqlite.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE UndecidableInstances #-}
Expand All @@ -28,7 +29,7 @@ module Cardano.DB.Sqlite
, dbChunked'
, destroyDBLayer
, handleConstraint
, startSqliteBackend
, newSqliteContext
, unsafeRunQuery

-- * Manual Migration
Expand All @@ -53,20 +54,28 @@ import Cardano.DB.Sqlite.Delete
( DeleteSqliteDatabaseLog )
import Cardano.Wallet.Logging
( BracketLog, bracketTracer )
import Control.Concurrent.STM.TVar
( TVar, newTVarIO, readTVarIO, writeTVar )
import Control.Monad
( join, mapM_, when )
import Control.Monad.IO.Unlift
( MonadUnliftIO (..) )
import Control.Monad.Logger
( LogLevel (..) )
import Control.Retry
( constantDelay, limitRetriesByCumulativeDelay, recovering )
( RetryStatus (..)
, constantDelay
, limitRetriesByCumulativeDelay
, recovering
)
import Control.Tracer
( Tracer, contramap, traceWith )
import Data.Aeson
( ToJSON (..) )
import Data.Function
( (&) )
import Data.Functor
( ($>), (<&>) )
import Data.List
( isInfixOf )
import Data.List.Split
Expand Down Expand Up @@ -104,16 +113,17 @@ import Database.Persist.Sqlite
import Database.Sqlite
( Error (ErrorConstraint), SqliteException (SqliteException) )
import Fmt
( fmt, (+|), (+||), (|+), (||+) )
( fmt, ordinalF, (+|), (+||), (|+), (||+) )
import GHC.Generics
( Generic )
import System.Log.FastLogger
( fromLogStr )
import UnliftIO.Compat
( handleIf, mkRetryHandler )
import UnliftIO.Exception
( Exception, bracket_, handleJust, mask_, tryJust )
( Exception, bracket_, handleJust, mask_, throwIO, tryJust )

import qualified Control.Concurrent.STM as STM
import qualified Data.Aeson as Aeson
import qualified Data.ByteString.Char8 as B8
import qualified Data.Text as T
Expand All @@ -129,12 +139,24 @@ import qualified Database.Sqlite as Sqlite
data SqliteContext = SqliteContext
{ connectionPool :: Pool (SqlBackend, Sqlite.Connection)
-- ^ A pool of database connections. Each pooled resource pairs the
-- Persistent 'SqlBackend' with the raw SQLite connection it wraps.
, isDatabaseActive :: TVar Bool
-- ^ A mutable reference to know whether the database is 'active'. This is
-- useful to prevent new requests from being accepted when we're trying to
-- shut down the database. It is actually crucial with the connection pool
-- since, even though we can purge the pool of all existing resources, we
-- can't easily prevent the creation of new resources. This TVar must
-- therefore be used to guard any call to 'withResource'; if 'False', then
-- 'withResource' mustn't be called. It is set to 'False' by
-- 'destroyDBLayer'.
, runQuery :: forall a. SqlPersistT IO a -> IO a
-- ^ Run a query on a connection taken from the pool. As constructed by
-- 'newSqliteContext', the query is logged, masked against asynchronous
-- exceptions, and retried on 'SQLITE_BUSY'; it throws
-- 'DatabaseIsShuttingDownError' when 'isDatabaseActive' is 'False'.
, dbFile :: Maybe FilePath
-- ^ The actual database file, if any. If 'Nothing', the database runs
-- in-memory.
}

-- | Thrown by 'runQuery' when a query is attempted after the database has
-- been marked inactive (see 'isDatabaseActive'), i.e. while the database
-- layer is being destroyed.
data DatabaseIsShuttingDownError = DatabaseIsShuttingDownError deriving Show

-- Makes the error throwable with 'throwIO'.
instance Exception DatabaseIsShuttingDownError

-- | Error type for when migrations go wrong after opening a database.
newtype MigrationError = MigrationError
{ getMigrationErrorMessage :: Text }
Expand Down Expand Up @@ -174,52 +196,54 @@ handleConstraint e = handleJust select handler . fmap Right
-- | Free all allocated database connections. See also 'destroySqliteBackend'
--
destroyDBLayer :: Tracer IO DBLog -> SqliteContext -> IO ()
destroyDBLayer tr SqliteContext{connectionPool,dbFile} = do
destroyDBLayer tr SqliteContext{connectionPool,isDatabaseActive,dbFile} = do
STM.atomically $ writeTVar isDatabaseActive False
traceWith tr (MsgDestroyConnectionPool dbFile)
destroyAllResources connectionPool

{-------------------------------------------------------------------------------
Internal / Database Setup
-------------------------------------------------------------------------------}

-- | Opens the SQLite database connection, sets up query logging and timing,
-- | Opens the SQLite database connection pool, sets up query logging and timing,
-- runs schema migrations if necessary.
startSqliteBackend
:: ManualMigration
newSqliteContext
:: [ManualMigration]
-> Migration
-> Tracer IO DBLog
-> Maybe FilePath
-> IO (Either MigrationError SqliteContext)
startSqliteBackend manualMigration autoMigration tr fp = do
pool <- createSqlitePool tr fp manualMigration (queryLogFunc tr)
let observe :: IO a -> IO a
observe = bracketTracer (contramap MsgRun tr)
-- runSqlConn is guarded with a lock because it's not threadsafe in general.
-- It is also masked, so that the SqlBackend state is not corrupted if a
-- thread gets cancelled while running a query.
-- See: https://github.com/yesodweb/persistent/issues/981
--
-- Note that `withResource` does already mask async exception but only for
-- dealing with the pool resource acquisition. The action is then ran
-- unmasked with the acquired resource. If an asynchronous exception occurs,
-- the resource is NOT placed back in the pool.
let runQuery :: SqlPersistT IO a -> IO a
runQuery cmd = withResource pool $ \(backend, _) ->
observe $ mask_ $ runSqlConn cmd backend

autoMigrationResult <- withResource pool $ \(backend, connection) -> do
withForeignKeysDisabled tr connection
$ mask_ (runSqlConn (runMigrationQuiet autoMigration) backend)
& tryJust (matchMigrationError @PersistException)
& tryJust (matchMigrationError @SqliteException)
& fmap join
traceWith tr $ MsgMigrations $ fmap length autoMigrationResult
let ctx = SqliteContext pool runQuery fp
case autoMigrationResult of
Left e -> do
destroyDBLayer tr ctx
pure $ Left e
Right _ -> pure $ Right ctx
newSqliteContext manualMigrations autoMigration tr dbFile = do
isDatabaseActive <- newTVarIO True
createSqlitePool tr dbFile manualMigrations autoMigration <&> \case
Left e -> Left e
Right connectionPool ->
let observe :: IO a -> IO a
observe = bracketTracer (contramap MsgRun tr)

-- runSqlConn is guarded with a lock because it's not threadsafe in
-- general. It is also masked, so that the SqlBackend state is not
-- corrupted if a thread gets cancelled while running a query.
-- See: https://github.com/yesodweb/persistent/issues/981
--
-- Note that `withResource` does already mask async exceptions, but
-- only while dealing with the pool resource acquisition. The action
-- is then run unmasked with the acquired resource. If an
-- asynchronous exception occurs (or actually any exception), the
-- resource is NOT placed back in the pool.
runQuery :: SqlPersistT IO a -> IO a
runQuery cmd = do
readTVarIO isDatabaseActive >>= \case
False -> throwIO DatabaseIsShuttingDownError
True -> withResource connectionPool $
mask_ . observe . retryOnBusy tr . runSqlConn cmd . fst

in Right $ SqliteContext
{ connectionPool
, isDatabaseActive
, runQuery
, dbFile
}

-- | Finalize database statements and close the database connection.
--
Expand All @@ -235,7 +259,7 @@ destroySqliteBackend
-> IO ()
destroySqliteBackend tr sqlBackend dbFile = do
traceWith tr (MsgCloseSingleConnection dbFile)
recovering pol (mkRetryHandler isBusy) (const $ close' sqlBackend)
retryOnBusy tr (close' sqlBackend)
& handleIf isAlreadyClosed
(traceWith tr . MsgIsAlreadyClosed . showT)
& handleIf statementAlreadyFinalized
Expand All @@ -255,11 +279,33 @@ destroySqliteBackend tr sqlBackend dbFile = do
showT :: Show a => a -> Text
showT = T.pack . show

-- | Retry an action if the database yields an 'SQLITE_BUSY' error.
--
-- From <https://www.sqlite.org/rescode.html#busy>
--
-- The SQLITE_BUSY result code indicates that the database file could not be
-- written (or in some cases read) because of concurrent activity by some
-- other database connection, usually a database connection in a separate
-- process.
--
-- For example, if process A is in the middle of a large write transaction
-- and at the same time process B attempts to start a new write transaction,
-- process B will get back an SQLITE_BUSY result because SQLite only supports
-- one writer at a time. Process B will need to wait for process A to finish
-- its transaction before starting a new transaction. The sqlite3_busy_timeout()
-- and sqlite3_busy_handler() interfaces and the busy_timeout pragma are
-- available to process B to help it deal with SQLITE_BUSY errors.
--
retryOnBusy :: Tracer IO DBLog -> IO a -> IO a
retryOnBusy tr action =
recovering policy (mkRetryHandler isBusy) $ \RetryStatus{rsIterNumber} -> do
when (rsIterNumber > 0) $ traceWith tr (MsgRetryOnBusy rsIterNumber)
action
where
isBusy (SqliteException name _ _) = pure (name == Sqlite.ErrorBusy)
pol = limitRetriesByCumulativeDelay (60000*ms) $ constantDelay (25*ms)
policy = limitRetriesByCumulativeDelay (60000*ms) $ constantDelay (25*ms)
ms = 1000 -- microseconds in a millisecond


-- | Run the given task in a context where foreign key constraints are
-- /temporarily disabled/, before re-enabling them.
--
Expand Down Expand Up @@ -363,35 +409,53 @@ newtype ManualMigration = ManualMigration
createSqlitePool
:: Tracer IO DBLog
-> Maybe FilePath
-> ManualMigration
-> LogFunc
-> IO (Pool (SqlBackend, Sqlite.Connection))
createSqlitePool tr fp migration logFunc = do
-> [ManualMigration]
-> Migration
-> IO (Either MigrationError (Pool (SqlBackend, Sqlite.Connection)))
createSqlitePool tr fp migrations autoMigration = do
let connStr = sqliteConnStr fp
let info = mkSqliteConnectionInfo connStr
traceWith tr $ MsgConnStr connStr

let createConnection = do
let info = mkSqliteConnectionInfo connStr
conn <- Sqlite.open connStr
executeManualMigration migration conn
backend <- wrapConnectionInfo info conn logFunc
pure (backend, conn)
(,conn) <$> wrapConnectionInfo info conn (queryLogFunc tr)

let destroyConnection = \(backend, _) -> do
destroySqliteBackend tr backend fp

createPool
pool <- createPool
createConnection
destroyConnection
numberOfStripes
timeToLive
maximumConnections

-- Run migrations BEFORE making the pool widely accessible to other threads.
-- This works fine for the :memory: case because there's a single connection
-- in the pool, so the next 'withResource' will get exactly this
-- connection.
migrationResult <- withResource pool $ \(backend, conn) -> mask_ $ do
let executeAutoMigration = runSqlConn (runMigrationQuiet autoMigration) backend
migrationResult <- withForeignKeysDisabled tr conn $ do
mapM_ (`executeManualMigration` conn) migrations
executeAutoMigration
& tryJust (matchMigrationError @PersistException)
& tryJust (matchMigrationError @SqliteException)
& fmap join
traceWith tr $ MsgMigrations $ fmap length migrationResult
return migrationResult

case migrationResult of
Left e -> destroyAllResources pool $> Left e
Right{} -> return (Right pool)
where
numberOfStripes = 1
timeToLive = 600 :: NominalDiffTime
-- When running in :memory:, we want a single connection that does not get
-- cleaned up.
-- cleaned up. Indeed, the pool will regularly remove connections, destroying
-- our :memory: database regularly otherwise.
maximumConnections = maybe 1 (const 10) fp
timeToLive = maybe 31536000 {- one year -} (const 600) {- 10 minutes -} fp :: NominalDiffTime

sqliteConnStr :: Maybe FilePath -> Text
sqliteConnStr = maybe ":memory:" T.pack
Expand Down Expand Up @@ -420,6 +484,7 @@ data DBLog
| MsgUpdatingForeignKeysSetting ForeignKeysSetting
| MsgFoundDatabase FilePath Text
| MsgUnknownDBFile FilePath
| MsgRetryOnBusy Int
deriving (Generic, Show, Eq, ToJSON)

{-------------------------------------------------------------------------------
Expand Down Expand Up @@ -480,9 +545,9 @@ instance HasSeverityAnnotation DBLog where
MsgMigrations (Left _) -> Error
MsgQuery _ sev -> sev
MsgRun _ -> Debug
MsgConnStr _ -> Debug
MsgCloseSingleConnection _ -> Debug
MsgDestroyConnectionPool _ -> Debug
MsgConnStr _ -> Notice
MsgCloseSingleConnection _ -> Info
MsgDestroyConnectionPool _ -> Notice
MsgWillOpenDB _ -> Info
MsgDatabaseReset -> Notice
MsgIsAlreadyClosed _ -> Warning
Expand All @@ -496,6 +561,9 @@ instance HasSeverityAnnotation DBLog where
MsgUpdatingForeignKeysSetting{} -> Debug
MsgFoundDatabase _ _ -> Info
MsgUnknownDBFile _ -> Notice
MsgRetryOnBusy n | n <= 1 -> Debug
MsgRetryOnBusy n | n <= 3 -> Notice
MsgRetryOnBusy _ -> Warning

instance ToText DBLog where
toText = \case
Expand Down Expand Up @@ -557,6 +625,9 @@ instance ToText DBLog where
[ "Found something other than a database file in "
, "the database folder: ", T.pack file
]
MsgRetryOnBusy n ->
let nF = ordinalF n in
"Retrying db query because db was busy for the " +| nF |+ " time."

{-------------------------------------------------------------------------------
Extra DB Helpers
Expand Down
21 changes: 11 additions & 10 deletions lib/core/src/Cardano/Pool/DB/Sqlite.hs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import Cardano.DB.Sqlite
, destroyDBLayer
, fieldName
, handleConstraint
, startSqliteBackend
, newSqliteContext
, tableName
)
import Cardano.Pool.DB
Expand Down Expand Up @@ -225,7 +225,7 @@ newDBLayer
-> TimeInterpreter IO
-> IO (SqliteContext, DBLayer IO)
newDBLayer trace fp ti = do
let io = startSqliteBackend
let io = newSqliteContext
(migrateManually trace)
migrateAll
(contramap MsgGeneric trace)
Expand Down Expand Up @@ -686,13 +686,14 @@ runRawQuery trace q = do

migrateManually
:: Tracer IO PoolDbLog
-> ManualMigration
-> [ManualMigration]
migrateManually _tr =
ManualMigration $ \conn -> do
createView conn activePoolLifeCycleData
createView conn activePoolOwners
createView conn activePoolRegistrations
createView conn activePoolRetirements
ManualMigration <$>
[ createView activePoolLifeCycleData
, createView activePoolOwners
, createView activePoolRegistrations
, createView activePoolRetirements
]

-- | Represents a database view.
--
Expand All @@ -705,8 +706,8 @@ data DatabaseView = DatabaseView

-- | Creates the specified database view, if it does not already exist.
--
createView :: Sqlite.Connection -> DatabaseView -> IO ()
createView conn (DatabaseView name definition) = do
createView :: DatabaseView -> Sqlite.Connection -> IO ()
createView (DatabaseView name definition) conn = do
deleteQuery <- Sqlite.prepare conn deleteQueryString
Sqlite.step deleteQuery *> Sqlite.finalize deleteQuery
createQuery <- Sqlite.prepare conn createQueryString
Expand Down

0 comments on commit 2f37250

Please sign in to comment.