Skip to content

Commit

Permalink
Add IPv6 support to the Python Bindings.
Browse files Browse the repository at this point in the history
Break command and event messages in BECache into two independent
connections, of type BEConnection and BEEventConnection, respectively.
This should make connections considerably more robust.

Disable the connection escalation for now.  This means a connection
that starts as Monitor or Playback will remain that way, even if other
connections link in through the cache, rather than the previous behavior
where it would automatically reconnect.  Ideally this will be replaced
by a protocol command that will produce the same behavior without the
reconnection.
  • Loading branch information
wagnerrp committed May 25, 2011
1 parent e8e1775 commit cd23715
Show file tree
Hide file tree
Showing 4 changed files with 197 additions and 140 deletions.
204 changes: 110 additions & 94 deletions mythtv/bindings/python/MythTV/connections.py
Expand Up @@ -176,72 +176,33 @@ def __exit__(self, type, value, traceback):
class BEConnection( object ):
"""
This is the basic backend connection object.
You probably dont want to use this directly.
You probably don't want to use this directly.
"""
logmodule = 'Python Backend Connection'

class BEConnOpts( OrdDict ):
def __init__(self, noshutdown=False, systemevents=False,
generalevents=False):
OrdDict.__init__(self, (('noshutdown',noshutdown),
('systemevents',systemevents),
('generalevents',generalevents)))
def __and__(self, other):
res = self.__class__()
for key in self._field_order:
if self[key] & other[key]:
res[key] = True
return res
def __or__(self, other):
res = self.__class__()
for key in self._field_order:
if self[key] | other[key]:
res[key] = True
return res
def __xor__(self, other):
res = self.__class__()
for key in self._field_order:
if self[key] ^ other[key]:
res[key] = True
return res

def __init__(self, backend, port, localname=None, \
opts=None, deadline=10.0):
"""
BEConnection(backend, type, db=None) -> backend socket connection
'backend' can be either a hostname or IP address, or will default
to the master backend if None.
"""
self._regusers = weakref.WeakValueDictionary()
self._regevents = weakref.WeakValueDictionary()
self._socklock = allocate_lock()

def __init__(self, backend, port, localname=None,
blockshutdown=False, timeout=10.0):
self.connected = False
self.threadrunning = False
self.log = MythLog(self.logmodule)
self._socklock = allocate_lock()

self.host = backend
self.port = port
self.hostname = None
self.deadline = deadline
self.deadline = timeout
self.blockshutdown = blockshutdown

self.localname = localname
if self.localname is None:
self.localname = socket.gethostname()

self.opts = opts
if self.opts is None:
self.opts = self.BEConnOpts()
self.eventqueue = Queue.Queue()

try:
self.connect()
except socket.error, e:
self.log.logTB(MythLog.SOCKET|MythLog.EXTRA)
self.connected = False
self.log(MythLog.IMPORTANT|MythLog.SOCKET,
"Couldn't connect to backend %s:%d" \
"Couldn't connect to backend [%s]:%d" \
% (self.host, self.port))
raise MythBEError(MythError.PROTO_CONNECTION, self.host, self.port)
except:
Expand All @@ -254,58 +215,46 @@ def connect(self):
if self.connected:
return
self.log(MythLog.SOCKET|MythLog.NETWORK,
"Connecting to backend %s:%d" % (self.host, self.port))
self.socket = deadlinesocket(socket.AF_INET, socket.SOCK_STREAM)
"Connecting to backend [%s]:%d" % (self.host, self.port))
if ':' in self.host:
self.socket = deadlinesocket(socket.AF_INET6, socket.SOCK_STREAM)
else:
self.socket = deadlinesocket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.log = self.log
self.socket.connect((self.host, self.port))
self.socket.setdeadline(self.deadline)
self.connected = True
self.check_version()
self.announce()
if len(self._regevents) and (not self.threadrunning):
start_new_thread(self.eventloop, ())

def disconnect(self, hard=False):
if not self.connected:
return
self.log(MythLog.SOCKET|MythLog.NETWORK,
"Terminating connection to %s:%d" % (self.host, self.port))
"Terminating connection to [%s]:%d" % (self.host, self.port))
if not hard:
self.backendCommand('DONE',0)
self.connected = False
self.socket.shutdown(1)
self.socket.close()

