In [15]:
from molecular_mpns.config import data_dir
from molecular_mpns.proto_molecule import Molecule
import numpy as np
import mdtraj
import sympy
import scipy.linalg as scl

In [16]:
# load data
xyz_traj = np.load(str(data_dir)+'/proto_mol_traj.npy')
dh_traj = np.load(str(data_dir)+'/proto_angles.npy') # for comparison
m = xyz_traj.shape[0]

In [17]:
""" General Settings: """
# Number of dominant timescales:
nc = 6
# Downsampling parameter:
stride = 1
# Tolerance for small eigenvalues:
tol_ev = 1e-8
# Tolerance for positive eigenvalues:
tol_pos = 1e-3

In [18]:
""" System Settings: """
# System Settings:
N = 5
beta = 15.0
kb = 1.0
rb = 2.0
ka = 1.0
ra = np.pi/2
kd = np.array([[0.02, 3], [0.02, 2]])
# Generate instance of the system class:
Mol = Molecule(N, beta, kb, rb, ka, ra, kd)

In [19]:
# Calculate dihedrals:
traj = mdtraj.Trajectory(xyz_traj, Mol._create_top())
atom_ind_dih = np.array([[0, 1, 2, 3], [1, 2, 3, 4]])
xi_traj = mdtraj.compute_dihedrals(traj, atom_ind_dih).T # not consistent with dh_traj...
np.save(str(data_dir)+'/proto_xi_traj.npy',xi_traj)

In [20]:
""" Gradient of dihedral angle coordinate:
-------------------------------
"""


def compute_dihedral_jacobian(xyz, atom_ind, output_freq=None):
    """ Compute the Jacobian of a set of dihedral angles w.r.t. all
        atomic coordinates involved in the definition of these dihedrals.

    Parameters:
    -----------
    xyz, ndarray(m, natoms, 3):     m snapshots of the Euclidean coordinates of natoms
                                    atoms in a molecule.
    atom_ind, ndarray(ndih, 4):     list of the indices of the four atoms spanning each
                                    dihedral in question.
    output_freq, int (optional):   Print out a message every output_freq'th step.
     """
    # Get info:
    m, natoms, _ = xyz.shape
    ndih = atom_ind.shape[0]
    # Prepare output:
    traj_dxi = np.zeros((ndih, natoms, 3, m))
    for ll in range(m):
        for jj in range(ndih):
            traj_dxi[jj, atom_ind[jj, :], :, ll] = dih_grad(xyz[ll, atom_ind[jj, :], :])
        if (output_freq is not None) and (np.remainder(ll + 1, output_freq) == 0):
            print("Completed %d snapshots." % (ll + 1))

    return traj_dxi


""" Evaluate the gradient of a single dihedral w.r.t. the twelve atomic
    coordinates defining the dihedral.
"""
def dih_grad(r):
    """
    Parameters:
    ------------
    r, ndarray(4, 3): position vectors of four atoms.

    Returns:
    ------------
    grad, ndarray(4, 3): gradient of the dihedral w.r.t. all Euclidean coords.

    """
    # Extract position vectors of individual atoms:
    ri = r[0, :]
    rj = r[1, :]
    rk = r[2, :]
    rl = r[3, :]
    # Assemble gradient:
    grad = np.zeros((4, 3))
    grad[0, :] = m1(ri, rj, rk, rl)
    grad[1, :] = ((S1(ri, rj, rk, rl) - 1) * m1(ri, rj, rk, rl)
                  + S2(ri, rj, rk, rl) * m2(ri, rj, rk, rl))
    grad[2, :] = -(S1(ri, rj, rk, rl) * m1(ri, rj, rk, rl) +
                   (S2(ri, rj, rk, rl) - 1) * m2(ri, rj, rk, rl))
    grad[3, :] = -m2(ri, rj, rk, rl)
    return grad

""" Helper Functions to calculate Jacobian of dihedral angles: """
# Distances and distance vectors:
def dist_vec(ri, rj):
    return ri - rj

def dist(ri, rj):
    return scl.norm(ri - rj)

