diff --git a/src/tests/unit/net/subprocess_wrapper_test.py b/src/tests/unit/net/subprocess_wrapper_test.py new file mode 100755 index 0000000000..96351f5f9e --- /dev/null +++ b/src/tests/unit/net/subprocess_wrapper_test.py @@ -0,0 +1,137 @@ +#!/usr/bin/env python +# This file is part of Xpra. +# Copyright (C) 2011-2013 Antoine Martin +# Xpra is released under the terms of the GNU GPL v2, or, at your option, any +# later version. See the file COPYING for details. + +import unittest + +import gobject +gobject.threads_init() + +from xpra.log import enable_debug_for +enable_debug_for("all") + +from xpra.gtk_common.gobject_util import one_arg_signal +from xpra.net.protocol import Protocol +from xpra.net.subprocess_wrapper import subprocess_caller, subprocess_callee +from xpra.net.bytestreams import Connection +from xpra.os_util import Queue + + +class fake_subprocess(): + """ defined just so the protocol layer can call terminate() on it """ + def terminate(self): + pass + +class loopback_connection(Connection): + """ a fake connection which just writes back whatever is sent to it """ + def __init__(self, *args): + Connection.__init__(self, *args) + self.queue = Queue() + + def read(self, n): + self.may_abort("read") + #FIXME: we don't handle n... + return self.queue.get(True) + + def write(self, buf): + self.may_abort("write") + self.queue.put(buf) + return len(buf) + + def may_abort(self, action): + return False + +def loopback_protocol(process_packet_cb, get_packet_cb): + conn = loopback_connection("fake", "fake") + protocol = Protocol(gobject, conn, process_packet_cb, get_packet_cb=get_packet_cb) + protocol.enable_encoder("rencode") + protocol.enable_compressor("none") + return protocol + + +class loopback_process(subprocess_caller): + """ a fake subprocess which uses the loopback connection """ + def exec_subprocess(self): + return fake_subprocess() + def make_protocol(self): + return loopback_protocol(self.process_packet, self.get_packet) + + +class loopback_callee(subprocess_callee): + + def make_protocol(self): + return loopback_protocol(self.process_packet, self.get_packet) + + +class TestCallee(gobject.GObject): + __gsignals__ = { + "test-signal": one_arg_signal, + } + +gobject.type_register(TestCallee) + + +class SubprocessWrapperTest(unittest.TestCase): + + def test_loopback_caller(self): + mainloop = gobject.MainLoop() + lp = loopback_process() + readback = [] + def record_packet(self, *args): + readback.append(args) + lp.connect("foo", record_packet) + def end(*args): + mainloop.quit() + lp.connect("end", end) + self.timeout = False + def timeout_error(): + self.timeout = True + mainloop.quit() + gobject.timeout_add(500, timeout_error) + gobject.idle_add(lp.send, "foo", "hello foo") + gobject.idle_add(lp.send, "bar", "hello bar") + gobject.idle_add(lp.send, "end") + lp.stop = mainloop.quit + #run! + lp.start() + mainloop.run() + assert len(readback)==1, "expected 1 record in loopback but got %s" % len(readback) + assert readback[0][0] == "hello foo" + assert self.timeout is False, "the test did not exit cleanly (not received the 'end' packet?)" + + def test_loopback_callee(self): + callee = TestCallee() + lc = loopback_callee(wrapped_object=callee, method_whitelist=["test-signal", "stop", "unused"]) + #this will cause the "test-signal" to be sent via the loopback connection + lc.connect_export("test-signal") + readback = [] + def test_signal_function(*args): + #print("test_signal_function%s" % str(args)) + readback.append(args) + #hook up a function which will be called when the wrapper converts the packet into a method call: + callee.test_signal = test_signal_function + #lc.connect_export("test-signal", hello) + self.timeout = False + def timeout_error(): + self.timeout = True + lc.stop() + gobject.timeout_add(500, timeout_error) + gobject.idle_add(callee.emit, "test-signal", "hello foo") + #hook up a stop function call which ends this test cleanly + def stop(*args): + lc.stop() + callee.stop = stop + gobject.idle_add(lc.send, "stop") + #run! + lc.start() + assert len(readback)==1, "expected 1 record in loopback but got %s" % len(readback) + assert readback[0][0] == "hello foo" + assert self.timeout is False, "the test did not exit cleanly (not received the 'end' packet?)" + +def main(): + unittest.main() + +if __name__ == '__main__': + main() diff --git a/src/xpra/net/subprocess_wrapper.py b/src/xpra/net/subprocess_wrapper.py new file mode 100644 index 0000000000..2a49047be3 --- /dev/null +++ b/src/xpra/net/subprocess_wrapper.py @@ -0,0 +1,279 @@ +# This file is part of Xpra. +# Copyright (C) 2015 Antoine Martin +# Xpra is released under the terms of the GNU GPL v2, or, at your option, any +# later version. See the file COPYING for details. + +import os +import sys +import signal +import time +import subprocess +import binascii + +import gobject +gobject.threads_init() + +from xpra.net.bytestreams import TwoFileConnection +from xpra.net.protocol import Protocol +from xpra.os_util import Queue, setbinarymode, SIGNAMES +from xpra.log import Logger +log = Logger("util") + + +#this wrapper allows us to interact with a subprocess as if it was +#a normal class with gobject signals +#so that we can interact with it using a standard xpra protocol layer +#there is a wrapper for the caller +#and one for the class +#they talk to each other through stdin / stdout, +#using the protocol for encoding the data + + +DEBUG_WRAPPER = os.environ.get("XPRA_WRAPPER_DEBUG", "0")=="1" +#to make it possible to inspect files (more human readable): +HEXLIFY_PACKETS = os.environ.get("XPRA_HEXLIFY_PACKETS", "0")=="1" + + +class subprocess_callee(object): + """ + This is the callee side, wrapping the gobject we want to interact with. + All the input received will be converted to method calls on the wrapped object. + Subclasses should register the signal handlers they want to see exported back to the caller. + The convenience connect_export(signal-name, *args) can be used to forward signals unmodified. + You can also call send() to pass packets back to the caller. + (there is no validation of which signals are valid or not) + """ + def __init__(self, input_filename="-", output_filename="-", wrapped_object=None, method_whitelist=None): + self.mainloop = gobject.MainLoop() + self.name = "" + self.input_filename = input_filename + self.output_filename = output_filename + self.method_whitelist = None + #the gobject instance which is wrapped: + self.wrapped_object = wrapped_object + self.send_queue = Queue() + self.protocol = None + signal.signal(signal.SIGINT, self.handle_signal) + signal.signal(signal.SIGTERM, self.handle_signal) + + + def connect_export(self, signal_name, *user_data): + """ gobject style signal registration for the wrapped object, + the signals will automatically be forwarded to the wrapper process + using send(signal_name, *signal_args, *user_data) + """ + log("connect_export%s", [signal_name] + list(user_data)) + args = list(user_data) + [signal_name] + self.wrapped_object.connect(signal_name, self.export, *args) + + def export(self, *args): + signal_name = args[-1] + log("export(%s, ...)", signal_name) + data = args[1:-1] + self.send(signal_name, *list(data)) + + + def start(self): + self.protocol = self.make_protocol() + self.protocol.start() + try: + self.run() + return 0 + except KeyboardInterrupt as e: + log.warn("%s", e) + return 0 + except Exception: + log.error("error in main loop", exc_info=True) + return 1 + finally: + if self.protocol: + self.protocol.close() + self.protocol = None + if self.input_filename=="-": + try: + self._input.close() + except: + pass + if self.output_filename=="-": + try: + self._output.close() + except: + pass + + def make_protocol(self): + #figure out where we read from and write to: + if self.input_filename=="-": + #disable stdin buffering: + self._input = os.fdopen(sys.stdin.fileno(), 'r', 0) + setbinarymode(self._input.fileno()) + else: + self._input = open(self.input_filename, 'rb') + if self.output_filename=="-": + #disable stdout buffering: + self._output = os.fdopen(sys.stdout.fileno(), 'w', 0) + setbinarymode(self._output.fileno()) + else: + self._output = open(self.output_filename, 'wb') + #stdin and stdout wrapper: + conn = TwoFileConnection(self._output, self._input, abort_test=None, target=self.name, info=self.name, close_cb=self.stop) + conn.timeout = 0 + protocol = Protocol(gobject, conn, self.process_packet, get_packet_cb=self.get_packet) + try: + protocol.enable_encoder("rencode") + except Exception as e: + log.warn("failed to enable rencode: %s", e) + protocol.enable_compressor("none") + return protocol + + + def run(self): + self.mainloop.run() + + + def stop(self): + if self.protocol: + self.protocol.close() + self.protocol = None + self.mainloop.quit() + + + def handle_signal(self, sig, frame): + """ This is for OS signals SIGINT and SIGTERM """ + signame = SIGNAMES.get(sig, sig) + log("handle_signal(%s, %s) calling stop", signame, frame) + self.send("signal", signame) + #give time for the network layer to send the signal message + time.sleep(0.1) + self.stop() + + + def send(self, *args): + if HEXLIFY_PACKETS: + args = args[:1]+[binascii.hexlify(str(x)[:32]) for x in args[1:]] + log("send: adding '%s' message (%s items already in queue)", args[0], self.send_queue.qsize()) + self.send_queue.put(args) + self.protocol.source_has_more() + + def get_packet(self): + try: + item = self.send_queue.get(False) + except: + item = None + return (item, None, None, self.send_queue.qsize()>0) + + def process_packet(self, proto, packet): + command = packet[0] + if command==Protocol.CONNECTION_LOST: + log("connection-lost: %s, calling stop", packet[1:]) + self.stop() + return + #make it easier to hookup signals to methods: + attr = command.replace("-", "_") + if self.method_whitelist is not None and attr not in self.method_whitelist: + log.warn("invalid command: %s (not in whitelist: %s)", attr, self.method_whitelist) + return + method = getattr(self.wrapped_object, attr, None) + if not method: + log.warn("unknown command: %s", command) + return + if DEBUG_WRAPPER: + log("calling %s.%s%s", self.wrapped_object, attr, str(tuple(packet[1:]))[:128]) + gobject.idle_add(method, *packet[1:]) + + +class subprocess_caller(object): + """ + This is the caller side, wrapping the subprocess. + You can call send() to pass packets to it + which will get converted to method calls on the receiving end, + You can register for signals, in which case your callbacks will be called + when those signals are forwarded back. + (there is no validation of which signals are valid or not) + """ + + def __init__(self): + self.process = None + self.protocol = None + self.command = None + self.send_queue = Queue() + self.signal_callbacks = {} + #hook a default packet handlers: + self.connect(Protocol.CONNECTION_LOST, self.connection_lost) + + + def connect(self, signal, cb, *args): + """ gobject style signal registration """ + self.signal_callbacks.setdefault(signal, []).append((cb, list(args))) + + + def subprocess_exit(self, *args): + log("subprocess_exit%s command=%s", args, self.command) + + def start(self): + self.process = self.exec_subprocess() + self.protocol = self.make_protocol() + self.protocol.start() + + def make_protocol(self): + #make a connection using the process stdin / stdout + conn = TwoFileConnection(self.process.stdin, self.process.stdout, abort_test=None, target="sound", info="sound", close_cb=self.subprocess_exit) + conn.timeout = 0 + protocol = Protocol(gobject, conn, self.process_packet, get_packet_cb=self.get_packet) + #we assume the other end has the same encoders (which is reasonable): + #TODO: fallback to bencoder + protocol.enable_encoder("rencode") + #we assume this is local, so no compression: + protocol.enable_compressor("none") + return protocol + + + def exec_subprocess(self): + kwargs = {} + if os.name=="posix": + kwargs["close_fds"] = True + return subprocess.Popen(self.command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=sys.stderr.fileno(), **kwargs) + + + def cleanup(self): + self.stop() + + def stop(self): + log("%s.stop()", self) + if self.process: + try: + self.process.terminate() + self.protocol.close() + except Exception as e: + log.warn("failed to stop sound process %s: %s", self.process, e) + + + def connection_lost(self, *args): + log("connection_lost%s", args) + self.stop() + + + def get_packet(self): + try: + item = self.send_queue.get(False) + except: + item = None + return (item, None, None, self.send_queue.qsize()>0) + + def send(self, *packet_data): + assert self.protocol + self.send_queue.put(packet_data) + self.protocol.source_has_more() + + def process_packet(self, proto, packet): + if DEBUG_WRAPPER: + log("process_packet(%s, %s)", proto, [str(x)[:32] for x in packet]) + command = packet[0] + callbacks = self.signal_callbacks.get(command) + log("process_packet callbacks(%s)=%s", command, callbacks) + if callbacks: + for cb, args in callbacks: + try: + all_args = list(packet[1:]) + args + cb(self, *all_args) + except Exception: + log.error("error processing callback %s for %s packet", cb, command, exc_info=True) diff --git a/src/xpra/sound/wrapper.py b/src/xpra/sound/wrapper.py index ee2a4da878..5f0f0031bf 100644 --- a/src/xpra/sound/wrapper.py +++ b/src/xpra/sound/wrapper.py @@ -4,27 +4,23 @@ # later version. See the file COPYING for details. import os -import sys -import signal import time -import subprocess import gobject gobject.threads_init() +from xpra.net.subprocess_wrapper import subprocess_caller, subprocess_callee from xpra.platform.paths import get_sound_executable -from xpra.os_util import Queue, setbinarymode from xpra.util import AdHocStruct from xpra.log import Logger log = Logger("sound") + DEBUG_SOUND = os.environ.get("XPRA_SOUND_DEBUG", "0")=="1" SUBPROCESS_DEBUG = os.environ.get("XPRA_SOUND_SUBPROCESS_DEBUG", "").split(",") EXPORT_INFO_TIME = int(os.environ.get("XPRA_SOUND_INFO_TIME", "1000")) #this wrapper takes care of launching src.py or sink.py -#wrapped so that we can interact with them using a standard xpra protocol layer -#it is generic enough to be used with other processes # #the command line should look something like: # xpra MODE IN OUT PLUGIN PLUGIN_OPTIONS CODECS CODEC_OPTIONS VOLUME @@ -45,52 +41,47 @@ # The output will be a regular xpra packet, containing serialized signals that we receive # The input can be a regular xpra packet, those are converted into method calls -#to make it possible to inspect files (more human readable): -HEXLIFY_PACKETS = os.environ.get("XPRA_HEXLIFY_PACKETS", "0")=="1" -#use a packet encoder on the data: -ENCODE_PACKETS = os.environ.get("XPRA_ENCODE_PACKETS", "1")=="1" +class sound_subprocess(subprocess_callee): + def start(self): + if EXPORT_INFO_TIME>0: + gobject.timeout_add(EXPORT_INFO_TIME, self.export_info) + gobject.idle_add(self.wrapped_object.start) + subprocess_callee.start(self) + + def stop(self): + if self.wrapped_object: + self.wrapped_object.stop() + self.wrapped_object = None + subprocess_callee.stop(self) -#by default we just print the exported signals: -def printit(*args): - log.info("export %s", [str(x)[:128] for x in args]) + def make_protocol(self): + p = subprocess_callee.make_protocol(self) + p.large_packets = ["new-buffer"] + return p -export_callback = printit - -def export(*args): - global export_callback - signame = args[-1] - data = args[1:-1] - export_callback(*([signame] + list(data))) + def export_info(self): + self.send("info", self.wrapped_object.get_info()) def run_sound(mode, error_cb, options, args): assert len(args)>=6, "not enough arguments" - mainloop = gobject.MainLoop() - #common to both sink and src: - signal_handlers = { - "state-changed" : export, - "bitrate-changed" : export, - "error" : export, - "new-stream" : export, - } - #these definitions should probably be introspected somehow: - #(to make it more generic / abstracted) - functions = ["set_volume", "stop"] + exports = ["state-changed", "bitrate-changed", + "error"] + methods = ["set_volume", "stop"] if mode=="_sound_record": from xpra.sound.src import SoundSource gst_wrapper = SoundSource - signal_handlers["new-buffer"] = export + exports += ["new-stream", "new-buffer"] elif mode=="_sound_play": from xpra.sound.sink import SoundSink gst_wrapper = SoundSink - def eos(*args): - gobject.idle_add(mainloop.quit) - signal_handlers["eos"] = eos - signal_handlers["underrun"] = export - signal_handlers["overrun"] = export - functions += ["add_data"] + #def eos(*args): + # gobject.idle_add(mainloop.quit) + #signal_handlers["eos"] = eos + methods += ["add_data"] + exports += ["underrun", "overrun"] else: raise Exception("unknown mode: %s" % mode) @@ -109,134 +100,31 @@ def eos(*args): except: volume = 1.0 - #figure out where we read from and write to: - input_filename = args[0] - if input_filename=="-": - #disable stdin buffering: - _input = os.fdopen(sys.stdin.fileno(), 'r', 0) - setbinarymode(_input.fileno()) - else: - _input = open(input_filename, 'rb') - output_filename = args[1] - if output_filename=="-": - #disable stdout buffering: - _output = os.fdopen(sys.stdout.fileno(), 'w', 0) - setbinarymode(_output.fileno()) - else: - _output = open(output_filename, 'wb') - try: pipeline = gst_wrapper(plugin, options, codecs, codec_options, volume) - - def stop(): - pipeline.cleanup() - mainloop.quit() - - def handle_signal(*args): - gobject.idle_add(stop) - - if ENCODE_PACKETS: - from xpra.net.bytestreams import TwoFileConnection - conn = TwoFileConnection(_output, _input, abort_test=None, target=mode, info=mode, close_cb=stop) - conn.timeout = 0 - from xpra.net.protocol import Protocol - def process_packet(proto, packet): - #log("process_packet(%s, %s)", proto, str(packet)[:128]) - command = packet[0] - if command==Protocol.CONNECTION_LOST: - log("connection-lost: %s, terminating", packet[1:]) - stop() - return - method = getattr(pipeline, command, None) - if not method: - log.warn("unknown command: %s", command) - return - if DEBUG_SOUND: - log("calling %s.%s%s", pipeline, command, str(tuple(packet[1:]))[:128]) - gobject.idle_add(method, *packet[1:]) - - queue = Queue() - def get_packet_cb(): - try: - item = queue.get(False) - except: - item = None - return (item, None, None, queue.qsize()>0) - protocol = Protocol(gobject, conn, process_packet, get_packet_cb=get_packet_cb) - protocol.large_packets = ["new-buffer"] - try: - protocol.enable_encoder("rencode") - except Exception as e: - log.warn("failed to enable rencode: %s", e) - protocol.enable_compressor("none") - protocol.start() - global export_callback - def send_via_protocol(*args): - if HEXLIFY_PACKETS: - import binascii - args = args[:1]+[binascii.hexlify(str(x)[:32]) for x in args[1:]] - log("send_via_protocol: adding '%s' message (%s items already in queue)", args[0], queue.qsize()) - queue.put(args) - protocol.source_has_more() - export_callback = send_via_protocol - #export signal before shutting down: - from xpra.os_util import SIGNAMES - def handle_signal(sig, frame): - signame = SIGNAMES.get(sig, sig) - log("handle_signal(%s, %s)", signame, frame) - send_via_protocol("signal", signame) - #give time for the network layer to send the signal - time.sleep(0.1) - stop() - - signal.signal(signal.SIGINT, handle_signal) - signal.signal(signal.SIGTERM, handle_signal) - - for x,handler in signal_handlers.items(): - log("registering signal %s", x) - pipeline.connect(x, handler, x) - - if EXPORT_INFO_TIME>0: - def export_info(): - send_via_protocol("info", pipeline.get_info()) - gobject.timeout_add(EXPORT_INFO_TIME, export_info) - - gobject.idle_add(pipeline.start) - mainloop.run() + ss = sound_subprocess(wrapped_object=pipeline, method_whitelist=methods) + for x in exports: + ss.connect_export(x) + ss.start() return 0 - except Exception as e: + except Exception: log.error("run_sound%s error", (mode, error_cb, options, args), exc_info=True) return 1 finally: - if _input!=sys.stdin: - try: - _input.close() - except: - pass - if _output!=sys.stdout: - try: - _output.close() - except: - pass + ss.stop() -class sound_subprocess_wrapper(object): +class sound_subprocess_wrapper(subprocess_caller): def __init__(self): + subprocess_caller.__init__(self) self.state = "stopped" self.codec = "unknown" self.codec_description = "" - self.process = None - self.protocol = None - self.command = None - self.send_queue = Queue() - self.signal_callbacks = {} self.last_info = {} #hook some default packet handlers: - from xpra.net.protocol import Protocol self.connect("state-changed", self.state_changed) self.connect("info", self.info_update) - self.connect(Protocol.CONNECTION_LOST, self.connection_lost) def state_changed(self, sink, new_state): @@ -245,6 +133,7 @@ def state_changed(self, sink, new_state): def get_state(self): return self.state + def get_info(self): return self.last_info @@ -253,6 +142,7 @@ def info_update(self, sink, info): self.last_info["time"] = int(time.time()) self.codec_description = info.get("codec_description") + def set_volume(self, v): self.send("set_volume", int(v*100)) @@ -260,72 +150,12 @@ def get_volume(self): return self.last_info.get("volume", 100)/100.0 + def make_protocol(self): + """ add some 'large packets' to the list """ + protocol = subprocess_caller.make_protocol(self) + protocol.large_packets = ["new-buffer", "add_data"] + return protocol - def start(self): - log("starting sound source using %s", self.command) - kwargs = {} - if os.name=="posix": - kwargs["close_fds"] = True - self.process = subprocess.Popen(self.command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=sys.stderr.fileno(), **kwargs) - #make a connection using the process stdin / stdout - from xpra.net.bytestreams import TwoFileConnection - def sound_process_exit(): - log("sound_process_exit()") - conn = TwoFileConnection(self.process.stdin, self.process.stdout, abort_test=None, target="sound", info="sound", close_cb=sound_process_exit) - conn.timeout = 0 - from xpra.net.protocol import Protocol - self.protocol = Protocol(gobject, conn, self.process_packet, get_packet_cb=self.get_packet) - self.protocol.large_packets = ["new-buffer", "add_data"] - self.protocol.enable_encoder("rencode") - self.protocol.enable_compressor("none") - self.protocol.start() - - - def cleanup(self): - #TODO: rename in callers? - self.stop() - - def stop(self): - log("%s.stop()", self) - if self.process: - try: - self.process.terminate() - self.protocol.close() - except Exception as e: - log.warn("failed to stop sound process %s: %s", self.process, e) - - def connection_lost(self, *args): - log("connection_lost%s", args) - self.stop() - - def get_packet(self): - try: - item = self.send_queue.get(False) - except: - item = None - return (item, None, None, self.send_queue.qsize()>0) - - def send(self, *packet_data): - assert self.protocol - self.send_queue.put(packet_data) - self.protocol.source_has_more() - - def process_packet(self, proto, packet): - if DEBUG_SOUND: - log("process_packet(%s, %s)", proto, [str(x)[:32] for x in packet]) - command = packet[0] - callbacks = self.signal_callbacks.get(command) - log("process_packet callbacks(%s)=%s", command, callbacks) - if callbacks: - for cb, args in callbacks: - try: - all_args = list(packet[1:]) + args - cb(self, *all_args) - except Exception as e: - log.error("error processing callback %s for %s packet: %s", cb, command, e, exc_info=True) - - def connect(self, signal, cb, *args): - self.signal_callbacks.setdefault(signal, []).append((cb, list(args))) def _add_debug_args(self): from xpra.log import debug_enabled_categories