In [1]:
import abc
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
import numpy as np
from glob import glob
import polars as pl
from tqdm import tqdm
import itertools
from natsort import natsorted
import fileinput
import re
import pandas as pd

In [2]:
'''
Conventions:
Variabelnames:
    high-dimensional-arrays:
        - each axis is in the variable name
        - each axis is separated by an underscore (dimensionality is #underscores+1)
        - each axis name's first letter is capitalised and they should be plural (even if dim == 1)
        - the axes might be followed by a descriptor for the elememt's content which should be singular
        - for variables, underscores are reserved for high-dimensional-arrays with a few exceptions:
            * units that would look totally ugly without an underscore between the constant (kB_kcalmolK | kBkcalmolK)
        - Examples:
            * axis0_axis1_axis2 -> Hamiltonians_CollectiveVariables_Timeseries
            * axis0_axis1Property -> Hamiltonians_CVColumnsSamplesize
'''

"\nConventions:\nVariabelnames:\n    high-dimensional-arrays:\n        - each axis is in the variable name\n        - each axis is separated by an underscore (dimensionality is #underscores+1)\n        - each axis name's first letter is capitalised and they should be plural (even if dim == 1)\n        - the axes might be followed by a descriptor for the elememt's content which should be singular\n        - for variables, underscores are reserved for high-dimensional-arrays with a few exceptions:\n            * units that would look totally ugly without an underscore between the constant (kB_kcalmolK | kBkcalmolK)\n        - Examples:\n            * axis0_axis1_axis2 -> Hamiltonians_CollectiveVariables_Timeseries\n            * axis0_axis1Property -> Hamiltonians_CVColumnsSamplesize\n"

In [3]:
class FileParser(ABC):
    def __init__(self, files, CollectiveVariableColumns, subset=0):
        self.files: list[str] = files
        self.CVColumns: list[int] = CollectiveVariableColumns
        self.verbose: bool = False
        self.subset: int = (-subset)
        self.__post_init__()
        
    @abstractmethod
    def __post_init__(self):
        '''initialise subclass attributes'''
    
    @abstractmethod
    def parse_anchors(self) -> np.ndarray:
        '''abstract method to obtain force constant and anchor information form simulation output files'''
        
    @abstractmethod
    def parse_force_constants(self) -> np.ndarray:
        '''do stuff'''
        
    @abstractmethod
    def parse_collective_variables(self) -> list[np.ndarray]:
        '''abstract method to obtain raw data from simulation output''' 

    def calculateSamplesize(self) -> list[int]:
        pass
    
    
class AMBER_PMD_Parser(FileParser):
    def __post_init__(self):
        self.columnPattern: str = r'anchor\({}\)'
        self.anchorRegex: re.Pattern = re.compile(r'position = .?\d*.\d+,\s+(.?\d*.\d*),\s+(.?\d*.\d+),\s+.?\d*.\d+')
        self.forceConstantRegex: re.Pattern = re.compile(r'strength = (\d*.\d+),\s+(\d*.\d*)')
        ### this one should not be accessible from init try later first solve the samplesizes problem and the initialisation of the decorrelation engine
        self.forceConstantCorrectionFactor: float = 0.5

    def parse_anchors(self) -> np.ndarray:
        def search_anchor_in_line(file,CVcolumn):
            self.columnRegex = re.compile(self.columnPattern.format(CVcolumn))
            with open(file) as fileContent:
                    for line in fileContent:
                        match = re.search(self.columnRegex, line)
                        if match: 
                            anchor1, anchor2 = re.search(self.anchorRegex, line).group(1,2)
                            assert anchor1 == anchor2, "It seems, that your minimum is not defined as a point. Only harmonic potentials are implemented"
                            return float(anchor1)
        
        def loop_through_files_and_columns():
            anchors = []
            for file in self.files:
                anchors.append([search_anchor_in_line(file,CVcolumn) for CVcolumn in self.CVColumns])
            return anchors
        
        anchors = loop_through_files_and_columns()
        anchors = np.array(anchors).reshape(-1, len(self.CVColumns))
        return(anchors)
        
    def parse_force_constants(self) -> np.ndarray:
        def search_forceConstants_in_line(file,CVcolumn):
            self.columnRegex = re.compile(self.columnPattern.format(CVcolumn))
            with open(file) as fileContent:
                    for line in fileContent:
                        match = re.search(self.columnRegex, line)
                        if match: 
                            forceConstant1, forceConstant2 = re.search(self.forceConstantRegex, next(fileContent)).group(1,2)
                            assert forceConstant1 == forceConstant2, "It seems, that the steepness of your potential is asymmetric. Only harmonic potentials are implemented"
                            return float(forceConstant1)
        
        def loop_through_files_and_columns():
            forceConstants = []
            for file in self.files:
                forceConstants.append([search_forceConstants_in_line(file,CVcolumn) for CVcolumn in self.CVColumns])
            return forceConstants
            
        forceConstants = loop_through_files_and_columns()
        forceConstants = np.array(forceConstants).reshape(-1, len(self.CVColumns))
        return(forceConstants * self.forceConstantCorrectionFactor)
    
    
    def parse_collective_variables(self) -> list[np.ndarray]:
        CollectiveVariablesList = []
        for file in (tqdm(self.files) if self.verbose else self.files):
            
            CollectiveVariablesDF = pd.read_csv(file,delim_whitespace=True,comment='#',header=None,usecols=self.CVColumns)
            CollectiveVariablesList.append(CollectiveVariablesDF.values[self.subset:].T)
                 
        Hamiltonian_CollectiveVariable_Timeseries = np.column_stack((itertools.zip_longest(*CollectiveVariablesList, fillvalue=0)))
        Hamiltonian_CollectiveVariable_Timeseries[~np.isfinite(Hamiltonian_CollectiveVariable_Timeseries)] = 0
        Hamiltonian_CollectiveVariable_Timeseries = Hamiltonian_CollectiveVariable_Timeseries.reshape(len(self.files), len(self.CVColumns), -1)
        return Hamiltonian_CollectiveVariable_Timeseries
    

