Skip to content

Commit

Permalink
Add slot information for fetched blocks
Browse files Browse the repository at this point in the history
Switch ChainIndex to use the node protocol.

Split the server command channel.

Make slotCoordinator coordinate.

Fix some deadlocks

Fix some merge conflicts

Watch all the wallets

Make the node server forward input to clients.

Update materialized

Remove tx processing from node server.

Remove the follower effect

Fix tests.

Switch to implicit syncs
  • Loading branch information
raduom committed Feb 21, 2021
1 parent 5f78006 commit f503770
Show file tree
Hide file tree
Showing 19 changed files with 238 additions and 453 deletions.
3 changes: 0 additions & 3 deletions nix/pkgs/haskell/materialized-musl/.plan.nix/plutus-pab.nix
Expand Up @@ -125,13 +125,10 @@
"Cardano/Metadata/Types"
"Cardano/Node/API"
"Cardano/Node/Client"
"Cardano/Node/Follower"
"Cardano/Node/Mock"
"Cardano/Node/RandomTx"
"Cardano/Node/Server"
"Cardano/Node/Types"
"Cardano/Protocol/ChainEffect"
"Cardano/Protocol/FollowerEffect"
"Cardano/Protocol/Socket/Type"
"Cardano/Protocol/Socket/Server"
"Cardano/Protocol/Socket/Client"
Expand Down
3 changes: 0 additions & 3 deletions nix/pkgs/haskell/materialized-unix/.plan.nix/plutus-pab.nix
Expand Up @@ -125,13 +125,10 @@
"Cardano/Metadata/Types"
"Cardano/Node/API"
"Cardano/Node/Client"
"Cardano/Node/Follower"
"Cardano/Node/Mock"
"Cardano/Node/RandomTx"
"Cardano/Node/Server"
"Cardano/Node/Types"
"Cardano/Protocol/ChainEffect"
"Cardano/Protocol/FollowerEffect"
"Cardano/Protocol/Socket/Type"
"Cardano/Protocol/Socket/Server"
"Cardano/Protocol/Socket/Client"
Expand Down
1 change: 0 additions & 1 deletion plutus-pab/app/Main.hs
Expand Up @@ -32,7 +32,6 @@ import Plutus.PAB.PABLogMsg (AppMsg (..))
import Plutus.PAB.Utils (logErrorS)
import System.Exit (ExitCode (ExitFailure), exitSuccess, exitWith)


