/
State.hs
377 lines (332 loc) · 14.3 KB
/
State.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE FlexibleContexts #-}
module Ouroboros.Network.BlockFetch.State (
fetchLogicIterations,
FetchDecisionPolicy(..),
FetchTriggerVariables(..),
FetchNonTriggerVariables(..),
FetchDecision,
FetchDecline(..),
FetchMode(..),
FetchStateSnapshot(..),
TraceLabelPeer(..),
TraceFetchClientState(..),
) where
import Data.Functor.Contravariant (contramap)
import Data.Hashable (Hashable)
import qualified Data.Map.Strict as Map
import Data.Map.Strict (Map)
import qualified Data.Set as Set
import Data.Void
import Control.Monad.Class.MonadSTM
import Control.Monad.Class.MonadTime
import Control.Monad.Class.MonadTimer
import Control.Exception (assert)
import Control.Tracer (Tracer, traceWith)
import Ouroboros.Network.Block
import Ouroboros.Network.AnchoredFragment (AnchoredFragment)
import qualified Ouroboros.Network.AnchoredFragment as AF
import Ouroboros.Network.AnchoredFragment.Completeness as AF
import Ouroboros.Network.BlockFetch.ClientState
( FetchRequest(..)
, PeerFetchInFlight(..)
, PeerFetchStatus(..)
, FetchClientStateVars(..)
, addNewFetchRequest
, readFetchClientState
, TraceFetchClientState(..)
, TraceLabelPeer(..)
)
import Ouroboros.Network.BlockFetch.Decision
( fetchDecisions
, PeerInfo
, FetchDecisionPolicy(..)
, FetchMode(..)
, FetchDecision
, FetchDecline(..)
)
import Ouroboros.Network.BlockFetch.DeltaQ
( PeerGSV(..) )
fetchLogicIterations
:: ( HasHeader header
, HasHeader block
, HeaderHash header ~ HeaderHash block
, MonadDelay m
, MonadMonotonicTime m
, MonadSTM m
, Ord peer
, Hashable peer
)
=> Tracer m [TraceLabelPeer peer (FetchDecision [Point header])]
-> Tracer m (TraceLabelPeer peer (TraceFetchClientState header))
-> FetchDecisionPolicy header
-> FetchTriggerVariables peer header m
-> FetchNonTriggerVariables peer header block m
-> ( FetchStateSnapshot peer header block m
-> STM m (FetchStateSnapshot peer header block m)
) -- ^ Consensus refinement in between ChainSync and BlockFetch
-> m Void
fetchLogicIterations decisionTracer clientStateTracer
fetchDecisionPolicy
fetchTriggerVariables
fetchNonTriggerVariables
consensusRefinement =
iterateForever initialFetchStateFingerprint $ \stateFingerprint -> do
-- Run a single iteration of the fetch logic:
--
-- + wait for the state to change and make decisions for the new state
-- + act on those decisions
start <- getMonotonicTime
stateFingerprint' <- fetchLogicIteration
decisionTracer clientStateTracer
fetchDecisionPolicy
fetchTriggerVariables
fetchNonTriggerVariables
stateFingerprint
consensusRefinement
end <- getMonotonicTime
let delta = diffTime end start
-- Limit descision making to once every decisionLoopInterval.
threadDelay $ (decisionLoopInterval fetchDecisionPolicy) - delta
return stateFingerprint'
iterateForever :: Monad m => a -> (a -> m a) -> m Void
iterateForever x0 m = go x0 where go x = m x >>= go
-- | A single iteration of the fetch logic.
--
-- This involves:
--
-- * waiting for the state that the fetch decisions depend upon to change;
-- * taking a snapshot of the state;
-- * deciding for each peer if we will initiate a new fetch request
--
fetchLogicIteration
:: (Hashable peer, MonadSTM m, Ord peer,
HasHeader header, HasHeader block,
HeaderHash header ~ HeaderHash block)
=> Tracer m [TraceLabelPeer peer (FetchDecision [Point header])]
-> Tracer m (TraceLabelPeer peer (TraceFetchClientState header))
-> FetchDecisionPolicy header
-> FetchTriggerVariables peer header m
-> FetchNonTriggerVariables peer header block m
-> FetchStateFingerprint peer header block
-> ( FetchStateSnapshot peer header block m
-> STM m (FetchStateSnapshot peer header block m)
) -- ^ Consensus refinement in between ChainSync and BlockFetch
-> m (FetchStateFingerprint peer header block)
fetchLogicIteration decisionTracer clientStateTracer
fetchDecisionPolicy
fetchTriggerVariables
fetchNonTriggerVariables
stateFingerprint
consensusRefinement = do
-- Gather a snapshot of all the state we need.
(stateSnapshot, stateFingerprint') <-
atomically $ do
(state, fingerprint) <- readStateVariables
fetchTriggerVariables
fetchNonTriggerVariables
stateFingerprint
state' <- consensusRefinement state
return (state', fingerprint)
-- TODO: allow for boring PeerFetchStatusBusy transitions where we go round
-- again rather than re-evaluating everything.
assert (stateFingerprint' /= stateFingerprint) $ return ()
-- TODO: log the difference in the fingerprint that caused us to wake up
-- Make all the fetch decisions
let decisions = fetchDecisionsForStateSnapshot
fetchDecisionPolicy
stateSnapshot
-- If we want to trace timings, we can do it here after forcing:
-- _ <- evaluate (force decisions)
-- Trace the batch of fetch decisions
traceWith decisionTracer
[ TraceLabelPeer peer (fmap fetchRequestPoints decision)
| (decision, (_, _, _, peer, _)) <- decisions ]
-- Tell the fetch clients to act on our decisions
statusUpdates <- fetchLogicIterationAct clientStateTracer
fetchDecisionPolicy
(map swizzleReqVar decisions)
let !stateFingerprint'' =
updateFetchStateFingerprintPeerStatus statusUpdates stateFingerprint'
return stateFingerprint''
where
swizzleReqVar (d,(_,_,g,_,(rq,p))) = (d,g,rq,p)
fetchRequestPoints :: HasHeader hdr => FetchRequest hdr -> [Point hdr]
fetchRequestPoints (FetchRequest headerss) =
-- Flatten multiple fragments and trace points, not full headers
[ blockPoint header
| headers <- headerss
, header <- AF.toOldestFirst headers ]
-- | Do a bit of rearranging of data before calling 'fetchDecisions' to do the
-- real work.
--
fetchDecisionsForStateSnapshot
:: (HasHeader header,
HeaderHash header ~ HeaderHash block,
Ord peer,
Hashable peer)
=> FetchDecisionPolicy header
-> FetchStateSnapshot peer header block m
-> [( FetchDecision (FetchRequest header),
PeerInfo header peer (FetchClientStateVars m header, peer)
)]
fetchDecisionsForStateSnapshot
fetchDecisionPolicy
FetchStateSnapshot {
fetchStateCurrentChain,
fetchStatePeerChains,
fetchStatePeerStates,
fetchStatePeerGSVs,
fetchStateFetchedBlocks,
fetchStateFetchedMaxSlotNo,
fetchStateFetchMode
} =
assert ( Map.keysSet fetchStatePeerChains
`Set.isSubsetOf` Map.keysSet fetchStatePeerStates) $
assert ( Map.keysSet fetchStatePeerStates
`Set.isSubsetOf` Map.keysSet fetchStatePeerGSVs) $
fetchDecisions
fetchDecisionPolicy
fetchStateFetchMode
fetchStateCurrentChain
fetchStateFetchedBlocks
fetchStateFetchedMaxSlotNo
peerChainsAndPeerInfo
where
peerChainsAndPeerInfo =
map swizzle . Map.toList $
Map.intersectionWith (,)
(Map.intersectionWith (,) fetchStatePeerChains fetchStatePeerStates)
fetchStatePeerGSVs
swizzle (peer, (((chain, _), (status, inflight, vars)), gsvs)) =
(chain, (status, inflight, gsvs, peer, (vars, peer)))
-- | Act on decisions to send new requests. In fact all we do here is update
-- request variables that are shared with the threads running the block fetch
-- protocol with each peer.
--
fetchLogicIterationAct :: (MonadSTM m, HasHeader header)
=> Tracer m (TraceLabelPeer peer (TraceFetchClientState header))
-> FetchDecisionPolicy header
-> [(FetchDecision (FetchRequest header),
PeerGSV,
FetchClientStateVars m header,
peer)]
-> m [(peer, PeerFetchStatus header)]
fetchLogicIterationAct clientStateTracer FetchDecisionPolicy{blockFetchSize}
decisions =
sequence
[ (,) peer <$> addNewFetchRequest
(contramap (TraceLabelPeer peer) clientStateTracer)
blockFetchSize
request gsvs
stateVars
| (Right request, gsvs, stateVars, peer) <- decisions ]
-- | STM actions to read various state variables that the fetch logic depends
-- upon. Any change in these variables is a trigger to re-evaluate the decision
-- on what blocks to fetch.
--
-- Note that this is a \"level trigger\" not an \"edge trigger\": we do not
-- have to re-evaluate on every change, it is sufficient to re-evaluate at some
-- stage after one or more changes. This means it is ok to get somewhat behind,
-- and it is not necessary to determine exactly what changed, just that there
-- was some change.
--
data FetchTriggerVariables peer header m = FetchTriggerVariables {
readStateCurrentChain :: STM m (AnchoredFragment header),
readStateCandidateChains :: STM m (Map peer (AnchoredFragment header, AF.FragmentCompleteness)),
readStatePeerStatus :: STM m (Map peer (PeerFetchStatus header))
}
-- | STM actions to read various state variables that the fetch logic uses.
-- While the decisions do make use of the values of these variables, it is not
-- necessary to re-evaluate when these variables change.
--
data FetchNonTriggerVariables peer header block m = FetchNonTriggerVariables {
readStateFetchedBlocks :: STM m (Point block -> Bool),
readStatePeerStateVars :: STM m (Map peer (FetchClientStateVars m header)),
readStatePeerGSVs :: STM m (Map peer PeerGSV),
readStateFetchMode :: STM m FetchMode,
readStateFetchedMaxSlotNo :: STM m MaxSlotNo
}
data FetchStateFingerprint peer header block =
FetchStateFingerprint
!(Maybe (Point block))
!(Map peer (Point header))
!(Map peer (PeerFetchStatus header))
deriving Eq
initialFetchStateFingerprint :: FetchStateFingerprint peer header block
initialFetchStateFingerprint =
FetchStateFingerprint
Nothing
Map.empty
Map.empty
updateFetchStateFingerprintPeerStatus :: Ord peer
=> [(peer, PeerFetchStatus header)]
-> FetchStateFingerprint peer header block
-> FetchStateFingerprint peer header block
updateFetchStateFingerprintPeerStatus statuses'
(FetchStateFingerprint current candidates statuses) =
FetchStateFingerprint
current
candidates
(Map.union (Map.fromList statuses') statuses) -- left overrides right
-- |
--
-- Note that the domain of 'fetchStatePeerChains' is a subset of the domain
-- of 'fetchStatePeerStates' and 'fetchStatePeerReqVars'.
--
data FetchStateSnapshot peer header block m = FetchStateSnapshot {
fetchStateCurrentChain :: AnchoredFragment header,
fetchStatePeerChains :: Map peer (AnchoredFragment header, AF.FragmentCompleteness),
fetchStatePeerStates :: Map peer (PeerFetchStatus header,
PeerFetchInFlight header,
FetchClientStateVars m header),
fetchStatePeerGSVs :: Map peer PeerGSV,
fetchStateFetchedBlocks :: Point block -> Bool,
fetchStateFetchMode :: FetchMode,
fetchStateFetchedMaxSlotNo :: MaxSlotNo
}
readStateVariables :: (MonadSTM m, Eq peer,
HasHeader header, HasHeader block,
HeaderHash header ~ HeaderHash block)
=> FetchTriggerVariables peer header m
-> FetchNonTriggerVariables peer header block m
-> FetchStateFingerprint peer header block
-> STM m (FetchStateSnapshot peer header block m,
FetchStateFingerprint peer header block)
readStateVariables FetchTriggerVariables{..}
FetchNonTriggerVariables{..}
fetchStateFingerprint = do
-- Read all the trigger state variables
fetchStateCurrentChain <- readStateCurrentChain
fetchStatePeerChains <- readStateCandidateChains
fetchStatePeerStatus <- readStatePeerStatus
-- Construct the change detection fingerprint
let !fetchStateFingerprint' =
FetchStateFingerprint
(Just (castPoint (AF.headPoint fetchStateCurrentChain)))
(Map.map (AF.headPoint. fst) fetchStatePeerChains)
fetchStatePeerStatus
-- Check the fingerprint changed, or block and wait until it does
check (fetchStateFingerprint' /= fetchStateFingerprint)
-- Now read all the non-trigger state variables
fetchStatePeerStates <- readStatePeerStateVars
>>= traverse readFetchClientState
fetchStatePeerGSVs <- readStatePeerGSVs
fetchStateFetchedBlocks <- readStateFetchedBlocks
fetchStateFetchMode <- readStateFetchMode
fetchStateFetchedMaxSlotNo <- readStateFetchedMaxSlotNo
-- Construct the overall snapshot of the state
let fetchStateSnapshot =
FetchStateSnapshot {
fetchStateCurrentChain,
fetchStatePeerChains,
fetchStatePeerStates,
fetchStatePeerGSVs,
fetchStateFetchedBlocks,
fetchStateFetchMode,
fetchStateFetchedMaxSlotNo
}
return (fetchStateSnapshot, fetchStateFingerprint')