In [1]:
# default_exp diffresults_handling

In [2]:
#hide
%reload_ext autoreload
%autoreload 2

### Data Structures to manage the quantitative information


In [3]:
#export

class QuantifiedMultiConditionComparison():
    """
    Container for all information used in the multi-condition analysis.

    Quantified proteins are grouped by protein name; every group stores one
    quantified protein per condition pair.
    """
    def __init__(self):
        # protein name -> multi-condition protein object
        self._proteinname2multicondprots = {}

    def add_quantified_proteins_to_comparison(self, quantified_proteins):
        """Register every quantified protein of the iterable under its name and condition pair.

        Each item needs a ``name`` and a ``condpair`` attribute. An item with
        a (name, condpair) combination that is already stored replaces the
        earlier one.
        """
        for quantified_protein in quantified_proteins:
            self._initialize_multicond_protein_if_necessary(quantified_protein)
            self._add_quantified_protein_to_multicond(quantified_protein)

    def get_quantified_protein(self, protein_name, condpair):
        """Return the quantified protein stored for ``protein_name`` in ``condpair``.

        Returns:
            The stored object, or None if either the protein or the
            condition pair is unknown.
        """
        multicondprot = self._proteinname2multicondprots.get(protein_name)
        if multicondprot is None:
            # unknown protein: behave like the condition-pair lookup below instead of
            # failing with an AttributeError on None
            return None
        return multicondprot.condpair2quantified_protein.get(condpair)

    def _initialize_multicond_protein_if_necessary(self, quantified_protein):
        """Create the multi-condition entry for the protein name unless it already exists."""
        if quantified_protein.name not in self._proteinname2multicondprots:
            self._proteinname2multicondprots[quantified_protein.name] = QuantifiedProteinMultiCondition(quantified_protein.name)

    def _add_quantified_protein_to_multicond(self, quantified_protein):
        """Store the quantified protein under its condition pair in the entry of its name."""
        multicondprot = self._proteinname2multicondprots.get(quantified_protein.name)
        multicondprot.condpair2quantified_protein[quantified_protein.condpair] = quantified_protein

class QuantifiedProteinMultiCondition():
    """All quantifications of a single protein, keyed by condition pair."""
    def __init__(self, name):
        self.name = name
        # condition pair -> quantified protein; starts empty
        self.condpair2quantified_protein = dict()
    

class QuantifiedProteinInCondpair():
    """Plain record holding the quantification values of one protein in one condition pair."""
    def __init__(self, condpair, name, log2fc, p_value, fdr):
        # which protein was quantified in which condition pair
        self.condpair, self.name = condpair, name
        # the values as handed in by the caller, stored unchanged
        self.log2fc, self.p_value, self.fdr = log2fc, p_value, fdr
    
        
class QuantifiedProteinCanditateInCondpair(QuantifiedProteinInCondpair):
    """Stub for cluster-based protein processing.

    NOTE(review): the base-class initializer is not called, so a new instance
    only carries the ``peptides`` attribute.
    """
    def __init__(self, peptide_nodes_of_cluster):
        # placeholder: the peptide nodes handed in are not stored yet
        self.peptides = None

    def _get_quantprot_properties_from_peptide_nodes(self, peptide_nodes_of_cluster):
        """Not implemented yet; does nothing and returns None."""
        return None


In [10]:
#hide
# toy input: "prot1" is quantified in three condition pairs, "prot2" and "prot3" in one each
quantified_proteins = [
    QuantifiedProteinInCondpair("AvsB", "prot1", log2fc=1, p_value=2, fdr=3),
    QuantifiedProteinInCondpair("AvsC", "prot1", log2fc=4, p_value=5, fdr=6),
    QuantifiedProteinInCondpair("AvsD", "prot1", log2fc=7, p_value=8, fdr=9),
    QuantifiedProteinInCondpair("AvsB", "prot2", log2fc=10, p_value=11, fdr=12),
    QuantifiedProteinInCondpair("AvsB", "prot3", log2fc=13, p_value=14, fdr=15),
]

quant_multicomp = QuantifiedMultiConditionComparison()
quant_multicomp.add_quantified_proteins_to_comparison(quantified_proteins)

