Bulk changes for PR: rename `db_logger_config_object` to `db_logger_config`, generate run IDs with `uuid4()`, standardize the log key on `run_id`, and default the local monitoring database to `monitoring.db` in the run directory.
ConnorPigg committed Sep 13, 2018
1 parent 1ff251e commit 27a4d5c
Showing 7 changed files with 98 additions and 89 deletions.
2 changes: 2 additions & 0 deletions parsl/config.py
@@ -30,6 +30,8 @@ class Config(RepresentationMixin):
data_management_max_threads : int, optional
Maximum number of threads to allocate for the data manager to use for managing input and output transfers.
Default is 10.
+    db_logger_config : LoggerConfig, optional
+        The config to use for database monitoring. Default is None which does not log to a database.
lazy_errors : bool, optional
If True, errors from task failures will not be raised until `future.result()` is called. Otherwise, they will
be raised as soon as the task returns. Default is True.
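Note: a minimal sketch of how the new option might be wired up. The `LoggerConfig` import path and constructor arguments are assumptions, not confirmed by this diff:

```python
# Hypothetical usage sketch; import path and constructor kwargs are assumptions.
from parsl.config import Config
from parsl.monitoring.db_logger import LoggerConfig

config = Config(
    db_logger_config=LoggerConfig(logger_type='local_database'),
    # With eng_link left unset, the DFK defaults it to
    # sqlite:///<run_dir>/monitoring.db (see the dflow.py change below).
)
```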
57 changes: 29 additions & 28 deletions parsl/dataflow/dflow.py
@@ -8,10 +8,11 @@
import inspect
import sys
import multiprocessing
-import getpass

+from getpass import getuser
+from uuid import uuid4
from datetime import datetime
from socket import gethostname

from concurrent.futures import Future
from functools import partial

@@ -90,30 +91,30 @@ def __init__(self, config=Config()):
# ES logging
self.tasks_completed_count = 0
self.tasks_failed_count = 0
-        self.db_logger_config_object = config.db_logger_config
-        if self.db_logger_config_object is not None and self.db_logger_config_object.logger_type == 'local_database'\
-           and self.db_logger_config_object.eng_link is None:
-            # uses the rundir as the default location. Should report to users?
+        self.db_logger_config = config.db_logger_config
+        if self.db_logger_config is not None and self.db_logger_config.logger_type == 'local_database'\
+           and self.db_logger_config.eng_link is None:
+            # uses the rundir as the default location.
+            logger.info('Local monitoring database can be found inside the run_dir at: {}'.format(self.run_dir))
-            self.db_logger_config_object.eng_link = "sqlite:///{}".format(os.path.join(os.path.abspath(self.run_dir), 'parsl.db'))
-        if self.db_logger_config_object is None:
+            self.db_logger_config.eng_link = "sqlite:///{}".format(os.path.join(os.path.abspath(self.run_dir), 'monitoring.db'))
+        if self.db_logger_config is None:
self.db_logger = get_db_logger()
else:
-            self.db_logger = get_db_logger(db_logger_config_object=self.db_logger_config_object)
+            self.db_logger = get_db_logger(db_logger_config=self.db_logger_config)
self.workflow_name = os.path.basename(str(inspect.stack()[1][1]))
-        if self.db_logger_config_object is not None and self.db_logger_config_object.workflow_name is not None:
-            self.workflow_name = self.db_logger_config_object.workflow_name
+        if self.db_logger_config is not None and self.db_logger_config.workflow_name is not None:
+            self.workflow_name = self.db_logger_config.workflow_name
self.time_began = datetime.now()
self.time_completed = None
-        self.run_id = self.workflow_name + "-" + str(self.time_began.minute)
-        self.dashboard = self.db_logger_config_object.dashboard_link if self.db_logger_config_object is not None else None
+        self.run_id = str(uuid4())
+        self.dashboard = self.db_logger_config.dashboard_link if self.db_logger_config is not None else None
# TODO: make configurable
logger.info("Run id is: " + self.run_id)
if self.dashboard is not None:
logger.info("Dashboard is found at " + self.dashboard)
# start tornado logging server
-        if self.db_logger_config_object is not None and self.db_logger_config_object.logger_type == 'local_database':
-            self.logging_server = multiprocessing.Process(target=logging_server.run, kwargs={'db_logger_config_object': self.db_logger_config_object})
+        if self.db_logger_config is not None and self.db_logger_config.logger_type == 'local_database':
+            self.logging_server = multiprocessing.Process(target=logging_server.run, kwargs={'db_logger_config': self.db_logger_config})
self.logging_server.start()
else:
self.logging_server = None
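Note on the run-ID change above: the old scheme (`workflow_name` plus the minute the run began) collides for any two runs of the same workflow started within the same minute, while `uuid4()` is effectively collision-free. An illustration:

```python
# Why the switch: the old scheme reused IDs for runs started in the same minute.
from datetime import datetime
from uuid import uuid4

old_id = "my_workflow" + "-" + str(datetime.now().minute)  # e.g. 'my_workflow-7'; repeats every hour
new_id = str(uuid4())  # e.g. '7c5dbf74-3e60-4a6b-9a2f-1dc70b3a9b41'; effectively never repeats
```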
@@ -122,12 +123,12 @@ def __init__(self, config=Config()):
'parsl_version': get_version(),
'libsumbit_version': libsubmit.__version__,
"time_began": str(self.time_began.strftime('%Y-%m-%d %H:%M:%S')),
-                'time_completed': str(self.time_completed),
-                'task_run_id': self.run_id,
+                'time_completed': str(None),
+                'run_id': self.run_id,
'rundir': self.run_dir,
'tasks_completed_count': self.tasks_completed_count,
'tasks_failed_count': self.tasks_failed_count,
-                'user': getpass.getuser(),
+                'user': getuser(),
'host': gethostname(),
}
self.db_logger.info("DFK start", extra=workflow_info)
@@ -174,6 +175,7 @@ def _create_task_log_info(self, task_id, fail_mode=None):
Create the dictionary that will be included in the log.
"""
task_log_info = {"task_" + k: v for k, v in self.tasks[task_id].items()}
+        task_log_info['run_id'] = self.run_id
task_log_info['task_status_name'] = self.tasks[task_id]['status'].name
task_log_info['tasks_failed_count'] = self.tasks_failed_count
task_log_info['tasks_completed_count'] = self.tasks_completed_count
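The new `run_id` field joins the prefixed copy of the task table that this helper builds; the prefixing idiom on its first line is what keeps task columns and workflow columns distinct in one log schema. A standalone sketch:

```python
# Standalone sketch of the key-prefixing idiom (values are illustrative).
task = {'status': 'running', 'fail_count': 0}
task_log_info = {"task_" + k: v for k, v in task.items()}
# -> {'task_status': 'running', 'task_fail_count': 0}
```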
@@ -243,15 +245,15 @@ def handle_update(self, task_id, future, memo_cbk=False):
if not self._config.lazy_errors:
logger.debug("Eager fail, skipping retry logic")
self.tasks[task_id]['status'] = States.failed
-                if self.db_logger_config_object is not None:
+                if self.db_logger_config is not None:
task_log_info = self._create_task_log_info(task_id, 'eager')
self.db_logger.info("Task Fail", extra=task_log_info)
raise e

if self.tasks[task_id]['fail_count'] <= self._config.retries:
self.tasks[task_id]['status'] = States.pending
logger.debug("Task {} marked for retry".format(task_id))
-                if self.db_logger_config_object is not None:
+                if self.db_logger_config is not None:
task_log_info = self._create_task_log_info(task_id, 'lazy')
self.db_logger.info("Task Retry", extra=task_log_info)

@@ -263,7 +265,7 @@ def handle_update(self, task_id, future, memo_cbk=False):
self.tasks_failed_count += 1

self.tasks[task_id]['time_completed'] = str(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
-                if self.db_logger_config_object is not None:
+                if self.db_logger_config is not None:
task_log_info = self._create_task_log_info(task_id, 'lazy')
self.db_logger.info("Task Retry Failed", extra=task_log_info)

@@ -274,7 +276,7 @@ def handle_update(self, task_id, future, memo_cbk=False):

logger.info("Task {} completed".format(task_id))
self.tasks[task_id]['time_completed'] = str(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
-            if self.db_logger_config_object is not None:
+            if self.db_logger_config is not None:
task_log_info = self._create_task_log_info(task_id)
self.db_logger.info("Task Done", extra=task_log_info)

@@ -335,7 +337,7 @@ def handle_update(self, task_id, future, memo_cbk=False):
"Task {} deferred due to dependency failure".format(tid))
# Raise a dependency exception
self.tasks[tid]['status'] = States.dep_fail
-                    if self.db_logger_config_object is not None:
+                    if self.db_logger_config is not None:
task_log_info = self._create_task_log_info(task_id, 'lazy')
self.db_logger.info("Task Dep Fail", extra=task_log_info)

@@ -386,12 +388,12 @@ def launch_task(self, task_id, executable, *args, **kwargs):
executor = self.executors[executor_label]
except Exception as e:
logger.exception("Task {} requested invalid executor {}: config is\n{}".format(task_id, executor_label, self._config))
-        if self.db_logger_config_object is not None:
-            executable = app_monitor.monitor_wrapper(executable, task_id, self.db_logger_config_object, self.run_id)
+        if self.db_logger_config is not None:
+            executable = app_monitor.monitor_wrapper(executable, task_id, self.db_logger_config, self.run_id)
exec_fu = executor.submit(executable, *args, **kwargs)
self.tasks[task_id]['status'] = States.running
self.tasks[task_id]['time_started'] = str(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
-        if self.db_logger_config_object is not None:
+        if self.db_logger_config is not None:
task_log_info = self._create_task_log_info(task_id)
self.db_logger.info("Task Launch", extra=task_log_info)
exec_fu.retries_left = self._config.retries - \
@@ -572,7 +574,6 @@ def submit(self, func, *args, executors='all', fn_hash=None, cache=False, **kwargs):
'id': task_id,
'time_started': None,
'time_completed': None,
-                    'run_id': self.run_id,
'app_fu': None}

if task_id in self.tasks:
@@ -744,7 +745,7 @@ def cleanup(self):
self.db_logger.info("DFK end", extra={'tasks_failed_count': self.tasks_failed_count, 'tasks_completed_count': self.tasks_completed_count,
"time_began": str(self.time_began.strftime('%Y-%m-%d %H:%M:%S')),
'time_completed': str(self.time_completed.strftime('%Y-%m-%d %H:%M:%S')),
-                                'task_run_id': self.run_id, 'rundir': self.run_dir})
+                                'run_id': self.run_id, 'rundir': self.run_dir})
if self.logging_server is not None:
self.logging_server.terminate()
self.logging_server.join()
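The server shutdown above follows the usual terminate-then-join pattern for a daemon-like child: `terminate()` delivers SIGTERM, and `join()` reaps the process so no zombie outlives the DFK. A generic sketch of the pattern:

```python
# Generic terminate-then-join shutdown for a long-lived child process.
import multiprocessing
import time

server = multiprocessing.Process(target=time.sleep, args=(3600,))
server.start()
# ... do work while the server runs ...
server.terminate()  # SIGTERM; the child will not exit on its own
server.join()       # reap it so the interpreter can exit cleanly
```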
16 changes: 8 additions & 8 deletions parsl/monitoring/app_monitor.py
@@ -5,17 +5,17 @@
from parsl.monitoring.db_logger import get_db_logger


-def monitor(pid, task_id, db_logger_config_object, run_id):
+def monitor(pid, task_id, db_logger_config, run_id):
"""Internal
Monitors the Parsl task's resources by pointing psutil to the task's pid and watching it and its children.
"""

-    if db_logger_config_object is None:
+    if db_logger_config is None:
logger = get_db_logger()
else:
-        logger = get_db_logger(logger_name=run_id + str(task_id), db_logger_config_object=db_logger_config_object)
+        logger = get_db_logger(logger_name=run_id + str(task_id), db_logger_config=db_logger_config)

-    sleep_duration = db_logger_config_object.resource_loop_sleep_duration
+    sleep_duration = db_logger_config.resource_loop_sleep_duration
time.sleep(sleep_duration)
# these values are simple to log. Other information is available in special formats such as memory below.
simple = ["cpu_num", 'cpu_percent', 'create_time', 'cwd', 'exe', 'memory_percent', 'nice', 'name', 'num_threads', 'pid', 'ppid', 'status', 'username']
@@ -31,7 +31,7 @@ def to_mb(some_bytes):
while True:
try:
d = {"psutil_process_" + str(k): v for k, v in pm.as_dict().items() if k in simple}
d["task_run_id"] = run_id
d["run_id"] = run_id
d["task_id"] = task_id
children = pm.children(recursive=True)
d["psutil_cpu_count"] = psutil.cpu_count()
@@ -69,16 +69,16 @@ def to_mb(some_bytes):

finally:
logger.info("task resource update", extra=d)
-            sleep_duration = db_logger_config_object.resource_loop_sleep_duration
+            sleep_duration = db_logger_config.resource_loop_sleep_duration
time.sleep(sleep_duration)


-def monitor_wrapper(f, task_id, db_logger_config_object, run_id):
+def monitor_wrapper(f, task_id, db_logger_config, run_id):
""" Internal
Wrap the Parsl app with a function that will call the monitor function and point it at the correct pid when the task begins.
"""
def wrapped(*args, **kwargs):
-        p = Process(target=monitor, args=(os.getpid(), task_id, db_logger_config_object, run_id))
+        p = Process(target=monitor, args=(os.getpid(), task_id, db_logger_config, run_id))
p.start()
try:
return f(*args, **kwargs)
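Stripped of the parsl-specific arguments, the wrapper is the standard fork-a-watcher-around-the-call pattern: `os.getpid()` is evaluated inside `wrapped`, i.e. in the worker process that runs the app. A self-contained sketch of the pattern (an illustration, not parsl's actual API):

```python
# Illustrative wrap-and-fork pattern; not parsl's actual API.
import os
import time
from multiprocessing import Process

def watcher(pid, interval=1.0):
    while True:
        print("watching", pid)  # a real watcher would sample and log resources
        time.sleep(interval)

def with_watcher(f):
    def wrapped(*args, **kwargs):
        p = Process(target=watcher, args=(os.getpid(),))
        p.start()
        try:
            return f(*args, **kwargs)
        finally:
            p.terminate()  # stop the watcher once the call returns
            p.join()
    return wrapped
```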
