Skip to content

Commit

Permalink
#669 + #400: split sound into a separate process
Browse files Browse the repository at this point in the history
* define a wrapper for gstreamer sound pipelines (fairly generic too!) - which takes care of serializing data in both directions
* remove the threading stuff which was meant to try to avoid deadlocks when running in-process
* add workaround for writing more than 32KB to stdout on win32
* add utility to force binary mode on stdout/stdin (so win32 doesn't close the pipe when it sees an eof character!)
* add a "get_sound_executable" to platform paths so we know which executable to use to launch the xpra sound processes
* add "_sound_record" and "_sound_play" hidden xpra subcommands
* wait for the "new-stream" signal to send the "start-of-stream" metadata since the subprocess needs to tell us what codec it ended up using (should be the first in the list - but this may change)
* temporarily disable wav and wavpack because of errors
* constify some gstreamer values
* add a volume to sink so all pipelines have it
* apply the "min-threshold-time" change to try to deal with underruns better (and consitify it to make it easier to disable)
* always call emit() via idle_add, to ensure we don't have threading issues (since the network side will run its own threads)
* add "time" to buffer metadata to make it easier to debug clock and syncing issues

git-svn-id: https://xpra.org/svn/Xpra/trunk@8786 3bb7dfac-3a0b-4e04-842a-767bc560f471
  • Loading branch information
totaam committed Mar 17, 2015
1 parent ec153c8 commit 8b56fa9
Show file tree
Hide file tree
Showing 13 changed files with 687 additions and 153 deletions.
46 changes: 21 additions & 25 deletions src/xpra/client/ui_client_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
from xpra.simple_stats import std_unit
from xpra.net import compression, packet_encoding
from xpra.daemon_thread import make_daemon_thread
from xpra.os_util import thread, Queue, os_info, platform_name, get_machine_id, get_user_uuid, bytestostr
from xpra.os_util import Queue, os_info, platform_name, get_machine_id, get_user_uuid, bytestostr
from xpra.util import nonl, std, AtomicInteger, AdHocStruct, log_screen_sizes, typedict, CLIENT_EXIT
try:
from xpra.clipboard.clipboard_base import ALL_CLIPBOARDS
Expand Down Expand Up @@ -1551,8 +1551,8 @@ def sound_source_state_changed(*args):
def sound_source_bitrate_changed(*args):
self.emit("microphone-changed")
try:
from xpra.sound.gstreamer_util import start_sending_sound
self.sound_source = start_sending_sound(self.sound_source_plugin, None, 1.0, self.server_sound_decoders, self.microphone_codecs, self.server_pulseaudio_server, self.server_pulseaudio_id)
from xpra.sound.wrapper import start_sending_sound
self.sound_source = start_sending_sound(self.sound_source_plugin, None, 1.0, self.server_sound_decoders, self.server_pulseaudio_server, self.server_pulseaudio_id)
if not self.sound_source:
return False
self.sound_source.connect("new-buffer", self.new_sound_buffer)
Expand All @@ -1571,15 +1571,11 @@ def stop_sending_sound(self):
ss = self.sound_source
self.microphone_enabled = False
self.sound_source = None
def stop_sending_sound_thread():
soundlog("UIXpraClient.stop_sending_sound_thread()")
if ss is None:
log.warn("stop_sending_sound: sound not started!")
return
ss.cleanup()
self.emit("microphone-changed")
soundlog("UIXpraClient.stop_sending_sound_thread() done")
thread.start_new_thread(stop_sending_sound_thread, ())
if ss is None:
log.warn("stop_sending_sound: sound not started!")
return
ss.cleanup()
self.emit("microphone-changed")

def start_receiving_sound(self):
""" ask the server to start sending sound and emit the client signal """
Expand Down Expand Up @@ -1617,15 +1613,8 @@ def stop_receiving_sound(self, tell_server=True):
if ss is None:
return
self.sound_sink = None
def stop_receiving_sound_thread():
soundlog("UIXpraClient.stop_receiving_sound_thread()")
if ss is None:
log("stop_receiving_sound: sound not started!")
return
ss.cleanup()
self.emit("speaker-changed")
soundlog("UIXpraClient.stop_receiving_sound_thread() done")
thread.start_new_thread(stop_receiving_sound_thread, ())
ss.cleanup()
self.emit("speaker-changed")

def bump_sound_sequence(self):
if self.server_sound_sequence:
Expand All @@ -1652,14 +1641,17 @@ def sound_sink_bitrate_changed(self, sound_sink, bitrate):
#not shown in the UI, so don't bother with emitting a signal:
#self.emit("speaker-changed")
def sound_sink_error(self, sound_sink, error):
log.warn("stopping speaker because of error: %s", error)
soundlog.warn("stopping speaker because of error: %s", error)
self.stop_receiving_sound()
def sound_process_stopped(self, sound_sink, *args):
soundlog("the sound sink process has stopped (%s)", args)
self.stop_receiving_sound()

def sound_sink_overrun(self, *args):
if self.sink_restart_pending:
soundlog("overrun re-start is already pending")
return
log.warn("re-starting speaker because of overrun")
soundlog.warn("re-starting speaker because of overrun")
codec = self.sound_sink.codec
self.sink_restart_pending = True
if self.server_sound_sequence:
Expand All @@ -1680,12 +1672,16 @@ def start_sound_sink(self, codec):
assert self.sound_sink is None, "sound sink already exists!"
try:
soundlog("starting %s sound sink", codec)
from xpra.sound.sink import SoundSink
self.sound_sink = SoundSink(codec=codec)
from xpra.sound.wrapper import start_receiving_sound
self.sound_sink = start_receiving_sound(codec)
if not self.sound_sink:
return False
self.sound_sink.connect("state-changed", self.sound_sink_state_changed)
self.sound_sink.connect("bitrate-changed", self.sound_sink_bitrate_changed)
self.sound_sink.connect("error", self.sound_sink_error)
self.sound_sink.connect("overrun", self.sound_sink_overrun)
from xpra.net.protocol import Protocol
self.sound_sink.connect(Protocol.CONNECTION_LOST, self.sound_process_stopped)
self.sound_sink.start()
soundlog("%s sound sink started", codec)
return True
Expand Down
38 changes: 35 additions & 3 deletions src/xpra/net/bytestreams.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@
errno.ECONNRESET : "ECONNRESET",
errno.EPIPE : "EPIPE"}
continue_wait = 0

