Skip to content
Browse files

Removed the recursion so it now passes the Twisted test suite without…

… increasing the recursion limit. Use the actual timeout calculated by Twisted not 100ms, add a win32 event interface.
  • Loading branch information...
1 parent 4c49d21 commit 385936b4b0fa134e56851f75fb121a17a77dce60 @michaelnt michaelnt committed Jan 18, 2011
Showing with 225 additions and 130 deletions.
  1. +1 −8 README
  2. +224 −122 qt4reactor.py
View
9 README
@@ -4,13 +4,6 @@ test with:
trial --reactor=qt4 twisted (or twisted.test or twisted.test.test_internet)
-If you're using Qt 4.4 and the newer versions of twisted, I've been using:
-
-trial --reactor=qt4 --without-module gtk2reactor --recursionlimit=2000
-twisted 2>&1 | tee trial.out
-
-particularly on a 64 bit OS... I have no idea why the recursion limit matters more on 64 bit machines... maybe there's deeper to go so it does :-)
-
= Contributors =
Many thanks to Darren Dale who provided the patch to fix the reactor for Qt4.4
@@ -52,7 +45,7 @@ However, most users want this reactor to do gui stuff so this
shouldn't be an issue.
Performance impact of Qt has been reduced by minimizing use of
-signaling which is expensive. 241s for qt4reactor vs 194s for select
+signaling which is expensive. 186s for qt4reactor vs 180s for select
for entire twisted trial suite.
-glenn
View
346 qt4reactor.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2001-2008 Twisted Matrix Laboratories.
+# Copyright (c) 2001-2011 Twisted Matrix Laboratories.
# See LICENSE for details.
@@ -36,12 +36,13 @@
Subsequent port by therve
"""
-__all__ = ['install']
-
-
-import sys, time
-
+import sys
+import time
from zope.interface import implements
+from twisted.internet.interfaces import IReactorFDSet
+from twisted.python import log, runtime
+from twisted.internet import posixbase
+from twisted.python.runtime import platformType, platform
try:
from PyQt4.QtCore import QSocketNotifier, QObject, SIGNAL, QTimer, QCoreApplication
@@ -50,57 +51,65 @@
from PySide.QtCore import QSocketNotifier, QObject, SIGNAL, QTimer, QCoreApplication
from PySide.QtCore import QEventLoop
-from twisted.internet.interfaces import IReactorFDSet
-from twisted.python import log
-from twisted.internet.posixbase import PosixReactorBase
-class TwistedSocketNotifier(QSocketNotifier):
+class TwistedSocketNotifier(QObject):
"""
Connection between an fd event and reader/writer callbacks.
"""
- def __init__(self, reactor, watcher, type):
- QSocketNotifier.__init__(self, watcher.fileno(), type)
+ def __init__(self, parent, reactor, watcher, socketType):
+ QObject.__init__(self, parent)
self.reactor = reactor
self.watcher = watcher
- self.fn = None
- if type == QSocketNotifier.Read:
+ fd = watcher.fileno()
+ self.notifier = QSocketNotifier(fd, socketType, parent)
+ self.notifier.setEnabled(True)
+ if socketType == QSocketNotifier.Read:
self.fn = self.read
- elif type == QSocketNotifier.Write:
+ else:
self.fn = self.write
- QObject.connect(self, SIGNAL("activated(int)"), self.fn)
+ QObject.connect(self.notifier, SIGNAL("activated(int)"), self.fn)
def shutdown(self):
- QObject.disconnect(self, SIGNAL("activated(int)"), self.fn)
- self.setEnabled(False)
+ self.notifier.setEnabled(False)
+ self.disconnect(self.notifier, SIGNAL("activated(int)"), self.fn)
self.fn = self.watcher = None
+ self.notifier.deleteLater()
self.deleteLater()
- def read(self, sock):
+ def read(self, fd):
+ if not self.watcher:
+ return
w = self.watcher
- #self.setEnabled(False) # ??? do I need this?
+ # doRead can cause self.shutdown to be called so keep a reference to self.watcher
def _read():
+ #Don't call me again, until the data has been read
+ self.notifier.setEnabled(False)
why = None
try:
why = w.doRead()
+ inRead = True
except:
+ inRead = False
log.err()
why = sys.exc_info()[1]
if why:
- self.reactor._disconnectSelectable(w, why, True)
+ self.reactor._disconnectSelectable(w, why, inRead)
elif self.watcher:
- pass
- #self.setEnabled(True)
+ self.notifier.setEnabled(True) # Re enable notification following sucessfull read
+ self.reactor._iterate(fromqt=True)
log.callWithLogger(w, _read)
- self.reactor.reactorInvocation()
def write(self, sock):
+ if not self.watcher:
+ return
w = self.watcher
- self.setEnabled(False)
def _write():
why = None
+ self.notifier.setEnabled(False)
+
try:
why = w.doWrite()
except:
@@ -109,71 +118,91 @@ def _write():
if why:
self.reactor._disconnectSelectable(w, why, False)
elif self.watcher:
- self.setEnabled(True)
+ self.notifier.setEnabled(True)
+ self.reactor._iterate(fromqt=True)
log.callWithLogger(w, _write)
- self.reactor.reactorInvocation()
-class fakeApplication(QEventLoop):
- def __init__(self):
- QEventLoop.__init__(self)
-
- def exec_(self):
- QEventLoop.exec_(self)
-
-class QTReactor(PosixReactorBase):
- """
- Qt based reactor.
- """
- implements(IReactorFDSet)
- _timer = None
+
+class QtReactor(posixbase.PosixReactorBase):
+ implements(IReactorFDSet)
def __init__(self):
self._reads = {}
self._writes = {}
- self._timer=QTimer()
+ self._notifiers = {}
+ self._timer = QTimer()
self._timer.setSingleShot(True)
+ QObject.connect(self._timer, SIGNAL("timeout()"), self.iterate)
+
if QCoreApplication.startingUp():
+ # Application Object has not been started yet
self.qApp=QCoreApplication([])
self._ownApp=True
else:
self.qApp = QCoreApplication.instance()
self._ownApp=False
self._blockApp = None
- self._readWriteQ=[]
-
- """ some debugging instrumentation """
- self._doSomethingCount=0
-
- PosixReactorBase.__init__(self)
+ posixbase.PosixReactorBase.__init__(self)
+
+
+ def _add(self, xer, primary, type):
+ """
+ Private method for adding a descriptor from the event loop.
+
+ It takes care of adding it if new or modifying it if already added
+ for another state (read -> read/write for example).
+ """
+ if xer not in primary:
+ primary[xer] = TwistedSocketNotifier(None, self, xer, type)
+
def addReader(self, reader):
- if not reader in self._reads:
- self._reads[reader] = TwistedSocketNotifier(self, reader,
- QSocketNotifier.Read)
+ """
+ Add a FileDescriptor for notification of data available to read.
+ """
+ self._add(reader, self._reads, QSocketNotifier.Read)
def addWriter(self, writer):
- if not writer in self._writes:
- self._writes[writer] = TwistedSocketNotifier(self, writer,
- QSocketNotifier.Write)
+ """
+ Add a FileDescriptor for notification of data available to write.
+ """
+ self._add(writer, self._writes, QSocketNotifier.Write)
+
+
+ def _remove(self, xer, primary):
+ """
+ Private method for removing a descriptor from the event loop.
+ It does the inverse job of _add, and also add a check in case of the fd
+ has gone away.
+ """
+ if xer in primary:
+ notifier = primary.pop(xer)
+ notifier.shutdown()
+
def removeReader(self, reader):
- if reader in self._reads:
- #self._reads[reader].shutdown()
- #del self._reads[reader]
- self._reads.pop(reader).shutdown()
+ """
+ Remove a Selectable for notification of data available to read.
+ """
+ self._remove(reader, self._reads)
+
def removeWriter(self, writer):
- if writer in self._writes:
- self._writes[writer].shutdown()
- #del self._writes[writer]
- self._writes.pop(writer)
+ """
+ Remove a Selectable for notification of data available to write.
+ """
+ self._remove(writer, self._writes)
def removeAll(self):
- return self._removeAll(self._reads, self._writes)
+ """
+ Remove all selectables, and return a list of them.
+ """
+ rv = self._removeAll(self._reads, self._writes)
+ return rv
def getReaders(self):
@@ -182,76 +211,149 @@ def getReaders(self):
def getWriters(self):
return self._writes.keys()
-
+
+
def callLater(self,howlong, *args, **kargs):
- rval = super(QTReactor,self).callLater(howlong, *args, **kargs)
+ rval = super(QtReactor,self).callLater(howlong, *args, **kargs)
self.reactorInvocation()
return rval
-
- def crash(self):
- super(QTReactor,self).crash()
+
+
+ def reactorInvocation(self):
+ self._timer.stop()
+ self._timer.setInterval(0)
+ self._timer.start()
- def iterate(self,delay=0.0):
- t=self.running # not sure I entirely get the state of running
- self.running=True
- self._timer.stop() # in case its not (rare?)
- try:
- if delay == 0.0:
- self.reactorInvokePrivate()
- self._timer.stop() # supports multiple invocations
- else:
- endTime = delay + time.time()
- self.reactorInvokePrivate()
- while True:
- t = endTime - time.time()
- if t <= 0.0: return
- self.qApp.processEvents(QEventLoop.AllEvents |
- QEventLoop.WaitForMoreEvents,t*1010)
- finally:
- self.running=t
-
- def addReadWrite(self,t):
- self._readWriteQ.append(t)
+
+ def _iterate(self, delay=None, fromqt=False):
+ """See twisted.internet.interfaces.IReactorCore.iterate.
+ """
+ self.runUntilCurrent()
+ self.doIteration(delay, fromqt)
+
+ iterate = _iterate
+
+ def doIteration(self, delay=None, fromqt=False):
+ 'This method is called by a Qt timer or by network activity on a file descriptor'
+ if not self.running and self._blockApp:
+ self._blockApp.quit()
+ self._timer.stop()
+ delay = max(delay, 1)
+ if not fromqt:
+ self.qApp.processEvents(QEventLoop.AllEvents, delay * 1000)
+ if self.timeout() is None:
+ timeout = 0.1
+ elif self.timeout() == 0:
+ timeout = 0
+ else:
+ timeout = self.timeout()
+ self._timer.setInterval(timeout * 1000)
+ self._timer.start()
+
+
def runReturn(self, installSignalHandlers=True):
- QObject.connect(self._timer, SIGNAL("timeout()"),
- self.reactorInvokePrivate)
self.startRunning(installSignalHandlers=installSignalHandlers)
- self._timer.start(0)
-
+ self.reactorInvocation()
+
+
def run(self, installSignalHandlers=True):
- try:
- if self._ownApp:
- self._blockApp=self.qApp
- else:
- self._blockApp = fakeApplication()
- self.runReturn(installSignalHandlers)
- self._blockApp.exec_()
- finally:
- self._timer.stop() # should already be stopped
+ if self._ownApp:
+ self._blockApp = self.qApp
+ else:
+ self._blockApp = QEventLoop()
+ self.runReturn()
+ self._blockApp.exec_()
+
+
+class QtEventReactor(QtReactor):
+ def __init__(self, *args, **kwargs):
+ self._events = {}
+ super(QtEventReactor, self).__init__()
- def reactorInvocation(self):
- self._timer.setInterval(0)
- def reactorInvokePrivate(self):
- if not self.running:
- self._blockApp.quit()
- self._doSomethingCount += 1
- self.runUntilCurrent()
- t = self.timeout()
- if t is None: t=0.1
- else: t = min(t,0.1)
- self._timer.setInterval(t*1010)
- self.qApp.processEvents() # could change interval
- self._timer.start()
-
- def doIteration(self):
- assert False, "doiteration is invalid call"
+ def addEvent(self, event, fd, action):
+ """
+ Add a new win32 event to the event loop.
+ """
+ self._events[event] = (fd, action)
+
+
+ def removeEvent(self, event):
+ """
+ Remove an event.
+ """
+ if event in self._events:
+ del self._events[event]
+
+
+ def doEvents(self):
+ handles = self._events.keys()
+ if len(handles) > 0:
+ val = None
+ while val != WAIT_TIMEOUT:
+ val = MsgWaitForMultipleObjects(handles, 0, 0, QS_ALLINPUT | QS_ALLEVENTS)
+ if val >= WAIT_OBJECT_0 and val < WAIT_OBJECT_0 + len(handles):
+ event_id = handles[val - WAIT_OBJECT_0]
+ if event_id in self._events:
+ fd, action = self._events[event_id]
+ log.callWithLogger(fd, self._runAction, action, fd)
+ elif val == WAIT_TIMEOUT:
+ pass
+ else:
+ #print 'Got an unexpected return of %r' % val
+ return
+
+
+ def _runAction(self, action, fd):
+ try:
+ closed = getattr(fd, action)()
+ except:
+ closed = sys.exc_info()[1]
+ log.deferr()
+
+ if closed:
+ self._disconnectSelectable(fd, closed, action == 'doRead')
+
-def install():
+ def timeout(self):
+ t = super(QtEventReactor, self).timeout()
+ return min(t, 0.01)
+
+
+ def iterate(self, delay=None):
+ """See twisted.internet.interfaces.IReactorCore.iterate.
+ """
+ self.runUntilCurrent()
+ self.doEvents()
+ self.doIteration(delay)
+
+
+def posixinstall():
"""
- Configure the twisted mainloop to be run inside the qt mainloop.
+ Install the Qt reactor.
"""
- from twisted.internet import main
- reactor = QTReactor()
- main.installReactor(reactor)
+ p = QtReactor()
+ from twisted.internet.main import installReactor
+ installReactor(p)
+
+
+def win32install():
+ """
+ Install the Qt reactor.
+ """
+ p = QtEventReactor()
+ from twisted.internet.main import installReactor
+ installReactor(p)
+
+
+if runtime.platform.getType() == 'win32':
+ from win32event import CreateEvent, MsgWaitForMultipleObjects
+ from win32event import WAIT_OBJECT_0, WAIT_TIMEOUT, QS_ALLINPUT, QS_ALLEVENTS
+ install = win32install
+else:
+ install = posixinstall
+
+
+__all__ = ["install"]
+

0 comments on commit 385936b

Please sign in to comment.
Something went wrong with that request. Please try again.