-
Notifications
You must be signed in to change notification settings - Fork 86
/
Monitor.hs
220 lines (197 loc) · 9.49 KB
/
Monitor.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE ScopedTypeVariables #-}
-- | This module contains governor decisions for monitoring tasks:
--
-- * monitoring target peer changes
-- * monitoring the results of the governor's jobs
-- * monitoring connections
--
module Ouroboros.Network.PeerSelection.Governor.Monitor
( targetPeers
, jobs
, connections
, localRoots
) where
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import qualified Data.Set as Set
import Data.Set (Set)
import Control.Concurrent.JobPool (JobPool)
import qualified Control.Concurrent.JobPool as JobPool
import Control.Monad.Class.MonadSTM
import Control.Monad.Class.MonadTime
import Control.Exception (assert)
import qualified Ouroboros.Network.PeerSelection.KnownPeers as KnownPeers
import Ouroboros.Network.PeerSelection.Types
import Ouroboros.Network.PeerSelection.Governor.Types
import Ouroboros.Network.PeerSelection.Governor.ActivePeers (jobDemoteActivePeer)
-- | Monitor 'PeerSelectionTargets'.  When they change we only need to update
-- the 'PeerSelectionState': since the new state is returned in a 'Decision'
-- it will be picked up by the governor's 'peerSelectionGovernorLoop'.
--
-- The guarded STM action retries until 'readPeerSelectionTargets' yields
-- targets that both differ from the ones currently held in the state /and/
-- satisfy 'sanePeerSelectionTargets'.  Validating inside the guard (rather
-- than only via 'assert') means that invalid targets are never adopted by this
-- monitor, including in builds where assertions are compiled out.
--
targetPeers :: MonadSTM m
            => PeerSelectionActions peeraddr peerconn m
            -> PeerSelectionState peeraddr peerconn
            -> Guarded (STM m) (Decision m peeraddr peerconn)
