In [1]:
import collections
import datetime
import logging
import os
import subprocess

import matplotlib.pyplot as plt
import pandas as pd
import yaml

# TODO 
# does the org plink files have a reference? different versions GRCh37/b37 and Hg19  https://gatk.broadinstitute.org/hc/en-us/articles/360035890951-Human-genome-reference-builds-GRCh38-or-hg38-b37-hg19
# if so, use that for liftover
# if not, use the hs37-1kg reference panel for liftover

# We likly need to add some plink commands:
# --keep-allele-order Use this EVERY SINGLE TIME you call a plink command, otherwise the order of Allele1 and Allele2 may (or probably will) be flipped in your data. \
# --allow-no-sex PLINK will default to removing individuals that have unassigned sex, use this to force it to keep them. \
# --snps-only Removes indels from your variant data and keeps only snps \
# --biallelic-only Removes sites with 2+ alleles \

# Set up logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)

# Function to run shell commands
def run_command(command) -> tuple[bool, str]:
    try:
        result = subprocess.run(
            command, shell=True, check=True, text=True, capture_output=True
        )
        logging.info(result.stdout)
        return True, result.stdout
    except subprocess.CalledProcessError as e:
        logging.error(f"Command '{e.cmd}' returned non-zero exit status {e.returncode}.")
        logging.error(f"Error message: {e.stderr}")
        return False, e.stderr

def create_directory(directory) -> None:
    if not os.path.exists(directory):
        os.makedirs(directory)
        logging.info(f"Created directory: {directory}")
    else:
        logging.info(f"Directory already exists: {directory}")

def update_build(batch_prefix, strand_file, output_dir):
    create_directory(output_dir)
    temp_dir = os.path.join(output_dir, "temp")
    create_directory(temp_dir)

    temp_prefix = os.path.join(temp_dir, "TEMP_FILE_XX72262628_")
    chr_file = f"{strand_file}.chr"
    pos_file = f"{strand_file}.pos"
    flip_file = f"{strand_file}.flip"

    run_command(f"cut -f 1,2 {strand_file} > {chr_file}")
    run_command(f"cut -f 1,3 {strand_file} > {pos_file}")
    run_command(f"awk '{{if ($5==\"-\") print $0}}' {strand_file} | cut -f 1 > {flip_file}")

    run_command(f"plink --bfile {batch_prefix} --update-map {chr_file} --update-chr --make-bed --out {temp_prefix}1")
    run_command(f"plink --bfile {temp_prefix}1 --update-map {pos_file} --make-bed --out {temp_prefix}2")
    run_command(f"plink --bfile {temp_prefix}2 --flip {flip_file} --make-bed --out {temp_prefix}3")
    run_command(f"plink --bfile {temp_prefix}3 --extract {pos_file} --make-bed --out {os.path.join(output_dir, 'updated')}")

    # Cleanup temporary files
    for file in [f"{temp_prefix}{i}" for i in range(1, 4)]:
        for ext in [".bed", ".bim", ".fam", ".log", ".nosex"]:
            try:
                os.remove(file + ext)
            except FileNotFoundError:
                pass

    logging.info("Process completed successfully.")
    return os.path.join(output_dir, "updated")

def update_rsids(input_prefix, output_prefix, rsid_data):
    success, message = run_command(f"plink2 --bfile {input_prefix} --update-name {rsid_data} --make-bed --out {output_prefix}")
    if success and all(os.path.exists(f"{output_prefix}.{ext}") for ext in ["bed", "bim", "fam"]):
        logging.info(f"RS IDs updated successfully for {output_prefix}")
        return output_prefix
    else:
        logging.error(f"Failed to update RS IDs for {output_prefix}: {message}")
        return None

def identify_duplicate_iids(input_prefix):
    fam_file = f"{input_prefix}.fam"
    duplicates_file = f"{input_prefix}_duplicate_iids.txt"

    try:
        with open(fam_file, 'r') as file:
            family_data = [line.strip().split()[:2] for line in file.readlines()]
    except FileNotFoundError:
        logging.error(f"File not found: {fam_file}")
        return None

    iid_to_fids = collections.defaultdict(list)
    for fid, iid in family_data:
        iid_to_fids[iid].append(fid)

    duplicates = [(fid, iid) for iid, fids in iid_to_fids.items() for fid in fids if len(fids) > 1]

    if duplicates:
        with open(duplicates_file, 'w') as file:
            file.write("#FID\tIID\n")
            for fid, iid in duplicates:
                file.write(f"{fid}\t{iid}\n")
        logging.info(f"Duplicate FID-IID pairs identified and written to {duplicates_file}")
        return duplicates_file
    else:
        logging.warning(f"No duplicate IIDs found in {input_prefix}.fam")
        return None

