Skip to content

Commit

Permalink
[WIP] issues 183 and 184
Browse files Browse the repository at this point in the history
Signed-off-by: Lance Drane <dranelt@ornl.gov>
  • Loading branch information
Lance Drane committed Feb 29, 2024
1 parent 6f02737 commit 6df92a5
Show file tree
Hide file tree
Showing 24 changed files with 215 additions and 73 deletions.
Empty file.
13 changes: 13 additions & 0 deletions examples-proposed/001-helloworld/helloworld/hello_driver.py
@@ -0,0 +1,13 @@
from ipsframework import Component


class hello_driver(Component):
def __init__(self, services, config):
super().__init__(services, config)
print('Created %s' % (self.__class__))

def step(self, timestamp=0.0):
print('hello_driver: beginning step call')
worker_comp = self.services.get_port('WORKER')
self.services.call(worker_comp, 'step', 0.0)
print('hello_driver: finished step call')
10 changes: 10 additions & 0 deletions examples-proposed/001-helloworld/helloworld/hello_worker.py
@@ -0,0 +1,10 @@
from ipsframework import Component


class hello_worker(Component):
def __init__(self, services, config):
super().__init__(services, config)
print('Created %s' % (self.__class__))

def step(self, timestamp=0.0):
print('Hello from hello_worker')
9 changes: 9 additions & 0 deletions examples-proposed/001-helloworld/setup.py
@@ -0,0 +1,9 @@
#!/usr/bin/env python3
from setuptools import setup, find_packages

setup(
name="helloworld",
version="1.0.0",
install_requires=["ipsframework"],
packages=find_packages(),
)
9 changes: 9 additions & 0 deletions examples-proposed/README.md
@@ -0,0 +1,9 @@
# Examples

This directory is meant to showcase concrete code examples on how to use the IPS framework.

## Running the examples

1) Make sure your environment has the `ipsframework` dependency installed. (To run from the repository, you can run `pip install -e .`)
2) Change directory into the specific folder (i.e. `001-hello-world`)
3) Run `ips.py --config config.conf --platform platform.conf`
34 changes: 34 additions & 0 deletions examples-proposed/dask/dask_sim.config
@@ -0,0 +1,34 @@
SIM_NAME = dask_example
SIM_ROOT = $PWD
LOG_FILE = log
LOG_LEVEL = INFO
SIMULATION_MODE = NORMAL

[PORTS]
NAMES = DRIVER WORKER
[[DRIVER]]
IMPLEMENTATION = driver

[[WORKER]]
IMPLEMENTATION = dask_worker

[driver]
CLASS = DRIVER
SUB_CLASS =
NAME = Driver
NPROC = 1
BIN_PATH =
INPUT_FILES =
OUTPUT_FILES =
SCRIPT = $PWD/driver.py

[dask_worker]
CLASS = DASK_WORKER
SUB_CLASS =
NAME = DaskWorker
NPROC = 1
BIN_PATH =
INPUT_FILES =
OUTPUT_FILES =
SCRIPT = $PWD/dask_worker.py
EXECUTABLE = $PWD/sleep
32 changes: 32 additions & 0 deletions examples-proposed/dask/dask_worker.py
@@ -0,0 +1,32 @@
import copy
from time import sleep
from ipsframework import Component


def myFun(*args):
print(f"myFun({args[0]})")
sleep(float(args[0]))
return 0


class DaskWorker(Component):
def step(self, timestamp=0.0):
cwd = self.services.get_working_dir()
self.services.create_task_pool('pool')

duration = 0.5
self.services.add_task('pool', 'binary', 1, cwd, self.EXECUTABLE, duration)
self.services.add_task('pool', 'function', 1, cwd, myFun, duration)
self.services.add_task('pool', 'method', 1, cwd, copy.copy(self).myMethod, duration)

ret_val = self.services.submit_tasks('pool',
use_dask=True,
dask_nodes=1)
print('ret_val =', ret_val)
exit_status = self.services.get_finished_tasks('pool')
print('exit_status = ', exit_status)

