# utils

> Utility functions for downloading and parsing GFF annotation files, plus small string and file-name helpers.

In [None]:
#| hide
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [None]:
#| default_exp utils

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
from bokeh.io import output_notebook #|hide_line
output_notebook(hide_banner=True) #|hide_line

In [None]:
#| export
import numpy as np
import pandas as pd
import io

from collections import defaultdict
import warnings
import gzip
import urllib.request
import os
import re
from platform import uname


In [None]:
#| export
def is_gzipped_file(file_path):
    """Report whether `file_path` can be opened and read as a gzip stream.

    Returns False when opening the file or reading its first byte raises an
    IOError (this includes a missing or unreadable file), True otherwise.
    """
    readable_as_gzip = True
    try:
        with gzip.open(file_path, 'rb') as gz_handle:
            # Reading a single byte is enough to make gzip validate the header
            gz_handle.read(1)
    except OSError:  # IOError is an alias of OSError in Python 3
        readable_as_gzip = False
    return readable_as_gzip
    
def download_file(url, save_path):
    """Download `url` to `save_path` unless a file already exists at that path.

    url: address passed to urllib.request.urlretrieve.
    save_path: destination path; when it already exists nothing is downloaded.

    Prints a message telling whether the file was found locally or downloaded.
    Any exception raised by the transfer is propagated to the caller.
    """
    if os.path.exists(save_path):
        print(f"File already exists: {save_path}")
        return

    # Download under a temporary sibling name and rename only on success, so an
    # interrupted transfer never leaves a truncated file at save_path (a later
    # call would otherwise report it as "already exists" and keep it).
    partial_path = f"{save_path}.part"
    try:
        urllib.request.urlretrieve(url, partial_path)
        os.replace(partial_path, save_path)
    finally:
        # After a successful rename the partial file is gone and this is a no-op
        if os.path.exists(partial_path):
            os.remove(partial_path)
    print(f"File downloaded and saved: {save_path}")

In [None]:
# Example usage
# Remote location of the annotation file and the local name it is saved under
file_url = 'https://ftp.ncbi.nlm.nih.gov/refseq/H_sapiens/annotation/GRCh38_latest/refseq_identifiers/GRCh38_latest_genomic.gff.gz'
human_genome_gff = 'GRCh38_latest_genomic.gff.gz'

# download_file skips the transfer when the local file already exists
download_file(file_url, human_genome_gff)
# Last expression of the cell, so its value is shown as the cell output
is_gzipped_file(human_genome_gff)

File already exists: GRCh38_latest_genomic.gff.gz


True

In [None]:
#| export
def extract_attribute(input_str:str, #attribute string to parse
                      attr_name:str, #name of the attribute to extract
                     ) -> str: #value of the attribute, or None when it is absent
    """Extracts the attribute called attr_name from the GFF attributes string.

    The first letter of attr_name is matched case-insensitively (e.g. "name"
    finds "Name=..."); the rest of the name must match exactly. The key is only
    matched at the start of the string or right after a ";" separator, so asking
    for "gene" never picks up "pseudogene=...".
    Returns None when the attribute is not present or attr_name is empty.
    """
    if not attr_name:
        return None

    first, rest = attr_name[0], attr_name[1:]
    # re.escape keeps names containing regex metacharacters (e.g. "gene-id") literal
    pattern = (
        r"(?:^|;)\s*"
        f"[{re.escape(first.lower())}{re.escape(first.upper())}]"
        f"{re.escape(rest)}=([^;]+)"
    )
    match = re.search(pattern, input_str)
    return match.group(1) if match else None

In [None]:
# A GFF attributes string: ";"-separated key=value pairs, one of which is gene=dnaA
input_str = 'ID=cds-ATV02827.1;Parent=gene-SaO11_00001;Dbxref=NCBI_GP:ATV02827.1;Name=ATV02827.1;gbkey=CDS;gene=dnaA;locus_tag=SaO11_00001;product=Chromosomal replication initiator protein DnaA;protein_id=ATV02827.1;transl_table=11'
extract_attribute(input_str,"gene")

'dnaA'

In [None]:
#| hide
# This string has no gene attribute, so the lookup must return None
input_str = 'locus_tag=SaO11_00001;product=Chromosomal replication initiator protein DnaA;protein_id=ATV02827.1;transl_table=11'
assert extract_attribute(input_str,"gene") is None

In [None]:
#| export
def extract_all_attributes(input_str:str)->dict:
    """Extracts all attributes from the GFF attributes column.

    Every `key=value` pair found in input_str is returned as a dictionary
    entry; values run up to the next ";". When a key appears several times the
    last occurrence wins. An input without any pair yields an empty mapping.
    """
    # Raw string: "\w" inside a normal literal is an invalid escape sequence
    pattern = r"(?P<key>\w+[-\w]*)=(?P<value>[^;]+)"
    match = re.findall(pattern, input_str)
    # A defaultdict without default_factory behaves like a plain dict (missing
    # keys still raise KeyError); the type is kept so the return value is unchanged.
    d=defaultdict()
    d.update(match)
    return d

