diff --git a/dragonfly-client-storage/src/content_linux.rs b/dragonfly-client-storage/src/content_linux.rs index e537977b..5ada9430 100644 --- a/dragonfly-client-storage/src/content_linux.rs +++ b/dragonfly-client-storage/src/content_linux.rs @@ -361,6 +361,7 @@ impl Content { writer.flush().await.inspect_err(|err| { error!("flush {:?} failed: {}", task_path, err); })?; + debug!("finish to write piece to {:?}", task_path); if length != expected_length { return Err(Error::Unknown(format!( @@ -590,6 +591,7 @@ impl Content { writer.flush().await.inspect_err(|err| { error!("flush {:?} failed: {}", task_path, err); })?; + debug!("finish to write piece to {:?}", task_path); if length != expected_length { return Err(Error::Unknown(format!( diff --git a/dragonfly-client-storage/src/content_macos.rs b/dragonfly-client-storage/src/content_macos.rs index e537977b..5ada9430 100644 --- a/dragonfly-client-storage/src/content_macos.rs +++ b/dragonfly-client-storage/src/content_macos.rs @@ -361,6 +361,7 @@ impl Content { writer.flush().await.inspect_err(|err| { error!("flush {:?} failed: {}", task_path, err); })?; + debug!("finish to write piece to {:?}", task_path); if length != expected_length { return Err(Error::Unknown(format!( @@ -590,6 +591,7 @@ impl Content { writer.flush().await.inspect_err(|err| { error!("flush {:?} failed: {}", task_path, err); })?; + debug!("finish to write piece to {:?}", task_path); if length != expected_length { return Err(Error::Unknown(format!( diff --git a/dragonfly-client-storage/src/server/tcp.rs b/dragonfly-client-storage/src/server/tcp.rs index dc203e61..314b198c 100644 --- a/dragonfly-client-storage/src/server/tcp.rs +++ b/dragonfly-client-storage/src/server/tcp.rs @@ -520,6 +520,7 @@ impl TCPServerHandler { writer.flush().await.inspect_err(|err| { error!("flush failed: {}", err); })?; + debug!("finished writing stream to tcp writer"); Ok(()) } diff --git a/dragonfly-client-storage/src/storage_engine/rocksdb.rs b/dragonfly-client-storage/src/storage_engine/rocksdb.rs index 2cefc28a..9691dadf 100644 --- a/dragonfly-client-storage/src/storage_engine/rocksdb.rs +++ b/dragonfly-client-storage/src/storage_engine/rocksdb.rs @@ -66,6 +66,9 @@ impl RocksdbStorageEngine { /// DEFAULT_LOG_MAX_FILES is the default max log files for rocksdb. const DEFAULT_LOG_MAX_FILES: usize = 10; + /// DEFAULT_BYTES_PER_SYNC is the default bytes per sync for rocksdb. + const DEFAULT_BYTES_PER_SYNC: u64 = 2 * 1024 * 1024; + /// open opens a rocksdb storage engine with the given directory and column families. pub fn open(dir: &Path, log_dir: &PathBuf, cf_names: &[&str], keep: bool) -> Result { info!("initializing metadata directory: {:?} {:?}", dir, cf_names); @@ -76,7 +79,7 @@ impl RocksdbStorageEngine { // Optimize compression. options.set_compression_type(rocksdb::DBCompressionType::Lz4); - options.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd); + options.set_bottommost_compression_type(rocksdb::DBCompressionType::Lz4); // Improved parallelism. options.increase_parallelism(num_cpus::get() as i32); @@ -85,6 +88,10 @@ impl RocksdbStorageEngine { Self::DEFAULT_MAX_BACKGROUND_JOBS, )); + // Set rocksdb sync options. + options.set_use_fsync(false); + options.set_bytes_per_sync(Self::DEFAULT_BYTES_PER_SYNC); + // Set rocksdb log options. options.set_db_log_dir(log_dir); options.set_log_level(rocksdb::LogLevel::Info); @@ -155,7 +162,10 @@ impl Operations for RocksdbStorageEngine { /// put puts the object by key. fn put(&self, key: &[u8], value: &O) -> Result<()> { let cf = cf_handle::(self)?; - self.put_cf(cf, key, value.serialized()?) + let mut options = rocksdb::WriteOptions::default(); + options.set_sync(false); + + self.put_cf_opt(cf, key, value.serialized()?, &options) .or_err(ErrorType::StorageError)?; Ok(()) }