/
Wallet.hs
582 lines (533 loc) · 23.3 KB
/
Wallet.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE Rank2Types #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
-- |
-- Copyright: © 2020 IOHK
-- License: Apache-2.0
--
-- Ouroboros mini-protocols clients for implementing cardano-wallet. These
-- clients implement the logic and lift away concerns related to concrete
-- data-type representation so that the code can be re-used / shared between
-- Byron and Shelley.
module Ouroboros.Network.Client.Wallet
(
-- * ChainSyncFollowTip
chainSyncFollowTip
-- * ChainSyncWithBlocks
, ChainSyncCmd (..)
, chainSyncWithBlocks
-- * LocalTxSubmission
, LocalTxSubmissionCmd (..)
, localTxSubmission
-- * LocalStateQuery
, LocalStateQueryCmd (..)
, LocalStateQueryResult
, localStateQuery
-- * Helpers
, send
) where
import Prelude
import Cardano.Slotting.Slot
( WithOrigin (..) )
import Cardano.Wallet.Network
( NextBlocksResult (..) )
import Control.Monad.Class.MonadSTM
( MonadSTM
, TQueue
, atomically
, isEmptyTQueue
, newEmptyTMVarM
, putTMVar
, readTQueue
, takeTMVar
, tryReadTQueue
, writeTQueue
)
import Control.Monad.Class.MonadThrow
( MonadThrow )
import Data.Functor
( (<&>) )
import Data.Maybe
( isNothing )
import Data.Void
( Void )
import Network.TypedProtocol.Pipelined
( N (..), Nat (..), natToInt )
import Numeric.Natural
( Natural )
import Ouroboros.Consensus.Ledger.Abstract
( Query (..) )
import Ouroboros.Network.Block
( BlockNo (..)
, HasHeader (..)
, Point (..)
, Serialised (..)
, Tip (..)
, blockNo
, blockPoint
, blockSlot
, castTip
, getTipPoint
, pointSlot
)
import Ouroboros.Network.Protocol.ChainSync.Client
( ChainSyncClient (..)
, ClientStIdle (..)
, ClientStIntersect (..)
, ClientStNext (..)
)
import Ouroboros.Network.Protocol.ChainSync.ClientPipelined
( ChainSyncClientPipelined (..) )
import Ouroboros.Network.Protocol.LocalStateQuery.Client
( ClientStAcquiring (..)
, ClientStQuerying (..)
, LocalStateQueryClient (..)
)
import Ouroboros.Network.Protocol.LocalStateQuery.Type
( AcquireFailure )
import Ouroboros.Network.Protocol.LocalTxSubmission.Client
( LocalTxClientStIdle (..), LocalTxSubmissionClient (..) )
import Ouroboros.Network.Protocol.LocalTxSubmission.Type
( SubmitResult (..) )
import qualified Cardano.Wallet.Primitive.Types as W
import qualified Ouroboros.Network.Protocol.ChainSync.ClientPipelined as P
import qualified Ouroboros.Network.Protocol.LocalStateQuery.Client as LSQ
--------------------------------------------------------------------------------
--
-- chainSyncFollowTip
-- | Client for the 'Chain Sync' mini-protocol, which provides notifications
-- when the node tip changes.
--
-- This is used in the same way as 'chainSyncWithBlocks', except that only one
-- of these clients is necessary, rather than one client per wallet.
chainSyncFollowTip
:: forall m block. (Monad m)
=> (Tip block -> m ())
-- ^ Callback for when the tip changes.
-> ChainSyncClient (Serialised block) (Tip (block)) m Void
chainSyncFollowTip onTipUpdate =
ChainSyncClient (clientStIdle False)
where
-- Client in the state 'Idle'. We immediately request the next block.
clientStIdle
:: Bool
-> m (ClientStIdle (Serialised block) (Tip block) m Void)
clientStIdle synced = pure $ SendMsgRequestNext
(clientStNext synced)
(pure $ clientStNext synced)
-- In the CanAwait state, we take the tip point given by the node and
-- ask for the intersection of that point. This fast-fowards us to the
-- tip. Once synchronised with the tip, we expect to be waiting for the
-- server to send AwaitReply most of the time.
clientStNext
:: Bool
-> ClientStNext (Serialised block) (Tip block) m Void
clientStNext False = ClientStNext
{ recvMsgRollBackward = const findIntersect
, recvMsgRollForward = const findIntersect
}
where
findIntersect tip = ChainSyncClient $
pure $ SendMsgFindIntersect [getTipPoint $ castTip tip] clientStIntersect
clientStNext True = ClientStNext
{ recvMsgRollBackward = const doUpdate
, recvMsgRollForward = const doUpdate
}
where
doUpdate tip = ChainSyncClient $ do
onTipUpdate (castTip tip)
clientStIdle True
-- After an intersection is found, we return to idle with the sync flag
-- set.
clientStIntersect
:: ClientStIntersect (Serialised block) (Tip block) m Void
clientStIntersect = ClientStIntersect
{ recvMsgIntersectFound = \_intersection _tip ->
ChainSyncClient $ clientStIdle True
, recvMsgIntersectNotFound = \_tip ->
ChainSyncClient $ clientStIdle False
}
--------------------------------------------------------------------------------
--
-- chainSyncWithBlocks
-- | We interact with the 'NetworkClient' via a commands instrumenting the
-- client to move within the state-machine protocol. Commands are sent from a
-- parent thread via a shared 'TQueue'.
--
--
-- MAIN THREAD | NETWORK CLIENT THREAD
-- |
-- *---------------* |
-- | | |
-- | Wallet Engine | |
-- | | |
-- *---------------* |
-- | ^ |
-- v | |
-- *---------------* | *----------------*
-- | | | | |
-- | Network Layer |<===[ TQueue ]===>| Network Client |
-- | | | | |
-- *---------------* | *----------------*
-- | | ^
-- | v |
-- | (ChainSync + TxSubmission)
--
-- The NetworkClient is idling most of the time and blocking on the TQueue while
-- waiting for commands. Upon receiving a command, it interprets it by sending
-- the corresponding instruction to the node and responding via a given
-- callback.
--
-- See also 'send' for invoking commands.
data ChainSyncCmd block (m :: * -> *)
= CmdFindIntersection
[Point block]
(Maybe (Point block) -> m ())
| CmdNextBlocks
(NextBlocksResult (Point block) block -> m ())
-- | A little type-alias to ease signatures in 'chainSyncWithBlocks'
type RequestNextStrategy m n block
= (NextBlocksResult (Point block) block -> m ())
-> P.ClientPipelinedStIdle n block (Tip block) m Void
-- | Client for the 'Chain Sync' mini-protocol.
--
-- Once started, the client simply runs ad-infinitum but one may
-- interact with it via a 'TQueue' of commands / messages used to move inside
-- the state-machine.
--
-- In a typical usage, 'chainSyncWithBlocks' would be executed in a forked
-- thread and given a 'TQueue' over which the parent thread as control.
--
-- >>> forkIO $ void $ chainSyncWithBlocks tr queue channel
-- ()
-- >>> writeTQueue queue ...
--
-- Agency
-- -------------------------------------------------------------------------
-- Client has agency* | Idle
-- Server has agency* | Intersect, Next
--
-- * A peer has agency if it is expected to send the next message.
--
-- *-----------*
-- | Intersect |◀══════════════════════════════╗
-- *-----------* FindIntersect ║
-- │ ║
-- │ *---------* *------*
-- │ Intersect.{Found,NotFound} | |═════════════▶| Done |
-- └───────────────────────────────╼| | MsgDone *------*
-- | Idle |
-- ╔═══════════════════════════════════| |
-- ║ RequestNext | |⇦ START
-- ║ *---------*
-- ▼ ╿
-- *------* Roll.{Backward,Forward} │
-- | Next |────────────────────────────────────┘
-- *------*
--
chainSyncWithBlocks
:: forall m block. (Monad m, MonadSTM m, HasHeader block)
=> (Tip block -> W.BlockHeader)
-- ^ Convert an abstract tip to a concrete 'BlockHeader'
--
-- TODO: We probably need a better type for representing Tip as well!
-> TQueue m (ChainSyncCmd block m)
-- ^ We use a 'TQueue' as a communication channel to drive queries from
-- outside of the network client to the client itself.
-- Requests are pushed to the queue which are then transformed into
-- messages to keep the state-machine moving.
-> TQueue m (NextBlocksResult (Point block) block)
-- ^ An internal queue used for buffering responses collected while
-- pipelining. As argument to simplify code below. Responses are first
-- poped from this buffer if not empty, otherwise they'll simply trigger
-- an exchange with the node.
-> ChainSyncClientPipelined block (Tip block) m Void
chainSyncWithBlocks fromTip queue responseBuffer =
ChainSyncClientPipelined $ clientStIdle oneByOne
where
-- Return the _number of slots between two tips.
tipDistance :: BlockNo -> Tip block -> Natural
tipDistance (BlockNo n) TipGenesis =
1 + fromIntegral n
tipDistance (BlockNo n) (Tip _ _ (BlockNo n')) =
fromIntegral @Integer $ abs $ fromIntegral n - fromIntegral n'
-- | Keep only blocks from the list that are before or exactly at the given
-- point.
rollback :: Point block -> [block] -> [block]
rollback pt = filter (\b -> At (blockSlot b) <= pointSlot pt)
-- Client in the state 'Idle'. We wait for requests / commands on an
-- 'TQueue'. Commands start a chain of messages and state transitions
-- before finally returning to 'Idle', waiting for the next command.
clientStIdle
:: RequestNextStrategy m 'Z block
-> m (P.ClientPipelinedStIdle 'Z block (Tip block) m Void)
clientStIdle strategy = atomically (readTQueue queue) >>= \case
CmdFindIntersection points respond -> pure $
P.SendMsgFindIntersect points (clientStIntersect respond)
CmdNextBlocks respond ->
-- We are the only consumer & producer of this queue, so it's fine
-- to run 'isEmpty' and 'read' in two separate atomatic operations.
atomically (isEmptyTQueue responseBuffer) >>= \case
True ->
pure $ strategy respond
False -> do
atomically (readTQueue responseBuffer) >>= respond
clientStIdle strategy
-- When the client intersect, we are effectively starting "a new session",
-- so any buffered responses no longer apply and must be discarded.
clientStIntersect
:: (Maybe (Point block) -> m ())
-> P.ClientPipelinedStIntersect block (Tip block) m Void
clientStIntersect respond = P.ClientPipelinedStIntersect
{ recvMsgIntersectFound = \intersection _tip -> do
respond (Just intersection)
flush responseBuffer
clientStIdle oneByOne
, recvMsgIntersectNotFound = \_tip -> do
respond Nothing
flush responseBuffer
clientStIdle oneByOne
}
-- Simple strategy that sends a request and waits for an answer.
oneByOne
:: RequestNextStrategy m 'Z block
oneByOne respond = P.SendMsgRequestNext
(collectResponses respond [] Zero)
(pure $ collectResponses respond [] Zero)
-- We only pipeline requests when we are far from the tip. As soon as we
-- reach the tip however, there's no point pipelining anymore, so we start
-- collecting responses one by one.
--
-- 0 tip
-- |-----------------------------------|----->
-- pipelined one by one
pipeline
:: Int
-> Nat n
-> RequestNextStrategy m n block
pipeline goal (Succ n) respond | natToInt (Succ n) == goal =
P.CollectResponse Nothing $ collectResponses respond [] n
pipeline goal n respond =
P.SendMsgRequestNextPipelined $ pipeline goal (Succ n) respond
collectResponses
:: (NextBlocksResult (Point block) block -> m ())
-> [block]
-> Nat n
-> P.ClientStNext n block (Tip block) m Void
collectResponses respond blocks Zero = P.ClientStNext
{ P.recvMsgRollForward = \block tip -> do
let cursor' = blockPoint block
let blocks' = reverse (block:blocks)
let tip' = fromTip tip
respond (RollForward cursor' tip' blocks')
let distance = tipDistance (blockNo block) tip
let strategy = if distance <= 1
then oneByOne
else pipeline (fromIntegral $ min distance 1000) Zero
clientStIdle strategy
-- When the last message we receive is a request to rollback, we have
-- two possibilities:
--
-- a) Either, we are asked to rollback to a point that is within the
-- blocks we have just collected. So it suffices to remove blocks from
-- the list, and apply the remaining portion.
--
-- b) We are asked to rollback even further and discard all the blocks
-- we just collected. In which case, we simply discard all blocks and
-- rollback to that point as if nothing happened.
, P.recvMsgRollBackward = \point tip ->
case rollback point blocks of
[] -> do -- b)
respond (RollBackward point)
clientStIdle oneByOne
xs -> do -- a)
let cursor' = blockPoint $ head xs
let blocks' = reverse xs
let tip' = fromTip tip
respond (RollForward cursor' tip' blocks')
clientStIdle oneByOne
}
collectResponses respond blocks (Succ n) = P.ClientStNext
{ P.recvMsgRollForward = \block _tip -> pure $
P.CollectResponse Nothing $ collectResponses respond (block:blocks) n
-- This scenario is slightly more complicated than for the 'Zero' case.
-- Again, there are two possibilities:
--
-- a) Either we rollback to a point we have just collected, so it
-- suffices to discard blocks from the list and continue.
--
-- b) Or, we need to reply immediately, but we still have to collect the
-- remaining responses. BUT, we can only reply once to a given command.
-- So instead, we buffer all the remaining responses in a queue and, upon
-- receiving future requests, we'll simply read them from the buffer!
, P.recvMsgRollBackward = \point _tip ->
case rollback point blocks of
[] -> do -- b)
let save = atomically . writeTQueue responseBuffer
respond (RollBackward point)
pure $ P.CollectResponse Nothing $ collectResponses save [] n
xs -> do -- a)
pure $ P.CollectResponse Nothing $ collectResponses respond xs n
}
--------------------------------------------------------------------------------
--
-- LocalStateQuery
-- | Command to send to the localStateQuery client. See also 'ChainSyncCmd'.
data LocalStateQueryCmd block (m :: * -> *)
= forall state. CmdQueryLocalState
(Point block)
(Query block state)
(LocalStateQueryResult state -> m ())
-- | Shorthand for the possible outcomes of acquiring local state parameters.
type LocalStateQueryResult state = Either AcquireFailure state
-- | Client for the 'Local State Query' mini-protocol.
--
-- Agency
-- -------------------------------------------------------------------------
-- Client has agency* | Idle, Acquired
-- Server has agency* | Acquiring, Querying
-- * A peer has agency if it is expected to send the next message.
--
--
-- ┌───────────────┐ Done ┌───────────────┐
-- ┌──────▶│ Idle ├─────────────▶│ Done │
-- │ └───┬───────────┘ └───────────────┘
-- │ │ ▲
-- │ Acquire │ │
-- │ │ │ Failure
-- │ ▼ │
-- │ ┌───────────┴───┐ Result
-- │ │ Acquiring │◀─────────────────────┐
-- │ └───┬───────────┘ │
-- Release│ │ ▲ │
-- │ │ │ │
-- │ Acquired ▼ │ ReAcquire │
-- │ ┌───────────┴───┐ ┌────────┴───────┐
-- └───────┤ Acquired │────────────>│ Querying │
-- └───────────────┘ └────────────────┘
--
localStateQuery
:: forall m block . (MonadThrow m, MonadSTM m)
=> TQueue m (LocalStateQueryCmd block m)
-- ^ We use a 'TQueue' as a communication channel to drive queries from
-- outside of the network client to the client itself.
-- Requests are pushed to the queue which are then transformed into
-- messages to keep the state-machine moving.
-> LocalStateQueryClient block (Query block) m Void
localStateQuery queue =
LocalStateQueryClient clientStIdle
where
clientStIdle
:: m (LSQ.ClientStIdle block (Query block) m Void)
clientStIdle = awaitNextCmd <&> \case
CmdQueryLocalState pt query respond ->
LSQ.SendMsgAcquire pt (clientStAcquiring query respond)
clientStAcquiring
:: forall state. Query block state
-> (LocalStateQueryResult state -> m ())
-> LSQ.ClientStAcquiring block (Query block) m Void
clientStAcquiring query respond = LSQ.ClientStAcquiring
{ recvMsgAcquired = clientStAcquired query respond
, recvMsgFailure = \failure -> do
respond (Left failure)
clientStIdle
}
clientStAcquired
:: forall state. Query block state
-> (LocalStateQueryResult state -> m ())
-> LSQ.ClientStAcquired block (Query block) m Void
clientStAcquired query respond =
LSQ.SendMsgQuery query (clientStQuerying respond)
-- By re-acquiring rather releasing the state with 'MsgRelease' it
-- enables optimisations on the server side.
clientStAcquiredAgain
:: m (LSQ.ClientStAcquired block (Query block) m Void)
clientStAcquiredAgain = awaitNextCmd <&> \case
CmdQueryLocalState pt query respond ->
LSQ.SendMsgReAcquire pt (clientStAcquiring query respond)
clientStQuerying
:: forall state. (LocalStateQueryResult state -> m ())
-> LSQ.ClientStQuerying block (Query block) m Void state
clientStQuerying respond = LSQ.ClientStQuerying
{ recvMsgResult = \result -> do
respond (Right result)
clientStAcquiredAgain
}
awaitNextCmd :: m (LocalStateQueryCmd block m)
awaitNextCmd = atomically $ readTQueue queue
--------------------------------------------------------------------------------
--
-- LocalTxSubmission
-- | Sending command to the localTxSubmission client. See also 'ChainSyncCmd'.
data LocalTxSubmissionCmd tx err (m :: * -> *)
= CmdSubmitTx tx (SubmitResult err -> m ())
-- | Client for the 'Local Tx Submission' mini-protocol.
--
-- Agency
-- -------------------------------------------------------------------------
-- Client has agency* | Idle
-- Server has agency* | Busy
-- * A peer has agency if it is expected to send the next message.
--
-- *-----------*
-- | Busy |◀══════════════════════════════╗
-- *-----------* SubmitTx ║
-- │ │ ║
-- │ │ *---------* *------*
-- │ │ AcceptTx | |═════════════▶| Done |
-- │ └────────────────────────────╼| | MsgDone *------*
-- │ RejectTx | Idle |
-- └──────────────────────────────────╼| |
-- | |⇦ START
-- *---------*
localTxSubmission
:: forall m tx err. (MonadThrow m, MonadSTM m)
=> TQueue m (LocalTxSubmissionCmd tx err m)
-- ^ We use a 'TQueue' as a communication channel to drive queries from
-- outside of the network client to the client itself.
-- Requests are pushed to the queue which are then transformed into
-- messages to keep the state-machine moving.
-> LocalTxSubmissionClient tx err m Void
localTxSubmission queue =
LocalTxSubmissionClient clientStIdle
where
clientStIdle
:: m (LocalTxClientStIdle tx err m Void)
clientStIdle = atomically (readTQueue queue) <&> \case
CmdSubmitTx tx respond ->
SendMsgSubmitTx tx (\e -> respond e >> clientStIdle)
--------------------------------------------------------------------------------
--
-- Helpers
flush :: (MonadSTM m) => TQueue m a -> m ()
flush queue =
atomically $ dropUntil isNothing queue
where
dropUntil predicate q =
(predicate <$> tryReadTQueue q) >>= \case
True -> pure ()
False -> dropUntil predicate q
-- | Helper function to easily send commands to the node's client and read
-- responses back.
--
-- >>> queue `send` CmdNextBlocks
-- RollForward cursor nodeTip blocks
--
-- >>> queue `send` CmdNextBlocks
-- AwaitReply
send
:: MonadSTM m
=> TQueue m (cmd m)
-> ((a -> m ()) -> cmd m)
-> m a
send queue cmd = do
tvar <- newEmptyTMVarM
atomically $ writeTQueue queue (cmd (atomically . putTMVar tvar))
atomically $ takeTMVar tvar