/
Streaming.hs
201 lines (177 loc) · 6.68 KB
/
Streaming.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
{-# LANGUAGE FlexibleInstances #-}
module Plutus.Streaming
( SimpleChainSyncEvent,
withSimpleChainSyncEventStream,
ChainSyncEventWithLedgerState,
withChainSyncEventStreamWithLedgerState,
ChainSyncEvent (..),
EventStreamResult (..),
)
where
import Cardano.Api
import Cardano.Api.ChainSync.Client
import Control.Concurrent
import Control.Concurrent.Async
import Control.Monad.Trans.Except (runExceptT)
-- import Data.Aeson (ToJSON (..))
import GHC.Generics (Generic)
import Streaming
import Streaming.Prelude qualified as S
-- | An event emitted by the chain-sync streaming client, parameterised over
-- the payload that accompanies a roll-forward.
data ChainSyncEvent payload
  = -- | Roll-forward message: the received payload together with the
    -- 'ChainTip' that came with it.
    RollForward payload ChainTip
  | -- | Roll-backward message: the 'ChainPoint' and 'ChainTip' received with
    -- it.
    RollBackward ChainPoint ChainTip
  deriving (Show, Functor, Generic)
-- deriving instance Generic ChainPoint
-- instance ToJSON ChainPoint
-- instance ToJSON a => ToJSON (ChainSyncEvent a)
-- | A 'ChainSyncEvent' whose roll-forward payload is a block in 'CardanoMode'.
type SimpleChainSyncEvent = ChainSyncEvent (BlockInMode CardanoMode)
-- | A 'ChainSyncEvent' whose roll-forward payload pairs a block in
-- 'CardanoMode' with either a 'LedgerStateError' or a 'LedgerState' and its
-- list of 'LedgerEvent's.
type ChainSyncEventWithLedgerState =
  ChainSyncEvent
    ( BlockInMode CardanoMode,
      Either LedgerStateError (LedgerState, [LedgerEvent])
    )
-- | The value an event stream returns when it terminates.
--
-- 'NoIntersectionFound' is the only outcome: it is what the stream returns
-- when the client reports 'Nothing' instead of a channel of events.
data EventStreamResult = NoIntersectionFound
  deriving (Show)
-- | Run a chain-sync client against a local node and hand the resulting
-- stream of 'SimpleChainSyncEvent's to a consumer.
--
-- The stream returns 'NoIntersectionFound' without yielding any event when
-- the client reports that it could not start from the requested point.
withSimpleChainSyncEventStream ::
  -- | Path of the local node socket
  FilePath ->
  -- | Network the node is running on
  NetworkId ->
  -- | The point on the chain to start from
  ChainPoint ->
  -- | Consumer of the event stream; its result is the overall result
  (Stream (Of SimpleChainSyncEvent) IO EventStreamResult -> IO b) ->
  IO b
withSimpleChainSyncEventStream filePath networkId point consumer =
  withClientStream client consumer
  where
    client = runChainSyncStreamingClient filePath networkId point
-- | Like 'withSimpleChainSyncEventStream', but every roll-forward event also
-- carries the ledger-state component of 'ChainSyncEventWithLedgerState'.
--
-- The stream returns 'NoIntersectionFound' without yielding any event when
-- the client reports 'Nothing' in place of a channel. This happens when no
-- intersection is found and also when the initial ledger state cannot be
-- loaded.
withChainSyncEventStreamWithLedgerState ::
  -- | Path of the network configuration file used to build the initial
  -- ledger state
  FilePath ->
  -- | Path of the local node socket
  FilePath ->
  -- | Network the node is running on
  NetworkId ->
  -- | The point on the chain to start from
  ChainPoint ->
  -- | Consumer of the event stream; its result is the overall result
  (Stream (Of ChainSyncEventWithLedgerState) IO EventStreamResult -> IO b) ->
  IO b
withChainSyncEventStreamWithLedgerState networkConfigPath filePath networkId point consumer =
  withClientStream client consumer
  where
    client =
      runChainSyncStreamingClientWithLedgerState networkConfigPath filePath networkId point
-- | Adapt a streaming client to a 'Stream'.
--
-- The client runs in its own thread for as long as the consumer runs. It is
-- handed an 'MVar' into which it must write exactly once:
--
-- * @Just chan@ once it is ready to produce events; the consumer then
--   receives a never-ending stream of the values read from @chan@;
--
-- * @Nothing@ if it cannot produce any; the consumer then receives an empty
--   stream that returns 'NoIntersectionFound'.
--
-- Exceptions raised by the client thread are re-thrown in the calling thread
-- (see 'link'), so a failing client can no longer leave the consumer blocked
-- on the 'MVar' or on the channel.
--
-- NOTE: a client that returns normally without ever writing into the 'MVar'
-- still blocks this function on 'takeMVar'.
withClientStream ::
  (MVar (Maybe (Chan e)) -> IO r) ->
  (Stream (Of e) IO EventStreamResult -> IO b) ->
  IO b
withClientStream client consumer = do
  -- We use a MVar as a synchronisation point to learn whether the client has
  -- successfully found an intersection. We rely on the fact that the client
  -- will write into m, telling us whether it has found an intersection.
  m <- newEmptyMVar
  withAsync (client m) $ \a -> do
    -- Propagate client failures to this thread. Without this, an exception
    -- in the client (e.g. a failed connection to the node) would go
    -- unnoticed and we would wait forever below. 'link' ignores the
    -- cancellation 'withAsync' performs when the consumer finishes.
    link a
    mc <- takeMVar m
    case mc of
      Nothing ->
        consumer $ return NoIntersectionFound
      Just c ->
        -- FIXME Client gets killed when the consumer finishes, we
        -- should allow for a better clean up here
        consumer $ S.repeatM $ readChan c
-- | Connect to a local node and run 'chainSyncStreamingClient' as the
-- chain-sync client. The tx-submission and state-query clients are left
-- unset.
--
-- This can be replaced by the almost identical function in
-- Cardano.Protocol.Socket.Client.
--
-- TODO move to pipelined version
runChainSyncStreamingClient ::
  -- | Path of the local node socket
  FilePath ->
  -- | Network the node is running on
  NetworkId ->
  -- | The point on the chain to start from
  ChainPoint ->
  -- | Where the client reports the channel of events (or 'Nothing')
  MVar (Maybe (Chan SimpleChainSyncEvent)) ->
  IO ()
runChainSyncStreamingClient socketPath networkId point mChan =
  connectToLocalNode nodeConnection protocols
  where
    -- FIXME this comes from the config file but Cardano.Api does not expose readNetworkConfig!
    epochSlots = EpochSlots 40

    nodeConnection =
      LocalNodeConnectInfo
        { localConsensusModeParams = CardanoModeParams epochSlots,
          localNodeNetworkId = networkId,
          localNodeSocketPath = socketPath
        }

    protocols =
      LocalNodeClientProtocols
        { localChainSyncClient =
            LocalChainSyncClient (chainSyncStreamingClient point mChan),
          localTxSubmissionClient = Nothing,
          localStateQueryClient = Nothing
        }
-- | Build the initial ledger state from the network configuration file, then
-- connect to a local node and run 'chainSyncStreamingClient' wrapped by
-- 'chainSyncClientWithLedgerState' (using 'QuickValidation').
--
-- If the initial ledger state cannot be built, 'Nothing' is written into the
-- 'MVar' and no connection is attempted.
runChainSyncStreamingClientWithLedgerState ::
  -- | Path of the network configuration file
  FilePath ->
  -- | Path of the local node socket
  FilePath ->
  -- | Network the node is running on
  NetworkId ->
  -- | The point on the chain to start from
  ChainPoint ->
  -- | Where the client reports the channel of events (or 'Nothing')
  MVar (Maybe (Chan ChainSyncEventWithLedgerState)) ->
  IO ()
runChainSyncStreamingClientWithLedgerState networkConfigFile socketPath networkId point mChan =
  runExceptT (initialLedgerState networkConfigFile) >>= either giveUp connectWith
  where
    -- FIXME here we swallow the error but we could do better
    giveUp _ = putMVar mChan Nothing

    connectWith (env, ledgerState) =
      connectToLocalNode nodeConnection protocols
      where
        nodeConnection =
          LocalNodeConnectInfo
            { localConsensusModeParams =
                CardanoModeParams (EpochSlots (10 * envSecurityParam env)),
              localNodeNetworkId = networkId,
              localNodeSocketPath = socketPath
            }

        protocols =
          LocalNodeClientProtocols
            { localChainSyncClient =
                LocalChainSyncClient $
                  chainSyncClientWithLedgerState
                    env
                    ledgerState
                    QuickValidation
                    (chainSyncStreamingClient point mChan),
              localTxSubmissionClient = Nothing,
              localStateQueryClient = Nothing
            }
-- | The "core" chain-sync client handed to `connectToLocalNode`.
--
-- It first asks for an intersection at the given point. If one is found it
-- creates a channel, publishes it through the 'MVar', and from then on keeps
-- requesting the next message, writing one `ChainSyncEvent` into the channel
-- for every roll-forward or roll-backward it receives. If no intersection is
-- found it publishes 'Nothing' and terminates the protocol.
--
-- The client is fire-and-forget: it never waits for the consumer and
-- requires no control from it.
chainSyncStreamingClient ::
  -- | The point on the chain to start from
  ChainPoint ->
  -- | This MVar is how we communicate back to the consumer. The idea here
  -- is that the client might fail to initialise but once it's initialised
  -- it will always be able to spit out `ChainSyncEvent`s.
  MVar (Maybe (Chan (ChainSyncEvent e))) ->
  -- | The entry point to the client to pass to `connectToLocalNode`
  ChainSyncClient e ChainPoint ChainTip IO ()
chainSyncStreamingClient point mChan =
  ChainSyncClient $
    putStrLn "Connecting ..."
      >> pure (SendMsgFindIntersect [point] intersectHandlers)
  where
    -- Replies to the initial find-intersection request.
    intersectHandlers =
      ClientStIntersect
        { recvMsgIntersectFound = \found _tip ->
            ChainSyncClient (startStreaming found),
          recvMsgIntersectNotFound = \_tip ->
            ChainSyncClient giveUp
        }

    -- Intersection found: publish a fresh channel, then start requesting.
    startStreaming found = do
      putStrLn ("Intersection found at " ++ show found)
      events <- newChan
      putMVar mChan (Just events)
      requestNext events

    -- No intersection: tell the consumer and end the protocol.
    giveUp = do
      putStrLn "Intersection not found"
      putMVar mChan Nothing
      pure (SendMsgDone ())

    -- Ask for the next message; every reply is pushed into the channel and
    -- followed by another request. The same handlers are used for both
    -- arguments of 'SendMsgRequestNext'.
    requestNext events =
      pure (SendMsgRequestNext handlers (pure handlers))
      where
        publish event =
          ChainSyncClient (writeChan events event >> requestNext events)

        handlers =
          ClientStNext
            { recvMsgRollForward = \block tip ->
                publish (RollForward block tip),
              recvMsgRollBackward = \rollbackPoint tip ->
                publish (RollBackward rollbackPoint tip)
            }