Skip to content

Commit

Permalink
Local bench groot (#1326)
Browse files Browse the repository at this point in the history
  • Loading branch information
tianliplus committed Feb 28, 2022
1 parent 953cc3e commit e3f055e
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 0 deletions.
3 changes: 3 additions & 0 deletions interactive_engine/executor/store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,6 @@ rev = "082dfcfbb4b51e61d1c01672951b84cb464601e2"

[build-dependencies]
protoc-grpcio = "0.3.0"

[[bin]]
name = "write_bench"
98 changes: 98 additions & 0 deletions interactive_engine/executor/store/src/bin/write_bench.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
use maxgraph_common::util::{fs, Timer};
use maxgraph_store::db::api::{GraphConfigBuilder, TypeDefBuilder, ValueType, Value};
use maxgraph_store::db::graph::store::GraphStore;
use maxgraph_store::db::api::multi_version_graph::MultiVersionGraph;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::collections::hash_map::DefaultHasher;
use std::env;
use std::sync::atomic::{AtomicU64, Ordering, AtomicI64};
use std::sync::Arc;
use std::time::Duration;

fn main() {
    // Write-throughput benchmark for a RocksDB-backed GraphStore: N threads
    // insert vertices as fast as possible while a reporter thread prints
    // throughput every 3 seconds. Runs until killed (writer threads never exit).
    //
    // Positional CLI args:
    //   1: write_buffer_mb                RocksDB write buffer size, MB
    //   2: compaction_style               "universal" | "level"
    //   3: max_write_buffer_num           max number of memtables
    //   4: level_zero_compaction_trigger  L0 file count that triggers compaction
    //   5: max_level_base_mb              max bytes for level base, MB
    //   6: background_job                 RocksDB background jobs
    //   7: thread_count                   number of writer threads
    let args: Vec<String> = env::args().collect();
    // Fail fast with usage instead of an opaque index-out-of-bounds panic.
    if args.len() < 8 {
        panic!("usage: {} <write_buffer_mb> <compaction_style> <max_write_buffer_num> \
                <level_zero_compaction_trigger> <max_level_base_mb> <background_job> <thread_count>",
               args[0]);
    }

    let write_buffer_mb = &args[1];
    let compaction_style = &args[2];
    let max_write_buffer_num = &args[3];
    let level_zero_compaction_trigger = &args[4];
    let max_level_base_mb = &args[5];
    let background_job = &args[6];
    let thread_count = &args[7];

    // Log every tunable so a run is reproducible from its output alone.
    println!("write_buffer_mb {}, compaction_style {}, max_write_buffer_num {}, \
              level_zero_compaction_trigger {}, max_level_base_mb {}, background_job {}, thread_count {}",
             write_buffer_mb, compaction_style, max_write_buffer_num,
             level_zero_compaction_trigger, max_level_base_mb, background_job, thread_count);

    // Start from an empty data directory so old SSTs don't skew results.
    let path = "write_bench_data_dir".to_string();
    fs::rmr(&path).unwrap();
    let mut builder = GraphConfigBuilder::new();
    builder.set_storage_engine("rocksdb");
    builder.add_storage_option("store.rocksdb.compression.type", "none");
    builder.add_storage_option("store.rocksdb.stats.dump.period.sec", "60");
    builder.add_storage_option("store.rocksdb.write.buffer.mb", write_buffer_mb);
    builder.add_storage_option("store.rocksdb.compaction.style", compaction_style);
    builder.add_storage_option("store.rocksdb.background.jobs", background_job);
    builder.add_storage_option("store.rocksdb.max.write.buffer.num", max_write_buffer_num);
    builder.add_storage_option("store.rocksdb.level0.compaction.trigger", level_zero_compaction_trigger);
    builder.add_storage_option("store.rocksdb.max.level.base.mb", max_level_base_mb);
    let config = builder.build();
    let store = Arc::new(GraphStore::open(&config, &path).unwrap());
    println!("store opened.");

    // Minimal schema: one vertex label with a long "id" and a string "name".
    let mut type_def_builder = TypeDefBuilder::new();
    type_def_builder.version(1);
    type_def_builder.add_property(1, 1, "id".to_string(), ValueType::Long, None, true, "id".to_string());
    type_def_builder.add_property(2, 2, "name".to_string(), ValueType::String, None, false, "name".to_string());
    let type_def = type_def_builder.build();
    let label_id = 1;
    store.create_vertex_type(1, 1, label_id, &type_def, 1).unwrap();
    println!("schema created");

    let str_len = 100; // payload bytes per vertex
    let timer = Timer::new();
    let mut tmp_time = 0.0; // elapsed seconds at the previous report
    let mut tmp_count = 0;  // total writes at the previous report
    let val = "c".repeat(str_len);
    let mut handles = Vec::new();
    let total_count = Arc::new(AtomicU64::new(0));
    // Shared snapshot id; the reporter thread below advances it every 3s.
    let snapshot_idx = Arc::new(AtomicI64::new(1));
    let thread_count: i64 = thread_count.parse().expect("thread_count must be an integer");
    for i in 0..thread_count {
        let task_id = i;
        let counter = total_count.clone();
        let val = val.clone();
        let store = store.clone();
        let snapshot_idx = snapshot_idx.clone();
        println!("task {} starting", task_id);
        let handle = std::thread::spawn(move || {
            // Disjoint per-thread id range so writers never collide on vertex ids.
            let mut idx = i * 100000000000 + 2;
            // Property values are identical for every write this thread does;
            // build the map once instead of per insert so HashMap construction
            // and value allocation don't pollute the measured write path.
            let mut properties = HashMap::new();
            properties.insert(1, Value::long(i));
            properties.insert(2, Value::string(&val));
            loop {
                let snapshot_id = snapshot_idx.load(Ordering::SeqCst);
                let vertex_id = idx;
                // Hash the sequential id so keys are spread across the keyspace
                // rather than written in strictly ascending order.
                let mut hasher = DefaultHasher::new();
                vertex_id.hash(&mut hasher);
                let hash_id = hasher.finish();
                store.insert_overwrite_vertex(snapshot_id, hash_id as i64, label_id, &properties).unwrap();
                idx += 1;
                counter.fetch_add(1, Ordering::Relaxed);
            }
        });
        handles.push(handle);
    }
    println!("{}\t{}\t{}", "time(sec)", "speed(record/s)", "total");
    // Reporter: prints interval throughput every 3s and bumps the snapshot id.
    std::thread::spawn(move || {
        loop {
            std::thread::sleep(Duration::from_secs(3));
            let total_write = total_count.load(Ordering::Relaxed);
            let write_count = total_write - tmp_count;
            let total_time = timer.elasped_secs();
            let t = total_time - tmp_time;
            println!("{:.0}\t{:.2}\t{:.0}", total_time, write_count as f64 / t, total_write);
            tmp_count = total_write;
            tmp_time = total_time;
            snapshot_idx.fetch_add(1, Ordering::SeqCst);
        }
    });
    // Writer threads loop forever, so this blocks until the process is killed.
    for handle in handles {
        handle.join().unwrap();
    }
}
42 changes: 42 additions & 0 deletions interactive_engine/executor/store/src/db/storage/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::sync::Arc;
use crate::db::api::*;
use super::{StorageIter, StorageRes, ExternalStorage, ExternalStorageBackup};
use crate::db::storage::{KvPair, RawBytes};
use rocksdb::{DBCompressionType, DBCompactionStyle};

pub struct RocksDB {
db: Arc<DB>,
Expand Down Expand Up @@ -201,6 +202,47 @@ fn init_options(options: &HashMap<String, String>) -> Options {
let mut ret = Options::default();
ret.create_if_missing(true);
// TODO: Add other customized db options.
if let Some(conf_str) = options.get("store.rocksdb.compression.type") {
match conf_str.as_str() {
"none" => ret.set_compression_type(DBCompressionType::None),
"snappy" => ret.set_compression_type(DBCompressionType::Snappy),
"zlib" => ret.set_compression_type(DBCompressionType::Zlib),
"bz2" => ret.set_compression_type(DBCompressionType::Bz2),
"lz4" => ret.set_compression_type(DBCompressionType::Lz4),
"lz4hc" => ret.set_compression_type(DBCompressionType::Lz4hc),
"zstd" => ret.set_compression_type(DBCompressionType::Zstd),
_ => panic!("invalid compression_type config"),
}
}
if let Some(conf_str) = options.get("store.rocksdb.stats.dump.period.sec") {
ret.set_stats_dump_period_sec(conf_str.parse().unwrap());
}
if let Some(conf_str) = options.get("store.rocksdb.compaction.style") {
match conf_str.as_str() {
"universal" => ret.set_compaction_style(DBCompactionStyle::Universal),
"level" => ret.set_compaction_style(DBCompactionStyle::Level),
_ => panic!("invalid compaction_style config"),
}
}
if let Some(conf_str) = options.get("store.rocksdb.write.buffer.mb") {
let size_mb: usize = conf_str.parse().unwrap();
let size_bytes = size_mb * 1024 * 1024;
ret.set_write_buffer_size(size_bytes);
}
if let Some(conf_str) = options.get("store.rocksdb.max.write.buffer.num") {
ret.set_max_write_buffer_number(conf_str.parse().unwrap());
}
if let Some(conf_str) = options.get("store.rocksdb.level0.compaction.trigger") {
ret.set_level_zero_file_num_compaction_trigger(conf_str.parse().unwrap());
}
if let Some(conf_str) = options.get("store.rocksdb.max.level.base.mb") {
let size_mb: u64 = conf_str.parse().unwrap();
ret.set_max_bytes_for_level_base(size_mb * 1024 * 1024);
}
if let Some(conf_str) = options.get("store.rocksdb.background.jobs") {
let background_jobs = conf_str.parse().unwrap();
ret.set_max_background_jobs(background_jobs);
}
ret
}

Expand Down

0 comments on commit e3f055e

Please sign in to comment.