# every lookup has to return the values of exactly the matching (protein, condition pair) entry
assert quant_multicomp.get_quantified_protein("prot1", "AvsB").log2fc == 1
assert quant_multicomp.get_quantified_protein("prot1", "AvsB").p_value == 2
assert quant_multicomp.get_quantified_protein("prot2", "AvsB").p_value == 11
assert quant_multicomp.get_quantified_protein("prot1", "AvsD").log2fc == 7

### Load results files

In [3]:
#export

class ResultsDirectoryReader():
    """Load the results tables of the selected condition pairs from a results directory.

    The loaded proteins are collected in ``quantified_multicondition_comparison``.

    Args:
        results_dir: directory that is searched for results tables.
        condpairs_selected: iterable of condition pair names to load.

    Raises:
        ValueError: if no results file was located for a selected condition pair.
    """
    def __init__(self, results_dir, condpairs_selected):
        self.quantified_multicondition_comparison = QuantifiedMultiConditionComparison() #initialize empty multicomparison object

        self.__condpairs_selected = condpairs_selected
        self.__localizer = ResultstableLocalizer(results_dir)
        self._add_all_condpairs_to_multicondition_comparison()

    def _add_all_condpairs_to_multicondition_comparison(self):
        """Read the table of every selected condition pair and add its proteins to the comparison."""
        for condpair in self.__condpairs_selected:
            file = self.__localizer.condpairname2file.get(condpair)
            if file is None:
                # fail early with the name of the missing pair instead of handing None to the table reader
                raise ValueError(f"no results file found for condition pair '{condpair}'")
            quantified_proteins = ResultsTableReader(condpair, file).quantified_proteins
            self.quantified_multicondition_comparison.add_quantified_proteins_to_comparison(quantified_proteins)

In [4]:
#export
import pandas as pd

class ResultsTableReader():
    """Read one tab-separated results table and turn its rows into quantified proteins.

    The table is accessed through the columns "protein", "log2fc", "pval" and
    "fdr". After construction, ``quantified_proteins`` holds one
    QuantifiedProteinInCondpair per table row, in row order.
    """
    def __init__(self, condpair, file):
        self.quantified_proteins = []

        self.__condpair = condpair
        self.__results_df = self._read_table(file)

        # one list per column, all in row order
        self.__protnames = self._get_property("protein")
        self.__log2fcs = self._get_property("log2fc")
        self.__p_values = self._get_property("pval")
        self.__fdrs = self._get_property("fdr")

        self._init_quantified_proteins()

    def _read_table(self, file):
        """Load the tab-separated table from ``file`` into a DataFrame."""
        results_df = pd.read_csv(file, sep="\t")
        return results_df

    def _get_property(self, property):
        """Return the values of the column ``property`` as a list."""
        column = self.__results_df[property]
        return [value for value in column]

    def _init_quantified_proteins(self):
        """Append one quantified protein per row to ``quantified_proteins``."""
        rows = zip(self.__protnames, self.__log2fcs, self.__p_values, self.__fdrs)
        for protname, log2fc, p_value, fdr in rows:
            quantified_protein = QuantifiedProteinInCondpair(
                condpair=self.__condpair,
                name=protname,
                log2fc=log2fc,
                p_value=p_value,
                fdr=fdr,
            )
            self.quantified_proteins.append(quantified_protein)





In [5]:
#export
import re
class ResultstableLocalizer():
    """Map condition pair names to the results files found in a results directory.

    After construction, ``condpairname2file`` maps every condition pair name
    (the file name without the ".results.tsv" suffix) to the path of its file.
    """
    def __init__(self, results_dir):
        self._resultsfilepaths = ResultsFiles(results_dir=results_dir).filepaths
        self.condpairname2file = {}
        self._load_condpairname2file()

    def _load_condpairname2file(self):
        """Fill ``condpairname2file`` from the located results file paths."""
        for filepath in self._resultsfilepaths:
            condpairname = self._parse_condpairname_from_filepath(filepath)
            self.condpairname2file[condpairname] = filepath

    @staticmethod
    def _parse_condpairname_from_filepath(file):
        """Extract the condition pair name from the path of a results file.

        The name is the part of the file name in front of ".results.tsv". The
        directory part of the path is ignored, so the results directory may
        have any name, and both "/" and "\\" are accepted as path separators.

        Raises:
            ValueError: if the file name does not contain ".results.tsv".
        """
        # raw string with escaped dots: no invalid "\/" escape and "." only matches a literal dot
        pattern = r"(?:^|[\\/])([^\\/]*)\.results\.tsv"
        matched = re.search(pattern, file)
        if matched is None:
            raise ValueError(f"could not parse a condition pair name from '{file}'")
        return matched.group(1)


