Browse files

Added optional port arguments to server and client classes.\nAddedNuk…

…eCommandManager class for transparently managing a client-server pair within a with statement.\nAdded NukeManagedServer subclass for use with the NukeCommandManager.
  • Loading branch information...
1 parent b5b6a8a commit d69544998bbcd58068d232b4d70be4a8e3f46d78 Nathan Rusch committed Apr 15, 2011
Showing with 287 additions and 33 deletions.
  1. +198 −8 nukeCommandClient.py
  2. +89 −25 nukeCommandServer.py
View
206 nukeCommandClient.py
@@ -1,25 +1,64 @@
+'''
+This script defines the client-side classes for the Nuke command server interface.
+
+It also functions as an executable for the purposes of launching NukeCommandManager
+instances.
+'''
+
+import inspect
import pickle
import socket
+import subprocess
+import sys
+import threading
+import time
+from os import devnull
basicTypes = [int, float, complex, str, unicode, buffer, xrange, bool, type(None)]
listTypes = [list, tuple, set, frozenset]
dictTypes = [dict]
+# This constant should be set to whatever absolute or
+# relative call your system uses to launch Nuke (excluding
+# any flags or arguments).
+NUKE_EXEC = 'Nuke'
+
+
class NukeConnectionError(StandardError):
pass
+class NukeManagerError(NukeConnectionError):
+ pass
+
+class NukeServerError(NukeConnectionError):
+ pass
+
class NukeConnection:
- def __init__(self, host = "localhost", instance = 0):
+ '''
+ If 'port' is specified, the client will attempt to connect
+ to a command server on that port, raising an exception
+ if one is not found.
+
+ Otherwise, the standard port search routine runs.
+ '''
+ def __init__(self, port=None, host="localhost", instance=0):
self._objects = {}
self._functions = {}
self._host = host
- start_port = 54200 + instance
- end_port = 54300
- self._port = self.find_connection_port(start_port, end_port)
-
- if self._port == -1:
- raise NukeConnectionError("Connection with Nuke failed")
-
+ self.is_active = False
+ if not port:
+ start_port = 54200 + instance
+ end_port = 54300
+ self._port = self.find_connection_port(start_port, end_port)
+ if self._port == -1:
+ raise NukeConnectionError("Connection with Nuke failed")
+ self.is_active = True
+ else:
+ self._port = port
+ if not self.test_connection():
+ raise NukeConnectionError("Could not connect to Nuke command server on port %d" % self._port)
+ self.is_active = True
+
def find_connection_port(self, start_port, end_port):
for port in range(start_port, end_port + 1):
self._port = port
@@ -61,6 +100,20 @@ def get(self, item_type, item_id = -1, parameters = None):
return result
+ def shutdown_server(self):
+ '''
+ Passes the 'shutdown' keyword to the server.
+ This will raise a special exception in the
+ server's listener loop, causing it to pass
+ back a shutdown message, close the client,
+ and exit cleanly.
+
+ Returns whatever shutdown message the server
+ sends back as a string.
+ '''
+ self.is_active = False
+ return self.get('shutdown')
+
def get_object_attribute(self, obj_id, property_name):
return self.decode(self.get("getattr", obj_id, property_name))
@@ -163,3 +216,140 @@ def __str__(self):
def __repr__(self):
return self._connection.get_object_repr(self._id)
+
+
+class NukeCommandManager():
+ '''
+ This class internally manages a Nuke command client-server pair.
+ It is designed to be instantiated as the 'as' assignment in a
+ 'with' statement.
+
+ Example usage:
+
+ with NukeCommandManager() as conn:
+ nuke = conn.nuke
+ b = nuke.createNode('Blur')
+ print b.writeKnobs()
+
+ When it starts up, it establishes a manager socket on an
+ available OS-assigned port.
+
+ At this point, it creates a bound thread that will call a Nuke
+ instance in terminal mode and establish a managed command server
+ within it.
+
+ When the manager's __enter__ method is called, the server thread
+ is started. The manager then waits for the managed server to call
+ back with its status and bound port.
+
+ A NukeConnection instance is then started using the port number
+ returned by the managed server's callback. This instance is
+ attached to the manager and returned to the 'with' statement.
+
+ The body of the 'with' block is now executed,
+ with the client instance available via the 'as' assignment.
+
+ When the 'with' statement is complete, the client instance sends
+ its companion server the 'shutdown' signal. This will cause the
+ server to send back its shutdown message, close the connection to
+ the client, and exit cleanly.
+
+ The __exit__ method then waits for the server thread to exit by
+ calling its '.join()' method.
+ '''
+ def __init__(self):
+ self.manager_port = -1
+ self.manager_socket = None
+ self.server_port = -1
+ self.client = None
+
+ bound_port = False
+
+ manager = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ manager.bind(('', 0))
+ bound_port = True
+ self.manager_port = manager.getsockname()[1]
+ self.manager_socket = manager
+
+ if (not bound_port) or (self.manager_port == -1):
+ raise NukeManagerError("MANAGER: Cannot find port to bind to")
+
+ # Make sure the port number has a trailing space... this is a bug in Nuke's
+ # Python argument parsing (logged with The Foundry as Bug 17918)
+ threadArgs = ([NUKE_EXEC, '-t', '-m', '1', '--', inspect.getabsfile(self.__class__), '%d ' % self.manager_port],)
+ self.serverThread = threading.Thread(None, subprocess.call, args=threadArgs, kwargs={'stdout':open(devnull)})
+
+ def __enter__(self):
+ if not self.manager_socket:
+ raise NukeManagerError("Manager failed to initialize socket.")
+ backlog = 5
+ bufsize = 4096
+ self.manager_socket.listen(backlog)
+
+ # Start the server thread and wait for it to call back to the
+ # manager with its success status and bound port
+ self.serverThread.start()
+ startTime = time.time()
+ timeout = startTime + 10 # Timeout after 10 seconds of waiting for server
+ while True:
+ server, address = self.manager_socket.accept()
+ data = server.recv(bufsize)
+ if data:
+ serverData = pickle.loads(data)
+ server.close()
+ if not serverData[0]:
+ raise NukeServerError("Server could not find port to bind to.")
+ self.server_port = serverData[1]
+ break
+ if time.time() >= timeout:
+ self.shutdown_server()
+ raise NukeManagerError("Manager timed out waiting for server connection.")
+ self.manager_socket.close()
+ try:
+ self.client = NukeConnection(self.server_port)
+ except:
+ self.shutdown_server()
+ raise
+ return self.client
+
+ def __exit__(self, type, value, traceback):
+ self.client.shutdown_server()
+ self.serverThread.join()
+
+ def shutdown_server(self):
+ '''
+ Used to shut down a managed server if its
+ client could not be initialized.
+ Returns the server's shutdown message.
+ '''
+ bufsize = 1024 * 1024
+ packet = {'action':'shutdown', 'id':-1, 'parameters':None}
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ try:
+ s.connect(('', self.server_port))
+ s.send(pickle.dumps(packet))
+ result = s.recv(bufsize)
+ s.close()
+ return pickle.loads(result)
+ except socket.error:
+ # Failed to connect to server port (server is dead?)
+ raise NukeServerError("Server failed to initialize.")
+
+
+def start_managed_nuke_server(manager_port=None):
+ '''
+ Convenience function for launching a managed Nuke command
+ server instance that will communicate with a NukeCommandManager
+ on the specified port. Must be called from within Nuke.
+ '''
+ import nukeCommandServer
+ nukeCommandServer.NukeManagedServer(manager_port=manager_port)
+
+
+if __name__ == '__main__':
+ manager_port = None
+
+ if len(sys.argv) > 1:
+ manager_port = int(sys.argv[1].strip())
+
+ start_managed_nuke_server(manager_port)
View
114 nukeCommandServer.py
@@ -15,41 +15,57 @@ def nuke_command_server():
t = threading.Thread(None, NukeInternal)
t.setDaemon(True)
t.start()
-
+
class NukeInternal:
- def __init__(self):
+ def __init__(self, port=None):
self._objects = {}
self._next_object_id = 0
+ self.port = port
+ self.bound_port = False
+ self.buffer_size = 1024 * 1024
host = ''
- start_port = 54200
- end_port = 54300
backlog = 5
- size = 1024 * 1024
+ if not self.port:
+ start_port = 54200
+ end_port = 54300
+ else:
+ start_port = end_port = self.port
- bound_port = False
- for port in range(start_port, end_port + 1):
+ for port in xrange(start_port, end_port + 1):
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- print "Trying port %d" % port
+ print "SERVER: Checking port %d" % port
s.bind((host, port))
- bound_port = True
+ self.bound_port = True
+ self.port = port
break
except Exception, e:
pass
- if not bound_port:
+ if not self.bound_port:
raise NukeConnectionError("Cannot find port to bind to")
s.listen(backlog)
-
+ self.start_server(s)
+
+ def start_server(self, sock):
+ '''
+ Starts the main server loop
+ '''
while 1:
- client, address = s.accept()
- data = client.recv(size)
- if data:
- result = self.receive(data)
+ client, address = sock.accept()
+ try:
+ data = client.recv(self.buffer_size)
+ if data:
+ result = self.receive(data)
+ client.send(result)
+ except SystemExit:
+ result = self.encode('SERVER: Shutting down...')
client.send(result)
- client.close()
+ raise
+ finally:
+ client.close()
def recode_data(self, data, recode_object_func):
if type(data) in basicTypes or isinstance(data, Exception):
@@ -95,28 +111,32 @@ def get(self, data_string):
obj = self.get_object(data['id'])
params = data['parameters']
result = None
+ action = data['action']
try:
- if data['action'] == "getattr":
+ if action == "getattr":
result = getattr(obj, params)
- elif data['action'] == "setattr":
+ elif action == "setattr":
setattr(obj, params[0], params[1])
- elif data['action'] == "getitem":
+ elif action == "getitem":
# If we're actually getting from globals(), then raise NameError instead of KeyError
if data['id'] == -1 and params not in obj:
raise NameError("name '%s' is not defined" % params)
result = obj[params]
- elif data['action'] == "setitem":
+ elif action == "setitem":
obj[params[0]] = params[1]
- elif data['action'] == "call":
+ elif action == "call":
result = nuke.executeInMainThreadWithResult(obj, args=params['args'], kwargs=params['kwargs'])
- elif data['action'] == "len":
+ elif action == "len":
result = len(obj)
- elif data['action'] == "str":
+ elif action == "str":
result = str(obj)
- elif data['action'] == "repr":
+ elif action == "repr":
result = `obj`
- elif data['action'] == "import":
+ elif action == "import":
result = imp.load_module(params, *imp.find_module(params))
+ elif action == "shutdown":
+ # This keyword triggers the server shutdown
+ raise SystemExit
except Exception, e:
result = e
@@ -132,3 +152,47 @@ def get_object(self, id):
return globals()
else:
return self._objects[id]
+
+
+class NukeManagedServer(NukeInternal):
+ '''
+ Subclass of the Nuke Command Server designed to be managed
+ by a NukeCommandManager. It adds constructor arguments for
+ a manager port and a manager hostname.
+
+ Once it has initialized, and immediately before the main
+ server loop is started, it sends a status "packet" to the
+ manager on 'manager_port,' which informs the manager whether
+ the server has successfully bound itself to a port, and
+ which port it is using.
+ '''
+ def __init__(self, port=None, manager_port=None, manager_host='localhost'):
+ self.manager_port = manager_port
+ self.manager_host = manager_host
+ NukeInternal.__init__(self, port)
+
+ def start_server(self, socket):
+ '''
+ Fires the manager callback, then starts
+ the main server loop.
+ '''
+ self.manager_callback(self.bound_port)
+ NukeInternal.start_server(self, socket)
+
+ def manager_callback(self, status):
+ '''
+ Tell the manager what port the server ended up
+ binding to so the client can connect to the
+ correct server instance.
+
+ 'status' is a boolean indicating whether the server
+ succeeded in binding to a port.
+ '''
+ if not self.manager_port:
+ return
+ manager = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ manager.connect((self.manager_host, self.manager_port))
+ manager.send(self.encode((status, self.port)))
+ manager.close()
+ if not status:
+ raise NukeConnectionError("Cannot find port to bind to")

0 comments on commit d695449

Please sign in to comment.