Skip to content

Commit

Permalink
fix(cubestore): Handle corrupted log files and discard them with error
Browse files Browse the repository at this point in the history
  • Loading branch information
paveltiunov committed Jan 1, 2021
1 parent d547677 commit 00a1c1a
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 3 deletions.
8 changes: 7 additions & 1 deletion rust/cubestore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ extern crate lazy_static;

use arrow::error::ArrowError;
use core::fmt;
use flexbuffers::DeserializationError;
use flexbuffers::{DeserializationError, ReaderError};
use log::SetLoggerError;
use parquet::errors::ParquetError;
use serde_derive::{Deserialize, Serialize};
Expand Down Expand Up @@ -283,4 +283,10 @@ impl From<PoisonError<std::sync::MutexGuard<'_, std::collections::HashMap<TableI
fn from(v: PoisonError<std::sync::MutexGuard<'_, std::collections::HashMap<TableId, u64>>>) -> Self {
CubeError::from_error(v)
}
}

impl From<ReaderError> for CubeError {
fn from(v: ReaderError) -> Self {
CubeError::from_error(v)
}
}
64 changes: 63 additions & 1 deletion rust/cubestore/src/metastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1334,7 +1334,7 @@ impl WriteBatchContainer {

let mut buffer = Vec::new();
tokio::io::AsyncReadExt::read_to_end(&mut file, &mut buffer).await?;
let r = flexbuffers::Reader::get_root(&buffer).unwrap();
let r = flexbuffers::Reader::get_root(&buffer)?;
Ok(Self::deserialize(r)?)
}
}
Expand Down Expand Up @@ -3046,4 +3046,66 @@ mod tests {
fs::remove_dir_all(config.local_dir()).unwrap();
fs::remove_dir_all(config.remote_dir()).unwrap();
}

/// Checks that the metastore can still be opened after its newest log file
/// has been damaged.
///
/// Steps:
/// 1. Create three schemas (`foo1`, `foo`, `bar`), interleaved with
///    `run_upload` / `upload_check_point` calls.
/// 2. Remove the local directory so the second instance has no local state
///    (presumably forcing a restore from the remote dir — confirm against
///    `configure`).
/// 3. Find the `.flex` file with the highest numeric name under the
///    `metastore-` prefix in the remote dir and resize it to 50 bytes.
/// 4. Configure a fresh set of services and verify `foo1` and `foo` can still
///    be read.
#[tokio::test]
async fn discard_logs() {
    let config = Config::test("discard_logs");

    // Best-effort cleanup of leftovers from a previous run; the directories
    // may legitimately not exist, so errors are ignored.
    let _ = fs::remove_dir_all(config.local_dir());
    let _ = fs::remove_dir_all(config.remote_dir());

    {
        {
            // Scoped so that `services` is dropped before the local dir is removed.
            let services = config.configure().await;
            services.start_processing_loops().await.unwrap();
            services
                .meta_store
                .create_schema("foo1".to_string(), false)
                .await
                .unwrap();
            services.meta_store.run_upload().await.unwrap();
            services
                .meta_store
                .create_schema("foo".to_string(), false)
                .await
                .unwrap();
            services.meta_store.upload_check_point().await.unwrap();
            services
                .meta_store
                .create_schema("bar".to_string(), false)
                .await
                .unwrap();
            services.meta_store.run_upload().await.unwrap();
            services.stop_processing_loops().await.unwrap();
        }
        tokio::time::delay_for(Duration::from_millis(1000)).await; // TODO logger init conflict
        fs::remove_dir_all(config.local_dir()).unwrap();

        let list = LocalDirRemoteFs::list_recursive(
            config.remote_dir().clone(),
            "metastore-".to_string(),
            config.remote_dir().clone(),
        )
        .await
        .unwrap();

        // The dot is escaped so only a literal `<digits>.flex` name matches;
        // an unescaped `.` would accept any character before `flex`.
        let re = Regex::new(r"(\d+)\.flex").unwrap();

        // Extract each log's sequence number once and keep the file with the
        // largest one.
        let last_log = list
            .iter()
            .filter_map(|f| {
                let caps = re.captures(f.remote_path())?;
                let seq = caps.get(1)?.as_str().parse::<u64>().unwrap();
                Some((seq, f))
            })
            .max_by_key(|(seq, _)| *seq)
            .map(|(_, f)| f)
            .unwrap();

        let file_path = config.remote_dir().join(last_log.remote_path());
        println!("Truncating {:?}", file_path);
        let file = std::fs::OpenOptions::new()
            .write(true)
            .open(&file_path)
            .unwrap();
        println!("Size {}", file.metadata().unwrap().len());
        // Resize the newest log to 50 bytes to simulate a corrupted file.
        file.set_len(50).unwrap();

        // A fresh instance must still start and serve the schemas created
        // before the damaged log.
        let services2 = config.configure().await;
        services2
            .meta_store
            .get_schema("foo1".to_string())
            .await
            .unwrap();
        services2
            .meta_store
            .get_schema("foo".to_string())
            .await
            .unwrap();
    }

    fs::remove_dir_all(config.local_dir()).unwrap();
    fs::remove_dir_all(config.remote_dir()).unwrap();
}
}
2 changes: 1 addition & 1 deletion rust/cubestore/src/remotefs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ impl LocalDirRemoteFs {
async move { Self::list_recursive(remote_dir, remote_prefix, dir).await }.boxed()
}

async fn list_recursive(
pub async fn list_recursive(
remote_dir: PathBuf,
remote_prefix: String,
dir: PathBuf,
Expand Down

0 comments on commit 00a1c1a

Please sign in to comment.