From a448d1cb52e16ec98909e5db6d17e393cfb22762 Mon Sep 17 00:00:00 2001 From: goldenfiredo Date: Wed, 17 Mar 2021 09:44:33 +0800 Subject: [PATCH 1/6] fix bug --- src/main.rs | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/src/main.rs b/src/main.rs index fd53cf3..858c6a3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -54,6 +54,7 @@ type PrClient = pruntime_client::PRuntimeClient; const DIEM_CONTRACT_ID: u32 = 5; const INTERVAL: u64 = 1_000 * 60 * 3; +const RECEIVING_EVENTS_LIMIT: u64 = 100; use crate::error::Error; use crate::types::{CommandReqData}; @@ -283,18 +284,16 @@ impl DiemBridge { let command_value = serde_json::to_value(&CommandReqData::AccountData { account_data_b64 })?; let _ = self.push_command(command_value.to_string(), &client, signer).await; - if account_info.sequence_number > 0 { - // Sync receiving transactions - let _ = self.sync_receiving_transactions( - &pr, - account_view.received_events_key.0.clone().to_string(), - account_view.sequence_number.clone(), - account_address.clone(), - &mut state_initiated, - &client, - signer, - ).await?; - } + // Sync receiving transactions + let _ = self.sync_receiving_transactions( + &pr, + account_view.received_events_key.0.clone().to_string(), + RECEIVING_EVENTS_LIMIT, + account_address.clone(), + &mut state_initiated, + &client, + signer, + ).await?; // Sync sending transactions let _ = self.sync_sent_transactions(&pr, account_address, &mut state_initiated, &client, signer).await?; @@ -309,14 +308,14 @@ impl DiemBridge { &mut self, pr: &PrClient, received_events_key: String, - sequence_number: u64, + limit: u64, account_address: String, state_initiated: &mut bool, client: &XtClient, signer: &mut SrSigner, ) -> Result<(), Error> { let mut batch = JsonRpcBatch::new(); - batch.add_get_events_request(received_events_key.to_string(), 0, sequence_number); + batch.add_get_events_request(received_events_key.to_string(), 0, limit); let resp = self.request_rpc(batch).map_err(|_| Error::FailedToGetReceivingTransactions)?; let received_events = EventView::vec_from_response(resp).unwrap(); From f058bd455b6108f88cb0fe7aa1134cd41c7b8612 Mon Sep 17 00:00:00 2001 From: goldenfiredo Date: Thu, 18 Mar 2021 13:26:25 +0800 Subject: [PATCH 2/6] submit signed transaction to diem --- src/error.rs | 1 + src/main.rs | 55 +++++++++++++++++++++++++++++++++++++++++++++++----- src/types.rs | 11 +++++++++-- 3 files changed, 60 insertions(+), 7 deletions(-) diff --git a/src/error.rs b/src/error.rs index cd1ed4e..3466268 100644 --- a/src/error.rs +++ b/src/error.rs @@ -14,6 +14,7 @@ pub enum Error { FailedToCallPushCommand, FailedToGetReceivingTransactions, FailedToGetSentTransactions, + FailedToSubmitTransaction, } impl From for Error { diff --git a/src/main.rs b/src/main.rs index 858c6a3..91d34a1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,7 +14,7 @@ use diem_types::{ }, chain_id::ChainId, ledger_info::LedgerInfoWithSignatures, - transaction::TransactionInfo, + transaction::{TransactionInfo, SignedTransaction}, epoch_change::EpochChangeProof, proof::{ AccountStateProof, @@ -42,7 +42,7 @@ mod error; mod runtimes; use std::cmp; -use crate::types::{Runtime, Payload}; +use crate::types::{Runtime, Payload, QueryReqData, QueryRespData, TransactionData}; use subxt::Signer; use subxt::system::AccountStoreExt; use core::marker::PhantomData; @@ -53,13 +53,14 @@ type XtClient = subxt::Client; type PrClient = pruntime_client::PRuntimeClient; const DIEM_CONTRACT_ID: u32 = 5; -const INTERVAL: u64 = 1_000 * 60 * 3; +const INTERVAL: u64 = 1_000 * 60 * 1; const RECEIVING_EVENTS_LIMIT: u64 = 100; use crate::error::Error; use crate::types::{CommandReqData}; use serde::{Serialize, Deserialize}; +use codec::Decode; #[derive(Debug, StructOpt)] #[structopt(name = "pDiem")] @@ -138,7 +139,7 @@ impl DiemBridge { pub fn new(url: &str) -> Result { let rpc_client = JsonRpcClient::new(Url::parse(url).unwrap()).unwrap(); Ok(DiemBridge { - chain_id: ChainId::new(2), + chain_id: ChainId::new(2), //TESTNET: 2, TESTING:4 rpc_client, sent_events_key: None, received_events_key: None, @@ -554,6 +555,46 @@ impl DiemBridge { Err(Error::FailedToGetResponse) } } + + async fn submit_signed_transaction( + &mut self, + pr: &PrClient, + start_seq: &mut u64, + ) -> Result<(), Error> { + + let resp = pr.query(DIEM_CONTRACT_ID, QueryReqData::GetSignedTransactions { start: *start_seq}).await?; + println!("query signed transaction resp:{:?}", resp); + if let QueryRespData::GetSignedTransactions { queue_b64 } = resp { + let data = base64::decode(&queue_b64).unwrap(); + let transaction_data: Vec = Decode::decode(&mut &data[..]).unwrap(); + for td in transaction_data.clone() { + println!("transaction data:{:?}", td); + let signed_tx: SignedTransaction = bcs::from_bytes(&td.signed_tx).unwrap(); + println!("signed transaction:{:?}", signed_tx); + let mut batch = JsonRpcBatch::new(); + batch.add_submit_request(signed_tx); + //let _resp = self.request_rpc(batch).map_err(|_| Error::FailedToSubmitTransaction)?; + match self.request_rpc(batch) { + Ok(_) => { + println!("submit transaction for {:?}", hex::decode(&td.address)); + + if td.sequence > *start_seq { + *start_seq = td.sequence + } + } + Err(_) => { + println!("request rpc error"); + } + } + + } + if transaction_data.len() > 0 { + *start_seq = *start_seq + 1; + } + } + + Ok(()) + } } async fn bridge(args: Args) -> Result<(), Error> { @@ -575,8 +616,12 @@ async fn bridge(args: Args) -> Result<(), Error> { //hard code Alice account let addr: String = "0xd4f0c053205ba934bb2ac0c4e8479e77".to_string(); + let mut start_seq = 1; + loop { - let _= diem.sync_account(&pr, addr.clone(), &client, &mut signer).await; + let _ = diem.sync_account(&pr, addr.clone(), &client, &mut signer).await; + + let _ = diem.submit_signed_transaction(&pr, &mut start_seq).await; println!("Waiting for next loop"); tokio::time::delay_for(std::time::Duration::from_millis(INTERVAL)).await; diff --git a/src/types.rs b/src/types.rs index 8d5105a..53dc11d 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,5 +1,5 @@ use serde::{Serialize, Deserialize, de::DeserializeOwned}; - +use codec::{Encode, Decode}; // Node Runtime use crate::runtimes::PhalaNodeRuntime; pub type Runtime = PhalaNodeRuntime; @@ -66,10 +66,17 @@ pub enum CommandReqData { #[derive(Serialize, Deserialize, Debug)] pub enum QueryReqData { - + GetSignedTransactions { start: u64 } } #[derive(Serialize, Deserialize, Debug)] pub enum QueryRespData { + GetSignedTransactions { queue_b64: String } +} +#[derive(Serialize, Deserialize, Debug, Clone, Encode, Decode)] +pub struct TransactionData { + pub sequence: u64, + pub address: Vec, + pub signed_tx: Vec, } \ No newline at end of file From affd065af7c0dcc6b034b7929d995d808bad3a37 Mon Sep 17 00:00:00 2001 From: goldenfiredo Date: Thu, 25 Mar 2021 21:20:25 +0800 Subject: [PATCH 3/6] support to sync multiple accounts --- src/main.rs | 100 ++++++++++++++++++++++++++++++---------------------- 1 file changed, 58 insertions(+), 42 deletions(-) diff --git a/src/main.rs b/src/main.rs index 91d34a1..9cd49c2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,5 @@ use structopt::StructOpt; +use std::collections::BTreeMap; use diem_client::{ AccountData, @@ -95,13 +96,17 @@ pub struct DiemBridge { trusted_state: Option, latest_epoch_change_li: Option, latest_li: Option, - sent_events_key: Option, - received_events_key:Option, - sent_events: Option>, - received_events: Option>, - transactions: Option>, - account: Option, - balances: Option>, + //sent_events_key: Option, + //received_events_key:Option, + //sent_events: Option>, + //received_events: Option>, + //transactions: Option>, + //account: Option, + received_events: BTreeMap>, + transactions: BTreeMap>, + account: BTreeMap, + //balances: Option>, + address: Vec, } #[derive(Clone, Serialize, Deserialize, Debug, PartialEq)] @@ -141,17 +146,18 @@ impl DiemBridge { Ok(DiemBridge { chain_id: ChainId::new(2), //TESTNET: 2, TESTING:4 rpc_client, - sent_events_key: None, - received_events_key: None, + //sent_events_key: None, + //received_events_key: None, epoch_change_proof: None, trusted_state: None, latest_epoch_change_li: None, latest_li: None, - sent_events: None, - received_events: None, - transactions:None, - account: None, - balances: None, + //sent_events: None, + received_events: BTreeMap::>::new(), + transactions: BTreeMap::>::new(), + account: BTreeMap::::new(), + //balances: None, + address: Vec::new(), }) } @@ -257,28 +263,29 @@ impl DiemBridge { let resp = self.request_rpc(batch).map_err(|_| Error::FailedToGetResponse)?; if let Some(account_view) = AccountView::optional_from_response(resp).unwrap() { - self.account = Some(AccountData { + self.account.insert(account_address.clone(), AccountData { address, authentication_key: account_view.authentication_key.into_bytes().ok(), key_pair: None, sequence_number: account_view.sequence_number, status: AccountStatus::Persisted, }); - self.sent_events_key = Some(account_view.sent_events_key.clone()); - self.received_events_key = Some(account_view.received_events_key.clone()); - self.balances = Some(account_view.balances.clone()); - let balances: Vec = self.balances.as_ref().unwrap() + let sent_events_key = account_view.sent_events_key; + let received_events_key = account_view.received_events_key.clone(); + let balances = Some(account_view.balances.clone()); + + let amounts: Vec = balances.as_ref().unwrap() .iter() .map(|b| Amount{ amount: b.amount, currency: b.currency.clone() }).collect(); - + let account = self.account.get(&account_address).unwrap(); let account_info = AccountInfo { - address: self.account.as_ref().unwrap().address, - authentication_key: self.account.as_ref().unwrap().authentication_key.clone(), - sequence_number: self.account.as_ref().unwrap().sequence_number, - sent_events_key: self.sent_events_key.clone().unwrap().0, - received_events_key: self.received_events_key.clone().unwrap().0, - balances, + address: account.address, + authentication_key: account.authentication_key.clone(), + sequence_number: account.sequence_number, + sent_events_key: sent_events_key.0, + received_events_key: received_events_key.0, + balances: amounts, }; let account_data_b64 = base64::encode(&bcs::to_bytes(&account_info).unwrap()); @@ -322,8 +329,8 @@ impl DiemBridge { let received_events = EventView::vec_from_response(resp).unwrap(); let mut new_events: Vec = Vec::new(); for event in received_events.clone() { - let exist = self.received_events.as_ref().is_some() - && self.received_events.as_ref().unwrap().iter().any(|x| x.transaction_version == event.transaction_version); + let exist = self.received_events.get(&account_address).is_some() + && self.received_events.get(&account_address).unwrap().iter().any(|x| x.transaction_version == event.transaction_version); if !exist { println!("new received event!"); new_events.push(event); @@ -350,7 +357,7 @@ impl DiemBridge { } } - self.received_events = Some(received_events); + self.received_events.insert(account_address, received_events); Ok(()) } @@ -365,17 +372,17 @@ impl DiemBridge { ) -> Result<(), Error> { let mut batch = JsonRpcBatch::new(); batch.add_get_account_transactions_request( - self.account.as_ref().unwrap().address.clone(), + self.account.get(&account_address).unwrap().address.clone(), 0, - self.account.as_ref().unwrap().sequence_number.clone(), + self.account.get(&account_address).unwrap().sequence_number.clone(), false ); let resp = self.request_rpc(batch).map_err(|_| Error::FailedToGetSentTransactions)?; let mut need_sync_transactions: Vec = Vec::new(); let transactions = TransactionView::vec_from_response(resp).unwrap(); for transaction in transactions.clone() { - let exist = self.transactions.as_ref().is_some() - && self.transactions.as_ref().unwrap().iter().any(|x| x.version == transaction.version); + let exist = self.transactions.get(&account_address).is_some() + && self.transactions.get(&account_address).unwrap().iter().any(|x| x.version == transaction.version); if !exist { println!("new transaction!"); match transaction.transaction { @@ -402,7 +409,7 @@ impl DiemBridge { ).await?; } - self.transactions = Some(transactions); + self.transactions.insert(account_address, transactions); Ok(()) } @@ -415,7 +422,7 @@ impl DiemBridge { client: &XtClient, signer: &mut SrSigner, ) -> Result<(), Error> { - if let Ok(transaction_with_proof) = self.get_transaction_proof(&transaction) { + if let Ok(transaction_with_proof) = self.get_transaction_proof(account_address.clone(), &transaction) { println!("transaction_with_proof:{:?}", transaction_with_proof); let transaction_with_proof_b64 = base64::encode(&bcs::to_bytes(&transaction_with_proof).unwrap()); @@ -463,10 +470,11 @@ impl DiemBridge { fn get_transaction_proof( &mut self, + account_address: String, transaction: &TransactionView, ) -> Result { let mut batch = JsonRpcBatch::new(); - let account = self.account.as_ref().unwrap().address.clone(); + let account = self.account.get(&account_address).unwrap().address.clone(); batch.add_get_account_state_with_proof_request( account, Some(transaction.version), @@ -499,7 +507,7 @@ impl DiemBridge { let _ = account_transaction_state_proof.verify( self.latest_li.as_ref().unwrap().ledger_info(), transaction.version, - self.account.as_ref().unwrap().address.hash(), + self.account.get(&account_address).unwrap().address.hash(), Some(&account_state_blob), ); println!("Transaction was verified"); @@ -561,7 +569,6 @@ impl DiemBridge { pr: &PrClient, start_seq: &mut u64, ) -> Result<(), Error> { - let resp = pr.query(DIEM_CONTRACT_ID, QueryReqData::GetSignedTransactions { start: *start_seq}).await?; println!("query signed transaction resp:{:?}", resp); if let QueryRespData::GetSignedTransactions { queue_b64 } = resp { @@ -573,10 +580,14 @@ impl DiemBridge { println!("signed transaction:{:?}", signed_tx); let mut batch = JsonRpcBatch::new(); batch.add_submit_request(signed_tx); - //let _resp = self.request_rpc(batch).map_err(|_| Error::FailedToSubmitTransaction)?; match self.request_rpc(batch) { Ok(_) => { - println!("submit transaction for {:?}", hex::decode(&td.address)); + let receiver_address = "0x".to_string() + &hex::encode_upper(td.address.clone()); + println!("submit transaction for {:?}", receiver_address); + + if !self.address.contains(&receiver_address) { + self.address.push(receiver_address); + } if td.sequence > *start_seq { *start_seq = td.sequence @@ -614,12 +625,17 @@ async fn bridge(args: Args) -> Result<(), Error> { diem.init_state(Some(&pr), &client, &mut signer).await?; //hard code Alice account - let addr: String = "0xd4f0c053205ba934bb2ac0c4e8479e77".to_string(); + let alice_addr: String = "0xD4F0C053205BA934BB2AC0C4E8479E77".to_string(); + diem.address.push(alice_addr); let mut start_seq = 1; loop { - let _ = diem.sync_account(&pr, addr.clone(), &client, &mut signer).await; + let address = diem.address.clone(); + for addr in address { + println!("sync account: {:}", addr); + let _ = diem.sync_account(&pr, addr.clone(), &client, &mut signer).await; + } let _ = diem.submit_signed_transaction(&pr, &mut start_seq).await; From 315ae348c8802d34a250bbe3d3a6ae4c30be4e03 Mon Sep 17 00:00:00 2001 From: goldenfiredo Date: Sat, 3 Apr 2021 19:20:30 +0800 Subject: [PATCH 4/6] 1. read pdiem state when launch; 2. sync trusted state in loop; 3. refactor code --- src/main.rs | 100 ++++++++++++++++++++++-------------------------- src/runtimes.rs | 12 +++--- src/types.rs | 16 ++++++-- 3 files changed, 65 insertions(+), 63 deletions(-) diff --git a/src/main.rs b/src/main.rs index 9cd49c2..01db729 100644 --- a/src/main.rs +++ b/src/main.rs @@ -208,7 +208,8 @@ impl DiemBridge { &mut self, pr: Option<&PrClient>, client: &XtClient, - signer: &mut SrSigner + signer: &mut SrSigner, + initialized: bool, ) -> Result<(), Error> { let mut batch = JsonRpcBatch::new(); batch.add_get_state_proof_request(0); @@ -230,15 +231,23 @@ impl DiemBridge { self.epoch_change_proof = Some(epoch_change_proof.clone()); // Update Latest version state - let _ = self.verify_state_proof(ledger_info_with_signatures, epoch_change_proof); + let _ = self.verify_state_proof(ledger_info_with_signatures.clone(), epoch_change_proof.clone()); println!("trusted_state: {:#?}", self.trusted_state); println!("ledger_info_with_signatures: {:#?}", self.latest_li); if pr.is_some() { - let trusted_state_b64 = base64::encode(&bcs::to_bytes(&zero_ledger_info_with_sigs).unwrap()); + if initialized { + let trusted_state_b64 = base64::encode(&bcs::to_bytes(&zero_ledger_info_with_sigs).unwrap()); + + let command_value = serde_json::to_value(&CommandReqData::SetTrustedState { trusted_state_b64 })?; + let _ = self.push_command(command_value.to_string(), &client, signer).await; + } else { + let ledger_info_with_signatures_b64 = base64::encode(&bcs::to_bytes(&ledger_info_with_signatures).unwrap()); + let epoch_change_proof_b64 = base64::encode(&bcs::to_bytes(&epoch_change_proof).unwrap()); - let command_value = serde_json::to_value(&CommandReqData::SetTrustedState { trusted_state_b64 })?; - let _ = self.push_command(command_value.to_string(), &client, signer).await; + let command_value = serde_json::to_value(&CommandReqData::VerifyEpochProof { ledger_info_with_signatures_b64, epoch_change_proof_b64 })?; + let _ = self.push_command(command_value.to_string(), &client, signer).await; + } } Ok(()) @@ -250,12 +259,10 @@ impl DiemBridge { async fn sync_account( &mut self, - pr: &PrClient, account_address: String, client: &XtClient, signer: &mut SrSigner, ) -> Result<(), Error> { - let mut state_initiated = false; // Init account information let mut batch = JsonRpcBatch::new(); let address = AccountAddress::from_hex_literal(&account_address).unwrap(); @@ -271,7 +278,7 @@ impl DiemBridge { status: AccountStatus::Persisted, }); - let sent_events_key = account_view.sent_events_key; + let sent_events_key = account_view.sent_events_key.clone(); let received_events_key = account_view.received_events_key.clone(); let balances = Some(account_view.balances.clone()); @@ -294,17 +301,15 @@ impl DiemBridge { // Sync receiving transactions let _ = self.sync_receiving_transactions( - &pr, account_view.received_events_key.0.clone().to_string(), RECEIVING_EVENTS_LIMIT, account_address.clone(), - &mut state_initiated, &client, signer, ).await?; // Sync sending transactions - let _ = self.sync_sent_transactions(&pr, account_address, &mut state_initiated, &client, signer).await?; + let _ = self.sync_sent_transactions(account_address, &client, signer).await?; } else { println!("get account view error"); } @@ -314,11 +319,9 @@ impl DiemBridge { async fn sync_receiving_transactions( &mut self, - pr: &PrClient, received_events_key: String, limit: u64, account_address: String, - state_initiated: &mut bool, client: &XtClient, signer: &mut SrSigner, ) -> Result<(), Error> { @@ -337,20 +340,11 @@ impl DiemBridge { } } - if new_events.len() > 0 && *state_initiated == false { - if let Err(_) = self.init_state(None, &client, signer).await { - return Err(Error::FailedToInitState); - } - - *state_initiated = true; - } - for event in new_events { if let Ok(transaction) = self.get_transaction_by_version(event.transaction_version) { println!("received transaction:{:?}", transaction); let _ = self.sync_transaction_with_proof( - &transaction, &pr, - account_address.clone(), &client, signer + &transaction, account_address.clone(), &client, signer ).await?; } else { println!("get_transaction_by_version error"); @@ -364,20 +358,20 @@ impl DiemBridge { async fn sync_sent_transactions( &mut self, - pr: &PrClient, account_address: String, - state_initiated: &mut bool, client: &XtClient, signer: &mut SrSigner, ) -> Result<(), Error> { + println!("account:{:?}", self.account); let mut batch = JsonRpcBatch::new(); batch.add_get_account_transactions_request( self.account.get(&account_address).unwrap().address.clone(), 0, self.account.get(&account_address).unwrap().sequence_number.clone(), - false + true ); let resp = self.request_rpc(batch).map_err(|_| Error::FailedToGetSentTransactions)?; + println!("add_get_account_transactions_request resp:{:?}", resp); let mut need_sync_transactions: Vec = Vec::new(); let transactions = TransactionView::vec_from_response(resp).unwrap(); for transaction in transactions.clone() { @@ -394,18 +388,9 @@ impl DiemBridge { } } - if need_sync_transactions.len() > 0 && *state_initiated == false { - if let Err(_) = self.init_state(None, &client, signer).await { - return Err(Error::FailedToInitState); - } - - *state_initiated = true; - } - for transaction in need_sync_transactions { let _ = self.sync_transaction_with_proof( - &transaction, &pr, - account_address.clone(), &client, signer + &transaction, account_address.clone(), &client, signer ).await?; } @@ -417,7 +402,6 @@ impl DiemBridge { async fn sync_transaction_with_proof( &mut self, transaction: &TransactionView, - pr: &PrClient, account_address: String, client: &XtClient, signer: &mut SrSigner, @@ -579,13 +563,13 @@ impl DiemBridge { let signed_tx: SignedTransaction = bcs::from_bytes(&td.signed_tx).unwrap(); println!("signed transaction:{:?}", signed_tx); let mut batch = JsonRpcBatch::new(); - batch.add_submit_request(signed_tx); + let _ = batch.add_submit_request(signed_tx); match self.request_rpc(batch) { Ok(_) => { let receiver_address = "0x".to_string() + &hex::encode_upper(td.address.clone()); println!("submit transaction for {:?}", receiver_address); - if !self.address.contains(&receiver_address) { + if td.new_account && !self.address.contains(&receiver_address) { self.address.push(receiver_address); } @@ -622,26 +606,34 @@ async fn bridge(args: Args) -> Result<(), Error> { let pr = PrClient::new(&args.pruntime_endpoint); - diem.init_state(Some(&pr), &client, &mut signer).await?; + diem.init_state(Some(&pr), &client, &mut signer, true).await?; - //hard code Alice account - let alice_addr: String = "0xD4F0C053205BA934BB2AC0C4E8479E77".to_string(); - diem.address.push(alice_addr); + let resp = pr.query(DIEM_CONTRACT_ID, QueryReqData::CurrentState).await?; + if let QueryRespData::CurrentState { state } = resp { + println!("current state: {:?}", state); - let mut start_seq = 1; + diem.address = state.account_address; + let mut start_seq = state.queue_seq; - loop { - let address = diem.address.clone(); - for addr in address { - println!("sync account: {:}", addr); - let _ = diem.sync_account(&pr, addr.clone(), &client, &mut signer).await; - } + loop { + diem.init_state(Some(&pr), &client, &mut signer, false).await?; - let _ = diem.submit_signed_transaction(&pr, &mut start_seq).await; + let address = diem.address.clone(); + for addr in address { + println!("sync account: {:}", addr); + let _ = diem.sync_account(addr.clone(), &client, &mut signer).await; + } + + let _ = diem.submit_signed_transaction(&pr, &mut start_seq).await; - println!("Waiting for next loop"); - tokio::time::delay_for(std::time::Duration::from_millis(INTERVAL)).await; + println!("Waiting for next loop\n"); + tokio::time::delay_for(std::time::Duration::from_millis(INTERVAL)).await; + } + } else { + println!("query state error"); } + + Ok(()) } #[tokio::main] @@ -651,4 +643,4 @@ async fn main() { Ok(()) => println!("bridge() exited sucessfully"), Err(e) => panic!("bridge() exited with result: {:?}", e) } -} \ No newline at end of file +} diff --git a/src/runtimes.rs b/src/runtimes.rs index d7546ec..f93b637 100644 --- a/src/runtimes.rs +++ b/src/runtimes.rs @@ -41,7 +41,7 @@ use subxt::{ register_default_type_sizes }; -use self::phala::PhalaModuleEventTypeRegistry; +use self::phala::PhalaEventTypeRegistry; /// PhalaNode concrete type definitions compatible with those for kusama, v0.7 /// @@ -59,7 +59,7 @@ impl Runtime for PhalaNodeRuntime { fn register_type_sizes(event_type_registry: &mut EventTypeRegistry) { event_type_registry.with_system(); - event_type_registry.with_phala_module(); + event_type_registry.with_phala(); event_type_registry.with_balances(); register_default_type_sizes(event_type_registry); } @@ -81,7 +81,7 @@ impl Balances for PhalaNodeRuntime { type Balance = u128; } -impl phala::PhalaModule for PhalaNodeRuntime {} +impl phala::Phala for PhalaNodeRuntime {} pub mod phala { use codec::{Encode, Decode}; use subxt::{ @@ -98,14 +98,14 @@ pub mod phala { #[module] - pub trait PhalaModule: System + Balances { + pub trait Phala: System + Balances { } #[derive(Clone, Debug, PartialEq, Call, Encode)] - pub struct PushCommandCall { + pub struct PushCommandCall { pub _runtime: PhantomData, pub contract_id: u32, pub payload: Vec, } -} \ No newline at end of file +} diff --git a/src/types.rs b/src/types.rs index 53dc11d..56c296c 100644 --- a/src/types.rs +++ b/src/types.rs @@ -62,16 +62,19 @@ pub enum CommandReqData { AccountData { account_data_b64: String }, VerifyTransaction { account_address: String, transaction_with_proof_b64: String }, SetTrustedState { trusted_state_b64: String }, + VerifyEpochProof { ledger_info_with_signatures_b64: String, epoch_change_proof_b64: String }, } #[derive(Serialize, Deserialize, Debug)] pub enum QueryReqData { - GetSignedTransactions { start: u64 } + GetSignedTransactions { start: u64 }, + CurrentState, } #[derive(Serialize, Deserialize, Debug)] pub enum QueryRespData { - GetSignedTransactions { queue_b64: String } + GetSignedTransactions { queue_b64: String }, + CurrentState { state: State }, } #[derive(Serialize, Deserialize, Debug, Clone, Encode, Decode)] @@ -79,4 +82,11 @@ pub struct TransactionData { pub sequence: u64, pub address: Vec, pub signed_tx: Vec, -} \ No newline at end of file + pub new_account: bool, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct State { + pub queue_seq: u64, + pub account_address: Vec, +} From 6188d71ac5904a03601a82ef89f4b6bc11e5ed14 Mon Sep 17 00:00:00 2001 From: goldenfiredo Date: Wed, 7 Apr 2021 10:37:36 +0800 Subject: [PATCH 5/6] init chain_id --- src/main.rs | 18 +++++++++++------- src/types.rs | 2 +- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/src/main.rs b/src/main.rs index 01db729..3b5cbb4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -13,7 +13,7 @@ use diem_types::{ account_address::{ AccountAddress, }, - chain_id::ChainId, + chain_id::{ChainId, NamedChain}, ledger_info::LedgerInfoWithSignatures, transaction::{TransactionInfo, SignedTransaction}, epoch_change::EpochChangeProof, @@ -34,7 +34,6 @@ use diem_json_rpc_client::{ JsonRpcBatch, JsonRpcClient, ResponseAsView, JsonRpcResponse, }; use std::{convert::TryFrom}; -use diem_json_rpc_types::views::AmountView; use diem_types::account_state_blob::AccountStateBlob; mod pruntime_client; @@ -143,8 +142,14 @@ pub struct TransactionWithProof { impl DiemBridge { pub fn new(url: &str) -> Result { let rpc_client = JsonRpcClient::new(Url::parse(url).unwrap()).unwrap(); + let chain_id = if url == "https://testnet.diem.com" { + NamedChain::TESTNET + } else { + NamedChain::TESTING + }; + Ok(DiemBridge { - chain_id: ChainId::new(2), //TESTNET: 2, TESTING:4 + chain_id: ChainId::new(chain_id.id()), rpc_client, //sent_events_key: None, //received_events_key: None, @@ -605,13 +610,12 @@ async fn bridge(args: Args) -> Result<(), Error> { let mut signer: SrSigner = subxt::PairSigner::new(pair); let pr = PrClient::new(&args.pruntime_endpoint); - - diem.init_state(Some(&pr), &client, &mut signer, true).await?; - - let resp = pr.query(DIEM_CONTRACT_ID, QueryReqData::CurrentState).await?; + let resp = pr.query(DIEM_CONTRACT_ID, QueryReqData::CurrentState{ chain_id: diem.chain_id.id() }).await?; if let QueryRespData::CurrentState { state } = resp { println!("current state: {:?}", state); + diem.init_state(Some(&pr), &client, &mut signer, true).await?; + diem.address = state.account_address; let mut start_seq = state.queue_seq; diff --git a/src/types.rs b/src/types.rs index 56c296c..4551bb1 100644 --- a/src/types.rs +++ b/src/types.rs @@ -68,7 +68,7 @@ pub enum CommandReqData { #[derive(Serialize, Deserialize, Debug)] pub enum QueryReqData { GetSignedTransactions { start: u64 }, - CurrentState, + CurrentState { chain_id: u8 }, } #[derive(Serialize, Deserialize, Debug)] From 55bd97245494ebb7d8cd7d1b637b9aff139f86fd Mon Sep 17 00:00:00 2001 From: goldenfiredo Date: Thu, 15 Apr 2021 23:24:36 +0800 Subject: [PATCH 6/6] review code --- src/main.rs | 39 +++++++++++++++------------------------ src/types.rs | 6 +++--- 2 files changed, 18 insertions(+), 27 deletions(-) diff --git a/src/main.rs b/src/main.rs index 3b5cbb4..7105a41 100644 --- a/src/main.rs +++ b/src/main.rs @@ -53,7 +53,6 @@ type XtClient = subxt::Client; type PrClient = pruntime_client::PRuntimeClient; const DIEM_CONTRACT_ID: u32 = 5; -const INTERVAL: u64 = 1_000 * 60 * 1; const RECEIVING_EVENTS_LIMIT: u64 = 100; use crate::error::Error; @@ -86,6 +85,10 @@ struct Args { default_value = "ws://localhost:9944", long, help = "Substrate rpc websocket endpoint")] substrate_ws_endpoint: String, + + #[structopt(default_value = "15", long, + help = "The interval in seconds.")] + interval: u64, } pub struct DiemBridge { @@ -95,16 +98,9 @@ pub struct DiemBridge { trusted_state: Option, latest_epoch_change_li: Option, latest_li: Option, - //sent_events_key: Option, - //received_events_key:Option, - //sent_events: Option>, - //received_events: Option>, - //transactions: Option>, - //account: Option, received_events: BTreeMap>, transactions: BTreeMap>, account: BTreeMap, - //balances: Option>, address: Vec, } @@ -147,21 +143,17 @@ impl DiemBridge { } else { NamedChain::TESTING }; - + println!("{}", url); Ok(DiemBridge { chain_id: ChainId::new(chain_id.id()), rpc_client, - //sent_events_key: None, - //received_events_key: None, epoch_change_proof: None, trusted_state: None, latest_epoch_change_li: None, latest_li: None, - //sent_events: None, received_events: BTreeMap::>::new(), transactions: BTreeMap::>::new(), account: BTreeMap::::new(), - //balances: None, address: Vec::new(), }) } @@ -220,7 +212,6 @@ impl DiemBridge { batch.add_get_state_proof_request(0); if let Ok(resp) = self.request_rpc(batch) { let state_proof = StateProofView::from_response(resp).unwrap(); - //println!("state_proof:\n{:?}", state_proof); let epoch_change_proof: EpochChangeProof = bcs::from_bytes(&state_proof.epoch_change_proof.into_bytes().unwrap()).unwrap(); @@ -244,7 +235,7 @@ impl DiemBridge { if initialized { let trusted_state_b64 = base64::encode(&bcs::to_bytes(&zero_ledger_info_with_sigs).unwrap()); - let command_value = serde_json::to_value(&CommandReqData::SetTrustedState { trusted_state_b64 })?; + let command_value = serde_json::to_value(&CommandReqData::SetTrustedState { trusted_state_b64, chain_id: self.chain_id.id() })?; let _ = self.push_command(command_value.to_string(), &client, signer).await; } else { let ledger_info_with_signatures_b64 = base64::encode(&bcs::to_bytes(&ledger_info_with_signatures).unwrap()); @@ -270,7 +261,7 @@ impl DiemBridge { ) -> Result<(), Error> { // Init account information let mut batch = JsonRpcBatch::new(); - let address = AccountAddress::from_hex_literal(&account_address).unwrap(); + let address = AccountAddress::from_hex_literal(&("0x".to_string() + &account_address)).unwrap(); batch.add_get_account_request(address); let resp = self.request_rpc(batch).map_err(|_| Error::FailedToGetResponse)?; @@ -300,8 +291,8 @@ impl DiemBridge { balances: amounts, }; - let account_data_b64 = base64::encode(&bcs::to_bytes(&account_info).unwrap()); - let command_value = serde_json::to_value(&CommandReqData::AccountData { account_data_b64 })?; + let account_info_b64 = base64::encode(&bcs::to_bytes(&account_info).unwrap()); + let command_value = serde_json::to_value(&CommandReqData::AccountInfo { account_info_b64 })?; let _ = self.push_command(command_value.to_string(), &client, signer).await; // Sync receiving transactions @@ -553,7 +544,7 @@ impl DiemBridge { } } - async fn submit_signed_transaction( + async fn maybe_submit_signed_transaction( &mut self, pr: &PrClient, start_seq: &mut u64, @@ -563,7 +554,7 @@ impl DiemBridge { if let QueryRespData::GetSignedTransactions { queue_b64 } = resp { let data = base64::decode(&queue_b64).unwrap(); let transaction_data: Vec = Decode::decode(&mut &data[..]).unwrap(); - for td in transaction_data.clone() { + for td in &transaction_data { println!("transaction data:{:?}", td); let signed_tx: SignedTransaction = bcs::from_bytes(&td.signed_tx).unwrap(); println!("signed transaction:{:?}", signed_tx); @@ -571,7 +562,7 @@ impl DiemBridge { let _ = batch.add_submit_request(signed_tx); match self.request_rpc(batch) { Ok(_) => { - let receiver_address = "0x".to_string() + &hex::encode_upper(td.address.clone()); + let receiver_address = hex::encode_upper(td.address.clone()); println!("submit transaction for {:?}", receiver_address); if td.new_account && !self.address.contains(&receiver_address) { @@ -610,7 +601,7 @@ async fn bridge(args: Args) -> Result<(), Error> { let mut signer: SrSigner = subxt::PairSigner::new(pair); let pr = PrClient::new(&args.pruntime_endpoint); - let resp = pr.query(DIEM_CONTRACT_ID, QueryReqData::CurrentState{ chain_id: diem.chain_id.id() }).await?; + let resp = pr.query(DIEM_CONTRACT_ID, QueryReqData::CurrentState).await?; if let QueryRespData::CurrentState { state } = resp { println!("current state: {:?}", state); @@ -628,10 +619,10 @@ async fn bridge(args: Args) -> Result<(), Error> { let _ = diem.sync_account(addr.clone(), &client, &mut signer).await; } - let _ = diem.submit_signed_transaction(&pr, &mut start_seq).await; + let _ = diem.maybe_submit_signed_transaction(&pr, &mut start_seq).await; println!("Waiting for next loop\n"); - tokio::time::delay_for(std::time::Duration::from_millis(INTERVAL)).await; + tokio::time::delay_for(std::time::Duration::from_millis(args.interval * 1000)).await; } } else { println!("query state error"); diff --git a/src/types.rs b/src/types.rs index 4551bb1..8281447 100644 --- a/src/types.rs +++ b/src/types.rs @@ -59,16 +59,16 @@ impl Resp for QueryReq { #[derive(Serialize, Deserialize, Debug)] pub enum CommandReqData { - AccountData { account_data_b64: String }, + AccountInfo { account_info_b64: String }, VerifyTransaction { account_address: String, transaction_with_proof_b64: String }, - SetTrustedState { trusted_state_b64: String }, + SetTrustedState { trusted_state_b64: String, chain_id: u8 }, VerifyEpochProof { ledger_info_with_signatures_b64: String, epoch_change_proof_b64: String }, } #[derive(Serialize, Deserialize, Debug)] pub enum QueryReqData { GetSignedTransactions { start: u64 }, - CurrentState { chain_id: u8 }, + CurrentState, } #[derive(Serialize, Deserialize, Debug)]