Skip to content

Commit

Permalink
outbound-governor: replaced readNewInboundConnection with readInbound…
Browse files Browse the repository at this point in the history
…Peers

This patch replaces `readNewInboundConnection` with `readInboundPeers`,
including the monitoring action for inbound connections.   In the
following patch we will implement an action which is using inbound
duplex peers.
  • Loading branch information
coot committed May 6, 2024
1 parent 3ce8fcb commit c3aa384
Show file tree
Hide file tree
Showing 13 changed files with 49 additions and 164 deletions.
7 changes: 1 addition & 6 deletions ouroboros-network-framework/demo/connection-manager.hs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ import Ouroboros.Network.Context
import Ouroboros.Network.IOManager
import Ouroboros.Network.Mux
import Ouroboros.Network.MuxMode
import Ouroboros.Network.PeerSelection.PeerSharing (PeerSharing (..))
import Ouroboros.Network.Protocol.Handshake
import Ouroboros.Network.Protocol.Handshake.Codec (timeLimitsHandshake)
import Ouroboros.Network.Protocol.Handshake.Unversioned
Expand Down Expand Up @@ -214,7 +213,6 @@ withBidirectionalConnectionManager snocket makeBearer socket
k = do
mainThreadId <- myThreadId
inbgovInfoChannel <- newInformationChannel
outgovInfoChannel <- newInformationChannel
-- as in the 'withInitiatorOnlyConnectionManager' we use a `StrictTVar` to
-- pass list of requests, but since we are also interested in the results we
-- need to have multable cells to pass the accumulators around.
Expand Down Expand Up @@ -245,8 +243,7 @@ withBidirectionalConnectionManager snocket makeBearer socket
acceptedConnectionsHardLimit = maxBound,
acceptedConnectionsSoftLimit = maxBound,
acceptedConnectionsDelay = 0
},
cmGetPeerSharing = \_ -> PeerSharingDisabled
}
}
(makeConnectionHandler
muxTracer
Expand All @@ -266,10 +263,8 @@ withBidirectionalConnectionManager snocket makeBearer socket
establishedRequestsVar))
(mainThreadId, debugMuxErrorRethrowPolicy
<> debugIOErrorRethrowPolicy))
PeerSharingEnabled
(\_ -> HandshakeFailure)
(InResponderMode inbgovInfoChannel)
(InResponderMode $ Just outgovInfoChannel)
$ \connectionManager -> do
serverAddr <- Snocket.getLocalAddr snocket socket
Server.with
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,6 @@ test-suite sim-tests
, strict-stm
, network-mux
, monoidal-synchronisation
, ouroboros-network-api
, ouroboros-network-framework
, ouroboros-network-framework:testlib
, ouroboros-network-testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ import Ouroboros.Network.Snocket (Accept (..), Accepted (..),
import Ouroboros.Network.ConnectionManager.InformationChannel
(newInformationChannel)
import Ouroboros.Network.ConnectionManager.InformationChannel qualified as InfoChannel
import Ouroboros.Network.PeerSelection.PeerSharing (PeerSharing (..))



Expand Down Expand Up @@ -752,7 +751,6 @@ prop_valid_transitions (Fixed rnd) (SkewedBool bindToLocalAddress) scheduleMap =
--}

inbgovInfoChannel <- newInformationChannel
outgovInfoChannel <- newInformationChannel
let connectionHandler = mkConnectionHandler snocket
result <- withConnectionManager
ConnectionManagerArguments {
Expand All @@ -774,14 +772,11 @@ prop_valid_transitions (Fixed rnd) (SkewedBool bindToLocalAddress) scheduleMap =
acceptedConnectionsDelay = 0
},
cmTimeWaitTimeout = testTimeWaitTimeout,
cmOutboundIdleTimeout = testOutboundIdleTimeout,
cmGetPeerSharing = \_ -> PeerSharingDisabled
cmOutboundIdleTimeout = testOutboundIdleTimeout
}
connectionHandler
PeerSharingEnabled
(\_ -> HandshakeFailure)
(InResponderMode inbgovInfoChannel)
(InResponderMode $ Just outgovInfoChannel)
$ \(connectionManager
:: ConnectionManager InitiatorResponderMode (FD (IOSim s))
Addr (Handle m) Void (IOSim s)) -> do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ import Ouroboros.Network.ConnectionManager.Types
import Ouroboros.Network.ConnectionManager.Types qualified as CM
import Ouroboros.Network.InboundGovernor.Event (NewConnectionInfo (..))
import Ouroboros.Network.MuxMode
import Ouroboros.Network.PeerSelection.PeerSharing (PeerSharing (..))
import Ouroboros.Network.Server.RateLimiting (AcceptedConnectionsLimit (..))
import Ouroboros.Network.Snocket

