In [None]:
import time
from datetime import datetime
from os import path

import numpy as np
import pandas as pd

In [ ]:
def load_pathways_genes(pathways_path=None):
    """Parse the pathway definition file into ``{PATHWAY_NAME: [gene_id, ...]}``.

    Each usable line has at least three tab-separated fields:
    pathway name, pathway size (integer) and a whitespace-separated list of
    integer gene IDs. The whole line is upper-cased before parsing, so the
    returned pathway names are upper-case.

    Lines are skipped (not reported) when they have fewer than three fields,
    when the size is not a non-negative integer, or when any of the first
    ``size`` gene IDs is not an integer.

    Args:
        pathways_path: Path of the pathway file. When ``None`` (the default)
            the module-level ``pathway_file_dir`` is used.

    Returns:
        dict: Pathway name mapped to the first ``size`` gene IDs on its line.
        If a name appears on several lines, the last usable line wins.
    """
    if pathways_path is None:
        pathways_path = pathway_file_dir

    pathways = {}
    with open(pathways_path, 'r') as f:
        for line in f:
            parts = line.strip().upper().split('\t')
            if len(parts) < 3:  # need name, size and gene list
                continue

            pathway_name = parts[0]
            try:
                pathway_size = int(parts[1])
            except ValueError:
                continue  # size field is not an integer
            if pathway_size < 0:
                # A negative size would make the slice below silently drop
                # genes from the END of the list; treat it as malformed.
                continue

            # Only the first `pathway_size` genes listed belong to the pathway.
            gene_list = parts[2].split()
            try:
                genes = [int(gene) for gene in gene_list[:pathway_size]]
            except ValueError:
                continue  # at least one gene ID is not an integer

            pathways[pathway_name] = genes

    return pathways

def get_scores(scores_file_path=None):
    """Load the experiment's gene scores as a ``{GeneID: Score}`` dictionary.

    The Excel sheet is expected to contain ``GeneID`` and ``Score`` columns.
    Rows are sorted by ``GeneID`` before the dictionary is built; if a gene ID
    occurs more than once, the row that ends up last after sorting wins.

    Loading is best-effort: any failure is printed and an empty dictionary is
    returned, so callers can always treat the result as a mapping.

    Args:
        scores_file_path: Path of the Excel file. When ``None`` (the default)
            the module-level ``experiment_file_path`` is used.

    Returns:
        dict: Gene ID mapped to its score, or ``{}`` if the file is missing or
        cannot be read/parsed.
    """
    raw_scores_file_path = (
        experiment_file_path if scores_file_path is None else scores_file_path
    )

    try:
        raw_data = pd.read_excel(raw_scores_file_path)
        sorted_raw_data = raw_data.sort_values(by='GeneID').reset_index(drop=True)
        return dict(zip(sorted_raw_data['GeneID'], sorted_raw_data['Score']))
    except FileNotFoundError:
        print(f"File not found: {raw_scores_file_path}")
        # Same type as the success path (previously a (DataFrame, dict) tuple).
        return {}
    except Exception as e:
        print(f"An error occurred: {e}")
        return {}

