Skip to content

Commit

Permalink
Merge pull request #45 from cropsinsilico/minor
Browse files Browse the repository at this point in the history
Minor
  • Loading branch information
langmm committed Jan 20, 2019
2 parents f33b0c7 + 1e45cf6 commit f5aa769
Show file tree
Hide file tree
Showing 11 changed files with 165 additions and 44 deletions.
34 changes: 25 additions & 9 deletions cis_interface/communication/AsyncComm.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def __init__(self, name, dont_backlog=False, **kwargs):
self.backlog_send_ready = threading.Event()
self.backlog_recv_ready = threading.Event()
self.backlog_open = False
self.loop_count = 0
self._used_direct = False
super(AsyncComm, self).__init__(name, **kwargs)

def printStatus(self, nindent=0):
Expand Down Expand Up @@ -297,10 +297,9 @@ def run_backlog_send(self):
self.debug("Stopping because send_backlog failed")
self._close_backlog()
return
self.loop_count += 1
if (self.loop_count % 100) == 0:
self.debug("Sleeping (is_confirmed_send=%s)",
str(self.is_confirmed_send))
self.periodic_debug('run_backlog_send', period=1000)(
"Sleeping (is_confirmed_send=%s)",
str(self.is_confirmed_send))
self.sleep()

def run_backlog_recv(self):
Expand All @@ -318,10 +317,9 @@ def run_backlog_recv(self):
self.debug("Stopping backlog recv thread")
self.backlog_thread.set_break_flag()
return
self.loop_count += 1
if (self.loop_count % 100) == 0:
self.debug("Sleeping (is_confirmed_recv=%s)",
str(self.is_confirmed_recv))
self.periodic_debug('run_backlog_recv', period=1000)(
"Sleeping (is_confirmed_recv=%s)",
str(self.is_confirmed_recv))
self.sleep()

