
Per attempt task #725

Merged
merged 16 commits into master from per-attempt-task on Oct 18, 2021

Conversation

romain-intel
Contributor

No description provided.

You can now specify a specific attempt for a Task or a DataArtifact
in the client like so:
  - Task('flow/run/step/id/attempt')
  - DataArtifact('flow/run/step/id/name/attempt')

This gives you a specific view of that particular attempt. Note that
attempts are only valid for Tasks and DataArtifacts.
This requires the attempt-fix branch of the metadata service.

TODO:
  - still need to add version check to make sure we are hitting a modern enough service
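
As a quick usage sketch (the flow, run, step, task, and artifact names below are placeholders; the attempt keyword argument is the form used later in the review discussion):

```python
from metaflow import Task, DataArtifact

# Latest attempt, as before:
task = Task('MyFlow/12/train/345')

# A specific attempt, either encoded in the pathspec...
task_attempt_2 = Task('MyFlow/12/train/345/2')
# ...or passed explicitly:
task_attempt_2 = Task('MyFlow/12/train/345', attempt=2)

# Same idea for a single artifact of that attempt:
model_attempt_2 = DataArtifact('MyFlow/12/train/345/model/2')
```
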
@@ -462,7 +462,8 @@ def done(self):
         self._metadata.register_metadata(
             self._run_id, self._step_name, self._task_id,
             [MetaDatum(field='attempt-done', value=str(self._attempt),
-                       type='attempt-done', tags=[])])
+                       type='attempt-done',
+                       tags=['attempt_id:{0}'.format(self._attempt)])])
Collaborator

what's the purpose of this?

Contributor Author

Just that we tag all metadata with attempt id and I guess this one fell through the cracks. I should have put it separately. This is for the UI mostly.

return pre_filter
# Otherwise, we filter out the metadata. If we have attempt_id information
# (post MF 2.4.0), it's easy; if not, we do our best to reconstruct
# the order.
Collaborator

You could move this special logic for handling older versions into a function of its own. Right now it adds a lot of complexity to get_object.

Contributor Author

ok, will refactor.

Contributor Author

So to clarify: just the part of the function that builds the filtered metadata when we don't have the attempt value, right?

@@ -503,6 +538,37 @@ def _apply_filter(elts, filters):
result = []
return starting_point

@staticmethod
def _reconstruct_metadata_for_attempt(all_metadata, attempt_id):
Collaborator

There seems to be an issue with this function and older metaflow client versions. See if you can reproduce my issues with the following:

Run a flow with an old client (e.g. metaflow==2.0.5) that retries a step.

The following will work:

task = Task("path/spec/to/task")
task.metadata

but this one will fail:

task = Task("path/spec/to/task", attempt=3)
task.metadata

>     for t in all_tags:
> TypeError: 'NoneType' object is not iterable

Contributor Author

Should be fixed now.
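
For reference, a minimal sketch of the kind of attempt filtering being discussed, assuming metadata records carry tags of the form 'attempt_id:<n>' (per the diff above) and that records written by older clients may have tags set to None; the function name and record shape are illustrative, not the actual _reconstruct_metadata_for_attempt implementation:

```python
def filter_metadata_for_attempt(all_metadata, attempt_id):
    # Keep only the records tagged with the requested attempt. Records written
    # by older clients may have tags=None, which is what triggered the
    # "'NoneType' object is not iterable" error reported above.
    wanted = 'attempt_id:{0}'.format(attempt_id)
    selected = []
    for record in all_metadata:
        all_tags = record.get('tags') or []  # guard against tags=None
        if wanted in all_tags:
            selected.append(record)
    return selected

# One old-style record (tags=None) and one tagged record:
records = [
    {'name': 'host', 'value': 'worker-a', 'tags': None},
    {'name': 'host', 'value': 'worker-b', 'tags': ['attempt_id:3']},
]
print(filter_metadata_for_attempt(records, 3))  # only the attempt-3 record
```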

saikonen and others added 4 commits October 13, 2021 23:32
…izes (#752)

* wip: rough implementation of artifact size gets and suggestion for stdout/stderr log size property

* add file_size to the datastore interface, implement for s3storage and use for artifact file size checks.

* wip: implement log sizes for legacy and MFLOG type logs.

* implement file_size for LocalStorage as well.

update datastorage file_size docs

* cleanup core docstrings for log_size properties

* update docs and rename get_size to be specific about artifact size

* refactor: move current attempt to a property

* cleanup artifact size return

* cleanup comment and rename file_size to be in line with other methods

* change to require_mode('r') for size getters

* fix indent

* use cached filesize found in 'info' metadata for artifacts instead of continuously requesting filesizes.

Fix possible issue with task_datastore not retaining passed in task attempt for further use.

* change artifact size function to return an iterator to adhere to existing styles.
Contributor

@akyrola akyrola left a comment

Some smallish comments. It would be good to add a description to the PR explaining what it is about.

self._path_components = traverse(_CLASSES[self._NAME], ids, [])
return self._path_components
ids = self.pathspec.split('/')
self._path_components = ids
Contributor

Avoid mutating state in getters (such as @property). I think you can just do
return list(self._path_components or self.pathspec.split('/'))

Contributor Author

This was to cache it but I can remove it. The operation isn't very expensive.
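
For illustration, a minimal sketch of the two approaches under discussion, using simplified names rather than the actual core.py code:

```python
class PathHolder:
    def __init__(self, pathspec):
        self.pathspec = pathspec        # e.g. 'MyFlow/12/train/345'
        self._path_components = None

    @property
    def path_components_cached(self):
        # Caching variant: mutates state inside the getter (what the review flags).
        if self._path_components is None:
            self._path_components = self.pathspec.split('/')
        return self._path_components

    @property
    def path_components(self):
        # Stateless variant suggested above: recompute (it is cheap), or reuse
        # a cache populated elsewhere, without mutating in the getter.
        return list(self._path_components or self.pathspec.split('/'))
```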


if filecache is None:
    # TODO: Pass proper environment to properly extract artifacts
    filecache = FileCache()
Contributor

Avoid side effects in getters/@property. Based on the comment on 775, this is not needed?

Contributor Author

The comment is wrong. I will remove the comment (it is a leftover from the previous commit). We are using the FileCache to cache the metadata extracted from S3.

Contributor Author

Ah, the comment was already removed in a following push. I think it is OK to set filecache as a global here even if it is a side effect. Otherwise I can initialize it non-lazily as well.
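
As an aside, a small sketch of the lazy module-level initialization being weighed against eager initialization; the FileCache stub and helper name are illustrative, and the global statement is an assumption about how the getter would assign the module-level name:

```python
class FileCache:
    """Stand-in for metaflow's FileCache, just for this sketch."""

filecache = None  # module-level singleton, created on first use

def _ensure_filecache():
    # Lazy variant: the side effect is confined to first use.
    global filecache
    if filecache is None:
        filecache = FileCache()
    return filecache

# Eager (non-lazy) alternative mentioned in the comment:
# filecache = FileCache()
```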

try:
    return os.path.getsize(path)
except OSError:
    return None
Contributor

Why return None instead of throwing? Surely asking for the size of a non-existent file is an error?

Contributor Author

It was to be consistent with the S3 code. We could throw in both cases though.

Contributor Author

Actually, I'll keep it like that for now. It is consistent with the documentation (which says it returns None if the file is not found) and helps in case log files haven't been pushed out yet (otherwise the handling logic gets a little trickier than what is currently there). Thoughts?
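
For context, a minimal, self-contained sketch of the trade-off being discussed; the function mirrors the local-storage snippet above, and the caller side shows why a None return keeps log handling simple (paths here are placeholders):

```python
import os

def size_file(path):
    # None means "not there (yet)", e.g. a log that has not been pushed out.
    try:
        return os.path.getsize(path)
    except OSError:
        return None

# Caller side: a missing log is treated as empty instead of raising.
log_size = size_file('/tmp/this-log-does-not-exist.log') or 0
print(log_size)

# The throwing alternative would force try/except at every call site instead.
```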

Collaborator

@savingoyal savingoyal left a comment

LGTM!


if self._attempt < 0:
    raise MetaflowNotFound("Attempt can only be non-negative")
elif self._attempt >= MAX_ATTEMPTS:
Collaborator

JFYI - This will break if we decide to change the value of MAX_ATTEMPTS in the future

@@ -828,7 +879,8 @@ def _iter_filter(self, x):
     @property
     def metadata(self):
         """
-        Metadata events produced by this task.
+        Metadata events produced by this task across all attempts of the task
+        *except* if you selected a specific task attempt.
Collaborator

Just noting an error in metadata_dict where it doesn't actually return metadata related to the latest attempt, only the latest metadata for a given metadata key name.
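
To make the distinction concrete, a small illustrative example (the record layout is simplified and hypothetical, not the actual metadata_dict implementation):

```python
# Simplified records: (attempt, name, value), already ordered by creation time.
records = [
    (0, 'host', 'worker-a'),
    (0, 'conda-env-id', 'env-123'),   # only ever emitted during attempt 0
    (1, 'host', 'worker-b'),
]

# "Latest value per key name", the behavior described above:
latest_per_key = {name: value for _, name, value in records}
# {'host': 'worker-b', 'conda-env-id': 'env-123'}  <- attempt-0 value leaks in

# "Metadata of the latest attempt", what one might expect instead:
latest = max(attempt for attempt, _, _ in records)
per_latest_attempt = {n: v for a, n, v in records if a == latest}
# {'host': 'worker-b'}
```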

@@ -1081,7 +1198,8 @@ def environment_info(self):
         env_type = my_code.info['environment_type']
         if not env_type:
             return None
-        env = [m for m in ENVIRONMENTS + [MetaflowEnvironment] if m.TYPE == env_type][0]
+        env = [m for m in ENVIRONMENTS +
Collaborator

Do you want to consider removing environment_info at some point?

Collaborator

Its utility is not wholly apparent and it exposes the internal implementation details of conda

@@ -155,6 +155,22 @@ def info_file(self, path):
        """
        raise NotImplementedError

    def size_file(self, path):
Collaborator

Minor - feel free to ignore - Just size?

@romain-intel romain-intel merged commit 4e2bea4 into master Oct 18, 2021
@romain-intel romain-intel deleted the per-attempt-task branch October 18, 2021 21:40