PMDA with refactored _single_frame #128

Draft
wants to merge 17 commits into base: master
39 changes: 12 additions & 27 deletions pmda/custom.py
@@ -18,7 +18,6 @@
"""
from __future__ import absolute_import

from MDAnalysis.core.groups import AtomGroup
from MDAnalysis.core.universe import Universe
from MDAnalysis.coordinates.base import ProtoReader
import numpy as np
@@ -75,32 +74,19 @@ def __init__(self, function, universe, *args, **kwargs):
analyze can not be passed as keyword arguments currently.

"""

self.function = function

# collect all atomgroups with the same trajectory object as universe
trajectory = universe.trajectory
arg_ags = []
self.other_args = []
for arg in args:
if isinstance(arg,
AtomGroup) and arg.universe.trajectory == trajectory:
arg_ags.append(arg)
else:
self.other_args.append(arg)

super(AnalysisFromFunction, self).__init__(universe, arg_ags)
super().__init__(universe)
self.args = args
self.kwargs = kwargs

def _prepare(self):
self.results = []
self._results = [None] * self.n_frames

def _single_frame(self, ts, atomgroups):
args = atomgroups + self.other_args
return self.function(*args, **self.kwargs)
def _single_frame(self):
self._results[self._frame_index] = self.function(*self.args, **self.kwargs)

def _conclude(self):
self.results = np.concatenate(self._results)
self.results = self._results


def analysis_class(function):
@@ -144,13 +130,12 @@ def analysis_class(function):
class WrapperClass(AnalysisFromFunction):
"""Custom Analysis Function"""

def __init__(self, trajectory=None, *args, **kwargs):
if not (isinstance(trajectory, ProtoReader) or isinstance(
trajectory, Universe)):
print(type(trajectory))
def __init__(self, universe=None, *args, **kwargs):
if not isinstance(universe, Universe):
print(type(universe))
raise ValueError(
"First argument needs to be an MDAnalysis reader object.")
super(WrapperClass, self).__init__(function, trajectory, *args,
**kwargs)
"First argument needs to be an MDAnalysis Universe.")
super().__init__(function, universe, *args,
**kwargs)

return WrapperClass
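
For orientation, a minimal usage sketch of the refactored AnalysisFromFunction (not part of the diff; the input files and the rgyr helper are hypothetical, and run(n_jobs=...) assumes the usual PMDA run() signature). Whether an AtomGroup argument tracks the current frame now relies entirely on the Universe/AtomGroup serialization this PR builds on, since the wrapper no longer splits AtomGroup arguments out of *args.

```python
import MDAnalysis as mda
from pmda.custom import AnalysisFromFunction

u = mda.Universe("topol.tpr", "traj.xtc")   # hypothetical input files
protein = u.select_atoms("protein")

def rgyr(ag):
    # evaluated once per frame on the current timestep
    return ag.radius_of_gyration()

ana = AnalysisFromFunction(rgyr, u, protein)
ana.run(n_jobs=2)
print(ana.results)   # one entry per analysed frame
```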
41 changes: 22 additions & 19 deletions pmda/density.py
@@ -240,8 +240,8 @@ def __init__(self, atomgroup, delta=1.0, atomselection=None,
metadata=None, padding=2.0, updating=False,
parameters=None, gridcenter=None, xdim=None, ydim=None,
zdim=None):
u = atomgroup.universe
super(DensityAnalysis, self).__init__(u, (atomgroup, ))
universe = atomgroup.universe
super().__init__(universe)
self._atomgroup = atomgroup
self._delta = delta
self._atomselection = atomselection
@@ -253,7 +253,7 @@ def __init__(self, atomgroup, delta=1.0, atomselection=None,
self._xdim = xdim
self._ydim = ydim
self._zdim = zdim
self._trajectory = u.trajectory
self._trajectory = universe.trajectory
if updating and atomselection is None:
raise ValueError("updating=True requires a atomselection string")
elif not updating and atomselection is not None:
@@ -289,20 +289,31 @@ def _prepare(self):
grid, edges = np.histogramdd(np.zeros((1, 3)), bins=bins,
range=arange, normed=False)
grid *= 0.0
self._grid = grid

self._results = [grid] * self.n_frames
self._edges = edges
self._arange = arange
self._bins = bins

def _single_frame(self, ts, atomgroups):
coord = self.current_coordinates(atomgroups[0], self._atomselection,
self._updating)
result = np.histogramdd(coord, bins=self._bins, range=self._arange,
normed=False)
return result[0]
def _single_frame(self):
h, _ = np.histogramdd(self._atomgroup.positions,
bins=self._bins, range=self._arange,
normed=False)
# reduce (proposed change #2542 to match the parallel version in pmda.density)
# return self._reduce(self._grid, h)
#
# serial code can simply do

# the current timestep of the trajectory is self._ts
# self._results[self._frame_index][0] = self._ts.frame
# the actual trajectory is at self._trajectory
# self._results[self._frame_index][1] = self._trajectory.time
self._results[self._frame_index] = h
Member:
Storing a full histogram for each frame is bad – you can easily run out of memory. I think it is important that the aggregation is done every step and not just in _conclude.
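
A sketch of what per-step aggregation could look like with the refactored no-argument _single_frame() (an assumption, not code from this PR: the running grid is allocated once in _prepare(), and the base class would then have to collect self._grid from each block instead of a per-frame self._results list; normed=False matches the existing call in the diff).

```python
def _prepare(self):
    # ... build `grid` with np.histogramdd as in the existing _prepare() ...
    self._grid = grid          # single running histogram for this block

def _single_frame(self):
    h, _ = np.histogramdd(self._atomgroup.positions,
                          bins=self._bins, range=self._arange,
                          normed=False)
    self._grid += h            # accumulate in place; nothing stored per frame
```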

Contributor Author:
But isn't that also what happens with _reduce? It won't pass the full histogram back to the main process, only the result of the frames calculated in _dask_helper.

Member (@orbeckst, Jul 15, 2020):
No, the density _reduce

pmda/pmda/density.py, lines 326 to 332 in 13fa3b5:

    def _reduce(res, result_single_frame):
        """ 'accumulate' action for a time series"""
        if isinstance(res, list) and len(res) == 0:
            res = result_single_frame
        else:
            res += result_single_frame
        return res

does an in-place addition (not a list append) in line 331. In _conclude,

pmda/pmda/density.py, lines 305 to 306 in 13fa3b5:

    self._grid = self._results[:].sum(axis=0)
    self._grid /= float(self.n_frames)

we only sum over the summed histograms of the blocks and then divide by the total number of frames to get the average.

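A toy numerical check of that argument (illustrative only, not pmda code): summing the per-block accumulated histograms and dividing by the total number of frames reproduces the mean over all frames.

```python
import numpy as np

frames = np.random.default_rng(0).random((6, 4, 4))   # 6 "frames" of a 4x4 grid

# split-apply-combine: two blocks of three frames, each accumulated in place
blocks = [frames[:3].sum(axis=0), frames[3:].sum(axis=0)]
grid = np.sum(blocks, axis=0) / len(frames)

assert np.allclose(grid, frames.mean(axis=0))
```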
Member:
Btw, the PMDA paper has a discussion on that topic.


def _conclude(self):
self._grid = self._results[:].sum(axis=0)

# sum both inside and among blocks.
self._grid = self._results[:].sum(axis=(0, 1))
self._grid /= float(self.n_frames)
metadata = self._metadata if self._metadata is not None else {}
metadata['psf'] = self._atomgroup.universe.filename
@@ -322,14 +333,6 @@ def _conclude(self):
density.make_density()
self.density = density

@staticmethod
def _reduce(res, result_single_frame):
""" 'accumulate' action for a time series"""
if isinstance(res, list) and len(res) == 0:
res = result_single_frame
else:
res += result_single_frame
return res
Comment on lines -325 to -332 (Member):
I don't like the design that gets rid of reduce.


@staticmethod
def current_coordinates(atomgroup, atomselection, updating):
121 changes: 63 additions & 58 deletions pmda/leaflet.py
@@ -76,7 +76,7 @@ def __init__(self, universe, atomgroups):

super(LeafletFinder, self).__init__(universe, (atomgroups,))

def _find_connected_components(self, data, cutoff=15.0):
def _find_connected_components(self, data_list, cutoff=15.0):
"""Perform the Connected Components discovery for the atoms in data.

Parameters
@@ -99,62 +99,66 @@ def _find_connected_components(self, data_list, cutoff=15.0):

"""
# pylint: disable=unsubscriptable-object
window, index = data[0]
num = window[0].shape[0]
i_index = index[0]
j_index = index[1]
graph = nx.Graph()

if i_index == j_index:
train = window[0]
test = window[1]
else:
train = np.vstack([window[0], window[1]])
test = np.vstack([window[0], window[1]])

tree = cKDTree(train, leafsize=40)
edges = tree.query_ball_point(test, cutoff)
edge_list = [list(zip(np.repeat(idx, len(dest_list)), dest_list))
for idx, dest_list in enumerate(edges)]

edge_list_flat = np.array([list(item) for sublist in edge_list for
item in sublist])

if i_index == j_index:
res = edge_list_flat.transpose()
res[0] = res[0] + i_index - 1
res[1] = res[1] + j_index - 1
else:
removed_elements = list()
for i in range(edge_list_flat.shape[0]):
if (edge_list_flat[i, 0] >= 0 and
edge_list_flat[i, 0] <= num - 1) and \
(edge_list_flat[i, 1] >= 0 and
edge_list_flat[i, 1] <= num - 1) or \
(edge_list_flat[i, 0] >= num and
edge_list_flat[i, 0] <= 2 * num - 1) and \
(edge_list_flat[i, 1] >= num and
edge_list_flat[i, 1] <= 2 * num - 1) or \
(edge_list_flat[i, 0] >= num and
edge_list_flat[i, 0] <= 2 * num - 1) and \
(edge_list_flat[i, 1] >= 0 and
edge_list_flat[i, 1] <= num - 1):
removed_elements.append(i)
res = np.delete(edge_list_flat, removed_elements,
axis=0).transpose()
res[0] = res[0] + i_index - 1
res[1] = res[1] - num + j_index - 1
if res.shape[1] == 0:
res = np.zeros((2, 1), dtype=np.int)

edges = [(res[0, k], res[1, k]) for k in range(0, res.shape[1])]
graph.add_edges_from(edges)

# partial connected components

subgraphs = nx.connected_components(graph)
comp = [g for g in subgraphs]
return comp
#raise TypeError(data)
Member:
I wouldn't bother with the parallel LeafletFinder as it is broken at the moment (#76, #79).

Contributor Author:
Sorry, I think I packed too much into this PR. My intention was to explore the possibility of parallelizing LeafletFinder both across frames and within a single frame, because at the moment it only starts working on the next frame once all the jobs for the current one are done.

So I made the partitioning coarser instead of passing hundreds of jobs per frame × hundreds of frames to the dask graph.

Contributor Author:
The problem is twofold (once AtomGroup and everything else are implemented):

  • XDR and DCD format files fail to pickle at the last frame:

    u = mda.Universe(GRO_MEMPROT, XTC_MEMPROT)
    u.trajectory[4]  # last frame
    pickle.loads(pickle.dumps(u.trajectory))
    EOFError: Trying to seek over max number of frames

The major problem is that trajectory._xdr.current_frame == 5 (1-based). I might need to add an extra fix (and test?) to https://github.com/MDAnalysis/mdanalysis/pull/2723/files, or maybe in a separate PR, since the pickling is handled there on its own.

  • The algorithm itself gives different results (for the same ts) with different n_jobs (maybe because I made some mistakes).
Member:
The "last frame" thing is a real issue. Oops!

Don't worry about LeafletFinder at the moment, it's not really your job to fix it, and it has lots of issues. (If you need it for your own research and you have an interest in getting it working then that's a bit different but I'd still say, focus on the serialization core problem for now.)

Contributor Author:
Just pushed a fix for the "last frame" issue.

It is not about LeafletFinder per se, but maybe about a general framework to suit all conditions, e.g.

  • analyses where each frame itself deserves parallelism,
  • running multiple analyses on one single universe,
  • running one analysis on multiple universes,
  • complex dependencies between individual jobs.

A solution I can think of is to let ParallelAnalysisBase inherit the basic methods of DaskMethodsMixin and become a dask custom collection, so that we can build complex dask graphs. But I am not sure how well we can design a clean API that does not require users to care too much about the underlying implementation.

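For reference, a toy sketch of the dask custom-collection protocol that this suggestion points at (illustrative only; the class and names are made up and nothing here is pmda code). A ParallelAnalysisBase exposing this interface could contribute one task per frame or per block to a larger dask graph.

```python
from dask.base import DaskMethodsMixin
from dask.threaded import get as threaded_get


class PerFrameCollection(DaskMethodsMixin):
    """One dask task per frame index; compute() returns a list of results."""

    def __init__(self, func, n_frames, name="per-frame"):
        self._func = func
        self._n_frames = n_frames
        self._name = name

    def __dask_graph__(self):
        # plain dict task graph: key -> (callable, argument)
        return {(self._name, i): (self._func, i)
                for i in range(self._n_frames)}

    def __dask_keys__(self):
        return [(self._name, i) for i in range(self._n_frames)]

    def __dask_postcompute__(self):
        # how to assemble the computed keys into the user-facing result
        return list, ()

    @staticmethod
    def __dask_optimize__(dsk, keys, **kwargs):
        return dsk               # no graph optimisation in this toy example

    __dask_scheduler__ = staticmethod(threaded_get)
    # persist() would additionally need __dask_postpersist__; omitted here


print(PerFrameCollection(lambda i: i * i, 4).compute())   # [0, 1, 4, 9]
```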
Member:
That's a good analysis of use cases and it would be useful to write this down somewhere. With PMDA so far (except LeafletFinder) we have been focusing on the simple split-apply-combine because that can be put into a simple "framework". Beyond that, it becomes difficult to do a "one size fits all" and it becomes a serious research project in CS.

I would be happy if we had a library that allows users to easily write their own split/apply/combine type analysis and where we provide a few additional parallelized analyses that might not fit into this scheme (such as LeafletFinder).

An interesting idea that has been coming up repeatedly is to "stack" multiple analyses, i.e., run multiple _single_frame() calls sequentially so that the expensive time of loading data into memory is amortized.

Finally, running one analysis on multiple universes seems to be a standard pleasingly parallel job that can make use of existing workflow management tools – I don't see what we can do directly to support it.

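A rough serial sketch of that stacking idea with the refactored no-argument _single_frame() (a hypothetical helper, not an existing pmda API): the trajectory is read once per frame and every analysis consumes the same in-memory timestep.

```python
def run_stacked(universe, analyses, start=None, stop=None, step=None):
    """Run several per-frame analyses over a single trajectory pass."""
    frames = universe.trajectory[start:stop:step]
    for analysis in analyses:
        analysis.n_frames = len(frames)   # normally set inside run()
        analysis._prepare()
    for frame_index, ts in enumerate(frames):
        for analysis in analyses:
            analysis._frame_index = frame_index
            analysis._ts = ts
            analysis._single_frame()      # refactored signature: no arguments
    for analysis in analyses:
        analysis._conclude()
```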
comp_s = []
for data in data_list:
window, index = data
num = window[0].shape[0]
i_index = index[0]
j_index = index[1]
graph = nx.Graph()

if i_index == j_index:
train = window[0]
test = window[1]
else:
train = np.vstack([window[0], window[1]])
test = np.vstack([window[0], window[1]])

tree = cKDTree(train, leafsize=40)
edges = tree.query_ball_point(test, cutoff)
edge_list = [list(zip(np.repeat(idx, len(dest_list)), dest_list))
for idx, dest_list in enumerate(edges)]

edge_list_flat = np.array([list(item) for sublist in edge_list for
item in sublist])

if i_index == j_index:
res = edge_list_flat.transpose()
res[0] = res[0] + i_index - 1
res[1] = res[1] + j_index - 1
else:
removed_elements = list()
for i in range(edge_list_flat.shape[0]):
if (edge_list_flat[i, 0] >= 0 and
edge_list_flat[i, 0] <= num - 1) and \
(edge_list_flat[i, 1] >= 0 and
edge_list_flat[i, 1] <= num - 1) or \
(edge_list_flat[i, 0] >= num and
edge_list_flat[i, 0] <= 2 * num - 1) and \
(edge_list_flat[i, 1] >= num and
edge_list_flat[i, 1] <= 2 * num - 1) or \
(edge_list_flat[i, 0] >= num and
edge_list_flat[i, 0] <= 2 * num - 1) and \
(edge_list_flat[i, 1] >= 0 and
edge_list_flat[i, 1] <= num - 1):
removed_elements.append(i)
res = np.delete(edge_list_flat, removed_elements,
axis=0).transpose()
res[0] = res[0] + i_index - 1
res[1] = res[1] - num + j_index - 1
if res.shape[1] == 0:
res = np.zeros((2, 1), dtype=np.int)

edges = [(res[0, k], res[1, k]) for k in range(0, res.shape[1])]
graph.add_edges_from(edges)

# partial connected components

subgraphs = nx.connected_components(graph)
comp = [g for g in subgraphs]
comp_s.append(comp)
return comp_s

# pylint: disable=arguments-differ
def _single_frame(self, ts, atomgroups, scheduler_kwargs, n_jobs,
@@ -200,12 +204,13 @@ def _single_frame(self, ts, atomgroups, scheduler_kwargs, n_jobs,
# Distribute the data over the available cores, apply the map function
# and execute.
parAtoms = db.from_sequence(arranged_coord,
npartitions=len(arranged_coord))
npartitions=n_jobs)
Member:
LeafletFinder is not parallelized over frames... I am not sure that n_jobs is the correct choice here. We need to look at the original paper/algorithm.

parAtomsMap = parAtoms.map_partitions(self._find_connected_components,
cutoff=cutoff)
Components = parAtomsMap.compute(**scheduler_kwargs)

# Gather the results and start the reduction. TODO: think if it can go
Components = [item for sublist in Components for item in sublist]
# to the private _reduce method of the based class.
result = list(Components)
