Skip to content

Commit

Permalink
Merge pull request #454 from mcanaves/master
Browse files Browse the repository at this point in the history
Added parallel migrations. Thank you @mcanaves
  • Loading branch information
bernardopires committed Apr 23, 2017
2 parents 71019cd + 73fdfdf commit a4fb6cd
Show file tree
Hide file tree
Showing 14 changed files with 183 additions and 45 deletions.
25 changes: 13 additions & 12 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
sudo: false
language: python
python:
- 2.7
- 3.5
- 2.7
- 3.5
services:
- postgresql
- postgresql
addons:
postgresql: '9.4'
install:
- pip install tox
before_script:
- psql -c "CREATE DATABASE dts_test_project;" -U postgres
script:
- tox -e py${TRAVIS_PYTHON_VERSION/./}-dj${DJANGO/./}
install: pip install -q tox-travis
env:
- DJANGO=1.8
- DJANGO=1.10
- DJANGO=1.11
- DJANGO=1.8
- DJANGO=1.9
- DJANGO=1.10
- DJANGO=1.11
matrix:
fast_finish: true
script: tox
before_script: psql -c "CREATE DATABASE dts_test_project;" -U postgres
deploy:
provider: pypi
user: bcarneiro
Expand Down
2 changes: 0 additions & 2 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
# All configuration values have a default; values that are commented out
# serve to show the default.

import sys
import os
import datetime

# If extensions (or modules to document with autodoc) are in another directory,
Expand Down
20 changes: 20 additions & 0 deletions docs/use.rst
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,26 @@ The options given to ``migrate_schemas`` are also passed to every ``migrate``. H
``migrate_schemas`` raises an exception when a tenant schema is missing.

migrate_schemas in parallel
~~~~~~~~~~~~~~~~~~~~~~~~~~~

Once the number of tenants grows, migrating all the tenants can become a bottleneck. To speed up this process, you can run tenant migrations in parallel like this:

.. code-block:: bash

    python manage.py migrate_schemas --executor=parallel

In fact, you can write your own executor which will run tenant migrations in
any way you want, just take a look at ``tenant_schemas/migration_executors``.

The ``parallel`` executor accepts the following settings:

* ``TENANT_PARALLEL_MIGRATION_MAX_PROCESSES`` (default: 2) - maximum number of
  processes for the migration pool (this is to avoid exhausting the database
  connection pool)
* ``TENANT_PARALLEL_MIGRATION_CHUNKS`` (default: 2) - number of migrations to be
  sent at once to every worker

tenant_command
~~~~~~~~~~~~~~

Expand Down
2 changes: 2 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/usr/bin/env python

from os.path import exists

from version import get_git_version

try:
Expand All @@ -15,6 +16,7 @@
author_email='carneiro.be@gmail.com',
packages=[
'tenant_schemas',
'tenant_schemas.migration_executors',
'tenant_schemas.postgresql_backend',
'tenant_schemas.management',
'tenant_schemas.management.commands',
Expand Down
3 changes: 3 additions & 0 deletions tenant_schemas/management/commands/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,14 @@ def add_arguments(self, parser):
help=('Database state will be brought to the state after that '
'migration. Use the name "zero" to unapply all migrations.'))
parser.add_argument("-s", "--schema", dest="schema_name")
parser.add_argument('--executor', action='store', dest='executor', default=None,
help='Executor for running migrations [standard (default)|parallel]')

def handle(self, *args, **options):
self.sync_tenant = options.get('tenant')
self.sync_public = options.get('shared')
self.schema_name = options.get('schema_name')
self.executor = options.get('executor')
self.installed_apps = settings.INSTALLED_APPS
self.args = args
self.options = options
Expand Down
33 changes: 9 additions & 24 deletions tenant_schemas/management/commands/migrate_schemas.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import django

from django.conf import settings
from django.core.management.commands.migrate import Command as MigrateCommand
from django.db import connection

