diff --git a/plutus-streaming/src/Plutus/Streaming.hs b/plutus-streaming/src/Plutus/Streaming.hs index 94d37f0ebe..5928f6dd4d 100644 --- a/plutus-streaming/src/Plutus/Streaming.hs +++ b/plutus-streaming/src/Plutus/Streaming.hs @@ -16,7 +16,7 @@ import Cardano.Api.ChainSync.Client (ClientStIdle (SendMsgDone, SendMsgFindInter ClientStIntersect (ClientStIntersect, recvMsgIntersectFound, recvMsgIntersectNotFound), ClientStNext (ClientStNext, recvMsgRollBackward, recvMsgRollForward)) import Control.Concurrent.Async (withAsync) -import Control.Concurrent.STM (TChan, atomically, newBroadcastTChanIO, readTChan, writeTChan) +import Control.Concurrent.STM (TChan, atomically, dupTChan, newBroadcastTChanIO, readTChan, writeTChan) import Control.Exception (Exception, throw) import GHC.Generics (Generic) import Streaming (Of, Stream) @@ -46,6 +46,7 @@ withSimpleChainSyncEventStream socketPath networkId point consumer = do -- The chain-sync client runs in a different thread and it will send us -- block through this channel. chan <- newBroadcastTChanIO + readerChannel <- atomically $ dupTChan chan let client = chainSyncStreamingClient point chan @@ -80,7 +81,7 @@ withSimpleChainSyncEventStream socketPath networkId point consumer = do -- waiting on the channel we get a BlockedIndefinitelyOnMVar right away -- before the exception that killed the client withAsync clientThread $ \_ -> do - consumer $ S.repeatM $ atomically (readTChan chan) + consumer $ S.repeatM $ atomically (readTChan readerChannel) -- | `chainSyncStreamingClient` is the client that connects to a local node -- and runs the chain-sync mini-protocol.