Permalink
Browse files

Remove Sleep-Timer and replace it with a Process-Output-Monitor

  • Loading branch information...
MaZderMind committed Feb 6, 2015
1 parent 7da81ab commit c9cfee60f4c2c8935e2ff8955c0fb83a7f94bbe5
Showing with 155 additions and 17 deletions.
  1. +155 −17 python-api/gstswitch/server.py
@@ -10,11 +10,11 @@
import signal
import subprocess
import logging
import threading
from distutils import spawn
from errno import ENOENT
from .exception import PathError, ServerProcessError
from time import sleep
__all__ = ["Server", ]
@@ -23,6 +23,145 @@
TOOLS_DIR = '/'.join(os.getcwd().split('/')[:-1]) + '/tools/'
class OutputMonitoringBackgroundProcess(object):
"""Runs a Command in a Background-Thread and monitors it's output
Can block until the Command prints a certain string and log the full
output into a file
"""
def __init__(self, cmd, log):
self.log = logging.getLogger('server-output-monitor')
# After calling start(), _proc contains a subprocess.Popen instance
self._proc = None
# While wait_for_output is waiting for a match, _match contains the
# string it is looking for
self._match = None
# threading.Event instance used in wait_for_output to block until the
# Server prints something that matches self._match
self._match_event = threading.Event()
# threading.Event instance that is used to synchronize the main thread
# with the Server-Monitor-Thread during start and termination
self._control_event = threading.Event()
# Command to run (Array passed to subprocess.Popen)
self._cmd = cmd
# Logfile to write to
self._log = log
# Internal Buffer to search for matched when wait_for_output is called
self._buffer = ""
def start(self):
"""Starts up the process specified in the constructor in a thread.
Blocks until the process is started and has a pid assigned, but not
longer then 0.5s.
"""
assert self._proc is None
self.log.debug("starting worker-thread")
worker = threading.Thread(target=self._worker)
worker.start()
self.log.debug("worker-thread started, "
"waiting for the server to start")
self._control_event.wait(0.5)
self._control_event.clear()
assert self._proc is not None
self.log.debug("server started successfully")
def _worker(self):
"""Private Worker-Method that runs the process, reads its
stdout/stderr and performs pattern matching on it
when told to do so via wait_for_output.
"""
with open(self._log, 'w') as tempf:
self.log.debug("starting subprocess")
self._proc = subprocess.Popen(
self._cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
bufsize=0,
shell=False)
self.log.debug("signaling that the server started")
self._control_event.set()
self.log.debug("entering server read-loop")
while self._proc.poll() is None:
chunk = self._proc.stdout.read(16).decode('utf-8')
self._buffer += chunk
tempf.write(chunk)
if self._match:
if self._buffer.find(self._match) > 0:
self.log.debug("match found, "
"signaling wait_for_output")
self._match = None
self._match_event.set()
self.log.debug("exiting")
self._control_event.set()
def terminate(self):
"""Kills the process and waits for the thread to exit"""
self.log.debug("killing the server, waiting for worker to exit")
self._proc.terminate()
if not self._control_event.wait(timeout=5):
raise TimeoutError("Server-Monitor-Thread did not exit in time.")
self.log.debug("worker did exit cleanly")
self._control_event.clear()
def poll(self):
"""Calls poll() on the running process and returns its result or None
if no process is running"""
if self._proc:
return self._proc.poll()
return None
@property
def pid(self):
"""Returns the pid of the running process or None
if no process is running"""
if self._proc:
return self._proc.pid
return None
def wait_for_output(self, match, timeout=None):
"""Searches the output already captured from the running process for
match and returns immediatly if match has already been captured.
Sets up a match-request for the stdout/stderr-reader and blocks until
match emerges in the processes stdout/stderr but not longer then
timeout. If no match is found until timeout is passed, a TimeoutError
is raised.
"""
self.log.debug("testing for '%s' in buffer", match)
if self._buffer.find(match) > 0:
self.log.debug("match found, returnung instantly")
return
self.log.debug("waiting for match event")
self._match = match
if not self._match_event.wait(timeout):
raise TimeoutError("Server-Monitor-Thread did not find the match "
"%s in the Server's output in time." % match)
self.log.debug("match event fired")
self._match_event.clear()
class Server(object):
"""Control all server related operations
@@ -38,7 +177,6 @@ class Server(object):
:param video_format: The video format to use on the server.
:returns: nothing
"""
SLEEP_TIME = 0.5
def __init__(
self,
@@ -68,7 +206,7 @@ def __init__(
self.video_format = video_format
self.proc = None
self.pid = -1
self.pid = None
@property
def path(self):
@@ -207,7 +345,11 @@ def run(self, gst_option=''):
self.proc = self._run_process()
if self.proc:
self.pid = self.proc.pid
sleep(self.SLEEP_TIME)
def wait_for_output(self, match, timeout=5):
"""Calls wait_for_output with the given parameters on the underlying
OutputMonitoringBackgroundProcess"""
self.proc.wait_for_output(match, timeout)
def _run_process(self):
"""Non-public method: Runs the gst-switch-srv process
@@ -247,16 +389,11 @@ def _start_process(self, cmd):
:param cmd: The command which needs to be executed
:returns: process created
"""
self.log.info('Starting process %s' % (cmd))
self.log.info('Starting process %s', cmd)
try:
with open('server.log', 'w') as tempf:
process = subprocess.Popen(
cmd,
stdout=tempf,
stderr=tempf,
bufsize=-1,
shell=False)
return process
process = OutputMonitoringBackgroundProcess(cmd, 'server.log')
process.start()
return process
except OSError as error:
if error.errno == ENOENT:
raise PathError("Cannot find gst-switch-srv at path:"
@@ -298,8 +435,8 @@ def terminate(self, cov=False):
if cov:
self.gcov_flush()
self.make_coverage()
proc.terminate()
self.log.info('Killing server')
proc.terminate()
self.proc = None
return True
except OSError:
@@ -316,7 +453,7 @@ def terminate_and_output_status(self, cov=False):
if poll == -11:
self.log.error("Server exited with Segmentation Fault")
if poll != 0:
self.log.error("Server exited Error Eode {0}".format(poll))
self.log.error("Server exited Error Code %s", poll)
self.terminate(cov)
with open('server.log') as log:
@@ -359,8 +496,9 @@ def gcov_flush(self):
raise ServerProcessError('Server process does not exist')
else:
try:
self.log.debug("Signaling GCOV Flush")
os.kill(self.pid, signal.SIGUSR1)
self.log.debug("Signaling GCOV Flush to %s", self.pid)
if self.pid:
os.kill(self.pid, signal.SIGUSR1)
return True
except OSError:
raise ServerProcessError('Unable to send signal')

0 comments on commit c9cfee6

Please sign in to comment.