Skip to content

Commit

Permalink
outbound-governor: replaced readNewInboundConnection with readInbound…
Browse files Browse the repository at this point in the history
…Peers

This patch replaces `readNewInboundConnection` with `readInboundPeers`,
including the monitoring action for inbound connections.   In the
following patch we will implement an action which is using inbound
duplex peers.
  • Loading branch information
coot committed May 6, 2024
1 parent 930689c commit 467f846
Show file tree
Hide file tree
Showing 13 changed files with 56 additions and 169 deletions.
7 changes: 1 addition & 6 deletions ouroboros-network-framework/demo/connection-manager.hs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ import Ouroboros.Network.Context
import Ouroboros.Network.IOManager
import Ouroboros.Network.Mux
import Ouroboros.Network.MuxMode
import Ouroboros.Network.PeerSelection.PeerSharing (PeerSharing (..))
import Ouroboros.Network.Protocol.Handshake
import Ouroboros.Network.Protocol.Handshake.Codec (timeLimitsHandshake)
import Ouroboros.Network.Protocol.Handshake.Unversioned
Expand Down Expand Up @@ -214,7 +213,6 @@ withBidirectionalConnectionManager snocket makeBearer socket
k = do
mainThreadId <- myThreadId
inbgovInfoChannel <- newInformationChannel
outgovInfoChannel <- newInformationChannel
-- as in the 'withInitiatorOnlyConnectionManager' we use a `StrictTVar` to
-- pass list of requests, but since we are also interested in the results we
-- need to have multable cells to pass the accumulators around.
Expand Down Expand Up @@ -245,8 +243,7 @@ withBidirectionalConnectionManager snocket makeBearer socket
acceptedConnectionsHardLimit = maxBound,
acceptedConnectionsSoftLimit = maxBound,
acceptedConnectionsDelay = 0
},
cmGetPeerSharing = \_ -> PeerSharingDisabled
}
}
(makeConnectionHandler
muxTracer
Expand All @@ -266,10 +263,8 @@ withBidirectionalConnectionManager snocket makeBearer socket
establishedRequestsVar))
(mainThreadId, debugMuxErrorRethrowPolicy
<> debugIOErrorRethrowPolicy))
PeerSharingEnabled
(\_ -> HandshakeFailure)
(InResponderMode inbgovInfoChannel)
(InResponderMode $ Just outgovInfoChannel)
$ \connectionManager -> do
serverAddr <- Snocket.getLocalAddr snocket socket
Server.with
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,6 @@ test-suite sim-tests
, strict-stm
, network-mux
, monoidal-synchronisation
, ouroboros-network-api
, ouroboros-network-framework
, ouroboros-network-framework:testlib
, ouroboros-network-testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ import Ouroboros.Network.Snocket (Accept (..), Accepted (..),
import Ouroboros.Network.ConnectionManager.InformationChannel
(newInformationChannel)
import Ouroboros.Network.ConnectionManager.InformationChannel qualified as InfoChannel
import Ouroboros.Network.PeerSelection.PeerSharing (PeerSharing (..))



Expand Down Expand Up @@ -752,7 +751,6 @@ prop_valid_transitions (Fixed rnd) (SkewedBool bindToLocalAddress) scheduleMap =
--}

inbgovInfoChannel <- newInformationChannel
outgovInfoChannel <- newInformationChannel
let connectionHandler = mkConnectionHandler snocket
result <- withConnectionManager
ConnectionManagerArguments {
Expand All @@ -774,14 +772,11 @@ prop_valid_transitions (Fixed rnd) (SkewedBool bindToLocalAddress) scheduleMap =
acceptedConnectionsDelay = 0
},
cmTimeWaitTimeout = testTimeWaitTimeout,
cmOutboundIdleTimeout = testOutboundIdleTimeout,
cmGetPeerSharing = \_ -> PeerSharingDisabled
cmOutboundIdleTimeout = testOutboundIdleTimeout
}
connectionHandler
PeerSharingEnabled
(\_ -> HandshakeFailure)
(InResponderMode inbgovInfoChannel)
(InResponderMode $ Just outgovInfoChannel)
$ \(connectionManager
:: ConnectionManager InitiatorResponderMode (FD (IOSim s))
Addr (Handle m) Void (IOSim s)) -> do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ import Ouroboros.Network.ConnectionManager.Types
import Ouroboros.Network.ConnectionManager.Types qualified as CM
import Ouroboros.Network.InboundGovernor.Event (NewConnectionInfo (..))
import Ouroboros.Network.MuxMode
import Ouroboros.Network.PeerSelection.PeerSharing (PeerSharing (..))
import Ouroboros.Network.Server.RateLimiting (AcceptedConnectionsLimit (..))
import Ouroboros.Network.Snocket