Expand Down Expand Up @@ -143,11 +142,7 @@ data ConnectionManagerArguments handlerTrace socket peerAddr handle handleError
--
cmStdGen :: StdGen,

cmConnectionsLimits :: AcceptedConnectionsLimit,

-- | How to extract remote side's PeerSharing information from
-- versionData
cmGetPeerSharing :: versionData -> PeerSharing
cmConnectionsLimits :: AcceptedConnectionsLimit
}


Expand Down Expand Up @@ -560,16 +555,11 @@ withConnectionManager
=> ConnectionManagerArguments handlerTrace socket peerAddr handle handleError version versionData m
-> ConnectionHandler muxMode handlerTrace socket peerAddr handle handleError (version, versionData) m
-- ^ Callback which runs in a thread dedicated for a given connection.
-> PeerSharing
-- ^ Configuration PeerSharing value.
-> (handleError -> HandleErrorType)
-- ^ classify 'handleError's
-> InResponderMode muxMode (InformationChannel (NewConnectionInfo peerAddr handle) m)
-- ^ On outbound duplex connections we need to notify the server about
-- a new connection.
-> InResponderMode muxMode (Maybe (InformationChannel (peerAddr, PeerSharing) m))
-- ^ On inbound duplex connections we need to notify the outbound governor about
-- a new connection.
-> (ConnectionManager muxMode socket peerAddr handle handleError m -> m a)
-- ^ Continuation which receives the 'ConnectionManager'. It must not leak
-- outside of scope of this callback. Once it returns all resources
Expand All @@ -589,16 +579,13 @@ withConnectionManager args@ConnectionManagerArguments {
cmOutboundIdleTimeout,
connectionDataFlow,
cmPrunePolicy,
cmConnectionsLimits,
cmGetPeerSharing
cmConnectionsLimits
}
ConnectionHandler {
connectionHandler
}
peerSharing
classifyHandleError
inboundGovernorInfoChannel
outboundGovernorInfoChannel
k = do
((freshIdSupply, stateVar, stdGenVar)
:: ( FreshIdSupply m
Expand Down Expand Up @@ -1262,27 +1249,6 @@ withConnectionManager args@ConnectionManagerArguments {
infoChannel
(NewConnectionInfo provenance connId dataFlow handle)
_ -> return ()

let -- True iff the connection can be used by the outbound
-- governor.
notifyOutboundGov =
case (provenance, peerSharing) of
(Inbound, PeerSharingEnabled) -> Duplex == dataFlow
_ -> False
-- The connection started as inbound but its
-- provenance was changed to outbound; this is only
-- possible if we are connecting to ourselves. In
-- this case we don't need to notify the outbound
-- governor.
case outboundGovernorInfoChannel of
InResponderMode (Just infoChannel) | notifyOutboundGov
->
atomically $ InfoChannel.writeMessage
infoChannel
(peerAddr, cmGetPeerSharing versionData)

_ -> return ()

return $ Connected connId dataFlow handle

-- the connection is in `TerminatingState` or
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ import Ouroboros.Network.Test.Orphans ()
import Ouroboros.Network.ConnectionManager.InformationChannel
(newInformationChannel)
import Ouroboros.Network.ConnectionManager.Test.Timeouts
import Ouroboros.Network.PeerSelection.PeerSharing (PeerSharing (..))


--
Expand Down Expand Up @@ -298,8 +297,7 @@ withInitiatorOnlyConnectionManager name timeouts trTracer cmTracer cmStdGen snoc
cmStdGen,
cmConnectionsLimits = acceptedConnLimit,
cmTimeWaitTimeout = tTimeWaitTimeout timeouts,
cmOutboundIdleTimeout = tOutboundIdleTimeout timeouts,
cmGetPeerSharing = \(DataFlowProtocolData _ ps) -> ps
cmOutboundIdleTimeout = tOutboundIdleTimeout timeouts
}
(makeConnectionHandler
muxTracer
Expand All @@ -318,10 +316,8 @@ withInitiatorOnlyConnectionManager name timeouts trTracer cmTracer cmStdGen snoc
<> debugMuxRuntimeErrorRethrowPolicy
<> debugIOErrorRethrowPolicy
<> assertRethrowPolicy))
PeerSharingEnabled
(\_ -> HandshakeFailure)
NotInResponderMode
NotInResponderMode
(\cm ->
k cm `catch` \(e :: SomeException) -> throwIO e)
where
Expand Down Expand Up @@ -466,7 +462,6 @@ withBidirectionalConnectionManager name timeouts
acceptedConnLimit k = do
mainThreadId <- myThreadId
inbgovInfoChannel <- newInformationChannel
outgovInfoChannel <- newInformationChannel
let muxTracer = WithName name `contramap` nullTracer -- mux tracer

withConnectionManager
Expand All @@ -489,8 +484,7 @@ withBidirectionalConnectionManager name timeouts
connectionDataFlow = \_ (DataFlowProtocolData df _) -> df,
cmPrunePolicy = simplePrunePolicy,
cmStdGen,
cmConnectionsLimits = acceptedConnLimit,
cmGetPeerSharing = \(DataFlowProtocolData _ ps) -> ps
cmConnectionsLimits = acceptedConnLimit
}
(makeConnectionHandler
muxTracer
Expand All @@ -509,10 +503,8 @@ withBidirectionalConnectionManager name timeouts
<> debugMuxRuntimeErrorRethrowPolicy
<> debugIOErrorRethrowPolicy
<> assertRethrowPolicy))
PeerSharingEnabled
(\_ -> HandshakeFailure)
(InResponderMode inbgovInfoChannel)
(InResponderMode $ Just outgovInfoChannel)
$ \connectionManager ->
do
serverAddr <- Snocket.getLocalAddr snocket socket
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3627,7 +3627,6 @@ _governorFindingPublicRoots targetNumberOfRootPeers readDomains readUseBootstrap
requestPeerShare = \_ _ -> return (PeerSharingResult []),
peerConnToPeerSharing = \ps -> ps,
requestPublicRootPeers = \_ _ -> return (PublicRootPeers.empty, 0),
readNewInboundConnection = retry,
peerStateActions = PeerStateActions {
establishPeerConnection = error "establishPeerConnection",
monitorPeerConnection = error "monitorPeerConnection",
Expand All @@ -3637,11 +3636,12 @@ _governorFindingPublicRoots targetNumberOfRootPeers readDomains readUseBootstrap
},
readUseBootstrapPeers,
readLedgerStateJudgement,
readInboundPeers = pure Map.empty,
updateOutboundConnectionsState = \a -> do
a' <- readTVar olocVar
when (a /= a') $
writeTVar olocVar a
}
}

targets :: PeerSelectionTargets
targets = nullPeerSelectionTargets {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,6 @@ mockPeerSelectionActions' tracer
peerConnToPeerSharing = \(PeerConn _ ps _) -> ps,
requestPublicRootPeers,
readPeerSelectionTargets = readTVar targetsVar,
readNewInboundConnection = retry,
requestPeerShare,
peerStateActions = PeerStateActions {
establishPeerConnection,
Expand All @@ -389,6 +388,7 @@ mockPeerSelectionActions' tracer
},
readUseBootstrapPeers,
readLedgerStateJudgement,
readInboundPeers = pure Map.empty,
updateOutboundConnectionsState = \a -> do
a' <- readTVar outboundConnectionsStateVar
when (a /= a') $
Expand Down

0 comments on commit c3aa384

Please sign in to comment.