
Feature/enable aggregate operations #52

Closed
csadorf wants to merge 19 commits into master from feature/enable-aggregate-operations

Conversation

@csadorf (Contributor) commented Feb 7, 2019

This pull request refactors the current job-operation model into a jobs-operations model, meaning that each operation is a function of one or more jobs. (A minimal usage sketch follows the checklist below.)

Prior to merging we need to tackle the following items:

  • Implement parallelized status update
  • Update changelog
  • Deprecate JobOperation
  • Ensure that the scheduler status is updated prior to submission.
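
A minimal sketch of the intended usage, using the decorator names that come up later in this thread (the operation name and the state point key kT are hypothetical and only serve as illustration):

from flow import FlowProject, aggregate


class Project(FlowProject):
    pass


@Project.operation
@aggregate()  # aggregate all eligible jobs into a single group
def report_mean_kT(*jobs):
    # The operation receives the whole group of jobs at once.
    print(sum(job.sp.kT for job in jobs) / len(jobs))


if __name__ == '__main__':
    Project().main()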

@csadorf csadorf requested review from a team and jglaser February 7, 2019 18:12
@csadorf (Contributor, Author) commented Feb 8, 2019

The tests are failing on Python 2.7 because I am using keyword-only arguments, which are exclusive to Python 3, to implement this new feature.

I will fix that, but only if we decide that we like this feature.
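
For reference, this is the kind of syntax in question; the function and parameter names are hypothetical and only illustrate the feature:

# Everything after the bare '*' separator is keyword-only (Python 3 syntax).
# On Python 2.7 this 'def' line is rejected with a SyntaxError.
def add_operation(name, cmd, *, aggregate=None, directives=None):
    pass

add_operation('sample', 'echo {job}', aggregate=True)   # OK
# add_operation('sample', 'echo {job}', True)           # TypeError: 'aggregate' must be passed by keyword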

@csadorf csadorf removed the request for review from a team February 8, 2019 23:08
@vyasr (Contributor) left a comment

One minor note, we'll need to update all docstrings referring to singular jobs.

Bigger picture, I think this implementation addresses the use-case, but I'm a little hesitant offhand. I saw your inline question about whether status should return one or all. This raises the broader question of exactly how JobsOperation is meant to work. Is the only use-case an aggregation where all jobs are passed to a single aggregation acting on all of them?

I realize my issue is a little vague at the moment. I'm uneasy because this implementation required relatively few changes, and it feels like that's because we essentially took our current one job-one operation approach and shoehorned in aggregation. I'm wondering if we need a more holistic approach, akin to how we ended up dealing with bundling by making it the base case. I'd be more comfortable if we had a brief meeting to discuss the possible use-cases, just to make sure that this implementation isn't inflexible in ways that could cause us problems down the road.

flow/project.py Outdated
cmd=repr(self.cmd),
directives=self.directives)

@property
Contributor

Maybe add an analogous jobs property.

else:
return self._cmd.format(job=job)
return self._cmd.format(job=jobs[0], job_ids=job_ids)
Contributor

This isn't necessary for a first implementation, but we may want to support some form of attribute access over all jobs (e.g. jobs.id in the string expands to a list of all job ids). This could get very complicated and I can already see lots of potential problems, so it may not be worth implementing, but we should think about it before finishing up this PR.
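
For the record, one conceivable shape of such aggregated attribute access, purely illustrative and not part of this PR:

class JobsView:
    """Hypothetical helper: attribute access collects the attribute over
    all jobs, so that e.g. view.id expands to a list of all job ids."""

    def __init__(self, jobs):
        self._jobs = list(jobs)

    def __getattr__(self, name):
        return [getattr(job, name) for job in self._jobs]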

Contributor Author

Phew... that sounds like it might open up a whole bottle of potential problems, but yeah let's think about it.

Member

This is the same problem we face in other packages, such as with trajectory properties. It's a hard thing to deal with.

Contributor

Let's not worry about this for now.

@csadorf (Contributor, Author) commented Feb 9, 2019

One minor note, we'll need to update all docstrings referring to singular jobs.

Bigger picture, I think this implementation addresses the use-case, but I'm a little hesitant offhand. I saw your inline question about whether status should return one or all. This raises the broader question of exactly how JobsOperation is meant to work. Is the only use-case an aggregation where all jobs are passed to a single aggregation acting on all of them?

I realize my issue is a little vague at the moment. I'm uneasy because this implementation required relatively few changes, and it feels like that's because we essentially took our current one job-one operation approach and shoehorned in aggregation. I'm wondering if we need a more holistic approach, akin to how we ended up dealing with bundling by making it the base case. I'd be more comfortable if we had a brief meeting to discuss the possible use-cases, just to make sure that this implementation isn't inflexible in ways that could cause us problems down the road.

Let's experiment a little bit with this and then we can decide whether we want to move forward at all. I believe that this is the right approach, but we need to really test it in a semi-production environment to be able to evaluate its usefulness. I'll try to implement a few test cases as part of my own research workflow, and I am also looking forward to using this for custom parallelization as @jglaser suggested.

@csadorf csadorf force-pushed the feature/enable-aggregate-operations branch from 646574d to 0eb13ac Compare February 9, 2019 20:04
@bdice (Member) commented Feb 10, 2019

I would like to understand more about how users will actually trigger this. Must aggregate operations be triggered manually, perhaps via the command line? If the user provides a filter or set of jobs of size 4, but only 3 are eligible, is the aggregation operation greedy (take 3) or does it fail? How will a user know when aggregate jobs are queueing? A separate section of the flow status? Should it be possible to group data differently across different dimensions, e.g. average over one parameter and plot each average in terms of a second parameter as series in a graph?

@csadorf (Contributor, Author) commented Feb 10, 2019

I would like to understand more about how users will actually trigger this. Must aggregate operations be triggered manually, perhaps via the command line? If the user provides a filter or set of jobs of size 4, but only 3 are eligible, is the aggregation operation greedy (take 3) or does it fail? How will a user know when aggregate jobs are queueing? A separate section of the flow status? Should it be possible to group data differently across different dimensions, e.g. average over one parameter and plot each average in terms of a second parameter as series in a graph?

This was a really good point and I extended the API to clarify this. In general the user can determine the kind of aggregation by providing a grouper function that takes a sequence of jobs and yields groups of jobs.

The most basic example is to arbitrarily group all jobs into pairs. A use case might be manual parallelization across two processors. This could be achieved as follows:

def pairwise(jobs):
    # Group the jobs into non-overlapping pairs; a trailing odd job is dropped.
    it = iter(jobs)
    return zip(it, it)

@Project.operation
@aggregate(pairwise)
def pairwise_op(joba, jobb):
    print(joba, jobb)

Executing this with run will print the jobs in disjoint pairs. Now, you were wondering about "greediness": that behavior is determined both by the grouper function and by the operation function. In this case the grouper function will simply drop the last, odd member of the data space.

On the other hand, the number of jobs an operation function accepts can be controlled via the function signature. The pairwise_op operation function expects exactly two arguments, but that of course does not have to be the case. Here are some other options (a sketch of how such signature matching could work follows the list):

  1. Expect an arbitrary number of jobs (including zero): def op(*jobs)
  2. Expect at least one job: def op(job, *more_jobs)
  3. Expect exactly one argument: def op(job).
  4. Expect exactly two or three arguments: def op(job1, job2, job3=None)
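
A sketch of how such signature-based matching could work, using only the standard library; this is illustrative and not necessarily how this branch implements it:

import inspect


def group_fits(operation, group):
    # Return True if the given group of jobs can be bound to the
    # operation's signature, e.g. exactly two jobs for pairwise_op above.
    try:
        inspect.signature(operation).bind(*group)
    except TypeError:
        return False
    return True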

The extended API comes with a few bundled grouper functions, including:

  1. Group all: @aggregate().
  2. Aggregate arbitrary groups of a specific size: @aggregate.groupsof(n)
  3. Aggregate groups grouped by state point key: @aggregate.groupby('foo')
  4. Aggregate groups by arbitrary grouper function: @aggregate(grouper).

What is grouped is controlled like before with filter functions etc.
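
To illustrate the grouper contract (a callable that takes the sequence of jobs and yields groups of jobs), here is roughly how a groupby-style grouper could be written by hand; 'foo' is a hypothetical state point key and this is not necessarily how aggregate.groupby is implemented:

from itertools import groupby


def group_by_foo(jobs):
    # Yield tuples of jobs that share the same state point value for 'foo'.
    # itertools.groupby requires the input to be sorted by the key first.
    def key(job):
        return job.sp['foo']
    for _, group in groupby(sorted(jobs, key=key), key=key):
        yield tuple(group)

# Used like any other grouper:
# @Project.operation
# @aggregate(group_by_foo)
# def op(*jobs):
#     ...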

Finally, I don't think we necessarily have to differentiate between jobs that are active as part of an aggregate operation and those that are not, because with the current concept all operations are aggregate operations; the ones with a group size of one are just a special case.

This is my test script if you want to try things out: aggregate.py.txt

@csadorf (Contributor, Author) commented Feb 10, 2019

@jglaser Here is how you would use this API to parallelize HOOMD simulations with partitions:

from flow import FlowProject, aggregate
from mpi4py import MPI


COMM = MPI.COMM_WORLD


class Project(FlowProject):
    pass


@Project.operation
@aggregate.groupsof(COMM.Get_size())
@Project.post.isfile('trajectory.gsd')
def simulate_hoomd(*jobs):
    import hoomd, hoomd.md
    hoomd.context.initialize('--nrank=1')
    # Each MPI partition picks the job that matches its partition index.
    job = jobs[hoomd.comm.get_partition()]

    with job:
        print('simulating', job)
        unitcell=hoomd.lattice.sc(a=2.0, type_name='A')
        hoomd.init.create_lattice(unitcell=unitcell, n=10)
        nl = hoomd.md.nlist.cell()
        lj = hoomd.md.pair.lj(r_cut=3.0, nlist=nl)
        lj.pair_coeff.set('A', 'A', epsilon=1.0, sigma=1.0)
        group_all = hoomd.group.all()
        hoomd.md.integrate.mode_standard(dt=0.005)
        hoomd.md.integrate.langevin(group=group_all, kT=job.sp.kT, seed=4)
        hoomd.dump.gsd('trajectory.gsd', period=100, group=hoomd.group.all())
        hoomd.run(10e3)


if __name__ == '__main__':
    Project().main()

Then just call it with mpirun -n 4 python project.py run or similar.
With the prototype implementation you must make sure that the number of eligible jobs is divisible by the number of ranks, but that is something we can make more robust moving forward.
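
One way this could be relaxed, sketched here for illustration only (the built-in aggregate.groupsof may behave differently), is a grouper that pads the last group so the operation can skip the missing slots:

from itertools import zip_longest


def groups_of(n, fillvalue=None):
    # Return a grouper yielding fixed-size groups of n jobs; the last
    # group is padded with fillvalue instead of dropping jobs.
    def grouper(jobs):
        args = [iter(jobs)] * n
        return zip_longest(*args, fillvalue=fillvalue)
    return grouper

# @aggregate(groups_of(COMM.Get_size()))
# def simulate_hoomd(*jobs):
#     jobs = [job for job in jobs if job is not None]
#     ...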

For scripting and submissions you might want to hard-code the number of ranks per partition:

NP=32

@Project.operation
@aggregate.groupsof(NP)
@directives(np=NP)
@Project.post.isfile('trajectory.gsd')
def simulate_hoomd(*jobs):
    assert len(jobs) == NP
    ...

There might be better ways, we can figure it out...

@jglaser commented Feb 10, 2019

@jglaser Here is how you would use this API to parallelize HOOMD simulations with partitions:

awesome. Let me try soon.

@jglaser commented Feb 11, 2019

@jglaser Here is how you would use this API to parallelize HOOMD simulations with partitions:

from flow import FlowProject, aggregate
...
from mpi4py import MPI


COMM = MPI.COMM_WORLD

a superficial comment. Is it really necessary to use mpi4py as an extra dependency? If so, the communicator should also be passed to context.initialize(), I suppose, to prevent HOOMD from doing a second MPI_Init().
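
Roughly like this, assuming the mpi_comm keyword that HOOMD 2.x's context.initialize() provides (untested sketch):

from mpi4py import MPI
import hoomd

COMM = MPI.COMM_WORLD

# Hand the existing communicator to HOOMD so that it does not call MPI_Init() again.
hoomd.context.initialize('--nrank=1', mpi_comm=COMM)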

@csadorf (Contributor, Author) commented Feb 11, 2019

@jglaser Here is how you would use this API to parallelize HOOMD simulations with partitions:

from flow import FlowProject, aggregate
...
from mpi4py import MPI


COMM = MPI.COMM_WORLD

a superficial comment. Is it really necessary to use mpi4py as an extra dependency? If so, the communicator should also be passed to context.initialize(), I suppose, to prevent HOOMD from doing a second MPI_Init().

No, it's not. I just used it in the example to avoid hard-coding the number of ranks.

@csadorf csadorf requested a review from a team as a code owner March 4, 2019 16:11
@csadorf csadorf added this to the v0.8 milestone Mar 18, 2019
csadorf and others added 11 commits March 26, 2019 12:43
To enable the provision of multiple jobs instead of only one to this
class.
The main change introduced in this patch is the refactoring of the
JobOperation class into JobsOperation. That means that an operation
operating on only one job is a special case of a general operation that
operates on multiple jobs. In that sense *every* operation is an
aggregate operation, but the standard case is that it operates on only
one job.

This is achieved in detail by:

 * adding an "aggregate" parameter to the FlowOperation class which
   classifies a specific FlowOperation as an aggregate operation.
 * adding the `@flow.aggregate` decorator which labels an operation
   function to define an aggregate flow operation,
 * adding the `aggregate` parameter to the
   `FlowProject.add_operations()` method,
 * refactoring the `JobOperation` class into `JobsOperation`, which
   operates on one or more jobs by default. A job-operation with only
   one job is therefore a special case of an aggregate operation.

It might be possible to remove the need for an aggregate decorator, and
instead just inspect the function signature, but we can reevaluate that
at a later point.
* fix submission of jobs

* flake8

* Fix bug in status document update.
 * Update function name and related variable names.
 * The 'jobs' argument must be an explicit list or tuple of jobs.
Do not fail if any of the aggregation groups contains None values.
Instead of applying logic to determine the aggregated condition.
@csadorf csadorf force-pushed the feature/enable-aggregate-operations branch from fefe90a to 53422ef Compare March 27, 2019 17:01
@csadorf (Contributor, Author) commented Mar 30, 2019

@jglaser You should look at the detailed status view with option -d or --detailed. That will give you a much better picture of what's going on. Possibly in combination with -e.

@jglaser commented Mar 31, 2019

@jglaser You should look at the detailed status view with option -d or --detailed. That will give you a much better picture of what's going on. Possibly in combination with -e.

Yes. That works, too. What about this warning:

/ccs/home/glaser/.conda/envs/myenv/lib/python3.7/site-packages/signac_flow-0.7.1-py3.7.egg/flow/project.py:927: UserWarning: Parallelized status update not implemented for this branch.

?

@jglaser commented Mar 31, 2019

Perhaps unrelated: with this branch, I seem to be able to submit only once. When the job has run for the first time, the second submission will not include any jobs. This seems to be due to caching in the file signac_project_document.json. If I delete that file, I can submit again. What is the intended usage pattern in this case?

@csadorf (Contributor, Author) commented Apr 1, 2019

Perhaps unrelated: with this branch, I seem to be able to submit only once. When the job has run for the first time, the second submission will not include any jobs. This seems to be due to caching in the file signac_project_document.json. If I delete that file, I can submit again. What is the intended usage pattern in this case?

No, that's not intended. Did you run a status update in between?

@csadorf (Contributor, Author) commented Apr 1, 2019

Yes. That works, too. What about this warning:

/ccs/home/glaser/.conda/envs/myenv/lib/python3.7/site-packages/signac_flow-0.7.1-py3.7.egg/flow/project.py:927: UserWarning: Parallelized status update not implemented for this branch.

?

I needed to reimplement the status update for the new model and have not tackled the parallelization yet. I added this warning as a reminder that this is something we need to do prior to merging.

@jglaser commented Apr 2, 2019

I would also like to ask how label decorators are currently handled. Does that need to be documented? Do they take job lists as arguments? If so, wouldn't it make more sense for them to take single job arguments, so they can return their value on a per-job basis?

@jglaser commented Apr 2, 2019

Also, I now get this error:

bash-4.2$ jsrun -n 1 -g 3 python project.py exec  sample 6fd6446dec09d76a5ad7b9706af79568a --show-traceback
Using environment configuration: SummitEnvironment
ERROR: Encountered error during program execution: ''6fd6446dec09d76a5ad7b9706af79568a''
Execute with '--show-traceback' or '--debug' to get more information.
Traceback (most recent call last):
  File "/ccs/home/glaser/.conda/envs/myenv/lib/python3.7/site-packages/signac/contrib/project.py", line 745, in get_statepoint
    sp = self.read_statepoints(fn=fn)[jobid]
  File "/ccs/home/glaser/.conda/envs/myenv/lib/python3.7/site-packages/signac/contrib/project.py", line 630, in read_statepoints
    with open(fn, 'r') as file:
FileNotFoundError: [Errno 2] No such file or directory: '/gpfs/alpine/mat110/scratch/glaser/specific_barrier/signac_statepoints.json'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "project.py", line 287, in <module>
    Project().main()
  File "/ccs/home/glaser/.conda/envs/myenv/lib/python3.7/site-packages/signac_flow-0.7.1-py3.7.egg/flow/project.py", line 2446, in main
  File "/ccs/home/glaser/.conda/envs/myenv/lib/python3.7/site-packages/signac_flow-0.7.1-py3.7.egg/flow/project.py", line 2414, in main
  File "/ccs/home/glaser/.conda/envs/myenv/lib/python3.7/site-packages/signac_flow-0.7.1-py3.7.egg/flow/project.py", line 2192, in _main_exec
  File "/ccs/home/glaser/.conda/envs/myenv/lib/python3.7/site-packages/signac_flow-0.7.1-py3.7.egg/flow/project.py", line 2192, in <listcomp>
  File "/ccs/home/glaser/.conda/envs/myenv/lib/python3.7/site-packages/signac/contrib/project.py", line 337, in open_job
    return self.Job(project=self, statepoint=self.get_statepoint(id), _id=id)
  File "/ccs/home/glaser/.conda/envs/myenv/lib/python3.7/site-packages/signac/contrib/project.py", line 750, in get_statepoint
    raise error
  File "/ccs/home/glaser/.conda/envs/myenv/lib/python3.7/site-packages/signac/contrib/project.py", line 742, in get_statepoint
    sp = self._get_statepoint_from_workspace(jobid)
  File "/ccs/home/glaser/.conda/envs/myenv/lib/python3.7/site-packages/signac/contrib/project.py", line 702, in _get_statepoint_from_workspace
    raise KeyError(jobid)
KeyError: '6fd6446dec09d76a5ad7b9706af79568a'

@jglaser commented Apr 2, 2019

Also, I now get this error:

bash-4.2$ jsrun -n 1 -g 3 python project.py exec  sample 6fd6446dec09d76a5ad7b9706af79568a --show-traceback
Using environment configuration: SummitEnvironment
ERROR: Encountered error during program execution: ''6fd6446dec09d76a5ad7b9706af79568a''
[...]
KeyError: '6fd6446dec09d76a5ad7b9706af79568a'

Never mind, the error is correct; the statepoint didn't exist.

@jglaser commented Apr 2, 2019

Perhaps unrelated: with this branch, I seem to be able to submit only once. When the job has run for the first time, the second submission will not include any jobs. This seems to be due to caching in the file signac_project_document.json. If I delete that file, I can submit again. What is the intended usage pattern in this case?

No, that's not intended. Did you run a status update in between?

No, but if I do, it works.

@csadorf (Contributor, Author) commented Apr 2, 2019

Perhaps unrelated: with this branch, I seem to be able to submit only once. When the job has run for the first time, the second submission will not include any jobs. This seems to be due to caching in the file signac_project_document.json. If I delete that file, I can submit again. What is the intended usage pattern in this case?

No, that's not intended. Did you run a status update in between?

no. but if I do, it works.

Ok, I'll make sure to address that.

@csadorf csadorf changed the base branch from develop to master April 8, 2019 21:51
@vyasr (Contributor) left a comment

The logic for aggregation looks good. The majority of lines changed appear to be status related, which I suppose makes sense.

@property
def job(self):
    assert len(self.jobs) >= 1
    return self.jobs[0]
Contributor

Can we guarantee that the JobsOperation is always constructed with jobs in the same order? Since status is stored on a project level this probably doesn't affect any internals, but I recall that we maybe discussed what were sensible choices here.


def get_id(self, index=0):
    "Return a name, which identifies this job-operation."
    project = self.jobs[0]._project
Contributor

self.job._project

from signac.contrib.filterparse import parse_filter_arg

import jinja2
from jinja2 import TemplateNotFound as Jinja2TemplateNotFound

from .environment import get_environment
from .operations import JobsOperation
Contributor

For backwards compatibility should we also from .legacy import JobOperation?

else:
return self._cmd.format(job=job)
return self._cmd.format(job=jobs[0], job_ids=job_ids)
Contributor

Let's not worry about this for now.

"True when all post-conditions are met."
if len(self._postconds):
return all(cond(job) for cond in self._postconds)
return all(cond(job) for cond in self._postconds for job in jobs)
Contributor

This is inconsistent with above, where we have cond(*jobs), right? Are we assuming that conditions are now functions of all jobs, or just one job? From our discussion, my understanding was that we would leave them as functions of one job but then add a new way to specify aggregate conditions.


statuses = OrderedDict([(s['job_id'], s) for s in tmp])
# Remove all operations from the status info, that are not eligible.
status = {_id: doc for _id, doc in status.items() if not doc['eligible']}
Contributor

This line does the reverse of what the comment says.

@csadorf csadorf modified the milestones: v0.8, v0.9 May 24, 2019
@csadorf (Contributor, Author) commented Aug 14, 2019

I would like to re-open this after we have merged #114.

Labels: aggregation, GSoC (Google Summer of Code)