Skip to content

Commit

Permalink
rework pool metadata fetching to work using a bounded queue and 2 thr…
Browse files Browse the repository at this point in the history
…eads (producer/consumer)
  • Loading branch information
KtorZ committed Feb 15, 2021
1 parent 9efc846 commit 6fc3208
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 33 deletions.
23 changes: 11 additions & 12 deletions lib/core/src/Cardano/Pool/Metadata.hs
Expand Up @@ -26,10 +26,12 @@ module Cardano.Pool.Metadata
, HealthStatusSMASH (..)

-- * Construct URLs
, UrlBuilder
, identityUrlBuilder
, registryUrlBuilder

-- * re-exports
, Manager
, newManager
, defaultManagerSettings

Expand Down Expand Up @@ -164,12 +166,16 @@ defaultManagerSettings =
newManager :: MonadIO m => ManagerSettings -> m Manager
newManager = HTTPS.newTlsManagerWith

-- | Simply return a pool metadata url, unchanged
identityUrlBuilder
:: PoolId
-- | A type-alias to ease signatures
type UrlBuilder
= PoolId
-> StakePoolMetadataUrl
-> StakePoolMetadataHash
-> Either HttpException URI

-- | Simply return a pool metadata url, unchanged
identityUrlBuilder
:: UrlBuilder
identityUrlBuilder _ (StakePoolMetadataUrl url) _ =
maybe (Left e) Right $ parseURI (T.unpack url)
where
Expand All @@ -178,10 +184,7 @@ identityUrlBuilder _ (StakePoolMetadataUrl url) _ =
-- | Build a URL from a metadata hash compatible with an aggregation registry
registryUrlBuilder
:: URI
-> PoolId
-> StakePoolMetadataUrl
-> StakePoolMetadataHash
-> Either HttpException URI
-> UrlBuilder
registryUrlBuilder baseUrl pid _ hash =
Right $ baseUrl
{ uriPath = "/" <> metadaFetchEp pid hash
Expand Down Expand Up @@ -278,11 +281,7 @@ fetchDelistedPools tr uri manager = runExceptTLog $ do
-- TODO: refactor/simplify this
fetchFromRemote
:: Tracer IO StakePoolMetadataFetchLog
-> [ PoolId
-> StakePoolMetadataUrl
-> StakePoolMetadataHash
-> Either HttpException URI
]
-> [UrlBuilder]
-> Manager
-> PoolId
-> StakePoolMetadataUrl
Expand Down
93 changes: 72 additions & 21 deletions lib/shelley/src/Cardano/Wallet/Shelley/Pools.hs
Expand Up @@ -42,7 +42,9 @@ import Cardano.BM.Data.Tracer
import Cardano.Pool.DB
( DBLayer (..), ErrPointAlreadyExists (..), readPoolLifeCycleStatus )
import Cardano.Pool.Metadata
( StakePoolMetadataFetchLog
( Manager
, StakePoolMetadataFetchLog
, UrlBuilder
, defaultManagerSettings
, fetchDelistedPools
, fetchFromRemote
Expand Down Expand Up @@ -91,6 +93,7 @@ import Cardano.Wallet.Primitive.Types
, SlottingParameters (..)
, StakePoolMetadata
, StakePoolMetadataHash
, StakePoolMetadataUrl
, StakePoolsSummary (..)
, getPoolRegistrationCertificate
, getPoolRetirementCertificate
Expand All @@ -112,7 +115,7 @@ import Cardano.Wallet.Unsafe
import Control.Exception.Base
( AsyncException (..), asyncExceptionFromException )
import Control.Monad
( forM, forM_, forever, void, when )
( forM_, forever, replicateM, void, when )
import Control.Monad.IO.Class
( liftIO )
import Control.Monad.Trans.Except
Expand Down Expand Up @@ -157,18 +160,30 @@ import Fmt
( fixedF, pretty )
import GHC.Generics
( Generic )
import Numeric.Natural
( Natural )
import Ouroboros.Consensus.Cardano.Block
( CardanoBlock, HardForkBlock (..) )
import System.Random
( RandomGen, random )
import UnliftIO.Async
( concurrently_, forConcurrently_ )
import UnliftIO.Concurrent
( forkFinally, killThread, threadDelay )
import UnliftIO.Exception
( SomeException (..), finally )
import UnliftIO.IORef
( IORef, newIORef, readIORef, writeIORef )
import UnliftIO.STM
( TVar, readTVarIO, writeTVar )
( TBQueue
, TVar
, newTBQueue
, readTBQueue
, readTVarIO
, tryReadTBQueue
, writeTBQueue
, writeTVar
)

import qualified Cardano.Wallet.Api.Types as Api
import qualified Data.List as L
Expand Down Expand Up @@ -723,43 +738,79 @@ monitorMetadata gcStatus tr sp db@(DBLayer{..}) = do
_ -> pure NoSmashConfigured

if | health == Available || health == NoSmashConfigured -> do
let fetcher fetchStrategies = fetchFromRemote trFetch fetchStrategies manager
loop getPoolMetadata = forever $ do
(refs, successes) <- getPoolMetadata
when (null refs || null successes) $ do
traceWith tr $ MsgFetchTakeBreak blockFrequency
threadDelay blockFrequency

case poolMetadataSource settings of
FetchNone -> do
STM.atomically $ writeTVar gcStatus NotApplicable
loop (pure ([], [])) -- TODO: exit loop?

FetchDirect -> do
STM.atomically $ writeTVar gcStatus NotApplicable
loop (fetchThem $ fetcher [identityUrlBuilder])
queue <- STM.atomically $ newTBQueue queueSize
concurrently_
(enqueueMetadata queue)
(dequeueMetadata manager [identityUrlBuilder] queue)

FetchSMASH (unSmashServer -> uri) -> do
STM.atomically $ writeTVar gcStatus NotStarted
let getDelistedPools =
fetchDelistedPools trFetch uri manager
tid <- forkFinally
(gcDelistedPools gcStatus tr db getDelistedPools)
onExit
queue <- STM.atomically $ newTBQueue queueSize
flip finally (killThread tid) $
loop (fetchThem $ fetcher [registryUrlBuilder uri])
| otherwise -> traceWith tr MsgSMASHUnreachable
concurrently_
(enqueueMetadata queue)
(dequeueMetadata manager [registryUrlBuilder uri] queue)

| otherwise ->
traceWith tr MsgSMASHUnreachable
where
trFetch = contramap MsgFetchPoolMetadata tr
fetchThem fetchMetadata = do

queueSize :: Natural
queueSize = 100

-- Must be:
-- - > 1
-- - <= queueSize
-- Should be:
-- - <= queueSize / 2
numInFlight :: Int
numInFlight = 20

enqueueMetadata
:: TBQueue (PoolId, StakePoolMetadataUrl, StakePoolMetadataHash)
-> IO ()
enqueueMetadata queue = forever $ do
refs <- atomically (unfetchedPoolMetadataRefs 100)
successes <- fmap catMaybes $ forM refs $ \(pid, url, hash) -> do
fetchMetadata pid url hash >>= \case
Nothing -> Nothing <$ do
STM.atomically $ mapM_ (writeTBQueue queue) refs
when (null refs) $ do
traceWith tr $ MsgFetchTakeBreak blockFrequency
threadDelay blockFrequency

dequeueMetadata
:: Manager
-> [UrlBuilder]
-> TBQueue (PoolId, StakePoolMetadataUrl, StakePoolMetadataHash)
-> IO ()
dequeueMetadata manager strategies queue = forever $ do
refs <- STM.atomically $ do
-- First read is blocking, and others aren't. This ensures a cheap
-- pause of the thread when there's nothing to fetch, but it always
-- try to fetch at most 'numInFlight' at once without blocking if
-- there are less to fetch.
-- This could be the case when the queue has been emptied and
-- registrations are now coming one-by-one.
h <- readTBQueue queue
q <- catMaybes <$> replicateM (numInFlight - 1) (tryReadTBQueue queue)
pure (h:q)
forConcurrently_ refs $ \(pid, url, hash) -> do
fetchFromRemote trFetch strategies manager pid url hash >>= \case
Nothing ->
atomically $ putFetchAttempt (url, hash)

Just meta -> Just hash <$ do
Just meta -> do
atomically $ putPoolMetadata hash meta
pure (refs, successes)

-- NOTE
-- If there's no metadata, we typically need not to retry sooner than the
-- next block. So waiting for a delay that is roughly the same order of
Expand Down

0 comments on commit 6fc3208

Please sign in to comment.