# PGS data processing

This script is specifically designed to automate the process of downloading Alzheimer's Disease (AD) research data from the PGS catalog. It focuses on extracting files from FTP links provided in a CSV file and ensures that downloaded data undergoes preliminary data cleaning and quality control.

Script Purpose
*  Targeted Downloading: Automates the downloading of AD research data files from the PGS catalog, addressing the needs of researchers focusing on genetic contributions to Alzheimer's Disease.
* Data Cleaning and Quality Control: While the script prepares data for cleaning and quality checks, these processes are conducted separately to ensure data integrity and usability.
s.


## Import Libs

In [None]:
# import packages 
import pandas as pd
import requests
import os
import gzip
import os
from tqdm import tqdm
import re
import numpy as np

## Data Download And Decompress
1. **Reads the CSV**: Loads the CSV file and extracts file URLs from the specified column.
2. **Downloads Files**: Downloads each file from its URL to the specified folder.
3. **Checks Existing Files**: Prevents re-downloading files that already exist.
4. **Logs Status**: Outputs the status of each file download, noting both successes and failures.
5. **Decompress**: Outputs the decompressed data

In [None]:
# Set the path to the CSV file and the download directory
csv_file_path = os.path.join(r"C:\Users\gqu\OneDrive - UTHealth Houston\projects\Genevic\data", "pgs_scores_Alzheimer.csv")
download_folder = os.path.join(r"C:\Users\gqu\OneDrive - UTHealth Houston\projects\Genevic\data", "AlzheimerPGS")

# Ensure the download directory exists
os.makedirs(download_folder, exist_ok=True)
# Specify the directory where decompressed files should be stored
decompression_folder = os.path.join(download_folder, "PGS_Decompress")

# Ensure the decompression directory exists
os.makedirs(decompression_folder, exist_ok=True)

In [None]:

# Read the CSV data
df = pd.read_csv(csv_file_path)
compressed_files = []
# Process each row to handle file downloads and renaming
for index, row in tqdm(df.iterrows(), total=df.shape[0], desc="Downloading and decompressing files"):
    file_url = row['Scoring File (FTP Link)']  # Extract the FTP link
    original_filename = file_url.split('/')[-1]  # Get the original filename from the URL
    
    core_filename = original_filename.split('.')[0]  # Extract the core identifier before the first '.'

    file_url = file_url.replace(original_filename, 'Harmonized/'+core_filename +'_hmPOS_GRCh38.txt.gz')
    file_path = os.path.join(download_folder, core_filename +'_hmPOS_GRCh38.txt.gz')  # Update the path with the new filename
    compressed_files.append(core_filename +'_hmPOS_GRCh38.txt.gz')
    # Check if the new file already exists
    if not os.path.exists(file_path):
        try:
            print(f"Downloading and renaming {original_filename} to {core_filename}...")
            response = requests.get(file_url, stream=True)
            if response.status_code == 200:
                with open(file_path, 'wb') as f:
                    f.write(response.content)
            else:
                print(f"Failed to download {original_filename}. Status code: {response.status_code}")
        except Exception as e:
            print(f"An error occurred while downloading {original_filename}: {e}")
    else:
        print(f"{new_filename} already exists.")
print(f'Download complete!')

In [None]:
# Get a list of all .gz files in the download directory
gz_files = [f for f in os.listdir(download_folder) if f.endswith('.gz')]

# Iterate over the list of files and decompress each one
for gz_file in tqdm(gz_files, desc='Decompressing files'):
    file_path = os.path.join(download_folder, gz_file)
    decompressed_file_path = os.path.join(decompression_folder, gz_file[:-3])  # Removes '.gz'

    # Open the gzipped file and decompress it
    with gzip.open(file_path, 'rb') as gzipped_file:
        with open(decompressed_file_path, 'wb') as decompressed_file:
            decompressed_file.write(gzipped_file.read())
            print(f"Decompressed {gz_file} to {decompressed_file_path}")

print("All files have been decompressed!")

## Data Cleaning

### Step 1
1. **Annotation**: we run the pipeline to annotate the files and fill the missing data with NA
2. **Removing the unknown rsID**: remove the unknown rsID
3. **File selection**: exclude the file raise errors

### For Files with issues, we annoate the hm_rsID

In [None]:
exclude_filenames = ['PGS003957','PGS003958']

### Step 2
1. **scan all files and get a annotation map**: the annotation map has columns of ['SNP_coord', 'hm_rsID', 'hm_chr', 'hm_pos', 'effect_allele'] with no duplicate variants

