-
Notifications
You must be signed in to change notification settings - Fork 56
/
paging.rs
86 lines (72 loc) · 2.77 KB
/
paging.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
mod help;
use cassandra_cpp::*;
/// Number of rows `insert_into_paging` writes, and the row count `test_paging` expects back.
static NUM_CONCURRENT_REQUESTS: usize = 100;
/// Paging size handed to `set_paging_size` on every SELECT statement.
const PAGE_SIZE: i32 = 10;
/// CQL that creates the `examples.paging` table (ascii `key` as primary key, text `value`)
/// if it does not already exist.
static CREATE_TABLE: &str =
"CREATE TABLE IF NOT EXISTS examples.paging (key ascii, value text, PRIMARY KEY \
(key));";
/// CQL that reads all rows; the table name is unqualified, and `test_paging` issues
/// `USE examples` before this query runs.
static SELECT_QUERY: &str = "SELECT * FROM paging";
/// Parameterised CQL insert; placeholder 0 is the key and placeholder 1 is the value.
static INSERT_QUERY: &str = "INSERT INTO paging (key, value) VALUES (?, ?);";
// FIXME: UUIDs are not working yet, so the `uuid_gen` parameter below stays commented out.
/// Inserts `NUM_CONCURRENT_REQUESTS` rows into the `paging` table.
///
/// `INSERT_QUERY` is prepared once; for every index in `0..NUM_CONCURRENT_REQUESTS` a
/// statement is bound with the decimal index as both key and value. The resulting
/// futures are collected and awaited together with `try_join_all`.
///
/// # Errors
///
/// Returns the error from preparing the statement, from binding a parameter, or the
/// first error reported by `try_join_all`.
async fn insert_into_paging(session: &Session /* , uuid_gen:&mut UuidGen */) -> Result<()> {
    let prepared = session.prepare(INSERT_QUERY).await?;
    let mut pending = Vec::with_capacity(NUM_CONCURRENT_REQUESTS);

    for index in 0..NUM_CONCURRENT_REQUESTS {
        // The same text is used for the key column and the value column.
        let text = index.to_string();
        let key = text.as_str();
        println!("key ={:?}", key);

        let mut insert = prepared.bind();
        insert.bind(0, key)?;
        insert.bind(1, key)?;
        pending.push(insert.execute());
    }

    // Only success or failure matters; the individual results are discarded.
    futures::future::try_join_all(pending).await.map(|_| ())
}
/// Reads every row of the `paging` table, one page of `PAGE_SIZE` rows per request.
///
/// Each round builds a fresh statement for `SELECT_QUERY` and sets its paging size.
/// From the second round on, the previous result is passed to `set_paging_state`
/// before the statement is executed. Column 0 of each row is read as the key and
/// column 1 as the value. The loop ends once a result reports no more pages.
///
/// # Errors
///
/// Returns the first error raised while configuring or executing a statement, or
/// while reading a column as a string. Column-read failures are propagated to the
/// caller instead of panicking.
async fn select_from_paging(session: &Session) -> Result<Vec<(String, String)>> {
    let mut res = vec![];
    let mut prev_result = None;

    // FIXME must understand statement lifetime better for paging
    loop {
        let mut statement = session.statement(SELECT_QUERY);
        statement.set_paging_size(PAGE_SIZE)?;
        if let Some(result) = prev_result.take() {
            statement.set_paging_state(result)?;
        }

        let result = statement.execute().await?;
        println!("{:?}", result);

        // NOTE(review): kept as `while let` rather than `for` — presumably the row
        // iterator is a lending iterator and cannot drive a `for` loop; confirm
        // against the cassandra_cpp version in use.
        let mut iter = result.iter();
        while let Some(row) = iter.next() {
            let key = row.get_column(0)?.get_string()?;
            let value = row.get_column(1)?.get_string()?;
            println!("key: '{:?}' value: '{:?}'", &key, &value);
            res.push((key, value));
        }
        // The iterator was created from `result`; release it before `result` is
        // moved into `prev_result` below.
        drop(iter);

        if !result.has_more_pages() {
            break;
        }
        prev_result = Some(result);
    }

    Ok(res)
}
/// Round-trip check for paged reads: creates the example keyspace and table, inserts
/// `NUM_CONCURRENT_REQUESTS` rows, reads them back page by page, and asserts that the
/// number of distinct keys equals `NUM_CONCURRENT_REQUESTS`.
#[tokio::test]
async fn test_paging() -> Result<()> {
    // The UUID generator is not created here; see the FIXME on `insert_into_paging`.
    let session = help::create_test_session().await;
    help::create_example_keyspace(&session).await;
    session.execute(CREATE_TABLE).await?;
    // The insert and select queries use the unqualified table name.
    session.execute("USE examples").await?;

    insert_into_paging(&session).await?;

    let mut rows: Vec<(String, String)> = select_from_paging(&session).await?;
    // Order by key, then drop adjacent rows that share a key, so only distinct keys
    // are counted.
    rows.sort_by(|left, right| left.0.cmp(&right.0));
    rows.dedup_by(|candidate, kept| candidate.0 == kept.0);
    assert_eq!(rows.len(), NUM_CONCURRENT_REQUESTS);

    Ok(())
}