Skip to content

Commit

Permalink
gevent: Adds autoscaler support to gevent pool. Closes #599.
Browse files Browse the repository at this point in the history
  • Loading branch information
Mark Lavin authored and ask committed Feb 1, 2012
1 parent 3836dd8 commit 104fd94
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 0 deletions.
13 changes: 13 additions & 0 deletions celery/concurrency/gevent.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,16 @@ def on_apply(self, target, args=None, kwargs=None, callback=None,
accept_callback=None, **_):
return self._pool.spawn(apply_target, target, args, kwargs,
callback, accept_callback)
def grow(self, n=1):
self._pool._semaphore.counter += n
self._pool.size += n
return None

def shrink(self, n=1):
self._pool._semaphore.counter -= n
self._pool.size -= n
return None

@property
def num_processes(self):
return len(self._pool)
53 changes: 53 additions & 0 deletions celery/tests/test_concurrency/test_concurrency_gevent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from __future__ import absolute_import

from nose import SkipTest

from celery.concurrency.gevent import TaskPool
from celery.tests.utils import unittest


class GeventCase(unittest.TestCase):

def setUp(self):
try:
self.gevent = __import__("gevent")
except ImportError:
raise SkipTest(
"gevent not installed, skipping related tests.")


class test_TaskPool(GeventCase):

def test_grow(self):
pool = TaskPool(10)
pool.start()
self.assertEqual(pool._pool.size, 10)
pool.grow()
self.assertEqual(pool._pool.size, 11)

def test_grow_many(self):
pool = TaskPool(10)
pool.start()
self.assertEqual(pool._pool.size, 10)
pool.grow(2)
self.assertEqual(pool._pool.size, 12)

def test_shrink(self):
pool = TaskPool(10)
pool.start()
self.assertEqual(pool._pool.size, 10)
pool.shrink()
self.assertEqual(pool._pool.size, 9)

def test_shrink_many(self):
pool = TaskPool(10)
pool.start()
self.assertEqual(pool._pool.size, 10)
pool.shrink(2)
self.assertEqual(pool._pool.size, 8)

def test_num_processes(self):
pool = TaskPool(10)
pool.start()
pool.apply_async(lambda x: x, (2, ), {})
self.assertEqual(pool.num_processes, 1)

0 comments on commit 104fd94

Please sign in to comment.