In [None]:
# imports and functions
import re
import gdt
import pandas as pd
from pathlib import Path
from datetime import datetime
from collections import defaultdict

RE_ID = re.compile(r"ID=([^;]+)")
RE_parent = re.compile(r"Parent=([^;]+)")

# Regex patterns for extracting GFF attributes
RE_gene = re.compile(r"gene=([^;]+)")
RE_product = re.compile(r"product=([^;]+)")
RE_description = re.compile(r"description=([^;]+)")
RE_name = re.compile(r"Name=([^;]+)")
RE_note = re.compile(r"Note=([^;]+)")
RE_gene_synonym = re.compile(r"gene_synonym=([^;]+)")

# Features to extract from GFF files (must match the regex names above without RE_ prefix)
# ORDER MATTERS: Listed from most descriptive/best for identification to least descriptive
# This order determines the priority for filling the feature_name column - earlier entries take precedence
features_name = ["gene", "product", "description", "name", "note", "gene_synonym"]

# To add a new feature for extraction:
# 1. Create a regex pattern with the naming convention: RE_{feature_name}
#    Use any regex pattern that captures the desired value from GFF files.
#    Common pattern: RE_{feature_name} = re.compile(r"{attribute_name}=([^;]+)")
#    but you can customize the regex to capture exactly what you need.
#
# 2. Add the feature name (without RE_ prefix) to the features_name list
#    IMPORTANT: Place it in the appropriate position based on how descriptive/useful
#    it is for identification. More descriptive features should come first in the list.
#
# Example - to extract 'locus_tag' attributes:
# RE_locus_tag = re.compile(r"locus_tag=([^;]+)")  # or any custom regex
# features_name = ["locus_tag", "gene", "product", "description", "name", "note", "gene_synonym"]  # if locus_tag is most descriptive
# # OR
# features_name = ["gene", "product", "description", "locus_tag", "name", "note", "gene_synonym"]  # if locus_tag is moderately descriptive
# # OR
# features_name = ["gene", "product", "description", "name", "note", "gene_synonym", "locus_tag"] # if locus_tag is least descriptive
#
# CRITICAL: The regex variable name (after RE_) must exactly match the name
# you add to features_name for the extraction to work properly

re_features = {}
for name in features_name:
    try:
        re_features[name] = globals()[f"RE_{name}"]
    except KeyError:
        print(f"Warning: No regex found for '{name}' (expected variable: RE_{name})")


def increment_gdt_file(path):
    """
    Increment the GDT file name by 1.
    Example: fungi-ncbi_pilot_03.gdt -> fungi-ncbi_pilot_04.gdt
    """
    plist = path.stem.split("_")
    if plist[-1] == "stripped":
        plist[-1] = "pilot"
        plist.append(0)

    try:
        number = int(plist[-1]) + 1
        plist[-1] = f"{number:02d}"
    except ValueError:
        raise ValueError(
            f"Invalid GDT file name: {path.name}. Expected format: <preferred_name>_##.gdt, where ## is a number."
        )
    return path.parent / f'{"_".join(plist)}{path.suffix}', number


def get_most_recent_gdt(dir_path, prefix="TEMP_"):
    """
    Get the most recent gdt file in the directory.
    Arguments:
        dir_path (Path): Directory to search for GDT files.
        prefix (str): Prefix of the GDT files to search for. It will match files like "<prefix>*.gdt".
    Returns:
        Path: Path to the most recent GDT file.
    """
    temp_files = list(
        dir_path.glob(f"*{prefix}*.gdt")
    )  # TODO maybe change to check for numbers after prefix?
    if not temp_files:
        return dir_path / f"{prefix}00.gdt"
    return gdt.gdt_impl.natural_sort(temp_files, key=lambda x: x.stem)[-1]

In [None]:
# Defines all the global variables used in the script.
# Change these variables to match your local setup.
# The most_recent_gdt_file variable should be set to the path of the most recent GDT file,
# OR the stripped GDT file used in filter command, if applicable.

DATA_DIR = "../example/fungi_mt"
most_recent_gdt_filename = "fungi_mt_pilot_06.gdt"
global_query_string = gdt.gff3_utils.QS_GENE_TRNA_RRNA
remove_orfs = True
gct = "MIT"
gff_suffix = ".gff3"

print(f"Chosen feature query string: '{global_query_string}'")


# just checking
DATA_DIR = Path(DATA_DIR).resolve()
if not DATA_DIR.is_dir():
    raise FileNotFoundError(f"Path {DATA_DIR} is not a directory.")