def myMethod(self, *args):
print(f"myMethod({args[0]})")
sleep(float(args[0]))
return 0
7 changes: 7 additions & 0 deletions examples-proposed/dask/driver.py
@@ -0,0 +1,7 @@
from ipsframework import Component


class Driver(Component):
def step(self, timestamp=0.0):
worker_comp = self.services.get_port('WORKER')
self.services.call(worker_comp, 'step', 0.0)
7 changes: 7 additions & 0 deletions examples-proposed/dask/platform.conf
@@ -0,0 +1,7 @@
MPIRUN = eval
NODE_DETECTION = manual
CORES_PER_NODE = 1
SOCKETS_PER_NODE = 1
NODE_ALLOCATION_MODE = shared
HOST = localhost
SCRATCH =
38 changes: 38 additions & 0 deletions examples-proposed/dask/simulation_log.json
@@ -0,0 +1,38 @@
{
"code": "DASK_WORKER__DaskWorker",
"eventtype": "IPS_LAUNCH_DASK_TASK",
"walltime": "2.33",
"comment": "task_name = method, Target = myMethod(0.5)",
}
{
"code": "DASK_WORKER__DaskWorker",
"eventtype": "IPS_LAUNCH_DASK_TASK",
"walltime": "2.33",
"comment": "task_name = function, Target = myFun(0.5)",
}
{
"code": "DASK_WORKER__DaskWorker",
"eventtype": "IPS_LAUNCH_DASK_TASK",
"walltime": "2.33",
"state": "Running",
"comment": "task_name = binary, Target = sleep 0.5",
}
{
"code": "DASK_WORKER__DaskWorker",
"eventtype": "IPS_TASK_END",
"walltime": "2.83",
"comment": "task_name = method, elapsed time = 0.50s",
}
{
"code": "DASK_WORKER__DaskWorker",
"eventtype": "IPS_TASK_END",
"walltime": "2.83",
"comment": "task_name = function, elapsed time = 0.50s",
}
{
"code": "DASK_WORKER__DaskWorker",
"eventtype": "IPS_TASK_END",
"walltime": "2.85",
"state": "Running",
"comment": "task_name = binary, elapsed time = 0.52s",
}
3 changes: 3 additions & 0 deletions examples-proposed/dask/sleep
@@ -0,0 +1,3 @@
#!/bin/bash
echo /bin/sleep $1
/bin/sleep $1
2 changes: 1 addition & 1 deletion ipsframework/cca_es_spec.py
Expand Up @@ -73,7 +73,7 @@ def getHeader(self):
def getBody(self):
return self.body

def __str__(self):
def __str__(self) -> str:
return str(self.body)


Expand Down
11 changes: 7 additions & 4 deletions ipsframework/component.py
Expand Up @@ -6,7 +6,9 @@
import os
import weakref
from copy import copy
from typing import Any, Dict, Literal
from .messages import Message, MethodResultMessage
from .services import ServicesProxy


class Component:
Expand All @@ -21,7 +23,7 @@ class Component:
:type config: dict
"""

def __init__(self, services, config):
def __init__(self, services: ServicesProxy, config: Dict[str, Any]):
"""
Set up config values and reference to services.
"""
Expand All @@ -38,7 +40,7 @@ def __init__(self, services, config):
try:
setattr(self, i, config[i])
except Exception as e:
print('Error setting Component parameter : ', i, ' - ', e)
print('Error setting Component parameter : ', i, ' - ', e, file=sys.stderr)
raise

def __copy__(self):
Expand Down Expand Up @@ -88,7 +90,7 @@ def __run__(self):
if 'OUT_REDIRECT_FNAME' not in self.services.sim_conf:
fname = "%s.out" % (self.services.sim_conf['SIM_NAME'])
fname = os.path.join(self.services.sim_conf['PWD'], fname)
print('Redirecting stdout to ', fname)
print('Redirecting stdout to ', fname, file=sys.stderr)
else:
fname = self.services.sim_conf['OUT_REDIRECT_FNAME']
original_stdout_fd = sys.stdout.fileno()
Expand Down Expand Up @@ -213,7 +215,8 @@ def checkpoint(self, timestamp=0.0, **keywords):
"""
self.services.debug('checkpoint() method called')