#default to using os.read and os.write for both tty devices and regular streams
#(but overriden for win32 below for tty devices to workaround an OS "feature")
OS_READ = os.read
OS_WRITE = os.write
TTY_READ = os.read
TTY_WRITE = os.write


if sys.platform.startswith("win"):
#on win32, we have to deal with a few more odd error codes:
#(it would be nicer if those were wrapped using errno instead..)
Expand All @@ -45,6 +54,16 @@
#on win32, we want to wait just a little while,
#to prevent servers spinning wildly on non-blocking sockets:
continue_wait = 5
if sys.version[0]<"3":
#win32 has problems writing more than 32767 characters to stdout!
#see: http://bugs.python.org/issue11395
#(this is fixed in python 3.2 and we don't care about 3.0 or 3.1)
def win32ttywrite(fd, buf):
#this awful limitation only applies to tty devices:
if len(buf)>32767:
buf = buf[:32767]
return os.write(fd, buf)
TTY_WRITE = win32ttywrite


def untilConcludes(is_active_cb, f, *a, **kw):
Expand Down Expand Up @@ -120,6 +139,16 @@ def __init__(self, writeable, readable, abort_test=None, target=None, info="", c
Connection.__init__(self, target, info)
self._writeable = writeable
self._readable = readable
self._read_fd = self._readable.fileno()
self._write_fd = self._writeable.fileno()
if os.isatty(self._read_fd):
self._osread = TTY_READ
else:
self._osread = OS_READ
if os.isatty(self._write_fd):
self._oswrite = TTY_WRITE
else:
self._oswrite = OS_WRITE
self._abort_test = abort_test
self._close_cb = close_cb

Expand All @@ -130,19 +159,22 @@ def may_abort(self, action):

def read(self, n):
self.may_abort("read")
return self._read(os.read, self._readable.fileno(), n)
return self._read(self._osread, self._read_fd, n)

def write(self, buf):
self.may_abort("write")
return self._write(os.write, self._writeable.fileno(), buf)
return self._write(self._oswrite, self._write_fd, buf)

def close(self):
Connection.close(self)
try:
self._writeable.close()
self._readable.close()
except:
pass
try:
self._writeable.close()
except:
pass
if self._close_cb:
self._close_cb()

Expand Down
20 changes: 20 additions & 0 deletions src/xpra/os_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,26 @@ def force_quit(status=1):
os._exit(status)


def disable_stdout_buffering():
import gc
# Appending to gc.garbage is a way to stop an object from being
# destroyed. If the old sys.stdout is ever collected, it will
# close() stdout, which is not good.
gc.garbage.append(sys.stdout)
sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)

