From 8e25805bbd05374dea2746080a7473388c7c1a47 Mon Sep 17 00:00:00 2001 From: Juhyung Park Date: Wed, 3 Jun 2020 14:01:53 +0900 Subject: [PATCH 1/7] Remove unused TransactionListener --- core/src/miner/miner.rs | 19 ++----------------- 1 file changed, 2 insertions(+), 17 deletions(-) diff --git a/core/src/miner/miner.rs b/core/src/miner/miner.rs index e7a961f571..46f40706db 100644 --- a/core/src/miner/miner.rs +++ b/core/src/miner/miner.rs @@ -113,11 +113,8 @@ struct SealingWork { enabled: bool, } -type TransactionListener = Box; - pub struct Miner { mem_pool: Arc>, - transaction_listener: RwLock>, next_allowed_reseal: Mutex, next_mandatory_reseal: RwLock, sealing_block_last_request: Mutex, @@ -176,7 +173,6 @@ impl Miner { Self { mem_pool, - transaction_listener: RwLock::new(vec![]), next_allowed_reseal: Mutex::new(Instant::now()), next_mandatory_reseal: RwLock::new(Instant::now() + options.reseal_max_period), params: RwLock::new(AuthoringParams::default()), @@ -199,11 +195,6 @@ impl Miner { self.mem_pool.write().recover_from_db(client); } - /// Set a callback to be notified about imported transactions' hashes. - pub fn add_transactions_listener(&self, f: Box) { - self.transaction_listener.write().push(f); - } - /// Get `Some` `clone()` of the current pending block's state or `None` if we're not sealing. pub fn pending_state(&self, latest_block_number: BlockNumber) -> Option { self.map_pending_block(|b| b.state().clone(), latest_block_number) @@ -351,7 +342,7 @@ impl Miner { debug_assert_eq!(insertion_results.len(), intermediate_results.iter().filter(|r| r.is_ok()).count()); let mut insertion_results_index = 0; - let results = intermediate_results + intermediate_results .into_iter() .map(|res| match res { Err(e) => Err(e), @@ -363,13 +354,7 @@ impl Miner { Ok(result) } }) - .collect(); - - for listener in &*self.transaction_listener.read() { - listener(&inserted); - } - - results + .collect() } pub fn delete_all_pending_transactions(&self) { From 6ed10c20c8d329542422be8d1a0e6028f4d340aa Mon Sep 17 00:00:00 2001 From: Juhyung Park Date: Wed, 3 Jun 2020 12:19:13 +0900 Subject: [PATCH 2/7] Introduce NextMandatoryReseal in miner.rs NextMandatoryReseal limits the lifetime of the inner lock. --- core/src/miner/miner.rs | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/core/src/miner/miner.rs b/core/src/miner/miner.rs index 46f40706db..45c4973529 100644 --- a/core/src/miner/miner.rs +++ b/core/src/miner/miner.rs @@ -116,7 +116,7 @@ struct SealingWork { pub struct Miner { mem_pool: Arc>, next_allowed_reseal: Mutex, - next_mandatory_reseal: RwLock, + next_mandatory_reseal: NextMandatoryReseal, sealing_block_last_request: Mutex, sealing_work: Mutex, params: RwLock, @@ -131,6 +131,26 @@ pub struct Miner { immune_users: RwLock>, } +struct NextMandatoryReseal { + instant: RwLock, +} + +impl NextMandatoryReseal { + pub fn new(instant: Instant) -> Self { + Self { + instant: RwLock::new(instant), + } + } + + pub fn get(&self) -> Instant { + *self.instant.read() + } + + pub fn set(&self, instant: Instant) { + *self.instant.write() = instant; + } +} + impl Miner { /// Push listener that will handle new jobs pub fn add_work_listener(&self, notifier: Box) { @@ -174,7 +194,7 @@ impl Miner { Self { mem_pool, next_allowed_reseal: Mutex::new(Instant::now()), - next_mandatory_reseal: RwLock::new(Instant::now() + options.reseal_max_period), + next_mandatory_reseal: NextMandatoryReseal::new(Instant::now() + options.reseal_max_period), params: RwLock::new(AuthoringParams::default()), sealing_block_last_request: Mutex::new(0), sealing_work: Mutex::new(SealingWork { @@ -611,7 +631,7 @@ impl Miner { C: BlockChainTrait + ImportBlock, { if block.transactions().is_empty() && !self.options.force_sealing - && Instant::now() <= *self.next_mandatory_reseal.read() + && Instant::now() <= self.next_mandatory_reseal.get() { cdebug!(MINER, "seal_block_internally: no sealing."); return false @@ -628,7 +648,7 @@ impl Miner { return false } - *self.next_mandatory_reseal.write() = Instant::now() + self.options.reseal_max_period; + self.next_mandatory_reseal.set(Instant::now() + self.options.reseal_max_period); let sealed = if self.engine_type().is_seal_first() { block.lock().already_sealed() } else { From cbd7475ae30e4d66c9364758132c7e78355fcec8 Mon Sep 17 00:00:00 2001 From: Juhyung Park Date: Wed, 3 Jun 2020 13:59:26 +0900 Subject: [PATCH 3/7] Introduce NextAllowedReseal type NextAllowedReseal limits the lifetime of the inner lock. --- core/src/miner/miner.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/core/src/miner/miner.rs b/core/src/miner/miner.rs index 45c4973529..0e84c58ff8 100644 --- a/core/src/miner/miner.rs +++ b/core/src/miner/miner.rs @@ -115,7 +115,7 @@ struct SealingWork { pub struct Miner { mem_pool: Arc>, - next_allowed_reseal: Mutex, + next_allowed_reseal: NextAllowedReseal, next_mandatory_reseal: NextMandatoryReseal, sealing_block_last_request: Mutex, sealing_work: Mutex, @@ -131,6 +131,8 @@ pub struct Miner { immune_users: RwLock>, } +type NextAllowedReseal = NextMandatoryReseal; + struct NextMandatoryReseal { instant: RwLock, } @@ -193,7 +195,7 @@ impl Miner { Self { mem_pool, - next_allowed_reseal: Mutex::new(Instant::now()), + next_allowed_reseal: NextAllowedReseal::new(Instant::now()), next_mandatory_reseal: NextMandatoryReseal::new(Instant::now() + options.reseal_max_period), params: RwLock::new(AuthoringParams::default()), sealing_block_last_request: Mutex::new(0), @@ -675,7 +677,7 @@ impl Miner { /// Are we allowed to do a non-mandatory reseal? fn transaction_reseal_allowed(&self) -> bool { - self.sealing_enabled.load(Ordering::Relaxed) && (Instant::now() > *self.next_allowed_reseal.lock()) + self.sealing_enabled.load(Ordering::Relaxed) && (Instant::now() > self.next_allowed_reseal.get()) } fn map_pending_block(&self, f: F, latest_block_number: BlockNumber) -> Option @@ -910,7 +912,7 @@ impl MinerService for Miner { } // Sealing successful - *self.next_allowed_reseal.lock() = Instant::now() + self.options.reseal_min_period; + self.next_allowed_reseal.set(Instant::now() + self.options.reseal_min_period); if !self.options.no_reseal_timer { chain.set_min_timer(); } From 2397a7572254c28a8f575b1bade22c2286b63495 Mon Sep 17 00:00:00 2001 From: Juhyung Park Date: Wed, 3 Jun 2020 13:52:05 +0900 Subject: [PATCH 4/7] Introduce Params struct Params limits the lifetime of the inner lock. --- core/src/miner/miner.rs | 35 +++++++++++++++++++++++++++++------ 1 file changed, 29 insertions(+), 6 deletions(-) diff --git a/core/src/miner/miner.rs b/core/src/miner/miner.rs index 0e84c58ff8..0c9a3f958f 100644 --- a/core/src/miner/miner.rs +++ b/core/src/miner/miner.rs @@ -119,7 +119,7 @@ pub struct Miner { next_mandatory_reseal: NextMandatoryReseal, sealing_block_last_request: Mutex, sealing_work: Mutex, - params: RwLock, + params: Params, engine: Arc, options: MinerOptions, @@ -153,6 +153,29 @@ impl NextMandatoryReseal { } } +struct Params { + params: RwLock, +} + +impl Params { + pub fn new(params: AuthoringParams) -> Self { + Self { + params: RwLock::new(params), + } + } + + pub fn get(&self) -> AuthoringParams { + self.params.read().clone() + } + + pub fn apply(&self, f: F) + where + F: FnOnce(&mut AuthoringParams) -> (), { + let mut params = self.params.write(); + f(&mut params); + } +} + impl Miner { /// Push listener that will handle new jobs pub fn add_work_listener(&self, notifier: Box) { @@ -197,7 +220,7 @@ impl Miner { mem_pool, next_allowed_reseal: NextAllowedReseal::new(Instant::now()), next_mandatory_reseal: NextMandatoryReseal::new(Instant::now() + options.reseal_max_period), - params: RwLock::new(AuthoringParams::default()), + params: Params::new(AuthoringParams::default()), sealing_block_last_request: Mutex::new(0), sealing_work: Mutex::new(SealingWork { queue: SealingQueue::new(options.work_queue_size), @@ -493,7 +516,7 @@ impl Miner { let last_work_hash = sealing_work.queue.peek_last_ref().map(|pb| *pb.block().header().hash()); ctrace!(MINER, "prepare_block: No existing work - making new block"); - let params = self.params.read().clone(); + let params = self.params.get(); let open_block = chain.prepare_open_block(parent_block_id, params.author, params.extra_data); let (block_number, parent_hash) = { let header = open_block.block().header(); @@ -722,11 +745,11 @@ impl MinerService for Miner { } fn authoring_params(&self) -> AuthoringParams { - self.params.read().clone() + self.params.get() } fn set_author(&self, address: Address) -> Result<(), AccountProviderError> { - self.params.write().author = address; + self.params.apply(|params| params.author = address); if self.engine_type().need_signer_key() && self.engine.seals_internally().is_some() { if let Some(ref ap) = self.accounts { @@ -750,7 +773,7 @@ impl MinerService for Miner { } fn set_extra_data(&self, extra_data: Bytes) { - self.params.write().extra_data = extra_data; + self.params.apply(|params| params.extra_data = extra_data); } fn transactions_limit(&self) -> usize { From f8925a7f5599ad38262df4870e8c1f32777beea4 Mon Sep 17 00:00:00 2001 From: Juhyung Park Date: Wed, 3 Jun 2020 14:28:05 +0900 Subject: [PATCH 5/7] Introduce SealingBlockLastRequest SealingBlockLastRequest limits the lifetime of the inner lock. --- core/src/miner/miner.rs | 38 +++++++++++++++++++++++++++++++------- 1 file changed, 31 insertions(+), 7 deletions(-) diff --git a/core/src/miner/miner.rs b/core/src/miner/miner.rs index 0c9a3f958f..fa77e417e9 100644 --- a/core/src/miner/miner.rs +++ b/core/src/miner/miner.rs @@ -117,7 +117,7 @@ pub struct Miner { mem_pool: Arc>, next_allowed_reseal: NextAllowedReseal, next_mandatory_reseal: NextMandatoryReseal, - sealing_block_last_request: Mutex, + sealing_block_last_request: SealingBlockLastRequest, sealing_work: Mutex, params: Params, engine: Arc, @@ -131,6 +131,30 @@ pub struct Miner { immune_users: RwLock>, } +struct SealingBlockLastRequest { + block_number: Mutex, +} + +impl SealingBlockLastRequest { + pub fn new() -> Self { + Self { + block_number: Mutex::new(0), + } + } + + pub fn get(&self) -> u64 { + *self.block_number.lock() + } + + /// Returns previous value + pub fn set(&self, block_number: u64) -> u64 { + let mut guard = self.block_number.lock(); + let prev = *guard; + *guard = block_number; + prev + } +} + type NextAllowedReseal = NextMandatoryReseal; struct NextMandatoryReseal { @@ -221,7 +245,7 @@ impl Miner { next_allowed_reseal: NextAllowedReseal::new(Instant::now()), next_mandatory_reseal: NextMandatoryReseal::new(Instant::now() + options.reseal_max_period), params: Params::new(AuthoringParams::default()), - sealing_block_last_request: Mutex::new(0), + sealing_block_last_request: SealingBlockLastRequest::new(), sealing_work: Mutex::new(SealingWork { queue: SealingQueue::new(options.work_queue_size), enabled: options.force_sealing || scheme.engine.seals_internally().is_some(), @@ -265,7 +289,7 @@ impl Miner { let mut sealing_work = self.sealing_work.lock(); if sealing_work.enabled { ctrace!(MINER, "requires_reseal: sealing enabled"); - let last_request = *self.sealing_block_last_request.lock(); + let last_request = self.sealing_block_last_request.get(); let should_disable_sealing = !self.options.force_sealing && !has_local_transactions && self.engine.seals_internally().is_none() @@ -864,16 +888,16 @@ impl MinerService for Miner { } } } - let mut sealing_block_last_request = self.sealing_block_last_request.lock(); + let best_number = client.chain_info().best_block_number; - if *sealing_block_last_request != best_number { + let prev_request = self.sealing_block_last_request.set(best_number); + if prev_request != best_number { ctrace!( MINER, "prepare_work_sealing: Miner received request (was {}, now {}) - waking up.", - *sealing_block_last_request, + prev_request, best_number ); - *sealing_block_last_request = best_number; } // Return if we restarted From b1ffc87558d28bde205151d95f1e4169768678b7 Mon Sep 17 00:00:00 2001 From: Juhyung Park Date: Wed, 3 Jun 2020 15:33:53 +0900 Subject: [PATCH 6/7] Introduce Notifiers Nrtofiers limits the lifetime of the inner lock. --- core/src/miner/miner.rs | 41 +++++++++++++++++++++++++++++++++-------- 1 file changed, 33 insertions(+), 8 deletions(-) diff --git a/core/src/miner/miner.rs b/core/src/miner/miner.rs index fa77e417e9..66c417c927 100644 --- a/core/src/miner/miner.rs +++ b/core/src/miner/miner.rs @@ -39,7 +39,7 @@ use ctypes::{BlockHash, BlockNumber, Header, TxHash}; use cvm::ChainTimeInfo; use kvdb::KeyValueDB; use parking_lot::{Mutex, RwLock}; -use primitives::{Bytes, H256}; +use primitives::{Bytes, H256, U256}; use std::borrow::Borrow; use std::collections::HashSet; use std::iter::once; @@ -126,11 +126,38 @@ pub struct Miner { sealing_enabled: AtomicBool, accounts: Option>, - notifiers: RwLock>>, + notifiers: Notifiers, malicious_users: RwLock>, immune_users: RwLock>, } +struct Notifiers { + notifiers: RwLock>>, +} + +impl Notifiers { + pub fn new(notifiers: Vec>) -> Self { + Self { + notifiers: RwLock::new(notifiers), + } + } + + pub fn push(&self, notifier: Box) { + self.notifiers.write().push(notifier); + } + + pub fn is_empty(&self) -> bool { + self.notifiers.read().is_empty() + } + + pub fn notify(&self, pow_hash: H256, target: U256) { + // FIXME: Calling callbacks inside of lock lifetime may cause a deadlock. + for notifier in self.notifiers.read().iter() { + notifier.notify(pow_hash, target) + } + } +} + struct SealingBlockLastRequest { block_number: Mutex, } @@ -203,7 +230,7 @@ impl Params { impl Miner { /// Push listener that will handle new jobs pub fn add_work_listener(&self, notifier: Box) { - self.notifiers.write().push(notifier); + self.notifiers.push(notifier); } pub fn new( @@ -254,7 +281,7 @@ impl Miner { options, sealing_enabled: AtomicBool::new(true), accounts, - notifiers: RwLock::new(notifiers), + notifiers: Notifiers::new(notifiers), malicious_users: RwLock::new(HashSet::new()), immune_users: RwLock::new(HashSet::new()), } @@ -503,7 +530,7 @@ impl Miner { let is_new = original_work_hash.map_or(true, |h| *block.block().header().hash() != h); sealing_work.queue.push(block); // If push notifications are enabled we assume all work items are used. - if !self.notifiers.read().is_empty() && is_new { + if !self.notifiers.is_empty() && is_new { sealing_work.queue.use_last_ref(); } (Some((pow_hash, score, number)), is_new) @@ -520,9 +547,7 @@ impl Miner { if is_new { if let Some((pow_hash, score, _number)) = work { let target = self.engine.score_to_target(&score); - for notifier in self.notifiers.read().iter() { - notifier.notify(pow_hash, target) - } + self.notifiers.notify(pow_hash, target); } } } From 231b0682be6f2ec86b8ce4c5f197f9e3b1089306 Mon Sep 17 00:00:00 2001 From: Juhyung Park Date: Wed, 3 Jun 2020 15:53:00 +0900 Subject: [PATCH 7/7] Introduce Users struct Users limits the lifetime of the inner lock. --- core/src/miner/miner.rs | 79 +++++++++++++++++++++++++---------------- 1 file changed, 49 insertions(+), 30 deletions(-) diff --git a/core/src/miner/miner.rs b/core/src/miner/miner.rs index 66c417c927..5fd229299f 100644 --- a/core/src/miner/miner.rs +++ b/core/src/miner/miner.rs @@ -43,7 +43,6 @@ use primitives::{Bytes, H256, U256}; use std::borrow::Borrow; use std::collections::HashSet; use std::iter::once; -use std::iter::FromIterator; use std::ops::Range; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -127,8 +126,39 @@ pub struct Miner { accounts: Option>, notifiers: Notifiers, - malicious_users: RwLock>, - immune_users: RwLock>, + malicious_users: Users, + immune_users: Users, +} + +struct Users { + users: RwLock>, +} + +impl Users { + pub fn new() -> Self { + Self { + users: RwLock::new(HashSet::new()), + } + } + + pub fn cloned(&self) -> Vec
{ + self.users.read().iter().map(Clone::clone).collect() + } + + pub fn contains(&self, address: &Address) -> bool { + self.users.read().contains(address) + } + + pub fn insert(&self, address: Address) -> bool { + self.users.write().insert(address) + } + + pub fn remove_users<'a>(&self, addresses: impl Iterator) { + let mut users = self.users.write(); + for address in addresses { + users.remove(address); + } + } } struct Notifiers { @@ -282,8 +312,8 @@ impl Miner { sealing_enabled: AtomicBool::new(true), accounts, notifiers: Notifiers::new(notifiers), - malicious_users: RwLock::new(HashSet::new()), - immune_users: RwLock::new(HashSet::new()), + malicious_users: Users::new(), + immune_users: Users::new(), } } @@ -368,7 +398,7 @@ impl Miner { let signer_public = tx.recover_public()?; let signer_address = public_to_address(&signer_public); if default_origin.is_local() { - self.immune_users.write().insert(signer_address); + self.immune_users.insert(signer_address); } let origin = self @@ -381,7 +411,7 @@ impl Miner { }) .unwrap_or(default_origin); - if self.malicious_users.read().contains(&signer_address) { + if self.malicious_users.contains(&signer_address) { // FIXME: just to skip, think about another way. return Ok(()) } @@ -392,7 +422,6 @@ impl Miner { if !self.is_allowed_transaction(&tx.action) { cdebug!(MINER, "Rejected transaction {:?}: {:?} is not allowed transaction", hash, tx.action); } - let immune_users = self.immune_users.read(); let tx = tx .verify_basic() .map_err(From::from) @@ -403,8 +432,8 @@ impl Miner { .and_then(|_| CodeChainMachine::verify_transaction_seal(tx, &fake_header)) .map_err(|e| { match e { - Error::Syntax(_) if !origin.is_local() && !immune_users.contains(&signer_address) => { - self.malicious_users.write().insert(signer_address); + Error::Syntax(_) if !origin.is_local() && !self.immune_users.contains(&signer_address) => { + self.malicious_users.insert(signer_address); } _ => {} } @@ -415,8 +444,8 @@ impl Miner { // This check goes here because verify_transaction takes SignedTransaction parameter self.engine.machine().verify_transaction(&tx, &fake_header, client, false).map_err(|e| { match e { - Error::Syntax(_) if !origin.is_local() && !immune_users.contains(&signer_address) => { - self.malicious_users.write().insert(signer_address); + Error::Syntax(_) if !origin.is_local() && !self.immune_users.contains(&signer_address) => { + self.malicious_users.insert(signer_address); } _ => {} } @@ -609,11 +638,10 @@ impl Miner { let tx_total = transactions.len(); let mut invalid_tx_users = HashSet::new(); - let immune_users = self.immune_users.read(); for tx in transactions { let signer_public = tx.signer_public(); let signer_address = public_to_address(&signer_public); - if self.malicious_users.read().contains(&signer_address) { + if self.malicious_users.contains(&signer_address) { invalid_transactions.push(tx.hash()); continue } @@ -647,9 +675,9 @@ impl Miner { .read() .is_local_transaction(hash) .expect("The tx is clearly fetched from the mempool") - && !immune_users.contains(&signer_address) + && !self.immune_users.contains(&signer_address) { - self.malicious_users.write().insert(signer_address); + self.malicious_users.insert(signer_address); } } _ => {} @@ -1188,32 +1216,23 @@ impl MinerService for Miner { } fn get_malicious_users(&self) -> Vec
{ - Vec::from_iter(self.malicious_users.read().iter().map(Clone::clone)) + self.malicious_users.cloned() } fn release_malicious_users(&self, prisoner_vec: Vec
) { - let mut malicious_users = self.malicious_users.write(); - for address in prisoner_vec { - malicious_users.remove(&address); - } + self.malicious_users.remove_users(prisoner_vec.iter()); } fn imprison_malicious_users(&self, prisoner_vec: Vec
) { - let mut malicious_users = self.malicious_users.write(); - for address in prisoner_vec { - malicious_users.insert(address); - } + self.malicious_users.remove_users(prisoner_vec.iter()); } fn get_immune_users(&self) -> Vec
{ - Vec::from_iter(self.immune_users.read().iter().map(Clone::clone)) + self.immune_users.cloned() } fn register_immune_users(&self, immune_user_vec: Vec
) { - let mut immune_users = self.immune_users.write(); - for address in immune_user_vec { - immune_users.insert(address); - } + self.immune_users.remove_users(immune_user_vec.iter()) } }