/
BlockFetch.hs
301 lines (261 loc) · 14.5 KB
/
BlockFetch.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
{-| Let's start with the big picture...
@
Key:  ┏━━━━━━━━━━━━┓  ╔═════════════╗  ┏━━━━━━━━━━━━━━┓   ╔════════════╗
      ┃ STM-based  ┃  ║active thread║  ┃state instance┃┓  ║ one thread ║╗
      ┃shared state┃  ║             ║  ┃   per peer   ┃┃  ║  per peer  ║║
      ┗━━━━━━━━━━━━┛  ╚═════════════╝  ┗━━━━━━━━━━━━━━┛┃  ╚════════════╝║
                                        ┗━━━━━━━━━━━━━━┛   ╚════════════╝
@

@
  ╔═════════════╗     ┏━━━━━━━━━━━━━┓
  ║ Chain sync  ║╗    ┃   Ledger    ┃
  ║  protocol   ║║◀───┨   state     ┃◀───────────╮
  ║(client side)║║    ┃             ┃            │
  ╚══════╤══════╝║    ┗━━━━━━━━━━━━━┛            │
   ╚═════╪═══════╝                               │
         ▼                                       │
  ┏━━━━━━━━━━━━━┓     ┏━━━━━━━━━━━━━┓     ╔══════╧══════╗
  ┃  Candidate  ┃     ┃   Set of    ┃     ║  Chain and  ║
  ┃  chains     ┃     ┃  downloaded ┠────▶║   ledger    ║
  ┃  (headers)  ┃     ┃   blocks    ┃     ║  validation ║
  ┗━━━━━┯━━━━━━━┛     ┗━━━━━┯━━━━━━━┛     ╚══════╤══════╝
        │                   │ ▲                  │
        │ ╭─────────────────╯ │                  │
░░░░░░░░▼░▼░░░░░░░░           │                  ▼
░░╔═════════════╗░░           │           ┏━━━━━━━━━━━━━┓     ╔═════════════╗
░░║    Block    ║░░           │           ┃   Current   ┃     ║ Block fetch ║╗
░░╢    fetch    ║◀────────────┼───────────┨    chain    ┠────▶║  protocol   ║║
░░║    logic    ║░░           │           ┃  (blocks)   ┃     ║(server side)║║
░░╚═════════════╝░░           │           ┠─────────────┨     ╚═════════════╝║
░░░░░░░░░▲░░░░░░░░░           │           ┃  Tentative  ┃      ╚═════════════╝
░░░░░░░░░▼░░░░░░░░░░░░░░░░░░░░│░░░░░░░░   ┃    chain    ┠──╮
░░┏━━━━━━━━━━━━━┓░░░░░╔═══════╧═════╗░░   ┃  (headers)  ┃  │  ╔═════════════╗
░░┃ Block fetch ┃┓░░░░║ block fetch ║╗░   ┗━━━━━━━━━━━━━┛  │  ║ Chain sync  ║╗
░░┃  state and  ┃┃◀──▶║  protocol   ║║░                    ╰─▶║  protocol   ║║
░░┃  requests   ┃┃░░░░║(client side)║║░                       ║(server side)║║
░░┗━━━━━━━━━━━━━┛┃░░░░╚═════════════╝║░                       ╚═════════════╝║
░░░┗━━━━━━━━━━━━━┛░░░░░╚═════════════╝░                        ╚═════════════╝
░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░
@
Notes:
* Thread communication is via STM based state.
* Outbound: threads update STM state.
* Inbound: threads wait on STM state changing (using retry).
* There are no queues: there is only the current state, not all change events.
We consider the block fetch logic and the policy for the block fetch protocol
client together as one unit of functionality. This is the shaded area in the
diagram.
Looking at the diagram we see that these two threads interact with each other
and other threads via the following shared state:
+-----------------------------+----------------+--------------------+
| State | Interactions | Internal\/External |
+=============================+================+====================+
| Candidate chains (headers) | Read | External |
+-----------------------------+----------------+--------------------+
| Current chain (blocks) | Read | External |
+-----------------------------+----------------+--------------------+
| Set of downloaded blocks | Read & Write | External |
+-----------------------------+----------------+--------------------+
| Block fetch requests | Read & Write | Internal |
+-----------------------------+----------------+--------------------+
The block fetch requests state is private between the block fetch logic
and the block fetch protocol client, so it is implemented here.
The other state is managed by the consensus layer and is considered external
here. So here we define interfaces for interacting with the external state.
These have to be provided when instantiating the block fetch logic.
-}
module Ouroboros.Network.BlockFetch (
blockFetchLogic,
BlockFetchConfiguration(..),
BlockFetchConsensusInterface(..),
-- ** Tracer types
FetchDecision,
TraceFetchClientState(..),
TraceLabelPeer(..),
-- * The 'FetchClientRegistry'
FetchClientRegistry,
newFetchClientRegistry,
bracketFetchClient,
bracketSyncWithFetchClient,
bracketKeepAliveClient,
-- * Re-export types used by 'BlockFetchConsensusInterface'
FetchMode (..),
SizeInBytes,
) where
import Data.Hashable (Hashable)
import Data.Map.Strict (Map)
import Data.Void
import Control.Monad.Class.MonadSTM
import Control.Monad.Class.MonadTime
import Control.Monad.Class.MonadTimer
import Control.Tracer (Tracer)
import Ouroboros.Network.AnchoredFragment (AnchoredFragment (..))
import Ouroboros.Network.Block
import Ouroboros.Network.DeltaQ ( SizeInBytes )
import Ouroboros.Network.BlockFetch.State
import Ouroboros.Network.BlockFetch.ClientRegistry
( FetchClientPolicy(..)
, FetchClientRegistry, newFetchClientRegistry
, readFetchClientsStatus, readFetchClientsStateVars
, readPeerGSVs
, bracketFetchClient, bracketKeepAliveClient
, bracketSyncWithFetchClient, setFetchClientContext )
-- | The functionality that the block fetch logic requires from the consensus
-- layer.
--
-- The consensus layer provides these operations as an input to the block
-- fetch logic.
--
data BlockFetchConsensusInterface peer header block m =
  BlockFetchConsensusInterface
    { -- | Read the K-suffixes of the candidate chains.
      --
      -- Assumptions:
      --
      -- * They must be already validated.
      -- * They may contain /fewer/ than @K@ blocks.
      -- * Their anchor does not have to intersect with the current chain.
      readCandidateChains :: STM m (Map peer (AnchoredFragment header))

    , -- | Read the K-suffix of the current chain.
      --
      -- This must contain info on the last @K@ blocks (unless we are near
      -- the chain genesis, of course).
      readCurrentChain :: STM m (AnchoredFragment header)

    , -- | Read the fetch mode that the block fetch logic should currently
      -- use.
      --
      -- The fetch mode is a dynamic part of the block fetch policy. In
      -- 'FetchModeBulkSync' it follows a policy that optimises for expected
      -- bandwidth over the latency to fetch any particular block, whereas in
      -- 'FetchModeDeadline' it follows a policy that optimises for the
      -- latency to fetch blocks, at the expense of wasting bandwidth.
      --
      -- This mode should be set so that the deadline mode is used when the
      -- node's current chain is near to \"now\", and the bulk sync mode is
      -- used when it is far away.
      readFetchMode :: STM m FetchMode

    , -- | Read a predicate reporting whether a block has been fetched.
      --
      -- Only recent blocks are covered: those within the last @K@.
      readFetchedBlocks :: STM m (Point block -> Bool)

    , -- | Record a fetched block.
      --
      -- This and 'readFetchedBlocks' are required to be linked: upon
      -- successful completion of 'addFetchedBlock' it must be the case that
      -- 'readFetchedBlocks' reports the block.
      addFetchedBlock :: Point block -> block -> m ()

    , -- | The highest stored/downloaded slot number.
      --
      -- This is used to optimise the filtering of fragments in the block
      -- fetch logic: when removing already downloaded blocks from a
      -- fragment, the filtering (which has a linear cost) stops as soon as a
      -- block has a slot number higher than this one, because such a block
      -- cannot have been downloaded anyway.
      readFetchedMaxSlotNo :: STM m MaxSlotNo

    , -- | Given the current chain, is the given chain plausible as a
      -- candidate chain?
      --
      -- Classically for Ouroboros this would simply check whether the
      -- candidate is strictly longer, but for Ouroboros with operational key
      -- certificates there are also cases where we would consider a chain of
      -- equal length to the current chain.
      plausibleCandidateChain :: AnchoredFragment header
                              -> AnchoredFragment header
                              -> Bool

    , -- | Compare two candidate chains and return a preference ordering.
      --
      -- This is used as part of selecting which chains to prioritise for
      -- downloading block bodies.
      compareCandidateChains :: AnchoredFragment header
                             -> AnchoredFragment header
                             -> Ordering

    , -- | The size used when estimating the cost of fetching the block for
      -- a header.
      --
      -- Much of the logic for deciding which blocks to download from which
      -- peer depends on making estimates based on recent performance
      -- metrics. These estimates of course depend on the amount of data we
      -- will be downloading.
      blockFetchSize :: header -> SizeInBytes

    , -- | Given a block header, validate the supposed corresponding block
      -- body.
      blockMatchesHeader :: header -> block -> Bool
    }