In [4]:
@dataclass
class DecorrelationEngine(ABC):
    
    Hamiltonian_CollectiveVariable_Timeseries: np.ndarray[float]
    Hamiltonian_CVColumnsSamplesize: np.ndarray[int]
    correlationTime: np.ndarray[float] = field(init=False)
    subsampleIndexArray: np.ndarray[int] = field(init=False)
        
    @abstractmethod
    def find_correlation_time() -> np.ndarray[float]:
        pass
    
    @abstractmethod
    def prepare_subsample_index_array() -> np.ndarray:
        pass
    
    @abstractmethod
    def resize_samplesizes() -> np.array:
        pass
    
    @abstractmethod
    def decorrelate_collective_variables() -> np.ndarray:
        pass

@dataclass
class DecorrelationBSE(DecorrelationEngine):
    ### Does this work upon init ?
    # def __post_init__(self, safetyFactor=2.0, minSamplesize=1000):
    safetyFactor: float
    minSamplesize: int
    
    def __post_init__(self):
        self.maxBlockSize: int = int(np.round(self.Hamiltonian_CVColumnsSamplesize / self.minSamplesize))
        
    def find_correlation_time(self) -> np.ndarray[float]:
        
        def logistic(x, a, k):
            return a / (1. + np.exp(-k * (x - 1))) - 0.5 * a

        def d2logistic_dx2(x, a, k):   # 2nd derivative of the logistic function above necessary for fitting a BSE curve
            return -a * k**2 * (np.exp(k*(x-1)) - 1) * np.exp(k*(x-1)) / (np.exp(k*(x-1)) + 1)**3        
        
        ### there should be a loop through columns aswell
        for i,_ in enumerate(self.Hamiltonian_CVColumnsSamplesize):
            samplesize = self.Hamiltonian_CVColumnsSamplesize[i]
            collectiveVariables = self.Hamiltonian_CollectiveVariable_Timeseries[i,:samplesize]
            
            BSE_List = []

            for blockSize in range(1,self.maxBlockSize):
                skip = samplesizes % blockSize   # to screen all integer blocksizes the first couple of CV sampes need to be excluded sometimes
                blockedAverages = np.mean(collectiveVariables[skip:].reshape(-1, blockSize), axis=0)
                blockedStandardError = np.std(blockedAverages)
                BSE_List.append(blockedStandardError / np.sqrt(blockSize))

            BSE = np.array(BSE_List)
            blockSizes = np.arange(BSE.shape[0])

            (logisticAmplitude, logisticGrowthRate), _ = opt.curve_fit(logistic, blockSizes, BSE)
            BSE_fit = logistic(blockSizes,
                               logisticAmplitude,
                               logisticGrowthRate)

            BSE_fit_2nd_derivative = d2logistic_dx2(blockSizes,
                                                    logisticAmplitude,
                                                    logisticGrowthRate)

            correlationTimeEstimates.append(np.argsort(BSE_fit_2nd_derivative)[0])    # grossfield et al: t_corr = 2 * inflection_point; but whatever
        self.correlationTime = np.array(correlationTimeEstimates * self.safetyFactor)
        
        return self.correlationTimeEstimate
    
    def prepare_subsample_index_array(self) -> None:
        subsampleIndexList = []
        for i,_ in eumerate(self.Hamiltonian_CVColumnsSamplesize):
            indices = np.arange(self.Hamiltonian_CVColumnsSamplesize[i])
            np.random.shuffle(indices)
            decorrelatedSamplesize = int(self.Hamiltonian_CVColumnsSamplesize[i]/self.correlationTime[i])
            indices[:decorrelatedSamplesize] = np.sort(indices[:decorrelatedSamplesize])
            subsampleIndexList.append(indices)
        self.subsampleIndexArray = np.array(subsampleIndexList)
    
    def resize_samplesizes(self) -> np.array:
        for i,_ in enumerate(self.Hamiltonian_CVColumnsSamplesize):
            self.Hamiltonian_CVColumnsSamplesize[i] = int(self.Hamiltonian_CVColumnsSamplesize[i]/self.correlationtime[i])
        return self.Hamiltonian_CVColumnsSamplesize
    
    def decorrelate_collective_variables(self) -> np.ndarray:
        for i,_ in enumerate(self.Hamiltonian_CollectiveVariable_Timeseries):
            self.Hamiltonian_CollectiveVariable_Timeseries[i] = self.Hamiltonian_CollectiveVariable_Timeseries[i,self.subsampleIndexArray[i]]
        return self.Hamiltonian_CollectiveVariable_Timeseries
    
        
        

