Skip to content

Commit 2b3942d

Browse files
committed
Even more logging.
- Log every upload not just dynamically collected files (more cohesive with download logging). - Log all staging actions (in and out) with a complete description of FileAction. - Log full description of FileAction object on each staging up/staging down failure. Requested by @natefoo.
1 parent fa2b6dc commit 2b3942d

File tree

6 files changed

+27
-10
lines changed

6 files changed

+27
-10
lines changed

pulsar/client/action_mapper.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,16 @@ def staging_needed(self):
282282
def staging_action_local(self):
283283
return self.staging == STAGING_ACTION_LOCAL
284284

285+
def to_dict(self):
286+
return dict(action_type=self.action_type)
287+
288+
def __str__(self):
289+
as_dict = self.to_dict()
290+
attribute_str = ""
291+
for key, value in as_dict.items():
292+
attribute_str += "%s=%s" % (key, value)
293+
return "FileAction[%s]" % attribute_str
294+
285295

286296
class NoneAction(BaseAction):
287297
""" This action indicates the corresponding path does not require any

pulsar/client/staging/down.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ def __collect_other_working_directory_files(self):
111111
if name in self.downloaded_working_directory_files:
112112
continue
113113
if self.client_outputs.dynamic_match(name):
114-
log.info("collecting %s" % name)
114+
log.debug("collecting dynamic output %s" % name)
115115
output_file = join(working_directory, self.pulsar_outputs.path_helper.local_name(name))
116116
if self._attempt_collect_output(output_type='output_workdir', path=output_file, name=name):
117117
self.downloaded_working_directory_files.append(name)
@@ -129,6 +129,7 @@ def _attempt_collect_output(self, output_type, path, name=None):
129129
return collected
130130

131131
def _collect_output(self, output_type, action, name):
132+
log.info("collecting output %s with action %s" % (name, action))
132133
return self.output_collector.collect_output(self, output_type, action, name)
133134

134135

pulsar/managers/staging/post.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ def collect_output(self, results_collector, output_type, action, name):
5555
name = os.path.basename(action.path)
5656

5757
pulsar_path = self.job_directory.calculate_path(name, output_type)
58-
self.action_executor.execute(lambda: action.write_from_path(pulsar_path))
58+
description = "staging out file %s via %s" % (pulsar_path, action)
59+
self.action_executor.execute(lambda: action.write_from_path(pulsar_path), description)
5960

6061

6162
def __pulsar_outputs(job_directory):

pulsar/managers/staging/pre.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ def preprocess(job_directory, setup_actions, action_executor):
1212
input_type = setup_action["type"]
1313
action = from_dict(setup_action["action"])
1414
path = job_directory.calculate_path(name, input_type)
15-
log.debug("Staging %s '%s' via %s to %s", input_type, name, action.__class__.__name__, path)
16-
action_executor.execute(lambda: action.write_to_path(path))
15+
description = "Staging %s '%s' via %s to %s" % (input_type, name, action, path)
16+
log.debug(description)
17+
action_executor.execute(lambda: action.write_to_path(path), "action[%s]" % description)
1718

1819
__all__ = ['preprocess']

pulsar/managers/util/retry.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,16 @@ def __init__(self, **kwds):
2727
self.errback = kwds.get("errback", self.__default_errback)
2828
self.catch = kwds.get("catch", DEFAULT_CATCH)
2929

30-
self.description = kwds.get("description", DEFAULT_DESCRIPTION)
30+
self.default_description = kwds.get("description", DEFAULT_DESCRIPTION)
3131

32-
def execute(self, action):
32+
def execute(self, action, description=None):
3333
def on_error(exc, intervals, retries, interval=0):
3434
interval = next(intervals)
3535
if self.errback:
36-
self.errback(exc, interval)
36+
errback_args = [exc, interval]
37+
if description is not None:
38+
errback_args.append(description)
39+
self.errback(exc, interval, description)
3740
return interval
3841

3942
return _retry_over_time(
@@ -46,10 +49,11 @@ def on_error(exc, intervals, retries, interval=0):
4649
errback=on_error,
4750
)
4851

49-
def __default_errback(self, exc, interval):
52+
def __default_errback(self, exc, interval, description=None):
53+
description = description or self.default_description
5054
log.info(
5155
"Failed to execute %s, retrying in %s seconds.",
52-
self.description,
56+
description,
5357
interval,
5458
exc_info=True
5559
)

test/retry_action_test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ def test_third_execution_fine():
2525
RetryActionExecutor(max_retries=2, interval_start=.01, interval_step=.01).execute(action_tracker.execute)
2626
except Exception:
2727
exception_raised = True
28-
assert action_tracker.count == 3
28+
assert action_tracker.count == 3, action_tracker.count
2929
assert not exception_raised
3030

3131

0 commit comments

Comments
 (0)