Skip to content

Commit

Permalink
Merge pull request #4868 from IntersectMBO/coot/peer-sharing
Browse files Browse the repository at this point in the history
peer sharing changes
  • Loading branch information
coot committed May 7, 2024
2 parents 6be44ce + 8fbf956 commit 811b1de
Show file tree
Hide file tree
Showing 32 changed files with 982 additions and 639 deletions.
12 changes: 12 additions & 0 deletions ouroboros-network-framework/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,18 @@

### Breaking changes

* connection-manager: maintain it's own source of randomness for `PrunePolicy`.
The types `PrunPolicy`, `ConnectionManagerArguments` changed.
* server accepts a callback which receives an `STM` action allowing to observe
public part of `InboundGovernorState`. The refactorisation changed how
exceptions are propagated through from the threads run by the server to the
main thread. `InboundGovernorObservableState` was replaced with
`PublicInboundGovernorState`.
* removed the outbound information channel between the connection manager
& outbound governor; the outbound governor now can use the
`PublichInboundGovernorState`.
* Added `serverDebugInboundGovernor` tracer was added to `ServerArguments`.

### Non-breaking changes

## 0.12.0.0 -- 2024-03-15
Expand Down
46 changes: 20 additions & 26 deletions ouroboros-network-framework/demo/connection-manager.hs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ import Ouroboros.Network.Context
import Ouroboros.Network.IOManager
import Ouroboros.Network.Mux
import Ouroboros.Network.MuxMode
import Ouroboros.Network.PeerSelection.PeerSharing (PeerSharing (..))
import Ouroboros.Network.Protocol.Handshake
import Ouroboros.Network.Protocol.Handshake.Codec (timeLimitsHandshake)
import Ouroboros.Network.Protocol.Handshake.Unversioned
Expand Down Expand Up @@ -191,6 +190,7 @@ withBidirectionalConnectionManager
-> DiffTime -- protocol idle timeout
-> DiffTime -- wait time timeout
-> Maybe peerAddr
-> Random.StdGen
-> ClientAndServerData
-- ^ series of request possible to do with the bidirectional connection
-- manager towards some peer.
Expand All @@ -204,6 +204,7 @@ withBidirectionalConnectionManager snocket makeBearer socket
protocolIdleTimeout
timeWaitTimeout
localAddress
stdGen
ClientAndServerData {
hotInitiatorRequests,
warmInitiatorRequests,
Expand All @@ -212,15 +213,12 @@ withBidirectionalConnectionManager snocket makeBearer socket
k = do
mainThreadId <- myThreadId
inbgovInfoChannel <- newInformationChannel
outgovInfoChannel <- newInformationChannel
-- as in the 'withInitiatorOnlyConnectionManager' we use a `StrictTVar` to
-- pass list of requests, but since we are also interested in the results we
-- need to have multable cells to pass the accumulators around.
hotRequestsVar <- LazySTM.newTVarIO hotInitiatorRequests
warmRequestsVar <- LazySTM.newTVarIO warmInitiatorRequests
establishedRequestsVar <- LazySTM.newTVarIO establishedInitiatorRequests
-- we are not using the randomness
observableStateVar <- Server.newObservableStateVarFromSeed 0
let muxTracer = ("mux",) `contramap` nullTracer -- mux tracer

withConnectionManager
Expand All @@ -240,12 +238,12 @@ withBidirectionalConnectionManager snocket makeBearer socket
cmOutboundIdleTimeout = protocolIdleTimeout,
connectionDataFlow = \_ _ -> Duplex,
cmPrunePolicy = simplePrunePolicy,
cmStdGen = stdGen,
cmConnectionsLimits = AcceptedConnectionsLimit {
acceptedConnectionsHardLimit = maxBound,
acceptedConnectionsSoftLimit = maxBound,
acceptedConnectionsDelay = 0
},
cmGetPeerSharing = \_ -> PeerSharingDisabled
}
}
(makeConnectionHandler
muxTracer
Expand All @@ -265,29 +263,24 @@ withBidirectionalConnectionManager snocket makeBearer socket
establishedRequestsVar))
(mainThreadId, debugMuxErrorRethrowPolicy
<> debugIOErrorRethrowPolicy))
PeerSharingEnabled
(\_ -> HandshakeFailure)
(InResponderMode inbgovInfoChannel)
(InResponderMode $ Just outgovInfoChannel)
$ \connectionManager -> do
serverAddr <- Snocket.getLocalAddr snocket socket
withAsync
(Server.run
ServerArguments {
serverSockets = socket :| [],
serverSnocket = snocket,
serverTracer = ("server",) `contramap` debugTracer, -- ServerTrace
serverTrTracer = nullTracer,
serverInboundGovernorTracer = ("inbound-governor",) `contramap` debugTracer,
serverConnectionLimits = AcceptedConnectionsLimit maxBound maxBound 0,
serverConnectionManager = connectionManager,
serverInboundIdleTimeout = Just protocolIdleTimeout,
serverInboundInfoChannel = inbgovInfoChannel,
serverObservableStateVar = observableStateVar
}
)
(\thread -> link thread
>> k connectionManager serverAddr)
Server.with
ServerArguments {
serverSockets = socket :| [],
serverSnocket = snocket,
serverTracer = ("server",) `contramap` debugTracer, -- ServerTrace
serverTrTracer = nullTracer,
serverInboundGovernorTracer = ("inbound-governor",) `contramap` debugTracer,
serverDebugInboundGovernor = nullTracer,
serverConnectionLimits = AcceptedConnectionsLimit maxBound maxBound 0,
serverConnectionManager = connectionManager,
serverInboundIdleTimeout = Just protocolIdleTimeout,
serverInboundInfoChannel = inbgovInfoChannel
}
(\_ _ -> k connectionManager serverAddr)
where
serverApplication :: LazySTM.TVar m [[Int]]
-> LazySTM.TVar m [[Int]]
Expand Down Expand Up @@ -461,10 +454,11 @@ bidirectionalExperiment
timeWaitTimeout
localAddr remoteAddr
clientAndServerData = do
stdGen <- Random.newStdGen
withBidirectionalConnectionManager
snocket makeBearer socket0
protocolIdleTimeout timeWaitTimeout
(Just localAddr) clientAndServerData $
(Just localAddr) stdGen clientAndServerData $
\connectionManager _serverAddr -> forever' $ do
-- runInitiatorProtocols returns a list of results per each protocol
-- in each bucket (warm \/ hot \/ established); but we run only one
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
module Test.Ouroboros.Network.Server2.IO (tests) where