-- | Configuration for 'FetchDecisionPolicy'.
--
-- Should be determined by external local node config.
--
data BlockFetchConfiguration =
  BlockFetchConfiguration
    { -- | Maximum concurrent downloads during bulk syncing.
      bfcMaxConcurrencyBulkSync :: !Word

    , -- | Maximum concurrent downloads during deadline syncing.
      bfcMaxConcurrencyDeadline :: !Word

    , -- | Maximum requests in flight per peer.
      bfcMaxRequestsInflight :: !Word

    , -- | Desired interval between calls to @fetchLogicIteration@.
      bfcDecisionLoopInterval :: !DiffTime

    , -- | Salt used when comparing peers.
      bfcSalt :: !Int
    }
-- | Run the block fetch logic.
--
-- It monitors the current chain and the candidate chains. It decides which
-- block bodies to fetch and manages the process of fetching them, including
-- making alternative decisions based on timeouts and failures.
--
-- This runs forever and should be shut down using mechanisms such as async.
--
blockFetchLogic
  :: forall peer header block m.
     ( HasHeader header
     , HasHeader block
     , HeaderHash header ~ HeaderHash block
     , MonadDelay m
     , MonadMonotonicTime m
     , MonadSTM m
     , Ord peer
     , Hashable peer
     )
  => Tracer m [TraceLabelPeer peer (FetchDecision [Point header])]
  -> Tracer m (TraceLabelPeer peer (TraceFetchClientState header))
  -> BlockFetchConsensusInterface peer header block m
  -> FetchClientRegistry peer header block m
  -> BlockFetchConfiguration
  -> m Void