# Scalar products:
def S1(ri, rj, rk, rl):
    rkj = dist(rk, rj)
    return np.dot(dist_vec(ri, rj), dist_vec(rk, rj))/(rkj**2)

def S2(ri, rj, rk, rl):
    rkj = dist(rk, rj)
    return np.dot(dist_vec(rk, rl), dist_vec(rk, rj))/(rkj**2)

# Normal vectors:
def m_vec(ri, rj, rk, rl):
    return np.cross(dist_vec(ri, rj), dist_vec(rk, rj))

def n_vec(ri, rj, rk, rl):
    return np.cross(dist_vec(rk, rj), dist_vec(rk, rl))

# Unit normal vectors:
def m_vec_norm(ri, rj, rk, rl):
    m = m_vec(ri, rj, rk, rl)
    return m / (scl.norm(m)**2)

def n_vec_norm(ri, rj, rk, rl):
    n = n_vec(ri, rj, rk, rl)
    return n / (scl.norm(n)**2)

def m1(ri, rj, rk, rl):
    rkj = dist(rk, rj)
    mn = m_vec_norm(ri, rj, rk, rl)
    return rkj*mn

def m2(ri, rj, rk, rl):
    rkj = dist(rk, rj)
    nn = n_vec_norm(ri, rj, rk, rl)
    return rkj*nn

In [21]:
# compute dihedral gradidents
dxi_traj = compute_dihedral_jacobian(xyz_traj, atom_ind_dih, output_freq=None)
dxi_traj = np.reshape(dxi_traj, (2, N*3, m))
#np.save(str(data_dir)+'/proto_dxi_traj.npy',dxi_traj)

In [10]:
# Contract along spatial dimension:
dxi_traj = (2.0 / beta) * np.einsum("ikl, jkl-> ijl", dxi_traj, dxi_traj) # computes effective a term

In [11]:
""" Convert list of symbolic functions into a callable function:
----------------------------------------------------------------
"""

class Sym2numeric:

    def __init__(self, psi_list, var_list):
        """ This class allows to pass a list of symbolic functions and create that
        can be called to evaluate the basis at any given point. First and second order
        derivatives can also be evaluated.

        Parameters:
        -----------
        psi_list, list:         symbolic functions defining the basis set.
        var_list, list:         list of symbolic variables defining state space.
        """
        # Get info:
        self.psi = psi_list
        self.var = var_list
        self.n = len(self.psi)
        self.d = len(self.var)
        # Generate numerical functions:
        self.psi_eval = [sympy.lambdify(self.var, self.psi[ii], "numpy") for ii in range(self.n)]
        # Generate numerical functions for first and second order derivatives:
        self.dpsi_eval = [[sympy.lambdify(self.var, self.psi[ii].diff(self.var[jj]), "numpy")
                           for jj in range(self.d)] for ii in range(self.n)]
        self.ddpsi_eval = [[[sympy.lambdify(self.var, (self.psi[ii]).diff(self.var[kk]).diff(self.var[jj]), "numpy")
                           for kk in range(self.d)] for jj in range(self.d)] for ii in range(self.n)]

    def __call__(self, x):
        """ Evaluate the basis set along array of positions x

        Parameters:
        -----------
        x, ndarray(d, m):       array of m positions in d-dimensional space.

        Returns:
        --------
        psi_x, ndarray(n, m):   Evaluation of the basis set at all positions.
        """
        # Get info:
        m = x.shape[1]
        # Prepare output:
        psi_x = np.zeros((self.n, m))
        for ii in range(self.n):
            psi_x[ii, :] = self.psi_eval[ii](*[x[ll, :] for ll in range(self.d)])

        return psi_x

    def diff(self, x):
        """ Evaluate gradients of the basis set along array of positions x

        Parameters:
        -----------
        x, ndarray(d, m):       array of m positions in d-dimensional space.

        Returns:
        --------
        dpsi_x, ndarray(n, d, m):   Evaluation of the gradients of the basis set at all positions.

        """
        # Get info:
        m = x.shape[1]
        # Prepare output:
        dpsi_x = np.zeros((self.n, self.d, m))
        for ii in range(self.n):
            for jj in range(self.d):
                dpsi_x[ii, jj, :] = self.dpsi_eval[ii][jj](*[x[ll, :] for ll in range(self.d)])

        return dpsi_x

    def ddiff(self, x):
        """ Evaluate Hessian of the basis set along array of positions x

        Parameters:
        -----------
        x, ndarray(d, m):       array of m positions in d-dimensional space.

        Returns:
        --------
        ddpsi_x, ndarray(n, d, d, m):   Evaluation of the Hessian of the basis set at all positions.

        """
        # Get info:
        m = x.shape[1]
        # Prepare output:
        ddpsi_x = np.zeros((self.n, self.d, self.d, m))
        for ii in range(self.n):
            for jj in range(self.d):
                for kk in range(self.d):
                    ddpsi_x[ii, jj, kk, :] = self.ddpsi_eval[ii][jj][kk](*[x[ll, :] for ll in range(self.d)])

        return ddpsi_x

