Skip to content

Commit

Permalink
feat(wal): improve recover wal
Browse files Browse the repository at this point in the history
  • Loading branch information
zipper-meng committed Jul 14, 2023
1 parent c5c84a7 commit 825dd4e
Show file tree
Hide file tree
Showing 8 changed files with 306 additions and 279 deletions.
4 changes: 1 addition & 3 deletions tskv/src/database.rs
Expand Up @@ -201,11 +201,9 @@ impl Database {
pub async fn del_tsfamily(&mut self, tf_id: u32, summary_task_sender: Sender<SummaryTask>) {
if let Some(tf) = self.ts_families.remove(&tf_id) {
tf.read().await.close();
} else {
// If no ts_family recovered from summary, do not write summary.
return;
}

// TODO(zipper): If no ts_family recovered from summary, do not write summary.
let edits = vec![VersionEdit::new_del_vnode(tf_id)];
let (task_state_sender, _task_state_receiver) = oneshot::channel();
let task = SummaryTask::new(edits, None, None, task_state_sender);
Expand Down
24 changes: 0 additions & 24 deletions tskv/src/engine_mock.rs
Expand Up @@ -40,30 +40,10 @@ impl Engine for MockEngine {
Ok(WritePointsResponse { points_number: 0 })
}

/// Mock WAL replay of a write: logs "write point" at debug level and returns `Ok(())`.
///
/// The `id`, `precision`, `write_batch` and `seq` arguments are ignored.
async fn write_from_wal(
&self,
id: u32,
precision: Precision,
write_batch: WritePointsRequest,
seq: u64,
) -> Result<()> {
debug!("write point");
Ok(())
}

/// Mock implementation: ignores `tenant`, `database` and `id`, and always returns `Ok(())`.
async fn remove_tsfamily(&self, tenant: &str, database: &str, id: u32) -> Result<()> {
Ok(())
}

/// Mock implementation: ignores `tenant`, `database` and `vnode_id`, and always returns `Ok(())`.
async fn remove_tsfamily_from_wal(
&self,
tenant: &str,
database: &str,
vnode_id: VnodeId,
) -> Result<()> {
Ok(())
}

/// Mock implementation: ignores `tenant`, `database` and `id`, and always returns `Ok(())`.
async fn flush_tsfamily(&self, tenant: &str, database: &str, id: u32) -> Result<()> {
Ok(())
}
Expand Down Expand Up @@ -98,10 +78,6 @@ impl Engine for MockEngine {
Ok(())
}

/// Mock implementation: ignores `tenant`, `database` and `table`, and always returns `Ok(())`.
async fn drop_table_from_wal(&self, tenant: &str, database: &str, table: &str) -> Result<()> {
Ok(())
}

async fn delete_series(
&self,
tenant: &str,
Expand Down
6 changes: 6 additions & 0 deletions tskv/src/error.rs
Expand Up @@ -201,6 +201,12 @@ pub enum Error {
Points {
source: PointsError,
},

#[snafu(display("non-UTF-8 string '{message}': {source}"))]
InvalidUtf8 {
message: String,
source: std::str::Utf8Error,
},
}

impl From<PointsError> for Error {
Expand Down
287 changes: 198 additions & 89 deletions tskv/src/kvcore.rs
Expand Up @@ -36,7 +36,7 @@ use crate::summary::{Summary, SummaryProcessor, SummaryTask, VersionEdit};
use crate::tseries_family::{SuperVersion, TseriesFamily};
use crate::tsm::codec::get_str_codec;
use crate::version_set::VersionSet;
use crate::wal::{WalManager, WalTask};
use crate::wal::{self, WalDecoder, WalEntry, WalManager, WalTask};
use crate::{database, file_utils, tenant_name_from_request, Engine, Error, TseriesFamilyId};

// TODO: A small summary channel capacity can cause a block
Expand Down Expand Up @@ -189,7 +189,73 @@ impl TsKv {
.await
.unwrap();

wal_manager.recover(self).await.unwrap();
let vnode_last_seq_map = self.global_seq_ctx.cloned();
let min_log_seq = self.global_seq_ctx.min_seq();
let mut decoder = WalDecoder::new();

let wal_readers = wal_manager.readers_to_recover().await.unwrap();
for mut reader in wal_readers {
trace::info!(
"Recover: reading wal '{}' for seq {} to {}",
reader.path().display(),
reader.min_sequence(),
reader.max_sequence(),
);
if reader.is_empty() {
continue;
}

loop {
match reader.next_wal_entry().await {
Ok(Some(wal_entry_blk)) => {
let seq = wal_entry_blk.seq;
if seq < min_log_seq {
continue;
}
match wal_entry_blk.entry {
WalEntry::Write(blk) => {
let vnode_id = blk.vnode_id();
if let Some(tsf_last_seq) = vnode_last_seq_map.get(&vnode_id) {
// If `seq_no` of TsFamily is greater than or equal to `seq`,
// it means that data was written to tsm.
if *tsf_last_seq >= seq {
continue;
}
}

self.write_from_wal(vnode_id, seq, &blk, &mut decoder)
.await
.unwrap();
}

WalEntry::DeleteVnode(blk) => {
if let Err(e) = self.remove_tsfamily_from_wal(&blk).await {
// Ignore delete vnode error.
trace::error!("Recover: failed to delete vnode: {e}");
}
}
WalEntry::DeleteTable(blk) => {
if let Err(e) = self.drop_table_from_wal(&blk).await {
// Ignore delete table error.
trace::error!("Recover: failed to delete table: {e}");
}
}
_ => {}
}
}
Ok(None) | Err(Error::WalTruncated) => {
break;
}
Err(e) => {
panic!(
"Failed to recover from {}: {:?}",
reader.path().display(),
e
);
}
}
}
}

wal_manager
}
Expand Down Expand Up @@ -474,6 +540,136 @@ impl TsKv {

Ok(seq)
}

/// Tskv write the gRPC message `WritePointsRequest`(which contains
/// the tenant, user, database, some tables, and each table has some rows)
/// that from a WAL into a storage unit managed by engine.
///
/// - vnode_id - ID of the storage unit(caches and files).
/// - precision - The timestamp precision of table rows.
/// - seq - The WAL sequence of the stored gRPC message.
///
/// Data is from the WAL(write-ahead-log), so won't write back to WAL, and
/// would not create any schema, if database of vnode does not exist, record
/// will be ignored.
async fn write_from_wal(
&self,
vnode_id: TseriesFamilyId,
seq: u64,
block: &wal::WriteBlock,
block_decoder: &mut WalDecoder,
) -> Result<()> {
let tenant = {
let tenant = block.tenant_utf8()?;
if tenant.is_empty() {
models::schema::DEFAULT_CATALOG
} else {
tenant
}
};
let precision = block.precision();
let points = match block_decoder.decode(block.points())? {
Some(p) => p,
None => return Ok(()),
};
let fb_points = flatbuffers::root::<fb_models::Points>(&points)
.context(error::InvalidFlatbufferSnafu)?;

let db_name = get_db_from_fb_points(&fb_points)?;
// If database not exists, skip this record.
let db = match self.get_db(tenant, &db_name).await {
Ok(db) => db,
Err(_) => return Ok(()),
};
let tsf = match db.read().await.get_tsfamily(vnode_id) {
Some(tsf) => tsf,
None => return Ok(()),
};

let ts_index = self
.get_ts_index_or_else_create(db.clone(), vnode_id)
.await?;

// Write data assuming schemas are created (strict mode).
let write_group = db
.read()
.await
.build_write_group_strict_mode(
&db_name,
precision,
fb_points.tables().ok_or(Error::CommonError {
reason: "points missing table".to_string(),
})?,
ts_index,
)
.await?;
tsf.read().await.put_points(seq, write_group)?;

Ok(())
}

/// Replays a WAL "delete table" entry: deletes all data of the table named
/// by `block`.
///
/// The entry comes from the WAL (write-ahead-log), so nothing is written
/// back to the WAL.
async fn drop_table_from_wal(&self, block: &wal::DeleteTableBlock) -> Result<()> {
    let (tenant, database, table) = (
        block.tenant_utf8()?,
        block.database_utf8()?,
        block.table_utf8()?,
    );
    trace::info!(
        "Recover: delete table, tenant: {}, database: {}, table: {}",
        tenant,
        database,
        table
    );

    // `delete_table_async` takes owned values, so hand it copies of the names
    // together with a clone of the version set handle.
    database::delete_table_async(
        tenant.to_string(),
        database.to_string(),
        table.to_string(),
        self.version_set.clone(),
    )
    .await
}

/// Replays a WAL "delete vnode" entry: drops the storage unit (caches and
/// files) managed by TsKv and then removes the unit's directory.
///
/// The entry comes from the WAL (write-ahead-log), so nothing is written
/// back to the WAL. When the database is unknown this is a no-op. A failure
/// to remove the directory is only logged; the call still returns `Ok(())`.
async fn remove_tsfamily_from_wal(&self, block: &wal::DeleteVnodeBlock) -> Result<()> {
    let vnode_id = block.vnode_id();
    let (tenant, database) = (block.tenant_utf8()?, block.database_utf8()?);
    trace::info!(
        "Recover: delete vnode, tenant: {}, database: {}, vnode_id: {vnode_id}",
        tenant,
        database
    );

    // The `version_set` read guard is a temporary of the `if let` scrutinee,
    // so it stays alive for the whole body below.
    if let Some(db) = self.version_set.read().await.get_db(tenant, database) {
        let mut db_guard = db.write().await;
        db_guard.del_ts_index(vnode_id);
        let summary_sender = self.summary_task_sender.clone();
        db_guard.del_tsfamily(vnode_id, summary_sender).await;

        let owner = make_owner(tenant, database);
        let ts_dir = self.options.storage.ts_family_dir(&owner, vnode_id);
        if let Err(e) = std::fs::remove_dir_all(&ts_dir) {
            error!(
                "Failed to remove TsFamily directory '{}': {}",
                ts_dir.display(),
                e
            );
        } else {
            info!("Removed TsFamily directory '{}'", ts_dir.display());
        }
    }

    Ok(())
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -542,50 +738,6 @@ impl Engine for TsKv {
res
}

async fn write_from_wal(
&self,
vnode_id: TseriesFamilyId,
precision: Precision,
write_batch: WritePointsRequest,
seq: u64,
) -> Result<()> {
let tenant = tenant_name_from_request(&write_batch);
let points = Arc::new(write_batch.points);
let fb_points = flatbuffers::root::<fb_models::Points>(&points)
.context(error::InvalidFlatbufferSnafu)?;

let db_name = get_db_from_fb_points(&fb_points)?;
// If database not exists, skip this record.
let db = match self.get_db(&tenant, &db_name).await {
Ok(db) => db,
Err(_) => return Ok(()),
};
let tsf = match db.read().await.get_tsfamily(vnode_id) {
Some(tsf) => tsf,
None => return Ok(()),
};

let ts_index = self
.get_ts_index_or_else_create(db.clone(), vnode_id)
.await?;

// Write data assuming schemas are created (strict mode).
let write_group = db
.read()
.await
.build_write_group_strict_mode(
&db_name,
precision,
fb_points.tables().ok_or(Error::CommonError {
reason: "points missing table".to_string(),
})?,
ts_index,
)
.await?;
tsf.read().await.put_points(seq, write_group)?;
return Ok(());
}

async fn drop_database(&self, tenant: &str, database: &str) -> Result<()> {
if let Some(db) = self.version_set.write().await.delete_db(tenant, database) {
let mut db_wlock = db.write().await;
Expand Down Expand Up @@ -637,15 +789,6 @@ impl Engine for TsKv {
database::delete_table_async(tenant, database, table, version_set).await
}

/// Deletes the table `table` of `tenant`/`database` by delegating to
/// `database::delete_table_async` with owned copies of the names and a clone
/// of the version set handle.
async fn drop_table_from_wal(&self, tenant: &str, database: &str, table: &str) -> Result<()> {
    database::delete_table_async(
        tenant.to_string(),
        database.to_string(),
        table.to_string(),
        self.version_set.clone(),
    )
    .await
}

async fn remove_tsfamily(&self, tenant: &str, database: &str, vnode_id: VnodeId) -> Result<()> {
if let Some(db) = self.version_set.read().await.get_db(tenant, database) {
// Store this action in WAL.
Expand Down Expand Up @@ -689,40 +832,6 @@ impl Engine for TsKv {
Ok(())
}

/// Drops the storage unit `vnode_id` of `tenant`/`database` and then removes
/// its directory.
///
/// When the database is unknown this is a no-op. A failure to remove the
/// directory is only logged; the call still returns `Ok(())`.
async fn remove_tsfamily_from_wal(
    &self,
    tenant: &str,
    database: &str,
    vnode_id: VnodeId,
) -> Result<()> {
    // The `version_set` read guard is a temporary of the `if let` scrutinee,
    // so it stays alive for the whole body below.
    if let Some(db) = self.version_set.read().await.get_db(tenant, database) {
        let mut db_guard = db.write().await;
        db_guard.del_ts_index(vnode_id);
        let summary_sender = self.summary_task_sender.clone();
        db_guard.del_tsfamily(vnode_id, summary_sender).await;

        let owner = make_owner(tenant, database);
        let ts_dir = self.options.storage.ts_family_dir(&owner, vnode_id);
        if let Err(e) = std::fs::remove_dir_all(&ts_dir) {
            error!(
                "Failed to remove TsFamily directory '{}': {}",
                ts_dir.display(),
                e
            );
        } else {
            info!("Removed TsFamily directory '{}'", ts_dir.display());
        }
    }

    Ok(())
}

async fn prepare_copy_vnode(
&self,
tenant: &str,
Expand Down

0 comments on commit 825dd4e

Please sign in to comment.