Skip to content

Commit

Permalink
Use fine grained locks to enhance performance on large multi-cores (#160
Browse files Browse the repository at this point in the history
)

* Added parallelism to rocksdb in the bench.
* Add a tcp-connections flag
* Add db folder option
* Use parking lock mutex
* Use many locks, to avoid contention.

Co-authored-by: George Danezis <george@danez.is>
  • Loading branch information
gdanezis and George Danezis committed Jan 13, 2022
1 parent a65b129 commit 5b221aa
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 20 deletions.
1 change: 1 addition & 0 deletions fastpay/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ toml = "0.5.8"
strum = "0.23.0"
strum_macros = "0.23.1"
num_cpus = "1.13.1"
rocksdb = "0.17.0"

fastpay_core = { path = "../fastpay_core" }
fastx-adapter = { path = "../fastx_programmability/adapter" }
Expand Down
38 changes: 32 additions & 6 deletions fastpay/src/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ use structopt::StructOpt;
use tokio::runtime::Runtime;
use tokio::{runtime::Builder, time};

use rocksdb::Options;
use std::env;
use std::fs;
use std::path::Path;
use std::sync::Arc;
use std::thread;
use strum_macros::EnumString;
Expand All @@ -32,6 +34,9 @@ struct ClientServerBenchmark {
/// Hostname
#[structopt(long, default_value = "127.0.0.1")]
host: String,
/// Path to the database
#[structopt(long, default_value = "")]
db_dir: String,
/// Base port number
#[structopt(long, default_value = "9555")]
port: u32,
Expand All @@ -56,6 +61,12 @@ struct ClientServerBenchmark {
/// Which execution path to track. OrdersAndCerts or OrdersOnly or CertsOnly
#[structopt(long, default_value = "OrdersAndCerts")]
benchmark_type: BenchmarkType,
/// Number of connections to the server
#[structopt(long, default_value = "0")]
tcp_connections: usize,
/// Number of database cpus
#[structopt(long, default_value = "1")]
db_cpus: usize,
}
#[derive(Debug, Clone, PartialEq, EnumString)]
enum BenchmarkType {
Expand All @@ -75,6 +86,12 @@ fn main() {

// Make multi-threaded runtime for the authority
let b = benchmark.clone();
let connections = if benchmark.tcp_connections > 0 {
benchmark.tcp_connections
} else {
num_cpus::get()
};

thread::spawn(move || {
let runtime = Builder::new_multi_thread()
.enable_all()
Expand All @@ -91,12 +108,12 @@ fn main() {
});

// Make a multi-threaded runtime for the client.
let runtime = Builder::new_current_thread()
let runtime = Builder::new_multi_thread()
.enable_all()
.thread_stack_size(15 * 1024 * 1024)
.build()
.unwrap();
runtime.block_on(benchmark.launch_client(orders));
runtime.block_on(benchmark.launch_client(connections, orders));
}

impl ClientServerBenchmark {
Expand All @@ -113,10 +130,18 @@ impl ClientServerBenchmark {
let (public_auth0, secret_auth0) = keys.pop().unwrap();

// Create a random directory to store the DB
let dir = env::temp_dir();
let path = dir.join(format!("DB_{:?}", ObjectID::random()));
let path = if self.db_dir.is_empty() {
let dir = env::temp_dir();
dir.join(format!("DB_{:?}", ObjectID::random()))
} else {
let dir = Path::new(&self.db_dir);
dir.join(format!("DB_{:?}", ObjectID::random()))
};
fs::create_dir(&path).unwrap();
info!("Open database on path: {:?}", path.as_os_str());

let mut opts = Options::default();
opts.increase_parallelism(self.db_cpus as i32);
let store = Arc::new(AuthorityStore::open(path, None));

// Seed user accounts.
Expand Down Expand Up @@ -207,7 +232,7 @@ impl ClientServerBenchmark {
server.spawn().await.unwrap()
}

async fn launch_client(&self, mut orders: Vec<Bytes>) {
async fn launch_client(&self, connections: usize, mut orders: Vec<Bytes>) {
time::sleep(Duration::from_millis(1000)).await;
let order_len_factor = if self.benchmark_type == BenchmarkType::OrdersAndCerts {
2
Expand All @@ -217,8 +242,9 @@ impl ClientServerBenchmark {
let items_number = orders.len() / order_len_factor;
let time_start = Instant::now();

let connections: usize = num_cpus::get();
// let connections: usize = num_cpus::get();
let max_in_flight = self.max_in_flight / connections as usize;
info!("Number of TCP connections: {}", connections);
info!("Set max_in_flight to {}", max_in_flight);

info!("Sending requests.");
Expand Down
2 changes: 2 additions & 0 deletions fastpay_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ futures = "0.3.5"
rand = "0.7.3"
serde = { version = "1.0.133", features = ["derive"] }
tokio = { version = "1.15.0", features = ["full"] }
parking_lot = "0.11.2"
itertools = "0.10.3"

fastx-adapter = { path = "../fastx_programmability/adapter" }
fastx-framework = { path = "../fastx_programmability/framework" }
Expand Down
42 changes: 29 additions & 13 deletions fastpay_core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use super::*;

use rocksdb::Options;
use std::collections::BTreeSet;
use std::convert::TryInto;
use std::path::Path;
use std::sync::Mutex;
use typed_store::rocks::{open_cf, DBMap};
use typed_store::traits::Map;

Expand All @@ -14,7 +15,7 @@ pub struct AuthorityStore {
certificates: DBMap<TransactionDigest, CertifiedOrder>,
parent_sync: DBMap<ObjectRef, TransactionDigest>,
signed_effects: DBMap<TransactionDigest, SignedOrderEffects>,
check_and_write_lock: Mutex<()>,
lock_table: Vec<parking_lot::Mutex<()>>,
}

impl AuthorityStore {
Expand Down Expand Up @@ -42,10 +43,31 @@ impl AuthorityStore {
certificates: DBMap::reopen(&db, Some("certificates")).expect("Cannot open CF."),
parent_sync: DBMap::reopen(&db, Some("parent_sync")).expect("Cannot open CF."),
signed_effects: DBMap::reopen(&db, Some("signed_effects")).expect("Cannot open CF."),
check_and_write_lock: Mutex::new(()),
lock_table: (0..1024)
.into_iter()
.map(|_| parking_lot::Mutex::new(()))
.collect(),
}
}

/// Acquires every lock-table mutex associated with the given objects.
///
/// Each object is mapped to a slot of `self.lock_table` by interpreting the
/// first 8 bytes of its digest as a little-endian integer and reducing it
/// modulo the table size. Distinct objects may map to the same slot; each
/// slot is locked at most once.
///
/// Slots are locked in ascending index order so that two callers with
/// overlapping object sets always take their shared locks in the same
/// order, which is what prevents deadlocks.
///
/// Returns the guards for all acquired slots; the locks stay held until the
/// returned guards are dropped.
fn aqcuire_locks(&self, input_objects: &[ObjectRef]) -> Vec<parking_lot::MutexGuard<'_, ()>> {
    // Do the arithmetic in u64 rather than usize: an 8-byte prefix does not
    // convert into `[u8; size_of::<usize>()]` on 32-bit targets, which would
    // make the conversion fail and panic.
    let num_locks = self.lock_table.len() as u64;
    // TODO: randomize the lock mapping based on a secret to avoid DoS attacks.
    let lock_numbers: BTreeSet<usize> = input_objects
        .iter()
        .map(|(_, _, digest)| {
            let prefix: [u8; 8] = digest.0[0..8]
                .try_into()
                .expect("an 8-byte slice always converts to [u8; 8]");
            // The remainder is < lock_table.len(), so it always fits in usize.
            (u64::from_le_bytes(prefix) % num_locks) as usize
        })
        .collect();
    // Note: we need to iterate over the sorted unique elements, hence the use of a Set,
    // in order to prevent deadlocks when trying to acquire many locks.
    lock_numbers
        .into_iter()
        .map(|lock_seq| self.lock_table[lock_seq].lock())
        .collect()
}

// Methods to read the store

// TODO: add object owner index to improve performance https://github.com/MystenLabs/fastnft/issues/127
Expand Down Expand Up @@ -225,10 +247,7 @@ impl AuthorityStore {
// new locks must be atomic, and no writes should happen in between.
{
// Acquire the lock to ensure no one else writes when we are in here.
let _lock = self
.check_and_write_lock
.lock()
.map_err(|_| FastPayError::StorageError)?;
let _mutexes = self.aqcuire_locks(mutable_input_objects);

let locks = self
.order_lock
Expand Down Expand Up @@ -257,7 +276,7 @@ impl AuthorityStore {
// Atomic write of all locks
lock_batch.write().map_err(|_| FastPayError::StorageError)

// Implicit: drop(_lock);
// Implicit: drop(_mutexes);
} // End of critical region
}

Expand Down Expand Up @@ -365,10 +384,7 @@ impl AuthorityStore {
// new locks must be atomic, and no writes should happen in between.
{
// Acquire the lock to ensure no one else writes when we are in here.
let _lock = self
.check_and_write_lock
.lock()
.map_err(|_| FastPayError::StorageError)?;
let _mutexes = self.aqcuire_locks(&active_inputs[..]);

// Check the locks are still active
// TODO: maybe we could just check if the certificate is there instead?
Expand All @@ -385,7 +401,7 @@ impl AuthorityStore {
.write()
.map_err(|_| FastPayError::StorageError)?;

// implict: drop(_lock);
// implicit: drop(_mutexes);
} // End of critical region

Ok(OrderInfoResponse {
Expand Down
1 change: 1 addition & 0 deletions fastx_types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ serde-name = "0.2.0"
sha3 = "0.9"
structopt = "0.3.25"
thiserror = "1.0.30"
rocksdb = "0.17.0"

move-binary-format = { git = "https://github.com/diem/diem", rev="346301f33b3489bb4e486ae6c0aa5e030223b492" }
move-core-types = { git = "https://github.com/diem/diem", rev="346301f33b3489bb4e486ae6c0aa5e030223b492" }
2 changes: 1 addition & 1 deletion fastx_types/src/base_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ const TRANSACTION_DIGEST_LENGTH: usize = 32;
pub struct TransactionDigest([u8; TRANSACTION_DIGEST_LENGTH]);
// Each object has a unique digest
#[derive(Eq, PartialEq, Ord, PartialOrd, Copy, Clone, Hash, Debug, Serialize, Deserialize)]
pub struct ObjectDigest([u8; 32]); // We use SHA3-256 hence 32 bytes here
pub struct ObjectDigest(pub [u8; 32]); // We use SHA3-256 hence 32 bytes here

// TODO: migrate TxContext type + these constants to a separate file
/// 0x81D51F48E5DFC02DBC8F6003517274F7
Expand Down

1 comment on commit 5b221aa

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bench results

    Finished release [optimized + debuginfo] target(s) in 2.68s
     Running target/release/bench
[2022-01-13T18:26:35Z INFO bench] Starting benchmark: OrdersAndCerts
[2022-01-13T18:26:35Z INFO bench] Preparing accounts.
[2022-01-13T18:26:35Z INFO bench] Open database on path: "/tmp/DB_C238090488CC5783C9A1007A61A0B5F4"
[2022-01-13T18:26:40Z INFO bench] Preparing transactions.
[2022-01-13T18:26:51Z INFO fastpay::network] Listening to Tcp traffic on 127.0.0.1:9555
[2022-01-13T18:26:52Z INFO bench] Number of TCP connections: 2
[2022-01-13T18:26:52Z INFO bench] Set max_in_flight to 500
[2022-01-13T18:26:52Z INFO bench] Sending requests.
[2022-01-13T18:26:52Z INFO fastpay::network] Sending Tcp requests to 127.0.0.1:9555
[2022-01-13T18:26:54Z INFO fastpay::network] 127.0.0.1:9555 has processed 5000 packets
[2022-01-13T18:26:56Z INFO fastpay::network] In flight 500 Remaining 35000
[2022-01-13T18:26:56Z INFO fastpay::network] 127.0.0.1:9555 has processed 10000 packets
[2022-01-13T18:26:58Z INFO fastpay::network] 127.0.0.1:9555 has processed 15000 packets
[2022-01-13T18:27:00Z INFO fastpay::network] In flight 500 Remaining 30000
[2022-01-13T18:27:00Z INFO fastpay::network] 127.0.0.1:9555 has processed 20000 packets
[2022-01-13T18:27:01Z INFO fastpay::network] 127.0.0.1:9555 has processed 25000 packets
[2022-01-13T18:27:02Z INFO fastpay::network] In flight 500 Remaining 25000
[2022-01-13T18:27:02Z INFO fastpay::network] 127.0.0.1:9555 has processed 30000 packets
[2022-01-13T18:27:04Z INFO fastpay::network] 127.0.0.1:9555 has processed 35000 packets
[2022-01-13T18:27:04Z INFO fastpay::network] In flight 500 Remaining 20000
[2022-01-13T18:27:05Z INFO fastpay::network] 127.0.0.1:9555 has processed 40000 packets
[2022-01-13T18:27:05Z INFO fastpay::network] In flight 500 Remaining 20000
[2022-01-13T18:27:06Z INFO fastpay::network] 127.0.0.1:9555 has processed 45000 packets
[2022-01-13T18:27:06Z INFO fastpay::network] In flight 500 Remaining 15000
[2022-01-13T18:27:07Z INFO fastpay::network] 127.0.0.1:9555 has processed 50000 packets
[2022-01-13T18:27:07Z INFO fastpay::network] In flight 500 Remaining 15000
[2022-01-13T18:27:08Z INFO fastpay::network] 127.0.0.1:9555 has processed 55000 packets
[2022-01-13T18:27:09Z INFO fastpay::network] In flight 500 Remaining 10000
[2022-01-13T18:27:09Z INFO fastpay::network] 127.0.0.1:9555 has processed 60000 packets
[2022-01-13T18:27:10Z INFO fastpay::network] 127.0.0.1:9555 has processed 65000 packets
[2022-01-13T18:27:11Z INFO fastpay::network] In flight 500 Remaining 5000
[2022-01-13T18:27:11Z INFO fastpay::network] 127.0.0.1:9555 has processed 70000 packets
[2022-01-13T18:27:12Z INFO fastpay::network] In flight 500 Remaining 5000
[2022-01-13T18:27:12Z INFO fastpay::network] 127.0.0.1:9555 has processed 75000 packets
[2022-01-13T18:27:13Z INFO fastpay::network] Done sending Tcp requests to 127.0.0.1:9555
[2022-01-13T18:27:14Z INFO fastpay::network] 127.0.0.1:9555 has processed 80000 packets
[2022-01-13T18:27:14Z INFO fastpay::network] Done sending Tcp requests to 127.0.0.1:9555
[2022-01-13T18:27:14Z INFO bench] Received 80000 responses.
[2022-01-13T18:27:14Z WARN bench] Completed benchmark for OrdersAndCerts
Total time: 21670020us, items: 40000, tx/sec: 1845.8681625582255

Please sign in to comment.