def setbinarymode(fd):
if sys.platform.startswith("win"):
#turn on binary mode:
try:
import msvcrt
msvcrt.setmode(fd, os.O_BINARY) #@UndefinedVariable
except:
from xpra.log import Logger
log = Logger("util")
log.error("setting stdin to binary mode failed", exc_info=True)


def find_lib(libname):
#it would be better to rely on dlopen to find the paths
#but I cannot find a way of getting ctypes to tell us the path
Expand Down
5 changes: 5 additions & 0 deletions src/xpra/platform/darwin/paths.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,8 @@ def get_system_conf_dir():
#the system wide configuration directory
default_conf_dir = "/Library/Application Support/Xpra"
return os.environ.get("XPRA_SYSCONF_DIR", default_conf_dir)


def get_sound_executable():
helper = os.path.join(get_app_dir(), "MacOS", "Xpra")
return os.environ.get("XPRA_SOUND_EXECUTABLE", helper)
6 changes: 6 additions & 0 deletions src/xpra/platform/paths.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,17 @@ def get_license_text(self):
return LICENSE_TEXT


def get_sound_executable():
return os.environ.get("XPRA_SOUND_EXECUTABLE", "xpra")


from xpra.platform import platform_import
platform_import(globals(), "paths", True,
"get_resources_dir",
"get_app_dir",
"get_icon_dir")
platform_import(globals(), "paths", False,
"get_sound_executable",
"get_install_prefix",
"get_default_conf_dir",
"get_system_conf_dir",
Expand All @@ -151,6 +156,7 @@ def get_info():
"resources" : get_resources_dir(),
"icons" : get_icon_dir(),
"home" : os.path.expanduser("~"),
"sound_executable" : get_sound_executable(),
}


Expand Down
3 changes: 3 additions & 0 deletions src/xpra/platform/win32/paths.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,6 @@ def get_app_dir():
return APP_DIR
from xpra.platform.paths import default_get_app_dir #imported here to prevent import loop
return default_get_app_dir()

def get_sound_executable():
return os.environ.get("XPRA_SOUND_EXECUTABLE", "xpra_cmd.exe")
2 changes: 1 addition & 1 deletion src/xpra/scripts/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -803,7 +803,7 @@ def dump_frames(*arsg):


def configure_logging(options, mode):
if mode in ("start", "upgrade", "attach", "shadow", "proxy"):
if mode in ("start", "upgrade", "attach", "shadow", "proxy", "_sound_record", "_sound_play"):
if "help" in options.speaker_codec or "help" in options.microphone_codec:
from xpra.sound.gstreamer_util import show_sound_codec_help
info = show_sound_codec_help(mode!="attach", options.speaker_codec, options.microphone_codec)
Expand Down
36 changes: 17 additions & 19 deletions src/xpra/server/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from xpra.net import compression
from xpra.net.compression import compressed_wrapper, Compressed, Uncompressed
from xpra.daemon_thread import make_daemon_thread
from xpra.os_util import platform_name, thread, Queue, get_machine_id, get_user_uuid
from xpra.os_util import platform_name, Queue, get_machine_id, get_user_uuid
from xpra.server.background_worker import add_work_item
from xpra.util import std, typedict, updict, get_screen_info, CLIENT_PING_TIMEOUT, WORKSPACE_UNSET, DEFAULT_METADATA_SUPPORTED

Expand Down Expand Up @@ -764,7 +764,8 @@ def start_sending_sound(self, codec, volume=1.0):
log.warn("not starting sound as we are suspended")
return
try:
from xpra.sound.gstreamer_util import start_sending_sound, ALLOW_SOUND_LOOP
from xpra.sound.gstreamer_util import ALLOW_SOUND_LOOP
from xpra.sound.wrapper import start_sending_sound
if self.machine_id and self.machine_id==get_machine_id() and not ALLOW_SOUND_LOOP:
#looks like we're on the same machine, verify it's a different user:
if self.uuid==get_user_uuid():
Expand All @@ -773,16 +774,11 @@ def start_sending_sound(self, codec, volume=1.0):
assert self.supports_speaker, "cannot send sound: support not enabled on the server"
assert self.sound_source is None, "a sound source already exists"
assert self.sound_receive, "cannot send sound: support is not enabled on the client"
self.sound_source = start_sending_sound(self.sound_source_plugin, codec, volume, self.sound_decoders, self.microphone_codecs, self.pulseaudio_server, self.pulseaudio_id)
self.sound_source = start_sending_sound(self.sound_source_plugin, codec, volume, self.sound_decoders, self.pulseaudio_server, self.pulseaudio_id)
soundlog("start_sending_sound() sound source=%s", self.sound_source)
if self.sound_source:
if self.server_driven:
#tell the client this is the start:
self.send("sound-data", self.sound_source.codec, "",
{"start-of-stream" : True,
"codec" : self.sound_source.codec,
"sequence" : self.sound_source_sequence})
self.sound_source.connect("new-buffer", self.new_sound_buffer)
self.sound_source.connect("new-stream", self.new_stream)
self.sound_source.start()
except Exception as e:
log.error("error setting up sound: %s", e, exc_info=True)
Expand All @@ -795,11 +791,17 @@ def stop_sending_sound(self):
if self.server_driven:
#tell the client this is the end:
self.send("sound-data", ss.codec, "", {"end-of-stream" : True})
def stop_sending_sound_thread(*args):
soundlog("stop_sending_sound_thread(%s)", args)
ss.cleanup()
soundlog("stop_sending_sound_thread(%s) done", args)
thread.start_new_thread(stop_sending_sound_thread, ())
ss.cleanup()

