diff --git a/interactive_engine/executor/store/Cargo.toml b/interactive_engine/executor/store/Cargo.toml index 18251e12ba4c..558551444d0f 100644 --- a/interactive_engine/executor/store/Cargo.toml +++ b/interactive_engine/executor/store/Cargo.toml @@ -35,3 +35,6 @@ rev = "082dfcfbb4b51e61d1c01672951b84cb464601e2" [build-dependencies] protoc-grpcio = "0.3.0" + +[[bin]] +name = "write_bench" diff --git a/interactive_engine/executor/store/src/bin/write_bench.rs b/interactive_engine/executor/store/src/bin/write_bench.rs new file mode 100644 index 000000000000..695b8a5d5496 --- /dev/null +++ b/interactive_engine/executor/store/src/bin/write_bench.rs @@ -0,0 +1,98 @@ +use maxgraph_common::util::{fs, Timer}; +use maxgraph_store::db::api::{GraphConfigBuilder, TypeDefBuilder, ValueType, Value}; +use maxgraph_store::db::graph::store::GraphStore; +use maxgraph_store::db::api::multi_version_graph::MultiVersionGraph; +use std::collections::HashMap; +use std::hash::{Hash, Hasher}; +use std::collections::hash_map::DefaultHasher; +use std::env; +use std::sync::atomic::{AtomicU64, Ordering, AtomicI64}; +use std::sync::Arc; +use std::time::Duration; + +fn main() { + let args: Vec = env::args().collect(); + + let write_buffer_mb = &args[1]; + let compaction_style = &args[2]; + let max_write_buffer_num = &args[3]; + let level_zero_compaction_trigger = &args[4]; + let max_level_base_mb = &args[5]; + let background_job = &args[6]; + let thread_count = &args[7]; + + println!("write_buffer_mb {}, compaction_style {}, background_job {}, thread_count {}", write_buffer_mb, compaction_style, background_job, thread_count); + let path = format!("write_bench_data_dir"); + fs::rmr(&path).unwrap(); + let mut builder = GraphConfigBuilder::new(); + builder.set_storage_engine("rocksdb"); + builder.add_storage_option("store.rocksdb.compression.type", "none"); + builder.add_storage_option("store.rocksdb.stats.dump.period.sec", "60"); + builder.add_storage_option("store.rocksdb.write.buffer.mb", write_buffer_mb); + builder.add_storage_option("store.rocksdb.compaction.style", compaction_style); + builder.add_storage_option("store.rocksdb.background.jobs", background_job); + builder.add_storage_option("store.rocksdb.max.write.buffer.num", max_write_buffer_num); + builder.add_storage_option("store.rocksdb.level0.compaction.trigger", level_zero_compaction_trigger); + builder.add_storage_option("store.rocksdb.max.level.base.mb", max_level_base_mb); + let config = builder.build(); + let store = Arc::new(GraphStore::open(&config, &path).unwrap()); + println!("store opened."); + let mut type_def_builer = TypeDefBuilder::new(); + type_def_builer.version(1); + type_def_builer.add_property(1, 1, "id".to_string(), ValueType::Long, None, true, "id".to_string()); + type_def_builer.add_property(2, 2, "name".to_string(), ValueType::String, None, false, "name".to_string()); + let type_def = type_def_builer.build(); + let label_id = 1; + store.create_vertex_type(1, 1, label_id, &type_def, 1).unwrap(); + println!("schema created"); + let str_len = 100; + let timer = Timer::new(); + let mut tmp_time = 0.0; + let mut tmp_count = 0; + let val = "c".repeat(str_len); + let mut handles = Vec::new(); + let total_count = Arc::new(AtomicU64::new(0)); + let snapshot_idx = Arc::new(AtomicI64::new(1)); + for i in 0..thread_count.parse().unwrap() { + let task_id = i; + let counter = total_count.clone(); + let val = val.clone(); + let store = store.clone(); + let snapshot_idx = snapshot_idx.clone(); + println!("task {} starting", task_id); + let handle = std::thread::spawn(move || { + let mut idx = i * 100000000000 + 2; + loop { + let snapshot_id = snapshot_idx.load(Ordering::SeqCst); + let vertex_id = idx; + let mut properties = HashMap::new(); + properties.insert(1, Value::long(i)); + properties.insert(2, Value::string(&val)); + let mut hasher = DefaultHasher::new(); + vertex_id.hash(&mut hasher); + let hash_id = hasher.finish(); + store.insert_overwrite_vertex(snapshot_id, hash_id as i64, label_id, &properties).unwrap(); + idx += 1; + counter.fetch_add(1, Ordering::Relaxed); + } + }); + handles.push(handle); + } + println!("{}\t{}\t{}", "time(sec)", "speed(record/s)", "total"); + std::thread::spawn(move || { + loop { + std::thread::sleep(Duration::from_secs(3)); + let total_write = total_count.load(Ordering::Relaxed); + let write_count = total_write - tmp_count; + let total_time = timer.elasped_secs(); + let t = total_time - tmp_time; + println!("{:.0}\t{:.2}\t{:.0}", total_time, write_count as f64 / t, total_write); + tmp_count = total_write; + tmp_time = total_time; + snapshot_idx.fetch_add(1, Ordering::SeqCst); + } + }); + for handle in handles { + handle.join().unwrap(); + } +} diff --git a/interactive_engine/executor/store/src/db/storage/rocksdb.rs b/interactive_engine/executor/store/src/db/storage/rocksdb.rs index 9ac51bf77093..3f0b6274b554 100644 --- a/interactive_engine/executor/store/src/db/storage/rocksdb.rs +++ b/interactive_engine/executor/store/src/db/storage/rocksdb.rs @@ -6,6 +6,7 @@ use std::sync::Arc; use crate::db::api::*; use super::{StorageIter, StorageRes, ExternalStorage, ExternalStorageBackup}; use crate::db::storage::{KvPair, RawBytes}; +use rocksdb::{DBCompressionType, DBCompactionStyle}; pub struct RocksDB { db: Arc, @@ -201,6 +202,47 @@ fn init_options(options: &HashMap) -> Options { let mut ret = Options::default(); ret.create_if_missing(true); // TODO: Add other customized db options. + if let Some(conf_str) = options.get("store.rocksdb.compression.type") { + match conf_str.as_str() { + "none" => ret.set_compression_type(DBCompressionType::None), + "snappy" => ret.set_compression_type(DBCompressionType::Snappy), + "zlib" => ret.set_compression_type(DBCompressionType::Zlib), + "bz2" => ret.set_compression_type(DBCompressionType::Bz2), + "lz4" => ret.set_compression_type(DBCompressionType::Lz4), + "lz4hc" => ret.set_compression_type(DBCompressionType::Lz4hc), + "zstd" => ret.set_compression_type(DBCompressionType::Zstd), + _ => panic!("invalid compression_type config"), + } + } + if let Some(conf_str) = options.get("store.rocksdb.stats.dump.period.sec") { + ret.set_stats_dump_period_sec(conf_str.parse().unwrap()); + } + if let Some(conf_str) = options.get("store.rocksdb.compaction.style") { + match conf_str.as_str() { + "universal" => ret.set_compaction_style(DBCompactionStyle::Universal), + "level" => ret.set_compaction_style(DBCompactionStyle::Level), + _ => panic!("invalid compaction_style config"), + } + } + if let Some(conf_str) = options.get("store.rocksdb.write.buffer.mb") { + let size_mb: usize = conf_str.parse().unwrap(); + let size_bytes = size_mb * 1024 * 1024; + ret.set_write_buffer_size(size_bytes); + } + if let Some(conf_str) = options.get("store.rocksdb.max.write.buffer.num") { + ret.set_max_write_buffer_number(conf_str.parse().unwrap()); + } + if let Some(conf_str) = options.get("store.rocksdb.level0.compaction.trigger") { + ret.set_level_zero_file_num_compaction_trigger(conf_str.parse().unwrap()); + } + if let Some(conf_str) = options.get("store.rocksdb.max.level.base.mb") { + let size_mb: u64 = conf_str.parse().unwrap(); + ret.set_max_bytes_for_level_base(size_mb * 1024 * 1024); + } + if let Some(conf_str) = options.get("store.rocksdb.background.jobs") { + let background_jobs = conf_str.parse().unwrap(); + ret.set_max_background_jobs(background_jobs); + } ret }