Skip to content

Commit

Permalink
Introduce "--parallel" argument
Browse files Browse the repository at this point in the history
When provided, upgrade scripts are executed in parallel for the
databases provided by the backend.
  • Loading branch information
kedder committed Apr 23, 2020
1 parent adb2329 commit a846dec
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 14 deletions.
3 changes: 3 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ CHANGELOG
1.3.2 (unreleased)
------------------

- Support migration of multiple databases in parallel (see the `--parallel`
command line argument of the `upgrade` command)

- Add support for Python-3.8

- Drop support for Python-2.7
Expand Down
20 changes: 18 additions & 2 deletions src/migrant/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# Copyright 2014 by Shoobx, Inc.
#
###############################################################################
from typing import Dict
import os
import sys
import argparse
Expand Down Expand Up @@ -33,11 +34,13 @@ def cmd_new(args, cfg):
backend.on_new_script(revname)


def cmd_upgrade(args, cfg):
def cmd_upgrade(args, cfg: Dict[str, str]) -> None:
    """Handle the ``upgrade`` command.

    Resolves the per-database configuration, builds the repository,
    backend and engine, and migrates to the revision requested on the
    command line (``args.revision``), using ``args.parallel`` worker
    processes.
    """
    db_cfg = get_db_config(cfg, args.database)
    repo = create_repo(db_cfg)
    backend = create_backend(db_cfg)
    engine = MigrantEngine(
        backend,
        repo,
        db_cfg,
        dry_run=args.dry_run,
        processes=args.parallel,
    )
    engine.update(args.revision)


Expand Down Expand Up @@ -106,6 +109,19 @@ def cmd_status(args, cfg):
help=("Revision to upgrade to. If not specified, " "latest revision will be used"),
)

# -j/--parallel sets how many databases are migrated concurrently.
# Note: nargs="?" with no `const` means a bare "--parallel" (no number)
# yields None; the engine then falls back to one process per CPU core
# (see MigrantEngine.__init__: `processes or multiprocessing.cpu_count()`).
# NOTE(review): presumably that fallback is intentional — confirm.
upgrade_parser.add_argument(
    "-j",
    "--parallel",
    nargs="?",
    type=int,
    default=1,
    help=(
        "Migrate databases in parallel. If backend provides multiple databases, "
        "migration for each of them will be performed in parallel. Concurrency "
        "level is set by this argument."
    ),
)


