diff --git a/rust/cubestore/cubestore-sql-tests/src/tests.rs b/rust/cubestore/cubestore-sql-tests/src/tests.rs index e5c3389bf83f2..bd14875b42504 100644 --- a/rust/cubestore/cubestore-sql-tests/src/tests.rs +++ b/rust/cubestore/cubestore-sql-tests/src/tests.rs @@ -279,10 +279,6 @@ pub fn sql_tests(prefix: &str) -> Vec<(&'static str, TestFn)> { "queue_ack_then_result_v2_by_id", queue_ack_then_result_v2_by_id, ), - t( - "queue_ack_then_result_v2_with_external_id", - queue_ack_then_result_v2_with_external_id, - ), t("queue_orphaned_timeout", queue_orphaned_timeout), t("queue_heartbeat_by_id", queue_heartbeat_by_id), t("queue_heartbeat_by_path", queue_heartbeat_by_path), @@ -293,7 +289,6 @@ pub fn sql_tests(prefix: &str) -> Vec<(&'static str, TestFn)> { queue_multiple_result_blocking, ), t("queue_custom_orphaned", queue_custom_orphaned), - t("queue_result_by_external_id", queue_result_by_external_id), t( "queue_result_by_id_external_id_mismatch", queue_result_by_id_external_id_mismatch, @@ -363,9 +358,7 @@ lazy_static::lazy_static! { "limit_pushdown_unique_key", "queue_ack_then_result_v2", "queue_ack_then_result_v2_by_id", - "queue_ack_then_result_v2_with_external_id", "queue_custom_orphaned", - "queue_result_by_external_id", "queue_result_by_id_external_id_mismatch", "queue_result_ack_multiple_with_external_id", "queue_full_workflow_v1", @@ -10762,78 +10755,6 @@ async fn queue_ack_then_result_v2_by_id(service: Box) -> Result<( Ok(()) } -async fn queue_ack_then_result_v2_with_external_id( - service: Box, -) -> Result<(), CubeError> { - let add_response = service - .exec_query( - r#"QUEUE ADD PRIORITY 1 EXTERNAL_ID 'ext-5555' "STANDALONE#queue:5555" "payload1";"#, - ) - .await?; - let id = assert_queue_add_and_get_id(&add_response)?; - - let ack_result = service - .exec_query(&format!(r#"QUEUE ACK {} "result:5555""#, id)) - .await?; - assert_eq!( - ack_result.get_rows(), - &vec![Row::new(vec![TableValue::Boolean(true)])] - ); - - // double ack for result, should be restricted - { - let ack_result = service - .exec_query(&format!(r#"QUEUE ACK {} "result:5555""#, id)) - .await?; - assert_eq!( - ack_result.get_rows(), - &vec![Row::new(vec![TableValue::Boolean(false)])] - ); - } - - // ack on unknown queue item - { - let ack_result = service.exec_query(r#"QUEUE ACK 10 "result:5555""#).await?; - assert_eq!( - ack_result.get_rows(), - &vec![Row::new(vec![TableValue::Boolean(false)])] - ); - } - - // RESULT_BY_EXTERNAL_ID returns result and marks it for deletion - let result = service - .exec_query(r#"QUEUE RESULT_BY_EXTERNAL_ID "ext-5555""#) - .await?; - assert_queue_result_columns(&result); - assert_eq!( - result.get_rows(), - &vec![queue_result_row("result:5555", &id, Some("ext-5555"))] - ); - let result = service - .exec_query(r#"QUEUE RESULT_BY_EXTERNAL_ID "ext-5555""#) - .await?; - assert_eq!( - result.get_rows(), - &vec![queue_result_row("result:5555", &id, Some("ext-5555"))] - ); - - // RESULT by path should also return empty (already marked as deleted) - let result = service - .exec_query(r#"QUEUE RESULT "STANDALONE#queue:5555""#) - .await?; - assert_eq!(result.get_rows().len(), 0); - - tokio::time::sleep(Duration::new(1, 0)).await; - - // should return, because we use id - let result = service - .exec_query(&format!("QUEUE RESULT_BLOCKING 1000 {}", id)) - .await?; - assert_queue_result_columns(&result); - assert_eq!(result.get_rows().len(), 1); - Ok(()) -} - async fn queue_orphaned_timeout(service: Box) -> Result<(), CubeError> { // CI is super slow, sometimes it can takes up to 1 second to bootstrap Cache Store // let's warmup it @@ -11144,48 +11065,6 @@ async fn queue_custom_orphaned(service: Box) -> Result<(), CubeEr Ok(()) } -async fn queue_result_by_external_id(service: Box) -> Result<(), CubeError> { - let add_response = service - .exec_query( - r#"QUEUE ADD PRIORITY 1 EXTERNAL_ID 'ext-123' "STANDALONE#queue:123456789" "payload123456789";"#, - ) - .await?; - let id = assert_queue_add_and_get_id(&add_response)?; - - let ack_result = service - .exec_query(r#"QUEUE ACK "STANDALONE#queue:123456789" "result:123456789""#) - .await?; - assert_eq!( - ack_result.get_rows(), - &vec![Row::new(vec![TableValue::Boolean(true)])] - ); - - let result = service - .exec_query(r#"QUEUE RESULT_BY_EXTERNAL_ID "unknown-ext-id""#) - .await?; - assert_eq!(result.get_rows().len(), 0); - - let result = service - .exec_query(r#"QUEUE RESULT_BY_EXTERNAL_ID "ext-123""#) - .await?; - assert_queue_result_columns(&result); - - assert_eq!( - result.get_rows(), - &vec![queue_result_row("result:123456789", &id, Some("ext-123"))] - ); - - // Second call should return empty (result deleted after first retrieval) - let result = service - .exec_query(r#"QUEUE RESULT_BY_EXTERNAL_ID "ext-123""#) - .await?; - assert_eq!( - result.get_rows(), - &vec![queue_result_row("result:123456789", &id, Some("ext-123"))] - ); - Ok(()) -} - async fn queue_full_workflow_v2_with_external_id( service: Box, ) -> Result<(), CubeError> { diff --git a/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs b/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs index b2f3b3d10fc49..7055d7633964d 100644 --- a/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs +++ b/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs @@ -613,11 +613,15 @@ impl RocksCacheStore { })); }; - // try external_id first (if provided), then fall back to path lookup + // try (path, external_id) first (if provided), then fall back to path lookup // external_id can be different for path, because path is re-used across different requests across time if let Some(ref external_id) = external_id { + let path = match &key { + QueueKey::ByPath(p) => p.clone(), + QueueKey::ById(_) => unreachable!("already handled ById above"), + }; if let Some(queue_result) = - result_schema.get_row_by_external_id(external_id.clone())? + result_schema.get_row_by_path_and_external_id(path, external_id.clone())? { let id = queue_result.get_id(); let external_id = queue_result.get_row().get_external_id().clone(); @@ -906,10 +910,7 @@ pub trait CacheStore: DIService + Send + Sync { key: QueueKey, external_id: Option, ) -> Result, CubeError>; - async fn queue_result_by_external_id( - &self, - external_id: String, - ) -> Result, CubeError>; + async fn queue_result_blocking( &self, key: QueueKey, @@ -1550,37 +1551,6 @@ impl CacheStore for RocksCacheStore { self.lookup_queue_result_by_key(key, external_id).await } - async fn queue_result_by_external_id( - &self, - external_id: String, - ) -> Result, CubeError> { - self.write_operation_queue("queue_result_by_external_id", move |db_ref, batch_pipe| { - let result_schema = QueueResultRocksTable::new(db_ref.clone()); - let queue_result = result_schema.get_row_by_external_id(external_id)?; - - if let Some(queue_result) = queue_result { - if queue_result.get_row().is_deleted() { - let id = queue_result.get_id(); - let external_id = queue_result.get_row().get_external_id().clone(); - Ok(Some(QueueResultResponse::Success { - value: Some(queue_result.into_row().value), - id, - external_id, - })) - } else { - Self::queue_result_ready_to_delete_impl( - &result_schema, - batch_pipe, - queue_result, - ) - } - } else { - Ok(None) - } - }) - .await - } - async fn queue_result_blocking( &self, key: QueueKey, @@ -1830,15 +1800,6 @@ impl CacheStore for ClusterCacheStoreClient { panic!("CacheStore cannot be used on the worker node! queue_result was used.") } - async fn queue_result_by_external_id( - &self, - _external_id: String, - ) -> Result, CubeError> { - panic!( - "CacheStore cannot be used on the worker node! queue_result_by_external_id was used." - ) - } - async fn queue_result_blocking( &self, _key: QueueKey, @@ -2393,6 +2354,7 @@ mod tests { assert!(res.is_ok(), "First insert with external_id should succeed"); assert!(res.unwrap().added); + // Same external_id but different path should succeed (uniqueness is per path now) let res = cachestore .queue_add(QueueAddPayload { path: "prefix:path2".to_string(), @@ -2404,13 +2366,26 @@ mod tests { external_id: Some("ext-dup".to_string()), }) .await; - assert!(res.is_err(), "Duplicate external_id should fail"); - let err_msg = res.unwrap_err().to_string(); assert!( - err_msg.contains("Unique constraint violation"), - "Expected unique constraint error, got: {}", - err_msg + res.is_ok(), + "Same external_id with different path should succeed" ); + assert!(res.unwrap().added); + + // Same path returns added: false (ByPath uniqueness), not an error + let res = cachestore + .queue_add(QueueAddPayload { + path: "prefix:path1".to_string(), + value: "v1-dup".to_string(), + priority: 0, + orphaned: None, + process_id: None, + exclusive: false, + external_id: Some("ext-dup".to_string()), + }) + .await; + assert!(res.is_ok(), "Duplicate path should return added: false"); + assert!(!res.unwrap().added); // Multiple inserts with None external_id should all succeed { @@ -2543,7 +2518,7 @@ mod tests { }) .await?; - // Simulate migration: rebuild the ByExternalId index. + // Simulate migration: rebuild the ByPathAndExternalId index. cachestore .read_operation_queue("test_rebuild_index", move |db_ref| { let queue_schema = QueueItemRocksTable::new(db_ref.clone()); @@ -2576,7 +2551,7 @@ mod tests { res.err() ); - // Uniqueness for real external_id values should still be enforced after rebuild + // Same external_id with different path should succeed after rebuild (uniqueness is per path) let res = cachestore .queue_add(QueueAddPayload { path: "prefix:path_ext_dup".to_string(), @@ -2589,13 +2564,11 @@ mod tests { }) .await; assert!( - res.is_err(), - "Duplicate external_id should still fail after rebuild" + res.is_ok(), + "Same external_id with different path should succeed after rebuild, got: {:?}", + res.err() ); - assert!(res - .unwrap_err() - .to_string() - .contains("Unique constraint violation")); + assert!(res.unwrap().added); RocksCacheStore::cleanup_test_cachestore("test_queue_add_none_ext_rebuild"); diff --git a/rust/cubestore/cubestore/src/cachestore/lazy.rs b/rust/cubestore/cubestore/src/cachestore/lazy.rs index 678f418d298c7..a5062cc55e47f 100644 --- a/rust/cubestore/cubestore/src/cachestore/lazy.rs +++ b/rust/cubestore/cubestore/src/cachestore/lazy.rs @@ -299,16 +299,6 @@ impl CacheStore for LazyRocksCacheStore { self.init().await?.queue_result(key, external_id).await } - async fn queue_result_by_external_id( - &self, - external_id: String, - ) -> Result, CubeError> { - self.init() - .await? - .queue_result_by_external_id(external_id) - .await - } - async fn queue_result_blocking( &self, key: QueueKey, diff --git a/rust/cubestore/cubestore/src/cachestore/queue_item.rs b/rust/cubestore/cubestore/src/cachestore/queue_item.rs index bc4b6400bd31d..dbfc1c5e22acd 100644 --- a/rust/cubestore/cubestore/src/cachestore/queue_item.rs +++ b/rust/cubestore/cubestore/src/cachestore/queue_item.rs @@ -362,7 +362,7 @@ pub(crate) enum QueueItemRocksIndex { ByPath = 1, ByPrefixAndStatus = 2, ByPrefix = 3, - ByExternalId = 4, + ByPathAndExternalId = 4, } pub struct QueueItemRocksTable<'a> { @@ -408,7 +408,7 @@ rocks_table_new!(QueueItem, QueueItemRocksTable, TableId::QueueItems, { Box::new(QueueItemRocksIndex::ByPath), Box::new(QueueItemRocksIndex::ByPrefixAndStatus), Box::new(QueueItemRocksIndex::ByPrefix), - Box::new(QueueItemRocksIndex::ByExternalId), + Box::new(QueueItemRocksIndex::ByPathAndExternalId), ] }); @@ -417,7 +417,7 @@ pub enum QueueItemIndexKey { ByPath(String), ByPrefixAndStatus(String, QueueItemStatus), ByPrefix(String), - ByExternalId(Option), + ByPathAndExternalId(String, Option), } base_rocks_secondary_index!(QueueItem, QueueItemRocksIndex); @@ -433,9 +433,10 @@ impl RocksSecondaryIndex for QueueItemRocksIndex { QueueItemRocksIndex::ByPrefix => { QueueItemIndexKey::ByPrefix(row.get_prefix().clone().unwrap_or("".to_string())) } - QueueItemRocksIndex::ByExternalId => { - QueueItemIndexKey::ByExternalId(row.get_external_id().clone()) - } + QueueItemRocksIndex::ByPathAndExternalId => QueueItemIndexKey::ByPathAndExternalId( + row.get_path(), + row.get_external_id().clone(), + ), } } @@ -443,8 +444,12 @@ impl RocksSecondaryIndex for QueueItemRocksIndex { match key { QueueItemIndexKey::ByPath(s) => s.as_bytes().to_vec(), QueueItemIndexKey::ByPrefix(s) => s.as_bytes().to_vec(), - QueueItemIndexKey::ByExternalId(s) => { - s.as_deref().unwrap_or("__null__").as_bytes().to_vec() + QueueItemIndexKey::ByPathAndExternalId(path, s) => { + let mut r = Vec::new(); + r.extend_from_slice(path.as_bytes()); + r.push(0u8); + r.extend_from_slice(s.as_deref().unwrap_or("__null__").as_bytes()); + r } QueueItemIndexKey::ByPrefixAndStatus(prefix, s) => { let mut r = Vec::with_capacity(prefix.len() + 1); @@ -466,7 +471,7 @@ impl RocksSecondaryIndex for QueueItemRocksIndex { QueueItemRocksIndex::ByPath => true, QueueItemRocksIndex::ByPrefixAndStatus => false, QueueItemRocksIndex::ByPrefix => false, - QueueItemRocksIndex::ByExternalId => true, + QueueItemRocksIndex::ByPathAndExternalId => true, } } @@ -475,7 +480,7 @@ impl RocksSecondaryIndex for QueueItemRocksIndex { QueueItemRocksIndex::ByPath => 1, QueueItemRocksIndex::ByPrefixAndStatus => 2, QueueItemRocksIndex::ByPrefix => 1, - QueueItemRocksIndex::ByExternalId => 1, + QueueItemRocksIndex::ByPathAndExternalId => 2, } } @@ -493,7 +498,7 @@ impl RocksSecondaryIndex for QueueItemRocksIndex { fn should_index_row(&self, row: &QueueItem) -> bool { match self { - Self::ByExternalId => row.external_id.is_some(), + Self::ByPathAndExternalId => row.external_id.is_some(), _ => true, } } diff --git a/rust/cubestore/cubestore/src/cachestore/queue_result.rs b/rust/cubestore/cubestore/src/cachestore/queue_result.rs index d2eaa34aa065d..49579969dbff1 100644 --- a/rust/cubestore/cubestore/src/cachestore/queue_result.rs +++ b/rust/cubestore/cubestore/src/cachestore/queue_result.rs @@ -61,7 +61,7 @@ impl QueueResult { #[derive(Clone, Copy, Debug)] pub(crate) enum QueueResultRocksIndex { ByPath = 1, - ByExternalId = 2, + ByPathAndExternalId = 2, } pub struct QueueResultRocksTable<'a> { db: crate::metastore::DbTableRef<'a>, @@ -87,13 +87,14 @@ impl<'a> QueueResultRocksTable<'a> { Ok(row.filter(|r| r.get_row().get_expire() >= &Utc::now())) } - pub fn get_row_by_external_id( + pub fn get_row_by_path_and_external_id( &self, + path: String, external_id: String, ) -> Result>, CubeError> { - let index_key = QueueResultIndexKey::ByExternalId(Some(external_id)); - let row = - self.get_single_opt_row_by_index(&index_key, &QueueResultRocksIndex::ByExternalId)?; + let index_key = QueueResultIndexKey::ByPathAndExternalId(path, Some(external_id)); + let row = self + .get_single_opt_row_by_index(&index_key, &QueueResultRocksIndex::ByPathAndExternalId)?; Ok(row.filter(|r| r.get_row().get_expire() >= &Utc::now())) } @@ -120,14 +121,14 @@ impl<'a> BaseRocksTable for QueueResultRocksTable<'a> { rocks_table_new!(QueueResult, QueueResultRocksTable, TableId::QueueResults, { vec![ Box::new(QueueResultRocksIndex::ByPath), - Box::new(QueueResultRocksIndex::ByExternalId), + Box::new(QueueResultRocksIndex::ByPathAndExternalId), ] }); #[derive(Hash, Clone, Debug)] pub enum QueueResultIndexKey { ByPath(String), - ByExternalId(Option), + ByPathAndExternalId(String, Option), } base_rocks_secondary_index!(QueueResult, QueueResultRocksIndex); @@ -136,17 +137,22 @@ impl RocksSecondaryIndex for QueueResultRocksI fn typed_key_by(&self, row: &QueueResult) -> QueueResultIndexKey { match self { QueueResultRocksIndex::ByPath => QueueResultIndexKey::ByPath(row.get_path().clone()), - QueueResultRocksIndex::ByExternalId => { - QueueResultIndexKey::ByExternalId(row.get_external_id().clone()) - } + QueueResultRocksIndex::ByPathAndExternalId => QueueResultIndexKey::ByPathAndExternalId( + row.get_path().clone(), + row.get_external_id().clone(), + ), } } fn key_to_bytes(&self, key: &QueueResultIndexKey) -> Vec { match key { QueueResultIndexKey::ByPath(s) => s.as_bytes().to_vec(), - QueueResultIndexKey::ByExternalId(s) => { - s.as_deref().unwrap_or("__null__").as_bytes().to_vec() + QueueResultIndexKey::ByPathAndExternalId(path, s) => { + let mut r = Vec::new(); + r.extend_from_slice(path.as_bytes()); + r.push(0u8); + r.extend_from_slice(s.as_deref().unwrap_or("__null__").as_bytes()); + r } } } @@ -154,14 +160,14 @@ impl RocksSecondaryIndex for QueueResultRocksI fn is_unique(&self) -> bool { match self { QueueResultRocksIndex::ByPath => false, - QueueResultRocksIndex::ByExternalId => true, + QueueResultRocksIndex::ByPathAndExternalId => true, } } fn version(&self) -> u32 { match self { QueueResultRocksIndex::ByPath => 1, - QueueResultRocksIndex::ByExternalId => 1, + QueueResultRocksIndex::ByPathAndExternalId => 2, } } @@ -179,7 +185,7 @@ impl RocksSecondaryIndex for QueueResultRocksI fn should_index_row(&self, row: &QueueResult) -> bool { match self { - Self::ByExternalId => row.external_id.is_some(), + Self::ByPathAndExternalId => row.external_id.is_some(), _ => true, } } diff --git a/rust/cubestore/cubestore/src/queryplanner/test_utils.rs b/rust/cubestore/cubestore/src/queryplanner/test_utils.rs index 0c6a0539342d0..4d0671dae40ca 100644 --- a/rust/cubestore/cubestore/src/queryplanner/test_utils.rs +++ b/rust/cubestore/cubestore/src/queryplanner/test_utils.rs @@ -866,13 +866,6 @@ impl CacheStore for CacheStoreMock { panic!("CacheStore mock!") } - async fn queue_result_by_external_id( - &self, - _external_id: String, - ) -> Result, CubeError> { - panic!("CacheStore mock!") - } - async fn queue_result_blocking( &self, _key: QueueKey, diff --git a/rust/cubestore/cubestore/src/sql/cachestore.rs b/rust/cubestore/cubestore/src/sql/cachestore.rs index 69bb2a4d5dddb..bc4aaee7af5b8 100644 --- a/rust/cubestore/cubestore/src/sql/cachestore.rs +++ b/rust/cubestore/cubestore/src/sql/cachestore.rs @@ -549,32 +549,6 @@ impl CacheStoreSqlService { true, ) } - QueueCommand::ResultByExternalId { key } => { - let ack_result = self - .cachestore - .queue_result_by_external_id(key.value) - .await?; - - let rows = if let Some(ack_result) = ack_result { - vec![ack_result.into_queue_result_row()] - } else { - vec![] - }; - - ( - Arc::new(DataFrame::new( - vec![ - Column::new("payload".to_string(), ColumnType::String, 0), - Column::new("type".to_string(), ColumnType::String, 1), - Column::new("id".to_string(), ColumnType::String, 2), - Column::new("external_id".to_string(), ColumnType::String, 3), - ], - rows, - )), - None, - true, - ) - } QueueCommand::ResultBlocking { timeout, key } => { let ack_result = self.cachestore.queue_result_blocking(key, timeout).await?; diff --git a/rust/cubestore/cubestore/src/sql/parser.rs b/rust/cubestore/cubestore/src/sql/parser.rs index 75f87cb34dd4c..9e2284d717080 100644 --- a/rust/cubestore/cubestore/src/sql/parser.rs +++ b/rust/cubestore/cubestore/src/sql/parser.rs @@ -156,9 +156,6 @@ pub enum QueueCommand { key: QueueKey, external_id: Option, }, - ResultByExternalId { - key: Ident, - }, ResultBlocking { key: QueueKey, timeout: u64, @@ -183,7 +180,6 @@ impl QueueCommand { QueueCommand::MergeExtra { .. } => "merge_extra", QueueCommand::Retrieve { .. } => "retrieve", QueueCommand::Result { .. } => "result", - QueueCommand::ResultByExternalId { .. } => "result_by_external_id", QueueCommand::ResultBlocking { .. } => "result_blocking", QueueCommand::Truncate { .. } => "truncate", } @@ -641,9 +637,6 @@ impl<'a> CubeStoreParser<'a> { external_id, } } - "result_by_external_id" => QueueCommand::ResultByExternalId { - key: self.parser.parse_identifier()?, - }, "result_blocking" => { let timeout = self.parse_integer(&"timeout", false)?;