In [None]:
import os
import pickle
import re
import sys
from typing import Optional

import anndata as ad
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import requests
import scanpy as sc
import seaborn as sns
from gears import PertData
from scipy import io
from scipy.sparse import csr_matrix
from sklearn.model_selection import train_test_split
from tqdm import tqdm


In [None]:
# Download the Norman perturbation dataset through GEARS and prepare its split and data loaders.
pert_data = PertData('norman') # folder in which GEARS stores the data
pert_data.load(data_name = 'norman') # name of the dataset to load
pert_data.prepare_split(split = 'simulation', seed = 1) # 'simulation' split with a fixed seed
pert_data.get_dataloader(batch_size = 32, test_batch_size = 128) # build data loaders with these batch sizes
# NOTE(review): the following cells read perturb_processed.h5ad from `data_folder`, not from
# `pert_data` -- confirm that folder matches where this cell stores the download.

In [None]:
# Folder holding the processed Norman dataset, relative to the notebook's working directory.
# The parent-directory prefix must be "../" ("..data" would name a folder literally called "..data").
data_folder = "../data/gene_perturb_data/norman"
data_path = os.path.join(data_folder, "perturb_processed.h5ad")

# AnnData object; the cells below read its obs["condition"] column.
adata_norman = sc.read(data_path)

In [None]:
# Collect every perturbed gene symbol exactly once, in order of first appearance.
# Condition labels are split on "+"; the literal "ctrl" (alone or as one part of a
# label) is not a gene and is skipped.
# Iterating over the unique labels instead of every row avoids re-parsing the same
# label once per cell, and dict.fromkeys de-duplicates while preserving order.
perturbed_genes = list(
    dict.fromkeys(
        gene
        for condition in adata_norman.obs["condition"].unique()
        if condition != "ctrl"
        for gene in condition.split("+")
        if gene != "ctrl"
    )
)

In [None]:
def get_protein_sequence(protein_id: str, timeout: float = 30.0) -> str:
    """Fetch the amino-acid sequence for an Ensembl protein id.

    Calls the Ensembl REST ``sequence/id`` endpoint with ``type=protein`` and
    asks for a plain-text response.

    Args:
        protein_id: Ensembl translation/protein identifier.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled connection would block the notebook forever.

    Returns:
        The response body with surrounding whitespace stripped.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    url = f"https://rest.ensembl.org/sequence/id/{protein_id}?type=protein"
    response = requests.get(
        url,
        headers={"Content-Type": "text/plain"},
        timeout=timeout,
    )
    response.raise_for_status()
    return response.text.strip()

In [None]:
def get_status(gene_id, timeout=30.0):
    """Report whether Ensembl can serve protein sequences for an identifier.

    Sends a GET to the Ensembl REST ``sequence/id`` endpoint with
    ``type=protein`` and ``multiple_sequences=1`` and returns the ``ok`` flag
    of the response.

    Args:
        gene_id: Ensembl identifier to probe. ``None`` or an empty string
            (e.g. a gene symbol that could not be mapped) yields ``False``
            without contacting the server.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        ``True`` if the request succeeded, ``False`` otherwise.
    """
    if not gene_id:
        # Nothing to look up; skip the guaranteed-to-fail request.
        return False

    url = f"https://rest.ensembl.org/sequence/id/{gene_id}"
    params = {
        "type": "protein",
        "multiple_sequences": 1
    }
    headers = {
        "Content-Type": "text/x-fasta"
    }

    response = requests.get(url, params=params, headers=headers, timeout=timeout)
    return response.ok


In [None]:

def gene_to_ensembl(gene_name, species="human", timeout=30.0):
    """Map a gene name to an Ensembl gene id using the mygene.info query API.

    Only the top hit is requested (``size=1``). Its ``ensembl`` field may be a
    single mapping or a list of mappings; for a list the first entry is used.

    Args:
        gene_name: Query string sent as ``q`` (a gene symbol in this notebook).
        species: Species filter passed to mygene.info.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The Ensembl gene id of the top hit, or ``None`` when there is no hit,
        the hit has no Ensembl mapping, or the request fails. Failures are
        printed rather than raised so a batch lookup keeps going.
    """
    url = "https://mygene.info/v3/query"
    params = {
        "q": gene_name,
        "species": species,
        "fields": "ensembl.gene",
        "size": 1
    }

    try:
        response = requests.get(url, params=params, timeout=timeout)
        response.raise_for_status()
        hits = response.json().get("hits")
        if not hits:
            return None

        ensembl = hits[0].get("ensembl")
        if isinstance(ensembl, list):
            # Several Ensembl genes map to this hit: take the first, if any.
            ensembl = ensembl[0] if ensembl else None
        if isinstance(ensembl, dict):
            return ensembl.get("gene")
        return None
    except Exception as e:
        # Deliberately best-effort: report the problem and signal "no id".
        print(f"Error retrieving Ensembl ID for {gene_name}: {e}")
        return None


In [None]:
# Resolve each perturbed gene symbol to an Ensembl gene id (None when unresolved),
# keeping both the ordered list of ids and the symbol -> id mapping.
perturbed_ids = [gene_to_ensembl(symbol) for symbol in perturbed_genes]
gene_ensembl_dict = dict(zip(perturbed_genes, perturbed_ids))


In [None]:
# Persist the symbol -> Ensembl id mapping so the lookups do not have to be repeated.
# Requires `import pickle` in the notebook's import cell.
dict_path = os.path.join(data_folder, "gene_ensembl_dict.pkl")

with open(dict_path, "wb") as f:
    pickle.dump(gene_ensembl_dict, f)


In [None]:
# Split the Ensembl ids into those for which get_status reports success and the rest.
successes, failures = [], []

for gene_id in tqdm(perturbed_ids):
    status = get_status(gene_id)
    (successes if status else failures).append(gene_id)

In [None]:
def get_canonical_transcript_id(ensembl_gene_id: str, timeout: float = 30.0) -> Optional[str]:
    """Look up the canonical transcript of an Ensembl gene.

    Queries the Ensembl REST ``lookup/id`` endpoint (with ``expand=1``) and
    reads the ``canonical_transcript`` field of the JSON response.

    Args:
        ensembl_gene_id: Ensembl gene identifier.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The ``canonical_transcript`` value exactly as returned by the server,
        or ``None`` when the response carries no such field. Callers must
        handle ``None`` before post-processing the id.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    url = f"https://rest.ensembl.org/lookup/id/{ensembl_gene_id}?expand=1"
    response = requests.get(
        url,
        headers={"Content-Type": "application/json"},
        timeout=timeout,
    )
    response.raise_for_status()
    return response.json().get("canonical_transcript")


