<h1> Polygenic Risk Score (PRS) Calculation using Cartesian Genetic Programming (CGP)

---

Authors:<br>
Martin Hurta<br>
Faculty of Information Technology,<br>
Brno University of Technology,<br>
Czechia<br>

Jana Schwarzerova<br>
Faculty of Electrical Engineering and Communication,<br>
Brno University of Technology,<br>
Czechia<br>

---

Version: 1.0<br>
Last update: 2023-10-05

---


Import of libraries and software

In [None]:
# Publicly available libraries and software
import numpy as np
import pandas as pd
from multiprocessing import Pool, cpu_count
from sklearn.model_selection import KFold
import matplotlib.pyplot as plt
# CGP library yet to be published
from cgp import CGP
from functions import add, sub, mul, p_div, identity, p_sqrt, power_f, sin_f, cos_f, abs_f, minimum_f, maximum_f

Load dataset data from provided paths. If list of specific snps is provided, only they will be loaded.

Args:
* genotype_path (str): String of path to genotype data.
* phenotype_path (str): String of path to phenotype data.
* snps (ndarray): Array of snps names only which will be loaded. If none, whole dataset will be loaded.

Returns:
* ndarray: 2D array of loaded genotype data. Axis 0 corresponds to alleles. Axis 1 corresponds to samples.
* ndarray: 1D array of phenotype data.
* ndarray: 1D array with names of alleles.

In [None]:
def load_dataset(genotype_path, phenotype_path, snps=None):
    # Load genotype data (SNPs) and phenotype data
    df_genotype = pd.read_csv(genotype_path)
    df_phenotype = pd.read_csv(phenotype_path)

    # If list of snps is provided, filter out the rest
    if snps is not None:
        df_genotype = df_genotype[np.append('Ind', snps)]

    # Connect corresponding data
    df_data = pd.merge(df_genotype, df_phenotype, on='Ind')

    # Remove sample names and create separate input genotype data and phenotypes
    df_data = df_data.drop(['Ind'], axis=1)
    df_data_x = df_data.drop(['phenotype'], axis=1)
    df_data_y = df_data['phenotype']

    # Save SNP labels
    snp_labels = np.asarray(df_data_x.columns)

    # Create numpy arrays for use in CGP
    nd_data_x = np.asarray(df_data_x).transpose()
    nd_data_y = np.asarray(df_data_y)

    return nd_data_x, nd_data_y, snp_labels

Training of CGP on data of selected allele.
5-fold cross validation is used to obtain 5 separate solutions.
Mean fitness across individual folds is returned together with all 5 CGP chromosomes.

Args:
* data_x (ndarray): 1D array of train input data.
* data_y (ndarray): 1D array of train target data.
* cgp_params (dict): Parameters of CGP algorithm.

Returns:
* ndarray: 1D Array with mean fitness over all 5 folds and CGP chromosomes from all 5 folds.

In [None]:
def snp_calculation(data_x, data_y, cgp_params):
    # Fitness and found solutions for the current SNP
    fitness = []
    solutions = []

    # Create 5-fold of data for better picture on CGP performance
    skf = KFold(5, shuffle=True)

    # Create indices for splitting of data
    splits = skf.split(data_x, data_y)

    # Run CGP on each of 5 folds and thus get 5 results and 5 CGP solutions
    for i, (train_index, test_index) in enumerate(splits):
        # Get data to be used for training in current fold
        train_x = data_x[train_index]
        train_y = data_y[train_index]

        # Create CGP according to provided parameters
        cgp = CGP(primary_inputs=1, rows=cgp_params["num_of_rows"], columns=cgp_params["num_of_columns"],
                  primary_outputs=1, lback=cgp_params["num_of_columns"], pop_size=5, cardinality=2,
                  constants=cgp_params["constants"], unary=cgp_params["unary_functions"],
                  binary=cgp_params["binary_functions"], mute=True)

        # Run evolution and obtain solution and its fitness on train data
        solution, train_fitness = cgp.evolution(input_vals=[train_x], target=[train_y],
                                                num_of_generations=cgp_params["num_of_generations"], error_type='MSE')

        # Save results of the current fold
        fitness.append(train_fitness)
        solutions.append(solution)

    # Return mean train fitness and best solutions over all folds
    return np.append(np.mean(fitness), solutions)

