Skip to content

Commit

Permalink
Add data poller loop for kqueue.
Browse files Browse the repository at this point in the history
This adds an alternate poller loop for systems that use kqueue rather
than poll.  Limited testing seems to work fine on FreeBSD, so it should
work similarly on OSX.

Ref #10737
  • Loading branch information
wagnerrp committed May 21, 2012
1 parent d50dad8 commit fff02e6
Showing 1 changed file with 142 additions and 64 deletions.
206 changes: 142 additions & 64 deletions mythtv/bindings/python/MythTV/utility.py
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from MythTV.exceptions import MythDBError, MythError from MythTV.exceptions import MythDBError, MythError


from cStringIO import StringIO from cStringIO import StringIO
from select import select, poll, POLLHUP, POLLIN, POLLOUT from select import select
from time import time, mktime, sleep from time import time, mktime, sleep
from datetime import datetime as _pydatetime from datetime import datetime as _pydatetime
from datetime import tzinfo as _pytzinfo from datetime import tzinfo as _pytzinfo
Expand Down Expand Up @@ -927,66 +927,8 @@ def check_ipv6(n):
except socket.error: except socket.error:
return False return False


class DequeBuffer( object ): try:
""" from select import poll, POLLHUP, POLLIN, POLLOUT
This is a chunked buffer, storing a sequence of buffer objects in a
deque, allowing for FIFO operations outside the limited 64K system
buffer, and the efficient freeing of memory without needing to rewrite
a large contiguous buffer object.
"""
class _Buffer( object ):
"""
This subclass contains a buffer object and a read/write lock, as
well as independent read and write positions.
"""
__slots__ = ['buffer', 'lock', 'blocksize', 'EOF', 'rpos', 'wpos']
def __init__(self, size=2**18):
self.buffer = StringIO()
self.lock = Lock()
self.blocksize = size
self.EOF = False
self.rpos = 0
self.wpos = 0

def __del__(self):
self.buffer.close()

def read(self, nbytes):
with self.lock:
self.buffer.seek(self.rpos)
buff = self.buffer.read(nbytes)
self.rpos = self.buffer.tell()
if self.rpos == self.blocksize:
self.EOF = True
return buff

def write(self, data):
with self.lock:
nbytes = self.blocksize-self.wpos
if nbytes < len(data):
data = data[:nbytes]
else:
nbytes = len(data)
self.buffer.seek(self.wpos)
self.buffer.write(data)
self.wpos += nbytes
return nbytes

def rollback(self, nbytes):
with self.lock:
self.EOF = False
if self.rpos < nbytes:
nbytes -= self.rpos
self.rpos = 0
return nbytes
else:
self.rpos -= nbytes
return 0

def close(self):
with self.lock:
self.buffer.close()

class _PollingThread( Thread ): class _PollingThread( Thread ):
""" """
This polling thread listens on selected pipes, and automatically reads This polling thread listens on selected pipes, and automatically reads
Expand All @@ -998,13 +940,14 @@ def __init__(self, group=None, target=None, name=None,
args=(), kwargs={}): args=(), kwargs={}):
self.inputqueue = Queue() self.inputqueue = Queue()
self.idletime = time() self.idletime = time()
super(DequeBuffer._PollingThread, self).__init__(group, super(_PollingThread, self).__init__(group,
target, name, args, kwargs) target, name, args, kwargs)
def add_pipe(self, buff, pipe, mode): def add_pipe(self, buff, pipe, mode):
self.inputqueue.put((buff, pipe, mode)) self.inputqueue.put((buff, pipe, mode))
def run(self): def run(self):
poller = poll() poller = poll()
fds = {} fds = {}
events = []
while True: while True:
while not self.inputqueue.empty(): while not self.inputqueue.empty():
# loop though the queue and add new pipes to the # loop though the queue and add new pipes to the
Expand Down Expand Up @@ -1055,6 +998,141 @@ def run(self):
sleep(0.1) sleep(0.1)
else: else:
self.idletime = time() self.idletime = time()
except ImportError:
from select import kqueue, kevent, KQ_FILTER_READ, KQ_FILTER_WRITE, \
KQ_EV_ADD, KQ_EV_DELETE, KQ_EV_EOF
class _PollingThread( Thread ):
"""
This polling thread listens on selected pipes, and automatically reads
and writes data between the buffer and those pipes. This will self
terminate when there are no more pipes defined, and will need to be
restarted.
"""
def __init__(self, group=None, target=None, name=None,
args=(), kwargs={}):
self.inputqueue = Queue()
self.idletime = time()
super(_PollingThread, self).__init__(group,
target, name, args, kwargs)
def add_pipe(self, buff, pipe, mode):
self.inputqueue.put((buff, pipe, mode))
def run(self):
poller = kqueue()
fds = {}
events = []
while True:
while not self.inputqueue.empty():
# loop through the queue and gather new pipes to add the
# kernel queue
buff, pipe, mode = self.inputqueue.get()
if 'r' in mode:
events.append(kevent(pipe, KQ_FILTER_READ, KQ_EV_ADD))
elif 'w' in mode:
events.append(kevent(pipe, KQ_FILTER_WRITE, KQ_EV_ADD))
else:
continue
fds[pipe.fileno()] = (weakref.ref(buff), pipe)

