Skip to content

Commit

Permalink
Merge e96dd24 into 44c3ba3
Browse files Browse the repository at this point in the history
  • Loading branch information
magmax committed Mar 2, 2014
2 parents 44c3ba3 + e96dd24 commit cc109ac
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 0 deletions.
5 changes: 5 additions & 0 deletions celery/beat.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
from .utils.imports import instantiate
from .utils.timeutils import humanize_seconds
from .utils.log import get_logger, iter_open_logger_fds
from .failover import ServiceFailover


__all__ = ['SchedulingError', 'ScheduleEntry', 'Scheduler',
'PersistentScheduler', 'Service', 'EmbeddedService']
Expand Down Expand Up @@ -452,8 +454,11 @@ def start(self, embedded_process=False):
signals.beat_embedded_init.send(sender=self)
platforms.set_process_title('celery beat')

failover = ServiceFailover(self.app, 'celery.beat.master')
try:
while not self._is_shutdown.is_set():
failover.wait_master()

interval = self.scheduler.tick()
debug('beat: Waking up %s.',
humanize_seconds(interval, prefix='in '))
Expand Down
58 changes: 58 additions & 0 deletions celery/failover.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# -*- coding: utf-8 -*-
"""
celery.failover
~~~~~~~~~~~~~~~
Mutex for services.
"""
import time
import socket
import amqp

from .utils.log import get_logger

# Module-level logger plus short aliases for its level methods.
logger = get_logger(__name__)
debug = logger.debug
info = logger.info
error = logger.error
warning = logger.warning


class ServiceFailover(object):
    """Broker-backed mutex that lets only one service instance run.

    Every instance tries to create an exclusive queue named
    ``<name>.mutex`` on the app's connection.  An instance for which
    that succeeds is considered the master; the others keep polling in
    :meth:`wait_master` until the attempt succeeds for them.

    :param app: application object; only its ``connection()`` method
        is used here.
    :param name: service name, used in log messages and as the prefix
        of the mutex queue name.
    """

    def __init__(self, app, name):
        self.app = app
        self.name = name

        # Created lazily by the ``connection`` property.
        self._conn = None

    @property
    def connection(self):
        """Connection from ``app.connection()``, created on first use."""
        # Identity check on purpose: ``== None`` would dispatch to the
        # connection object's ``__eq__`` and could throw away a live
        # connection if that object compares equal to None.
        if self._conn is None:
            self._conn = self.app.connection()
        return self._conn

    def wait_master(self, timeout=1):
        """Block until :meth:`is_master` returns true.

        :param timeout: seconds to sleep between two attempts.
        """
        debug('Waiting for %s token', self.name)
        while not self.is_master():
            debug("Checking for failover in %s seconds", timeout)
            time.sleep(timeout)
        debug('%s service received', self.name)

    def is_master(self):
        """Try to take the mutex queue once.

        :returns: ``True`` when the queue could be created, ``False``
            when the attempt raised a socket/connection error or a
            channel error.  Any other exception propagates.
        """
        try:
            self._get_mutex()
        except (socket.error, amqp.exceptions.ConnectionError):
            warning('Connection lost!')
        except amqp.exceptions.ChannelError:
            # Presumably raised because another instance already holds
            # the exclusive queue -- verify against the broker in use.
            debug("Not the %s master", self.name)
        else:
            return True
        return False

    def _get_mutex(self):
        """Create and return the exclusive ``<name>.mutex`` queue."""
        return self.connection.SimpleQueue(
            '%s.mutex' % self.name,
            queue_opts={'exclusive': True})
68 changes: 68 additions & 0 deletions celery/tests/utils/test_failover.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from celery.tests.case import Case, Mock
from celery.failover import ServiceFailover
import socket
import amqp


class test_failover_checking_if_master(Case):
    """``is_master`` outcome for each way ``SimpleQueue`` can behave."""

    def setUp(self):
        self.app = Mock(name='source')
        self.failover = ServiceFailover(self.app, 'example')

    def _wire_connection(self, queue_effect=None):
        # Make ``app.connection()`` hand out a mock connection whose
        # ``SimpleQueue`` either succeeds or raises ``queue_effect``.
        fake_conn = Mock()
        fake_conn.SimpleQueue = Mock(side_effect=queue_effect)
        self.app.connection = Mock(return_value=fake_conn)

    def test_is_master(self):
        self._wire_connection()

        self.assertTrue(self.failover.is_master())

    def test_is_not_master_socket_error(self):
        self._wire_connection(socket.error('foo'))

        self.assertFalse(self.failover.is_master())

    def test_is_not_master_connection_lost(self):
        self._wire_connection(amqp.exceptions.ConnectionError('foo'))

        self.assertFalse(self.failover.is_master())

    def test_is_not_master_channel_error(self):
        self._wire_connection(amqp.exceptions.ChannelError('foo'))

        self.assertFalse(self.failover.is_master())


class test_failover_wait_for_mastery(Case):
    """``wait_master`` stops polling as soon as the mutex is obtained."""

    def setUp(self):
        fake_conn = Mock()
        fake_conn.SimpleQueue = Mock()
        fake_app = Mock(name='source')
        fake_app.connection = Mock(return_value=fake_conn)

        self.app = fake_app
        self.connection = fake_conn
        self.failover = ServiceFailover(fake_app, 'example')

    def test_get_mastery_at_once(self):
        # First attempt succeeds; a second attempt would raise.
        outcomes = [
            self.connection,
            Exception('Test failed: Should have already stop!'),
        ]
        self.connection.SimpleQueue.side_effect = outcomes

        self.failover.wait_master(timeout=0)

        attempts = self.connection.SimpleQueue.call_count
        self.assertEqual(1, attempts)

    def test_get_mastery_at_twice(self):
        # First attempt fails, second succeeds; a third would raise.
        outcomes = [
            socket.error('foo'),  # failure expected
            self.connection,
            Exception('Test failed: Should have already stop!'),
        ]
        self.connection.SimpleQueue.side_effect = outcomes

        self.failover.wait_master(timeout=0)

        attempts = self.connection.SimpleQueue.call_count
        self.assertEqual(2, attempts)

0 comments on commit cc109ac

Please sign in to comment.