In [None]:
# Define the directory containing your CSV files
directory = r'C:\Users\gqu\Desktop\projects\Genevic\data\AlzheimerPGS\Annotated'

# List to store DataFrames
dataframes = []

# Loop through all the CSV files in the directory
for filename in os.listdir(directory):
    if filename.endswith('.csv'):  # Check if the file is a CSV
        filepath = os.path.join(directory, filename)
        df = pd.read_csv(filepath)  # Read the CSV file
        dataframes.append(df)  # Append the DataFrame to the list

# Concatenate all DataFrames into a single DataFrame
full_df = pd.concat(dataframes, ignore_index=True)

# Select unique rows based on specific columns
annotation_map = full_df[['SNP_coord', 'hm_rsID', 'hm_chr', 'hm_pos', 'effect_allele']].drop_duplicates()

# Optionally, save the unique DataFrame to a new CSV file
annotation_map.to_csv(r'C:\Users\gqu\Desktop\projects\Genevic\data\annotation_map.csv', index=False)

# Print the unique DataFrame
print(annotation_map)

### Step 3
1. **Detect duplicates**: Duplicate CSV files are defined as the files containing the same set of rsID
2. **Merge duplicates**: Convert the effect weight to z-score and merge the duplicates by sum, append the ranks into a list

In [None]:
import os
import pandas as pd
import re

class UnionFind:
    def __init__(self):
        self.parent = {}

    def find(self, x):
        if self.parent[x] != x:
            self.parent[x] = self.find(self.parent[x])
        return self.parent[x]

    def union(self, x, y):
        rootX = self.find(x)
        rootY = self.find(y)
        if rootX != rootY:
            self.parent[rootY] = rootX
def extract_file_id(filename):
    # Define a helper to extract and validate the file ID from a filename
    match = re.match(r"(PGS\d+)_", filename)
    return match.group(1) if match else None
def find_duplicate_files(folder_path):
    file_data = {}
    file_sets = {}
    uf = UnionFind()
    
    # Iterate through each file in the folder
    for filename in os.listdir(folder_path):
        if filename.endswith('.csv'):
            file_id = extract_file_id(filename)
            if file_id:
                file_path = os.path.join(folder_path, filename)
                df = pd.read_csv(file_path)
                
                # Initialize union-find
                uf.parent[file_id] = file_id
                
                # Check for necessary columns
                if 'SNP_coord' in df.columns and 'hm_rsID' in df.columns:
                    # Create sets of 'SNP_coord' and 'hm_rsID'
                    snp_set = frozenset(df['SNP_coord'])
                    rsid_set = frozenset(df['hm_rsID'])
                    
                    # Store in dictionaries for comparison
                    file_data[file_id] = df
                    file_sets[file_id] = {'SNP_coord': snp_set, 'hm_rsID': rsid_set}

    # Identify matching files
    file_ids = list(file_sets.keys())
    
    for i in range(len(file_ids)):
        for j in range(i + 1, len(file_ids)):
            id1, id2 = file_ids[i], file_ids[j]
            if file_sets[id1]['SNP_coord'] == file_sets[id2]['SNP_coord'] or file_sets[id1]['hm_rsID'] == file_sets[id2]['hm_rsID']:
                uf.union(id1, id2)

    # Consolidate matches into groups, ensuring no self-only groups unless necessary
    groups = {}
    for file_id in file_ids:
        root = uf.find(file_id)
        groups.setdefault(root, []).append(file_id)
    
    # Filter out groups where each ID is only self-referencing, unless no other IDs are in the group
    final_groups = {k: v for k, v in groups.items() if len(v) > 1 or len(set(v)) > 1}

    return final_groups, file_data

In [None]:
from typing import Optional,List, Dict

