Skip to content

Commit

Permalink
Fix #35 - Propagate migration crashes to quorum parties.
Browse files Browse the repository at this point in the history
This ensures remote parties terminate immediately when the party that
attained pre-migration quorum crashes when applying migrations.

This is achieved by introducing a concept of quorum severance and implemented
in the cache backend by assigning a negative quorum value to the namespace
which can be detected by other parties.
  • Loading branch information
charettes committed Oct 6, 2023
1 parent 06eea31 commit a24f01e
Show file tree
Hide file tree
Showing 9 changed files with 192 additions and 27 deletions.
28 changes: 23 additions & 5 deletions syzygy/management/commands/migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@

from syzygy.constants import Stage
from syzygy.plan import Plan, get_pre_deploy_plan, hash_plan
from syzygy.quorum import join_quorum, poll_quorum
from syzygy.quorum import (
QuorumDisolved,
join_quorum,
poll_quorum,
sever_quorum,
)


class PreDeployMigrationExecutor(MigrationExecutor):
Expand Down Expand Up @@ -189,7 +194,15 @@ def _handle_quorum(
self.stdout.write(
"Reached pre-migrate quorum, proceeding with planned migrations..."
)
yield True
try:
yield True
except: # noqa: E722
# Bare except clause to capture any form of termination.
self.stderr.write(
"Encountered exception while applying migrations, disovling quorum."
)
sever_quorum(post_namespace, quorum)
raise
if verbosity:
self.stdout.write("Waiting for post-migrate quorum...")
duration = self._join_or_poll_until_quorum(
Expand All @@ -207,9 +220,14 @@ def _handle_quorum(
if verbosity:
self.stdout.write(f"Reached pre-migrate quorum after {duration:.2f}s...")
self.stdout.write("Waiting for migrations to be applied by remote party...")
duration = self._join_or_poll_until_quorum(
post_namespace, quorum, quorum_timeout
)
try:
duration = self._join_or_poll_until_quorum(
post_namespace, quorum, quorum_timeout
)
except QuorumDisolved as exc:
raise CommandError(
"Error encountered by remote party while applying migration, aborting."
) from exc
if verbosity:
self.stdout.write(f"Reached post-migrate quorum after {duration:.2f}s...")
self.stdout.write("Migrations applied by remote party")
Expand Down
13 changes: 11 additions & 2 deletions syzygy/quorum/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,19 @@
from django.core.exceptions import ImproperlyConfigured
from django.utils.module_loading import import_string

from .backends.base import QuorumBase
from .exceptions import QuorumDisolved

__all__ = ("join_quorum", "sever_quorum", "poll_quorum", "QuorumDisolved")


@lru_cache(maxsize=1)
def _get_quorum(backend_path, **backend_options):
def _get_quorum(backend_path, **backend_options) -> QuorumBase:
backend_cls = import_string(backend_path)
return backend_cls(**backend_options)


def _get_configured_quorum():
def _get_configured_quorum() -> QuorumBase:
try:
config: Union[dict, str] = settings.MIGRATION_QUORUM_BACKEND # type: ignore
except AttributeError:
Expand Down Expand Up @@ -50,5 +55,9 @@ def join_quorum(namespace: str, quorum: int) -> bool:
return _get_configured_quorum().join(namespace=namespace, quorum=quorum)


def sever_quorum(namespace: str, quorum: int) -> bool:
return _get_configured_quorum().sever(namespace=namespace, quorum=quorum)


def poll_quorum(namespace: str, quorum: int) -> bool:
return _get_configured_quorum().poll(namespace=namespace, quorum=quorum)
24 changes: 19 additions & 5 deletions syzygy/quorum/backends/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,34 @@

class QuorumBase(ABC):
"""
An abstraction to allow multiple clusters to obtain quorum before
proceeding with deployment stage.
An abstraction to allow multiple parties to obtain quorum before
proceeding with a coordinated action.
For a particular `namespace` the `join` method must be called
`quorum` times and `poll` must be called ``quorum - 1`` times or
for each member that got returned `False` when calling `join`.
for each party that got returned `False` when calling `join`.
The `sever` method must be called instead of `join` by parties
that do not intend to participate in attaining `namespace`'s quorum.
It must result in other parties raising `QuorumDisolved` on their next
`poll` for that `namespace`.
"""

@abstractmethod
def join(self, namespace: str, quorum: int) -> bool:
"""Join the `namespace` and return whether or not `quorum` was reached."""
"""Join the `namespace` and return whether or not `quorum` was attained."""
...

@abstractmethod
def sever(self, namespace: str, quorum: int):
"""Sever the `namespace`'s quorum attainment process."""

@abstractmethod
def poll(self, namespace: str, quorum: int) -> bool:
"""Return whether or not `namespace`'s `quorum` was reached."""
"""
Return whether or not `namespace`'s `quorum` was reached.
Raise `QuorumDisolved` if the quorom attainment process was
severed.
"""
...
12 changes: 10 additions & 2 deletions syzygy/quorum/backends/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from django.core.cache import DEFAULT_CACHE_ALIAS, caches

from ..exceptions import QuorumDisolved
from .base import QuorumBase


Expand Down Expand Up @@ -30,7 +31,6 @@ def _clear(self, namespace: str):
)

def join(self, namespace: str, quorum: int) -> bool:
"""Join the `namespace` and return whether or not `quorum` was reached."""
namespace_key, clear_namespace_key = self._get_namespace_keys(namespace)
self.cache.add(namespace_key, 0, timeout=self.timeout, version=self.version)
self.cache.add(
Expand All @@ -44,12 +44,20 @@ def join(self, namespace: str, quorum: int) -> bool:
return True
return False

def sever(self, namespace: str, quorum: int):
namespace_key, _ = self._get_namespace_keys(namespace)
self.cache.add(namespace_key, 0, timeout=self.timeout, version=self.version)
self.cache.decr(namespace_key, quorum, version=self.version)

def poll(self, namespace: str, quorum: int) -> bool:
"""Return whether or not `namespace`'s `quorum` was reached."""
namespace_key, clear_namespace_key = self._get_namespace_keys(namespace)
current = self.cache.get(namespace_key, version=self.version)
if current == quorum:
if self.cache.decr(clear_namespace_key, version=self.version) == 0:
self._clear(namespace)
return True
elif current <= 0:
if self.cache.decr(clear_namespace_key, version=self.version) == 0:
self._clear(namespace)
raise QuorumDisolved
return False
2 changes: 2 additions & 0 deletions syzygy/quorum/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
class QuorumDisolved(Exception):
pass
52 changes: 52 additions & 0 deletions tests/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,21 @@ def _call_command_thread(self, options):
connections.close_all()
return stdout

def _call_failing_command_thread(self, options):
stdout = StringIO()
stderr = StringIO()
with self.assertRaises(Exception) as exc:
call_command(
"migrate",
"tests",
no_color=True,
stdout=stdout,
stderr=stderr,
**options,
)
connections.close_all()
return exc.exception, stdout.getvalue(), stderr.getvalue()

def call_command_with_quorum(self, stage, quorum=3):
with mock.patch("time.sleep"), ThreadPool(processes=quorum) as pool:
stdouts = pool.map_async(
Expand Down Expand Up @@ -127,6 +142,43 @@ def test_quorum_timeout(self):
with self.assertRaisesMessage(RuntimeError, msg):
self.call_command(quorum=2, quorum_timeout=1)

@override_settings(MIGRATION_MODULES={"tests": "tests.test_migrations.crash"})
def test_quorum_severed(self):
quorum = 3
stage = Stage.PRE_DEPLOY
with mock.patch("time.sleep"), ThreadPool(processes=quorum) as pool:
results = pool.map_async(
self._call_failing_command_thread,
[{"stage": stage, "quorum": quorum}] * quorum,
).get()
severed = 0
severer = 0
for exc, stdout, stderr in results:
if (
isinstance(exc, CommandError)
and str(exc)
== "Error encountered by remote party while applying migration, aborting."
):
self.assertIn(
"Waiting for migrations to be applied by remote party...", stdout
)
self.assertEqual(stderr, "")
severed += 1
elif str(exc) == "Test crash":
self.assertIn(
"Reached pre-migrate quorum, proceeding with planned migrations...",
stdout,
)
self.assertIn(
"Encountered exception while applying migrations, disovling quorum",
stderr,
)
severer += 1
else:
self.fail(f"Unexpected exception: {exc}")
self.assertEqual(severed, quorum - 1)
self.assertEqual(severer, 1)


class MakeMigrationsTests(TestCase):
def test_disabled(self):
Expand Down
15 changes: 15 additions & 0 deletions tests/test_migrations/crash/0001_pre_deploy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from django.db import migrations

from syzygy import Stage


def crash(*args, **kwargs):
raise Exception("Test crash")


class Migration(migrations.Migration):
initial = True
stage = Stage.PRE_DEPLOY
atomic = False

operations = [migrations.RunPython(crash)]
Empty file.
73 changes: 60 additions & 13 deletions tests/test_quorum.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
import time
import uuid
from multiprocessing.pool import ThreadPool
from random import shuffle

from django.core.exceptions import ImproperlyConfigured
from django.test.testcases import SimpleTestCase
from django.test.utils import override_settings

from syzygy.quorum import join_quorum, poll_quorum
from syzygy.quorum import (
QuorumDisolved,
join_quorum,
poll_quorum,
sever_quorum,
)


class QuorumConfigurationTests(SimpleTestCase):
Expand Down Expand Up @@ -49,35 +55,76 @@ def test_cannot_initialize_backend(self):


class BaseQuorumTestMixin:
quorum = 5

def test_multiple(self):
namespace = str(uuid.uuid4())
self.assertFalse(join_quorum(namespace, 2))
self.assertFalse(poll_quorum(namespace, 2))
self.assertTrue(join_quorum(namespace, 2))
self.assertTrue(poll_quorum(namespace, 2))

def test_thread_safety(self):
quorum = 5
@classmethod
def attain_quorum(cls, namespace):
if join_quorum(namespace, cls.quorum):
return True
while not poll_quorum(namespace, cls.quorum):
time.sleep(0.01)
return False

def achieve_quorum(namespace):
if join_quorum(namespace, quorum):
return True
while not poll_quorum(namespace, quorum):
time.sleep(0.01)
return False
def test_attainment(self):
namespace = str(uuid.uuid4())

with ThreadPool(processes=quorum) as pool:
results = pool.map_async(achieve_quorum, [str(uuid.uuid4())] * quorum).get()
with ThreadPool(processes=self.quorum) as pool:
results = pool.map_async(
self.attain_quorum, [namespace] * self.quorum
).get()

self.assertEqual(sum(1 for result in results if result is True), 1)
self.assertEqual(sum(1 for result in results if result is False), 4)

def test_namespace_reuse(self):
def test_attainment_namespace_reuse(self):
namespace = str(uuid.uuid4())
self.assertFalse(join_quorum(namespace, 2))
self.assertTrue(join_quorum(namespace, 2))
self.assertTrue(poll_quorum(namespace, 2))
# Once quorum is reached its associated is immediately cleared and reusable.
# Once quorum is reached its associated namespace is immediately
# cleared to make it reusable.
self.assertFalse(join_quorum(namespace, 2))
self.assertTrue(join_quorum(namespace, 2))
self.assertTrue(poll_quorum(namespace, 2))

def test_disolution(self):
namespace = str(uuid.uuid4())

calls = [(self.attain_quorum, (namespace,))] * (self.quorum - 1)
calls.append((sever_quorum, (namespace, self.quorum)))
shuffle(calls)

with ThreadPool(processes=self.quorum) as pool:
results = [pool.apply_async(func, args) for func, args in calls]
pool.close()
pool.join()

disolved = 0
for result in results:
try:
attained = result.get()
except QuorumDisolved:
disolved += 1
else:
if attained is not None:
self.fail(f"Unexpected quorum attainment: {attained}")
self.assertEqual(disolved, self.quorum - 1)

def test_disolution_namespace_reuse(self):
namespace = str(uuid.uuid4())
self.assertFalse(join_quorum(namespace, 2))
sever_quorum(namespace, 2)
with self.assertRaises(QuorumDisolved):
poll_quorum(namespace, 2)
# Once quorum is disolved its associated namespace is immediately
# cleared to make it reusable.
self.assertFalse(join_quorum(namespace, 2))
self.assertTrue(join_quorum(namespace, 2))
self.assertTrue(poll_quorum(namespace, 2))
Expand Down

0 comments on commit a24f01e

Please sign in to comment.