Skip to content

Commit

Permalink
Added optional transactions (#47)
Browse files Browse the repository at this point in the history
  • Loading branch information
emilpriver committed Feb 10, 2024
1 parent e63144e commit 17d07e6
Show file tree
Hide file tree
Showing 9 changed files with 145 additions and 33 deletions.
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,23 @@ DROP TABLE Persons;

in the generated `.down.sql` file as this code would revert the creation of the table `Persons`

### Transactions

Geni defaults to always run in transactions but if you want to prevent usage of transactions, add `transaction: no` as the first line of the migration file.
Then Geni won't use transactions for the specific migration.
This works for both up and down

Example:

```sql
-- transaction:no
CREATE TABLE table_2 (
id INT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
```

### Running migration

Running migration can be done using
Expand Down
14 changes: 13 additions & 1 deletion src/lib/database_drivers/libsql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ impl DatabaseDriver for LibSQLDriver {
fn execute<'a>(
&'a mut self,
query: &'a str,
run_in_transaction: bool,
) -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + '_>> {
let fut = async move {
let queries = query
Expand All @@ -52,7 +53,18 @@ impl DatabaseDriver for LibSQLDriver {
.map(Statement::new)
.collect::<Vec<Statement>>();

self.db.batch(queries).await?;
if run_in_transaction {
self.db.batch(queries).await?;
} else {
for query in queries {
match self.db.execute(query).await {
Ok(_) => {}
Err(e) => {
bail!("{:?}", e);
}
}
}
}

Ok(())
};
Expand Down
23 changes: 14 additions & 9 deletions src/lib/database_drivers/maria.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,18 +79,23 @@ impl DatabaseDriver for MariaDBDriver {
fn execute<'a>(
&'a mut self,
query: &'a str,
run_in_transaction: bool,
) -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + '_>> {
let fut = async move {
let mut tx = self.db.begin().await?;

match tx.execute(query).await {
Ok(_) => {
tx.commit().await?;
}
Err(e) => {
error!("Error executing query: {}", e);
tx.rollback().await?;
if run_in_transaction {
let mut tx = self.db.begin().await?;
match tx.execute(query).await {
Ok(_) => {
tx.commit().await?;
}
Err(e) => {
error!("Error executing query: {}", e);
tx.rollback().await?;
}
}
return Ok(());
} else {
self.db.execute(query).await?;
}

Ok(())
Expand Down
1 change: 1 addition & 0 deletions src/lib/database_drivers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub trait DatabaseDriver {
fn execute<'a>(
&'a mut self,
query: &'a str,
run_in_transaction: bool,
) -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + '_>>;

// create database with the specific driver
Expand Down
23 changes: 14 additions & 9 deletions src/lib/database_drivers/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,18 +78,23 @@ impl DatabaseDriver for MySQLDriver {
fn execute<'a>(
&'a mut self,
query: &'a str,
run_in_transaction: bool,
) -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + '_>> {
let fut = async move {
let mut tx = self.db.begin().await?;

match tx.execute(query).await {
Ok(_) => {
tx.commit().await?;
}
Err(e) => {
error!("Error executing query: {}", e);
tx.rollback().await?;
if run_in_transaction {
let mut tx = self.db.begin().await?;
match tx.execute(query).await {
Ok(_) => {
tx.commit().await?;
}
Err(e) => {
error!("Error executing query: {}", e);
tx.rollback().await?;
}
}
return Ok(());
} else {
self.db.execute(query).await?;
}

Ok(())
Expand Down
23 changes: 14 additions & 9 deletions src/lib/database_drivers/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,18 +71,23 @@ impl DatabaseDriver for PostgresDriver {
fn execute<'a>(
&'a mut self,
query: &'a str,
run_in_transaction: bool,
) -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + '_>> {
let fut = async move {
let mut tx = self.db.begin().await?;

match tx.execute(query).await {
Ok(_) => {
tx.commit().await?;
}
Err(e) => {
error!("Error executing query: {}", e);
tx.rollback().await?;
if run_in_transaction {
let mut tx = self.db.begin().await?;
match tx.execute(query).await {
Ok(_) => {
tx.commit().await?;
}
Err(e) => {
error!("Error executing query: {}", e);
tx.rollback().await?;
}
}
return Ok(());
} else {
self.db.execute(query).await?;
}

Ok(())
Expand Down
21 changes: 19 additions & 2 deletions src/lib/database_drivers/sqlite.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::database_drivers::DatabaseDriver;
use anyhow::Result;
use anyhow::{bail, Result};
use libsql_client::{de, local::Client};
use std::fs::{self, File};
use std::future::Future;
Expand Down Expand Up @@ -48,15 +48,32 @@ impl DatabaseDriver for SqliteDriver {
fn execute<'a>(
&'a mut self,
query: &'a str,
run_in_transaction: bool,
) -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + '_>> {
let fut = async move {
if run_in_transaction {
self.db.execute("BEGIN;")?;
}

let queries = query
.split(';')
.map(|x| x.trim())
.filter(|x| !x.is_empty())
.collect::<Vec<&str>>();
for query in queries {
self.db.execute(query)?;
match self.db.execute(query) {
Ok(_) => {}
Err(e) => {
if run_in_transaction {
self.db.execute("ROLLBACK;")?;
}
bail!("{:?}", e);
}
}
}

if run_in_transaction {
self.db.execute("COMMIT;")?;
}
Ok(())
};
Expand Down
8 changes: 5 additions & 3 deletions src/lib/migrate.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::database_drivers;
use crate::utils::{get_local_migrations, read_file_content};
use crate::{database_drivers, utils};
use anyhow::{bail, Result};
use log::info;
use std::path::PathBuf;
Expand Down Expand Up @@ -55,8 +55,9 @@ pub async fn up(
if !migrations.contains(&id) {
info!("Running migration {}", id);
let query = read_file_content(&f.1);
let run_in_transaction = utils::should_run_in_transaction(&query);

database.execute(&query).await?;
database.execute(&query, run_in_transaction).await?;

database.insert_schema_migration(&id).await?;
}
Expand Down Expand Up @@ -124,8 +125,9 @@ pub async fn down(
Some(f) => {
info!("Running rollback for {}", migration);
let query = read_file_content(&f.1);
let run_in_transaction = utils::should_run_in_transaction(&query);

database.execute(&query).await?;
database.execute(&query, run_in_transaction).await?;

database
.remove_schema_migration(migration.to_string().as_str())
Expand Down
48 changes: 48 additions & 0 deletions src/lib/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,51 @@ pub fn get_local_migrations(folder: &PathBuf, ending: &str) -> Result<Vec<(i64,
pub fn read_file_content(path: &PathBuf) -> String {
fs::read_to_string(path).unwrap()
}

pub fn should_run_in_transaction(query: &str) -> bool {
let first_line = query.split_once('\n').unwrap_or(("", "")).0;

if first_line.contains("transaction: no") {
return false;
}

if first_line.contains("transaction:no") {
return false;
}

return true;
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_without_transaction_no_in_first_line() {
let query = "something else\ntransaction: no";
assert_eq!(should_run_in_transaction(query), true);
}

#[test]
fn test_with_empty_line() {
let query = "";
assert_eq!(should_run_in_transaction(query), true);
}

#[test]
fn test_with_transaction_yes_in_first_line() {
let query = "transaction: yes\nSELECT * FROM users";
assert_eq!(should_run_in_transaction(query), true);
}

#[test]
fn test_with_transaction_no_in_first_line() {
let query = "transaction: no\nSELECT * FROM users";
assert_eq!(should_run_in_transaction(query), false);
}
#[test]
fn test_with_transaction_no_in_first_line_without_space() {
let query = "transaction:no\nSELECT * FROM users";
assert_eq!(should_run_in_transaction(query), false);
}
}

0 comments on commit 17d07e6

Please sign in to comment.