import glob
class ResultsFiles():
    """Collect the paths of all "*.results.tsv" files located directly in a directory."""
    def __init__(self, results_dir):
        self._results_dir = results_dir
        self.filepaths = self.__get_result_filepaths()

    def __get_result_filepaths(self):
        """Return the list of paths in the results directory that match "*.results.tsv"."""
        search_pattern = f'{self._results_dir}/*.results.tsv'
        matching_paths = glob.glob(search_pattern)
        return matching_paths
    


### Test results file loading


In [6]:
#hide
import pandas as pd
import os
import shutil
import random
import numpy as np


# Folder used for the simulated results tables of the tests below.
# NOTE: ResultsDirSimulator deletes and recreates this folder on every simulation.
RESULTS_DIR_SIMULATED = "test_data/unit_tests/results_file_handling/toy_data/results"


class ResultsDirSimulator():
    """Write simulated results tables into a (freshly created) results directory.

    Args:
        results_dir: target directory; it is deleted and recreated by ``simulate``.
        resultswriterconfig_vec: one config object per results table to write.

    After ``simulate`` has run, ``condpairs`` lists the made-up condition pair
    names of the written tables, in writing order.
    """
    def __init__(self, results_dir, resultswriterconfig_vec):
        self._results_dir = results_dir
        self._resultswriterconfig_vec = resultswriterconfig_vec
        self.condpairs = []

    def simulate(self):
        """Recreate the results directory and write one table per config."""
        self._create_toy_results_dirs()
        self._write_out_simulated_dataframes()

    def _create_toy_results_dirs(self):
        self._create_or_replace_folder(self._results_dir)

    @staticmethod
    def _create_or_replace_folder(folder):
        """Create ``folder`` (including parents); an existing folder is removed with all its content first."""
        if os.path.exists(folder):
            shutil.rmtree(folder)
        os.makedirs(folder)

    def _write_out_simulated_dataframes(self):
        """Simulate and save one table per config and record the condition pair names."""
        # start from an empty list so that calling simulate() again does not duplicate the names
        self.condpairs = []
        for idx, resultswriterconfig in enumerate(self._resultswriterconfig_vec):
            condpairname = f"cond{idx}_VS_pair{idx}" #make up a name for the tables
            self.condpairs.append(condpairname)
            resultswriterconfig.filenames_w_config.append(condpairname)
            resultstable_simulator = ResultsTableSimulator(resultswriterconfig)
            self._save_dataframe(resultstable_simulator.protein_df, self._results_dir, condpairname)

    @staticmethod
    def _save_dataframe(df, results_dir, name):
        """Write ``df`` as tab-separated file "<results_dir>/<name>.results.tsv" without the index column."""
        df.to_csv(f"{results_dir}/{name}.results.tsv", sep = "\t", index = False)