import Control.Monad.Class.MonadThrow
import System.Random (mkStdGen)

import Test.QuickCheck
import Test.Tasty (TestTree, testGroup)
Expand Down Expand Up @@ -52,9 +53,10 @@ tests =
--

prop_unidirectional_IO
:: ClientAndServerData Int
:: Fixed Int
-> ClientAndServerData Int
-> Property
prop_unidirectional_IO clientAndServerData =
prop_unidirectional_IO (Fixed rnd) clientAndServerData =
ioProperty $ do
withIOManager $ \iomgr ->
bracket
Expand All @@ -66,6 +68,7 @@ prop_unidirectional_IO clientAndServerData =
Socket.bind socket (Socket.addrAddress addr)
Socket.listen socket maxBound
unidirectionalExperiment
(mkStdGen rnd)
ioTimeouts
(socketSnocket iomgr)
Mux.makeSocketBearer
Expand All @@ -76,10 +79,11 @@ prop_unidirectional_IO clientAndServerData =


prop_bidirectional_IO
:: ClientAndServerData Int
:: Fixed Int
-> ClientAndServerData Int
-> ClientAndServerData Int
-> Property
prop_bidirectional_IO data0 data1 =
prop_bidirectional_IO (Fixed rnd) data0 data1 =
ioProperty $ do
withIOManager $ \iomgr ->
bracket
Expand Down Expand Up @@ -108,6 +112,7 @@ prop_bidirectional_IO data0 data1 =

bidirectionalExperiment
True
(mkStdGen rnd)
ioTimeouts
(socketSnocket iomgr)
Mux.makeSocketBearer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ library testlib
, bytestring
, cborg
, containers
, random
, serialise

