In [None]:
import requests
import xml.etree.ElementTree as ET
import pandas as pd
import os
import time
import json
import re
import logging
import numpy as np
from typing import Optional
#from urllib3.exceptions import ConnectTimeoutError

In [None]:
os.getcwd()

In [None]:
def esearch(rsid: str) -> Optional[bytes]:
    """
    Fetch SNP XML data through ncbi e-fetch api
    """
    logging.info(f"Fetching SNP info for {rsid}")
    base_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi"
    params = {
        "db": "snp",
        "id": rsid,
    }
    try:
        response = requests.get(base_url, params=params, timeout=10)
        response.raise_for_status()
        return response.content
    except requests.RequestException as e:
        logging.error(f"Failed to fetch {rsid}: {e}")
        return None

In [None]:
def spdi_to_vcf_field(spdi_string: str, max_retries=3) -> Optional[dict]:
    """
    Convert SPDI to VCF fields
    """
    base_url = f"https://api.ncbi.nlm.nih.gov/variation/v0/spdi/{spdi_string}/vcf_fields"
    for attempt in range(max_retries):
        try:
            response = requests.get(base_url, timeout=10)
            response.raise_for_status()
            data = response.json().get("data")
            if data:
                return data
            else:
                return None
        except Exception as e:
            logging.warning(f"Attempt {attempt + 1} failed for SPDI {spdi_string}: {e}")
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)
    logging.error(f"All retries failed for SPDI {spdi_string}")
    return None

In [None]:
def parse_esearch_xml(xml_data):
    # Decode the byte string to a UTF-8 encoded string
    xml_string = xml_data.decode("utf-8")
    
    # Parse the XML string
    root = ET.fromstring(xml_string)
    
    # Find required entities
    snp_id = root.find("SNP_ID").text
    snp_class = root.find("SNP_CLASS").text
    acc2chr={root.find("ACC").text:root.find("CHR").text}
    location_hg38 = int(root.find("CHRPOS").text.split(':')[1])
    location_hg19 = int(root.find("CHRPOS_PREV_ASSM").text.split(':')[1])
    variation_vcf_fields = [spdi_to_vcf_field(item) for item in root.find("SPDI").text.split(',')]
    
    if None in variation_vcf_fields:
        print(variation_vcf_fields)
        logging.warning(f"Skipping {snp_id} due to fetch SDPI failure.")
        return None 
    else:
        variation_hg38 = [ f"{acc2chr[var['chrom']]}:{var['pos']}:{var['ref']}:{var['alt']}" for var in variation_vcf_fields if var]

        if len(set([ var['pos'] for var in variation_vcf_fields]))>1:
            print(snp_id)
            
        if variation_vcf_fields[0]['pos'] == location_hg38:
            variation_hg19 = [ var.replace(str(location_hg38),str(location_hg19)) for var in variation_hg38]
        else:
            offset = location_hg38-variation_vcf_fields[0]['pos']
            location_hg19_fix = location_hg19 - offset
            location_hg38_fix = variation_vcf_fields[0]['pos']
            variation_hg19 = [ var.replace(str(location_hg38_fix),str(location_hg19_fix)) for var in variation_hg38]
        return {
            "snp_id": snp_id,
            "snp_class":snp_class,
            "variation_hg38": variation_hg38,
            "variation_hg19": variation_hg19
        }

In [None]:
variant_annotation=pd.read_csv("data/snplist.txt",sep="\t",header=None)
variant_annotation=variant_annotation.rename(columns={0:'Variant'})

#### ignore snp without rs id

In [None]:
# Remove rows without rsID
uniq_rsid=variant_annotation["Variant"][variant_annotation['Variant'].str.contains('^rs[0-9]+')].unique()
print(f"{len(uniq_rsid)} unique variants with rsID.")

#### fetch genomic position and alleles through E-search API

In [None]:
result_list={}
waiting_list=[id for id in uniq_rsid]
fail_list=[]
while len(waiting_list) > 0:
    rs_id=waiting_list.pop()
    xml_data = esearch(rs_id)
    if xml_data:
        parsed_data = parse_esearch_xml(xml_data)
        if parsed_data:
            result_list[rs_id]=parsed_data
        else:
            ## add rsIDs which failed from parsing xml back to waiting_list
            print(f"Failed to parse data for {rs_id}")
            waiting_list.append(rs_id)
    else:
        ## add the rsID which failed from e-searching back to fail_list
        print(f"No data received from esearch for {rs_id}")
        fail_list.append(rs_id)

In [None]:
with open('data/result.json', 'w') as outfile:
    json.dump(result_list, outfile,indent=4)

In [None]:
pos_rsid_dict={'rs_id':[],'variation_hg19':[],'variation_hg38':[]}
for rsid,detail in result_list.items():
    for index in range(len(detail['variation_hg19'])):
        pos_rsid_dict['rs_id'].append(rsid)
        pos_rsid_dict['variation_hg19'].append(detail['variation_hg19'][index])
        pos_rsid_dict['variation_hg38'].append(detail['variation_hg38'][index])

In [None]:
variant_df = pd.DataFrame.from_dict(pos_rsid_dict)
variant_df.to_csv('data/result.tsv',sep="\t",index=None)