diff --git a/bench/tx-generator/src/Cardano/Benchmarking/GeneratorTx.hs b/bench/tx-generator/src/Cardano/Benchmarking/GeneratorTx.hs
index f2f91c828b2..d5f7c04c6b4 100644
--- a/bench/tx-generator/src/Cardano/Benchmarking/GeneratorTx.hs
+++ b/bench/tx-generator/src/Cardano/Benchmarking/GeneratorTx.hs
@@ -31,9 +31,11 @@ module Cardano.Benchmarking.GeneratorTx
 
 import Cardano.Prelude
 import Prelude (id, String)
+import qualified Control.Concurrent.STM as STM
 import Control.Monad (fail)
 import Control.Monad.Trans.Except.Extra (left, newExceptT, right)
 import Control.Tracer (Tracer, traceWith)
+import qualified Data.Time.Clock as Clock
 
 import qualified Data.List.NonEmpty as NE
 import qualified Data.Map.Strict as Map
@@ -302,28 +304,27 @@ asyncBenchmark
   traceDebug $ "******* Tx generator, launching Tx peers: " ++ show (NE.length remoteAddresses) ++ " of them"
 
   liftIO $ do
-    submission :: Submission IO era <- mkSubmission traceSubmit $
-      SubmissionParams
-        { spTps = tpsRate
-        , spTargets = numTargets
-        , spQueueLen = 32
-        , spErrorPolicy = errorPolicy
-        }
-    allAsyncs <- forM (zip [0..] $ NE.toList remoteAddresses) $
-      \(i, remoteAddr) ->
+    startTime <- Clock.getCurrentTime
+    txSendQueue <- STM.newTBQueueIO 32
+
+    reportRefs <- STM.atomically $ replicateM (fromIntegral numTargets) STM.newEmptyTMVar
+
+    allAsyncs <- forM (zip reportRefs $ NE.toList remoteAddresses) $
+      \(reportRef, remoteAddr) ->
         launchTxPeer
           traceSubmit
           traceN2N
           connectClient
           remoteAddr
-          submission
-          i
-    tpsFeeder <- async $ tpsLimitedTxFeeder submission finalTransactions
+          txSendQueue
+          reportRef
+          errorPolicy
+    tpsFeeder <- async $ tpsLimitedTxFeeder traceSubmit numTargets txSendQueue tpsRate finalTransactions
     let tpsFeederShutdown = do
           cancel tpsFeeder
-          liftIO $ tpsLimitedTxFeederShutdown submission
+          liftIO $ tpsLimitedTxFeederShutdown numTargets txSendQueue
 
-    return (tpsFeeder, allAsyncs, mkSubmissionSummary threadName submission, tpsFeederShutdown)
+    return (tpsFeeder, allAsyncs, mkSubmissionSummary threadName startTime reportRefs, tpsFeederShutdown)
 
 -- | At this moment 'sourceAddress' contains a huge amount of money (lets call it A).
 -- Now we have to split this amount to N equal parts, as a result we'll have
@@ -439,11 +440,6 @@ txGenerator
 -- Txs for submission.
 ---------------------------------------------------------------------------------------------------
 
--- | To get higher performance we need to hide latency of getting and
--- forwarding (in sufficient numbers) transactions.
---
--- TODO: transform comments into haddocks.
---
 launchTxPeer
   :: forall era
   .  IsShelleyBasedEra era
@@ -451,24 +447,22 @@ launchTxPeer
   -> Tracer IO NodeToNodeSubmissionTrace
   -> ConnectClient
   -> Network.Socket.AddrInfo
-  -- Remote address
-  -> Submission IO era
-  -- Mutable state shared between submission threads
-  -> Natural
-  -- Thread index
+  -> TxSendQueue era
+  -> ReportRef
+  -> SubmissionErrorPolicy
   -> IO (Async ())
-launchTxPeer traceSubmit traceN2N connectClient remoteAddr sub tix =
+launchTxPeer traceSubmit traceN2N connectClient remoteAddr txSendQueue reportRef errorPolicy =
   async $ handle (\(SomeException err) -> do
     let errDesc = mconcat
-          [ "Exception while talking to peer #", show tix
+          [ "Exception while talking to peer "
           , " (", show (addrAddress remoteAddr), "): "
           , show err]
-    submitThreadReport sub tix (Left errDesc)
-    case spErrorPolicy $ sParams sub of
+    submitThreadReport reportRef (Left errDesc)
+    case errorPolicy of
       FailOnError -> throwIO err
       LogErrors -> traceWith traceSubmit $
         TraceBenchTxSubError (pack errDesc))
   $ connectClient remoteAddr
-      (txSubmissionClient traceN2N traceSubmit sub tix)
+      (txSubmissionClient traceN2N traceSubmit txSendQueue reportRef)
diff --git a/bench/tx-generator/src/Cardano/Benchmarking/GeneratorTx/Submission.hs b/bench/tx-generator/src/Cardano/Benchmarking/GeneratorTx/Submission.hs
index 8a766d63bc5..d913ffa04f0 100644
--- a/bench/tx-generator/src/Cardano/Benchmarking/GeneratorTx/Submission.hs
+++ b/bench/tx-generator/src/Cardano/Benchmarking/GeneratorTx/Submission.hs
@@ -22,9 +22,9 @@ module Cardano.Benchmarking.GeneratorTx.Submission
   ( SubmissionParams(..)
-  , Submission(sParams)
   , SubmissionThreadReport
-  , mkSubmission
+  , TxSendQueue
+  , ReportRef
   , mkSubmissionSummary
   , submitThreadReport
   , txSubmissionClient
@@ -88,39 +88,18 @@ data SubmissionParams
   = SubmissionParams
     { spTps         :: !TPSRate
     , spTargets     :: !Natural
-    , spQueueLen    :: !Natural
-    , spErrorPolicy :: !SubmissionErrorPolicy
     }
-
-data Submission (m :: Type -> Type) (era :: Type)
-  = Submission
-    { sParams      :: !SubmissionParams
-    , sStartTime   :: !UTCTime
-    , sThreads     :: !Natural
-    , sTxSendQueue :: !(TBQueue (Maybe (Tx era)))
-    , sReportsRefs :: ![STM.TMVar (Either String SubmissionThreadReport)]
-    , sTrace       :: !(Tracer m (TraceBenchTxSubmit TxId))
-    }
-
-mkSubmission
-  :: MonadIO m
-  => Tracer m (TraceBenchTxSubmit TxId)
-  -> SubmissionParams
-  -> m (Submission m era)
-mkSubmission sTrace sParams@SubmissionParams{spTargets=sThreads, spQueueLen} = liftIO $ do
-  sStartTime <- Clock.getCurrentTime
-  sTxSendQueue <- STM.newTBQueueIO spQueueLen
-  sReportsRefs <- STM.atomically $ replicateM (fromIntegral sThreads) STM.newEmptyTMVar
-  pure Submission{..}
+type ReportRef = STM.TMVar (Either String SubmissionThreadReport)
+type TxSendQueue era = TBQueue (Maybe (Tx era))
 
 submitThreadReport
   :: MonadIO m
-  => Submission m era
-  -> Natural
+  => ReportRef
   -> Either String SubmissionThreadReport
   -> m ()
-submitThreadReport Submission{sReportsRefs} threadIx =
-  liftIO . STM.atomically . STM.putTMVar (sReportsRefs L.!! fromIntegral threadIx)
+submitThreadReport ref report
+  = liftIO $ STM.atomically $ STM.putTMVar ref report
 
 {-------------------------------------------------------------------------------
   Results
 -------------------------------------------------------------------------------}
@@ -135,20 +114,20 @@ data SubmissionThreadStats
 data SubmissionThreadReport
   = SubmissionThreadReport
     { strStats         :: !SubmissionThreadStats
---- , strThreadIndex   :: !Natural
     , strEndOfProtocol :: !UTCTime
     }
 
 mkSubmissionSummary
   :: String
-  -> Submission IO tx
+  -> UTCTime
+  -> [ReportRef]
  -> IO SubmissionSummary
-mkSubmissionSummary ssThreadName Submission{ sStartTime, sReportsRefs}
+mkSubmissionSummary ssThreadName startTime reportsRefs
  = do
-  results <- sequence (STM.atomically . STM.readTMVar <$> sReportsRefs)
+  results <- sequence (STM.atomically . STM.readTMVar <$> reportsRefs)
   let (failures, reports) = partitionEithers results
   now <- Clock.getCurrentTime
-  let ssElapsed = Clock.diffUTCTime now sStartTime
+  let ssElapsed = Clock.diffUTCTime now startTime
       ssTxSent@(Sent sent) = sum $ stsSent . strStats <$> reports
       ssTxUnavailable = sum $ stsUnavailable . strStats <$> reports
       ssEffectiveTps = txDiffTimeTPS sent ssElapsed
@@ -164,53 +143,60 @@ mkSubmissionSummary ssThreadName Submission{ sStartTime, sReportsRefs}
   threadReportTps
     SubmissionThreadReport
       { strStats=SubmissionThreadStats{stsAcked=Ack ack}, strEndOfProtocol } =
-        txDiffTimeTPS ack (Clock.diffUTCTime strEndOfProtocol sStartTime)
+        txDiffTimeTPS ack (Clock.diffUTCTime strEndOfProtocol startTime)
 
 {-------------------------------------------------------------------------------
   Submission queue:  feeding and consumption
 -------------------------------------------------------------------------------}
-simpleTxFeeder
-  :: forall m era
-  . (MonadIO m)
-  => Submission m era -> [Tx era] -> m ()
-simpleTxFeeder
-  Submission{sTrace, sThreads, sTxSendQueue} txs = do
+simpleTxFeeder :: forall m era .
+     MonadIO m
+  => Tracer m (TraceBenchTxSubmit TxId)
+  -> Natural
+  -> TxSendQueue era
+  -> [Tx era]
+  -> m ()
+simpleTxFeeder tracer threads txSendQueue txs = do
   forM_ (zip txs [0..]) feedTx
   -- Issue the termination notifications.
-  replicateM_ (fromIntegral sThreads) $
-    liftIO $ STM.atomically $ STM.writeTBQueue sTxSendQueue Nothing
+  replicateM_ (fromIntegral threads) $
+    liftIO $ STM.atomically $ STM.writeTBQueue txSendQueue Nothing
  where
   feedTx :: (Tx era, Int) -> m ()
   feedTx (tx, ix) = do
-    liftIO $ STM.atomically $ STM.writeTBQueue sTxSendQueue (Just tx)
-    traceWith sTrace $ TraceBenchTxSubServFed [getTxId $ getTxBody tx] ix
-
-tpsLimitedTxFeederShutdown :: Submission m era -> IO ()
-tpsLimitedTxFeederShutdown Submission{sThreads, sTxSendQueue }
+    liftIO $ STM.atomically $ STM.writeTBQueue txSendQueue (Just tx)
+    traceWith tracer $ TraceBenchTxSubServFed [getTxId $ getTxBody tx] ix
+
+tpsLimitedTxFeederShutdown ::
+     Natural
+  -> TxSendQueue era
+  -> IO ()
+tpsLimitedTxFeederShutdown threads txSendQueue
   = STM.atomically $
-      replicateM_ (fromIntegral sThreads)
-       $ STM.writeTBQueue sTxSendQueue Nothing
+      replicateM_ (fromIntegral threads)
+       $ STM.writeTBQueue txSendQueue Nothing
 
-tpsLimitedTxFeeder
-  :: forall m era . MonadIO m => Submission m era -> [Tx era] -> m ()
-tpsLimitedTxFeeder submission txs = do
+tpsLimitedTxFeeder :: forall m era .
+     MonadIO m
+  => Tracer m (TraceBenchTxSubmit TxId)
+  -> Natural
+  -> TxSendQueue era
+  -> TPSRate
+  -> [Tx era] -> m ()
+tpsLimitedTxFeeder tracer threads txSendQueue (TPSRate rate) txs = do
   -- It would be nice to catch an AsyncException here and do a clean shutdown.
   -- However this would require extra machineries because we are in MonadIO m not in IO ().
   -- TODO: Move everything to IO () and avoid problems from over-polymorphism.
   now <- liftIO Clock.getCurrentTime
   foldM_ feedTx (now, 0) (zip txs [0..])
-  liftIO $ tpsLimitedTxFeederShutdown submission
+  liftIO $ tpsLimitedTxFeederShutdown threads txSendQueue
  where
-  Submission{ sParams=SubmissionParams{spTps=TPSRate rate}
-            , sTrace
-            , sTxSendQueue } = submission
   feedTx :: (UTCTime, NominalDiffTime)
          -> (Tx era, Int)
          -> m (UTCTime, NominalDiffTime)
   feedTx (lastPreDelay, lastDelay) (tx, ix) = do
-    liftIO . STM.atomically $ STM.writeTBQueue sTxSendQueue (Just tx)
-    traceWith sTrace $ TraceBenchTxSubServFed [getTxId $ getTxBody tx] ix
+    liftIO . STM.atomically $ STM.writeTBQueue txSendQueue (Just tx)
+    traceWith tracer $ TraceBenchTxSubServFed [getTxId $ getTxBody tx] ix
     now <- liftIO Clock.getCurrentTime
     let targetDelay = realToFrac $ 1.0 / rate
         loopCost = (now `Clock.diffUTCTime` lastPreDelay) - lastDelay
@@ -221,17 +207,17 @@ tpsLimitedTxFeeder submission txs = do
 consumeTxs
   :: forall m blk era
   .  (MonadIO m)
-  => Submission m era -> TokBlockingStyle blk -> Req -> m (Bool, UnReqd (Tx era))
-consumeTxs Submission{sTxSendQueue} blk req
+  => TxSendQueue era -> TokBlockingStyle blk -> Req -> m (Bool, UnReqd (Tx era))
+consumeTxs txSendQueue blk req
   = liftIO . STM.atomically $ go blk req []
  where
   go :: TokBlockingStyle a -> Req -> [Tx era] -> STM (Bool, UnReqd (Tx era))
   go _ 0 acc = pure (False, UnReqd acc)
-  go TokBlocking n acc = STM.readTBQueue sTxSendQueue >>=
+  go TokBlocking n acc = STM.readTBQueue txSendQueue >>=
     \case
       Nothing -> pure (True, UnReqd acc)
      Just tx -> go TokBlocking (n - 1) (tx:acc)
-  go TokNonBlocking _ _ = STM.tryReadTBQueue sTxSendQueue >>=
+  go TokNonBlocking _ _ = STM.tryReadTBQueue txSendQueue >>=
     \case
       Nothing -> pure (False, UnReqd [])
       Just Nothing -> pure (True, UnReqd [])
@@ -248,11 +234,11 @@ txSubmissionClient
      )
   => Tracer m NodeToNodeSubmissionTrace
   -> Tracer m (TraceBenchTxSubmit txid)
-  -> Submission m era
-  -> Natural
+  -> TxSendQueue era
+  -> ReportRef
   -- This return type is forced by Ouroboros.Network.NodeToNode.connectTo
   -> TxSubmissionClient gentxid gentx m ()
-txSubmissionClient tr bmtr sub threadIx =
+txSubmissionClient tr bmtr txSendQueue reportRef =
   TxSubmissionClient $
     pure $ client False (UnAcked []) (SubmissionThreadStats 0 0 0)
  where
@@ -283,7 +269,7 @@ txSubmissionClient tr bmtr sub threadIx =
       (exhausted, unReqd) <-
         if done
         then pure (True, UnReqd [])
-        else consumeTxs sub blocking req
+        else consumeTxs txSendQueue blocking req
 
       r' <- decideAnnouncement blocking ack unReqd unAcked
       (ann@(ToAnnce annNow), newUnacked@(UnAcked outs), Acked acked)
@@ -345,7 +331,7 @@ txSubmissionClient tr bmtr sub threadIx =
   submitReport strStats = do
     strEndOfProtocol <- liftIO Clock.getCurrentTime
     let report = SubmissionThreadReport{..}
-    submitThreadReport (sReportsRefs sub) threadIx (Right report)
+    submitThreadReport reportRef (Right report)
     pure report
 
   txToIdSize :: tx -> (gentxid, TxSizeInBytes)
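
The patch dissolves the Submission record into two explicit handles that call sites now thread by hand: a bounded TxSendQueue era that feeders fill and every peer connection drains, with Nothing as the per-thread termination sentinel, and one ReportRef per peer through which a thread posts either an error string or its final SubmissionThreadReport. Below is a minimal, self-contained sketch of that handoff pattern, assuming only the stm and async packages; worker, the String work items, and the counting report are illustrative stand-ins, not tx-generator code.

import Control.Concurrent.Async (async, wait)
import Control.Monad (forM, forM_, replicateM, replicateM_)
import qualified Control.Concurrent.STM as STM

-- One bounded queue feeds all workers; each worker owns a TMVar for its report.
main :: IO ()
main = do
  let numWorkers = 3 :: Int
      items = map show [1 .. 10 :: Int]
  txSendQueue <- STM.newTBQueueIO 32
  reportRefs  <- STM.atomically $ replicateM numWorkers STM.newEmptyTMVar
  workers     <- forM reportRefs $ \ref -> async (worker txSendQueue ref)
  -- Feed the queue, then issue one Nothing per worker as its termination notification.
  forM_ items $ \x -> STM.atomically $ STM.writeTBQueue txSendQueue (Just x)
  replicateM_ numWorkers $ STM.atomically $ STM.writeTBQueue txSendQueue Nothing
  mapM_ wait workers
  -- Block on every report, as mkSubmissionSummary does with its [ReportRef].
  reports <- mapM (STM.atomically . STM.readTMVar) reportRefs
  print (reports :: [Either String Int])

-- Drain the queue until the sentinel, then publish a report into the TMVar.
worker :: STM.TBQueue (Maybe String) -> STM.TMVar (Either String Int) -> IO ()
worker queue ref = go (0 :: Int)
 where
  go n = do
    mx <- STM.atomically $ STM.readTBQueue queue
    case mx of
      Nothing -> STM.atomically $ STM.putTMVar ref (Right n)
      Just _  -> go (n + 1)

One consequence of handing each peer its own TMVar instead of an indexed list is that submitThreadReport no longer needs the partial L.!! lookup the old code performed; each thread simply owns its slot.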
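The pacing arithmetic in tpsLimitedTxFeeder is unchanged by the refactoring: each iteration aims at a delay of 1/rate seconds and subtracts the measured loop cost, (now - lastPreDelay) - lastDelay, so the time spent writing to the queue and tracing does not drag the effective rate below the requested TPSRate. A distilled, standalone sketch of that computation follows, under the assumption that clamping the sleep at zero is an acceptable way to handle overruns; paced is a hypothetical name, not part of the patch.

import Control.Concurrent (threadDelay)
import Control.Monad (foldM_)
import Data.Time.Clock (NominalDiffTime, UTCTime, diffUTCTime, getCurrentTime)

-- Run an IO action n times, aiming at `rate` actions per second while
-- compensating for the time each iteration itself consumes.
paced :: Double -> Int -> IO () -> IO ()
paced rate n action = do
  start <- getCurrentTime
  foldM_ step (start, 0) [1 .. n]
 where
  targetDelay = realToFrac (1.0 / rate) :: NominalDiffTime
  step :: (UTCTime, NominalDiffTime) -> Int -> IO (UTCTime, NominalDiffTime)
  step (lastPreDelay, lastDelay) _ = do
    action
    now <- getCurrentTime
    -- Wall-clock cost of the last iteration, minus the sleep that was
    -- inserted on purpose: only genuine overhead is compensated for.
    let loopCost = (now `diffUTCTime` lastPreDelay) - lastDelay
        sleep    = max 0 (targetDelay - loopCost)
    -- An iteration that overruns its budget simply skips its sleep.
    threadDelay (ceiling (realToFrac sleep * (1000000 :: Double)))
    pure (now, sleep)

Under these assumptions, paced 10 100 (pure ()) completes its hundred iterations in roughly ten seconds; passing TPSRate, the tracer, and the queue as plain arguments is what lets the new tpsLimitedTxFeeder drop the record pattern match it previously carried in its where clause.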