Skip to content

Commit

Permalink
InnerConnection
Browse files Browse the repository at this point in the history
  • Loading branch information
billy1624 committed Nov 16, 2022
1 parent 2cd1a56 commit bd33f56
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 28 deletions.
10 changes: 9 additions & 1 deletion src/database/db_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ pub enum DatabaseConnection {
/// The same as a [DatabaseConnection]
pub type DbConn = DatabaseConnection;

impl Default for DatabaseConnection {
fn default() -> Self {
Self::Disconnected
}
}

/// The type of database backend for real world databases.
/// This is enabled by feature flags as specified in the crate documentation
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
Expand All @@ -50,6 +56,7 @@ pub enum DatabaseBackend {

/// The same as [DatabaseBackend] just shorter :)
pub type DbBackend = DatabaseBackend;

#[derive(Debug)]
pub(crate) enum InnerConnection {
#[cfg(feature = "sqlx-mysql")]
Expand All @@ -60,9 +67,10 @@ pub(crate) enum InnerConnection {
Sqlite(PoolConnection<sqlx::Sqlite>),
#[cfg(feature = "mock")]
Mock(std::sync::Arc<crate::MockDatabaseConnection>),
Disconnected,
}

impl Default for DatabaseConnection {
impl Default for InnerConnection {
fn default() -> Self {
Self::Disconnected
}
Expand Down
60 changes: 33 additions & 27 deletions src/database/transaction.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
debug_print, ConnectionTrait, DbBackend, DbErr, ExecResult, InnerConnection, QueryResult,
Statement, StreamTrait, TransactionStream, TransactionTrait,
RuntimeErr, Statement, StreamTrait, TransactionStream, TransactionTrait,
};
#[cfg(feature = "sqlx-dep")]
use crate::{sqlx_error_to_exec_err, sqlx_error_to_query_err};
Expand Down Expand Up @@ -81,7 +81,6 @@ impl DatabaseTransaction {
}

#[instrument(level = "trace", skip(metric_callback))]
#[allow(unreachable_code)]
async fn begin(
conn: Arc<Mutex<InnerConnection>>,
backend: DbBackend,
Expand All @@ -98,23 +97,26 @@ impl DatabaseTransaction {
InnerConnection::MySql(ref mut c) => {
<sqlx::MySql as sqlx::Database>::TransactionManager::begin(c)
.await
.map_err(sqlx_error_to_query_err)?
.map_err(sqlx_error_to_query_err)
}
#[cfg(feature = "sqlx-postgres")]
InnerConnection::Postgres(ref mut c) => {
<sqlx::Postgres as sqlx::Database>::TransactionManager::begin(c)
.await
.map_err(sqlx_error_to_query_err)?
.map_err(sqlx_error_to_query_err)
}
#[cfg(feature = "sqlx-sqlite")]
InnerConnection::Sqlite(ref mut c) => {
<sqlx::Sqlite as sqlx::Database>::TransactionManager::begin(c)
.await
.map_err(sqlx_error_to_query_err)?
.map_err(sqlx_error_to_query_err)
}
#[cfg(feature = "mock")]
InnerConnection::Mock(ref mut c) => c.begin()?,
}
InnerConnection::Mock(ref mut c) => c.begin(),
InnerConnection::Disconnected => {
Err(DbErr::Conn(RuntimeErr::Internal("Disconnected".to_owned())))
}
}?;
Ok(res)
}

Expand Down Expand Up @@ -143,62 +145,64 @@ impl DatabaseTransaction {

/// Commit a transaction atomically
#[instrument(level = "trace")]
#[allow(unreachable_code)]
pub async fn commit(mut self) -> Result<(), DbErr> {
self.open = false;
match *self.conn.lock().await {
#[cfg(feature = "sqlx-mysql")]
InnerConnection::MySql(ref mut c) => {
<sqlx::MySql as sqlx::Database>::TransactionManager::commit(c)
.await
.map_err(sqlx_error_to_query_err)?
.map_err(sqlx_error_to_query_err)
}
#[cfg(feature = "sqlx-postgres")]
InnerConnection::Postgres(ref mut c) => {
<sqlx::Postgres as sqlx::Database>::TransactionManager::commit(c)
.await
.map_err(sqlx_error_to_query_err)?
.map_err(sqlx_error_to_query_err)
}
#[cfg(feature = "sqlx-sqlite")]
InnerConnection::Sqlite(ref mut c) => {
<sqlx::Sqlite as sqlx::Database>::TransactionManager::commit(c)
.await
.map_err(sqlx_error_to_query_err)?
.map_err(sqlx_error_to_query_err)
}
#[cfg(feature = "mock")]
InnerConnection::Mock(ref mut c) => c.commit()?,
InnerConnection::Mock(ref mut c) => c.commit(),
InnerConnection::Disconnected => {
Err(DbErr::Conn(RuntimeErr::Internal("Disconnected".to_owned())))
}
}
Ok(())
}

/// rolls back a transaction in case error are encountered during the operation
#[instrument(level = "trace")]
#[allow(unreachable_code)]
pub async fn rollback(mut self) -> Result<(), DbErr> {
self.open = false;
match *self.conn.lock().await {
#[cfg(feature = "sqlx-mysql")]
InnerConnection::MySql(ref mut c) => {
<sqlx::MySql as sqlx::Database>::TransactionManager::rollback(c)
.await
.map_err(sqlx_error_to_query_err)?
.map_err(sqlx_error_to_query_err)
}
#[cfg(feature = "sqlx-postgres")]
InnerConnection::Postgres(ref mut c) => {
<sqlx::Postgres as sqlx::Database>::TransactionManager::rollback(c)
.await
.map_err(sqlx_error_to_query_err)?
.map_err(sqlx_error_to_query_err)
}
#[cfg(feature = "sqlx-sqlite")]
InnerConnection::Sqlite(ref mut c) => {
<sqlx::Sqlite as sqlx::Database>::TransactionManager::rollback(c)
.await
.map_err(sqlx_error_to_query_err)?
.map_err(sqlx_error_to_query_err)
}
#[cfg(feature = "mock")]
InnerConnection::Mock(ref mut c) => c.rollback()?,
InnerConnection::Mock(ref mut c) => c.rollback(),
InnerConnection::Disconnected => {
Err(DbErr::Conn(RuntimeErr::Internal("Disconnected".to_owned())))
}
}
Ok(())
}

// the rollback is queued and will be performed on next async operation, like returning the connection to the pool
Expand All @@ -223,8 +227,7 @@ impl DatabaseTransaction {
InnerConnection::Mock(c) => {
c.rollback().unwrap();
}
#[allow(unreachable_patterns)]
_ => unreachable!(),
InnerConnection::Disconnected => panic!("Disconnected"),
}
} else {
//this should never happen
Expand Down Expand Up @@ -290,8 +293,9 @@ impl ConnectionTrait for DatabaseTransaction {
}
#[cfg(feature = "mock")]
InnerConnection::Mock(conn) => return conn.execute(stmt),
#[allow(unreachable_patterns)]
_ => unreachable!(),
InnerConnection::Disconnected => {
Err(DbErr::Conn(RuntimeErr::Internal("Disconnected".to_owned())))
}
}
}

Expand Down Expand Up @@ -330,8 +334,9 @@ impl ConnectionTrait for DatabaseTransaction {
}
#[cfg(feature = "mock")]
InnerConnection::Mock(conn) => return conn.query_one(stmt),
#[allow(unreachable_patterns)]
_ => unreachable!(),
InnerConnection::Disconnected => {
Err(DbErr::Conn(RuntimeErr::Internal("Disconnected".to_owned())))
}
}
}

Expand Down Expand Up @@ -376,8 +381,9 @@ impl ConnectionTrait for DatabaseTransaction {
}
#[cfg(feature = "mock")]
InnerConnection::Mock(conn) => return conn.query_all(stmt),
#[allow(unreachable_patterns)]
_ => unreachable!(),
InnerConnection::Disconnected => {
Err(DbErr::Conn(RuntimeErr::Internal("Disconnected".to_owned())))
}
}
}
}
Expand Down

0 comments on commit bd33f56

Please sign in to comment.