blockFetchLogic
    decisionTracer
    clientStateTracer
    BlockFetchConsensusInterface
      { readCandidateChains
      , readCurrentChain
      , readFetchMode
      , readFetchedBlocks
      , addFetchedBlock
      , readFetchedMaxSlotNo
      , plausibleCandidateChain
      , compareCandidateChains
      , blockFetchSize
      , blockMatchesHeader
      }
    registry
    BlockFetchConfiguration
      { bfcMaxConcurrencyBulkSync
      , bfcMaxConcurrencyDeadline
      , bfcMaxRequestsInflight
      , bfcDecisionLoopInterval
      , bfcSalt
      } =
    -- First install the tracer and client policy into the registry, then
    -- hand over to the decision loop.
    setFetchClientContext registry clientStateTracer clientPolicy
      >> fetchLogicIterations
           decisionTracer
           clientStateTracer
           decisionPolicy
           triggerVars
           nonTriggerVars
  where
    -- Policy installed into the registry via 'setFetchClientContext'; built
    -- from the consensus interface callbacks.
    clientPolicy :: FetchClientPolicy header block m
    clientPolicy =
      FetchClientPolicy
        { blockFetchSize     = blockFetchSize
        , blockMatchesHeader = blockMatchesHeader
        , addFetchedBlock    = addFetchedBlock
        }

    -- Decision policy passed to 'fetchLogicIterations': the numeric limits
    -- come from the 'BlockFetchConfiguration', the chain functions from the
    -- consensus interface.
    decisionPolicy :: FetchDecisionPolicy header
    decisionPolicy =
      FetchDecisionPolicy
        { maxInFlightReqsPerPeer  = bfcMaxRequestsInflight
        , maxConcurrencyBulkSync  = bfcMaxConcurrencyBulkSync
        , maxConcurrencyDeadline  = bfcMaxConcurrencyDeadline
        , decisionLoopInterval    = bfcDecisionLoopInterval
        , peerSalt                = bfcSalt
        , plausibleCandidateChain = plausibleCandidateChain
        , compareCandidateChains  = compareCandidateChains
        , blockFetchSize          = blockFetchSize
        }

    -- STM reads bundled as 'FetchTriggerVariables': the current chain, the
    -- candidate chains, and the fetch clients' status from the registry.
    triggerVars :: FetchTriggerVariables peer header m
    triggerVars =
      FetchTriggerVariables
        { readStateCurrentChain    = readCurrentChain
        , readStateCandidateChains = readCandidateChains
        , readStatePeerStatus      = readFetchClientsStatus registry
        }

    -- STM reads bundled as 'FetchNonTriggerVariables': fetched blocks, the
    -- per-peer state vars and GSVs from the registry, the fetch mode, and
    -- the highest fetched slot number.
    nonTriggerVars :: FetchNonTriggerVariables peer header block m
    nonTriggerVars =
      FetchNonTriggerVariables
        { readStateFetchedBlocks    = readFetchedBlocks
        , readStatePeerStateVars    = readFetchClientsStateVars registry
        , readStatePeerGSVs         = readPeerGSVs registry
        , readStateFetchMode        = readFetchMode
        , readStateFetchedMaxSlotNo = readFetchedMaxSlotNo
        }