def send_backlog(self):
Expand All @@ -331,10 +329,14 @@ def send_backlog(self):
return True
try:
imsg, ikwargs = self.backlog_send[0]
if not self._used_direct:
self.suppress_special_debug = True
flag = self._send_direct(imsg, **ikwargs)
self.suppress_special_debug = False
if flag:
self.debug("Sent %d bytes to %s", len(imsg), self.address)
self.pop_backlog_send()
self._used_direct = True
except AsyncTryAgain: # pragma: debug
flag = True
except BaseException: # pragma: debug
Expand All @@ -354,10 +356,14 @@ def recv_backlog(self):
flag = True
else:
try:
if not self._used_direct:
self.suppress_special_debug = True
flag, data = self._recv_direct()
self.suppress_special_debug = False
if flag and data:
self.debug("Recv %d bytes from %s", len(data), self.address)
self.add_backlog_recv(data)
self._used_direct = True
except BaseException: # pragma: debug
self.exception('Error receiving into backlog.')
flag = False
Expand Down Expand Up @@ -413,7 +419,12 @@ def _send(self, payload, no_backlog=False, no_confirm=False, **kwargs):
no_backlog = True
if no_backlog or not self.backlog_send_ready.is_set():
try:
if not self._used_direct:
self.suppress_special_debug = True
out = self._send_direct(payload, **kwargs)
self.suppress_special_debug = False
if out:
self._used_direct = True
if no_backlog:
if out and (self.direction == 'send'):
out = self.wait_for_confirm(active_confirm=True,
Expand Down Expand Up @@ -468,7 +479,12 @@ def _recv(self, timeout=None, no_backlog=False, no_confirm=False):
if self.n_msg_direct_recv == 0: # pragma: debug
self.verbose_debug("No messages waiting.")
return (True, self.empty_msg)
if not self._used_direct:
self.suppress_special_debug = True
out = self._recv_direct()
self.suppress_special_debug = False
if out:
self._used_direct = True
if out and (self.direction == 'recv'):
self.wait_for_confirm(active_confirm=True,
timeout=False,
Expand Down
12 changes: 6 additions & 6 deletions cis_interface/communication/ZMQComm.py
Original file line number Diff line number Diff line change
Expand Up @@ -736,8 +736,8 @@ def _reply_handshake_send(self):
return False
out = self.reply_socket_send.poll(timeout=1, flags=zmq.POLLIN)
if out == 0:
if (self.loop_count % 100) == 0:
self.debug('No reply handshake waiting')
self.periodic_debug('_reply_handshake_send', period=1000)(
'No reply handshake waiting')
return False
msg = self.reply_socket_send.recv(flags=zmq.NOBLOCK)
if msg == self.eof_msg: # pragma: debug
Expand All @@ -757,8 +757,8 @@ def _reply_handshake_recv(self, msg_send, key):
return False
out = socket.poll(timeout=1, flags=zmq.POLLOUT)
if out == 0: # pragma: debug
if (self.loop_count % 100) == 0:
self.debug('Cannot initiate reply handshake')
self.periodic_debug('_reply_handshake_recv', period=1000)(
'Cannot initiate reply handshake')
return False
socket.send(msg_send, flags=zmq.NOBLOCK)
if msg_send == self.eof_msg: # pragma: debug
Expand Down Expand Up @@ -1031,11 +1031,11 @@ def _send_direct(self, msg, topic='', identity=None, **kwargs):
if self.socket.closed: # pragma: debug
self.error("Socket closed")
return False
self.debug("Sending %d bytes to %s", len(total_msg), self.address)
self.special_debug("Sending %d bytes to %s", len(total_msg), self.address)
if self.socket_type_name == 'ROUTER':
self.socket.send(identity, zmq.SNDMORE)
self.socket.send(total_msg, **kwargs)
self.debug("Sent %d bytes to %s", len(total_msg), self.address)
self.special_debug("Sent %d bytes to %s", len(total_msg), self.address)
self._n_zmq_sent += 1
except zmq.ZMQError as e: # pragma: debug
if e.errno == zmq.EAGAIN:
Expand Down
17 changes: 14 additions & 3 deletions cis_interface/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"""
import os
import sys
import shutil
import warnings
import logging
Expand Down Expand Up @@ -225,15 +226,25 @@ def cfg_logging(cfg=None):
environment. Defaults to :data:`cis_interface.config.cis_cfg`.
"""
is_model = (os.environ.get('CIS_SUBPROCESS', "False") == "True")
if cfg is None:
cfg = cis_cfg
_LOG_FORMAT = "%(levelname)s:%(module)s.%(funcName)s[%(lineno)d]:%(message)s"
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT)
logLevelCIS = eval('logging.%s' % cfg.get('debug', 'cis', 'NOTSET'))
logLevelRMQ = eval('logging.%s' % cfg.get('debug', 'rmq', 'INFO'))
logging.getLogger("cis_interface").setLevel(level=logLevelCIS)
logging.getLogger("pika").setLevel(level=logLevelRMQ)

cis_logger = logging.getLogger("cis_interface")
rmq_logger = logging.getLogger("pika")
cis_logger.setLevel(level=logLevelCIS)
rmq_logger.setLevel(level=logLevelRMQ)
# For models, route the loggs to stdout so that they are displayed by the
# model driver.
if is_model:
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logLevelCIS)
cis_logger.addHandler(handler)
rmq_logger.addHandler(handler)