In [None]:
# input_str is the string assigned in the hidden test cell just above (it has no gene attribute)
extract_all_attributes(input_str)

defaultdict(None,
            {'locus_tag': 'SaO11_00001',
             'product': 'Chromosomal replication initiator protein DnaA',
             'protein_id': 'ATV02827.1',
             'transl_table': '11'})

In [None]:
#| export
def attributes_to_columns(features: pd.DataFrame) -> pd.DataFrame:
    """Expands the `attributes` column of `features` into one column per attribute key.

    Each row's attributes string is parsed with extract_all_attributes; every
    key found in at least one row becomes a column, with None for the rows that
    lack it. A key that has the same name as an existing column overwrites that
    column. The input DataFrame is not modified; a copy is returned.
    """
    attr_dicts = features.attributes.apply(extract_all_attributes)
    # dict.fromkeys de-duplicates while keeping first-seen order, so the column
    # order is reproducible between runs (a set would order keys arbitrarily)
    all_keys = list(dict.fromkeys(k for d in attr_dicts for k in d))

    features = features.copy()
    for k in all_keys:
        features[k] = [d.get(k, None) for d in attr_dicts]

    # NOTE(review): a `features.fillna("")` call used to sit here, but its result
    # was discarded, so missing attributes have always been returned as None.
    # If empty strings are wanted instead, assign the result of fillna explicitly.
    return features
    

In [None]:
#| export
def set_positions(annotation: pd.DataFrame, # an annotation DataFrame extracted from a gff file
                 ):
    """Adds `left`, `right` and `middle` columns and makes `start`/`end` strand-aware.

    `left` and `right` are copies of the incoming `start` and `end` columns.
    For features on the "-" strand `start` and `end` are swapped, so that start
    and end follow the orientation of the feature; other strands keep their values.
    `middle` is the mean of `left` and `right`. Works on a copy of the input.
    """
    positioned = annotation.copy()
    positioned["left"] = positioned["start"].to_numpy()
    positioned["right"] = positioned["end"].to_numpy()

    # For each strand: which positional column feeds start and which feeds end
    sources_by_strand = {"+": ("left", "right"), "-": ("right", "left")}
    for strand, (start_source, end_source) in sources_by_strand.items():
        on_strand = positioned["strand"] == strand
        positioned.loc[on_strand, "start"] = positioned.loc[on_strand, start_source].to_numpy()
        positioned.loc[on_strand, "end"] = positioned.loc[on_strand, end_source].to_numpy()

    positioned["middle"] = (positioned["right"] + positioned["left"]) / 2

    return positioned

In [None]:
#| export
def default_open_gz(gff_path):
    """Open `gff_path` for text reading, through gzip when is_gzipped_file says it is gzipped.

    Returns the open file object; closing it is left to the caller.
    """
    opener, mode = (gzip.open, 'rt') if is_gzipped_file(gff_path) else (open, 'r')
    return opener(gff_path, mode)
    

In [None]:
#| export
# Default selection of feature types (values of the third GFF column).
# NOTE(review): not referenced elsewhere in this notebook; presumably used by other modules — confirm.
default_types=["CDS", "repeat_region", "ncRNA", "rRNA", "tRNA"]
# Default selection of attribute names (keys of the GFF attributes column).
# NOTE(review): not referenced elsewhere in this notebook; presumably used by other modules — confirm.
default_attributes=["gene", "locus_tag", "product"]


def parse_gff(gff_path:str, # path to the gff file
              seq_id: str=None, # sequence id (first column of the gff)
              bounds: tuple=None, # (left limit, right limit)
              feature_types: list = None, # list of feature types to extract
             )->pd.DataFrame:
    """Parses a (possibly gzipped) GFF file into a DataFrame of features.

    Only the lines of one sequence are kept: `seq_id` when given, otherwise the
    sequence id of the first feature line of the file. When `feature_types` is
    given, only features whose type is in it are kept. When `bounds` is given,
    only features with start < bounds[1] and end > bounds[0] are kept.
    Comment lines (starting with "#") and lines that do not have exactly nine
    tab-separated columns (blank lines, sequence data, ...) are skipped.

    Returns a DataFrame with the nine GFF columns, one column per attribute key
    (see attributes_to_columns) and the left/right/middle columns added by
    set_positions. When no line matches, a warning is emitted and an empty
    DataFrame holding only the nine GFF columns is returned.
    """
    gff_columns = ["seq_id", "source","type","start","end","score","strand","phase","attributes"]

    # Matching lines are copied verbatim into an in-memory buffer, so pandas
    # only has to parse the selected features
    file_buffer = io.StringIO()
    buffer_empty = True
    with default_open_gz(gff_path) as gff_file:
        for line in gff_file:
            if line.startswith("#"):
                continue
            r = line.split('\t')
            if len(r) != len(gff_columns):
                # Not a feature line; indexing its columns below would fail
                continue
            if not seq_id:
                # No sequence requested: use the first one found in the file
                seq_id = r[0]
            if r[0] != seq_id:
                continue
            if feature_types is not None and r[2] not in feature_types:
                continue
            if bounds is not None and not (int(r[3]) < bounds[1] and int(r[4]) > bounds[0]):
                continue
            file_buffer.write(line)
            buffer_empty = False

    if buffer_empty:
        warnings.warn("The annotation DataFrame is empty. Check that the feature_types and seq_id are correct.")
        return pd.DataFrame(columns=gff_columns)

    # Rewind the buffer so that pandas reads it from the beginning
    file_buffer.seek(0)
    df = pd.read_csv(file_buffer, sep="\t", header=None, names=gff_columns)
    df = attributes_to_columns(df)
    df = set_positions(df)
    return df