MISC_DIR = DATA_DIR / "misc"
GDT_DIR = MISC_DIR / "gdt"
GDT_DIR.mkdir(511, True, True)  # 511 = 0o777

AN_missing_dbxref_GeneID = MISC_DIR / "AN_missing_dbxref_GeneID.txt"

if not AN_missing_dbxref_GeneID.is_file():
    raise FileNotFoundError(
        f"Missing {AN_missing_dbxref_GeneID}, did you run gdt-cli filter?"
    )

if "most_recent_gdt_filename" in globals():
    gdt_path = GDT_DIR / most_recent_gdt_filename
    if not gdt_path.is_file():
        print(
            f"Not found {gdt_path.name}, does it exist in misc/gdt?\nGDTs in {GDT_DIR}:"
        )
        [print(f" - {f.name}") for f in sorted(GDT_DIR.glob("*.gdt"))]
        raise FileNotFoundError(
            f"Most recent GDT file {gdt_path.name} does not exist in {GDT_DIR}."
        )
else:
    print(
        "Warning: 'most_recent_gdt_filename' variable not set.\n\n"
        "You should have a GDT file from AN_missing_gene_dict.ipynb:\n"
        "• Set the most_recent_gdt_filename variable\n"
        "• Re-run this cell\n\n"
        "If you intend to run this without a GDT file (e.g., because your GFF files "
        "don't have dbxrefs and AN_missing_dbxref_GeneID.ipynb isn't needed), this warning can be ignored."
    )
    # to simplify the code, a exetution without most_recent_gdt_filename is
    # basically the same as with one, but with and empty gdt file
    gdt_path = GDT_DIR / "pilot_00.gdt"
    gdt.gdt_impl.create_empty_gdt(gdt_path)

In [None]:
log_file = MISC_DIR / "01_missing_dbxref_GeneID.log"

log = gdt.logger_setup.create_simple_logger(
    print_to_console=True,
    console_level="DEBUG",
    save_to_file=True,
    file_level="TRACE",
    log_file=log_file,
)
log.debug("Running from notebook AN_missing_dbxref_GeneID.ipynb")

### Deeper investigation using other gff attributes, primarily gene=

In [None]:
with open(AN_missing_dbxref_GeneID, "r") as f:
    ANs = [line.strip() for line in f.readlines() if line.strip()]
log.info(f"Found {len(ANs)} ANs in {AN_missing_dbxref_GeneID}")
log.trace(f"ANs: {ANs}")

In [None]:
# Load the GDT file (even if empty)
gene_dict = gdt.gdt_impl.create_gene_dict(gdt_path, max_an_sources=0)
log.info(f"GeneDict loaded from {gdt_path.name}")
log.debug(f"path: {gdt_path}")

log.info("Header:")
for x in gene_dict.header:
    log.info(f"\t{x}")

log.info("GDT Info:")
for x in gene_dict.info:
    log.info(f"\t{x}")

In [None]:
temp_list = []
to_drop = ["source", "type", "start", "end", "score", "strand", "phase", "attributes"]

for AN in ANs:
    an_path = DATA_DIR / f"{AN}{gff_suffix}"
    df = gdt.gff3_utils.load_gff3(
        an_path, query_string=global_query_string, usecols=gdt.GFF3_COLUMNS
    )
    df = gdt.gff3_utils.filter_orfs(df) if remove_orfs else df

    df["gene_id"] = df["attributes"].str.extract(RE_ID, expand=False)
    df = df[~df["gene_id"].isin(gene_dict)]

    # Procedually extract features based on the regex patterns defined
    for name, pattern in re_features.items():
        df[name] = df["attributes"].str.extract(pattern, expand=False)

    if df[features_name].isna().all(axis=1).any():
        log.warning(f"{AN} has row(s) with no identifiable atribute.")
        log.warning(
            "Please modify this script to add a new possible identifiable attribute or just remove the AN from your dataset"
        )
        log.debug(df[df[features_name].isna().all(axis=1)])

    temp_list.extend(df.to_dict("records"))

features_info_df = pd.DataFrame(temp_list)
features_info_df = features_info_df.drop(columns=to_drop, errors="ignore")

drop_cols = [col for col in features_name if features_info_df[col].isna().all()]

# Procedurally fill 'best_feature' based on the order of features_name
features_info_df["best_feature"] = features_info_df[features_name[0]]
for col in features_name[1:]:
    features_info_df["best_feature"] = features_info_df["best_feature"].fillna(
        features_info_df[col]
    )

