In [5]:
import os
import pandas as pd
import numpy as np
from sklearn.metrics import roc_auc_score, accuracy_score
import csv
from scipy.stats import pearsonr

def calculate_iqs_unphased(true_genotypes, imputed_dosages):
    # Convert imputed dosages to discrete values
    imputed_discrete = np.round(imputed_dosages).astype(int)

    # Clip the imputed discrete values to be within the range of 0 to 2
    imputed_discrete = np.clip(imputed_discrete, 0, 2)

    # Create a contingency table
    contingency_table = np.zeros((3, 3), dtype=int)

    # Fill the contingency table
    for true_geno, imputed_geno in zip(true_genotypes, imputed_discrete):
        for true_allele, imputed_allele in zip(true_geno, imputed_geno):
            contingency_table[int(true_allele), int(imputed_allele)] += 1

    # Calculate the total number of genotypes
    total_genotypes = np.sum(contingency_table)

    # Calculate observed proportion of agreement (Po)
    observed_agreement = np.trace(contingency_table) / total_genotypes

    # Calculate marginal sums
    row_marginals = np.sum(contingency_table, axis=1)
    col_marginals = np.sum(contingency_table, axis=0)

    # Calculate chance agreement (Pc)
    chance_agreement = np.sum((row_marginals * col_marginals) / (total_genotypes ** 2))

    # Calculate IQS
    if chance_agreement == 1:  # To prevent division by zero in case of perfect chance agreement
        iqs_score = 0
    else:
        iqs_score = (observed_agreement - chance_agreement) / (1 - chance_agreement)

    return iqs_score

def load_prs313_snps(prs313_path):
    prs313_df = pd.read_excel(prs313_path)
    prs313_snps = prs313_df[['Chromosome', 'Positionb', 'Reference Allele', 'Effect Allele']]
    prs313_snps = prs313_snps.set_index(['Chromosome', 'Positionb'])
    return prs313_snps

def parse_vcf(vcf_path, prs313_snps, sample_ids):
    allele_dosages = []
    positions = []
    with open(vcf_path, 'r') as f:
        for line in f:
            if line.startswith("#CHROM"):
                parts = line.strip().split("\t")
                vcf_sample_ids = parts[9:]
                if vcf_sample_ids != sample_ids:
                    raise ValueError("Sample IDs in VCF file do not match the expected order")
            elif not line.startswith("#"):
                parts = line.strip().split("\t")
                chrom = int(parts[0])
                pos = int(parts[1])
                ref = parts[3]
                alt = parts[4]

                if (chrom, pos) in prs313_snps.index:
                    try:
                        dosages = [float(gt.split(':')[1]) for gt in parts[9:]]
                    except:
                        vcf_alts = alt.split(",")
                        PRS313_alt = prs313_snps.loc[(chrom, pos)]["Effect Allele"]
                        print(f"multiallelic SNP found at {chrom}:{pos}")
                        dosages = [float(gt.split(':')[1][vcf_alts.index(PRS313_alt) * 2]) for gt in parts[9:]]

                    allele_dosages.append(dosages)
                    positions.append(pos)
    allele_dosages = np.array(allele_dosages).T  # Transpose to get samples as rows and SNPs as columns
    return allele_dosages, positions

def sort_true_labels(true_labels_df, positions):
    sorted_columns = sorted(true_labels_df.columns, key=lambda x: int(x.split('_')[1]))
    true_labels_df = true_labels_df[sorted_columns]
    sorted_positions = sorted(positions)
    try:
        position_index = [sorted_positions.index(int(col.split('_')[1])) for col in sorted_columns]
    except:
        print("Error in label sorting")
    return true_labels_df.values[:, position_index]

def safe_auc_metric(y_pred, y_true):
    if y_true.sum() == 0:
        print("All true labels are negative. Returning NaN.")
        return np.nan
    return roc_auc_score(y_true, y_pred)

def calculate_pearson_r_squared(true_labels, imputed_dosages):
    """Calculate Pearson's r² coefficient for each SNP"""
    pearson_r_squared = []
    
    for i in range(true_labels.shape[1]):
        # Calculate Pearson correlation coefficient
        r, _ = pearsonr(true_labels[:, i], imputed_dosages[:, i])
        # Square the correlation coefficient to get r²
        r_squared = r ** 2
        pearson_r_squared.append(r_squared)
    
    return np.array(pearson_r_squared)