, QuickCheck
Expand Down Expand Up @@ -187,6 +188,8 @@ test-suite sim-tests
, iproute
, network
, pretty-simple
, psqueues
, random
, serialise
, text
, time
Expand All @@ -207,7 +210,6 @@ test-suite sim-tests
, strict-stm
, network-mux
, monoidal-synchronisation
, ouroboros-network-api
, ouroboros-network-framework
, ouroboros-network-framework:testlib
, ouroboros-network-testing
Expand Down Expand Up @@ -253,6 +255,7 @@ test-suite io-tests
, dns
, iproute
, network
, random
, time
, with-utf8

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import Data.Monoid (All (..))
import Data.Text.Lazy qualified as Text
import Data.Void (Void)
import Quiet
import System.Random qualified as Random
import Text.Pretty.Simple (defaultOutputOptionsNoColor, pShowOpt)

import Network.Mux.Bearer
Expand All @@ -68,7 +69,6 @@ import Ouroboros.Network.Snocket (Accept (..), Accepted (..),
import Ouroboros.Network.ConnectionManager.InformationChannel
(newInformationChannel)
import Ouroboros.Network.ConnectionManager.InformationChannel qualified as InfoChannel
import Ouroboros.Network.PeerSelection.PeerSharing (PeerSharing (..))



Expand Down Expand Up @@ -689,14 +689,15 @@ instance Arbitrary SkewedBool where
-- exception or being killed by an asynchronous asynchronous exception.
--
prop_valid_transitions
:: SkewedBool
:: Fixed Int
-> SkewedBool
-- ^ bind to local address or not
-> RefinedScheduleMap Addr
-- ^ A list of addresses to which we connect or which connect to us. We use
-- 'Blind' since we show the arguments using `counterexample` in a nicer
-- way.
-> Property
prop_valid_transitions (SkewedBool bindToLocalAddress) scheduleMap =
prop_valid_transitions (Fixed rnd) (SkewedBool bindToLocalAddress) scheduleMap =
let tr = runSimTrace experiment in
-- `selectTraceEventsDynamic`, can throw 'Failure', hence we run
-- `traceResults` first.
Expand Down Expand Up @@ -750,7 +751,6 @@ prop_valid_transitions (SkewedBool bindToLocalAddress) scheduleMap =
--}

inbgovInfoChannel <- newInformationChannel
outgovInfoChannel <- newInformationChannel
let connectionHandler = mkConnectionHandler snocket
result <- withConnectionManager
ConnectionManagerArguments {
Expand All @@ -765,20 +765,18 @@ prop_valid_transitions (SkewedBool bindToLocalAddress) scheduleMap =
cmConfigureSocket = \_ _ -> return (),
connectionDataFlow = \(Version df) _ -> df,
cmPrunePolicy = simplePrunePolicy,
cmStdGen = Random.mkStdGen rnd,
cmConnectionsLimits = AcceptedConnectionsLimit {
acceptedConnectionsHardLimit = maxBound,
acceptedConnectionsSoftLimit = maxBound,
acceptedConnectionsDelay = 0
},
cmTimeWaitTimeout = testTimeWaitTimeout,
cmOutboundIdleTimeout = testOutboundIdleTimeout,
cmGetPeerSharing = \_ -> PeerSharingDisabled
cmOutboundIdleTimeout = testOutboundIdleTimeout
}
connectionHandler
PeerSharingEnabled
(\_ -> HandshakeFailure)
(InResponderMode inbgovInfoChannel)
(InResponderMode $ Just outgovInfoChannel)
$ \(connectionManager
:: ConnectionManager InitiatorResponderMode (FD (IOSim s))
Addr (Handle m) Void (IOSim s)) -> do
Expand Down Expand Up @@ -987,6 +985,7 @@ prop_valid_transitions (SkewedBool bindToLocalAddress) scheduleMap =
unit_overwritten :: Property
unit_overwritten =
prop_valid_transitions
(Fixed 42)
(SkewedBool True)
(ScheduleMap $ Map.fromList
[ ( TestAddress 1
Expand Down Expand Up @@ -1036,6 +1035,7 @@ unit_overwritten =
unit_timeoutExpired :: Property
unit_timeoutExpired =
prop_valid_transitions
(Fixed 42)
(SkewedBool True)
(ScheduleMap $ Map.fromList
[ ( TestAddress 1
Expand Down

0 comments on commit 811b1de

Please sign in to comment.