In [None]:
from glob import glob
from PipeLine import *
import pandas as pd
import numpy as np

from functools import reduce

import statsmodels.tsa.stattools as tsas
import statsmodels.graphics.tsaplots as tsap

In [None]:
def _is_distribution_attr(attr_name, direction):
    """Tell whether `attr_name` names a spatial distribution along `direction`.

    An attribute counts as a distribution when it is one of the known
    distribution names, optionally prefixed by the direction (e.g. 'rHists',
    'zPhisCrd', 'rFloryHists'). A suffix test is used instead of
    `attr_name.split(direction)[-1]` because the latter cuts at the LAST
    occurrence of the direction and so misses names that contain the
    direction letter themselves (the 'r' in 'Flory' or 'Crd').
    """
    if direction is None:
        return False
    dist_names = (
        'HistsCrd', 'HistsMon', 'PhisCrd', 'PhisMon', 'RhosCrd', 'RhosMon',
        'FloryHists', 'Hists', 'Rhos', 'Phis')
    if attr_name in dist_names:
        return True
    return any(attr_name.endswith(direction + name) for name in dist_names)


def all_in_one_attributes(
                    attr_path, attr_name, properties_db, simulation_type,
                    geometry, direction=None, save_to=True):
    """Merge the ensemble-averaged files of one attribute into one dataframe.

    Every file matching `attr_path + "/N*-<attr_name>-ens_avg.csv"` is read,
    its index is turned into either a bin-center column (distributions) or a
    'time' column (everything else), the ensemble properties found in
    `properties_db` are attached to it, and all the per-ensemble dataframes
    are finally concatenated.

    Parameters:
    attr_path (str): path (glob patterns allowed) to the directory or
    directories that contain the files of the attribute of interest.
    attr_name (str): name of the attribute of interest.
    properties_db (str): path to the csv file of the ensemble-averaged
    properties; its 'filename' column must start with the ensemble name
    followed by a '-'.
    simulation_type (str): the type of particles for which the analysis is
    done; it is only used to build the name of the output file. For now, it
    is either 'all' or 'bug'.
    geometry (str): geometry of the simulation box (passed to `SumRule`).
    direction (str): direction ('r', 'z' or 'theta') along which a
    distribution is computed; None for time series.
    save_to (bool): if truthy, the merged dataframe is written next to
    `properties_db` as "all_in_one-<simulation_type>-<attr_name>-ens_avg.csv".

    Return:
    A pandas dataframe in which the files of all the ensembles are merged.

    Raises:
    ValueError: if no file matches the attribute pattern.
    IndexError: if an ensemble has no row in the properties database.
    """
    attr_ext = "-" + attr_name + "-ens_avg.csv"
    attr_csvs = glob(attr_path + "/N*" + attr_ext)
    attr_csvs = organizer.sort_filenames(attr_csvs, extensions=[attr_ext])
    if len(attr_csvs) == 0:
        raise ValueError(
            f"No file matches '{attr_path}/N*{attr_ext}'; nothing to merge.")
    properties = pd.read_csv(properties_db, index_col=0)
    # The ensemble name is whatever precedes the first '-' in the filename;
    # taking element 0 also works when a filename has several hyphens.
    properties['ens_name'] = properties['filename'].str.split(pat='-').str[0]
    property_cols = ['nmon', 'dcyl', 'dcrowd', 'phi_c_bulk', 'phi_c_bulk_eff']
    # Whether the index is a bin center or a time step depends only on the
    # attribute, so it is decided once for all the files.
    is_distribution = _is_distribution_attr(attr_name, direction)
    index_col_name = direction if is_distribution else 'time'
    ens_dfs = []
    for attr_csv in attr_csvs:
        csv_path = attr_csv[0]
        # Only the header is needed to recover the ensemble name.
        header = pd.read_csv(csv_path, index_col=0, nrows=0)
        ens_name = list(header.columns)[0].split('-')[0]
        cell_attrs = SumRule(ens_name, geometry, warning=False)
        attr_df = pd.read_csv(
            csv_path, names=[attr_name], skiprows=1, index_col=0)
        attr_df = attr_df.reset_index().rename(
            columns={'index': index_col_name})
        if is_distribution:
            bin_center_norm_box = {
                'r': cell_attrs.dcyl,
                'z': cell_attrs.lcyl,
                'theta': 4 * np.pi
                }
            attr_df[direction+'_norm'] = \
                2 * attr_df[direction] / bin_center_norm_box[direction]
            if direction != 'theta':
                # This should be divided by 'dmon' but this operation is
                # not done since dmon = 1.0
                attr_df[direction+'_norm_mon'] = attr_df[direction]
                attr_df[direction+'_norm_crd'] = \
                    attr_df[direction] / cell_attrs.dcrowd
        else:
            attr_df['time'] = attr_df['time'] * cell_attrs.dt
        ens_props = properties[properties['ens_name'] == ens_name]
        if ens_props.empty:
            raise IndexError(
                f"Ensemble '{ens_name}' (file '{csv_path}') is not in "
                f"'{properties_db}'.")
        for col in property_cols:
            attr_df[col] = ens_props[col].values[0]
        # Concise names for ensembles and groups. They are built on a single
        # line each: a backslash continuation inside the string literal would
        # embed the indentation of the next line in the name.
        group_name = \
            f"N{cell_attrs.nmon}D{cell_attrs.dcyl}ac{cell_attrs.dcrowd}"
        attr_df['ens_name'] = group_name + f"nc{cell_attrs.ncrowd}"
        attr_df['group_name'] = group_name
        ens_dfs.append(attr_df)
    attr_db = pd.concat(ens_dfs, ignore_index=True)
    if save_to:
        # The whole path must be one expression: a line starting with '+'
        # is a separate statement, not a continuation.
        attr_save_to = (
            properties_db.split('all_in_one')[0] + "all_in_one-"
            + simulation_type + attr_ext)
        attr_db.to_csv(attr_save_to)
    return attr_db