def reconnect(self, force=False, hard=False):
# compute new connection options
opts = self.BEConnOpts()
for o in self._regusers.values():
opts |= o
if (True in (opts^self.opts).values()) or force:
# options have changed, reconnect
self.opts = opts
self.disconnect(hard)
self.connect()
def reconnect(self, hard=False):
self.disconnect(hard)
self.connect()

def announce(self):
# set type, 'Playback' blocks backend shutdown
type = ('Monitor','Playback')[self.opts.noshutdown]

# set event level, 3=system only, 2=generic only, 1=both, 0=none
a = self.opts.generalevents
b = self.opts.systemevents
eventlevel = (a|b) + (a^b)*(a + 2*b)
type = ('Monitor','Playback')[self.blockshutdown]

res = self.backendCommand('ANN %s %s %d' % \
(type, self.localname, eventlevel))
res = self.backendCommand('ANN %s %s 0' % (type, self.localname))
if res != 'OK':
self.log(MythLog.IMPORTANT|MythLog.NETWORK,
"Unexpected answer to ANN", res)
raise MythBEError(MythError.PROTO_ANNOUNCE,
self.host, self.port, res)
else:
self.log(MythLog.SOCKET,"Successfully connected to backend",
"%s:%d" % (self.host, self.port))
"[%s]:%d" % (self.host, self.port))
self.hostname = self.backendCommand('QUERY_HOSTNAME')

def check_version(self):
Expand All @@ -318,7 +267,7 @@ def check_version(self):
raise MythBEError(MythError.PROTO_MISMATCH,
int(res[1]), PROTO_VERSION)

def backendCommand(self, data=None, deadline=None):
def backendCommand(self, data, deadline=None):
"""
obj.backendCommand(data=None, timeout=None) -> response string
Expand All @@ -338,60 +287,122 @@ def backendCommand(self, data=None, deadline=None):
if deadline < 1000:
deadline += time()

if data is not None:
# flush socket
self.backendCommand(deadline=0.)

try:
# lock socket access
with self._socklock:
# send command string
if data is not None:
self.socket.sendheader(data.encode('utf-8'))
# loop waiting for proper response
self.socket.sendheader(data.encode('utf-8'))
# wait timeout for data to be received on the socket
t = time()
timeout = (deadline-t) if (deadline-t>0) else 0.0
if len(select([self.socket],[],[], timeout)[0]) == 0:
# no data, return
return u''
res = self.socket.recvheader(deadline=deadline)

# convert to unicode
try:
res = unicode(''.join([res]), 'utf8')
except:
res = u''.join([res])

return res
except MythError, e:
if e.sockcode == 54:
# remote has closed connection, attempt reconnect
self.reconnect(True, True)
return self.backendCommand(data, deadline)
else:
raise

class BEEventConnection( BEConnection ):
"""
This is the basic event listener object.
You probably don't want to use this directly.
"""
logmodule = 'Python Event Connection'

def __init__(self, backend, port, localname=None, deadline=10.0, level=2):
self._regevents = weakref.WeakValueDictionary()
self._announced = False
self._eventlevel = level

self.hostname = ""
self.threadrunning = False
self.eventqueue = Queue.Queue()

super(BEEventConnection, self).__init__(backend, port, localname,
False, deadline)

def connect(self):
if self.connected:
return
super(BEEventConnection, self).connect()
if len(self._regevents) and (not self.threadrunning):
start_new_thread(self.eventloop, ())

def announce(self):
# set event level, 3=system only, 2=generic only, 1=both, 0=none
res = self.backendCommand('ANN Monitor %s %d' % \
(self.localname, self._eventlevel))
### NOTE: backendCommand cannot be reliably used beyond this point
self._announced = True
if res != 'OK':
self.log(MythLog.IMPORTANT|MythLog.NETWORK,
"Unexpected answer to ANN", res)
raise MythBEError(MythError.PROTO_ANNOUNCE,
self.host, self.port, res)
else:
self.log(MythLog.SOCKET,"Successfully connected to backend",
"%s:%d" % (self.host, self.port))

def queueEvents(self):
"""
Pulls any unsolicited messages from the receive buffer
"""

# return if not connected
if not self.connected:
return

try:
with self._socklock:
while True:
# wait timeout for data to be received on the socket
t = time()
timeout = (deadline-t) if (deadline-t>0) else 0.0
if len(select([self.socket],[],[], timeout)[0]) == 0:
# no data, return
return u''
event = self.socket.recvheader(deadline=deadline)

# convert to unicode
# continue looping as long as data exists on the line
if len(select([self.socket],[],[], 0.0)[0]) == 0:
return
event = self.socket.recvheader(deadline=0.0)

try:
event = unicode(''.join([event]), 'utf8')
except:
event = u''.join([event])

if event[:15] == 'BACKEND_MESSAGE':
self.eventqueue.put(event)
else:
return event
# else discard

if timeout == 0:
return u''
except MythError, e:
if e.sockcode == 54:
# remote has closed connection, attempt reconnect
self.reconnect(True, True)
return self.backendCommand(data, deadline)
else:
raise

def registeruser(self, uuid, opts):
self._regusers[uuid] = opts

def registerevent(self, regex, function):
self._regevents[regex] = function
if (not self.threadrunning) and \
(self.opts.generalevents or self.opts.systemevents):
(self._eventlevel):
start_new_thread(self.eventloop, ())

def eventloop(self):
self.threadrunning = True
while (len(self._regevents) > 0) and self.connected:
self.backendCommand(deadline=0.0)
self.queueEvents()
while True:
try:
event = self.eventqueue.get_nowait()
Expand All @@ -411,6 +422,11 @@ def eventloop(self):
sleep(0.1)
self.threadrunning = False

def backendCommand(self, data, deadline=None):
if self._announced:
return ""
return super(BEEventConnection, self).backendCommand(data, deadline)

class FEConnection( object ):
"""
This is the basic frontend connection object.
Expand Down
29 changes: 19 additions & 10 deletions mythtv/bindings/python/MythTV/methodheap.py
Expand Up @@ -7,11 +7,11 @@
from static import *
from exceptions import *
from logging import MythLog
from connections import FEConnection, XMLConnection
from connections import FEConnection, XMLConnection, BEEventConnection
from utility import databaseSearch, datetime
from database import DBCache, DBData
from system import SystemEvent
from mythproto import BEEvent, FileOps, Program, FreeSpace
from mythproto import BECache, FileOps, Program, FreeSpace
from dataheap import *

