New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
WIP: Enable Aggregate Operations #289
WIP: Enable Aggregate Operations #289
Conversation
cdba49f
to
87f04ee
Compare
@kidrahahjo Make sure to explicitly ping us when you want feedback/ input on this. Otherwise I'm just going to assume that this is pure drafting/ WIP and ignore related notifications. |
@csadorf Yes, I'll make sure to do that. |
87f04ee
to
b07c207
Compare
An implementation that enables the execution of aggregate operations has been achieved for now. The current iteration of do not differentiate between normal operation and aggregate operation. Every operation is an aggregate operation but in order to maintain consistency for the users, unpacking of jobs, where the length of jobs is equal to 1, is done so they can directly refer to the statepoints rather that performing indexing ( |
Few problems remain in execution.
|
f868f2a
to
e7e8c3e
Compare
flow/project.py
Outdated
jobstr = "" | ||
for job in self.jobs: | ||
jobstr+="{} ".format(job.id) | ||
return "{}({})".format(self.name, jobstr.strip()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
jobstr = "" | |
for job in self.jobs: | |
jobstr+="{} ".format(job.id) | |
return "{}({})".format(self.name, jobstr.strip()) | |
return "{}({})".format(self.name, ", ".join(map(str, self.jobs))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh yes, that's a nice way to do this. Thanks! I'll make the change.
flow/project.py
Outdated
type=type(self).__name__, | ||
name=self.name, | ||
job=str(self.job), | ||
job=jobstr.strip(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll make the change
flow/project.py
Outdated
@@ -345,7 +422,7 @@ def set_status(self, value): | |||
def get_status(self): | |||
"Retrieve the operation's last known status." | |||
try: | |||
return JobStatus(self.job._project.document['_status'][self.id]) | |||
return JobStatus(self.jobs._project.document['_status'][self.id]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How is that supposed to work?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method was not used in the execution part, hence I haven't changed it. Though, self.jobs won't work here as jobs of the type list
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should have it work with at least one job in this PR.
"Return an id, which identifies this group with respect to this job." | ||
project = job._project | ||
project = jobs[0]._project |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can't just ignore the other jobs here, can we?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We're assuming that the jobs are from the same project. When we're done executing this, we filter the JobOperation instance and there we check whether every job is in the same project or not. Though it seems to me that I should test whether the jobs are in the same project or not here also.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might be good to make that assumption explicit with an assert. This could be a significant performance drag, but now is not the time for optimization, now is the time for code safety.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's true, will insert assertion
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Eventually, I think this can be covered by tests, but for now safety is the right approach.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this mean that if two aggregates have the same job first in their lists that their ids would be identical?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@atravitz
This can be a problem if we run in something like this python project.py run -o op_group_of_2 -j J1 J1 J1 J1
This behaviour seems fair to me because even if we generate similar ids, we'll just be overwriting the id
in the project document.
flow/project.py
Outdated
for job in jobs: | ||
eligible = False | ||
for j in job: | ||
if not type(j) is signac.contrib.job.Job: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why could it be any other type?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I made jobs to be a nested list.
If we wanted to create a group of 2 using the grouper function then we will eventually have to iterate over something like this: [[Job1, Job2], [Job3, Job4], [Job5, Job6], ... ]
Hence this must be of the type signac.contrib.job.job
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If it must be of that type, no need to check.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@csadorf I forgot to mention that this also helps when we handle None Type
inside a group.
For instance, [ [j1, j2, j3], [j4, j5, j6], ... [j10, None None] ]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then it is better to explicitly check with is None
.
flow/project.py
Outdated
@@ -2138,11 +2258,11 @@ class _PickleError(Exception): | |||
|
|||
@staticmethod | |||
def _dumps_op(op): | |||
return (op.id, op.name, op.job._id, op.cmd, op.directives) | |||
return (op.id, op.name, op.jobs, op.cmd, op.directives) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We're dumping ids here to avoid serializing the whole job instance, we should do the same for multiple jobs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh okay, I have a question though. Is this the reason I was facing a pickling error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This worked, thank you!
flow/project.py
Outdated
job_in_project = False | ||
break |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this constitutes a "fatal" (read unrecoverable) condition, then we should raise an exception right here instead of just breaking and ignoring the rest of the jobs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll raise an error here.
flow/project.py
Outdated
@@ -299,7 +303,7 @@ def keyfunction(job): | |||
|
|||
def grouper(jobs): | |||
for key, group in groupby(sorted(jobs, key=keyfunction), key=keyfunction): | |||
yield group | |||
yield list(group) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be much better to not rely on the fact that this is a list. Users might yield iterables as well. You can always create a list where you call the generator function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's valid, I'll make this compatible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also worth noting that you should only create a list, when you really need a list. There could be literally millions of jobs in that list which would take up a significant amount of memory.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This means that there's also a need of optimization in my code because there are places where I created a list where it's not even necessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok! Just be conscious of that, no need to optimize the code right now.
flow/project.py
Outdated
self.grouper = grouper | ||
|
||
@classmethod | ||
def groupsof(cls, num=1, fillvalue=None): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How should the fillvalue
attribute be treated by the users? What all values should it take? Should we check pre/post conditions for that particular value?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's best to skip these.
For instance, if I want a group of 3 and I get [joba, None, None] then it's was probably supposed to happen. Hence we shouldn't interfere much when a use asks for "user specified" arguments
flow/project.py
Outdated
import six | ||
from six.moves import zip_longest | ||
if six.PY2: | ||
from collections import Iterable |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We no longer support Python 2, so six should be unnecessary.
flow/project.py
Outdated
except KeyError: | ||
raise KeyError("The key '{}' was not found in statepoint " | ||
"parameters of the job {}.".format(sort, job)) | ||
self.grouper = grouper |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
grouper
should be a private member.
flow/project.py
Outdated
@@ -3401,7 +3887,8 @@ def _register_groups(self): | |||
operation_directives = getattr(func, '_flow_group_operation_directives', dict()) | |||
for group_name in func._flow_groups: | |||
self._groups[group_name].add_operation( | |||
op_name, op, operation_directives.get(group_name, None)) | |||
op_name, op, operation_directives.get(group_name, None), | |||
flow_aggregate[op_name], flow_select[op_name]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See comment in FlowGroup.add_operation
. I don't see a need for flow_aggregate
or flow_select
being parameters.
flow/project.py
Outdated
flow_aggregate = dict() | ||
flow_select = dict() | ||
for op_name, op in self._operations.items(): | ||
try: | ||
flow_aggregate[op_name] = self._operation_functions[op_name]._flow_aggregate | ||
flow_select[op_name] = self._operation_functions[op_name]._flow_select | ||
except KeyError: | ||
flow_aggregate[op_name] = op.cmd._flow_aggregate | ||
flow_select[op_name] = op.cmd._flow_select |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code likely isn't necessary. This can be done in the loop below that uses (op_name, op)
and the func
variable.
@@ -3401,7 +3887,8 @@ def _register_groups(self): | |||
operation_directives = getattr(func, '_flow_group_operation_directives', dict()) | |||
for group_name in func._flow_groups: | |||
self._groups[group_name].add_operation( | |||
op_name, op, operation_directives.get(group_name, None)) | |||
op_name, op, operation_directives.get(group_name, None), | |||
flow_aggregate[op_name], flow_select[op_name]) | |||
|
|||
# For singleton groups add directives | |||
self._groups[op_name].operation_directives[op_name] = getattr(func, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to set the singleton groups aggregation and selection here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure what do you mean by this. Can you please explain?
flow/project.py
Outdated
def _eligible_for_submission(self, flow_group, jobs): | ||
"""Determine if a flow_group is eligible for submission with the given jobs. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We will need to consider that the ordering of jobs can change.
flow/project.py
Outdated
print("Eligible aggregates: ", end=" ") | ||
for job in op.jobs: | ||
print(job, end=" ") | ||
print() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
print("Eligible aggregates: ", end=" ") | |
for job in op.jobs: | |
print(job, end=" ") | |
print() | |
print("Eligible aggregates:", *op.jobs) |
flow/project.py
Outdated
def operation_function(job): | ||
cmd = operation(job).format(job=job) | ||
def operation_function(*jobs): | ||
cmd = operation(jobs).format(jobs=jobs) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is no need for the format here, it exists in FlowCmdOperation
.
flow/project.py
Outdated
try: | ||
filter = operation_function._flow_select | ||
grouper, sort = operation_function._flow_aggregate | ||
except AttributeError: | ||
filter = operation.cmd._flow_select | ||
grouper, sort = operation.cmd._flow_aggregate |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems to be checking indirectly the operation class of the given operation. We should just check that directly then. Also, I think we want to allow for the group to determine the aggregation and selection behavior. This does not allow for that since we cannot override that in exec.
flow/project.py
Outdated
jobs = list(jobs) | ||
jobs_list = filter(jobs) | ||
if sort is not None: | ||
jobs_list = sort(jobs_list) | ||
jobs_list = grouper([job for job in jobs_list]) | ||
|
||
for job_list in jobs_list: | ||
job_list = list(job_list) | ||
for i, job in enumerate(job_list): | ||
if job is None: | ||
del job_list[i:] | ||
break |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code is repeated frequently. We should combine this logic into a function or class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll merge this concept with the aggregate
class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I left more comments on changes I think would improve the code. Some of the errors, I believe I found, would be caught by some simple tests you could write. Writing tests will help you to figure out if your assumptions about the code are true (which is really helpful for developing code).
Some high level things,
- I think we should have an
Aggregate
class that handles the logic for taking a list of jobs and spitting back out individual aggregates. This prevents the logic from being in multiple places, and having tuples where each element represents only part of the aggregation logic. - We need to find some consistence in the naming
jobs
v.job_list
v.jobs_list
. The ambiguity here makes the code hard to parse at times. - We need to decide on how to deal with currently user facing functions that accept lists of jobs versus those that pass in individual positional arguments (i.e.
jobs
v.*jobs
). We are not consistent with this currently, and I think this will lead to a lot of user confusion. - I know there is a separate branch for submitting, but this is definitively still broken in this branch (this is more of a note).
This pull request has served its purpose of guiding me through my project as #336 is now ready for a review. |
Description
This pull request adds a new feature which allows the users to operate over a group of jobs at once
Motivation and Context
This pull request addresses issue #266
Users will be able to use this feature using the method below
Types of Changes
1The change breaks (or has the potential to break) existing functionality.
Work to be done:
aggregate
andselect
classes.run
,exec
.next
.FlowCmdOperation
Checklist:
If necessary: