Skip to content

Commit

Permalink
fix: panic in index log. under linux system open file in append mode,…
Browse files Browse the repository at this point in the history
… write_at(pos, data) also written to the end of the file [1307] (#1309)

* fix: generate duplicate series id

* fix: panic in index binlog; because under linux systen open file in append mode,write_at(pos, data) also written to the end of the file

* add index log read write unit test
  • Loading branch information
bartliu827 committed Jun 29, 2023
1 parent 631fa79 commit 2c589aa
Showing 1 changed file with 63 additions and 21 deletions.
84 changes: 63 additions & 21 deletions tskv/src/index/binlog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const SEGMENT_FILE_MAGIC: [u8; 4] = [0x48, 0x49, 0x4e, 0x02];
const SEGMENT_FILE_MAX_SIZE: u64 = 64 * 1024 * 1024;
const BLOCK_HEADER_SIZE: usize = 16;

#[derive(Debug)]
#[derive(Debug, PartialEq)]
pub struct SeriesKeyBlock {
pub ts: i64,
pub series_id: u32,
Expand Down Expand Up @@ -137,28 +137,13 @@ pub struct BinlogWriter {

impl BinlogWriter {
pub async fn open(id: u64, path: impl AsRef<Path>) -> IndexResult<Self> {
let path = path.as_ref();

// Get file and check if new file
let mut new_file = false;
let file = if file_manager::try_exists(path) {
let f = file_manager::open_create_file(path)
.await
.map_err(|e| IndexError::FileErrors { msg: e.to_string() })?;
if f.is_empty() {
new_file = true;
}
f
} else {
new_file = true;
file_manager::create_file(path)
.await
.map_err(|e| IndexError::FileErrors { msg: e.to_string() })?
};
let file = file_manager::create_file(path.as_ref())
.await
.map_err(|e| IndexError::FileErrors { msg: e.to_string() })?;

let mut size = file.len();
if new_file {
size = 8;
if size < SEGMENT_FILE_HEADER_SIZE as u64 {
size = SEGMENT_FILE_HEADER_SIZE as u64;
BinlogWriter::write_header(&file, SEGMENT_FILE_HEADER_SIZE as u32).await?;
}

Expand Down Expand Up @@ -376,3 +361,60 @@ pub async fn repair_index_file(file_name: &str) -> IndexResult<()> {

Ok(())
}

mod test {

#[tokio::test]
async fn test_index_log_read_write() {
use std::path::PathBuf;

use crate::file_utils::make_index_binlog_file;
use crate::index::binlog::{BinlogReader, BinlogWriter, IndexBinlog, SeriesKeyBlock};

let dir = "/tmp/cnosdb/index_log_test";
let _ = std::fs::remove_dir_all(dir);

let path = PathBuf::from(dir);
let mut index = IndexBinlog::new(path.clone()).await.unwrap();

let block1 = SeriesKeyBlock {
ts: 10001,
series_id: 101,
data_len: 3,
data: "abc".into(),
};

let block2 = SeriesKeyBlock {
ts: 10002,
series_id: 102,
data_len: 3,
data: "efg".into(),
};

let block3 = SeriesKeyBlock {
ts: 10003,
series_id: 103,
data_len: 3,
data: "hij".into(),
};

index.write(&block1.encode()).await.unwrap();
index.write(&block2.encode()).await.unwrap();
index.write(&block3.encode()).await.unwrap();

let name = make_index_binlog_file(path, 1);
let tmp_file = BinlogWriter::open(1, name).await.unwrap();
let mut reader_file = BinlogReader::new(1, tmp_file.file.into()).await.unwrap();

assert_eq!(block1, reader_file.next_block().await.unwrap().unwrap());
assert_eq!(block2, reader_file.next_block().await.unwrap().unwrap());
assert_eq!(block3, reader_file.next_block().await.unwrap().unwrap());
assert_eq!(None, reader_file.next_block().await.unwrap());

index.advance_write_offset(0).await.unwrap();

assert_eq!(None, reader_file.next_block().await.unwrap());

let _ = std::fs::remove_dir_all(dir);
}
}

0 comments on commit 2c589aa

Please sign in to comment.