Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add cache buffer for FileCursor #1927

Merged
merged 1 commit into from Jan 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
73 changes: 43 additions & 30 deletions tskv/src/file_system/file/cursor.rs
@@ -1,56 +1,66 @@
use std::io::{Error, ErrorKind, IoSlice, Result, SeekFrom};
use std::ops::Deref;

use crate::file_system::file::async_file::AsyncFile;
use crate::file_system::file::IFile;

const BUFFER_SIZE: usize = 1024 * 1024;

pub struct FileCursor {
file: AsyncFile,
pos: u64,
buf: Vec<u8>,
}

impl FileCursor {
/// Consumes the cursor and returns the wrapped [`AsyncFile`].
///
/// The cursor's in-memory write buffer is dropped as-is: this method does not
/// write pending bytes to the file, so flush first (e.g. via `sync_data`) if
/// they matter.
pub fn into_file(self) -> AsyncFile {
self.file
}

/// Borrows the underlying [`AsyncFile`] without consuming the cursor.
pub fn file_ref(&self) -> &AsyncFile {
&self.file
}

pub fn pos(&self) -> u64 {
self.pos
self.pos + self.buf.len() as u64
}

pub fn set_pos(&mut self, pos: u64) {
fn set_pos(&mut self, pos: u64) {
self.buf.clear();
self.pos = pos;
}

pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
let read = self.file.read_at(self.pos, buf).await?;
self.seek(SeekFrom::Current(read.try_into().unwrap()))
.unwrap();
self.pos += read as u64;
Ok(read)
}

pub async fn write(&mut self, buf: &[u8]) -> Result<usize> {
let size = self.file.write_at(self.pos, buf).await?;
self.seek(SeekFrom::Current(buf.len().try_into().unwrap()))
.unwrap();
Ok(size)
self.buf.extend_from_slice(buf);
self.try_flush(BUFFER_SIZE).await?;
Ok(buf.len())
}

/// Appends every slice in `bufs`, in order, to the cursor's in-memory buffer.
///
/// The combined length is reserved up front so the buffer grows at most once
/// for this call. Afterwards `try_flush(BUFFER_SIZE)` is invoked, which only
/// writes to the file once the buffer has reached `BUFFER_SIZE` bytes.
///
/// Returns the total number of bytes taken from `bufs`; an error is returned
/// only if the flush attempt fails.
pub async fn write_vec<'a>(&mut self, bufs: &'a [IoSlice<'a>]) -> Result<usize> {
    let total: usize = bufs.iter().map(|slice| slice.len()).sum();
    self.buf.reserve(total);
    for slice in bufs {
        self.buf.extend_from_slice(slice);
    }
    self.try_flush(BUFFER_SIZE).await?;
    Ok(total)
}

pub async fn write_vec<'a>(&mut self, bufs: &'a mut [IoSlice<'a>]) -> Result<usize> {
let mut p = self.pos;
for buf in bufs {
p += self.write_at(p, buf.deref()).await? as u64;
pub async fn try_flush(&mut self, buffer_size: usize) -> Result<()> {
if self.buf.is_empty() || self.buf.len() < buffer_size {
return Ok(());
}
let pos = self.pos;
self.seek(SeekFrom::Start(p)).unwrap();
Ok((p - pos) as usize)
let size = self.file.write_at(self.pos, &self.buf).await?;
self.set_pos(self.pos + size as u64);
Ok(())
}

/// Writes out any buffered bytes, then asks the underlying file to sync its data.
///
/// `try_flush(0)` passes a zero threshold, so a non-empty buffer is always
/// written before `sync_data` is called on the file. An error from either
/// step is returned to the caller.
pub async fn sync_data(&mut self) -> Result<()> {
self.try_flush(0).await?;
self.file.sync_data().await
}

pub fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
pub async fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
self.try_flush(0).await?;
self.pos = match pos {
SeekFrom::Start(pos) => Some(pos),
SeekFrom::End(delta) => {
Expand All @@ -73,18 +83,21 @@ impl FileCursor {
.ok_or_else(|| Error::new(ErrorKind::InvalidInput, "underflow or overflow during seek"))?;
Ok(self.pos)
}
}

impl From<AsyncFile> for FileCursor {
fn from(file: AsyncFile) -> Self {
FileCursor { file, pos: 0 }
/// Returns the length reported by the underlying file.
///
/// NOTE(review): this delegates straight to the file's `len`, so bytes still
/// held in the cursor's write buffer are presumably not counted — confirm.
pub fn len(&self) -> u64 {
self.file.len()
}
/// Returns whether the underlying file reports itself as empty.
///
/// Delegates to the file's own `is_empty`; the cursor's write buffer is not
/// consulted.
pub fn is_empty(&self) -> bool {
self.file.is_empty()
}
}

impl Deref for FileCursor {
type Target = AsyncFile;

fn deref(&self) -> &Self::Target {
&self.file
impl From<AsyncFile> for FileCursor {
fn from(file: AsyncFile) -> Self {
FileCursor {
file,
pos: 0,
buf: Vec::with_capacity(BUFFER_SIZE),
}
}
roseboy-liu marked this conversation as resolved.
Show resolved Hide resolved
}
43 changes: 42 additions & 1 deletion tskv/src/file_system/file_manager.rs
Expand Up @@ -179,6 +179,7 @@ pub async fn open_create_file(path: impl AsRef<Path>) -> Result<AsyncFile> {

#[cfg(test)]
mod test {
use std::io::{IoSlice, SeekFrom};
use std::path::PathBuf;

use trace::info;
Expand Down Expand Up @@ -222,6 +223,24 @@ mod test {
assert!(open_file_ret_2.is_ok());
}

/// Writes four bytes at offset 0 and four more at offset 8 of a freshly
/// created file, then checks that reading 12 bytes from offset 0 yields both
/// runs with zeros in the untouched gap (offsets 4..8).
#[tokio::test]
async fn test_skip_write() {
    let dir = "/tmp/test/file_manager/test_skip_write";
    let _ = std::fs::remove_dir_all(dir);
    let path = PathBuf::from(dir).join("test.txt");
    let file = file_manager::create_file(&path).await.unwrap();

    // First run: bytes 0..4.
    let head: [u8; 4] = [0, 1, 2, 3];
    let written = file.write_at(0, &head).await.unwrap();
    assert_eq!(head.len(), written);

    // Second run: bytes 8..12, leaving 4..8 never written.
    let tail: [u8; 4] = [4, 5, 6, 7];
    let _written = file.write_at(8, &tail).await.unwrap();

    // Read the whole range back and compare against the expected layout.
    let mut actual = [0_u8; 12];
    let read = file.read_at(0, &mut actual).await.unwrap();
    let expected = [0, 1, 2, 3, 0, 0, 0, 0, 4, 5, 6, 7];
    assert_eq!(expected, actual);
    assert_eq!(read, 12);
}

#[tokio::test]
async fn test_io_basic() {
let dir = "/tmp/test/file_manager/test_io_basic";
Expand Down Expand Up @@ -359,13 +378,35 @@ mod test {
let len = cursor.write(&[0, 1, 2, 3, 4]).await.unwrap();
assert_eq!(len, 5);
}
cursor.set_pos(5);
cursor.seek(SeekFrom::Start(0)).await.unwrap();
let mut buf = [0_u8; 8];
let len = cursor.read(&mut buf[0..5]).await.unwrap();
assert_eq!(len, 5);
assert_eq!(buf, [0, 1, 2, 3, 4, 0, 0, 0]);
}

/// Writes three 5-byte slices through `FileCursor::write_vec`, flushes the
/// buffer, seeks back to the start and checks that the 15 bytes read back
/// appear in the order they were supplied.
#[tokio::test]
async fn test_cursor2() {
    let dir = "/tmp/test/file_manager/test_cursor2";
    let _ = std::fs::remove_dir_all(dir);
    let path = PathBuf::from(dir).join("test.txt");

    let file = file_manager::create_file(&path).await.unwrap();
    let mut cursor: FileCursor = file.into();

    let slices = [
        IoSlice::new(&[0, 1, 2, 3, 4]),
        IoSlice::new(&[5, 6, 7, 8, 9]),
        IoSlice::new(&[10, 11, 12, 13, 14]),
    ];
    cursor.write_vec(&slices).await.unwrap();
    // Zero threshold: write out whatever is buffered, however little.
    cursor.try_flush(0).await.unwrap();
    cursor.seek(SeekFrom::Start(0)).await.unwrap();

    // One byte larger than the payload, so the last element must stay zero.
    let mut readback = [0_u8; 16];
    let read = cursor.read(&mut readback).await.unwrap();
    assert_eq!(read, 15);
    assert_eq!(
        readback,
        [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 0]
    );
}

#[test]
#[cfg(feature = "io_uring")]
fn test_io_uring() {
Expand Down
8 changes: 4 additions & 4 deletions tskv/src/index/binlog.rs
Expand Up @@ -214,12 +214,12 @@ pub struct BinlogReader {

impl BinlogReader {
pub async fn new(id: u64, mut cursor: FileCursor) -> IndexResult<Self> {
let header_buf = BinlogReader::reade_header(&mut cursor).await?;
let header_buf = BinlogReader::read_header(&mut cursor).await?;
let offset = byte_utils::decode_be_u32(&header_buf[4..8]);

debug!("Read index binlog begin read offset: {}", offset);

cursor.set_pos(offset as u64);
cursor.seek(SeekFrom::Start(offset as u64)).await?;

Ok(Self {
id,
Expand All @@ -229,10 +229,10 @@ impl BinlogReader {
})
}

async fn reade_header(cursor: &mut FileCursor) -> IndexResult<[u8; SEGMENT_FILE_HEADER_SIZE]> {
async fn read_header(cursor: &mut FileCursor) -> IndexResult<[u8; SEGMENT_FILE_HEADER_SIZE]> {
let mut header_buf = [0_u8; SEGMENT_FILE_HEADER_SIZE];

cursor.seek(SeekFrom::Start(0))?;
cursor.seek(SeekFrom::Start(0)).await?;
let _read = cursor.read(&mut header_buf[..]).await?;

Ok(header_buf)
Expand Down
4 changes: 2 additions & 2 deletions tskv/src/kvcore.rs
Expand Up @@ -265,7 +265,7 @@ impl TsKv {
wal_manager.write(wal_task).await;
}

async fn on_tick_sync(wal_manager: &WalManager) {
async fn on_tick_sync(wal_manager: &mut WalManager) {
if let Err(e) = wal_manager.sync().await {
error!("Failed flushing WAL file: {:?}", e);
}
Expand Down Expand Up @@ -343,7 +343,7 @@ impl TsKv {
}
}
_ = sync_ticker.tick() => {
on_tick_sync(&wal_manager).await;
on_tick_sync(&mut wal_manager).await;
}
_ = check_total_size_ticker.tick() => {
on_tick_check_total_size(
Expand Down
31 changes: 22 additions & 9 deletions tskv/src/record_file/writer.rs
Expand Up @@ -10,7 +10,6 @@ use super::{
};
use crate::error::{self, Error, Result};
use crate::file_system::file::cursor::FileCursor;
use crate::file_system::file::IFile;
use crate::file_system::file_manager;

pub struct Writer {
Expand All @@ -31,7 +30,7 @@ impl Writer {
.write(&FILE_MAGIC_NUMBER.to_be_bytes())
.await
.context(error::IOSnafu)? as u64;
// Get none as footer data.
cursor.try_flush(0).await?;
None
} else {
let footer_data = match Reader::read_footer(&path).await {
Expand All @@ -52,6 +51,7 @@ impl Writer {
// TODO: truncate this file using seek_pos_end.
cursor
.seek(SeekFrom::End(-(seek_pos_end as i64)))
.await
.context(error::IOSnafu)?;

footer_data
Expand Down Expand Up @@ -109,23 +109,27 @@ impl Writer {
// Write record header and record data.
let written_size =
self.cursor
.write_vec(&mut write_buf)
.write_vec(&write_buf)
.await
.map_err(|e| Error::WriteFile {
path: self.path.clone(),
source: e,
})?;
self.file_size += written_size as u64;
self.cursor.try_flush(0).await?;
Ok(written_size)
}

pub async fn write_footer(&mut self, mut footer: [u8; FILE_FOOTER_LEN]) -> Result<usize> {
self.sync().await?;

let pos = self.cursor.pos();
// Get file crc
let mut buf = vec![0_u8; file_crc_source_len(self.file_size(), 0_usize)];
self.cursor
.read_at(FILE_MAGIC_NUMBER_LEN as u64, &mut buf)
.seek(SeekFrom::Start(FILE_MAGIC_NUMBER_LEN as u64))
.await?;
self.cursor
.read(&mut buf)
.await
.map_err(|e| Error::ReadFile {
path: self.path.clone(),
Expand All @@ -137,20 +141,30 @@ impl Writer {
footer[4..8].copy_from_slice(&crc.to_be_bytes());
self.footer = Some(footer);

self.cursor
self.cursor.seek(SeekFrom::Start(pos)).await?;
let res = self
.cursor
.write(&footer)
.await
.map_err(|e| Error::WriteFile {
path: self.path.clone(),
source: e,
})
})?;
self.cursor
.try_flush(0)
.await
.map_err(|e| Error::WriteFile {
path: self.path.clone(),
source: e,
})?;
Ok(res)
}

pub fn footer(&self) -> Option<[u8; FILE_FOOTER_LEN]> {
self.footer
}

pub async fn sync(&self) -> Result<()> {
pub async fn sync(&mut self) -> Result<()> {
self.cursor.sync_data().await.context(error::SyncFileSnafu)
}

Expand All @@ -169,7 +183,6 @@ impl Writer {

#[cfg(test)]
mod test {

use serial_test::serial;

use super::Writer;
Expand Down
1 change: 0 additions & 1 deletion tskv/src/tsm/writer.rs
Expand Up @@ -9,7 +9,6 @@ use utils::BloomFilter;
use super::EncodedDataBlock;
use crate::error::{self, Error, Result};
use crate::file_system::file::cursor::FileCursor;
use crate::file_system::file::IFile;
use crate::file_system::file_manager;
use crate::file_utils;
use crate::tsm::{
Expand Down
2 changes: 1 addition & 1 deletion tskv/src/wal/mod.rs
Expand Up @@ -445,7 +445,7 @@ impl WalManager {
Ok(wal_readers)
}

pub async fn sync(&self) -> Result<()> {
pub async fn sync(&mut self) -> Result<()> {
self.current_file.sync().await
}

Expand Down
6 changes: 3 additions & 3 deletions tskv/src/wal/writer.rs
Expand Up @@ -182,7 +182,7 @@ impl WalWriter {
Ok((seq, written_size))
}

pub async fn sync(&self) -> Result<()> {
pub async fn sync(&mut self) -> Result<()> {
self.inner.sync().await
}

Expand Down Expand Up @@ -241,9 +241,9 @@ mod test {
let wal_config = Arc::new(WalOptions::from(&global_config));

#[rustfmt::skip]
let entries = vec![
let entries = vec![
WalEntry::Write(WriteBlock::build(
1, "cnosdb", 3, Precision::NS, vec![1, 2, 3],
1, "cnosdb", 3, Precision::NS, vec![1, 2, 3],
)),
WalEntry::DeleteVnode(DeleteVnodeBlock::build(2, "cnosdb", "public", 6)),
WalEntry::DeleteTable(DeleteTableBlock::build(3, "cnosdb", "public", "table")),
Expand Down