Skip to content

Commit

Permalink
rework pool metadata fetching to work using a bounded queue
Browse files Browse the repository at this point in the history
  This works by using a bounded queue as a 'seat allocator'. Requests
  are only emitted in an asynchronous thread when there's a seat at
  the table.

  On my machine, the total time is cut from ~320s to ~50s. It also
  guarantees that there are never more than N requests in flight
  and does a best effort at keeping the number of requests in
  flight at any time close to N (this works because the time needed
  to fetch requests to send from the database is much smaller than
  the time needed to actually send and process requests.

  There's one seemingly important yet subtle change in this PR with
  regards to the fetching strategy.

  From:
  ```hs
  when (null refs || null successes) $ do
      traceWith tr $ MsgFetchTakeBreak blockFrequency
      threadDelay blockFrequency
  ```

  To:
  ```hs
  when (null refs) $ do
      traceWith tr $ MsgFetchTakeBreak blockFrequency
      threadDelay blockFrequency
  ```

  So, as of now, the wallet pauses for a whole 20s when all requests to
  fetching metadata failed. I think the rationale was originally to
  prevent the wallet from just eating up CPU by trying to fetch only
  invalid pools... What this create in practice is simply more
  unnecessary delays. There's about 8.5K pool registrations currently on
  mainnet, and after a full sync, there are still ~2254 fetch attempts
  still-to-proceed... so that's a lot of occasions to get a `null
  successes` on a particular loop.

  There's already a backoff which comes with the fetch attempts which
  prevent the wallet from doing needless work anyway. With this little
  change, we should massively improve the time needed for updating
  metadata upon new registrations, such as the problem described in:
  input-output-hk/daedalus#2359
  • Loading branch information
KtorZ committed Feb 15, 2021
1 parent 9efc846 commit a6c05d1
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 37 deletions.
23 changes: 11 additions & 12 deletions lib/core/src/Cardano/Pool/Metadata.hs
Expand Up @@ -26,10 +26,12 @@ module Cardano.Pool.Metadata
, HealthStatusSMASH (..)

-- * Construct URLs
, UrlBuilder
, identityUrlBuilder
, registryUrlBuilder

-- * re-exports
, Manager
, newManager
, defaultManagerSettings

Expand Down Expand Up @@ -164,12 +166,16 @@ defaultManagerSettings =
newManager :: MonadIO m => ManagerSettings -> m Manager
newManager = HTTPS.newTlsManagerWith

-- | Simply return a pool metadata url, unchanged
identityUrlBuilder
:: PoolId
-- | A type-alias to ease signatures
type UrlBuilder
= PoolId
-> StakePoolMetadataUrl
-> StakePoolMetadataHash
-> Either HttpException URI

-- | Simply return a pool metadata url, unchanged
identityUrlBuilder
:: UrlBuilder
identityUrlBuilder _ (StakePoolMetadataUrl url) _ =
maybe (Left e) Right $ parseURI (T.unpack url)
where
Expand All @@ -178,10 +184,7 @@ identityUrlBuilder _ (StakePoolMetadataUrl url) _ =
-- | Build a URL from a metadata hash compatible with an aggregation registry
registryUrlBuilder
:: URI
-> PoolId
-> StakePoolMetadataUrl
-> StakePoolMetadataHash
-> Either HttpException URI
-> UrlBuilder
registryUrlBuilder baseUrl pid _ hash =
Right $ baseUrl
{ uriPath = "/" <> metadaFetchEp pid hash
Expand Down Expand Up @@ -278,11 +281,7 @@ fetchDelistedPools tr uri manager = runExceptTLog $ do
-- TODO: refactor/simplify this
fetchFromRemote
:: Tracer IO StakePoolMetadataFetchLog
-> [ PoolId
-> StakePoolMetadataUrl
-> StakePoolMetadataHash
-> Either HttpException URI
]
-> [UrlBuilder]
-> Manager
-> PoolId
-> StakePoolMetadataUrl
Expand Down
76 changes: 51 additions & 25 deletions lib/shelley/src/Cardano/Wallet/Shelley/Pools.hs
Expand Up @@ -42,7 +42,9 @@ import Cardano.BM.Data.Tracer
import Cardano.Pool.DB
( DBLayer (..), ErrPointAlreadyExists (..), readPoolLifeCycleStatus )
import Cardano.Pool.Metadata
( StakePoolMetadataFetchLog
( Manager
, StakePoolMetadataFetchLog
, UrlBuilder
, defaultManagerSettings
, fetchDelistedPools
, fetchFromRemote
Expand Down Expand Up @@ -112,7 +114,7 @@ import Cardano.Wallet.Unsafe
import Control.Exception.Base
( AsyncException (..), asyncExceptionFromException )
import Control.Monad
( forM, forM_, forever, void, when )
( forM_, forever, void, when )
import Control.Monad.IO.Class
( liftIO )
import Control.Monad.Trans.Except
Expand Down Expand Up @@ -157,6 +159,8 @@ import Fmt
( fixedF, pretty )
import GHC.Generics
( Generic )
import Numeric.Natural
( Natural )
import Ouroboros.Consensus.Cardano.Block
( CardanoBlock, HardForkBlock (..) )
import System.Random
Expand All @@ -168,7 +172,14 @@ import UnliftIO.Exception
import UnliftIO.IORef
( IORef, newIORef, readIORef, writeIORef )
import UnliftIO.STM
( TVar, readTVarIO, writeTVar )
( TBQueue
, TVar
, newTBQueue
, readTBQueue
, readTVarIO
, writeTBQueue
, writeTVar
)

import qualified Cardano.Wallet.Api.Types as Api
import qualified Data.List as L
Expand Down Expand Up @@ -723,43 +734,58 @@ monitorMetadata gcStatus tr sp db@(DBLayer{..}) = do
_ -> pure NoSmashConfigured

if | health == Available || health == NoSmashConfigured -> do
let fetcher fetchStrategies = fetchFromRemote trFetch fetchStrategies manager
loop getPoolMetadata = forever $ do
(refs, successes) <- getPoolMetadata
when (null refs || null successes) $ do
traceWith tr $ MsgFetchTakeBreak blockFrequency
threadDelay blockFrequency

case poolMetadataSource settings of
FetchNone -> do
STM.atomically $ writeTVar gcStatus NotApplicable
loop (pure ([], [])) -- TODO: exit loop?

FetchDirect -> do
STM.atomically $ writeTVar gcStatus NotApplicable
loop (fetchThem $ fetcher [identityUrlBuilder])
fetchMetadata manager [identityUrlBuilder]

FetchSMASH (unSmashServer -> uri) -> do
STM.atomically $ writeTVar gcStatus NotStarted
let getDelistedPools =
fetchDelistedPools trFetch uri manager
tid <- forkFinally
(gcDelistedPools gcStatus tr db getDelistedPools)
onExit
flip finally (killThread tid) $
loop (fetchThem $ fetcher [registryUrlBuilder uri])
| otherwise -> traceWith tr MsgSMASHUnreachable
fetchMetadata manager [registryUrlBuilder uri]
`finally` killThread tid

| otherwise ->
traceWith tr MsgSMASHUnreachable
where
trFetch = contramap MsgFetchPoolMetadata tr
fetchThem fetchMetadata = do
refs <- atomically (unfetchedPoolMetadataRefs 100)
successes <- fmap catMaybes $ forM refs $ \(pid, url, hash) -> do
fetchMetadata pid url hash >>= \case
Nothing -> Nothing <$ do
atomically $ putFetchAttempt (url, hash)

Just meta -> Just hash <$ do
atomically $ putPoolMetadata hash meta
pure (refs, successes)

fetchMetadata
:: Manager
-> [UrlBuilder]
-> IO ()
fetchMetadata manager strategies = do
inFlights <- STM.atomically $ newTBQueue maxInFlight
forever $ do
refs <- atomically (unfetchedPoolMetadataRefs $ fromIntegral maxInFlight)
forM_ refs $ \(pid, url, hash) -> withAvailableSeat inFlights $ do
fetchFromRemote trFetch strategies manager pid url hash >>= \case
Nothing ->
atomically $ putFetchAttempt (url, hash)
Just meta -> do
atomically $ putPoolMetadata hash meta
when (null refs) $ do
traceWith tr $ MsgFetchTakeBreak blockFrequency
threadDelay blockFrequency
where
maxInFlight :: Natural
maxInFlight = 20

-- | Run an action asyncronously only when there's an available seat.
-- Seats are materialized by a bounded queue. If the queue is full,
-- then there's no seat.
withAvailableSeat :: TBQueue () -> IO a -> IO ()
withAvailableSeat q action = do
STM.atomically $ writeTBQueue q ()
void $ action `forkFinally` const (STM.atomically $ readTBQueue q)

-- NOTE
-- If there's no metadata, we typically need not to retry sooner than the
-- next block. So waiting for a delay that is roughly the same order of
Expand Down

0 comments on commit a6c05d1

Please sign in to comment.