diff --git a/docs-mintlify/data-modeling/concepts.mdx b/docs-mintlify/data-modeling/concepts.mdx
deleted file mode 100644
index 753a5c2df73e6..0000000000000
--- a/docs-mintlify/data-modeling/concepts.mdx
+++ /dev/null
@@ -1,132 +0,0 @@
----
-title: Concepts
-description: Core concepts of Cube data modeling — cubes, views, measures, dimensions, and joins.
----
-
- Data Modeling Concepts
-
-## Cubes
-
-A cube represents a dataset in your data model. Each cube is typically mapped to a single table or view in your database:
-
-```yaml
-cubes:
- - name: users
- sql_table: public.users
-```
-
-You can also use a SQL query as the data source:
-
-```yaml
-cubes:
- - name: active_users
- sql: >
- SELECT * FROM public.users
- WHERE is_active = true
-```
-
-## Measures
-
-Measures are quantitative data points — the "what" you're measuring:
-
-```yaml
-measures:
- - name: count
- type: count
-
- - name: total_revenue
- type: sum
- sql: revenue
-
- - name: average_order_value
- type: avg
- sql: amount
-```
-
-### Measure types
-
-| Type | Description |
-|------|-------------|
-| `count` | Count of rows |
-| `count_distinct` | Count of distinct values |
-| `sum` | Sum of values |
-| `avg` | Average of values |
-| `min` | Minimum value |
-| `max` | Maximum value |
-| `number` | Custom SQL expression |
-
-## Dimensions
-
-Dimensions are qualitative attributes — the "by what" you're slicing data:
-
-```yaml
-dimensions:
- - name: status
- sql: status
- type: string
-
- - name: created_at
- sql: created_at
- type: time
-```
-
-### Dimension types
-
-| Type | Description |
-|------|-------------|
-| `string` | Text values |
-| `number` | Numeric values |
-| `boolean` | True/false values |
-| `time` | Timestamps and dates |
-| `geo` | Geographic coordinates |
-
-## Joins
-
-Joins define relationships between cubes:
-
-```yaml
-cubes:
- - name: orders
- # ...
-
- joins:
- - name: customers
- relationship: many_to_one
- sql: "{CUBE}.customer_id = {customers.id}"
-```
-
-### Relationship types
-
-
-
- Each record in the first cube matches exactly one record in the joined cube.
-
-
- Multiple records in the first cube match one record in the joined cube. This is the most common type (e.g., orders → customers).
-
-
- One record in the first cube matches multiple records in the joined cube.
-
-
-
-## Views
-
-Views provide curated interfaces to your data model. They select specific measures and dimensions from one or more cubes:
-
-```yaml
-views:
- - name: order_analytics
- cubes:
- - join_path: orders
- includes:
- - count
- - total_amount
- - status
- - join_path: orders.customers
- includes:
- - name: company
-```
-
-
-Views are the recommended way to expose data to end users and AI agents. They provide a clean, focused interface without exposing the full complexity of your data graph.
-
diff --git a/docs-mintlify/data-modeling/overview.mdx b/docs-mintlify/data-modeling/overview.mdx
deleted file mode 100644
index a68df0280382c..0000000000000
--- a/docs-mintlify/data-modeling/overview.mdx
+++ /dev/null
@@ -1,87 +0,0 @@
----
-title: Data Modeling Overview
-description: Learn how to define your semantic layer with Cube data models.
----
-
- Data Modeling
-
-Data modeling in Cube is the process of defining your semantic layer — the business logic, metrics, and relationships that sit between your data warehouse and your data consumers.
-
-## Key concepts
-
-Cube's data model is built around two main objects:
-
-### Cubes
-
-Cubes represent business entities (e.g., `orders`, `customers`, `products`). They define:
-
-- **Measures** — aggregated values like `count`, `sum`, `avg`
-- **Dimensions** — attributes used for grouping and filtering
-- **Joins** — relationships between cubes
-
-```yaml
-cubes:
- - name: orders
- sql_table: public.orders
-
- measures:
- - name: count
- type: count
-
- - name: total_amount
- type: sum
- sql: amount
-
- dimensions:
- - name: id
- sql: id
- type: number
- primary_key: true
-
- - name: status
- sql: status
- type: string
-
- - name: created_at
- sql: created_at
- type: time
-```
-
-### Views
-
-Views are curated datasets built on top of cubes. They select specific measures and dimensions to create focused data products:
-
-```yaml
-views:
- - name: order_summary
- cubes:
- - join_path: orders
- includes:
- - count
- - total_amount
- - status
- - created_at
-
- - join_path: orders.customers
- includes:
- - name: company
-```
-
-## Code-first approach
-
-Cube data models are defined in YAML or JavaScript files, managed through version control:
-
-
-
- Create cube files in the `model/` directory with your business entities.
-
-
- Add joins between cubes to establish entity relationships.
-
-
- Build views that combine cubes into focused data products.
-
-
- Use development mode to test, then deploy to production.
-
-
diff --git a/docs-mintlify/docs/data-modeling/concepts/index.mdx b/docs-mintlify/docs/data-modeling/concepts/index.mdx
index 9e3a6ae34e7eb..6a3096829bf4b 100644
--- a/docs-mintlify/docs/data-modeling/concepts/index.mdx
+++ b/docs-mintlify/docs/data-modeling/concepts/index.mdx
@@ -3,11 +3,13 @@ title: Concepts
description: Learn foundational OLAP concepts like cubes, dimensions, measures, and joins used in Cube data modeling.
---
- Concepts
+Cube's key concepts are [cubes](#cubes), [views](#views), and members
+([measures](#measures), [dimensions](#dimensions)). This page is intended
+for both newcomers and regular users to refresh their understanding.
-Cube borrows a lot of terminology from [OLAP
-theory][wiki-olap], and this document is intended for both newcomers and regular
-users to refresh their understanding.
+
+
+
We'll use a sample e-commerce database with two tables, `orders` and
`line_items` to illustrate the concepts throughout this page:
@@ -136,10 +138,6 @@ data model with which data consumers can interact. They are useful for defining
metrics, managing governance and data access, and controlling ambiguous join
paths.
-
-
-
-
Views do **not** define their own members. Instead, they reference cubes by
specific join paths and include their members.
@@ -834,7 +832,7 @@ See the reference documentaton for the full list of pre-aggregation
[ref-subquery-dimensions]: /docs/data-modeling/concepts/calculated-members#subquery-dimensions
[ref-calculated-measures]: /docs/data-modeling/concepts/calculated-members#calculated-measures
[ref-working-with-joins]: /docs/data-modeling/concepts/working-with-joins
-[wiki-olap]: https://en.wikipedia.org/wiki/Online_analytical_processing
+
[wiki-view-sql]: https://en.wikipedia.org/wiki/View_(SQL)
[ref-matching-preaggs]: /docs/pre-aggregations/matching-pre-aggregations
[ref-syntax-references]: /docs/data-modeling/syntax#references
diff --git a/docs-mintlify/docs/data-modeling/overview.mdx b/docs-mintlify/docs/data-modeling/overview.mdx
index a0ab514241663..80da010e3109a 100644
--- a/docs-mintlify/docs/data-modeling/overview.mdx
+++ b/docs-mintlify/docs/data-modeling/overview.mdx
@@ -9,6 +9,10 @@ exposed through a [rich set of APIs][ref-apis] that allows end-users to
run a wide variety of analytical queries without modifying the data model
itself.
+
+
+
+
You can explore a carefully crafted sample data model if you create a [demo
diff --git a/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs b/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs
index 7055d7633964d..c5af7672b2674 100644
--- a/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs
+++ b/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs
@@ -498,14 +498,14 @@ impl RocksCacheStore {
f: F,
) -> Result
where
- F: for<'a> FnOnce(DbTableRef<'a>, &'a mut BatchPipe) -> Result
+ F: for<'a> FnOnce(DbTableRef<'a>, &mut BatchPipe<'a>) -> Result
+ Send
+ Sync
+ 'static,
R: Send + Sync + 'static,
{
self.store
- .write_operation_impl::(&self.rw_loop_queue_cf, op_name, f)
+ .write_operation_impl::(&self.rw_loop_queue_cf, op_name, f, ())
.await
}
diff --git a/rust/cubestore/cubestore/src/lib.rs b/rust/cubestore/cubestore/src/lib.rs
index 791e75ef74c20..f4e862298bea8 100644
--- a/rust/cubestore/cubestore/src/lib.rs
+++ b/rust/cubestore/cubestore/src/lib.rs
@@ -214,6 +214,12 @@ impl From for CubeError {
}
}
+impl From for CubeError {
+ fn from(v: regex::Error) -> Self {
+ CubeError::from_error(v)
+ }
+}
+
impl From for CubeError {
fn from(v: ParquetError) -> Self {
CubeError::from_error(v.to_string())
diff --git a/rust/cubestore/cubestore/src/metastore/mod.rs b/rust/cubestore/cubestore/src/metastore/mod.rs
index 7be2c3d4e2284..942161bfde4eb 100644
--- a/rust/cubestore/cubestore/src/metastore/mod.rs
+++ b/rust/cubestore/cubestore/src/metastore/mod.rs
@@ -26,6 +26,7 @@ use cuberockstore::rocksdb::{BlockBasedOptions, Cache, Env, MergeOperands, Optio
use log::info;
use serde::{Deserialize, Serialize};
use std::hash::Hash;
+use std::sync::Mutex;
use std::{env, io::Cursor, sync::Arc};
use crate::config::injection::DIService;
@@ -1355,8 +1356,10 @@ impl RocksStoreDetails for RocksMetaStoreDetails {
}
}
+#[derive(Clone)]
pub struct RocksMetaStore {
store: Arc,
+ cached_tables: Arc>>>>,
disk_space_cache: Arc, SystemTime)>>>,
upload_loop: Arc,
}
@@ -1379,13 +1382,14 @@ impl RocksMetaStore {
fn new_from_store(store: Arc) -> Arc {
Arc::new(Self {
store,
+ cached_tables: Arc::new(Mutex::new(None)),
disk_space_cache: Arc::new(RwLock::new(None)),
upload_loop: Arc::new(WorkerLoop::new("Metastore upload")),
})
}
pub fn reset_cached_tables(&self) {
- *self.store.cached_tables.lock().unwrap() = None;
+ *self.cached_tables.lock().unwrap() = None;
}
pub async fn load_from_dump(
@@ -1512,19 +1516,24 @@ impl RocksMetaStore {
#[inline(always)]
pub async fn write_operation(&self, op_name: &'static str, f: F) -> Result
where
- F: for<'a> FnOnce(DbTableRef<'a>, &'a mut BatchPipe) -> Result
+ F: for<'a> FnOnce(
+ DbTableRef<'a>,
+ &mut BatchPipe<'a, RocksMetaStore>,
+ ) -> Result
+ Send
+ Sync
+ 'static,
R: Send + Sync + 'static,
{
- self.store.write_operation(op_name, f).await
+ self.store
+ .write_operation_impl(&self.store.rw_loop_default_cf, op_name, f, self.clone())
+ .await
}
fn drop_table_impl(
table_id: u64,
db_ref: DbTableRef,
- batch_pipe: &mut BatchPipe,
+ batch_pipe: &mut BatchPipe<'_, RocksMetaStore>,
) -> Result, CubeError> {
let tables_table = TableRocksTable::new(db_ref.clone());
let indexes_table = IndexRocksTable::new(db_ref.clone());
@@ -1555,7 +1564,7 @@ impl RocksMetaStore {
impl RocksMetaStore {
fn add_index(
- batch_pipe: &mut BatchPipe,
+ batch_pipe: &mut BatchPipe<'_, RocksMetaStore>,
rocks_index: &IndexRocksTable,
rocks_partition: &PartitionRocksTable,
table_cols: &Vec,
@@ -1586,7 +1595,7 @@ impl RocksMetaStore {
}
}
fn add_regular_index(
- batch_pipe: &mut BatchPipe,
+ batch_pipe: &mut BatchPipe<'_, RocksMetaStore>,
rocks_index: &IndexRocksTable,
rocks_partition: &PartitionRocksTable,
table_cols: &Vec,
@@ -1734,7 +1743,7 @@ impl RocksMetaStore {
}
fn add_aggregate_index(
- batch_pipe: &mut BatchPipe,
+ batch_pipe: &mut BatchPipe<'_, RocksMetaStore>,
rocks_index: &IndexRocksTable,
rocks_partition: &PartitionRocksTable,
table_cols: &Vec,
@@ -1850,7 +1859,7 @@ impl RocksMetaStore {
// Must be run under write_operation(). Returns activated row count.
fn activate_chunks_impl(
db_ref: DbTableRef,
- batch_pipe: &mut BatchPipe,
+ batch_pipe: &mut BatchPipe<'_, RocksMetaStore>,
uploaded_chunk_ids: &[(u64, Option)],
replay_handle_id: Option,
) -> Result<(u64, HashMap*partition_id*/ u64, /*rows*/ u64>), CubeError> {
@@ -1907,7 +1916,9 @@ impl MetaStore for RocksMetaStore {
if_not_exists: bool,
) -> Result, CubeError> {
self.write_operation("create_schema", move |db_ref, batch_pipe| {
- batch_pipe.invalidate_tables_cache();
+ batch_pipe.set_post_commit_callback(|metastore| {
+ *metastore.cached_tables.lock().unwrap() = None;
+ });
let table = SchemaRocksTable::new(db_ref.clone());
if if_not_exists {
let rows = table.get_rows_by_index(&schema_name, &SchemaRocksIndex::Name)?;
@@ -1968,7 +1979,9 @@ impl MetaStore for RocksMetaStore {
new_schema_name: String,
) -> Result, CubeError> {
self.write_operation("rename_schema", move |db_ref, batch_pipe| {
- batch_pipe.invalidate_tables_cache();
+ batch_pipe.set_post_commit_callback(|metastore| {
+ *metastore.cached_tables.lock().unwrap() = None;
+ });
let table = SchemaRocksTable::new(db_ref.clone());
let existing_keys =
table.get_row_ids_by_index(&old_schema_name, &SchemaRocksIndex::Name)?;
@@ -1992,7 +2005,9 @@ impl MetaStore for RocksMetaStore {
new_schema_name: String,
) -> Result, CubeError> {
self.write_operation("rename_schema_by_id", move |db_ref, batch_pipe| {
- batch_pipe.invalidate_tables_cache();
+ batch_pipe.set_post_commit_callback(|metastore| {
+ *metastore.cached_tables.lock().unwrap() = None;
+ });
let table = SchemaRocksTable::new(db_ref.clone());
let old_schema = table.get_row(schema_id)?.unwrap();
@@ -2008,7 +2023,9 @@ impl MetaStore for RocksMetaStore {
#[tracing::instrument(level = "trace", skip(self))]
async fn delete_schema(&self, schema_name: String) -> Result<(), CubeError> {
self.write_operation("delete_schema", move |db_ref, batch_pipe| {
- batch_pipe.invalidate_tables_cache();
+ batch_pipe.set_post_commit_callback(|metastore| {
+ *metastore.cached_tables.lock().unwrap() = None;
+ });
let table = SchemaRocksTable::new(db_ref.clone());
let existing_keys =
table.get_row_ids_by_index(&schema_name, &SchemaRocksIndex::Name)?;
@@ -2035,7 +2052,9 @@ impl MetaStore for RocksMetaStore {
#[tracing::instrument(level = "trace", skip(self))]
async fn delete_schema_by_id(&self, schema_id: u64) -> Result<(), CubeError> {
self.write_operation("delete_schema_by_id", move |db_ref, batch_pipe| {
- batch_pipe.invalidate_tables_cache();
+ batch_pipe.set_post_commit_callback(|metastore| {
+ *metastore.cached_tables.lock().unwrap() = None;
+ });
let tables = TableRocksTable::new(db_ref.clone()).all_rows()?;
if tables
.into_iter()
@@ -2100,7 +2119,9 @@ impl MetaStore for RocksMetaStore {
extension: Option,
) -> Result, CubeError> {
self.write_operation("create_table", move |db_ref, batch_pipe| {
- batch_pipe.invalidate_tables_cache();
+ batch_pipe.set_post_commit_callback(|metastore| {
+ *metastore.cached_tables.lock().unwrap() = None;
+ });
if drop_if_exists {
if let Ok(exists_table) = get_table_impl(db_ref.clone(), schema_name.clone(), table_name.clone()) {
RocksMetaStore::drop_table_impl(exists_table.get_id(), db_ref.clone(), batch_pipe)?;
@@ -2295,7 +2316,9 @@ impl MetaStore for RocksMetaStore {
#[tracing::instrument(level = "trace", skip(self))]
async fn table_ready(&self, id: u64, is_ready: bool) -> Result, CubeError> {
self.write_operation("table_ready", move |db_ref, batch_pipe| {
- batch_pipe.invalidate_tables_cache();
+ batch_pipe.set_post_commit_callback(|metastore| {
+ *metastore.cached_tables.lock().unwrap() = None;
+ });
let rocks_table = TableRocksTable::new(db_ref.clone());
Ok(rocks_table.update_with_fn(id, |r| r.update_is_ready(is_ready), batch_pipe)?)
})
@@ -2305,7 +2328,9 @@ impl MetaStore for RocksMetaStore {
#[tracing::instrument(level = "trace", skip(self))]
async fn seal_table(&self, id: u64) -> Result, CubeError> {
self.write_operation("seal_table", move |db_ref, batch_pipe| {
- batch_pipe.invalidate_tables_cache();
+ batch_pipe.set_post_commit_callback(|metastore| {
+ *metastore.cached_tables.lock().unwrap() = None;
+ });
let rocks_table = TableRocksTable::new(db_ref.clone());
Ok(rocks_table.update_with_fn(id, |r| r.update_sealed(true), batch_pipe)?)
})
@@ -2334,13 +2359,16 @@ impl MetaStore for RocksMetaStore {
self.write_operation(
"update_location_download_size",
move |db_ref, batch_pipe| {
- batch_pipe.invalidate_tables_cache();
+ batch_pipe.set_post_commit_callback(|metastore| {
+ *metastore.cached_tables.lock().unwrap() = None;
+ });
+
let rocks_table = TableRocksTable::new(db_ref.clone());
- Ok(rocks_table.update_with_res_fn(
+ rocks_table.update_with_res_fn(
id,
|r| r.update_location_download_size(&location, download_size),
batch_pipe,
- )?)
+ )
},
)
.await
@@ -2393,14 +2421,14 @@ impl MetaStore for RocksMetaStore {
})
.await
} else {
- let cache = self.store.cached_tables.clone();
+ let cache = self.cached_tables.clone();
if let Some(t) = cube_ext::spawn_blocking(move || cache.lock().unwrap().clone()).await?
{
return Ok(t);
}
- let cache = self.store.cached_tables.clone();
+ let cache = self.cached_tables.clone();
// Can't do read_operation_out_of_queue as we need to update cache on the same thread where it's dropped
self.read_operation("get_tables_with_path", move |db_ref| {
let cached_tables = { cache.lock().unwrap().clone() };
@@ -2465,7 +2493,9 @@ impl MetaStore for RocksMetaStore {
#[tracing::instrument(level = "trace", skip(self))]
async fn drop_table(&self, table_id: u64) -> Result, CubeError> {
self.write_operation("drop_table", move |db_ref, batch_pipe| {
- batch_pipe.invalidate_tables_cache();
+ batch_pipe.set_post_commit_callback(|metastore| {
+ *metastore.cached_tables.lock().unwrap() = None;
+ });
RocksMetaStore::drop_table_impl(table_id, db_ref, batch_pipe)
})
.await
@@ -4819,7 +4849,7 @@ fn get_default_index_impl(db_ref: DbTableRef, table_id: u64) -> Result,
current_active: &[(IdRow, Vec>)],
new_active: &[(IdRow, u64)],
mut update_new_partition_stats: impl FnMut(/*index*/ usize, &Partition) -> Partition,
@@ -4982,6 +5012,67 @@ mod tests {
use std::time::Duration;
use std::{env, fs};
+ #[tokio::test]
+ async fn test_post_commit_callback_on_success() -> Result<(), CubeError> {
+ let config = Config::test("test_post_commit_callback_on_success");
+ let store_path = env::current_dir()?.join("test_post_commit_callback_on_success-local");
+ let remote_store_path =
+ env::current_dir()?.join("test_post_commit_callback_on_success-remote");
+ let _ = fs::remove_dir_all(store_path.clone());
+ let _ = fs::remove_dir_all(remote_store_path.clone());
+ let remote_fs = LocalDirRemoteFs::new(Some(remote_store_path.clone()), store_path.clone());
+
+ let meta_store = RocksMetaStore::new(
+ store_path.join("metastore").as_path(),
+ BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()),
+ config.config_obj(),
+ )?;
+
+ // Test 1: callback fires on successful writing
+ {
+ let called = Arc::new(std::sync::atomic::AtomicBool::new(false));
+ let called_clone = called.clone();
+ meta_store
+ .write_operation("test_success", move |_db_ref, batch_pipe| {
+ batch_pipe.set_post_commit_callback(move |_metastore| {
+ called_clone.store(true, std::sync::atomic::Ordering::SeqCst);
+ });
+ Ok(())
+ })
+ .await?;
+
+ assert!(
+ called.load(std::sync::atomic::Ordering::SeqCst),
+ "post-commit callback should fire on successful write"
+ );
+ }
+
+ // Test 2: callback does NOT fire when the closure returns Err
+ {
+ let called = Arc::new(std::sync::atomic::AtomicBool::new(false));
+ let called_clone = called.clone();
+ let result: Result<(), _> = meta_store
+ .write_operation("test_failure", move |_db_ref, batch_pipe| {
+ batch_pipe.set_post_commit_callback(move |_metastore| {
+ called_clone.store(true, std::sync::atomic::Ordering::SeqCst);
+ });
+ Err(CubeError::user("intentional error".to_string()))
+ })
+ .await;
+
+ assert!(result.is_err());
+ assert!(
+ !called.load(std::sync::atomic::Ordering::SeqCst),
+ "post-commit callback should NOT fire when write fails"
+ );
+ }
+
+ let _ = fs::remove_dir_all(store_path);
+ let _ = fs::remove_dir_all(remote_store_path);
+
+ Ok(())
+ }
+
#[test]
fn macro_test() {
let s = Schema {
@@ -4996,10 +5087,10 @@ mod tests {
}
#[tokio::test]
- async fn schema_test() {
+ async fn schema_test() -> Result<(), CubeError> {
let config = Config::test("schema_test");
- let store_path = env::current_dir().unwrap().join("test-local");
- let remote_store_path = env::current_dir().unwrap().join("test-remote");
+ let store_path = env::current_dir()?.join("test-local");
+ let remote_store_path = env::current_dir()?.join("test-remote");
let _ = fs::remove_dir_all(store_path.clone());
let _ = fs::remove_dir_all(remote_store_path.clone());
let remote_fs = LocalDirRemoteFs::new(Some(remote_store_path.clone()), store_path.clone());
@@ -5009,23 +5100,13 @@ mod tests {
store_path.join("metastore").as_path(),
BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()),
config.config_obj(),
- )
- .unwrap();
+ )?;
- let schema_1 = meta_store
- .create_schema("foo".to_string(), false)
- .await
- .unwrap();
+ let schema_1 = meta_store.create_schema("foo".to_string(), false).await?;
println!("New id: {}", schema_1.id);
- let schema_2 = meta_store
- .create_schema("bar".to_string(), false)
- .await
- .unwrap();
+ let schema_2 = meta_store.create_schema("bar".to_string(), false).await?;
println!("New id: {}", schema_2.id);
- let schema_3 = meta_store
- .create_schema("boo".to_string(), false)
- .await
- .unwrap();
+ let schema_3 = meta_store.create_schema("boo".to_string(), false).await?;
println!("New id: {}", schema_3.id);
let schema_1_id = schema_1.id;
@@ -5037,34 +5118,16 @@ mod tests {
.await
.is_err());
- assert_eq!(
- meta_store.get_schema("foo".to_string()).await.unwrap(),
- schema_1
- );
- assert_eq!(
- meta_store.get_schema("bar".to_string()).await.unwrap(),
- schema_2
- );
- assert_eq!(
- meta_store.get_schema("boo".to_string()).await.unwrap(),
- schema_3
- );
+ assert_eq!(meta_store.get_schema("foo".to_string()).await?, schema_1);
+ assert_eq!(meta_store.get_schema("bar".to_string()).await?, schema_2);
+ assert_eq!(meta_store.get_schema("boo".to_string()).await?, schema_3);
- assert_eq!(
- meta_store.get_schema_by_id(schema_1_id).await.unwrap(),
- schema_1
- );
- assert_eq!(
- meta_store.get_schema_by_id(schema_2_id).await.unwrap(),
- schema_2
- );
- assert_eq!(
- meta_store.get_schema_by_id(schema_3_id).await.unwrap(),
- schema_3
- );
+ assert_eq!(meta_store.get_schema_by_id(schema_1_id).await?, schema_1);
+ assert_eq!(meta_store.get_schema_by_id(schema_2_id).await?, schema_2);
+ assert_eq!(meta_store.get_schema_by_id(schema_3_id).await?, schema_3);
assert_eq!(
- meta_store.get_schemas().await.unwrap(),
+ meta_store.get_schemas().await?,
vec![
IdRow::new(
1,
@@ -5090,8 +5153,7 @@ mod tests {
assert_eq!(
meta_store
.rename_schema("foo".to_string(), "foo1".to_string())
- .await
- .unwrap(),
+ .await?,
IdRow::new(
schema_1_id,
Schema {
@@ -5101,7 +5163,7 @@ mod tests {
);
assert!(meta_store.get_schema("foo".to_string()).await.is_err());
assert_eq!(
- meta_store.get_schema("foo1".to_string()).await.unwrap(),
+ meta_store.get_schema("foo1".to_string()).await?,
IdRow::new(
schema_1_id,
Schema {
@@ -5110,7 +5172,7 @@ mod tests {
)
);
assert_eq!(
- meta_store.get_schema_by_id(schema_1_id).await.unwrap(),
+ meta_store.get_schema_by_id(schema_1_id).await?,
IdRow::new(
schema_1_id,
Schema {
@@ -5127,8 +5189,7 @@ mod tests {
assert_eq!(
meta_store
.rename_schema_by_id(schema_2_id, "bar1".to_string())
- .await
- .unwrap(),
+ .await?,
IdRow::new(
schema_2_id,
Schema {
@@ -5138,7 +5199,7 @@ mod tests {
);
assert!(meta_store.get_schema("bar".to_string()).await.is_err());
assert_eq!(
- meta_store.get_schema("bar1".to_string()).await.unwrap(),
+ meta_store.get_schema("bar1".to_string()).await?,
IdRow::new(
schema_2_id,
Schema {
@@ -5147,7 +5208,7 @@ mod tests {
)
);
assert_eq!(
- meta_store.get_schema_by_id(schema_2_id).await.unwrap(),
+ meta_store.get_schema_by_id(schema_2_id).await?,
IdRow::new(
schema_2_id,
Schema {
@@ -5156,25 +5217,16 @@ mod tests {
)
);
- assert_eq!(
- meta_store.delete_schema("bar1".to_string()).await.unwrap(),
- ()
- );
+ meta_store.delete_schema("bar1".to_string()).await?;
assert!(meta_store.delete_schema("bar1".to_string()).await.is_err());
assert!(meta_store.delete_schema("bar".to_string()).await.is_err());
assert!(meta_store.get_schema("bar1".to_string()).await.is_err());
assert!(meta_store.get_schema("bar".to_string()).await.is_err());
- assert_eq!(
- meta_store.delete_schema_by_id(schema_3_id).await.unwrap(),
- ()
- );
+ meta_store.delete_schema_by_id(schema_3_id).await?;
assert!(meta_store.delete_schema_by_id(schema_2_id).await.is_err());
- assert_eq!(
- meta_store.delete_schema_by_id(schema_1_id).await.unwrap(),
- ()
- );
+ meta_store.delete_schema_by_id(schema_1_id).await?;
assert!(meta_store.delete_schema_by_id(schema_1_id).await.is_err());
assert!(meta_store.get_schema("foo".to_string()).await.is_err());
assert!(meta_store.get_schema("foo1".to_string()).await.is_err());
@@ -5182,13 +5234,15 @@ mod tests {
}
let _ = fs::remove_dir_all(store_path.clone());
let _ = fs::remove_dir_all(remote_store_path.clone());
+
+ Ok(())
}
#[tokio::test]
- async fn non_empty_schema_test() {
+ async fn non_empty_schema_test() -> Result<(), CubeError> {
let config = Config::test("non_empty_schema_test");
- let store_path = env::current_dir().unwrap().join("test-local-ne-schema");
- let remote_store_path = env::current_dir().unwrap().join("test-remote-ne-schema");
+ let store_path = env::current_dir()?.join("test-local-ne-schema");
+ let remote_store_path = env::current_dir()?.join("test-remote-ne-schema");
let _ = fs::remove_dir_all(store_path.clone());
let _ = fs::remove_dir_all(remote_store_path.clone());
let remote_fs = LocalDirRemoteFs::new(Some(remote_store_path.clone()), store_path.clone());
@@ -5197,18 +5251,11 @@ mod tests {
store_path.join("metastore").as_path(),
BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()),
config.config_obj(),
- )
- .unwrap();
+ )?;
- let schema1 = meta_store
- .create_schema("foo".to_string(), false)
- .await
- .unwrap();
+ let schema1 = meta_store.create_schema("foo".to_string(), false).await?;
- let _schema2 = meta_store
- .create_schema("foo2".to_string(), false)
- .await
- .unwrap();
+ let _schema2 = meta_store.create_schema("foo2".to_string(), false).await?;
let mut columns = Vec::new();
columns.push(Column::new("col1".to_string(), ColumnType::Int, 0));
@@ -5233,8 +5280,7 @@ mod tests {
false,
None,
)
- .await
- .unwrap();
+ .await?;
let _table2 = meta_store
.create_table(
@@ -5257,8 +5303,7 @@ mod tests {
false,
None,
)
- .await
- .unwrap();
+ .await?;
assert!(meta_store
.delete_schema_by_id(schema1.get_id())
@@ -5268,13 +5313,15 @@ mod tests {
let _ = fs::remove_dir_all(store_path.clone());
let _ = fs::remove_dir_all(remote_store_path.clone());
+
+ Ok(())
}
#[tokio::test]
- async fn index_repair_test() {
+ async fn index_repair_test() -> Result<(), CubeError> {
let config = Config::test("index_repair_test");
- let store_path = env::current_dir().unwrap().join("index_repair_test-local");
- let remote_store_path = env::current_dir().unwrap().join("index_repair_test-remote");
+ let store_path = env::current_dir()?.join("index_repair_test-local");
+ let remote_store_path = env::current_dir()?.join("index_repair_test-remote");
let _ = fs::remove_dir_all(store_path.clone());
let _ = fs::remove_dir_all(remote_store_path.clone());
let remote_fs = LocalDirRemoteFs::new(Some(remote_store_path.clone()), store_path.clone());
@@ -5284,19 +5331,14 @@ mod tests {
store_path.join("metastore").as_path(),
BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()),
config.config_obj(),
- )
- .unwrap();
+ )?;
- meta_store
- .create_schema("foo".to_string(), false)
- .await
- .unwrap();
+ meta_store.create_schema("foo".to_string(), false).await?;
meta_store
.store
.db
- .delete(RowKey::Table(TableId::Schemas, 1).to_bytes())
- .unwrap();
+ .delete(RowKey::Table(TableId::Schemas, 1).to_bytes())?;
let result = meta_store.get_schema("foo".to_string()).await;
println!("{:?}", result);
@@ -5306,28 +5348,27 @@ mod tests {
println!("Keys in db");
for kv_res in iterator {
- let (key, _) = kv_res.unwrap();
+ let (key, _) = kv_res?;
println!("Key {:?}", RowKey::from_bytes(&key));
}
sleep(Duration::from_millis(300));
- meta_store
- .create_schema("foo".to_string(), false)
- .await
- .unwrap();
+ meta_store.create_schema("foo".to_string(), false).await?;
}
let _ = fs::remove_dir_all(store_path.clone());
let _ = fs::remove_dir_all(remote_store_path.clone());
+
+ Ok(())
}
#[tokio::test]
- async fn table_test() {
+ async fn table_test() -> Result<(), CubeError> {
init_test_logger().await;
let config = Config::test("table_test");
- let store_path = env::current_dir().unwrap().join("test-table-local");
- let remote_store_path = env::current_dir().unwrap().join("test-table-remote");
+ let store_path = env::current_dir()?.join("test-table-local");
+ let remote_store_path = env::current_dir()?.join("test-table-remote");
let _ = fs::remove_dir_all(store_path.clone());
let _ = fs::remove_dir_all(remote_store_path.clone());
let remote_fs = LocalDirRemoteFs::new(Some(remote_store_path.clone()), store_path.clone());
@@ -5336,13 +5377,9 @@ mod tests {
store_path.clone().join("metastore").as_path(),
BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()),
config.config_obj(),
- )
- .unwrap();
+ )?;
- let schema_1 = meta_store
- .create_schema("foo".to_string(), false)
- .await
- .unwrap();
+ let schema_1 = meta_store.create_schema("foo".to_string(), false).await?;
let mut columns = Vec::new();
columns.push(Column::new("col1".to_string(), ColumnType::Int, 0));
columns.push(Column::new("col2".to_string(), ColumnType::String, 1));
@@ -5382,8 +5419,7 @@ mod tests {
false,
None,
)
- .await
- .unwrap();
+ .await?;
let table1_id = table1.id;
assert!(schema_1.id == table1.get_row().get_schema_id());
@@ -5414,8 +5450,7 @@ mod tests {
assert_eq!(
meta_store
.get_table("foo".to_string(), "boo".to_string())
- .await
- .unwrap(),
+ .await?,
table1
);
@@ -5427,25 +5462,22 @@ mod tests {
None,
None,
Index::index_type_default(),
- )
- .unwrap();
+ )?;
let expected_res = vec![IdRow::new(1, expected_index)];
- assert_eq!(meta_store.get_table_indexes(1).await.unwrap(), expected_res);
+ assert_eq!(meta_store.get_table_indexes(1).await?, expected_res);
}
let _ = fs::remove_dir_all(store_path.clone());
let _ = fs::remove_dir_all(remote_store_path.clone());
+
+ Ok(())
}
#[tokio::test]
- async fn default_index_field_positions_test() {
+ async fn default_index_field_positions_test() -> Result<(), CubeError> {
init_test_logger().await;
let config = Config::test("default_index_field_positions_test");
- let store_path = env::current_dir()
- .unwrap()
- .join("test-default-index-positions-local");
- let remote_store_path = env::current_dir()
- .unwrap()
- .join("test-default-index-positions-remote");
+ let store_path = env::current_dir()?.join("test-default-index-positions-local");
+ let remote_store_path = env::current_dir()?.join("test-default-index-positions-remote");
let _ = fs::remove_dir_all(store_path.clone());
let _ = fs::remove_dir_all(remote_store_path.clone());
let remote_fs = LocalDirRemoteFs::new(Some(remote_store_path.clone()), store_path.clone());
@@ -5454,13 +5486,9 @@ mod tests {
store_path.clone().join("metastore").as_path(),
BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()),
config.config_obj(),
- )
- .unwrap();
+ )?;
- meta_store
- .create_schema("foo".to_string(), false)
- .await
- .unwrap();
+ meta_store.create_schema("foo".to_string(), false).await?;
let mut columns = Vec::new();
columns.push(Column::new("col1".to_string(), ColumnType::Int, 0));
columns.push(Column::new("col2".to_string(), ColumnType::Bytes, 1));
@@ -5500,8 +5528,7 @@ mod tests {
false,
None,
)
- .await
- .unwrap();
+ .await?;
let table1_id = table1.id;
let expected_columns = vec![
@@ -5520,26 +5547,23 @@ mod tests {
None,
None,
Index::index_type_default(),
- )
- .unwrap();
+ )?;
let expected_res = vec![IdRow::new(1, expected_index)];
- assert_eq!(meta_store.get_table_indexes(1).await.unwrap(), expected_res);
+ assert_eq!(meta_store.get_table_indexes(1).await?, expected_res);
}
let _ = fs::remove_dir_all(store_path.clone());
let _ = fs::remove_dir_all(remote_store_path.clone());
+
+ Ok(())
}
#[tokio::test]
- async fn table_with_aggregate_index_test() {
+ async fn table_with_aggregate_index_test() -> Result<(), CubeError> {
init_test_logger().await;
let config = Config::test("table_with_aggregate_index_test");
- let store_path = env::current_dir()
- .unwrap()
- .join("test-table-aggregate-local");
- let remote_store_path = env::current_dir()
- .unwrap()
- .join("test-table-aggregate-remote");
+ let store_path = env::current_dir()?.join("test-table-aggregate-local");
+ let remote_store_path = env::current_dir()?.join("test-table-aggregate-remote");
let _ = fs::remove_dir_all(store_path.clone());
let _ = fs::remove_dir_all(remote_store_path.clone());
let remote_fs = LocalDirRemoteFs::new(Some(remote_store_path.clone()), store_path.clone());
@@ -5548,13 +5572,9 @@ mod tests {
store_path.clone().join("metastore").as_path(),
BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()),
config.config_obj(),
- )
- .unwrap();
+ )?;
- meta_store
- .create_schema("foo".to_string(), false)
- .await
- .unwrap();
+ meta_store.create_schema("foo".to_string(), false).await?;
let mut columns = Vec::new();
columns.push(Column::new("col1".to_string(), ColumnType::Int, 0));
columns.push(Column::new("col2".to_string(), ColumnType::String, 1));
@@ -5593,16 +5613,14 @@ mod tests {
false,
None,
)
- .await
- .unwrap();
+ .await?;
let table_id = table1.get_id();
assert_eq!(
meta_store
.get_table("foo".to_string(), "boo".to_string())
- .await
- .unwrap(),
+ .await?,
table1
);
@@ -5622,7 +5640,7 @@ mod tests {
)
);
- let indexes = meta_store.get_table_indexes(table_id).await.unwrap();
+ let indexes = meta_store.get_table_indexes(table_id).await?;
assert_eq!(indexes.len(), 2);
let ind = indexes
.into_iter()
@@ -5723,19 +5741,18 @@ mod tests {
}
let _ = fs::remove_dir_all(store_path.clone());
let _ = fs::remove_dir_all(remote_store_path.clone());
+
+ Ok(())
}
#[tokio::test]
- async fn table_with_default_index_no_measures_test() {
+ async fn table_with_default_index_no_measures_test() -> Result<(), CubeError> {
init_test_logger().await;
let config = Config::test("table_with_default_index_no_measures_test");
- let store_path = env::current_dir()
- .unwrap()
- .join("test-table-default-index-no-measure-local");
- let remote_store_path = env::current_dir()
- .unwrap()
- .join("test-table-default-index-no-measure-remote");
+ let store_path = env::current_dir()?.join("test-table-default-index-no-measure-local");
+ let remote_store_path =
+ env::current_dir()?.join("test-table-default-index-no-measure-remote");
let _ = fs::remove_dir_all(store_path.clone());
let _ = fs::remove_dir_all(remote_store_path.clone());
let remote_fs = LocalDirRemoteFs::new(Some(remote_store_path.clone()), store_path.clone());
@@ -5744,13 +5761,9 @@ mod tests {
store_path.clone().join("metastore").as_path(),
BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()),
config.config_obj(),
- )
- .unwrap();
+ )?;
- meta_store
- .create_schema("foo".to_string(), false)
- .await
- .unwrap();
+ meta_store.create_schema("foo".to_string(), false).await?;
let mut columns = Vec::new();
columns.push(Column::new("col1".to_string(), ColumnType::Int, 0));
columns.push(Column::new("col2".to_string(), ColumnType::String, 1));
@@ -5784,20 +5797,18 @@ mod tests {
false,
None,
)
- .await
- .unwrap();
+ .await?;
let table_id = table1.get_id();
assert_eq!(
meta_store
.get_table("foo".to_string(), "boo".to_string())
- .await
- .unwrap(),
+ .await?,
table1
);
- let indexes = meta_store.get_table_indexes(table_id).await.unwrap();
+ let indexes = meta_store.get_table_indexes(table_id).await?;
assert_eq!(indexes.len(), 1);
let ind = &indexes[0];
@@ -5821,10 +5832,12 @@ mod tests {
}
let _ = fs::remove_dir_all(store_path.clone());
let _ = fs::remove_dir_all(remote_store_path.clone());
+
+ Ok(())
}
#[tokio::test]
- async fn cold_start_test() {
+ async fn cold_start_test() -> Result<(), CubeError> {
init_test_logger().await;
{
@@ -5834,76 +5847,60 @@ mod tests {
let _ = fs::remove_dir_all(config.remote_dir());
let services = config.configure().await;
- services.start_processing_loops().await.unwrap();
+ services.start_processing_loops().await?;
services
.meta_store
.create_schema("foo1".to_string(), false)
- .await
- .unwrap();
+ .await?;
services
.rocks_meta_store
.as_ref()
.unwrap()
.run_upload()
- .await
- .unwrap();
+ .await?;
services
.meta_store
.create_schema("foo".to_string(), false)
- .await
- .unwrap();
+ .await?;
services
.rocks_meta_store
.as_ref()
.unwrap()
.upload_check_point()
- .await
- .unwrap();
+ .await?;
services
.meta_store
.create_schema("bar".to_string(), false)
- .await
- .unwrap();
+ .await?;
services
.rocks_meta_store
.as_ref()
.unwrap()
.run_upload()
- .await
- .unwrap();
- services.stop_processing_loops().await.unwrap();
+ .await?;
+ services.stop_processing_loops().await?;
Delay::new(Duration::from_millis(2000)).await; // TODO logger init conflict
- fs::remove_dir_all(config.local_dir()).unwrap();
+ fs::remove_dir_all(config.local_dir())?;
}
{
let config = Config::test("cold_start_test");
let services2 = config.configure().await;
- services2
- .meta_store
- .get_schema("foo1".to_string())
- .await
- .unwrap();
- services2
- .meta_store
- .get_schema("foo".to_string())
- .await
- .unwrap();
- services2
- .meta_store
- .get_schema("bar".to_string())
- .await
- .unwrap();
+ services2.meta_store.get_schema("foo1".to_string()).await?;
+ services2.meta_store.get_schema("foo".to_string()).await?;
+ services2.meta_store.get_schema("bar".to_string()).await?;
- fs::remove_dir_all(config.local_dir()).unwrap();
- fs::remove_dir_all(config.remote_dir()).unwrap();
+ fs::remove_dir_all(config.local_dir())?;
+ fs::remove_dir_all(config.remote_dir())?;
}
+
+ Ok(())
}
#[tokio::test]
- async fn get_snapshots_list() {
+ async fn get_snapshots_list() -> Result<(), CubeError> {
{
let config = Config::test("get_snapshots_list");
@@ -5911,86 +5908,76 @@ mod tests {
let _ = fs::remove_dir_all(config.remote_dir());
let services = config.configure().await;
- services.start_processing_loops().await.unwrap();
+ services.start_processing_loops().await?;
let snapshots = services
.rocks_meta_store
.as_ref()
.unwrap()
.get_snapshots_list()
- .await
- .unwrap();
+ .await?;
assert_eq!(snapshots.len(), 0);
services
.meta_store
.create_schema("foo1".to_string(), false)
- .await
- .unwrap();
+ .await?;
assert_eq!(snapshots.len(), 0);
services
.rocks_meta_store
.as_ref()
.unwrap()
.upload_check_point()
- .await
- .unwrap();
+ .await?;
let snapshots = services
.rocks_meta_store
.as_ref()
.unwrap()
.get_snapshots_list()
- .await
- .unwrap();
+ .await?;
assert_eq!(snapshots.len(), 1);
assert!(snapshots[0].current);
services
.meta_store
.create_schema("foo".to_string(), false)
- .await
- .unwrap();
+ .await?;
services
.rocks_meta_store
.as_ref()
.unwrap()
.upload_check_point()
- .await
- .unwrap();
+ .await?;
let snapshots = services
.rocks_meta_store
.as_ref()
.unwrap()
.get_snapshots_list()
- .await
- .unwrap();
+ .await?;
assert_eq!(snapshots.len(), 2);
assert!(!snapshots[0].current);
assert!(snapshots[1].current);
services
.meta_store
.create_schema("bar".to_string(), false)
- .await
- .unwrap();
+ .await?;
services
.rocks_meta_store
.as_ref()
.unwrap()
.upload_check_point()
- .await
- .unwrap();
+ .await?;
let snapshots = services
.rocks_meta_store
.as_ref()
.unwrap()
.get_snapshots_list()
- .await
- .unwrap();
+ .await?;
assert_eq!(snapshots.len(), 3);
assert!(!snapshots[0].current);
assert!(!snapshots[1].current);
assert!(snapshots[2].current);
- services.stop_processing_loops().await.unwrap();
+ services.stop_processing_loops().await?;
Delay::new(Duration::from_millis(2000)).await; // TODO logger init conflict
- fs::remove_dir_all(config.local_dir()).unwrap();
+ fs::remove_dir_all(config.local_dir())?;
}
{
@@ -6002,18 +5989,19 @@ mod tests {
.as_ref()
.unwrap()
.get_snapshots_list()
- .await
- .unwrap();
+ .await?;
assert_eq!(snapshots.len(), 3);
assert!(!snapshots[0].current);
assert!(!snapshots[1].current);
assert!(snapshots[2].current);
- fs::remove_dir_all(config.local_dir()).unwrap();
- fs::remove_dir_all(config.remote_dir()).unwrap();
+ fs::remove_dir_all(config.local_dir())?;
+ fs::remove_dir_all(config.remote_dir())?;
}
+
+ Ok(())
}
#[tokio::test]
- async fn set_current_snapshot() {
+ async fn set_current_snapshot() -> Result<(), CubeError> {
init_test_logger().await;
{
@@ -6023,27 +6011,24 @@ mod tests {
let _ = fs::remove_dir_all(config.remote_dir());
let services = config.configure().await;
- services.start_processing_loops().await.unwrap();
+ services.start_processing_loops().await?;
let rocks_meta_store = services.rocks_meta_store.as_ref().unwrap();
services
.meta_store
.create_schema("foo1".to_string(), false)
- .await
- .unwrap();
- rocks_meta_store.upload_check_point().await.unwrap();
+ .await?;
+ rocks_meta_store.upload_check_point().await?;
services
.meta_store
.create_schema("foo".to_string(), false)
- .await
- .unwrap();
- rocks_meta_store.upload_check_point().await.unwrap();
+ .await?;
+ rocks_meta_store.upload_check_point().await?;
services
.meta_store
.create_schema("bar".to_string(), false)
- .await
- .unwrap();
- rocks_meta_store.upload_check_point().await.unwrap();
- let snapshots = services.meta_store.get_snapshots_list().await.unwrap();
+ .await?;
+ rocks_meta_store.upload_check_point().await?;
+ let snapshots = services.meta_store.get_snapshots_list().await?;
assert_eq!(snapshots.len(), 3);
assert!(!snapshots[0].current);
assert!(!snapshots[1].current);
@@ -6076,42 +6061,33 @@ mod tests {
services
.meta_store
.create_schema("bar_after".to_string(), false)
- .await
- .unwrap();
- rocks_meta_store.upload_check_point().await.unwrap();
- rocks_meta_store.run_upload().await.unwrap();
+ .await?;
+ rocks_meta_store.upload_check_point().await?;
+ rocks_meta_store.run_upload().await?;
- let snapshots = services.meta_store.get_snapshots_list().await.unwrap();
+ let snapshots = services.meta_store.get_snapshots_list().await?;
assert_eq!(snapshots.len(), 3);
assert!(!snapshots[0].current);
assert!(snapshots[1].current);
assert!(!snapshots[2].current);
- services.stop_processing_loops().await.unwrap();
+ services.stop_processing_loops().await?;
Delay::new(Duration::from_millis(2000)).await; // TODO logger init conflict
- fs::remove_dir_all(config.local_dir()).unwrap();
+ fs::remove_dir_all(config.local_dir())?;
}
{
let config = Config::test("set_current_snapshot");
let services2 = config.configure().await;
- let snapshots = services2.meta_store.get_snapshots_list().await.unwrap();
+ let snapshots = services2.meta_store.get_snapshots_list().await?;
assert_eq!(snapshots.len(), 3);
assert!(!snapshots[0].current);
assert!(snapshots[1].current);
assert!(!snapshots[2].current);
- services2
- .meta_store
- .get_schema("foo1".to_string())
- .await
- .unwrap();
- services2
- .meta_store
- .get_schema("foo".to_string())
- .await
- .unwrap();
+ services2.meta_store.get_schema("foo1".to_string()).await?;
+ services2.meta_store.get_schema("foo".to_string()).await?;
assert!(services2
.meta_store
.get_schema("bar".to_string())
@@ -6129,45 +6105,34 @@ mod tests {
.await;
assert!(res.is_ok());
Delay::new(Duration::from_millis(2000)).await; // TODO logger init conflict
- fs::remove_dir_all(config.local_dir()).unwrap();
+ fs::remove_dir_all(config.local_dir())?;
}
{
let config = Config::test("set_current_snapshot");
let services3 = config.configure().await;
- let snapshots = services3.meta_store.get_snapshots_list().await.unwrap();
+ let snapshots = services3.meta_store.get_snapshots_list().await?;
assert_eq!(snapshots.len(), 3);
assert!(!snapshots[0].current);
assert!(!snapshots[1].current);
assert!(snapshots[2].current);
- services3
- .meta_store
- .get_schema("foo1".to_string())
- .await
- .unwrap();
- services3
- .meta_store
- .get_schema("foo".to_string())
- .await
- .unwrap();
- services3
- .meta_store
- .get_schema("bar".to_string())
- .await
- .unwrap();
+ services3.meta_store.get_schema("foo1".to_string()).await?;
+ services3.meta_store.get_schema("foo".to_string()).await?;
+ services3.meta_store.get_schema("bar".to_string()).await?;
services3
.meta_store
.get_schema("bar_after".to_string())
- .await
- .unwrap();
- fs::remove_dir_all(config.local_dir()).unwrap();
- fs::remove_dir_all(config.remote_dir()).unwrap();
+ .await?;
+ fs::remove_dir_all(config.local_dir())?;
+ fs::remove_dir_all(config.remote_dir())?;
}
+
+ Ok(())
}
#[tokio::test]
- async fn upload_logs_without_snapshots() {
+ async fn upload_logs_without_snapshots() -> Result<(), CubeError> {
let config = Config::test("upload_logs_without_snapshots");
let _ = fs::remove_dir_all(config.local_dir());
@@ -6175,7 +6140,7 @@ mod tests {
let services = config.configure().await;
- services.start_processing_loops().await.unwrap();
+ services.start_processing_loops().await?;
let rocks_meta_store = services.rocks_meta_store.as_ref().unwrap();
let remote_fs = services
.injector
@@ -6184,29 +6149,26 @@ mod tests {
services
.meta_store
.create_schema("foo1".to_string(), false)
- .await
- .unwrap();
- rocks_meta_store.run_upload().await.unwrap();
+ .await?;
+ rocks_meta_store.run_upload().await?;
services
.meta_store
.create_schema("foo".to_string(), false)
- .await
- .unwrap();
- rocks_meta_store.run_upload().await.unwrap();
- let uploaded = remote_fs.list("metastore-".to_string()).await.unwrap();
+ .await?;
+ rocks_meta_store.run_upload().await?;
+ let uploaded = remote_fs.list("metastore-".to_string()).await?;
assert!(uploaded.is_empty());
- rocks_meta_store.upload_check_point().await.unwrap();
+ rocks_meta_store.upload_check_point().await?;
services
.meta_store
.create_schema("bar".to_string(), false)
- .await
- .unwrap();
+ .await?;
- rocks_meta_store.run_upload().await.unwrap();
+ rocks_meta_store.run_upload().await?;
- let uploaded = remote_fs.list("metastore-".to_string()).await.unwrap();
+ let uploaded = remote_fs.list("metastore-".to_string()).await?;
let logs_uploaded = uploaded
.into_iter()
@@ -6215,9 +6177,9 @@ mod tests {
assert_eq!(logs_uploaded.len(), 1);
- rocks_meta_store.run_upload().await.unwrap();
+ rocks_meta_store.run_upload().await?;
- let uploaded = remote_fs.list("metastore-".to_string()).await.unwrap();
+ let uploaded = remote_fs.list("metastore-".to_string()).await?;
let logs_uploaded = uploaded
.into_iter()
@@ -6229,12 +6191,11 @@ mod tests {
services
.meta_store
.create_schema("bar2".to_string(), false)
- .await
- .unwrap();
+ .await?;
- rocks_meta_store.run_upload().await.unwrap();
+ rocks_meta_store.run_upload().await?;
- let uploaded = remote_fs.list("metastore-".to_string()).await.unwrap();
+ let uploaded = remote_fs.list("metastore-".to_string()).await?;
let logs_uploaded = uploaded
.into_iter()
@@ -6245,10 +6206,12 @@ mod tests {
let _ = fs::remove_dir_all(config.local_dir());
let _ = fs::remove_dir_all(config.remote_dir());
+
+ Ok(())
}
#[tokio::test]
- async fn log_replay_ordering() {
+ async fn log_replay_ordering() -> Result<(), CubeError> {
init_test_logger().await;
{
@@ -6258,27 +6221,24 @@ mod tests {
let _ = fs::remove_dir_all(config.remote_dir());
let services = config.configure().await;
- services.start_processing_loops().await.unwrap();
+ services.start_processing_loops().await?;
services
.rocks_meta_store
.as_ref()
.unwrap()
.upload_check_point()
- .await
- .unwrap();
+ .await?;
for i in 0..100 {
let schema = services
.meta_store
.create_schema(format!("foo{}", i), false)
- .await
- .unwrap();
+ .await?;
services
.rocks_meta_store
.as_ref()
.unwrap()
.run_upload()
- .await
- .unwrap();
+ .await?;
let table = services
.meta_store
.create_table(
@@ -6301,62 +6261,51 @@ mod tests {
false,
None,
)
- .await
- .unwrap();
+ .await?;
services
.rocks_meta_store
.as_ref()
.unwrap()
.run_upload()
- .await
- .unwrap();
- services
- .meta_store
- .drop_table(table.get_id())
- .await
- .unwrap();
+ .await?;
+ services.meta_store.drop_table(table.get_id()).await?;
services
.rocks_meta_store
.as_ref()
.unwrap()
.run_upload()
- .await
- .unwrap();
+ .await?;
services
.meta_store
.delete_schema_by_id(schema.get_id())
- .await
- .unwrap();
+ .await?;
services
.rocks_meta_store
.as_ref()
.unwrap()
.run_upload()
- .await
- .unwrap();
+ .await?;
}
- services.stop_processing_loops().await.unwrap();
+ services.stop_processing_loops().await?;
Delay::new(Duration::from_millis(2000)).await;
- fs::remove_dir_all(config.local_dir()).unwrap();
+ fs::remove_dir_all(config.local_dir())?;
}
{
let config = Config::test("log_replay_ordering");
let services2 = config.configure().await;
- let tables = services2
- .meta_store
- .get_tables_with_path(true)
- .await
- .unwrap();
+ let tables = services2.meta_store.get_tables_with_path(true).await?;
assert_eq!(tables.len(), 0);
let _ = fs::remove_dir_all(config.local_dir());
let _ = fs::remove_dir_all(config.remote_dir());
}
+
+ Ok(())
}
#[tokio::test]
- async fn discard_logs() {
+ async fn discard_logs() -> Result<(), CubeError> {
{
let config = Config::test("discard_logs");
@@ -6364,19 +6313,17 @@ mod tests {
let _ = fs::remove_dir_all(config.remote_dir());
let services = config.configure().await;
- services.start_processing_loops().await.unwrap();
+ services.start_processing_loops().await?;
services
.meta_store
.create_schema("foo1".to_string(), false)
- .await
- .unwrap();
+ .await?;
while !services
.rocks_meta_store
.as_ref()
.unwrap()
.has_pending_changes()
- .await
- .unwrap()
+ .await?
{
futures_timer::Delay::new(Duration::from_millis(100)).await;
}
@@ -6385,20 +6332,17 @@ mod tests {
.as_ref()
.unwrap()
.run_upload()
- .await
- .unwrap();
+ .await?;
services
.meta_store
.create_schema("foo".to_string(), false)
- .await
- .unwrap();
+ .await?;
while !services
.rocks_meta_store
.as_ref()
.unwrap()
.has_pending_changes()
- .await
- .unwrap()
+ .await?
{
futures_timer::Delay::new(Duration::from_millis(100)).await;
}
@@ -6407,20 +6351,17 @@ mod tests {
.as_ref()
.unwrap()
.upload_check_point()
- .await
- .unwrap();
+ .await?;
services
.meta_store
.create_schema("bar".to_string(), false)
- .await
- .unwrap();
+ .await?;
while !services
.rocks_meta_store
.as_ref()
.unwrap()
.has_pending_changes()
- .await
- .unwrap()
+ .await?
{
futures_timer::Delay::new(Duration::from_millis(100)).await;
}
@@ -6429,19 +6370,17 @@ mod tests {
.as_ref()
.unwrap()
.run_upload()
- .await
- .unwrap();
- services.stop_processing_loops().await.unwrap();
+ .await?;
+ services.stop_processing_loops().await?;
Delay::new(Duration::from_millis(2000)).await; // TODO logger init conflict
- fs::remove_dir_all(config.local_dir()).unwrap();
+ fs::remove_dir_all(config.local_dir())?;
let list = LocalDirRemoteFs::list_recursive(
config.remote_dir().clone(),
"metastore-".to_string(),
)
- .await
- .unwrap();
- let re = Regex::new(r"(\d+).flex").unwrap();
+ .await?;
+ let re = Regex::new(r"(\d+).flex")?;
let last_log = list
.iter()
.filter(|f| re.captures(f.remote_path()).is_some())
@@ -6456,37 +6395,30 @@ mod tests {
println!("Truncating {:?}", file_path);
let file = std::fs::OpenOptions::new()
.write(true)
- .open(file_path.clone())
- .unwrap();
- println!("Size {}", file.metadata().unwrap().len());
- file.set_len(50).unwrap();
+ .open(file_path.clone())?;
+ println!("Size {}", file.metadata()?.len());
+ file.set_len(50)?;
}
{
let config = Config::test("discard_logs");
let services2 = config.configure().await;
- services2
- .meta_store
- .get_schema("foo1".to_string())
- .await
- .unwrap();
- services2
- .meta_store
- .get_schema("foo".to_string())
- .await
- .unwrap();
+ services2.meta_store.get_schema("foo1".to_string()).await?;
+ services2.meta_store.get_schema("foo".to_string()).await?;
- fs::remove_dir_all(config.local_dir()).unwrap();
- fs::remove_dir_all(config.remote_dir()).unwrap();
+ fs::remove_dir_all(config.local_dir())?;
+ fs::remove_dir_all(config.remote_dir())?;
}
+
+ Ok(())
}
#[tokio::test]
- async fn swap_chunks() {
+ async fn swap_chunks() -> Result<(), CubeError> {
let config = Config::test("swap_chunks");
- let store_path = env::current_dir().unwrap().join("swap_chunks_test-local");
- let remote_store_path = env::current_dir().unwrap().join("swap_chunks_test-remote");
+ let store_path = env::current_dir()?.join("swap_chunks_test-local");
+ let remote_store_path = env::current_dir()?.join("swap_chunks_test-remote");
let _ = fs::remove_dir_all(store_path.clone());
let _ = fs::remove_dir_all(remote_store_path.clone());
let remote_fs = LocalDirRemoteFs::new(Some(remote_store_path.clone()), store_path.clone());
@@ -6495,12 +6427,8 @@ mod tests {
store_path.join("metastore").as_path(),
BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()),
config.config_obj(),
- )
- .unwrap();
- meta_store
- .create_schema("foo".to_string(), false)
- .await
- .unwrap();
+ )?;
+ meta_store.create_schema("foo".to_string(), false).await?;
let cols = vec![Column::new("name".to_string(), ColumnType::String, 0)];
meta_store
.create_table(
@@ -6523,37 +6451,32 @@ mod tests {
false,
None,
)
- .await
- .unwrap();
- let partition = meta_store.get_partition(1).await.unwrap();
+ .await?;
+ let partition = meta_store.get_partition(1).await?;
//============= trying to swap same source chunks twice ==============
let mut source_ids: Vec = Vec::new();
let ch = meta_store
.create_chunk(partition.get_id(), 10, None, None, true)
- .await
- .unwrap();
+ .await?;
source_ids.push(ch.get_id());
- meta_store.chunk_uploaded(ch.get_id()).await.unwrap();
+ meta_store.chunk_uploaded(ch.get_id()).await?;
let ch = meta_store
.create_chunk(partition.get_id(), 16, None, None, true)
- .await
- .unwrap();
+ .await?;
source_ids.push(ch.get_id());
- meta_store.chunk_uploaded(ch.get_id()).await.unwrap();
+ meta_store.chunk_uploaded(ch.get_id()).await?;
let dest_chunk = meta_store
.create_chunk(partition.get_id(), 26, None, None, true)
- .await
- .unwrap();
+ .await?;
assert_eq!(dest_chunk.get_row().active(), false);
let dest_chunk2 = meta_store
.create_chunk(partition.get_id(), 26, None, None, true)
- .await
- .unwrap();
+ .await?;
assert_eq!(dest_chunk2.get_row().active(), false);
meta_store
@@ -6562,15 +6485,14 @@ mod tests {
vec![(dest_chunk.get_id(), Some(26))],
None,
)
- .await
- .unwrap();
+ .await?;
for id in source_ids.iter() {
- let ch = meta_store.get_chunk(id.to_owned()).await.unwrap();
+ let ch = meta_store.get_chunk(id.to_owned()).await?;
assert_eq!(ch.get_row().active(), false);
}
- let ch = meta_store.get_chunk(dest_chunk.get_id()).await.unwrap();
+ let ch = meta_store.get_chunk(dest_chunk.get_id()).await?;
assert_eq!(ch.get_row().active(), true);
meta_store
@@ -6586,17 +6508,15 @@ mod tests {
let mut source_ids: Vec = Vec::new();
let ch = meta_store
.create_chunk(partition.get_id(), 10, None, None, true)
- .await
- .unwrap();
+ .await?;
source_ids.push(ch.get_id());
- meta_store.chunk_uploaded(ch.get_id()).await.unwrap();
+ meta_store.chunk_uploaded(ch.get_id()).await?;
let ch = meta_store
.create_chunk(partition.get_id(), 16, None, None, true)
- .await
- .unwrap();
+ .await?;
source_ids.push(ch.get_id());
- meta_store.chunk_uploaded(ch.get_id()).await.unwrap();
+ meta_store.chunk_uploaded(ch.get_id()).await?;
meta_store
.swap_chunks(
@@ -6610,7 +6530,7 @@ mod tests {
);
for id in source_ids.iter() {
- let ch = meta_store.get_chunk(id.to_owned()).await.unwrap();
+ let ch = meta_store.get_chunk(id.to_owned()).await?;
assert_eq!(ch.get_row().active(), true);
}
}
@@ -6618,22 +6538,20 @@ mod tests {
assert!(true);
let _ = fs::remove_dir_all(store_path.clone());
let _ = fs::remove_dir_all(remote_store_path.clone());
+
+ Ok(())
}
#[tokio::test]
- async fn delete_old_snapshots() {
+ async fn delete_old_snapshots() -> Result<(), CubeError> {
let metastore_snapshots_lifetime_secs = 1;
let config = Config::test("delete_old_snapshots").update_config(|mut obj| {
obj.metastore_snapshots_lifetime = metastore_snapshots_lifetime_secs;
obj.minimum_metastore_snapshots_count = 2;
obj
});
- let store_path = env::current_dir()
- .unwrap()
- .join("delete_old_snapshots-local");
- let remote_store_path = env::current_dir()
- .unwrap()
- .join("delete_old_snapshots-remote");
+ let store_path = env::current_dir()?.join("delete_old_snapshots-local");
+ let remote_store_path = env::current_dir()?.join("delete_old_snapshots-remote");
let _ = fs::remove_dir_all(&store_path);
let _ = fs::remove_dir_all(&remote_store_path);
let remote_fs = LocalDirRemoteFs::new(Some(remote_store_path.clone()), store_path.clone());
@@ -6642,56 +6560,38 @@ mod tests {
store_path.join("metastore").as_path(),
BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()),
config.config_obj(),
- )
- .unwrap();
+ )?;
- // let list = remote_fs.list("metastore-".to_owned()).await.unwrap();
+ // let list = remote_fs.list("metastore-".to_owned()).await?;
// assert_eq!(0, list.len(), "remote fs list: {:?}", list);
let uploaded =
- BaseRocksStoreFs::list_files_by_snapshot(remote_fs.as_ref(), "metastore")
- .await
- .unwrap();
+ BaseRocksStoreFs::list_files_by_snapshot(remote_fs.as_ref(), "metastore").await?;
assert_eq!(uploaded.len(), 0);
- meta_store
- .create_schema("foo1".to_string(), false)
- .await
- .unwrap();
+ meta_store.create_schema("foo1".to_string(), false).await?;
- meta_store.upload_check_point().await.unwrap();
+ meta_store.upload_check_point().await?;
let uploaded1 =
- BaseRocksStoreFs::list_files_by_snapshot(remote_fs.as_ref(), "metastore")
- .await
- .unwrap();
+ BaseRocksStoreFs::list_files_by_snapshot(remote_fs.as_ref(), "metastore").await?;
assert_eq!(uploaded1.len(), 1);
- meta_store
- .create_schema("foo2".to_string(), false)
- .await
- .unwrap();
+ meta_store.create_schema("foo2".to_string(), false).await?;
- meta_store.upload_check_point().await.unwrap();
+ meta_store.upload_check_point().await?;
let uploaded2 =
- BaseRocksStoreFs::list_files_by_snapshot(remote_fs.as_ref(), "metastore")
- .await
- .unwrap();
+ BaseRocksStoreFs::list_files_by_snapshot(remote_fs.as_ref(), "metastore").await?;
assert_eq!(uploaded2.len(), 2);
- meta_store
- .create_schema("foo3".to_string(), false)
- .await
- .unwrap();
+ meta_store.create_schema("foo3".to_string(), false).await?;
- meta_store.upload_check_point().await.unwrap();
+ meta_store.upload_check_point().await?;
let uploaded3 =
- BaseRocksStoreFs::list_files_by_snapshot(remote_fs.as_ref(), "metastore")
- .await
- .unwrap();
+ BaseRocksStoreFs::list_files_by_snapshot(remote_fs.as_ref(), "metastore").await?;
assert_eq!(
uploaded3.len(),
@@ -6700,21 +6600,16 @@ mod tests {
uploaded3.keys().join(", ")
);
- meta_store
- .create_schema("foo4".to_string(), false)
- .await
- .unwrap();
+ meta_store.create_schema("foo4".to_string(), false).await?;
tokio::time::sleep(Duration::from_millis(
metastore_snapshots_lifetime_secs * 1000 + 100,
))
.await;
- meta_store.upload_check_point().await.unwrap();
+ meta_store.upload_check_point().await?;
let uploaded4 =
- BaseRocksStoreFs::list_files_by_snapshot(remote_fs.as_ref(), "metastore")
- .await
- .unwrap();
+ BaseRocksStoreFs::list_files_by_snapshot(remote_fs.as_ref(), "metastore").await?;
// Should have 2 remaining snapshots because 2 is the minimum.
assert_eq!(uploaded4.len(), 2);
@@ -6722,17 +6617,15 @@ mod tests {
let _ = fs::remove_dir_all(&store_path);
let _ = fs::remove_dir_all(&remote_store_path);
+
+ Ok(())
}
#[tokio::test]
- async fn swap_active_partitions() {
+ async fn swap_active_partitions() -> Result<(), CubeError> {
let config = Config::test("swap_active_partitions");
- let store_path = env::current_dir()
- .unwrap()
- .join("swap_active_partitions_test-local");
- let remote_store_path = env::current_dir()
- .unwrap()
- .join("swap_active_partitions_test-remote");
+ let store_path = env::current_dir()?.join("swap_active_partitions_test-local");
+ let remote_store_path = env::current_dir()?.join("swap_active_partitions_test-remote");
let _ = fs::remove_dir_all(store_path.clone());
let _ = fs::remove_dir_all(remote_store_path.clone());
let remote_fs = LocalDirRemoteFs::new(Some(remote_store_path.clone()), store_path.clone());
@@ -6741,12 +6634,8 @@ mod tests {
store_path.join("metastore").as_path(),
BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()),
config.config_obj(),
- )
- .unwrap();
- meta_store
- .create_schema("foo".to_string(), false)
- .await
- .unwrap();
+ )?;
+ meta_store.create_schema("foo".to_string(), false).await?;
let cols = vec![Column::new("name".to_string(), ColumnType::String, 0)];
meta_store
.create_table(
@@ -6769,29 +6658,25 @@ mod tests {
false,
None,
)
- .await
- .unwrap();
- let partition = meta_store.get_partition(1).await.unwrap();
+ .await?;
+ let partition = meta_store.get_partition(1).await?;
let mut source_chunks: Vec> = Vec::new();
let ch = meta_store
.create_chunk(partition.get_id(), 10, None, None, true)
- .await
- .unwrap();
- meta_store.chunk_uploaded(ch.get_id()).await.unwrap();
+ .await?;
+ meta_store.chunk_uploaded(ch.get_id()).await?;
source_chunks.push(ch);
let ch = meta_store
.create_chunk(partition.get_id(), 16, None, None, true)
- .await
- .unwrap();
- meta_store.chunk_uploaded(ch.get_id()).await.unwrap();
+ .await?;
+ meta_store.chunk_uploaded(ch.get_id()).await?;
source_chunks.push(ch);
let dest_partition = meta_store
.create_partition(Partition::new_child(&partition, None))
- .await
- .unwrap();
+ .await?;
meta_store
.swap_active_partitions(
@@ -6799,34 +6684,22 @@ mod tests {
vec![(dest_partition.clone(), 10)],
vec![(26, (None, None), (None, None))],
)
- .await
- .unwrap();
+ .await?;
assert_eq!(
- meta_store
- .get_partition(1)
- .await
- .unwrap()
- .get_row()
- .is_active(),
+ meta_store.get_partition(1).await?.get_row().is_active(),
false
);
assert_eq!(
meta_store
.get_partition(dest_partition.get_id())
- .await
- .unwrap()
+ .await?
.get_row()
.is_active(),
true
);
for c in source_chunks.iter() {
assert_eq!(
- meta_store
- .get_chunk(c.get_id())
- .await
- .unwrap()
- .get_row()
- .active(),
+ meta_store.get_chunk(c.get_id()).await?.get_row().active(),
false
);
}
@@ -6836,22 +6709,19 @@ mod tests {
let mut source_chunks: Vec> = Vec::new();
let ch = meta_store
.create_chunk(partition.clone().get_id(), 10, None, None, true)
- .await
- .unwrap();
- meta_store.chunk_uploaded(ch.get_id()).await.unwrap();
+ .await?;
+ meta_store.chunk_uploaded(ch.get_id()).await?;
source_chunks.push(ch);
let ch = meta_store
.create_chunk(partition.get_id(), 16, None, None, true)
- .await
- .unwrap();
- meta_store.chunk_uploaded(ch.get_id()).await.unwrap();
+ .await?;
+ meta_store.chunk_uploaded(ch.get_id()).await?;
source_chunks.push(ch);
let dest_partition = meta_store
.create_partition(Partition::new_child(&partition, None))
- .await
- .unwrap();
+ .await?;
match meta_store
.swap_active_partitions(
@@ -6873,27 +6743,23 @@ mod tests {
let partition = meta_store
.get_active_partitions_by_index_id(1)
- .await
- .unwrap()
+ .await?
.first()
.unwrap()
.to_owned();
let ch = meta_store
.create_chunk(partition.clone().get_id(), 10, None, None, true)
- .await
- .unwrap();
+ .await?;
source_chunks.push(ch);
let ch = meta_store
.create_chunk(partition.get_id(), 16, None, None, true)
- .await
- .unwrap();
+ .await?;
source_chunks.push(ch);
let dest_partition = meta_store
.create_partition(Partition::new_child(&partition, None))
- .await
- .unwrap();
+ .await?;
let dest_row_count = partition.get_row().main_table_row_count() + 26;
@@ -6916,23 +6782,20 @@ mod tests {
let partition = meta_store
.get_active_partitions_by_index_id(1)
- .await
- .unwrap()
+ .await?
.first()
.unwrap()
.to_owned();
let ch = meta_store
.create_chunk(partition.clone().get_id(), 10, None, None, true)
- .await
- .unwrap();
- meta_store.chunk_uploaded(ch.get_id()).await.unwrap();
+ .await?;
+ meta_store.chunk_uploaded(ch.get_id()).await?;
source_chunks.push(ch);
let ch = meta_store
.create_chunk(partition.get_id(), 16, None, None, true)
- .await
- .unwrap();
- meta_store.chunk_uploaded(ch.get_id()).await.unwrap();
+ .await?;
+ meta_store.chunk_uploaded(ch.get_id()).await?;
source_chunks.push(ch);
let dest_row_count = partition.get_row().main_table_row_count() + 26;
@@ -6955,13 +6818,15 @@ mod tests {
assert!(true);
let _ = fs::remove_dir_all(store_path.clone());
let _ = fs::remove_dir_all(remote_store_path.clone());
+
+ Ok(())
}
#[tokio::test]
- async fn job_priority_test() {
+ async fn job_priority_test() -> Result<(), CubeError> {
let config = Config::test("job_priority_test");
- let store_path = env::current_dir().unwrap().join("test-job-priority-local");
- let remote_store_path = env::current_dir().unwrap().join("test-job-priority-remote");
+ let store_path = env::current_dir()?.join("test-job-priority-local");
+ let remote_store_path = env::current_dir()?.join("test-job-priority-remote");
let _ = fs::remove_dir_all(store_path.clone());
let _ = fs::remove_dir_all(remote_store_path.clone());
let remote_fs = LocalDirRemoteFs::new(Some(remote_store_path.clone()), store_path.clone());
@@ -6970,40 +6835,35 @@ mod tests {
store_path.clone().join("metastore").as_path(),
BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()),
config.config_obj(),
- )
- .unwrap();
+ )?;
meta_store
.add_job(Job::new(
RowKey::Table(TableId::Partitions, 1),
JobType::InMemoryChunksCompaction,
"node1".to_string(),
))
- .await
- .unwrap();
+ .await?;
meta_store
.add_job(Job::new(
RowKey::Table(TableId::Partitions, 1),
JobType::PartitionCompaction,
"node1".to_string(),
))
- .await
- .unwrap();
+ .await?;
meta_store
.add_job(Job::new(
RowKey::Table(TableId::Partitions, 2),
JobType::PartitionCompaction,
"node1".to_string(),
))
- .await
- .unwrap();
+ .await?;
meta_store
.add_job(Job::new(
RowKey::Table(TableId::Partitions, 3),
JobType::InMemoryChunksCompaction,
"node1".to_string(),
))
- .await
- .unwrap();
+ .await?;
meta_store
.add_job(Job::new(
@@ -7011,37 +6871,32 @@ mod tests {
JobType::PartitionCompaction,
"node2".to_string(),
))
- .await
- .unwrap();
+ .await?;
meta_store
.add_job(Job::new(
RowKey::Table(TableId::Partitions, 12),
JobType::PartitionCompaction,
"node2".to_string(),
))
- .await
- .unwrap();
+ .await?;
meta_store
.add_job(Job::new(
RowKey::Table(TableId::Partitions, 13),
JobType::InMemoryChunksCompaction,
"node2".to_string(),
))
- .await
- .unwrap();
+ .await?;
meta_store
.add_job(Job::new(
RowKey::Table(TableId::Partitions, 11),
JobType::InMemoryChunksCompaction,
"node2".to_string(),
))
- .await
- .unwrap();
+ .await?;
let job = meta_store
.start_processing_job("node1".to_string(), false)
- .await
- .unwrap()
+ .await?
.unwrap();
assert_eq!(job.get_row().job_type(), &JobType::InMemoryChunksCompaction);
assert_eq!(
@@ -7051,8 +6906,7 @@ mod tests {
let job = meta_store
.start_processing_job("node1".to_string(), false)
- .await
- .unwrap()
+ .await?
.unwrap();
assert_eq!(job.get_row().job_type(), &JobType::InMemoryChunksCompaction);
assert_eq!(
@@ -7062,8 +6916,7 @@ mod tests {
let job = meta_store
.start_processing_job("node1".to_string(), false)
- .await
- .unwrap()
+ .await?
.unwrap();
assert_eq!(job.get_row().job_type(), &JobType::PartitionCompaction);
assert_eq!(
@@ -7073,8 +6926,7 @@ mod tests {
let job = meta_store
.start_processing_job("node1".to_string(), false)
- .await
- .unwrap()
+ .await?
.unwrap();
assert_eq!(job.get_row().job_type(), &JobType::PartitionCompaction);
assert_eq!(
@@ -7084,8 +6936,7 @@ mod tests {
let job = meta_store
.start_processing_job("node2".to_string(), false)
- .await
- .unwrap()
+ .await?
.unwrap();
assert_eq!(job.get_row().job_type(), &JobType::InMemoryChunksCompaction);
assert_eq!(
@@ -7095,8 +6946,7 @@ mod tests {
let job = meta_store
.start_processing_job("node2".to_string(), false)
- .await
- .unwrap()
+ .await?
.unwrap();
assert_eq!(job.get_row().job_type(), &JobType::InMemoryChunksCompaction);
assert_eq!(
@@ -7106,8 +6956,7 @@ mod tests {
let job = meta_store
.start_processing_job("node2".to_string(), false)
- .await
- .unwrap()
+ .await?
.unwrap();
assert_eq!(job.get_row().job_type(), &JobType::PartitionCompaction);
assert_eq!(
@@ -7117,8 +6966,7 @@ mod tests {
let job = meta_store
.start_processing_job("node2".to_string(), false)
- .await
- .unwrap()
+ .await?
.unwrap();
assert_eq!(job.get_row().job_type(), &JobType::PartitionCompaction);
assert_eq!(
@@ -7128,6 +6976,8 @@ mod tests {
}
let _ = fs::remove_dir_all(store_path.clone());
let _ = fs::remove_dir_all(remote_store_path.clone());
+
+ Ok(())
}
}
@@ -7136,7 +6986,7 @@ impl RocksMetaStore {
deactivate_ids: Vec,
uploaded_ids_and_sizes: Vec<(u64, Option)>,
db_ref: DbTableRef,
- batch_pipe: &mut BatchPipe,
+ batch_pipe: &mut BatchPipe<'_, RocksMetaStore>,
check_rows: bool,
new_replay_handle_id: Option,
) -> Result<(), CubeError> {
@@ -7236,7 +7086,7 @@ impl RocksMetaStore {
impl RocksMetaStore {
fn drop_index(
db: DbTableRef,
- pipe: &mut BatchPipe,
+ pipe: &mut BatchPipe<'_, RocksMetaStore>,
index_id: u64,
update_multi_partitions: bool,
) -> Result<(), CubeError> {
diff --git a/rust/cubestore/cubestore/src/metastore/rocks_store.rs b/rust/cubestore/cubestore/src/metastore/rocks_store.rs
index c6b54f7dc1a39..7615b63bfc559 100644
--- a/rust/cubestore/cubestore/src/metastore/rocks_store.rs
+++ b/rust/cubestore/cubestore/src/metastore/rocks_store.rs
@@ -1,5 +1,4 @@
use crate::config::ConfigObj;
-use crate::metastore::table::TablePath;
use crate::metastore::{MetaStoreEvent, MetaStoreFs};
use crate::util::time_span::warn_long;
@@ -629,20 +628,22 @@ pub struct KeyVal {
pub val: Vec,
}
-pub struct BatchPipe<'a> {
+pub type PostCommitCallback = Box;
+
+pub struct BatchPipe<'a, S = ()> {
db: &'a DB,
write_batch: WriteBatch,
events: Vec,
- pub invalidate_tables_cache: bool,
+ post_commit_callback: Option>,
}
-impl<'a> BatchPipe<'a> {
- pub fn new(db: &'a DB) -> BatchPipe<'a> {
+impl<'a, S> BatchPipe<'a, S> {
+ pub fn new(db: &'a DB) -> BatchPipe<'a, S> {
BatchPipe {
db,
write_batch: WriteBatch::default(),
events: Vec::new(),
- invalidate_tables_cache: false,
+ post_commit_callback: None,
}
}
@@ -654,14 +655,24 @@ impl<'a> BatchPipe<'a> {
self.events.push(event);
}
- pub fn batch_write_rows(self) -> Result, CubeError> {
+ pub fn batch_write_rows(
+ self,
+ ) -> Result<(Vec, Option>), CubeError> {
let db = self.db;
db.write(self.write_batch)?;
- Ok(self.events)
+
+ Ok((self.events, self.post_commit_callback))
}
- pub fn invalidate_tables_cache(&mut self) {
- self.invalidate_tables_cache = true;
+ /// Set the callback that runs on the RW-loop thread after the RocksDB
+ /// commit succeeds. Overwrites any previously set callback. The callback
+ /// receives the store instance so it can reach shared state. Must not
+ /// panic and must not block on async work.
+ pub fn set_post_commit_callback(&mut self, f: F)
+ where
+ F: FnOnce(&S) + Send + 'static,
+ {
+ self.post_commit_callback = Some(Box::new(f));
}
}
@@ -882,8 +893,7 @@ pub struct RocksStore {
last_check_seq: Arc>,
snapshot_uploaded: Arc>,
snapshots_upload_stopped: Arc>,
- pub(crate) cached_tables: Arc>>>>,
- rw_loop_default_cf: RocksStoreRWLoop,
+ pub(crate) rw_loop_default_cf: RocksStoreRWLoop,
details: Arc,
}
@@ -936,7 +946,6 @@ impl RocksStore {
last_check_seq: Arc::new(RwLock::new(db_arc.latest_sequence_number())),
snapshots_upload_stopped: Arc::new(AsyncMutex::new(false)),
config,
- cached_tables: Arc::new(Mutex::new(None)),
rw_loop_default_cf: RocksStoreRWLoop::new("metastore", "default"),
details,
};
@@ -1019,33 +1028,34 @@ impl RocksStore {
#[inline(always)]
pub async fn write_operation(&self, op_name: &'static str, f: F) -> Result
where
- F: for<'a> FnOnce(DbTableRef<'a>, &'a mut BatchPipe) -> Result
+ F: for<'a> FnOnce(DbTableRef<'a>, &mut BatchPipe<'a>) -> Result
+ Send
+ Sync
+ 'static,
R: Send + Sync + 'static,
{
- self.write_operation_impl::(&self.rw_loop_default_cf, op_name, f)
+ self.write_operation_impl::(&self.rw_loop_default_cf, op_name, f, ())
.await
}
- pub async fn write_operation_impl(
+ pub async fn write_operation_impl(
&self,
rw_loop: &RocksStoreRWLoop,
op_name: &'static str,
f: F,
+ store: S,
) -> Result
where
- F: for<'a> FnOnce(DbTableRef<'a>, &'a mut BatchPipe) -> Result
+ F: for<'a> FnOnce(DbTableRef<'a>, &mut BatchPipe<'a, S>) -> Result
+ Send
+ Sync
+ 'static,
R: Send + Sync + 'static,
+ S: Send + Sync + 'static,
{
let db = self.db.clone();
let mem_seq = MemorySequence::new(self.seq_store.clone());
let db_to_send = db.clone();
- let cached_tables = self.cached_tables.clone();
let loop_name = rw_loop.get_name();
let store_name = self.details.get_name();
@@ -1070,11 +1080,11 @@ impl RocksStore {
);
match res {
Ok(res) => {
- if batch.invalidate_tables_cache {
- *cached_tables.lock().unwrap() = None;
+ let (events, callback) = batch.batch_write_rows()?;
+ if let Some(cb) = callback {
+ cb(&store);
}
- let write_result = batch.batch_write_rows()?;
- tx.send(Ok((res, write_result))).map_err(|_| {
+ tx.send(Ok((res, events))).map_err(|_| {
CubeError::internal(format!(
"[{}-{}] Write operation result receiver has been dropped",
store_name, loop_name
diff --git a/rust/cubestore/cubestore/src/metastore/rocks_table.rs b/rust/cubestore/cubestore/src/metastore/rocks_table.rs
index 8b8bacacc55c4..7635459fbeb0b 100644
--- a/rust/cubestore/cubestore/src/metastore/rocks_table.rs
+++ b/rust/cubestore/cubestore/src/metastore/rocks_table.rs
@@ -498,11 +498,11 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
}
/// @internal Do not use this method directly, please use insert or insert_with_pk
- fn do_insert(
+ fn do_insert(
&self,
row_id: Option,
row: Self::T,
- batch_pipe: &mut BatchPipe,
+ batch_pipe: &mut BatchPipe<'_, S>,
) -> Result, CubeError> {
let mut ser = flexbuffers::FlexbufferSerializer::new();
row.serialize(&mut ser).unwrap();
@@ -550,19 +550,19 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
Ok(IdRow::new(row_id, row))
}
- fn insert_with_pk(
+ fn insert_with_pk(
&self,
row_id: u64,
row: Self::T,
- batch_pipe: &mut BatchPipe,
+ batch_pipe: &mut BatchPipe<'_, S>,
) -> Result, CubeError> {
self.do_insert(Some(row_id), row, batch_pipe)
}
- fn insert(
+ fn insert(
&self,
row: Self::T,
- batch_pipe: &mut BatchPipe,
+ batch_pipe: &mut BatchPipe<'_, S>,
) -> Result, CubeError> {
self.do_insert(None, row, batch_pipe)
}
@@ -911,34 +911,34 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
self.get_row_by_index_opt(row_key, secondary_index, true)
}
- fn update_with_fn(
+ fn update_with_fn(
&self,
row_id: u64,
update_fn: impl FnOnce(&Self::T) -> Self::T,
- batch_pipe: &mut BatchPipe,
+ batch_pipe: &mut BatchPipe<'_, S>,
) -> Result, CubeError> {
let row = self.get_row_or_not_found(row_id)?;
let new_row = update_fn(&row.get_row());
self.update(row_id, new_row, &row.get_row(), batch_pipe)
}
- fn update_with_res_fn(
+ fn update_with_res_fn(
&self,
row_id: u64,
update_fn: impl FnOnce(&Self::T) -> Result,
- batch_pipe: &mut BatchPipe,
+ batch_pipe: &mut BatchPipe<'_, S>,
) -> Result, CubeError> {
let row = self.get_row_or_not_found(row_id)?;
let new_row = update_fn(&row.get_row())?;
self.update(row_id, new_row, &row.get_row(), batch_pipe)
}
- fn update(
+ fn update(
&self,
row_id: u64,
new_row: Self::T,
old_row: &Self::T,
- batch_pipe: &mut BatchPipe,
+ batch_pipe: &mut BatchPipe<'_, S>,
) -> Result, CubeError> {
let deleted_row = self.delete_index_row(&old_row, row_id)?;
for row in deleted_row {
@@ -969,7 +969,7 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
Ok(IdRow::new(row_id, new_row))
}
- fn truncate(&self, batch_pipe: &mut BatchPipe) -> Result<(), CubeError> {
+ fn truncate(&self, batch_pipe: &mut BatchPipe<'_, S>) -> Result<(), CubeError> {
let iter = self.table_scan(self.snapshot())?;
for item in iter {
@@ -981,13 +981,13 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
Ok(())
}
- fn update_extended_ttl_secondary_index<'a, K: Debug>(
+ fn update_extended_ttl_secondary_index<'a, K: Debug, S>(
&self,
row_id: u64,
secondary_index: &'a impl RocksSecondaryIndex,
secondary_key_hash: SecondaryKeyHash,
extended: RocksSecondaryIndexValueTTLExtended,
- batch_pipe: &mut BatchPipe,
+ batch_pipe: &mut BatchPipe<'_, S>,
) -> Result
where
K: Hash,
@@ -1025,15 +1025,19 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
}
}
- fn delete(&self, row_id: u64, batch_pipe: &mut BatchPipe) -> Result, CubeError> {
+ fn delete(
+ &self,
+ row_id: u64,
+ batch_pipe: &mut BatchPipe<'_, S>,
+ ) -> Result, CubeError> {
let row = self.get_row_or_not_found(row_id)?;
self.delete_row(row, batch_pipe)
}
- fn try_delete(
+ fn try_delete(
&self,
row_id: u64,
- batch_pipe: &mut BatchPipe,
+ batch_pipe: &mut BatchPipe<'_, S>,
) -> Result