def evaluate_metrics(true_labels, imputed_dosages):
    # Ensure both arrays have the same shape
    if (imputed_dosages.shape != true_labels.shape):
        raise ValueError("Shape of imputed dosages and true labels do not match")
    
    # Calculate Pearson's r² instead of R²
    pearson_r_squared_scores = calculate_pearson_r_squared(true_labels, imputed_dosages)
    
    # Calculate IQS
    iqs_scores = np.array([calculate_iqs_unphased(true_labels[:, i].reshape(-1, 1), imputed_dosages[:, i].reshape(-1, 1)) for i in range(true_labels.shape[1])])
    
    # Calculate accuracy
    correct_predictions = np.sum(true_labels == np.round(imputed_dosages))
    total_predictions = true_labels.size
    accuracy_scores = correct_predictions / total_predictions
    
    # Calculate AUC
    individual_aucs = [
        [
            safe_auc_metric((imputed_dosages[:, j].round() == i).astype(float), (true_labels[:, j] == i).astype(float))
            for i in range(3)
        ]
        for j in range(true_labels.shape[1])
    ]
    flattened_aucs = [auc for sublist in individual_aucs for auc in sublist]
    auc_scores = np.nanmean(flattened_aucs)

    # Flatten individual AUC scores into a single array for each SNP
    individual_auc_scores = np.nanmean(np.array(individual_aucs), axis=1)

    # Calculate individual accuracy for each SNP
    individual_accuracy_scores = np.array([
        ((true_labels[:, i] == np.round(imputed_dosages[:, i])).sum() / true_labels.shape[0])
        for i in range(true_labels.shape[1])
    ])

    return pearson_r_squared_scores, iqs_scores, accuracy_scores, auc_scores, individual_auc_scores, individual_accuracy_scores

