From a0691526a98920c513790a494199ffbb4ccba891 Mon Sep 17 00:00:00 2001 From: Konstantinos Lambrou-Latreille Date: Fri, 3 Mar 2023 13:11:00 -0500 Subject: [PATCH] PLT-214 Refactored the Epoch-StakePoolDelegation indexer to the Marconi interface. * Rewrote the Epoch-StakePoolDelegation using the marconi-core interface and to make it support rollbacks and resuming. * Computed the LedgerState directly with functions in `ouroboros-network` instead of using `cardano-api` and `cardano-streaming` (allows more fined grained control of what to extract). * Moved orphan instances of EpochStakepoolSize indexer to the Orphans module and added rountrip ToField/FromField tests. * Added LedgerState serialization/deserialization functions in Orphans module. --- cardano-streaming/src/Cardano/Streaming.hs | 19 +- marconi-chain-index/app/Main.hs | 4 +- marconi-chain-index/marconi-chain-index.cabal | 17 + .../performance/monitor-marconi-sync-all.sh | 55 ++ .../performance/monitor-marconi-sync.sh | 15 +- .../src/Marconi/ChainIndex/Indexers.hs | 198 +++++-- .../Marconi/ChainIndex/Indexers/EpochSPD.hs | 503 ++++++++++++++++++ .../ChainIndex/Indexers/EpochStakepoolSize.hs | 65 +-- .../ChainIndex/Node/Client/GenesisConfig.hs | 432 +++++++++++++++ .../src/Marconi/ChainIndex/Orphans.hs | 76 ++- .../src/Marconi/ChainIndex/Utils.hs | 15 + .../test-lib/Gen/Marconi/ChainIndex/Types.hs | 17 +- .../test/Spec/Marconi/ChainIndex/Orphans.hs | 30 ++ 13 files changed, 1335 insertions(+), 111 deletions(-) create mode 100755 marconi-chain-index/performance/monitor-marconi-sync-all.sh create mode 100644 marconi-chain-index/src/Marconi/ChainIndex/Indexers/EpochSPD.hs create mode 100644 marconi-chain-index/src/Marconi/ChainIndex/Node/Client/GenesisConfig.hs create mode 100644 marconi-chain-index/src/Marconi/ChainIndex/Utils.hs diff --git a/cardano-streaming/src/Cardano/Streaming.hs b/cardano-streaming/src/Cardano/Streaming.hs index 26b78a12cb..e2bdfa70cd 100644 --- a/cardano-streaming/src/Cardano/Streaming.hs +++ b/cardano-streaming/src/Cardano/Streaming.hs @@ -1,5 +1,5 @@ {-# LANGUAGE LambdaCase #-} -{-# LANGUAGE MultiWayIf #-} + module Cardano.Streaming ( withChainSyncEventStream , CS.ChainSyncEvent (..) @@ -180,7 +180,9 @@ ledgerStatesPipelined pipelineSize config socket validationMode = do -- keeps waiting for more blocks when chainsync server and client are -- fully synchronized. foldLedgerState - :: C.Env -> LedgerStateHistory -> C.ValidationMode + :: C.Env + -> LedgerStateHistory + -> C.ValidationMode -> S.Stream (S.Of (CS.ChainSyncEvent (C.BlockInMode C.CardanoMode))) IO r -> S.Stream (S.Of C.LedgerState) IO r foldLedgerState env initialLedgerStateHistory validationMode = @@ -188,7 +190,9 @@ foldLedgerState env initialLedgerStateHistory validationMode = -- | Like `foldLedgerState`, but also produces blocks and `C.LedgerEvent`s. foldLedgerStateEvents - :: C.Env -> LedgerStateHistory -> C.ValidationMode + :: C.Env + -> LedgerStateHistory + -> C.ValidationMode -> S.Stream (S.Of (CS.ChainSyncEvent (C.BlockInMode C.CardanoMode))) IO r -> S.Stream (S.Of (C.BlockInMode C.CardanoMode, LedgerStateEvents)) IO r foldLedgerStateEvents env initialLedgerStateHistory validationMode = loop initialLedgerStateHistory @@ -204,7 +208,7 @@ foldLedgerStateEvents env initialLedgerStateHistory validationMode = loop initia Left r -> pure r Right (chainSyncEvent, source') -> do ledgerStateHistory' <- case chainSyncEvent of - CS.RollForward (blockInMode@(C.BlockInMode block _)) _ct -> do + CS.RollForward blockInMode@(C.BlockInMode block _) _ct -> do newLedgerState <- liftIO $ applyBlock_ (getLastLedgerState ledgerStateHistory) block let (ledgerStateHistory', committedStates) = pushLedgerState env ledgerStateHistory (CS.bimSlotNo blockInMode) newLedgerState blockInMode forM_ committedStates $ \(_, (ledgerState, ledgerEvents), currBlockMay) -> case currBlockMay of @@ -224,7 +228,12 @@ getEnvAndInitialLedgerStateHistory configPath = do return (env, initialLedgerStateHistory) -applyBlockThrow :: C.Env -> C.LedgerState -> C.ValidationMode -> C.Block era -> IO (C.LedgerState, [C.LedgerEvent]) +applyBlockThrow + :: C.Env + -> C.LedgerState + -> C.ValidationMode + -> C.Block era + -> IO (C.LedgerState, [C.LedgerEvent]) applyBlockThrow env ledgerState validationMode block = case C.applyBlock env ledgerState validationMode block of Left err -> IO.throw err Right ls -> pure ls diff --git a/marconi-chain-index/app/Main.hs b/marconi-chain-index/app/Main.hs index 96045c1060..33928cdf38 100644 --- a/marconi-chain-index/app/Main.hs +++ b/marconi-chain-index/app/Main.hs @@ -19,12 +19,12 @@ main = do , (Indexers.mintBurnWorker (\_ -> pure ()), Cli.mintBurnDbPath o) ] <> case Cli.optionsNodeConfigPath o of Just configPath -> - [(Indexers.epochStakepoolSizeWorker configPath, Cli.epochStakepoolSizeDbPath o)] + [(Indexers.epochStakepoolSizeWorker' configPath (const $ pure ()), Cli.epochStakepoolSizeDbPath o)] Nothing -> [] Indexers.runIndexers (Cli.optionsSocketPath o) (Cli.optionsNetworkId o) (Cli.optionsChainPoint o) - "marconi" + "marconi-chain-index" indexers diff --git a/marconi-chain-index/marconi-chain-index.cabal b/marconi-chain-index/marconi-chain-index.cabal index 079fe103df..541d83724f 100644 --- a/marconi-chain-index/marconi-chain-index.cabal +++ b/marconi-chain-index/marconi-chain-index.cabal @@ -53,13 +53,16 @@ library Marconi.ChainIndex.Indexers Marconi.ChainIndex.Indexers.AddressDatum Marconi.ChainIndex.Indexers.Datum + Marconi.ChainIndex.Indexers.EpochSPD Marconi.ChainIndex.Indexers.EpochStakepoolSize Marconi.ChainIndex.Indexers.MintBurn Marconi.ChainIndex.Indexers.ScriptTx Marconi.ChainIndex.Indexers.Utxo Marconi.ChainIndex.Logging + Marconi.ChainIndex.Node.Client.GenesisConfig Marconi.ChainIndex.Orphans Marconi.ChainIndex.Types + Marconi.ChainIndex.Utils -------------------- -- Local components @@ -74,12 +77,22 @@ library build-depends: , cardano-api , cardano-binary + , cardano-crypto-class + , cardano-crypto-wrapper + , cardano-data , cardano-ledger-alonzo , cardano-ledger-babbage + , cardano-ledger-byron , cardano-ledger-core , cardano-ledger-shelley , cardano-ledger-shelley-ma + , cardano-protocol-tpraos + , cardano-slotting , iohk-monitoring + , ouroboros-consensus + , ouroboros-consensus-byron + , ouroboros-consensus-protocol + , ouroboros-network ------------------------ -- Non-IOG dependencies @@ -90,7 +103,9 @@ library , base , base16-bytestring , bytestring + , cborg , containers + , directory , filepath , lens , mwc-random @@ -106,7 +121,9 @@ library , text , time , transformers + , transformers-except , vector-map + , yaml library json-rpc import: lang diff --git a/marconi-chain-index/performance/monitor-marconi-sync-all.sh b/marconi-chain-index/performance/monitor-marconi-sync-all.sh new file mode 100755 index 0000000000..797a927ed1 --- /dev/null +++ b/marconi-chain-index/performance/monitor-marconi-sync-all.sh @@ -0,0 +1,55 @@ +#!/usr/bin/env bash + +CURRENT_SCRIPT_PATH=$0 +CURRENT_SCRIPT_BASEPATH=$(dirname "$CURRENT_SCRIPT_PATH") + +monitor_indexer () { + indexer=$1 + network=$2 + + case $network in + mainnet) + networkid="764824073" + echo $networkid + indexers_cli="--disable-datum --disable-script-tx --disable-address-datum --disable-mintburn --disable-epoch-stakepool-size" + ;; + preprod) + networkid="1" + echo $networkid + indexers_cli="--disable-datum --disable-script-tx --disable-utxo --disable-address-datum --disable-mintburn" + ;; + esac + + echo "Syncing ${indexer} indexer..." + case $indexer in + utxo) + indexers_cli="--disable-datum --disable-script-tx --disable-address-datum --disable-mintburn --disable-epoch-stakepool-size" + ;; + addressdatum) + indexers_cli="--disable-datum --disable-script-tx --disable-utxo --disable-mintburn --disable-epoch-stakepool-size" + ;; + mintburn) + indexers_cli="--disable-datum --disable-script-tx --disable-utxo --disable-address-datum --disable-epoch-stakepool-size" + ;; + epochsdd) + indexers_cli="--disable-datum --disable-script-tx --disable-utxo --disable-address-datum --disable-mintburn" + ;; + esac + + cabal build marconi-chain-index + $(cabal exec -- which marconi-chain-index) \ + -s "$HOME"/cardano-node/${network}/cardano-node.socket \ + --testnet-magic ${networkid} \ + ${indexers_cli} \ + -d ~/cardano-node/${network}/marconi-chain-index \ + --node-config-path "$HOME"/cardano-node/${network}/${network}-config.json >> "${CURRENT_SCRIPT_BASEPATH}"/marconi-chain-index-"${indexer}".log & + pid=$! + "${CURRENT_SCRIPT_BASEPATH}"/monitor-marconi-sync.sh "${CURRENT_SCRIPT_BASEPATH}"/marconi-chain-index-"${indexer}".log $pid >> >(tee "${CURRENT_SCRIPT_BASEPATH}"/marconi-chain-index-"${indexer}"-monitor.log) +} + +if [[ $1 ]] && [ "$1" == "utxo" ] || [ "$1" == "addressdatum" ] || [ "$1" == "mintburn" ] || [ "$1" == "epochsdd" ]; then indexer=$1; else echo "provide indexer to run: utxo, addressdatum, mintburn or epochsdd (arg0)" && exit 1; fi +if [[ $2 ]] && [ "$2" == "mainnet" ] || [ "$2" == "preprod" ]; then network=$2; else echo "provide network (mainnet, preprod) (arg1)" && exit 1; fi + +monitor_indexer "$indexer" "$network" + +trap 'kill $(jobs -p)' EXIT diff --git a/marconi-chain-index/performance/monitor-marconi-sync.sh b/marconi-chain-index/performance/monitor-marconi-sync.sh index 5c9b5a0447..28c2b82b25 100755 --- a/marconi-chain-index/performance/monitor-marconi-sync.sh +++ b/marconi-chain-index/performance/monitor-marconi-sync.sh @@ -1,23 +1,22 @@ #!/usr/bin/env bash -# For monitoring time to sync marconi. Logs cpu, mem and sync %. +# For monitoring time to sync marconi-chain-index. Logs cpu, mem and sync %. -# Run marconi with "> >(tee marconi-log.txt)" and then run this script with marconi's log as arg -if [[ $1 ]]; then marconi_log=$1; else echo "provide marconi log file (arg0)" && exit 1; fi +# Run marconi-chain-index with "> >(tee marconi-chain-index-log.txt)" and then run this script with marconi-chain-index's log as arg +if [[ $1 ]]; then marconi_chain_index_log=$1; else echo "provide marconi-chain-index log file (arg0)" && exit 1; fi +if [[ $2 ]]; then pid=$2; else echo "provide marconi-chain-index PID (arg1)" && exit 1; fi startMs=$(date +%s) -pid=$(pidof marconi) - if [[ -z $pid ]] then echo -e "pid empty, exiting.\n" exit 1 else - echo -e "marconi pid=$pid\n" + echo -e "marconi-chain-index pid=$pid\n" fi -exec > >(tee monitor-marconi-log.txt) +exec > >(tee monitor-marconi-chain-index-log.txt) echo -e "Starting monitoring at $(date)" echo -e "Process started at $(ps -p "$pid" -o start_time=)\n" @@ -25,7 +24,7 @@ ps -p "$pid" -o etime,%cpu,rss,%mem while true do - last_log=$(tail -n1 "$marconi_log") + last_log=$(tail -n1 "$marconi_chain_index_log") sync_percent=$(echo "$last_log" | cut -d '(' -f2 | cut -d ')' -f1) echo -e "Synced: $sync_percent" ps -p "$pid" -o etime=,%cpu=,rss=,%mem= diff --git a/marconi-chain-index/src/Marconi/ChainIndex/Indexers.hs b/marconi-chain-index/src/Marconi/ChainIndex/Indexers.hs index 09bf1a9d0a..108464a39d 100644 --- a/marconi-chain-index/src/Marconi/ChainIndex/Indexers.hs +++ b/marconi-chain-index/src/Marconi/ChainIndex/Indexers.hs @@ -9,7 +9,20 @@ module Marconi.ChainIndex.Indexers where +import Cardano.Api (Block (Block), BlockHeader (BlockHeader), BlockInMode (BlockInMode), CardanoMode, + ChainPoint (ChainPoint, ChainPointAtGenesis), Hash, ScriptData, SlotNo, Tx (Tx), chainPointToSlotNo) +import Cardano.Api qualified as C +import Cardano.Api.Shelley qualified as C +import Cardano.Api.Shelley qualified as Shelley +import Cardano.BM.Setup (withTrace) +import Cardano.BM.Trace (logError) +import Cardano.BM.Tracing (defaultConfigStdout) +import Cardano.Ledger.Alonzo.TxWitness qualified as Alonzo +import Cardano.Streaming (ChainSyncEvent (RollBackward, RollForward), ChainSyncEventException (NoIntersectionFound), + withChainSyncEventStream) +import Cardano.Streaming qualified as CS import Control.Concurrent (MVar, forkIO, modifyMVar_, newMVar, readMVar) +import Control.Concurrent.MVar (modifyMVar) import Control.Concurrent.QSemN (QSemN, newQSemN, signalQSemN, waitQSemN) import Control.Concurrent.STM (atomically) import Control.Concurrent.STM.TChan (TChan, dupTChan, newBroadcastTChanIO, readTChan, writeTChan) @@ -18,42 +31,41 @@ import Control.Lens (view, (&)) import Control.Lens.Operators ((^.)) import Control.Monad (forever, void) import Control.Monad.Trans.Class (lift) +import Control.Monad.Trans.Except (runExceptT) import Data.List (findIndex, foldl1', intersect) import Data.Map (Map) import Data.Map qualified as Map import Data.Maybe (fromMaybe, mapMaybe) import Data.Text qualified as TS +import Data.Word (Word64) import Database.SQLite.Simple qualified as SQL -import Streaming.Prelude qualified as S - -import Cardano.Api (Block (Block), BlockHeader (BlockHeader), BlockInMode (BlockInMode), CardanoMode, - ChainPoint (ChainPoint, ChainPointAtGenesis), Hash, ScriptData, SlotNo, Tx (Tx), chainPointToSlotNo) -import Cardano.Api qualified as C -import Cardano.Api.Shelley qualified as Shelley -import Cardano.BM.Setup (withTrace) -import Cardano.BM.Trace (logError) -import Cardano.BM.Tracing (defaultConfigStdout) -import Cardano.Ledger.Alonzo.TxWitness qualified as Alonzo -import Cardano.Streaming (ChainSyncEvent (RollBackward, RollForward), ChainSyncEventException (NoIntersectionFound), - withChainSyncEventStream) -import Cardano.Streaming qualified as CS -import Marconi.ChainIndex.Logging (logging) -import Prettyprinter (defaultLayoutOptions, layoutPretty, pretty, (<+>)) -import Prettyprinter.Render.Text (renderStrict) - import Marconi.ChainIndex.Indexers.AddressDatum (AddressDatumDepth (AddressDatumDepth), AddressDatumHandle, AddressDatumIndex) import Marconi.ChainIndex.Indexers.AddressDatum qualified as AddressDatum import Marconi.ChainIndex.Indexers.Datum (DatumIndex) import Marconi.ChainIndex.Indexers.Datum qualified as Datum +import Marconi.ChainIndex.Indexers.EpochSPD (EpochSPDHandle, EpochSPDIndex) +import Marconi.ChainIndex.Indexers.EpochSPD qualified as EpochSPD import Marconi.ChainIndex.Indexers.EpochStakepoolSize qualified as EpochStakepoolSize import Marconi.ChainIndex.Indexers.MintBurn qualified as MintBurn import Marconi.ChainIndex.Indexers.ScriptTx qualified as ScriptTx import Marconi.ChainIndex.Indexers.Utxo qualified as Utxo +import Marconi.ChainIndex.Logging (logging) +import Marconi.ChainIndex.Node.Client.GenesisConfig (NetworkConfigFile (NetworkConfigFile), initExtLedgerStateVar, + mkProtocolInfoCardano, readCardanoGenesisConfig, readNetworkConfig, + renderGenesisConfigError) import Marconi.ChainIndex.Types (TargetAddresses) - import Marconi.Core.Index.VSplit qualified as Ix import Marconi.Core.Storable qualified as Storable +import Ouroboros.Consensus.Config qualified as O +import Ouroboros.Consensus.Ledger.Abstract qualified as O +import Ouroboros.Consensus.Ledger.Extended qualified as O +import Ouroboros.Consensus.Node qualified as O +import Prettyprinter (defaultLayoutOptions, layoutPretty, pretty, (<+>)) +import Prettyprinter.Render.Text (renderStrict) +import Streaming.Prelude qualified as S +import System.Directory (createDirectoryIfMissing) +import System.FilePath (takeDirectory, ()) -- DatumIndexer getDatums :: BlockInMode CardanoMode -> [(SlotNo, (Hash ScriptData, ScriptData))] @@ -67,13 +79,13 @@ getDatums (BlockInMode (Block (BlockHeader slotNo _ _) txs) _) = concatMap extra $ txBody scriptDataFromCardanoTxBody :: C.TxBody era -> Map (Hash ScriptData) ScriptData -scriptDataFromCardanoTxBody (Shelley.ShelleyTxBody _ _ _ (C.TxBodyScriptData _ dats _) _ _) = +scriptDataFromCardanoTxBody (C.ShelleyTxBody _ _ _ (C.TxBodyScriptData _ dats _) _ _) = extractData dats where extractData :: Alonzo.TxDats era -> Map (Hash ScriptData) ScriptData extractData (Alonzo.TxDats' xs) = Map.fromList - . fmap ((\x -> (C.hashScriptData x, x)) . Shelley.fromAlonzoData) + . fmap ((\x -> (C.hashScriptData x, x)) . C.fromAlonzoData) . Map.elems $ xs scriptDataFromCardanoTxBody _ = mempty @@ -90,13 +102,11 @@ scriptDataFromCardanoTxBody _ = mempty how many we are waiting. -} data Coordinator = Coordinator - { _channel :: TChan (ChainSyncEvent (BlockInMode CardanoMode)) - , _barrier :: QSemN - , _indexerCount :: Int + { _channel :: !(TChan (ChainSyncEvent (BlockInMode CardanoMode))) + , _barrier :: !QSemN + , _indexerCount :: !Int } - - initialCoordinator :: Int -> IO Coordinator initialCoordinator indexerCount = Coordinator <$> newBroadcastTChanIO @@ -134,7 +144,10 @@ utxoWorker_ :: (Utxo.UtxoIndexer -> IO ()) -- ^ callback function used in the queryApi thread, needs to be non-blocking -> Utxo.Depth -> Maybe TargetAddresses -- ^ Target addresses to filter for - -> Coordinator -> TChan (ChainSyncEvent (BlockInMode CardanoMode)) -> FilePath -> IO (IO (), MVar Utxo.UtxoIndexer) + -> Coordinator + -> TChan (ChainSyncEvent (BlockInMode CardanoMode)) + -> FilePath + -> IO (IO (), MVar Utxo.UtxoIndexer) utxoWorker_ callback depth maybeTargetAddresses Coordinator{_barrier} ch path = do ix <- Utxo.open path depth mIndexer <- newMVar ix @@ -203,7 +216,7 @@ addressDatumWorker_ onInsert targetAddresses depth Coordinator{_barrier} ch path RollForward (BlockInMode (Block (BlockHeader slotNo bh _) txs) _) _ -> do -- TODO Redo. Inefficient filtering let addressFilter = - fmap (\targetAddrs -> \addr -> addr `elem` targetAddrs) + fmap (flip elem) targetAddresses addressDatumIndexEvent = AddressDatum.toAddressDatumIndexEvent addressFilter txs (C.ChainPoint slotNo bh) @@ -254,6 +267,108 @@ scriptTxWorker onInsert coordinator path = do -- * Epoch stakepool size indexer +epochStakepoolSizeWorker_ + :: FilePath + -> (Storable.State EpochSPDHandle -> IO ()) + -> Word64 -- Security param + -> Coordinator + -> TChan (ChainSyncEvent (BlockInMode CardanoMode)) + -> FilePath + -> IO (IO b, MVar EpochSPDIndex) +epochStakepoolSizeWorker_ + nodeConfigPath + onInsert + securityParam + Coordinator{_barrier} + ch + dbPath = do + let ledgerStateDir = takeDirectory dbPath "ledgerStates" + createDirectoryIfMissing False ledgerStateDir + indexerMVar <- newMVar =<< EpochSPD.open dbPath ledgerStateDir securityParam + + nodeConfigE <- runExceptT $ readNetworkConfig (NetworkConfigFile nodeConfigPath) + nodeConfig <- either (error . show) pure nodeConfigE + genesisConfigE <- runExceptT $ readCardanoGenesisConfig nodeConfig + genesisConfig <- either (error . show . renderGenesisConfigError) pure genesisConfigE + + let initialLedgerState = O.ledgerState $ initExtLedgerStateVar genesisConfig + hfLedgerConfig = O.topLevelConfigLedger $ O.pInfoConfig (mkProtocolInfoCardano genesisConfig) + + loop currentLedgerState maybeEpochNo = do + signalQSemN _barrier 1 + chainSyncEvent <- atomically $ readTChan ch + + newLedgerState <- case chainSyncEvent of + RollForward blockInMode@(C.BlockInMode (C.Block (C.BlockHeader slotNo bh bn) _) _) chainTip -> do + -- Compute new LedgerState given block and old LedgerState + let newLedgerState = + O.lrResult + $ O.tickThenReapplyLedgerResult + hfLedgerConfig + (C.toConsensusBlock blockInMode) + currentLedgerState + let newEpochNo = EpochSPD.getEpochNo newLedgerState + + -- If the block is rollbackable, we always store the LedgerState. If the block is + -- immutable, we only store it right before a new epoch. + -- let isLastEventOfEpoch = maybeEpochNo /= newEpochNo + let isLastEventOfEpoch = maybeEpochNo /= newEpochNo + let storableEvent = + EpochSPD.toStorableEvent + currentLedgerState + slotNo + bh + bn + chainTip + securityParam + isLastEventOfEpoch + modifyMVar_ indexerMVar $ Storable.insert storableEvent + readMVar indexerMVar >>= onInsert -- refresh the query STM/CPS with new storage pointers/counters state + + pure newLedgerState + + RollBackward C.ChainPointAtGenesis _ct -> do + modifyMVar_ indexerMVar $ \ix -> fromMaybe ix <$> Storable.rewind C.ChainPointAtGenesis ix + pure initialLedgerState + RollBackward cp _ct -> + modifyMVar indexerMVar $ \ix -> do + newIndex <- fromMaybe ix <$> Storable.rewind cp ix + -- The possible points from which we can possibly rollback should be available + -- in the buffer events and from the resumable points. + -- For that assumption to be correct, we absolutely need + -- * to make sure that 'EpochSPD.open' was called with the correct + -- 'SecurityParam' (k) value of connect Cardano network. + -- * that the resumablePoints correctly returns the points from which we + -- have saved a LedgerState on disk. + EpochSPD.LedgerStateAtPointResult maybeLedgerState <- + Storable.query Storable.QEverything newIndex (EpochSPD.LedgerStateAtPointQuery cp) + case maybeLedgerState of + Nothing -> do + error "Could not find LedgerState from which to rollback from in EpochSPD indexer. Should not happen!" + Just ledgerState -> + pure (newIndex, ledgerState) + + loop newLedgerState $ EpochSPD.getEpochNo newLedgerState + + pure (loop initialLedgerState Nothing, indexerMVar) + +epochStakepoolSizeWorker' + :: FilePath + -> (Storable.State EpochSPDHandle -> IO ()) + -> Worker +epochStakepoolSizeWorker' nodeConfigPath onInsert coordinator path = do + workerChannel <- atomically . dupTChan $ _channel coordinator + (loop, ix) <- + epochStakepoolSizeWorker_ + nodeConfigPath + onInsert + 2160 + coordinator + workerChannel + path + void $ forkIO loop + readMVar ix >>= Storable.resumeFromStorage . view Storable.handle + epochStakepoolSizeWorker :: FilePath -> Worker epochStakepoolSizeWorker configPath Coordinator{_barrier,_channel} dbPath = do tchan <- atomically $ dupTChan _channel @@ -262,7 +377,7 @@ epochStakepoolSizeWorker configPath Coordinator{_barrier,_channel} dbPath = do chainSyncEvents :: S.Stream (S.Of (ChainSyncEvent (BlockInMode CardanoMode))) IO () chainSyncEvents = do lift $ signalQSemN _barrier 1 - S.yield =<< (lift $ atomically $ readTChan tchan) + S.yield =<< lift (atomically $ readTChan tchan) chainSyncEvents dbCon <- SQL.open dbPath @@ -345,8 +460,8 @@ runIndexers -> TS.Text -> [(Worker, Maybe FilePath)] -> IO () -runIndexers socketPath networkId cliChainPoint traceName list = do - (returnedCp, coordinator) <- initializeIndexers $ mapMaybe (traverse id) list +runIndexers socketPath networkId cliChainPoint traceName workers = do + (returnedCp, coordinator) <- initializeIndexers $ mapMaybe sequenceA workers -- If the user specifies the chain point then use that, -- otherwise use what the indexers provide. @@ -355,13 +470,16 @@ runIndexers socketPath networkId cliChainPoint traceName list = do cliCp -> [cliCp] c <- defaultConfigStdout - withTrace c traceName $ \trace -> let - io = withChainSyncEventStream socketPath networkId chainPoints (mkIndexerStream coordinator . logging trace) - handleException NoIntersectionFound = - logError trace $ - renderStrict $ - layoutPretty defaultLayoutOptions $ - "No intersection found when looking for the chain point" - <+> pretty chainPoints <> "." - <+> "Please check the slot number and the block hash do belong to the chain" - in io `catch` handleException + withTrace c traceName $ \trace -> do + let io = withChainSyncEventStream socketPath + networkId + chainPoints + (mkIndexerStream coordinator . logging trace) + handleException NoIntersectionFound = + logError trace $ + renderStrict $ + layoutPretty defaultLayoutOptions $ + "No intersection found when looking for the chain point" + <+> pretty chainPoints <> "." + <+> "Please check the slot number and the block hash do belong to the chain" + io `catch` handleException diff --git a/marconi-chain-index/src/Marconi/ChainIndex/Indexers/EpochSPD.hs b/marconi-chain-index/src/Marconi/ChainIndex/Indexers/EpochSPD.hs new file mode 100644 index 0000000000..5753a3694b --- /dev/null +++ b/marconi-chain-index/src/Marconi/ChainIndex/Indexers/EpochSPD.hs @@ -0,0 +1,503 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE DerivingStrategies #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE QuasiQuotes #-} +{-# LANGUAGE TemplateHaskell #-} +{-# LANGUAGE TupleSections #-} + +-- | Module for indexing the stakepool delegation per epoch in the Cardano blockchain. +-- +-- This module will create the SQL tables: +-- +-- + table: epoch_spd +-- +-- @ +-- |---------+--------+----------+--------+-----------------+---------| +-- | epochNo | poolId | lovelace | slotNo | blockHeaderHash | blockNo | +-- |---------+--------+----------+--------+-----------------+---------| +-- @ +-- +-- To create this table, we need to compute the `LedgerState` from `ouroboros-network` (called +-- `NewEpochState` in `cardano-ledger`) at each 'Rollforward' chain sync event. Using the +-- 'LegderState', we can easily compute the epochNo as well as the stake pool delegation for that +-- epoch. +-- +-- The main issue with this indexer is that building the LedgerState and saving it on disk for being +-- able to resume is VERY resource intensive. Syncing time for this indexer is over 20h and uses +-- about ~16GB of RAM (which will keep increasing as the blockchain continues to grow). +-- +-- Here is a synopsis of what this indexer does. +-- +-- We assume a few things: +-- +-- * the construction of 'LedgerState' is done outside of this indexer. +-- * the 'Storable.insert' function is called with the *last* event of an epoch (therefore, the +-- last 'LedgerState' before starting a new epoch). We do that because we only care about the SPD +-- (Stake Pool Delegation) from the last block before a new epoch. +-- +-- Once the 'Storable.StorableEvent' is stored on disk, we perform various steps: +-- +-- 1. we save the SPD for the current epoch in the `epoch_spd` table +-- 2. we save the 'LedgerState's in the filesystem a binary files (the ledger state file path has +-- the format: `ledgerState___.bin`. We only store a +-- 'LedgerState' if it's rollbackable or if the last one of a given epoch. +-- 3. we delete immutable 'LedgerState' binary files expect latest one (this step is necessary for +-- resuming). +-- +-- The indexer provides the following queries: +-- +-- * C.EpochNo -> SPD (the actualy query that clients will be interested in) +-- * C.ChainPoint -> LedgerState (query that is necessary for resuming) +module Marconi.ChainIndex.Indexers.EpochSPD + ( -- * EpochSPDIndex + EpochSPDIndex + , EpochSPDHandle + , StorableEvent(..) + , StorableQuery(..) + , StorableResult(..) + , toStorableEvent + , EpochSPDDepth (..) + , open + , getEpochNo + ) where + +import Cardano.Api qualified as C +import Cardano.Api.Shelley qualified as C +import Cardano.Ledger.Coin qualified as Ledger +import Cardano.Ledger.Compactible qualified as Ledger +import Cardano.Ledger.Era qualified as Ledger +import Cardano.Ledger.Shelley.API qualified as Ledger +import Cardano.Slotting.Slot (EpochNo) +import Codec.CBOR.Read qualified as CBOR +import Codec.CBOR.Write qualified as CBOR +import Control.Monad (forM_, when) +import Data.ByteString.Base16 qualified as Base16 +import Data.ByteString.Lazy qualified as BS +import Data.Coerce (coerce) +import Data.Data (Proxy (Proxy)) +import Data.Foldable (toList) +import Data.List qualified as List +import Data.List.NonEmpty qualified as NE +import Data.Map.Strict (Map) +import Data.Map.Strict qualified as Map +import Data.Maybe (catMaybes, mapMaybe) +import Data.Ord (Down (Down)) +import Data.Set (Set) +import Data.Set qualified as Set +import Data.Text qualified as Text +import Data.Text.Encoding qualified as Text +import Data.Tuple (swap) +import Data.VMap qualified as VMap +import Data.Word (Word64) +import Database.SQLite.Simple qualified as SQL +import GHC.Generics (Generic) +import Marconi.ChainIndex.Orphans (decodeLedgerState, encodeLedgerState) +import Marconi.ChainIndex.Utils (isBlockRollbackable) +import Marconi.Core.Storable (Buffered (persistToStorage), HasPoint (getPoint), QueryInterval, Queryable (queryStorage), + Resumable, Rewindable (rewindStorage), State, StorableEvent, StorableMonad, StorablePoint, + StorableQuery, StorableResult, emptyState) +import Marconi.Core.Storable qualified as Storable +import Ouroboros.Consensus.Cardano.Block qualified as O +import Ouroboros.Consensus.Shelley.Ledger qualified as O +import System.Directory (listDirectory, removeFile) +import System.FilePath (dropExtension, ()) +import Text.RawString.QQ (r) +import Text.Read (readMaybe) + +data EpochSPDHandle = EpochSPDHandle + { _epochSDPHandleConnection :: !SQL.Connection + , _epochSDPHandleLedgerStateDirPath :: !FilePath + , _epochSDPHandleSecurityParam :: !Word64 + } + +type instance StorableMonad EpochSPDHandle = IO + +data instance StorableEvent EpochSPDHandle = + EpochSPDEvent + { epochSPDEventLedgerState :: Maybe (O.LedgerState (O.CardanoBlock O.StandardCrypto)) + , epochSPDEventEpochNo :: Maybe C.EpochNo + , epochSPDEventSPD :: Map C.PoolId C.Lovelace + , epochSPDEventSlotNo :: C.SlotNo + , epochSPDEventBlockHeaderHash :: C.Hash C.BlockHeader + , epochSPDEventBlockNo :: C.BlockNo + , epochSPDEventChainTip :: C.ChainTip -- ^ Actual tip of the chain + , epochSPDEventIsLastEventOfEpoch :: Bool + } + deriving (Eq, Show) + +type instance StorablePoint EpochSPDHandle = C.ChainPoint + +instance HasPoint (StorableEvent EpochSPDHandle) C.ChainPoint where + getPoint (EpochSPDEvent _ _ _ s bhh _ _ _) = C.ChainPoint s bhh + +data instance StorableQuery EpochSPDHandle = + SPDByEpochNoQuery C.EpochNo + | LedgerStateAtPointQuery C.ChainPoint + +data instance StorableResult EpochSPDHandle = + SPDByEpochNoResult (Map C.PoolId C.Lovelace) + | LedgerStateAtPointResult (Maybe (O.LedgerState (O.CardanoBlock O.StandardCrypto))) + deriving (Eq, Show) + +newtype EpochSPDDepth = EpochSPDDepth Int + +type EpochSPDIndex = State EpochSPDHandle + +toStorableEvent + :: O.LedgerState (O.HardForkBlock (O.CardanoEras O.StandardCrypto)) + -> C.SlotNo + -> C.Hash C.BlockHeader + -> C.BlockNo + -> C.ChainTip + -> Word64 -- ^ Security param + -> Bool -- ^ Is the last event of the current epoch + -> StorableEvent EpochSPDHandle +toStorableEvent ledgerState slotNo bhh bn chainTip securityParam isLastEventOfEpoch = do + let doesStoreLedgerState = isBlockRollbackable securityParam bn chainTip || isLastEventOfEpoch + EpochSPDEvent + (if doesStoreLedgerState then Just ledgerState else Nothing) + (getEpochNo ledgerState) + (getStakeMap ledgerState) + slotNo + bhh + bn + chainTip + isLastEventOfEpoch + +-- | From LedgerState, get epoch stake pool delegation: a mapping of pool ID to amount staked in +-- lovelace. We do this by getting the '_pstakeSet' stake snapshot and then use '_delegations' and +-- '_stake' to resolve it into the desired mapping. +getStakeMap + :: O.LedgerState (O.CardanoBlock O.StandardCrypto) + -> Map C.PoolId C.Lovelace +getStakeMap ledgerState' = case ledgerState' of + O.LedgerStateByron _ -> mempty + O.LedgerStateShelley st -> getStakeMapFromShelleyBlock st + O.LedgerStateAllegra st -> getStakeMapFromShelleyBlock st + O.LedgerStateMary st -> getStakeMapFromShelleyBlock st + O.LedgerStateAlonzo st -> getStakeMapFromShelleyBlock st + O.LedgerStateBabbage st -> getStakeMapFromShelleyBlock st + where + getStakeMapFromShelleyBlock + :: forall proto era c + . (c ~ Ledger.Crypto era, c ~ O.StandardCrypto) + => O.LedgerState (O.ShelleyBlock proto era) + -> Map C.PoolId C.Lovelace + getStakeMapFromShelleyBlock st = spd + where + nes = O.shelleyLedgerState st :: Ledger.NewEpochState era + + stakeSnapshot = Ledger._pstakeSet . Ledger.esSnapshots . Ledger.nesEs $ nes :: Ledger.SnapShot c + + stakes = Ledger.unStake + $ Ledger._stake stakeSnapshot + + delegations :: VMap.VMap VMap.VB VMap.VB (Ledger.Credential 'Ledger.Staking c) (Ledger.KeyHash 'Ledger.StakePool c) + delegations = Ledger._delegations stakeSnapshot + + spd :: Map C.PoolId C.Lovelace + spd = Map.fromListWith (+) + $ map swap + $ catMaybes + $ VMap.elems + $ VMap.mapWithKey + (\cred spkHash -> + (\c -> ( C.Lovelace $ coerce $ Ledger.fromCompact c + , C.StakePoolKeyHash spkHash + ) + ) + <$> VMap.lookup cred stakes) + delegations + +getEpochNo :: O.LedgerState (O.CardanoBlock O.StandardCrypto) -> Maybe EpochNo +getEpochNo ledgerState' = case ledgerState' of + O.LedgerStateByron _st -> Nothing + O.LedgerStateShelley st -> getEpochNoFromShelleyBlock st + O.LedgerStateAllegra st -> getEpochNoFromShelleyBlock st + O.LedgerStateMary st -> getEpochNoFromShelleyBlock st + O.LedgerStateAlonzo st -> getEpochNoFromShelleyBlock st + O.LedgerStateBabbage st -> getEpochNoFromShelleyBlock st + where + getEpochNoFromShelleyBlock = Just . Ledger.nesEL . O.shelleyLedgerState + +data EpochSPDRow = EpochSPDRow + { epochSPDRowEpochNo :: !C.EpochNo + , epochSPDRowPoolId :: !C.PoolId + , epochSPDRowLovelace :: !C.Lovelace + , epochSPDRowSlotNo :: !C.SlotNo + , epochSPDRowBlockHeaderHash :: !(C.Hash C.BlockHeader) + , epochSPDRowBlockNo :: !C.BlockNo + } deriving (Eq, Show, Generic, SQL.FromRow, SQL.ToRow) + +instance Buffered EpochSPDHandle where + -- We should only store on disk SPD from the last slot of each epoch. + persistToStorage + :: Foldable f + => f (StorableEvent EpochSPDHandle) + -> EpochSPDHandle + -> IO EpochSPDHandle + persistToStorage events h@(EpochSPDHandle c ledgerStateDirPath securityParam) = do + let eventsList = toList events + + SQL.execute_ c "BEGIN" + forM_ (concatMap eventToEpochSPDRows $ filter epochSPDEventIsLastEventOfEpoch eventsList) $ \row -> + SQL.execute c + [r|INSERT INTO epoch_spd + ( epochNo + , poolId + , lovelace + , slotNo + , blockHeaderHash + , blockNo + ) VALUES (?, ?, ?, ?, ?, ?)|] row + SQL.execute_ c "COMMIT" + + -- We store the LedgerState if one of following conditions hold: + -- * the LedgerState cannot be rollbacked and is the last of an epoch + -- * the LedgerState can be rollbacked + let writeLedgerState ledgerState (C.SlotNo slotNo) blockHeaderHash (C.BlockNo blockNo) isRollbackable = do + let fname = ledgerStateDirPath + "ledgerState_" + <> (if isRollbackable then "volatile_" else "") + <> show slotNo + <> "_" + <> Text.unpack (C.serialiseToRawBytesHexText blockHeaderHash) + <> "_" + <> show blockNo + <> ".bin" + -- TODO We should delete the file is the write operation was interrumpted by the + -- user. Tried using something like `onException`, but it doesn't run the cleanup + -- function. Not sure how to do the cleanup here without restoring doing it outside + -- the thread where this indexer is running. + BS.writeFile fname (CBOR.toLazyByteString $ encodeLedgerState ledgerState) + forM_ eventsList + $ \(EpochSPDEvent + maybeLedgerState + maybeEpochNo + _ + slotNo + blockHeaderHash + blockNo + chainTip + isLastEventOfEpoch) -> do + case (maybeEpochNo, maybeLedgerState) of + (Just _, Just ledgerState) -> do + let isRollbackable = isBlockRollbackable securityParam blockNo chainTip + when (isRollbackable || isLastEventOfEpoch) $ do + writeLedgerState ledgerState slotNo blockHeaderHash blockNo isRollbackable + -- We don't store any 'LedgerState' if the era doesn't have epochs (Byron era) or if + -- we don't have access to the 'LedgerState'. + _noLedgerStateOrEpochNo -> pure () + + -- Remove all immutable LedgerStates from the filesystem expect the most recent immutable + -- one which is from the last slot of latest epoch. + -- A 'LedgerState' is considered immutable if its 'blockNo' is '< latestBlockNo - securityParam'. + case NE.nonEmpty eventsList of + Nothing -> pure () + Just nonEmptyEvents -> do + let chainTip = + NE.head + $ NE.sortWith (\case C.ChainTipAtGenesis -> Down Nothing; + C.ChainTip _ _ bn -> Down (Just bn) + ) + $ fmap epochSPDEventChainTip nonEmptyEvents + + ledgerStateFilePaths <- + mapMaybe (\fp -> fmap (fp,) $ chainTipsFromLedgerStateFilePath fp) + <$> listDirectory ledgerStateDirPath + + -- Delete volatile LedgerState which have become immutable. + let oldVolatileLedgerStateFilePaths = + fmap fst + $ filter (\(_, (isVolatile, _, _, blockNo)) -> + isVolatile && not (isBlockRollbackable securityParam blockNo chainTip)) + ledgerStateFilePaths + forM_ oldVolatileLedgerStateFilePaths $ \fp -> removeFile $ ledgerStateDirPath fp + + -- Delete all immutable LedgerStates expect the latest one + let immutableLedgerStateFilePaths = + filter (\(_, (isVolatile, _, _, _)) -> not isVolatile) ledgerStateFilePaths + case NE.nonEmpty immutableLedgerStateFilePaths of + Nothing -> pure () + Just nonEmptyLedgerStateFilePaths -> do + let oldImmutableLedgerStateFilePaths = + fmap (\(fp, _, _) -> fp) + $ filter (\(_, _, isImmutableBlock) -> isImmutableBlock) + $ NE.tail + $ NE.sortWith (\(_, (_, _, blockNo), isImmutableBlock) -> + Down (blockNo, isImmutableBlock)) + $ fmap (\(fp, (_, slotNo, bhh, blockNo)) -> + ( fp + , (slotNo, bhh, blockNo) + , not $ isBlockRollbackable securityParam blockNo chainTip) + ) + nonEmptyLedgerStateFilePaths + forM_ oldImmutableLedgerStateFilePaths + $ \fp -> removeFile $ ledgerStateDirPath fp + + pure h + + -- TODO For now, this indexer does not support storing volatile events on disk, as we assume + -- that all events that are persisted are commited. + -- Eventually, we would *probably* want to support storing 'EpochSPDEvent' on disk. + getStoredEvents + :: EpochSPDHandle + -> IO [StorableEvent EpochSPDHandle] + getStoredEvents EpochSPDHandle {} = do + pure [] + +eventToEpochSPDRows + :: StorableEvent EpochSPDHandle + -> [EpochSPDRow] +eventToEpochSPDRows (EpochSPDEvent _ maybeEpochNo m slotNo blockHeaderHash blockNo _ _) = + mapMaybe + (\(keyHash, lovelace) -> + fmap (\epochNo -> EpochSPDRow + epochNo + keyHash + lovelace + slotNo + blockHeaderHash + blockNo) maybeEpochNo) + $ Map.toList m + +instance Queryable EpochSPDHandle where + queryStorage + :: Foldable f + => QueryInterval C.ChainPoint + -> f (StorableEvent EpochSPDHandle) + -> EpochSPDHandle + -> StorableQuery EpochSPDHandle + -> IO (StorableResult EpochSPDHandle) + + queryStorage _ events (EpochSPDHandle c _ _) (SPDByEpochNoQuery epochNo) = do + case List.find (\e -> epochSPDEventEpochNo e == Just epochNo) (toList events) of + Just e -> + pure $ SPDByEpochNoResult $ epochSPDEventSPD e + Nothing -> do + res :: [(C.PoolId, C.Lovelace)] <- SQL.query c + [r|SELECT poolId, lovelace + FROM epoch_spd + WHERE epochNo = ? + |] (SQL.Only epochNo) + pure $ SPDByEpochNoResult $ Map.fromList res + + queryStorage _ _ EpochSPDHandle {} (LedgerStateAtPointQuery C.ChainPointAtGenesis) = do + pure $ LedgerStateAtPointResult Nothing + queryStorage + _ + events + (EpochSPDHandle _ ledgerStateDirPath _) + (LedgerStateAtPointQuery (C.ChainPoint slotNo _)) = do + case List.find (\e -> epochSPDEventSlotNo e == slotNo) (toList events) of + Nothing -> do + ledgerStateFilePaths <- listDirectory ledgerStateDirPath + let ledgerStateFilePath = + List.find + (\fp -> fmap (\(_, sn, _, _) -> sn) + (chainTipsFromLedgerStateFilePath fp) == Just slotNo + ) + ledgerStateFilePaths + case ledgerStateFilePath of + Nothing -> pure $ LedgerStateAtPointResult Nothing + Just fp -> do + ledgerStateBs <- BS.readFile $ ledgerStateDirPath fp + let ledgerState = + either + (const Nothing) + (Just . snd) + $ CBOR.deserialiseFromBytes decodeLedgerState ledgerStateBs + pure $ LedgerStateAtPointResult ledgerState + Just event -> pure $ LedgerStateAtPointResult $ epochSPDEventLedgerState event + +instance Rewindable EpochSPDHandle where + rewindStorage + :: C.ChainPoint + -> EpochSPDHandle + -> IO (Maybe EpochSPDHandle) + rewindStorage C.ChainPointAtGenesis h@(EpochSPDHandle c ledgerStateDirPath _) = do + SQL.execute_ c "DELETE FROM epoch_spd" + + ledgerStateFilePaths <- listDirectory ledgerStateDirPath + forM_ ledgerStateFilePaths (\f -> removeFile $ ledgerStateDirPath f) + pure $ Just h + rewindStorage (C.ChainPoint sn _) h@(EpochSPDHandle c ledgerStateDirPath _) = do + SQL.execute c "DELETE FROM epoch_spd WHERE slotNo > ?" (SQL.Only sn) + + ledgerStateFilePaths <- listDirectory ledgerStateDirPath + forM_ ledgerStateFilePaths $ \fp -> do + case chainTipsFromLedgerStateFilePath fp of + Nothing -> pure () + Just (_, slotNo, _, _) | slotNo > sn -> removeFile $ ledgerStateDirPath fp + Just _ -> pure () + + pure $ Just h + +instance Resumable EpochSPDHandle where + resumeFromStorage + :: EpochSPDHandle + -> IO [C.ChainPoint] + resumeFromStorage (EpochSPDHandle c ledgerStateDirPath _) = do + epochSPDChainPoints :: Set C.ChainPoint <- Set.fromList . fmap (uncurry C.ChainPoint) <$> + SQL.query c + [r|SELECT slotNo, blockHeaderHash + FROM epoch_spd|] () + + ledgerStateFilepaths <- listDirectory ledgerStateDirPath + let ledgerStateChainPoints = + Set.fromList + $ fmap (\(_, sn, bhh, _) -> C.ChainPoint sn bhh) + $ mapMaybe chainTipsFromLedgerStateFilePath ledgerStateFilepaths + + let resumablePoints = + List.sortOn Down + $ Set.toList + $ Set.intersection epochSPDChainPoints ledgerStateChainPoints + + pure $ resumablePoints ++ [C.ChainPointAtGenesis] + +chainTipsFromLedgerStateFilePath :: FilePath -> Maybe (Bool, C.SlotNo, C.Hash C.BlockHeader, C.BlockNo) +chainTipsFromLedgerStateFilePath ledgerStateFilepath = + case Text.splitOn "_" (Text.pack $ dropExtension ledgerStateFilepath) of + [_, slotNoStr, bhhStr, blockNoStr] -> do + (False,,,) + <$> parseSlotNo slotNoStr + <*> parseBlockHeaderHash bhhStr + <*> parseBlockNo blockNoStr + [_, "volatile", slotNoStr, bhhStr, blockNoStr] -> do + (True,,,) + <$> parseSlotNo slotNoStr + <*> parseBlockHeaderHash bhhStr + <*> parseBlockNo blockNoStr + _anyOtherFailure -> Nothing + where + parseSlotNo slotNoStr = C.SlotNo <$> readMaybe (Text.unpack slotNoStr) + parseBlockHeaderHash bhhStr = do + bhhBs <- either (const Nothing) Just $ Base16.decode $ Text.encodeUtf8 bhhStr + C.deserialiseFromRawBytes (C.proxyToAsType Proxy) bhhBs + parseBlockNo blockNoStr = C.BlockNo <$> readMaybe (Text.unpack blockNoStr) + +open + :: FilePath + -- ^ SQLite database file path + -> FilePath + -- ^ Directory from which we will save the various 'LedgerState' as different points in time. + -> Word64 + -> IO (State EpochSPDHandle) +open dbPath ledgerStateDirPath securityParam = do + c <- SQL.open dbPath + SQL.execute_ c "PRAGMA journal_mode=WAL" + SQL.execute_ c + [r|CREATE TABLE IF NOT EXISTS epoch_spd + ( epochNo INT NOT NULL + , poolId BLOB NOT NULL + , lovelace INT NOT NULL + , slotNo INT NOT NULL + , blockHeaderHash BLOB NOT NULL + , blockNo INT NOT NULL + )|] + emptyState 1 (EpochSPDHandle c ledgerStateDirPath securityParam) diff --git a/marconi-chain-index/src/Marconi/ChainIndex/Indexers/EpochStakepoolSize.hs b/marconi-chain-index/src/Marconi/ChainIndex/Indexers/EpochStakepoolSize.hs index cdae1930aa..40f06f265f 100644 --- a/marconi-chain-index/src/Marconi/ChainIndex/Indexers/EpochStakepoolSize.hs +++ b/marconi-chain-index/src/Marconi/ChainIndex/Indexers/EpochStakepoolSize.hs @@ -1,34 +1,16 @@ -{-# OPTIONS_GHC -Wno-orphans #-} -{-# OPTIONS_GHC -Wno-overlapping-patterns #-} {-# LANGUAGE DataKinds #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE MultiWayIf #-} {-# LANGUAGE OverloadedStrings #-} -{-# LANGUAGE TypeApplications #-} -module Marconi.ChainIndex.Indexers.EpochStakepoolSize where +{-# OPTIONS_GHC -Wno-overlapping-patterns #-} -import Control.Monad.Trans.Class (lift) -import Data.Coerce (coerce) -import Data.Foldable (forM_) -import Data.Function (on, (&)) -import Data.List (groupBy) -import Data.Map qualified as M -import Data.Maybe qualified as P -import Data.Sequence qualified as Seq -import Data.Tuple (swap) -import Data.VMap qualified as VMap -import Database.SQLite.Simple qualified as SQL -import Database.SQLite.Simple.FromField qualified as SQL -import Database.SQLite.Simple.ToField qualified as SQL -import Streaming.Prelude qualified as S +module Marconi.ChainIndex.Indexers.EpochStakepoolSize where import Cardano.Api qualified as C import Cardano.Api.Shelley qualified as C -import Cardano.Streaming qualified as CS - import Cardano.Ledger.Coin qualified as L import Cardano.Ledger.Compactible qualified as L import Cardano.Ledger.Credential qualified as LC @@ -37,10 +19,23 @@ import Cardano.Ledger.Keys qualified as LK import Cardano.Ledger.Shelley.EpochBoundary qualified as Shelley import Cardano.Ledger.Shelley.LedgerState qualified as SL import Cardano.Ledger.Shelley.LedgerState qualified as Shelley +import Cardano.Streaming qualified as CS +import Cardano.Streaming.Helpers (getEpochNo) +import Control.Monad.Trans.Class (lift) +import Data.Coerce (coerce) +import Data.Foldable (forM_) +import Data.Function (on, (&)) +import Data.List (groupBy) +import Data.Map qualified as M +import Data.Maybe (catMaybes, mapMaybe) +import Data.Sequence qualified as Seq +import Data.Tuple (swap) +import Data.VMap qualified as VMap +import Database.SQLite.Simple qualified as SQL +import Marconi.ChainIndex.Orphans () import Ouroboros.Consensus.Cardano.Block qualified as O import Ouroboros.Consensus.Shelley.Ledger qualified as O - -import Cardano.Streaming.Helpers (getEpochNo) +import Streaming.Prelude qualified as S -- * Event @@ -107,7 +102,7 @@ getStakeMap ledgerState' = case ledgerState' of delegations = Shelley._delegations stakeSnapshot res :: M.Map C.PoolId C.Lovelace - res = M.fromListWith (+) $ map swap $ P.catMaybes $ VMap.elems $ + res = M.fromListWith (+) $ map swap $ catMaybes $ VMap.elems $ VMap.mapWithKey (\cred spkHash -> (\c -> (C.Lovelace $ coerce $ L.fromCompact c, f spkHash)) <$> VMap.lookup cred stakes) delegations f :: LK.KeyHash 'LK.StakePool c -> C.PoolId @@ -145,27 +140,6 @@ sqlite c source = do S.yield event loop source'' - -instance SQL.ToField C.EpochNo where - toField (C.EpochNo word64) = SQL.toField word64 -instance SQL.FromField C.EpochNo where - fromField f = C.EpochNo <$> SQL.fromField f - -instance SQL.ToField C.Lovelace where - toField = SQL.toField @Integer . coerce -instance SQL.FromField C.Lovelace where - fromField = coerce . SQL.fromField @Integer - -instance SQL.FromField C.PoolId where - fromField f = do - bs <- SQL.fromField f - case C.deserialiseFromRawBytes (C.AsHash C.AsStakePoolKey) bs of - Just h -> pure h - _ -> SQL.returnError SQL.ConversionFailed f " PoolId" - -instance SQL.ToField C.PoolId where - toField = SQL.toField . C.serialiseToRawBytes - queryByEpoch :: SQL.Connection -> C.EpochNo -> IO Event queryByEpoch c epochNo = do xs :: [(C.PoolId, C.Lovelace)] <- SQL.query c "SELECT poolId, lovelace FROM stakepool_delegation WHERE epochNo = ?" (SQL.Only epochNo) @@ -182,6 +156,5 @@ queryAll c = do lastTwo (_, a, b) = (a, b) result = all' & groupBy ((==) `on` (\(e, _, _) -> e)) - & map (\case xs@((e, _, _) : _) -> Just $ Event (e, M.fromList $ map lastTwo xs); _ -> Nothing) - & P.catMaybes + & mapMaybe (\case xs@((e, _, _) : _) -> Just $ Event (e, M.fromList $ map lastTwo xs); _ -> Nothing) pure result diff --git a/marconi-chain-index/src/Marconi/ChainIndex/Node/Client/GenesisConfig.hs b/marconi-chain-index/src/Marconi/ChainIndex/Node/Client/GenesisConfig.hs new file mode 100644 index 0000000000..4cb167b168 --- /dev/null +++ b/marconi-chain-index/src/Marconi/ChainIndex/Node/Client/GenesisConfig.hs @@ -0,0 +1,432 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE DerivingStrategies #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE ScopedTypeVariables #-} + +module Marconi.ChainIndex.Node.Client.GenesisConfig where + +import Cardano.Chain.Genesis qualified as Byron +import Cardano.Chain.Update qualified as Byron +import Cardano.Crypto.Hash qualified as Crypto +import Cardano.Crypto.Hashing (decodeAbstractHash) +import Cardano.Crypto.ProtocolMagic (RequiresNetworkMagic) +import Cardano.Ledger.Alonzo.Genesis qualified as Ledger +import Cardano.Ledger.Shelley.API qualified as Ledger +import Control.Exception (IOException, catch) +import Control.Monad (when) +import Control.Monad.Trans.Except (ExceptT (ExceptT), except) +import Control.Monad.Trans.Except.Extra (firstExceptT, handleIOExceptT, hoistEither, left) +import Data.Aeson as Aeson (FromJSON (parseJSON), Object, eitherDecodeStrict', withObject, (.:), (.:?)) +import Data.Aeson.Types (Parser) +import Data.ByteString as BS (ByteString, readFile) +import Data.ByteString.Base16 qualified as Base16 +import Data.Foldable (asum) +import Data.Text (Text) +import Data.Text qualified as Text +import Data.Text.Encoding qualified as Text +import Data.Yaml qualified as Yaml +import Ouroboros.Consensus.Cardano qualified as Consensus +import Ouroboros.Consensus.Cardano.Block qualified as Consensus +import Ouroboros.Consensus.Cardano.Block qualified as HFC +import Ouroboros.Consensus.Cardano.Node qualified as Consensus +import Ouroboros.Consensus.Ledger.Extended qualified as Ledger +import Ouroboros.Consensus.Mempool.TxLimits qualified as TxLimits +import Ouroboros.Consensus.Node qualified as Consensus +import Ouroboros.Consensus.Protocol.Praos.Translate () +import Ouroboros.Consensus.Shelley.Ledger.SupportsProtocol () +import Ouroboros.Consensus.Shelley.Node.Praos qualified as Consensus +import System.FilePath (takeDirectory, ()) + +-- Usually only one constructor, but may have two when we are preparing for a HFC event. +data GenesisConfig + = GenesisCardano + !NodeConfig + !Byron.Config + !ShelleyConfig + !Ledger.AlonzoGenesis + +data ShelleyGenesisError + = ShelleyGenesisReadError !FilePath !Text + | ShelleyGenesisHashMismatch !GenesisHashShelley !GenesisHashShelley -- actual, expected + | ShelleyGenesisDecodeError !FilePath !Text + deriving Show + +data AlonzoGenesisError + = AlonzoGenesisReadError !FilePath !Text + | AlonzoGenesisHashMismatch !GenesisHashAlonzo !GenesisHashAlonzo -- actual, expected + | AlonzoGenesisDecodeError !FilePath !Text + deriving Show + +renderAlonzoGenesisError :: AlonzoGenesisError -> Text +renderAlonzoGenesisError sge = + case sge of + AlonzoGenesisReadError fp err -> + mconcat + [ "There was an error reading the genesis file: ", Text.pack fp + , " Error: ", err + ] + + AlonzoGenesisHashMismatch actual expected -> + mconcat + [ "Wrong Alonzo genesis file: the actual hash is ", renderHash (unGenesisHashAlonzo actual) + , ", but the expected Alonzo genesis hash given in the node " + , "configuration file is ", renderHash (unGenesisHashAlonzo expected), "." + ] + + AlonzoGenesisDecodeError fp err -> + mconcat + [ "There was an error parsing the genesis file: ", Text.pack fp + , " Error: ", err + ] + +renderShelleyGenesisError :: ShelleyGenesisError -> Text +renderShelleyGenesisError sge = + case sge of + ShelleyGenesisReadError fp err -> + mconcat + [ "There was an error reading the genesis file: ", Text.pack fp + , " Error: ", err + ] + + ShelleyGenesisHashMismatch actual expected -> + mconcat + [ "Wrong Shelley genesis file: the actual hash is ", renderHash (unGenesisHashShelley actual) + , ", but the expected Shelley genesis hash given in the node " + , "configuration file is ", renderHash (unGenesisHashShelley expected), "." + ] + + ShelleyGenesisDecodeError fp err -> + mconcat + [ "There was an error parsing the genesis file: ", Text.pack fp + , " Error: ", err + ] + +renderHash :: Crypto.Hash Crypto.Blake2b_256 ByteString -> Text +renderHash h = Text.decodeUtf8 $ Base16.encode (Crypto.hashToBytes h) + +data ShelleyConfig = ShelleyConfig + { scConfig :: !(Ledger.ShelleyGenesis Consensus.StandardShelley) + , scGenesisHash :: !GenesisHashShelley + } + +newtype GenesisFile = GenesisFile + { unGenesisFile :: FilePath + } deriving Show + +newtype GenesisHashByron = GenesisHashByron + { unGenesisHashByron :: Text + } deriving newtype (Eq, Show) + +newtype GenesisHashShelley = GenesisHashShelley + { unGenesisHashShelley :: Crypto.Hash Crypto.Blake2b_256 ByteString + } deriving newtype (Eq, Show) + +newtype GenesisHashAlonzo = GenesisHashAlonzo + { unGenesisHashAlonzo :: Crypto.Hash Crypto.Blake2b_256 ByteString + } deriving newtype (Eq, Show) + +newtype LedgerStateDir = LedgerStateDir + { unLedgerStateDir :: FilePath + } deriving Show + +newtype NetworkName = NetworkName + { unNetworkName :: Text + } deriving Show + +newtype NetworkConfigFile = NetworkConfigFile + { unNetworkConfigFile :: FilePath + } deriving Show + +newtype SocketPath = SocketPath + { unSocketPath :: FilePath + } deriving Show + +data NodeConfig = NodeConfig + { ncPBftSignatureThreshold :: !(Maybe Double) + , ncByronGenesisFile :: !GenesisFile + , ncByronGenesisHash :: !GenesisHashByron + , ncShelleyGenesisFile :: !GenesisFile + , ncShelleyGenesisHash :: !GenesisHashShelley + , ncAlonzoGenesisFile :: !GenesisFile + , ncAlonzoGenesisHash :: !GenesisHashAlonzo + , ncRequiresNetworkMagic :: !RequiresNetworkMagic + , ncByronSoftwareVersion :: !Byron.SoftwareVersion + , ncByronProtocolVersion :: !Byron.ProtocolVersion + + -- Per-era parameters for the hardfok transitions: + , ncByronToShelley :: !(Consensus.ProtocolTransitionParamsShelleyBased + Consensus.StandardShelley) + , ncShelleyToAllegra :: !(Consensus.ProtocolTransitionParamsShelleyBased + Consensus.StandardAllegra) + , ncAllegraToMary :: !(Consensus.ProtocolTransitionParamsShelleyBased + Consensus.StandardMary) + , ncMaryToAlonzo :: !Consensus.TriggerHardFork + , ncAlonzoToBabbage :: !Consensus.TriggerHardFork + } + +instance FromJSON NodeConfig where + parseJSON = + Aeson.withObject "NodeConfig" parse + where + parse :: Object -> Parser NodeConfig + parse o = + NodeConfig + <$> o .:? "PBftSignatureThreshold" + <*> fmap GenesisFile (o .: "ByronGenesisFile") + <*> fmap GenesisHashByron (o .: "ByronGenesisHash") + <*> fmap GenesisFile (o .: "ShelleyGenesisFile") + <*> fmap GenesisHashShelley (o .: "ShelleyGenesisHash") + <*> fmap GenesisFile (o .: "AlonzoGenesisFile") + <*> fmap GenesisHashAlonzo (o .: "AlonzoGenesisHash") + <*> o .: "RequiresNetworkMagic" + <*> parseByronSoftwareVersion o + <*> parseByronProtocolVersion o + <*> (Consensus.ProtocolTransitionParamsShelleyBased () + <$> parseShelleyHardForkEpoch o) + <*> (Consensus.ProtocolTransitionParamsShelleyBased () + <$> parseAllegraHardForkEpoch o) + <*> (Consensus.ProtocolTransitionParamsShelleyBased () + <$> parseMaryHardForkEpoch o) + <*> parseAlonzoHardForkEpoch o + <*> parseBabbageHardForkEpoch o + + parseByronProtocolVersion :: Object -> Parser Byron.ProtocolVersion + parseByronProtocolVersion o = + Byron.ProtocolVersion + <$> o .: "LastKnownBlockVersion-Major" + <*> o .: "LastKnownBlockVersion-Minor" + <*> o .: "LastKnownBlockVersion-Alt" + + parseByronSoftwareVersion :: Object -> Parser Byron.SoftwareVersion + parseByronSoftwareVersion o = + Byron.SoftwareVersion + <$> fmap Byron.ApplicationName (o .: "ApplicationName") + <*> o .: "ApplicationVersion" + + parseShelleyHardForkEpoch :: Object -> Parser Consensus.TriggerHardFork + parseShelleyHardForkEpoch o = + asum + [ Consensus.TriggerHardForkAtEpoch <$> o .: "TestShelleyHardForkAtEpoch" + , pure $ Consensus.TriggerHardForkAtVersion 2 -- Mainnet default + ] + + parseAllegraHardForkEpoch :: Object -> Parser Consensus.TriggerHardFork + parseAllegraHardForkEpoch o = + asum + [ Consensus.TriggerHardForkAtEpoch <$> o .: "TestAllegraHardForkAtEpoch" + , pure $ Consensus.TriggerHardForkAtVersion 3 -- Mainnet default + ] + + parseMaryHardForkEpoch :: Object -> Parser Consensus.TriggerHardFork + parseMaryHardForkEpoch o = + asum + [ Consensus.TriggerHardForkAtEpoch <$> o .: "TestMaryHardForkAtEpoch" + , pure $ Consensus.TriggerHardForkAtVersion 4 -- Mainnet default + ] + + parseAlonzoHardForkEpoch :: Object -> Parser Consensus.TriggerHardFork + parseAlonzoHardForkEpoch o = + asum + [ Consensus.TriggerHardForkAtEpoch <$> o .: "TestAlonzoHardForkAtEpoch" + , pure $ Consensus.TriggerHardForkAtVersion 5 -- Mainnet default + ] + parseBabbageHardForkEpoch :: Object -> Parser Consensus.TriggerHardFork + parseBabbageHardForkEpoch o = + asum + [ Consensus.TriggerHardForkAtEpoch <$> o .: "TestBabbageHardForkAtEpoch" + , pure $ Consensus.TriggerHardForkAtVersion 7 -- Mainnet default + ] + +readNetworkConfig :: NetworkConfigFile -> ExceptT Text IO NodeConfig +readNetworkConfig (NetworkConfigFile ncf) = do + ncfg <- (except . parseNodeConfig) =<< readByteString ncf "node" + return ncfg + { ncByronGenesisFile = adjustGenesisFilePath (mkAdjustPath ncf) (ncByronGenesisFile ncfg) + , ncShelleyGenesisFile = adjustGenesisFilePath (mkAdjustPath ncf) (ncShelleyGenesisFile ncfg) + , ncAlonzoGenesisFile = adjustGenesisFilePath (mkAdjustPath ncf) (ncAlonzoGenesisFile ncfg) + } + +adjustGenesisFilePath :: (FilePath -> FilePath) -> GenesisFile -> GenesisFile +adjustGenesisFilePath f (GenesisFile p) = GenesisFile (f p) + +mkAdjustPath :: FilePath -> (FilePath -> FilePath) +mkAdjustPath nodeConfigFilePath fp = takeDirectory nodeConfigFilePath fp + +parseNodeConfig :: ByteString -> Either Text NodeConfig +parseNodeConfig bs = + case Yaml.decodeEither' bs of + Left err -> Left $ "Error parsing node config: " <> textShow err + Right nc -> Right nc + +readCardanoGenesisConfig + :: NodeConfig + -> ExceptT GenesisConfigError IO GenesisConfig +readCardanoGenesisConfig enc = + GenesisCardano enc + <$> readByronGenesisConfig enc + <*> readShelleyGenesisConfig enc + <*> readAlonzoGenesisConfig enc + +readByronGenesisConfig + :: NodeConfig + -> ExceptT GenesisConfigError IO Byron.Config +readByronGenesisConfig enc = do + let file = unGenesisFile $ ncByronGenesisFile enc + genHash <- firstExceptT NEError + . hoistEither + $ decodeAbstractHash (unGenesisHashByron $ ncByronGenesisHash enc) + firstExceptT (NEByronConfig file) + $ Byron.mkConfigFromFile (ncRequiresNetworkMagic enc) file genHash + +readShelleyGenesisConfig + :: NodeConfig + -> ExceptT GenesisConfigError IO ShelleyConfig +readShelleyGenesisConfig enc = do + let file = unGenesisFile $ ncShelleyGenesisFile enc + firstExceptT (NEShelleyConfig file . renderShelleyGenesisError) + $ readShelleyGenesis (GenesisFile file) (ncShelleyGenesisHash enc) + +readAlonzoGenesisConfig + :: NodeConfig + -> ExceptT GenesisConfigError IO Ledger.AlonzoGenesis +readAlonzoGenesisConfig enc = do + let file = unGenesisFile $ ncAlonzoGenesisFile enc + firstExceptT (NEAlonzoConfig file . renderAlonzoGenesisError) + $ readAlonzoGenesis (GenesisFile file) (ncAlonzoGenesisHash enc) + +readAlonzoGenesis + :: GenesisFile -> GenesisHashAlonzo + -> ExceptT AlonzoGenesisError IO Ledger.AlonzoGenesis +readAlonzoGenesis (GenesisFile file) expectedGenesisHash = do + content <- handleIOExceptT (AlonzoGenesisReadError file . textShow) $ BS.readFile file + let genesisHash = GenesisHashAlonzo (Crypto.hashWith id content) + checkExpectedGenesisHash genesisHash + firstExceptT (AlonzoGenesisDecodeError file . Text.pack) + . hoistEither + $ Aeson.eitherDecodeStrict' content + where + checkExpectedGenesisHash :: GenesisHashAlonzo -> ExceptT AlonzoGenesisError IO () + checkExpectedGenesisHash actual = + when (actual /= expectedGenesisHash) $ + left (AlonzoGenesisHashMismatch actual expectedGenesisHash) + +readShelleyGenesis + :: GenesisFile -> GenesisHashShelley + -> ExceptT ShelleyGenesisError IO ShelleyConfig +readShelleyGenesis (GenesisFile file) expectedGenesisHash = do + content <- handleIOExceptT (ShelleyGenesisReadError file . textShow) $ BS.readFile file + let genesisHash = GenesisHashShelley (Crypto.hashWith id content) + checkExpectedGenesisHash genesisHash + genesis <- firstExceptT (ShelleyGenesisDecodeError file . Text.pack) + . hoistEither + $ Aeson.eitherDecodeStrict' content + pure $ ShelleyConfig genesis genesisHash + where + checkExpectedGenesisHash :: GenesisHashShelley -> ExceptT ShelleyGenesisError IO () + checkExpectedGenesisHash actual = + when (actual /= expectedGenesisHash) $ + left (ShelleyGenesisHashMismatch actual expectedGenesisHash) + +data GenesisConfigError + = NEError !Text + | NEByronConfig !FilePath !Byron.ConfigurationError + | NEShelleyConfig !FilePath !Text + | NEAlonzoConfig !FilePath !Text + | NECardanoConfig !Text + +renderGenesisConfigError :: GenesisConfigError -> Text +renderGenesisConfigError ne = + case ne of + NEError t -> "Error: " <> t + NEByronConfig fp ce -> + mconcat + [ "Failed reading Byron genesis file ", textShow fp, ": ", textShow ce + ] + NEShelleyConfig fp txt -> + mconcat + [ "Failed reading Shelley genesis file ", textShow fp, ": ", txt + ] + NEAlonzoConfig fp txt -> + mconcat + [ "Failed reading Alonzo genesis file ", textShow fp, ": ", txt + ] + NECardanoConfig err -> + mconcat + [ "With Cardano protocol, Byron/Shelley config mismatch:\n" + , " ", err + ] + +initExtLedgerStateVar :: GenesisConfig -> Ledger.ExtLedgerState (HFC.HardForkBlock (Consensus.CardanoEras Consensus.StandardCrypto)) +initExtLedgerStateVar genesisConfig = Consensus.pInfoInitLedger protocolInfo + where + protocolInfo = mkProtocolInfoCardano genesisConfig + +mkProtocolInfoCardano :: + GenesisConfig -> + Consensus.ProtocolInfo + IO + (HFC.HardForkBlock + (Consensus.CardanoEras Consensus.StandardCrypto)) +mkProtocolInfoCardano (GenesisCardano dnc byronGenesis shelleyGenesis alonzoGenesis) + = Consensus.protocolInfoCardano + Consensus.ProtocolParamsByron + { Consensus.byronGenesis = byronGenesis + , Consensus.byronPbftSignatureThreshold = Consensus.PBftSignatureThreshold <$> ncPBftSignatureThreshold dnc + , Consensus.byronProtocolVersion = ncByronProtocolVersion dnc + , Consensus.byronSoftwareVersion = ncByronSoftwareVersion dnc + , Consensus.byronLeaderCredentials = Nothing + , Consensus.byronMaxTxCapacityOverrides = TxLimits.mkOverrides TxLimits.noOverridesMeasure + } + Consensus.ProtocolParamsShelleyBased + { Consensus.shelleyBasedGenesis = scConfig shelleyGenesis + , Consensus.shelleyBasedInitialNonce = shelleyPraosNonce shelleyGenesis + , Consensus.shelleyBasedLeaderCredentials = [] + } + Consensus.ProtocolParamsShelley + { Consensus.shelleyProtVer = shelleyProtVer dnc + , Consensus.shelleyMaxTxCapacityOverrides = TxLimits.mkOverrides TxLimits.noOverridesMeasure + } + Consensus.ProtocolParamsAllegra + { Consensus.allegraProtVer = shelleyProtVer dnc + , Consensus.allegraMaxTxCapacityOverrides = TxLimits.mkOverrides TxLimits.noOverridesMeasure + } + Consensus.ProtocolParamsMary + { Consensus.maryProtVer = shelleyProtVer dnc + , Consensus.maryMaxTxCapacityOverrides = TxLimits.mkOverrides TxLimits.noOverridesMeasure + } + Consensus.ProtocolParamsAlonzo + { Consensus.alonzoProtVer = shelleyProtVer dnc + , Consensus.alonzoMaxTxCapacityOverrides = TxLimits.mkOverrides TxLimits.noOverridesMeasure + } + Consensus.ProtocolParamsBabbage + { Consensus.babbageProtVer = shelleyProtVer dnc + , Consensus.babbageMaxTxCapacityOverrides = TxLimits.mkOverrides TxLimits.noOverridesMeasure + } + (ncByronToShelley dnc) + (ncShelleyToAllegra dnc) + (ncAllegraToMary dnc) + (Consensus.ProtocolTransitionParamsShelleyBased alonzoGenesis (ncMaryToAlonzo dnc)) + (Consensus.ProtocolTransitionParamsShelleyBased alonzoGenesis (ncAlonzoToBabbage dnc)) + +shelleyProtVer :: NodeConfig -> Ledger.ProtVer +shelleyProtVer dnc = + let bver = ncByronProtocolVersion dnc in + Ledger.ProtVer + (fromIntegral $ Byron.pvMajor bver) + (fromIntegral $ Byron.pvMinor bver) + +shelleyPraosNonce :: ShelleyConfig -> Ledger.Nonce +shelleyPraosNonce sCfg = Ledger.Nonce (Crypto.castHash . unGenesisHashShelley $ scGenesisHash sCfg) + +textShow :: Show a => a -> Text +textShow = Text.pack . show + +readByteString :: FilePath -> Text -> ExceptT Text IO ByteString +readByteString fp cfgType = ExceptT $ + catch (Right <$> BS.readFile fp) $ \(_ :: IOException) -> + return $ Left $ mconcat + [ "Cannot read the ", cfgType, " configuration file at : ", Text.pack fp ] diff --git a/marconi-chain-index/src/Marconi/ChainIndex/Orphans.hs b/marconi-chain-index/src/Marconi/ChainIndex/Orphans.hs index b952d4e389..ed4dd16d9f 100644 --- a/marconi-chain-index/src/Marconi/ChainIndex/Orphans.hs +++ b/marconi-chain-index/src/Marconi/ChainIndex/Orphans.hs @@ -6,21 +6,30 @@ module Marconi.ChainIndex.Orphans where import Cardano.Api qualified as C -import Cardano.Binary (fromCBOR, toCBOR) +import Cardano.Api.Shelley qualified as C +import Cardano.Binary qualified as CBOR +import Codec.CBOR.Read qualified as CBOR import Codec.Serialise (Serialise (decode, encode)) import Data.Aeson (FromJSON, ToJSON) import Data.Aeson qualified as Aeson import Data.ByteString.Base16 qualified as Base16 import Data.ByteString.Lazy (toStrict) +import Data.Coerce (coerce) import Data.Functor ((<&>)) import Data.Maybe (fromMaybe) import Data.Proxy (Proxy (Proxy)) +import Data.SOP.Strict (K (K), NP (Nil, (:*)), fn, type (:.:) (Comp)) import Data.Text.Encoding qualified as Text import Data.Typeable (Typeable) import Database.SQLite.Simple qualified as SQL import Database.SQLite.Simple.FromField qualified as SQL import Database.SQLite.Simple.Ok qualified as SQL import Database.SQLite.Simple.ToField qualified as SQL +import Ouroboros.Consensus.Byron.Ledger qualified as O +import Ouroboros.Consensus.Cardano.Block qualified as O +import Ouroboros.Consensus.HardFork.Combinator qualified as O +import Ouroboros.Consensus.HardFork.Combinator.Serialisation.Common qualified as O +import Ouroboros.Consensus.Shelley.Ledger qualified as O import Prettyprinter (Pretty (pretty), (<+>)) instance Pretty C.ChainTip where @@ -55,6 +64,9 @@ instance SQL.FromField (C.Hash C.BlockHeader) where instance Pretty C.SlotNo where pretty (C.SlotNo n) = "Slot" <+> pretty n +deriving newtype instance SQL.ToField C.SlotNo +deriving newtype instance SQL.FromField C.SlotNo + -- * C.BlockNo instance Pretty C.BlockNo where @@ -100,8 +112,8 @@ instance SQL.ToField (C.Hash C.ScriptData) where -- * C.ScriptData instance Serialise C.ScriptData where - encode = toCBOR - decode = fromCBOR + encode = CBOR.toCBOR + decode = CBOR.fromCBOR instance SQL.FromField C.ScriptData where fromField f = SQL.fromField f >>= @@ -177,20 +189,47 @@ instance SQL.FromField C.ScriptHash where (const $ SQL.returnError SQL.ConversionFailed f "Cannot deserialise scriptDataHash.") pure . C.deserialiseFromRawBytesHex (C.proxyToAsType Proxy) +-- * O.LedgerState (O.CardanoBlock O.StandardCrypto) + +instance SQL.ToField (O.LedgerState (O.CardanoBlock O.StandardCrypto)) where + toField = SQL.SQLBlob . CBOR.toStrictByteString . encodeLedgerState + +instance SQL.FromField (O.LedgerState (O.CardanoBlock O.StandardCrypto)) where + fromField f = SQL.fromField f >>= either + (const $ SQL.returnError SQL.ConversionFailed f "Cannot deserialise LedgerState.") + (pure . snd) . CBOR.deserialiseFromBytes decodeLedgerState + -- * ToField/FromField deriving newtype instance SQL.ToField C.BlockNo deriving newtype instance SQL.FromField C.BlockNo -deriving newtype instance SQL.ToField C.SlotNo -deriving newtype instance SQL.FromField C.SlotNo - deriving newtype instance SQL.ToField C.AssetName deriving newtype instance SQL.FromField C.AssetName deriving newtype instance SQL.ToField C.Quantity deriving newtype instance SQL.FromField C.Quantity +instance SQL.ToField C.EpochNo where + toField (C.EpochNo word64) = SQL.toField word64 +instance SQL.FromField C.EpochNo where + fromField f = C.EpochNo <$> SQL.fromField f + +instance SQL.ToField C.Lovelace where + toField = SQL.toField @Integer . coerce +instance SQL.FromField C.Lovelace where + fromField = coerce . SQL.fromField @Integer + +instance SQL.FromField C.PoolId where + fromField f = do + bs <- SQL.fromField f + case C.deserialiseFromRawBytes (C.AsHash C.AsStakePoolKey) bs of + Just h -> pure h + Nothing -> SQL.returnError SQL.ConversionFailed f " PoolId" + +instance SQL.ToField C.PoolId where + toField = SQL.toField . C.serialiseToRawBytes + instance SQL.ToField C.PolicyId where -- C.PolicyId is a newtype over C.ScriptHash but no ToField available for it. toField = SQL.toField . C.serialiseToRawBytes instance SQL.FromField C.PolicyId where @@ -200,3 +239,28 @@ instance SQL.FromField C.PolicyId where fromFieldViaRawBytes :: (C.SerialiseAsRawBytes a, Typeable a) => C.AsType a -> SQL.Field -> SQL.Ok a fromFieldViaRawBytes as f = maybe err pure . C.deserialiseFromRawBytes as =<< SQL.fromField f where err = SQL.returnError SQL.ConversionFailed f "can't deserialise via SerialiseAsRawBytes" + +encodeLedgerState :: O.LedgerState (O.CardanoBlock O.StandardCrypto) -> CBOR.Encoding +encodeLedgerState (O.HardForkLedgerState st) = + O.encodeTelescope + (byron :* shelley :* allegra :* mary :* alonzo :* babbage :* Nil) + st + where + byron = fn (K . O.encodeByronLedgerState) + shelley = fn (K . O.encodeShelleyLedgerState) + allegra = fn (K . O.encodeShelleyLedgerState) + mary = fn (K . O.encodeShelleyLedgerState) + alonzo = fn (K . O.encodeShelleyLedgerState) + babbage = fn (K . O.encodeShelleyLedgerState) + +decodeLedgerState :: forall s. CBOR.Decoder s (O.LedgerState (O.CardanoBlock O.StandardCrypto)) +decodeLedgerState = + O.HardForkLedgerState + <$> O.decodeTelescope (byron :* shelley :* allegra :* mary :* alonzo :* babbage :* Nil) + where + byron = Comp O.decodeByronLedgerState + shelley = Comp O.decodeShelleyLedgerState + allegra = Comp O.decodeShelleyLedgerState + mary = Comp O.decodeShelleyLedgerState + alonzo = Comp O.decodeShelleyLedgerState + babbage = Comp O.decodeShelleyLedgerState diff --git a/marconi-chain-index/src/Marconi/ChainIndex/Utils.hs b/marconi-chain-index/src/Marconi/ChainIndex/Utils.hs new file mode 100644 index 0000000000..5d92e51b24 --- /dev/null +++ b/marconi-chain-index/src/Marconi/ChainIndex/Utils.hs @@ -0,0 +1,15 @@ +module Marconi.ChainIndex.Utils + ( isBlockRollbackable + ) where + +import Cardano.Api qualified as C +import Data.Word (Word64) + +isBlockRollbackable :: Word64 -> C.BlockNo -> C.ChainTip -> Bool +isBlockRollbackable securityParam (C.BlockNo chainSyncBlockNo) localChainTip = + let chainTipBlockNo = + case localChainTip of + C.ChainTipAtGenesis -> 0 + (C.ChainTip _ _ (C.BlockNo bn)) -> bn + -- TODO Need to confirm if it's "<" or "<=" + in chainTipBlockNo - chainSyncBlockNo <= securityParam diff --git a/marconi-chain-index/test-lib/Gen/Marconi/ChainIndex/Types.hs b/marconi-chain-index/test-lib/Gen/Marconi/ChainIndex/Types.hs index d47f1b292d..2aa838f602 100644 --- a/marconi-chain-index/test-lib/Gen/Marconi/ChainIndex/Types.hs +++ b/marconi-chain-index/test-lib/Gen/Marconi/ChainIndex/Types.hs @@ -23,12 +23,15 @@ module Gen.Marconi.ChainIndex.Types , genAssetId , genPolicyId , genQuantity + , genEpochNo + , genPoolId ) where import Cardano.Api qualified as C import Cardano.Api.Shelley qualified as C import Cardano.Binary qualified as CBOR import Cardano.Crypto.Hash.Class qualified as CRYPTO +import Cardano.Ledger.Keys (KeyHash (KeyHash)) import Cardano.Ledger.SafeHash (unsafeMakeSafeHash) import Data.ByteString (ByteString) import Data.ByteString qualified as BS @@ -283,10 +286,6 @@ genProtocolParametersForPlutusScripts = ratioToRational :: Ratio Int64 -> Rational ratioToRational = toRational - -- Copied from cardano-api. Delete when this function is reexported - genEpochNo :: Gen C.EpochNo - genEpochNo = C.EpochNo <$> Gen.word64 (Range.linear 0 10) - -- Copied from cardano-api. Delete when this function is reexported genNat :: Gen Natural genNat = Gen.integral (Range.linear 0 10) @@ -315,3 +314,13 @@ genPolicyId = -- TODO Copied from cardano-api. Delete once reexported genQuantity :: Range Integer -> Gen C.Quantity genQuantity range = fromInteger <$> Gen.integral range + +-- TODO Copied from cardano-api. Delete once reexported +genEpochNo :: Gen C.EpochNo +genEpochNo = C.EpochNo <$> Gen.word64 (Range.linear 0 10) + +genPoolId :: Gen (C.Hash C.StakePoolKey) +genPoolId = C.StakePoolKeyHash . KeyHash . mkDummyHash <$> Gen.int (Range.linear 0 10) + where + mkDummyHash :: forall h a. CRYPTO.HashAlgorithm h => Int -> CRYPTO.Hash h a + mkDummyHash = coerce . CRYPTO.hashWithSerialiser @h CBOR.toCBOR diff --git a/marconi-chain-index/test/Spec/Marconi/ChainIndex/Orphans.hs b/marconi-chain-index/test/Spec/Marconi/ChainIndex/Orphans.hs index 4881c72931..2c6a3ffeba 100644 --- a/marconi-chain-index/test/Spec/Marconi/ChainIndex/Orphans.hs +++ b/marconi-chain-index/test/Spec/Marconi/ChainIndex/Orphans.hs @@ -77,6 +77,21 @@ tests = testGroup "Spec.Marconi.ChainIndex.Orphans" "C.PolicyId" "propSQLFieldRoundtripPolicyId" propSQLFieldRoundtripPolicyId + + , testPropertyNamed + "C.EpochNo" + "propSQLFieldRoundtripEpochNo" + propSQLFieldRoundtripEpochNo + + , testPropertyNamed + "C.Lovelace" + "propSQLFieldRoundtripLovelace" + propSQLFieldRoundtripLovelace + + , testPropertyNamed + "C.PoolId" + "propSQLFieldRoundtripPoolId" + propSQLFieldRoundtripPoolId ] , testGroup "ToJSON/FromJSON rountrip" @@ -162,3 +177,18 @@ propSQLFieldRoundtripPolicyId = property $ do p <- forAll Gen.genPolicyId tripping p SQL.toField (\sqlData -> SQL.fromField $ SQL.Field sqlData 0) +propSQLFieldRoundtripEpochNo :: Property +propSQLFieldRoundtripEpochNo = property $ do + p <- forAll Gen.genEpochNo + tripping p SQL.toField (\sqlData -> SQL.fromField $ SQL.Field sqlData 0) + +propSQLFieldRoundtripLovelace :: Property +propSQLFieldRoundtripLovelace = property $ do + p <- forAll CGen.genLovelace + tripping p SQL.toField (\sqlData -> SQL.fromField $ SQL.Field sqlData 0) + +propSQLFieldRoundtripPoolId :: Property +propSQLFieldRoundtripPoolId = property $ do + p <- forAll Gen.genPoolId + tripping p SQL.toField (\sqlData -> SQL.fromField $ SQL.Field sqlData 0) +