From 1780ecb2ca59860f44769f3a18da999bc3b68f06 Mon Sep 17 00:00:00 2001 From: Yuxuan Zhuang Date: Wed, 8 Jul 2020 12:59:17 +0200 Subject: [PATCH 01/13] change to new mda --- pmda/parallel.py | 37 +++++++++++-------------------------- pmda/rms/rmsd.py | 5 +++-- 2 files changed, 14 insertions(+), 28 deletions(-) diff --git a/pmda/parallel.py b/pmda/parallel.py index 636c7d5b..60623f6b 100644 --- a/pmda/parallel.py +++ b/pmda/parallel.py @@ -207,10 +207,8 @@ def __init__(self, universe, atomgroups): :meth:`pmda.parallel.ParallelAnalysisBase._single_frame`. """ - self._trajectory = universe.trajectory - self._top = universe.filename - self._traj = universe.trajectory.filename - self._indices = [ag.indices for ag in atomgroups] + self._universe = universe + self._atomgroups = atomgroups @contextmanager def readonly_attributes(self): @@ -253,7 +251,7 @@ def _prepare(self): """additional preparation to run""" pass # pylint: disable=unnecessary-pass - def _single_frame(self, ts, atomgroups): + def _single_frame(self, i): """Perform computation on a single trajectory frame. Must return computed values as a list. You can only **read** @@ -348,8 +346,8 @@ def run(self, if scheduler == 'processes': scheduler_kwargs['num_workers'] = n_jobs - start, stop, step = self._trajectory.check_slice_indices(start, - stop, step) + start, stop, step = self._universe.trajectory.check_slice_indices(start, + stop, step) n_frames = len(range(start, stop, step)) self.start, self.stop, self.step = start, stop, step @@ -378,9 +376,8 @@ def run(self, task = delayed( self._dask_helper, pure=False)( bslice, - self._indices, - self._top, - self._traj, ) + self._universe, + self._atomgroups, ) blocks.append(task) # save the frame numbers for each block _blocks.append(range(bslice.start, @@ -408,19 +405,13 @@ def run(self, conclude.elapsed, # waiting time = wait_end - wait_start np.array([el[4]-wait_start for el in res]), - np.array([el[5] for el in res]), - np.array([el[6] for el in res])) + np.array([el[5] for el in res])) return self - def _dask_helper(self, bslice, indices, top, traj): + def _dask_helper(self, bslice, u, atomgroups): """helper function to actually setup dask graph""" # wait_end needs to be first line for accurate timing wait_end = time.time() - # record time to generate universe and atom groups - with timeit() as b_universe: - u = mda.Universe(top, traj) - agroups = [u.atoms[idx] for idx in indices] - res = [] times_io = [] times_compute = [] @@ -428,19 +419,13 @@ def _dask_helper(self, bslice, indices, top, traj): # that it comes from _trajectory.check_slice_indices()! for i in range(bslice.start, bslice.stop, bslice.step): # record io time per frame - with timeit() as b_io: - # explicit instead of 'for ts in u.trajectory[bslice]' - # so that we can get accurate timing. - ts = u.trajectory[i] - # record compute time per frame with timeit() as b_compute: - res = self._reduce(res, self._single_frame(ts, agroups)) - times_io.append(b_io.elapsed) + res = self._reduce(res, self._single_frame(i)) times_compute.append(b_compute.elapsed) # calculate io and compute time per block return np.asarray(res), np.asarray(times_io), np.asarray( - times_compute), b_universe.elapsed, wait_end, np.sum( + times_compute), wait_end, np.sum( times_io), np.sum(times_compute) @staticmethod diff --git a/pmda/rms/rmsd.py b/pmda/rms/rmsd.py index 8893a8d2..70e9f087 100644 --- a/pmda/rms/rmsd.py +++ b/pmda/rms/rmsd.py @@ -141,7 +141,8 @@ def _prepare(self): def _conclude(self): self.rmsd = np.vstack(self._results) - def _single_frame(self, ts, atomgroups): + def _single_frame(self, i): + ts = self._universe.trajectory[i] return (ts.frame, ts.time, - rms.rmsd(atomgroups[0].positions, self._ref_pos, + rms.rmsd(self._atomgroups[0].positions, self._ref_pos, superposition=self.superposition)) From 3fe1394b7634cc442268cee375a8bdcbabf13bbc Mon Sep 17 00:00:00 2001 From: Yuxuan Zhuang Date: Wed, 8 Jul 2020 13:38:34 +0200 Subject: [PATCH 02/13] remove timing universe --- pmda/parallel.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/pmda/parallel.py b/pmda/parallel.py index 60623f6b..1e037094 100644 --- a/pmda/parallel.py +++ b/pmda/parallel.py @@ -35,7 +35,7 @@ class Timing(object): store various timeing results of obtained during a parallel analysis run """ - def __init__(self, io, compute, total, universe, prepare, + def __init__(self, io, compute, total, prepare, conclude, wait=None, io_block=None, compute_block=None): self._io = io @@ -44,7 +44,6 @@ def __init__(self, io, compute, total, universe, prepare, self._compute_block = compute_block self._total = total self._cumulate = np.sum(io) + np.sum(compute) - self._universe = universe self._prepare = prepare self._conclude = conclude self._wait = wait @@ -83,11 +82,6 @@ def cumulate_time(self): """ return self._cumulate - @property - def universe(self): - """time to create a universe for each block""" - return self._universe - @property def prepare(self): """time to prepare""" From 64cd7d4fd59365a72d6e3025d3a76cd2fbd046f2 Mon Sep 17 00:00:00 2001 From: Yuxuan Zhuang Date: Sat, 11 Jul 2020 12:47:37 +0200 Subject: [PATCH 03/13] ts returns nothing now --- pmda/parallel.py | 41 ++++++++++++++++++++++++++++------------- pmda/rms/rmsd.py | 17 +++++++++++------ 2 files changed, 39 insertions(+), 19 deletions(-) diff --git a/pmda/parallel.py b/pmda/parallel.py index 1e037094..7bbe0745 100644 --- a/pmda/parallel.py +++ b/pmda/parallel.py @@ -183,7 +183,7 @@ def _reduce(res, result_single_frame): """ - def __init__(self, universe, atomgroups): + def __init__(self, universe): """Parameters ---------- Universe : :class:`~MDAnalysis.core.groups.Universe` @@ -202,7 +202,7 @@ def __init__(self, universe, atomgroups): """ self._universe = universe - self._atomgroups = atomgroups + self._trajectory = universe.trajectory @contextmanager def readonly_attributes(self): @@ -224,7 +224,10 @@ def __setattr__(self, key, val): # guards to stop people assigning to self when they shouldn't # if locked, the only attribute you can modify is _attr_lock # if self._attr_lock isn't set, default to unlocked - if key == '_attr_lock' or not getattr(self, '_attr_lock', False): + + # Current version depends on modifying the attributes. + # but adding key == '_frame_index' below does not work somehow. + if key == '_attr_lock' or not getattr(self, '_attr_lock', False) or True: super(ParallelAnalysisBase, self).__setattr__(key, val) else: # raise HalError("I'm sorry Dave, I'm afraid I can't do that") @@ -245,7 +248,7 @@ def _prepare(self): """additional preparation to run""" pass # pylint: disable=unnecessary-pass - def _single_frame(self, i): + def _single_frame(self): """Perform computation on a single trajectory frame. Must return computed values as a list. You can only **read** @@ -348,6 +351,9 @@ def run(self, self.n_frames = n_frames + # in case _prepare has not set an array. + self._results = np.zeros(n_frames) + if n_frames == 0: warnings.warn("run() analyses no frames: check start/stop/step") if n_frames < n_blocks: @@ -371,7 +377,7 @@ def run(self, self._dask_helper, pure=False)( bslice, self._universe, - self._atomgroups, ) + ) blocks.append(task) # save the frame numbers for each block _blocks.append(range(bslice.start, @@ -402,7 +408,7 @@ def run(self, np.array([el[5] for el in res])) return self - def _dask_helper(self, bslice, u, atomgroups): + def _dask_helper(self, bslice, u): """helper function to actually setup dask graph""" # wait_end needs to be first line for accurate timing wait_end = time.time() @@ -411,19 +417,28 @@ def _dask_helper(self, bslice, u, atomgroups): times_compute = [] # NOTE: bslice.stop cannot be None! Always make sure # that it comes from _trajectory.check_slice_indices()! - for i in range(bslice.start, bslice.stop, bslice.step): + block_ind = [] + for i, ts in enumerate(self._universe._trajectory[bslice]): + self._frame_index = i # record io time per frame + self._ts = ts with timeit() as b_compute: - res = self._reduce(res, self._single_frame(i)) + self._reduce(ts, self._single_frame()) + block_ind.append(i) times_compute.append(b_compute.elapsed) + # as oppsed to + # res = [] + # for i,ts in traj: + # res.append(self._reduce(...) + # return res + # It does not return the right value except the first block, not totally sure why. + # calculate io and compute time per block - return np.asarray(res), np.asarray(times_io), np.asarray( + return np.asarray(self._results[block_ind]), np.asarray(times_io), np.asarray( times_compute), wait_end, np.sum( times_io), np.sum(times_compute) - @staticmethod - def _reduce(res, result_single_frame): + def _reduce(self, ts, result_single_frame): """ 'append' action for a time series""" - res.append(result_single_frame) - return res + return self._results[ts.frame] diff --git a/pmda/rms/rmsd.py b/pmda/rms/rmsd.py index 3c9e8d15..a185e2f5 100644 --- a/pmda/rms/rmsd.py +++ b/pmda/rms/rmsd.py @@ -127,18 +127,23 @@ class RMSD(ParallelAnalysisBase): """ def __init__(self, mobile, ref, superposition=True): universe = mobile.universe - super(RMSD, self).__init__(universe, (mobile, )) + super(RMSD, self).__init__(universe) + self._atomgroups = (mobile, ) self._ref_pos = ref.positions.copy() self.superposition = superposition def _prepare(self): + self._results = np.zeros((self.n_frames, 3)) self.rmsd = None def _conclude(self): self.rmsd = np.vstack(self._results) - def _single_frame(self, i): - ts = self._universe.trajectory[i] - return (ts.frame, ts.time, - rms.rmsd(self._atomgroups[0].positions, self._ref_pos, - superposition=self.superposition)) + def _single_frame(self): + # the current timestep of the trajectory is self._ts + self._results[self._frame_index, 0] = self._ts.frame + # the actual trajectory is at self._trajectory + self._results[self._frame_index, 1] = self._trajectory.time + self._results[self._frame_index, 2] = rms.rmsd(self._atomgroups[0].positions, + self._ref_pos, + superposition=self.superposition) From 19a65b6122d3b5df9a6a859733d222ea212fea2a Mon Sep 17 00:00:00 2001 From: Yuxuan Zhuang Date: Sat, 11 Jul 2020 13:16:55 +0200 Subject: [PATCH 04/13] remove reduce --- pmda/parallel.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/pmda/parallel.py b/pmda/parallel.py index 7bbe0745..ed9bf660 100644 --- a/pmda/parallel.py +++ b/pmda/parallel.py @@ -412,7 +412,6 @@ def _dask_helper(self, bslice, u): """helper function to actually setup dask graph""" # wait_end needs to be first line for accurate timing wait_end = time.time() - res = [] times_io = [] times_compute = [] # NOTE: bslice.stop cannot be None! Always make sure @@ -423,7 +422,8 @@ def _dask_helper(self, bslice, u): # record io time per frame self._ts = ts with timeit() as b_compute: - self._reduce(ts, self._single_frame()) + self._single_frame() + block_ind.append(i) times_compute.append(b_compute.elapsed) @@ -438,7 +438,3 @@ def _dask_helper(self, bslice, u): return np.asarray(self._results[block_ind]), np.asarray(times_io), np.asarray( times_compute), wait_end, np.sum( times_io), np.sum(times_compute) - - def _reduce(self, ts, result_single_frame): - """ 'append' action for a time series""" - return self._results[ts.frame] From 9ef2bb650cf614ab15b074e3e995aad749bdf4e5 Mon Sep 17 00:00:00 2001 From: Yuxuan Zhuang Date: Sun, 12 Jul 2020 15:53:49 +0200 Subject: [PATCH 05/13] density --- pmda/density.py | 38 ++++++++++++++++++++------------------ pmda/parallel.py | 3 ++- 2 files changed, 22 insertions(+), 19 deletions(-) diff --git a/pmda/density.py b/pmda/density.py index 461ae1f7..450d736e 100644 --- a/pmda/density.py +++ b/pmda/density.py @@ -240,8 +240,8 @@ def __init__(self, atomgroup, delta=1.0, atomselection=None, metadata=None, padding=2.0, updating=False, parameters=None, gridcenter=None, xdim=None, ydim=None, zdim=None): - u = atomgroup.universe - super(DensityAnalysis, self).__init__(u, (atomgroup, )) + universe = atomgroup.universe + super().__init__(universe) self._atomgroup = atomgroup self._delta = delta self._atomselection = atomselection @@ -253,7 +253,7 @@ def __init__(self, atomgroup, delta=1.0, atomselection=None, self._xdim = xdim self._ydim = ydim self._zdim = zdim - self._trajectory = u.trajectory + self._trajectory = universe.trajectory if updating and atomselection is None: raise ValueError("updating=True requires a atomselection string") elif not updating and atomselection is not None: @@ -289,17 +289,27 @@ def _prepare(self): grid, edges = np.histogramdd(np.zeros((1, 3)), bins=bins, range=arange, normed=False) grid *= 0.0 - self._grid = grid + + self._tmp = [[0, 0, grid]] * self.n_frames + self._results = [[0, 0, grid]] * self.n_frames self._edges = edges self._arange = arange self._bins = bins - def _single_frame(self, ts, atomgroups): - coord = self.current_coordinates(atomgroups[0], self._atomselection, - self._updating) - result = np.histogramdd(coord, bins=self._bins, range=self._arange, - normed=False) - return result[0] + def _single_frame(self): + h, _ = np.histogramdd(self._atomgroup.positions, + bins=self._bins, range=self._arange, + normed=False) + # reduce (proposed change #2542 to match the parallel version in pmda.density) + # return self._reduce(self._grid, h) + # + # serial code can simply do + + # the current timestep of the trajectory is self._ts + self._results[self._frame_index][0] = self._ts.frame + # the actual trajectory is at self._trajectory + self._results[self._frame_index][1] = self._trajectory.time + self._results[self._frame_index][2] = h def _conclude(self): self._grid = self._results[:].sum(axis=0) @@ -322,14 +332,6 @@ def _conclude(self): density.make_density() self.density = density - @staticmethod - def _reduce(res, result_single_frame): - """ 'accumulate' action for a time series""" - if isinstance(res, list) and len(res) == 0: - res = result_single_frame - else: - res += result_single_frame - return res @staticmethod def current_coordinates(atomgroup, atomselection, updating): diff --git a/pmda/parallel.py b/pmda/parallel.py index ed9bf660..e1fd69f8 100644 --- a/pmda/parallel.py +++ b/pmda/parallel.py @@ -387,6 +387,7 @@ def run(self, # record the time when scheduler starts working wait_start = time.time() res = blocks.compute(**scheduler_kwargs) + self._res_dask = res # hack to handle n_frames == 0 in this framework if len(res) == 0: # everything else wants list of block tuples @@ -435,6 +436,6 @@ def _dask_helper(self, bslice, u): # It does not return the right value except the first block, not totally sure why. # calculate io and compute time per block - return np.asarray(self._results[block_ind]), np.asarray(times_io), np.asarray( + return np.asarray(np.asarray(self._results)[block_ind]), np.asarray(times_io), np.asarray( times_compute), wait_end, np.sum( times_io), np.sum(times_compute) From 6696f9d877037791d0d83176aac69ebf4c756dec Mon Sep 17 00:00:00 2001 From: Yuxuan Zhuang Date: Sun, 12 Jul 2020 17:19:06 +0200 Subject: [PATCH 06/13] add timeit for io and dask prep --- pmda/parallel.py | 49 +++++++++++++++++++++++++++++++----------------- 1 file changed, 32 insertions(+), 17 deletions(-) diff --git a/pmda/parallel.py b/pmda/parallel.py index 9f58882b..56aef998 100644 --- a/pmda/parallel.py +++ b/pmda/parallel.py @@ -35,7 +35,7 @@ class Timing(object): store various timeing results of obtained during a parallel analysis run """ - def __init__(self, io, compute, total, prepare, + def __init__(self, io, compute, total, prepare, prepare_dask, conclude, wait=None, io_block=None, compute_block=None): self._io = io @@ -45,6 +45,7 @@ def __init__(self, io, compute, total, prepare, self._total = total self._cumulate = np.sum(io) + np.sum(compute) self._prepare = prepare + self._prepare_dask = prepare_dask self._conclude = conclude self._wait = wait @@ -87,6 +88,11 @@ def prepare(self): """time to prepare""" return self._prepare + @property + def prepare_dask(self): + """time for blocks to start working""" + return self._prepare_dask + @property def conclude(self): """time to conclude""" @@ -372,17 +378,19 @@ def run(self, blocks = [] _blocks = [] with self.readonly_attributes(): - for bslice in slices: - task = delayed( - self._dask_helper, pure=False)( - bslice, - self._universe, - ) - blocks.append(task) - # save the frame numbers for each block - _blocks.append(range(bslice.start, - bslice.stop, bslice.step)) - blocks = delayed(blocks) + with timeit() as prepare_dask: + for bslice in slices: + task = delayed( + self._dask_helper, pure=False)( + bslice, + self._universe, + ) + blocks.append(task) + # save the frame numbers for each block + _blocks.append(range(bslice.start, + bslice.stop, bslice.step)) + blocks = delayed(blocks) + time_prepare_dask = prepare_dask.elapsed # record the time when scheduler starts working wait_start = time.time() @@ -402,10 +410,12 @@ def run(self, self.timing = Timing( np.hstack([el[1] for el in res]), np.hstack([el[2] for el in res]), total.elapsed, - np.array([el[3] for el in res]), time_prepare, + time_prepare, + time_prepare_dask, conclude.elapsed, # waiting time = wait_end - wait_start - np.array([el[4]-wait_start for el in res]), + np.array([el[3]-wait_start for el in res]), + np.array([el[4] for el in res]), np.array([el[5] for el in res])) return self @@ -418,17 +428,22 @@ def _dask_helper(self, bslice, u): # NOTE: bslice.stop cannot be None! Always make sure # that it comes from _trajectory.check_slice_indices()! block_ind = [] - for i, ts in enumerate(self._universe._trajectory[bslice]): + + for i in range(bslice.start, bslice.stop, bslice.step): self._frame_index = i # record io time per frame - self._ts = ts + # explicit instead of 'for ts in u.trajectory[bslice]' + # so that we can get accurate timing. + with timeit() as b_io: + self._ts = u.trajectory[i] with timeit() as b_compute: self._single_frame() + times_io.append(b_io.elapsed) block_ind.append(i) times_compute.append(b_compute.elapsed) - # as oppsed to + # as opposed to # res = [] # for i,ts in traj: # res.append(self._reduce(...) From e0d3f7cca9fecd818baf64d327627f8db9454003 Mon Sep 17 00:00:00 2001 From: Yuxuan Zhuang Date: Sun, 12 Jul 2020 17:40:59 +0200 Subject: [PATCH 07/13] density sum --- pmda/density.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pmda/density.py b/pmda/density.py index bc140ab3..60ce17dd 100644 --- a/pmda/density.py +++ b/pmda/density.py @@ -312,7 +312,8 @@ def _single_frame(self): def _conclude(self): - self._grid = self._results[:].sum(axis=0)[0] + # sum both inside and among blocks. + self._grid = self._results[:].sum(axis=(0, 1)) self._grid /= float(self.n_frames) metadata = self._metadata if self._metadata is not None else {} metadata['psf'] = self._atomgroup.universe.filename From 2dae36610d24979beb3559e5774896510f6e4ada Mon Sep 17 00:00:00 2001 From: Yuxuan Zhuang Date: Sun, 12 Jul 2020 18:46:58 +0200 Subject: [PATCH 08/13] rmsf --- pmda/parallel.py | 3 +- pmda/rms/rmsf.py | 206 +++++------------------------------------------ 2 files changed, 24 insertions(+), 185 deletions(-) diff --git a/pmda/parallel.py b/pmda/parallel.py index 56aef998..e6c6b03a 100644 --- a/pmda/parallel.py +++ b/pmda/parallel.py @@ -429,7 +429,8 @@ def _dask_helper(self, bslice, u): # that it comes from _trajectory.check_slice_indices()! block_ind = [] - for i in range(bslice.start, bslice.stop, bslice.step): + for block_i, i in enumerate(range(bslice.start, bslice.stop, bslice.step)): + self._block_i = block_i self._frame_index = i # record io time per frame # explicit instead of 'for ts in u.trajectory[bslice]' diff --git a/pmda/rms/rmsf.py b/pmda/rms/rmsf.py index ad563b55..7fd618e9 100644 --- a/pmda/rms/rmsf.py +++ b/pmda/rms/rmsf.py @@ -35,172 +35,39 @@ class RMSF(ParallelAnalysisBase): - r"""Parallel RMSF analysis. - - Calculates RMSF of given atoms across a trajectory. - - Attributes - ---------- - rmsf : array - ``N``-length :class:`numpy.ndarray` array of RMSF values, - where ``N`` is the number of atoms in the `atomgroup` of - interest. Returned values have units of ångströms. - - Parameters - ---------- - atomgroup : AtomGroup - Atoms for which RMSF is calculated - - Raises - ------ - ValueError - raised if negative values are calculated, which indicates that a - numerical overflow or underflow occured - - See Also - -------- - MDAnalysis.analysis.rms.RMSF - - Notes - ----- - No RMSD-superposition is performed; it is assumed that the user is - providing a trajectory where the protein of interest has been structurally - aligned to a reference structure (see the Examples section below). The - protein also has be whole because periodic boundaries are not taken into - account. - - Run the analysis with :meth:`RMSF.run`, which stores the results - in the array :attr:`RMSF.rmsf`. - - The root mean square fluctuation of an atom :math:`i` is computed as the - time average: - - .. math:: - - \sigma_{i} = \sqrt{\left\langle (\mathbf{x}_{i} - - \langle\mathbf{x}_{i}\rangle)^2 - \right\rangle} - - No mass weighting is performed. - This method implements an algorithm for computing sums of squares while - avoiding overflows and underflows [Welford1962]_, as well as an algorithm - for combining the sum of squares and means of separate partitions of a - given trajectory to calculate the RMSF of the entire trajectory - [CGL1979]_. - - For more details about RMSF calculations, refer to [Awtrey2019]_. - - References - ---------- - .. [Welford1962] B. P. Welford (1962). "Note on a Method for - Calculating Corrected Sums of Squares and Products." Technometrics - 4(3):419-420. - - Examples - -------- - In this example we calculate the residue RMSF fluctuations by analyzing - the :math:`\text{C}_\alpha` atoms. First we need to fit the trajectory - to the average structure as a reference. That requires calculating the - average structure first. Because we need to analyze and manipulate the - same trajectory multiple times, we are going to load it into memory - using the :mod:`~MDAnalysis.coordinates.MemoryReader`. (If your - trajectory does not fit into memory, you will need to :ref:`write out - intermediate trajectories ` to disk or - :ref:`generate an in-memory universe - ` that only contains, say, the - protein):: - - import MDAnalysis as mda - from MDAnalysis.analysis import align - from MDAnalysis.tests.datafiles import TPR, XTC - u = mda.Universe(TPR, XTC, in_memory=True) - protein = u.select_atoms("protein") - - # TODO: Need to center and make whole (this test trajectory - # contains the protein being split across periodic boundaries - # and the results will be WRONG!) - - # Fit to the initial frame to get a better average structure - # (the trajectory is changed in memory) - prealigner = align.AlignTraj(u, u, select="protein and name CA", - in_memory=True).run() - # ref = average structure - ref_coordinates = u.trajectory.timeseries(asel=protein).mean(axis=1) - # Make a reference structure (need to reshape into a - # 1-frame "trajectory"). - ref = mda.Merge(protein).load_new(ref_coordinates[:, None, :], - order="afc") - - We created a new universe ``reference`` that contains a single frame - with the averaged coordinates of the protein. Now we need to fit the - whole trajectory to the reference by minimizing the RMSD. We use - :class:`MDAnalysis.analysis.align.AlignTraj`:: - - aligner = align.AlignTraj(u, ref, select="protein and name CA", - in_memory=True).run() - # need to write the trajectory to disk for PMDA 0.3.0 (see issue #15) - with mda.Writer("rmsfit.xtc", n_atoms=u.atoms.n_atoms) as W: - for ts in u.trajectory: - W.write(u.atoms) - - (For use with PMDA we cannot currently use a in-memory trajectory - (see `Issue #15`_) so we must write out the RMS-superimposed - trajectory to the file "rmsfit.xtc".) - - The trajectory is now fitted to the reference (the RMSD is stored as - `aligner.rmsd` for further inspection). Now we can calculate the RMSF:: - - from pmda.rms import RMSF - - u = mda.Universe(TPR, "rmsfit.xtc") - calphas = protein.select_atoms("protein and name CA") - - rmsfer = RMSF(calphas).run() - - and plot:: - - import matplotlib.pyplot as plt - plt.plot(calphas.resnums, rmsfer.rmsf) - - - .. versionadded:: 0.3.0 - - - .. _`Issue #15`: https://github.com/MDAnalysis/pmda/issues/15 - - """ - - def __init__(self, atomgroup): - u = atomgroup.universe - super(RMSF, self).__init__(u, (atomgroup, )) - self._atomgroup = atomgroup - self._top = u.filename - self._traj = u.trajectory.filename - - def _single_frame(self, ts, atomgroups): - # mean and sum of squares calculations done in _reduce() - return atomgroups[0] + def __init__(self, atomgroup, **kwargs): + super().__init__(atomgroup.universe, **kwargs) + self.atomgroup = atomgroup + + def _prepare(self): + self.sumsquares = np.zeros((self.atomgroup.n_atoms, 3)) + self.mean = self.sumsquares.copy() + self._results = [self.sumsquares, self.mean] * self.n_frames + + def _single_frame(self): + k = self._block_i + if k == 0: + self.sumsquares = np.zeros((self.atomgroup.n_atoms, 3)) + self.mean = self.atomgroup.positions + else: + self.sumsquares += (k / (k+1.0)) * (self.atomgroup.positions - self.mean) ** 2 + self.mean = (k * self.mean + self.atomgroup.positions) / (k + 1) + self._results[self._frame_index] = np.asarray([self.sumsquares, self.mean]) def _conclude(self): - """ - self._results : Array - (n_blocks x 2 x N x 3) array - """ n_blocks = len(self._results) # serial case if n_blocks == 1: # get length of trajectory slice - self.mean = self._results[0, 0] - self.sumsquares = self._results[0, 1] + self.mean = self._results[0][-1][1] + self.sumsquares = self._results[0][-1][0] self.rmsf = np.sqrt(self.sumsquares.sum(axis=1) / self.n_frames) # parallel case else: - mean = self._results[:, 0] - sos = self._results[:, 1] - # create list of (timesteps, mean, sumsq tuples for each block vals = [] for i in range(n_blocks): - vals.append((len(self._blocks[i]), mean[i], sos[i])) + vals.append((len(self._blocks[i]), self._results[i][-1][1], + self._results[i][-1][0])) # combine block results using fold method results = fold_second_order_moments(vals) self.mean = results[1] @@ -208,35 +75,6 @@ def _conclude(self): self.rmsf = np.sqrt(self.sumsquares.sum(axis=1) / self.n_frames) self._negative_rmsf(self.rmsf) - @staticmethod - def _reduce(res, result_single_frame): - """ - 'sum' action for time series - """ - atoms = result_single_frame - positions = atoms.positions.astype(np.float64) - # initial time step case - if isinstance(res, list) and len(res) == 0: - # initial mean position = initial position - mean = positions - # create new zero-array for sum of squares to prevent blocks from - # using data from previous blocks - sumsq = np.zeros((atoms.n_atoms, 3)) - # set initial time step for each block to zero - k = 0 - # assign initial (sum of squares and mean) zero-arrays to res - res = [mean, sumsq, k] - else: - # update time step - k = res[2] + 1 - # update sum of squares - res[1] += (k / (k + 1)) * (positions - res[0]) ** 2 - # update mean - res[0] = (k * res[0] + positions) / (k + 1) - # update time step in res - res[2] = k - return res - @staticmethod def _negative_rmsf(rmsf): if not (rmsf >= 0).all(): From c1e4cc02e1b3f6cd15f5a03495cb7a6bfcabaf55 Mon Sep 17 00:00:00 2001 From: Yuxuan Zhuang Date: Mon, 13 Jul 2020 13:01:05 +0200 Subject: [PATCH 09/13] no need to pass u --- pmda/parallel.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pmda/parallel.py b/pmda/parallel.py index e6c6b03a..cf00e6ec 100644 --- a/pmda/parallel.py +++ b/pmda/parallel.py @@ -383,7 +383,6 @@ def run(self, task = delayed( self._dask_helper, pure=False)( bslice, - self._universe, ) blocks.append(task) # save the frame numbers for each block @@ -419,7 +418,7 @@ def run(self, np.array([el[5] for el in res])) return self - def _dask_helper(self, bslice, u): + def _dask_helper(self, bslice): """helper function to actually setup dask graph""" # wait_end needs to be first line for accurate timing wait_end = time.time() @@ -436,7 +435,7 @@ def _dask_helper(self, bslice, u): # explicit instead of 'for ts in u.trajectory[bslice]' # so that we can get accurate timing. with timeit() as b_io: - self._ts = u.trajectory[i] + self._ts = self._universe.trajectory[i] with timeit() as b_compute: self._single_frame() times_io.append(b_io.elapsed) From ad24667d7e9a352d1dc0c6e885f41fd5212e489a Mon Sep 17 00:00:00 2001 From: Yuxuan Zhuang Date: Mon, 13 Jul 2020 20:32:49 +0200 Subject: [PATCH 10/13] add doc rmsf back --- pmda/rms/rmsf.py | 134 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 134 insertions(+) diff --git a/pmda/rms/rmsf.py b/pmda/rms/rmsf.py index 7fd618e9..fc44c53f 100644 --- a/pmda/rms/rmsf.py +++ b/pmda/rms/rmsf.py @@ -35,6 +35,140 @@ class RMSF(ParallelAnalysisBase): + r"""Parallel RMSF analysis. + + Calculates RMSF of given atoms across a trajectory. + + Attributes + ---------- + rmsf : array + ``N``-length :class:`numpy.ndarray` array of RMSF values, + where ``N`` is the number of atoms in the `atomgroup` of + interest. Returned values have units of ångströms. + + Parameters + ---------- + atomgroup : AtomGroup + Atoms for which RMSF is calculated + + Raises + ------ + ValueError + raised if negative values are calculated, which indicates that a + numerical overflow or underflow occured + + See Also + -------- + MDAnalysis.analysis.rms.RMSF + + Notes + ----- + No RMSD-superposition is performed; it is assumed that the user is + providing a trajectory where the protein of interest has been structurally + aligned to a reference structure (see the Examples section below). The + protein also has be whole because periodic boundaries are not taken into + account. + + Run the analysis with :meth:`RMSF.run`, which stores the results + in the array :attr:`RMSF.rmsf`. + + The root mean square fluctuation of an atom :math:`i` is computed as the + time average: + + .. math:: + + \sigma_{i} = \sqrt{\left\langle (\mathbf{x}_{i} - + \langle\mathbf{x}_{i}\rangle)^2 + \right\rangle} + + No mass weighting is performed. + This method implements an algorithm for computing sums of squares while + avoiding overflows and underflows [Welford1962]_, as well as an algorithm + for combining the sum of squares and means of separate partitions of a + given trajectory to calculate the RMSF of the entire trajectory + [CGL1979]_. + + For more details about RMSF calculations, refer to [Awtrey2019]_. + + References + ---------- + .. [Welford1962] B. P. Welford (1962). "Note on a Method for + Calculating Corrected Sums of Squares and Products." Technometrics + 4(3):419-420. + + Examples + -------- + In this example we calculate the residue RMSF fluctuations by analyzing + the :math:`\text{C}_\alpha` atoms. First we need to fit the trajectory + to the average structure as a reference. That requires calculating the + average structure first. Because we need to analyze and manipulate the + same trajectory multiple times, we are going to load it into memory + using the :mod:`~MDAnalysis.coordinates.MemoryReader`. (If your + trajectory does not fit into memory, you will need to :ref:`write out + intermediate trajectories ` to disk or + :ref:`generate an in-memory universe + ` that only contains, say, the + protein):: + + import MDAnalysis as mda + from MDAnalysis.analysis import align + from MDAnalysis.tests.datafiles import TPR, XTC + u = mda.Universe(TPR, XTC, in_memory=True) + protein = u.select_atoms("protein") + + # TODO: Need to center and make whole (this test trajectory + # contains the protein being split across periodic boundaries + # and the results will be WRONG!) + + # Fit to the initial frame to get a better average structure + # (the trajectory is changed in memory) + prealigner = align.AlignTraj(u, u, select="protein and name CA", + in_memory=True).run() + # ref = average structure + ref_coordinates = u.trajectory.timeseries(asel=protein).mean(axis=1) + # Make a reference structure (need to reshape into a + # 1-frame "trajectory"). + ref = mda.Merge(protein).load_new(ref_coordinates[:, None, :], + order="afc") + + We created a new universe ``reference`` that contains a single frame + with the averaged coordinates of the protein. Now we need to fit the + whole trajectory to the reference by minimizing the RMSD. We use + :class:`MDAnalysis.analysis.align.AlignTraj`:: + + aligner = align.AlignTraj(u, ref, select="protein and name CA", + in_memory=True).run() + # need to write the trajectory to disk for PMDA 0.3.0 (see issue #15) + with mda.Writer("rmsfit.xtc", n_atoms=u.atoms.n_atoms) as W: + for ts in u.trajectory: + W.write(u.atoms) + + (For use with PMDA we cannot currently use a in-memory trajectory + (see `Issue #15`_) so we must write out the RMS-superimposed + trajectory to the file "rmsfit.xtc".) + + The trajectory is now fitted to the reference (the RMSD is stored as + `aligner.rmsd` for further inspection). Now we can calculate the RMSF:: + + from pmda.rms import RMSF + + u = mda.Universe(TPR, "rmsfit.xtc") + calphas = protein.select_atoms("protein and name CA") + + rmsfer = RMSF(calphas).run() + + and plot:: + + import matplotlib.pyplot as plt + plt.plot(calphas.resnums, rmsfer.rmsf) + + + .. versionadded:: 0.3.0 + + + .. _`Issue #15`: https://github.com/MDAnalysis/pmda/issues/15 + + """ def __init__(self, atomgroup, **kwargs): super().__init__(atomgroup.universe, **kwargs) self.atomgroup = atomgroup From 72181ae03ce815c8bcc87fd2ca4d18863f0037ef Mon Sep 17 00:00:00 2001 From: Yuxuan Zhuang Date: Tue, 14 Jul 2020 14:31:33 +0200 Subject: [PATCH 11/13] set _results to list of result --- pmda/custom.py | 39 +++++---------- pmda/leaflet.py | 121 ++++++++++++++++++++++++----------------------- pmda/parallel.py | 2 +- 3 files changed, 76 insertions(+), 86 deletions(-) diff --git a/pmda/custom.py b/pmda/custom.py index 23056def..0159feba 100644 --- a/pmda/custom.py +++ b/pmda/custom.py @@ -18,7 +18,6 @@ """ from __future__ import absolute_import -from MDAnalysis.core.groups import AtomGroup from MDAnalysis.core.universe import Universe from MDAnalysis.coordinates.base import ProtoReader import numpy as np @@ -75,32 +74,19 @@ def __init__(self, function, universe, *args, **kwargs): analyze can not be passed as keyword arguments currently. """ - self.function = function - - # collect all atomgroups with the same trajectory object as universe - trajectory = universe.trajectory - arg_ags = [] - self.other_args = [] - for arg in args: - if isinstance(arg, - AtomGroup) and arg.universe.trajectory == trajectory: - arg_ags.append(arg) - else: - self.other_args.append(arg) - - super(AnalysisFromFunction, self).__init__(universe, arg_ags) + super().__init__(universe) + self.args = args self.kwargs = kwargs def _prepare(self): - self.results = [] + self._results = [None] * self.n_frames - def _single_frame(self, ts, atomgroups): - args = atomgroups + self.other_args - return self.function(*args, **self.kwargs) + def _single_frame(self): + self._results[self._frame_index] = self.function(*self.args, **self.kwargs) def _conclude(self): - self.results = np.concatenate(self._results) + self.results = self._results def analysis_class(function): @@ -144,13 +130,12 @@ def analysis_class(function): class WrapperClass(AnalysisFromFunction): """Custom Analysis Function""" - def __init__(self, trajectory=None, *args, **kwargs): - if not (isinstance(trajectory, ProtoReader) or isinstance( - trajectory, Universe)): - print(type(trajectory)) + def __init__(self, universe=None, *args, **kwargs): + if not isinstance(universe, Universe): + print(type(universe)) raise ValueError( - "First argument needs to be an MDAnalysis reader object.") - super(WrapperClass, self).__init__(function, trajectory, *args, - **kwargs) + "First argument needs to be an MDAnalysis Universe.") + super().__init__(function, universe, *args, + **kwargs) return WrapperClass diff --git a/pmda/leaflet.py b/pmda/leaflet.py index b0e0bbed..efc38d88 100644 --- a/pmda/leaflet.py +++ b/pmda/leaflet.py @@ -76,7 +76,7 @@ def __init__(self, universe, atomgroups): super(LeafletFinder, self).__init__(universe, (atomgroups,)) - def _find_connected_components(self, data, cutoff=15.0): + def _find_connected_components(self, data_list, cutoff=15.0): """Perform the Connected Components discovery for the atoms in data. Parameters @@ -99,62 +99,66 @@ def _find_connected_components(self, data, cutoff=15.0): """ # pylint: disable=unsubscriptable-object - window, index = data[0] - num = window[0].shape[0] - i_index = index[0] - j_index = index[1] - graph = nx.Graph() - - if i_index == j_index: - train = window[0] - test = window[1] - else: - train = np.vstack([window[0], window[1]]) - test = np.vstack([window[0], window[1]]) - - tree = cKDTree(train, leafsize=40) - edges = tree.query_ball_point(test, cutoff) - edge_list = [list(zip(np.repeat(idx, len(dest_list)), dest_list)) - for idx, dest_list in enumerate(edges)] - - edge_list_flat = np.array([list(item) for sublist in edge_list for - item in sublist]) - - if i_index == j_index: - res = edge_list_flat.transpose() - res[0] = res[0] + i_index - 1 - res[1] = res[1] + j_index - 1 - else: - removed_elements = list() - for i in range(edge_list_flat.shape[0]): - if (edge_list_flat[i, 0] >= 0 and - edge_list_flat[i, 0] <= num - 1) and \ - (edge_list_flat[i, 1] >= 0 and - edge_list_flat[i, 1] <= num - 1) or \ - (edge_list_flat[i, 0] >= num and - edge_list_flat[i, 0] <= 2 * num - 1) and \ - (edge_list_flat[i, 1] >= num and - edge_list_flat[i, 1] <= 2 * num - 1) or \ - (edge_list_flat[i, 0] >= num and - edge_list_flat[i, 0] <= 2 * num - 1) and \ - (edge_list_flat[i, 1] >= 0 and - edge_list_flat[i, 1] <= num - 1): - removed_elements.append(i) - res = np.delete(edge_list_flat, removed_elements, - axis=0).transpose() - res[0] = res[0] + i_index - 1 - res[1] = res[1] - num + j_index - 1 - if res.shape[1] == 0: - res = np.zeros((2, 1), dtype=np.int) - - edges = [(res[0, k], res[1, k]) for k in range(0, res.shape[1])] - graph.add_edges_from(edges) - - # partial connected components - - subgraphs = nx.connected_components(graph) - comp = [g for g in subgraphs] - return comp + #raise TypeError(data) + comp_s = [] + for data in data_list: + window, index = data + num = window[0].shape[0] + i_index = index[0] + j_index = index[1] + graph = nx.Graph() + + if i_index == j_index: + train = window[0] + test = window[1] + else: + train = np.vstack([window[0], window[1]]) + test = np.vstack([window[0], window[1]]) + + tree = cKDTree(train, leafsize=40) + edges = tree.query_ball_point(test, cutoff) + edge_list = [list(zip(np.repeat(idx, len(dest_list)), dest_list)) + for idx, dest_list in enumerate(edges)] + + edge_list_flat = np.array([list(item) for sublist in edge_list for + item in sublist]) + + if i_index == j_index: + res = edge_list_flat.transpose() + res[0] = res[0] + i_index - 1 + res[1] = res[1] + j_index - 1 + else: + removed_elements = list() + for i in range(edge_list_flat.shape[0]): + if (edge_list_flat[i, 0] >= 0 and + edge_list_flat[i, 0] <= num - 1) and \ + (edge_list_flat[i, 1] >= 0 and + edge_list_flat[i, 1] <= num - 1) or \ + (edge_list_flat[i, 0] >= num and + edge_list_flat[i, 0] <= 2 * num - 1) and \ + (edge_list_flat[i, 1] >= num and + edge_list_flat[i, 1] <= 2 * num - 1) or \ + (edge_list_flat[i, 0] >= num and + edge_list_flat[i, 0] <= 2 * num - 1) and \ + (edge_list_flat[i, 1] >= 0 and + edge_list_flat[i, 1] <= num - 1): + removed_elements.append(i) + res = np.delete(edge_list_flat, removed_elements, + axis=0).transpose() + res[0] = res[0] + i_index - 1 + res[1] = res[1] - num + j_index - 1 + if res.shape[1] == 0: + res = np.zeros((2, 1), dtype=np.int) + + edges = [(res[0, k], res[1, k]) for k in range(0, res.shape[1])] + graph.add_edges_from(edges) + + # partial connected components + + subgraphs = nx.connected_components(graph) + comp = [g for g in subgraphs] + comp_s.append(comp) + return comp_s # pylint: disable=arguments-differ def _single_frame(self, ts, atomgroups, scheduler_kwargs, n_jobs, @@ -200,12 +204,13 @@ def _single_frame(self, ts, atomgroups, scheduler_kwargs, n_jobs, # Distribute the data over the available cores, apply the map function # and execute. parAtoms = db.from_sequence(arranged_coord, - npartitions=len(arranged_coord)) + npartitions=n_jobs) parAtomsMap = parAtoms.map_partitions(self._find_connected_components, cutoff=cutoff) Components = parAtomsMap.compute(**scheduler_kwargs) # Gather the results and start the reduction. TODO: think if it can go + Components = [item for sublist in Components for item in sublist] # to the private _reduce method of the based class. result = list(Components) diff --git a/pmda/parallel.py b/pmda/parallel.py index cf00e6ec..3e183c8b 100644 --- a/pmda/parallel.py +++ b/pmda/parallel.py @@ -358,7 +358,7 @@ def run(self, self.n_frames = n_frames # in case _prepare has not set an array. - self._results = np.zeros(n_frames) + self._results = [None] * self.n_frames if n_frames == 0: warnings.warn("run() analyses no frames: check start/stop/step") From 7db243c83713ccae960cdaf6d9f7a935373d35f9 Mon Sep 17 00:00:00 2001 From: Yuxuan Zhuang Date: Tue, 14 Jul 2020 22:55:58 +0200 Subject: [PATCH 12/13] move _results to prep --- pmda/parallel.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/pmda/parallel.py b/pmda/parallel.py index 3e183c8b..3eb3fc3e 100644 --- a/pmda/parallel.py +++ b/pmda/parallel.py @@ -252,7 +252,7 @@ def _conclude(self): def _prepare(self): """additional preparation to run""" - pass # pylint: disable=unnecessary-pass + self._results = [None] * self.n_frames def _single_frame(self): """Perform computation on a single trajectory frame. @@ -357,9 +357,6 @@ def run(self, self.n_frames = n_frames - # in case _prepare has not set an array. - self._results = [None] * self.n_frames - if n_frames == 0: warnings.warn("run() analyses no frames: check start/stop/step") if n_frames < n_blocks: From 00a9b0498beab8ca2f233f0cd45d5fca36cd11bd Mon Sep 17 00:00:00 2001 From: Yuxuan Zhuang Date: Thu, 16 Jul 2020 12:59:27 +0200 Subject: [PATCH 13/13] build mdanalysis on serialize_io --- .travis.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index f308baf6..5d29ff74 100644 --- a/.travis.yml +++ b/.travis.yml @@ -19,14 +19,14 @@ env: # mdanalysis develop from source (see below), which needs # minimal CONDA_MDANALYSIS_DEPENDENCIES #- CONDA_DEPENDENCIES="mdanalysis mdanalysistests dask joblib pytest-pep8 mock codecov cython hypothesis sphinx" - #- CONDA_MDANALYSIS_DEPENDENCIES="cython mmtf-python six biopython networkx scipy griddataformats gsd hypothesis" + - CONDA_MDANALYSIS_DEPENDENCIES="cython mmtf-python six biopython networkx scipy griddataformats gsd hypothesis" - CONDA_MDANALYSIS_DEPENDENCIES="mdanalysis mdanalysistests" - CONDA_DEPENDENCIES="${CONDA_MDANALYSIS_DEPENDENCIES} dask distributed joblib pytest-pep8 mock codecov" - CONDA_CHANNELS='conda-forge' - CONDA_CHANNEL_PRIORITY=True # install development version of MDAnalysis (needed until the test # files for analysis.rdf are available in release 0.19.0) - #- PIP_DEPENDENCIES="git+https://github.com/MDAnalysis/mdanalysis#egg=mdanalysis&subdirectory=package git+https://github.com/MDAnalysis/mdanalysis#egg=mdanalysistests&subdirectory=testsuite" + - PIP_DEPENDENCIES="git+https://github.com/yuxuanzhuang/mdanalysis.git@serialize_io&subdirectory=package git+https://github.com/yuxuanzhuang/mdanalysis.git@serialize_io&#egg=mdanalysistests&subdirectory=testsuite" - NUMPY_VERSION=stable - BUILD_CMD='python setup.py develop' - CODECOV=false