from tenant_schemas.management.commands import SyncCommon
from tenant_schemas.utils import get_tenant_model, get_public_schema_name, schema_exists
from tenant_schemas.migration_executors import get_executor
from tenant_schemas.utils import get_public_schema_name, get_tenant_model, schema_exists

if django.VERSION >= (1, 9, 0):
from django.db.migrations.exceptions import MigrationSchemaMissing
Expand Down Expand Up @@ -34,35 +33,21 @@ def handle(self, *args, **options):
super(Command, self).handle(*args, **options)
self.PUBLIC_SCHEMA_NAME = get_public_schema_name()

executor = get_executor(codename=self.executor)(self.args, self.options)

if self.sync_public and not self.schema_name:
self.schema_name = self.PUBLIC_SCHEMA_NAME

if self.sync_public:
self.run_migrations(self.schema_name, settings.SHARED_APPS)
executor.run_migrations(tenants=[self.schema_name])
if self.sync_tenant:
if self.schema_name and self.schema_name != self.PUBLIC_SCHEMA_NAME:
if not schema_exists(self.schema_name):
raise MigrationSchemaMissing('Schema "{}" does not exist'.format(
self.schema_name))
else:
self.run_migrations(self.schema_name, settings.TENANT_APPS)
tenants = [self.schema_name]
else:
all_tenants = get_tenant_model().objects.exclude(schema_name=get_public_schema_name())
for tenant in all_tenants:
self.run_migrations(tenant.schema_name, settings.TENANT_APPS)

def run_migrations(self, schema_name, included_apps):
if int(self.options.get('verbosity', 1)) >= 1:
self._notice("=== Running migrate for schema %s" % schema_name)

if not schema_exists(schema_name):
raise MigrationSchemaMissing('Schema "{}" does not exist'.format(
schema_name))

connection.set_schema(schema_name)
command = MigrateCommand()
command.execute(*self.args, **self.options)
connection.set_schema_to_public()

def _notice(self, output):
self.stdout.write(self.style.NOTICE(output))
tenants = get_tenant_model().objects.exclude(schema_name=get_public_schema_name()).values_list(
'schema_name', flat=True)
executor.run_migrations(tenants=tenants)
15 changes: 15 additions & 0 deletions tenant_schemas/migration_executors/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import os

from tenant_schemas.migration_executors.base import MigrationExecutor
from tenant_schemas.migration_executors.parallel import ParallelExecutor
from tenant_schemas.migration_executors.standard import StandardExecutor


def get_executor(codename=None):
    """Return the migration executor class registered under ``codename``.

    The codename is resolved in this order:

    1. the explicit ``codename`` argument;
    2. the ``EXECUTOR`` environment variable;
    3. the ``MIGRATION_EXECUTOR`` environment variable (the name exported by
       the project's tox configuration);
    4. the codename of ``StandardExecutor``.

    Empty values are skipped, so an empty environment variable falls through
    to the next source instead of being looked up as a codename.

    Raises ``NotImplementedError`` when no direct subclass of
    ``MigrationExecutor`` declares the resolved codename.
    """
    codename = (
        codename
        or os.environ.get('EXECUTOR')
        or os.environ.get('MIGRATION_EXECUTOR')
        or StandardExecutor.codename
    )

    # Only direct subclasses are considered: __subclasses__() is not recursive.
    for klass in MigrationExecutor.__subclasses__():
        if klass.codename == codename:
            return klass

    raise NotImplementedError('No executor with codename %s' % codename)
64 changes: 64 additions & 0 deletions tenant_schemas/migration_executors/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import sys

from django.core.management.commands.migrate import Command as MigrateCommand
from django.db import transaction

from tenant_schemas.utils import get_public_schema_name


