In [2]:
# Input/output locations for this run. These are absolute, machine-specific paths.
dir_path = "/home/lbrindel/output/western_diet_samples/all_samples/"  # root whose entries are listed by multiprocess_retrieve_data
metadata = "~/Downloads/western_diet_exp/metadata_drama.tsv"  # TSV opened with open_tsv in the run cells
abundance_path = "~/Downloads/western_diet_exp/specI.mat"  # NOTE(review): presumably meant for build_df(abundance_path=...) — confirm
taxonomic_path = "~/Downloads/western_diet_exp/taxonomies.tsv"  # NOTE(review): presumably meant for build_df(taxonomic_path=...) — confirm
save_path = "/home/lbrindel/output/save_output_m2m"  # NOTE(review): not read by any visible cell — confirm it is still needed

In [2]:
from padmet.utils.sbmlPlugin import convert_from_coded_id as cfci
import pandas as pd
import os
from multiprocessing import Pool
from multiprocessing import cpu_count
from functools import partial


def get_scopes(file_name, path) -> pd.DataFrame:
    """Find ``file_name`` somewhere under ``path`` and load it as a scope matrix.

    The tree is traversed with ``os.walk``; the first directory whose file
    list contains ``file_name`` wins, and that file is read through
    ``open_tsv`` with ``convert_cpd_id=True`` and ``rename_columns=True``.

    Args:
        file_name (str): Exact file name to look for (e.g. "rev_cscope.tsv").
        path (str): Root directory of the search.

    Returns:
        pd.DataFrame: The loaded matrix, or None when no file named
        ``file_name`` exists anywhere under ``path``.
    """
    for current_dir, _subdirs, filenames in os.walk(path):
        if file_name not in filenames:
            continue
        # First hit wins; deeper or later directories are never visited.
        return open_tsv(os.path.join(current_dir, file_name), convert_cpd_id=True, rename_columns=True)
    return None


def retrieve_all_sample_data_old_way(path):
    """Serially load the community scope of every sample directory in ``path``.

    Each sub-directory of ``path`` is treated as one sample. Its
    "rev_cscope.tsv" file is looked up with ``get_scopes``; plain files
    directly inside ``path`` are ignored.

    Args:
        path (str): Directory holding one sub-directory per sample.

    Returns:
        dict: ``{sample_name: {"cscope": <value returned by get_scopes>}}``.
        Only the "cscope" key is filled in by this function.
    """
    sample_dirs = (
        entry for entry in os.listdir(path)
        if os.path.isdir(os.path.join(path, entry))
    )
    return {
        entry: {"cscope": get_scopes("rev_cscope.tsv", os.path.join(path, entry))}
        for entry in sample_dirs
    }


def retrieve_all_sample_data(sample, path):
    """Load the community scope ("rev_cscope.tsv") of a single sample.

    Designed to be mapped over the entries of ``path`` (one call per entry),
    so it must cope with entries that are not sample directories.

    Args:
        sample (str): Name of an entry inside ``path``.
        path (str): Directory holding one sub-directory per sample.

    Returns:
        tuple: ``(cscope_dataframe, sample)``. The first item is None when
        ``sample`` is not a directory or when no "rev_cscope.tsv" is found
        below it.
    """
    sample_directory_path = os.path.join(path, sample)
    # Guard first: the previous version fell through to the final return with
    # an unbound local (UnboundLocalError) whenever the entry was a plain file.
    if not os.path.isdir(sample_directory_path):
        return None, sample

    # get_scopes already yields None when the file is missing.
    return get_scopes("rev_cscope.tsv", sample_directory_path), sample


def melt_df_multi(dataframe: pd.DataFrame) -> pd.DataFrame:
    """Reshape a wide table into long format keyed by "smplID".

    The index is moved back into the columns and every column other than
    "smplID" is unpivoted into ("Compound", "Value") pairs.

    Args:
        dataframe (pd.DataFrame): Wide table that exposes "smplID" once its
            index has been reset (typically a frame indexed by "smplID").

    Returns:
        pd.DataFrame: Long table with columns "smplID", "Compound", "Value".
    """
    # Work on the copy returned by reset_index(): resetting in place modified
    # the caller's frame, so a second call on the same object produced a
    # spurious "index" compound.
    return dataframe.reset_index().melt("smplID", var_name="Compound", value_name="Value")