main :: IO ()
main = do
AppOpts { minLogLevel, configPath, logConfigPath, runEkgServer, cmd } <- parseOptions
Expand Down
4 changes: 2 additions & 2 deletions plutus-pab/app/PSGenerator.hs
Expand Up @@ -52,7 +52,7 @@ import Plutus.PAB.Events.Contract (ContractEven
import Plutus.PAB.Events.Node (NodeEvent)
import Plutus.PAB.Events.User (UserEvent)
import Plutus.PAB.Events.Wallet (WalletEvent)
import Plutus.PAB.MockApp (defaultWallet, syncAll)
import Plutus.PAB.MockApp (defaultWallet)
import qualified Plutus.PAB.MockApp as MockApp
import Plutus.PAB.Types (ContractExe)
import qualified Plutus.PAB.Webserver.API as API
Expand Down Expand Up @@ -184,7 +184,7 @@ writeTestData outputDir = do
schema :: ContractSignatureResponse TestContracts <-
Webserver.contractSchema (csContract currencyInstance1)
pure (report, schema)
syncAll
-- syncAll
void Chain.processBlock
pure result
BSL.writeFile
Expand Down
3 changes: 0 additions & 3 deletions plutus-pab/plutus-pab.cabal
Expand Up @@ -56,13 +56,10 @@ library
Cardano.Metadata.Types
Cardano.Node.API
Cardano.Node.Client
Cardano.Node.Follower
Cardano.Node.Mock
Cardano.Node.RandomTx
Cardano.Node.Server
Cardano.Node.Types
Cardano.Protocol.ChainEffect
Cardano.Protocol.FollowerEffect
Cardano.Protocol.Socket.Type
Cardano.Protocol.Socket.Server
Cardano.Protocol.Socket.Client
Expand Down
110 changes: 43 additions & 67 deletions plutus-pab/src/Cardano/ChainIndex/Server.hs
Expand Up @@ -13,6 +13,7 @@ module Cardano.ChainIndex.Server(
, syncState
) where

-- <<<<<<< HEAD
import Cardano.BM.Data.Trace (Trace)
import Cardano.Node.Types (FollowerID)
import Control.Concurrent (forkIO, threadDelay)
Expand Down Expand Up @@ -50,6 +51,8 @@ import Wallet.Emulator.ChainIndex (ChainIndexControlEffect, Chai
import qualified Wallet.Emulator.ChainIndex as ChainIndex
import Wallet.Emulator.NodeClient (ChainClientNotification (BlockValidated, SlotChanged))

import qualified Debug.Trace as Dbg

-- $chainIndex
-- The PAB chain index that keeps track of transaction data (UTXO set enriched
-- with datums)
Expand All @@ -64,6 +67,7 @@ app trace stateVar =
(liftIO . processIndexEffects trace stateVar)
(healthcheck :<|> startWatching :<|> watchedAddresses :<|> confirmedBlocks :<|> WalletEffects.transactionConfirmed :<|> WalletEffects.nextTx)

-- <<<<<<< HEAD
main :: ChainIndexTrace -> ChainIndexConfig -> BaseUrl -> Availability -> IO ()
main trace ChainIndexConfig{ciBaseUrl} nodeBaseUrl availability = runLogEffects trace $ do
nodeClientEnv <- liftIO getNode
Expand All @@ -80,6 +84,25 @@ main trace ChainIndexConfig{ciBaseUrl} nodeBaseUrl availability = runLogEffects
warpSettings = Warp.defaultSettings & Warp.setPort servicePort & Warp.setBeforeMainLoop isAvailable
getNode = newManager defaultManagerSettings >>= \manager -> pure $ mkClientEnv manager nodeBaseUrl

-- =======
-- main :: Trace IO ChainIndexServerMsg -> ChainIndexConfig -> FilePath -> Availability -> IO ()
-- main trace ChainIndexConfig{ciBaseUrl} socketPath availability = runLogEffects trace $ do
-- let port = baseUrlPort ciBaseUrl
-- mVarState <- liftIO $ newMVar initialAppState
-- logInfo StartingNodeClientThread
-- clientHandler <-
-- liftIO $ runClientNode socketPath $ updateChainState mVarState
-- -- void $ liftIO $ forkIO $ runLogEffects trace $ updateThread 10 mVarState clientHandler
-- let warpSettings :: Warp.Settings
-- warpSettings = Warp.defaultSettings & Warp.setPort port & Warp.setBeforeMainLoop (available availability)
-- logInfo $ StartingChainIndex port
-- liftIO $ Warp.runSettings warpSettings $ app mVarState
-- where
-- updateChainState :: MVar AppState -> Block -> Slot -> IO ()
-- updateChainState mv block slot =
-- processIndexEffects mv $ do
-- syncState block slot
-- >>>>>>> d3fabb83b (Add slot information for fetched blocks)

healthcheck :: Monad m => m NoContent
healthcheck = pure NoContent
Expand All @@ -93,78 +116,18 @@ watchedAddresses = WalletEffects.watchedAddresses
confirmedBlocks :: (Member ChainIndexEffect effs) => Eff effs [Block]
confirmedBlocks = WalletEffects.confirmedBlocks


-- | Update the chain index by asking the node for new blocks since the last
-- time.
syncState ::
( Member (State AppState) effs
, Member (LogMsg ChainIndexServerMsg) effs
, Member NodeFollowerEffect effs
, Member ChainIndexControlEffect effs
)
=> Eff effs ()
syncState = do
followerID <- use indexFollowerID >>=
maybe (logInfo ObtainingFollowerID >> NodeFollower.newFollower >>= \i -> assign indexFollowerID (Just i) >> return i) pure
logDebug $ UpdatingChainIndex followerID
newBlocks <- NodeFollower.getBlocks followerID
logInfo $ ReceivedBlocksTxns (length newBlocks) (length $ fold newBlocks)
currentSlot <- SlotChanged <$> getSlot
let notifications = BlockValidated <$> newBlocks
traverse_ ChainIndex.chainIndexNotify (notifications ++ [currentSlot])

-- | Get the latest transactions from the node and update the index accordingly
updateThread ::
forall effs.
( LastMember IO effs
, Member (LogMsg ChainIndexServerMsg) effs
( -- Member (LogMsg ChainIndexServerMsg) effs
Member ChainIndexControlEffect effs
)
=> ChainIndexTrace
-> Second
-- ^ Interval between two queries for new blocks
-> MVar AppState
-- ^ State of the chain index
-> ClientEnv
-- ^ Servant client for the node API
-> Eff effs ()
updateThread trace updateInterval mv nodeClientEnv = do
logInfo ObtainingFollowerID
followerID <-
liftIO $
runClientM NodeClient.newFollower nodeClientEnv >>=
either (error . show) pure
logInfo $ ObtainedFollowerID followerID
forever $ do
watching <- processIndexEffects trace mv WalletEffects.watchedAddresses
unless (watching == mempty) (fetchNewBlocks trace followerID nodeClientEnv mv)
liftIO $ threadDelay $ fromIntegral $ toMicroseconds updateInterval

fetchNewBlocks ::
forall effs.
( LastMember IO effs
, Member (LogMsg ChainIndexServerMsg) effs
)
=> ChainIndexTrace
-> FollowerID
-> ClientEnv
-> MVar AppState
=> Block
-> Slot
-> Eff effs ()
fetchNewBlocks trace followerID nodeClientEnv mv = do
logInfo AskingNodeForNewBlocks
newBlocks <-
liftIO $
runClientM (NodeClient.getBlocks followerID) nodeClientEnv >>=
either (error . show) pure
logInfo (ReceivedBlocksTxns (length newBlocks) (length $ fold newBlocks))
logInfo AskingNodeForCurrentSlot
curentSlot <-
liftIO $
runClientM NodeClient.getCurrentSlot nodeClientEnv >>=
either (error . show) pure
let notifications = BlockValidated <$> newBlocks
traverse_
(processIndexEffects trace mv . ChainIndex.chainIndexNotify)
(notifications ++ [SlotChanged curentSlot])
syncState block (Slot slot) = do
let slotChanged = SlotChanged (Slot slot)
traverse_ ChainIndex.chainIndexNotify [BlockValidated block, slotChanged]

type ChainIndexEffects m
= '[ ChainIndexControlEffect
Expand All @@ -180,6 +143,7 @@ processIndexEffects ::
-> MVar AppState
-> Eff (ChainIndexEffects IO) a
-> m a
-- <<<<<<< HEAD
processIndexEffects trace stateVar eff = do
AppState {_indexState, _indexEvents, _indexFollowerID} <- liftIO $ takeMVar stateVar
(result, newState) <- liftIO
Expand All @@ -189,6 +153,18 @@ processIndexEffects trace stateVar eff = do
& handleLogMsgTrace (toChainIndexServerMsg trace)
& runM
liftIO $ putMVar stateVar AppState{_indexState=newState, _indexEvents=_indexEvents, _indexFollowerID=_indexFollowerID }
-- =======
-- processIndexEffects stateVar eff = do
-- AppState{_indexState, _indexEvents} <- liftIO $ takeMVar stateVar
-- (result, newState) <- liftIO
-- $ runM
-- $ runStderrLog
-- $ interpret renderLogMessages
-- $ Eff.runState _indexState
-- $ ChainIndex.handleChainIndex
-- $ ChainIndex.handleChainIndexControl eff
-- liftIO $ putMVar stateVar AppState{ _indexState=newState, _indexEvents=_indexEvents }
-- >>>>>>> d3fabb83b (Add slot information for fetched blocks)
pure result
where
toChainIndexServerMsg :: Trace m ChainIndexServerMsg -> Trace m ChainIndexEvent
Expand Down
35 changes: 14 additions & 21 deletions plutus-pab/src/Cardano/ChainIndex/Types.hs
Expand Up @@ -18,21 +18,26 @@ import Data.Text.Prettyprint.Doc (Pretty (..), parens, (<+>))
import GHC.Generics (Generic)
import Servant.Client (BaseUrl)

-- <<<<<<< HEAD
import Cardano.BM.Data.Tracer (ToObject (..))
import Cardano.BM.Data.Tracer.Extras (Tagged (..), mkObjectStr)
import Cardano.Node.Types (FollowerID)
import Ledger.Address (Address)
import Wallet.Emulator.ChainIndex (ChainIndexEvent, ChainIndexState)
-- =======
-- import Cardano.BM.Data.Tracer (ToObject (..))
-- import Cardano.BM.Data.Tracer.Extras (Tagged (..), mkObjectStr)
-- import Ledger.Address (Address)
-- import Wallet.Emulator.ChainIndex (ChainIndexEvent, ChainIndexState)
-- >>>>>>> d3fabb83b (Add slot information for fetched blocks)

data AppState =
AppState
{ _indexState :: ChainIndexState
, _indexEvents :: Seq (LogMessage ChainIndexEvent)
, _indexFollowerID :: Maybe FollowerID
{ _indexState :: ChainIndexState
, _indexEvents :: Seq (LogMessage ChainIndexEvent)
} deriving (Eq, Show)

initialAppState :: AppState
initialAppState = AppState mempty mempty Nothing
initialAppState = AppState mempty mempty

data ChainIndexConfig =
ChainIndexConfig
Expand All @@ -47,16 +52,10 @@ makeLenses ''ChainIndexConfig

-- | Messages from the ChainIndex Server
data ChainIndexServerMsg =
-- | Obtaining a new follower
ObtainingFollowerID
-- | Obtained a new follower 'FollowerID'
| ObtainedFollowerID FollowerID
-- | Updating the chain index with 'FollowerID'
| UpdatingChainIndex FollowerID
-- | Requesting new blocks from the node
| AskingNodeForNewBlocks
AskingNodeForNewBlocks
-- | Requesting the current slot from the node
| AskingNodeForCurrentSlot
-- | AskingNodeForCurrentSlot
-- | Starting a node client thread
| StartingNodeClientThread
-- | Starting ChainIndex service
Expand All @@ -72,24 +71,18 @@ data ChainIndexServerMsg =

instance Pretty ChainIndexServerMsg where
pretty = \case
ObtainingFollowerID -> "Obtaining follower ID"
ObtainedFollowerID i -> "Obtained follower ID:" <+> pretty i
UpdatingChainIndex i -> "Updating chain index with follower ID" <+> pretty i
ReceivedBlocksTxns blocks txns -> "Received" <+> pretty blocks <+> "blocks" <+> parens (pretty txns <+> "transactions")
AskingNodeForNewBlocks -> "Asking the node for new blocks"
AskingNodeForCurrentSlot -> "Asking the node for the current slot"
-- AskingNodeForCurrentSlot -> "Asking the node for the current slot"
StartingNodeClientThread -> "Starting node client thread"
StartingChainIndex port -> "Starting chain index on port: " <> pretty port
ChainEvent e -> "Processing chain index event: " <> pretty e

instance ToObject ChainIndexServerMsg where
toObject _ = \case
ObtainingFollowerID -> mkObjectStr "obtaining FollowerID" ()
ObtainedFollowerID fID -> mkObjectStr "obtained FollowerID" (Tagged @"followerID" fID)
UpdatingChainIndex fID -> mkObjectStr "updating chainIndex with FollowerID" (Tagged @"followerID" fID)
ReceivedBlocksTxns x y -> mkObjectStr "received block transactions" (Tagged @"blocks" x, Tagged @"transactions" y)
AskingNodeForNewBlocks -> mkObjectStr "asking for new blocks" ()
AskingNodeForCurrentSlot -> mkObjectStr "asking node for current slot" ()
-- AskingNodeForCurrentSlot -> mkObjectStr "asking node for current slot" ()
StartingNodeClientThread -> mkObjectStr "starting node client thread" ()
StartingChainIndex p -> mkObjectStr "starting chain index" (Tagged @"port" p)
ChainEvent e -> mkObjectStr "processing chain event" (Tagged @"event" e)
19 changes: 12 additions & 7 deletions plutus-pab/src/Cardano/Node/API.hs
Expand Up @@ -4,27 +4,32 @@
module Cardano.Node.API
( API
, NodeAPI
, FollowerAPI
) where

import Cardano.Node.Types (FollowerID, MockServerLogMsg)
-- <<<<<<< HEAD
import Cardano.Node.Types (MockServerLogMsg)
import Control.Monad.Freer.Extras.Log (LogMessage)
import Ledger (Block, Slot, Tx)
import Servant.API (Capture, Get, JSON, NoContent, Post, Put, ReqBody, (:<|>), (:>))
-- =======
-- import Control.Monad.Freer.Log (LogMessage)
-- import Ledger (Slot, Tx)
-- import Servant.API (Get, JSON, NoContent, Post, ReqBody, (:<|>), (:>))
-- import Wallet.Emulator.Chain (ChainEvent)
-- >>>>>>> d3fabb83b (Add slot information for fetched blocks)

type API
= "healthcheck" :> Get '[ JSON] NoContent
:<|> "mempool" :> ReqBody '[ JSON] Tx :> Post '[ JSON] NoContent
:<|> "slot" :> Get '[ JSON] Slot
:<|> "mock" :> NodeAPI
:<|> "follower" :> FollowerAPI

-- Routes that are not guaranteed to exist on the real node
type NodeAPI
= "random-tx" :> Get '[ JSON] Tx
-- <<<<<<< HEAD
:<|> "consume-event-history" :> Post '[ JSON] [LogMessage MockServerLogMsg]

-- Protocol 1 of the node (node followers can request new blocks)
type FollowerAPI
= "subscribe" :> Put '[ JSON] FollowerID
:<|> Capture "follower-id" FollowerID :> "blocks" :> Post '[ JSON] [Block]
-- =======
-- :<|> "consume-event-history" :> Post '[ JSON] [LogMessage ChainEvent]
-- >>>>>>> d3fabb83b (Add slot information for fetched blocks)

0 comments on commit f503770

Please sign in to comment.