From 8e7003529d2b33abb4041b63c21200859b54d158 Mon Sep 17 00:00:00 2001 From: Andrea Bedini Date: Tue, 8 Mar 2022 11:07:52 +0800 Subject: [PATCH] Renames, comments, exports --- plutus-streaming/src/Plutus/Streaming.hs | 52 ++++++++++++++---------- 1 file changed, 30 insertions(+), 22 deletions(-) diff --git a/plutus-streaming/src/Plutus/Streaming.hs b/plutus-streaming/src/Plutus/Streaming.hs index 75550db695..e349c36fe4 100644 --- a/plutus-streaming/src/Plutus/Streaming.hs +++ b/plutus-streaming/src/Plutus/Streaming.hs @@ -1,4 +1,12 @@ -module Plutus.Streaming where +module Plutus.Streaming + ( SimpleChainSyncEvent, + withSimpleChainSyncEventStream, + ChainSyncEventWithLedgerState, + withChainSyncEventStreamWithLedgerState, + ChainSyncEvent(..), + EventStreamResult(..) + ) +where import Cardano.Api import Cardano.Api.ChainSync.Client @@ -8,16 +16,6 @@ import Control.Monad.Trans.Except (runExceptT) import Streaming import Streaming.Prelude qualified as S --- import Plutus.Contract.CardanoAPI (fromCardanoBlock, fromCardanoTx) - --- --- FIXME this needs IsString (Hash BlockHeader) which seems to be missing --- --- recentPoint :: ChainPoint --- recentPoint = ChainPoint (SlotNo 53427524) "5e2bde4e504a9888a4f218dafc79a7619083f97d48684fcdba9dc78190df8f99" - --- Simple ChainSync client (non pipelined) - data ChainSyncEvent a = RollForward a ChainTip | RollBackward ChainPoint ChainTip @@ -38,7 +36,7 @@ withSimpleChainSyncEventStream :: (Stream (Of SimpleChainSyncEvent) IO EventStreamResult -> IO b) -> IO b withSimpleChainSyncEventStream filePath networkId point = - withClientStream (chainSyncStreamingClient filePath networkId point) + withClientStream (runChainSyncStreamingClient filePath networkId point) withChainSyncEventStreamWithLedgerState :: FilePath -> @@ -48,7 +46,7 @@ withChainSyncEventStreamWithLedgerState :: (Stream (Of ChainSyncEventWithLedgerState) IO EventStreamResult -> IO b) -> IO b withChainSyncEventStreamWithLedgerState networkConfigPath filePath networkId point = - withClientStream (chainSyncStreamingClientWithLedgerState networkConfigPath filePath networkId point) + withClientStream (runChainSyncStreamingClientWithLedgerState networkConfigPath filePath networkId point) -- This adapts a streaming client to a stream withClientStream :: @@ -77,14 +75,14 @@ withClientStream client consumer = do -- Cardano.Protocol.Socket.Client. -- -- TODO move to pipelined version -chainSyncStreamingClient :: +runChainSyncStreamingClient :: FilePath -> NetworkId -> ChainPoint -> MVar (Maybe (Chan SimpleChainSyncEvent)) -> IO () -chainSyncStreamingClient socketPath networkId point mChan = do - let client = chainSyncStreamingClient' point mChan +runChainSyncStreamingClient socketPath networkId point mChan = do + let client = chainSyncStreamingClient point mChan localNodeClientProtocols = LocalNodeClientProtocols @@ -107,22 +105,22 @@ chainSyncStreamingClient socketPath networkId point mChan = do connectInfo localNodeClientProtocols -chainSyncStreamingClientWithLedgerState :: +runChainSyncStreamingClientWithLedgerState :: FilePath -> FilePath -> NetworkId -> ChainPoint -> MVar (Maybe (Chan ChainSyncEventWithLedgerState)) -> IO () -chainSyncStreamingClientWithLedgerState networkConfigFile socketPath networkId point mChan = do +runChainSyncStreamingClientWithLedgerState networkConfigFile socketPath networkId point mChan = do ils <- runExceptT (initialLedgerState networkConfigFile) case ils of (Left _) -> - -- FIXME here we swallow the error but we can do better + -- FIXME here we swallow the error but we could do better putMVar mChan Nothing (Right (env, ledgerState)) -> do - let client = chainSyncClientWithLedgerState env ledgerState QuickValidation (chainSyncStreamingClient' point mChan) + let client = chainSyncClientWithLedgerState env ledgerState QuickValidation (chainSyncStreamingClient point mChan) cardanoModeParams = CardanoModeParams . EpochSlots $ 10 * envSecurityParam env @@ -144,11 +142,21 @@ chainSyncStreamingClientWithLedgerState networkConfigFile socketPath networkId p connectInfo localNodeClientProtocols -chainSyncStreamingClient' :: +-- | This is the "core" client that connects to a local node and +-- runs the chain-sync mini-protocol. The only job of this client is to +-- keep sending requests for new blocks, and passing the results (a +-- `ChainSyncEvent`) to the consumer. In particular, this client is +-- fire-and-forget and does not require any control from the consumer. +chainSyncStreamingClient :: + -- | The point on the chain to start from ChainPoint -> + -- | This MVar is how we communicate back to the consumer. The idea here + -- is that the client might fail to initialise but once it's initalised + -- it will always be able to spit out `ChainSyncEvent`s. MVar (Maybe (Chan (ChainSyncEvent e))) -> + -- | The entry point to the client to pass to `connectToLocalNode` ChainSyncClient e ChainPoint ChainTip IO () -chainSyncStreamingClient' point mChan = +chainSyncStreamingClient point mChan = ChainSyncClient $ do putStrLn "Connecting ..." pure $ SendMsgFindIntersect [point] onIntersect