Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 0 additions & 121 deletions rust/cubestore/cubestore-sql-tests/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,10 +279,6 @@ pub fn sql_tests(prefix: &str) -> Vec<(&'static str, TestFn)> {
"queue_ack_then_result_v2_by_id",
queue_ack_then_result_v2_by_id,
),
t(
"queue_ack_then_result_v2_with_external_id",
queue_ack_then_result_v2_with_external_id,
),
t("queue_orphaned_timeout", queue_orphaned_timeout),
t("queue_heartbeat_by_id", queue_heartbeat_by_id),
t("queue_heartbeat_by_path", queue_heartbeat_by_path),
Expand All @@ -293,7 +289,6 @@ pub fn sql_tests(prefix: &str) -> Vec<(&'static str, TestFn)> {
queue_multiple_result_blocking,
),
t("queue_custom_orphaned", queue_custom_orphaned),
t("queue_result_by_external_id", queue_result_by_external_id),
t(
"queue_result_by_id_external_id_mismatch",
queue_result_by_id_external_id_mismatch,
Expand Down Expand Up @@ -363,9 +358,7 @@ lazy_static::lazy_static! {
"limit_pushdown_unique_key",
"queue_ack_then_result_v2",
"queue_ack_then_result_v2_by_id",
"queue_ack_then_result_v2_with_external_id",
"queue_custom_orphaned",
"queue_result_by_external_id",
"queue_result_by_id_external_id_mismatch",
"queue_result_ack_multiple_with_external_id",
"queue_full_workflow_v1",
Expand Down Expand Up @@ -10762,78 +10755,6 @@ async fn queue_ack_then_result_v2_by_id(service: Box<dyn SqlClient>) -> Result<(
Ok(())
}

async fn queue_ack_then_result_v2_with_external_id(
service: Box<dyn SqlClient>,
) -> Result<(), CubeError> {
let add_response = service
.exec_query(
r#"QUEUE ADD PRIORITY 1 EXTERNAL_ID 'ext-5555' "STANDALONE#queue:5555" "payload1";"#,
)
.await?;
let id = assert_queue_add_and_get_id(&add_response)?;

let ack_result = service
.exec_query(&format!(r#"QUEUE ACK {} "result:5555""#, id))
.await?;
assert_eq!(
ack_result.get_rows(),
&vec![Row::new(vec![TableValue::Boolean(true)])]
);

// double ack for result, should be restricted
{
let ack_result = service
.exec_query(&format!(r#"QUEUE ACK {} "result:5555""#, id))
.await?;
assert_eq!(
ack_result.get_rows(),
&vec![Row::new(vec![TableValue::Boolean(false)])]
);
}

// ack on unknown queue item
{
let ack_result = service.exec_query(r#"QUEUE ACK 10 "result:5555""#).await?;
assert_eq!(
ack_result.get_rows(),
&vec![Row::new(vec![TableValue::Boolean(false)])]
);
}

// RESULT_BY_EXTERNAL_ID returns result and marks it for deletion
let result = service
.exec_query(r#"QUEUE RESULT_BY_EXTERNAL_ID "ext-5555""#)
.await?;
assert_queue_result_columns(&result);
assert_eq!(
result.get_rows(),
&vec![queue_result_row("result:5555", &id, Some("ext-5555"))]
);
let result = service
.exec_query(r#"QUEUE RESULT_BY_EXTERNAL_ID "ext-5555""#)
.await?;
assert_eq!(
result.get_rows(),
&vec![queue_result_row("result:5555", &id, Some("ext-5555"))]
);

// RESULT by path should also return empty (already marked as deleted)
let result = service
.exec_query(r#"QUEUE RESULT "STANDALONE#queue:5555""#)
.await?;
assert_eq!(result.get_rows().len(), 0);

tokio::time::sleep(Duration::new(1, 0)).await;

// should return, because we use id
let result = service
.exec_query(&format!("QUEUE RESULT_BLOCKING 1000 {}", id))
.await?;
assert_queue_result_columns(&result);
assert_eq!(result.get_rows().len(), 1);
Ok(())
}

async fn queue_orphaned_timeout(service: Box<dyn SqlClient>) -> Result<(), CubeError> {
// CI is super slow, sometimes it can takes up to 1 second to bootstrap Cache Store
// let's warmup it
Expand Down Expand Up @@ -11144,48 +11065,6 @@ async fn queue_custom_orphaned(service: Box<dyn SqlClient>) -> Result<(), CubeEr
Ok(())
}

async fn queue_result_by_external_id(service: Box<dyn SqlClient>) -> Result<(), CubeError> {
let add_response = service
.exec_query(
r#"QUEUE ADD PRIORITY 1 EXTERNAL_ID 'ext-123' "STANDALONE#queue:123456789" "payload123456789";"#,
)
.await?;
let id = assert_queue_add_and_get_id(&add_response)?;

let ack_result = service
.exec_query(r#"QUEUE ACK "STANDALONE#queue:123456789" "result:123456789""#)
.await?;
assert_eq!(
ack_result.get_rows(),
&vec![Row::new(vec![TableValue::Boolean(true)])]
);

let result = service
.exec_query(r#"QUEUE RESULT_BY_EXTERNAL_ID "unknown-ext-id""#)
.await?;
assert_eq!(result.get_rows().len(), 0);

let result = service
.exec_query(r#"QUEUE RESULT_BY_EXTERNAL_ID "ext-123""#)
.await?;
assert_queue_result_columns(&result);

assert_eq!(
result.get_rows(),
&vec![queue_result_row("result:123456789", &id, Some("ext-123"))]
);

// Second call still returns the result: the first retrieval only marks it for deferred deletion
let result = service
.exec_query(r#"QUEUE RESULT_BY_EXTERNAL_ID "ext-123""#)
.await?;
assert_eq!(
result.get_rows(),
&vec![queue_result_row("result:123456789", &id, Some("ext-123"))]
);
Ok(())
}

async fn queue_full_workflow_v2_with_external_id(
service: Box<dyn SqlClient>,
) -> Result<(), CubeError> {
Expand Down
91 changes: 32 additions & 59 deletions rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -613,11 +613,15 @@ impl RocksCacheStore {
}));
};

// try external_id first (if provided), then fall back to path lookup
// try (path, external_id) first (if provided), then fall back to path lookup
// external_id can be different for path, because path is re-used across different requests across time
if let Some(ref external_id) = external_id {
let path = match &key {
QueueKey::ByPath(p) => p.clone(),
QueueKey::ById(_) => unreachable!("already handled ById above"),
};
if let Some(queue_result) =
result_schema.get_row_by_external_id(external_id.clone())?
result_schema.get_row_by_path_and_external_id(path, external_id.clone())?
{
let id = queue_result.get_id();
let external_id = queue_result.get_row().get_external_id().clone();
Expand Down Expand Up @@ -906,10 +910,7 @@ pub trait CacheStore: DIService + Send + Sync {
key: QueueKey,
external_id: Option<String>,
) -> Result<Option<QueueResultResponse>, CubeError>;
async fn queue_result_by_external_id(
&self,
external_id: String,
) -> Result<Option<QueueResultResponse>, CubeError>;

async fn queue_result_blocking(
&self,
key: QueueKey,
Expand Down Expand Up @@ -1550,37 +1551,6 @@ impl CacheStore for RocksCacheStore {
self.lookup_queue_result_by_key(key, external_id).await
}

async fn queue_result_by_external_id(
&self,
external_id: String,
) -> Result<Option<QueueResultResponse>, CubeError> {
self.write_operation_queue("queue_result_by_external_id", move |db_ref, batch_pipe| {
let result_schema = QueueResultRocksTable::new(db_ref.clone());
let queue_result = result_schema.get_row_by_external_id(external_id)?;

if let Some(queue_result) = queue_result {
if queue_result.get_row().is_deleted() {
let id = queue_result.get_id();
let external_id = queue_result.get_row().get_external_id().clone();
Ok(Some(QueueResultResponse::Success {
value: Some(queue_result.into_row().value),
id,
external_id,
}))
} else {
Self::queue_result_ready_to_delete_impl(
&result_schema,
batch_pipe,
queue_result,
)
}
} else {
Ok(None)
}
})
.await
}

async fn queue_result_blocking(
&self,
key: QueueKey,
Expand Down Expand Up @@ -1830,15 +1800,6 @@ impl CacheStore for ClusterCacheStoreClient {
panic!("CacheStore cannot be used on the worker node! queue_result was used.")
}

async fn queue_result_by_external_id(
&self,
_external_id: String,
) -> Result<Option<QueueResultResponse>, CubeError> {
panic!(
"CacheStore cannot be used on the worker node! queue_result_by_external_id was used."
)
}

async fn queue_result_blocking(
&self,
_key: QueueKey,
Expand Down Expand Up @@ -2393,6 +2354,7 @@ mod tests {
assert!(res.is_ok(), "First insert with external_id should succeed");
assert!(res.unwrap().added);

// Same external_id but different path should succeed (uniqueness is per path now)
let res = cachestore
.queue_add(QueueAddPayload {
path: "prefix:path2".to_string(),
Expand All @@ -2404,13 +2366,26 @@ mod tests {
external_id: Some("ext-dup".to_string()),
})
.await;
assert!(res.is_err(), "Duplicate external_id should fail");
let err_msg = res.unwrap_err().to_string();
assert!(
err_msg.contains("Unique constraint violation"),
"Expected unique constraint error, got: {}",
err_msg
res.is_ok(),
"Same external_id with different path should succeed"
);
assert!(res.unwrap().added);

// Same path returns added: false (ByPath uniqueness), not an error
let res = cachestore
.queue_add(QueueAddPayload {
path: "prefix:path1".to_string(),
value: "v1-dup".to_string(),
priority: 0,
orphaned: None,
process_id: None,
exclusive: false,
external_id: Some("ext-dup".to_string()),
})
.await;
assert!(res.is_ok(), "Duplicate path should return added: false");
assert!(!res.unwrap().added);

// Multiple inserts with None external_id should all succeed
{
Expand Down Expand Up @@ -2543,7 +2518,7 @@ mod tests {
})
.await?;

// Simulate migration: rebuild the ByExternalId index.
// Simulate migration: rebuild the ByPathAndExternalId index.
cachestore
.read_operation_queue("test_rebuild_index", move |db_ref| {
let queue_schema = QueueItemRocksTable::new(db_ref.clone());
Expand Down Expand Up @@ -2576,7 +2551,7 @@ mod tests {
res.err()
);

// Uniqueness for real external_id values should still be enforced after rebuild
// Same external_id with different path should succeed after rebuild (uniqueness is per path)
let res = cachestore
.queue_add(QueueAddPayload {
path: "prefix:path_ext_dup".to_string(),
Expand All @@ -2589,13 +2564,11 @@ mod tests {
})
.await;
assert!(
res.is_err(),
"Duplicate external_id should still fail after rebuild"
res.is_ok(),
"Same external_id with different path should succeed after rebuild, got: {:?}",
res.err()
);
assert!(res
.unwrap_err()
.to_string()
.contains("Unique constraint violation"));
assert!(res.unwrap().added);

RocksCacheStore::cleanup_test_cachestore("test_queue_add_none_ext_rebuild");

Expand Down
10 changes: 0 additions & 10 deletions rust/cubestore/cubestore/src/cachestore/lazy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,16 +299,6 @@ impl CacheStore for LazyRocksCacheStore {
self.init().await?.queue_result(key, external_id).await
}

async fn queue_result_by_external_id(
&self,
external_id: String,
) -> Result<Option<QueueResultResponse>, CubeError> {
self.init()
.await?
.queue_result_by_external_id(external_id)
.await
}

async fn queue_result_blocking(
&self,
key: QueueKey,
Expand Down
Loading
Loading