In [15]:
@dataclass
class DataContainer:
    kB_kcalmolK: float = 0.001987204259
    temperatureK: np.ndarray = np.array(300)
    betaFactor: np.ndarray = 1 / (kB_kcalmolK*temperatureK)
    
    Anchors: np.ndarray = field(init=False)
    ForceConstants: np.ndarray = field(init=False)
    Hamiltonian_CollectiveVariable_Timeseries: list[np.ndarray] = field(init=False)
    Hamiltonian_CVColumnsSamplesize: np.ndarray = field(init=False)
    
    def ParseData(self, file_parser: FileParser) -> None:
        
        def samplesizes_from_collective_variables():
            samplesizes = [ts[ts!=0].shape[0] for CV_ts in self.Hamiltonian_CollectiveVariable_Timeseries for ts in CV_ts]
            samplesizes = np.array(samplesizes).reshape(self.Hamiltonian_CollectiveVariable_Timeseries.shape[:-1])
            return samplesizes
        
        self.Anchors = file_parser.parse_anchors()
        self.ForceConstants = file_parser.parse_force_constants()
        self.Hamiltonian_CollectiveVariable_Timeseries = file_parser.parse_collective_variables()
        self.Hamiltonian_CVColumnsSamplesize = samplesizes_from_collective_variables()
        
    def DecorrelateData(self, decor_engine: DecorrelationEngine) -> None:
        self.Hamiltonian_CollectiveVariable_Timeseries = DecorrelationEngine.decorrelate_collective_variables()
        self.Hamiltonian_CVColumnsSamplesize = DecorrelationEngine.resize_samplesizes()

In [14]:
matrix = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]

flattened = [element for row in matrix for element in row]

print(flattened)

[1, 2, 3, 4, 5, 6, 7, 8]


In [16]:
# def main():
files = natsorted(glob('../_sim/13*/_output/pmd_p*[!a].txt'))

parser = AMBER_PMD_Parser(files, [5])

dG_oS_DataContainer = DataContainer()
dG_oS_DataContainer.ParseData(parser)

decorr_engine = DecorrelationBSE(dG_oS_DataContainer.Hamiltonian_CVColumnsSamplesize,
                                 dG_oS_DataContainer.Hamiltonian_CollectiveVariable_Timeseries,
                                3,1000) ### Does this work upon init ?

dG_oS_DataContainer.DecorrelateData(decorr_engine)

    # dG_oS_DataContainer.ParseData(AMBER_PMD_Parser)
    # dG_oS_dataloader.decorrelate(decorrelationScheme())
    # dG_oS_Energy = FE_contribution('PMF-like')
    
# if __name__ == '__main__':
    # main()

  Hamiltonian_CollectiveVariable_Timeseries = np.column_stack((itertools.zip_longest(*CollectiveVariablesList, fillvalue=0)))


TypeError: only size-1 arrays can be converted to Python scalars