Expand Down Expand Up @@ -143,11 +142,7 @@ data ConnectionManagerArguments handlerTrace socket peerAddr handle handleError
--
cmStdGen :: StdGen,

cmConnectionsLimits :: AcceptedConnectionsLimit,

-- | How to extract remote side's PeerSharing information from
-- versionData
cmGetPeerSharing :: versionData -> PeerSharing
cmConnectionsLimits :: AcceptedConnectionsLimit
}


Expand Down Expand Up @@ -560,16 +555,11 @@ withConnectionManager
=> ConnectionManagerArguments handlerTrace socket peerAddr handle handleError version versionData m
-> ConnectionHandler muxMode handlerTrace socket peerAddr handle handleError (version, versionData) m
-- ^ Callback which runs in a thread dedicated for a given connection.
-> PeerSharing
-- ^ Configuration PeerSharing value.
-> (handleError -> HandleErrorType)
-- ^ classify 'handleError's
-> InResponderMode muxMode (InformationChannel (NewConnectionInfo peerAddr handle) m)
-- ^ On outbound duplex connections we need to notify the server about
-- a new connection.
-> InResponderMode muxMode (Maybe (InformationChannel (peerAddr, PeerSharing) m))
-- ^ On inbound duplex connections we need to notify the outbound governor about
-- a new connection.
-> (ConnectionManager muxMode socket peerAddr handle handleError m -> m a)
-- ^ Continuation which receives the 'ConnectionManager'. It must not leak
-- outside of scope of this callback. Once it returns all resources
Expand All @@ -589,16 +579,13 @@ withConnectionManager args@ConnectionManagerArguments {
cmOutboundIdleTimeout,
connectionDataFlow,
cmPrunePolicy,
cmConnectionsLimits,
cmGetPeerSharing
cmConnectionsLimits
}
ConnectionHandler {
connectionHandler
}
peerSharing
classifyHandleError
inboundGovernorInfoChannel
outboundGovernorInfoChannel
k = do
((freshIdSupply, stateVar, stdGenVar)
:: ( FreshIdSupply m
Expand Down Expand Up @@ -1262,27 +1249,6 @@ withConnectionManager args@ConnectionManagerArguments {
infoChannel
(NewConnectionInfo provenance connId dataFlow handle)
_ -> return ()

let -- True iff the connection can be used by the outbound
-- governor.
notifyOutboundGov =
case (provenance, peerSharing) of
(Inbound, PeerSharingEnabled) -> Duplex == dataFlow
_ -> False
-- The connection started as inbound but its
-- provenance was changed to outbound; this is only
-- possible if we are connecting to ourselves. In
-- this case we don't need to notify the outbound
-- governor.
case outboundGovernorInfoChannel of
InResponderMode (Just infoChannel) | notifyOutboundGov
->
atomically $ InfoChannel.writeMessage
infoChannel
(peerAddr, cmGetPeerSharing versionData)

_ -> return ()

return $ Connected connId dataFlow handle

-- the connection is in `TerminatingState` or
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ import Ouroboros.Network.Test.Orphans ()
import Ouroboros.Network.ConnectionManager.InformationChannel
(newInformationChannel)
import Ouroboros.Network.ConnectionManager.Test.Timeouts
import Ouroboros.Network.PeerSelection.PeerSharing (PeerSharing (..))


--
Expand Down Expand Up @@ -298,8 +297,7 @@ withInitiatorOnlyConnectionManager name timeouts trTracer cmTracer cmStdGen snoc
cmStdGen,
cmConnectionsLimits = acceptedConnLimit,
cmTimeWaitTimeout = tTimeWaitTimeout timeouts,
cmOutboundIdleTimeout = tOutboundIdleTimeout timeouts,
cmGetPeerSharing = \(DataFlowProtocolData _ ps) -> ps
cmOutboundIdleTimeout = tOutboundIdleTimeout timeouts
}
(makeConnectionHandler
muxTracer
Expand All @@ -318,10 +316,8 @@ withInitiatorOnlyConnectionManager name timeouts trTracer cmTracer cmStdGen snoc
<> debugMuxRuntimeErrorRethrowPolicy
<> debugIOErrorRethrowPolicy
<> assertRethrowPolicy))
PeerSharingEnabled
(\_ -> HandshakeFailure)
NotInResponderMode
NotInResponderMode
(\cm ->
k cm `catch` \(e :: SomeException) -> throwIO e)
where
Expand Down Expand Up @@ -466,7 +462,6 @@ withBidirectionalConnectionManager name timeouts
acceptedConnLimit k = do
mainThreadId <- myThreadId
inbgovInfoChannel <- newInformationChannel
outgovInfoChannel <- newInformationChannel
let muxTracer = WithName name `contramap` nullTracer -- mux tracer

withConnectionManager
Expand All @@ -489,8 +484,7 @@ withBidirectionalConnectionManager name timeouts
connectionDataFlow = \_ (DataFlowProtocolData df _) -> df,
cmPrunePolicy = simplePrunePolicy,
cmStdGen,
cmConnectionsLimits = acceptedConnLimit,
cmGetPeerSharing = \(DataFlowProtocolData _ ps) -> ps
cmConnectionsLimits = acceptedConnLimit
}
(makeConnectionHandler
muxTracer
Expand All @@ -509,10 +503,8 @@ withBidirectionalConnectionManager name timeouts
<> debugMuxRuntimeErrorRethrowPolicy
<> debugIOErrorRethrowPolicy
<> assertRethrowPolicy))
PeerSharingEnabled
(\_ -> HandshakeFailure)
(InResponderMode inbgovInfoChannel)
(InResponderMode $ Just outgovInfoChannel)
$ \connectionManager ->
do
serverAddr <- Snocket.getLocalAddr snocket socket
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ import Data.OrdPSQ qualified as PSQ
import System.Random (mkStdGen)