def convert_to_z_scores(
    file_path: str,
    effect_size_col: str = 'effect_weight',
    rank_col: str = 'ranks',
    effect_allele_col: str = 'effect_allele',
    multiply_by_effect_allele_count: bool = False
) -> pd.DataFrame:
    """
    Reads a CSV file, optionally multiplies the absolute value of effect_weight by the number of effect alleles
    computed from effect_allele, converts the effect weights to Z-scores, and retains hm_rsID,
    effect_weight (Z-score), and ranks.

    Parameters:
    - file_path: Path to the CSV file.
    - effect_size_col: Name of the column containing effect weights.
    - rank_col: Name of the column containing ranks.
    - effect_allele_col: Name of the column containing effect alleles.
    - multiply_by_effect_allele_count: If True, multiplies the absolute value of effect_weight by the number of effect alleles
      before calculating Z-scores.

    Returns:
    - DataFrame with columns: hm_rsID, effect_weight (Z-score), ranks.
    """
    # Read the CSV file
    df = pd.read_csv(file_path)
    
    # Check if required columns exist
    required_columns = ['hm_rsID', effect_size_col, rank_col, effect_allele_col]
    for col in required_columns:
        if col not in df.columns:
            raise ValueError(f"Column '{col}' not found in the file '{file_path}'.")

    # Optionally multiply the absolute value of effect_weight by the number of effect alleles
    if multiply_by_effect_allele_count:
        # Define a function to compute effect_allele_count
        def compute_effect_allele_count(effect_allele):
            if pd.isnull(effect_allele):
                return 0
            else:
                # Remove non-letter characters and count letters
                letters = re.findall(r'[A-Za-z]', effect_allele)
                return len(letters)
        
        # Compute effect_allele_count for each row
        df['effect_allele_count'] = df[effect_allele_col].apply(compute_effect_allele_count)
        # Multiply the absolute value of effect_weight by effect_allele_count
        df[effect_size_col] = np.abs(df[effect_size_col]) * df['effect_allele_count']
    else:
        # Convert effect_weight to its absolute value
        df[effect_size_col] = np.abs(df[effect_size_col])

    # Calculate mean and standard deviation of effect weights
    mu = df[effect_size_col].mean()
    sigma = df[effect_size_col].std()

    # Handle the case where standard deviation is zero
    if sigma == 0 or pd.isnull(sigma):
        df['effect_weight_z'] = 0
    else:
        # Compute Z-scores using the absolute values
        df['effect_weight_z'] = (df[effect_size_col] - mu) / sigma

    # Select the required columns
    df_z = df[['hm_rsID', 'effect_weight_z', rank_col]].copy()
    df_z.rename(columns={'effect_weight_z': 'effect_weight'}, inplace=True)

    return df_z


def merge_duplicate_dataframes(data_dict, duplicate_file_groups, effect_size_col='effect_weight', rank_col='ranks'):
    """
    Merges DataFrames in data_dict that correspond to duplicate files, summing effect_weight and collecting ranks into a list.

    Parameters:
    - data_dict: Dictionary with file_id as keys and DataFrames as values.
    - duplicate_file_groups: Dictionary where each key is a group representative file_id,
                             and the value is a list of duplicate file_ids (including the key).
    - effect_size_col: Name of the effect weight column in the DataFrames.
    - rank_col: Name of the rank column in the DataFrames.

    Returns:
    - A new data_dict with merged DataFrames for duplicate files.
    """
    # Create a copy of data_dict to avoid modifying the original
    merged_data_dict = data_dict.copy()
    
    # Keep track of file_ids that have been merged and should be removed
    file_ids_to_remove = set()
    
    for group_id, file_ids in duplicate_file_groups.items():
        # List to store DataFrames to be merged
        dfs_to_merge = []
        
        for file_id in file_ids:
            if file_id in merged_data_dict:
                dfs_to_merge.append(merged_data_dict[file_id])
                # Mark the file_id for removal if it's not the group_id
                if file_id != group_id:
                    file_ids_to_remove.add(file_id)
            else:
                print(f"Warning: file_id '{file_id}' not found in data_dict.")
        
        if dfs_to_merge:
            # Concatenate the DataFrames
            combined_df = pd.concat(dfs_to_merge, ignore_index=True)
            # Group by 'hm_rsID' and aggregate
            merged_df = combined_df.groupby('hm_rsID').agg({
                effect_size_col: 'sum',
                rank_col: lambda x: list(x)
            }).reset_index()
            # Update the data_dict with the merged DataFrame
            merged_data_dict[group_id] = merged_df
        else:
            print(f"No DataFrames found for group '{group_id}'.")
    
    # Remove the merged file_ids from data_dict
    for file_id in file_ids_to_remove:
        del merged_data_dict[file_id]
    
    return merged_data_dict


In [None]:
def filter_top_N(df, N=200, weight_column='effect_weight'):
    if len(df) > N:
        # Sort by 'effect_weight' in descending order and keep the top N
        return df.nlargest(N, weight_column)
    else:
        return df

In [None]:
import pandas as pd
import numpy as np
from functools import reduce
from collections import Counter

