Permalink
Browse files

Add data poller loop for kqueue.

This adds an alternate poller loop for systems that use kqueue rather
than poll.  Limited testing seems to work fine on FreeBSD, so it should
work similarly on OSX.

Ref #10737
  • Loading branch information...
1 parent d50dad8 commit fff02e698418de20a918480694ac74661ce666db @wagnerrp wagnerrp committed May 21, 2012
Showing with 142 additions and 64 deletions.
  1. +142 −64 mythtv/bindings/python/MythTV/utility.py
@@ -5,7 +5,7 @@
from MythTV.exceptions import MythDBError, MythError
from cStringIO import StringIO
-from select import select, poll, POLLHUP, POLLIN, POLLOUT
+from select import select
from time import time, mktime, sleep
from datetime import datetime as _pydatetime
from datetime import tzinfo as _pytzinfo
@@ -927,66 +927,8 @@ def check_ipv6(n):
except socket.error:
return False
-class DequeBuffer( object ):
- """
- This is a chunked buffer, storing a sequence of buffer objects in a
- deque, allowing for FIFO operations outside the limited 64K system
- buffer, and the efficient freeing of memory without needing to rewrite
- a large contiguous buffer object.
- """
- class _Buffer( object ):
- """
- This subclass contains a buffer object and a read/write lock, as
- well as independent read and write positions.
- """
- __slots__ = ['buffer', 'lock', 'blocksize', 'EOF', 'rpos', 'wpos']
- def __init__(self, size=2**18):
- self.buffer = StringIO()
- self.lock = Lock()
- self.blocksize = size
- self.EOF = False
- self.rpos = 0
- self.wpos = 0
-
- def __del__(self):
- self.buffer.close()
-
- def read(self, nbytes):
- with self.lock:
- self.buffer.seek(self.rpos)
- buff = self.buffer.read(nbytes)
- self.rpos = self.buffer.tell()
- if self.rpos == self.blocksize:
- self.EOF = True
- return buff
-
- def write(self, data):
- with self.lock:
- nbytes = self.blocksize-self.wpos
- if nbytes < len(data):
- data = data[:nbytes]
- else:
- nbytes = len(data)
- self.buffer.seek(self.wpos)
- self.buffer.write(data)
- self.wpos += nbytes
- return nbytes
-
- def rollback(self, nbytes):
- with self.lock:
- self.EOF = False
- if self.rpos < nbytes:
- nbytes -= self.rpos
- self.rpos = 0
- return nbytes
- else:
- self.rpos -= nbytes
- return 0
-
- def close(self):
- with self.lock:
- self.buffer.close()
-
+try:
+ from select import poll, POLLHUP, POLLIN, POLLOUT
class _PollingThread( Thread ):
"""
This polling thread listens on selected pipes, and automatically reads
@@ -998,13 +940,14 @@ def __init__(self, group=None, target=None, name=None,
args=(), kwargs={}):
self.inputqueue = Queue()
self.idletime = time()
- super(DequeBuffer._PollingThread, self).__init__(group,
+ super(_PollingThread, self).__init__(group,
target, name, args, kwargs)
def add_pipe(self, buff, pipe, mode):
self.inputqueue.put((buff, pipe, mode))
def run(self):
poller = poll()
fds = {}
+ events = []
while True:
while not self.inputqueue.empty():
# loop though the queue and add new pipes to the
@@ -1055,6 +998,141 @@ def run(self):
sleep(0.1)
else:
self.idletime = time()
+except ImportError:
+ from select import kqueue, kevent, KQ_FILTER_READ, KQ_FILTER_WRITE, \
+ KQ_EV_ADD, KQ_EV_DELETE, KQ_EV_EOF
+ class _PollingThread( Thread ):
+ """
+ This polling thread listens on selected pipes, and automatically reads
+ and writes data between the buffer and those pipes. This will self
+ terminate when there are no more pipes defined, and will need to be
+ restarted.
+ """
+ def __init__(self, group=None, target=None, name=None,
+ args=(), kwargs={}):
+ self.inputqueue = Queue()
+ self.idletime = time()
+ super(_PollingThread, self).__init__(group,
+ target, name, args, kwargs)
+ def add_pipe(self, buff, pipe, mode):
+ self.inputqueue.put((buff, pipe, mode))
+ def run(self):
+ poller = kqueue()
+ fds = {}
+ events = []
+ while True:
+ while not self.inputqueue.empty():
+ # loop through the queue and gather new pipes to add the
+ # kernel queue
+ buff, pipe, mode = self.inputqueue.get()
+ if 'r' in mode:
+ events.append(kevent(pipe, KQ_FILTER_READ, KQ_EV_ADD))
+ elif 'w' in mode:
+ events.append(kevent(pipe, KQ_FILTER_WRITE, KQ_EV_ADD))
+ else:
+ continue
+ fds[pipe.fileno()] = (weakref.ref(buff), pipe)
+
+ if len(events) == 0:
+ events = None
+ events = poller.control(events, 16, 0.1)
+
+ for i in range(len(events)):
+ # loop through response and handle events
+ event = events.pop()
+ buff, pipe = fds[event.ident]
+
+ if buff() is None:
+ # buffer object has closed out from underneath us
+ # pipe will be automatically removed from kqueue
+ pipe.close()
+ del fds[event.ident]
+ continue
+
+ if (abs(event.filter) & abs(KQ_FILTER_READ)) and event.data:
+ # new data has come in, push into the buffer
+ buff().write(pipe.read(event.data))
+
+ if (abs(event.filter) & abs(KQ_FILTER_WRITE)) and event.data:
+ # space is available to write data
+ pipe.write(buff().read(\
+ min(buff()._nbytes, event.data, 2**16)))
+
+ if abs(event.flags) & abs(KQ_EV_EOF):
+ # pipe has been closed and all IO has been processed
+ # pipe will be automatically removed from kqueue
+ buff().close()
+ pipe.close()
+ del fds[event.ident]
+
+ if len(fds) == 0:
+ # no pipes referenced
+ if self.idletime + 20 < time():
+ # idle timeout reached, terminate
+ break
+ sleep(0.1)
+ else:
+ self.idletime = time()
+
+class DequeBuffer( object ):
+ """
+ This is a chunked buffer, storing a sequence of buffer objects in a
+ deque, allowing for FIFO operations outside the limited 64K system
+ buffer, and the efficient freeing of memory without needing to rewrite
+ a large contiguous buffer object.
+ """
+ class _Buffer( object ):
+ """
+ This subclass contains a buffer object and a read/write lock, as
+ well as independent read and write positions.
+ """
+ __slots__ = ['buffer', 'lock', 'blocksize', 'EOF', 'rpos', 'wpos']
+ def __init__(self, size=2**18):
+ self.buffer = StringIO()
+ self.lock = Lock()
+ self.blocksize = size
+ self.EOF = False
+ self.rpos = 0
+ self.wpos = 0
+
+ def __del__(self):
+ self.buffer.close()
+
+ def read(self, nbytes):
+ with self.lock:
+ self.buffer.seek(self.rpos)
+ buff = self.buffer.read(nbytes)
+ self.rpos = self.buffer.tell()
+ if self.rpos == self.blocksize:
+ self.EOF = True
+ return buff
+
+ def write(self, data):
+ with self.lock:
+ nbytes = self.blocksize-self.wpos
+ if nbytes < len(data):
+ data = data[:nbytes]
+ else:
+ nbytes = len(data)
+ self.buffer.seek(self.wpos)
+ self.buffer.write(data)
+ self.wpos += nbytes
+ return nbytes
+
+ def rollback(self, nbytes):
+ with self.lock:
+ self.EOF = False
+ if self.rpos < nbytes:
+ nbytes -= self.rpos
+ self.rpos = 0
+ return nbytes
+ else:
+ self.rpos -= nbytes
+ return 0
+
+ def close(self):
+ with self.lock:
+ self.buffer.close()
_pollingthread = None
@@ -1102,7 +1180,7 @@ def read(self, nbytes=None):
else:
# end of data or request reached, return
break
- self._nbytes += data.tell()
+ self._nbytes -= data.tell()
return data.getvalue()
def write(self, data):
@@ -1187,7 +1265,7 @@ def _add_pipe(cls, pipe, buffer, mode=None):
if (cls._pollingthread is None) or not cls._pollingthread.isAlive():
# create new thread, and set it to not block shutdown
- cls._pollingthread = cls._PollingThread()
+ cls._pollingthread = _PollingThread()
cls._pollingthread.daemon = True
cls._pollingthread.start()
cls._pollingthread.add_pipe(buffer, pipe, mode)

0 comments on commit fff02e6

Please sign in to comment.