import Control.Exception (AssertionFailed (..), catch, evaluate)
import Control.Monad.Class.MonadSTM (STM, retry)
import Control.Monad.Class.MonadSTM (STM)
import Control.Monad.Class.MonadTimer.SI
import Control.Tracer (Tracer (..))

Expand Down Expand Up @@ -3355,7 +3355,6 @@ _governorFindingPublicRoots targetNumberOfRootPeers readDomains readUseBootstrap
requestPeerShare = \_ _ -> return (PeerSharingResult []),
peerConnToPeerSharing = \ps -> ps,
requestPublicRootPeers = \_ _ -> return (PublicRootPeers.empty, 0),
readNewInboundConnection = retry,
peerStateActions = PeerStateActions {
establishPeerConnection = error "establishPeerConnection",
monitorPeerConnection = error "monitorPeerConnection",
Expand All @@ -3364,7 +3363,8 @@ _governorFindingPublicRoots targetNumberOfRootPeers readDomains readUseBootstrap
closePeerConnection = error "closePeerConnection"
},
readUseBootstrapPeers,
readLedgerStateJudgement
readLedgerStateJudgement,
readInboundPeers = pure Map.empty
}

targets :: PeerSelectionTargets
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,6 @@ mockPeerSelectionActions' tracer
peerConnToPeerSharing = \(PeerConn _ ps _) -> ps,
requestPublicRootPeers,
readPeerSelectionTargets = readTVar targetsVar,
readNewInboundConnection = retry,
requestPeerShare,
peerStateActions = PeerStateActions {
establishPeerConnection,
Expand All @@ -357,7 +356,8 @@ mockPeerSelectionActions' tracer
closePeerConnection
},
readUseBootstrapPeers,
readLedgerStateJudgement
readLedgerStateJudgement,
readInboundPeers = pure Map.empty
}
where
-- TODO: make this dynamic
Expand Down

0 comments on commit 467f846

Please sign in to comment.