def run_migrations(args, options, executor_codename, schema_name, allow_atomic=True):
    """Run Django's ``migrate`` command against a single schema.

    ``args`` and ``options`` are forwarded unchanged to the ``migrate``
    command. Every line the command writes is prefixed with
    ``[<executor_codename>:<schema_name>]``.

    After migrating, the transaction is committed and the connection closed.
    If that raises ``TransactionManagementError`` the error is swallowed when
    ``allow_atomic`` is true and re-raised otherwise. In either non-raising
    case the connection's schema is reset to public before returning.
    """
    from django.core.management import color
    from django.core.management.base import OutputWrapper
    from django.db import connection

    style = color.color_style()

    def tag_with_context(msg):
        # Prefix each message with the executor and schema it belongs to.
        executor_tag = style.NOTICE(executor_codename)
        schema_tag = style.NOTICE(schema_name)
        return '[%s:%s] %s' % (executor_tag, schema_tag, msg)

    stdout = OutputWrapper(sys.stdout)
    stdout.style_func = tag_with_context
    stderr = OutputWrapper(sys.stderr)
    stderr.style_func = tag_with_context

    verbosity = int(options.get('verbosity', 1))
    if verbosity >= 1:
        banner = "=== Running migrate for schema %s" % schema_name
        stdout.write(style.NOTICE(banner))

    connection.set_schema(schema_name)
    migrate_command = MigrateCommand(stdout=stdout, stderr=stderr)
    migrate_command.execute(*args, **options)

    try:
        transaction.commit()
        connection.close()
        connection.connection = None
    except transaction.TransactionManagementError:
        # We are in atomic transaction, don't close connections --
        # unless the caller explicitly disallowed running inside one.
        if not allow_atomic:
            raise

    connection.set_schema_to_public()


class MigrationExecutor(object):
    """Base class for strategies that migrate a collection of schemas.

    Subclasses set ``codename`` and implement ``run_tenant_migrations``.
    """

    # Identifier a concrete executor is selected by; subclasses override it.
    codename = None

    def __init__(self, args, options):
        """Store the positional ``args`` and ``options`` dict that are passed
        on to every migration run."""
        self.args = args
        self.options = options

    def run_migrations(self, tenants):
        """Migrate the public schema first (if requested), then the rest.

        ``tenants`` is any iterable of schema names. It is copied into a new
        list, so the caller's object is never mutated and non-list iterables
        (for example a ``values_list`` queryset) are accepted. The first
        occurrence of the public schema name is migrated directly and removed
        from the copy; the remaining names are handed to
        ``run_tenant_migrations``.
        """
        public_schema_name = get_public_schema_name()

        # Work on a private list: the original popped from the argument in
        # place, which altered the caller's list and required a real list.
        tenants = list(tenants)

        if public_schema_name in tenants:
            run_migrations(self.args, self.options, self.codename, public_schema_name)
            tenants.remove(public_schema_name)

        self.run_tenant_migrations(tenants)

    def run_tenant_migrations(self, tenant):
        """Migrate the given tenant schema names; must be overridden.

        Called by ``run_migrations`` with a list of schema names.
        """
        raise NotImplementedError
30 changes: 30 additions & 0 deletions tenant_schemas/migration_executors/parallel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import functools
import multiprocessing

from django.conf import settings

from tenant_schemas.migration_executors.base import MigrationExecutor, run_migrations