# TEST options
test_parser = commands.add_parser(
Expand Down
18 changes: 11 additions & 7 deletions src/migrant/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# Copyright 2014 by Shoobx, Inc.
#
###############################################################################
from typing import NewType, Dict, Iterable, List, Tuple
from typing import NewType, Dict, Iterable, List, Tuple, Generator
import logging
import multiprocessing
import functools
Expand All @@ -26,12 +26,14 @@ def __init__(
repository: Repository,
config: Dict[str, str],
dry_run: bool = False,
processes: int = None,
) -> None:
self.backend = backend
self.repository = repository
self.script_ids = ["INITIAL"] + repository.list_script_ids()
self.dry_run = dry_run
self.config = config
self.processes = processes or multiprocessing.cpu_count()

def status(self, target_id: str = None) -> int:
"""Return number of migration actions to be performed to
Expand All @@ -47,18 +49,20 @@ def status(self, target_id: str = None) -> int:
return total_actions

def _update(self, db: DB, target_id: str) -> None:
    """Migrate a single database *db* to revision *target_id*.

    Worker body used by ``update()``; each call handles one database
    (the pool dispatches one invocation per connection).
    """
    # The scraped diff interleaved the old %-style log lines with the
    # new f-string ones; only the f-string versions belong in the code.
    log.info(f"Starting migration for {db}")
    actions = self.calc_actions(db, target_id)
    self.execute_actions(db, actions)
    log.info(f"Migration completed for {db}")

def update(self, target_id: str = None) -> None:
    """Bring every database provided by the backend up to *target_id*.

    *target_id* defaults to the latest revision in the repository
    (via ``pick_rev_id``).  Databases are migrated concurrently by a
    pool of ``self.processes`` worker processes.
    """
    target_id = self.pick_rev_id(target_id)
    conns = self.backend.generate_connections()
    f = functools.partial(self._update, target_id=target_id)

    with multiprocessing.Pool(self.processes) as pool:
        # imap_unordered is lazy: iterate the result generator to make
        # sure every job actually runs before the pool is closed.
        for _ in pool.imap_unordered(f, self.initialized_dbs(conns)):
            pass

def test(self, target_id: str = None) -> None:
target_id = self.pick_rev_id(target_id)
Expand All @@ -79,7 +83,7 @@ def test(self, target_id: str = None) -> None:

log.info("Testing completed for %s" % db)

def initialized_dbs(self, conns: Iterable[DB]):
def initialized_dbs(self, conns: Iterable[DB]) -> Generator[DB, None, None]:
for db in conns:
log.info("Preparing migrations for %s" % db)
migrations = self.backend.list_migrations(db)
Expand All @@ -104,7 +108,7 @@ def initialize_db(self, db: DB, initial_revid: str) -> None:
sid = script.name
self.backend.push_migration(db, sid)

log.info(f"Initializing migrations for {db}. Assuming database is at {sid}")
log.info(f"Initialized migrations for {db}. Assuming database is at {sid}")

def pick_rev_id(self, rev_id: str = None) -> str:
if rev_id is None:
Expand Down
15 changes: 10 additions & 5 deletions src/migrant/tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ def test_get_db_config(self):


class UpgradeTest(unittest.TestCase):

@pytest.fixture(autouse=True)
def backend_fixture(self, tmpdir, migrant_backend):
m = multiprocessing.Manager()
Expand Down Expand Up @@ -152,7 +151,8 @@ def test_initial_upgrade(self):
cli.dispatch(args, self.cfg)

self.assertEqual(
list(self.db0.migrations), ["INITIAL", "aaaa_first", "bbbb_second", "cccc_third"]
list(self.db0.migrations),
["INITIAL", "aaaa_first", "bbbb_second", "cccc_third"],
)

def test_subsequent_emtpy_upgrade(self):
Expand All @@ -162,7 +162,8 @@ def test_subsequent_emtpy_upgrade(self):
cli.dispatch(args, self.cfg)

self.assertEqual(
list(self.db0.migrations), ["INITIAL", "aaaa_first", "bbbb_second", "cccc_third"]
list(self.db0.migrations),
["INITIAL", "aaaa_first", "bbbb_second", "cccc_third"],
)

def test_upgrade_latest(self):
Expand All @@ -185,7 +186,9 @@ def test_upgrade_particular(self):
self.assertEqual(dict(self.db0.data), {"value": "b"})

def test_downgrade(self):
self.db0.migrations.extend(["INITIAL", "aaaa_first", "bbbb_second", "cccc_third"])
self.db0.migrations.extend(
["INITIAL", "aaaa_first", "bbbb_second", "cccc_third"]
)
self.db0.data.update({"hello": "world", "value": "c"})

args = cli.parser.parse_args(["test", "upgrade", "--revision", "aaaa_first"])
Expand All @@ -195,7 +198,9 @@ def test_downgrade(self):
self.assertEqual(dict(self.db0.data), {"value": "a"})

def test_downgrade_to_initial(self):
self.db0.migrations.extend(["INITIAL", "aaaa_first", "bbbb_second", "cccc_third"])
self.db0.migrations.extend(
["INITIAL", "aaaa_first", "bbbb_second", "cccc_third"]
)
self.db0.data.update({"hello": "world", "value": "c"})

args = cli.parser.parse_args(["test", "upgrade", "--revision", "INITIAL"])
Expand Down
107 changes: 107 additions & 0 deletions src/migrant/tests/test_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,16 @@
# Copyright 2014 by Shoobx, Inc.
#
###############################################################################
from typing import List, Dict, Generator
import os
import unittest
import time

import mock

from migrant.engine import MigrantEngine
from migrant.backend import MigrantBackend
from migrant.repository import Script, Repository


class MigrantEngineTest(unittest.TestCase):
Expand Down Expand Up @@ -162,3 +167,105 @@ def _make_engine(migrations, scripts, log=None):

engine = MigrantEngine(backend, repository, {})
return engine


class MultiDbBackend(MigrantBackend):
    """In-memory test backend tracking applied migrations per database name."""

    def __init__(self, dbs: List[str]) -> None:
        self.dbs = dbs
        # Every database starts out with only the INITIAL pseudo-migration.
        self._applied = {db: ["INITIAL"] for db in dbs}

    def list_migrations(self, db: str) -> List[str]:
        """Return migrations applied to *db* (empty list if unknown)."""
        return self._applied.get(db, [])

    def push_migration(self, db, migration):
        """Record *migration* as applied to *db*."""
        self._applied.setdefault(db, []).append(migration)

    def pop_migration(self, db, migration):
        """Remove *migration* from the record of *db*."""
        self._applied.setdefault(db, []).remove(migration)

    def generate_connections(self) -> Generator[str, None, None]:
        """Yield each configured database name as a "connection"."""
        yield from self.dbs


class TimedScript(Script):
    """Migration script whose upgrade sleeps for a per-database duration.

    After sleeping, it appends a line to *logfname*, so tests can
    observe the order in which upgrades completed.
    """

    def __init__(self, name: str, timemap: Dict[str, float], logfname: str) -> None:
        self.name = name
        self._timemap = timemap
        self._logfname = logfname

    def up(self, db):
        """Sleep for the duration configured for *db*, then log completion."""
        tosleep = self._timemap.get(db, 0)
        time.sleep(tosleep)
        with open(self._logfname, "a") as logfile:
            logfile.write(f"{db}: Upgraded to {self.name} ({tosleep}s)\n")


class MultiDbRepo(Repository):
    """Test repository exposing a single timed script ("script1")."""

    def __init__(self, timemap: Dict[str, float], logfname: str) -> None:
        # Per-database sleep durations and the shared completion log.
        self.timemap = timemap
        self.logfname = logfname

    def list_script_ids(self) -> List[str]:
        """The repository contains exactly one script."""
        return ["script1"]

    def load_script(self, scriptid: str) -> Script:
        """Build a TimedScript wired to the shared timing map and log."""
        return TimedScript(scriptid, self.timemap, self.logfname)


def test_concurrent_upgrade_multiprocess(tmp_path) -> None:
    """With 2 worker processes, shorter migrations finish (and log) first."""
    # GIVEN
    #
    # Worker processes cannot share in-memory structures (such as a
    # list), so the scripts record their completion order in a shared
    # log file instead.
    logfname = os.path.join(tmp_path, "migration.log")
    backend = MultiDbBackend(["db1", "db2", "db3"])
    repository = MultiDbRepo({"db1": 0.1, "db2": 0.01, "db3": 0.05}, logfname)
    engine = MigrantEngine(backend, repository, {}, processes=2)

    # WHEN
    engine.update()

    # THEN
    with open(logfname, "r") as fp:
        entries = fp.read().strip().split("\n")

    # The longest task finishes last.  NOTE(review): this ordering is
    # timing-based and could be flaky on a heavily loaded machine.
    assert entries == [
        "db2: Upgraded to script1 (0.01s)",
        "db3: Upgraded to script1 (0.05s)",
        "db1: Upgraded to script1 (0.1s)",
    ]


def test_concurrent_upgrade_singleprocess(tmp_path) -> None:
    """With 1 worker process, databases migrate strictly in order."""
    # GIVEN
    #
    # Worker processes cannot share in-memory structures (such as a
    # list), so the scripts record their completion order in a shared
    # log file instead.
    logfname = os.path.join(tmp_path, "migration.log")
    backend = MultiDbBackend(["db1", "db2", "db3"])
    repository = MultiDbRepo({"db1": 0.1, "db2": 0.01, "db3": 0.05}, logfname)
    engine = MigrantEngine(backend, repository, {}, processes=1)

    # WHEN
    engine.update()

    # THEN
    with open(logfname, "r") as fp:
        entries = fp.read().strip().split("\n")

    # Sequential execution: the first database logs first even though
    # its migration takes the longest.
    assert entries == [
        "db1: Upgraded to script1 (0.1s)",
        "db2: Upgraded to script1 (0.01s)",
        "db3: Upgraded to script1 (0.05s)",
    ]

0 comments on commit a846dec

Please sign in to comment.