In [142]:
import jupyter_black

# Load the jupyter_black extension for this notebook session.
jupyter_black.load()

In [143]:
import requests
import sys
import time
from pathlib import Path

import pandas as pd
from pyfaidx import Fasta

# Inputs: everything is resolved relative to the Pla2g2 data directory.
base = Path("../../data/Pla2g2/")
raw = base.joinpath("raw")
data_info_excel = raw.joinpath("SM2.xlsx")
fasta_alignment = raw.joinpath("SM9_Pla2g2_alignment.fasta")

# Outputs written by this notebook.
fasta_out = base.joinpath("Pla2g2.fasta")
csv_out = base.joinpath("Pla2g2.csv")

In [144]:
# --- create FASTA file ---
# Copy each record of the alignment into fasta_out with every "-" removed from
# the sequence. Records whose header starts with "---" are left out.
fasta_handler = Fasta(fasta_alignment)
with open(fasta_out, "w") as fasta_handle:
    for name, record in fasta_handler.items():
        if not name.startswith("---"):
            ungapped = str(record).replace("-", "")
            fasta_handle.write(f">{name}\n{ungapped}\n")

In [145]:
# Load the "Dataset Key" sheet, normalise the column names and keep only the
# four columns used in the rest of the notebook.
df = pd.read_excel(data_info_excel, sheet_name="Dataset Key").rename(
    columns={"Name in the dataset": "uid", "Species": "taxa_name", "Gene": "gene"}
)[["uid", "gene", "strand", "taxa_name"]]
# Split "gene" at its first space: the leading token stays in "gene", the
# remainder (if any) goes to "gene_info".
df[["gene", "gene_info"]] = df["gene"].str.split(" ", n=1, expand=True)

# Names as they appear in the dataset -> names to look up instead.
taxa_rename = {
    "Ailurus fulgens styani": "Ailurus styani",
    "Anser cygnoides": "Anser cygnoid",
    "Apteryx australis mantelli": "Apteryx mantelli mantelli",
    "Crotalus helleri": "Crotalus oreganus helleri",
    "Deinakgistrodon acutus": "Deinagkistrodon acutus",
    "Nannopterum auritus": "Nannopterum",  # (species not in UniProt)
    "Ophisaurus gracilis": "Dopasia gracilis",
    "Urile pelagicus": "Phalacrocorax",  # (species not in UniProt) Urile penicillatus
}
# Exact-match substitution; names without an entry are kept unchanged.
df["taxa_name"] = df["taxa_name"].map(lambda name: taxa_rename.get(name, name))

In [None]:
# Helper function to download data
def get_url(url, **kwargs):
    """GET ``url`` and return the response, raising on an HTTP error status.

    Parameters
    ----------
    url
        Address to request.
    **kwargs
        Forwarded unchanged to ``requests.get``. When the caller does not pass
        ``timeout``, 30 seconds is used so a stalled connection cannot block
        the notebook indefinitely.

    Returns
    -------
    The ``requests`` response object when ``response.ok`` is true.

    Raises
    ------
    requests.HTTPError
        When the server answers with an error status. The response body is
        printed first to help diagnose the failure.
    """
    kwargs.setdefault("timeout", 30)
    response = requests.get(url, **kwargs)

    if not response.ok:
        print(response.text)
        # Always raises for a non-ok response, so nothing may follow this call.
        response.raise_for_status()

    return response

class TaxonAPI:
    """Taxonomy lookups against the EBI Proteins API, with a CSV-backed mapper.

    The mapper is a DataFrame loaded from ``taxon_mapper_path`` when that file
    exists; otherwise it starts out empty.
    """

    def __init__(self, taxon_mapper_path: Path) -> None:
        """Remember the mapper location and load whatever is stored there.

        Parameters
        ----------
        taxon_mapper_path
            Location of the CSV file holding the taxon mapper.
        """
        # documentation: https://www.ebi.ac.uk/proteins/api/doc/
        # limit = 200 requests/second/user
        self.base_url = "https://www.ebi.ac.uk/proteins/api/taxonomy"
        # TODO experiment with SQLite
        self.taxon_mapper_path = taxon_mapper_path
        self.df_taxon_mapper = self._load_taxon_mapper()

    def _load_taxon_mapper(self) -> pd.DataFrame:
        """Read the mapper CSV, or return an empty DataFrame if there is none.

        A missing file and a file without any parsable content both yield an
        empty DataFrame.
        """
        mapper_path = Path(self.taxon_mapper_path)
        if not mapper_path.is_file():
            return pd.DataFrame()
        try:
            return pd.read_csv(mapper_path)
        except pd.errors.EmptyDataError:
            # E.g. a zero-byte file: treat it like a mapper with no rows yet.
            return pd.DataFrame()

    def _get_taxa_id(self, taxon_name: str) -> "tuple[int | None, str | None]":
        """For a given taxa name, search and return its taxa ID and rank.

        Returns
        -------
        ``(taxa_id, rank)`` when exactly one taxonomy matches ``taxon_name``;
        ``(None, None)`` when the API answers 404 or returns no taxonomy.

        Raises
        ------
        requests.HTTPError
            For any HTTP error other than 404.
        Exception
            When more than one taxonomy matches ``taxon_name``.
        """
        url = f"{self.base_url}/name/{taxon_name}"
        try:
            # The request itself must sit inside the try block: get_url is the
            # call that raises HTTPError for an error status.
            taxonomies = get_url(url).json()["taxonomies"]
        except requests.HTTPError as err:
            not_found = getattr(err.response, "status_code", None) == 404 or str(
                err
            ).startswith("404 Client Error: Not Found for url")
            if not not_found:
                raise
            taxonomies = None

        if not taxonomies:
            return None, None
        if len(taxonomies) > 1:
            raise Exception(f"{taxon_name}: found more than one taxa.")
        taxonomy = taxonomies[0]
        return int(taxonomy["taxonomyId"]), taxonomy["rank"]

    def _get_taxa_rank(self, taxa_id: "int | str", rank) -> str:
        """Return the scientific name at ``rank`` in the lineage of ``taxa_id``.

        The first lineage entry whose ``"rank"`` equals ``rank`` wins. When no
        entry matches, the string ``"None"`` is returned.
        """
        url = f"{self.base_url}/lineage/{taxa_id}"
        lineage = get_url(url=url).json()["taxonomies"]
        return next(
            (entry["scientificName"] for entry in lineage if entry["rank"] == rank),
            "None",
        )
        
    def fill_

