In [58]:
from riot_na.config import GENE_DB_DIR
from Bio import SeqIO
import pandas as pd
from pathlib import Path


def df_to_fasta(df: pd.DataFrame, output_path: Path):
    """Write the rows of ``df`` to ``output_path`` as FASTA entries.

    Each row produces a ``>``-prefixed header line taken from the
    ``description`` column, followed by a line holding the ``sequence`` column.
    The target file is opened in write mode, so existing content is replaced.

    Args:
        df: Frame providing ``description`` and ``sequence`` columns.
        output_path: File to write.
    """
    fasta_entries = (
        f">{entry.description}\n{entry.sequence}\n"
        for entry in df.itertuples(index=False)
    )
    with output_path.open("w") as fasta_handle:
        fasta_handle.writelines(fasta_entries)


def deduplicate_genes(input_path) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Load a FASTA file and derive its unique-sequence and per-gene subsets.

    Every record id is split on ``*``: the part before the first ``*`` becomes
    ``gene_id`` and the part after it becomes ``allele``.

    Args:
        input_path: FASTA source, passed unchanged to ``SeqIO.parse``.

    Returns:
        A ``(deduplicated_df, first_allele_df)`` pair.

        ``deduplicated_df`` holds the records sorted by ``gene_id`` then
        ``allele``, keeping only the first record of every distinct sequence.

        ``first_allele_df`` is ``deduplicated_df`` grouped by ``gene_id`` and
        reduced with ``GroupBy.first()`` (the first non-null value of each
        column); ``gene_id`` is its index.

        Both frames are empty when the file contains no records.
    """
    # Naming the columns up front keeps them present even when the file yields
    # no records; without this the frame would have no columns at all and the
    # "allele_id" lookup below would raise KeyError.
    df = pd.DataFrame.from_records(
        (
            {"allele_id": record.id, "description": record.description, "sequence": str(record.seq)}
            for record in SeqIO.parse(input_path, "fasta")
        ),
        columns=["allele_id", "description", "sequence"],
    )

    # Split once and reuse the parts for both derived columns.
    id_parts = df["allele_id"].str.split("*")
    df["allele"] = id_parts.str[1]
    df["gene_id"] = id_parts.str[0]

    # Sorting first makes drop_duplicates keep, for each sequence, the record
    # that comes first in (gene_id, allele) order.
    df = df.sort_values(["gene_id", "allele"])
    deduplicated_df = df.drop_duplicates(subset=["sequence"])
    first_allele_df = deduplicated_df.groupby("gene_id").first()
    return deduplicated_df, first_allele_df


# All three trees share the "gene_db" folder below GENE_DB_DIR: source FASTA
# files are read from AA_GENES_DIR, results are written to the OUTPUT_* trees.
_GENE_DB_ROOT = GENE_DB_DIR / "gene_db"
AA_GENES_DIR = _GENE_DB_ROOT / "aa_genes"
OUTPUT_GENES_DEDUP_DIR = _GENE_DB_ROOT / "aa_genes_deduplicated"
OUTPUT_GENES_FIRST_ALLELE_DIR = _GENE_DB_ROOT / "aa_genes_first_allele"


# V genes: one FASTA file per organism. Each is deduplicated and written,
# under the same file name, to the "v_genes" folder of both output trees.
for organism in ["human", "mouse"]:
    input_path = AA_GENES_DIR / "v_genes" / f"{organism}.fasta"
    deduplicated_df, first_allele_df = deduplicate_genes(input_path)

    # Deduplicated records first, then the one-record-per-gene subset.
    for output_root, genes_df in (
        (OUTPUT_GENES_DEDUP_DIR, deduplicated_df),
        (OUTPUT_GENES_FIRST_ALLELE_DIR, first_allele_df),
    ):
        output_dir = output_root / "v_genes"
        output_dir.mkdir(parents=True, exist_ok=True)
        df_to_fasta(genes_df, output_dir / f"{organism}.fasta")

# Build all_species.fasta by concatenating the deduplicated human and mouse
# V gene files, human first.
dedup_v_genes_dir = OUTPUT_GENES_DEDUP_DIR / "v_genes"
with (
    open(dedup_v_genes_dir / "human.fasta") as human_file,
    open(dedup_v_genes_dir / "mouse.fasta") as mouse_file,
    open(dedup_v_genes_dir / "all_species.fasta", "w") as all_species_file,
):
    combined_fasta = "".join(handle.read() for handle in (human_file, mouse_file))
    all_species_file.write(combined_fasta)

# J genes: each organism has its own folder of FASTA files. Every file is
# deduplicated and written as <organism>/<input stem>.fasta below the
# "j_genes" folder of both output trees.
for organism in ["human", "mouse"]:

    for input_path in (AA_GENES_DIR / "j_genes" / organism).iterdir():
        # iterdir() yields every directory entry; a sub-directory cannot be
        # parsed as FASTA and would abort the whole run, so only regular
        # files are processed.
        if not input_path.is_file():
            continue

        deduplicated_df, first_allele_df = deduplicate_genes(input_path)

        # Deduplicated records first, then the one-record-per-gene subset.
        for output_root, genes_df in (
            (OUTPUT_GENES_DEDUP_DIR, deduplicated_df),
            (OUTPUT_GENES_FIRST_ALLELE_DIR, first_allele_df),
        ):
            output_dir = output_root / "j_genes" / organism
            output_dir.mkdir(exist_ok=True, parents=True)
            df_to_fasta(genes_df, output_dir / f"{input_path.stem}.fasta")

