Skip to content

Commit

Permalink
refactors
Browse files Browse the repository at this point in the history
Co-authored-by: Mark Tullsen <mtullsen@users.noreply.github.com>
  • Loading branch information
coot and mtullsen committed Oct 28, 2022
1 parent a079fea commit 8396e9d
Show file tree
Hide file tree
Showing 18 changed files with 46 additions and 73 deletions.
4 changes: 2 additions & 2 deletions network-mux/src/Network/Mux/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,12 @@ newtype MiniProtocolNum = MiniProtocolNum Word16
deriving (Eq, Ord, Enum, Ix, Show)

-- | Per Miniprotocol limits
data MiniProtocolLimits =
newtype MiniProtocolLimits =
MiniProtocolLimits {
-- | Limit on the maximum number of bytes that can be queued in the
-- miniprotocol's ingress queue.
--
maximumIngressQueue :: !Int
maximumIngressQueue :: Int
}
-- GR-FIXME: Q. if part of wire format, shouldn't this be a fixed width, E.g.,
-- Word16?
Expand Down
6 changes: 3 additions & 3 deletions ouroboros-network-framework/src/Ouroboros/Network/Channel.hs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ hoistChannel nat channel = Channel
--
fixedInputChannel :: MonadSTM m => [a] -> m (Channel m a)
fixedInputChannel xs0 = do
v <- atomically $ newTVar xs0
v <- newTVarIO xs0
return Channel {send, recv = recv v}
where
recv v = atomically $ do
Expand Down Expand Up @@ -155,8 +155,8 @@ createConnectedChannels :: MonadSTM m => m (Channel m a, Channel m a)
createConnectedChannels = do
-- Create two TMVars to act as the channel buffer (one for each direction)
-- and use them to make both ends of a bidirectional channel
bufferA <- atomically $ newEmptyTMVar
bufferB <- atomically $ newEmptyTMVar
bufferA <- newEmptyTMVarIO
bufferB <- newEmptyTMVarIO

return (mvarsAsChannel bufferB bufferA,
mvarsAsChannel bufferA bufferB)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,9 @@
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE TypeApplications #-}

-- | Implementation of 'ConnectionHandler'
--
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveFoldable #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
Expand Down Expand Up @@ -75,7 +75,8 @@ data ConnectionManagerArguments handlerTrace socket peerAddr handle handleError

-- | Trace state transitions.
--
cmTrTracer :: Tracer m (TransitionTrace peerAddr (ConnectionState peerAddr handle handleError version m)),
cmTrTracer :: Tracer m (TransitionTrace peerAddr
(ConnectionState peerAddr handle handleError version m)),

-- | Mux trace.
--
Expand Down Expand Up @@ -330,13 +331,13 @@ instance ( Show peerAddr
, show df
]
show (InboundIdleState connId connThread _handle df) =
concat ([ "InboundIdleState "
, show connId
, " "
, show (asyncThreadId connThread)
, " "
, show df
])
concat [ "InboundIdleState "
, show connId
, " "
, show (asyncThreadId connThread)
, " "
, show df
]
show (InboundState connId connThread _handle df) =
concat [ "InboundState "
, show connId
Expand All @@ -357,10 +358,10 @@ instance ( Show peerAddr
, " "
, show (asyncThreadId connThread)
]
++ maybeToList (((' ' :) . show) <$> handleError))
++ maybeToList ((' ' :) . show <$> handleError))
show (TerminatedState handleError) =
concat (["TerminatedState"]
++ maybeToList (((' ' :) . show) <$> handleError))
++ maybeToList ((' ' :) . show <$> handleError))


getConnThread :: ConnectionState peerAddr handle handleError version m
Expand Down Expand Up @@ -411,7 +412,7 @@ isInboundConn TerminatedState {} = False