In [None]:
def get_protein_translation_id(canonical_transcript_id: str, timeout: float = 30.0) -> Optional[str]:
    """Look up the protein (translation) id attached to a transcript.

    Queries the Ensembl REST ``lookup/id`` endpoint (with ``expand=1``) and
    reads ``Translation.id`` from the JSON response.

    Args:
        canonical_transcript_id: Ensembl transcript identifier.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The translation id, or ``None`` when the response has no
        ``Translation`` entry (missing or null) or that entry has no ``id``.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    url = f"https://rest.ensembl.org/lookup/id/{canonical_transcript_id}?expand=1"
    response = requests.get(
        url,
        headers={"Content-Type": "application/json"},
        timeout=timeout,
    )
    response.raise_for_status()

    # `or {}` also covers a key that is present but null, which a plain
    # .get("Translation", {}) default would not.
    translation = response.json().get("Translation") or {}
    return translation.get("id")


In [None]:
# NOTE: get_protein_sequence is also defined in an earlier cell of this notebook.
# This later definition is the one in effect for the retrieval loop that follows;
# keep the two in sync or drop one of them.
def get_protein_sequence(protein_id: str, timeout: float = 30.0) -> str:
    """Fetch the amino-acid sequence for an Ensembl protein id.

    Calls the Ensembl REST ``sequence/id`` endpoint with ``type=protein`` and
    asks for a plain-text response.

    Args:
        protein_id: Ensembl translation/protein identifier.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled connection would block the notebook forever.

    Returns:
        The response body with surrounding whitespace stripped.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    url = f"https://rest.ensembl.org/sequence/id/{protein_id}?type=protein"
    response = requests.get(
        url,
        headers={"Content-Type": "text/plain"},
        timeout=timeout,
    )
    response.raise_for_status()
    return response.text.strip()

In [None]:
# Retrieve one protein sequence per gene:
#   gene id -> canonical transcript -> translation (protein) id -> amino-acid sequence.
aa_dict = {}            # gene id -> amino-acid sequence

successful_genes = []   # gene ids for which a sequence was retrieved
failure_genes = []      # gene ids for which some step returned nothing

for gene_id in tqdm(successes):
    # Reset per gene: a lookup that yields nothing must never fall through to the
    # previous gene's protein id and store that gene's sequence under this one.
    protein_id = None
    protein_seq = None

    canon_transcript_id = get_canonical_transcript_id(gene_id)

    if canon_transcript_id is not None:
        # Keep only the part before the first "." (drops a version suffix, if any).
        ct_id_new = canon_transcript_id.split(".")[0]
        protein_id = get_protein_translation_id(ct_id_new)

    if protein_id is not None:
        protein_seq = get_protein_sequence(protein_id)

    if protein_seq is not None:
        successful_genes.append(gene_id)
        aa_dict[gene_id] = protein_seq
    else:
        failure_genes.append(gene_id)


In [None]:
# Write the gene id -> amino-acid sequence table as a two-column CSV
# (gene_id, aa_sequence), one row per gene with a retrieved sequence.
aa_df = (
    pd.DataFrame.from_dict(aa_dict, orient="index", columns=["aa_sequence"])
    .rename_axis("gene_id")
    .reset_index()
)

csv_path = os.path.join(data_folder, "perturbed_genes.csv")
aa_df.to_csv(csv_path, index=False)