Skip to content

Commit

Permalink
Renames, comments, exports
Browse files Browse the repository at this point in the history
  • Loading branch information
andreabedini authored and raduom committed May 21, 2022
1 parent 0a6134a commit 8e70035
Showing 1 changed file with 30 additions and 22 deletions.
52 changes: 30 additions & 22 deletions plutus-streaming/src/Plutus/Streaming.hs
@@ -1,4 +1,12 @@
module Plutus.Streaming where
module Plutus.Streaming
( SimpleChainSyncEvent,
withSimpleChainSyncEventStream,
ChainSyncEventWithLedgerState,
withChainSyncEventStreamWithLedgerState,
ChainSyncEvent(..),
EventStreamResult(..)
)
where

import Cardano.Api
import Cardano.Api.ChainSync.Client
Expand All @@ -8,16 +16,6 @@ import Control.Monad.Trans.Except (runExceptT)
import Streaming
import Streaming.Prelude qualified as S

-- import Plutus.Contract.CardanoAPI (fromCardanoBlock, fromCardanoTx)

--
-- FIXME this needs IsString (Hash BlockHeader) which seems to be missing
--
-- recentPoint :: ChainPoint
-- recentPoint = ChainPoint (SlotNo 53427524) "5e2bde4e504a9888a4f218dafc79a7619083f97d48684fcdba9dc78190df8f99"

-- Simple ChainSync client (non pipelined)

data ChainSyncEvent a
= RollForward a ChainTip
| RollBackward ChainPoint ChainTip
Expand All @@ -38,7 +36,7 @@ withSimpleChainSyncEventStream ::
(Stream (Of SimpleChainSyncEvent) IO EventStreamResult -> IO b) ->
IO b
withSimpleChainSyncEventStream filePath networkId point =
withClientStream (chainSyncStreamingClient filePath networkId point)
withClientStream (runChainSyncStreamingClient filePath networkId point)

withChainSyncEventStreamWithLedgerState ::
FilePath ->
Expand All @@ -48,7 +46,7 @@ withChainSyncEventStreamWithLedgerState ::
(Stream (Of ChainSyncEventWithLedgerState) IO EventStreamResult -> IO b) ->
IO b
withChainSyncEventStreamWithLedgerState networkConfigPath filePath networkId point =
withClientStream (chainSyncStreamingClientWithLedgerState networkConfigPath filePath networkId point)
withClientStream (runChainSyncStreamingClientWithLedgerState networkConfigPath filePath networkId point)

-- This adapts a streaming client to a stream
withClientStream ::
Expand Down Expand Up @@ -77,14 +75,14 @@ withClientStream client consumer = do
-- Cardano.Protocol.Socket.Client.
--
-- TODO move to pipelined version
chainSyncStreamingClient ::
runChainSyncStreamingClient ::
FilePath ->
NetworkId ->
ChainPoint ->
MVar (Maybe (Chan SimpleChainSyncEvent)) ->
IO ()
chainSyncStreamingClient socketPath networkId point mChan = do
let client = chainSyncStreamingClient' point mChan
runChainSyncStreamingClient socketPath networkId point mChan = do
let client = chainSyncStreamingClient point mChan

localNodeClientProtocols =
LocalNodeClientProtocols
Expand All @@ -107,22 +105,22 @@ chainSyncStreamingClient socketPath networkId point mChan = do
connectInfo
localNodeClientProtocols

chainSyncStreamingClientWithLedgerState ::
runChainSyncStreamingClientWithLedgerState ::
FilePath ->
FilePath ->
NetworkId ->
ChainPoint ->
MVar (Maybe (Chan ChainSyncEventWithLedgerState)) ->
IO ()
chainSyncStreamingClientWithLedgerState networkConfigFile socketPath networkId point mChan = do
runChainSyncStreamingClientWithLedgerState networkConfigFile socketPath networkId point mChan = do
ils <- runExceptT (initialLedgerState networkConfigFile)

case ils of
(Left _) ->
-- FIXME here we swallow the error but we can do better
-- FIXME here we swallow the error but we could do better
putMVar mChan Nothing
(Right (env, ledgerState)) -> do
let client = chainSyncClientWithLedgerState env ledgerState QuickValidation (chainSyncStreamingClient' point mChan)
let client = chainSyncClientWithLedgerState env ledgerState QuickValidation (chainSyncStreamingClient point mChan)

cardanoModeParams = CardanoModeParams . EpochSlots $ 10 * envSecurityParam env

Expand All @@ -144,11 +142,21 @@ chainSyncStreamingClientWithLedgerState networkConfigFile socketPath networkId p
connectInfo
localNodeClientProtocols

chainSyncStreamingClient' ::
-- | This is the "core" client that connects to a local node and
-- runs the chain-sync mini-protocol. The only job of this client is to
-- keep sending requests for new blocks, and passing the results (a
-- `ChainSyncEvent`) to the consumer. In particular, this client is
-- fire-and-forget and does not require any control from the consumer.
chainSyncStreamingClient ::
-- | The point on the chain to start from
ChainPoint ->
-- | This MVar is how we communicate back to the consumer. The idea here
-- is that the client might fail to initialise but once it's initalised
-- it will always be able to spit out `ChainSyncEvent`s.
MVar (Maybe (Chan (ChainSyncEvent e))) ->
-- | The entry point to the client to pass to `connectToLocalNode`
ChainSyncClient e ChainPoint ChainTip IO ()
chainSyncStreamingClient' point mChan =
chainSyncStreamingClient point mChan =
ChainSyncClient $ do
putStrLn "Connecting ..."
pure $ SendMsgFindIntersect [point] onIntersect
Expand Down

0 comments on commit 8e70035

Please sign in to comment.