In [ ]:
def perform_statist():
    """Run the enrichment tests on every eligible pathway.

    Reads module-level state: ``genes_by_pathway``, ``scores``,
    ``statistic_test``, ``minimum_gene_per_pathway``,
    ``maximum_gene_per_pathway``, ``FDR_threshold`` and ``task``.

    Steps:
      1. Restrict each pathway to genes that have a score.
      2. Keep pathways whose remaining gene count lies within
         ``[minimum_gene_per_pathway, maximum_gene_per_pathway]``.
      3. For each kept pathway, compare its scores with the scores of all
         other scored genes using ``statistic_test`` and
         ``wilcoxon_rank_sums_test``; the ``statistic_test`` outcome is stored
         in ``task.results``.
      4. Adjust both p-value lists with ``bh_correction``.

    Returns:
        dict: Pathway name mapped to its scored genes, for every pathway whose
        adjusted p-value from either test is below ``FDR_threshold``.
    """
    # Only genes that are both in the pathway file and in the experiment.
    genes_by_pathway_filtered = {
        pathway: [gene_id for gene_id in genes if gene_id in scores]
        for pathway, genes in genes_by_pathway.items()
    }

    # Single list of tested pathways; both p-value lists are aligned with it.
    pathway_names = [
        pathway for pathway, genes in genes_by_pathway_filtered.items()
        if minimum_gene_per_pathway <= len(genes) <= maximum_gene_per_pathway
    ]
    print('After filtering:', len(pathway_names))

    all_scored_genes = set(scores)  # loop-invariant, built once
    ks_p_values = []
    mw_p_values = []
    for pathway in pathway_names:
        pathway_genes = genes_by_pathway_filtered[pathway]
        pathway_scores = [scores[gene_id] for gene_id in pathway_genes]
        background_genes = all_scored_genes - set(pathway_genes)
        background_scores = [scores[gene_id] for gene_id in background_genes]

        # Configured test (module-level `statistic_test`).
        result = statistic_test(pathway_scores, background_scores)
        task.results[pathway] = PathwayResults(p_value=result.p_value, direction=result.directionality)
        ks_p_values.append(result.p_value)

        # Rank-based test on the same two samples; only its p-value is used.
        _, mw_pval = wilcoxon_rank_sums_test(pathway_scores, background_scores, alternative='two-sided')
        mw_p_values.append(mw_pval)

    # Multiple-testing correction, applied to each test separately.
    adjusted_p_values = bh_correction(np.array(ks_p_values))
    adjusted_mw_p_values = bh_correction(np.array(mw_p_values))

    specific_pathways = [
        "WP_DISRUPTION_OF_POSTSYNAPTIC_SIGNALING_BY_CNV",
        "WP_HIPPOCAMPAL_SYNAPTOGENESIS_AND_NEUROGENESIS",
        "WP_SYNAPTIC_SIGNALING_PATHWAYS_ASSOCIATED_WITH_AUTISM_SPECTRUM_DISORDER",
        "REACTOME_NEUROTRANSMITTER_RECEPTORS_AND_POSTSYNAPTIC_SIGNAL_TRANSMISSION"
    ]
    # Report the unadjusted p-values of the pathways of special interest.
    for pathway in specific_pathways:
        if pathway in task.results:
            print(f'{pathway}: {task.results[pathway].p_value}')

    # A pathway is significant if either adjusted p-value passes the threshold.
    significant_pathways_with_genes = {}
    for pathway, ks_adjusted, mw_adjusted in zip(pathway_names, adjusted_p_values, adjusted_mw_p_values):
        if ks_adjusted < FDR_threshold or mw_adjusted < FDR_threshold:
            significant_pathways_with_genes[pathway] = genes_by_pathway_filtered[pathway]

    return significant_pathways_with_genes

In [ ]:
def kolmogorov_smirnov_test(experiment_scores, control_scores, alternative='two-sided') -> StatResults:
    """
    Performs a two-sample Kolmogorov-Smirnov test on two sets of scores.

    The p-value is computed from the maximum distance between the two
    empirical CDFs using the asymptotic series implemented in ``ks``.

    Args:
        experiment_scores (array_like): Array of scores from the experiment group.
        control_scores (array_like): Array of scores from the control group.
        alternative (str, optional): Accepted for interface compatibility.
            The returned p-value is always based on the absolute (two-sided)
            distance and does not depend on this argument.

    Returns:
        StatResults: Object containing the p-value, directionality, and name of the test.
            Directionality is 'greater' / 'less' according to which group has
            the larger mean, or 'not significant' when the means are equal.

    Raises:
        ValueError: If either sample is empty.
    """
    # Sorted copies are required by the searchsorted-based ECDF below.
    experiment_scores = np.sort(experiment_scores)
    control_scores = np.sort(control_scores)

    en1 = len(experiment_scores)
    en2 = len(control_scores)
    if en1 == 0 or en2 == 0:
        # Without this the ECDF division below degenerates into NaN arithmetic.
        raise ValueError("Data passed to kolmogorov_smirnov_test must not be empty")

    # Evaluate both empirical CDFs at every observed value.
    data_all = np.concatenate([experiment_scores, control_scores])
    cdf_experiment = np.searchsorted(experiment_scores, data_all, side='right') / en1
    cdf_control = np.searchsorted(control_scores, data_all, side='right') / en2

    # KS statistic: the maximum absolute distance between the two ECDFs.
    D = np.max(np.abs(cdf_experiment - cdf_control))

    # Scale D by the effective sample size (with a small-sample correction)
    # and convert it to a p-value.
    en = np.sqrt(en1 * en2 / (en1 + en2))
    p_value = ks((en + 0.12 + 0.11 / en) * D)

    # Direction is decided by comparing the group means.
    experiment_mean = np.mean(experiment_scores)
    control_mean = np.mean(control_scores)
    if experiment_mean > control_mean:
        direction = 'greater'
    elif experiment_mean < control_mean:
        direction = 'less'
    else:
        direction = 'not significant'

    return StatResults(p_value=p_value, directionality=direction, name="Kolmogorov-Smirnov Test")