In [156]:


# get species that are in df but not in taxon_mapper
# NOTE(review): `df_taxon_mapper`, `get_taxa_id`, `get_taxa_rank` and
# `taxon_mapper_file` are not defined in the cells visible here, so this cell
# relies on names left over in the kernel -- confirm before Restart & Run All.
if "taxon_name" in df_taxon_mapper:
    taxon_lst = df_taxon_mapper["taxon_name"].to_list()
else:
    taxon_lst = []
unknown_taxon = df.loc[~df["taxa_name"].isin(taxon_lst), "taxa_name"].unique()


ranks = ["class", "order", "clade", "family", "genus", "species"]
api_call_counter = 0
# The loop variables are named distinctly: the inner loops used to rebind `idx`
# and `rank`, so the throttle check at the bottom of the outer loop tested a row
# label instead of the taxon counter.
for taxon_idx, taxon_name in enumerate(unknown_taxon):
    df_idx = len(df_taxon_mapper)
    taxa_id, taxon_rank = get_taxa_id(taxon_name=taxon_name)
    # The new row stores the looked-up name under the column of its own rank.
    new_row = pd.DataFrame(
        {"taxon_id": taxa_id, "taxon_name": taxon_name, taxon_rank: taxon_name},
        index=[df_idx],
    )
    df_taxon_mapper = pd.concat([new_row, df_taxon_mapper.loc[:]])
    # Fill every rank that is still missing, for every row of the mapper.
    for rank in ranks:
        for row_idx, row in df_taxon_mapper.iterrows():
            if (rank in row) and pd.notna(row[rank]):
                continue
            api_call_counter += 1
            taxa = get_taxa_rank(taxa_id=row["taxon_id"], rank=rank)
            df_taxon_mapper.loc[row_idx, rank] = taxa
            # Pause for a second after every 200 lineage requests.
            if api_call_counter % 200 == 0:
                time.sleep(1)
    if (taxon_idx + 1) % 200 == 0:
        time.sleep(1)
    # NOTE(review): this stops after the first unknown taxon, so each run of the
    # cell adds one taxon only -- looks like a debugging leftover; confirm and
    # remove once the loop is ready to process every taxon.
    break
# df_taxon_mapper["taxon_id"] = df_taxon_mapper["taxon_id"].astype(int)


# get additional taxonomies
# NOTE(review): these two assignments are not used by anything below in this cell.
ranks = ["class", "order", "family", "genus", "species"]
api_call_counter = 0

df_taxon_mapper.to_csv(taxon_mapper_file, index=False)

In [157]:
# Display the current contents of the taxon mapper for inspection.
df_taxon_mapper

Unnamed: 0,taxon_id,taxon_name,species,class,order,family,genus
1,424585,Ailurus styani,Ailurus styani,Mammalia,Carnivora,Ailuridae,Ailurus
0,9646,Ailuropoda melanoleuca,Ailuropoda melanoleuca,Mammalia,Carnivora,Ursidae,Ailuropoda


In [129]:
# Attach the lineage columns in `cols` to every row of df by looking its
# "taxa_name" up in the taxon mapper.
# Every mapper column except the first one is searched for the name.
taxas = df_taxon_mapper.columns.to_list()[1:]
cols = ["species", "genus", "family", "order"]

# Build the name -> lineage lookup once instead of rescanning the mapper for
# every row of df. Priority is unchanged: earlier mapper columns win over
# later ones, and within a column the first matching row wins.
mapper_lineages = list(df_taxon_mapper[cols].itertuples(index=False, name=None))
lineage_by_name = {}
for taxa in taxas:
    for name, lineage in zip(df_taxon_mapper[taxa], mapper_lineages):
        if pd.notna(name):
            lineage_by_name.setdefault(name, lineage)

# Check before touching df so a failure does not leave it half-filled.
missing = [name for name in df["taxa_name"].unique() if name not in lineage_by_name]
if missing:
    raise Exception(
        f"Error: {', '.join(map(str, missing))} not found in taxon mapper."
    )

lineages = [lineage_by_name[name] for name in df["taxa_name"]]
for position, col in enumerate(cols):
    df[col] = [lineage[position] for lineage in lineages]

In [141]:
# Write df to csv_out without the index column.
df.to_csv(csv_out, index=False)