def new_stream(self, sound_source, codec):
soundlog("new_stream(%s)", codec)
self.sound_source.codec = codec
if self.server_driven:
#tell the client this is the start:
self.send("sound-data", self.sound_source.codec, "",
{"start-of-stream" : True,
"codec" : self.sound_source.codec,
"sequence" : self.sound_source_sequence})

def new_sound_buffer(self, sound_source, data, metadata):
soundlog("new_sound_buffer(%s, %s, %s) suspended=%s, sequence=%s",
Expand All @@ -815,11 +817,7 @@ def stop_receiving_sound(self):
soundlog("stop_receiving_sound() sound_sink=%s", ss)
if ss:
self.sound_sink = None
def stop_receiving_sound_thread(*args):
soundlog("stop_receiving_sound_thread() sound_sink=%s", ss)
ss.cleanup()
soundlog("stop_receiving_sound_thread() done")
thread.start_new_thread(stop_receiving_sound_thread, ())
ss.cleanup()


def sound_control(self, action, *args):
Expand Down
38 changes: 2 additions & 36 deletions src/xpra/sound/gstreamer_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import sys
import os

from xpra.util import AdHocStruct
from xpra.log import Logger
log = Logger("sound")

Expand Down Expand Up @@ -87,7 +86,8 @@ def get_queue_time(default_value=450):
]
CODECS = {}

CODEC_ORDER = [MP3, WAVPACK, WAV, FLAC, SPEEX]
#CODEC_ORDER = [MP3, WAVPACK, WAV, FLAC, SPEEX]
CODEC_ORDER = [MP3, FLAC, SPEEX]


#code to temporarily redirect stderr and restore it afterwards, adapted from:
Expand Down Expand Up @@ -486,40 +486,6 @@ def parse_sound_source(sound_source_plugin, remote):
return gst_sound_source_plugin, options


def start_sending_sound(sound_source_plugin, codec, volume, remote_decoders, local_decoders, remote_pulseaudio_server, remote_pulseaudio_id):
assert has_gst
try:
#info about the remote end:
remote = AdHocStruct()
remote.pulseaudio_server = remote_pulseaudio_server
remote.pulseaudio_id = remote_pulseaudio_id
remote.remote_decoders = remote_decoders
plugin, options = parse_sound_source(sound_source_plugin, remote)
if not plugin:
log.error("failed to setup '%s' sound stream source", (sound_source_plugin or "auto"))
return None
log("parsed '%s':", sound_source_plugin)
log("plugin=%s", plugin)
log("options=%s", options)
matching_codecs = [x for x in remote_decoders if x in local_decoders]
ordered_codecs = [x for x in CODEC_ORDER if x in matching_codecs]
if len(ordered_codecs)==0:
log.error("no matching codecs between remote (%s) and local (%s) - sound disabled", remote_decoders, local_decoders)
return None
if codec is not None and codec not in matching_codecs:
log.warn("invalid codec specified: %s", codec)
codec = None
if codec is None:
codec = ordered_codecs[0]
log("using sound codec %s", codec)
from xpra.sound.src import SoundSource
log.info("starting sound stream capture using %s source", PLUGIN_TO_DESCRIPTION.get(plugin, plugin))
return SoundSource(plugin, options, codec, volume, {})
except Exception as e:
log.error("error setting up sound: %s", e, exc_info=True)
return None


def get_info(receive=True, send=True, receive_codecs=[], send_codecs=[]):
if not has_gst:
return {}
Expand Down
Loading

0 comments on commit 8b56fa9

Please sign in to comment.