diff --git a/src/conn/mod.rs b/src/conn/mod.rs index e6040b33..8c68a213 100644 --- a/src/conn/mod.rs +++ b/src/conn/mod.rs @@ -28,6 +28,7 @@ use std::{ time::{Duration, Instant}, }; +use crate::queryable::query_result::ResultSetMetaState; use crate::{ conn::{pool::Pool, stmt_cache::StmtCache}, consts::{CapabilityFlags, Command, StatusFlags}, @@ -81,7 +82,7 @@ struct ConnInner { last_ok_packet: Option>, last_err_packet: Option>, pool: Option, - pending_result: Option, + pending_result: ResultSetMetaState, tx_status: TxStatus, opts: Opts, last_io: Instant, @@ -119,7 +120,7 @@ impl ConnInner { stream: None, version: (0, 0, 0), id: 0, - pending_result: None, + pending_result: ResultSetMetaState::None, pool: None, tx_status: TxStatus::None, last_io: Instant::now(), @@ -251,12 +252,26 @@ impl Conn { /// /// If `Some(_)`, then result is not yet consumed. pub(crate) fn get_pending_result(&self) -> Option<&ResultSetMeta> { - self.inner.pending_result.as_ref() + self.inner.pending_result.meta() } - /// Sets the given pening result metadata for this connection. - pub(crate) fn set_pending_result(&mut self, meta: Option) { - self.inner.pending_result = meta; + /// Marks pending result as fetched to indicate that result set is not empty + pub(crate) fn mark_pending_result_as_fetched(&mut self) { + self.inner.pending_result.mark_as_fetched(); + } + + pub(crate) fn is_pending_result_fetched(&self) -> bool { + self.inner.pending_result.is_fetched() + } + + // Initializes not yet fetched pending result + pub(crate) fn initialize_pending_result(&mut self, meta: ResultSetMeta) { + self.inner.pending_result = ResultSetMetaState::NotFetched(meta); + } + + // Marks pending result as consumed + pub(crate) fn drop_pending_result(&mut self) { + self.inner.pending_result = ResultSetMetaState::None; } /// Returns current status flags. @@ -728,7 +743,7 @@ impl Conn { } pub(crate) async fn drop_result(&mut self) -> Result<()> { - match self.inner.pending_result.as_ref() { + match self.get_pending_result() { Some(ResultSetMeta::Text(_)) => { QueryResult::<'_, '_, TextProtocol>::new(self) .drop_result() @@ -750,7 +765,7 @@ impl Conn { /// The purpose of this function, is to cleanup the connection while returning it to a [`Pool`]. async fn cleanup(mut self) -> Result { loop { - if self.inner.pending_result.is_some() { + if self.get_pending_result().is_some() { self.drop_result().await?; } else if self.inner.tx_status != TxStatus::None { self.rollback_transaction().await?; @@ -764,6 +779,7 @@ impl Conn { #[cfg(test)] mod test { + use crate::queryable::query_result::NextItem; use crate::{ from_row, params, prelude::*, test_misc::get_opts, Conn, Error, OptsBuilder, TxOpts, WhiteListFsLocalInfileHandler, @@ -1061,6 +1077,82 @@ mod test { Ok(()) } + #[tokio::test] + async fn should_report_empty_result_sets_with_extended_next() -> super::Result<()> { + let mut conn = Conn::new(get_opts()).await?; + conn.query_drop( + r" + CREATE TEMPORARY TABLE empty_result_sets_with_extended_next_1 ( + id INT UNSIGNED NOT NULL AUTO_INCREMENT, + column_value INT UNSIGNED NOT NULL, + PRIMARY KEY(id) + ); + + CREATE TEMPORARY TABLE empty_result_sets_with_extended_next_2 ( + id INT UNSIGNED NOT NULL AUTO_INCREMENT, + column_value INT UNSIGNED NOT NULL, + PRIMARY KEY(id) + ); + + CREATE TEMPORARY TABLE empty_result_sets_with_extended_next_3 ( + id INT UNSIGNED NOT NULL AUTO_INCREMENT, + column_value INT UNSIGNED NOT NULL, + PRIMARY KEY(id) + ); + + INSERT INTO empty_result_sets_with_extended_next_3 (column_value) VALUES (1), (2); + ", + ) + .await?; + let mut result = conn + .query_iter( + r" + SELECT * FROM empty_result_sets_with_extended_next_1; + SELECT * FROM empty_result_sets_with_extended_next_2; + SELECT * FROM empty_result_sets_with_extended_next_3; + ", + ) + .await?; + + let item_into_string = |value| match value { + NextItem::None => "None".into(), + NextItem::EmptyResult(columns) => format!( + "EmptyResult({},{})", + columns.get(0).unwrap().name_str(), + columns.get(1).unwrap().name_str(), + ), + NextItem::Row(row) => { + let row = from_row::<(String, String)>(row); + format!("Row({},{})", row.0, row.1) + } + }; + + let rows: Vec = vec![ + result.next_item().await?, + result.next_item().await?, + result.next_item().await?, + result.next_item().await?, + result.next_item().await?, + ]; + + conn.disconnect().await?; + + assert_eq!( + rows.into_iter() + .map(item_into_string) + .collect::>(), + vec![ + String::from("EmptyResult(id,column_value)"), + String::from("EmptyResult(id,column_value)"), + String::from("Row(1,1)"), + String::from("Row(2,2)"), + String::from("None") + ] + ); + + Ok(()) + } + #[tokio::test] async fn should_map_resultset() -> super::Result<()> { let mut conn = Conn::new(get_opts()).await?; diff --git a/src/conn/pool/mod.rs b/src/conn/pool/mod.rs index 5748686d..b656c390 100644 --- a/src/conn/pool/mod.rs +++ b/src/conn/pool/mod.rs @@ -172,7 +172,7 @@ impl Pool { && !conn.inner.disconnected && !conn.expired() && conn.inner.tx_status == TxStatus::None - && conn.inner.pending_result.is_none() + && conn.get_pending_result().is_none() && !self.inner.close.load(atomic::Ordering::Acquire) { let mut exchange = self.inner.exchange.lock().unwrap(); diff --git a/src/conn/pool/recycler.rs b/src/conn/pool/recycler.rs index dd352e77..cc75c05c 100644 --- a/src/conn/pool/recycler.rs +++ b/src/conn/pool/recycler.rs @@ -68,7 +68,7 @@ impl Future for Recycler { .discard .push(BoxFuture(Box::pin(::futures_util::future::ok(())))); } else if $conn.inner.tx_status != TxStatus::None - || $conn.inner.pending_result.is_some() + || $conn.get_pending_result().is_some() { $self.cleaning.push(BoxFuture(Box::pin($conn.cleanup()))); } else if $conn.expired() || close { diff --git a/src/lib.rs b/src/lib.rs index bd9daf59..4e4fc8df 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -188,7 +188,7 @@ pub use mysql_common::value::convert::{from_value, from_value_opt, FromValueErro pub use mysql_common::value::json::{Deserialized, Serialized}; #[doc(inline)] -pub use self::queryable::query_result::QueryResult; +pub use self::queryable::query_result::{NextItem, QueryResult}; #[doc(inline)] pub use self::queryable::transaction::{Transaction, TxOpts}; diff --git a/src/queryable/query_result.rs b/src/queryable/query_result.rs index 4c37fb0f..95c6d604 100644 --- a/src/queryable/query_result.rs +++ b/src/queryable/query_result.rs @@ -39,6 +39,49 @@ impl ResultSetMeta { } } +// State to indicate current progression of result set processing +#[derive(Debug, Clone, Eq, PartialEq)] +pub enum ResultSetMetaState { + None, + NotFetched(ResultSetMeta), + Fetched(ResultSetMeta), +} + +impl ResultSetMetaState { + // Marks result set as fetched + pub(crate) fn mark_as_fetched(&mut self) { + let meta = match &self { + Self::None | Self::Fetched(_) => return, + Self::NotFetched(meta) => meta.clone(), + }; + + *self = Self::Fetched(meta) + } + + // Checks if result set was ever fetched + pub(crate) fn is_fetched(&self) -> bool { + match self { + ResultSetMetaState::Fetched(_) => true, + _ => false, + } + } + + // Returns any contained result set meta if any + pub(crate) fn meta(&self) -> Option<&ResultSetMeta> { + match self { + ResultSetMetaState::Fetched(meta) | ResultSetMetaState::NotFetched(meta) => Some(meta), + ResultSetMetaState::None => None, + } + } +} + +#[derive(Clone, PartialEq)] +pub enum NextItem { + None, + Row(Row), + EmptyResult(Arc<[Column]>), +} + /// Result of a query or statement execution. /// /// Represents an asynchronous query result, that may not be fully consumed. Note, @@ -77,6 +120,16 @@ where } pub async fn next(&mut self) -> Result> { + let row = match self.next_item().await? { + NextItem::None => None, + NextItem::Row(row) => Some(row), + NextItem::EmptyResult(_) => None, + }; + + Ok(row) + } + + pub async fn next_item(&mut self) -> Result { loop { let columns = match self.conn.get_pending_result() { Some(ResultSetMeta::Text(cols)) | Some(ResultSetMeta::Binary(cols)) => { @@ -90,26 +143,34 @@ where Ok(Some(columns)) => { if columns.is_empty() { // Empty, but not yet consumed result set. - self.conn.set_pending_result(None); - return Ok(None); + self.conn.drop_pending_result(); + return Ok(NextItem::None); } else { // Not yet consumed non-empty result set. let packet = match self.conn.read_packet().await { Ok(packet) => packet, Err(err) => { // Next row contained an error. No more data will follow. - self.conn.set_pending_result(None); + self.conn.drop_pending_result(); return Err(err); } }; if P::is_last_result_set_packet(self.conn.capabilities(), &packet) { // `packet` is a result set terminator. - self.conn.set_pending_result(None); - return Ok(None); + let item = if self.conn.is_pending_result_fetched() { + NextItem::None + } else { + NextItem::EmptyResult(columns) + }; + + self.conn.drop_pending_result(); + return Ok(item); } else { // `packet` is a result set row. - return Ok(Some(P::read_result_set_row(&packet, columns)?)); + let row = P::read_result_set_row(&packet, columns)?; + self.conn.mark_pending_result_as_fetched(); + return Ok(NextItem::Row(row)); } } } @@ -122,12 +183,12 @@ where continue; } else { // The end of a query result. - return Ok(None); + return Ok(NextItem::None); } } Err(err) => { // Error result set. No more data will follow. - self.conn.set_pending_result(None); + self.conn.drop_pending_result(); return Err(err); } } @@ -336,9 +397,9 @@ impl crate::Conn { let packet = self.read_packet().await?; match packet.get(0) { - Some(0x00) => self.set_pending_result(Some(P::result_set_meta(Arc::from( + Some(0x00) => self.initialize_pending_result(P::result_set_meta(Arc::from( Vec::new().into_boxed_slice(), - )))), + ))), Some(0xFB) => self.handle_local_infile::

(&*packet).await?, _ => self.handle_result_set::

(&*packet).await?, } @@ -369,9 +430,9 @@ impl crate::Conn { } self.read_packet().await?; - self.set_pending_result(Some(P::result_set_meta(Arc::from( + self.initialize_pending_result(P::result_set_meta(Arc::from( Vec::new().into_boxed_slice(), - )))); + ))); Ok(()) } @@ -385,7 +446,7 @@ impl crate::Conn { let column_count = packet.read_lenenc_int()?; let columns = self.read_column_defs(column_count as usize).await?; let meta = P::result_set_meta(Arc::from(columns.into_boxed_slice())); - self.set_pending_result(Some(meta)); + self.initialize_pending_result(meta); Ok(()) } } diff --git a/tests/exports.rs b/tests/exports.rs index e6ff0466..9963948c 100644 --- a/tests/exports.rs +++ b/tests/exports.rs @@ -8,8 +8,8 @@ use mysql_async::{ StatementLike, ToValue, }, time, uuid, BinaryProtocol, BoxFuture, Column, Conn, Deserialized, DriverError, Error, - FromRowError, FromValueError, IoError, IsolationLevel, Opts, OptsBuilder, Params, ParseError, - Pool, PoolConstraints, PoolOpts, QueryResult, Result, Row, Serialized, ServerError, SslOpts, - Statement, TextProtocol, Transaction, TxOpts, UrlError, Value, WhiteListFsLocalInfileHandler, - DEFAULT_INACTIVE_CONNECTION_TTL, DEFAULT_TTL_CHECK_INTERVAL, + FromRowError, FromValueError, IoError, IsolationLevel, NextItem, Opts, OptsBuilder, Params, + ParseError, Pool, PoolConstraints, PoolOpts, QueryResult, Result, Row, Serialized, ServerError, + SslOpts, Statement, TextProtocol, Transaction, TxOpts, UrlError, Value, + WhiteListFsLocalInfileHandler, DEFAULT_INACTIVE_CONNECTION_TTL, DEFAULT_TTL_CHECK_INTERVAL, };