diff --git a/ouroboros-network-framework/ouroboros-network-framework.cabal b/ouroboros-network-framework/ouroboros-network-framework.cabal index eed1efc9ae2..4063f5489fc 100644 --- a/ouroboros-network-framework/ouroboros-network-framework.cabal +++ b/ouroboros-network-framework/ouroboros-network-framework.cabal @@ -41,6 +41,8 @@ library Ouroboros.Network.Protocol.Limits Ouroboros.Network.ConnectionId + Ouroboros.Network.ConnectionManager.Types + Ouroboros.Network.ConnectionManager.Core Ouroboros.Network.Server.ConnectionTable Ouroboros.Network.Server.Socket Ouroboros.Network.Server.RateLimiting diff --git a/ouroboros-network-framework/src/Ouroboros/Network/ConnectionManager/Core.hs b/ouroboros-network-framework/src/Ouroboros/Network/ConnectionManager/Core.hs new file mode 100644 index 00000000000..2638d80a887 --- /dev/null +++ b/ouroboros-network-framework/src/Ouroboros/Network/ConnectionManager/Core.hs @@ -0,0 +1,329 @@ +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE RankNTypes #-} + +-- | The implementation of connection manager's resource managment. +-- +module Ouroboros.Network.ConnectionManager.Core + ( withConnectionManager + ) where + +import Control.Monad (when) +import Control.Monad.Class.MonadFork +import Control.Monad.Class.MonadAsync +import Control.Monad.Class.MonadThrow hiding (handle) +import Control.Monad.Class.MonadSTM.Strict +import Control.Tracer (traceWith, contramap) +import Data.Foldable (traverse_) +import Data.Functor (($>)) +import Data.Typeable (Typeable) + +import Data.Map (Map) +import qualified Data.Map as Map + +import Network.Mux.Trace (WithMuxBearer (..)) + +import Ouroboros.Network.ConnectionId +import Ouroboros.Network.ConnectionManager.Types +import Ouroboros.Network.Snocket + + +-- | Internal type to the 'ConnectionManager'; this the state the connection manager +-- keeps per peer. +-- +data ConnectionHandle peerAddr socket muxPromise m = ConnectionHandle { + -- | Socket with a close callback. + -- + chSocket :: !socket, + + -- | A uniqe connection identifier. + -- + chConnectionId :: ConnectionId peerAddr, + + -- | The connection manager shares a muxPromise between inbound and + -- outbound connections. + -- + chMuxPromise :: !(StrictTVar m (Promise muxPromise)), + + -- | Action which stop the connection. + -- + chThread :: !(Async m ()), + + -- | Internal state of the 'ConnectionHandle'. It can be 'Inbound', + -- 'Outbound' or 'InboundOutbound'. + -- + chState :: !ConnectionState + } + + +-- | 'ConnectionManager' state: for each peer we keep a 'ConnectionHandle'. +-- +type State peerAddr socket muxPromise m + = Map peerAddr (ConnectionHandle peerAddr socket muxPromise m) + + +-- | Entry point for using the connection manager. This is a classic @with@ style +-- combinator, which cleans resources on exit of the callback (whether cleanly +-- or through an exception). +-- +-- Including a connection (either inbound or outbound) is an indepotent +-- operation on connection manager state. The connection manager will always +-- return the handle that was first to be included in its state. +-- +-- Once an inbound connection is passed to the 'ConnectionManager', the manager +-- is responsible for the resource. +-- +withConnectionManager + :: forall muxMode peerAddr socket handlerTrace muxPromise m a. + ( Monad m + -- We use 'MonadFork' to rethrow exceptions in the main thread. + , MonadFork m + , MonadAsync m + , MonadEvaluate m + , MonadMask m + + , Ord peerAddr + , Show peerAddr + , Typeable peerAddr + ) + => ConnectionManagerArguments muxMode handlerTrace socket peerAddr muxPromise m + -> (ConnectionManager muxMode socket peerAddr muxPromise m -> m a) + -- ^ Continuation which receives the 'ConnectionManager'. It must not leak + -- outside of scope of this callback. Once it returns all resources + -- will be closed. + -> m a +withConnectionManager ConnectionManagerArguments { + connectionManagerTracer = tracer, + connectionManagerMuxTracer = muxTracer, + connectionManagerIPv4Address, + connectionManagerIPv6Address, + connectionManagerAddressType, + connectionHandler, + connectionSnocket + } k = do + stateVar <- newTMVarM (Map.empty :: State peerAddr sockert muxPromise m) + let connectionManager :: ConnectionManager muxMode socket peerAddr muxPromise m + connectionManager = + case connectionHandler of + ConnectionHandler (WithInitiatorMode outboundHandler) -> + ConnectionManager + (WithInitiatorMode + (connectAndInclude stateVar outboundHandler)) + + ConnectionHandler (WithResponderMode inboundHandler) -> + ConnectionManager + (WithResponderMode + (includeConnection stateVar inboundHandler Inbound)) + + ConnectionHandler (WithInitiatorResponderMode outboundHandler inboundHandler) -> + ConnectionManager + (WithInitiatorResponderMode + (connectAndInclude stateVar outboundHandler) + (includeConnection stateVar inboundHandler Inbound)) + + k connectionManager + `finally` do + traceWith tracer ShutdownConnectionManager + state <- atomically $ readTMVar stateVar + traverse_ + (\ConnectionHandle { chSocket, chThread } + -> cancel chThread + >> close connectionSnocket chSocket ) + state + where + -- Include a connection in the 'State'; we use this for both inbound and + -- outbound (via 'connectAndInclude' below) connections. + -- + -- This operation is idempotent. If we try to include the connection to the + -- same peer multiple times, it will also return the already existing handle + -- and it will close the given one. Why closing it here, and not by the + -- caller? This makes it more homogeneus: the connection mamanger is + -- responsible for handling all connections weather included or not in + -- its state. + includeConnection + :: StrictTMVar m (State peerAddr socket muxPromise m) + -> ConnectionHandlerFn handlerTrace peerAddr muxPromise m + -> ConnectionState + -- ^ initialt connection state + -> socket + -- ^ resource to include in the state + -> peerAddr + -- ^ remote address used as an identifier of the resource + -> m (STM m muxPromise) + includeConnection stateVar + handler + connectionState + socket + peerAddr = + modifyTMVar stateVar $ \state -> + case Map.lookup peerAddr state of + + ----------------- + -- New connection + -- + + Nothing -> do + + localAddress <- getLocalAddr connectionSnocket socket + let connectionId = ConnectionId { remoteAddress = peerAddr + , localAddress + } + !muxPromise <- newTVarM Empty + let cleanup = + modifyTMVar_ stateVar $ \state' -> do + close connectionSnocket socket + let ConnectionHandle { chState } = state' Map.! peerAddr + traceWith tracer (ConnectionFinished connectionId chState) + pure $ Map.delete peerAddr state' + + case handler + muxPromise + (ConnectionTrace connectionId `contramap` tracer) + connectionId + (\bearerTimeout -> + toBearer + connectionSnocket + bearerTimeout + (WithMuxBearer connectionId `contramap` muxTracer) + socket) of + Action action errorHandler -> do + thread <- + mask $ \unmask -> + async $ errorHandler (unmask action `finally` cleanup) + let conn = ConnectionHandle { + chSocket = socket, + chConnectionId = connectionId, + chMuxPromise = muxPromise, + chThread = thread, + chState = connectionState + } + traceWith tracer (IncludedConnection connectionId connectionState) + pure ( Map.insert peerAddr conn state + , muxPromiseSTM muxPromise ) + + ---------------------- + -- Existing connection + -- + + Just conn@ConnectionHandle { chState, chMuxPromise } -> do + when (chState == InboundOutbound || chState == connectionState) $ do + traceWith tracer (ConnectionExists peerAddr connectionState) + throwM (ConnectionExistsError peerAddr connectionState) + let conn' = conn { chState = InboundOutbound } + -- Say go away! There are two cases: + -- + -- 1. for inbound connections: this means we've been contacted + -- twice from the same peer. We might be using two ports (or + -- two addresses), and the other end didn't realised they lead + -- to the same peer. + -- 2. for outbound connections: we might have tried connect to + -- the same peer. This might be the case if the same ip + -- address gets resolved from two different domain names. + -- + close connectionSnocket socket + + pure ( Map.update (const (Just conn')) peerAddr state + , muxPromiseSTM chMuxPromise ) + + connectAndInclude + :: StrictTMVar m (State peerAddr socket muxPromise m) + -> ConnectionHandlerFn handlerTrace peerAddr muxPromise m + -> peerAddr + -> m (STM m muxPromise) + connectAndInclude stateVar handler peerAddr = do + -- Three three stages: + -- + -- 1. Check if there is a recorded connection, if there is return it. + -- 2. Otherwise, connect the peer. + -- 3. Now try to include the existing resource. + -- + -- In steps 1 and 3 we can hold a lock on `state` as these are non + -- blocking operations; but is not the case for 2. During 2 the state + -- could have changed, i.e. the peer might contacted us before we + -- contacted them. Simultaneous open will not error on this level + -- (though it will when running the handshake mini-protocol). + -- + mbMuxPromise <- + modifyTMVar stateVar $ \state -> + case Map.lookup peerAddr state of + Just conn@ConnectionHandle { chState, chMuxPromise } -> + case chState of + Inbound -> do + let conn' = conn { chState = InboundOutbound } + pure ( Map.update (const (Just conn')) peerAddr state + , Just chMuxPromise ) + + -- Outbound, InboundOutbound + _ -> do + traceWith tracer (ConnectionExists peerAddr Outbound) + throwM (ConnectionExistsError peerAddr Outbound) + + Nothing -> pure (state, Nothing) + + case mbMuxPromise of + Just muxPromise -> do + traceWith tracer (ReusedConnection peerAddr InboundOutbound) + pure (muxPromiseSTM muxPromise) + Nothing -> + bracketOnError + (openToConnect connectionSnocket peerAddr) + (close connectionSnocket) + $ \socket -> do + addr <- + case connectionManagerAddressType peerAddr of + Nothing -> pure Nothing + Just IPv4Address -> + traverse_ (bind connectionSnocket socket) connectionManagerIPv4Address + $> connectionManagerIPv4Address + Just IPv6Address -> + traverse_ (bind connectionSnocket socket) connectionManagerIPv6Address + $> connectionManagerIPv6Address + traceWith tracer (ConnectTo addr peerAddr) + connect connectionSnocket socket peerAddr + `catch` \e -> traceWith tracer (ConnectError addr peerAddr e) + >> throwM e + includeConnection stateVar handler + Outbound socket peerAddr + + muxPromiseSTM :: StrictTVar m (Promise muxPromise) -> STM m muxPromise + muxPromiseSTM v = do + mm <- readTVar v + case mm of + Promised muxPromise -> pure muxPromise + Empty -> retry + +-- +-- Utils +-- + + +-- | Like 'modifyMVar_' but strict +-- +modifyTMVar_ :: ( MonadSTM m + , MonadMask m + ) + => StrictTMVar m a -> (a -> m a) -> m () +modifyTMVar_ m io = + mask $ \unmask -> do + a <- atomically (takeTMVar m) + a' <- unmask (io a) `onException` atomically (putTMVar m a) + atomically (putTMVar m a') + + +-- | Like 'modifyMVar' but strict in @a@ and for 'TMVar's +-- +modifyTMVar :: ( MonadEvaluate m + , MonadMask m + , MonadSTM m + ) + => StrictTMVar m a + -> (a -> m (a, b)) + -> m b +modifyTMVar m k = + mask $ \restore -> do + a <- atomically (takeTMVar m) + (!a',b) <- restore (k a >>= evaluate) `onException` atomically (putTMVar m a) + atomically (putTMVar m a') + return b diff --git a/ouroboros-network-framework/src/Ouroboros/Network/ConnectionManager/Types.hs b/ouroboros-network-framework/src/Ouroboros/Network/ConnectionManager/Types.hs new file mode 100644 index 00000000000..18363473517 --- /dev/null +++ b/ouroboros-network-framework/src/Ouroboros/Network/ConnectionManager/Types.hs @@ -0,0 +1,263 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE KindSignatures #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE ScopedTypeVariables #-} + +-- 'withInitiatorMode' has @HasInitiator muxMode ~ True@ constraint, which is +-- not redundant at all! It limits case analysis. +{-# OPTIONS_GHC -Wno-redundant-constraints #-} + +-- Connection manager core types. +-- +module Ouroboros.Network.ConnectionManager.Types + ( -- * Connection manager core types + ConnectionState (..) + , ConnectionHandler (..) + , ConnectionHandlerFn + , Action (..) + , ConnectionManagerArguments (..) + , AddressType (..) + -- * 'ConnectionManager' + , ConnectionManager (..) + , IncludeOutboundConnection + , includeOutboundConnection + , IncludeInboundConnection + , includeInboundConnection + -- * Exceptions + , ExceptionInHandler (..) + , ConnectionManagerError (..) + -- * Mux types + , WithMuxMode (..) + , WithMuxTuple + , withInitiatorMode + , withResponderMode + , SingInitiatorResponderMode (..) + -- * Tracing + , ConnectionManagerTrace (..) + -- * Auxiliary types + , Promise (..) + ) where + +import Control.Exception ( Exception + , SomeException ) +import Control.Monad.Class.MonadSTM.Strict +import Control.Monad.Class.MonadTime (DiffTime) +import Control.Tracer (Tracer) +import Data.Typeable (Typeable) + +import Network.Mux.Types ( MuxBearer + , MuxMode (..) + , HasInitiator + , HasResponder + ) +import Network.Mux.Trace ( MuxTrace + , WithMuxBearer ) + +import Ouroboros.Network.ConnectionId (ConnectionId) +import Ouroboros.Network.Snocket (Snocket) + + +-- | During its lifetime, each connection is in either one of the three states: +-- +data ConnectionState = + -- | An inbound connection: one that was initiated by a remote peer. + -- + Inbound + + -- | An outbound connection: one that was initiated by us. + | Outbound + + -- | A connection which was initiated by either side and is not used in + -- duplex mode. + -- + | InboundOutbound + deriving (Eq, Show) + + +-- +-- Mux types +-- +-- TODO: find a better place for them, maybe 'Ouroboros.Network.Mux' +-- + +data WithMuxMode (muxMode :: MuxMode) a b where + WithInitiatorMode :: a -> WithMuxMode InitiatorMode a b + WithResponderMode :: b -> WithMuxMode ResponderMode a b + WithInitiatorResponderMode :: a -> b -> WithMuxMode InitiatorResponderMode a b + +type WithMuxTuple muxMode a = WithMuxMode muxMode a a + +withInitiatorMode :: HasInitiator muxMode ~ True + => WithMuxMode muxMode a b + -> a +withInitiatorMode (WithInitiatorMode a ) = a +withInitiatorMode (WithInitiatorResponderMode a _) = a + +withResponderMode :: HasResponder muxMode ~ True + => WithMuxMode muxMode a b + -> b +withResponderMode (WithResponderMode b) = b +withResponderMode (WithInitiatorResponderMode _ b) = b + + +-- | Singletons for matching the 'muxMode'. +-- +data SingInitiatorResponderMode (muxMode :: MuxMode) where + SInitiatorMode :: SingInitiatorResponderMode InitiatorMode + SResponderMode :: SingInitiatorResponderMode ResponderMode + SInitiatorResponderMode :: SingInitiatorResponderMode InitiatorResponderMode + + +-- | Promise is a strict version of 'Maybe' +-- +data Promise a + = Promised !a + | Empty + + +-- | Split error handling from action. The indentend usage is: +-- ``` +-- \(Action action errorHandler) -> mask (errorHandler (unmask action)) +-- ``` +-- This allows to attach various error handlers to the action, e.g. both +-- `finally` and `catch`. +data Action m a = Action { + action :: m a, + errorHandler :: m a -> m a + } + + +-- | Action which is executed by thread designated for a given connection. +-- +type ConnectionHandlerFn handlerTrace peerAddr muxPromise m + = StrictTVar m (Promise muxPromise) + -> Tracer m handlerTrace + -> ConnectionId peerAddr + -> (DiffTime -> MuxBearer m) + -> Action m () + + +newtype ConnectionHandler muxMode handlerTrace peerAddr muxPromise m = + ConnectionHandler + (WithMuxTuple muxMode (ConnectionHandlerFn handlerTrace peerAddr muxPromise m)) + +-- | Exception which where caught in the connection thread and were re-thrown +-- in the main thread by the 'rethrowPolicy'. +-- +data ExceptionInHandler peerAddr where + ExceptionInHandler :: !peerAddr + -> !SomeException + -> ExceptionInHandler peerAddr + deriving Typeable + +instance Show peerAddr => Show (ExceptionInHandler peerAddr) where + show (ExceptionInHandler peerAddr e) = "ExceptionInHandler " + ++ show peerAddr + ++ " " + ++ show e +instance ( Show peerAddr + , Typeable peerAddr ) => Exception (ExceptionInHandler peerAddr) + + +data ConnectionManagerError peerAddr + = ConnectionExistsError !peerAddr !ConnectionState + deriving (Show, Typeable) + +instance ( Show peerAddr + , Typeable peerAddr ) => Exception (ConnectionManagerError peerAddr) + + +-- | Connection maanger supports `IPv4` and `IPv6` addresses. +-- +data AddressType = IPv4Address | IPv6Address + deriving Show + + +-- | Assumptions \/ arguments for a 'ConnectionManager'. +-- +data ConnectionManagerArguments (muxMode :: MuxMode) handlerTrace socket peerAddr muxPromise m = + ConnectionManagerArguments { + connectionManagerTracer :: Tracer m (ConnectionManagerTrace peerAddr handlerTrace), + + -- | Mux trace. + -- + connectionManagerMuxTracer :: Tracer m (WithMuxBearer (ConnectionId peerAddr) MuxTrace), + + -- | Local @IPv4@ address of the connection manager. If given, outbound + -- connections to an @IPv4@ address will bound to it. + -- + connectionManagerIPv4Address :: Maybe peerAddr, + + -- | Local @IPv6@ address of the connection manager. If given, outbound + -- connections to an @IPv6@ address will bound to it. + -- + connectionManagerIPv6Address :: Maybe peerAddr, + + connectionManagerAddressType :: peerAddr -> Maybe AddressType, + + -- | Callback which runs in a thread dedicated for a given connection. + -- + connectionHandler :: ConnectionHandler muxMode handlerTrace peerAddr muxPromise m, + + -- | Snocket for the 'socket' type. + -- + connectionSnocket :: Snocket m socket peerAddr + } + + +type IncludeOutboundConnection peerAddr muxPromise m + = peerAddr -> m (STM m muxPromise) +type IncludeInboundConnection socket peerAddr muxPromise m + = socket -> peerAddr -> m (STM m muxPromise) + + +-- | 'ConnectionManager'. +-- +-- We identify resources (e.g. 'Network.Socket.Socket') by their address. It +-- is enough for us to use just the remote address rather than connection +-- identifier, since we just need one connection towards that peer, even if we +-- are connected through multiple addresses. It is safe to share a connection +-- manager with all the accepting sockets. +-- +newtype ConnectionManager (muxMode :: MuxMode) socket peerAddr muxPromise m = ConnectionManager { + runConnectionManager + :: WithMuxMode muxMode (IncludeOutboundConnection peerAddr muxPromise m) + (IncludeInboundConnection socket peerAddr muxPromise m) + } + +-- | Include outbound connection into 'ConnectionManager'. +-- +includeOutboundConnection :: HasInitiator muxMode ~ True + => ConnectionManager muxMode socket peerAddr muxPromise m + -> IncludeOutboundConnection peerAddr muxPromise m +includeOutboundConnection = withInitiatorMode . runConnectionManager + +-- | Include an inbound connection into 'ConnectionManager'. +-- +includeInboundConnection :: HasResponder muxMode ~ True + => ConnectionManager muxMode socket peerAddr muxPromise m + -> IncludeInboundConnection socket peerAddr muxPromise m +includeInboundConnection = withResponderMode . runConnectionManager + + +-- +-- Tracing +-- + +-- | 'ConenctionManagerTrace' contains a hole for a trace of single connection +-- which is filled with 'ConnectionTrace'. +-- +data ConnectionManagerTrace peerAddr a + = IncludedConnection !(ConnectionId peerAddr) !ConnectionState + | ConnectTo !(Maybe peerAddr) !peerAddr + | ConnectError !(Maybe peerAddr) !peerAddr !SomeException + | ReusedConnection !peerAddr !ConnectionState + | ConnectionFinished !(ConnectionId peerAddr) !ConnectionState + | ErrorFromHandler !(ConnectionId peerAddr) !SomeException + | RethrownErrorFromHandler !(ExceptionInHandler peerAddr) + | ConnectionTrace !(ConnectionId peerAddr) !a + | ShutdownConnectionManager + | ConnectionExists !peerAddr !ConnectionState + deriving Show