# Libraries

In [44]:
import cobra as cb
import logging
logging.basicConfig(filename="log.txt"  , level=logging.INFO)
import os
import pandas as pd
import numpy as np
import itertools
from itertools import combinations_with_replacement
import scipy.stats as stats

# Set up

In [11]:
# Names of the models to analyse; loadModels looks for a file
# with the same name (without extension) in the models folder
modelNames = [
    "ENGRO 1",
    "ENGRO 2",
]

In [12]:
##############################################
# Create a folder if it doesn't already exist.
# Missing parent folders are created as well.
# Nothing is done when the path already exists.
# Parameters
# - path --> new folder path
##############################################
def createFolder(path):
    if not os.path.exists(path):
        # makedirs also builds the missing parents; exist_ok avoids an error
        # if the folder appears between the check above and this call
        os.makedirs(path, exist_ok=True)

# Loading models

In [13]:
##############################################
# Load the models files (.xml)
# Parameters
# - modelNames --> list of models names that must
# match the file names (without extension)
# - modelFolder --> the folder containing the 
# models files (with or without trailing separator)
# Returns
# - dictionary modelName --> model read from the .xml file
# Raises
# - FileNotFoundError --> no file is named like the model
# - ImportError --> files named like the model exist,
# but none of them has the .xml extension
##############################################
def loadModels(modelNames, modelFolder):
    # The folder content is the same for every model: list it only once
    files = os.listdir(modelFolder)
    models = {}
    for modelName in modelNames:
        # Collect the extensions of ALL the files named like the model, so that
        # a sibling file (e.g. a .json export) cannot hide the .xml one
        extensions = []
        for file in files:
            filename, extension = os.path.splitext(file)
            if filename == modelName:
                extensions.append(extension)
        if not extensions:
            raise FileNotFoundError('File not found')
        if ".xml" not in extensions:
            raise ImportError('Model file extension not supported')
        models[modelName] = cb.io.read_sbml_model(os.path.join(modelFolder, modelName + ".xml"))
    return models

In [14]:
# Load every model listed in modelNames from the models folder
modelsDict = loadModels(modelNames, "../../models/")

# For each model keep the list of its reaction ids, in the model order
modelReactionsDict = {}
for modelName in modelNames:
    modelReactionsDict[modelName] = [rxn.id for rxn in modelsDict[modelName].reactions]

# Similarity tests

In [42]:
##############################################
# Two-sample Kolmogorov-Smirnov test
# (wrapper around scipy.stats.ks_2samp)
# Parameters
# - s1 --> first sample
# - s2 --> second sample
# Returns
# - the scipy result object (the p-value is in .pvalue)
##############################################
def kstest(s1, s2):
    outcome = stats.ks_2samp(s1, s2)
    return outcome

##############################################
# Mann-Whitney U test
# (wrapper around scipy.stats.mannwhitneyu)
# Parameters
# - s1 --> first sample
# - s2 --> second sample
# Returns
# - the scipy result object (the p-value is in .pvalue)
##############################################
def mannwhitney(s1, s2):
    outcome = stats.mannwhitneyu(s1, s2)
    return outcome

##############################################
# It creates a pairs list for statistical tests with
# the files created by the sampling notebook. Each pair
# refers to two dataset with equal number 
# of samples, but from different executions.
# Parameters
# - names --> list of file names as ("nsamples_executionIndex_algorithm.csv")
##############################################
def testPairsCreator(algorithms, samples, executionsPerSamples):
    
    testNames = {}
    
    for algorithm in algorithms:
        testNames[algorithm] = []
        names = []
        if(samples == [-1]):
            for j in range(0, executionsPerSamples):
                    names.append(str(j) + "_" + "0_"+ algorithm + ".csv")
        else:
            for i in samples:
                for j in range(0, executionsPerSamples):
                    names.append(str(i) + "_" + str(j) + "_"+ algorithm + ".csv")
        temp = []
        last_nsample = names[0].split("_")[0]
        for name in names:
            nsample = name.split("_")[0]
            if(nsample == last_nsample):
                temp.append(name + "/")
                last_nsample = nsample
            else:
                temp_list = list(itertools.combinations_with_replacement(temp,2))
                temp_list=[(el[0],el[1]) for el in temp_list if el[0]!=el[1]]
                testNames[algorithm].extend(temp_list)
                temp = []
                temp.append(name + "/")
                last_nsample = nsample
        temp_list = list(itertools.combinations_with_replacement(temp,2))
        temp_list=[(el[0],el[1]) for el in temp_list if el[0]!=el[1]]
        testNames[algorithm].extend(temp_list)
    return testNames


