Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADP-634] Run SMASH metadata fetching in batches of 15 concurrently #2432

Merged
merged 3 commits into from Feb 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
23 changes: 11 additions & 12 deletions lib/core/src/Cardano/Pool/Metadata.hs
Expand Up @@ -26,10 +26,12 @@ module Cardano.Pool.Metadata
, HealthStatusSMASH (..)

-- * Construct URLs
, UrlBuilder
, identityUrlBuilder
, registryUrlBuilder

-- * re-exports
, Manager
, newManager
, defaultManagerSettings

Expand Down Expand Up @@ -164,12 +166,16 @@ defaultManagerSettings =
newManager :: MonadIO m => ManagerSettings -> m Manager
newManager = HTTPS.newTlsManagerWith

-- | Simply return a pool metadata url, unchanged
identityUrlBuilder
:: PoolId
-- | A type-alias to ease signatures
type UrlBuilder
= PoolId
-> StakePoolMetadataUrl
-> StakePoolMetadataHash
-> Either HttpException URI

-- | Simply return a pool metadata url, unchanged
identityUrlBuilder
:: UrlBuilder
identityUrlBuilder _ (StakePoolMetadataUrl url) _ =
maybe (Left e) Right $ parseURI (T.unpack url)
where
Expand All @@ -178,10 +184,7 @@ identityUrlBuilder _ (StakePoolMetadataUrl url) _ =
-- | Build a URL from a metadata hash compatible with an aggregation registry
registryUrlBuilder
:: URI
-> PoolId
-> StakePoolMetadataUrl
-> StakePoolMetadataHash
-> Either HttpException URI
-> UrlBuilder
registryUrlBuilder baseUrl pid _ hash =
Right $ baseUrl
{ uriPath = "/" <> metadaFetchEp pid hash
Expand Down Expand Up @@ -278,11 +281,7 @@ fetchDelistedPools tr uri manager = runExceptTLog $ do
-- TODO: refactor/simplify this
fetchFromRemote
:: Tracer IO StakePoolMetadataFetchLog
-> [ PoolId
-> StakePoolMetadataUrl
-> StakePoolMetadataHash
-> Either HttpException URI
]
-> [UrlBuilder]
-> Manager
-> PoolId
-> StakePoolMetadataUrl
Expand Down
1 change: 1 addition & 0 deletions lib/shelley/src/Cardano/Wallet/Shelley/Network.hs
Expand Up @@ -805,6 +805,7 @@ newRewardBalanceFetcher tr readNodeTip queryRewardQ = do
:: Tip (CardanoBlock StandardCrypto)
-> Set W.RewardAccount
-> IO (Maybe (Map W.RewardAccount W.Coin))
fetch _tip accounts | Set.null accounts = pure (Just mempty)
fetch _tip accounts = do
-- NOTE: We no longer need the tip to run LSQ queries. The local state
-- query client will automatically acquire the latest tip.
Expand Down
82 changes: 58 additions & 24 deletions lib/shelley/src/Cardano/Wallet/Shelley/Pools.hs
Expand Up @@ -42,7 +42,9 @@ import Cardano.BM.Data.Tracer
import Cardano.Pool.DB
( DBLayer (..), ErrPointAlreadyExists (..), readPoolLifeCycleStatus )
import Cardano.Pool.Metadata
( StakePoolMetadataFetchLog
( Manager
, StakePoolMetadataFetchLog
, UrlBuilder
, defaultManagerSettings
, fetchDelistedPools
, fetchFromRemote
Expand Down Expand Up @@ -129,6 +131,8 @@ import Data.Function
( (&) )
import Data.Generics.Internal.VL.Lens
( view )
import Data.List
( nub, (\\) )
import Data.List.NonEmpty
( NonEmpty (..) )
import Data.Map
Expand All @@ -151,6 +155,8 @@ import Data.Time.Clock.POSIX
( getPOSIXTime, posixDayLength )
import Data.Tuple.Extra
( dupe )
import Data.Void
( Void )
import Data.Word
( Word64 )
import Fmt
Expand All @@ -168,7 +174,14 @@ import UnliftIO.Exception
import UnliftIO.IORef
( IORef, newIORef, readIORef, writeIORef )
import UnliftIO.STM
( TVar, readTVarIO, writeTVar )
( TBQueue
, TVar
, newTBQueue
, readTBQueue
, readTVarIO
, writeTBQueue
, writeTVar
)

import qualified Cardano.Wallet.Api.Types as Api
import qualified Data.List as L
Expand Down Expand Up @@ -723,43 +736,64 @@ monitorMetadata gcStatus tr sp db@(DBLayer{..}) = do
_ -> pure NoSmashConfigured

if | health == Available || health == NoSmashConfigured -> do
let fetcher fetchStrategies = fetchFromRemote trFetch fetchStrategies manager
loop getPoolMetadata = forever $ do
(refs, successes) <- getPoolMetadata
when (null refs || null successes) $ do
traceWith tr $ MsgFetchTakeBreak blockFrequency
threadDelay blockFrequency

case poolMetadataSource settings of
FetchNone -> do
STM.atomically $ writeTVar gcStatus NotApplicable
loop (pure ([], [])) -- TODO: exit loop?

FetchDirect -> do
STM.atomically $ writeTVar gcStatus NotApplicable
loop (fetchThem $ fetcher [identityUrlBuilder])
void $ fetchMetadata manager [identityUrlBuilder]

FetchSMASH (unSmashServer -> uri) -> do
STM.atomically $ writeTVar gcStatus NotStarted
let getDelistedPools =
fetchDelistedPools trFetch uri manager
tid <- forkFinally
(gcDelistedPools gcStatus tr db getDelistedPools)
onExit
flip finally (killThread tid) $
loop (fetchThem $ fetcher [registryUrlBuilder uri])
| otherwise -> traceWith tr MsgSMASHUnreachable
void $ fetchMetadata manager [registryUrlBuilder uri]
`finally` killThread tid

| otherwise ->
traceWith tr MsgSMASHUnreachable
where
trFetch = contramap MsgFetchPoolMetadata tr
fetchThem fetchMetadata = do
refs <- atomically (unfetchedPoolMetadataRefs 100)
successes <- fmap catMaybes $ forM refs $ \(pid, url, hash) -> do
fetchMetadata pid url hash >>= \case
Nothing -> Nothing <$ do
atomically $ putFetchAttempt (url, hash)

Just meta -> Just hash <$ do
atomically $ putPoolMetadata hash meta
pure (refs, successes)

fetchMetadata
:: Manager
-> [UrlBuilder]
-> IO Void
fetchMetadata manager strategies = do
inFlights <- STM.atomically $ newTBQueue maxInFlight
endlessly [] $ \keys -> do
refs <- nub . (\\ keys) <$> atomically (unfetchedPoolMetadataRefs limit)
when (null refs) $ do
traceWith tr $ MsgFetchTakeBreak blockFrequency
threadDelay blockFrequency
forM refs $ \k@(pid, url, hash) -> k <$ withAvailableSeat inFlights (do
fetchFromRemote trFetch strategies manager pid url hash >>= \case
Nothing ->
atomically $ putFetchAttempt (url, hash)
Just meta -> do
KtorZ marked this conversation as resolved.
Show resolved Hide resolved
atomically $ putPoolMetadata hash meta
)
where
-- Twice 'maxInFlight' so that, when removing keys currently in flight,
-- we are left with at least 'maxInFlight' keys.
limit = fromIntegral (2 * maxInFlight)
maxInFlight = 10

endlessly :: Monad m => a -> (a -> m a) -> m Void
endlessly zero action = action zero >>= (`endlessly` action)

-- | Run an action asyncronously only when there's an available seat.
-- Seats are materialized by a bounded queue. If the queue is full,
-- then there's no seat.
withAvailableSeat :: TBQueue () -> IO a -> IO ()
withAvailableSeat q action = do
STM.atomically $ writeTBQueue q ()
void $ action `forkFinally` const (STM.atomically $ readTBQueue q)

-- NOTE
-- If there's no metadata, we typically need not to retry sooner than the
-- next block. So waiting for a delay that is roughly the same order of
Expand Down