Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Recovering WAL records will not add schema to tskv; stores drop_table & remove_tsfamily in WAL #1340

Merged
merged 4 commits into from Jul 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 6 additions & 3 deletions common/protos/src/models_helper.rs
Expand Up @@ -380,9 +380,11 @@ mod test {

pub fn create_random_points_include_delta<'a>(
fbb: &mut flatbuffers::FlatBufferBuilder<'a>,
database: &str,
table: &str,
num: usize,
) -> WIPOffset<Points<'a>> {
let db = fbb.create_vector("db".as_bytes());
let db = fbb.create_vector(database.as_bytes());
let mut tags_names: HashMap<&str, usize> = HashMap::new();
tags_names.insert("ta", 0);
tags_names.insert("tb", 1);
Expand Down Expand Up @@ -464,7 +466,7 @@ mod test {
let fb_schema = build_fb_schema_offset(fbb, &schema);

let point = fbb.create_vector(&points);
let tab = fbb.create_vector("table".as_bytes());
let tab = fbb.create_vector(table.as_bytes());

let mut table_builder = TableBuilder::new(fbb);

Expand All @@ -487,6 +489,7 @@ mod test {

pub fn create_big_random_points<'a>(
fbb: &mut flatbuffers::FlatBufferBuilder<'a>,
table: &str,
num: usize,
) -> WIPOffset<Points<'a>> {
let db = fbb.create_vector("db".as_bytes());
Expand Down Expand Up @@ -534,7 +537,7 @@ mod test {
let fb_schema = build_fb_schema_offset(fbb, &schema);

let point = fbb.create_vector(&points);
let tab = fbb.create_vector("table".as_bytes());
let tab = fbb.create_vector(table.as_bytes());

let mut table_builder = TableBuilder::new(fbb);

Expand Down
7 changes: 4 additions & 3 deletions common/trace/src/lib.rs
Expand Up @@ -5,6 +5,7 @@ pub mod span;
pub mod span_ctx;

use std::net::SocketAddr;
use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;

Expand All @@ -28,7 +29,7 @@ use tracing_subscriber::{filter, fmt, EnvFilter, Layer, Registry};

/// only use for unit test
/// parameter only use for first call
pub fn init_default_global_tracing(dir: &str, file_name: &str, level: &str) {
pub fn init_default_global_tracing(dir: impl AsRef<Path>, file_name: &str, level: &str) {
static START: Once = Once::new();

START.call_once(|| {
Expand Down Expand Up @@ -87,7 +88,7 @@ pub fn targets_filter(level: LevelFilter, defined_tokio_trace: bool) -> filter::
}

pub fn init_process_global_tracing(
log_path: &str,
log_path: impl AsRef<Path>,
log_level: &str,
log_file_prefix_name: &str,
tokio_trace: Option<&TokioTrace>,
Expand All @@ -107,7 +108,7 @@ pub fn init_process_global_tracing(
}

pub fn init_global_tracing(
log_path: &str,
log_path: impl AsRef<Path>,
log_level: &str,
log_file_prefix_name: &str,
tokio_trace: Option<&TokioTrace>,
Expand Down
7 changes: 6 additions & 1 deletion meta/src/bin/main.rs
Expand Up @@ -131,7 +131,12 @@ async fn detect_node_heartbeat(heartbeat_config: HeartBeatConfig, app: Data<Meta
interval.tick().await;

if let Ok(_leader) = app.raft.is_leader().await {
let opt_list = app.store.state_machine.write().await.children_data::<NodeMetrics>(&metrics_path);
let opt_list = app
.store
.state_machine
.write()
.await
.children_data::<NodeMetrics>(&metrics_path);

if let Ok(list) = opt_list {
let node_metrics_list: Vec<NodeMetrics> = list.into_values().collect();
Expand Down
2 changes: 1 addition & 1 deletion tskv/benches/kvcore_bench.rs
Expand Up @@ -69,7 +69,7 @@ fn big_write(c: &mut Criterion) {
for _i in 0..50 {
let _database = "db".to_string();
let mut fbb = flatbuffers::FlatBufferBuilder::new();
let points = models_helper::create_big_random_points(&mut fbb, 10);
let points = models_helper::create_big_random_points(&mut fbb, "big_write", 10);
fbb.finish(points, None);
let points = fbb.finished_data().to_vec();

Expand Down
1 change: 1 addition & 0 deletions tskv/src/database.rs
Expand Up @@ -203,6 +203,7 @@ impl Database {
tf.read().await.close();
roseboy-liu marked this conversation as resolved.
Show resolved Hide resolved
}

// TODO(zipper): If no ts_family recovered from summary, do not write summary.
let edits = vec![VersionEdit::new_del_vnode(tf_id)];
let (task_state_sender, _task_state_receiver) = oneshot::channel();
let task = SummaryTask::new(edits, None, None, task_state_sender);
Expand Down
11 changes: 0 additions & 11 deletions tskv/src/engine_mock.rs
Expand Up @@ -40,17 +40,6 @@ impl Engine for MockEngine {
Ok(WritePointsResponse { points_number: 0 })
}

async fn write_from_wal(
&self,
id: u32,
precision: Precision,
write_batch: WritePointsRequest,
seq: u64,
) -> Result<()> {
debug!("write point");
Ok(())
}

async fn remove_tsfamily(&self, tenant: &str, database: &str, id: u32) -> Result<()> {
Ok(())
}
Expand Down
47 changes: 38 additions & 9 deletions tskv/src/error.rs
Expand Up @@ -126,15 +126,23 @@ pub enum Error {
message: String,
},

#[snafu(display("fails to send to channel"))]
Send,
/// Failed to send someting to a channel
#[snafu(display("{source}"))]
ChannelSend {
source: ChannelSendError,
},

#[snafu(display("fails to receive from channel"))]
Receive {
source: tokio::sync::oneshot::error::RecvError,
/// Failed to receive something from a channel
#[snafu(display("{source}"))]
ChannelReceive {
source: ChannelReceiveError,
},

#[snafu(display("wal truncated"))]
/// WAL file is truncated, it's because CnosDB didn't shutdown properly.
///
/// This error is handled by WAL module:
/// just stop the current WAL file reading, go to the next WAL file.
#[snafu(display("Internal handled: WAL truncated"))]
WalTruncated,

#[snafu(display("read/write record file block: {}", reason))]
Expand Down Expand Up @@ -193,6 +201,12 @@ pub enum Error {
Points {
source: PointsError,
},

#[snafu(display("non-UTF-8 string '{message}': {source}"))]
InvalidUtf8 {
message: String,
source: std::str::Utf8Error,
},
}

impl From<PointsError> for Error {
Expand Down Expand Up @@ -296,12 +310,27 @@ impl From<Status> for Error {
}
}

#[derive(Snafu, Debug)]
pub enum ChannelSendError {
#[snafu(display("Failed to send a WAL task"))]
WalTask,
}

#[derive(Snafu, Debug)]
pub enum ChannelReceiveError {
#[snafu(display("Failed to receive write WAL result: {source}"))]
WriteWalResult {
source: tokio::sync::oneshot::error::RecvError,
},
}

#[test]
fn test_mod_code() {
let e = Error::Schema {
source: SchemaError::ColumnAlreadyExists {
name: "".to_string(),
source: SchemaError::TableNotFound {
database: String::new(),
table: String::new(),
},
};
assert!(e.code().starts_with("02"));
assert_eq!(e.code(), "020004");
}
24 changes: 16 additions & 8 deletions tskv/src/file_utils.rs
Expand Up @@ -16,28 +16,32 @@ lazy_static! {
static ref INDEX_BINLOG_FILE_NAME_PATTERN: Regex = Regex::new(r"_\d{6}\.binlog").unwrap();
}

// Summary file.
/// Make a path for summary file by it's directory and id.
pub fn make_summary_file(path: impl AsRef<Path>, number: u64) -> PathBuf {
let p = format!("summary-{:06}", number);
path.as_ref().join(p)
}

/// Make a path for summary temporary file by it's directory.
pub fn make_summary_file_tmp(path: impl AsRef<Path>) -> PathBuf {
let p = "summary.tmp".to_string();
path.as_ref().join(p)
}

/// Check a summary file's name.
pub fn check_summary_file_name(file_name: &str) -> bool {
SUMMARY_FILE_NAME_PATTERN.is_match(file_name)
}

/// Rename a file, from old path to new path.
pub async fn rename(old_name: impl AsRef<Path>, new_name: impl AsRef<Path>) -> Result<()> {
fs::create_dir_all(new_name.as_ref().parent().unwrap()).await?;
fs::rename(old_name, new_name)
.await
.map_err(|e| Error::IO { source: e })
}

/// Get id from a summary file's name.
pub fn get_summary_file_id(file_name: &str) -> Result<u64> {
if !check_summary_file_name(file_name) {
return Err(Error::InvalidFileName {
Expand All @@ -54,17 +58,18 @@ pub fn get_summary_file_id(file_name: &str) -> Result<u64> {
})
}

// index binlog files.

/// Make a path for index binlog file by it's directory and id.
pub fn make_index_binlog_file(path: impl AsRef<Path>, sequence: u64) -> PathBuf {
let p = format!("_{:06}.binlog", sequence);
path.as_ref().join(p)
}

/// Check a index binlog file's name.
pub fn check_index_binlog_file_name(file_name: &str) -> bool {
INDEX_BINLOG_FILE_NAME_PATTERN.is_match(file_name)
}

/// Get id from a index binlog file's name.
pub fn get_index_binlog_file_id(file_name: &str) -> Result<u64> {
if !check_index_binlog_file_name(file_name) {
return Err(Error::InvalidFileName {
Expand All @@ -81,16 +86,18 @@ pub fn get_index_binlog_file_id(file_name: &str) -> Result<u64> {
})
}

// WAL (write ahead log) file.
/// Make a path for WAL (write ahead log) file by it's directory and id.
pub fn make_wal_file(path: impl AsRef<Path>, sequence: u64) -> PathBuf {
let p = format!("_{:06}.wal", sequence);
path.as_ref().join(p)
}

/// Check a WAL file's name.
pub fn check_wal_file_name(file_name: &str) -> bool {
WAL_FILE_NAME_PATTERN.is_match(file_name)
}

/// Get id from a WAL file's name.
pub fn get_wal_file_id(file_name: &str) -> Result<u64> {
if !check_wal_file_name(file_name) {
return Err(Error::InvalidFileName {
Expand All @@ -107,12 +114,13 @@ pub fn get_wal_file_id(file_name: &str) -> Result<u64> {
})
}

// TSM file
/// Make a path for TSM file by it's directory and id.
pub fn make_tsm_file_name(path: impl AsRef<Path>, sequence: u64) -> PathBuf {
let p = format!("_{:06}.tsm", sequence);
path.as_ref().join(p)
}

/// Get id from a TSM file's name.
pub fn get_tsm_file_id_by_path(tsm_path: impl AsRef<Path>) -> Result<u64> {
let path = tsm_path.as_ref();
let file_name = path
Expand All @@ -137,19 +145,19 @@ pub fn get_tsm_file_id_by_path(tsm_path: impl AsRef<Path>) -> Result<u64> {
})
}

// TSM tombstone file
/// Make a path for TSM tombstone file by it's directory and id.
pub fn make_tsm_tombstone_file_name(path: impl AsRef<Path>, sequence: u64) -> PathBuf {
let p = format!("_{:06}.tombstone", sequence);
path.as_ref().join(p)
}

// delta file
/// Make a path for TSM delta file by it's directory and id.
pub fn make_delta_file_name(path: impl AsRef<Path>, sequence: u64) -> PathBuf {
let p = format!("_{:06}.delta", sequence);
path.as_ref().join(p)
}

// Common
/// Get the file's path that has the maximum id of files in a directory.
pub fn get_max_sequence_file_name<F>(
dir: impl AsRef<Path>,
get_sequence: F,
Expand Down