##############################################
# It runs the similarity tests on every pair of sample files.
# For each model, algorithm, thinning and test, the test is 
# executed column by column (one column per model reaction,
# selected by position) on the two files of each pair, and the
# p-values are saved in one csv file whose rows are the pairs:
# resultPath/model/test/algorithm + "Thinning<k>".csv, or
# resultPath/model/test/algorithm + "groupedBy<n>".csv if thinning is -1
# Parameters
# - modelNames --> list of models names
# - modelsDict --> dictionary of the loaded models (not read here)
# - modelReactionsDict --> dictionary modelName --> list of reactions ids
# - pairList --> dictionary algorithm --> list of files pairs, each
# file name ending with "/" (as created by testPairsCreator)
# - tests --> list of tests names ("kstest" or "mannwhitney")
# - thinnings --> list of thinning values (-1 selects the groupedBy folder)
# - groupedBy --> value used in the folder name when thinning is -1
# - elementPath --> folder containing the samples
# - resultPath --> folder where the results are saved
##############################################
def similarityTest(modelNames, modelsDict, modelReactionsDict, pairList, tests, thinnings, groupedBy,
                   elementPath, resultPath):

    createFolder(resultPath)

    for modelName in modelNames:
        modelResultPath = resultPath + modelName
        createFolder(modelResultPath)
        for algorithm in pairList:
            for thinning in thinnings:
                # Folder / file suffix identifying the kind of samples in use
                if thinning == -1:
                    variant = "groupedBy" + str(groupedBy)
                else:
                    variant = "Thinning" + str(thinning)

                for test in tests:
                    testResultPath = modelResultPath + "/" + test
                    createFolder(testResultPath)

                    reactionIds = modelReactionsDict[modelName]
                    nReactions = len(reactionIds)
                    resultDf = pd.DataFrame(columns=['testName'] + list(reactionIds))

                    for rowIndex, pair in enumerate(pairList[algorithm]):
                        samplePath = elementPath + modelName + "/" + algorithm + variant + "/"

                        # Every name ends with "/": joining and splitting gives
                        # the two file names followed by an empty string
                        fileNames = (pair[0] + pair[1]).split("/")[:-1]
                        firstTokens = fileNames[0].split('_')
                        secondTokens = fileNames[1].split('_')
                        rowLabel = "_".join([firstTokens[0], firstTokens[1],
                                             secondTokens[0], secondTokens[1]])
                        row = [rowLabel]

                        logging.info("Executing test: " + rowLabel)
                        logging.info("Files: " + samplePath + fileNames[0] + 
                              " and " + samplePath + fileNames[1])

                        s1 = pd.read_csv(samplePath + fileNames[0], index_col = 0)
                        s2 = pd.read_csv(samplePath + fileNames[1], index_col = 0)

                        for column in range(nReactions):
                            if test == "kstest":
                                compare = kstest
                            elif test == "mannwhitney":
                                compare = mannwhitney
                            else:
                                raise NotImplementedError('Algorithm not supported')
                            row.append(compare(s1.iloc[:, column], s2.iloc[:, column]).pvalue)

                        resultDf.loc[rowIndex] = row

                    outputFile = testResultPath + "/" + algorithm + variant + ".csv"
                    resultDf.set_index('testName').to_csv(outputFile)

In [None]:
algorithms = ["achr", "optgp", "chrr"]

# Sample sizes to compare: 1000, 2000, ..., 30000
samplesNList = list(range(1000, 30001, 1000))

executionsPerSampleSize = 20

thinnings = [1, 10, 100]

testNames = testPairsCreator(algorithms, samplesNList, executionsPerSampleSize)

tests = ["kstest", "mannwhitney"]

samplesFolder = "../../samples/"

resultFolder = "../../results/FDR/"

# similarityTest takes groupedBy as its 7th positional parameter, but reads it
# only when a thinning equals -1: none does here, so None is passed explicitly
similarityTest(modelNames, modelsDict, modelReactionsDict, testNames, tests, thinnings, None,
               samplesFolder, resultFolder)


In [None]:
# Input and output folders
samplesFolder = "../../samples/"
resultFolder = "../../results/FDR/"

# Algorithm and statistical tests to run
algorithms = ["cbs3"]
tests = ["kstest", "mannwhitney"]

# [-1] makes testPairsCreator build the names as "executionIndex_0_algorithm.csv";
# a thinning of -1 makes similarityTest read the "groupedBy" folder
samplesNList = [-1]
thinnings = [-1]
groupedBy = 1000
executions = 20

testNames = testPairsCreator(algorithms, samplesNList, executions)

similarityTest(modelNames, modelsDict, modelReactionsDict, testNames, tests, thinnings, groupedBy, 
               samplesFolder, resultFolder)