In [None]:
%%time
# distributions:
database = '/Users/amirhsi_mini/analysis/'
dist_db= database+"/N*-all-analysis-ens_avg/N*.csv"
properties_db  = database+'all_in_one-properties-ens_avg-normalized.csv'
simulation_type = 'all'
geometry='cylindrical'
radial_dists = PipeLine.generator_dist_all_in_one(dist_db, properties_db, simulation_type, geometry, 'radial')
longitudinal_dists = PipeLine.generator_dist_all_in_one(dist_db, properties_db, simulation_type, geometry, 'longitudinal')

In [None]:
%%time
# Build one all-in-one file per distribution attribute of the 'bug'
# simulations, running the per-attribute jobs through a local dask cluster.
# NOTE(review): these imports live in the middle of the notebook; consider
# moving them to the import cell at the top.
from dask.distributed import Client
from dask import delayed
from dask import compute
# A local cluster with 2 single-threaded workers.
client = Client(n_workers=2, threads_per_worker=1, silence_logs=1)
# NOTE(review): this bare expression is not the last line of the cell, so it
# displays nothing; the client is also never closed in this cell.
client
database = '/Users/amirhsi_mini/analysis/'
prop_path= database+"/N*-bug-analysis-ens_avg"
properties_db  = database+'all_in_one-properties-ens_avg-normalized.csv'
simulation_type = 'bug'
geometry='cylindrical'
analysis_delayed = []
# Maps each attribute name to the direction along which it is binned.
attrs_dict = {'rFloryHists': 'r', 'rHists': 'r', 'zHists': 'z','thetaHists': 'theta','rPhis' : 'r',
              'zPhis': 'z','rRhos': 'r', 'zRhos': 'z'}
