From d685186e5eae6e4bf52ec296222928bb236970f8 Mon Sep 17 00:00:00 2001 From: Paul Colomiets Date: Mon, 29 May 2023 13:53:46 +0300 Subject: [PATCH] tokio:: Add `Client::execute` and `Transaction::execute` methods (#247) Fixes #241 --- edgedb-tokio/src/client.rs | 45 ++++++++++++++++++++++++- edgedb-tokio/src/transaction.rs | 35 +++++++++++++++++++ edgedb-tokio/tests/func/client.rs | 3 ++ edgedb-tokio/tests/func/transactions.rs | 42 +++++++++++++++++++++++ 4 files changed, 124 insertions(+), 1 deletion(-) diff --git a/edgedb-tokio/src/client.rs b/edgedb-tokio/src/client.rs index eb2f17a0..2fc094dc 100644 --- a/edgedb-tokio/src/client.rs +++ b/edgedb-tokio/src/client.rs @@ -64,7 +64,7 @@ impl Client { /// let greeting = pool.query::("SELECT 'hello'", &()); /// // or /// let greeting: Vec = pool.query("SELECT 'hello'", &()); - /// + /// /// let two_numbers: Vec = conn.query("select {$0, $1}", &(10, 20)).await?; /// ``` /// @@ -401,6 +401,49 @@ impl Client { "query row returned zero results")) } + /// Execute a query and don't expect result + /// + /// This method can be used with both static arguments, like a tuple of + /// scalars, and with dynamic arguments [`edgedb_protocol::value::Value`]. + /// Similarly, dynamically typed results are also supported. + pub async fn execute(&self, query: &str, arguments: &A) + -> Result<(), Error> + where A: QueryArgs, + { + let mut iteration = 0; + loop { + let mut conn = self.pool.acquire().await?; + + let conn = conn.inner(); + let state = &self.options.state; + let caps = Capabilities::MODIFICATIONS | Capabilities::DDL; + match conn.execute(query, arguments, state, caps).await { + Ok(resp) => return Ok(resp.data), + Err(e) => { + let allow_retry = match e.get::() { + // Error from a weird source, or just a bug + // Let's keep on the safe side + None => false, + Some(QueryCapabilities::Unparsed) => true, + Some(QueryCapabilities::Parsed(c)) => c.is_empty(), + }; + if allow_retry && e.has_tag(SHOULD_RETRY) { + let rule = self.options.retry.get_rule(&e); + iteration += 1; + if iteration < rule.attempts { + let duration = (rule.backoff)(iteration); + log::info!("Error: {:#}. Retrying in {:?}...", + e, duration); + sleep(duration).await; + continue; + } + } + return Err(e); + } + } + } + } + /// Execute a transaction /// /// Transaction body must be encompassed in the closure. The closure **may diff --git a/edgedb-tokio/src/transaction.rs b/edgedb-tokio/src/transaction.rs index 522d616a..8b09290c 100644 --- a/edgedb-tokio/src/transaction.rs +++ b/edgedb-tokio/src/transaction.rs @@ -433,6 +433,41 @@ impl Transaction { .ok_or_else(|| NoDataError::with_message( "query row returned zero results")) } + + /// Execute a query and don't expect result + /// + /// This method can be used with both static arguments, like a tuple of + /// scalars, and with dynamic arguments [`edgedb_protocol::value::Value`]. + /// Similarly, dynamically typed results are also supported. + pub async fn execute(&mut self, query: &str, arguments: &A) + -> Result<(), Error> + where A: QueryArgs, + { + self.ensure_started().await?; + let flags = CompilationOptions { + implicit_limit: None, + implicit_typenames: false, + implicit_typeids: false, + explicit_objectids: true, + allow_capabilities: Capabilities::MODIFICATIONS, + io_format: IoFormat::Binary, + expected_cardinality: Cardinality::Many, + }; + let state = self.state.clone(); // TODO: optimize, by careful borrow + let ref mut conn = self.inner().conn; + let desc = conn.parse(&flags, query, &state).await?; + let inp_desc = desc.input() + .map_err(ProtocolEncodingError::with_source)?; + + let mut arg_buf = BytesMut::with_capacity(8); + arguments.encode(&mut Encoder::new( + &inp_desc.as_query_arg_context(), + &mut arg_buf, + ))?; + + conn.execute(&flags, query, &state, &desc, &arg_buf.freeze()).await?; + Ok(()) + } } #[allow(dead_code, unreachable_code)] diff --git a/edgedb-tokio/tests/func/client.rs b/edgedb-tokio/tests/func/client.rs index 1e965635..64df8524 100644 --- a/edgedb-tokio/tests/func/client.rs +++ b/edgedb-tokio/tests/func/client.rs @@ -39,6 +39,9 @@ async fn simple() -> anyhow::Result<()> { "SELECT {}", &()).await.unwrap_err(); assert!(err.is::()); + client.execute("SELECT 1+1", &()).await?; + client.execute("START MIGRATION TO {}; ABORT MIGRATION", &()).await?; + Ok(()) } diff --git a/edgedb-tokio/tests/func/transactions.rs b/edgedb-tokio/tests/func/transactions.rs index 8e76df02..4c0049f4 100644 --- a/edgedb-tokio/tests/func/transactions.rs +++ b/edgedb-tokio/tests/func/transactions.rs @@ -3,6 +3,7 @@ use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering}; use tokio::sync::{Mutex}; +use edgedb_errors::NoDataError; use edgedb_tokio::{Client, Transaction}; use crate::server::SERVER; @@ -141,3 +142,44 @@ async fn transaction_conflict_with_complex_err() -> anyhow::Result<()> { assert_eq!(iters.load(Ordering::SeqCst), 3); Ok(()) } + +#[tokio::test] +async fn queries() -> anyhow::Result<()> { + let client = Client::new(&SERVER.config); + client.transaction(|mut tx| async move { + let value = tx.query::("SELECT 7*93", &()).await?; + assert_eq!(value, vec![651]); + + let value = tx.query_single::("SELECT 5*11", &()).await?; + assert_eq!(value, Some(55)); + + let value = tx.query_single::("SELECT {}", &()).await?; + assert_eq!(value, None); + + let value = tx.query_required_single::( + "SELECT 5*11", &()).await?; + assert_eq!(value, 55); + + let err = tx.query_required_single::( + "SELECT {}", &()).await.unwrap_err(); + assert!(err.is::()); + + let value = tx.query_json("SELECT 'x' ++ 'y'", &()).await?; + assert_eq!(value.as_ref(), r#"["xy"]"#); + + let value = tx.query_single_json("SELECT 'x' ++ 'y'", &()).await?; + assert_eq!(value.as_deref(), Some(r#""xy""#)); + + let value = tx.query_single_json("SELECT {}", &()).await?; + assert_eq!(value.as_deref(), None); + + let err = tx.query_required_single_json( + "SELECT {}", &()).await.unwrap_err(); + assert!(err.is::()); + + tx.execute("SELECT 1+1", &()).await?; + + Ok(()) + }).await?; + Ok(()) +}