Permalink
Browse files

Pollers for I/O

  • Loading branch information...
1 parent be4fddf commit c711869d97641d76ad8a8ed456211becbb3292df Alvaro Saurin committed Nov 14, 2012
View
@@ -2,7 +2,7 @@
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
- <orderEntry type="jdk" jdkName="Python 2.7.2+ (/usr/bin/python2.7)" jdkType="Python SDK" />
+ <orderEntry type="jdk" jdkName="Python 2.7.2 (/System/Library/Frameworks/Python.framework/Versions/2.7/bin/python)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>
@@ -0,0 +1,16 @@
+<component name="InspectionProjectProfileManager">
+ <profile version="1.0" is_locked="false">
+ <option name="myName" value="Project Default" />
+ <option name="myLocal" value="false" />
+ <inspection_tool class="PyPackageRequirementsInspection" enabled="true" level="WARNING" enabled_by_default="true">
+ <option name="ignoredPackages">
+ <value>
+ <list size="2">
+ <item index="0" class="java.lang.String" itemvalue="zc.recipe.egg" />
+ <item index="1" class="java.lang.String" itemvalue="zc.buildout" />
+ </list>
+ </value>
+ </option>
+ </inspection_tool>
+ </profile>
+</component>
@@ -0,0 +1,7 @@
+<component name="InspectionProjectProfileManager">
+ <settings>
+ <option name="PROJECT_PROFILE" value="Project Default" />
+ <option name="USE_PROJECT_PROFILE" value="true" />
+ <version value="1.0" />
+ </settings>
+</component>
View
@@ -3,6 +3,6 @@
<component name="ProjectResources">
<default-html-doctype>$APPLICATION_HOME_DIR$/lib/pycharm.jar!/resources/html5-schema/html5.rnc</default-html-doctype>
</component>
- <component name="ProjectRootManager" version="2" project-jdk-name="Python 2.7.2+ (/usr/bin/python2.7)" project-jdk-type="Python SDK" />
+ <component name="ProjectRootManager" version="2" project-jdk-name="Python 2.7.2 (/System/Library/Frameworks/Python.framework/Versions/2.7/bin/python)" project-jdk-type="Python SDK" />
</project>
View
@@ -30,7 +30,10 @@
from evy.support import greenlets as greenlet
from evy import patcher
-__all__ = ["use_hub", "get_hub", "get_default_hub", "trampoline"]
+__all__ = ["use_hub",
+ "get_hub",
+ "get_default_hub",
+ "trampoline"]
threading = patcher.original('threading')
_threadlocal = threading.local()
@@ -62,7 +65,7 @@ def use_hub (mod = None):
if hasattr(_threadlocal, 'hub'):
del _threadlocal.hub
- _threadlocal.Hub = get_default_hub().BaseHub
+ _threadlocal.Hub = get_default_hub().Hub
def get_hub ():
View
@@ -27,21 +27,28 @@
# THE SOFTWARE.
#
-import heapq
import math
import traceback
import signal
import sys
import os
+from weakref import WeakKeyDictionary
+
from evy.uv.interface import libuv, handle_unref
from evy.uv.interface import ffi
from evy.uv import watchers
from evy.support import greenlets as greenlet, clear_sys_exc_info
-from evy.hubs import timer
+from evy.hubs import timer, poller
from evy import patcher
+__all__ = ["Hub",
+ "READ",
+ "WRITE"]
+
+
+
arm_alarm = None
if hasattr(signal, 'setitimer'):
def alarm_itimer (seconds):
@@ -61,7 +68,9 @@ def alarm_signal (seconds):
time = patcher.original('time')
-g_prevent_multiple_readers = True
+
+
+
READ = "read"
WRITE = "write"
@@ -82,7 +91,7 @@ def signal_checker(uv_prepare_handle, status):
_default_loop_destroyed = False
-class BaseHub(object):
+class Hub(object):
"""
Base hub class for easing the implementation of subclasses that are
specific to a particular underlying event architecture.
@@ -106,19 +115,15 @@ def __init__ (self, clock = time.time, ptr = None, default = True):
:param ptr: a pointer a an (optional) libuv loop
:type ptr: a "uv_loop_t*"
"""
- self.fd_read_watchers = {}
- self.fd_write_watchers = {}
-
- self.fd_read_watchers_alt = {}
- self.fd_write_watchers_alt = {}
-
self.clock = clock
self.greenlet = greenlet.greenlet(self.run)
self.stopping = False
self.running = False
self.timers = set()
+ self.pollers = set()
+ self.pollers_by_fd = WeakKeyDictionary()
self.debug_exceptions = True
self.debug_blocking = False
@@ -167,129 +172,41 @@ def add (self, evtype, fileno, cb):
"""
Signals an intent to or write a particular file descriptor.
- The *evtype* argument is either the constant READ or WRITE.
-
- The *fileno* argument is the file number of the file of interest.
-
- The *cb* argument is the callback which will be called when the file
- is ready for reading/writing.
+ :param evtype: either the constant READ or WRITE.
+ :param fileno: the file number of the file of interest.
+ :param cb: callback which will be called when the file is ready for reading/writing.
"""
- if fileno < 0:
- raise ValueError('invalid file descriptor: %d' % (fileno))
+ p = poller.Poller(fileno, evtype, cb, fileno)
+ return self.add_poller(p)
- events = 0
- if evtype == READ:
- events = libuv.UV_READABLE
- bucket = self.fd_read_watchers
- elif evtype == WRITE:
- events = libuv.UV_WRITABLE
- bucket = self.fd_write_watchers
-
- listener = watchers.Poll(self, fileno, events)
-
- if fileno in bucket:
- if g_prevent_multiple_readers:
- raise RuntimeError("Second simultaneous %s on fileno %s "\
- "detected. Unless you really know what you're doing, "\
- "make sure that only one greenthread can %s any "\
- "particular socket. Consider using a pools.Pool. "\
- "If you do know what you're doing and want to disable "\
- "this error, call "\
- "evy.debug.hub_multiple_reader_prevention(False)" % (
- evtype, fileno, evtype))
-
- # store off the second listener in another structure
- if evtype == READ: self.fd_read_watchers_alt.setdefault(fileno, []).append(listener)
- elif evtype == WRITE: self.fd_write_watchers_alt.setdefault(fileno, []).append(listener)
-
- else:
- bucket[fileno] = listener
-
- ## register the listener with libuv
- listener.start(cb, listener)
-
- return listener
-
- def remove (self, listener):
+ def remove (self, p):
"""
Remove a listener
:param listener: the listener to remove
"""
- fileno = listener.fileno
- events = listener.events
-
- if events == libuv.UV_READABLE: self.fd_read_watchers.pop(fileno, None)
- else: self.fd_write_watchers.pop(fileno, None)
-
- # migrate a secondary listener to be the primary listener
- if (fileno in self.fd_read_watchers) or (fileno in self.fd_write_watchers):
- if events == libuv.UV_READABLE: sec = self.fd_read_watchers.get(fileno, None)
- else: sec = self.fd_write_watchers.get(fileno, None)
+ self._poller_canceled(p)
- if not sec:
- return
-
- if events == libuv.UV_READABLE: self.fd_read_watchers[fileno] = sec.pop(0)
- else: self.fd_write_watchers[fileno] = sec.pop(0)
-
- if not sec:
- if events == libuv.UV_READABLE: del self.fd_read_watchers[fileno]
- else: del self.fd_write_watchers[fileno]
def remove_descriptor (self, fileno):
"""
Completely remove all watchers for this *fileno*. For internal use only.
"""
- watchers = []
-
- watchers.append(self.fd_read_watchers.pop(fileno, None))
- watchers.append(self.fd_write_watchers.pop(fileno, None))
-
- watchers.extend(self.fd_read_watchers_alt.pop(fileno, ()))
- watchers.extend(self.fd_write_watchers_alt.pop(fileno, ()))
-
- for listener in watchers:
- try:
- listener.cb(fileno)
- except Exception, e:
- self.squelch_generic_exception(sys.exc_info())
-
- ## remove the poller for this fileno
- # TODO
-
- def get_readers (self):
- return self.fd_read_watchers.values()
-
- def get_writers (self):
- return self.fd_write_watchers.values()
-
-
- def _get_reader_for(self, fileno):
- """
- Obtains the reader for the file descriptor *fileno*
+ p = self.pollers_by_fd[fileno]
- :param fileno: the file descriptor
- :return: a valid Poll instance, or None otherwise
- """
- readers = self.fd_watchers[READ]
- return readers.get(fileno, None)
+ # invoke the callback in the poller and destroy it
+ p()
+ self._poller_canceled(p)
- def _get_writer_for(self, fileno):
+ def set_timer_exceptions (self, value):
"""
- Obtains the writer for the file descriptor *fileno*
+ Debug exceptions
- :param fileno: the file descriptor
- :return: a valid Poll instance, or None otherwise
+ :param value: True if we want to debug exceptions
+ :type value: boolean
"""
- readers = self.fd_watchers[WRITE]
- return readers.get(fileno, None)
-
-
- def set_timer_exceptions (self, value):
self.debug_exceptions = value
-
def ensure_greenlet (self):
if self.greenlet.dead:
# create new greenlet sharing same parent as original
@@ -332,6 +249,9 @@ def wait (self, seconds = None):
:type seconds: integer
"""
+ if not seconds:
+ seconds = self.default_loop()
+
## create a timer for avoiding exiting the loop for *seconds*
timer = watchers.Timer(self, seconds * 1000)
timer.start(None)
@@ -471,10 +391,10 @@ def add_timer (self, timer):
"""
eventtimer = watchers.Timer(self, timer.seconds * 1000)
timer.impltimer = eventtimer
- eventtimer.start(self.timer_triggered, timer)
+ eventtimer.start(self._timer_triggered, timer)
self.timers.add(timer)
- def timer_canceled (self, timer):
+ def _timer_canceled (self, timer):
"""
A timer has been canceled
@@ -491,7 +411,7 @@ def timer_canceled (self, timer):
except ValueError:
pass
- def timer_triggered (self, timer):
+ def _timer_triggered (self, timer):
"""
Performs the timer trigger
@@ -526,6 +446,75 @@ def timers_count(self):
return len(self.timers)
##
+ ## pollers
+ ##
+
+ def add_poller (self, p):
+ """
+ Add a timer in the hub
+
+ :param timer:
+ :param unreferenced: if True, we unreference the timer, so the loop does not wait until it is triggered
+ :return:
+ """
+ assert isinstance(p, poller.Poller)
+
+ event_poller = watchers.Poller(self)
+ event_poller.impltimer = event_poller
+ event_poller.start(self._poller_triggered, p)
+
+ ## register the poller
+ self.pollers.add(event_poller)
+ self.pollers_by_fd[event_poller.fileno] = event_poller
+
+ def _poller_triggered (self, p):
+ """
+ Performs the poller trigger
+
+ :param poller: the poller that has been triggered
+ :return: nothing
+ """
+ try:
+ p()
+ except self.SYSTEM_EXCEPTIONS:
+ self.interrupted = True
+ except:
+ self.squelch_exception(-1, sys.exc_info())
+
+ if not p.persistent:
+ self._poller_canceled(p)
+
+ def _poller_canceled (self, p):
+ """
+ A poller has been canceled
+
+ :param poller: the poller that has been canceled
+ :return: nothing
+ """
+ fileno = p.fileno
+ try:
+ poller.destroy()
+ self.pollers.remove(timer)
+ del self.pollers_by_fd[fileno]
+ except (AttributeError, TypeError):
+ pass
+
+
+ def forget_poller(self, poller):
+ """
+ Let the hub forget about a poller, so we do not keep the loop running forever until
+ this poller triggers...
+ """
+ try:
+ self.unref(poller.impltimer.handle)
+ except (AttributeError, TypeError):
+ pass
+
+ @property
+ def poller_count(self):
+ return len(self.pollers)
+
+ ##
## global and local calls
##
Oops, something went wrong.

0 comments on commit c711869

Please sign in to comment.