def main():
    true_labels_dir = "../../../Data/y_test_labels_unphased/"
    imputed_vcf_dir = "../../../Data/Beagle_imputed_output/"
    prs313_path = "../../../Data/PRS313_with_23andMe.xlsx"

    # Load PRS313 SNPs
    prs313_snps = load_prs313_snps(prs313_path)

    # Initialize lists to store the performance metrics for each chromosome
    all_pearson_r_squared_scores = []
    all_iqs_scores = []
    all_accuracy_scores = []
    all_auc_scores = []
    all_chromosomes = []

    # Folders for saving results
    output_folder = "../../../Data/model_results_unphased_all_PRS/beagle/"
    csv_folder = output_folder + "csv_files/"

    if not os.path.exists(csv_folder):
        os.makedirs(csv_folder)

    # Assuming chromosomes are numbered 1 to 22
    for chr_num in range(1, 23):
        sample_ids = None

        true_labels_path = os.path.join(true_labels_dir, f"chr{chr_num}_true_labels_y_test.csv")
        if os.path.exists(true_labels_path):
            true_labels_df = pd.read_csv(true_labels_path, index_col=0)
            sample_ids = true_labels_df.index.tolist()

        if sample_ids is None:
            raise ValueError("No true labels file found to extract sample IDs")
    
        vcf_path = os.path.join(imputed_vcf_dir, f"chr{chr_num}_specific_positions.vcf")
        
        if os.path.exists(true_labels_path) and os.path.exists(vcf_path):
            # Load imputed dosages from VCF
            imputed_dosages, positions = parse_vcf(vcf_path, prs313_snps, sample_ids)

            if (true_labels_df.shape[1] != len(positions)):
                continue
            else:
                # Sort true labels to match imputed dosages order
                true_labels = sort_true_labels(true_labels_df, positions)
                pearson_r_squared_scores, iqs_scores, accuracy_scores, auc_scores, individual_auc_scores, individual_accuracy_scores = evaluate_metrics(true_labels, imputed_dosages)

                # Evaluate metrics
                all_pearson_r_squared_scores.append(pearson_r_squared_scores)
                all_iqs_scores.append(iqs_scores)
                all_accuracy_scores.append(accuracy_scores)
                all_auc_scores.append(auc_scores)
                all_chromosomes.append(chr_num)

                print(f"Chromosome {chr_num}: R2 Score = {np.mean(pearson_r_squared_scores):.4f}, IQS = {np.mean(iqs_scores):.4f}, Accuracy = {accuracy_scores:.4f}, AUC = {auc_scores:.4f}")

                # Get the names of the SNPs from the original dataframe
                snp_names = true_labels_df.columns

                individual_results_output_dir = f"{csv_folder}/chr{chr_num}"
                os.makedirs(individual_results_output_dir, exist_ok=True)

                # Save individual R2 Score scores to a CSV file
                csv_file = os.path.join(individual_results_output_dir, f'individual_r2_scores_chr{chr_num}.csv')

                with open(csv_file, 'w', newline='') as file:
                    writer = csv.writer(file)
                    writer.writerow(['SNP', 'R2 Score'])
                    for snp, r_squared_score in zip(snp_names, pearson_r_squared_scores):
                        writer.writerow([snp, r_squared_score])

                print(f"Individual R2 Score saved at: {csv_file}")

                # Save individual IQS scores to a CSV file
                iqs_csv_file = os.path.join(individual_results_output_dir, f'individual_iqs_scores_chr{chr_num}.csv')

                with open(iqs_csv_file, 'w', newline='') as file:
                    writer = csv.writer(file)
                    writer.writerow(['SNP', 'IQS Score'])
                    for snp, iqs_score in zip(snp_names, iqs_scores):
                        writer.writerow([snp, iqs_score])

                print(f"Individual IQS scores saved at: {iqs_csv_file}")

                # Save individual accuracy scores to a CSV file
                accuracy_csv_file = os.path.join(individual_results_output_dir, f'individual_accuracy_scores_chr{chr_num}.csv')

                with open(accuracy_csv_file, 'w', newline='') as file:
                    writer = csv.writer(file)
                    writer.writerow(['SNP', 'Accuracy Score'])
                    for snp, accuracy_score in zip(snp_names, individual_accuracy_scores):
                        writer.writerow([snp, accuracy_score])

                print(f"Individual Accuracy scores saved at: {accuracy_csv_file}")

                # Save individual AUC scores to a CSV file
                auc_csv_file = os.path.join(individual_results_output_dir, f'individual_auc_scores_chr{chr_num}.csv')

                with open(auc_csv_file, 'w', newline='') as file:
                    writer = csv.writer(file)
                    writer.writerow(['SNP', 'AUC Score'])
                    for snp, auc_score in zip(snp_names, individual_auc_scores):
                        writer.writerow([snp, auc_score])

                print(f"Individual AUC scores saved at: {auc_csv_file}")

        else:
            print(f"Files for Chromosome {chr_num} not found. Skipping...")

    # Drop nan
    no_nan_accuracies = [x for x in all_accuracy_scores if x != 0]
    overall_accuracy = np.mean(no_nan_accuracies)

    # Calculate overall metrics
    overall_pearson_r_squared = [np.mean(scores) for scores in all_pearson_r_squared_scores]
    overall_iqs = [np.mean(scores) for scores in all_iqs_scores]
    overall_auc = np.mean(all_auc_scores)

    print(f"Overall Pearson r²: {np.mean(overall_pearson_r_squared):.4f}")
    print(f"Overall IQS: {np.mean(overall_iqs):.4f}")
    print(f"Overall Accuracy: {overall_accuracy:.4f}")
    print(f"Overall AUC: {overall_auc:.4f}")

    # Save the overall performance metrics to a CSV file
    performance_df = pd.DataFrame({
        "Chromosome": all_chromosomes,
        'R2 Score': overall_pearson_r_squared,
        'IQS Score': overall_iqs,
        'Accuracy Score': all_accuracy_scores,
        'AUC Score': all_auc_scores
    })

    performance_csv_file = os.path.join(csv_folder, 'performance_metrics.csv')
    performance_df.to_csv(performance_csv_file, index=False)
    print(f"Performance metrics saved at: {performance_csv_file}")

if __name__ == "__main__":
    main()

All true labels are negative. Returning NaN.
Chromosome 1: R2 Score = 0.9052, IQS = 0.9160, Accuracy = 0.9631, AUC = 0.9540
Individual R2 Score saved at: ../../../Data/model_results_unphased_all_PRS/beagle/csv_files//chr1/individual_r2_scores_chr1.csv
Individual IQS scores saved at: ../../../Data/model_results_unphased_all_PRS/beagle/csv_files//chr1/individual_iqs_scores_chr1.csv
Individual Accuracy scores saved at: ../../../Data/model_results_unphased_all_PRS/beagle/csv_files//chr1/individual_accuracy_scores_chr1.csv
Individual AUC scores saved at: ../../../Data/model_results_unphased_all_PRS/beagle/csv_files//chr1/individual_auc_scores_chr1.csv
All true labels are negative. Returning NaN.
Chromosome 2: R2 Score = 0.8941, IQS = 0.9075, Accuracy = 0.9602, AUC = 0.9537
Individual R2 Score saved at: ../../../Data/model_results_unphased_all_PRS/beagle/csv_files//chr2/individual_r2_scores_chr2.csv
Individual IQS scores saved at: ../../../Data/model_results_unphased_all_PRS/beagle/csv_files