Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
priority: number,
options: AddToQueueOptions
): Promise<AddToQueueResponse> {
// TODO: Fix sqlparser, support negative number
priority = priority < 0 ? 0 : priority;

const data = {
queryHandler,
query,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {}
expect(await queue.getQueryStage('12')).toEqual(undefined);
});

nonCubestoreTest('negative priority', async () => {
test('negative priority', async () => {
delayCount = 0;
const results = [];

Expand Down
30 changes: 28 additions & 2 deletions rust/cubestore/cubestore-sql-tests/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6430,7 +6430,12 @@ async fn queue_full_workflow(service: Box<dyn SqlClient>) {
.unwrap();

service
.exec_query(r#"QUEUE ADD PRIORITY 50 "STANDALONE#queue:4" "payload3";"#)
.exec_query(r#"QUEUE ADD PRIORITY 50 "STANDALONE#queue:4" "payload4";"#)
.await
.unwrap();

service
.exec_query(r#"QUEUE ADD PRIORITY -1 "STANDALONE#queue:5" "payload5";"#)
.await
.unwrap();

Expand Down Expand Up @@ -6470,6 +6475,11 @@ async fn queue_full_workflow(service: Box<dyn SqlClient>) {
TableValue::String("pending".to_string()),
TableValue::Null
]),
Row::new(vec![
TableValue::String("5".to_string()),
TableValue::String("pending".to_string()),
TableValue::Null
]),
]
);
}
Expand All @@ -6484,7 +6494,7 @@ async fn queue_full_workflow(service: Box<dyn SqlClient>) {

{
let retrieve_response = service
.exec_query(r#"QUEUE RETRIEVE CONCURRENCY 2 "STANDALONE#queue:3""#)
.exec_query(r#"QUEUE RETRIEVE CONCURRENCY 1 "STANDALONE#queue:3""#)
.await
.unwrap();
assert_eq!(
Expand All @@ -6503,6 +6513,22 @@ async fn queue_full_workflow(service: Box<dyn SqlClient>) {
);
}

{
// concurrency limit
let retrieve_response = service
.exec_query(r#"QUEUE RETRIEVE CONCURRENCY 1 "STANDALONE#queue:4""#)
.await
.unwrap();
assert_eq!(
retrieve_response.get_columns(),
&vec![
Column::new("payload".to_string(), ColumnType::String, 0),
Column::new("extra".to_string(), ColumnType::String, 1),
]
);
assert_eq!(retrieve_response.get_rows().len(), 0);
}

{
let active_response = service
.exec_query(r#"QUEUE ACTIVE "STANDALONE#queue""#)
Expand Down
131 changes: 54 additions & 77 deletions rust/cubestore/cubestore/src/sql/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ impl<'a> CubeStoreParser<'a> {
"set" => {
let nx = self.parse_custom_token(&"nx");
let ttl = if self.parse_custom_token(&"ttl") {
Some(self.parse_number("ttl")?)
Some(self.parse_integer("ttl", false)?)
} else {
None
};
Expand Down Expand Up @@ -257,17 +257,47 @@ impl<'a> CubeStoreParser<'a> {
}
}

fn parse_number(&mut self, var_name: &str) -> Result<u32, ParserError> {
fn parse_integer<R: num::Integer + std::str::FromStr>(
&mut self,
var_name: &str,
allow_negative: bool,
) -> Result<R, ParserError>
where
<R as std::str::FromStr>::Err: std::fmt::Display,
{
let is_negative = match self.parser.peek_token() {
Token::Minus => {
self.parser.next_token();
true
}
_ => false,
};

match self.parser.parse_number_value()? {
Value::Number(var, false) => var.parse::<u32>().map_err(|err| {
ParserError::ParserError(format!(
"{} must be a positive integer, error: {}",
var_name, err
))
}),
Value::Number(var, false) => {
let value = if is_negative {
"-".to_string() + &var
} else {
var
};

if is_negative && !allow_negative {
return Err(ParserError::ParserError(format!(
"{} must be a positive integer, actual: {}",
var_name, value
)));
}

value.parse::<R>().map_err(|err| {
ParserError::ParserError(format!(
"{} must be a valid integer, error: {}",
var_name, err
))
})
}
x => {
return Err(ParserError::ParserError(format!(
"{} must be a positive integer, actual: {:?}",
"{} must be a valid integer, actual: {:?}",
var_name, x
)))
}
Expand All @@ -276,22 +306,11 @@ impl<'a> CubeStoreParser<'a> {

pub fn parse_metastore(&mut self) -> Result<Statement, ParserError> {
if self.parse_custom_token("set_current") {
match self.parser.parse_number_value()? {
Value::Number(id, _) => Ok(Statement::System(SystemCommand::Metastore(
MetastoreCommand::SetCurrent {
id: id.parse::<u128>().map_err(|e| {
ParserError::ParserError(format!(
"Can't parse metastore snapshot id: {}",
e
))
})?,
},
))),
x => Err(ParserError::ParserError(format!(
"Snapshot id expected but {:?} found",
x
))),
}
Ok(Statement::System(SystemCommand::Metastore(
MetastoreCommand::SetCurrent {
id: self.parse_integer("metastore snapshot id", false)?,
},
)))
} else {
Err(ParserError::ParserError(
"Unknown metastore command".to_string(),
Expand All @@ -312,24 +331,7 @@ impl<'a> CubeStoreParser<'a> {
match command.as_str() {
"add" => {
let priority = if self.parse_custom_token(&"priority") {
match self.parser.parse_number_value()? {
Value::Number(priority, _) => {
let r = priority.parse::<i64>().map_err(|err| {
ParserError::ParserError(format!(
"priority must be a positive integer, error: {}",
err
))
})?;

r
}
x => {
return Err(ParserError::ParserError(format!(
"priority must be a positive integer, actual: {:?}",
x
)))
}
}
self.parse_integer(&"priority", true)?
} else {
0
};
Expand Down Expand Up @@ -358,7 +360,7 @@ impl<'a> CubeStoreParser<'a> {
key: self.parser.parse_identifier()?,
}),
"stalled" => {
let stalled_timeout = self.parse_number("stalled timeout")?;
let stalled_timeout = self.parse_integer("stalled timeout", false)?;

Ok(Statement::QueueToCancel {
prefix: self.parser.parse_identifier()?,
Expand All @@ -367,7 +369,7 @@ impl<'a> CubeStoreParser<'a> {
})
}
"orphaned" => {
let orphaned_timeout = self.parse_number("orphaned timeout")?;
let orphaned_timeout = self.parse_integer("orphaned timeout", false)?;

Ok(Statement::QueueToCancel {
prefix: self.parser.parse_identifier()?,
Expand All @@ -376,8 +378,8 @@ impl<'a> CubeStoreParser<'a> {
})
}
"to_cancel" => {
let stalled_timeout = self.parse_number("stalled timeout")?;
let orphaned_timeout = self.parse_number("orphaned timeout")?;
let stalled_timeout = self.parse_integer("stalled timeout", false)?;
let orphaned_timeout = self.parse_integer("orphaned timeout", false)?;

Ok(Statement::QueueToCancel {
prefix: self.parser.parse_identifier()?,
Expand Down Expand Up @@ -417,7 +419,7 @@ impl<'a> CubeStoreParser<'a> {
}
"retrieve" => {
let concurrency = if self.parse_custom_token(&"concurrency") {
self.parse_number("concurrency")?
self.parse_integer("concurrency", false)?
} else {
1
};
Expand All @@ -431,24 +433,7 @@ impl<'a> CubeStoreParser<'a> {
key: self.parser.parse_identifier()?,
}),
"result_blocking" => {
let timeout = match self.parser.parse_number_value()? {
Value::Number(concurrency, false) => {
let r = concurrency.parse::<u64>().map_err(|err| {
ParserError::ParserError(format!(
"TIMEOUT must be a positive integer, error: {}",
err
))
})?;

r
}
x => {
return Err(ParserError::ParserError(format!(
"TIMEOUT must be a positive integer, actual: {:?}",
x
)))
}
};
let timeout = self.parse_integer(&"timeout", false)?;

Ok(Statement::QueueResultBlocking {
timeout,
Expand All @@ -470,17 +455,9 @@ impl<'a> CubeStoreParser<'a> {
{
Ok(Statement::System(SystemCommand::KillAllJobs))
} else if self.parse_custom_token("repartition") {
match self.parser.parse_number_value()? {
Value::Number(id, _) => Ok(Statement::System(SystemCommand::Repartition {
partition_id: id.parse::<u64>().map_err(|e| {
ParserError::ParserError(format!("Can't parse partition id: {}", e))
})?,
})),
x => Err(ParserError::ParserError(format!(
"Partition id expected but {:?} found",
x
))),
}
Ok(Statement::System(SystemCommand::Repartition {
partition_id: self.parse_integer("partition id", false)?,
}))
} else if self.parse_custom_token("metastore") {
self.parse_metastore()
} else if self.parse_custom_token("panic") && self.parse_custom_token("worker") {
Expand Down