From 6b5d1077e4b15e18b05722455bc71ef8bfebcdda Mon Sep 17 00:00:00 2001 From: Andrea Bedini Date: Fri, 18 Feb 2022 14:09:52 +0800 Subject: [PATCH] Plutus streaming PoC --- cabal.project | 1 + plutus-streaming/CHANGELOG.md | 5 + plutus-streaming/README.md | 29 ++++ plutus-streaming/app/Main.hs | 155 +++++++++++++++++++++ plutus-streaming/plutus-streaming.cabal | 56 ++++++++ plutus-streaming/src/Cardano/Api/Extras.hs | 23 +++ plutus-streaming/src/Plutus/Streaming.hs | 145 +++++++++++++++++++ 7 files changed, 414 insertions(+) create mode 100644 plutus-streaming/CHANGELOG.md create mode 100644 plutus-streaming/README.md create mode 100644 plutus-streaming/app/Main.hs create mode 100644 plutus-streaming/plutus-streaming.cabal create mode 100644 plutus-streaming/src/Cardano/Api/Extras.hs create mode 100644 plutus-streaming/src/Plutus/Streaming.hs diff --git a/cabal.project b/cabal.project index 74a929fb48..0599430dd7 100644 --- a/cabal.project +++ b/cabal.project @@ -16,6 +16,7 @@ packages: doc plutus-playground-server plutus-script-utils plutus-use-cases + plutus-streaming quickcheck-dynamic web-ghc diff --git a/plutus-streaming/CHANGELOG.md b/plutus-streaming/CHANGELOG.md new file mode 100644 index 0000000000..035f670d01 --- /dev/null +++ b/plutus-streaming/CHANGELOG.md @@ -0,0 +1,5 @@ +# Revision history for plutus-streaming + +## 0.1.0.0 -- YYYY-mm-dd + +* First version. Released on an unsuspecting world. diff --git a/plutus-streaming/README.md b/plutus-streaming/README.md new file mode 100644 index 0000000000..11c4611457 --- /dev/null +++ b/plutus-streaming/README.md @@ -0,0 +1,29 @@ +# Plutus Streaming PoC + +Just testing ideas on a streaming interface. + +# CLI + +You can run the CLI in many ways: + +From Genesis + +``` +$ cabal run -- main --socket-path /tmp/node.socket +``` + +Passing a starting point + +``` +$ cabal run -- main --socket-path /tmp/node.socket --slot-no 53427524 --block-hash 5e2bde4e504a9888a4f218dafc79a7619083f97d48684fcdba9dc78190df8f99 +``` + +Running different examples + +``` +$ cabal run -- main --socket-path /tmp/node.socket --example HowManyBlocksBeforeRollback +``` + +``` +$ cabal run -- main --socket-path /tmp/node.socket --example HowManyBlocksBeforeRollbackImpurely +``` diff --git a/plutus-streaming/app/Main.hs b/plutus-streaming/app/Main.hs new file mode 100644 index 0000000000..e61d7dd3b7 --- /dev/null +++ b/plutus-streaming/app/Main.hs @@ -0,0 +1,155 @@ +{-# LANGUAGE NamedFieldPuns #-} + +module Main where + +import Cardano.Api +import Cardano.Api.Extras () +import Data.Maybe qualified as Maybe +import Options.Applicative +import Plutus.ChainIndex.Tx +import Plutus.Contract.CardanoAPI +import Plutus.Streaming +import Streaming +import Streaming.Prelude qualified as S + +-- +-- Options parsing +-- + +data Example + = JustBlocks + | HowManyBlocksBeforeRollback + | HowManyBlocksBeforeRollbackImpure + | ComposePureAndImpure + deriving (Show, Read) + +data Options = Options + { optionsSocketPath :: String, + optionsChainPoint :: ChainPoint, + optionsExample :: Example + } + deriving (Show) + +optionsParser :: Parser Options +optionsParser = + Options + <$> strOption (long "socket-path" <> help "Node socket path") + <*> chainPointParser + <*> option auto (long "example" <> value JustBlocks) + +chainPointParser :: Parser ChainPoint +chainPointParser = + pure ChainPointAtGenesis + <|> ( ChainPoint + <$> option (SlotNo <$> auto) (long "slot-no" <> metavar "SLOT-NO") + <*> option str (long "block-hash" <> metavar "BLOCK-HASH") + ) + +-- +-- Example consumers +-- + +justBlocks :: + Stream (Of ChainSyncEvent) IO () -> + Stream (Of (Either FromCardanoError [ChainIndexTx])) IO () +justBlocks = + -- + -- read the following back to front + -- + + -- format + S.map fromCardanoBlock + -- filter out rollbacks (Do we have optics?) + . S.catMaybes + . S.map (\case RollForward bim _ -> Just bim; _ -> Nothing) + -- take 10 blocks + . S.take 10 + +howManyBlocksBeforeRollback :: + Stream (Of ChainSyncEvent) IO () -> + Stream (Of Int) IO () +howManyBlocksBeforeRollback = + S.scan + ( \acc -> + \case + RollForward _ _ -> acc + 1 + RollBackward _ _ -> acc + ) + 0 + id + . S.take 100 + +howManyBlocksBeforeRollbackImpure :: + Stream (Of ChainSyncEvent) IO () -> + Stream (Of Int) IO () +howManyBlocksBeforeRollbackImpure = + S.scanM + ( \acc -> + \case + RollForward _ _ -> + pure $ acc + 1 + RollBackward _ _ -> do + putStrLn $ "Rollback after " ++ show acc ++ " blocks" + pure acc + ) + (pure 0) + pure + . S.take 100 + +-- +-- Ah! This doesn't do what I meant and I think for good reasons +-- +composePureAndImpure :: + Stream (Of ChainSyncEvent) IO () -> + Stream (Of (Int, Int)) IO () +composePureAndImpure s = + S.zip + (howManyBlocksBeforeRollback s) + (howManyBlocksBeforeRollbackImpure s) + +-- +-- Main +-- + +main :: IO () +main = do + Options {optionsSocketPath, optionsChainPoint, optionsExample} <- + execParser $ + info + (optionsParser <**> helper) + ( fullDesc + <> progDesc "Print a greeting for TARGET" + <> header "hello - a test for optparse-applicative" + ) + + withChainSyncEventStream + optionsSocketPath + Mainnet + optionsChainPoint + $ case optionsExample of + JustBlocks -> S.print . justBlocks + HowManyBlocksBeforeRollback -> S.print . howManyBlocksBeforeRollback + HowManyBlocksBeforeRollbackImpure -> S.print . howManyBlocksBeforeRollbackImpure + ComposePureAndImpure -> S.print . composePureAndImpure + +-- +-- Utilities for development +-- + +nthBlock :: Int -> IO (BlockInMode CardanoMode) +nthBlock = nthBlockAt ChainPointAtGenesis + +nthBlockAt :: ChainPoint -> Int -> IO (BlockInMode CardanoMode) +nthBlockAt point n = do + withChainSyncEventStream + "/tmp/node.socket" + Mainnet + point + ( fmap Maybe.fromJust + . S.head_ + . S.drop n + . S.catMaybes + . S.drop n + . S.map (\case RollForward bim _ -> Just bim; _ -> Nothing) + ) + diff --git a/plutus-streaming/plutus-streaming.cabal b/plutus-streaming/plutus-streaming.cabal new file mode 100644 index 0000000000..e2ee6377e0 --- /dev/null +++ b/plutus-streaming/plutus-streaming.cabal @@ -0,0 +1,56 @@ +cabal-version: 2.4 +name: plutus-streaming +version: 0.1.0.0 +author: Andrea Bedini +maintainer: andrea@andreabedini.com +extra-source-files: CHANGELOG.md + +common lang + default-language: Haskell2010 + default-extensions: + DeriveFoldable + DeriveFunctor + DeriveGeneric + DeriveLift + DeriveTraversable + ExplicitForAll + GeneralizedNewtypeDeriving + ImportQualifiedPost + LambdaCase + ScopedTypeVariables + StandaloneDeriving + ghc-options: -Wall -Wnoncanonical-monad-instances -Wunused-packages + -Wincomplete-uni-patterns -Wincomplete-record-updates + -Wredundant-constraints -Widentities + +library + import: lang + exposed-modules: + Cardano.Api.Extras + Plutus.Streaming + build-depends: + base >=4.9 && <5, + base16-bytestring, + bytestring, + async, + cardano-api, + streaming, + hs-source-dirs: src + default-language: Haskell2010 + +executable main + import: lang + main-is: Main.hs + build-depends: + plutus-streaming, + base >=4.9 && <5, + aeson, + bytestring, + cardano-api, + optparse-applicative, + plutus-chain-index-core, + -- pretty-simple, + streaming, + + hs-source-dirs: app + default-language: Haskell2010 diff --git a/plutus-streaming/src/Cardano/Api/Extras.hs b/plutus-streaming/src/Cardano/Api/Extras.hs new file mode 100644 index 0000000000..b352b84485 --- /dev/null +++ b/plutus-streaming/src/Cardano/Api/Extras.hs @@ -0,0 +1,23 @@ +{-# LANGUAGE FlexibleInstances #-} + +module Cardano.Api.Extras where + +import Cardano.Api +import Data.ByteString.Base16 qualified as Base16 +import Data.ByteString.Char8 qualified as C8 +import Data.Proxy +import Data.String + +-- FIXME orphan instance +-- https://github.com/input-output-hk/cardano-node/pull/3608 +instance IsString (Hash BlockHeader) where + fromString = either error id . deserialiseFromRawBytesBase16 . C8.pack + where + deserialiseFromRawBytesBase16 str = + case Base16.decode str of + Right raw -> case deserialiseFromRawBytes ttoken raw of + Just x -> Right x + Nothing -> Left ("cannot deserialise " ++ show str) + Left msg -> Left ("invalid hex " ++ show str ++ ", " ++ msg) + where + ttoken = proxyToAsType (Proxy :: Proxy a) diff --git a/plutus-streaming/src/Plutus/Streaming.hs b/plutus-streaming/src/Plutus/Streaming.hs new file mode 100644 index 0000000000..147d7dadf4 --- /dev/null +++ b/plutus-streaming/src/Plutus/Streaming.hs @@ -0,0 +1,145 @@ +module Plutus.Streaming where + +import Cardano.Api +import Cardano.Api.ChainSync.Client +import Control.Concurrent +import Control.Concurrent.Async +import Streaming +import Streaming.Prelude qualified as S + +-- import Plutus.Contract.CardanoAPI (fromCardanoBlock, fromCardanoTx) + +-- +-- FIXME this needs IsString (Hash BlockHeader) which seems to be missing +-- +-- recentPoint :: ChainPoint +-- recentPoint = ChainPoint (SlotNo 53427524) "5e2bde4e504a9888a4f218dafc79a7619083f97d48684fcdba9dc78190df8f99" + +-- Simple ChainSync client (non pipelined) + +data ChainSyncEvent + = RollForward (BlockInMode CardanoMode) ChainTip + | RollBackward ChainPoint ChainTip + deriving (Show) + +withChainSyncEventStream :: + FilePath -> + NetworkId -> + ChainPoint -> + (Stream (Of ChainSyncEvent) IO () -> IO b) -> + IO b +withChainSyncEventStream filePath networkId point consumer = do + -- We use a MVar as a synchronisation point to learn if the client as + -- successfully found an intersection. He rely on the fact that + -- clientSyncChain will write into m, telling us whether it has found an + -- intersection. If this doesn't happen we will be stuck waiting forever. + -- FIXME I haven't even thought about exception safety here. + m <- newEmptyMVar + withAsync (chainSyncClient filePath networkId point m) $ \_ -> do + mc <- takeMVar m + + -- let waitForClient = do + -- mmc <- tryTakeMVar m + -- case mmc of + -- Nothing -> do + -- putStrLn "waiting ..." + -- threadDelay 1000000 + -- waitForClient + -- Just mc -> return mc + -- mc <- waitForClient + + case mc of + Nothing -> + consumer $ return () + Just c -> + -- Client gets brutally killed when the consumer finishes, we + -- should allow for a better clean up here + consumer $ S.repeatM $ readChan c + +chainSyncClient :: + FilePath -> + NetworkId -> + ChainPoint -> + MVar (Maybe (Chan ChainSyncEvent)) -> + IO () +chainSyncClient socketPath networkId point mChan = do + connectToLocalNode + connectInfo + localNodeClientProtocols + where + -- + -- Client state-machine definition + -- + + onIntersect = + ClientStIntersect + { recvMsgIntersectFound = \point' _ -> + ChainSyncClient $ do + putStrLn $ "Intersection found at " ++ show point' + c <- newChan + putMVar mChan (Just c) + requestNext c, + recvMsgIntersectNotFound = \_ -> + ChainSyncClient $ do + putStrLn "Intersection not found" + putMVar mChan Nothing + pure $ SendMsgDone () + } + + requestNext c = + -- FIXME I don't understand this bit + pure $ SendMsgRequestNext (onNext c) (pure (onNext c)) + + onNext c = + ClientStNext + { recvMsgRollForward = \bim ct -> + ChainSyncClient $ do + writeChan c (RollForward bim ct) + requestNext c, + recvMsgRollBackward = \cp ct -> + ChainSyncClient $ do + writeChan c (RollBackward cp ct) + requestNext c + } + + -- + -- Client state-machine entry point + -- + + entryPoint :: ChainSyncClient (BlockInMode CardanoMode) ChainPoint ChainTip IO () + entryPoint = + ChainSyncClient $ do + putStrLn "Connecting ..." + pure $ SendMsgFindIntersect [point] onIntersect + + -- + -- Connection protocols + -- + + localNodeClientProtocols :: LocalNodeClientProtocolsInMode CardanoMode + localNodeClientProtocols = + LocalNodeClientProtocols + { localChainSyncClient = LocalChainSyncClient entryPoint, + localTxSubmissionClient = Nothing, + localStateQueryClient = Nothing + } + + -- + -- Connection Information + -- + + connectInfo :: LocalNodeConnectInfo CardanoMode + connectInfo = + LocalNodeConnectInfo + { localConsensusModeParams = CardanoModeParams epochSlots, + localNodeNetworkId = networkId, + localNodeSocketPath = socketPath + } + + -- FIXME this comes from the config file see + -- https://input-output-hk.github.io/cardano-node//cardano-api/lib/src/Cardano.Api.LedgerState.html#local-6989586621679476739 + epochSlots = EpochSlots 40 + +-- +-- https://input-output-hk.github.io/cardano-node//cardano-api/lib/src/Cardano.Api.LedgerState.html#foldBlocks +--