abstractState :: MaybeUnknown (ConnectionState muxMode peerAddr m a b) -> AbstractState
abstractState = \s -> case s of
abstractState = \case
Unknown -> UnknownConnectionSt
Race s' -> go s'
Known s' -> go s'
Expand Down Expand Up @@ -854,7 +855,7 @@ withConnectionManager ConnectionManagerArguments {
unmask (threadDelay delay)
`catch` \e ->
case fromException e
of Just (AsyncCancelled) -> do
of Just AsyncCancelled -> do
t' <- getMonotonicTime
forceThreadDelay (delay - t' `diffTime` t)
_ -> throwIO e
Expand Down Expand Up @@ -1930,15 +1931,15 @@ withConnectionManager ConnectionManagerArguments {
-- operation which returns only once the connection is
-- negotiated.
ReservedOutboundState ->
return $
return
( DemoteToColdLocalError
(TrForbiddenOperation peerAddr st)
st
, Nothing
)

UnnegotiatedState _ _ _ ->
return $
return
( DemoteToColdLocalError
(TrForbiddenOperation peerAddr st)
st
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,13 +352,13 @@ type ConnectionHandlerFn handlerTrace socket peerAddr handle handleError version
-- There's one 'ConnectionHandlerFn' per provenance, possibly limited by
-- @muxMode@.
--
data ConnectionHandler muxMode handlerTrace socket peerAddr handle handleError version m =
newtype ConnectionHandler muxMode handlerTrace socket peerAddr handle handleError version m =
ConnectionHandler {
-- | Connection handler.
--
connectionHandler ::
(WithMuxTuple muxMode
(ConnectionHandlerFn handlerTrace socket peerAddr handle handleError version m))
WithMuxTuple muxMode
(ConnectionHandlerFn handlerTrace socket peerAddr handle handleError version m)
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
{-# LANGUAGE GADTs #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiWayIf #-}

{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}


{-# LANGUAGE TypeOperators #-}

-- 'runResponder' is using a redundant constraint.
Expand Down Expand Up @@ -109,11 +109,11 @@ inboundGovernor trTracer tracer serverControlChannel inboundIdleTimeout
-- State needs to be a TVar, otherwise, when catching the exception inside
-- the loop we do not have access to the most recent version of the state
-- and might be truncating transitions.
st <- atomically $ newTVar emptyState
st <- newTVarIO emptyState
inboundGovernorLoop st
`catch`
(\(e :: SomeException) -> do
state <- atomically $ readTVar st
state <- readTVarIO st
_ <- Map.traverseWithKey
(\connId _ -> do
-- Remove the connection from the state so
Expand All @@ -136,13 +136,13 @@ inboundGovernor trTracer tracer serverControlChannel inboundIdleTimeout
}

-- The inbound protocol governor recursive loop. The 'igsConnections' is
-- updated as we recurs.
-- updated as we recurse.
--
inboundGovernorLoop
:: StrictTVar m (InboundGovernorState muxMode peerAddr m a b)
-> m Void
inboundGovernorLoop !st = do
state <- atomically $ readTVar st
state <- readTVarIO st
mapTraceWithCache TrInboundGovernorCounters
tracer
(igsCountersCache state)
Expand All @@ -165,7 +165,7 @@ inboundGovernor trTracer tracer serverControlChannel inboundIdleTimeout
:: EventSignal muxMode peerAddr m a b
)
(igsConnections state)
<> (FirstToFinish $
<> FirstToFinish (
NewConnection <$> ControlChannel.readMessage serverControlChannel)
(mbConnId, state') <- case event of
NewConnection
Expand Down Expand Up @@ -310,8 +310,7 @@ inboundGovernor trTracer tracer serverControlChannel inboundIdleTimeout
case result of
Right completionAction -> do
traceWith tracer (TrResponderRestarted tConnId num)
let state' = updateMiniProtocol tConnId num completionAction
$ state
let state' = updateMiniProtocol tConnId num completionAction state
return (Nothing, state')

Left err -> do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}


-- | Intended to be imported qualified.
--
Expand Down Expand Up @@ -91,20 +90,9 @@ newControlChannel = do
newTBQueue 10 -- G-FIXME[R]: magic number
>>= \q -> labelTBQueue q "server-cc" $> q
pure $ ControlChannel {
readMessage = readMessage channel,
writeMessage = writeMessage channel
readMessage = readTBQueue channel,
writeMessage = writeTBQueue channel
}
where
readMessage
:: TBQueue m srvCntrlMsg
-> STM m srvCntrlMsg
readMessage = readTBQueue

writeMessage
:: TBQueue m srvCntrlMsg
-> srvCntrlMsg
-> STM m ()
writeMessage q a = writeTBQueue q a


newOutboundConnection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,7 @@ firstPeerDemotedToWarm
connId connState@ConnectionState { csRemoteState }
= case csRemoteState of
RemoteHot ->
const (RemoteDemotedToWarm connId)
<$> foldMap fn (hotMiniProtocolStateMap connState)
RemoteDemotedToWarm connId <$ foldMap fn (hotMiniProtocolStateMap connState)

_ -> mempty
where
Expand Down Expand Up @@ -325,7 +324,7 @@ firstPeerDemotedToCold
RemoteEstablished ->
fmap (const $ WaitIdleRemote connId)
. lastToFirstM
$ (Map.foldMapWithKey
$ Map.foldMapWithKey
(\(_, miniProtocolDir) miniProtocolStatus ->
case miniProtocolDir of
InitiatorDir -> mempty
Expand All @@ -338,7 +337,6 @@ firstPeerDemotedToCold
StatusRunning -> retry
)
(Mux.miniProtocolStateMap csMux)
)

RemoteIdle {} -> mempty

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE RankNTypes #-}
Expand Down
4 changes: 3 additions & 1 deletion ouroboros-network-framework/src/Ouroboros/Network/Mux.hs
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,11 @@ instance Applicative (WithProtocolTemperature Established) where
pure = WithEstablished
(<*>) (WithEstablished f) = fmap f

instance Semigroup a => Semigroup (WithProtocolTemperature pt a) where
instance Semigroup a => Semigroup (WithProtocolTemperature Hot a) where
WithHot a <> WithHot b = WithHot (a <> b)
instance Semigroup a => Semigroup (WithProtocolTemperature Warm a) where
WithWarm a <> WithWarm b = WithWarm (a <> b)
instance Semigroup a => Semigroup (WithProtocolTemperature Established a) where
WithEstablished a <> WithEstablished b = WithEstablished (a <> b)

instance Monoid a => Monoid (WithProtocolTemperature Hot a) where
Expand Down
3 changes: 0 additions & 3 deletions ouroboros-network-framework/src/Ouroboros/Network/Server2.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,9 @@
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeOperators #-}

-- 'runResponder' is using a redundant constraint.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ subscriptionLoop
forever $ do
traceWith tr (SubscriptionTraceStart valency)
start <- getMonotonicTime
conThreads <- atomically $ newTVar Set.empty
conThreads <- newTVarIO Set.empty
sTarget <- subscriptionTargets
innerLoop conThreads valencyVar sTarget
atomically $ waitValencyCounter valencyVar
Expand Down Expand Up @@ -567,7 +567,7 @@ worker
-> IO x
worker tr errTrace tbl sVar snocket workerCallbacks@WorkerCallbacks {wcCompleteApplicationTx, wcMainTx } workerParams k = do
resQ <- newResultQ
threadsVar <- atomically $ newTVar Set.empty
threadsVar <- newTVarIO Set.empty
withAsync
(subscriptionLoop tr tbl resQ sVar threadsVar snocket
workerCallbacks workerParams k) $ \_ ->
Expand Down
1 change: 0 additions & 1 deletion ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}

-- `withLocalSocket` has some constraints that are only required on Windows.
{-# OPTIONS_GHC -Wno-redundant-constraints #-}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}


-- | This subsystem manages the discovery and selection of /upstream/ peers.
--
module Ouroboros.Network.PeerSelection.Governor
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ connections PeerSelectionActions{
. Set.foldr'
((snd .) . KnownPeers.incrementFailCount)
(knownPeers st)
$ (Map.keysSet demotedToCold)
$ Map.keysSet demotedToCold

in assert
(let establishedPeersSet' =
Expand Down Expand Up @@ -259,11 +259,10 @@ localRoots actions@PeerSelectionActions{readLocalRootPeers}
LocalRootPeers.toMap localRootPeers'
addedSet = Map.keysSet added
removedSet = Map.keysSet removed
knownPeers' = KnownPeers.insert addedSet
knownPeers' = KnownPeers.insert addedSet knownPeers
-- We do not immediately remove old ones from the
-- known peers set because we may have established
-- connections
$ knownPeers

-- We have to adjust the publicRootPeers to maintain the invariant
-- that the local and public sets are non-overlapping.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE TupleSections #-}

-- 'startProtocols' is using 'HasInitiator' constraint to limit pattern
Expand Down Expand Up @@ -556,7 +555,7 @@ withPeerStateActions PeerStateActionsArguments {
spsTracer,
spsConnectionManager
}
k = do
k =
JobPool.withJobPool $ \jobPool ->
k PeerStateActions {
establishPeerConnection = establishPeerConnection jobPool,
Expand Down

0 comments on commit 8396e9d

Please sign in to comment.