Skip to content

Commit

Permalink
Merge pull request #160 from renancloudwalk/master
Browse files Browse the repository at this point in the history
Adapt Session Arc and associate a Session to Statement, Prepared Statement & Batch
  • Loading branch information
kw217 committed Mar 13, 2023
2 parents 92ecb77 + 9748a39 commit 3b9a6fb
Show file tree
Hide file tree
Showing 51 changed files with 1,160 additions and 818 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Expand Up @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/). The
version number is tracked in the file `VERSION`.

## [Unreleased]
### Changed
- All functions were converted to be asynchronous. This allows for better performance, as asynchronous functions can run in parallel and make better use of system resources.
- The macro `stmt!` was changed from a macro to a function, named `statement`. This change fundamentally alters how the API is used and was made to provide a more intuitive and clear interface for users.
- The wait function was replaced with await from the tokio runtime. This change was made to take advantage of the asynchronous capabilities of the tokio runtime, allowing for better performance and scalability in Cassandra operations.

### Fixed
- The main development branch is now `main` not `master`. To track this, fetch and then check out `main`.

Expand Down
41 changes: 40 additions & 1 deletion README.md
Expand Up @@ -51,10 +51,49 @@ thin wrapper around the DataStax driver, you may also find the DataStax
## Example

For a straightforward example see [`simple.rs`](examples/simple.rs).

There are additional examples included with the project in [`tests`](tests/) and
[`examples`](examples/).

## New session API (version 2.0)

Version 2.0 introduces a new and safer API. `Statement`s (and
`PreparedStatement` and `Batch`) are now associated with a specific `Session`.
In addition, the legacy `.wait()` API is removed in favour of the now-ubiquitous
`.await`.