def remove_duplicate_samples(input_prefix, output_prefix):
    duplicates_file = identify_duplicate_iids(input_prefix)
    if duplicates_file:
        remove_command = f"plink2 --bfile {input_prefix} --remove {duplicates_file} --make-bed --out {output_prefix}"
        success, message = run_command(remove_command)
        if success:
            logging.info(f"Duplicate samples removed successfully for {output_prefix}")
            return output_prefix
        else:
            logging.error(f"Failed to remove duplicate samples for {output_prefix}: {message}")
            return None
    else:
        logging.error(f"No duplicate IIDs identified for {input_prefix}")
        return None

def plink_bed_to_vcf(bed_file, bim_file, fam_file, fasta_file=None, out_prefix=None, snps_only=True, chr_prefix=True):
    if out_prefix is None:
        out_prefix = os.path.splitext(bed_file)[0]

    snps_only_option = "--snps-only 'just-acgt'" if snps_only else ""
    chr_prefix_option = "--output-chr chrM" if chr_prefix else ""
    fasta_option = f"--ref-from-fa --fa {fasta_file}" if fasta_file else ""

    command1 = f"plink2 --bed {bed_file} --bim {bim_file} --fam {fam_file} --make-pgen --merge-x --sort-vars {snps_only_option} --out sorted"
    run_command(command1)

    command2 = f"plink2 --pfile sorted --export vcf id-paste=iid bgz {fasta_option} --out {out_prefix} {chr_prefix_option}"
    run_command(command2)

    command3 = "rm sorted.*"
    run_command(command3)

    vcf_path = f"{out_prefix}.vcf.gz"
    if os.path.exists(vcf_path):
        logging.info(f"VCF file generated successfully: {vcf_path}")
        command4 = f"bcftools index --threads 10 {vcf_path}"
        run_command(command4)
        logging.info(f"Indexing completed for VCF file: {vcf_path}")
    else:
        logging.error(f"Failed to generate VCF file at {vcf_path}")
        return None

    return vcf_path

def create_frequency_files(vcf_path, dataset_prefix):
    """Create frequency files from VCF using bcftools."""
    output_vcf = f"{dataset_prefix}_AF.vcf.gz"
    freq_file = f"{dataset_prefix}.frq"
    
    # Generate VCF with allele frequencies
    command1 = f"bcftools +fill-tags {vcf_path} -Oz -o {output_vcf} -- -t AF"
    if not run_command(command1):
        return None
    
    # Extract frequency data to a text file
    command2 = f"""bcftools query -f '%CHROM\\t%CHROM_%POS_%REF_%ALT\\t%REF\\t%ALT\\t%INFO/AF\\n' {output_vcf} | sed '1iCHR\\tSNP\\tREF\\tALT\\tAF' > {freq_file}"""
    if not run_command(command2):
        return None
    
    return freq_file

def analyze_frequency_files(freq_file, ref_freq_file, dataset_prefix, af_diff_limit=0.1):
    """Analyze frequency files to compare allele frequencies."""
    # Load frequency data
    freq_data = pd.read_csv(freq_file, sep='\t')
    ref_data = pd.read_csv(ref_freq_file, sep='\t')
    
    # Merge data on SNP identifiers
    merged_data = pd.merge(freq_data, ref_data, on='SNP', suffixes=('.chip', '.ref'))
    
    # Calculate differences in allele frequencies
    merged_data['AF_diff'] = abs(merged_data['AF.chip'] - merged_data['AF.ref'])
    
    # Filter data where differences exceed the threshold
    high_diff_data = merged_data[merged_data['AF_diff'] > af_diff_limit]
    
    # Plotting
    plt.figure(figsize=(10, 6))
    plt.scatter(merged_data['AF.ref'], merged_data['AF.chip'], c='blue', label='Within Threshold')
    plt.scatter(high_diff_data['AF.ref'], high_diff_data['AF.chip'], c='red', label='Above Threshold')
    plt.xlabel('Reference Allele Frequency')
    plt.ylabel('Chip Allele Frequency')
    plt.title(f'Allele Frequency Comparison for {dataset_prefix}')
    plt.legend()
    plt.grid(True)
    plt.savefig(f"{dataset_prefix}_allele_frequency_comparison.png")
    plt.show()
    
    return high_diff_data

def liftover_to_hg38(input_vcf, output_bcf, reject_bcf, src_fasta, ref_fasta, chain_file):
    command = (f"bcftools +liftover --no-version -Ou {input_vcf} -- "
               f"-s {src_fasta} -f {ref_fasta} -c {chain_file} --reject {reject_bcf} --write-src | "
               f"bcftools sort -o {output_bcf} -Ob --write-index")
    success, message = run_command(command)
    if success:
        logging.info(f"Liftover completed successfully for {input_vcf}")
        return output_bcf
    else:
        logging.error(f"Liftover failed for {input_vcf}: {message}")
        return None

def load_config(config_path):
    with open(config_path, 'r') as file:
        return yaml.safe_load(file)