def filter_top_N_advanced(df, N=200, weight_column='effect_weight'):
    """
    Advanced Top-N Filtering:
    1. If df has more than N rows, determine a quantile cutoff for top fraction (N/len(df)).
    2. Filter rows above this threshold.
    3. If still more than N remain, select the top N.
    Otherwise, return df as-is if ≤ N.
    """
    if len(df) > N:
        quantile_cutoff = 1 - (N / len(df))
        threshold = df[weight_column].quantile(quantile_cutoff)
        filtered = df[df[weight_column] >= threshold]

        if len(filtered) > N:
            return filtered.nlargest(N, weight_column)
        else:
            return filtered
    else:
        return df

def overlap_metric(variant_sets):
    """
    Compute a partial overlap metric based on how many studies each variant appears in.
    Uses sum over variants of C(count, 2).
    """
    all_variants = [v for s in variant_sets for v in s]
    counts = Counter(all_variants)
    score = sum((c*(c-1))//2 for c in counts.values())
    return score

def maximize_overlap_fraction_threshold(study_dataframes, id_column='hm_rsID', 
                                        weight_column='effect_weight', max_N=500, fraction=0.9):
    """
    Choose N as the smallest N that achieves at least 'fraction' of the maximum overlap score.

    Parameters:
    - study_dataframes: list of pd.DataFrame, one per study.
    - id_column: name of the variant ID column.
    - weight_column: name of the effect weight column.
    - max_N: maximum N to evaluate.
    - fraction: the fraction of the maximum overlap score we want to achieve.

    Returns:
    - chosen_N: the smallest N that achieves at least 'fraction' * max overlap.
    - chosen_score: the overlap score at chosen_N.
    - max_score: the maximum overlap score achieved at any N.
    """
    scores = []
    
    for N in range(1, max_N + 1):
        filtered_dataframes = [filter_top_N_advanced(df, N=N, weight_column=weight_column) for df in study_dataframes]
        variant_sets = [set(df[id_column]) for df in filtered_dataframes if not df.empty and id_column in df.columns]
        
        if variant_sets:
            current_score = overlap_metric(variant_sets)
        else:
            current_score = 0
        scores.append((N, current_score))
    
    # Find the maximum score
    max_score = max(score for _, score in scores)
    
    # Compute the threshold score
    threshold_score = fraction * max_score
    
    # Find the smallest N that meets or exceeds the threshold score
    # If all scores are zero, it will pick N=1.
    chosen_N, chosen_score = min((N_s for N_s in scores if N_s[1] >= threshold_score),
                                 key=lambda x: x[0],
                                 default=(1, 0))  # default if none meets threshold
    
    return chosen_N, chosen_score, max_score

# Example usage:
# chosen_N, chosen_score, max_score = maximize_overlap_fraction_threshold(study_dataframes_list, fraction=0.9)
# print(f"Chosen N: {chosen_N}, Overlap Score at N: {chosen_score}, Max Score: {max_score}")


In [None]:
from functools import reduce
from collections import Counter
study_frames = []
for file_id, df in merged_data_dict.items():
    df_copy = df.copy(deep=True)
    study_frames.append(df_copy)
chosen_N, chosen_score, max_score = maximize_overlap_fraction_threshold(study_dataframes=study_frames, id_column='hm_rsID', 
                                        weight_column='effect_weight', max_N=500, fraction=0.9)

print(chosen_N, chosen_score, max_score)
    

In [None]:
def standardize_effect_weights(merged_data_dict, effect_size_col='effect_weight', rank_col='ranks'):
    """
    Standardizes the summed effect_weight for each hm_rsID in the merged_data_dict and replaces the list of ranks 
    with their average. Then, reranks the hm_rsIDs based on these averages.

    Parameters:
    - merged_data_dict: Dictionary where each key is a file_id and the value is a merged DataFrame.
    - effect_size_col: Name of the effect weight column (default is 'effect_weight').
    - rank_col: Name of the rank column containing lists of ranks (default is 'ranks').

    Returns:
    - A new dictionary with standardized effect weights and updated ranks for each DataFrame.
    """
    standardized_data_dict = {}

    for file_id, df in merged_data_dict.items():
        df_copy = df.copy(deep=True)
        df_copy = filter_top_N(df_copy, N=chosen_N)
        # Calculate the average rank where ranks are in a list and update the ranks column
        df_copy[rank_col] = df_copy[rank_col].apply(lambda x: np.mean(x) if isinstance(x, list) else x)

        # Calculate the standard deviation based on the count of elements used to compute each average rank
        df_copy['std_dev'] = df_copy[rank_col].apply(lambda x: np.sqrt(len(x)) if isinstance(x, list) else 1)

        # Standardize the effect_weight
        df_copy[effect_size_col] = df_copy.apply(
            lambda row: row[effect_size_col] / row['std_dev'] if row['std_dev'] > 0 else np.nan,
            axis=1
        )

        # Drop the intermediate column
        df_copy.drop(columns=['std_dev'], inplace=True)

        # Rerank based on the updated single rank values
        df_copy.sort_values(by=rank_col, ascending=True, inplace=True)
        df_copy[rank_col] = range(1, len(df_copy) + 1)

        # Update the DataFrame in the new dictionary
        standardized_data_dict[file_id] = df_copy

    return standardized_data_dict


In [None]:
# Usage example:
folder_path = r'C:\Users\gqu\OneDrive - UTHealth Houston\projects\Genevic\data\AlzheimerPGS\processed'
matches, file_data = find_duplicate_files(folder_path)
print("Duplicate File Matches:", matches)

In [None]:
# Example usage for a single file
file_path = r'C:\Users\gqu\OneDrive - UTHealth Houston\projects\Genevic\data\AlzheimerPGS\Annotated\PGS000026_annotated_dataset.csv'
df_z = convert_to_z_scores(file_path)

# View the result
print(df_z)
print(df_z['effect_weight'].sum())
file_path = r'C:\Users\gqu\OneDrive - UTHealth Houston\projects\Genevic\data\AlzheimerPGS\Annotated\PGS000876_annotated_dataset.csv'
df_z = convert_to_z_scores(file_path)

# View the result
print(df_z)
print(df_z['effect_weight'].sum())

In [None]:
dataset = dict()
for filename in os.listdir(folder_path):
    if filename.endswith('.csv'):
        file_id = extract_file_id(filename)
        if file_id:
            file_path = os.path.join(folder_path, filename)
            df_z = convert_to_z_scores(file_path, multiply_by_effect_allele_count=True)
            dataset[file_id] = df_z
merged_data_dict = merge_duplicate_dataframes(dataset, matches)
        

In [None]:
# print(merged_data_dict.keys())
# print(len(merged_data_dict.keys()))
# print(merged_data_dict['PGS000026'])
# print(merged_data_dict['PGS000026']['effect_weight'].sum())
# print(merged_data_dict['PGS000811'])
# print(merged_data_dict['PGS000811']['effect_weight'].sum())
# print(merged_data_dict['PGS000025'])

In [None]:
processed_data = standardize_effect_weights(merged_data_dict)

In [None]:
import pickle

# Save the dictionary as a pickle file
with open(r'C:\Users\gqu\OneDrive - UTHealth Houston\projects\Genevic\data\data_dict.pkl', 'wb') as f:
    pickle.dump(processed_data, f)

In [None]:
# print(merged_data_dict['PGS000026'])
# print(merged_data_dict['PGS000026']['effect_weight'].sum())
# print(merged_data_dict['PGS000811'])
# print(merged_data_dict['PGS000811']['effect_weight'].sum())

In [None]:
print(merged_data_dict['PGS000026'])
print(processed_data['PGS000026'])

### We have the processed data 
1. **processed_data** saved as "data_dict.pkl"
2. **annotation_map** saved as "annotation_map.csv"

In [None]:
# check if we can import those data
# Load annotation map
annotation_map_check = pd.read_csv(r'C:\Users\gqu\OneDrive - UTHealth Houston\projects\Genevic\data\annotation_map.csv', low_memory=False)
# Load the dictionary back from the pickle file
with open(r'C:\Users\gqu\OneDrive - UTHealth Houston\projects\Genevic\data\data_dict.pkl', 'rb') as f:
    data_dict = pickle.load(f)

print(annotation_map_check.head)
print(data_dict.keys())
print(data_dict['PGS000026'])

In [None]:
#PGS003992
print(data_dict['PGS003992'])

In [None]:
import pandas as pd
import os
import matplotlib.pyplot as plt
labels_df = pd.read_csv(os.path.join(r"C:\Users\gqu\OneDrive - UTHealth Houston\projects\Genevic\data","AD_GWAS_Priority_Scores.csv"))
filtered_df = labels_df[labels_df['priority_score'].notna()]

# Display the distribution
plt.figure(figsize=(10, 6))
plt.hist(filtered_df['priority_score'], bins=20, edgecolor='black')
plt.title('Distribution of Priority Score')
plt.xlabel('Priority Score')
plt.ylabel('Frequency')
plt.grid(True)
plt.savefig('y_dist.png')