diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py
index 26c1d5e6e3..319a4a2821 100644
--- a/parsl/dataflow/dflow.py
+++ b/parsl/dataflow/dflow.py
@@ -9,6 +9,8 @@
 from concurrent.futures import Future
 from functools import partial
 
+import parsl
+import libsubmit
 from parsl.dataflow.error import *
 from parsl.dataflow.states import States
 from parsl.dataflow.futures import AppFuture
@@ -64,6 +66,11 @@ def __init__(self, config=None, executors=None, lazy_fail=True, appCache=True,
         """
         # Create run dirs for this run
         self.rundir = make_rundir(config=config, path=rundir)
+        parsl.set_file_logger("{}/parsl.log".format(self.rundir),
+                              level=logging.INFO)
+
+        logger.info("Parsl version: {}".format(parsl.__version__))
+        logger.info("Libsubmit version: {}".format(libsubmit.__version__))
 
         # Update config with defaults
         self._config = update_config(config, self.rundir)
@@ -109,9 +116,6 @@ def _count_deps(depends, task_id):
         count = 0
         for dep in depends:
             if isinstance(dep, Future) or issubclass(type(dep), Future):
-                logger.debug("Task[{0}]: dep:{1} done:{2}".format(task_id,
-                                                                  dep,
-                                                                  dep.done()))
                 if not dep.done():
                     count += 1
 
@@ -170,13 +174,15 @@ def handle_update(self, task_id, future, memo_cbk=False):
         try:
             future.result()
         except Exception as e:
-            logger.debug("[TODO] Task[{0}]: FAILED with {1}".format(task_id, future))
+            logger.info("Task[{}]: FAILED with [{}]".format(task_id, e))
+            # We keep the history separately, since the future itself could be
+            # tossed.
             self.tasks[task_id]['fail_history'].append(future._exception)
             self.tasks[task_id]['fail_count'] += 1
 
             # If eager fail, raise error
             if not self.lazy_fail:
-                logger.debug("[TODO] Eager fail, skipping retry logic")
+                logger.debug("Eager fail, skipping retry logic")
                 raise e
 
             if self.tasks[task_id]['fail_count'] <= self.fail_retries:
@@ -185,12 +193,12 @@ def handle_update(self, task_id, future, memo_cbk=False):
                 self.tasks[task_id]['status'] = States.pending
 
             else:
-                logger.debug("Task[{0}]: All retry attempts:{1} have failed".format(task_id,
+                logger.info("Task[{0}]: All retry attempts:{1} have failed".format(task_id,
                                                                                    self.fail_retries))
                 self.tasks[task_id]['status'] = States.failed
 
         else:
-            logger.debug("Task[{}]: COMPLETED with {}".format(task_id, future))
+            logger.info("Task[{}]: COMPLETED with {}".format(task_id, future))
             self.tasks[task_id]['status'] = States.done
 
             # Identify tasks that have resolved dependencies and launch
@@ -207,7 +215,6 @@ def handle_update(self, task_id, future, memo_cbk=False):
             self.tasks[tid]['args'] = new_args
             self.tasks[tid]['kwargs'] = kwargs
             if not exceptions:
-                logger.debug("Task[{0}] Launching Task".format(tid))
                 # There are no dependency errors
                 exec_fu = None
                 # Acquire a lock, retest the state, launch
@@ -225,7 +232,7 @@ def handle_update(self, task_id, future, memo_cbk=False):
                         logger.error("Task[{}]: Caught AttributeError at update_parent".format(tid))
                         raise e
             else:
-                logger.debug("Task[{}]: Deferring Task due to dependency failure".format(tid))
+                logger.info("Task[{}]: Deferring Task due to dependency failure".format(tid))
                 # Raise a dependency exception
                 self.tasks[tid]['status'] = States.dep_fail
                 try:
@@ -292,7 +299,7 @@ def launch_task(self, task_id, executable, *args, **kwargs):
         exec_fu = executor.submit(executable, *args, **kwargs)
         exec_fu.retries_left = self.fail_retries - self.tasks[task_id]['fail_count']
         exec_fu.add_done_callback(partial(self.handle_update, task_id))
-        logger.debug("Task[{}] launched on executor:{}".format(task_id, executor))
+        logger.info("Task[{}] launched on site:{}".format(task_id, site))
logger.info("Task[{}] launched on site:{}".format(task_id, site)) return exec_fu @staticmethod @@ -449,6 +456,10 @@ def submit(self, func, *args, parsl_sites='all', fn_hash=None, cache=False, **kw task_stdout = kwargs.get('stdout', None) task_stderr = kwargs.get('stderr', None) + logger.info("Task[{}] App:{} Dependencies:{}".format(task_id, + task_def['func_name'], + [fu.tid for fu in depends])) + if dep_cnt == 0: # Set to running new_args, kwargs, exceptions = self.sanitize_and_wrap(task_id, args, kwargs) @@ -482,6 +493,7 @@ def submit(self, func, *args, parsl_sites='all', fn_hash=None, cache=False, **kw logger.debug("Task[{}] Launched with AppFut:{}".format(task_id, task_def['app_fu'])) + return task_def['app_fu'] def cleanup(self): @@ -494,7 +506,7 @@ def cleanup(self): ''' - logger.debug("DFK cleanup initiated") + logger.info("DFK cleanup initiated") # Send final stats self.usage_tracker.send_message() @@ -503,7 +515,7 @@ def cleanup(self): if not self._executors_managed: return - logger.debug("Terminating flow_control and strategy threads") + logger.info("Terminating flow_control and strategy threads") self.flowcontrol.close() for executor in self.executors.values(): @@ -514,7 +526,7 @@ def cleanup(self): # We are not doing shutdown here because even with block=False this blocks. executor.shutdown() - logger.debug("DFK cleanup complete") + logger.info("DFK cleanup complete") def checkpoint(self): ''' Checkpoint the dfk incrementally to a checkpoint file. @@ -530,7 +542,7 @@ def checkpoint(self): run under RUNDIR/checkpoints/{tasks.pkl, dfk.pkl} ''' - logger.debug("Checkpointing.. ") + logger.info("Checkpointing.. ") start = time.time() checkpoint_dir = '{0}/checkpoint'.format(self.rundir) @@ -580,8 +592,8 @@ def checkpoint(self): if count == 0: logger.warn('No tasks checkpointed, please ensure caching is enabled') else: - logger.debug("Done checkpointing {} tasks in {}s".format(count, - end - start)) + logger.info("Done checkpointing {} tasks in {}s".format(count, + end - start)) return checkpoint_dir def _load_checkpoints(self, checkpointDirs): @@ -629,8 +641,8 @@ def _load_checkpoints(self, checkpointDirs): logger.error(reason) raise BadCheckpoint(reason) - logger.debug("Completed loading checkpoint:{0} with {1} tasks".format(checkpoint_file, - len(memo_lookup_table.keys()))) + logger.info("Completed loading checkpoint:{0} with {1} tasks".format(checkpoint_file, + len(memo_lookup_table.keys()))) return memo_lookup_table def load_checkpoints(self, checkpointDirs):