From 5c4d846f62f868e8c14dcdcc1a905aef21db1a45 Mon Sep 17 00:00:00 2001 From: Mahmoud Mabrouk Date: Wed, 10 Jan 2024 14:57:32 +0100 Subject: [PATCH 1/2] Update migration command to enable/disable transactions removed logging format added test Added --no-use-transaction option for migration --- beanie/executors/migrate.py | 16 +++++- beanie/migrations/runner.py | 83 +++++++++++++++++++++--------- docs/tutorial/migrations.md | 5 +- tests/migrations/test_free_fall.py | 23 +++++++++ 4 files changed, 101 insertions(+), 26 deletions(-) diff --git a/beanie/executors/migrate.py b/beanie/executors/migrate.py index b3c37509..2e6f62b4 100644 --- a/beanie/executors/migrate.py +++ b/beanie/executors/migrate.py @@ -53,6 +53,7 @@ def __init__(self, **kwargs): or self.get_from_toml("allow_index_dropping") or False ) + self.use_transaction = bool(kwargs.get("use_transaction")) @staticmethod def get_env_value(field_name) -> Any: @@ -111,7 +112,9 @@ async def run_migrate(settings: MigrationSettings): direction=settings.direction, distance=settings.distance ) await root.run( - mode=mode, allow_index_dropping=settings.allow_index_dropping + mode=mode, + allow_index_dropping=settings.allow_index_dropping, + use_transaction=settings.use_transaction, ) @@ -160,6 +163,15 @@ async def run_migrate(settings: MigrationSettings): default=False, help="if allow-index-dropping is set, Beanie will drop indexes from your collection", ) +@click.option( + "--use-transaction/--no-use-transaction", + required=False, + default=True, + help="Enable or disable the use of transactions during migration. " + "When enabled (--use-transaction), Beanie uses transactions for migration, " + "which necessitates a replica set. When disabled (--no-use-transaction), " + "migrations occur without transactions.", +) def migrate( direction, distance, @@ -167,6 +179,7 @@ def migrate( database_name, path, allow_index_dropping, + use_transaction, ): settings_kwargs = {} if direction: @@ -181,6 +194,7 @@ def migrate( settings_kwargs["path"] = path if allow_index_dropping: settings_kwargs["allow_index_dropping"] = allow_index_dropping + settings_kwargs["use_transaction"] = use_transaction settings = MigrationSettings(**settings_kwargs) asyncio.run(run_migrate(settings)) diff --git a/beanie/migrations/runner.py b/beanie/migrations/runner.py index ec0f5815..62d4ff95 100644 --- a/beanie/migrations/runner.py +++ b/beanie/migrations/runner.py @@ -1,7 +1,9 @@ import logging from importlib.machinery import SourceFileLoader from pathlib import Path -from typing import Optional, Type +from typing import List, Optional, Type + +from motor.motor_asyncio import AsyncIOMotorClientSession, AsyncIOMotorDatabase from beanie.migrations.controllers.iterative import BaseMigrationController from beanie.migrations.database import DBHandler @@ -55,7 +57,12 @@ async def update_current_migration(self): await self.clean_current_migration() await MigrationLog(is_current=True, name=self.name).insert() - async def run(self, mode: RunningMode, allow_index_dropping: bool): + async def run( + self, + mode: RunningMode, + allow_index_dropping: bool, + use_transaction: bool, + ): """ Migrate @@ -71,7 +78,8 @@ async def run(self, mode: RunningMode, allow_index_dropping: bool): logger.info("Running migrations forward without limit") while True: await migration_node.run_forward( - allow_index_dropping=allow_index_dropping + allow_index_dropping=allow_index_dropping, + use_transaction=use_transaction, ) migration_node = migration_node.next_migration if migration_node is None: @@ -80,7 +88,8 @@ async def run(self, mode: RunningMode, allow_index_dropping: bool): logger.info(f"Running {mode.distance} migrations forward") for i in range(mode.distance): await migration_node.run_forward( - allow_index_dropping=allow_index_dropping + allow_index_dropping=allow_index_dropping, + use_transaction=use_transaction, ) migration_node = migration_node.next_migration if migration_node is None: @@ -91,7 +100,8 @@ async def run(self, mode: RunningMode, allow_index_dropping: bool): logger.info("Running migrations backward without limit") while True: await migration_node.run_backward( - allow_index_dropping=allow_index_dropping + allow_index_dropping=allow_index_dropping, + use_transaction=use_transaction, ) migration_node = migration_node.prev_migration if migration_node is None: @@ -100,30 +110,41 @@ async def run(self, mode: RunningMode, allow_index_dropping: bool): logger.info(f"Running {mode.distance} migrations backward") for i in range(mode.distance): await migration_node.run_backward( - allow_index_dropping=allow_index_dropping + allow_index_dropping=allow_index_dropping, + use_transaction=use_transaction, ) migration_node = migration_node.prev_migration if migration_node is None: break - async def run_forward(self, allow_index_dropping): + async def run_forward( + self, allow_index_dropping: bool, use_transaction: bool + ): if self.forward_class is not None: await self.run_migration_class( - self.forward_class, allow_index_dropping=allow_index_dropping + self.forward_class, + allow_index_dropping=allow_index_dropping, + use_transaction=use_transaction, ) await self.update_current_migration() - async def run_backward(self, allow_index_dropping): + async def run_backward( + self, allow_index_dropping: bool, use_transaction: bool + ): if self.backward_class is not None: await self.run_migration_class( - self.backward_class, allow_index_dropping=allow_index_dropping + self.backward_class, + allow_index_dropping=allow_index_dropping, + use_transaction=use_transaction, ) if self.prev_migration is not None: await self.prev_migration.update_current_migration() else: await self.clean_current_migration() - async def run_migration_class(self, cls: Type, allow_index_dropping: bool): + async def run_migration_class( + self, cls: Type, allow_index_dropping: bool, use_transaction: bool + ): """ Run Backward or Forward migration class @@ -142,19 +163,35 @@ async def run_migration_class(self, cls: Type, allow_index_dropping: bool): if client is None: raise RuntimeError("client must not be None") async with await client.start_session() as s: - async with s.start_transaction(): - for migration in migrations: - for model in migration.models: - await init_beanie( - database=db, - document_models=[model], # type: ignore - allow_index_dropping=allow_index_dropping, - ) # TODO this is slow - logger.info( - f"Running migration {migration.function.__name__} " - f"from module {self.name}" + if use_transaction: + async with s.start_transaction(): + await self.run_migrations( + migrations, db, allow_index_dropping, s ) - await migration.run(session=s) + else: + await self.run_migrations( + migrations, db, allow_index_dropping, s + ) + + async def run_migrations( + self, + migrations: List[BaseMigrationController], + db: AsyncIOMotorDatabase, + allow_index_dropping: bool, + session: AsyncIOMotorClientSession, + ) -> None: + for migration in migrations: + for model in migration.models: + await init_beanie( + database=db, + document_models=[model], # type: ignore + allow_index_dropping=allow_index_dropping, + ) # TODO this is slow + logger.info( + f"Running migration {migration.function.__name__} " + f"from module {self.name}" + ) + await migration.run(session=session) @classmethod async def build(cls, path: Path): diff --git a/docs/tutorial/migrations.md b/docs/tutorial/migrations.md index aa03be04..1085cf28 100644 --- a/docs/tutorial/migrations.md +++ b/docs/tutorial/migrations.md @@ -1,6 +1,5 @@ ## Attention! -Migrations use transactions inside. They work only with **MongoDB replica sets** ## Create @@ -17,6 +16,8 @@ Each one contains instructions to roll migration respectively forward and backwa ## Run +**Attention**: By default, migrations use transactions. This approach only works with **MongoDB replica sets**. If you prefer to run migrations without transactions, pass the `--no-use-transaction` flag to the `migrate` command. However, be aware that this approach is risky, as there is no way to roll back migrations without transactions. + To roll one forward migration, run: ```shell @@ -26,7 +27,7 @@ beanie migrate -uri 'mongodb+srv://user:pass@host/db' -p relative/path/to/migrat To roll all forward migrations, run: ```shell -beanie migrate -uri 'mongodb+srv://user:pass@host/db' -p relative/path/to/migrations/directory/ +beanie migrate -uri 'mongodb://user:pass@host' -db db -p relative/path/to/migrations/directory/ ``` To roll one backward migration, run: diff --git a/tests/migrations/test_free_fall.py b/tests/migrations/test_free_fall.py index 2b03c4f3..0419471a 100644 --- a/tests/migrations/test_free_fall.py +++ b/tests/migrations/test_free_fall.py @@ -65,3 +65,26 @@ async def test_migration_free_fall(settings, notes, db): assert inspection.status == InspectionStatuses.OK note = await OldNote.find_one({}) assert note.name == "0" + + +async def test_migration_free_fall_no_use_transactions(settings, notes, db): + migration_settings = MigrationSettings( + connection_uri=settings.mongodb_dsn, + database_name=settings.mongodb_db_name, + path="tests/migrations/migrations_for_test/free_fall", + use_transaction=False, + ) + await run_migrate(migration_settings) + + await init_beanie(database=db, document_models=[Note]) + inspection = await Note.inspect_collection() + assert inspection.status == InspectionStatuses.OK + note = await Note.find_one({}) + assert note.title == "0" + + migration_settings.direction = RunningDirections.BACKWARD + await run_migrate(migration_settings) + inspection = await OldNote.inspect_collection() + assert inspection.status == InspectionStatuses.OK + note = await OldNote.find_one({}) + assert note.name == "0" From 887e0bea1345bcfc0c553165790477c26e9dede7 Mon Sep 17 00:00:00 2001 From: Roman Date: Mon, 22 Jan 2024 11:12:54 -0600 Subject: [PATCH 2/2] fix mypy --- beanie/migrations/controllers/base.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/beanie/migrations/controllers/base.py b/beanie/migrations/controllers/base.py index 977da7fd..c49852f0 100644 --- a/beanie/migrations/controllers/base.py +++ b/beanie/migrations/controllers/base.py @@ -5,6 +5,9 @@ class BaseMigrationController(ABC): + def __init__(self, function): + self.function = function + @abstractmethod async def run(self, session): pass