def terminate(self, status):
# TODO fix typing when Message is no longer an enum
def terminate(self, status: Literal[0, 1]):
"""
Clean up services and call :py:obj:`sys_exit`.
"""
Expand Down
16 changes: 8 additions & 8 deletions ipsframework/componentRegistry.py
Expand Up @@ -29,7 +29,7 @@ def deserialize(comp_id_string):
"""
tokens = comp_id_string.split(ComponentID.delimiter)
if len(tokens) != 3:
print('Invalid serialized component ID : ', comp_id_string)
print('Invalid serialized component ID : ', comp_id_string, file=sys.stderr)
sys.exit(1)
return ComponentID.all_ids[comp_id_string]

Expand Down Expand Up @@ -138,7 +138,7 @@ def addEntry(self, component_id, svc_response_q, invocation_q,
self.registry[key] = value
except KeyError as e:
print('Error creating component registry entry for ', key,
' : ', str(e))
' : ', str(e), file=sys.stderr)
raise e

def removeEntry(self, component_id):
Expand All @@ -147,7 +147,7 @@ def removeEntry(self, component_id):
del self.registry[key]
except KeyError as e:
print('Error removing component registry entry for ', key,
' : ', str(e))
' : ', str(e), file=sys.stderr)
raise

# SIMYAN: this was added to provide an easy way to use the component
Expand All @@ -160,7 +160,7 @@ def getEntry(self, component_id):
try:
entry = self.registry[key]
except KeyError:
print('No registry entry found for ', key)
print('No registry entry found for ', key, file=sys.stderr)
raise
return entry

Expand All @@ -172,13 +172,13 @@ def getComponentArtifact(self, component_id, artifact):
try:
entry = self.registry[key]
except KeyError:
print('No registry entry found for ', key)
print('No registry entry found for ', key, file=sys.stderr)
raise
try:
value = getattr(entry, artifact)
except KeyError:
print('Invalid registry attribute : ', artifact)
print('Possible values are : ', list(entry.__dict__.keys()))
print('Invalid registry attribute : ', artifact, file=sys.stderr)
print('Possible values are : ', list(entry.__dict__.keys()), file=sys.stderr)
raise
return value

Expand All @@ -191,6 +191,6 @@ def setComponentArtifact(self, component_id, artifact, value):
try:
entry = self.registry[key]
except KeyError:
print('No registry entry found for ', key)
print('No registry entry found for ', key, file=sys.stderr)
raise
setattr(entry, artifact, value)
4 changes: 2 additions & 2 deletions ipsframework/configurationManager.py
Expand Up @@ -104,7 +104,7 @@ def __getattr__(self, attr):
if abs_path not in self.config_file_list:
self.config_file_list.append(abs_path)
else:
print('Ignoring duplicate configuration file ', abs_path)
print('Ignoring duplicate configuration file ', abs_path, file=sys.stderr)

# sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
sys.stdout = Unbuffered(sys.stdout)
Expand Down Expand Up @@ -748,7 +748,7 @@ def terminate(self, status):
for sim_name in list(self.sim_map.keys()):
self.terminate_sim(sim_name)
except Exception:
print('Encountered exception when terminating simulation')
print('Encountered exception when terminating simulation', file=sys.stderr)
raise
for k in list(self.sim_map.keys()):
del self.sim_map[k]
Expand Down
18 changes: 11 additions & 7 deletions ipsframework/ips.py
Expand Up @@ -60,6 +60,8 @@
import os
import time
import hashlib
from typing import List, Optional

from ipsframework import platformspec
from ipsframework.messages import Message, ServiceRequestMessage, \
ServiceResponseMessage, MethodInvokeMessage, MethodResultMessage
Expand All @@ -75,8 +77,8 @@
from ipsframework.ipsutil import getTimeString
from ipsframework._version import get_versions

if sys.version[0] != '3': # noqa: E402
print("IPS can is only compatible with Python 3.5 or higher")
if sys.version_info[0] != 3 or sys.version_info[1] < 5: # noqa: E402
print("IPS is only compatible with Python 3.5 or higher", file=sys.stderr)
sys.exit(1)


Expand Down Expand Up @@ -123,11 +125,12 @@ class Framework:
:param cmd_ppn: Computer processor per nodes (default = 0)
:type cmd_ppn: int
"""
def __init__(self, config_file_list, log_file_name, platform_file_name=None,
def __init__(self, config_file_list: List[str], log_file_name: str, platform_file_name: Optional[str] = None,
debug=False, verbose_debug=False, cmd_nodes=0, cmd_ppn=0):
# added compset_list for list of components to load config files for
# command line option
print("Starting IPS", get_versions()['version'])
# TODO check to see if printing to sys.stderr breaks anything
print("Starting IPS", get_versions()['version'], file=sys.stderr)
os.environ['IPS_INITIAL_CWD'] = os.getcwd()

self.log_file_name = log_file_name
Expand Down Expand Up @@ -232,7 +235,7 @@ def register_service_handler(self, service_list, handler):
for svc in service_list:
self.service_handler[svc] = handler

def _dispatch_service_request(self, msg):
def _dispatch_service_request(self, msg: ServiceRequestMessage):
"""
Find and execute handler that corresponds to the *target_method* of
*msg*, a :class:`messages.ServiceRequestMessage` object. If handler
Expand Down Expand Up @@ -724,7 +727,7 @@ def send_terminate_msg(self, sim_name, status=Message.SUCCESS):
invocation_q.put(msg)
except Exception as e:
self.exception('exception encountered while terminating comp %s', comp_id)
print(e)
print(e, file=sys.stderr)

def terminate_all_sims(self, status=Message.SUCCESS):
"""Terminate all active component instances.
Expand Down Expand Up @@ -752,11 +755,12 @@ def main(argv=None):
"""
Check and parse args, create and run the framework.
"""
# TODO don't think this is necessary
sys.stdout.flush()

platform_default = os.environ.get("IPS_PLATFORM_FILE")
if platform_default:
print("IPS using platform file :", platform_default)
print("IPS using platform file :", platform_default, file=sys.stderr)

parser = argparse.ArgumentParser()
parser.add_argument('--version', action='version', version="%(prog)s " + get_versions()['version'])
Expand Down
2 changes: 1 addition & 1 deletion ipsframework/ipsLogging.py
Expand Up @@ -93,7 +93,7 @@ def add_sim_log(self, log_pipe_name, log_file=sys.stdout):
os.makedirs(directory, exist_ok=True)
except OSError as oserr:
print('Error creating directory %s : %s-%s' %
(directory, oserr.errno, oserr.strerror))
(directory, oserr.errno, oserr.strerror), file=sys.stderr)
sys.exit(1)
log_handler = logging.FileHandler(log_file, mode='w')

Expand Down
3 changes: 2 additions & 1 deletion ipsframework/ipsutil.py
Expand Up @@ -9,6 +9,7 @@
import Pyro4
except ImportError:
pass
import sys


def which(program, alt_paths=None):
Expand Down Expand Up @@ -89,7 +90,7 @@ def copyFiles(src_dir, src_file_list, target_dir, prefix='', keep_old=False):
try:
os.makedirs(head, exist_ok=True)
except OSError as oserr:
print('Error creating directory %s : %s' % (head, oserr.strerror))
print('Error creating directory %s : %s' % (head, oserr.strerror), file=sys.stderr)
raise
try:
shutil.copy(src_file, target_file)
Expand Down

0 comments on commit 6df92a5

Please sign in to comment.