"""
Reversible gEDMD
----------------
"""

class GEDMD_Reversible:
    """
        Estimator class for gEDMD, assuming reversible dynamics.

        psi, instance of Sym2numeric:   callable object to evaluate basis set
                                        and their derivatives.
        p, int:                         dimension of the state space.

    """
    def __init__(self, psi, p):
        self.psi = psi
        # Extract basis set size and dimension:
        self.nf = self.psi.n
        self.p = p
        # Flag if estimation has been carried out or not:
        self._estimated = False

    def fit(self, X, dxi, rw=None):
        """
        Estimate generator model from trajectory data:

        X, ndarray (p, m):
            p-dimensional trajectory data.
        dxi, nd-array (p, p, m)
            time series of "transformed diffusion" nabla xi^T sigma sigma^T  nabla xi.
        rw, nd-array (m,) (optional)
            time series of re-weighting factors if the data does not sample the invariant
            measure.
        """
        # Obtain shape of the data:
        _, m = X.shape

        # Evaluate all basis functions and their gradients:
        psi_traj = self.psi(X)
        dpsi_traj = self.psi.diff(X)

        # Contract gradients with Jacobians, to shape (nf, p, m):
        dpsi1 = np.einsum("ijk, jlk -> ilk", dpsi_traj, dxi)
        # Build data-based Galerkin matrices:
        if rw is not None:
            self.G = (1.0 / m) * np.dot(psi_traj * rw[None, :], psi_traj.T)
            self.A = (-0.5 / m) * np.einsum("ijk, ljk -> il", dpsi1, dpsi_traj * rw[None, None, :])
        else:
            self.G = (1.0 / m) * np.dot(psi_traj, psi_traj.T)
            self.A = (-0.5 / m) * np.einsum("ijk, ljk -> il", dpsi1, dpsi_traj)
        self._estimated = True

    def eigen_decomposition(self, tol, symfun=False):
        """
        Compute eigenvalues and eigenvectors of the generator matrix.

        tol, float:
            cut-off for small singular values of mass matrix.
        symfun, bool
            return a list of symbolic expressions for eigenfunctions


        Returns:
        --------
        kappa, ndarray(nev,)
            array of eigenvalues, sorted in decreasing order.
        V, ndarray(nf, nev)
            array of corresponding eigenvectors.
        ev_fun, list:
            list of symbolic expressions for eigenfunctions (only if symfun=True)

        """
        # Compute transformed generator matrix and transformation matrix:
        U, L = self.compute_UL(tol)
        # Diagonalize L:
        kappa, V = scl.eigh(L)
        # Sort in decreasing order:
        kappa = kappa[::-1]
        V = V[:, ::-1]
        # Transform eigenvectors back to original basis:
        V = np.dot(U, V)
        # Generate symbolic expressions if required:
        if symfun:
            nev = V.shape[1]
            ev_sym = []
            for ii in range(nev):
                phi = V[0, ii] * self.psi.psi[0]
                for jj in range(1, self.nf):
                    phi = phi + V[jj, ii] * self.psi.psi[jj]
                ev_sym.append(phi)
            return kappa, V, ev_sym
        else:
            return kappa, V

    def compute_UL(self, tol):
        """
        Compute transformation U to orthonormal basis and generator matrix L.

        tol, float:
            cut-off for small singular values of mass matrix.

        Returns:
        --------
        U, ndarray (nf, r):
            transformation to orthonormal basis (possible of smaller size)
        L, ndarray (r, r):
            generator matrix for transformed basis

        """
        if self._estimated:
            # Transform basis:
            U = self._transform_ortho(self.G, tol)
            # Compute projected generator matrices:
            L = np.dot(U.T, np.dot(self.A, U))
            return U, L
        else:
            raise RuntimeError("Galerkin Matrices have not been estimated yet.")

    def identify_drift(self, cx, tol=1e-6):
        """
        Perform system id for drift coefficients:

        cx, ndarray (nf, p):
            coefficients of the coordinate functions in the basis.
        tol, float (optional):
            cut-off for small singular values of mass matrix

        Returns:
        --------
        b, ndarray (nf, p):
            coefficients of all drift components in the basis.
        """
        # Compute U and L:
        U, L = self.compute_UL(tol)
        # Identify coefficients of coordinate functions in reduced representation:
        c_id = scl.lstsq(U, cx)[0]
        # Identify drift coefficients:
        b = np.dot(L, c_id)
        # Re-express in full basis:
        b = np.dot(U, b)

        return b

    def identify_diffusion(self, cx2, cxb, tol=1e-6):
        """
        Perform system id for diffusion coefficients:

        cx2, nd-array (nf, p, p):
            coefficients of z * z^T in the basis.
            Needs to be symmetric, i.e. cx2[:, i, j] = cx2[:, j, i]
        cxb, nd-array (nf, p, p):
            coefficients of (b * z^T + z * b^T) in the basis.
            Needs to be symmetric, i.e. cxb[:, i, j] = cxb[:, j, i]
        tol, float (optional):
            cut-off for small singular values of mass matrix

        Returns:
        --------
        a, ndarray(nf, p, p):
            coefficients of the diffusion matrix in the basis.
            Result is symmetric, i.e. a[:, i, j] = a[:, j, i].
        """
        # Compute U and L:
        U, L = self.compute_UL(tol)
        # Reshape both coefficient vectors:
        cx2 = cx2.reshape((self.nf, self.p * self.p))
        cxb = cxb.reshape((self.nf, self.p * self.p))
        # Identify coefficient of z*z^T in reduced representation:
        c_x2 = scl.lstsq(U, cx2)[0]
        # Identify coefficients of (b*z^T + z*b^T) in reduced representation:
        c_xb = scl.lstsq(U, cxb)[0]
        # Identify diffusion:
        a = 0.5*(np.dot(L, c_x2) - c_xb)
        # Re-express in full basis and reshape to matrix form:
        a = np.dot(U, a)
        a = a.reshape((self.nf, self.p, self.p))

        return a

    def _transform_ortho(self, G, tol):
        """
        Compute transformation to orthonormal basis:

        G, nd-array (nf, nf):
            mass matrix of the basis set
        tol, float:
            cut-off for small singular values of mass matrix
        """
        # Eigenvalue decomposition:
        s, U = scl.eigh(G)
        # Sort in decreasing order
        s = s[::-1]
        U = U[:, ::-1]
        # Elminate small singular values:
        ind = np.where(s / s[0] >= tol)[0]
        s = s[ind]
        U = U[:, ind]
        # Return transformation matrix:
        return np.dot(U, np.diag(s ** (-0.5)))
    
