Skip to content

Commit

Permalink
tokio:: Add Client::execute and Transaction::execute methods (#247)
Browse files Browse the repository at this point in the history
Fixes #241
  • Loading branch information
tailhook committed May 29, 2023
1 parent 9c9bee7 commit d685186
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 1 deletion.
45 changes: 44 additions & 1 deletion edgedb-tokio/src/client.rs
Expand Up @@ -64,7 +64,7 @@ impl Client {
/// let greeting = pool.query::<String, _>("SELECT 'hello'", &());
/// // or
/// let greeting: Vec<String> = pool.query("SELECT 'hello'", &());
///
///
/// let two_numbers: Vec<i32> = conn.query("select {<int32>$0, <int32>$1}", &(10, 20)).await?;
/// ```
///
Expand Down Expand Up @@ -401,6 +401,49 @@ impl Client {
"query row returned zero results"))
}

/// Execute a query and don't expect result
///
/// This method can be used with both static arguments, like a tuple of
/// scalars, and with dynamic arguments [`edgedb_protocol::value::Value`].
/// Similarly, dynamically typed results are also supported.
pub async fn execute<A>(&self, query: &str, arguments: &A)
-> Result<(), Error>
where A: QueryArgs,
{
let mut iteration = 0;
loop {
let mut conn = self.pool.acquire().await?;

let conn = conn.inner();
let state = &self.options.state;
let caps = Capabilities::MODIFICATIONS | Capabilities::DDL;
match conn.execute(query, arguments, state, caps).await {
Ok(resp) => return Ok(resp.data),
Err(e) => {
let allow_retry = match e.get::<QueryCapabilities>() {
// Error from a weird source, or just a bug
// Let's keep on the safe side
None => false,
Some(QueryCapabilities::Unparsed) => true,
Some(QueryCapabilities::Parsed(c)) => c.is_empty(),
};
if allow_retry && e.has_tag(SHOULD_RETRY) {
let rule = self.options.retry.get_rule(&e);
iteration += 1;
if iteration < rule.attempts {
let duration = (rule.backoff)(iteration);
log::info!("Error: {:#}. Retrying in {:?}...",
e, duration);
sleep(duration).await;
continue;
}
}
return Err(e);
}
}
}
}

/// Execute a transaction
///
/// Transaction body must be encompassed in the closure. The closure **may
Expand Down
35 changes: 35 additions & 0 deletions edgedb-tokio/src/transaction.rs
Expand Up @@ -433,6 +433,41 @@ impl Transaction {
.ok_or_else(|| NoDataError::with_message(
"query row returned zero results"))
}

/// Execute a query and don't expect result
///
/// This method can be used with both static arguments, like a tuple of
/// scalars, and with dynamic arguments [`edgedb_protocol::value::Value`].
/// Similarly, dynamically typed results are also supported.
pub async fn execute<A>(&mut self, query: &str, arguments: &A)
-> Result<(), Error>
where A: QueryArgs,
{
self.ensure_started().await?;
let flags = CompilationOptions {
implicit_limit: None,
implicit_typenames: false,
implicit_typeids: false,
explicit_objectids: true,
allow_capabilities: Capabilities::MODIFICATIONS,
io_format: IoFormat::Binary,
expected_cardinality: Cardinality::Many,
};
let state = self.state.clone(); // TODO: optimize, by careful borrow
let ref mut conn = self.inner().conn;
let desc = conn.parse(&flags, query, &state).await?;
let inp_desc = desc.input()
.map_err(ProtocolEncodingError::with_source)?;

let mut arg_buf = BytesMut::with_capacity(8);
arguments.encode(&mut Encoder::new(
&inp_desc.as_query_arg_context(),
&mut arg_buf,
))?;

conn.execute(&flags, query, &state, &desc, &arg_buf.freeze()).await?;
Ok(())
}
}

#[allow(dead_code, unreachable_code)]
Expand Down
3 changes: 3 additions & 0 deletions edgedb-tokio/tests/func/client.rs
Expand Up @@ -39,6 +39,9 @@ async fn simple() -> anyhow::Result<()> {
"SELECT <int64>{}", &()).await.unwrap_err();
assert!(err.is::<NoDataError>());

client.execute("SELECT 1+1", &()).await?;
client.execute("START MIGRATION TO {}; ABORT MIGRATION", &()).await?;

Ok(())
}

Expand Down
42 changes: 42 additions & 0 deletions edgedb-tokio/tests/func/transactions.rs
Expand Up @@ -3,6 +3,7 @@ use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering};

use tokio::sync::{Mutex};

use edgedb_errors::NoDataError;
use edgedb_tokio::{Client, Transaction};

use crate::server::SERVER;
Expand Down Expand Up @@ -141,3 +142,44 @@ async fn transaction_conflict_with_complex_err() -> anyhow::Result<()> {
assert_eq!(iters.load(Ordering::SeqCst), 3);
Ok(())
}

#[tokio::test]
async fn queries() -> anyhow::Result<()> {
let client = Client::new(&SERVER.config);
client.transaction(|mut tx| async move {
let value = tx.query::<i64, _>("SELECT 7*93", &()).await?;
assert_eq!(value, vec![651]);

let value = tx.query_single::<i64, _>("SELECT 5*11", &()).await?;
assert_eq!(value, Some(55));

let value = tx.query_single::<i64, _>("SELECT <int64>{}", &()).await?;
assert_eq!(value, None);

let value = tx.query_required_single::<i64, _>(
"SELECT 5*11", &()).await?;
assert_eq!(value, 55);

let err = tx.query_required_single::<i64, _>(
"SELECT <int64>{}", &()).await.unwrap_err();
assert!(err.is::<NoDataError>());

let value = tx.query_json("SELECT 'x' ++ 'y'", &()).await?;
assert_eq!(value.as_ref(), r#"["xy"]"#);

let value = tx.query_single_json("SELECT 'x' ++ 'y'", &()).await?;
assert_eq!(value.as_deref(), Some(r#""xy""#));

let value = tx.query_single_json("SELECT <str>{}", &()).await?;
assert_eq!(value.as_deref(), None);

let err = tx.query_required_single_json(
"SELECT <int64>{}", &()).await.unwrap_err();
assert!(err.is::<NoDataError>());

tx.execute("SELECT 1+1", &()).await?;

Ok(())
}).await?;
Ok(())
}

0 comments on commit d685186

Please sign in to comment.