features_info_df = features_info_df.drop(columns=drop_cols)
features_info_df = features_info_df.sort_values(by="best_feature")
log.debug(f"Features info df, writing to {MISC_DIR / 'features_info.tsv'}")
features_info_df.to_csv(MISC_DIR / "features_info.tsv", sep="\t", index=False)

In [None]:
add_gdt_compliance = True
comment = "Manual from missing_dbxref_GeneID feature names"

if add_gdt_compliance:
    gdt_str = f' #gd MANUAL{ " #c " + comment if comment else "" }'
else:
    gdt_str = ""

# df with 2 columns, one for feature_names and one for in_gene_dict
new_df = pd.DataFrame({"best_feature": features_info_df["best_feature"].unique()})
new_df["in_gene_dict"] = new_df["best_feature"].isin(gene_dict)

unique_names = new_df[~new_df["in_gene_dict"]]["best_feature"]
unique_names = gdt.gdt_impl.natural_sort(unique_names)
with open(MISC_DIR / "feature_names.txt", "w+") as f1:
    for name in unique_names:
        f1.write(f"{name}{gdt_str}\n")

The user must now parse feature_names.txt  

Features that can be easily identifiable must be added to the current  
version of the gdt, and features that need deeper investigation should be  
copied to a new file name 'feature_unks.txt'
  
The script will now try to automatically add the gene_ids with feature names   
that __are not in 'feature_unks.txt'__ to gene_dict.

In [None]:
# Check if the names exist in the gene_dict
features_info_df = pd.read_csv(MISC_DIR / "features_info.tsv", sep="\t")

names_unk = set()
with open(MISC_DIR / "feature_unks.txt", "r") as f1:
    for line in f1:
        line = line.strip()
        if not line:
            continue

        if "#gd" in line:
            line = line.split("#gd")[0].strip()

        names_unk.add(line)

In [None]:
log.info("Verifing that all feature_names.txt values (excluding those in feature_unks.txt) exist in the most_recent_gdt_filename.")
gene_dict = gdt.gdt_impl.create_gene_dict(gdt_path, max_an_sources=0)
log.info(f"GeneDict loaded from {gdt_path.name}")
log.debug(f"path: {gdt_path}")

log.info("Header:")
for x in gene_dict.header:
    log.info(f"\t{x}")

log.info("GDT Info:")
for x in gene_dict.info:
    log.info(f"\t{x}")

names_not_in_dict = set()
all_names = set(features_info_df["best_feature"].unique()) - names_unk
for name in all_names:
    if name not in gene_dict:
        names_not_in_dict.add(name)

if names_not_in_dict:
    log.debug(f"Warning: {len(names_not_in_dict)} name(s) not in most_recent_gdt_filename!")
    log.debug(
        "These names are not in feature_unk, so you marked them as identifiable. Please identify them or add them feature_unk."
    )
    log.debug(
        "It could also be that you forgot to update and/or reload the most_recent_gdt_filename with the changes you made above."
    )
    [log.debug(name) for name in names_not_in_dict]
    raise ValueError(f"Error: {len(names_not_in_dict)} names not in most_recent_gdt_filename!")

In [None]:
comment = "automated insertion from missing_dbxref_GeneID best feature"
unique_gene_ids = features_info_df[~features_info_df["best_feature"].isin(names_unk)][
    "gene_id"
].unique()

log.info(" ---- [Starting 'Automated insertion of gene_ids with known features from features_info.tsv'] ----")
for gene_id in unique_gene_ids:
    df = features_info_df[features_info_df["gene_id"] == gene_id]

    # sanity check, are all feature_names the same?
    if df["best_feature"].nunique() != 1:
        log.warning(
            f"{gene_id} has multiple best_features: {df['best_feature'].unique()}"
        )
        log.debug("\tChecking if they have the same label in gene_dict...")

        labels = {gene_dict[feat].label for feat in df["best_feature"].unique()}
        if len(labels) != 1:
            log.error(f"\tError: {gene_id} has multiple labels: {labels}")
            raise ValueError(
                f"Error: {gene_id} has multiple labels: {labels}. "
                "Please edit features_info.tsv to resolve this issue."
            )
        else:
            log.debug(f"\tAll best_features have the same label: {labels.pop()}")

    label = gene_dict[df["best_feature"].iloc[0]].label
    an_sources = df["seqid"].unique().tolist()
    log.debug(
        f"Adding {gene_id} with label '{label}', an_sources: {an_sources}, comment: {comment}"
    )
    gene_dict[gene_id] = gdt.gdt_impl.GeneGeneric(
        label=label, an_sources=an_sources, c=comment
    )
