Skip to content

Commit

Permalink
Add a 'taskOwner' who must recognise a task to keep it alive
Browse files Browse the repository at this point in the history
The creator of a task is responsible for destroying it. To highlight
bugs where a task creator fails in their duty, we add the concept of
a task 'owner'. The owner must acknowledge their interest in active
tasks every 30s or so, otherwise the tasks are 'destroy'ed (i.e.
set to automatically disappear on completion). Note that 'destroy'
only removes the object *when the activity has finished*. It does
not auto-cancel.

Signed-off-by: David Scott <dave.scott@eu.citrix.com>
  • Loading branch information
David Scott committed Feb 11, 2014
1 parent 6cfe9f6 commit 20cd4b2
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 13 deletions.
80 changes: 67 additions & 13 deletions dbus/vm/python/dbus-resource-script.py
Expand Up @@ -10,8 +10,10 @@

import syslog
import threading
import urlparse
import time
import getopt
import getopt
import os
import sys
import subprocess
import gtk
Expand Down Expand Up @@ -68,6 +70,7 @@ def error(message):
next_task_id = 0

TASK_INTERFACE="org.xenserver.api.task"
TASKOWNER_INTERFACE="org.xenserver.api.taskOwner"

class Canceller(threading.Thread):
"""A thread which attempts to cleanly shutdown a process, resorting
Expand All @@ -88,7 +91,7 @@ def run(self):
continue
sys.stdout.flush()
time.sleep(1)
if self.process.poll() <> None:
if self.process.poll() == None:
info("%s: cancelling: sending SIGKILL to %d" % (self.task.path, self.process.pid))
try:
self.process.kill()
Expand All @@ -100,8 +103,55 @@ def run(self):
pass
info("%s: cancelling: process has terminated" % self.task.path)

class TaskMonitor(threading.Thread):
    """Daemon thread that destroys tasks whose owner no longer claims them.

    Tasks are registered against an owner URI with add(). Every
    `interval` seconds the monitor calls ping() on each owner (over the
    taskOwner D-Bus interface) with the URIs of that owner's tasks. Any
    task the owner reports as not alive, and every task of an owner that
    cannot be contacted at all, is unregistered and has destroy() called
    on it.

    NOTE(review): `owners` is mutated both by this thread and by callers
    of add()/remove() without a lock; the loop below therefore works on
    snapshots and tolerates entries disappearing underneath it.
    """

    def __init__(self, interval = 30):
        """Create the monitor and start it immediately.

        interval -- seconds to sleep between rounds of owner pings.
        """
        threading.Thread.__init__(self)
        # Maps owner URI -> list of tasks registered for that owner.
        self.owners = {}
        self.interval = interval
        # Daemon thread: must not keep the process alive on exit.
        self.daemon = True
        self.start()

    def add(self, task, owner_uri):
        """Register `task` as belonging to the owner at `owner_uri`."""
        self.owners.setdefault(owner_uri, []).append(task)

    def remove(self, task, owner_uri):
        """Unregister `task` from `owner_uri`; a no-op if it is not registered.

        The owner's entry is dropped entirely once its last task is gone.
        """
        if owner_uri not in self.owners:
            return
        remaining = [t for t in self.owners[owner_uri] if t != task]
        if remaining:
            self.owners[owner_uri] = remaining
        else:
            # pop() rather than del: the entry may already have been
            # removed by another caller.
            self.owners.pop(owner_uri, None)

    def _orphan(self, task, owner_uri, message):
        """Log `message`, unregister `task` and ask it to destroy itself.

        A failure in task.destroy() is logged rather than propagated so
        that one misbehaving task cannot terminate the monitor thread.
        """
        error(message)
        self.remove(task, owner_uri)
        try:
            task.destroy()
        except Exception:
            error("%s: destroy failed: %s" % (task.uri, str(sys.exc_info()[1])))

    def run(self):
        """Thread body: periodically ping every owner and reap orphaned tasks."""
        # Every <interval>, check if the owner still wants this task. If the
        # owner doesn't recognise the task URI (eg because it crashed), then
        # call destroy() to prevent a leak
        while True:
            time.sleep(self.interval)
            bus = dbus.SessionBus()
            # Iterate over a snapshot of the keys: _orphan() deletes
            # entries from self.owners as we go.
            for owner_uri in list(self.owners):
                # Snapshot the task list so the flags returned by ping()
                # line up with the tasks we actually asked about.
                tasks = list(self.owners.get(owner_uri, []))
                if not tasks:
                    continue
                try:
                    uri = urlparse.urlparse(owner_uri)
                    proxy = bus.get_object(uri.scheme, uri.path)
                    alive = proxy.ping([t.uri for t in tasks], dbus_interface=TASKOWNER_INTERFACE)
                    # Pair inside the try so a malformed reply is treated
                    # as a failed owner; zip() ignores surplus flags
                    # instead of indexing past the end of `tasks`.
                    verdicts = list(zip(tasks, alive))
                except Exception:
                    # If the owner has gone, then we delete the tasks. It's
                    # the owner's job to stay alive.
                    e = sys.exc_info()[1]
                    for task in tasks:
                        self._orphan(task, owner_uri, "%s: owner %s could not be contacted (%s). Deleting task now." % (task.uri, owner_uri, str(e)))
                    continue
                for task, is_alive in verdicts:
                    if not is_alive:
                        self._orphan(task, owner_uri, "%s: owner %s has failed and orphaned this task. Deleting task now" % (task.uri, owner_uri))

taskMonitor = TaskMonitor()

class Task(dbus.service.Object, threading.Thread):
def __init__(self, cmd):
def __init__(self, cmd, owner_uri):
threading.Thread.__init__(self)

self.bus = dbus.SessionBus()
Expand All @@ -111,14 +161,18 @@ def __init__(self, cmd):
self.task_id = next_task_id
next_task_id = next_task_id + 1
self.path = "/org/xenserver/task/" + str(self.task_id)
self.uri = name + "://" + self.path
self.completed = False
self.canceller = None
self.result = None
self.returncode = None
self.auto_destroy = False
self.owner_uri = owner_uri

taskMonitor.add(self, owner_uri)

try:
self.process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
self.process = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=False)
except Exception, e:
error("%s: %s" % (" ".join(cmd), str(e)))
raise dbus.exceptions.DBusException(
Expand Down Expand Up @@ -162,12 +216,12 @@ def cancel(self):
@dbus.service.method(dbus_interface=TASK_INTERFACE)
def destroy(self):
self.auto_destroy = True
taskMonitor.remove(self, self.owner_uri)

if self.completed:
info("%s: destroy: task complete, removing object" % self.path)
self.remove_from_connection()
return
info("%s: destroy: task is running, requesting cancel with auto-destroy" % self.path)
self.cancel()

@dbus.service.method(dbus_interface=TASK_INTERFACE)
def getResult(self):
Expand Down Expand Up @@ -196,7 +250,7 @@ def GetAll(self, interface_name):
else:
raise dbus.exceptions.DBusException(
'com.example.UnknownInterface',
'The Foo object does not implement the %s interface'
'The Task object does not implement the %s interface'
% interface_name)

RESOURCE_INTERFACE="org.xenserver.api.resource"
Expand All @@ -207,12 +261,12 @@ def __init__(self):
bus_name = dbus.service.BusName(name, bus=self.bus)
dbus.service.Object.__init__(self, bus_name, "/" + name.replace(".", "/"))

@dbus.service.method(dbus_interface=RESOURCE_INTERFACE, in_signature="s", out_signature="o")
def attach(self, global_uri):
return Task([script, "attach", global_uri]).path
@dbus.service.method(dbus_interface=RESOURCE_INTERFACE, in_signature="s", out_signature="")
def detach(self, id):
return Task([script, "detach", id]).path
@dbus.service.method(dbus_interface=RESOURCE_INTERFACE, in_signature="ss", out_signature="s")
def attach(self, global_uri, owner_uri):
return Task([script, "attach", global_uri], owner_uri).uri
@dbus.service.method(dbus_interface=RESOURCE_INTERFACE, in_signature="ss", out_signature="s")
def detach(self, id, owner_uri):
return Task([script, "detach", id], owner_uri).uri

gobject.threads_init()
DBusGMainLoop(set_as_default=True)
Expand Down
2 changes: 2 additions & 0 deletions dbus/vm/resource.xml
Expand Up @@ -2,10 +2,12 @@
<interface name="org.xenserver.api.resource">
<method name="attach">
<arg name="global_uri" type="s" direction="in"/>
<arg name="owner_uri" type="s" direction="in"/>
<arg name="task" type="o" direction="out"/>
</method>
<method name="detach">
<arg name="id" type="s" direction="in"/>
<arg name="owner_uri" type="s" direction="in"/>
<arg name="task" type="o" direction="out"/>
</method>
</interface>
Expand Down
11 changes: 11 additions & 0 deletions dbus/vm/taskOwner.xml
@@ -0,0 +1,11 @@
<!DOCTYPE node PUBLIC
"-//freedesktop//DTD D-BUS Object Introspection 1.0//EN"
"http://www.freedesktop.org/standards/dbus/1.0/introspect.dtd">
<node name="/" xmlns:doc="http://www.freedesktop.org/dbus/1.0/doc.dtd">
<interface name="org.xenserver.api.taskOwner">
<method name="ping">
<arg name="tasks" type="as" direction="in"/>
<arg name="alive" type="ab" direction="out"/>
</method>
</interface>
</node>

0 comments on commit 20cd4b2

Please sign in to comment.