diff --git a/plutus-pab-executables/plutus-pab.yaml.sample b/plutus-pab-executables/plutus-pab.yaml.sample index 227ee316d0..bab824ec71 100644 --- a/plutus-pab-executables/plutus-pab.yaml.sample +++ b/plutus-pab-executables/plutus-pab.yaml.sample @@ -10,6 +10,7 @@ pabWebserverConfig: # available. If this is not set, calls to unavailable endpoints fail # immediately. endpointTimeout: 5 + enableMarconi: False walletServerConfig: tag: LocalWalletConfig diff --git a/plutus-pab/src/Plutus/PAB/Core/ContractInstance/BlockchainEnv.hs b/plutus-pab/src/Plutus/PAB/Core/ContractInstance/BlockchainEnv.hs index 40b7bb39c8..ba91b231f0 100644 --- a/plutus-pab/src/Plutus/PAB/Core/ContractInstance/BlockchainEnv.hs +++ b/plutus-pab/src/Plutus/PAB/Core/ContractInstance/BlockchainEnv.hs @@ -17,6 +17,7 @@ import Cardano.Protocol.Socket.Client (ChainSyncEvent (..)) import Cardano.Protocol.Socket.Client qualified as Client import Cardano.Protocol.Socket.Mock.Client qualified as MockClient import Control.Lens.Operators +import Control.Monad (when) import Data.Either (isLeft) import Data.IORef (newIORef, readIORef, writeIORef) import Data.List (findIndex) @@ -27,23 +28,25 @@ import Data.Text (unpack) import Ledger (Block, Slot (..), TxId (..)) import Marconi.Index.TxConfirmationStatus (TxInfo (..)) import Marconi.Index.TxConfirmationStatus qualified as Ix +import Plutus.ChainIndex.TxIdState qualified as TxIdState import Plutus.HystericalScreams.Index.VSqlite qualified as Ix import Plutus.PAB.Core.ContractInstance.STM (BlockchainEnv (..), InstanceClientEnv (..), InstancesState, OpenTxOutProducedRequest (..), OpenTxOutSpentRequest (..), - emptyBlockchainEnv, getIndexerTxChanges, getUtxoIndexTxChanges) + emptyBlockchainEnv, getUtxoIndexTxChanges) import Plutus.PAB.Core.ContractInstance.STM qualified as S import Plutus.Trace.Emulator.ContractInstance (IndexedBlock (..), indexBlock) import Plutus.PAB.Types (Config (Config), DbConfig (DbConfig, dbConfigFile), - DevelopmentOptions (DevelopmentOptions, pabResumeFrom, pabRollbackHistory), dbConfig, - developmentOptions, nodeServerConfig) + DevelopmentOptions (DevelopmentOptions, pabResumeFrom, pabRollbackHistory), + WebserverConfig (WebserverConfig, enableMarconi), dbConfig, developmentOptions, + nodeServerConfig, pabWebserverConfig) import Cardano.Node.Types (NodeMode (..), PABServerConfig (PABServerConfig, pscNetworkId, pscNodeMode, pscSlotConfig, pscSocketPath)) import Control.Concurrent.STM (STM) import Control.Concurrent.STM qualified as STM import Control.Lens -import Control.Monad (forM_, void, when) +import Control.Monad (forM_, void) import Control.Tracer (nullTracer) import Data.Foldable (foldl') import Data.Maybe (catMaybes, maybeToList) @@ -77,6 +80,8 @@ startNodeClient config instancesState = do , pabResumeFrom = resumePoint } , dbConfig = DbConfig { dbConfigFile = dbFile } + , pabWebserverConfig = + WebserverConfig { enableMarconi = useMarconiIndexer } } = config params <- Params.fromPABServerConfig $ nodeServerConfig config env <- STM.atomically $ emptyBlockchainEnv pabRollbackHistory params @@ -86,8 +91,13 @@ startNodeClient config instancesState = do (\block slot -> handleSyncAction $ processMockBlock instancesState env block slot ) AlonzoNode -> do - utxoIx <- Ix.open (unpack dbFile) (Ix.Depth 2160) >>= newIORef - let env' = env { beTxChanges = Right utxoIx } + env' <- + if useMarconiIndexer + then do + utxoIx <- Ix.open (unpack dbFile) (Ix.Depth 2160) >>= newIORef + pure $ env { beTxChanges = Right utxoIx } + else do + pure env let resumePoints = maybeToList $ toCardanoPoint resumePoint void $ Client.runChainSync socket nullTracer slotConfig networkId resumePoints (\block -> do @@ -137,23 +147,26 @@ processChainSyncEvent -> BlockchainEnv -> ChainSyncEvent -> IO (Either SyncActionFailure (Slot, BlockNumber)) -processChainSyncEvent instancesState blockchainEnv event = do +processChainSyncEvent instancesState env@BlockchainEnv{beTxChanges} event = do case event of - Resume _ -> STM.atomically $ Right <$> blockAndSlot blockchainEnv + Resume _ -> STM.atomically $ Right <$> blockAndSlot env RollForward (BlockInMode (C.Block header transactions) era) _ -> - withIsCardanoEra era (processBlock instancesState header blockchainEnv transactions era) + withIsCardanoEra era (processBlock instancesState header env transactions era) RollBackward chainPoint _ -> do - -- Rollback the index - ix' <- readIORef $ getIndexerTxChanges blockchainEnv - events <- concat <$> Ix.getEvents (ix' ^. Ix.storage) - -- TODO: Stop ignoring errors. - let nextIx = fromMaybe ix' $ do - slot <- chainPointToSlotNo chainPoint - offset <- findIndex (\(TxInfo _ _ sn) -> sn < slot) events - Ix.rewind offset ix' - writeIORef (getIndexerTxChanges blockchainEnv) nextIx - - STM.atomically $ runRollback blockchainEnv chainPoint + either (const $ pure ()) + (\ixRef -> do + -- Rollback the index + ix' <- readIORef ixRef + events <- concat <$> Ix.getEvents (ix' ^. Ix.storage) + -- TODO: Stop ignoring errors. + let nextIx = fromMaybe ix' $ do + slot <- chainPointToSlotNo chainPoint + offset <- findIndex (\(TxInfo _ _ sn) -> sn < slot) events + Ix.rewind offset ix' + writeIORef ixRef nextIx) + beTxChanges + + STM.atomically $ runRollback env chainPoint data SyncActionFailure = RollbackFailure RollbackFailed @@ -162,24 +175,38 @@ data SyncActionFailure -- | Roll back the chain to the given ChainPoint and slot. runRollback :: BlockchainEnv -> ChainPoint -> STM (Either SyncActionFailure (Slot, BlockNumber)) -runRollback env@BlockchainEnv{beLastSyncedBlockSlot, beTxOutChanges} chainPoint = do +runRollback env@BlockchainEnv{beTxChanges, beLastSyncedBlockSlot, beTxOutChanges} chainPoint = do currentSlot <- STM.readTVar beLastSyncedBlockSlot txOutBalanceStateIndex <- STM.readTVar beTxOutChanges let point = fromCardanoPoint chainPoint - rs = TxOutBalance.rollback point txOutBalanceStateIndex + rs' = TxOutBalance.rollback point txOutBalanceStateIndex -- Check to see if the rollback is just through a sequence of empty blocks ending at the tip. emptyRollBack = point > tipAsPoint (viewTip txOutBalanceStateIndex) && pointSlot point <= currentSlot + rs <- case beTxChanges of + Left ix' -> do + txIdStateIndex <- STM.readTVar ix' + pure $ TxIdState.rollback point txIdStateIndex + Right _ -> + pure $ Right RollbackResult { newTip = TipAtGenesis + , rolledBackIndex = mempty + } if emptyRollBack then Right <$> blockAndSlot env else case rs of - Left e' -> pure $ Left (RollbackFailure e') - Right RollbackResult{rolledBackIndex=rolledBackTxOutBalanceStateIndex} -> do - STM.writeTVar beTxOutChanges rolledBackTxOutBalanceStateIndex - Right <$> blockAndSlot env + Left e -> pure $ Left (RollbackFailure e) + Right RollbackResult{rolledBackIndex=rolledBackTxIdStateIndex} -> do + case rs' of + Left e' -> pure $ Left (RollbackFailure e') + Right RollbackResult{rolledBackIndex=rolledBackTxOutBalanceStateIndex} -> do + STM.writeTVar beTxOutChanges rolledBackTxOutBalanceStateIndex + either (\ix' -> STM.writeTVar ix' rolledBackTxIdStateIndex) + (const $ pure ()) + beTxChanges + Right <$> blockAndSlot env -- | Get transaction ID and validity from a transaction. txEvent :: ChainIndexTx -> (TxId, TxOutBalance, TxValidity) @@ -197,7 +224,7 @@ processBlock :: forall era. C.IsCardanoEra era -> [C.Tx era] -> C.EraInMode era C.CardanoMode -> IO (Either SyncActionFailure (Slot, BlockNumber)) -processBlock instancesState header env transactions era = do +processBlock instancesState header env@BlockchainEnv{beTxChanges} transactions era = do let C.BlockHeader (C.SlotNo slot) _ _ = header tip = fromCardanoBlockHeader header -- We ignore cardano transactions that we couldn't convert to @@ -213,9 +240,13 @@ processBlock instancesState header env transactions era = do updateInstances (indexBlock ciTxs) instEnv updateEmulatorTransactionState tip env (txEvent <$> ciTxs) - ix' <- readIORef $ getIndexerTxChanges env - nextIx <- Ix.insert (mkEvent tip <$> ciTxs) ix' - writeIORef (getIndexerTxChanges env) nextIx + either (const $ pure ()) + (\ixRef -> do + ix' <- readIORef ixRef + nextIx <- Ix.insert (mkEvent tip <$> ciTxs) ix' + writeIORef ixRef nextIx) + beTxChanges + pure stmResult mkEvent :: Tip -> ChainIndexTx -> TxInfo diff --git a/plutus-pab/src/Plutus/PAB/Core/ContractInstance/STM.hs b/plutus-pab/src/Plutus/PAB/Core/ContractInstance/STM.hs index 1e123bae25..e1ffd4f7fe 100644 --- a/plutus-pab/src/Plutus/PAB/Core/ContractInstance/STM.hs +++ b/plutus-pab/src/Plutus/PAB/Core/ContractInstance/STM.hs @@ -18,7 +18,6 @@ module Plutus.PAB.Core.ContractInstance.STM( , currentSlot , lastSyncedBlockSlot , getUtxoIndexTxChanges - , getIndexerTxChanges -- * State of a contract instance , InstanceState(..) , emptyInstanceState @@ -57,7 +56,7 @@ import Control.Concurrent.STM (STM, TMVar, TVar) import Control.Concurrent.STM qualified as STM import Control.Monad (guard, (<=<)) import Data.Aeson (Value) -import Data.Either (fromLeft, fromRight) +import Data.Either (fromLeft) import Data.Foldable (fold) import Data.IORef (IORef) import Data.List.NonEmpty (NonEmpty) @@ -167,10 +166,6 @@ getUtxoIndexTxChanges :: BlockchainEnv -> TVar (UtxoIndex TxIdState) getUtxoIndexTxChanges BlockchainEnv{beTxChanges} = fromLeft (error "Changes use an indexer for storage.") beTxChanges -getIndexerTxChanges :: BlockchainEnv -> IORef TCSIndex -getIndexerTxChanges BlockchainEnv{beTxChanges} = - fromRight (error "Changes use in-memory storage, not an indexer.") beTxChanges - -- | Initialise an empty 'BlockchainEnv' value emptyBlockchainEnv :: Maybe Int -> Params -> STM BlockchainEnv emptyBlockchainEnv rollbackHistory params = diff --git a/plutus-pab/src/Plutus/PAB/Types.hs b/plutus-pab/src/Plutus/PAB/Types.hs index 4fdb0903ba..cc232f9b19 100644 --- a/plutus-pab/src/Plutus/PAB/Types.hs +++ b/plutus-pab/src/Plutus/PAB/Types.hs @@ -162,6 +162,7 @@ data WebserverConfig = , staticDir :: Maybe FilePath , permissiveCorsPolicy :: Bool -- ^ If true; use a very permissive CORS policy (any website can interact.) , endpointTimeout :: Maybe Second + , enableMarconi :: Bool } deriving (Show, Eq, Generic) deriving anyclass (FromJSON, ToJSON) @@ -175,6 +176,7 @@ defaultWebServerConfig = , staticDir = Nothing , permissiveCorsPolicy = False , endpointTimeout = Nothing + , enableMarconi = False } instance Default WebserverConfig where