Skip to content

Commit

Permalink
feat: add should_flush and only spawn on drop for PoolConnection if w…
Browse files Browse the repository at this point in the history
…e need to flush
  • Loading branch information
mehcode committed Jun 21, 2020
1 parent d76002e commit 4448c0e
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 14 deletions.
4 changes: 3 additions & 1 deletion sqlx-core/src/connection.rs
Expand Up @@ -64,10 +64,12 @@ pub trait Connection: Send {
})
}

/// Flush any pending commands to the database.
#[doc(hidden)]
fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>>;

#[doc(hidden)]
fn should_flush(&self) -> bool;

#[doc(hidden)]
fn get_ref(&self) -> &<Self::Database as Database>::Connection;

Expand Down
5 changes: 5 additions & 0 deletions sqlx-core/src/mssql/connection/mod.rs
Expand Up @@ -42,6 +42,11 @@ impl Connection for MssqlConnection {
self.stream.wait_until_ready().boxed()
}

#[doc(hidden)]
fn should_flush(&self) -> bool {
!self.stream.wbuf.is_empty()
}

#[doc(hidden)]
fn get_ref(&self) -> &MssqlConnection {
self
Expand Down
5 changes: 5 additions & 0 deletions sqlx-core/src/mysql/connection/mod.rs
Expand Up @@ -75,6 +75,11 @@ impl Connection for MySqlConnection {
self.stream.wait_until_ready().boxed()
}

#[doc(hidden)]
fn should_flush(&self) -> bool {
!self.stream.wbuf.is_empty()
}

#[doc(hidden)]
fn get_ref(&self) -> &Self {
self
Expand Down
38 changes: 25 additions & 13 deletions sqlx-core/src/pool/connection.rs
Expand Up @@ -78,6 +78,11 @@ impl<DB: Database> Connection for PoolConnection<DB> {
self.get_mut().flush()
}

#[doc(hidden)]
fn should_flush(&self) -> bool {
self.get_ref().should_flush()
}

#[doc(hidden)]
fn get_ref(&self) -> &DB::Connection {
self.deref().get_ref()
Expand All @@ -94,19 +99,26 @@ impl<DB: Database> Drop for PoolConnection<DB> {
fn drop(&mut self) {
if let Some(mut live) = self.live.take() {
let pool = self.pool.clone();
spawn(async move {
// flush the connection (will immediately return if not needed) before
// we fully release to the pool
if let Err(e) = live.raw.flush().await {
log::error!("error occurred while flushing the connection: {}", e);

// we now consider the connection to be broken
// close the connection and drop from the pool
let _ = live.float(&pool).into_idle().close().await;
} else {
pool.release(live.float(&pool));
}
});

if live.raw.should_flush() {
spawn(async move {
// flush the connection (will immediately return if not needed) before
// we fully release to the pool
if let Err(e) = live.raw.flush().await {
log::error!("error occurred while flushing the connection: {}", e);

// we now consider the connection to be broken
// close the connection and drop from the pool
let _ = live.float(&pool).into_idle().close().await;
} else {
// after we have flushed successfully, release to the pool
pool.release(live.float(&pool));
}
});
} else {
// nothing to flush, release immediately outside of a spawn
pool.release(live.float(&pool));
}
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions sqlx-core/src/postgres/connection/mod.rs
Expand Up @@ -124,6 +124,11 @@ impl Connection for PgConnection {
self.wait_until_ready().boxed()
}

#[doc(hidden)]
fn should_flush(&self) -> bool {
!self.stream.wbuf.is_empty()
}

#[doc(hidden)]
fn get_ref(&self) -> &Self {
self
Expand Down
6 changes: 6 additions & 0 deletions sqlx-core/src/sqlite/connection/mod.rs
Expand Up @@ -52,11 +52,17 @@ impl Connection for SqliteConnection {
Box::pin(future::ok(()))
}

#[doc(hidden)]
fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> {
// For SQLite, FLUSH does effectively nothing
Box::pin(future::ok(()))
}

#[doc(hidden)]
fn should_flush(&self) -> bool {
false
}

#[doc(hidden)]
fn get_ref(&self) -> &Self {
self
Expand Down
5 changes: 5 additions & 0 deletions sqlx-core/src/transaction.rs
Expand Up @@ -127,6 +127,11 @@ where
self.get_mut().flush()
}

#[doc(hidden)]
fn should_flush(&self) -> bool {
self.get_ref().should_flush()
}

#[doc(hidden)]
fn get_ref(&self) -> &<Self::Database as Database>::Connection {
self.connection.get_ref()
Expand Down

0 comments on commit 4448c0e

Please sign in to comment.