In [None]:
# Features of sequence NC_000001.11 that overlap the (10000, 50000) interval
df=parse_gff(human_genome_gff, 
             seq_id="NC_000001.11",
             bounds=(10000,50000))
df.head()

Unnamed: 0,seq_id,source,type,start,end,score,strand,phase,attributes,chromosome,...,gene_synonym,pct_identity_gapopen_only,matchable_bases,Note,Target,blast_aligner,for_remapping,left,right,middle
0,NC_000001.11,RefSeq,region,1,248956422,.,+,.,ID=NC_000001.11:1..248956422;Dbxref=taxon:9606...,1.0,...,,,,,,,,1,248956422,124478211.5
1,NC_000001.11,BestRefSeq,pseudogene,11874,14409,.,+,.,"ID=gene-DDX11L1;Dbxref=GeneID:100287102,HGNC:H...",,...,,,,,,,,11874,14409,13141.5
2,NC_000001.11,BestRefSeq,transcript,11874,14409,.,+,.,ID=rna-NR_046018.2;Parent=gene-DDX11L1;Dbxref=...,,...,,,,,,,,11874,14409,13141.5
3,NC_000001.11,BestRefSeq,exon,11874,12227,.,+,.,ID=exon-NR_046018.2-1;Parent=rna-NR_046018.2;D...,,...,,,,,,,,11874,12227,12050.5
4,NC_000001.11,BestRefSeq,exon,12613,12721,.,+,.,ID=exon-NR_046018.2-2;Parent=rna-NR_046018.2;D...,,...,,,,,,,,12613,12721,12667.0


In [None]:
#| export
def available_feature_types(gff_path):
    """Return the set of feature types (third column) found in the GFF file at `gff_path`.

    Lines starting with "#" and lines that do not split into exactly nine
    tab-separated columns are ignored.
    """
    with default_open_gz(gff_path) as gff_lines:
        split_lines = (entry.split('\t') for entry in gff_lines if entry[0] != "#")
        return {columns[2] for columns in split_lines if len(columns) == 9}

In [None]:
from genomenotebook.data import get_example_data_dir
import os

In [None]:
# Path of the example GFF3 file inside the directory returned by get_example_data_dir()
data_path = get_example_data_dir()
gff_path = os.path.join(data_path, "MG1655_U00096.gff3")
available_feature_types(gff_path)

{'CDS',
 'exon',
 'gene',
 'mobile_genetic_element',
 'ncRNA',
 'origin_of_replication',
 'pseudogene',
 'rRNA',
 'recombination_feature',
 'region',
 'repeat_region',
 'sequence_feature',
 'tRNA'}

In [None]:
#| export
def available_attributes(gff_path):
    """Return the column index of the DataFrame that parse_gff builds for `gff_path`.

    parse_gff is called with its default arguments, so only the first sequence
    of the file is parsed. The result lists the GFF columns and the positional
    columns alongside the attribute names.
    """
    return parse_gff(gff_path).columns

In [None]:
# The returned Index holds every column of the parsed frame, not only attribute names
available_attributes(gff_path)

Index(['seq_id', 'source', 'type', 'start', 'end', 'score', 'strand', 'phase',
       'attributes', 'pseudo', 'orig_protein_id', 'Parent',
       'mobile_element_type', 'genome', 'strain', 'gbkey', 'Name',
       'Is_circular', 'part', 'orig_transcript_id', 'product', 'exception',
       'substrain', 'transl_table', 'gene_biotype', 'mol_type', 'gene',
       'locus_tag', 'protein_id', 'Dbxref', 'recombination_class', 'ID',
       'gene_synonym', 'Note', 'transl_except', 'rpt_type', 'left', 'right',
       'middle'],
      dtype='object')