def add_factor_column(metadata, serie_id, factor_id):
    """Look up one metadata factor for a sequence of sample ids.

    Args:
        metadata (pd.DataFrame): Metadata table, either already indexed by
            "smplID" or carrying a "smplID" column.
        serie_id (iterable): Sample ids to look up, in the desired order.
        factor_id (str): Metadata column to read.

    Returns:
        list: ``str(metadata value)`` for each id of ``serie_id``, same order.
    """
    # Index by sample id only when the caller has not done so already.
    lookup = metadata if is_indexed_by_id(metadata) else metadata.set_index("smplID", drop=True)
    return [str(lookup.at[sample_id, factor_id]) for sample_id in serie_id]


def is_indexed_by_id(df: pd.DataFrame):
    """Tell whether ``df`` uses an index named "smplID".

    Args:
        df (pd.DataFrame): Frame to inspect.

    Returns:
        bool: True when ``df.index.name`` equals "smplID", False otherwise.
    """
    return bool(df.index.name == "smplID")


def multiprocess_retrieve_data(path):
    """Load the community scope of every sample directory in ``path`` in parallel.

    One ``retrieve_all_sample_data`` task is dispatched per sub-directory of
    ``path`` to a pool of worker processes (all CPUs but one, at least one).

    Args:
        path (str): Directory holding one sub-directory per sample.

    Returns:
        dict: ``{sample_name: {"cscope": DataFrame}}``. Samples for which no
        cscope could be loaded are left out.
    """
    retrieve_data = partial(retrieve_all_sample_data, path=path)

    # Keep one CPU free for the notebook kernel, but never go below one worker.
    nb_cpu = max(1, cpu_count() - 1)

    # Only directories can be samples; stray files are not sent to the workers.
    sample_names = [entry for entry in os.listdir(path) if os.path.isdir(os.path.join(path, entry))]

    # The context manager releases the workers even if a task raises.
    # map() is blocking, so every result is available before the pool exits.
    with Pool(nb_cpu) as pool:
        results_list = pool.map(retrieve_data, sample_names)

    return {smpl: {"cscope": df} for df, smpl in results_list if df is not None}


def sbml_to_classic(compounds_list):
    """Rewrite coded compound ids as "<id>[<compartment>]" labels.

    Each entry is decoded with ``cfci`` (padmet's ``convert_from_coded_id``),
    whose result is unpacked as (id, id type, compartment); the id type is
    not used in the label.

    Args:
        compounds_list (iterable): Coded compound ids.

    Returns:
        list: One "<id>[<compartment>]" string per input entry, same order.
    """
    decoded_parts = (cfci(coded_id) for coded_id in compounds_list)
    return [
        str(compound) + "[" + str(compartment) + "]"
        for compound, _id_type, compartment in decoded_parts
    ]

def open_tsv(file_name: str, convert_cpd_id: bool = False, rename_columns: bool = False, first_col: str = "smplID"):
    """Open a tab-separated file as a pandas dataframe.

    Args:
        file_name (str): Path of the file.
        convert_cpd_id (bool, optional): Use the ``first_col`` column as index
            and decode the remaining column labels with ``sbml_to_classic``.
            A column named ``first_col`` must exist, either in the file or
            through ``rename_columns=True``. Defaults to False.
        rename_columns (bool, optional): Rename the first column to
            ``first_col``. Defaults to False.
        first_col (str, optional): Label used for the first column / the
            index. Defaults to "smplID".

    Returns:
        Dataframe: Pandas dataframe

    Raises:
        KeyError: If ``convert_cpd_id`` is True and no column is named
            ``first_col``.
    """
    data = pd.read_csv(file_name, sep="\t")
    if rename_columns:
        # Assign a fresh label list instead of writing into
        # ``data.columns.values``: an Index is meant to be immutable, and
        # patching its backing array can leave label lookups out of sync.
        new_labels = list(data.columns)
        new_labels[0] = first_col
        data.columns = new_labels
    if convert_cpd_id:
        data = data.set_index(first_col, drop=True)
        data.columns = sbml_to_classic(data.columns.values)
    return data

