From 4fa37f29ae1d1165425914fa455d64df8e381acc Mon Sep 17 00:00:00 2001 From: Sahra <107348755+sahra-karakoc@users.noreply.github.com> Date: Tue, 15 Aug 2023 17:37:00 +0300 Subject: [PATCH] feat: epoch entity feature parity (#57) * feat: epochManager added * fix: abi fixed * feat: epoch length updated events added * feat: start and end block added * feat: epoch signalledTokens added * feat: epoch stake and queryFeeRebates added * feat: epoch indexing rewards added * feat: substreams graph updated * fix: changed "end" keyword to "en" to allow graph rendering --- README.md | 28 +- abis/epochManager.json | 232 ++++ build.rs | 3 + proto/erc20.proto | 13 + schema.graphql | 24 + src/abi/epoch_manager.rs | 2126 ++++++++++++++++++++++++++++++++++ src/abi/mod.rs | 1 + src/db.rs | 79 ++ src/modules/curation.rs | 30 +- src/modules/epoch_manager.rs | 63 + src/modules/graph_out.rs | 22 +- src/modules/init_maps.rs | 18 +- src/modules/mod.rs | 7 +- src/modules/staking.rs | 124 +- src/pb/eth.erc20.v1.rs | 20 + src/utils.rs | 8 + substreams.yaml | 73 ++ 17 files changed, 2847 insertions(+), 24 deletions(-) create mode 100644 abis/epochManager.json create mode 100644 src/abi/epoch_manager.rs create mode 100644 src/modules/epoch_manager.rs diff --git a/README.md b/README.md index 19a2172..831d362 100644 --- a/README.md +++ b/README.md @@ -61,6 +61,7 @@ graph LR; map_events --> store_cumulative_curator_burned; store_query_fee_rebates[store: store_query_fee_rebates]; map_events --> store_query_fee_rebates; + store_epoch_count --> store_query_fee_rebates; store_query_fees_amount[store: store_query_fees_amount]; map_events --> store_query_fees_amount; store_curator_fee_rewards[store: store_curator_fee_rewards]; @@ -71,9 +72,34 @@ graph LR; map_events --> store_signal_amount; store_subgraph_deployment_rewards[store: store_subgraph_deployment_rewards]; map_indexing_rewards --> store_subgraph_deployment_rewards; + store_epoch_length[store: store_epoch_length]; + map_events --> store_epoch_length; + store_epoch_count[store: store_epoch_count]; + store_epoch_length --> store_epoch_count; + sf.substreams.v1.Clock[source: sf.substreams.v1.Clock] --> store_epoch_count; + store_epoch_start[store: store_epoch_start]; + store_epoch_count --> store_epoch_start; + sf.substreams.v1.Clock[source: sf.substreams.v1.Clock] --> store_epoch_start; + store_epoch_en[store: store_epoch_end]; + store_epoch_count --> store_epoch_en; + sf.substreams.v1.Clock[source: sf.substreams.v1.Clock] --> store_epoch_en; + store_epoch_signal[store: store_epoch_signal]; + map_storage_changes --> store_epoch_signal; + store_epoch_count --> store_epoch_signal; + store_epoch_stake[store: store_epoch_stake]; + map_events --> store_epoch_stake; + store_epoch_count --> store_epoch_stake; + store_epoch_rewards[store: store_epoch_rewards]; + map_indexing_rewards --> store_epoch_rewards; + store_epoch_count --> store_epoch_rewards; graph_out[map: graph_out]; sf.substreams.v1.Clock[source: sf.substreams.v1.Clock] --> graph_out; map_events --> graph_out; + store_epoch_start -- deltas --> graph_out; + store_epoch_en -- deltas --> graph_out; + store_epoch_signal -- deltas --> graph_out; + store_epoch_stake -- deltas --> graph_out; + store_epoch_rewards -- deltas --> graph_out; store_grt_global -- deltas --> graph_out; store_grt_balances -- deltas --> graph_out; store_graph_account_indexer -- deltas --> graph_out; @@ -94,8 +120,8 @@ graph LR; store_signal_amount -- deltas --> graph_out; store_subgraph_deployment_rewards -- deltas --> graph_out; map_indexing_rewards --> graph_out; -``` +``` ## Quickstart To build and run the substream, diff --git a/abis/epochManager.json b/abis/epochManager.json new file mode 100644 index 0000000..dd9ca6d --- /dev/null +++ b/abis/epochManager.json @@ -0,0 +1,232 @@ +[ + { + "anonymous": false, + "inputs": [ + { + "indexed": true, + "internalType": "uint256", + "name": "epoch", + "type": "uint256" + }, + { + "indexed": false, + "internalType": "uint256", + "name": "epochLength", + "type": "uint256" + } + ], + "name": "EpochLengthUpdate", + "type": "event" + }, + { + "anonymous": false, + "inputs": [ + { + "indexed": true, + "internalType": "uint256", + "name": "epoch", + "type": "uint256" + }, + { + "indexed": false, + "internalType": "address", + "name": "caller", + "type": "address" + } + ], + "name": "EpochRun", + "type": "event" + }, + { + "anonymous": false, + "inputs": [ + { + "indexed": false, + "internalType": "string", + "name": "param", + "type": "string" + } + ], + "name": "ParameterUpdated", + "type": "event" + }, + { + "anonymous": false, + "inputs": [ + { + "indexed": false, + "internalType": "address", + "name": "controller", + "type": "address" + } + ], + "name": "SetController", + "type": "event" + }, + { + "inputs": [ + { + "internalType": "contract IGraphProxy", + "name": "_proxy", + "type": "address" + } + ], + "name": "acceptProxy", + "outputs": [], + "stateMutability": "nonpayable", + "type": "function" + }, + { + "inputs": [ + { + "internalType": "contract IGraphProxy", + "name": "_proxy", + "type": "address" + }, + { "internalType": "bytes", "name": "_data", "type": "bytes" } + ], + "name": "acceptProxyAndCall", + "outputs": [], + "stateMutability": "nonpayable", + "type": "function" + }, + { + "inputs": [{ "internalType": "bytes32", "name": "", "type": "bytes32" }], + "name": "addressCache", + "outputs": [{ "internalType": "address", "name": "", "type": "address" }], + "stateMutability": "view", + "type": "function" + }, + { + "inputs": [ + { "internalType": "uint256", "name": "_block", "type": "uint256" } + ], + "name": "blockHash", + "outputs": [{ "internalType": "bytes32", "name": "", "type": "bytes32" }], + "stateMutability": "view", + "type": "function" + }, + { + "inputs": [], + "name": "blockNum", + "outputs": [{ "internalType": "uint256", "name": "", "type": "uint256" }], + "stateMutability": "view", + "type": "function" + }, + { + "inputs": [], + "name": "controller", + "outputs": [ + { "internalType": "contract IController", "name": "", "type": "address" } + ], + "stateMutability": "view", + "type": "function" + }, + { + "inputs": [], + "name": "currentEpoch", + "outputs": [{ "internalType": "uint256", "name": "", "type": "uint256" }], + "stateMutability": "view", + "type": "function" + }, + { + "inputs": [], + "name": "currentEpochBlock", + "outputs": [{ "internalType": "uint256", "name": "", "type": "uint256" }], + "stateMutability": "view", + "type": "function" + }, + { + "inputs": [], + "name": "currentEpochBlockSinceStart", + "outputs": [{ "internalType": "uint256", "name": "", "type": "uint256" }], + "stateMutability": "view", + "type": "function" + }, + { + "inputs": [], + "name": "epochLength", + "outputs": [{ "internalType": "uint256", "name": "", "type": "uint256" }], + "stateMutability": "view", + "type": "function" + }, + { + "inputs": [ + { "internalType": "uint256", "name": "_epoch", "type": "uint256" } + ], + "name": "epochsSince", + "outputs": [{ "internalType": "uint256", "name": "", "type": "uint256" }], + "stateMutability": "view", + "type": "function" + }, + { + "inputs": [], + "name": "epochsSinceUpdate", + "outputs": [{ "internalType": "uint256", "name": "", "type": "uint256" }], + "stateMutability": "view", + "type": "function" + }, + { + "inputs": [ + { "internalType": "address", "name": "_controller", "type": "address" }, + { "internalType": "uint256", "name": "_epochLength", "type": "uint256" } + ], + "name": "initialize", + "outputs": [], + "stateMutability": "nonpayable", + "type": "function" + }, + { + "inputs": [], + "name": "isCurrentEpochRun", + "outputs": [{ "internalType": "bool", "name": "", "type": "bool" }], + "stateMutability": "view", + "type": "function" + }, + { + "inputs": [], + "name": "lastLengthUpdateBlock", + "outputs": [{ "internalType": "uint256", "name": "", "type": "uint256" }], + "stateMutability": "view", + "type": "function" + }, + { + "inputs": [], + "name": "lastLengthUpdateEpoch", + "outputs": [{ "internalType": "uint256", "name": "", "type": "uint256" }], + "stateMutability": "view", + "type": "function" + }, + { + "inputs": [], + "name": "lastRunEpoch", + "outputs": [{ "internalType": "uint256", "name": "", "type": "uint256" }], + "stateMutability": "view", + "type": "function" + }, + { + "inputs": [], + "name": "runEpoch", + "outputs": [], + "stateMutability": "nonpayable", + "type": "function" + }, + { + "inputs": [ + { "internalType": "address", "name": "_controller", "type": "address" } + ], + "name": "setController", + "outputs": [], + "stateMutability": "nonpayable", + "type": "function" + }, + { + "inputs": [ + { "internalType": "uint256", "name": "_epochLength", "type": "uint256" } + ], + "name": "setEpochLength", + "outputs": [], + "stateMutability": "nonpayable", + "type": "function" + } +] diff --git a/build.rs b/build.rs index 349d30b..23f299c 100644 --- a/build.rs +++ b/build.rs @@ -20,5 +20,8 @@ fn main() -> Result<(), anyhow::Error> { Abigen::new("controller", "./abis/controller.json")? .generate()? .write_to_file("src/abi/controller.rs")?; + Abigen::new("epochManager", "./abis/epochManager.json")? + .generate()? + .write_to_file("src/abi/epoch_manager.rs")?; Ok(()) } diff --git a/proto/erc20.proto b/proto/erc20.proto index b6bde17..7d1334e 100644 --- a/proto/erc20.proto +++ b/proto/erc20.proto @@ -61,10 +61,15 @@ message AllocationCollectedEvents { message PauseChangedEvents { repeated PauseChanged paused_changed_events = 1; } + message PartialPauseChangedEvents { repeated PartialPauseChanged partial_paused_changed_events = 1; } +message EpochLengthUpdatedEvents { + repeated EpochLengthUpdated epoch_length_updated_events = 1; +} + message IndexerStakes { repeated IndexerStake indexer_stakes = 1; } @@ -113,6 +118,7 @@ message Events { AllocationCollectedEvents allocation_collected_events = 13; PauseChangedEvents pause_changed_events = 14; PartialPauseChangedEvents partial_pause_changed_events = 15; + EpochLengthUpdatedEvents epoch_length_updated_events = 16; } message Transfer { @@ -263,6 +269,13 @@ message PartialPauseChanged { uint64 ordinal = 3; } +message EpochLengthUpdated { + string id = 1; + string last_length_update_block = 2; + string epoch_length = 3; + uint64 ordinal = 4; +} + message IndexerStake { string id = 1; bytes indexer = 2; diff --git a/schema.graphql b/schema.graphql index 52250ed..c23c19d 100644 --- a/schema.graphql +++ b/schema.graphql @@ -182,3 +182,27 @@ enum AllocationStatus { Finalized # == Closing && closedAtEpoch + channelDisputeEpochs > now(). Note, the subgraph has no way to return this value. it is implied Claimed # == not Null && tokens == 0 - i.e. finalized, and all tokens withdrawn } + +""" +Epoch aggregate data for network statistics on signaling, rewards, and query fees +""" +type Epoch @entity { + "Epoch number" + id: ID! + "Start block of the epoch" + startBlock: Int! + "End block of the epoch" + endBlock: Int! + "Signaled tokens during this epoch" + signalledTokens: BigInt! + "Stake deposited during this epoch" + stakeDeposited: BigInt! + "Rebate amount claimed from the protocol through cobbs douglas during this epoch (Doesn't correlate to the queryFeesCollected for this epoch since there's a 7 day period before claiming)" + queryFeeRebates: BigInt! + "Total indexing rewards earned in this epoch. Includes both delegator and indexer rewards" + totalRewards: BigInt! + "Total indexing rewards earned in this epoch by indexers" + totalIndexerRewards: BigInt! + "Total indexing rewards earned in this epoch by delegators" + totalDelegatorRewards: BigInt! +} diff --git a/src/abi/epoch_manager.rs b/src/abi/epoch_manager.rs new file mode 100644 index 0000000..f94ae3d --- /dev/null +++ b/src/abi/epoch_manager.rs @@ -0,0 +1,2126 @@ + const INTERNAL_ERR: &'static str = "`ethabi_derive` internal error"; + /// Contract's functions. + #[allow(dead_code, unused_imports, unused_variables)] + pub mod functions { + use super::INTERNAL_ERR; + #[derive(Debug, Clone, PartialEq)] + pub struct AcceptProxy { + pub proxy: Vec, + } + impl AcceptProxy { + const METHOD_ID: [u8; 4] = [162u8, 89u8, 77u8, 130u8]; + pub fn decode( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + let maybe_data = call.input.get(4..); + if maybe_data.is_none() { + return Err("no data to decode".to_string()); + } + let mut values = ethabi::decode( + &[ethabi::ParamType::Address], + maybe_data.unwrap(), + ) + .map_err(|e| format!("unable to decode call.input: {:?}", e))?; + values.reverse(); + Ok(Self { + proxy: values + .pop() + .expect(INTERNAL_ERR) + .into_address() + .expect(INTERNAL_ERR) + .as_bytes() + .to_vec(), + }) + } + pub fn encode(&self) -> Vec { + let data = ethabi::encode( + &[ethabi::Token::Address(ethabi::Address::from_slice(&self.proxy))], + ); + let mut encoded = Vec::with_capacity(4 + data.len()); + encoded.extend(Self::METHOD_ID); + encoded.extend(data); + encoded + } + pub fn match_call(call: &substreams_ethereum::pb::eth::v2::Call) -> bool { + match call.input.get(0..4) { + Some(signature) => Self::METHOD_ID == signature, + None => false, + } + } + } + impl substreams_ethereum::Function for AcceptProxy { + const NAME: &'static str = "acceptProxy"; + fn match_call(call: &substreams_ethereum::pb::eth::v2::Call) -> bool { + Self::match_call(call) + } + fn decode( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + Self::decode(call) + } + fn encode(&self) -> Vec { + self.encode() + } + } + #[derive(Debug, Clone, PartialEq)] + pub struct AcceptProxyAndCall { + pub proxy: Vec, + pub data: Vec, + } + impl AcceptProxyAndCall { + const METHOD_ID: [u8; 4] = [156u8, 231u8, 171u8, 229u8]; + pub fn decode( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + let maybe_data = call.input.get(4..); + if maybe_data.is_none() { + return Err("no data to decode".to_string()); + } + let mut values = ethabi::decode( + &[ethabi::ParamType::Address, ethabi::ParamType::Bytes], + maybe_data.unwrap(), + ) + .map_err(|e| format!("unable to decode call.input: {:?}", e))?; + values.reverse(); + Ok(Self { + proxy: values + .pop() + .expect(INTERNAL_ERR) + .into_address() + .expect(INTERNAL_ERR) + .as_bytes() + .to_vec(), + data: values + .pop() + .expect(INTERNAL_ERR) + .into_bytes() + .expect(INTERNAL_ERR), + }) + } + pub fn encode(&self) -> Vec { + let data = ethabi::encode( + &[ + ethabi::Token::Address(ethabi::Address::from_slice(&self.proxy)), + ethabi::Token::Bytes(self.data.clone()), + ], + ); + let mut encoded = Vec::with_capacity(4 + data.len()); + encoded.extend(Self::METHOD_ID); + encoded.extend(data); + encoded + } + pub fn match_call(call: &substreams_ethereum::pb::eth::v2::Call) -> bool { + match call.input.get(0..4) { + Some(signature) => Self::METHOD_ID == signature, + None => false, + } + } + } + impl substreams_ethereum::Function for AcceptProxyAndCall { + const NAME: &'static str = "acceptProxyAndCall"; + fn match_call(call: &substreams_ethereum::pb::eth::v2::Call) -> bool { + Self::match_call(call) + } + fn decode( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + Self::decode(call) + } + fn encode(&self) -> Vec { + self.encode() + } + } + #[derive(Debug, Clone, PartialEq)] + pub struct AddressCache { + pub param0: [u8; 32usize], + } + impl AddressCache { + const METHOD_ID: [u8; 4] = [220u8, 103u8, 90u8, 101u8]; + pub fn decode( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + let maybe_data = call.input.get(4..); + if maybe_data.is_none() { + return Err("no data to decode".to_string()); + } + let mut values = ethabi::decode( + &[ethabi::ParamType::FixedBytes(32usize)], + maybe_data.unwrap(), + ) + .map_err(|e| format!("unable to decode call.input: {:?}", e))?; + values.reverse(); + Ok(Self { + param0: { + let mut result = [0u8; 32]; + let v = values + .pop() + .expect(INTERNAL_ERR) + .into_fixed_bytes() + .expect(INTERNAL_ERR); + result.copy_from_slice(&v); + result + }, + }) + } + pub fn encode(&self) -> Vec { + let data = ethabi::encode( + &[ethabi::Token::FixedBytes(self.param0.as_ref().to_vec())], + ); + let mut encoded = Vec::with_capacity(4 + data.len()); + encoded.extend(Self::METHOD_ID); + encoded.extend(data); + encoded + } + pub fn output_call( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result, String> { + Self::output(call.return_data.as_ref()) + } + pub fn output(data: &[u8]) -> Result, String> { + let mut values = ethabi::decode( + &[ethabi::ParamType::Address], + data.as_ref(), + ) + .map_err(|e| format!("unable to decode output data: {:?}", e))?; + Ok( + values + .pop() + .expect("one output data should have existed") + .into_address() + .expect(INTERNAL_ERR) + .as_bytes() + .to_vec(), + ) + } + pub fn match_call(call: &substreams_ethereum::pb::eth::v2::Call) -> bool { + match call.input.get(0..4) { + Some(signature) => Self::METHOD_ID == signature, + None => false, + } + } + pub fn call(&self, address: Vec) -> Option> { + use substreams_ethereum::pb::eth::rpc; + let rpc_calls = rpc::RpcCalls { + calls: vec![ + rpc::RpcCall { to_addr : address, data : self.encode(), } + ], + }; + let responses = substreams_ethereum::rpc::eth_call(&rpc_calls).responses; + let response = responses + .get(0) + .expect("one response should have existed"); + if response.failed { + return None; + } + match Self::output(response.raw.as_ref()) { + Ok(data) => Some(data), + Err(err) => { + use substreams_ethereum::Function; + substreams::log::info!( + "Call output for function `{}` failed to decode with error: {}", + Self::NAME, err + ); + None + } + } + } + } + impl substreams_ethereum::Function for AddressCache { + const NAME: &'static str = "addressCache"; + fn match_call(call: &substreams_ethereum::pb::eth::v2::Call) -> bool { + Self::match_call(call) + } + fn decode( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + Self::decode(call) + } + fn encode(&self) -> Vec { + self.encode() + } + } + impl substreams_ethereum::rpc::RPCDecodable> for AddressCache { + fn output(data: &[u8]) -> Result, String> { + Self::output(data) + } + } + #[derive(Debug, Clone, PartialEq)] + pub struct BlockHash { + pub block: substreams::scalar::BigInt, + } + impl BlockHash { + const METHOD_ID: [u8; 4] = [133u8, 223u8, 81u8, 253u8]; + pub fn decode( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + let maybe_data = call.input.get(4..); + if maybe_data.is_none() { + return Err("no data to decode".to_string()); + } + let mut values = ethabi::decode( + &[ethabi::ParamType::Uint(256usize)], + maybe_data.unwrap(), + ) + .map_err(|e| format!("unable to decode call.input: {:?}", e))?; + values.reverse(); + Ok(Self { + block: { + let mut v = [0 as u8; 32]; + values + .pop() + .expect(INTERNAL_ERR) + .into_uint() + .expect(INTERNAL_ERR) + .to_big_endian(v.as_mut_slice()); + substreams::scalar::BigInt::from_unsigned_bytes_be(&v) + }, + }) + } + pub fn encode(&self) -> Vec { + let data = ethabi::encode( + &[ + ethabi::Token::Uint( + ethabi::Uint::from_big_endian( + match self.block.clone().to_bytes_be() { + (num_bigint::Sign::Plus, bytes) => bytes, + (num_bigint::Sign::NoSign, bytes) => bytes, + (num_bigint::Sign::Minus, _) => { + panic!("negative numbers are not supported") + } + } + .as_slice(), + ), + ), + ], + ); + let mut encoded = Vec::with_capacity(4 + data.len()); + encoded.extend(Self::METHOD_ID); + encoded.extend(data); + encoded + } + pub fn output_call( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result<[u8; 32usize], String> { + Self::output(call.return_data.as_ref()) + } + pub fn output(data: &[u8]) -> Result<[u8; 32usize], String> { + let mut values = ethabi::decode( + &[ethabi::ParamType::FixedBytes(32usize)], + data.as_ref(), + ) + .map_err(|e| format!("unable to decode output data: {:?}", e))?; + Ok({ + let mut result = [0u8; 32]; + let v = values + .pop() + .expect("one output data should have existed") + .into_fixed_bytes() + .expect(INTERNAL_ERR); + result.copy_from_slice(&v); + result + }) + } + pub fn match_call(call: &substreams_ethereum::pb::eth::v2::Call) -> bool { + match call.input.get(0..4) { + Some(signature) => Self::METHOD_ID == signature, + None => false, + } + } + pub fn call(&self, address: Vec) -> Option<[u8; 32usize]> { + use substreams_ethereum::pb::eth::rpc; + let rpc_calls = rpc::RpcCalls { + calls: vec![ + rpc::RpcCall { to_addr : address, data : self.encode(), } + ], + }; + let responses = substreams_ethereum::rpc::eth_call(&rpc_calls).responses; + let response = responses + .get(0) + .expect("one response should have existed"); + if response.failed { + return None; + } + match Self::output(response.raw.as_ref()) { + Ok(data) => Some(data), + Err(err) => { + use substreams_ethereum::Function; + substreams::log::info!( + "Call output for function `{}` failed to decode with error: {}", + Self::NAME, err + ); + None + } + } + } + } + impl substreams_ethereum::Function for BlockHash { + const NAME: &'static str = "blockHash"; + fn match_call(call: &substreams_ethereum::pb::eth::v2::Call) -> bool { + Self::match_call(call) + } + fn decode( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + Self::decode(call) + } + fn encode(&self) -> Vec { + self.encode() + } + } + impl substreams_ethereum::rpc::RPCDecodable<[u8; 32usize]> for BlockHash { + fn output(data: &[u8]) -> Result<[u8; 32usize], String> { + Self::output(data) + } + } + #[derive(Debug, Clone, PartialEq)] + pub struct BlockNum {} + impl BlockNum { + const METHOD_ID: [u8; 4] = [138u8, 230u8, 61u8, 109u8]; + pub fn decode( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + Ok(Self {}) + } + pub fn encode(&self) -> Vec { + let data = ethabi::encode(&[]); + let mut encoded = Vec::with_capacity(4 + data.len()); + encoded.extend(Self::METHOD_ID); + encoded.extend(data); + encoded + } + pub fn output_call( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + Self::output(call.return_data.as_ref()) + } + pub fn output(data: &[u8]) -> Result { + let mut values = ethabi::decode( + &[ethabi::ParamType::Uint(256usize)], + data.as_ref(), + ) + .map_err(|e| format!("unable to decode output data: {:?}", e))?; + Ok({ + let mut v = [0 as u8; 32]; + values + .pop() + .expect("one output data should have existed") + .into_uint() + .expect(INTERNAL_ERR) + .to_big_endian(v.as_mut_slice()); + substreams::scalar::BigInt::from_unsigned_bytes_be(&v) + }) + } + pub fn match_call(call: &substreams_ethereum::pb::eth::v2::Call) -> bool { + match call.input.get(0..4) { + Some(signature) => Self::METHOD_ID == signature, + None => false, + } + } + pub fn call(&self, address: Vec) -> Option { + use substreams_ethereum::pb::eth::rpc; + let rpc_calls = rpc::RpcCalls { + calls: vec![ + rpc::RpcCall { to_addr : address, data : self.encode(), } + ], + }; + let responses = substreams_ethereum::rpc::eth_call(&rpc_calls).responses; + let response = responses + .get(0) + .expect("one response should have existed"); + if response.failed { + return None; + } + match Self::output(response.raw.as_ref()) { + Ok(data) => Some(data), + Err(err) => { + use substreams_ethereum::Function; + substreams::log::info!( + "Call output for function `{}` failed to decode with error: {}", + Self::NAME, err + ); + None + } + } + } + } + impl substreams_ethereum::Function for BlockNum { + const NAME: &'static str = "blockNum"; + fn match_call(call: &substreams_ethereum::pb::eth::v2::Call) -> bool { + Self::match_call(call) + } + fn decode( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + Self::decode(call) + } + fn encode(&self) -> Vec { + self.encode() + } + } + impl substreams_ethereum::rpc::RPCDecodable + for BlockNum { + fn output(data: &[u8]) -> Result { + Self::output(data) + } + } + #[derive(Debug, Clone, PartialEq)] + pub struct Controller {} + impl Controller { + const METHOD_ID: [u8; 4] = [247u8, 124u8, 71u8, 145u8]; + pub fn decode( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + Ok(Self {}) + } + pub fn encode(&self) -> Vec { + let data = ethabi::encode(&[]); + let mut encoded = Vec::with_capacity(4 + data.len()); + encoded.extend(Self::METHOD_ID); + encoded.extend(data); + encoded + } + pub fn output_call( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result, String> { + Self::output(call.return_data.as_ref()) + } + pub fn output(data: &[u8]) -> Result, String> { + let mut values = ethabi::decode( + &[ethabi::ParamType::Address], + data.as_ref(), + ) + .map_err(|e| format!("unable to decode output data: {:?}", e))?; + Ok( + values + .pop() + .expect("one output data should have existed") + .into_address() + .expect(INTERNAL_ERR) + .as_bytes() + .to_vec(), + ) + } + pub fn match_call(call: &substreams_ethereum::pb::eth::v2::Call) -> bool { + match call.input.get(0..4) { + Some(signature) => Self::METHOD_ID == signature, + None => false, + } + } + pub fn call(&self, address: Vec) -> Option> { + use substreams_ethereum::pb::eth::rpc; + let rpc_calls = rpc::RpcCalls { + calls: vec![ + rpc::RpcCall { to_addr : address, data : self.encode(), } + ], + }; + let responses = substreams_ethereum::rpc::eth_call(&rpc_calls).responses; + let response = responses + .get(0) + .expect("one response should have existed"); + if response.failed { + return None; + } + match Self::output(response.raw.as_ref()) { + Ok(data) => Some(data), + Err(err) => { + use substreams_ethereum::Function; + substreams::log::info!( + "Call output for function `{}` failed to decode with error: {}", + Self::NAME, err + ); + None + } + } + } + } + impl substreams_ethereum::Function for Controller { + const NAME: &'static str = "controller"; + fn match_call(call: &substreams_ethereum::pb::eth::v2::Call) -> bool { + Self::match_call(call) + } + fn decode( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + Self::decode(call) + } + fn encode(&self) -> Vec { + self.encode() + } + } + impl substreams_ethereum::rpc::RPCDecodable> for Controller { + fn output(data: &[u8]) -> Result, String> { + Self::output(data) + } + } + #[derive(Debug, Clone, PartialEq)] + pub struct CurrentEpoch {} + impl CurrentEpoch { + const METHOD_ID: [u8; 4] = [118u8, 103u8, 24u8, 8u8]; + pub fn decode( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + Ok(Self {}) + } + pub fn encode(&self) -> Vec { + let data = ethabi::encode(&[]); + let mut encoded = Vec::with_capacity(4 + data.len()); + encoded.extend(Self::METHOD_ID); + encoded.extend(data); + encoded + } + pub fn output_call( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + Self::output(call.return_data.as_ref()) + } + pub fn output(data: &[u8]) -> Result { + let mut values = ethabi::decode( + &[ethabi::ParamType::Uint(256usize)], + data.as_ref(), + ) + .map_err(|e| format!("unable to decode output data: {:?}", e))?; + Ok({ + let mut v = [0 as u8; 32]; + values + .pop() + .expect("one output data should have existed") + .into_uint() + .expect(INTERNAL_ERR) + .to_big_endian(v.as_mut_slice()); + substreams::scalar::BigInt::from_unsigned_bytes_be(&v) + }) + } + pub fn match_call(call: &substreams_ethereum::pb::eth::v2::Call) -> bool { + match call.input.get(0..4) { + Some(signature) => Self::METHOD_ID == signature, + None => false, + } + } + pub fn call(&self, address: Vec) -> Option { + use substreams_ethereum::pb::eth::rpc; + let rpc_calls = rpc::RpcCalls { + calls: vec![ + rpc::RpcCall { to_addr : address, data : self.encode(), } + ], + }; + let responses = substreams_ethereum::rpc::eth_call(&rpc_calls).responses; + let response = responses + .get(0) + .expect("one response should have existed"); + if response.failed { + return None; + } + match Self::output(response.raw.as_ref()) { + Ok(data) => Some(data), + Err(err) => { + use substreams_ethereum::Function; + substreams::log::info!( + "Call output for function `{}` failed to decode with error: {}", + Self::NAME, err + ); + None + } + } + } + } + impl substreams_ethereum::Function for CurrentEpoch { + const NAME: &'static str = "currentEpoch"; + fn match_call(call: &substreams_ethereum::pb::eth::v2::Call) -> bool { + Self::match_call(call) + } + fn decode( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + Self::decode(call) + } + fn encode(&self) -> Vec { + self.encode() + } + } + impl substreams_ethereum::rpc::RPCDecodable + for CurrentEpoch { + fn output(data: &[u8]) -> Result { + Self::output(data) + } + } + #[derive(Debug, Clone, PartialEq)] + pub struct CurrentEpochBlock {} + impl CurrentEpochBlock { + const METHOD_ID: [u8; 4] = [171u8, 147u8, 18u8, 44u8]; + pub fn decode( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + Ok(Self {}) + } + pub fn encode(&self) -> Vec { + let data = ethabi::encode(&[]); + let mut encoded = Vec::with_capacity(4 + data.len()); + encoded.extend(Self::METHOD_ID); + encoded.extend(data); + encoded + } + pub fn output_call( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + Self::output(call.return_data.as_ref()) + } + pub fn output(data: &[u8]) -> Result { + let mut values = ethabi::decode( + &[ethabi::ParamType::Uint(256usize)], + data.as_ref(), + ) + .map_err(|e| format!("unable to decode output data: {:?}", e))?; + Ok({ + let mut v = [0 as u8; 32]; + values + .pop() + .expect("one output data should have existed") + .into_uint() + .expect(INTERNAL_ERR) + .to_big_endian(v.as_mut_slice()); + substreams::scalar::BigInt::from_unsigned_bytes_be(&v) + }) + } + pub fn match_call(call: &substreams_ethereum::pb::eth::v2::Call) -> bool { + match call.input.get(0..4) { + Some(signature) => Self::METHOD_ID == signature, + None => false, + } + } + pub fn call(&self, address: Vec) -> Option { + use substreams_ethereum::pb::eth::rpc; + let rpc_calls = rpc::RpcCalls { + calls: vec![ + rpc::RpcCall { to_addr : address, data : self.encode(), } + ], + }; + let responses = substreams_ethereum::rpc::eth_call(&rpc_calls).responses; + let response = responses + .get(0) + .expect("one response should have existed"); + if response.failed { + return None; + } + match Self::output(response.raw.as_ref()) { + Ok(data) => Some(data), + Err(err) => { + use substreams_ethereum::Function; + substreams::log::info!( + "Call output for function `{}` failed to decode with error: {}", + Self::NAME, err + ); + None + } + } + } + } + impl substreams_ethereum::Function for CurrentEpochBlock { + const NAME: &'static str = "currentEpochBlock"; + fn match_call(call: &substreams_ethereum::pb::eth::v2::Call) -> bool { + Self::match_call(call) + } + fn decode( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + Self::decode(call) + } + fn encode(&self) -> Vec { + self.encode() + } + } + impl substreams_ethereum::rpc::RPCDecodable + for CurrentEpochBlock { + fn output(data: &[u8]) -> Result { + Self::output(data) + } + } + #[derive(Debug, Clone, PartialEq)] + pub struct CurrentEpochBlockSinceStart {} + impl CurrentEpochBlockSinceStart { + const METHOD_ID: [u8; 4] = [208u8, 207u8, 164u8, 110u8]; + pub fn decode( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + Ok(Self {}) + } + pub fn encode(&self) -> Vec { + let data = ethabi::encode(&[]); + let mut encoded = Vec::with_capacity(4 + data.len()); + encoded.extend(Self::METHOD_ID); + encoded.extend(data); + encoded + } + pub fn output_call( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + Self::output(call.return_data.as_ref()) + } + pub fn output(data: &[u8]) -> Result { + let mut values = ethabi::decode( + &[ethabi::ParamType::Uint(256usize)], + data.as_ref(), + ) + .map_err(|e| format!("unable to decode output data: {:?}", e))?; + Ok({ + let mut v = [0 as u8; 32]; + values + .pop() + .expect("one output data should have existed") + .into_uint() + .expect(INTERNAL_ERR) + .to_big_endian(v.as_mut_slice()); + substreams::scalar::BigInt::from_unsigned_bytes_be(&v) + }) + } + pub fn match_call(call: &substreams_ethereum::pb::eth::v2::Call) -> bool { + match call.input.get(0..4) { + Some(signature) => Self::METHOD_ID == signature, + None => false, + } + } + pub fn call(&self, address: Vec) -> Option { + use substreams_ethereum::pb::eth::rpc; + let rpc_calls = rpc::RpcCalls { + calls: vec![ + rpc::RpcCall { to_addr : address, data : self.encode(), } + ], + }; + let responses = substreams_ethereum::rpc::eth_call(&rpc_calls).responses; + let response = responses + .get(0) + .expect("one response should have existed"); + if response.failed { + return None; + } + match Self::output(response.raw.as_ref()) { + Ok(data) => Some(data), + Err(err) => { + use substreams_ethereum::Function; + substreams::log::info!( + "Call output for function `{}` failed to decode with error: {}", + Self::NAME, err + ); + None + } + } + } + } + impl substreams_ethereum::Function for CurrentEpochBlockSinceStart { + const NAME: &'static str = "currentEpochBlockSinceStart"; + fn match_call(call: &substreams_ethereum::pb::eth::v2::Call) -> bool { + Self::match_call(call) + } + fn decode( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + Self::decode(call) + } + fn encode(&self) -> Vec { + self.encode() + } + } + impl substreams_ethereum::rpc::RPCDecodable + for CurrentEpochBlockSinceStart { + fn output(data: &[u8]) -> Result { + Self::output(data) + } + } + #[derive(Debug, Clone, PartialEq)] + pub struct EpochLength {} + impl EpochLength { + const METHOD_ID: [u8; 4] = [87u8, 215u8, 117u8, 248u8]; + pub fn decode( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + Ok(Self {}) + } + pub fn encode(&self) -> Vec { + let data = ethabi::encode(&[]); + let mut encoded = Vec::with_capacity(4 + data.len()); + encoded.extend(Self::METHOD_ID); + encoded.extend(data); + encoded + } + pub fn output_call( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + Self::output(call.return_data.as_ref()) + } + pub fn output(data: &[u8]) -> Result { + let mut values = ethabi::decode( + &[ethabi::ParamType::Uint(256usize)], + data.as_ref(), + ) + .map_err(|e| format!("unable to decode output data: {:?}", e))?; + Ok({ + let mut v = [0 as u8; 32]; + values + .pop() + .expect("one output data should have existed") + .into_uint() + .expect(INTERNAL_ERR) + .to_big_endian(v.as_mut_slice()); + substreams::scalar::BigInt::from_unsigned_bytes_be(&v) + }) + } + pub fn match_call(call: &substreams_ethereum::pb::eth::v2::Call) -> bool { + match call.input.get(0..4) { + Some(signature) => Self::METHOD_ID == signature, + None => false, + } + } + pub fn call(&self, address: Vec) -> Option { + use substreams_ethereum::pb::eth::rpc; + let rpc_calls = rpc::RpcCalls { + calls: vec![ + rpc::RpcCall { to_addr : address, data : self.encode(), } + ], + }; + let responses = substreams_ethereum::rpc::eth_call(&rpc_calls).responses; + let response = responses + .get(0) + .expect("one response should have existed"); + if response.failed { + return None; + } + match Self::output(response.raw.as_ref()) { + Ok(data) => Some(data), + Err(err) => { + use substreams_ethereum::Function; + substreams::log::info!( + "Call output for function `{}` failed to decode with error: {}", + Self::NAME, err + ); + None + } + } + } + } + impl substreams_ethereum::Function for EpochLength { + const NAME: &'static str = "epochLength"; + fn match_call(call: &substreams_ethereum::pb::eth::v2::Call) -> bool { + Self::match_call(call) + } + fn decode( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + Self::decode(call) + } + fn encode(&self) -> Vec { + self.encode() + } + } + impl substreams_ethereum::rpc::RPCDecodable + for EpochLength { + fn output(data: &[u8]) -> Result { + Self::output(data) + } + } + #[derive(Debug, Clone, PartialEq)] + pub struct EpochsSince { + pub epoch: substreams::scalar::BigInt, + } + impl EpochsSince { + const METHOD_ID: [u8; 4] = [27u8, 40u8, 18u8, 109u8]; + pub fn decode( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + let maybe_data = call.input.get(4..); + if maybe_data.is_none() { + return Err("no data to decode".to_string()); + } + let mut values = ethabi::decode( + &[ethabi::ParamType::Uint(256usize)], + maybe_data.unwrap(), + ) + .map_err(|e| format!("unable to decode call.input: {:?}", e))?; + values.reverse(); + Ok(Self { + epoch: { + let mut v = [0 as u8; 32]; + values + .pop() + .expect(INTERNAL_ERR) + .into_uint() + .expect(INTERNAL_ERR) + .to_big_endian(v.as_mut_slice()); + substreams::scalar::BigInt::from_unsigned_bytes_be(&v) + }, + }) + } + pub fn encode(&self) -> Vec { + let data = ethabi::encode( + &[ + ethabi::Token::Uint( + ethabi::Uint::from_big_endian( + match self.epoch.clone().to_bytes_be() { + (num_bigint::Sign::Plus, bytes) => bytes, + (num_bigint::Sign::NoSign, bytes) => bytes, + (num_bigint::Sign::Minus, _) => { + panic!("negative numbers are not supported") + } + } + .as_slice(), + ), + ), + ], + ); + let mut encoded = Vec::with_capacity(4 + data.len()); + encoded.extend(Self::METHOD_ID); + encoded.extend(data); + encoded + } + pub fn output_call( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + Self::output(call.return_data.as_ref()) + } + pub fn output(data: &[u8]) -> Result { + let mut values = ethabi::decode( + &[ethabi::ParamType::Uint(256usize)], + data.as_ref(), + ) + .map_err(|e| format!("unable to decode output data: {:?}", e))?; + Ok({ + let mut v = [0 as u8; 32]; + values + .pop() + .expect("one output data should have existed") + .into_uint() + .expect(INTERNAL_ERR) + .to_big_endian(v.as_mut_slice()); + substreams::scalar::BigInt::from_unsigned_bytes_be(&v) + }) + } + pub fn match_call(call: &substreams_ethereum::pb::eth::v2::Call) -> bool { + match call.input.get(0..4) { + Some(signature) => Self::METHOD_ID == signature, + None => false, + } + } + pub fn call(&self, address: Vec) -> Option { + use substreams_ethereum::pb::eth::rpc; + let rpc_calls = rpc::RpcCalls { + calls: vec![ + rpc::RpcCall { to_addr : address, data : self.encode(), } + ], + }; + let responses = substreams_ethereum::rpc::eth_call(&rpc_calls).responses; + let response = responses + .get(0) + .expect("one response should have existed"); + if response.failed { + return None; + } + match Self::output(response.raw.as_ref()) { + Ok(data) => Some(data), + Err(err) => { + use substreams_ethereum::Function; + substreams::log::info!( + "Call output for function `{}` failed to decode with error: {}", + Self::NAME, err + ); + None + } + } + } + } + impl substreams_ethereum::Function for EpochsSince { + const NAME: &'static str = "epochsSince"; + fn match_call(call: &substreams_ethereum::pb::eth::v2::Call) -> bool { + Self::match_call(call) + } + fn decode( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + Self::decode(call) + } + fn encode(&self) -> Vec { + self.encode() + } + } + impl substreams_ethereum::rpc::RPCDecodable + for EpochsSince { + fn output(data: &[u8]) -> Result { + Self::output(data) + } + } + #[derive(Debug, Clone, PartialEq)] + pub struct EpochsSinceUpdate {} + impl EpochsSinceUpdate { + const METHOD_ID: [u8; 4] = [25u8, 195u8, 184u8, 45u8]; + pub fn decode( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + Ok(Self {}) + } + pub fn encode(&self) -> Vec { + let data = ethabi::encode(&[]); + let mut encoded = Vec::with_capacity(4 + data.len()); + encoded.extend(Self::METHOD_ID); + encoded.extend(data); + encoded + } + pub fn output_call( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + Self::output(call.return_data.as_ref()) + } + pub fn output(data: &[u8]) -> Result { + let mut values = ethabi::decode( + &[ethabi::ParamType::Uint(256usize)], + data.as_ref(), + ) + .map_err(|e| format!("unable to decode output data: {:?}", e))?; + Ok({ + let mut v = [0 as u8; 32]; + values + .pop() + .expect("one output data should have existed") + .into_uint() + .expect(INTERNAL_ERR) + .to_big_endian(v.as_mut_slice()); + substreams::scalar::BigInt::from_unsigned_bytes_be(&v) + }) + } + pub fn match_call(call: &substreams_ethereum::pb::eth::v2::Call) -> bool { + match call.input.get(0..4) { + Some(signature) => Self::METHOD_ID == signature, + None => false, + } + } + pub fn call(&self, address: Vec) -> Option { + use substreams_ethereum::pb::eth::rpc; + let rpc_calls = rpc::RpcCalls { + calls: vec![ + rpc::RpcCall { to_addr : address, data : self.encode(), } + ], + }; + let responses = substreams_ethereum::rpc::eth_call(&rpc_calls).responses; + let response = responses + .get(0) + .expect("one response should have existed"); + if response.failed { + return None; + } + match Self::output(response.raw.as_ref()) { + Ok(data) => Some(data), + Err(err) => { + use substreams_ethereum::Function; + substreams::log::info!( + "Call output for function `{}` failed to decode with error: {}", + Self::NAME, err + ); + None + } + } + } + } + impl substreams_ethereum::Function for EpochsSinceUpdate { + const NAME: &'static str = "epochsSinceUpdate"; + fn match_call(call: &substreams_ethereum::pb::eth::v2::Call) -> bool { + Self::match_call(call) + } + fn decode( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + Self::decode(call) + } + fn encode(&self) -> Vec { + self.encode() + } + } + impl substreams_ethereum::rpc::RPCDecodable + for EpochsSinceUpdate { + fn output(data: &[u8]) -> Result { + Self::output(data) + } + } + #[derive(Debug, Clone, PartialEq)] + pub struct Initialize { + pub controller: Vec, + pub epoch_length: substreams::scalar::BigInt, + } + impl Initialize { + const METHOD_ID: [u8; 4] = [205u8, 109u8, 198u8, 135u8]; + pub fn decode( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + let maybe_data = call.input.get(4..); + if maybe_data.is_none() { + return Err("no data to decode".to_string()); + } + let mut values = ethabi::decode( + &[ethabi::ParamType::Address, ethabi::ParamType::Uint(256usize)], + maybe_data.unwrap(), + ) + .map_err(|e| format!("unable to decode call.input: {:?}", e))?; + values.reverse(); + Ok(Self { + controller: values + .pop() + .expect(INTERNAL_ERR) + .into_address() + .expect(INTERNAL_ERR) + .as_bytes() + .to_vec(), + epoch_length: { + let mut v = [0 as u8; 32]; + values + .pop() + .expect(INTERNAL_ERR) + .into_uint() + .expect(INTERNAL_ERR) + .to_big_endian(v.as_mut_slice()); + substreams::scalar::BigInt::from_unsigned_bytes_be(&v) + }, + }) + } + pub fn encode(&self) -> Vec { + let data = ethabi::encode( + &[ + ethabi::Token::Address( + ethabi::Address::from_slice(&self.controller), + ), + ethabi::Token::Uint( + ethabi::Uint::from_big_endian( + match self.epoch_length.clone().to_bytes_be() { + (num_bigint::Sign::Plus, bytes) => bytes, + (num_bigint::Sign::NoSign, bytes) => bytes, + (num_bigint::Sign::Minus, _) => { + panic!("negative numbers are not supported") + } + } + .as_slice(), + ), + ), + ], + ); + let mut encoded = Vec::with_capacity(4 + data.len()); + encoded.extend(Self::METHOD_ID); + encoded.extend(data); + encoded + } + pub fn match_call(call: &substreams_ethereum::pb::eth::v2::Call) -> bool { + match call.input.get(0..4) { + Some(signature) => Self::METHOD_ID == signature, + None => false, + } + } + } + impl substreams_ethereum::Function for Initialize { + const NAME: &'static str = "initialize"; + fn match_call(call: &substreams_ethereum::pb::eth::v2::Call) -> bool { + Self::match_call(call) + } + fn decode( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + Self::decode(call) + } + fn encode(&self) -> Vec { + self.encode() + } + } + #[derive(Debug, Clone, PartialEq)] + pub struct IsCurrentEpochRun {} + impl IsCurrentEpochRun { + const METHOD_ID: [u8; 4] = [28u8, 224u8, 93u8, 56u8]; + pub fn decode( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + Ok(Self {}) + } + pub fn encode(&self) -> Vec { + let data = ethabi::encode(&[]); + let mut encoded = Vec::with_capacity(4 + data.len()); + encoded.extend(Self::METHOD_ID); + encoded.extend(data); + encoded + } + pub fn output_call( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + Self::output(call.return_data.as_ref()) + } + pub fn output(data: &[u8]) -> Result { + let mut values = ethabi::decode( + &[ethabi::ParamType::Bool], + data.as_ref(), + ) + .map_err(|e| format!("unable to decode output data: {:?}", e))?; + Ok( + values + .pop() + .expect("one output data should have existed") + .into_bool() + .expect(INTERNAL_ERR), + ) + } + pub fn match_call(call: &substreams_ethereum::pb::eth::v2::Call) -> bool { + match call.input.get(0..4) { + Some(signature) => Self::METHOD_ID == signature, + None => false, + } + } + pub fn call(&self, address: Vec) -> Option { + use substreams_ethereum::pb::eth::rpc; + let rpc_calls = rpc::RpcCalls { + calls: vec![ + rpc::RpcCall { to_addr : address, data : self.encode(), } + ], + }; + let responses = substreams_ethereum::rpc::eth_call(&rpc_calls).responses; + let response = responses + .get(0) + .expect("one response should have existed"); + if response.failed { + return None; + } + match Self::output(response.raw.as_ref()) { + Ok(data) => Some(data), + Err(err) => { + use substreams_ethereum::Function; + substreams::log::info!( + "Call output for function `{}` failed to decode with error: {}", + Self::NAME, err + ); + None + } + } + } + } + impl substreams_ethereum::Function for IsCurrentEpochRun { + const NAME: &'static str = "isCurrentEpochRun"; + fn match_call(call: &substreams_ethereum::pb::eth::v2::Call) -> bool { + Self::match_call(call) + } + fn decode( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + Self::decode(call) + } + fn encode(&self) -> Vec { + self.encode() + } + } + impl substreams_ethereum::rpc::RPCDecodable for IsCurrentEpochRun { + fn output(data: &[u8]) -> Result { + Self::output(data) + } + } + #[derive(Debug, Clone, PartialEq)] + pub struct LastLengthUpdateBlock {} + impl LastLengthUpdateBlock { + const METHOD_ID: [u8; 4] = [204u8, 101u8, 20u8, 155u8]; + pub fn decode( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + Ok(Self {}) + } + pub fn encode(&self) -> Vec { + let data = ethabi::encode(&[]); + let mut encoded = Vec::with_capacity(4 + data.len()); + encoded.extend(Self::METHOD_ID); + encoded.extend(data); + encoded + } + pub fn output_call( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + Self::output(call.return_data.as_ref()) + } + pub fn output(data: &[u8]) -> Result { + let mut values = ethabi::decode( + &[ethabi::ParamType::Uint(256usize)], + data.as_ref(), + ) + .map_err(|e| format!("unable to decode output data: {:?}", e))?; + Ok({ + let mut v = [0 as u8; 32]; + values + .pop() + .expect("one output data should have existed") + .into_uint() + .expect(INTERNAL_ERR) + .to_big_endian(v.as_mut_slice()); + substreams::scalar::BigInt::from_unsigned_bytes_be(&v) + }) + } + pub fn match_call(call: &substreams_ethereum::pb::eth::v2::Call) -> bool { + match call.input.get(0..4) { + Some(signature) => Self::METHOD_ID == signature, + None => false, + } + } + pub fn call(&self, address: Vec) -> Option { + use substreams_ethereum::pb::eth::rpc; + let rpc_calls = rpc::RpcCalls { + calls: vec![ + rpc::RpcCall { to_addr : address, data : self.encode(), } + ], + }; + let responses = substreams_ethereum::rpc::eth_call(&rpc_calls).responses; + let response = responses + .get(0) + .expect("one response should have existed"); + if response.failed { + return None; + } + match Self::output(response.raw.as_ref()) { + Ok(data) => Some(data), + Err(err) => { + use substreams_ethereum::Function; + substreams::log::info!( + "Call output for function `{}` failed to decode with error: {}", + Self::NAME, err + ); + None + } + } + } + } + impl substreams_ethereum::Function for LastLengthUpdateBlock { + const NAME: &'static str = "lastLengthUpdateBlock"; + fn match_call(call: &substreams_ethereum::pb::eth::v2::Call) -> bool { + Self::match_call(call) + } + fn decode( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + Self::decode(call) + } + fn encode(&self) -> Vec { + self.encode() + } + } + impl substreams_ethereum::rpc::RPCDecodable + for LastLengthUpdateBlock { + fn output(data: &[u8]) -> Result { + Self::output(data) + } + } + #[derive(Debug, Clone, PartialEq)] + pub struct LastLengthUpdateEpoch {} + impl LastLengthUpdateEpoch { + const METHOD_ID: [u8; 4] = [180u8, 20u8, 106u8, 11u8]; + pub fn decode( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + Ok(Self {}) + } + pub fn encode(&self) -> Vec { + let data = ethabi::encode(&[]); + let mut encoded = Vec::with_capacity(4 + data.len()); + encoded.extend(Self::METHOD_ID); + encoded.extend(data); + encoded + } + pub fn output_call( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + Self::output(call.return_data.as_ref()) + } + pub fn output(data: &[u8]) -> Result { + let mut values = ethabi::decode( + &[ethabi::ParamType::Uint(256usize)], + data.as_ref(), + ) + .map_err(|e| format!("unable to decode output data: {:?}", e))?; + Ok({ + let mut v = [0 as u8; 32]; + values + .pop() + .expect("one output data should have existed") + .into_uint() + .expect(INTERNAL_ERR) + .to_big_endian(v.as_mut_slice()); + substreams::scalar::BigInt::from_unsigned_bytes_be(&v) + }) + } + pub fn match_call(call: &substreams_ethereum::pb::eth::v2::Call) -> bool { + match call.input.get(0..4) { + Some(signature) => Self::METHOD_ID == signature, + None => false, + } + } + pub fn call(&self, address: Vec) -> Option { + use substreams_ethereum::pb::eth::rpc; + let rpc_calls = rpc::RpcCalls { + calls: vec![ + rpc::RpcCall { to_addr : address, data : self.encode(), } + ], + }; + let responses = substreams_ethereum::rpc::eth_call(&rpc_calls).responses; + let response = responses + .get(0) + .expect("one response should have existed"); + if response.failed { + return None; + } + match Self::output(response.raw.as_ref()) { + Ok(data) => Some(data), + Err(err) => { + use substreams_ethereum::Function; + substreams::log::info!( + "Call output for function `{}` failed to decode with error: {}", + Self::NAME, err + ); + None + } + } + } + } + impl substreams_ethereum::Function for LastLengthUpdateEpoch { + const NAME: &'static str = "lastLengthUpdateEpoch"; + fn match_call(call: &substreams_ethereum::pb::eth::v2::Call) -> bool { + Self::match_call(call) + } + fn decode( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + Self::decode(call) + } + fn encode(&self) -> Vec { + self.encode() + } + } + impl substreams_ethereum::rpc::RPCDecodable + for LastLengthUpdateEpoch { + fn output(data: &[u8]) -> Result { + Self::output(data) + } + } + #[derive(Debug, Clone, PartialEq)] + pub struct LastRunEpoch {} + impl LastRunEpoch { + const METHOD_ID: [u8; 4] = [250u8, 161u8, 162u8, 60u8]; + pub fn decode( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + Ok(Self {}) + } + pub fn encode(&self) -> Vec { + let data = ethabi::encode(&[]); + let mut encoded = Vec::with_capacity(4 + data.len()); + encoded.extend(Self::METHOD_ID); + encoded.extend(data); + encoded + } + pub fn output_call( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + Self::output(call.return_data.as_ref()) + } + pub fn output(data: &[u8]) -> Result { + let mut values = ethabi::decode( + &[ethabi::ParamType::Uint(256usize)], + data.as_ref(), + ) + .map_err(|e| format!("unable to decode output data: {:?}", e))?; + Ok({ + let mut v = [0 as u8; 32]; + values + .pop() + .expect("one output data should have existed") + .into_uint() + .expect(INTERNAL_ERR) + .to_big_endian(v.as_mut_slice()); + substreams::scalar::BigInt::from_unsigned_bytes_be(&v) + }) + } + pub fn match_call(call: &substreams_ethereum::pb::eth::v2::Call) -> bool { + match call.input.get(0..4) { + Some(signature) => Self::METHOD_ID == signature, + None => false, + } + } + pub fn call(&self, address: Vec) -> Option { + use substreams_ethereum::pb::eth::rpc; + let rpc_calls = rpc::RpcCalls { + calls: vec![ + rpc::RpcCall { to_addr : address, data : self.encode(), } + ], + }; + let responses = substreams_ethereum::rpc::eth_call(&rpc_calls).responses; + let response = responses + .get(0) + .expect("one response should have existed"); + if response.failed { + return None; + } + match Self::output(response.raw.as_ref()) { + Ok(data) => Some(data), + Err(err) => { + use substreams_ethereum::Function; + substreams::log::info!( + "Call output for function `{}` failed to decode with error: {}", + Self::NAME, err + ); + None + } + } + } + } + impl substreams_ethereum::Function for LastRunEpoch { + const NAME: &'static str = "lastRunEpoch"; + fn match_call(call: &substreams_ethereum::pb::eth::v2::Call) -> bool { + Self::match_call(call) + } + fn decode( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + Self::decode(call) + } + fn encode(&self) -> Vec { + self.encode() + } + } + impl substreams_ethereum::rpc::RPCDecodable + for LastRunEpoch { + fn output(data: &[u8]) -> Result { + Self::output(data) + } + } + #[derive(Debug, Clone, PartialEq)] + pub struct RunEpoch {} + impl RunEpoch { + const METHOD_ID: [u8; 4] = [196u8, 110u8, 88u8, 235u8]; + pub fn decode( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + Ok(Self {}) + } + pub fn encode(&self) -> Vec { + let data = ethabi::encode(&[]); + let mut encoded = Vec::with_capacity(4 + data.len()); + encoded.extend(Self::METHOD_ID); + encoded.extend(data); + encoded + } + pub fn match_call(call: &substreams_ethereum::pb::eth::v2::Call) -> bool { + match call.input.get(0..4) { + Some(signature) => Self::METHOD_ID == signature, + None => false, + } + } + } + impl substreams_ethereum::Function for RunEpoch { + const NAME: &'static str = "runEpoch"; + fn match_call(call: &substreams_ethereum::pb::eth::v2::Call) -> bool { + Self::match_call(call) + } + fn decode( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + Self::decode(call) + } + fn encode(&self) -> Vec { + self.encode() + } + } + #[derive(Debug, Clone, PartialEq)] + pub struct SetController { + pub controller: Vec, + } + impl SetController { + const METHOD_ID: [u8; 4] = [146u8, 238u8, 254u8, 155u8]; + pub fn decode( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + let maybe_data = call.input.get(4..); + if maybe_data.is_none() { + return Err("no data to decode".to_string()); + } + let mut values = ethabi::decode( + &[ethabi::ParamType::Address], + maybe_data.unwrap(), + ) + .map_err(|e| format!("unable to decode call.input: {:?}", e))?; + values.reverse(); + Ok(Self { + controller: values + .pop() + .expect(INTERNAL_ERR) + .into_address() + .expect(INTERNAL_ERR) + .as_bytes() + .to_vec(), + }) + } + pub fn encode(&self) -> Vec { + let data = ethabi::encode( + &[ + ethabi::Token::Address( + ethabi::Address::from_slice(&self.controller), + ), + ], + ); + let mut encoded = Vec::with_capacity(4 + data.len()); + encoded.extend(Self::METHOD_ID); + encoded.extend(data); + encoded + } + pub fn match_call(call: &substreams_ethereum::pb::eth::v2::Call) -> bool { + match call.input.get(0..4) { + Some(signature) => Self::METHOD_ID == signature, + None => false, + } + } + } + impl substreams_ethereum::Function for SetController { + const NAME: &'static str = "setController"; + fn match_call(call: &substreams_ethereum::pb::eth::v2::Call) -> bool { + Self::match_call(call) + } + fn decode( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + Self::decode(call) + } + fn encode(&self) -> Vec { + self.encode() + } + } + #[derive(Debug, Clone, PartialEq)] + pub struct SetEpochLength { + pub epoch_length: substreams::scalar::BigInt, + } + impl SetEpochLength { + const METHOD_ID: [u8; 4] = [84u8, 238u8, 167u8, 150u8]; + pub fn decode( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + let maybe_data = call.input.get(4..); + if maybe_data.is_none() { + return Err("no data to decode".to_string()); + } + let mut values = ethabi::decode( + &[ethabi::ParamType::Uint(256usize)], + maybe_data.unwrap(), + ) + .map_err(|e| format!("unable to decode call.input: {:?}", e))?; + values.reverse(); + Ok(Self { + epoch_length: { + let mut v = [0 as u8; 32]; + values + .pop() + .expect(INTERNAL_ERR) + .into_uint() + .expect(INTERNAL_ERR) + .to_big_endian(v.as_mut_slice()); + substreams::scalar::BigInt::from_unsigned_bytes_be(&v) + }, + }) + } + pub fn encode(&self) -> Vec { + let data = ethabi::encode( + &[ + ethabi::Token::Uint( + ethabi::Uint::from_big_endian( + match self.epoch_length.clone().to_bytes_be() { + (num_bigint::Sign::Plus, bytes) => bytes, + (num_bigint::Sign::NoSign, bytes) => bytes, + (num_bigint::Sign::Minus, _) => { + panic!("negative numbers are not supported") + } + } + .as_slice(), + ), + ), + ], + ); + let mut encoded = Vec::with_capacity(4 + data.len()); + encoded.extend(Self::METHOD_ID); + encoded.extend(data); + encoded + } + pub fn match_call(call: &substreams_ethereum::pb::eth::v2::Call) -> bool { + match call.input.get(0..4) { + Some(signature) => Self::METHOD_ID == signature, + None => false, + } + } + } + impl substreams_ethereum::Function for SetEpochLength { + const NAME: &'static str = "setEpochLength"; + fn match_call(call: &substreams_ethereum::pb::eth::v2::Call) -> bool { + Self::match_call(call) + } + fn decode( + call: &substreams_ethereum::pb::eth::v2::Call, + ) -> Result { + Self::decode(call) + } + fn encode(&self) -> Vec { + self.encode() + } + } + } + /// Contract's events. + #[allow(dead_code, unused_imports, unused_variables)] + pub mod events { + use super::INTERNAL_ERR; + #[derive(Debug, Clone, PartialEq)] + pub struct EpochLengthUpdate { + pub epoch: substreams::scalar::BigInt, + pub epoch_length: substreams::scalar::BigInt, + } + impl EpochLengthUpdate { + const TOPIC_ID: [u8; 32] = [ + 37u8, + 221u8, + 214u8, + 240u8, + 0u8, + 56u8, + 213u8, + 234u8, + 192u8, + 5u8, + 29u8, + 248u8, + 60u8, + 96u8, + 132u8, + 241u8, + 64u8, + 160u8, + 21u8, + 134u8, + 240u8, + 146u8, + 226u8, + 114u8, + 141u8, + 30u8, + 215u8, + 129u8, + 201u8, + 206u8, + 36u8, + 65u8, + ]; + pub fn match_log(log: &substreams_ethereum::pb::eth::v2::Log) -> bool { + if log.topics.len() != 2usize { + return false; + } + if log.data.len() != 32usize { + return false; + } + return log.topics.get(0).expect("bounds already checked").as_ref() + == Self::TOPIC_ID; + } + pub fn decode( + log: &substreams_ethereum::pb::eth::v2::Log, + ) -> Result { + let mut values = ethabi::decode( + &[ethabi::ParamType::Uint(256usize)], + log.data.as_ref(), + ) + .map_err(|e| format!("unable to decode log.data: {:?}", e))?; + values.reverse(); + Ok(Self { + epoch: { + let mut v = [0 as u8; 32]; + ethabi::decode( + &[ethabi::ParamType::Uint(256usize)], + log.topics[1usize].as_ref(), + ) + .map_err(|e| { + format!( + "unable to decode param 'epoch' from topic of type 'uint256': {:?}", + e + ) + })? + .pop() + .expect(INTERNAL_ERR) + .into_uint() + .expect(INTERNAL_ERR) + .to_big_endian(v.as_mut_slice()); + substreams::scalar::BigInt::from_unsigned_bytes_be(&v) + }, + epoch_length: { + let mut v = [0 as u8; 32]; + values + .pop() + .expect(INTERNAL_ERR) + .into_uint() + .expect(INTERNAL_ERR) + .to_big_endian(v.as_mut_slice()); + substreams::scalar::BigInt::from_unsigned_bytes_be(&v) + }, + }) + } + } + impl substreams_ethereum::Event for EpochLengthUpdate { + const NAME: &'static str = "EpochLengthUpdate"; + fn match_log(log: &substreams_ethereum::pb::eth::v2::Log) -> bool { + Self::match_log(log) + } + fn decode( + log: &substreams_ethereum::pb::eth::v2::Log, + ) -> Result { + Self::decode(log) + } + } + #[derive(Debug, Clone, PartialEq)] + pub struct EpochRun { + pub epoch: substreams::scalar::BigInt, + pub caller: Vec, + } + impl EpochRun { + const TOPIC_ID: [u8; 32] = [ + 102u8, + 106u8, + 55u8, + 204u8, + 198u8, + 130u8, + 210u8, + 15u8, + 140u8, + 81u8, + 197u8, + 246u8, + 253u8, + 131u8, + 92u8, + 186u8, + 219u8, + 202u8, + 175u8, + 9u8, + 146u8, + 30u8, + 7u8, + 98u8, + 130u8, + 68u8, + 110u8, + 66u8, + 215u8, + 38u8, + 78u8, + 62u8, + ]; + pub fn match_log(log: &substreams_ethereum::pb::eth::v2::Log) -> bool { + if log.topics.len() != 2usize { + return false; + } + if log.data.len() != 32usize { + return false; + } + return log.topics.get(0).expect("bounds already checked").as_ref() + == Self::TOPIC_ID; + } + pub fn decode( + log: &substreams_ethereum::pb::eth::v2::Log, + ) -> Result { + let mut values = ethabi::decode( + &[ethabi::ParamType::Address], + log.data.as_ref(), + ) + .map_err(|e| format!("unable to decode log.data: {:?}", e))?; + values.reverse(); + Ok(Self { + epoch: { + let mut v = [0 as u8; 32]; + ethabi::decode( + &[ethabi::ParamType::Uint(256usize)], + log.topics[1usize].as_ref(), + ) + .map_err(|e| { + format!( + "unable to decode param 'epoch' from topic of type 'uint256': {:?}", + e + ) + })? + .pop() + .expect(INTERNAL_ERR) + .into_uint() + .expect(INTERNAL_ERR) + .to_big_endian(v.as_mut_slice()); + substreams::scalar::BigInt::from_unsigned_bytes_be(&v) + }, + caller: values + .pop() + .expect(INTERNAL_ERR) + .into_address() + .expect(INTERNAL_ERR) + .as_bytes() + .to_vec(), + }) + } + } + impl substreams_ethereum::Event for EpochRun { + const NAME: &'static str = "EpochRun"; + fn match_log(log: &substreams_ethereum::pb::eth::v2::Log) -> bool { + Self::match_log(log) + } + fn decode( + log: &substreams_ethereum::pb::eth::v2::Log, + ) -> Result { + Self::decode(log) + } + } + #[derive(Debug, Clone, PartialEq)] + pub struct ParameterUpdated { + pub param: String, + } + impl ParameterUpdated { + const TOPIC_ID: [u8; 32] = [ + 150u8, + 213u8, + 164u8, + 180u8, + 237u8, + 241u8, + 206u8, + 253u8, + 9u8, + 0u8, + 193u8, + 102u8, + 214u8, + 68u8, + 71u8, + 248u8, + 218u8, + 29u8, + 1u8, + 209u8, + 134u8, + 26u8, + 106u8, + 96u8, + 137u8, + 75u8, + 91u8, + 130u8, + 162u8, + 193u8, + 92u8, + 60u8, + ]; + pub fn match_log(log: &substreams_ethereum::pb::eth::v2::Log) -> bool { + if log.topics.len() != 1usize { + return false; + } + if log.data.len() < 64usize { + return false; + } + return log.topics.get(0).expect("bounds already checked").as_ref() + == Self::TOPIC_ID; + } + pub fn decode( + log: &substreams_ethereum::pb::eth::v2::Log, + ) -> Result { + let mut values = ethabi::decode( + &[ethabi::ParamType::String], + log.data.as_ref(), + ) + .map_err(|e| format!("unable to decode log.data: {:?}", e))?; + values.reverse(); + Ok(Self { + param: values + .pop() + .expect(INTERNAL_ERR) + .into_string() + .expect(INTERNAL_ERR), + }) + } + } + impl substreams_ethereum::Event for ParameterUpdated { + const NAME: &'static str = "ParameterUpdated"; + fn match_log(log: &substreams_ethereum::pb::eth::v2::Log) -> bool { + Self::match_log(log) + } + fn decode( + log: &substreams_ethereum::pb::eth::v2::Log, + ) -> Result { + Self::decode(log) + } + } + #[derive(Debug, Clone, PartialEq)] + pub struct SetController { + pub controller: Vec, + } + impl SetController { + const TOPIC_ID: [u8; 32] = [ + 79u8, + 246u8, + 56u8, + 69u8, + 43u8, + 191u8, + 51u8, + 192u8, + 18u8, + 100u8, + 93u8, + 24u8, + 174u8, + 111u8, + 5u8, + 81u8, + 95u8, + 245u8, + 242u8, + 209u8, + 223u8, + 176u8, + 206u8, + 206u8, + 140u8, + 191u8, + 1u8, + 140u8, + 96u8, + 144u8, + 63u8, + 112u8, + ]; + pub fn match_log(log: &substreams_ethereum::pb::eth::v2::Log) -> bool { + if log.topics.len() != 1usize { + return false; + } + if log.data.len() != 32usize { + return false; + } + return log.topics.get(0).expect("bounds already checked").as_ref() + == Self::TOPIC_ID; + } + pub fn decode( + log: &substreams_ethereum::pb::eth::v2::Log, + ) -> Result { + let mut values = ethabi::decode( + &[ethabi::ParamType::Address], + log.data.as_ref(), + ) + .map_err(|e| format!("unable to decode log.data: {:?}", e))?; + values.reverse(); + Ok(Self { + controller: values + .pop() + .expect(INTERNAL_ERR) + .into_address() + .expect(INTERNAL_ERR) + .as_bytes() + .to_vec(), + }) + } + } + impl substreams_ethereum::Event for SetController { + const NAME: &'static str = "SetController"; + fn match_log(log: &substreams_ethereum::pb::eth::v2::Log) -> bool { + Self::match_log(log) + } + fn decode( + log: &substreams_ethereum::pb::eth::v2::Log, + ) -> Result { + Self::decode(log) + } + } + } \ No newline at end of file diff --git a/src/abi/mod.rs b/src/abi/mod.rs index 4075e73..561d6c0 100644 --- a/src/abi/mod.rs +++ b/src/abi/mod.rs @@ -4,4 +4,5 @@ pub mod rewards_manager; pub mod curation; pub mod gns; pub mod controller; +pub mod epoch_manager; diff --git a/src/db.rs b/src/db.rs index 8ff35c5..51659e0 100644 --- a/src/db.rs +++ b/src/db.rs @@ -508,3 +508,82 @@ pub fn query_fees_change( } } } + +pub fn epoch_change( + epoch_start_deltas: Deltas, + epoch_end_deltas: Deltas, + epoch_signal_deltas: Deltas, + epoch_stake_deltas: Deltas, + query_fee_rebate_deltas: Deltas, + epoch_rewards_deltas: Deltas, + entity_changes: &mut EntityChanges, +) { + for delta in epoch_start_deltas.deltas { + entity_changes + .push_change( + "Epoch", + &delta.key, + delta.ordinal, + Operation::Update, // Update will create the entity if it does not exist + ) + .change("startBlock", delta); + } + for delta in epoch_end_deltas.deltas { + entity_changes + .push_change( + "Epoch", + &delta.key, + delta.ordinal, + Operation::Update, // Update will create the entity if it does not exist + ) + .change("endBlock", delta); + } + for delta in epoch_signal_deltas.deltas { + entity_changes + .push_change( + "Epoch", + &delta.key, + delta.ordinal, + Operation::Update, // Update will create the entity if it does not exist + ) + .change("signalledTokens", delta); + } + for delta in epoch_stake_deltas.deltas { + entity_changes + .push_change( + "Epoch", + &delta.key, + delta.ordinal, + Operation::Update, // Update will create the entity if it does not exist + ) + .change("stakeDeposited", delta); + } + for delta in query_fee_rebate_deltas.deltas { + entity_changes + .push_change( + "Epoch", + &delta.key, + delta.ordinal, + Operation::Update, // Update will create the entity if it does not exist + ) + .change("queryFeeRebates", delta); + } + for delta in epoch_rewards_deltas.deltas { + let name = match delta.key.as_str().split(":").last().unwrap() { + "totalRewards" => "totalRewards", + "totalIndexerRewards" => "totalIndexerRewards", + "totalDelegatorRewards" => "totalDelegatorRewards", + _ => { + continue; + } + }; + entity_changes + .push_change( + "Epoch", + &delta.key.as_str().split(":").nth(0).unwrap(), + delta.ordinal, + Operation::Update, // Update will create the entity if it does not exist + ) + .change(name, delta.new_value.to_string()); + } +} diff --git a/src/modules/curation.rs b/src/modules/curation.rs index 11bb811..d006c46 100644 --- a/src/modules/curation.rs +++ b/src/modules/curation.rs @@ -3,8 +3,8 @@ use crate::utils; use std::ops::Sub; use std::str::FromStr; use substreams::prelude::*; -use substreams::{ Hex}; use substreams::scalar::BigInt; +use substreams::Hex; use substreams::{ store::StoreAddBigInt, store::StoreSetIfNotExists, store::StoreSetIfNotExistsString, }; @@ -55,6 +55,28 @@ fn store_total_signalled(store_changes: StorageChanges, s: StoreAddBigInt) { } } +#[substreams::handlers::store] +fn store_epoch_signal(store_changes: StorageChanges, store: StoreGetBigInt, s: StoreAddBigInt) { + let curation_pools = store_changes.curation_pools.unwrap(); + match store.get_last("epoch") { + Some(epoch_count) => { + if epoch_count > BigInt::zero() { + let current_epoch = epoch_count.sub(1).to_string(); + for curation_pool in curation_pools.curation_pools { + s.add( + curation_pool.ordinal, + ¤t_epoch, + BigInt::from_str(&curation_pool.new_signal) + .unwrap() + .sub(BigInt::from_str(&curation_pool.old_signal).unwrap()), + ); + } + } + } + None => (), + } +} + #[substreams::handlers::store] fn store_signal_amount(events: Events, s: StoreAddBigInt) { let signalled_events = events.signalled_events.unwrap(); @@ -64,16 +86,14 @@ fn store_signal_amount(events: Events, s: StoreAddBigInt) { s.add( 1, Hex(&signalled.subgraph_deployment_id).to_string(), - BigInt::from_str(&signalled.signal) - .unwrap(), + BigInt::from_str(&signalled.signal).unwrap(), ); } for burned in burned_events.burned_events { s.add( 1, Hex(&burned.subgraph_deployment_id).to_string(), - BigInt::from_str(&burned.signal) - .unwrap().neg(), + BigInt::from_str(&burned.signal).unwrap().neg(), ); } } diff --git a/src/modules/epoch_manager.rs b/src/modules/epoch_manager.rs new file mode 100644 index 0000000..164c3d4 --- /dev/null +++ b/src/modules/epoch_manager.rs @@ -0,0 +1,63 @@ +use crate::pb::erc20::*; +use crate::utils; +use std::ops::Sub; +use substreams::pb::substreams::Clock; +use substreams::prelude::*; +use substreams::scalar::BigInt; +use substreams::{store::StoreAddBigInt, store::StoreSetIfNotExists}; +#[substreams::handlers::store] +fn store_epoch_length(events: Events, s: StoreSetString) { + let epoch_length_updated_events = events.epoch_length_updated_events.unwrap(); + + for event in epoch_length_updated_events.epoch_length_updated_events { + s.set( + event.ordinal, + "epoch", + &utils::concat(event.last_length_update_block, event.epoch_length), + ); + } +} + +#[substreams::handlers::store] +fn store_epoch_count(store: StoreGetString, clock: Clock, s: StoreAddBigInt) { + match store.get_last("epoch") { + Some(value) => { + let last_updated_block = value.as_str().split(":").nth(0).unwrap().to_string(); + let epoch_length = value.as_str().split(":").last().unwrap().to_string(); + // right now this is uint64 but it should be changed to BigInt when SF implements it + if (clock.number - last_updated_block.parse::().unwrap()) + % epoch_length.parse::().unwrap() + == 0 + { + s.add(1, "epoch", BigInt::one()) + } + } + None => (), + } +} + +#[substreams::handlers::store] +fn store_epoch_start(store: StoreGetBigInt, clock: Clock, s: StoreSetIfNotExistsBigInt) { + match store.get_last("epoch") { + Some(epoch_count) => { + if epoch_count > BigInt::zero() { + let current_epoch = epoch_count.sub(1).to_string(); + s.set_if_not_exists(1, current_epoch, &clock.number.into()) + } + } + None => (), + } +} + +#[substreams::handlers::store] +fn store_epoch_end(store: StoreGetBigInt, clock: Clock, s: StoreSetIfNotExistsBigInt) { + match store.get_last("epoch") { + Some(epoch_count) => { + if epoch_count > BigInt::one() { + let previous_epoch = epoch_count.clone().sub(2).to_string(); + s.set_if_not_exists(1, previous_epoch, &clock.number.into()); + } + } + None => (), + } +} diff --git a/src/modules/graph_out.rs b/src/modules/graph_out.rs index d2ffa2d..64329f2 100644 --- a/src/modules/graph_out.rs +++ b/src/modules/graph_out.rs @@ -16,6 +16,11 @@ use substreams_entity_change::pb::entity::EntityChanges; pub fn graph_out( clock: Clock, events: Events, + epoch_start_deltas: Deltas, + epoch_end_deltas: Deltas, + epoch_signal_deltas: Deltas, + epoch_stake_deltas: Deltas, + epoch_rewards_deltas: Deltas, grt_global_deltas: Deltas, grt_balance_deltas: Deltas, graph_account_indexer_deltas: Deltas, @@ -101,11 +106,25 @@ pub fn graph_out( ); let mut query_fee_rebate_changes: EntityChanges = Default::default(); - db::query_fee_rebate_change(query_fee_rebate_deltas, &mut query_fee_rebate_changes); + db::query_fee_rebate_change( + query_fee_rebate_deltas.clone(), + &mut query_fee_rebate_changes, + ); let mut query_fee_changes: EntityChanges = Default::default(); db::query_fees_change(query_fees_amount_deltas, &mut query_fee_changes); + let mut epoch_changes: EntityChanges = Default::default(); + db::epoch_change( + epoch_start_deltas, + epoch_end_deltas, + epoch_signal_deltas, + epoch_stake_deltas, + query_fee_rebate_deltas, + epoch_rewards_deltas, + &mut epoch_changes, + ); + Ok(EntityChanges { entity_changes: [ graph_network_entity_changes.entity_changes, @@ -117,6 +136,7 @@ pub fn graph_out( allocation_entity_changes.entity_changes, query_fee_rebate_changes.entity_changes, query_fee_changes.entity_changes, + epoch_changes.entity_changes, ] .concat(), }) diff --git a/src/modules/init_maps.rs b/src/modules/init_maps.rs index d0f726d..62e47f8 100644 --- a/src/modules/init_maps.rs +++ b/src/modules/init_maps.rs @@ -15,6 +15,7 @@ const REWARDS_MANAGER_CONTRACT: [u8; 20] = hex!("9Ac758AB77733b4150A901ebd659cbF const GNS_CONTRACT: [u8; 20] = hex!("aDcA0dd4729c8BA3aCf3E99F3A9f471EF37b6825"); const CURATION_CONTRACT: [u8; 20] = hex!("8FE00a685Bcb3B2cc296ff6FfEaB10acA4CE1538"); const CONTROLLER_CONTRACT: [u8; 20] = hex!("24ccd4d3ac8529ff08c58f74ff6755036e616117"); +const EPOCH_MANAGER_CONTRACT: [u8; 20] = hex!("64f990bf16552a693dcb043bb7bf3866c5e05ddb"); // -------------------- INITIAL MAPS -------------------- #[substreams::handlers::map] @@ -328,6 +329,7 @@ fn map_events(blk: eth::Block) -> Result { let mut allocation_collected_events = vec![]; let mut pause_changed_events = vec![]; let mut partial_pause_changed_events = vec![]; + let mut epoch_length_updated_events = vec![]; // Potentially consider adding log.index() to the IDs, to have them be truly unique in // transactions with potentially more than 1 of these messages @@ -337,7 +339,8 @@ fn map_events(blk: eth::Block) -> Result { || &Hex(&REWARDS_MANAGER_CONTRACT).to_string() == &Hex(&log.address()).to_string() || &Hex(&GNS_CONTRACT).to_string() == &Hex(&log.address()).to_string() || &Hex(&CURATION_CONTRACT).to_string() == &Hex(&log.address()).to_string() - || &Hex(&CONTROLLER_CONTRACT).to_string() == &Hex(&log.address()).to_string()) + || &Hex(&CONTROLLER_CONTRACT).to_string() == &Hex(&log.address()).to_string() + || &Hex(&EPOCH_MANAGER_CONTRACT).to_string() == &Hex(&log.address()).to_string()) { continue; } @@ -494,6 +497,16 @@ fn map_events(blk: eth::Block) -> Result { is_paused: event.is_paused, ordinal: log.ordinal() as u64, }) + } else if let Some(event) = + abi::epoch_manager::events::EpochLengthUpdate::match_and_decode(log) + { + epoch_length_updated_events.push(EpochLengthUpdated{ + id: Hex(&log.receipt.transaction.hash).to_string(), + last_length_update_block: blk.number.to_string(), + epoch_length: event.epoch_length.to_string(), + ordinal: log.ordinal() as u64, + }) + } } @@ -546,6 +559,9 @@ fn map_events(blk: eth::Block) -> Result { events.partial_pause_changed_events = Some(PartialPauseChangedEvents { partial_paused_changed_events: partial_pause_changed_events, }); + events.epoch_length_updated_events = Some(EpochLengthUpdatedEvents { + epoch_length_updated_events: epoch_length_updated_events, + }); Ok(events) } diff --git a/src/modules/mod.rs b/src/modules/mod.rs index 5000562..c5808fb 100644 --- a/src/modules/mod.rs +++ b/src/modules/mod.rs @@ -1,9 +1,6 @@ mod init_maps; - mod graph_token; - mod staking; - mod curation; - -mod graph_out; \ No newline at end of file +mod epoch_manager; +mod graph_out; diff --git a/src/modules/staking.rs b/src/modules/staking.rs index 17c01a7..bbab201 100644 --- a/src/modules/staking.rs +++ b/src/modules/staking.rs @@ -5,13 +5,12 @@ use std::str::FromStr; use substreams::errors::Error; use substreams::prelude::*; use substreams::scalar::BigInt; -use substreams::{ Hex}; +use substreams::Hex; use substreams::{ store::StoreAddBigInt, store::StoreSetIfNotExists, store::StoreSetIfNotExistsString, store::StoreSetProto, }; -// DelegatedStake and Delegator entities track the cumulative delegated stake, not the total amount #[substreams::handlers::store] fn store_staked_tokens(storage_changes: StorageChanges, s: StoreAddBigInt) { let indexer_stakes = storage_changes.indexer_stakes.unwrap(); @@ -27,6 +26,26 @@ fn store_staked_tokens(storage_changes: StorageChanges, s: StoreAddBigInt) { } } +#[substreams::handlers::store] +fn store_epoch_stake(events: Events, store: StoreGetBigInt, s: StoreAddBigInt) { + let stake_deposited_events = events.stake_deposited_events.unwrap(); + match store.get_last("epoch") { + Some(epoch_count) => { + if epoch_count > BigInt::zero() { + let current_epoch = epoch_count.sub(1).to_string(); + for stake_deposited in stake_deposited_events.stake_deposited_events { + s.add( + stake_deposited.ordinal, + ¤t_epoch, + BigInt::from_str(&stake_deposited.tokens).unwrap(), + ); + } + } + } + None => (), + } +} + // DelegatedStake and Delegator entities track the cumulative delegated stake, not the total amount #[substreams::handlers::store] fn store_cumulative_delegated_stakes(events: Events, s: StoreAddBigInt) { @@ -129,22 +148,46 @@ fn store_subgraph_deployment_id(events: Events, s: StoreSetString) { } } #[substreams::handlers::store] -fn store_query_fee_rebates(events: Events, s: StoreAddBigInt) { +fn store_query_fee_rebates(events: Events, store: StoreGetBigInt, s: StoreAddBigInt) { let rebate_claimed_events = events.rebate_claimed_events.unwrap(); + match store.get_last("epoch") { + Some(epoch_count) => { + if epoch_count > BigInt::zero() { + let current_epoch = epoch_count.sub(1).to_string(); + for rebate_claimed in rebate_claimed_events.rebate_claimed_events.clone() { + s.add( + rebate_claimed.ordinal, + ¤t_epoch, + BigInt::from_str(&rebate_claimed.tokens).unwrap(), + ); + } + } + } + None => (), + } for rebate_claimed in rebate_claimed_events.rebate_claimed_events { s.add( rebate_claimed.ordinal, - utils::generate_key_query_fee_rebates("SubgraphDeployment".to_string(),&rebate_claimed.subgraph_deployment_id).to_string(), + utils::generate_key_query_fee_rebates( + "SubgraphDeployment".to_string(), + &rebate_claimed.subgraph_deployment_id, + ) + .to_string(), BigInt::from_str(&rebate_claimed.tokens).unwrap(), ); s.add( rebate_claimed.ordinal, - utils::generate_key_query_fee_rebates("Allocation".to_string(),&rebate_claimed.allocation_id).to_string(), + utils::generate_key_query_fee_rebates( + "Allocation".to_string(), + &rebate_claimed.allocation_id, + ) + .to_string(), BigInt::from_str(&rebate_claimed.tokens).unwrap(), ); } } + #[substreams::handlers::store] fn store_query_fees_amount(events: Events, s: StoreAddBigInt) { let allocation_collected_events = events.allocation_collected_events.unwrap(); @@ -152,12 +195,20 @@ fn store_query_fees_amount(events: Events, s: StoreAddBigInt) { for allocation_collected in allocation_collected_events.allocation_collected_events { s.add( allocation_collected.ordinal, - utils::generate_key_query_fee_rebates("SubgraphDeployment".to_string(),&allocation_collected.subgraph_deployment_id).to_string(), + utils::generate_key_query_fee_rebates( + "SubgraphDeployment".to_string(), + &allocation_collected.subgraph_deployment_id, + ) + .to_string(), BigInt::from_str(&allocation_collected.rebate_fees).unwrap(), ); s.add( allocation_collected.ordinal, - utils::generate_key_query_fee_rebates("Allocation".to_string(),&allocation_collected.allocation_id).to_string(), + utils::generate_key_query_fee_rebates( + "Allocation".to_string(), + &allocation_collected.allocation_id, + ) + .to_string(), BigInt::from_str(&allocation_collected.rebate_fees).unwrap(), ); } @@ -191,21 +242,71 @@ fn store_subgraph_deployment_rewards(indexing_rewards: IndexingRewards, s: Store for indexing_rewards in indexing_rewards.indexing_rewards { s.add( indexing_rewards.ordinal, - utils::generate_key_indexing_rewards(indexing_rewards.subgraph_deployment_id.clone(), "indexingRewardAmount".to_string()), + utils::generate_key_indexing_rewards( + indexing_rewards.subgraph_deployment_id.clone(), + "indexingRewardAmount".to_string(), + ), BigInt::from_str(&indexing_rewards.amount).unwrap(), ); s.add( indexing_rewards.ordinal, - utils::generate_key_indexing_rewards(indexing_rewards.subgraph_deployment_id.clone(), "indexingIndexerRewardAmount".to_string()), + utils::generate_key_indexing_rewards( + indexing_rewards.subgraph_deployment_id.clone(), + "indexingIndexerRewardAmount".to_string(), + ), BigInt::from_str(&indexing_rewards.indexer_rewards).unwrap(), ); s.add( indexing_rewards.ordinal, - utils::generate_key_indexing_rewards(indexing_rewards.subgraph_deployment_id, "indexingDelegatorRewardAmount".to_string()), + utils::generate_key_indexing_rewards( + indexing_rewards.subgraph_deployment_id, + "indexingDelegatorRewardAmount".to_string(), + ), BigInt::from_str(&indexing_rewards.delegator_rewards).unwrap(), ); } } +#[substreams::handlers::store] +fn store_epoch_rewards( + indexing_rewards: IndexingRewards, + store: StoreGetBigInt, + s: StoreAddBigInt, +) { + match store.get_last("epoch") { + Some(epoch_count) => { + if epoch_count > BigInt::zero() { + let current_epoch = epoch_count.sub(1).to_string(); + for indexing_rewards in indexing_rewards.indexing_rewards { + s.add( + indexing_rewards.ordinal, + utils::concat( + current_epoch.clone().to_string(), + "totalRewards".to_string(), + ), + BigInt::from_str(&indexing_rewards.amount).unwrap(), + ); + s.add( + indexing_rewards.ordinal, + utils::concat( + current_epoch.clone().to_string(), + "totalIndexerRewards".to_string(), + ), + BigInt::from_str(&indexing_rewards.indexer_rewards).unwrap(), + ); + s.add( + indexing_rewards.ordinal, + utils::concat( + current_epoch.clone().to_string(), + "totalDelegatorRewards".to_string(), + ), + BigInt::from_str(&indexing_rewards.delegator_rewards).unwrap(), + ); + } + } + } + None => (), + } +} #[substreams::handlers::map] fn map_indexing_rewards( @@ -239,7 +340,8 @@ fn map_indexing_rewards( let target_id = rewards_assigned.clone().id; for indexing_reward in &mut indexing_rewards_vec { if indexing_reward.id == target_id { - let delegator_rewards = BigInt::from_str(&delegator_reward.rewards).unwrap(); + let delegator_rewards = + BigInt::from_str(&delegator_reward.rewards).unwrap(); let indexer_rewards = BigInt::from_str(&rewards_assigned.clone().amount) .unwrap() .sub(delegator_rewards.clone()); diff --git a/src/pb/eth.erc20.v1.rs b/src/pb/eth.erc20.v1.rs index 5395db0..7b0db3b 100644 --- a/src/pb/eth.erc20.v1.rs +++ b/src/pb/eth.erc20.v1.rs @@ -97,6 +97,12 @@ pub struct PartialPauseChangedEvents { } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] +pub struct EpochLengthUpdatedEvents { + #[prost(message, repeated, tag="1")] + pub epoch_length_updated_events: ::prost::alloc::vec::Vec, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct IndexerStakes { #[prost(message, repeated, tag="1")] pub indexer_stakes: ::prost::alloc::vec::Vec, @@ -178,6 +184,8 @@ pub struct Events { pub pause_changed_events: ::core::option::Option, #[prost(message, optional, tag="15")] pub partial_pause_changed_events: ::core::option::Option, + #[prost(message, optional, tag="16")] + pub epoch_length_updated_events: ::core::option::Option, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -445,6 +453,18 @@ pub struct PartialPauseChanged { } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] +pub struct EpochLengthUpdated { + #[prost(string, tag="1")] + pub id: ::prost::alloc::string::String, + #[prost(string, tag="2")] + pub last_length_update_block: ::prost::alloc::string::String, + #[prost(string, tag="3")] + pub epoch_length: ::prost::alloc::string::String, + #[prost(uint64, tag="4")] + pub ordinal: u64, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct IndexerStake { #[prost(string, tag="1")] pub id: ::prost::alloc::string::String, diff --git a/src/utils.rs b/src/utils.rs index 71cdaab..4565907 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -76,3 +76,11 @@ pub fn generate_key_indexing_rewards(who: String, id: String) -> String { id, ); } + +pub fn concat(first: String, second: String) -> String { + return format!( + "{}:{}", + first, + second, + ); +} diff --git a/substreams.yaml b/substreams.yaml index d74b108..5aab730 100644 --- a/substreams.yaml +++ b/substreams.yaml @@ -163,6 +163,7 @@ modules: valueType: bigint inputs: - map: map_events + - store: store_epoch_count - name: store_query_fees_amount kind: store @@ -204,12 +205,84 @@ modules: inputs: - map: map_indexing_rewards + - name: store_epoch_length + kind: store + initialBlock: 11446768 + updatePolicy: set + valueType: string + inputs: + - map: map_events + + - name: store_epoch_count + kind: store + initialBlock: 11446768 + updatePolicy: add + valueType: bigint + inputs: + - store: store_epoch_length + - source: sf.substreams.v1.Clock + + - name: store_epoch_start + kind: store + initialBlock: 11446768 + updatePolicy: set_if_not_exists + valueType: bigint + inputs: + - store: store_epoch_count + - source: sf.substreams.v1.Clock + + - name: store_epoch_end + kind: store + initialBlock: 11446768 + updatePolicy: set_if_not_exists + valueType: bigint + inputs: + - store: store_epoch_count + - source: sf.substreams.v1.Clock + + - name: store_epoch_signal + kind: store + initialBlock: 11446769 + updatePolicy: add + valueType: bigint + inputs: + - map: map_storage_changes + - store: store_epoch_count + + - name: store_epoch_stake + kind: store + initialBlock: 11446769 + updatePolicy: add + valueType: bigint + inputs: + - map: map_events + - store: store_epoch_count + + - name: store_epoch_rewards + kind: store + initialBlock: 11446769 + updatePolicy: add + valueType: bigint + inputs: + - map: map_indexing_rewards + - store: store_epoch_count + - name: graph_out kind: map initialBlock: 11446764 inputs: - source: sf.substreams.v1.Clock - map: map_events + - store: store_epoch_start + mode: deltas + - store: store_epoch_end + mode: deltas + - store: store_epoch_signal + mode: deltas + - store: store_epoch_stake + mode: deltas + - store: store_epoch_rewards + mode: deltas - store: store_grt_global mode: deltas - store: store_grt_balances