* This crate's functions have became `async`, meaning they can only be called as
part of an asynchronous workflow. To use these functions, you can either call
them from within an asynchronous function using the `.await` operator, or you
can call them from a synchronous context using the `block_on` method from
[tokio
runtime](https://docs.rs/tokio/latest/tokio/runtime/struct.Runtime.html#method.block_on).

* The `stmt!` macro and `Statement::new` method have been replaced with the
`Session::statement()` method, which records the association with the session.
Simply update your code to use the new method instead of the macro to continue
using its functionality.

* Statements are executed with `.execute()`, which consumes
the statement: you cannot execute the same statement twice; if you need this,
recreate the statement.

* `Batch::new` is removed in favour of `Session::batch`.

* There is a new error, `BatchSessionMismatch`, which occurs if you try to add
statements from different `Session`s into the same `Batch`.

* Connection methods are tidied up. `Cluster::connect_async` is removed since
`Cluster::connect` is now async. `Session::connect` and
`Session::connect_keyspace` are removed - use `Cluster::connect` and
`Cluster::connect_keyspace` instead.

* `Session::close` (which allowed waiting until in-flight requests on the
session were complete) is removed because it is non-trivial to implement
safely. This functionality is no longer supported.

* `Cluster::set_ssl` now consumes its argument, for improved safety.


## Futures (version 0.15)

Expand Down
11 changes: 7 additions & 4 deletions examples/cloud.rs
@@ -1,7 +1,8 @@
use cassandra_cpp::*;
use std::env;

fn main() {
#[tokio::main]
async fn main() {
let args: Vec<String> = env::args().collect();
if args.len() < 4 {
eprintln!(
Expand All @@ -21,9 +22,11 @@ fn main() {
.unwrap();
cluster.set_credentials(username, password).unwrap();

let session = cluster.connect().unwrap();
let statement = stmt!("SELECT release_version FROM system.local");
let result = session.execute(&statement).wait().unwrap();
let session = cluster.connect().await.unwrap();
let result = session
.execute("SELECT release_version FROM system.local")
.await
.unwrap();
let row = result.first_row().unwrap();
let version: String = row.get_by_name("release_version").unwrap();
println!("release_version: {}", version);
Expand Down
26 changes: 11 additions & 15 deletions examples/collections.rs
Expand Up @@ -3,15 +3,11 @@
use cassandra_cpp::*;
use std::collections::hash_map::HashMap;

fn do_work(session: &Session) -> Result<()> {
let create_keyspace = stmt!("CREATE KEYSPACE IF NOT EXISTS testks WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': 1 };");
let create_table = stmt!("CREATE TABLE IF NOT EXISTS testks.user (first_name text PRIMARY KEY, addresses map<text, text>, email set<text>, last_name text, phone_numbers list<text>, title int);");
let mut insert_data = stmt!("INSERT INTO testks.user (first_name, addresses, email, last_name, phone_numbers, title) VALUES (?, ?, ?, ?, ?, ?);");
let query = stmt!("SELECT * FROM testks.user;");
async fn do_work(session: &Session) -> Result<()> {
session.execute("CREATE KEYSPACE IF NOT EXISTS testks WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': 1 };").await?;
session.execute("CREATE TABLE IF NOT EXISTS testks.user (first_name text PRIMARY KEY, addresses map<text, text>, email set<text>, last_name text, phone_numbers list<text>, title int);").await?;

session.execute(&create_keyspace).wait()?;

session.execute(&create_table).wait()?;
let mut insert_data = session.statement("INSERT INTO testks.user (first_name, addresses, email, last_name, phone_numbers, title) VALUES (?, ?, ?, ?, ?, ?);");

insert_data.bind(0, "Paul")?;
insert_data.bind_null(1)?;
Expand All @@ -25,9 +21,9 @@ fn do_work(session: &Session) -> Result<()> {
phones.append_string("789-012")?;
insert_data.bind(4, phones)?; // TODO: bind should really accept Vec<T>, and map should accept HashMap<T, U>. Requires generic CassCollection::append.
insert_data.bind(5, 13)?;
session.execute(&insert_data).wait()?;
insert_data.execute().await?;

let result = session.execute(&query).wait()?;
let result = session.execute("SELECT * FROM testks.user;").await?;

println!("Overall result: {}", result);
for row in result.iter() {
Expand Down Expand Up @@ -61,12 +57,12 @@ fn do_work(session: &Session) -> Result<()> {
Ok(())
}

fn main() {
let contact_points = "127.0.0.1";
#[tokio::main]
async fn main() -> Result<()> {
let mut cluster = Cluster::default();
cluster.set_contact_points(contact_points).unwrap();
cluster.set_contact_points("127.0.0.1").unwrap();
cluster.set_load_balance_round_robin();

let session = cluster.connect().unwrap();
do_work(&session).unwrap();
let session = cluster.connect().await?;
do_work(&session).await
}
File renamed without changes.
31 changes: 14 additions & 17 deletions examples/simple.rs
@@ -1,24 +1,21 @@
use cassandra_cpp::*;

fn main() {
let query = stmt!("SELECT keyspace_name FROM system_schema.keyspaces;");
let col_name = "keyspace_name";

let contact_points = "127.0.0.1";

#[tokio::main]
async fn main() -> Result<()> {
let mut cluster = Cluster::default();
cluster.set_contact_points(contact_points).unwrap();
cluster.set_contact_points("127.0.0.1").unwrap();
cluster.set_load_balance_round_robin();
let session = cluster.connect().await?;

match cluster.connect() {
Ok(ref mut session) => {
let result = session.execute(&query).wait().unwrap();
println!("{}", result);
for row in result.iter() {
let col: String = row.get_by_name(col_name).unwrap();
println!("ks name = {}", col);
}
}
err => println!("{:?}", err),
let result = session
.execute("SELECT keyspace_name FROM system_schema.keyspaces;")
.await?;
println!("{}", result);

for row in result.iter() {
let col: String = row.get_by_name("keyspace_name")?;
println!("ks name = {}", col);
}

Ok(())
}
24 changes: 0 additions & 24 deletions examples/simple_async.rs

This file was deleted.

59 changes: 20 additions & 39 deletions examples/ssl.rs
@@ -1,53 +1,34 @@
use cassandra_cpp::*;
use std::fs;

fn main() {
let query = stmt!("SELECT keyspace_name FROM system_schema.keyspaces;");
#[tokio::main]
async fn main() -> Result<()> {
let col_name = "keyspace_name";
let contact_points = "127.0.0.1";
let tls_ca_certificate_path = "ca/certificate/path";

let ca_cert = match fs::read_to_string(tls_ca_certificate_path) {
Ok(cert) => {
println!("Read in certificate file");
cert
}
Err(e) => {
panic!("Failed to open certificate file. Error: {}", e);
}
};

let cert =
fs::read_to_string(tls_ca_certificate_path).expect("Failed to open certificate file");
let mut ssl = cassandra_cpp::Ssl::default();

match cassandra_cpp::Ssl::add_trusted_cert(&mut ssl, &ca_cert) {
Ok(_o) => {
println!("Added trusted certificate");
}
Err(e) => {
panic!("Failed to add trusted certificate. Error: {}", e);
}
}

println!("Set verification level");
let verify_level = vec![cassandra_cpp::SslVerifyFlag::PEER_IDENTITY];
cassandra_cpp::Ssl::set_verify_flags(&mut ssl, &verify_level);
ssl.add_trusted_cert(cert)?;
ssl.set_verify_flags(&[cassandra_cpp::SslVerifyFlag::PEER_IDENTITY]);

let mut cluster = Cluster::default();
cluster.set_contact_points(contact_points).unwrap();
cluster.set_load_balance_round_robin();

println!("Adding SSL to cluster");
cluster.set_ssl(&mut ssl);

match cluster.connect() {
Ok(ref mut session) => {
let result = session.execute(&query).wait().unwrap();
println!("{}", result);
for row in result.iter() {
let col: String = row.get_by_name(col_name).unwrap();
println!("ks name = {}", col);
}
}
err => println!("{:?}", err),
cluster.set_ssl(ssl);

let session = cluster.connect().await?;
let result = session
.execute("SELECT keyspace_name FROM system_schema.keyspaces;")
.await
.unwrap();

println!("{}", result);
for row in result.iter() {
let col: String = row.get_by_name(col_name).unwrap();
println!("ks name = {}", col);
}

Ok(())
}

0 comments on commit 3b9a6fb

Please sign in to comment.