from datetime import timedelta
Expand Down Expand Up @@ -378,16 +378,25 @@ def clearSettings(self):
self.backendCommand('MESSAGE%sCLEAR_SETTINGS_CACHE' % BACKEND_SEP)
self.db.settings.clear()

class BEEventMonitor( BEEvent ):
class BEEventMonitor( BECache ):
def __init__(self, backend=None, blockshutdown=False,
systemevents=False, db=None):
self.systemevents = systemevents
super(BEEventMonitor, self).__init__(backend, blockshutdown, True, db)

def _listhandlers(self):
return [self.eventMonitor]

def _neweventconn(self):
return BEEventConnection(self.host, self.port, self.db.gethostname(),
level = (2,1)[self.systemevents])

def eventMonitor(self, event=None):
if event is None:
return re.compile('BACKEND_MESSAGE')
self.log(MythLog.ALL-MythLog.EXTRA, event)

class MythSystemEvent( BEEvent ):
class MythSystemEvent( BECache ):
class systemeventhandler( object ):
# decorator class for system events
bs = BACKEND_SEP.replace('[','\[').replace(']','\]')
Expand Down Expand Up @@ -452,17 +461,17 @@ def __call__(self, event=None):
def _listhandlers(self):
return []

def __init__(self, backend=None, noshutdown=False, generalevents=False, \
db=None, opts=None, enablehandler=True):
if opts is None:
opts = BEConnection.BEConnOpts(noshutdown,\
True, generalevents)
BEEvent.__init__(self, backend, db=db, opts=opts)
def __init__(self, backend=None, blockshutdown=False, db=None,
enablehandler=True):
super(MythSystemEvent, self).__init__(backend, blockshutdown, True, db)

if enablehandler:
self._events.append(self._generic_handler)
self.registerevent(self._generic_handler)

def _neweventconn(self):
return BEEventConnection(self.host, self.port, self.db.gethostname(), 3)

@systemeventhandler
def _generic_handler(self, event):
SystemEvent(event['event'], inst.db).command(event)
Expand Down

0 comments on commit cd23715

Please sign in to comment.