class ResultsTableSimulator():
    """Build a simulated protein results table according to a writer config.

    The config has to provide ``length`` (number of proteins) and
    ``fc_for_all_proteins`` (fixed fold change, or None for random ones).
    The table is available as ``protein_df`` after construction.
    """
    def __init__(self, resultswriterconfig):
        self.protein_df = None
        self._resultswriterconfig = resultswriterconfig
        self._simulate_dataframes_as_specified()

    def _simulate_dataframes_as_specified(self):
        self.protein_df = self._simulate_protein_df()

    def _simulate_protein_df(self):
        """Return a DataFrame with the columns 'protein', 'log2fc', 'fdr' and 'pval'."""
        num_proteins = self._resultswriterconfig.length
        columns = {}
        columns['protein'] = [f'name{idx}' for idx in range(num_proteins)]
        columns['log2fc'] = self._get_fcs()
        # fdr and pval are drawn as 10**(-u) with u uniform in [0, 10)
        columns['fdr'] = [10**(-np.random.uniform(0, 10)) for _ in range(num_proteins)]
        columns['pval'] = [10**(-np.random.uniform(0, 10)) for _ in range(num_proteins)]
        return pd.DataFrame(data = columns)

    def _get_fcs(self):
        """Return one fold change per protein: the configured fixed value, or uniform draws from [-2, 2)."""
        num_proteins = self._resultswriterconfig.length
        fixed_fc = self._resultswriterconfig.fc_for_all_proteins
        if fixed_fc is None:
            return [np.random.uniform(low = -2, high = 2) for _ in range(num_proteins)]
        return [fixed_fc] * num_proteins

class ResultsWriterConfig():
    """Settings for one simulated results table.

    Args:
        length: number of proteins in the table.
        fc_for_all_proteins: fold change assigned to every protein; None (default) means
            no fixed value is configured.
    """
    def __init__(self, length, fc_for_all_proteins = None):
        self.length = length
        # names of the tables that were written with this config; starts empty
        self.filenames_w_config = list()
        self.fc_for_all_proteins = fc_for_all_proteins




In [9]:
#hide

def test_that_expected_numbers_of_results_files_are_found(num_results_files):
    """Simulate ``num_results_files`` tables and check that the localizer finds that many condition pairs."""
    simulator = get_results_dir_simulator(num_results_files, fc_for_all_proteins=None)
    simulator.simulate()

    condpairname2file = ResultstableLocalizer(RESULTS_DIR_SIMULATED).condpairname2file
    # dict keys are unique, so the dict size is the number of distinct condition pair names
    assert len(condpairname2file) == num_results_files
    print('performed tests')

def test_that_multicondition_objects_have_expected_fold_changes(num_results_files, fc_for_all_proteins):
    """Simulate tables with one fixed fold change and check that it is read back for sample proteins.

    Every checked protein has to be present in all condition pairs and carry
    only the configured fold change.
    """
    simulator = get_results_dir_simulator(num_results_files=num_results_files, fc_for_all_proteins=fc_for_all_proteins)
    simulator.simulate()
    comparison = ResultsDirectoryReader(RESULTS_DIR_SIMULATED, simulator.condpairs).quantified_multicondition_comparison

    # spot-check a handful of the simulated proteins
    for protein_name in ("name1", "name15", "name3", "name13"):
        check_consistency_for_protein(comparison, protein_name, fc_for_all_proteins=fc_for_all_proteins, num_results_files=num_results_files)
    print('performed tests')
     

def get_results_dir_simulator(num_results_files, fc_for_all_proteins):
    """Return a simulator that writes ``num_results_files`` tables of 50 proteins each to RESULTS_DIR_SIMULATED.

    All tables share one and the same config object.
    """
    shared_config = ResultsWriterConfig(length = 50, fc_for_all_proteins=fc_for_all_proteins)
    configs = [shared_config] * num_results_files
    return ResultsDirSimulator(RESULTS_DIR_SIMULATED, configs)

def check_consistency_for_protein(multicond_comparison, protein_name, fc_for_all_proteins, num_results_files):
    """Assert that the protein occurs in ``num_results_files`` condition pairs, all with the expected fold change."""
    condpair2protein = multicond_comparison._proteinname2multicondprots[protein_name].condpair2quantified_protein
    num_condpairs = len(condpair2protein)
    assert num_condpairs == num_results_files

    observed_fcs = set(quantified.log2fc for quantified in condpair2protein.values())
    assert observed_fcs == {fc_for_all_proteins}

# Run the file-loading tests on freshly simulated results tables.
num_results_files = 50
fc_for_all_proteins = 42

# each test builds and runs its own simulator, so no separate simulator is created here
test_that_expected_numbers_of_results_files_are_found(num_results_files)
test_that_multicondition_objects_have_expected_fold_changes(num_results_files, fc_for_all_proteins)

performed tests
performed tests
