feat(cubestore): Support multiple parallel QUEUE RESULT_BLOCKING (#6038)
ovr committed Jan 23, 2023
1 parent 4a965ba commit d8be78a
Showing 4 changed files with 130 additions and 18 deletions.
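
What changed, in short: QUEUE RESULT_BLOCKING used to answer by consuming the stored result row, so when several clients blocked on the same queue key only one of them received the payload after QUEUE ACK. This commit makes the ack event carry the result itself, so every blocking waiter can resolve with it. A minimal sketch of the behaviour, mirroring the new queue_multiple_result_blocking test added below; the function name and queue key are illustrative, and imports (SqlClient, Arc, Duration, tokio) follow the test file:

// Sketch only: two clients block on the same queue key, a third acks it,
// and both waiters receive the same result.
async fn parallel_waiters(service: Box<dyn SqlClient>) {
    service
        .exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:1" "payload1";"#)
        .await
        .unwrap();

    let service = std::sync::Arc::new(service);

    // Two clients block on the same queue key...
    let wait = |svc: std::sync::Arc<Box<dyn SqlClient>>| async move {
        svc.exec_query(r#"QUEUE RESULT_BLOCKING 5000 "STANDALONE#queue:1""#)
            .await
            .unwrap()
    };

    // ...while a third one acks it with a result after a short delay.
    let svc_ack = service.clone();
    let ack = async move {
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        svc_ack
            .exec_query(r#"QUEUE ACK "STANDALONE#queue:1" "result:1""#)
            .await
            .unwrap()
    };

    let (r1, r2, _) = tokio::join!(wait(service.clone()), wait(service.clone()), ack);

    // Before this change only one of the two waiters would receive the
    // "result:1" row; now both resolve with the same payload.
    assert_eq!(r1.get_rows(), r2.get_rows());
}
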
@@ -69,9 +69,7 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {}
expect(result).toBe('select * from bar');
});

const nonCubestoreTest = options.cacheAndQueueDriver !== 'cubestore' ? test : xtest;

nonCubestoreTest('instant double wait resolve', async () => {
test('instant double wait resolve', async () => {
const results = await Promise.all([
queue.executeInQueue('delay', 'instant', { delay: 400, result: '2' }),
queue.executeInQueue('delay', 'instant', { delay: 400, result: '2' })
@@ -149,7 +147,10 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {}
expect(results.map(r => parseInt(r[0], 10) - parseInt(results[0][0], 10))).toEqual([0, 1, 2]);
});

nonCubestoreTest('orphaned', async () => {
const nonCubeStoreTest = options.cacheAndQueueDriver !== 'cubestore' ? test : xtest;

// TODO: CubeStore queue support
nonCubeStoreTest('orphaned', async () => {
for (let i = 1; i <= 4; i++) {
await queue.executeInQueue('delay', `11${i}`, { delay: 50, result: `${i}` }, 0);
}
@@ -170,7 +171,8 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {}
await queue.executeInQueue('delay', '114', { delay: 50, result: '4' }, 0);
});

nonCubestoreTest('queue hash process persistent flag properly', () => {
// TODO: CubeStore queue support
nonCubeStoreTest('queue hash process persistent flag properly', () => {
const query = ['select * from table'];
const key1 = queue.redisHash(query);
// @ts-ignore
@@ -197,7 +199,7 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {}
expect(result).toBe('select * from bar');
});

nonCubestoreTest('queue driver lock obtain race condition', async () => {
nonCubeStoreTest('queue driver lock obtain race condition', async () => {
const redisClient: any = await queue.queueDriver.createConnection();
const redisClient2: any = await queue.queueDriver.createConnection();
const priority = 10;
@@ -252,7 +254,7 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {}
await queue.queueDriver.release(redisClient2);
});

nonCubestoreTest('activated but lock is not acquired', async () => {
nonCubeStoreTest('activated but lock is not acquired', async () => {
const redisClient = await queue.queueDriver.createConnection();
const redisClient2 = await queue.queueDriver.createConnection();
const priority = 10;
113 changes: 113 additions & 0 deletions rust/cubestore/cubestore-sql-tests/src/tests.rs
@@ -228,6 +228,11 @@ pub fn sql_tests() -> Vec<(&'static str, TestFn)> {
t("cache_set_nx", cache_set_nx),
t("cache_prefix_keys", cache_prefix_keys),
t("queue_full_workflow", queue_full_workflow),
t("queue_ack_then_result", queue_ack_then_result),
t(
"queue_multiple_result_blocking",
queue_multiple_result_blocking,
),
];

fn t<F>(name: &'static str, f: fn(Box<dyn SqlClient>) -> F) -> (&'static str, TestFn)
@@ -6622,6 +6627,114 @@ async fn queue_full_workflow(service: Box<dyn SqlClient>) {
}
}

async fn queue_ack_then_result(service: Box<dyn SqlClient>) {
service
.exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:5555" "payload1";"#)
.await
.unwrap();

service
.exec_query(r#"QUEUE ACK "STANDALONE#queue:5555" "result:5555""#)
.await
.unwrap();

let result = service
.exec_query(r#"QUEUE RESULT "STANDALONE#queue:5555""#)
.await
.unwrap();

assert_eq!(
result.get_columns(),
&vec![
Column::new("payload".to_string(), ColumnType::String, 0),
Column::new("type".to_string(), ColumnType::String, 1),
]
);
assert_eq!(
result.get_rows(),
&vec![Row::new(vec![
TableValue::String("result:5555".to_string()),
TableValue::String("success".to_string())
]),]
);

// the second call should not return anything, because the first call should remove the result
let result = service
.exec_query(r#"QUEUE RESULT "STANDALONE#queue:5555""#)
.await
.unwrap();

assert_eq!(result.get_rows().len(), 0);
}

async fn queue_multiple_result_blocking(service: Box<dyn SqlClient>) {
service
.exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:12345" "payload1";"#)
.await
.unwrap();

let service = Arc::new(service);

{
let service_to_move = service.clone();
let blocking1 = async move {
service_to_move
.exec_query(r#"QUEUE RESULT_BLOCKING 5000 "STANDALONE#queue:12345""#)
.await
.unwrap()
};

let service_to_move = service.clone();
let blocking2 = async move {
service_to_move
.exec_query(r#"QUEUE RESULT_BLOCKING 5000 "STANDALONE#queue:12345""#)
.await
.unwrap()
};

let service_to_move = service.clone();
let ack = async move {
tokio::time::sleep(Duration::from_millis(1000)).await;

service_to_move
.exec_query(r#"QUEUE ACK "STANDALONE#queue:12345" "result:12345""#)
.await
.unwrap()
};

let (blocking1_res, blocking2_res, _ack_res) = join!(blocking1, blocking2, ack);
assert_eq!(
blocking1_res.get_columns(),
&vec![
Column::new("payload".to_string(), ColumnType::String, 0),
Column::new("type".to_string(), ColumnType::String, 1),
]
);
assert_eq!(
blocking1_res.get_rows(),
&vec![Row::new(vec![
TableValue::String("result:12345".to_string()),
TableValue::String("success".to_string())
]),]
);

assert_eq!(
blocking2_res.get_columns(),
&vec![
Column::new("payload".to_string(), ColumnType::String, 0),
Column::new("type".to_string(), ColumnType::String, 1),
]
);
assert_eq!(
blocking2_res.get_rows(),
&vec![Row::new(vec![
TableValue::String("result:12345".to_string()),
TableValue::String("success".to_string())
]),]
);
}
}

pub fn to_rows(d: &DataFrame) -> Vec<Vec<TableValue>> {
return d
.get_rows()
18 changes: 7 additions & 11 deletions rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs
@@ -702,12 +702,13 @@ impl CacheStore for RocksCacheStore {
if let Some(item_row) = item_row {
queue_schema.delete(item_row.get_id(), batch_pipe)?;

let queue_result = QueueResult::new(path.clone(), result);
let queue_result = QueueResult::new(path.clone(), result.clone());
let result_row = result_schema.insert(queue_result, batch_pipe)?;

batch_pipe.add_event(MetaStoreEvent::AckQueueItem(QueueResultAckEvent {
row_id: result_row.get_id(),
path,
result,
}));

Ok(())
@@ -747,16 +748,11 @@ impl CacheStore for RocksCacheStore {
self.store
.write_operation(move |db_ref, batch_pipe| {
let queue_schema = QueueResultRocksTable::new(db_ref.clone());
let queue_result =
queue_schema.try_delete(ack_event.row_id, batch_pipe)?;

if let Some(queue_result) = queue_result {
Ok(Some(QueueResultResponse::Success {
value: queue_result.row.value,
}))
} else {
Ok(None)
}
queue_schema.try_delete(ack_event.row_id, batch_pipe)?;

Ok(Some(QueueResultResponse::Success {
value: ack_event.result,
}))
})
.await
}
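
Net effect of the two hunks above: QUEUE ACK still deletes the queue item and stores a result row, but the response delivered to blocking waiters is now built from the result carried in QueueResultAckEvent instead of from whichever waiter wins the try_delete race. A stand-in sketch of the before/after logic; these are not the real cachestore types or signatures:

// Before: a waiter answered from the row it managed to delete. try_delete()
// succeeds for exactly one caller, so concurrent waiters on the same key
// resolved to nothing.
fn respond_before(deleted_row_value: Option<String>) -> Option<String> {
    deleted_row_value
}

// After: the deletion still happens (best effort), but the answer comes from
// the result carried inside the ack event, so every waiter gets the value.
fn respond_after(ack_event_result: String) -> Option<String> {
    Some(ack_event_result)
}
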
1 change: 1 addition & 0 deletions rust/cubestore/cubestore/src/cachestore/queue_item.rs
@@ -29,6 +29,7 @@ fn merge(a: serde_json::Value, b: serde_json::Value) -> Option<serde_json::Value
pub struct QueueResultAckEvent {
pub path: String,
pub row_id: u64,
pub result: String,
}

#[repr(u8)]