log.info(" ---- [Finished 'Automated insertion of gene_ids with known features from features_info.tsv'] ----")

In [None]:
new_path, nth_iteration = increment_gdt_file(gdt_path)
log.info(f"Writing gene_dict file: {new_path} | Iteration: {nth_iteration}")
gene_dict.info = gdt.gdt_impl.get_gene_dict_info(gene_dict)
gene_dict.header.append(
    f"{datetime.now().strftime('%Y-%m-%d %H:%M')} - Data added from 'Automated insertion of gene_ids with known features from features_info.tsv'"
)
gdt.gdt_impl.write_gdt_file(gene_dict, new_path, overwrite=True)
log.info(f"{new_path.name} was created in misc/gdt!")
log.info("You must now add it to most_recent_gdt_filename in the Setup cell, and rerun the cell")

#### TEMP Mapping

In [None]:
# Make sure the new GDT file is the most recent one in the setup cell!
# Otherwise you will create a new gdt file without the previous changes.

# TODO CRITIAL HERE

In [None]:
gene_dict = gdt.gdt_impl.create_gene_dict(gdt_path, max_an_sources=0)
log.info(f"GeneDict loaded from {gdt_path.name}")
log.debug(f"path: {gdt_path}")

log.info("Header:")
for x in gene_dict.header:
    log.info(f"\t{x}")

log.info("GDT Info:")
for x in gene_dict.info:
    log.info(f"\t{x}")

In [None]:
names_unk = set()
with open(MISC_DIR / "feature_unks.txt", "r") as f1:
    for line in f1:
        line = line.strip()
        if not line:
            continue

        if "#gd" in line:
            line = line.split("#gd")[0].strip()

        names_unk.add(line)

features_info_df = pd.read_csv(MISC_DIR / "features_info.tsv", sep="\t")
features_unk_df = (
    features_info_df[features_info_df["best_feature"].isin(names_unk)]
    .copy()
    .reset_index(drop=True)
)
unk_dict = features_unk_df.groupby("seqid")["gene_id"].agg(list).to_dict()

In [None]:
temp_unk = gdt.gdt_impl.GeneDict()
label_count = 0
change_gene_dict = False
log.debug(
    "missing_dbxref_GeneID: matching probable 'child feature + parent gene' pair (on the an original gff3, using all the features)"
)
for an in unk_dict.keys():
    gene_ids = unk_dict[an]
    log.debug(f"AN: {an}| gene_ids: {gene_ids}")
    an_path = DATA_DIR / f"{an}{gff_suffix}"

    df = gdt.gff3_utils.load_gff3(an_path, usecols=gdt.GFF3_COLUMNS)
    df = gdt.gff3_utils.filter_orfs(df) if remove_orfs else df

    df["gene_id"] = df["attributes"].str.extract(RE_ID, expand=False)
    df["parent"] = df["attributes"].str.extract(RE_parent, expand=False)

    for name, pattern in re_features.items():
        df[name] = df["attributes"].str.extract(pattern, expand=False)

    # Procedurally fill 'best_feature' based on the order of features_name
    df["best_feature"] = df[features_name[0]]
    for col in features_name[1:]:
        df["best_feature"] = df["best_feature"].fillna(df[col])

    for gene_id in gene_ids:
        candidates = df[df["parent"] == gene_id]
        log.debug(f" gene_id: {gene_id} | {len(candidates) = }")

        # Handle case where no candidates found
        if len(candidates) == 0:
            log.debug(f" no candidate with parent={gene_id} found")
            log.debug("  adding it to UNKNOWN label\n")
            temp_unk[gene_id] = gdt.gdt_impl.GeneGeneric(
                label=f"{gct}-UNKNOWN",
                an_sources=[an],
                c=f"unknown gene_id from {an}{gff_suffix} | "
                f"a: {df[df['gene_id'] == gene_id]['attributes'].iloc[0] if not df[df['gene_id'] == gene_id].empty else 'N/A'}",
            )
            continue

        # Handle feature name conflicts
        if candidates["best_feature"].nunique() > 1:
            log.debug(
                " more than one canditate found, but with best_feature conflict, chosing the first one."
            )
            log.debug(f"  best_features: {candidates['best_feature'].unique()}")

        best_feature = candidates["best_feature"].iloc[0]
        log.debug(f"  chosen best_feature: {best_feature}")
        [
            log.debug(f"\tt: {x.type} | fn: {x.best_feature} | a: {x.attributes}")
            for x in candidates.itertuples()
        ]

        # Handle case where feature_name is in gene_dict
        if best_feature in gene_dict:
            change_gene_dict = True
            label = gene_dict[best_feature].label
            log.debug(f"  best_feature in gene_dict, label: {label}\n")
            gene_dict[gene_id] = gdt.gdt_impl.GeneGeneric(
                label=label,
                an_sources=[an],
                c=f"insertion from missing_dbxref_GeneID feature mapping, source: {best_feature} | type: {candidates['type'].iloc[0]}",
            )
            continue

        # Handle case where feature_name is NOT in gene_dict
        log.debug(f"  best_feature not in gene_dict: {best_feature}")
        log.debug("  checking in temp_unk")

        if best_feature in temp_unk:
            label = temp_unk[best_feature].label
            log.debug(f"  found in temp_unk, label: {label}\n")
            gene_dict[gene_id] = gdt.gdt_impl.GeneGeneric(
                label=label,
                an_sources=[an],
                c=f"insertion from missing_dbxref_GeneID feature mapping, source: {best_feature} | type: {candidates['type'].iloc[0]}",
            )
        else:
            label_count += 1
            label = f"{gct}-TEMP-{label_count}"
            log.debug(f"  not found in temp_unk, new label: {label}\n")
            temp_unk[best_feature] = gdt.gdt_impl.GeneDescription(
                label=label, source="MANUAL", c=None
            )

            temp_unk[gene_id] = gdt.gdt_impl.GeneGeneric(
                label=label,
                an_sources=[an],
                c=f"insertion from missing_dbxref_GeneID feature mapping, source: {best_feature} | type: {candidates['type'].iloc[0]}",
            )

