# util

> I/O functions

In [None]:
# | default_exp util

In [None]:
# | hide
%load_ext autoreload
%autoreload 2

In [None]:
# | hide

from fastcore.docments import docments
from fastcore.test import *
from nbdev.showdoc import *

In [None]:
# | export
import os
from collections.abc import Sequence
from pathlib import Path
from typing import Literal

import numpy as np
import pandas as pd

The information about the location of genes in a genome can come from anywhere, but it is typically stored in a GFF3 file. While this is far from a standardized format, there exist some _guidelines_ about how a (syntactically) valid GFF3 file should look like; they can be found [here](https://github.com/The-Sequence-Ontology/Specifications/blob/master/gff3.md). Such a typical, 9-column, tab-separated file is what I have mostly been working with. If your GFF3 looks different you will have to figure out how to read it in yourself - or [open an issue](https://github.com/galicae/geneorder/issues/new) and I can try to help you.

In [None]:
# | export


def read_gff(
    loc: str | Path,  # input filepath
    gff_columns: list = [
        "seqid",
        "source",
        "type",
        "start",
        "end",
        "score",
        "strand",
        "phase",
        "attributes",
    ],  # column names for the GFF3 file
    skiprows: int = 1,  # how many rows to skip in the beginning
    header: (
        int | Sequence[int] | None | Literal["infer"]
    ) = None,  # whether to expect a header or not
    sep: str = "\t",  # separator for the table
    **kwargs,  # various pd.read_csv() arguments
) -> pd.DataFrame:  # the GFF3 file in DataFrame form
    "A function to read a GFF3 file. Expects 9 tab-separated fields, and will try to name the columns according to https://github.com/The-Sequence-Ontology/Specifications/blob/master/gff3.md"
    gff = pd.read_csv(loc, sep=sep, header=header, skiprows=skiprows, **kwargs)
    gff.columns = gff_columns
    return gff

For instance, let's try and read an example. This is a fake GFF3 file based on the _Pycnogonum litorale_ Hox gene cluster; for the real one, please refer to our [work on the sea spider](https://doi.org/10.1101/2024.11.20.624475).

In [None]:
# | hide
if "EXAMPLE_DATA_PATH" not in os.environ.keys():
    os.environ["EXAMPLE_DATA_PATH"] = (
        "/Users/npapadop/Documents/repos/geneorder/example_data/"
    )

In [None]:
gff = read_gff(os.environ["EXAMPLE_DATA_PATH"] + "plit.gff3")
gff

Unnamed: 0,seqid,source,type,start,end,score,strand,phase,attributes
0,pseudochrom_56,PacBio,gene,1927066,1936157,.,-,.,ID=PB.8615;function=Homeobox domain;gene=Hox1-...
1,pseudochrom_56,PacBio,mRNA,1927066,1936157,.,-,.,ID=PB.8615.1;Parent=PB.8615;function=Homeobox ...
2,pseudochrom_56,PacBio,exon,1927066,1928028,.,-,.,ID=PB.8615.1.exon1;Parent=PB.8615.1;function=H...
3,pseudochrom_56,PacBio,exon,1935229,1936157,.,-,.,ID=PB.8615.1.exon2;Parent=PB.8615.1;function=H...
4,pseudochrom_56,PacBio,CDS,1927066,1928028,.,-,1,ID=PB.8615.1.CDS1;Parent=PB.8615.1;function=Ho...
...,...,...,...,...,...,...,...,...,...
119,pseudochrom_12,AUGUSTUS,mRNA,2986021,2996225,1,-,.,ID=g7725.t1;Parent=g7725;function=sequence-spe...
120,pseudochrom_12,AUGUSTUS,exon,2986021,2986325,.,-,.,ID=g7725.t1.exon1;Parent=g7725.t1;function=seq...
121,pseudochrom_12,AUGUSTUS,exon,2995445,2996225,.,-,.,ID=g7725.t1.exon2;Parent=g7725.t1;function=seq...
122,pseudochrom_12,AUGUSTUS,CDS,2986021,2986325,1,-,2,ID=g7725.t1.CDS1;Parent=g7725.t1;function=sequ...


To begin with, we would like to visualize the 'real' Hox genes. We are also only interested in plotting entire genes (no exon structure), so we can filter the GFF rows based on that. We should also extract the gene IDs from the table to help us filter. Furthermore, we should be extracting gene names, in case we want to plot them too.

In [None]:
# | export


def gff_attribute_selector(
    line,  # the attributes field of a GFF3 line
    sep: str = ";",  # the field separator. Should be a semicolon for a GFF3 file.
    select="ID",  # the field ID. Should be one that is included in the GFF3 file. Refer to https://github.com/The-Sequence-Ontology/Specifications/blob/master/gff3.md for possibilities, or choose a manually defined tag that you know is present in the file.
) -> str:  # the value of field `select`
    "A function that extracts the value of a specified field from the attributes of a GFF3 line. Should be used on the `attributes` field of the corresponding pandas DataFrame."
    for field in line.split(sep):
        if field.startswith(select):
            value = field.split("=")[1]
            return value
    return None

In [None]:
gff["gene_id"] = gff["attributes"].apply(
    lambda x: gff_attribute_selector(x, select="ID")
)
gff["gene_name"] = gff["attributes"].apply(
    lambda x: gff_attribute_selector(x, select="gene")
)

In [None]:
hox_genes = [
    "PB.8615",
    "g9718",
    "PB.8616",
    "g9720",
    "g9721",
    "PB.8617",
    "g9723",
    "g9724",
    "g9725",
]
is_hox = gff["gene_id"].isin(hox_genes)
is_gene = gff["type"] == "gene"

slim = gff[is_gene & is_hox]
slim

Unnamed: 0,seqid,source,type,start,end,score,strand,phase,attributes,gene_id,gene_name
0,pseudochrom_56,PacBio,gene,1927066,1936157,.,-,.,ID=PB.8615;function=Homeobox domain;gene=Hox1-...,PB.8615,Hox1-A
6,pseudochrom_56,AUGUSTUS,gene,1998922,2024148,.,-,.,ID=g9718;function=sequence-specific DNA bindin...,g9718,Hox2-A
15,pseudochrom_56,PacBio,gene,2058396,2065953,.,-,.,ID=PB.8616;function=homeobox protein;gene=Hox3...,PB.8616,Hox3-A
56,pseudochrom_56,AUGUSTUS,gene,2195412,2206712,.,-,.,ID=g9720;function=sequence-specific DNA bindin...,g9720,Hox4-A
62,pseudochrom_56,AUGUSTUS,gene,2351936,2354374,.,-,.,ID=g9721;function=sequence-specific DNA bindin...,g9721,Hox5-A
68,pseudochrom_56,PacBio,gene,2373415,2375678,.,-,.,ID=PB.8617;function=sequence-specific DNA bind...,PB.8617,Hox6-A
79,pseudochrom_56,AUGUSTUS,gene,2565196,2594468,.,-,.,ID=g9723;function=sequence-specific DNA bindin...,g9723,Hox7-A
85,pseudochrom_56,AUGUSTUS,gene,2916314,2926445,.,-,.,ID=g9724;function=sequence-specific DNA bindin...,g9724,Hox8-A
91,pseudochrom_56,AUGUSTUS,gene,2986021,2996225,.,-,.,ID=g9725;function=sequence-specific DNA bindin...,g9725,Hox10-A


This table basically already contains all the information we need: 
* the name of the chromosome
* the location of each gene
* the strand of each gene
* the directionality of each gene (given by relative start/end positions)
* the name of each gene (hidden in the attributes)

The only thing that's left is to extract this information in the way that is needed for the plotter:

In [None]:
hox = slim[["seqid", "gene_name", "gene_id", "start", "end"]].reset_index(drop=True)
hox

Unnamed: 0,seqid,gene_name,gene_id,start,end
0,pseudochrom_56,Hox1-A,PB.8615,1927066,1936157
1,pseudochrom_56,Hox2-A,g9718,1998922,2024148
2,pseudochrom_56,Hox3-A,PB.8616,2058396,2065953
3,pseudochrom_56,Hox4-A,g9720,2195412,2206712
4,pseudochrom_56,Hox5-A,g9721,2351936,2354374
5,pseudochrom_56,Hox6-A,PB.8617,2373415,2375678
6,pseudochrom_56,Hox7-A,g9723,2565196,2594468
7,pseudochrom_56,Hox8-A,g9724,2916314,2926445
8,pseudochrom_56,Hox10-A,g9725,2986021,2996225


In [None]:
# | export


def decorate(
    gff: pd.DataFrame,  # a GFF file in Pandas dataframe form
    attributes: dict = {
        "gene_id": "ID",
        "gene_name": "gene",
    },  # a dictionary of tags to extract from the attributes column. For each key/value pair, the value under the `key` tag will be saved in a new column named `value`.
):
    "A function that"
    for attr_add, attr_filter in attributes.items():
        gff[attr_add] = gff["attributes"].apply(
            lambda x: gff_attribute_selector(x, select=attr_filter)
        )


# | export


def filter(
    gff,
    filter_by_type=True,
    filter_type="gene",
    filter_by_field=True,
    field="gene_id",
    field_values=None,
):
    if filter_type and filter_type not in gff[filter_type]:
        raise ValueError(
            f'The value {filter_type} is not found in the "type" column of the input table.'
        )

    if filter_by_field:
        if field not in gff.columns:
            raise ValueError(
                f'The value {filter_type} is not found in the "type" column of the input table.'
            )

    keep = np.array([True] * gff.shape[0])
    if genes is not None:
        is_genes = gff["gene_id"].isin(genes)
        keep = keep & is_genes

    if is_type:
        is_gene = gff["type"] == "gene"

    slim = gff[is_gene & is_hox]
    return slim

In [None]:
# | export


def syntenic_block_borders(
    gff: pd.DataFrame,  # a GFF in Pandas dataframe form. Only includes the genes of the syntenic block in question.
    flank_length: int = None,  # the amount of space to be granted on both sides of the syntenic region, in basepairs. If unspecified, it will be set to 5% of the syntenic block length.
    start: str = "start",  # the GFF column with the start position of the gene ("start").
    end: str = "end",  # the GFF column with the end position of the gene ("end").
) -> (int, int):
    "A function to calculate the boundaries of a syntenic block. It automatically pads the boundaries by an additional 5% of total length on both ends."
    possible = [gff[start].min(), gff[start].max(), gff[end].min(), gff[end].max()]
    block_start = min(possible)
    block_end = max(possible)
    if flank_length is None:
        flank_length = (block_end - block_start) // 100 * 5
    if not flank_length.is_integer():
        raise ValueError(
            f'The parameter `flank_length` has to be an integer. You supplied the value "{flank_length}" which is {type(flank_length)}.'
        )
    return max(block_start - flank_length, 0), block_end + flank_length

In [None]:
test_fail(
    syntenic_block_borders,
    contains="The parameter `flank_length` has to be an integer",
    args=(gff,),
    kwargs=dict(flank_length=5.2),
)

The process of reading in a GFF3 file can be expedited with the 

In [None]:
# | export


def read_aln(
    m8: str,  # the path to the MMseqs2 alignment table file
    id_sep: (
        str | None
    ) = None,  # (optional) a character that separates the species ID from the gene ID
    **kwargs,  # various arguments to be passed to the pd.read_csv() function
) -> pd.DataFrame:  # the tabulated form of the alignment results.
    "Reads"
    hits = pd.read_csv(m8, sep="\t", header=None)
    m8_columns = [
        "query",
        "target",
        "seq_id",
        "ali_len",
        "no_mism",
        "no_go",
        "q_start",
        "q_end",
        "t_start",
        "t_end",
        "eval",
        "bit",
    ]
    hits.columns = m8_columns
    # trim the query to just the ID
    if id_sep is not None:
        hits["query"] = hits["query"].str.split(id_sep).str[1].str.join(id_sep)
    return hits

In [None]:
# | export


def estimate_plot_size(
    gff,
    width_factor: int = 3,
    height: int = 2,
):
    x = len(gff) * width_factor
    return (x, height)

In [None]:
assert estimate_plot_size(gff[gff["type"] == "gene"]) == (39, 2)

In [None]:
# | export
def insert_gap(
    gff: pd.DataFrame,
    locus1,
    locus2,
    identifier,
    purge_columns=None,
    no_gaps=1,
) -> pd.DataFrame:
    "This function inserts a number of dummy entries between two loci (lines) in the GFF DataFrame."
    loc1_index = gff[gff[identifier] == locus1].index[0]
    loc2_index = gff[gff[identifier] == locus2].index[0]
    if loc2_index - loc1_index != 1:
        raise ValueError(
            f"The two loci are not consecutive; their indices are {locus1}: {loc1_index} and {locus2}: {loc2_index}, respectively. Please refer to your input dataframe."
        )

    offset_step = 1 / (no_gaps + 1)

    for i in range(no_gaps):
        offset = (i + 1) * offset_step
        tmp = gff.loc[loc1_index].copy()
        tmp["start"] = tmp["end"] + 1 + (i * 2)
        tmp["end"] = tmp["start"] + 1
        tmp[identifier] = f"gap_{locus1}-{i}"
        for column in purge_columns:
            tmp[column] = ""
        gff.loc[loc1_index + offset] = tmp

    gff = gff.sort_index().reset_index(drop=True)
    return gff

In [None]:
test_fail(
    insert_gap,
    contains="The two loci are not consecutive;",
    args=(hox, "PB.8615", "g9720", "gene_id"),
)

In [None]:
hox = insert_gap(
    hox, "PB.8615", "g9718", "gene_id", no_gaps=4, purge_columns=["gene_name"]
)

In [None]:
hox

Unnamed: 0,seqid,gene_name,gene_id,start,end
0,pseudochrom_56,Hox1-A,PB.8615,1927066,1936157
1,pseudochrom_56,,gap_PB.8615-0,1936158,1936159
2,pseudochrom_56,,gap_PB.8615-1,1936160,1936161
3,pseudochrom_56,,gap_PB.8615-2,1936162,1936163
4,pseudochrom_56,,gap_PB.8615-3,1936164,1936165
5,pseudochrom_56,Hox2-A,g9718,1998922,2024148
6,pseudochrom_56,Hox3-A,PB.8616,2058396,2065953
7,pseudochrom_56,Hox4-A,g9720,2195412,2206712
8,pseudochrom_56,Hox5-A,g9721,2351936,2354374
9,pseudochrom_56,Hox6-A,PB.8617,2373415,2375678


In [None]:
# | hide
import nbdev

nbdev.nbdev_export()