def main():
    config = load_config('config.yaml')

    batches = config['batches']
    strand_files = config['strand_files']
    rsid_files = config['rsid_files']
    human_g1k_v37_path = config['human_g1k_v37_path']
    GRCh37_path = config['GRCh37_path']
    GRCh38_path = config['GRCh38_path']
    chain_file = config['chain_file']

    updated_batches = {}
    for batch, path in batches.items():
        output_dir = f"data/1_updated/{batch}"
        updated_batches[batch] = update_build(path, strand_files[batch], output_dir)
        logging.info(f"Updated batch {batch} successfully. Output path: {updated_batches[batch]}")

    updated_rsids = {}
    for batch, updated_path in updated_batches.items():
        output_dir = f"data/2_updated_rsids/{batch}"
        create_directory(output_dir)
        output_prefix = os.path.join(output_dir, batch)
        result_path = update_rsids(updated_path, output_prefix, rsid_files[batch])
        if result_path:
            updated_rsids[batch] = result_path
            logging.info(f"Updated rsIDs for batch {batch} successfully. Output path: {updated_rsids[batch]}")
        else:
            logging.error(f"Error: No path provided for batch {batch}.")

    removed_duplicates = {}
    for batch, updated_path in updated_rsids.items():
        output_dir = f"data/3_removed_duplicates/{batch}"
        create_directory(output_dir)
        output_prefix = os.path.join(output_dir, f"{batch}_no_duplicates")
        removed_duplicates[batch] = remove_duplicate_samples(updated_path, output_prefix)
        if removed_duplicates[batch]:
            logging.info(f"Removed duplicates for batch {batch} successfully. Output path: {removed_duplicates[batch]}")
        else:
            logging.error(f"Failed to remove duplicates for batch {batch}.")

    vcf_outputs = {}
    for batch, updated_path in removed_duplicates.items():
        output_dir = f"data/4_vcf_outputs/{batch}"
        create_directory(output_dir)
        output_prefix = os.path.join(output_dir, batch)
        if updated_path is None:
            logging.error(f"Error: No path provided for batch {batch}.")
            continue
        bed_file = f"{updated_path}.bed"
        bim_file = f"{updated_path}.bim"
        fam_file = f"{updated_path}.fam"

        if not all(os.path.exists(file) for file in [bed_file, bim_file, fam_file]):
            logging.error(f"Missing required files for batch {batch}.")
            continue

        try:
            vcf_file = plink_bed_to_vcf(bed_file, bim_file, fam_file, fasta_file=GRCh37_path, out_prefix=output_prefix)
            vcf_outputs[batch] = vcf_file
            logging.info(f"VCF file generated for batch {batch}: {vcf_file}")            
        except Exception as e:
            logging.error(f"Failed to convert files for batch {batch}: {str(e)}")
            vcf_outputs[batch] = None

    # 

    liftover_outputs = {}
    for batch, vcf_file in vcf_outputs.items():
        if vcf_file is None:
            logging.error(f"Skipping liftover for batch {batch} due to previous errors.")
            continue

        output_dir = f"data/5_liftover/{batch}"
        create_directory(output_dir)
        output_bcf = os.path.join(output_dir, f"{batch}.bcf")
        reject_bcf = os.path.join(output_dir, f"{batch}_reject.bcf")

        result = liftover_to_hg38(vcf_file, output_bcf, reject_bcf, human_g1k_v37_path, GRCh38_path, chain_file)
        if result:
            liftover_outputs[batch] = result
            logging.info(f"Liftover to hg38 for batch {batch} successfully. Output path: {result}")
        else:
            logging.error(f"Liftover failed for batch {batch}.")
            
if __name__ == "__main__":
    main()

2024-07-02 15:02:38,133 - INFO - Created directory: data/1_updated/batch1
2024-07-02 15:02:38,134 - INFO - Created directory: data/1_updated/batch1/temp
2024-07-02 15:02:39,291 - INFO - 
2024-07-02 15:02:40,810 - INFO - 
2024-07-02 15:02:43,128 - INFO - 
2024-07-02 15:02:45,682 - INFO - PLINK v1.90b7.2 64-bit (11 Dec 2023)           www.cog-genomics.org/plink/1.9/
(C) 2005-2023 Shaun Purcell, Christopher Chang   GNU General Public License v3
Logging to data/1_updated/batch1/temp/TEMP_FILE_XX72262628_1.log.
Options in effect:
  --bfile data/Raw_data/batch1
  --make-bed
  --out data/1_updated/batch1/temp/TEMP_FILE_XX72262628_1
  --update-chr
  --update-map data/strand_files/HumanOmni2-5Exome-8-v1-1-A-strand-b37/HumanOmni2-5Exome-8-v1-1-A-b37.strand.chr

Note: --update-map <filename> + parameter-free --update-chr deprecated.  Use
--update-chr <filename> instead.
65536 MB RAM detected; reserving 32768 MB for main workspace.
2546527 variants loaded from .bim file.
1541 people (836 males, 68