Skip to content

Commit

Permalink
fix(cubestore): Row with id 1 is not found for SchemaRocksTable -- fi…
Browse files Browse the repository at this point in the history
…x log replay order during metastore loading from dump
  • Loading branch information
paveltiunov committed Dec 7, 2022
1 parent 22cd580 commit b74c072
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 1 deletion.
103 changes: 103 additions & 0 deletions rust/cubestore/cubestore/src/metastore/mod.rs
Expand Up @@ -4894,6 +4894,109 @@ mod tests {
}
}

#[tokio::test]
async fn log_replay_ordering() {
{
let config = Config::test("log_replay_ordering");

let _ = fs::remove_dir_all(config.local_dir());
let _ = fs::remove_dir_all(config.remote_dir());

let services = config.configure().await;
services.start_processing_loops().await.unwrap();
services
.rocks_meta_store
.as_ref()
.unwrap()
.upload_check_point()
.await
.unwrap();
for i in 0..100 {
let schema = services
.meta_store
.create_schema(format!("foo{}", i), false)
.await
.unwrap();
services
.rocks_meta_store
.as_ref()
.unwrap()
.run_upload()
.await
.unwrap();
let table = services
.meta_store
.create_table(
format!("foo{}", i),
format!("table{}", i),
vec![Column::new("foo".to_string(), ColumnType::String, 0)],
None,
None,
Vec::new(),
false,
None,
None,
None,
None,
None,
None,
None,
)
.await
.unwrap();
services
.rocks_meta_store
.as_ref()
.unwrap()
.run_upload()
.await
.unwrap();
services
.meta_store
.drop_table(table.get_id())
.await
.unwrap();
services
.rocks_meta_store
.as_ref()
.unwrap()
.run_upload()
.await
.unwrap();
services
.meta_store
.delete_schema_by_id(schema.get_id())
.await
.unwrap();
services
.rocks_meta_store
.as_ref()
.unwrap()
.run_upload()
.await
.unwrap();
}
services.stop_processing_loops().await.unwrap();

Delay::new(Duration::from_millis(1000)).await; // TODO logger init conflict
fs::remove_dir_all(config.local_dir()).unwrap();
}

{
let config = Config::test("log_replay_ordering");

let services2 = config.configure().await;
let tables = services2
.meta_store
.get_tables_with_path(true)
.await
.unwrap();
assert_eq!(tables.len(), 0);
fs::remove_dir_all(config.local_dir()).unwrap();
fs::remove_dir_all(config.remote_dir()).unwrap();
}
}

#[tokio::test]
async fn discard_logs() {
{
Expand Down
17 changes: 16 additions & 1 deletion rust/cubestore/cubestore/src/metastore/rocks_fs.rs
Expand Up @@ -210,7 +210,22 @@ impl BaseRocksStoreFs {
.remote_fs
.list(&format!("{}-{}-logs", self.name, snapshot))
.await?;
for log_file in logs_to_batch.iter() {
let mut logs_to_batch_to_seq = logs_to_batch
.into_iter()
.map(|f| -> Result<_, CubeError> {
let last = f
.split("/")
.last()
.ok_or(CubeError::internal(format!("Can't split path: {}", f)))?;
let result = last.replace(".flex", "").parse::<usize>().map_err(|e| {
CubeError::internal(format!("Can't parse flex path {}: {}", f, e))
})?;
Ok((f, result))
})
.collect::<Result<Vec<_>, _>>()?;
logs_to_batch_to_seq.sort_unstable_by_key(|(_, seq)| *seq);

for (log_file, _) in logs_to_batch_to_seq.iter() {
let path_to_log = self.remote_fs.local_file(log_file).await?;
let batch = WriteBatchContainer::read_from_file(&path_to_log).await;
if let Ok(batch) = batch {
Expand Down

0 comments on commit b74c072

Please sign in to comment.