Skip to content

Commit

Permalink
Merge in changes from master
Browse files Browse the repository at this point in the history
  • Loading branch information
langmm committed Nov 4, 2018
2 parents e0c1d59 + 9fce3ad commit c4f4c59
Show file tree
Hide file tree
Showing 105 changed files with 3,067 additions and 634 deletions.
8 changes: 8 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@ cis_interface/tests/scripts/Makefile

# Intermediate docs
docs/build
docs/source/cis_interface.inc
docs/source/cis_interface*.rst
docs/source/modules.rst
docs/source/examples/*.rst

# Plots for timing purposes
scaling_*.png

# Coverage
.coveragerc
.coverage
Expand All @@ -17,6 +21,9 @@ htmlcov
# temp for storing notes, whatever, etc. not managed
tmp/

# Cache
.cache/

# Temporary file copies
*~
*.dSYM/
Expand All @@ -39,6 +46,7 @@ tmp/
# Packaging things
cis_interface.egg-info
dist/
build/


# Windows image file caches
Expand Down
4 changes: 4 additions & 0 deletions cis_interface/.cis_schema.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,12 @@ comm:
comm_schema_subtypes:
CommBase:
- default
IPCComm:
- ipc
RMQComm:
- rmq
ZMQComm:
- zmq
comm_subtype_attr: _commtype
connection:
input:
Expand Down
1 change: 1 addition & 0 deletions cis_interface/backwards.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import sys
import time
from cis_interface.scanf import scanf
_python_version = '%d.%d' % (sys.version_info[0], sys.version_info[1])
PY2 = (sys.version_info[0] == 2)
PY34 = ((sys.version_info[0] == 3) and (sys.version_info[1] == 4))
if PY2: # pragma: Python 2
Expand Down
22 changes: 21 additions & 1 deletion cis_interface/command_line.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os
import sys
import traceback
from cis_interface import runner, schema, config
from cis_interface import runner, schema, config, timing
from cis_interface.drivers import GCCModelDriver


Expand Down Expand Up @@ -67,6 +67,26 @@ def update_config():
config.update_config(config.usr_config_file, config.def_config_file)


def cistime_comm():
r"""Plot timing statistics comparing the different communication mechanisms."""
timing.plot_scalings(compare='commtype')


def cistime_lang():
r"""Plot timing statistics comparing the different languages."""
timing.plot_scalings(compare='language')


def cistime_os():
r"""Plot timing statistics comparing the different operating systems."""
timing.plot_scalings(compare='platform')


def cistime_py():
r"""Plot timing statistics comparing the different versions of Python."""
timing.plot_scalings(compare='python')


if __name__ == '__main__':
cisrun()
sys.exit(0)
19 changes: 15 additions & 4 deletions cis_interface/communication/AsyncComm.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,15 @@ class AsyncComm(CommBase.CommBase):
"""
def __init__(self, name, dont_backlog=False, **kwargs):
self.dont_backlog = (dont_backlog or kwargs.get('matlab', False) or
kwargs.get('is_inteface', False))
self.dont_backlog = (dont_backlog or kwargs.get('matlab', False)
or kwargs.get('is_inteface', False))
self._backlog_recv = []
self._backlog_send = []
self._backlog_thread = None
self.backlog_send_ready = threading.Event()
self.backlog_recv_ready = threading.Event()
self.backlog_open = False
self.loop_count = 0
super(AsyncComm, self).__init__(name, **kwargs)

def printStatus(self, nindent=0):
Expand Down Expand Up @@ -289,11 +290,17 @@ def pop_backlog_send(self):
def run_backlog_send(self):
r"""Continue trying to send buffered messages."""
if not self.is_open_backlog: # pragma: debug
self.debug("Backlog closed")
self._close_backlog()
return
if not self.send_backlog(): # pragma: debug
self.debug("Stopping because send_backlog failed")
self._close_backlog()
return
self.loop_count += 1
if (self.loop_count % 100) == 0:
self.debug("Sleeping (is_confirmed_send=%s)",
str(self.is_confirmed_send))
self.sleep()

def run_backlog_recv(self):
Expand All @@ -311,6 +318,10 @@ def run_backlog_recv(self):
self.debug("Stopping backlog recv thread")
self.backlog_thread.set_break_flag()
return
self.loop_count += 1
if (self.loop_count % 100) == 0:
self.debug("Sleeping (is_confirmed_recv=%s)",
str(self.is_confirmed_recv))
self.sleep()

def send_backlog(self):
Expand Down Expand Up @@ -447,8 +458,8 @@ def _recv(self, timeout=None, no_backlog=False, no_confirm=False):
# If no backlog, receive from queue
if no_backlog:
T = self.start_timeout(timeout, key_suffix='_recv:direct')
while ((not T.is_out) and (self.n_msg_direct_recv == 0) and
self.is_open_direct):
while ((not T.is_out) and (self.n_msg_direct_recv == 0)
and self.is_open_direct):
self.sleep()
self.stop_timeout(key_suffix='_recv:direct')
if not self.is_open_direct: # pragma: debug
Expand Down
17 changes: 14 additions & 3 deletions cis_interface/communication/ClientComm.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,20 @@ def __init__(self, name, request_comm=None, response_kwargs=None,
address=self.ocomm.address)

@classmethod
def is_installed(cls):
r"""bool: Is the comm installed."""
return get_comm_class().is_installed()
def is_installed(cls, language=None):
r"""Determine if the necessary libraries are installed for this
communication class.
Args:
language (str, optional): Specific language that should be checked
for compatibility. Defaults to None and all languages supported
on the current platform will be checked.
Returns:
bool: Is the comm installed.
"""
return get_comm_class().is_installed(language=language)

@property
def maxMsgSize(self):
Expand Down
2 changes: 2 additions & 0 deletions cis_interface/communication/CommBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ typedef struct comm_t {
void *info; //!< Pointer to any extra info comm requires.
seri_t *serializer; //!< Serializer for comm messages.
size_t maxMsgSize; //!< The maximum message size.
size_t msgBufSize; //!< The size that should be reserved in messages.
int always_send_header; //!< 1 if comm should always send a header.
int index_in_register; //!< Index of the comm in the comm register.
time_t *last_send; //!< Clock output at time of last send.
Expand Down Expand Up @@ -58,6 +59,7 @@ comm_t empty_comm_base() {
ret.info = NULL;
ret.serializer = NULL;
ret.maxMsgSize = 0;
ret.msgBufSize = 0;
ret.always_send_header = 0;
ret.index_in_register = -1;
ret.last_send = NULL;
Expand Down
71 changes: 60 additions & 11 deletions cis_interface/communication/CommBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import threading
import numpy as np
import pandas as pd
from logging import info
from cis_interface import backwards, tools, serialize
from cis_interface.tools import get_CIS_MSG_MAX, CIS_MSG_EOF
from cis_interface.communication import (
Expand Down Expand Up @@ -316,7 +317,7 @@ def __init__(self, name, address=None, direction='send',
if comm is not None:
assert(comm == self.comm_class)
super(CommBase, self).__init__(name, **kwargs)
if not self.__class__.is_installed():
if not self.__class__.is_installed(language='python'):
raise RuntimeError("Comm class %s not installed" % self.__class__)
suffix = determine_suffix(no_suffix=no_suffix,
reverse_names=reverse_names,
Expand Down Expand Up @@ -416,9 +417,52 @@ def printStatus(self, nindent=0):
print('%s%-15s: %s' % (prefix, 'nrecv', self._n_recv))

@classmethod
def is_installed(cls):
r"""bool: Is the comm installed."""
return True
def is_installed(cls, language=None):
r"""Determine if the necessary libraries are installed for this
communication class.
Args:
language (str, optional): Specific language that should be checked
for compatibility. Defaults to None and all languages supported
on the current platform will be checked. If set to 'any', the
result will be True if this comm is installed for any of the
supported languages.
Returns:
bool: Is the comm installed.
"""
lang_list = tools.get_supported_lang()
comm_class = str(cls).split("'")[1].split(".")[-1]
use_any = False
if language in [None, 'all']:
language = lang_list
elif language == 'any':
use_any = True
language = lang_list
if isinstance(language, list):
out = (not use_any)
for l in language:
if not cls.is_installed(language=l):
if not use_any:
out = False
break
elif use_any:
out = True
break
elif language in ['cpp', 'c++', 'make', 'cmake']:
out = cls.is_installed(language='c')
elif language in ['lpy', 'matlab']:
out = cls.is_installed(language='python')
elif language in ['executable']:
out = True
else:
if comm_class in ['CommBase', 'AsyncComm', 'ForkComm', 'ErrorClass']:
out = (language in lang_list)
else:
# Default to False for languages so subclasses must be explicit
out = False
return out

@property
def maxMsgSize(self):
Expand Down Expand Up @@ -471,7 +515,9 @@ def comm_count(cls):
r"""int: Number of communication connections."""
out = len(cls.comm_registry())
if out > 0:
print(cls, cls.comm_registry())
info('There are %d %s comms: %s',
len(cls.comm_registry()), cls.__name__,
[k for k in cls.comm_registry().keys()])
return out

@classmethod
Expand Down Expand Up @@ -610,7 +656,7 @@ def linger(self):
else:
self.drain_messages(variable='n_msg_send')
self.wait_for_confirm(timeout=self._timeout_drain)
self.debug("Finished")
self.debug("Finished (timeout_drain = %s)", str(self._timeout_drain))

def matlab_atexit(self): # pragma: matlab
r"""Close operations including draining receive."""
Expand Down Expand Up @@ -879,7 +925,8 @@ def create_work_comm(self, work_comm_name=None, **kwargs):
kws.update(**kwargs)
if work_comm_name is None:
cls = kws.get("comm", tools.get_default_comm())
work_comm_name = 'temp_%s_%s.%s' % (cls, kws['direction'], kws['uuid'])
work_comm_name = '%s_temp_%s_%s.%s' % (
self.name, cls, kws['direction'], kws['uuid'])
c = new_comm(work_comm_name, **kws)
self.add_work_comm(c)
return c
Expand Down Expand Up @@ -958,8 +1005,8 @@ def header2workcomm(self, header, work_comm_name=None, **kwargs):
kws['address'] = header['address']
if work_comm_name is None:
cls = kws.get("comm", tools.get_default_comm())
work_comm_name = 'temp_%s_%s.%s' % (
cls, kws['direction'], header['id'])
work_comm_name = '%s_temp_%s_%s.%s' % (
self.name, cls, kws['direction'], header['id'])
c = get_comm(work_comm_name, **kws)
return c

Expand Down Expand Up @@ -1055,6 +1102,7 @@ def on_send_eof(self):
bool: True if EOF message should be sent, False otherwise.
"""
self.debug('')
msg_s = backwards.unicode2bytes(self.eof_msg)
with self._closing_thread.lock:
if not self._eof_sent.is_set():
Expand Down Expand Up @@ -1309,8 +1357,8 @@ def on_recv(self, s_msg, second_pass=False):
if self.is_eof(msg_):
flag = self.on_recv_eof()
msg = msg_
elif ((self.recv_converter is not None) and
(not header.get('incomplete', False))):
elif ((self.recv_converter is not None)
and (not header.get('incomplete', False))):
self.debug("Converting message")
msg = self.recv_converter(msg_)
else:
Expand Down Expand Up @@ -1350,6 +1398,7 @@ def recv(self, *args, **kwargs):
self.exception('Failed to recv.')
return (False, None)
if self.single_use and self._used:
self.debug('Linger close on single use')
self.linger_close()
return (flag, msg)

Expand Down
21 changes: 19 additions & 2 deletions cis_interface/communication/FileComm.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,23 @@ def _init_before_open(self, read_meth='read', append=False, in_temp=False,
if v is not None:
setattr(self, k, func_conv(v))

@classmethod
def is_installed(cls, language=None):
r"""Determine if the necessary libraries are installed for this
communication class.
Args:
language (str, optional): Specific language that should be checked
for compatibility. Defaults to None and all languages supported
on the current platform will be checked.
Returns:
bool: Is the comm installed.
"""
# Filesystem is implied
return True

@property
def maxMsgSize(self):
r"""int: Maximum size of a single message that should be sent."""
Expand Down Expand Up @@ -208,8 +225,8 @@ def advance_in_series(self, series_index=None):
if series_index is None:
series_index = self._series_index + 1
if self._series_index != series_index:
if (((self.direction == 'send') or
os.path.isfile(self.get_series_address(series_index)))):
if (((self.direction == 'send')
or os.path.isfile(self.get_series_address(series_index)))):
self._file_close()
self._series_index = series_index
self._open()
Expand Down
Loading

0 comments on commit c4f4c59

Please sign in to comment.