From 45129145310447165946ade07ace3e82abd1eaa6 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Thu, 19 Jan 2023 14:51:45 +0300 Subject: [PATCH 1/4] feat(cubestore): Support negative priority for queue --- .../src/CubeStoreQueueDriver.ts | 3 - .../test/unit/QueryQueue.abstract.ts | 2 +- rust/cubestore/cubestore/src/sql/parser.rs | 96 +++++++++---------- 3 files changed, 47 insertions(+), 54 deletions(-) diff --git a/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts b/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts index 1ef775048ba3b..7ecabdb851be9 100644 --- a/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts +++ b/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts @@ -39,9 +39,6 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface { priority: number, options: AddToQueueOptions ): Promise { - // TODO: Fix sqlparser, support negative number - priority = priority < 0 ? 0 : priority; - const data = { queryHandler, query, diff --git a/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts b/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts index 5936f2d0fed8b..29d7bc650ea32 100644 --- a/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts +++ b/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts @@ -132,7 +132,7 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {} expect(await queue.getQueryStage('12')).toEqual(undefined); }); - nonCubestoreTest('negative priority', async () => { + test('negative priority', async () => { delayCount = 0; const results = []; diff --git a/rust/cubestore/cubestore/src/sql/parser.rs b/rust/cubestore/cubestore/src/sql/parser.rs index 151275b5bea01..51a64f0ce398f 100644 --- a/rust/cubestore/cubestore/src/sql/parser.rs +++ b/rust/cubestore/cubestore/src/sql/parser.rs @@ -225,7 +225,7 @@ impl<'a> CubeStoreParser<'a> { "set" => { let nx = self.parse_custom_token(&"nx"); let ttl = if self.parse_custom_token(&"ttl") { - Some(self.parse_number("ttl")?) + Some(self.parse_integer("ttl", false)?) } else { None }; @@ -257,17 +257,47 @@ impl<'a> CubeStoreParser<'a> { } } - fn parse_number(&mut self, var_name: &str) -> Result { + fn parse_integer( + &mut self, + var_name: &str, + allow_negative: bool, + ) -> Result + where + ::Err: std::fmt::Display, + { + let is_negative = match self.parser.peek_token() { + Token::Minus => { + self.parser.next_token(); + true + } + _ => false, + }; + match self.parser.parse_number_value()? { - Value::Number(var, false) => var.parse::().map_err(|err| { - ParserError::ParserError(format!( - "{} must be a positive integer, error: {}", - var_name, err - )) - }), + Value::Number(var, false) => { + let value = if is_negative { + "-".to_string() + &var + } else { + var + }; + + if is_negative && !allow_negative { + return Err(ParserError::ParserError(format!( + "{} must be a positive integer, actual: {}", + var_name, value + ))); + } + + value.parse::().map_err(|err| { + ParserError::ParserError(format!( + "{} must be a valid integer, error: {}", + var_name, err + )) + }) + } x => { return Err(ParserError::ParserError(format!( - "{} must be a positive integer, actual: {:?}", + "{} must be a valid integer, actual: {:?}", var_name, x ))) } @@ -312,24 +342,7 @@ impl<'a> CubeStoreParser<'a> { match command.as_str() { "add" => { let priority = if self.parse_custom_token(&"priority") { - match self.parser.parse_number_value()? { - Value::Number(priority, _) => { - let r = priority.parse::().map_err(|err| { - ParserError::ParserError(format!( - "priority must be a positive integer, error: {}", - err - )) - })?; - - r - } - x => { - return Err(ParserError::ParserError(format!( - "priority must be a positive integer, actual: {:?}", - x - ))) - } - } + self.parse_integer(&"priority", true)? } else { 0 }; @@ -358,7 +371,7 @@ impl<'a> CubeStoreParser<'a> { key: self.parser.parse_identifier()?, }), "stalled" => { - let stalled_timeout = self.parse_number("stalled timeout")?; + let stalled_timeout = self.parse_integer("stalled timeout", false)?; Ok(Statement::QueueToCancel { prefix: self.parser.parse_identifier()?, @@ -367,7 +380,7 @@ impl<'a> CubeStoreParser<'a> { }) } "orphaned" => { - let orphaned_timeout = self.parse_number("orphaned timeout")?; + let orphaned_timeout = self.parse_integer("orphaned timeout", false)?; Ok(Statement::QueueToCancel { prefix: self.parser.parse_identifier()?, @@ -376,8 +389,8 @@ impl<'a> CubeStoreParser<'a> { }) } "to_cancel" => { - let stalled_timeout = self.parse_number("stalled timeout")?; - let orphaned_timeout = self.parse_number("orphaned timeout")?; + let stalled_timeout = self.parse_integer("stalled timeout", false)?; + let orphaned_timeout = self.parse_integer("orphaned timeout", false)?; Ok(Statement::QueueToCancel { prefix: self.parser.parse_identifier()?, @@ -417,7 +430,7 @@ impl<'a> CubeStoreParser<'a> { } "retrieve" => { let concurrency = if self.parse_custom_token(&"concurrency") { - self.parse_number("concurrency")? + self.parse_integer("concurrency", false)? } else { 1 }; @@ -431,24 +444,7 @@ impl<'a> CubeStoreParser<'a> { key: self.parser.parse_identifier()?, }), "result_blocking" => { - let timeout = match self.parser.parse_number_value()? { - Value::Number(concurrency, false) => { - let r = concurrency.parse::().map_err(|err| { - ParserError::ParserError(format!( - "TIMEOUT must be a positive integer, error: {}", - err - )) - })?; - - r - } - x => { - return Err(ParserError::ParserError(format!( - "TIMEOUT must be a positive integer, actual: {:?}", - x - ))) - } - }; + let timeout = self.parse_integer(&"timeout", false)?; Ok(Statement::QueueResultBlocking { timeout, From cbf13e2938a2b959c0fc43c738791fbdc817e200 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Thu, 19 Jan 2023 15:08:43 +0300 Subject: [PATCH 2/4] chore: re-use parse_integer --- rust/cubestore/cubestore/src/sql/parser.rs | 21 +++++---------------- 1 file changed, 5 insertions(+), 16 deletions(-) diff --git a/rust/cubestore/cubestore/src/sql/parser.rs b/rust/cubestore/cubestore/src/sql/parser.rs index 51a64f0ce398f..33a8a55e7159a 100644 --- a/rust/cubestore/cubestore/src/sql/parser.rs +++ b/rust/cubestore/cubestore/src/sql/parser.rs @@ -306,22 +306,11 @@ impl<'a> CubeStoreParser<'a> { pub fn parse_metastore(&mut self) -> Result { if self.parse_custom_token("set_current") { - match self.parser.parse_number_value()? { - Value::Number(id, _) => Ok(Statement::System(SystemCommand::Metastore( - MetastoreCommand::SetCurrent { - id: id.parse::().map_err(|e| { - ParserError::ParserError(format!( - "Can't parse metastore snapshot id: {}", - e - )) - })?, - }, - ))), - x => Err(ParserError::ParserError(format!( - "Snapshot id expected but {:?} found", - x - ))), - } + Ok(Statement::System(SystemCommand::Metastore( + MetastoreCommand::SetCurrent { + id: self.parse_integer("metastore snapshot id", false)?, + }, + ))) } else { Err(ParserError::ParserError( "Unknown metastore command".to_string(), From 648b9562ce6caaa6df157ae3ef90464f91d78619 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Thu, 19 Jan 2023 15:09:30 +0300 Subject: [PATCH 3/4] chore: re-use parse_integer --- rust/cubestore/cubestore/src/sql/parser.rs | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/rust/cubestore/cubestore/src/sql/parser.rs b/rust/cubestore/cubestore/src/sql/parser.rs index 33a8a55e7159a..011382b2b025e 100644 --- a/rust/cubestore/cubestore/src/sql/parser.rs +++ b/rust/cubestore/cubestore/src/sql/parser.rs @@ -455,17 +455,9 @@ impl<'a> CubeStoreParser<'a> { { Ok(Statement::System(SystemCommand::KillAllJobs)) } else if self.parse_custom_token("repartition") { - match self.parser.parse_number_value()? { - Value::Number(id, _) => Ok(Statement::System(SystemCommand::Repartition { - partition_id: id.parse::().map_err(|e| { - ParserError::ParserError(format!("Can't parse partition id: {}", e)) - })?, - })), - x => Err(ParserError::ParserError(format!( - "Partition id expected but {:?} found", - x - ))), - } + Ok(Statement::System(SystemCommand::Repartition { + partition_id: self.parse_integer("partition id", false)?, + })) } else if self.parse_custom_token("metastore") { self.parse_metastore() } else if self.parse_custom_token("panic") && self.parse_custom_token("worker") { From 698d79706bb5a20613e5474a3f75d7c03199286f Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Thu, 19 Jan 2023 15:14:46 +0300 Subject: [PATCH 4/4] test: more tests --- .../cubestore-sql-tests/src/tests.rs | 30 +++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/rust/cubestore/cubestore-sql-tests/src/tests.rs b/rust/cubestore/cubestore-sql-tests/src/tests.rs index 307a5e9ea15ec..747546b12c5f6 100644 --- a/rust/cubestore/cubestore-sql-tests/src/tests.rs +++ b/rust/cubestore/cubestore-sql-tests/src/tests.rs @@ -6430,7 +6430,12 @@ async fn queue_full_workflow(service: Box) { .unwrap(); service - .exec_query(r#"QUEUE ADD PRIORITY 50 "STANDALONE#queue:4" "payload3";"#) + .exec_query(r#"QUEUE ADD PRIORITY 50 "STANDALONE#queue:4" "payload4";"#) + .await + .unwrap(); + + service + .exec_query(r#"QUEUE ADD PRIORITY -1 "STANDALONE#queue:5" "payload5";"#) .await .unwrap(); @@ -6470,6 +6475,11 @@ async fn queue_full_workflow(service: Box) { TableValue::String("pending".to_string()), TableValue::Null ]), + Row::new(vec![ + TableValue::String("5".to_string()), + TableValue::String("pending".to_string()), + TableValue::Null + ]), ] ); } @@ -6484,7 +6494,7 @@ async fn queue_full_workflow(service: Box) { { let retrieve_response = service - .exec_query(r#"QUEUE RETRIEVE CONCURRENCY 2 "STANDALONE#queue:3""#) + .exec_query(r#"QUEUE RETRIEVE CONCURRENCY 1 "STANDALONE#queue:3""#) .await .unwrap(); assert_eq!( @@ -6503,6 +6513,22 @@ async fn queue_full_workflow(service: Box) { ); } + { + // concurrency limit + let retrieve_response = service + .exec_query(r#"QUEUE RETRIEVE CONCURRENCY 1 "STANDALONE#queue:4""#) + .await + .unwrap(); + assert_eq!( + retrieve_response.get_columns(), + &vec![ + Column::new("payload".to_string(), ColumnType::String, 0), + Column::new("extra".to_string(), ColumnType::String, 1), + ] + ); + assert_eq!(retrieve_response.get_rows().len(), 0); + } + { let active_response = service .exec_query(r#"QUEUE ACTIVE "STANDALONE#queue""#)