diff --git a/rust/cubestore/cubestore/src/lib.rs b/rust/cubestore/cubestore/src/lib.rs index 93647f9d753e4..4ac3a14b89fd5 100644 --- a/rust/cubestore/cubestore/src/lib.rs +++ b/rust/cubestore/cubestore/src/lib.rs @@ -27,6 +27,7 @@ use log::SetLoggerError; use parquet::errors::ParquetError; use serde_derive::{Deserialize, Serialize}; use sqlparser::parser::ParserError; +use std::any::Any; use std::backtrace::Backtrace; use std::fmt; use std::fmt::Display; @@ -155,6 +156,16 @@ impl CubeError { cause: CubeErrorCauseType::Internal, } } + + pub fn from_panic_payload(payload: Box<dyn Any + Send>) -> Self { + if let Some(reason) = payload.downcast_ref::<&str>() { + CubeError::panic(format!("Reason: {}", reason)) + } else if let Some(reason) = payload.downcast_ref::<String>() { + CubeError::panic(format!("Reason: {}", reason)) + } else { + CubeError::panic("Without reason".to_string()) + } + } } impl fmt::Display for CubeError { diff --git a/rust/cubestore/cubestore/src/metastore/rocks_store.rs b/rust/cubestore/cubestore/src/metastore/rocks_store.rs index 8dd6ded4efe46..1f06ac71356f9 100644 --- a/rust/cubestore/cubestore/src/metastore/rocks_store.rs +++ b/rust/cubestore/cubestore/src/metastore/rocks_store.rs @@ -563,8 +563,16 @@ impl RocksStore { let join_handle = cube_ext::spawn_blocking(move || loop { if let Some(fun) = rw_loop_rx.blocking_recv() { - if let Err(e) = fun() { - log::error!("Error during read write loop execution: {}", e); + match std::panic::catch_unwind(std::panic::AssertUnwindSafe(fun)) { + Err(panic_payload) => { + let restore_error = CubeError::from_panic_payload(panic_payload); + log::error!("Panic during read write loop execution: {}", restore_error); + } + Ok(res) => { + if let Err(e) = res { + log::error!("Error during read write loop execution: {}", e); + } + } } } else { return; @@ -710,10 +718,22 @@ impl RocksStore { Ok(()) })); if let Err(e) = res.await { - log::error!("[{}] Error during read write loop send: {}", store_name, e); + log::error!( + "[{}] Error during 
scheduling write task in loop: {}", + store_name, + e + ); + + return Err(CubeError::internal(format!( + "Error during scheduling write task in loop: {}", + e + ))); } - let (spawn_res, events) = rx.await??; + let res = rx.await.map_err(|err| { + CubeError::internal(format!("Unable to receive result for write task: {}", err)) + })?; + let (spawn_res, events) = res?; self.write_notify.notify_waiters(); @@ -888,10 +908,17 @@ impl RocksStore { Ok(()) })); if let Err(e) = res.await { - log::error!("Error during read write loop send: {}", e); + log::error!("Error during scheduling read task in loop: {}", e); + + return Err(CubeError::internal(format!( + "Error during scheduling read task in loop: {}", + e + ))); } - rx.await? + rx.await.map_err(|err| { + CubeError::internal(format!("Unable to receive result for read task: {}", err)) + })? } pub async fn read_operation_out_of_queue<F, R>(&self, f: F) -> Result<R, CubeError> @@ -977,3 +1004,88 @@ impl RocksStore { Ok(()) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::config::Config; + use crate::metastore::{BaseRocksStoreFs, RocksMetaStoreDetails}; + use crate::remotefs::LocalDirRemoteFs; + use std::{env, fs}; + + #[tokio::test] + async fn test_loop_panic() -> Result<(), CubeError> { + let config = Config::test("test_loop_panic"); + let store_path = env::current_dir().unwrap().join("test_loop_panic-local"); + let remote_store_path = env::current_dir().unwrap().join("test_loop_panic-remote"); + let _ = fs::remove_dir_all(store_path.clone()); + let _ = fs::remove_dir_all(remote_store_path.clone()); + let remote_fs = LocalDirRemoteFs::new(Some(remote_store_path.clone()), store_path.clone()); + + let details = Arc::new(RocksMetaStoreDetails {}); + let rocks_store = RocksStore::new( + store_path.join("metastore").as_path(), + BaseRocksStoreFs::new(remote_fs.clone(), "metastore", config.config_obj()), + config.config_obj(), + details, + )?; + + // read operation + { + let r = rocks_store + .read_operation(|_| -> Result<(), 
CubeError> { + panic!("panic - task 1"); + }) + .await; + assert_eq!( + r.err().expect("must be error").message, + "Unable to receive result for read task: channel closed".to_string() + ); + + let r = rocks_store + .read_operation(|_| -> Result<(), CubeError> { + Err(CubeError::user("error - task 3".to_string())) + }) + .await; + assert_eq!( + r.err().expect("must be error").message, + "error - task 3".to_string() + ); + } + + // write operation + { + let r = rocks_store + .write_operation(|_, _| -> Result<(), CubeError> { + panic!("panic - task 1"); + }) + .await; + assert_eq!( + r.err().expect("must be error").message, + "Unable to receive result for write task: channel closed".to_string() + ); + + let r = rocks_store + .write_operation(|_, _| -> Result<(), CubeError> { + panic!("panic - task 2"); + }) + .await; + assert_eq!( + r.err().expect("must be error").message, + "Unable to receive result for write task: channel closed".to_string() + ); + + let r = rocks_store + .write_operation(|_, _| -> Result<(), CubeError> { + Err(CubeError::user("error - task 3".to_string())) + }) + .await; + assert_eq!( + r.err().expect("must be error").message, + "error - task 3".to_string() + ); + } + + Ok(()) + } +}