In [None]:
#| hide
#Testing mistake in seq_id: an unknown seq_id must trigger the warning and give an empty DataFrame
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    features=parse_gff(gff_path, "U00097.3") #mistake in seq_id
assert features.empty
assert any("The annotation DataFrame is empty" in str(w.message) for w in caught)



In [None]:
#| export
from collections import defaultdict

In [None]:
#| export
def split_string(string, max_length=10):
    """Break `string` into lines of at most `max_length` characters, joined by newlines.

    Each line is cut at the last space found at positions 1..max_length of the
    remaining text; when there is none, the text is cut at exactly max_length
    characters. Leading whitespace of the remainder is dropped after every cut.
    A string that already fits is returned unchanged.

    Raises ValueError when the string has to be split and max_length is below 1.
    """
    if len(string) <= max_length:
        return string
    if max_length < 1:
        # A non-positive width can never make progress
        raise ValueError("max_length must be at least 1")

    # Iterative rather than recursive, so very long inputs cannot hit the recursion limit
    lines = []
    remaining = string
    while len(remaining) > max_length:
        # Position 0 is excluded so that every line keeps at least one character
        split_index = remaining.rfind(' ', 1, max_length + 1)
        if split_index == -1:
            split_index = max_length  # If no suitable breaking point found, split at max_length
        lines.append(remaining[:split_index])
        remaining = remaining[split_index:].lstrip()
    lines.append(remaining)
    return '\n'.join(lines)



In [None]:
# Example usage
long_string = "This is a very long string that needs to be split into multiple lines because it exceeds 50 characters."

# Lines are broken at spaces so that none is longer than max_length characters
split_result = split_string(long_string, max_length=50)
# print is used so that the newlines are rendered as line breaks
print(split_result)


This is a very long string that needs to be split
into multiple lines because it exceeds 50
characters.


In [None]:
#| export
def in_wsl() -> bool:
    """Tell whether the kernel release string reported by platform.uname() contains 'microsoft-standard'.

    NOTE(review): only that exact lowercase marker is recognised; confirm
    whether other WSL kernel naming schemes need to be detected as well.
    """
    kernel_release = uname().release
    return 'microsoft-standard' in kernel_release

In [None]:
# The result depends on the machine that executes the notebook
in_wsl()

True

In [None]:
#| export
def add_extension(filename,extension="svg"):
    """Return `filename` with `.extension` appended unless it already ends with it.

    The comparison with the current extension of `filename` ignores case on
    both sides, and `extension` may be given with or without its leading dot.
    The original file name is otherwise returned unchanged.
    """
    # Accept ".png" as well as "png" without producing "name..png"
    extension = extension.lstrip('.')
    _, current_ext = os.path.splitext(filename)
    if current_ext.lower() != '.' + extension.lower():
        filename += '.' + extension
    return filename

In [None]:
# A name that already has the extension is unchanged; otherwise the extension is appended
add_extension("test.svg"), add_extension("test",extension="png")

('test.svg', 'test.png')

In [None]:
#| hide
def sort_list_dict(d: dict, #a dictionary for which all values are of type list
                   ref_list="xs", #key of the list to use as a reference for sorting
                   func=lambda x: x, #A custom function can be supplied to customize the sort order. Default is the identity function.
                  ):
    """Sorts all the lists of `d` together, following the order of the list stored under `ref_list`.

    The i-th elements of all lists are treated as one row; rows are sorted by
    func(element of the reference list) and written back into new lists. Rows
    with equal keys keep their relative order. If the lists have different
    lengths, only the rows covered by the shortest list are kept.
    Returns a new dictionary with the same keys; `d` is not modified.
    Raises ValueError when `ref_list` is not a key of `d`.
    """
    ks = list(d.keys())
    ref_list_ix = ks.index(ref_list)
    # Sort the rows (one element of each list) on the reference list's value
    sorted_rows = sorted(zip(*[d[k] for k in ks]), key=lambda row: func(row[ref_list_ix]))

    if not sorted_rows:
        # With no rows, zip(*sorted_rows) yields nothing and every key would be
        # dropped from the result; keep the keys with empty lists instead
        return {k: [] for k in ks}

    # Transpose the sorted rows back into one list per key
    return {k: list(column) for k, column in zip(ks, zip(*sorted_rows))}


In [None]:
#| hide
# Three parallel lists: position i of each list belongs to the same record
d={'xs':[[2,5],[3,1]],'a':[1,3],'b':["c","d"]}
sort_list_dict(d, ref_list='xs', func= lambda x: x[1]) #sort according to the second element of each element of list 'xs'

{'xs': [[3, 1], [2, 5]], 'a': [3, 1], 'b': ['d', 'c']}

In [None]:
#| hide
import nbdev; nbdev.nbdev_export()