Skip to content

Commit

Permalink
Make max in flight (a.k.a. pipelined) chain-sync requests configurable
Browse files Browse the repository at this point in the history
  This is mainly for enabling unit test scenarios which saturate the queue without having to generate thousands of events.
  • Loading branch information
KtorZ committed Jun 14, 2021
1 parent 4b2d409 commit c7f201a
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 11 deletions.
13 changes: 12 additions & 1 deletion server/src/Ogmios/App/Options.hs
Expand Up @@ -126,6 +126,7 @@ parserInfo = info (helper <*> parser) $ mempty
<*> serverHostOption
<*> serverPortOption
<*> connectionTimeoutOption
<*> maxInFlightOption
<*> logLevelOption
)
)
Expand All @@ -139,6 +140,7 @@ data Options = Options
, serverHost :: !String
, serverPort :: !Int
, connectionTimeout :: !Int
, maxInFlight :: !Int
, logLevel :: !Severity
} deriving (Generic, Eq, Show)

Expand Down Expand Up @@ -169,7 +171,7 @@ serverPortOption = option auto $ mempty
<> value 1337
<> showDefault

-- | [--websocket-timeout], default: 90s
-- | [--timeout=SECONDS], default: 90s
connectionTimeoutOption :: Parser Int
connectionTimeoutOption = option auto $ mempty
<> long "timeout"
Expand All @@ -178,6 +180,15 @@ connectionTimeoutOption = option auto $ mempty
<> value 90
<> showDefault

-- | [--max-in-flight=INT], default: 1000
maxInFlightOption :: Parser Int
maxInFlightOption = option auto $ mempty
<> long "max-in-flight"
<> metavar "INT"
<> help "Max number of ChainSync requests which can be pipelined at once. Only apply to the chain-sync protocol."
<> value 1000
<> showDefault

-- | [--log-level=SEVERITY], default: Info
logLevelOption :: Parser Severity
logLevelOption = option auto $ mempty
Expand Down
11 changes: 8 additions & 3 deletions server/src/Ogmios/App/Protocol/ChainSync.hs
Expand Up @@ -36,6 +36,7 @@
-- @
module Ogmios.App.Protocol.ChainSync
( mkChainSyncClient
, MaxInFlight
) where

import Ogmios.Prelude
Expand Down Expand Up @@ -70,18 +71,22 @@ import qualified Codec.Json.Wsp as Wsp
import qualified Codec.Json.Wsp.Handler as Wsp
import qualified Data.Sequence as Seq

type MaxInFlight = Int

mkChainSyncClient
:: forall m block.
( MonadSTM m
)
=> ChainSyncCodecs block
=> MaxInFlight
-- ^ Max number of requests allowed to be in-flight / pipelined
-> ChainSyncCodecs block
-- ^ For encoding Haskell types to JSON
-> TQueue m (ChainSyncMessage block)
-- ^ Incoming request queue
-> (Json -> m ())
-- ^ An emitter for yielding JSON objects
-> ChainSyncClientPipelined block (Point block) (Tip block) m ()
mkChainSyncClient ChainSyncCodecs{..} queue yield =
mkChainSyncClient maxInFlight ChainSyncCodecs{..} queue yield =
ChainSyncClientPipelined $ clientStIdle Zero Seq.Empty
where
await :: m (ChainSyncMessage block)
Expand Down Expand Up @@ -121,7 +126,7 @@ mkChainSyncClient ChainSyncCodecs{..} queue yield =
Just (MsgRequestNext RequestNext toResponse) -> do
let buffer' = buffer |> toResponse
let collect = CollectResponse
(guard (natToInt n < 1000) $> clientStIdle (Succ n) buffer')
(guard (natToInt n < maxInFlight) $> clientStIdle (Succ n) buffer')
(clientStNext n buffer')
pure $ SendMsgRequestNextPipelined collect

Expand Down
11 changes: 6 additions & 5 deletions server/src/Ogmios/App/Server/WebSocket.hs
Expand Up @@ -26,7 +26,7 @@ import Ogmios.App.Options
import Ogmios.App.Protocol
( onUnmatchedMessage )
import Ogmios.App.Protocol.ChainSync
( mkChainSyncClient )
( MaxInFlight, mkChainSyncClient )
import Ogmios.App.Protocol.StateQuery
( mkStateQueryClient )
import Ogmios.App.Protocol.TxSubmission
Expand Down Expand Up @@ -121,14 +121,14 @@ newWebSocketApp
-> m WebSocketApp
newWebSocketApp tr unliftIO = do
NetworkParameters{slotsPerEpoch,networkMagic} <- asks (view typed)
Options{nodeSocket} <- asks (view typed)
Options{nodeSocket,maxInFlight} <- asks (view typed)
sensors <- asks (view typed)
return $ \pending -> unliftIO $ do
let (mode, sub) = choseSerializationMode pending
logWith tr $ WebSocketConnectionAccepted (userAgent pending) mode
recordSession sensors $ onExceptions $ acceptRequest pending sub $ \conn -> do
let trClient = contramap WebSocketClient tr
withOuroborosClients mode sensors conn $ \clients -> do
withOuroborosClients mode maxInFlight sensors conn $ \clients -> do
let client = mkClient unliftIO (natTracer liftIO trClient) slotsPerEpoch clients
let vData = NodeToClientVersionData networkMagic
connectClient trClient client vData nodeSocket
Expand Down Expand Up @@ -201,19 +201,20 @@ withOuroborosClients
, MonadWebSocket m
)
=> SerializationMode
-> MaxInFlight
-> Sensors m
-> Connection
-> (Clients m Block -> m a)
-> m a
withOuroborosClients mode sensors conn action = do
withOuroborosClients mode maxInFlight sensors conn action = do
(chainSyncQ, txSubmissionQ, stateQueryQ) <-
atomically $ (,,) <$> newTQueue <*> newTQueue <*> newTQueue

withAsync (routeMessage Nothing chainSyncQ stateQueryQ txSubmissionQ) $ \worker -> do
link worker
action $ Clients
{ chainSyncClient =
mkChainSyncClient chainSyncCodecs chainSyncQ yield
mkChainSyncClient maxInFlight chainSyncCodecs chainSyncQ yield
, stateQueryClient =
mkStateQueryClient stateQueryCodecs stateQueryQ yield
, txSubmissionClient =
Expand Down
4 changes: 2 additions & 2 deletions server/test/unit/Ogmios/App/Protocol/ChainSyncSpec.hs
Expand Up @@ -26,7 +26,7 @@ import Network.TypedProtocol.Codec
import Ogmios.App.Options
( defaultSlotsPerEpoch )
import Ogmios.App.Protocol.ChainSync
( mkChainSyncClient )
( MaxInFlight, mkChainSyncClient )
import Ogmios.Control.Exception
( MonadThrow (..) )
import Ogmios.Control.MonadAsync
Expand Down Expand Up @@ -100,7 +100,7 @@ withChainSyncClient action seed = do
(recvQ, sendQ) <- atomically $ (,) <$> newTQueue <*> newTQueue
let mode = CompactSerialization
let innerCodecs = mkChainSyncCodecs (encodeBlock mode) encodePoint encodeTip
let client = mkChainSyncClient innerCodecs recvQ (atomically . writeTQueue sendQ)
let client = mkChainSyncClient maxInFlight innerCodecs recvQ (atomically . writeTQueue sendQ)
let codec = codecs defaultSlotsPerEpoch nodeToClientV_Latest & cChainSyncCodec
withMockChannel (chainSyncMockPeer seed codec) $ \channel -> do
result <- race
Expand Down

0 comments on commit c7f201a

Please sign in to comment.