Skip to content

Commit

Permalink
add cache buffer for FileCursor
Browse files Browse the repository at this point in the history
  • Loading branch information
roseboy-liu committed Jan 15, 2024
1 parent 6765f19 commit 4104fbe
Show file tree
Hide file tree
Showing 8 changed files with 117 additions and 51 deletions.
73 changes: 43 additions & 30 deletions tskv/src/file_system/file/cursor.rs
@@ -1,56 +1,66 @@
use std::io::{Error, ErrorKind, IoSlice, Result, SeekFrom};
use std::ops::Deref;

use crate::file_system::file::async_file::AsyncFile;
use crate::file_system::file::IFile;

const BUFFER_SIZE: usize = 1024 * 1024;

pub struct FileCursor {
file: AsyncFile,
pos: u64,
buf: Vec<u8>,
}

impl FileCursor {
pub fn into_file(self) -> AsyncFile {
self.file
}

pub fn file_ref(&self) -> &AsyncFile {
&self.file
}

pub fn pos(&self) -> u64 {
self.pos
self.pos + self.buf.len() as u64
}

pub fn set_pos(&mut self, pos: u64) {
fn set_pos(&mut self, pos: u64) {
self.buf.clear();
self.pos = pos;
}

pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
let read = self.file.read_at(self.pos, buf).await?;
self.seek(SeekFrom::Current(read.try_into().unwrap()))
.unwrap();
self.pos += read as u64;
Ok(read)
}

pub async fn write(&mut self, buf: &[u8]) -> Result<usize> {
let size = self.file.write_at(self.pos, buf).await?;
self.seek(SeekFrom::Current(buf.len().try_into().unwrap()))
.unwrap();
Ok(size)
self.buf.extend_from_slice(buf);
self.try_flush(BUFFER_SIZE).await?;
Ok(buf.len())
}

pub async fn write_vec<'a>(&mut self, bufs: &'a [IoSlice<'a>]) -> Result<usize> {
let sum = bufs.iter().fold(0, |acc, buf| acc + buf.len());
self.buf.reserve(sum);
bufs.iter().for_each(|buf| self.buf.extend_from_slice(buf));
self.try_flush(BUFFER_SIZE).await?;
Ok(sum)
}

pub async fn write_vec<'a>(&mut self, bufs: &'a mut [IoSlice<'a>]) -> Result<usize> {
let mut p = self.pos;
for buf in bufs {
p += self.write_at(p, buf.deref()).await? as u64;
pub async fn try_flush(&mut self, buffer_size: usize) -> Result<()> {
if self.buf.is_empty() || self.buf.len() < buffer_size {
return Ok(());
}
let pos = self.pos;
self.seek(SeekFrom::Start(p)).unwrap();
Ok((p - pos) as usize)
let size = self.file.write_at(self.pos, &self.buf).await?;
self.set_pos(self.pos + size as u64);
Ok(())
}

pub async fn sync_data(&mut self) -> Result<()> {
self.try_flush(0).await?;
self.file.sync_data().await
}

pub fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
pub async fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
self.try_flush(0).await?;
self.pos = match pos {
SeekFrom::Start(pos) => Some(pos),
SeekFrom::End(delta) => {
Expand All @@ -73,18 +83,21 @@ impl FileCursor {
.ok_or_else(|| Error::new(ErrorKind::InvalidInput, "underflow or overflow during seek"))?;
Ok(self.pos)
}
}

impl From<AsyncFile> for FileCursor {
fn from(file: AsyncFile) -> Self {
FileCursor { file, pos: 0 }
pub fn len(&self) -> u64 {
self.file.len()
}
pub fn is_empty(&self) -> bool {
self.file.is_empty()
}
}

impl Deref for FileCursor {
type Target = AsyncFile;

fn deref(&self) -> &Self::Target {
&self.file
impl From<AsyncFile> for FileCursor {
fn from(file: AsyncFile) -> Self {
FileCursor {
file,
pos: 0,
buf: Vec::with_capacity(BUFFER_SIZE),
}
}
}
43 changes: 42 additions & 1 deletion tskv/src/file_system/file_manager.rs
Expand Up @@ -179,6 +179,7 @@ pub async fn open_create_file(path: impl AsRef<Path>) -> Result<AsyncFile> {

#[cfg(test)]
mod test {
use std::io::{IoSlice, SeekFrom};
use std::path::PathBuf;

use trace::info;
Expand Down Expand Up @@ -222,6 +223,24 @@ mod test {
assert!(open_file_ret_2.is_ok());
}

#[tokio::test]
async fn test_skip_write() {
let dir = "/tmp/test/file_manager/test_skip_write";
let _ = std::fs::remove_dir_all(dir);
let path = PathBuf::from(dir).join("test.txt");
let file = file_manager::create_file(&path).await.unwrap();
let data = [0, 1, 2, 3];
let len = file.write_at(0, &data).await.unwrap();
assert_eq!(data.len(), len);
let data: [u8; 4] = [4, 5, 6, 7];
let _len = file.write_at(8, &data).await.unwrap();
let mut buf = [0_u8; 12];
let size = file.read_at(0, &mut buf).await.unwrap();
let buf2 = [0, 1, 2, 3, 0, 0, 0, 0, 4, 5, 6, 7];
assert_eq!(buf2, buf);
assert_eq!(size, 12);
}

#[tokio::test]
async fn test_io_basic() {
let dir = "/tmp/test/file_manager/test_io_basic";
Expand Down Expand Up @@ -359,13 +378,35 @@ mod test {
let len = cursor.write(&[0, 1, 2, 3, 4]).await.unwrap();
assert_eq!(len, 5);
}
cursor.set_pos(5);
cursor.seek(SeekFrom::Start(0)).await.unwrap();
let mut buf = [0_u8; 8];
let len = cursor.read(&mut buf[0..5]).await.unwrap();
assert_eq!(len, 5);
assert_eq!(buf, [0, 1, 2, 3, 4, 0, 0, 0]);
}

#[tokio::test]
async fn test_cursor2() {
let dir = "/tmp/test/file_manager/test_cursor2";
let _ = std::fs::remove_dir_all(dir);
let path = PathBuf::from(dir).join("test.txt");

let file = file_manager::create_file(&path).await.unwrap();
let mut cursor: FileCursor = file.into();
let ios = [
IoSlice::new(&[0, 1, 2, 3, 4]),
IoSlice::new(&[5, 6, 7, 8, 9]),
IoSlice::new(&[10, 11, 12, 13, 14]),
];
cursor.write_vec(&ios).await.unwrap();
cursor.try_flush(0).await.unwrap();
cursor.seek(SeekFrom::Start(0)).await.unwrap();
let mut buf = [0_u8; 16];
let len = cursor.read(&mut buf).await.unwrap();
assert_eq!(len, 15);
assert_eq!(buf, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 0]);
}

#[test]
#[cfg(feature = "io_uring")]
fn test_io_uring() {
Expand Down
8 changes: 4 additions & 4 deletions tskv/src/index/binlog.rs
Expand Up @@ -214,12 +214,12 @@ pub struct BinlogReader {

impl BinlogReader {
pub async fn new(id: u64, mut cursor: FileCursor) -> IndexResult<Self> {
let header_buf = BinlogReader::reade_header(&mut cursor).await?;
let header_buf = BinlogReader::read_header(&mut cursor).await?;
let offset = byte_utils::decode_be_u32(&header_buf[4..8]);

debug!("Read index binlog begin read offset: {}", offset);

cursor.set_pos(offset as u64);
cursor.seek(SeekFrom::Start(offset as u64)).await?;

Ok(Self {
id,
Expand All @@ -229,10 +229,10 @@ impl BinlogReader {
})
}

async fn reade_header(cursor: &mut FileCursor) -> IndexResult<[u8; SEGMENT_FILE_HEADER_SIZE]> {
async fn read_header(cursor: &mut FileCursor) -> IndexResult<[u8; SEGMENT_FILE_HEADER_SIZE]> {
let mut header_buf = [0_u8; SEGMENT_FILE_HEADER_SIZE];

cursor.seek(SeekFrom::Start(0))?;
cursor.seek(SeekFrom::Start(0)).await?;
let _read = cursor.read(&mut header_buf[..]).await?;

Ok(header_buf)
Expand Down
4 changes: 2 additions & 2 deletions tskv/src/kvcore.rs
Expand Up @@ -265,7 +265,7 @@ impl TsKv {
wal_manager.write(wal_task).await;
}

async fn on_tick_sync(wal_manager: &WalManager) {
async fn on_tick_sync(wal_manager: &mut WalManager) {
if let Err(e) = wal_manager.sync().await {
error!("Failed flushing WAL file: {:?}", e);
}
Expand Down Expand Up @@ -343,7 +343,7 @@ impl TsKv {
}
}
_ = sync_ticker.tick() => {
on_tick_sync(&wal_manager).await;
on_tick_sync(&mut wal_manager).await;
}
_ = check_total_size_ticker.tick() => {
on_tick_check_total_size(
Expand Down
31 changes: 22 additions & 9 deletions tskv/src/record_file/writer.rs
Expand Up @@ -10,7 +10,6 @@ use super::{
};
use crate::error::{self, Error, Result};
use crate::file_system::file::cursor::FileCursor;
use crate::file_system::file::IFile;
use crate::file_system::file_manager;

pub struct Writer {
Expand All @@ -31,7 +30,7 @@ impl Writer {
.write(&FILE_MAGIC_NUMBER.to_be_bytes())
.await
.context(error::IOSnafu)? as u64;
// Get none as footer data.
cursor.try_flush(0).await?;
None
} else {
let footer_data = match Reader::read_footer(&path).await {
Expand All @@ -52,6 +51,7 @@ impl Writer {
// TODO: truncate this file using seek_pos_end.
cursor
.seek(SeekFrom::End(-(seek_pos_end as i64)))
.await
.context(error::IOSnafu)?;

footer_data
Expand Down Expand Up @@ -109,23 +109,27 @@ impl Writer {
// Write record header and record data.
let written_size =
self.cursor
.write_vec(&mut write_buf)
.write_vec(&write_buf)
.await
.map_err(|e| Error::WriteFile {
path: self.path.clone(),
source: e,
})?;
self.file_size += written_size as u64;
self.cursor.try_flush(0).await?;
Ok(written_size)
}

pub async fn write_footer(&mut self, mut footer: [u8; FILE_FOOTER_LEN]) -> Result<usize> {
self.sync().await?;

let pos = self.cursor.pos();
// Get file crc
let mut buf = vec![0_u8; file_crc_source_len(self.file_size(), 0_usize)];
self.cursor
.read_at(FILE_MAGIC_NUMBER_LEN as u64, &mut buf)
.seek(SeekFrom::Start(FILE_MAGIC_NUMBER_LEN as u64))
.await?;
self.cursor
.read(&mut buf)
.await
.map_err(|e| Error::ReadFile {
path: self.path.clone(),
Expand All @@ -137,20 +141,30 @@ impl Writer {
footer[4..8].copy_from_slice(&crc.to_be_bytes());
self.footer = Some(footer);

self.cursor
self.cursor.seek(SeekFrom::Start(pos)).await?;
let res = self
.cursor
.write(&footer)
.await
.map_err(|e| Error::WriteFile {
path: self.path.clone(),
source: e,
})
})?;
self.cursor
.try_flush(0)
.await
.map_err(|e| Error::WriteFile {
path: self.path.clone(),
source: e,
})?;
Ok(res)
}

pub fn footer(&self) -> Option<[u8; FILE_FOOTER_LEN]> {
self.footer
}

pub async fn sync(&self) -> Result<()> {
pub async fn sync(&mut self) -> Result<()> {
self.cursor.sync_data().await.context(error::SyncFileSnafu)
}

Expand All @@ -169,7 +183,6 @@ impl Writer {

#[cfg(test)]
mod test {

use serial_test::serial;

use super::Writer;
Expand Down
1 change: 0 additions & 1 deletion tskv/src/tsm/writer.rs
Expand Up @@ -9,7 +9,6 @@ use utils::BloomFilter;
use super::EncodedDataBlock;
use crate::error::{self, Error, Result};
use crate::file_system::file::cursor::FileCursor;
use crate::file_system::file::IFile;
use crate::file_system::file_manager;
use crate::file_utils;
use crate::tsm::{
Expand Down
2 changes: 1 addition & 1 deletion tskv/src/wal/mod.rs
Expand Up @@ -445,7 +445,7 @@ impl WalManager {
Ok(wal_readers)
}

pub async fn sync(&self) -> Result<()> {
pub async fn sync(&mut self) -> Result<()> {
self.current_file.sync().await
}

Expand Down
6 changes: 3 additions & 3 deletions tskv/src/wal/writer.rs
Expand Up @@ -182,7 +182,7 @@ impl WalWriter {
Ok((seq, written_size))
}

pub async fn sync(&self) -> Result<()> {
pub async fn sync(&mut self) -> Result<()> {
self.inner.sync().await
}

Expand Down Expand Up @@ -241,9 +241,9 @@ mod test {
let wal_config = Arc::new(WalOptions::from(&global_config));

#[rustfmt::skip]
let entries = vec![
let entries = vec![
WalEntry::Write(WriteBlock::build(
1, "cnosdb", 3, Precision::NS, vec![1, 2, 3],
1, "cnosdb", 3, Precision::NS, vec![1, 2, 3],
)),
WalEntry::DeleteVnode(DeleteVnodeBlock::build(2, "cnosdb", "public", 6)),
WalEntry::DeleteTable(DeleteTableBlock::build(3, "cnosdb", "public", "table")),
Expand Down

0 comments on commit 4104fbe

Please sign in to comment.