def ks(alam):
    """Evaluate the alternating series 2 * sum_{j>=1} (-1)**(j-1) * exp(-2 * j**2 * alam**2).

    At most 100 terms are summed. The partial sum is returned as soon as the
    latest term is negligible relative to the previous term or to the running
    sum; if that never happens, 1.0 is returned.

    Args:
        alam: The scaled KS statistic at which the series is evaluated.

    Returns:
        The converged partial sum, or 1.0 when the series did not converge
        within 100 terms.
    """
    term_tolerance = 1e-6   # relative to the previous term's magnitude
    total_tolerance = 1e-10  # relative to the running sum
    exponent_scale = -2.0 * alam**2
    coefficient = 2.0
    total = 0.0
    previous_magnitude = 0.0

    j = 1
    while j <= 100:
        term = coefficient * np.exp(exponent_scale * j**2)
        total += term
        magnitude = np.abs(term)

        if magnitude <= term_tolerance * previous_magnitude or magnitude <= total_tolerance * total:
            return total

        coefficient = -coefficient  # the series alternates in sign
        previous_magnitude = magnitude
        j += 1

    # No convergence within 100 terms.
    return 1.0

In [ ]:
# General analysis settings.
# Pathways are tested only if their scored-gene count lies in this range.
minimum_gene_per_pathway = 20
maximum_gene_per_pathway = 60
FDR_threshold = 0.05
try:
    root_path = path.dirname(path.realpath(__file__))
except NameError:
    # `__file__` is not defined when this code runs as notebook cells;
    # fall back to the current working directory.
    root_path = path.realpath('.')
output_path = path.join(root_path, 'Outputs')
figure_name = 'figure'
figure_title = 'Pathway Enrichment'

In [ ]:
# Experiment configuration and input loading.
experiment_name = "roded_T_v_N"
species = 'H_sapiens'
statistic_test = kolmogorov_smirnov_test  # test applied to each pathway
results = dict()
try:
    root_folder = path.dirname(path.realpath(__file__))
except NameError:
    # `__file__` is not defined when this code runs as notebook cells;
    # fall back to the current working directory.
    root_folder = path.realpath('.')
data_file = 'Data'
genes_names_file = 'H_sapiens.gene_info'  # optional if needed
date = datetime.today().strftime('%d_%m_%Y__%H_%M_%S')
pathway_file = 'pathways'
# Layout: <root>/Data/<species>/{genes_names,pathways}/..., <root>/Inputs/experiments_data/<experiment>.xlsx
data_dir = path.join(root_folder, data_file)
genes_names_file_path = path.join(data_dir, species, 'genes_names', genes_names_file)
pathway_file_dir = path.join(data_dir, species, 'pathways', pathway_file)
input_dir = path.join(root_folder, 'Inputs', 'experiments_data')
experiment_file_path = path.join(input_dir, f'{experiment_name}.xlsx')
output_folder = path.join(root_folder, 'Outputs', 'enrichment_scores', experiment_name)
# Both loaders read the paths defined just above.
genes_by_pathway = load_pathways_genes()
scores = get_scores()

In [ ]:
# Run the enrichment analysis and report how long it took.
print("running enrichment")
start = time.perf_counter()
significant_pathways_with_genes = perform_statist()
end = time.perf_counter()

print("Time elapsed: {} seconds".format(end - start))