targetPeers PeerSelectionActions{readPeerSelectionTargets}
            st@PeerSelectionState{
              localRootPeers,
              targets
            } =
    Guarded Nothing $ do
      targets' <- readPeerSelectionTargets
      -- Only act on a real change, and only once the new targets are sane;
      -- otherwise block until the targets are updated again.
      check (targets' /= targets && sanePeerSelectionTargets targets')
      -- We have to enforce the invariant that the number of root peers is
      -- not more than the target number of known peers. It's unlikely in
      -- practice so it's ok to resolve it arbitrarily using Map.take.
      let localRootPeers' = Map.take (targetNumberOfKnownPeers targets')
                                     localRootPeers
      return Decision {
        decisionTrace = TraceTargetsChanged targets targets',
        decisionJobs  = [],
        -- Already guaranteed by the 'check' above; the assertion is kept to
        -- document the invariant at the point where the state is built.
        decisionState = assert (sanePeerSelectionTargets targets')
                        st {
                          targets        = targets',
                          localRootPeers = localRootPeers'
                        }
      }
-- | Await the first result from the 'JobPool' and return its 'Decision'.
--
-- A finished job yields a 'Completion', which wraps a function from the
-- current state and time to a new 'Decision'; all that is left to do here is
-- to apply it.
--
jobs :: MonadSTM m
     => JobPool m (Completion m peeraddr peerconn)
     -> PeerSelectionState peeraddr peerconn
     -> Time
     -> Guarded (STM m) (Decision m peeraddr peerconn)
jobs jobPool st now =
    Guarded Nothing (JobPool.collect jobPool >>= decide)
  where
    -- Apply the completion and force the resulting 'Decision' to WHNF
    -- before handing it back.
    decide (Completion completion) = return $! completion st now
-- | Monitor connections.
--
-- Reads the current status of every established connection with
-- 'monitorPeerConnection' and compares it with the status recorded in the
-- state.  The guarded action retries until at least one peer has been
-- demoted asynchronously, i.e. not as a result of a demotion the governor
-- itself has in progress, and then returns a 'Decision' which records those
-- demotions in the state.  No jobs are scheduled.
--
connections :: forall m peeraddr peerconn.
               (MonadSTM m, Ord peeraddr)
            => PeerSelectionActions peeraddr peerconn m
            -> PeerSelectionState peeraddr peerconn
            -> Guarded (STM m) (Decision m peeraddr peerconn)
connections PeerSelectionActions{peerStateActions = PeerStateActions {monitorPeerConnection}}
            st@PeerSelectionState {
              activePeers,
              establishedPeers,
              establishedStatus,
              inProgressDemoteHot,
              inProgressDemoteWarm
            } =
    Guarded Nothing $ do
      observedStatus <- traverse monitorPeerConnection establishedPeers
      let demotions = asynchronousDemotions establishedStatus observedStatus
      check (not (Map.null demotions))
      let (nowWarm, nowCold) = Map.partition (== PeerWarm) demotions

          -- Every asynchronously demoted peer stops being active.
          activePeers' = Set.difference activePeers (Map.keysSet demotions)

          -- Only peers that went cold stop being established.
          establishedPeers' = Map.difference establishedPeers nowCold

          -- Note that we do not take the observed status wholesale: it also
          -- contains the synchronous changes, which are supposed to be
          -- handled elsewhere.  We only apply the asynchronous ones; the
          -- union is left-biased, so the new warm entries win.
          establishedStatus' =
            Map.union nowWarm (Map.difference establishedStatus nowCold)

      return Decision {
        decisionTrace = TraceDemoteAsynchronous demotions,
        decisionJobs  = [],
        decisionState = st {
                          activePeers       = activePeers',
                          establishedPeers  = establishedPeers',
                          establishedStatus = establishedStatus'
                        }
      }
  where
    -- Those demotions that occurred not as a result of action by the
    -- governor.  The result maps each such peer to its new status, so it
    -- can later be split into demotions to warm and demotions to cold.
    asynchronousDemotions :: Map peeraddr PeerStatus
                          -> Map peeraddr PeerStatus
                          -> Map peeraddr PeerStatus
    asynchronousDemotions old new =
        Map.mapMaybeWithKey asyncDemotion
      . Map.filter (\(before, after) -> before > after)
      $ Map.intersectionWith (,) old new

    -- The asynchronous transitions, those not directed by the governor, are
    -- hot -> warm, warm -> cold and hot -> cold, other than the ones found
    -- in the relevant in-progress set.
    asyncDemotion :: peeraddr -> (PeerStatus, PeerStatus) -> Maybe PeerStatus
    asyncDemotion peeraddr transition =
      case transition of
        (PeerHot, PeerWarm)
          | peeraddr `Set.notMember` inProgressDemoteHot  -> Just PeerWarm
        (PeerWarm, PeerCold)
          | peeraddr `Set.notMember` inProgressDemoteWarm -> Just PeerCold
        (PeerHot, PeerCold)                               -> Just PeerCold
        _                                                 -> Nothing
--------------------------------
-- Local root peers below target
--
-- | Monitor local roots using the 'readLocalRootPeers' 'STM' action.
--
-- The guarded action retries until the local root peers (truncated to the
-- target number of known peers) differ from the ones held in the state.  The
-- resulting 'Decision' updates the local roots, the public roots and the
-- known peers, and schedules a demotion job for every active peer that is
-- no longer a local root.
--
localRoots :: forall peeraddr peerconn m.
              (MonadSTM m, Ord peeraddr)
           => PeerSelectionActions peeraddr peerconn m
           -> PeerSelectionState peeraddr peerconn
           -> Guarded (STM m) (Decision m peeraddr peerconn)
localRoots actions@PeerSelectionActions{readLocalRootPeers}
           st@PeerSelectionState{
             localRootPeers,
             publicRootPeers,
             knownPeers,
             establishedPeers,
             activePeers,
             inProgressDemoteHot,
             targets = PeerSelectionTargets{targetNumberOfKnownPeers}
           } =
    Guarded Nothing $ do
      latestRoots <- readLocalRootPeers
      -- We have to enforce the invariant that the number of root peers is
      -- not more than the target number of known peers. It's unlikely in
      -- practice so it's ok to resolve it arbitrarily using Map.take.
      let newLocalRoots = Map.take targetNumberOfKnownPeers latestRoots
      check (newLocalRoots /= localRootPeers)

      let addedRoots  = Map.difference newLocalRoots localRootPeers
          addedAddrs  = Map.keysSet addedRoots
          staleAddrs  = Map.keysSet (Map.difference localRootPeers newLocalRoots)

          -- We do not immediately remove old roots from the known peers set
          -- because we may have established connections to them; instead
          -- we mark them as stale so that policy functions can prioritise
          -- them to forget.  The new roots are inserted afterwards.  The
          -- lookup with (Map.!) is only applied to keys of 'addedRoots'.
          knownPeersWithStale =
            KnownPeers.insert PeerSourceStaleRoot
                              (const DoNotAdvertisePeer)
                              staleAddrs
                              knownPeers
          knownPeers' =
            KnownPeers.insert PeerSourceLocalRoot
                              (addedRoots Map.!)
                              addedAddrs
                              knownPeersWithStale

          -- We have to adjust the public roots to maintain the invariant
          -- that the local and public sets are non-overlapping.
          publicRootPeers' =
            Set.difference publicRootPeers (Map.keysSet newLocalRoots)

          -- If we are removing local roots and we have active connections
          -- to them then things are a little more complicated.  Local roots
          -- are typically changed so that connections to the new ones can
          -- be established, but since we will usually already be at our
          -- target for active peers that is not possible without further
          -- action.  What we choose to do here is to demote the removed
          -- peers from active to warm, which then allows new ones to be
          -- promoted to active.
          --
          -- NOTE(review): peers already in 'inProgressDemoteHot' are not
          -- excluded from this selection -- confirm that a second demotion
          -- job cannot be scheduled for the same peer.
          demoteAddrs :: Set peeraddr
          demoteAddrs = Set.intersection activePeers staleAddrs

          demoteConns :: Map peeraddr peerconn
          demoteConns = Map.restrictKeys establishedPeers demoteAddrs

      return Decision {
        decisionTrace = TraceLocalRootPeersChanged localRootPeers
                                                   newLocalRoots,
        decisionState = st {
                          localRootPeers      = newLocalRoots,
                          publicRootPeers     = publicRootPeers',
                          knownPeers          = knownPeers',
                          inProgressDemoteHot = inProgressDemoteHot
                                             <> demoteAddrs
                        },
        decisionJobs  = [ jobDemoteActivePeer actions addr conn
                        | (addr, conn) <- Map.toList demoteConns ]
      }