def cfg_environment(env=None, cfg=None):
r"""Set environment variables based on config options.
Expand Down
89 changes: 67 additions & 22 deletions cis_interface/drivers/MatlabModelDriver.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import subprocess
from logging import debug
from logging import debug, error
from datetime import datetime
import os
import psutil
import warnings
import weakref
try: # pragma: matlab
Expand Down Expand Up @@ -30,6 +31,21 @@ def kill_all():
os.system(('pkill -f matlab.engine.shareEngine'))


def locate_matlab_engine_processes(): # pragma: matlab
r"""Get all of the active matlab sharedEngine processes.
Returns:
list: Active matlab sharedEngine processes.
"""
out = []
for p in psutil.process_iter(attrs=['name', 'pid', 'cmdline']):
if (((p.info['name'] == 'MATLAB')
and ('matlab.engine.shareEngine' in p.info['cmdline']))):
out.append(p) # p.info['pid'])
return out


def is_matlab_running():
r"""Determine if there is a Matlab engine running.
Expand Down Expand Up @@ -91,7 +107,9 @@ def start_matlab(skip_connect=False, timeout=None): # pragma: matlab
option ('matlab', 'startup_waittime_s').
Returns:
str: Name of the screen session running matlab.
tuple: Information on the started session including the name of the
screen session running matlab, the created engine object, the name
of the matlab session, and the matlab engine process.
Raises:
RuntimeError: If Matlab is not installed.
Expand All @@ -101,6 +119,7 @@ def start_matlab(skip_connect=False, timeout=None): # pragma: matlab
raise RuntimeError("Matlab is not installed.")
if timeout is None:
timeout = float(config.cis_cfg.get('matlab', 'startup_waittime_s', 10))
old_process = set(locate_matlab_engine_processes())
old_matlab = set(matlab.engine.find_matlab())
screen_session = str('cis_matlab' + datetime.today().strftime("%Y%j%H%M%S")
+ '_%d' % len(old_matlab))
Expand All @@ -122,11 +141,12 @@ def start_matlab(skip_connect=False, timeout=None): # pragma: matlab
if (len(set(matlab.engine.find_matlab()) - old_matlab) == 0): # pragma: debug
raise Exception("start_matlab timed out at %f s" % T.elapsed)
new_matlab = list(set(matlab.engine.find_matlab()) - old_matlab)[0]
new_process = list(set(locate_matlab_engine_processes()) - old_process)[0]
# Connect to the engine
matlab_engine = None
if not skip_connect:
matlab_engine = connect_matlab(new_matlab, first_connect=True)
return screen_session, matlab_engine, new_matlab
return screen_session, matlab_engine, new_matlab, new_process


def connect_matlab(matlab_session, first_connect=False): # pragma: matlab
Expand Down Expand Up @@ -161,7 +181,7 @@ def connect_matlab(matlab_session, first_connect=False): # pragma: matlab
return matlab_engine


