In [1]:
# Uncomment and run to reload libs
# import importlib, pyutils; importlib.reload(pyutils)

import pandas as pd

from pyutils import (
    logging,
    ORIGIN_DATE,
    SURVEILLANCE_FILE,
    WILD_TYPE_SEQ_FILE,
    MUTATION_PER_SEQ_FILE,
    SUS_MUTATION_PER_SEQ_FILE,
)


In [2]:
logging.info("Load variant surveilance")

df: pd.DataFrame = pd.read_csv(
    SURVEILLANCE_FILE,
    sep="\t",
    low_memory=False,
    index_col=0,
)
logging.info(f"{len(df.index)} in raw data")

df = df[df["Collection date"].str.len() == 10]
logging.info(f"{len(df.index)} has complete date")

df["Collection date"] = pd.to_datetime(df["Collection date"])
df = df[df["Collection date"] > ORIGIN_DATE]
logging.info(f"{len(df.index)} after {ORIGIN_DATE}")

df = df[df["Host"] == "Human"]
logging.info(f"{len(df.index)} using human host")

df["Continent"] = df["Location"].str.split(" / ").str[0]
df["Area"] = df["Location"].str.split(" /").str[1].str.strip()
df["Region"] = df["Location"].str.split(" /").str[2].str.strip()


In [3]:
logging.info("Get all possible mutations...")
seqs_mutations = []
seqs_wt = []
c_date: pd.Timestamp
c_date_group: pd.DataFrame
area: str
area_group: pd.DataFrame
for c_date, c_date_group in df.groupby("Collection date", sort=False):
    c_date_str = c_date.strftime("%Y-%m-%d")
    logging.info(f"{c_date_str}")
    for area, area_group in c_date_group.groupby("Area", sort=False):
        for ac, mut, pango, region in area_group[[
            "AA Substitutions",
            "Pango lineage",
            "Region"
        ]].itertuples():
            if not pd.isna(mut) and mut != "":
                mut: str = mut[1:-1]
                if mut:
                    mut_list = mut.split(",")
                else:
                    mut_list = ()
                    seqs_wt.append({
                        "Accession": ac,
                        "Date": c_date,
                        "Lineage": pango,
                        "Area": area,
                    })
                    # The sequence is known to have no mutation
            else:
                # The mutation of the sequence is unknown?
                mut_list = ()
            for m in mut_list:
                if m:
                    seqs_mutations.append({
                        "Accession": ac,
                        "Date": c_date_str,
                        "Mutation": m,
                        "Lineage": pango,
                        "Area": area,
                    })

seqs_wt: pd.DataFrame = pd.DataFrame.from_records(seqs_wt)
seqs_mutations: pd.DataFrame = pd.DataFrame.from_records(seqs_mutations)
logging.info(f"{seqs_mutations['Mutation'].nunique()} unique mutations")


In [4]:
seqs_wt.to_csv(WILD_TYPE_SEQ_FILE, index=False)
logging.info(f"{WILD_TYPE_SEQ_FILE} saved!")


In [5]:
# Extract all unique mutations
mut_info = pd.DataFrame(
    seqs_mutations["Mutation"].unique(), columns=["Mutation"])
# Separate protein name and mutation name
mut_info_split = mut_info["Mutation"].str.split("_").str
protein_name: pd.Series = mut_info_split[0]
mut_name: pd.Series = mut_info_split[1]

# Some mutation names are invalid such as "NSP2_" in "EPI_ISL_3922842"
invalid_mut = mut_name == ""
mut_info = mut_info[~invalid_mut]

# Assign protein names
protein_name = protein_name[~invalid_mut]
mut_info["Protein"] = protein_name

# Separate mutation into unmutated state, pos, and mutated state
mut_name = mut_name[~invalid_mut]
mut_aa: pd.DataFrame = mut_name.str.split("([A-Z]|ins)(\d+)(\w+)", expand=True)
# Assign the separated mutation info
mut_info["From"] = mut_aa[1]
mut_info["Pos"] = mut_aa[2].astype(int)
mut_info["To"] = mut_aa[3]

# Make sure the string split was done correct
assert all(mut_info["Protein"] + "_" + mut_info["From"] +
           mut_info["Pos"].astype(str) + mut_info["To"] == mut_info["Mutation"])
logging.info(f"{len(mut_info.index)} unique valid mutations")


In [6]:
# Remove a mutation when 
sus_mut_removed = []
sus_mut = []
all_mut_summary = {}
pos_group: pd.DataFrame
for (protein_name, pos), pos_group in mut_info.groupby(["Protein", "Pos"], sort=False):
    # Unmutated state of all types of mutation
    mut_summary = pos_group["From"].value_counts()
    all_mut_summary[f"{protein_name}_{pos}"] = mut_summary
    if "ins" in mut_summary.index:
        unmutated = "ins"
        if len(mut_summary.index) > 1:
            unmutated = mut_summary.drop("ins").idxmax()
        non_sus = (pos_group["From"] == unmutated) | (pos_group["From"] == "ins")
    else:
        unmutated = mut_summary.idxmax()
        non_sus = (pos_group["From"] == unmutated)
    sus_mut_removed.append(pos_group[non_sus])
    sus_mut.append(pos_group[~non_sus])

sus_mut_removed: pd.DataFrame = pd.concat(sus_mut_removed)
logging.info(f"{len(sus_mut_removed.index)} unique mutations not sus")
sus_mut: pd.DataFrame = pd.concat(sus_mut)


In [7]:
pd.merge(
    seqs_mutations,
    sus_mut,
    on="Mutation"
).to_csv(SUS_MUTATION_PER_SEQ_FILE, index=False)
logging.info(f"{SUS_MUTATION_PER_SEQ_FILE} saved!")


In [8]:
pd.merge(
    seqs_mutations,
    sus_mut_removed,
    on="Mutation"
).to_csv(MUTATION_PER_SEQ_FILE, index=False)
logging.info(f"{MUTATION_PER_SEQ_FILE} saved!")