def build_df(dir_path, metadata, abundance_path: str = None, taxonomic_path: str = None):
    """
    Extract community scopes present in a directory, then build the dataframes derived from the metabolites produced by each community scope.

    Args:
        dir_path (str): Directory path containing one sub-directory per sample (comm scopes).
        metadata (str): Path of the tsv file containing the metadata of the scopes.
        abundance_path (str, optional): Path of the abundance table. Skipped when None.
        taxonomic_path (str, optional): Path of the taxonomy table. Skipped when None.

    Returns:
        tuple: ``(all_data, normalised_abundance_dataframe, long_taxonomic_data, total_production_dataframe)``
            all_data: dict with keys "metadata", "sample_data", "producers_long_format" and "main_dataframe"
            normalised_abundance_dataframe: result of relative_abundance_calc, or None when no abundance file is given or its processing failed
            long_taxonomic_data: result of taxonomic_data_long_format, or None when no taxonomy file is given or its processing failed
            total_production_dataframe: result of total_production_by_sample
    """
    # NOTE(review): build_main_dataframe, relative_abundance_calc, taxonomic_data_long_format,
    # get_bin_list and total_production_by_sample are not defined in the visible cells;
    # they are expected to be available in the kernel — confirm.

    all_data = {}

    # dtype auto-detection on read was checked and seemed fine, no convert_dtypes() needed.
    all_data["metadata"] = open_tsv(metadata)

    # multiprocess_retrieve_data takes only the directory and returns the per-sample
    # dict; the producers table is built in a second step, as in the run cells.
    # (The previous two-argument call with tuple unpacking raised a TypeError, and a
    # leftover quit() stopped the interpreter before anything else could run.)
    all_data["sample_data"] = multiprocess_retrieve_data(dir_path)
    all_data["producers_long_format"] = producers_by_compounds_and_samples_multi(all_data["sample_data"], all_data["metadata"])

    main_df = build_main_dataframe(all_data["sample_data"])

    all_data["main_dataframe"] = main_df

    if abundance_path is not None:
        # Best effort: a broken abundance file must not prevent the other outputs.
        try:
            raw_abundance_file = open_tsv(abundance_path)
            normalised_abundance_dataframe = relative_abundance_calc(raw_abundance_file, all_data["sample_data"])
        except Exception as e:
            print("Abundance process went wrong.",e)
            normalised_abundance_dataframe = None
    else:
        normalised_abundance_dataframe = None

    if taxonomic_path is not None:
        # Best effort: a broken taxonomy file must not prevent the other outputs.
        try:
            raw_taxonomic_data = open_tsv(taxonomic_path)
            long_taxonomic_data = taxonomic_data_long_format(
                    raw_taxonomic_data, get_bin_list(all_data["sample_data"]), all_data["metadata"].columns[1:],all_data["metadata"]
                )
        except Exception as e:
            print("Taxonomy process went wrong.", e)
            long_taxonomic_data = None
    else:
        long_taxonomic_data = None

    total_production_dataframe = total_production_by_sample(all_data["main_dataframe"], all_data["sample_data"], all_data["metadata"], normalised_abundance_dataframe)

    return all_data, normalised_abundance_dataframe, long_taxonomic_data, total_production_dataframe


def producers_by_compound_and_samples(all_data: dict, metadata: pd.DataFrame):
    """Count, per sample, the producers of every compound and attach the metadata.

    For each sample the columns of its "cscope" matrix are summed, giving one
    value per compound. The per-sample results are stacked into one table
    (compounds absent from a sample are set to 0) and outer-merged with
    ``metadata`` on "smplID".

    Args:
        all_data (dict): ``{sample_name: {"cscope": DataFrame}}``.
        metadata (pd.DataFrame): Metadata table with a "smplID" column.

    Returns:
        pd.DataFrame: One row per sample with "smplID", one column per
        compound, and the metadata columns.
    """
    # One vectorised column sum per sample replaces the per-column Python loop.
    # Besides being much faster, it sums by position, so columns sharing a
    # label are no longer added together.
    all_producers = [
        pd.Series(
            sample_data["cscope"].to_numpy().sum(axis=0),
            index=list(sample_data["cscope"].columns),
            name=sample,
        )
        for sample, sample_data in all_data.items()
    ]

    res = pd.concat(all_producers, axis=1).T
    res = res.fillna(0)
    res.index.name = "smplID"
    res = res.reset_index()
    return pd.merge(res, metadata, how="outer", on="smplID")