if len(events) == 0:
events = None
events = poller.control(events, 16, 0.1)

for i in range(len(events)):
# loop through response and handle events
event = events.pop()
buff, pipe = fds[event.ident]

if buff() is None:
# buffer object has closed out from underneath us
# pipe will be automatically removed from kqueue
pipe.close()
del fds[event.ident]
continue

if (abs(event.filter) & abs(KQ_FILTER_READ)) and event.data:
# new data has come in, push into the buffer
buff().write(pipe.read(event.data))

if (abs(event.filter) & abs(KQ_FILTER_WRITE)) and event.data:
# space is available to write data
pipe.write(buff().read(\
min(buff()._nbytes, event.data, 2**16)))

if abs(event.flags) & abs(KQ_EV_EOF):
# pipe has been closed and all IO has been processed
# pipe will be automatically removed from kqueue
buff().close()
pipe.close()
del fds[event.ident]

if len(fds) == 0:
# no pipes referenced
if self.idletime + 20 < time():
# idle timeout reached, terminate
break
sleep(0.1)
else:
self.idletime = time()

class DequeBuffer( object ):
"""
This is a chunked buffer, storing a sequence of buffer objects in a
deque, allowing for FIFO operations outside the limited 64K system
buffer, and the efficient freeing of memory without needing to rewrite
a large contiguous buffer object.
"""
class _Buffer( object ):
"""
This subclass contains a buffer object and a read/write lock, as
well as independent read and write positions.
"""
__slots__ = ['buffer', 'lock', 'blocksize', 'EOF', 'rpos', 'wpos']
def __init__(self, size=2**18):
self.buffer = StringIO()
self.lock = Lock()
self.blocksize = size
self.EOF = False
self.rpos = 0
self.wpos = 0

def __del__(self):
self.buffer.close()

def read(self, nbytes):
with self.lock:
self.buffer.seek(self.rpos)
buff = self.buffer.read(nbytes)
self.rpos = self.buffer.tell()
if self.rpos == self.blocksize:
self.EOF = True
return buff

def write(self, data):
with self.lock:
nbytes = self.blocksize-self.wpos
if nbytes < len(data):
data = data[:nbytes]
else:
nbytes = len(data)
self.buffer.seek(self.wpos)
self.buffer.write(data)
self.wpos += nbytes
return nbytes

def rollback(self, nbytes):
with self.lock:
self.EOF = False
if self.rpos < nbytes:
nbytes -= self.rpos
self.rpos = 0
return nbytes
else:
self.rpos -= nbytes
return 0

def close(self):
with self.lock:
self.buffer.close()


_pollingthread = None _pollingthread = None


Expand Down Expand Up @@ -1102,7 +1180,7 @@ def read(self, nbytes=None):
else: else:
# end of data or request reached, return # end of data or request reached, return
break break
self._nbytes += data.tell() self._nbytes -= data.tell()
return data.getvalue() return data.getvalue()


def write(self, data): def write(self, data):
Expand Down Expand Up @@ -1187,7 +1265,7 @@ def _add_pipe(cls, pipe, buffer, mode=None):


if (cls._pollingthread is None) or not cls._pollingthread.isAlive(): if (cls._pollingthread is None) or not cls._pollingthread.isAlive():
# create new thread, and set it to not block shutdown # create new thread, and set it to not block shutdown
cls._pollingthread = cls._PollingThread() cls._pollingthread = _PollingThread()
cls._pollingthread.daemon = True cls._pollingthread.daemon = True
cls._pollingthread.start() cls._pollingthread.start()
cls._pollingthread.add_pipe(buffer, pipe, mode) cls._pollingthread.add_pipe(buffer, pipe, mode)
Expand Down

0 comments on commit fff02e6

Please sign in to comment.