Skip to content

Commit

Permalink
its: Refactored to extract migrations for command database
Browse files Browse the repository at this point in the history
  • Loading branch information
bouzuya committed Feb 23, 2022
1 parent e9b43d7 commit 95075ab
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 34 deletions.
2 changes: 1 addition & 1 deletion its/crates/adapter_sqlite/Cargo.toml
Expand Up @@ -11,7 +11,7 @@ domain = { path = "../domain" }
limited-date-time = { git = "https://github.com/bouzuya/rust-limited-date-time", tag = "0.17.0" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.74"
sqlx = { version = "0.5.10", features = ["any", "runtime-tokio-rustls", "sqlite"] }
sqlx = { version = "0.5.10", features = ["any", "migrate", "runtime-tokio-rustls", "sqlite"] }
thiserror = "1.0.30"
tokio = { version = "1.15.0", features = ["full"] }
ulid = "0.5.0"
Expand Down
1 change: 1 addition & 0 deletions its/crates/adapter_sqlite/src/adapter/sqlite.rs
@@ -1,3 +1,4 @@
mod command_migration_source;
pub mod event_dto;
mod event_store;
mod sqlite_issue_repository;
Expand Down
@@ -0,0 +1,45 @@
use std::{borrow::Cow, future::Future, pin::Pin};

use sqlx::{
error::BoxDynError,
migrate::{Migration, MigrationSource, MigrationType},
};

#[derive(Debug, Default)]
pub struct CommandMigrationSource {}

impl MigrationSource<'static> for CommandMigrationSource {
fn resolve(
self,
) -> Pin<Box<dyn Future<Output = Result<Vec<Migration>, BoxDynError>> + Send + 'static>> {
Box::pin(async move {
let migrations = vec![
Migration::new(
20210223000001,
Cow::from("create aggregates"),
MigrationType::Simple,
Cow::from(include_str!(
"../../../sql/command/migrations/20220224000001_create_aggregates.sql"
)),
),
Migration::new(
20210223000002,
Cow::from("create events"),
MigrationType::Simple,
Cow::from(include_str!(
"../../../sql/command/migrations/20220224000002_create_events.sql"
)),
),
Migration::new(
20210223000003,
Cow::from("create issue_ids"),
MigrationType::Simple,
Cow::from(include_str!(
"../../../sql/command/migrations/20220224000003_create_issue_ids.sql"
)),
),
];
Ok(migrations)
})
}
}
21 changes: 7 additions & 14 deletions its/crates/adapter_sqlite/src/adapter/sqlite/event_store.rs
Expand Up @@ -121,18 +121,6 @@ pub async fn save(
Ok(())
}

pub async fn migrate(transaction: &mut Transaction<'_, Any>) -> Result<(), EventStoreError> {
sqlx::query(include_str!("../../../sql/command/create_aggregates.sql"))
.execute(&mut *transaction)
.await
.map_err(|_| EventStoreError::MigrateCreateAggregateTable)?;
sqlx::query(include_str!("../../../sql/command/create_events.sql"))
.execute(&mut *transaction)
.await
.map_err(|_| EventStoreError::MigrateCreateEventTable)?;
Ok(())
}

pub async fn find_aggregate_ids(
transaction: &mut Transaction<'_, Any>,
) -> Result<Vec<AggregateId>, EventStoreError> {
Expand Down Expand Up @@ -163,11 +151,14 @@ mod tests {
use anyhow::Context;
use sqlx::{
any::AnyConnectOptions,
migrate::Migrator,
sqlite::{SqliteConnectOptions, SqliteJournalMode},
AnyPool,
};
use tempfile::tempdir;

use crate::adapter::sqlite::command_migration_source::CommandMigrationSource;

use super::*;

#[tokio::test]
Expand All @@ -182,9 +173,11 @@ mod tests {
.journal_mode(SqliteJournalMode::Delete);
let options = AnyConnectOptions::from(options);
let pool = AnyPool::connect_with(options).await?;
let mut transaction = pool.begin().await?;

migrate(&mut transaction).await?;
let migrator = Migrator::new(CommandMigrationSource::default()).await?;
migrator.run(&pool).await?;

let mut transaction = pool.begin().await?;

let aggregates = find_aggregate_ids(&mut transaction).await?;
assert!(aggregates.is_empty());
Expand Down
Expand Up @@ -12,6 +12,7 @@ use domain::{

use sqlx::{
any::{AnyArguments, AnyConnectOptions, AnyRow},
migrate::Migrator,
query::Query,
sqlite::{SqliteConnectOptions, SqliteJournalMode},
Any, AnyPool, FromRow, Row, Transaction,
Expand All @@ -24,7 +25,10 @@ use crate::{
SqliteQueryHandler,
};

use super::event_store::{self, AggregateId};
use super::{
command_migration_source::CommandMigrationSource,
event_store::{self, AggregateId},
};

#[derive(Debug)]
pub struct SqliteIssueRepository {
Expand Down Expand Up @@ -84,26 +88,10 @@ impl SqliteIssueRepository {
.await
.map_err(|_| RepositoryError::IO)?;

// FIXME
let mut transaction = pool.begin().await.map_err(|_| RepositoryError::IO)?;
event_store::migrate(&mut transaction)
.await
.map_err(|_| RepositoryError::IO)?;
transaction
.commit()
.await
.map_err(|_| RepositoryError::IO)?;

// migrate
let mut transaction = pool.begin().await.map_err(|_| RepositoryError::IO)?;
sqlx::query(include_str!("../../../sql/command/create_issue_ids.sql"))
.execute(&mut transaction)
.await
.map_err(|_| RepositoryError::IO)?;
transaction
.commit()
let migrator = Migrator::new(CommandMigrationSource::default())
.await
.map_err(|_| RepositoryError::IO)?;
migrator.run(&pool).await.map_err(|_| RepositoryError::IO)?;

Ok(Self {
pool,
Expand Down

0 comments on commit 95075ab

Please sign in to comment.