fix: data loss after restart #1471

Merged · 1 commit · Sep 27, 2023
213 changes: 133 additions & 80 deletions tskv/src/kvcore.rs
@@ -1,4 +1,5 @@
 use std::collections::{HashMap, HashSet};
+use std::ops::Deref;
 use std::panic;
 use std::sync::Arc;
 use std::time::{Duration, Instant};
@@ -196,64 +197,70 @@ impl TsKv {
         let min_log_seq = self.global_seq_ctx.min_seq();
         let wal_readers = wal_manager.recover().await;
         let mut recover_task = vec![];
-        for mut reader in wal_readers {
-            trace::info!(
-                "Recover: reading wal '{}' for seq {} to {}",
-                reader.path().display(),
-                reader.min_sequence(),
-                reader.max_sequence(),
-            );
-            if reader.is_empty() {
-                continue;
-            }
-            let vnode_last_seq_map = self.global_seq_ctx.clone();
-            let mut decoder = WalDecoder::new();
-            loop {
-                match reader.next_wal_entry().await {
-                    Ok(Some(wal_entry_blk)) => {
-                        let seq = wal_entry_blk.seq;
-                        if seq < min_log_seq {
-                            continue;
-                        }
-                        match wal_entry_blk.entry {
-                            WalEntry::Write(blk) => {
-                                let vnode_id = blk.vnode_id();
-                                if vnode_last_seq_map.vnode_min_seq(vnode_id) >= seq {
-                                    // If `seq_no` of TsFamily is greater than or equal to `seq`,
-                                    // it means that data was written to tsm.
-                                    continue;
-                                }
-
-                                self.write_from_wal(vnode_id, seq, &blk, &mut decoder)
-                                    .await
-                                    .unwrap();
-                            }
-
-                            WalEntry::DeleteVnode(blk) => {
-                                if let Err(e) = self.remove_tsfamily_from_wal(&blk).await {
-                                    // Ignore delete vnode error.
-                                    trace::error!("Recover: failed to delete vnode: {e}");
-                                }
-                            }
-                            WalEntry::DeleteTable(blk) => {
-                                if let Err(e) = self.drop_table_from_wal(&blk).await {
-                                    // Ignore delete table error.
-                                    trace::error!("Recover: failed to delete table: {e}");
-                                }
-                            }
-                            _ => {}
-                        }
-                    }
-                    Ok(None) | Err(Error::WalTruncated) => {
-                        break;
-                    }
-                    Err(e) => {
-                        panic!(
-                            "Failed to recover from {}: {:?}",
-                            reader.path().display(),
-                            e
-                        );
-                    }
-                }
-            }
-        }
+        for (vnode_id, readers) in wal_readers {
+            let task = async move {
+                for mut reader in readers {
+                    info!(
+                        "Recover: reading wal '{}' for seq {} to {}",
+                        reader.path().display(),
+                        reader.min_sequence(),
+                        reader.max_sequence(),
+                    );
+                    if reader.is_empty() {
+                        continue;
+                    }
+                    let vnode_last_seq_map = self.global_seq_ctx.clone();
+                    let mut decoder = WalDecoder::new();
+                    loop {
+                        match reader.next_wal_entry().await {
+                            Ok(Some(wal_entry_blk)) => {
+                                let seq = wal_entry_blk.seq;
+                                if seq < min_log_seq {
+                                    continue;
+                                }
+                                match wal_entry_blk.entry {
+                                    WalEntry::Write(blk) => {
+                                        error!("write seq: {}", seq);
+                                        let vnode_id = blk.vnode_id();
+                                        if vnode_last_seq_map.vnode_min_seq(vnode_id) >= seq {
+                                            // If `seq_no` of TsFamily is greater than or equal to `seq`,
+                                            // it means that data was written to tsm.
+                                            continue;
+                                        }
+
+                                        self.write_from_wal(vnode_id, seq, &blk, &mut decoder)
+                                            .await
+                                            .unwrap();
+                                    }
+
+                                    WalEntry::DeleteVnode(blk) => {
+                                        if let Err(e) = self.remove_tsfamily_from_wal(&blk).await {
+                                            // Ignore delete vnode error.
+                                            trace::error!("Recover: failed to delete vnode: {e}");
+                                        }
+                                    }
+                                    WalEntry::DeleteTable(blk) => {
+                                        error!("delete table seq: {}", seq);
+                                        if let Err(e) =
+                                            self.drop_table_from_wal(&blk, vnode_id).await
+                                        {
+                                            // Ignore delete table error.
+                                            trace::error!("Recover: failed to delete table: {e}");
+                                        }
+                                    }
+                                    _ => {}
+                                }
+                            }
+                            Ok(None) | Err(Error::WalTruncated) => {
+                                break;
+                            }
+                            Err(e) => {
+                                panic!(
+                                    "Failed to recover from {}: {:?}",
+                                    reader.path().display(),
+                                    e
+                                );
+                            }
+                        }
+                    }
+                }
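This hunk carries the core of the fix. Before, recovery drained one flat list of WAL readers, so a `DeleteTable` entry replayed from one vnode's log deleted the table across every vnode, including vnodes whose own writes had not been replayed yet; that is consistent with the "lose data after restart" symptom in the title. Now the readers arrive grouped by vnode, each vnode's log is replayed in its own async task, and the `DeleteTable` replay is scoped to the owning vnode. Below is a minimal sketch of that per-vnode replay shape; it is illustration only, not the tskv code, and the tokio runtime, the `WalReader` stand-in, and `replay_vnode` are assumed or invented names.

```rust
use std::collections::BTreeMap;

type VnodeId = u32;

// Simplified stand-in for tskv's WalReader: a list of (sequence, payload) pairs.
struct WalReader {
    entries: Vec<(u64, String)>,
}

// Replay one vnode's readers in WAL order; distinct vnodes run concurrently.
async fn replay_vnode(vnode_id: VnodeId, readers: Vec<WalReader>, min_seq: u64) {
    for reader in readers {
        for (seq, payload) in reader.entries {
            if seq < min_seq {
                // Entry was already persisted to TSM before the restart; skip it.
                continue;
            }
            println!("vnode {vnode_id}: apply seq {seq}: {payload}");
        }
    }
}

#[tokio::main]
async fn main() {
    let mut wal_readers: BTreeMap<VnodeId, Vec<WalReader>> = BTreeMap::new();
    wal_readers.insert(
        1,
        vec![WalReader {
            entries: vec![(3, "write t1".into()), (4, "delete table t1".into())],
        }],
    );
    wal_readers.insert(2, vec![WalReader { entries: vec![(5, "write t1".into())] }]);

    // One task per vnode, mirroring the patched recover loop: the DeleteTable
    // entry in vnode 1's log can now only affect vnode 1's data.
    let tasks: Vec<_> = wal_readers
        .into_iter()
        .map(|(id, readers)| tokio::spawn(replay_vnode(id, readers, 0)))
        .collect();
    for t in tasks {
        t.await.unwrap();
    }
}
```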
@@ -490,7 +497,12 @@ impl TsKv {
         }
     }

-    async fn delete_table(&self, database: Arc<RwLock<Database>>, table: &str) -> Result<()> {
+    async fn delete_table(
+        &self,
+        database: Arc<RwLock<Database>>,
+        table: &str,
+        vnode_id: Option<VnodeId>,
+    ) -> Result<()> {
         // TODO Create global DropTable flag for dropping the same table at the same time.
         let db_rlock = database.read().await;
         let db_owner = db_rlock.owner();
@@ -507,38 +519,75 @@
             min_ts: Timestamp::MIN,
             max_ts: Timestamp::MAX,
         };
-        for (ts_family_id, ts_family) in database.read().await.ts_families().iter() {
-            // TODO: Concurrent delete on ts_family.
-            // TODO: Limit parallel delete to 1.
-            if let Some(ts_index) = db_rlock.get_ts_index(*ts_family_id) {
-                let series_ids = ts_index.get_series_id_list(table, &[]).await?;
-                ts_family
-                    .write()
-                    .await
-                    .delete_series(&series_ids, time_range);
-
-                let field_ids: Vec<u64> = series_ids
-                    .iter()
-                    .flat_map(|sid| column_ids.iter().map(|fid| unite_id(*fid, *sid)))
-                    .collect();
-                info!(
-                    "Drop table: vnode {ts_family_id} deleting {} fields in table: {db_owner}.{table}",
-                    field_ids.len()
-                );
-
-                let version = ts_family.read().await.super_version();
-                for column_file in version.version.column_files(&field_ids, time_range) {
-                    column_file.add_tombstone(&field_ids, time_range).await?;
-                }
-            } else {
-                continue;
-            }
-        }
+        if let Some(vnode_id) = vnode_id {
+            if let Some(ts_family) = db_rlock.ts_families().get(&vnode_id) {
+                self.tsf_delete_table(
+                    &db_rlock,
+                    vnode_id,
+                    ts_family.clone(),
+                    table,
+                    time_range,
+                    &column_ids,
+                )
+                .await?;
+            }
+        } else {
+            for (ts_family_id, ts_family) in database.read().await.ts_families().iter() {
+                // TODO: Concurrent delete on ts_family.
+                // TODO: Limit parallel delete to 1.
+                self.tsf_delete_table(
+                    &db_rlock,
+                    *ts_family_id,
+                    ts_family.clone(),
+                    table,
+                    time_range,
+                    &column_ids,
+                )
+                .await?;
+            }
+        }

         Ok(())
     }

+    async fn tsf_delete_table<Db>(
+        &self,
+        db: &Db,
+        ts_family_id: TseriesFamilyId,
+        ts_family: Arc<RwLock<TseriesFamily>>,
+        table: &str,
+        time_range: &TimeRange,
+        column_ids: &[ColumnId],
+    ) -> Result<()>
+    where
+        Db: Deref<Target = Database>,
+    {
+        let db_owner = db.owner();
+        if let Some(ts_index) = db.get_ts_index(ts_family_id) {
+            let series_ids = ts_index.get_series_id_list(table, &[]).await?;
+            ts_family
+                .write()
+                .await
+                .delete_series(&series_ids, time_range);
+
+            let field_ids: Vec<u64> = series_ids
+                .iter()
+                .flat_map(|sid| column_ids.iter().map(|fid| unite_id(*fid, *sid)))
+                .collect();
+            info!(
+                "Drop table: vnode {ts_family_id} deleting {} fields in table: {db_owner}.{table}",
+                field_ids.len()
+            );
+
+            let version = ts_family.read().await.super_version();
+            for column_file in version.version.column_files(&field_ids, time_range) {
+                column_file.add_tombstone(&field_ids, time_range).await?;
+            }
+        }
+        Ok(())
+    }
+
     async fn drop_columns(
         &self,
         database: Arc<RwLock<Database>>,
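The new `vnode_id: Option<VnodeId>` parameter is what scopes the delete: `Some(id)`, used on the WAL-replay path, touches a single vnode, while `None`, used by the user-facing drop-table path further down, fans out to every vnode. A toy sketch of that dispatch follows; the map-of-table-names model is invented for illustration and stands in for the real `ts_families`/`tsf_delete_table` machinery.

```rust
use std::collections::BTreeMap;

type VnodeId = u32;

// Stand-in for the per-vnode table set that tsf_delete_table operates on.
fn delete_table(
    ts_families: &mut BTreeMap<VnodeId, Vec<String>>,
    table: &str,
    vnode_id: Option<VnodeId>,
) {
    match vnode_id {
        // WAL-replay path: the DeleteTable entry came from one vnode's log,
        // so only that vnode's copy of the table is dropped.
        Some(id) => {
            if let Some(tables) = ts_families.get_mut(&id) {
                tables.retain(|t| t != table);
            }
        }
        // Engine path (user-issued DROP TABLE): delete from every vnode.
        None => {
            for tables in ts_families.values_mut() {
                tables.retain(|t| t != table);
            }
        }
    }
}

fn main() {
    let mut ts_families = BTreeMap::from([
        (1, vec!["t1".to_string(), "t2".to_string()]),
        (2, vec!["t1".to_string()]),
    ]);
    delete_table(&mut ts_families, "t1", Some(1));
    assert_eq!(ts_families[&1], vec!["t2".to_string()]);
    assert_eq!(ts_families[&2], vec!["t1".to_string()]); // vnode 2 untouched
}
```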
@@ -694,7 +743,11 @@ impl TsKv {
     /// Delete all data of a table.
     ///
     /// Data is from the WAL (write-ahead-log), so won't write back to WAL.
-    async fn drop_table_from_wal(&self, block: &wal::DeleteTableBlock) -> Result<()> {
+    async fn drop_table_from_wal(
+        &self,
+        block: &wal::DeleteTableBlock,
+        vnode_id: VnodeId,
+    ) -> Result<()> {
         let tenant = block.tenant_utf8()?;
         let database = block.database_utf8()?;
         let table = block.table_utf8()?;
@@ -703,7 +756,7 @@
             &tenant, &database, &table
         );
         if let Some(db) = self.version_set.read().await.get_db(tenant, database) {
-            return self.delete_table(db, table).await;
+            return self.delete_table(db, table, Some(vnode_id)).await;
         }
         Ok(())
     }
@@ -861,7 +914,7 @@ impl Engine for TsKv {
                 source: error::ChannelReceiveError::WriteWalResult { source: e },
             })??;

-            return self.delete_table(db, table).await;
+            return self.delete_table(db, table, None).await;
         }

         Ok(())
8 changes: 4 additions & 4 deletions tskv/src/wal/mod.rs
@@ -35,7 +35,7 @@
 //! +------------+---------------+--------------+--------------+
 //! ```

-use std::collections::HashMap;
+use std::collections::{BTreeMap, HashMap};
 use std::fmt::Display;
 use std::path::PathBuf;
 use std::sync::Arc;
@@ -636,12 +636,12 @@ impl WalManager {
         }
         Ok(())
     }
-    pub async fn recover(&self) -> Vec<WalReader> {
-        let mut recover_task = vec![];
+    pub async fn recover(&self) -> BTreeMap<VnodeId, Vec<WalReader>> {
+        let mut recover_task = BTreeMap::new();
         for (vnode_id, vnode_wal) in self.wal_set.iter() {
             let min_seq = self.global_seq_ctx.vnode_min_seq(*vnode_id);
             if let Ok(readers) = vnode_wal.recover(min_seq).await {
-                recover_task.extend(readers);
+                recover_task.insert(*vnode_id, readers);
             } else {
                 panic!("Failed to recover wal for vnode '{}'", vnode_id)
             }
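This is the wal-side change that makes the per-vnode replay possible: `recover()` previously flattened every vnode's readers into one `Vec`, discarding the reader-to-vnode association that the kvcore.rs loop now depends on. Returning a `BTreeMap` keeps the grouping and gives a deterministic, vnode-ordered iteration. A compressed before/after of just that return-type shape, using toy stand-in types rather than the real `WalReader`:

```rust
use std::collections::BTreeMap;

type VnodeId = u32;

#[derive(Clone)]
struct WalReader; // stand-in for tskv's reader

// Before the fix (shape only): all readers flattened together, so the caller
// can no longer tell which reader belongs to which vnode.
fn recover_flat(wal_set: &BTreeMap<VnodeId, Vec<WalReader>>) -> Vec<WalReader> {
    let mut all = Vec::new();
    for readers in wal_set.values() {
        all.extend(readers.iter().cloned());
    }
    all
}

// After the fix: readers stay grouped per vnode; BTreeMap also makes the
// recovery iteration order deterministic (sorted by vnode id).
fn recover_grouped(
    wal_set: &BTreeMap<VnodeId, Vec<WalReader>>,
) -> BTreeMap<VnodeId, Vec<WalReader>> {
    wal_set.clone()
}

fn main() {
    let wal_set = BTreeMap::from([(2, vec![WalReader]), (1, vec![WalReader, WalReader])]);
    assert_eq!(recover_flat(&wal_set).len(), 3);
    assert_eq!(recover_grouped(&wal_set)[&1].len(), 2);
}
```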