def stop_matlab(screen_session, matlab_engine, matlab_session,
def stop_matlab(screen_session, matlab_engine, matlab_session, matlab_process,
keep_engine=False): # pragma: matlab
r"""Stop a Matlab shared engine session running inside a detached screen
session.
Expand All @@ -172,6 +192,7 @@ def stop_matlab(screen_session, matlab_engine, matlab_session,
matlab_engine (MatlabEngine): Matlab engine that should be stopped.
matlab_session (str): Name of Matlab session that the Matlab engine is
connected to.
matlab_process (psutil.Process): Process running the Matlab shared engine.
keep_engine (bool, optional): If True, the references to the engine will be
removed so it is not deleted. Defaults to False.
Expand All @@ -196,7 +217,7 @@ def stop_matlab(screen_session, matlab_engine, matlab_session,
if matlab_session in matlab.engine.find_matlab():
try:
matlab_engine.eval('exit', nargout=0)
except matlab.engine.EngineError:
except BaseException:
pass
else: # pragma: no cover
matlab_engine.__dict__.pop('_matlab', None)
Expand All @@ -210,7 +231,9 @@ def stop_matlab(screen_session, matlab_engine, matlab_session,
debug("Waiting for matlab engine to exit")
sleep(1)
if (matlab_session in matlab.engine.find_matlab()): # pragma: debug
raise Exception("stop_matlab timed out at %f s" % T.elapsed)
matlab_process.terminate()
error("stop_matlab timed out at %f s. " % T.elapsed
+ "Killed Matlab sharedEngine process.")


class MatlabProcess(tools.CisClass): # pragma: matlab
Expand Down Expand Up @@ -261,6 +284,7 @@ def __init__(self, target, args, kwargs=None, name=None, matlab_engine=None):
self.kwargs['async'] = True # For python 3.7 where async is reserved
self.future = None
self.matlab_engine = matlab_engine
self._returncode = None
super(MatlabProcess, self).__init__(name)

def poll(self, *args, **kwargs):
Expand Down Expand Up @@ -341,22 +365,31 @@ def returncode(self):
else:
return 0
else:
return None
return self._returncode

def kill(self, *args, **kwargs):
r"""Cancel the async call."""
if self.is_alive():
try:
self.future.cancel()
except matlab.engine.EngineError:
out = self.future.cancel()
self.debug("Result of cancelling Matlab call?: %s", out)
except matlab.engine.EngineError as e:
self.debug('Matlab Engine Error: %s' % e)
self.on_matlab_error()
except BaseException:
pass
except BaseException as e:
self.debug('Other error on kill: %s' % e)
self.print_output()
if self.is_alive():
self.info('Error killing Matlab script.')
self.matlab_engine.quit()
self.future = None
self._returncode = -1
assert(not self.is_alive())

def on_matlab_error(self):
r"""Actions performed on error in Matlab engine."""
# self.print_output()
self.debug('')
if self.matlab_engine is not None:
try:
self.matlab_engine.eval('exception = MException.last;', nargout=0)
Expand Down Expand Up @@ -400,6 +433,7 @@ def __init__(self, name, args, **kwargs):
self.screen_session = None
self.mlengine = None
self.mlsession = None
self.mlprocess = None
self.fdir = os.path.dirname(os.path.abspath(self.args[0]))
self.check_exits()

Expand All @@ -417,12 +451,10 @@ def is_installed(self):

def start_matlab(self):
r"""Start matlab session and connect to it."""
# Connect to matlab, start if not running
if len(matlab.engine.find_matlab()) == 0:
self.debug("Starting a matlab shared engine (none existing)")
self.screen_session, self.mlengine, self.mlsession = start_matlab()
self.started_matlab = True
else:
ml_attr = ['screen_session', 'mlengine', 'mlsession', 'mlprocess']
attempt_connect = (len(matlab.engine.find_matlab()) != 0)
# Connect to matlab if a session exists
if attempt_connect:
for mlsession in matlab.engine.find_matlab():
try:
self.debug("Trying to connect to session %s", mlsession)
Expand All @@ -433,26 +465,34 @@ def start_matlab(self):
break
except matlab.engine.EngineError:
pass
if self.mlengine is None:
# Start if not running or connect failed
if self.mlengine is None:
if attempt_connect:
self.debug("Starting a matlab shared engine (connect failed)")
self.screen_session, self.mlengine, self.mlsession = start_matlab()
self.started_matlab = True
else:
self.debug("Starting a matlab shared engine (none existing)")
out = start_matlab()
for i, attr in enumerate(ml_attr):
setattr(self, attr, out[i])
self.started_matlab = True
# Add things to Matlab environment
self.mlengine.addpath(self.fdir, nargout=0)
self.debug("Connected to matlab")
self.debug("Connected to matlab session '%s'" % self.mlsession)

def cleanup(self):
r"""Close the Matlab session and engine."""
try:
stop_matlab(self.screen_session, self.mlengine, self.mlsession,
keep_engine=(not self.started_matlab))
self.mlprocess, keep_engine=(not self.started_matlab))
except (SystemError, Exception) as e: # pragma: debug
self.error('Failed to exit matlab engine')
self.raise_error(e)
self.debug('Stopped Matlab')
self.screen_session = None
self.mlsession = None
self.started_matlab = False
self.mlengine = None
self.mlprocess = None
super(MatlabModelDriver, self).cleanup()

def check_exits(self):
Expand Down Expand Up @@ -510,6 +550,7 @@ def before_start(self):
def run_loop(self):
r"""Loop to check if model is still running and forward output."""
self.model_process.print_output()
self.periodic_debug('matlab loop', period=100)('Looping')
if self.model_process.is_done():
self.model_process.print_output()
self.set_break_flag()
Expand All @@ -527,6 +568,10 @@ def run_loop(self):
def after_loop(self):
r"""Actions to perform after run_loop has finished. Mainly checking
if there was an error and then handling it."""
if (self.model_process is not None) and self.model_process.is_alive():
self.info("Model process thread still alive")
self.kill_process()
return
super(MatlabModelDriver, self).after_loop()
with self.lock:
self.cleanup()
Loading

0 comments on commit f5aa769

Please sign in to comment.