Learning of CGP on all alleles, effectively running GWAS.
Results of GWAS are saved into results_path. Results include row for each allele and columns: allele_name, train fitness and columns for five all parts of CGP genotype.

Args:
* genotype_path (str): String of path to genotype data.
* phenotype_path (str): String of path to phenotype data.
* cgp_params (dict): Parameters of CGP algorithm.
* results_path (str): File path to save the results.


In [None]:
def cgp_gwas(genotype_path, phenotype_path, cgp_params, results_path):
    # Load input SNPs data, target phenotypes and labels of individual SNPs
    data_x, data_y, snp_labels = load_dataset(genotype_path, phenotype_path)

    # Size of CGP chromosome (number_of_nodes(=rows*columns) * (cardinality+function) + number_of_outputs)
    num_of_genes = (2 + 1) * cgp_params["num_of_rows"] * cgp_params["num_of_columns"] + 1

    # Results of CGP, axis 0 corresponds to the number of SNPs,
    # axis 1 contains fitness and chromosomes of found solution per each fold
    results = np.empty(shape=(len(data_x), 1 + num_of_genes * 5))

    # Names of columns in results
    results_columns = ['Fitness'] + ['Gen_' + str(i // num_of_genes) + '_{}'.format(i % num_of_genes)
                                     for i in range(num_of_genes * 5)]

    # Multiprocess results and array of processes
    pool = Pool(processes=cpu_count())
    processes = []

    # Run CGP algorithm on each SNP and get final fitness and solutions
    for snp in data_x:
        processes.append(pool.apply_async(snp_calculation, args=(snp, data_y, cgp_params)))

    # Get results of CGP from processes
    for i in range(len(processes)):
        results[i] = processes[i].get()

    # Crate dataframe of results and add SNP labels as first column
    res_df = pd.DataFrame(results, index=snp_labels, columns=results_columns)
    res_df = res_df.reset_index()
    res_df = res_df.rename(columns={'index': 'SNP'})

    # Save all results into a file
    res_df.to_csv(results_path)

Get names and CGPs of SNPs in the best 20% of results obtained by GWAS.

Args:
* results_path (str): Path to the file with GWAS results.

Returns:
* ndarray: 1D array with names of SNPs that met the threshold.
* ndarray: 2D array with CGP chromosomes for SNPs that met the threshold.
* ndarray: 1D array with fitness on SNPs that met the threshold.

In [None]:
def get_the_top_snps(results_path):
    # Read results csv file and drop the first unnamed column
    df_results = pd.read_csv(results_path)
    df_results = df_results.drop('Unnamed: 0', axis=1)

    # Filter the results of SNPs with fitness in the lowest 20% of all SNPs
    n_to_use = int(np.ceil(len(df_results) / 5))
    df_filtered = df_results.nsmallest(n_to_use, 'Fitness')

    # Fitness obtained on individual filtered SNPs
    nd_fitness = np.asarray(df_filtered['Fitness'])

    # Names of filtered SNPs
    nd_snp_labels = np.asarray(df_filtered['SNP'], dtype='str')

    # Drop unwanted columns and left only CGP chromosome
    df_chromosomes = df_filtered.drop(['SNP', 'Fitness'], axis=1)

    # Get numpy array with chromosomes of said SNPs
    nd_chromosomes = np.asarray(df_chromosomes, dtype='int')

    # Separate 5 chromosomes obtained for each SNP to individual chromosomes
    nd_separate_chromosomes = np.asarray([np.array_split(snp_chromosomes, 5) for snp_chromosomes in nd_chromosomes])

    return nd_snp_labels, nd_separate_chromosomes, nd_fitness

Calculation of PRS from results in provided GWAS file.
PRS is calculated for all samples with usage of SNPs with fitness in the top 20% of all SNPs.

Args:
* results_path (str): Path to the file with GWAS results.
* genotype_path (str): String of path to genotype data.
* phenotype_path (str): String of path to phenotype data.
* cgp_params (dict): Parameters of CGP algorithm.

Raises:
* Exception: Insufficient number of snps with sufficient fitness!

In [None]:
def cgp_prs(results_path, genotype_path, phenotype_path, cgp_params):
    # Get target phenotype data
    _, nd_data_y, _ = load_dataset(genotype_path, phenotype_path)

    # Calculation of mean phenotype to be used as measure of low vs high plant yield
    mean_phenotype = np.mean(nd_data_y)

    # Creation of masks for higher and lower values
    higher_mask = nd_data_y >= mean_phenotype
    lower_mask = nd_data_y < mean_phenotype

    # Get SNPs in the top 20% and corresponding CGPs
    snps, chromosomes, fitness = get_the_top_snps(results_path + '.csv')

    # Inform if there are not any snps with sufficient threshold
    if len(snps) < 1:
        raise Exception("Insufficient number of snps with sufficient fitness!")

    # Load dataset, but only selected SNPs
    nd_data_x, nd_data_y, snp_labels = load_dataset(genotype_path, phenotype_path, snps)

    # Create CGP
    cgp = CGP(primary_inputs=1, rows=cgp_params["num_of_rows"], columns=cgp_params["num_of_columns"], primary_outputs=1,
              lback=cgp_params["num_of_columns"], pop_size=1, cardinality=2, constants=cgp_params["constants"],
              unary=cgp_params["unary_functions"], binary=cgp_params["binary_functions"], mute=True)

    # Results of CGPs on SNPs data
    results = []

    # Go over each SNP and corresponding CGP solutions
    for x, snp_chromosomes in zip(nd_data_x, chromosomes):

        # Results over all CGPs
        snp_results = []

        # Calculate SNP by all five chromosomes
        for chromosome in snp_chromosomes:
            snp_results.append(cgp.run_candidate_solutions([x], chromosome))

        # Calculate mean over five CGP solutions from different folds
        beta_over_snp = np.mean(snp_results, axis=0)[0][0]

        # Append results over all samples on current SNP
        results.append(beta_over_snp)

    # Calculate sum over SNPs for each sample
    prs_array = np.asarray(np.sum(results, axis=0))

    # Filtering of calculated PRS to ones belonging to samples with phenotype higher or lower than mean
    higher_prs = prs_array[higher_mask]
    lower_prs = prs_array[lower_mask]

    return lower_prs, higher_prs

Show boxplot of PRS for samples with phenotype lower or higher than phenotype.

Args:
* results_path (str): Name of used file with results.
* higher_prs (ndarray): PRS values for samples with higher phenotype.
* lower_prs (ndarray): PRS values for samples with lower phenotype.

In [None]:
def boxplot_prs(results_path, lower_prs, higher_prs):
    # Creation of boxplot
    fig, ax = plt.subplots()

    # Title of plot
    ax.set_title('PRS on ' + results_path + ' data')

    # Creation ox boxplot
    ax.boxplot([lower_prs, higher_prs], labels=['Low', 'High'])

    # Axis labels
    ax.set_xlabel('Rating of plants yield')
    ax.set_ylabel('PRS')

    # Drawing of the boxplot
    plt.show()

Main body with parameters settings and run of CGP "GWAS" and PRS calculation.

In [None]:
# Path to file with CGP GWAS results
results_path = "results.csv"

# Genotype file path
genotype_path = "Data/Genotype.csv"

# Phenotype file path
phenotype_path = "Data/Phenotype.csv"

# Parameters of CGP
cgp_params = {
    "num_of_generations": 100,
    "num_of_rows": 5,
    "num_of_columns": 5,
    "constants": [0, 1, 2, np.pi],
    "unary_functions": [identity, sin_f, cos_f, p_sqrt, power_f, abs_f],
    "binary_functions": [add, sub, mul, p_div, minimum_f, maximum_f]
}

# GWAS using CGP
cgp_gwas(genotype_path, phenotype_path, cgp_params, results_path)

# Calculation of PRS from GWAS data from previous step
lower_prs, higher_prs = cgp_prs(results_path, genotype_path, phenotype_path, cgp_params)

# Draw box plots of PRS
boxplot_prs(results_path, lower_prs, higher_prs)