From f1d7aeffa67249077b96212c33a438442118eb38 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Tue, 24 Jan 2023 21:34:13 +0300 Subject: [PATCH 1/2] feat(cubestore): Correct orphaned detection + optimizations --- .../cubestore-sql-tests/src/tests.rs | 62 +++++++++++++++++++ .../src/cachestore/cache_rocksstore.rs | 58 +++++++++-------- .../cubestore/src/cachestore/queue_item.rs | 7 +-- 3 files changed, 96 insertions(+), 31 deletions(-) diff --git a/rust/cubestore/cubestore-sql-tests/src/tests.rs b/rust/cubestore/cubestore-sql-tests/src/tests.rs index 42e5eb3fdd5f3..ab34fda42983e 100644 --- a/rust/cubestore/cubestore-sql-tests/src/tests.rs +++ b/rust/cubestore/cubestore-sql-tests/src/tests.rs @@ -229,6 +229,7 @@ pub fn sql_tests() -> Vec<(&'static str, TestFn)> { t("cache_prefix_keys", cache_prefix_keys), t("queue_full_workflow", queue_full_workflow), t("queue_ack_then_result", queue_ack_then_result), + t("queue_orphaned_timeout", queue_orphaned_timeout), t( "queue_multiple_result_blocking", queue_multiple_result_blocking, @@ -6667,6 +6668,67 @@ async fn queue_ack_then_result(service: Box) { assert_eq!(result.get_rows().len(), 0); } +async fn queue_orphaned_timeout(service: Box) { + service + .exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:1" "payload1";"#) + .await + .unwrap(); + + service + .exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:2" "payload2";"#) + .await + .unwrap(); + + let res = service + .exec_query(r#"QUEUE TO_CANCEL 1000 1000 "STANDALONE#queue";"#) + .await + .unwrap(); + assert_eq!(res.len(), 0); + + // only active jobs can be orphaned + // RETRIEVE updates heartbeat + { + service + .exec_query(r#"QUEUE RETRIEVE CONCURRENCY 2 "STANDALONE#queue:1""#) + .await + .unwrap(); + + service + .exec_query(r#"QUEUE RETRIEVE CONCURRENCY 2 "STANDALONE#queue:2""#) + .await + .unwrap(); + } + + tokio::time::sleep(Duration::from_millis(1000)).await; + + service + .exec_query(r#"QUEUE HEARTBEAT "STANDALONE#queue:2";"#) + .await + .unwrap(); + + let res = service + .exec_query(r#"QUEUE TO_CANCEL 1000 1000 "STANDALONE#queue""#) + .await + .unwrap(); + assert_eq!( + res.get_columns(), + &vec![Column::new("id".to_string(), ColumnType::String, 0),] + ); + assert_eq!( + res.get_rows(), + &vec![Row::new(vec![TableValue::String("1".to_string()),]),] + ); + + // awaiting for expiring heart beat for queue:2 + tokio::time::sleep(Duration::from_millis(1000)).await; + + let res = service + .exec_query(r#"QUEUE TO_CANCEL 1000 1000 "STANDALONE#queue""#) + .await + .unwrap(); + assert_eq!(res.len(), 2); +} + async fn queue_multiple_result_blocking(service: Box) { service .exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:12345" "payload1";"#) diff --git a/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs b/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs index 9d6215691098e..7662da003df0a 100644 --- a/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs +++ b/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs @@ -30,6 +30,7 @@ use crate::cachestore::compaction::CompactionPreloadedState; use crate::cachestore::listener::RocksCacheStoreListener; use chrono::Utc; use itertools::Itertools; +use log::trace; use serde_derive::{Deserialize, Serialize}; use std::path::Path; use std::sync::{Arc, Mutex}; @@ -553,11 +554,13 @@ impl CacheStore for RocksCacheStore { if item.get_row().get_status() == &QueueItemStatus::Active { if let Some(orphaned_timeout) = orphaned_timeout { - if let Some(heartbeat) = item.get_row().get_heartbeat() { - let elapsed = now - heartbeat.clone(); - if elapsed.num_milliseconds() > orphaned_timeout as i64 { - return true; - } + let elapsed = if let Some(heartbeat) = item.get_row().get_heartbeat() { + now - heartbeat.clone() + } else { + now - item.get_row().get_created().clone() + }; + if elapsed.num_milliseconds() > orphaned_timeout as i64 { + return true; } } } @@ -579,17 +582,14 @@ impl CacheStore for RocksCacheStore { self.store .read_operation(move |db_ref| { let queue_schema = QueueItemRocksTable::new(db_ref.clone()); - let index_key = QueueItemIndexKey::ByPrefix(prefix); - let items = - queue_schema.get_rows_by_index(&index_key, &QueueItemRocksIndex::ByPrefix)?; let items = if let Some(status_filter) = status_filter { - items - .into_iter() - .filter(|item| item.get_row().status == status_filter) - .collect() + let index_key = QueueItemIndexKey::ByPrefixAndStatus(prefix, status_filter); + queue_schema + .get_rows_by_index(&index_key, &QueueItemRocksIndex::ByPrefixAndStatus)? } else { - items + let index_key = QueueItemIndexKey::ByPrefix(prefix); + queue_schema.get_rows_by_index(&index_key, &QueueItemRocksIndex::ByPrefix)? }; if priority_sort { @@ -632,13 +632,17 @@ impl CacheStore for RocksCacheStore { .get_single_opt_row_by_index(&index_key, &QueueItemRocksIndex::ByPath)?; if let Some(id_row) = id_row_opt { - queue_schema.update_with_fn( - id_row.id, - |item| item.update_heartbeat(), - batch_pipe, - )?; + let mut new = id_row.get_row().clone(); + new.update_heartbeat(); + + queue_schema.update(id_row.id, new, id_row.get_row(), batch_pipe)?; Ok(()) } else { + trace!( + "Unable to update heartbeat for queue item with path: {}", + key + ); + Ok(()) } }) @@ -668,18 +672,20 @@ impl CacheStore for RocksCacheStore { return Ok(None); } - let new = queue_schema.update_with_fn( - id_row.id, - |item| { - let mut new = item.clone(); - new.status = QueueItemStatus::Active; + let mut new = id_row.get_row().clone(); + new.status = QueueItemStatus::Active; + // It's an important to insert heartbeat, because + // without that created datetime will be used for orphaned filtering + new.update_heartbeat(); - new - }, + let res = queue_schema.update( + id_row.get_id(), + new, + id_row.get_row(), batch_pipe, )?; - Ok(Some(new)) + Ok(Some(res)) } else { Ok(None) } diff --git a/rust/cubestore/cubestore/src/cachestore/queue_item.rs b/rust/cubestore/cubestore/src/cachestore/queue_item.rs index 24a4f883a9577..32a3d78103759 100644 --- a/rust/cubestore/cubestore/src/cachestore/queue_item.rs +++ b/rust/cubestore/cubestore/src/cachestore/queue_item.rs @@ -190,11 +190,8 @@ impl QueueItem { QueueItemStatus::Pending } - pub fn update_heartbeat(&self) -> Self { - let mut new = self.clone(); - new.heartbeat = Some(Utc::now()); - - new + pub fn update_heartbeat(&mut self) { + self.heartbeat = Some(Utc::now()); } pub fn merge_extra(&self, payload: String) -> Result { From a29a9dd32a89b78eba2d0ebb6d1beb0b19663e29 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Wed, 25 Jan 2023 15:01:24 +0300 Subject: [PATCH 2/2] test(cubestore): queue_merge_extra + queue_heartbeat --- .../cubestore-sql-tests/src/tests.rs | 87 +++++++++++++++++++ 1 file changed, 87 insertions(+) diff --git a/rust/cubestore/cubestore-sql-tests/src/tests.rs b/rust/cubestore/cubestore-sql-tests/src/tests.rs index ab34fda42983e..68ae308f03708 100644 --- a/rust/cubestore/cubestore-sql-tests/src/tests.rs +++ b/rust/cubestore/cubestore-sql-tests/src/tests.rs @@ -230,6 +230,8 @@ pub fn sql_tests() -> Vec<(&'static str, TestFn)> { t("queue_full_workflow", queue_full_workflow), t("queue_ack_then_result", queue_ack_then_result), t("queue_orphaned_timeout", queue_orphaned_timeout), + t("queue_heartbeat", queue_heartbeat), + t("queue_merge_extra", queue_merge_extra), t( "queue_multiple_result_blocking", queue_multiple_result_blocking, @@ -6729,6 +6731,91 @@ async fn queue_orphaned_timeout(service: Box) { assert_eq!(res.len(), 2); } +async fn queue_heartbeat(service: Box) { + service + .exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:1" "payload1";"#) + .await + .unwrap(); + + let res = service + .exec_query(r#"SELECT heartbeat FROM system.queue WHERE prefix = 'STANDALONE#queue'"#) + .await + .unwrap(); + assert_eq!(res.get_rows(), &vec![Row::new(vec![TableValue::Null,]),]); + + service + .exec_query(r#"QUEUE HEARTBEAT "STANDALONE#queue:1";"#) + .await + .unwrap(); + + let res = service + .exec_query(r#"SELECT heartbeat FROM system.queue WHERE prefix = 'STANDALONE#queue'"#) + .await + .unwrap(); + + let row = res.get_rows().first().unwrap(); + match row.values().first().unwrap() { + TableValue::Timestamp(_) => {} + other => panic!("heartbeat must be a timestamp type, actual: {:?}", other), + } +} + +async fn queue_merge_extra(service: Box) { + service + .exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:1" "payload1";"#) + .await + .unwrap(); + + // extra must be empty after creation + { + let res = service + .exec_query(r#"QUEUE GET "STANDALONE#queue:1";"#) + .await + .unwrap(); + assert_eq!( + res.get_columns(), + &vec![ + Column::new("payload".to_string(), ColumnType::String, 0), + Column::new("extra".to_string(), ColumnType::String, 1), + ] + ); + assert_eq!( + res.get_rows(), + &vec![Row::new(vec![ + TableValue::String("payload1".to_string()), + TableValue::Null + ]),] + ); + } + + service + .exec_query(r#"QUEUE MERGE_EXTRA "STANDALONE#queue:1" '{"first": true}';"#) + .await + .unwrap(); + + // extra should contains first field + { + let res = service + .exec_query(r#"QUEUE GET "STANDALONE#queue:1";"#) + .await + .unwrap(); + assert_eq!( + res.get_columns(), + &vec![ + Column::new("payload".to_string(), ColumnType::String, 0), + Column::new("extra".to_string(), ColumnType::String, 1), + ] + ); + assert_eq!( + res.get_rows(), + &vec![Row::new(vec![ + TableValue::String("payload1".to_string()), + TableValue::String("{\"first\": true}".to_string()) + ]),] + ); + } +} + async fn queue_multiple_result_blocking(service: Box) { service .exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:12345" "payload1";"#)