def producers_by_compounds_and_samples_multi(all_data: dict, metadata: pd.DataFrame):
    """Parallel version of the per-sample producer count.

    Each sample's "cscope" matrix is handed to ``individual_producers_processing``
    in a worker process. The resulting series are stacked into one table
    (missing compounds set to 0) and outer-merged with ``metadata`` on "smplID".

    Args:
        all_data (dict): ``{sample_name: {"cscope": DataFrame}}``.
        metadata (pd.DataFrame): Metadata table with a "smplID" column.

    Returns:
        pd.DataFrame: One row per sample with "smplID", one column per
        compound, and the metadata columns.
    """
    # Keep one CPU free for the notebook kernel, but never go below one worker.
    cpu_available = max(1, cpu_count() - 1)

    tasks = [(sample_data["cscope"], sample) for sample, sample_data in all_data.items()]

    # The context manager releases the workers even if a task raises.
    # starmap() is blocking, so every result is available before the pool exits.
    with Pool(cpu_available) as pool:
        all_producers = pool.starmap(individual_producers_processing, tasks)

    res = pd.concat(all_producers, axis=1).T
    res = res.fillna(0)
    res.index.name = "smplID"
    res = res.reset_index()
    return pd.merge(res, metadata, how="outer", on="smplID")

def individual_producers_processing(sample_cscope: pd.DataFrame , sample: str):
    """Sum every column of one sample's cscope matrix.

    Args:
        sample_cscope (pd.DataFrame): Matrix whose columns are compounds.
        sample (str): Sample name, used as the name of the returned series.

    Returns:
        pd.Series: Column sums indexed by the column labels, named ``sample``.
    """
    # A single vectorised sum over the rows replaces the per-column Python loop.
    # It works by position, so columns that share a label are summed separately
    # instead of each receiving the combined total of all its duplicates.
    column_totals = sample_cscope.to_numpy().sum(axis=0)
    return pd.Series(column_totals, index=list(sample_cscope.columns), name=sample)


In [8]:
# Inputs and outputs of the run (redeclared so this cell can be executed on its own).
dir_path = "/home/lbrindel/output/western_diet_samples/all_samples/"
metadata = "~/Downloads/western_diet_exp/metadata_drama.tsv"
abundance_path = "~/Downloads/western_diet_exp/specI.mat"
taxonomic_path = "~/Downloads/western_diet_exp/taxonomies.tsv"
save_path = "/home/lbrindel/output/save_output_m2m"

# Pipeline: metadata -> per-sample cscopes (parallel) -> producer counts (serial).
all_data = {"metadata": open_tsv(metadata)}
all_data["sample_data_multi"] = multiprocess_retrieve_data(dir_path)  # recorded: 1m 38.6s
all_data["producers_long_format"] = producers_by_compound_and_samples(
    all_data["sample_data_multi"],
    all_data["metadata"],
)  # recorded: 2m 36.9s (res_smpl1)
# Recorded on the full data set: CRASH at 17m (all_samples)


: 

In [3]:
# Inputs and outputs of the run (redeclared so this cell can be executed on its own).
dir_path = "/home/lbrindel/output/western_diet_samples/all_samples/"
metadata = "~/Downloads/western_diet_exp/metadata_drama.tsv"
abundance_path = "~/Downloads/western_diet_exp/specI.mat"
taxonomic_path = "~/Downloads/western_diet_exp/taxonomies.tsv"
save_path = "/home/lbrindel/output/save_output_m2m"

# Pipeline: metadata -> per-sample cscopes (parallel) -> producer counts (parallel).
all_data = {"metadata": open_tsv(metadata)}
all_data["sample_data"] = multiprocess_retrieve_data(dir_path)  # recorded: 4m 15.3s old ways
all_data["producers_long_format"] = producers_by_compounds_and_samples_multi(
    all_data["sample_data"],
    all_data["metadata"],
)  # recorded: 1m 46s (res_smpl1)
# Recorded on the full data set: 8m 6.6s (all_samples)