In [None]:
if temp_unk:
    temp_path = get_most_recent_gdt(GDT_DIR, prefix="TEMP_Mapping_")
    new_path, map_iteration = increment_gdt_file(temp_path)
    log.info(f"Writing TEMP Mapping GDT file: {new_path} | Iteration: {map_iteration}")
    temp_unk.info = gdt.gdt_impl.get_gene_dict_info(temp_unk)
    temp_unk.header = [
        "version 0.0.2",
        f"TEMP Mapping - {map_iteration}",
        f"{datetime.now().strftime('%Y-%m-%d %H:%M')} - TEMP Mapping child features to parent genes",
    ]
    gdt.gdt_impl.write_gdt_file(temp_unk, new_path, overwrite=True)

In [None]:
if change_gene_dict:
    log.debug("gene_dict changed, incrementing gdt file and writing it")
    gdt_path, _ = increment_gdt_file(gdt_path)
    log.info(f"Writing GDT file: {gdt_path}")
    gene_dict.info = gdt.gdt_impl.get_gene_dict_info(gene_dict)
    gene_dict.header.append(
        f"{datetime.now().strftime('%Y-%m-%d %H:%M')} - Data added from TEMP Mapping"
    )
    gdt.gdt_impl.write_gdt_file(gene_dict, gdt_path)

### Genes exclusion of to_remove_2.txt

In [None]:
append_string = "discard-"
genes_to_remove = "to_remove_2.txt"

remove_gene_ids = defaultdict(set)
with open(MISC_DIR / genes_to_remove, "r") as f:
    for line in f:
        if (
            not line.strip()
            or line.startswith("#")
            or line.startswith("[")
            or "#gd" in line
        ):
            continue

        gene_id, an = line.split("#c", 1)[0].split("#gn", 1)
        remove_gene_ids[an.strip()].add(gene_id.strip())

In [None]:
log.info(f"Removing {len(remove_gene_ids)} gene IDs from GFF files.")
for an in remove_gene_ids.keys():
    log.trace(f"Processing {an} for removal of gene IDs {remove_gene_ids[an]}")
    an_path = DATA_DIR / f"{an}{gff_suffix}"
    with open(an_path, "r") as f:
        lines = f.readlines()

    headers, index = [], 0
    while lines[index].startswith("#"):
        headers.append(lines[index].strip())
        index += 1

    pattern = re.compile("|".join([f"ID={x};" for x in remove_gene_ids[an]]))
    log.trace(f"Pattern for removal: {pattern.pattern}")
    contents = []

    for line in lines[index:]:
        if not (line := line.strip()):
            continue
        line = line.split("\t")

        # line[2] is type, line[8] is attributes
        if pattern.search(line[8]):
            if append_string not in line[2]:
                line[2] = append_string + line[2]

        contents.append("\t".join(line))

    with open(an_path, "w") as f:
        f.write("\n".join(headers))
        f.write("\n")
        f.write("\n".join(contents))
        f.write("\n\n")

log.info(f"Finished removing gene IDs from {len(remove_gene_ids)} GFF files.")