Skip to content

Commit

Permalink
Type check Semaphore, GreenPool arguments; Thanks to Matthew D. Pagel
Browse files Browse the repository at this point in the history
- export Event, *Semaphore in `eventlet.` top level namespace

#364
  • Loading branch information
temoto committed Dec 22, 2016
1 parent e1bb2fe commit 79292bd
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 88 deletions.
38 changes: 23 additions & 15 deletions eventlet/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,36 +7,44 @@
# errors of greenlet so that the packager can still at least
# access the version. Also this makes easy_install a little quieter
if os.environ.get('EVENTLET_IMPORT_VERSION_ONLY') != '1':
from eventlet import greenthread
from eventlet import convenience
from eventlet import event
from eventlet import greenpool
from eventlet import greenthread
from eventlet import patcher
from eventlet import queue
from eventlet import semaphore
from eventlet import timeout
from eventlet import patcher
from eventlet import convenience
import greenlet

connect = convenience.connect
listen = convenience.listen
serve = convenience.serve
StopServe = convenience.StopServe
wrap_ssl = convenience.wrap_ssl

Event = event.Event

GreenPool = greenpool.GreenPool
GreenPile = greenpool.GreenPile

sleep = greenthread.sleep
spawn = greenthread.spawn
spawn_n = greenthread.spawn_n
spawn_after = greenthread.spawn_after
kill = greenthread.kill

Timeout = timeout.Timeout
with_timeout = timeout.with_timeout

GreenPool = greenpool.GreenPool
GreenPile = greenpool.GreenPile
import_patched = patcher.import_patched
monkey_patch = patcher.monkey_patch

Queue = queue.Queue

import_patched = patcher.import_patched
monkey_patch = patcher.monkey_patch
Semaphore = semaphore.Semaphore
CappedSemaphore = semaphore.CappedSemaphore
BoundedSemaphore = semaphore.BoundedSemaphore

connect = convenience.connect
listen = convenience.listen
serve = convenience.serve
StopServe = convenience.StopServe
wrap_ssl = convenience.wrap_ssl
Timeout = timeout.Timeout
with_timeout = timeout.with_timeout

getcurrent = greenlet.greenlet.getcurrent

Expand Down
38 changes: 22 additions & 16 deletions eventlet/greenpool.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import traceback

from eventlet import event
from eventlet import greenthread
import eventlet
from eventlet import queue
from eventlet import semaphore
from eventlet.support import greenlets as greenlet
from eventlet.support import six

Expand All @@ -17,10 +15,18 @@ class GreenPool(object):
"""

def __init__(self, size=1000):
try:
size = int(size)
except ValueError as e:
msg = 'GreenPool() expect size :: int, actual: {0} {1}'.format(type(size), str(e))
raise TypeError(msg)
if size < 0:
msg = 'GreenPool() expect size >= 0, actual: {0}'.format(repr(size))
raise ValueError(msg)
self.size = size
self.coroutines_running = set()
self.sem = semaphore.Semaphore(size)
self.no_coros_running = event.Event()
self.sem = eventlet.Semaphore(size)
self.no_coros_running = eventlet.Event()

def resize(self, new_size):
""" Change the max number of greenthreads doing work at any given time.
Expand Down Expand Up @@ -49,7 +55,7 @@ def free(self):

def spawn(self, function, *args, **kwargs):
"""Run the *function* with its arguments in its own green thread.
Returns the :class:`GreenThread <eventlet.greenthread.GreenThread>`
Returns the :class:`GreenThread <eventlet.GreenThread>`
object that is running the function, which can be used to retrieve the
results.
Expand All @@ -61,17 +67,17 @@ def spawn(self, function, *args, **kwargs):
"""
# if reentering an empty pool, don't try to wait on a coroutine freeing
# itself -- instead, just execute in the current coroutine
current = greenthread.getcurrent()
current = eventlet.getcurrent()
if self.sem.locked() and current in self.coroutines_running:
# a bit hacky to use the GT without switching to it
gt = greenthread.GreenThread(current)
gt = eventlet.greenthread.GreenThread(current)
gt.main(function, args, kwargs)
return gt
else:
self.sem.acquire()
gt = greenthread.spawn(function, *args, **kwargs)
gt = eventlet.spawn(function, *args, **kwargs)
if not self.coroutines_running:
self.no_coros_running = event.Event()
self.no_coros_running = eventlet.Event()
self.coroutines_running.add(gt)
gt.link(self._spawn_done)
return gt
Expand All @@ -89,7 +95,7 @@ def _spawn_n_impl(self, func, args, kwargs, coro):
if coro is None:
return
else:
coro = greenthread.getcurrent()
coro = eventlet.getcurrent()
self._spawn_done(coro)

def spawn_n(self, function, *args, **kwargs):
Expand All @@ -99,21 +105,21 @@ def spawn_n(self, function, *args, **kwargs):
"""
# if reentering an empty pool, don't try to wait on a coroutine freeing
# itself -- instead, just execute in the current coroutine
current = greenthread.getcurrent()
current = eventlet.getcurrent()
if self.sem.locked() and current in self.coroutines_running:
self._spawn_n_impl(function, args, kwargs, None)
else:
self.sem.acquire()
g = greenthread.spawn_n(
g = eventlet.spawn_n(
self._spawn_n_impl,
function, args, kwargs, True)
if not self.coroutines_running:
self.no_coros_running = event.Event()
self.no_coros_running = eventlet.Event()
self.coroutines_running.add(g)

def waitall(self):
"""Waits until all greenthreads in the pool are finished working."""
assert greenthread.getcurrent() not in self.coroutines_running, \
assert eventlet.getcurrent() not in self.coroutines_running, \
"Calling waitall() from within one of the " \
"GreenPool's greenthreads will never terminate."
if self.running():
Expand Down Expand Up @@ -151,7 +157,7 @@ def starmap(self, function, iterable):
if function is None:
function = lambda *a: a
gi = GreenMap(self.size)
greenthread.spawn_n(self._do_map, function, iterable, gi)
eventlet.spawn_n(self._do_map, function, iterable, gi)
return gi

def imap(self, function, *iterables):
Expand Down
20 changes: 11 additions & 9 deletions eventlet/semaphore.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
from __future__ import with_statement

import collections

from eventlet import greenthread
import eventlet
from eventlet import hubs
from eventlet.timeout import Timeout


class Semaphore(object):
Expand Down Expand Up @@ -34,10 +31,15 @@ class Semaphore(object):
"""

def __init__(self, value=1):
self.counter = value
try:
value = int(value)
except ValueError as e:
msg = 'Semaphore() expect value :: int, actual: {0} {1}'.format(type(value), str(e))
raise TypeError(msg)
if value < 0:
raise ValueError("Semaphore must be initialized with a positive "
"number, got %s" % value)
msg = 'Semaphore() expect value >= 0, actual: {0}'.format(repr(value))
raise ValueError(msg)
self.counter = value
self._waiters = collections.deque()

def __repr__(self):
Expand Down Expand Up @@ -92,15 +94,15 @@ def acquire(self, blocking=True, timeout=None):
if not blocking and self.locked():
return False

current_thread = greenthread.getcurrent()
current_thread = eventlet.getcurrent()

if self.counter <= 0 or self._waiters:
if current_thread not in self._waiters:
self._waiters.append(current_thread)
try:
if timeout is not None:
ok = False
with Timeout(timeout, False):
with eventlet.Timeout(timeout, False):
while self.counter <= 0:
hubs.get_hub().switch()
ok = True
Expand Down
Loading

0 comments on commit 79292bd

Please sign in to comment.