def sort_ev(kappa, V, tol_ev, tol_pos=0.0, tol_im=1e-16, ef=None):
    """

    kappa, ndarray(nc,):array of eigenvalues.
    V , ndarray(m, n):  array of eigenvectors.
    tol_ev , float:     discard eigenvalues of absolute magnitude below this threshold.
    tol_pos, float:     discard eigenvalues of greater than this threshold.
    tol_im, float:      discard eigenvalues of imaginary part above this threshold.
    ef, ndarray(nc,):   array of eigenfunctions.

    Returns kappa, V, and ef if desired, after filtering out all eigenvalues that do not match
        specified tolerances, sorted in decreasing order.
    """

    # Filter out only real eigenvalues below threshold:
    ind = np.where(np.logical_and(kappa <= tol_pos, np.logical_and(np.abs(kappa) > tol_ev,
                                  np.abs(np.imag(kappa)) < tol_im)))[0]
    kappa0 = kappa[ind]
    V0 = V[:, ind]
    if ef is not None:
        ef0 = ef[ind]
    # Sort in decreasing order:
    ind = np.argsort(kappa0)[::-1]

    if ef is not None:
        return kappa0[ind], V0[::, ind], ef0[ind]
    else:
        return kappa0[ind], V0[::, ind]

In [12]:
""" Define basis set: """
# Number of Gaussians per direction:
ng = 10
# Centers for Gaussians:
mx_list_1 = np.linspace(-3.0, 3.0, ng)
mx_list_2 = np.linspace(-3.0, 3.0, ng)
# Width of the Gaussian:
sg = 0.05
# Define symbolic variables and template symbolic functions:
x, y, z, mx = sympy.symbols("x y z mx")
gauss = sympy.exp(-(0.5 / sg) * sympy.sin(0.5*(z - mx))**2)
# Define one-dimensional basis first:
basis_x = []
basis_y = []
# 2d trigonometric polynomials:
for ii in range(ng):
    basis_x.append(gauss.subs([(mx, mx_list_1[ii]), (z, x)]))
    basis_y.append(gauss.subs([(mx, mx_list_2[ii]), (z, y)]))
