Merge #979
979: Support atomic and isolated executions of the database layers r=KtorZ a=Anviking

# Issue Number

#713 

# Overview

- [x] I have made sure that `putPoolProduction` and `putStakeDistribution` happen in a single Sqlite transaction. If the wallet were to crash, either both of them succeed or neither does (see the sketch after this list).
- [x] Existing tests verify that this works for large batches (~100 writes). I tried writing 2000 pool productions atomically, and it also worked, but it slowed down the tests, so I didn't commit that case.
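
A minimal sketch of the pattern this enables, based on the `monitorStakePools` change in the diff below (`syncEpoch` is a hypothetical caller; `db`, `ep`, `dist`, and `blocks` are hypothetical bindings, and `header`/`producer` are the block accessors used in `Cardano.Pool.Metrics`):

```haskell
{-# LANGUAGE RecordWildCards #-}

import Control.Monad ( forM_ )
import Control.Monad.Trans.Class ( lift )
import Control.Monad.Trans.Except ( mapExceptT, runExceptT )

import Cardano.Pool.DB ( DBLayer (..) )

-- Both writes compose into a single `atomically` block; for the Sqlite
-- implementation that is one transaction, so a crash mid-way leaves
-- either both writes applied or neither.
syncEpoch db ep dist blocks = case db of
    DBLayer{..} -> runExceptT $ mapExceptT atomically $ do
        lift $ putStakeDistribution ep dist
        forM_ blocks $ \b ->
            putPoolProduction (header b) (producer b)
```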



Co-authored-by: Johannes Lund <johannes.lund@iohk.io>
Co-authored-by: KtorZ <matthias.benkort@gmail.com>
3 people committed Nov 9, 2019
2 parents 2a92a99 + ac68610 commit 1e903c8
Showing 20 changed files with 801 additions and 786 deletions.
1 change: 1 addition & 0 deletions lib/core/cardano-wallet-core.cabal
@@ -281,6 +281,7 @@ benchmark db
       , text
       , text-class
       , time
+      , transformers
   type:
       exitcode-stdio-1.0
   hs-source-dirs:
36 changes: 28 additions & 8 deletions lib/core/src/Cardano/Pool/DB.hs
@@ -1,4 +1,6 @@
 {-# LANGUAGE DataKinds #-}
+{-# LANGUAGE ExistentialQuantification #-}
+{-# LANGUAGE GADTs #-}
 {-# LANGUAGE RankNTypes #-}
 
 -- |
@@ -19,6 +21,8 @@ import Prelude
 
 import Cardano.Wallet.Primitive.Types
     ( BlockHeader, EpochNo (..), PoolId, SlotId (..) )
+import Control.Monad.Fail
+    ( MonadFail )
 import Control.Monad.Trans.Except
     ( ExceptT )
 import Data.Map.Strict
@@ -29,47 +33,63 @@ import Data.Word
     ( Word64 )
 
 -- | A Database interface for storing pool production in DB.
-data DBLayer m = DBLayer
+--
+-- To use it, you will need the NamedFieldPuns extension and wrap operations
+-- with @atomically@:
+--
+-- Example:
+-- >>> :set -XNamedFieldPuns
+-- >>> DBLayer{atomically,putPoolProduction} = db
+-- >>> atomically $ putPoolProduction blockHeader pool
+--
+-- This gives you the power to also run /multiple/ operations atomically.
+data DBLayer m = forall stm. MonadFail stm => DBLayer
     { putPoolProduction
         :: BlockHeader
         -> PoolId
-        -> ExceptT ErrPointAlreadyExists m ()
+        -> ExceptT ErrPointAlreadyExists stm ()
         -- ^ Write, for a given slot id, the id of the stake pool that
         -- produced the corresponding block
 
     , readPoolProduction
         :: EpochNo
-        -> m (Map PoolId [BlockHeader])
+        -> stm (Map PoolId [BlockHeader])
        -- ^ Read all the stake pools together with corresponding slot ids
        -- for a given epoch.
 
    , putStakeDistribution
        :: EpochNo
        -> [(PoolId, Quantity "lovelace" Word64)]
-        -> m ()
+        -> stm ()
        -- ^ Replace an existing distribution for the given epoch by the one
        -- given as argument.
        --
        -- If there's no existing distribution, simply inserts it.
 
    , readStakeDistribution
        :: EpochNo
-        -> m [(PoolId, Quantity "lovelace" Word64)]
+        -> stm [(PoolId, Quantity "lovelace" Word64)]
 
    , readPoolProductionCursor
-        :: Int -> m [BlockHeader]
+        :: Int -> stm [BlockHeader]
        -- ^ Read the latest @k@ blockheaders in ascending order. The tip will
        -- be the last element in the list.
        --
        -- This is useful for the @NetworkLayer@ to know how far we have synced.
 
    , rollbackTo
-        :: SlotId -> m ()
+        :: SlotId -> stm ()
        -- ^ Remove all entries of slot ids newer than the argument
 
    , cleanDB
-        :: m ()
+        :: stm ()
        -- ^ Clean a database
 
+    , atomically
+        :: forall a. stm a -> m a
+        -- ^ Run an operation.
+        --
+        -- For a Sqlite DB, this would be "run a query inside a transaction".
    }
 
 -- | Forbidden operation was executed on an already existing slot
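
A usage sketch of the new `atomically` field with NamedFieldPuns, mirroring how `newStakePoolLayer` uses it later in this diff (`readSnapshot` is a hypothetical helper; `db` and `ep` are hypothetical bindings):

```haskell
{-# LANGUAGE NamedFieldPuns #-}

import Cardano.Pool.DB ( DBLayer (..) )

-- Both reads run inside the same transaction, so they observe one
-- consistent snapshot of the database.
readSnapshot db ep = case db of
    DBLayer{atomically, readStakeDistribution, readPoolProduction} ->
        atomically $ (,)
            <$> readStakeDistribution ep
            <*> readPoolProduction ep
```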
1 change: 1 addition & 0 deletions lib/core/src/Cardano/Pool/DB/MVar.hs
@@ -69,6 +69,7 @@ newDBLayer = do
 
         , cleanDB =
             void $ alterPoolDB (const Nothing) db mCleanPoolProduction
+        , atomically = id
         }
 
 alterPoolDB
17 changes: 9 additions & 8 deletions lib/core/src/Cardano/Pool/DB/Sqlite.hs
@@ -132,11 +132,11 @@ newDBLayer logConfig trace fp = do
     ctx@SqliteContext{runQuery} <-
         startSqliteBackend logConfig migrateAll trace' fp
     return (ctx, DBLayer
-        { putPoolProduction = \point pool -> ExceptT $ runQuery $
+        { putPoolProduction = \point pool -> ExceptT $
             handleConstraint (ErrPointAlreadyExists point) $
                 insert_ (mkPoolProduction pool point)
 
-        , readPoolProduction = \epoch -> runQuery $ do
+        , readPoolProduction = \epoch -> do
             production <- fmap fromPoolProduction <$> selectPoolProduction epoch
 
             let toMap :: Ord a => Map a [b] -> (a,b) -> Map a [b]
@@ -148,28 +148,29 @@ newDBLayer logConfig trace fp = do
 
             pure (foldl' toMap Map.empty production)
 
-        , putStakeDistribution = \epoch@(EpochNo ep) distribution -> runQuery $ do
+        , putStakeDistribution = \epoch@(EpochNo ep) distribution -> do
            deleteWhere [StakeDistributionEpoch ==. ep]
            insertMany_ (mkStakeDistribution epoch distribution)
 
-        , readStakeDistribution = \(EpochNo epoch) -> runQuery $ do
+        , readStakeDistribution = \(EpochNo epoch) -> do
            fmap (fromStakeDistribution . entityVal) <$> selectList
                [ StakeDistributionEpoch ==. epoch ]
                []
 
-        , rollbackTo = \point -> runQuery $ do
+        , rollbackTo = \point -> do
            let (EpochNo epoch) = epochNumber point
            deleteWhere [ PoolProductionSlot >. point ]
            deleteWhere [ StakeDistributionEpoch >. epoch ]
 
-        , readPoolProductionCursor = \k -> runQuery $ do
+        , readPoolProductionCursor = \k -> do
            reverse . map (snd . fromPoolProduction . entityVal) <$> selectList
                []
                [Desc PoolProductionSlot, LimitTo k]
 
-        , cleanDB = runQuery $
+        , cleanDB =
            deleteWhere ([] :: [Filter PoolProduction])
+
+        , atomically = runQuery
         })
 
 {-------------------------------------------------------------------------------
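
The effect of moving `runQuery` out of the individual operations, shown as a hypothetical caller (`rollbackAndReadTip` is not part of this PR): the rollback and the follow-up read now share one SQLite transaction instead of running in two.

```haskell
{-# LANGUAGE RecordWildCards #-}

import Cardano.Pool.DB ( DBLayer (..) )
import Cardano.Wallet.Primitive.Types ( BlockHeader, SlotId )

-- With atomically = runQuery, no other writer can interleave between
-- the deletes performed by rollbackTo and the subsequent read.
rollbackAndReadTip :: DBLayer IO -> SlotId -> IO [BlockHeader]
rollbackAndReadTip DBLayer{..} point = atomically $ do
    rollbackTo point
    readPoolProductionCursor 1
```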
90 changes: 42 additions & 48 deletions lib/core/src/Cardano/Pool/Metrics.hs
@@ -1,10 +1,12 @@
 {-# LANGUAGE DataKinds #-}
 {-# LANGUAGE DeriveGeneric #-}
 {-# LANGUAGE DerivingStrategies #-}
 {-# LANGUAGE DuplicateRecordFields #-}
 {-# LANGUAGE LambdaCase #-}
+{-# LANGUAGE NamedFieldPuns #-}
 {-# LANGUAGE OverloadedLabels #-}
 {-# LANGUAGE RankNTypes #-}
+{-# LANGUAGE RecordWildCards #-}
 {-# LANGUAGE ScopedTypeVariables #-}
 
 -- | This module can fold over a blockchain to collect metrics about
@@ -57,7 +59,7 @@ import Control.Monad.IO.Class
 import Control.Monad.Trans.Class
     ( lift )
 import Control.Monad.Trans.Except
-    ( ExceptT, runExceptT, throwE, withExceptT )
+    ( ExceptT, mapExceptT, runExceptT, throwE, withExceptT )
 import Data.Generics.Internal.VL.Lens
     ( view, (^.) )
 import Data.List
@@ -110,17 +112,12 @@ data StakePool = StakePool
 --
 -- The pool productions and stake distributions in the db can /never/ be from
 -- different forks such that it's safe for readers to access it.
---
--- FIXME: This last statement is only true if 'putStakeProduction' and
--- 'putPoolProduction' are running in the same db transaction, which would be
--- the case if we make 'SqlPersistT' the top-level monad for the 'DBLayer'
--- instead of 'IO'.
 monitorStakePools
     :: Trace IO Text
     -> NetworkLayer IO t Block
     -> DBLayer IO
     -> IO ()
-monitorStakePools tr nl db = do
+monitorStakePools tr nl DBLayer{..} = do
     cursor <- initCursor
     logInfo tr $ mconcat
         [ "Start monitoring stake pools. Currently at "
@@ -134,13 +131,13 @@ monitorStakePools tr nl db = do
     initCursor = do
         let (_, bp) = staticBlockchainParameters nl
         let k = fromIntegral . getQuantity . view #getEpochStability $ bp
-        readPoolProductionCursor db k
+        atomically $ readPoolProductionCursor k
 
     backward
         :: SlotId
         -> IO (FollowAction ErrMonitorStakePools)
     backward point = do
-        liftIO $ rollbackTo db point
+        liftIO . atomically $ rollbackTo point
         return Continue
 
     forward
@@ -153,11 +150,11 @@ monitorStakePools tr nl db = do
         currentTip <- withExceptT ErrMonitorStakePoolsNetworkTip $
             networkTip nl
         when (nodeTip /= currentTip) $ throwE ErrMonitorStakePoolsWrongTip
-        -- FIXME Do the next operation in a database transaction
         liftIO $ logInfo tr $ "Writing stake-distribution for epoch " <> pretty ep
-        lift $ putStakeDistribution db ep (Map.toList dist)
-        forM_ blocks $ \b -> withExceptT ErrMonitorStakePoolsPoolAlreadyExists $
-            putPoolProduction db (header b) (producer b)
+        mapExceptT atomically $ do
+            lift $ putStakeDistribution ep (Map.toList dist)
+            forM_ blocks $ \b -> withExceptT ErrMonitorStakePoolsPoolAlreadyExists $
+                putPoolProduction (header b) (producer b)
     where
        handler action = runExceptT action >>= \case
            Left ErrMonitorStakePoolsNetworkUnavailable{} -> do
@@ -202,36 +199,33 @@ newStakePoolLayer
     :: DBLayer IO
     -> NetworkLayer IO t block
     -> Trace IO Text
-    -> IO (StakePoolLayer IO)
-newStakePoolLayer db nl tr = do
-    return $ StakePoolLayer
-        { listStakePools = do
-            nodeTip <- withExceptT ErrListStakePoolsErrNetworkTip
-                $ networkTip nl
-            let nodeEpoch = nodeTip ^. #slotId . #epochNumber
-
-            distr <- liftIO $ Map.fromList <$>
-                readStakeDistribution db nodeEpoch
-
-            prod <- liftIO $ count <$>
-                readPoolProduction db nodeEpoch
-
-            when (Map.null distr || Map.null prod) $ do
-                computeProgress nodeTip >>= throwE . ErrMetricsIsUnsynced
-
-            perfs <- liftIO $
-                readPoolsPerformances db nodeEpoch
-
-            case combineMetrics distr prod perfs of
-                Right x -> return
-                    $ sortOn (Down . apparentPerformance)
-                    $ map (uncurry mkStakePool)
-                    $ Map.toList x
-                Left e ->
-                    throwE $ ErrListStakePoolsMetricsInconsistency e
-        }
+    -> StakePoolLayer IO
+newStakePoolLayer db@DBLayer{..} nl tr = StakePoolLayer
+    { listStakePools = do
+        nodeTip <- withExceptT ErrListStakePoolsErrNetworkTip
+            $ networkTip nl
+        let nodeEpoch = nodeTip ^. #slotId . #epochNumber
+
+        (distr, prod) <- liftIO . atomically $ (,)
+            <$> (Map.fromList <$> readStakeDistribution nodeEpoch)
+            <*> (count <$> readPoolProduction nodeEpoch)
+
+        when (Map.null distr || Map.null prod) $ do
+            computeProgress nodeTip >>= throwE . ErrMetricsIsUnsynced
+
+        perfs <- liftIO $
+            readPoolsPerformances db nodeEpoch
+
+        case combineMetrics distr prod perfs of
+            Right x -> return
+                $ sortOn (Down . apparentPerformance)
+                $ map (uncurry mkStakePool)
+                $ Map.toList x
+            Left e ->
+                throwE $ ErrListStakePoolsMetricsInconsistency e
+    }
     where
-    poolProductionTip = readPoolProductionCursor db 1 >>= \case
+    poolProductionTip = atomically $ readPoolProductionCursor 1 >>= \case
        [x] -> return $ Just x
        _ -> return Nothing
 
@@ -273,14 +267,14 @@
    toD = fromIntegral
 
 readPoolsPerformances
-    :: DBLayer IO
+    :: DBLayer m
    -> EpochNo
-    -> IO (Map PoolId Double)
-readPoolsPerformances db (EpochNo epochNo) = do
+    -> m (Map PoolId Double)
+readPoolsPerformances DBLayer{..} (EpochNo epochNo) = do
     let range = [max 0 (fromIntegral epochNo - 14) .. fromIntegral epochNo]
-    fmap avg $ forM range $ \ep -> calculatePerformance
-        <$> liftIO (Map.fromList <$> readStakeDistribution db ep)
-        <*> liftIO (count <$> readPoolProduction db ep)
+    atomically $ fmap avg $ forM range $ \ep -> calculatePerformance
+        <$> (Map.fromList <$> readStakeDistribution ep)
+        <*> (count <$> readPoolProduction ep)
    where
    -- | Performances are computed over many epochs to cope with the fact that
    -- our data is sparse (regarding stake distribution at least).
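
The `mapExceptT atomically` composition above is worth spelling out: `mapExceptT` (from `Control.Monad.Trans.Except` in transformers) changes the underlying monad of an `ExceptT` computation, so error handling and the transaction boundary compose. A specialised sketch (`inTransaction` is a hypothetical helper, not part of this PR):

```haskell
{-# LANGUAGE RankNTypes #-}

import Control.Monad.Trans.Except ( ExceptT, mapExceptT )

-- The whole ExceptT block, including an early exit via throwE, runs
-- inside a single database transaction.
inTransaction
    :: (forall x. stm x -> IO x)  -- ^ the layer's `atomically`
    -> ExceptT e stm a
    -> ExceptT e IO a
inTransaction atomically' = mapExceptT atomically'
```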