class ParallelExecutor(MigrationExecutor):
    """Executor that migrates tenant schemas in a ``multiprocessing`` pool.

    Settings read (both default to 2):

    * ``TENANT_PARALLEL_MIGRATION_MAX_PROCESSES`` - number of pool processes.
    * ``TENANT_PARALLEL_MIGRATION_CHUNKS`` - ``chunksize`` passed to
      ``Pool.map``.
    """

    codename = 'parallel'

    def run_tenant_migrations(self, tenants):
        """Run ``run_migrations`` for every schema name in ``tenants``.

        Does nothing when ``tenants`` is empty. Any exception raised by a
        worker propagates to the caller after the pool has been shut down.
        """
        if not tenants:
            return

        processes = getattr(settings, 'TENANT_PARALLEL_MIGRATION_MAX_PROCESSES', 2)
        chunks = getattr(settings, 'TENANT_PARALLEL_MIGRATION_CHUNKS', 2)

        from django.db import connection

        # Drop the current connection before the pool is created --
        # presumably so worker processes do not share the parent's open
        # database connection (TODO confirm).
        connection.close()
        connection.connection = None

        run_migrations_p = functools.partial(
            run_migrations,
            self.args,
            self.options,
            self.codename,
            allow_atomic=False
        )

        pool = multiprocessing.Pool(processes=processes)
        try:
            pool.map(run_migrations_p, tenants, chunks)
        except BaseException:
            # Stop outstanding work immediately on failure or interruption.
            pool.terminate()
            raise
        else:
            # All tasks finished; let the workers exit normally.
            pool.close()
        finally:
            # Reap the worker processes so none are left behind. Explicit
            # calls are used instead of ``with`` because Pool is not a
            # context manager on Python 2.7.
            pool.join()
9 changes: 9 additions & 0 deletions tenant_schemas/migration_executors/standard.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from tenant_schemas.migration_executors.base import MigrationExecutor, run_migrations


class StandardExecutor(MigrationExecutor):
    """Executor that migrates tenant schemas one after another."""

    codename = 'standard'

    def run_tenant_migrations(self, tenants):
        """Call ``run_migrations`` for each schema name in ``tenants``, in order."""
        args, options, codename = self.args, self.options, self.codename
        for tenant_schema in tenants:
            run_migrations(args, options, codename, tenant_schema)
6 changes: 3 additions & 3 deletions tenant_schemas/postgresql_backend/base.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import re
import warnings
import psycopg2

import django.db.utils
import psycopg2
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured, ValidationError
import django.db.utils

from tenant_schemas.utils import get_public_schema_name, get_limit_set_calls
from tenant_schemas.postgresql_backend.introspection import DatabaseSchemaIntrospection
from tenant_schemas.utils import get_limit_set_calls, get_public_schema_name

ORIGINAL_BACKEND = getattr(settings, 'ORIGINAL_BACKEND', 'django.db.backends.postgresql_psycopg2')
# Django 1.9+ takes care to rename the default backend to 'django.db.backends.postgresql'
Expand Down
5 changes: 2 additions & 3 deletions tenant_schemas/test/cases.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
from django.core.management import call_command
from django.conf import settings
from django.core.management import call_command
from django.db import connection
from django.test import TestCase

from tenant_schemas.utils import get_public_schema_name
from tenant_schemas.utils import get_tenant_model
from tenant_schemas.utils import get_public_schema_name, get_tenant_model

ALLOWED_TEST_DOMAIN = '.test.com'

Expand Down
1 change: 1 addition & 0 deletions tenant_schemas/tests/testcases.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import inspect

from django.conf import settings
from django.core.management import call_command
from django.db import connection
Expand Down
13 changes: 12 additions & 1 deletion tox.ini
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
[tox]
envlist = py{27,35}-dj{18,110,111}
envlist = py{27,35}-dj{18,19,110,111}-{standard,parallel}

[travis:env]
DJANGO =
1.8: dj18-{standard,parallel}
1.9: dj19-{standard,parallel}
1.10: dj110-{standard,parallel}
1.11: dj111-{standard,parallel}

[testenv]
usedevelop = True
Expand All @@ -17,6 +24,10 @@ changedir = dts_test_project

passenv = PG_NAME PG_USER PG_PASSWORD PG_HOST PG_PORT

setenv =
standard: MIGRATION_EXECUTOR=standard
parallel: MIGRATION_EXECUTOR=parallel

commands =
coverage run manage.py test --noinput {posargs:tenant_schemas}
coverage report -m --include=../tenant_schemas/*

0 comments on commit a4fb6cd

Please sign in to comment.