psi_list = []
for ii in range(len(basis_x)):
    for jj in range(len(basis_y)):
        psi_list.append(basis_x[ii] * basis_y[jj])
nf = len(psi_list)
# Generate callable object for basis set:
psi_eval = Sym2numeric(psi_list, [x, y])

In [14]:
""" Apply Reversible gEDMD: """
# Fit the model:
print("Fitting gEDMD Model: ")
gE = GEDMD_Reversible(psi_eval, 2)
gE.fit(xi_traj, dxi_traj)
# Compute eigenfunctions:
kappa, V, ef = gE.eigen_decomposition(1e-6, symfun=True)
print("All eigenvalues: ")
print(kappa[np.isreal(kappa)])
# Filter out eigenvalues too close to zero:
kappa0, V0, ef0 = sort_ev(kappa, V, tol_ev=tol_ev, tol_pos=tol_pos, ef=np.array(ef))
print("Selected Eigenvalues: ")
print(kappa0)
print("Done.")

""" Analyse Results:"""
# Print out dominant timescales:
ts0 = -1.0/kappa0[1:nc]
print("Dominant Timescales: ")
print(ts0)

Fitting gEDMD Model: 
All eigenvalues: 
[-1.36271281e-04 -4.02678438e-02 -5.80231720e-02 -7.96058586e-02
 -9.34730429e-02 -1.04258169e-01 -2.12779976e-01 -2.64853841e-01
 -2.86710636e-01 -3.88897131e-01 -4.17515491e-01 -4.34706797e-01
 -4.60567978e-01 -4.92980431e-01 -5.16472303e-01 -5.79483938e-01
 -6.04892829e-01 -6.22926087e-01 -6.89775230e-01 -7.91570251e-01
 -8.43756109e-01 -8.58720687e-01 -8.77097272e-01 -8.91001111e-01
 -9.11468407e-01 -9.38966089e-01 -9.72225109e-01 -9.98124590e-01
 -1.02137905e+00 -1.06024221e+00 -1.12322224e+00 -1.13981460e+00
 -1.17841157e+00 -1.22140046e+00 -1.26122121e+00 -1.34011747e+00
 -1.41391133e+00 -1.42723000e+00 -1.45702769e+00 -1.51141089e+00
 -1.53533999e+00 -1.61052171e+00 -1.62306512e+00 -1.64098491e+00
 -1.68130636e+00 -1.69074195e+00 -1.74592329e+00 -1.77198107e+00
 -1.81127498e+00 -1.81821222e+00 -1.86859469e+00 -1.94903821e+00
 -1.96914485e+00 -1.98606018e+00 -2.07761281e+00 -2.11596195e+00
 -2.20249899e+00 -2.22236476e+00 -2.28937263e+00 -