Consistent log lines for #85
yadudoc committed Mar 7, 2018
1 parent 7d470b2 commit 413a6ae
Showing 1 changed file with 30 additions and 18 deletions.
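The change routes each run's log records to a file under that run's directory, records the parsl and libsubmit versions at startup, and renames the task lifecycle messages to a consistent Task[<id>] form at INFO level. A minimal standalone sketch of the new logging setup, assuming a hypothetical run directory (in dflow.py the directory comes from make_rundir and these calls live in __init__):

    import logging
    import os

    import libsubmit
    import parsl

    # Hypothetical run directory; the DataFlowKernel gets this from make_rundir().
    rundir = "runinfo/000"
    os.makedirs(rundir, exist_ok=True)

    # Send this run's records to <rundir>/parsl.log at INFO, as the diff below does,
    # then record the library versions so the log identifies the environment.
    parsl.set_file_logger("{}/parsl.log".format(rundir), level=logging.INFO)
    logger = logging.getLogger("parsl")
    logger.info("Parsl version: {}".format(parsl.__version__))
    logger.info("Libsubmit version: {}".format(libsubmit.__version__))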
48 changes: 30 additions & 18 deletions parsl/dataflow/dflow.py
@@ -9,6 +9,8 @@
from concurrent.futures import Future
from functools import partial

import parsl
import libsubmit
from parsl.dataflow.error import *
from parsl.dataflow.states import States
from parsl.dataflow.futures import AppFuture
@@ -64,6 +66,11 @@ def __init__(self, config=None, executors=None, lazy_fail=True, appCache=True,
"""
# Create run dirs for this run
self.rundir = make_rundir(config=config, path=rundir)
parsl.set_file_logger("{}/parsl.log".format(self.rundir),
level=logging.INFO)

logger.info("Parsl version: {}".format(parsl.__version__))
logger.info("Libsubmit version: {}".format(libsubmit.__version__))

# Update config with defaults
self._config = update_config(config, self.rundir)
@@ -109,9 +116,6 @@ def _count_deps(depends, task_id):
count = 0
for dep in depends:
if isinstance(dep, Future) or issubclass(type(dep), Future):
logger.debug("Task[{0}]: dep:{1} done:{2}".format(task_id,
dep,
dep.done()))
if not dep.done():
count += 1

@@ -170,13 +174,17 @@ def handle_update(self, task_id, future, memo_cbk=False):
future.result()

except Exception as e:
logger.debug("[TODO] Task[{0}]: FAILED with {1}".format(task_id, future))
logger.info("Task[{}]: FAILED with [{}]".format(task_id, e))
print(dir(e))
print("Traceback : ", e.print_traceback())
# We keep the history separately, since the future itself could be
# tossed.
self.tasks[task_id]['fail_history'].append(future._exception)
self.tasks[task_id]['fail_count'] += 1

# If eager fail, raise error
if not self.lazy_fail:
logger.debug("[TODO] Eager fail, skipping retry logic")
logger.debug("Eager fail, skipping retry logic")
raise e

if self.tasks[task_id]['fail_count'] <= self.fail_retries:
@@ -185,12 +193,12 @@ def handle_update(self, task_id, future, memo_cbk=False):
self.tasks[task_id]['status'] = States.pending

else:
logger.debug("Task[{0}]: All retry attempts:{1} have failed".format(task_id,
logger.info("Task[{0}]: All retry attempts:{1} have failed".format(task_id,
self.fail_retries))
self.tasks[task_id]['status'] = States.failed

else:
logger.debug("Task[{}]: COMPLETED with {}".format(task_id, future))
logger.info("Task[{}]: COMPLETED with {}".format(task_id, future))
self.tasks[task_id]['status'] = States.done

# Identify tasks that have resolved dependencies and launch
@@ -207,7 +215,6 @@ def handle_update(self, task_id, future, memo_cbk=False):
self.tasks[tid]['args'] = new_args
self.tasks[tid]['kwargs'] = kwargs
if not exceptions:
logger.debug("Task[{0}] Launching Task".format(tid))
# There are no dependency errors
exec_fu = None
# Acquire a lock, retest the state, launch
@@ -225,7 +232,7 @@ def handle_update(self, task_id, future, memo_cbk=False):
logger.error("Task[{}]: Caught AttributeError at update_parent".format(tid))
raise e
else:
logger.debug("Task[{}]: Deferring Task due to dependency failure".format(tid))
logger.info("Task[{}]: Deferring Task due to dependency failure".format(tid))
# Raise a dependency exception
self.tasks[tid]['status'] = States.dep_fail
try:
@@ -292,7 +299,7 @@ def launch_task(self, task_id, executable, *args, **kwargs):
exec_fu = executor.submit(executable, *args, **kwargs)
exec_fu.retries_left = self.fail_retries - self.tasks[task_id]['fail_count']
exec_fu.add_done_callback(partial(self.handle_update, task_id))
logger.debug("Task[{}] launched on executor:{}".format(task_id, executor))
logger.info("Task[{}] launched on site:{}".format(task_id, site))
return exec_fu

@staticmethod
@@ -449,6 +456,10 @@ def submit(self, func, *args, parsl_sites='all', fn_hash=None, cache=False, **kw
task_stdout = kwargs.get('stdout', None)
task_stderr = kwargs.get('stderr', None)

logger.info("Task[{}] App:{} Dependencies:{}".format(task_id,
task_def['func_name'],
[fu.tid for fu in depends]))

if dep_cnt == 0:
# Set to running
new_args, kwargs, exceptions = self.sanitize_and_wrap(task_id, args, kwargs)
@@ -482,6 +493,7 @@ def submit(self, func, *args, parsl_sites='all', fn_hash=None, cache=False, **kw

logger.debug("Task[{}] Launched with AppFut:{}".format(task_id,
task_def['app_fu']))

return task_def['app_fu']

def cleanup(self):
@@ -494,7 +506,7 @@ def cleanup(self):
'''

logger.debug("DFK cleanup initiated")
logger.info("DFK cleanup initiated")

# Send final stats
self.usage_tracker.send_message()
@@ -503,7 +515,7 @@ def cleanup(self):
if not self._executors_managed:
return

logger.debug("Terminating flow_control and strategy threads")
logger.info("Terminating flow_control and strategy threads")
self.flowcontrol.close()

for executor in self.executors.values():
@@ -514,7 +526,7 @@ def cleanup(self):
# We are not doing shutdown here because even with block=False this blocks.
executor.shutdown()

logger.debug("DFK cleanup complete")
logger.info("DFK cleanup complete")

def checkpoint(self):
''' Checkpoint the dfk incrementally to a checkpoint file.
@@ -530,7 +542,7 @@ def checkpoint(self):
run under RUNDIR/checkpoints/{tasks.pkl, dfk.pkl}
'''

logger.debug("Checkpointing.. ")
logger.info("Checkpointing.. ")
start = time.time()

checkpoint_dir = '{0}/checkpoint'.format(self.rundir)
@@ -580,8 +592,8 @@ def checkpoint(self):
if count == 0:
logger.warn('No tasks checkpointed, please ensure caching is enabled')
else:
logger.debug("Done checkpointing {} tasks in {}s".format(count,
end - start))
logger.info("Done checkpointing {} tasks in {}s".format(count,
end - start))
return checkpoint_dir

def _load_checkpoints(self, checkpointDirs):
@@ -629,8 +641,8 @@ def _load_checkpoints(self, checkpointDirs):
logger.error(reason)
raise BadCheckpoint(reason)

logger.debug("Completed loading checkpoint:{0} with {1} tasks".format(checkpoint_file,
len(memo_lookup_table.keys())))
logger.info("Completed loading checkpoint:{0} with {1} tasks".format(checkpoint_file,
len(memo_lookup_table.keys())))
return memo_lookup_table

def load_checkpoints(self, checkpointDirs):
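With the lifecycle messages moved from DEBUG to INFO, a plain stream handler at INFO level is enough to follow task state transitions on the console without enabling full debug output. A rough sketch using only the standard logging module (parsl also provides a set_stream_logger helper for this, not shown here):

    import logging

    # Attach a console handler at INFO to the parsl logger hierarchy so the
    # Task[<id>] FAILED / COMPLETED / launched messages from this commit are visible.
    handler = logging.StreamHandler()
    handler.setLevel(logging.INFO)
    handler.setFormatter(logging.Formatter("%(asctime)s %(name)s:%(levelname)s %(message)s"))

    parsl_logger = logging.getLogger("parsl")
    parsl_logger.setLevel(logging.INFO)
    parsl_logger.addHandler(handler)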