for attr, direction in attrs_dict.items():
    # Nothing runs here; the call is only recorded as a delayed task.
    # NOTE(review): `from PipeLine import *` does not by itself bind the name
    # `PipeLine` -- confirm that the package exports it.
    analysis = delayed(PipeLine.all_in_one_properties)(prop_path, attr, properties_db, simulation_type, geometry, direction = direction)
    analysis_delayed.append(analysis)
# All the recorded tasks are executed here.
# NOTE(review): the list is passed as ONE argument, so `results` is presumably
# a 1-tuple that holds the list of outputs -- confirm against the dask docs.
results = compute(analysis_delayed)

In [None]:
%%time
# Build one all-in-one file per time-series attribute of the 'bug'
# simulations with a plain serial loop.
# Author's notes: with a loop on one core, this takes ~ 4 min; it does NOT
# work with more than 2 workers.
database = '/Users/amirhsi_mini/analysis/'
prop_path= database+"/N*-bug-analysis-ens_avg"
properties_db  = database+'all_in_one-properties-ens_avg-normalized.csv'
simulation_type = 'bug'
geometry='cylindrical'
# Alternative set of attributes (the raw time series):
#attrs_dict = {'fsd_t': None, 'gyr_t': None, 'rFlory_t': None}
# Only the keys matter in this cell; every value (direction) is None.
# NOTE(review): the cell that merges the all-in-one files reads `attrs_dict`
# too, so it depends on this cell having been run last.
attrs_dict = {'fsd_t-acf_ci_lower': None, 'gyr_t-acf_ci_lower': None, 'rFlory_t-acf_ci_lower': None, 'fsd_t-acf_ci_upper': None, 'gyr_t-acf_ci_upper': None, 'rFlory_t-acf_ci_upper': None, 'fsd_t-acf_only': None, 'gyr_t-acf_only': None, 'rFlory_t-acf_only': None}
for attr, direction in attrs_dict.items():
    # The returned dataframe is discarded. NOTE(review): the loop variable
    # `direction` is not used; None is passed explicitly instead.
    _ = PipeLine.all_in_one_properties(prop_path, attr, properties_db, simulation_type, geometry, direction = None)

In [None]:
# Merge the all-in-one files of several attributes into a single file.
# Columns shared by all the files; they are the keys of the merge.
merging_cols = ['time', 'nmon', 'dcyl', 'dcrowd', 'phi_c_bulk', 'phi_c_bulk_eff', 'ens_name', 'group_name']
# NOTE(review): `attrs_dict` is not defined in this cell; it is whatever an
# earlier cell left in the kernel. Change this line to merge other attributes.
attrs_names = attrs_dict.keys() # change this to use.
database = '/Users/amirhsi_mini/analysis/'
file_type = 'all_in_one'
simulation_type = 'bug'
db_type = 'ens_avg.csv'
dfs_to_be_merged = []
for attr in attrs_names:
    # Input name: <database>all_in_one-bug-<attr>-ens_avg.csv
    df_name = database + "-".join([file_type, simulation_type, attr, db_type])
    df = pd.read_csv(df_name, index_col=0)
    dfs_to_be_merged.append(df)
# Fold the dataframes pairwise, left to right, with pd.merge on the shared
# columns (pd.merge is an inner join unless `how` is given).
df_merged = reduce(lambda  left, right: pd.merge(left, right, on=merging_cols), dfs_to_be_merged)
# Output name: <database>all_in_one-bug-acf_t-ens_avg.csv
df_merged.to_csv(database+"-".join([file_type, simulation_type, "acf_t", db_type]))

In [None]:
# `attrs_names` is already a view of the attribute names (a `dict_keys`), and
# it has no `.keys` attribute, so `attrs_names.keys` raised AttributeError.
# The names are passed as a plain list instead.
# NOTE(review): `merge_dfs` is not defined in the visible part of the
# notebook -- confirm that it exists and that it expects the attribute names.
merge_dfs(list(attrs_names), merging_cols)