-
-
Notifications
You must be signed in to change notification settings - Fork 86
/
TxMonitor.hs
159 lines (151 loc) · 6.45 KB
/
TxMonitor.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
-- This Source Code Form is subject to the terms of the Mozilla Public
-- License, v. 2.0. If a copy of the MPL was not distributed with this
-- file, You can obtain one at http://mozilla.org/MPL/2.0/.
{-# LANGUAGE RecordWildCards #-}
{-# OPTIONS_GHC -fno-warn-unticked-promoted-constructors #-}
-- This module implements a client for the local tx-monitor mini-protocol. It
-- is used by local clients (like wallets, explorers and CLI tools) to monitor
-- the transactions passing through the mempool of a local node.
--
-- The protocol is stateful: the server keeps track of the transactions
-- already sent to the client.
--
-- @
--                         START
--                           ⇓
--                   ┌───────────────┐
--           ┌──────▶│     Idle      │⇒ DONE
--           │       └───┬───────────┘
--           │           │
--           │   Acquire │
--           │           ▼
--           │       ┌───────────────┐
--        Release    │   Acquiring   │
--           │       └───┬───────────┘
--           │           │       ▲
--           │  Acquired │       │ AwaitAcquire
--           │           ▼       │
--           │       ┌───────────┴───┐
--           └───────┤   Acquired    │
--                   └───┬───────────┘
--                       │       ▲
-- HasTx|NextTx|GetSizes │       │ Reply (HasTx|NextTx|GetSizes)
--                       ▼       │
--                   ┌───────────┴───┐
--                   │     Busy      │
--                   └───────────────┘
-- @
module Ogmios.App.Protocol.TxMonitor
( mkTxMonitorClient
) where
import Ogmios.Prelude hiding
( id
)
import Ogmios.App.Protocol
( defaultWithInternalError
)
import Ogmios.Control.Exception
( MonadCatch
)
import Ogmios.Control.MonadSTM
( MonadSTM (..)
)
import Ogmios.Data.Json
( Json
)
import Ogmios.Data.Protocol.TxMonitor
( AcquireMempool (..)
, AcquireMempoolResponse (..)
, GenTx
, GenTxId
, HasTransaction (..)
, HasTransactionResponse (..)
, NextTransaction (..)
, NextTransactionFields (..)
, NextTransactionResponse (..)
, ReleaseMempool (..)
, ReleaseMempoolResponse (..)
, SizeOfMempool (..)
, SizeOfMempoolResponse (..)
, SlotNo (..)
, TxMonitorCodecs (..)
, TxMonitorMessage (..)
)
import Ouroboros.Consensus.Ledger.SupportsMempool
( HasTxId (..)
)
import Ouroboros.Network.Protocol.LocalTxMonitor.Client
( ClientStAcquired (..)
, ClientStIdle (..)
, LocalTxMonitorClient (..)
)
-- | Build a 'LocalTxMonitorClient' that is driven by requests read from a queue.
--
-- Each request taken from the queue is either turned into the matching
-- protocol message, or (while no mempool snapshot is held) answered
-- immediately with the corresponding "must acquire first" response. Every
-- response is encoded to JSON with the supplied codecs and passed to the
-- emitter.
mkTxMonitorClient
    :: forall m block.
        ( MonadSTM m
        , MonadCatch m
        , HasTxId (GenTx block)
        )
    => TxMonitorCodecs block
        -- ^ Encoders turning each kind of response into JSON
    -> TQueue m (TxMonitorMessage block)
        -- ^ Queue from which incoming requests are read
    -> (Json -> m ())
        -- ^ Callback used to emit each encoded JSON response
    -> LocalTxMonitorClient (GenTxId block) (GenTx block) SlotNo m ()
mkTxMonitorClient TxMonitorCodecs{..} queue yield =
    LocalTxMonitorClient clientStIdle
  where
    -- Block until the next request can be read from the queue.
    await :: m (TxMonitorMessage block)
    await = atomically (readTQueue queue)

    -- Idle state: only an acquire request makes progress. Any other request
    -- is answered with its "must acquire first" response and the client
    -- stays idle.
    --
    -- NOTE(review): 'defaultWithInternalError' is defined elsewhere;
    -- presumably it reports an internal error through 'yield' and resumes
    -- from the state given as first argument when the wrapped action
    -- throws -- confirm against its definition.
    clientStIdle
        :: m (ClientStIdle (GenTxId block) (GenTx block) SlotNo m ())
    clientStIdle = do
        request <- await
        case request of
            MsgAcquireMempool AcquireMempool toResponse ->
                defaultWithInternalError clientStIdle yield toResponse $
                    pure $ SendMsgAcquire $ \acquiredAt -> do
                        yield
                            ( encodeAcquireMempoolResponse
                                (toResponse (AcquireMempoolResponse acquiredAt))
                            )
                        clientStAcquired

            MsgNextTransaction NextTransaction{} toResponse -> do
                yield
                    ( encodeNextTransactionResponse
                        (toResponse NextTransactionMustAcquireFirst)
                    )
                clientStIdle

            MsgHasTransaction HasTransaction{} toResponse -> do
                yield
                    ( encodeHasTransactionResponse
                        (toResponse HasTransactionMustAcquireFirst)
                    )
                clientStIdle

            MsgSizeOfMempool SizeOfMempool toResponse -> do
                yield
                    ( encodeSizeOfMempoolResponse
                        (toResponse SizeOfMempoolMustAcquireFirst)
                    )
                clientStIdle

            MsgReleaseMempool ReleaseMempool toResponse -> do
                yield
                    ( encodeReleaseMempoolResponse
                        (toResponse ReleaseMempoolMustAcquireFirst)
                    )
                clientStIdle

    -- Acquired state: every request is translated into a protocol message.
    -- Once the continuation receives the reply, the response is emitted and
    -- the client loops back to the acquired state, except for a release,
    -- which returns to idle.
    clientStAcquired
        :: m (ClientStAcquired (GenTxId block) (GenTx block) SlotNo m ())
    clientStAcquired = do
        request <- await
        case request of
            -- Acquiring again while already acquired uses 'SendMsgAwaitAcquire'.
            MsgAcquireMempool AcquireMempool toResponse ->
                defaultWithInternalError clientStAcquired yield toResponse $
                    pure $ SendMsgAwaitAcquire $ \acquiredAt -> do
                        yield
                            ( encodeAcquireMempoolResponse
                                (toResponse (AcquireMempoolResponse acquiredAt))
                            )
                        clientStAcquired

            MsgNextTransaction NextTransaction{fields = requested} toResponse ->
                defaultWithInternalError clientStAcquired yield toResponse $
                    pure $ SendMsgNextTx $ \next -> do
                        -- Without requested fields only the transaction id
                        -- is returned; otherwise the transaction itself.
                        yield $ encodeNextTransactionResponse $ toResponse $
                            case requested of
                                Nothing ->
                                    NextTransactionResponseId (txId <$> next)
                                Just NextTransactionAllFields ->
                                    NextTransactionResponseTx next
                        clientStAcquired

            MsgHasTransaction HasTransaction{id = wanted} toResponse ->
                defaultWithInternalError clientStAcquired yield toResponse $
                    pure $ SendMsgHasTx wanted $ \found -> do
                        yield
                            ( encodeHasTransactionResponse
                                (toResponse HasTransactionResponse{has = found})
                            )
                        clientStAcquired

            MsgSizeOfMempool SizeOfMempool toResponse ->
                defaultWithInternalError clientStAcquired yield toResponse $
                    pure $ SendMsgGetSizes $ \sizes -> do
                        yield
                            ( encodeSizeOfMempoolResponse
                                (toResponse SizeOfMempoolResponse{mempool = sizes})
                            )
                        clientStAcquired

            MsgReleaseMempool ReleaseMempool toResponse ->
                defaultWithInternalError clientStAcquired yield toResponse $
                    pure $ SendMsgRelease $ do
                        yield (encodeReleaseMempoolResponse (toResponse Released))
                        clientStIdle