In [15]:
import gff3_parser
import polars as pl

from Bio.Seq import Seq

orfs = pl.from_pandas(gff3_parser.parse_gff3("input/mapping_orf_Scer_SGD_noMT.gff", parse_attributes=True))

GFF = gff3_parser.parse_gff3("input/Scer.gff", parse_attributes=True)

FASTA = list(SeqIO.parse("input/Scer.fna", "fasta"))


COLUMNS = [["seq_id", "start", "end", "strand", "phase", "attributes"]]

 Input genomic fasta file: Scer_SGD.fna
 Input gff file: Scer_SGD.gff
Building structured data...


100%|██████████| 351379/351379 [00:01<00:00, 279792.74it/s]


Adding Supplemental Attribute table...
Finding unique attribute keys...


100%|██████████| 351377/351377 [00:01<00:00, 323240.33it/s]


Making attribute table...


100%|██████████| 351377/351377 [00:03<00:00, 114835.99it/s]


date Tue Jan 13 13:06:13 2015
 Created by Saccharomyces Genome Database (http://www.yeastgenome.org/)
 Weekly updates of this file are available for download from:
 http://downloads.yeastgenome.org/curation/chromosomal_feature/saccharomyces_cerevisiae.gff
Building structured data...


100%|██████████| 23076/23076 [00:00<00:00, 281459.58it/s]


Adding Supplemental Attribute table...
Finding unique attribute keys...


100%|██████████| 23058/23058 [00:00<00:00, 278089.40it/s]


Making attribute table...


100%|██████████| 23058/23058 [00:00<00:00, 93561.85it/s]


In [205]:
from collections import OrderedDict as OD
import polars as pl
from Bio.SeqIO import parse
import re

GFF_POLARS = pl.from_pandas(GFF)

ORF_DF_COLUMNS = ['Seqid',
 'Source',
 'Type',
 'Start',
 'End',
 'Score',
 'Strand',
 'Phase',
 'Status',
 'color',
 'Parent',
 'ID',
 'Ovp_with',
 'Ovp_gene']


class GenomicFeature:

    def __init__(self, ID : str, start : int, end : int):
        self.ID = ID
        self.start = start
        self.end = end
        self.counter = 0


class Gene(GenomicFeature):

    counter = 0

    def __init__(self, ID, start, end, chromosome : str,  multi : bool, sense : str, frame : int):
        super().__init__(ID, start, end)
        self.chromosome = chromosome
        self.frame = frame
        self.sense = sense
        self.multi = multi
        self.aORFs = OD()
        self._exons = OD()
        Gene.counter += 1
        

    @property
    def exons(self):
        return self._exons
    
    @property
    def exons_list(self):
        return list(self._exons.values())

    @exons.setter
    def exons(self, value):
    
        self._exons = value

    def add_exon(self, key, value):
        
        if key in self._exons:
            raise KeyError(f'exon {key} already exists')

        elif type(value) != Exon:

            raise TypeError('exon must be an Exon')
        
        else:
            self._exons[key] = value

            
    def add_orf(self, key, value):

        if type(value) != Orf:
            raise TypeError('value must be Orf')

        elif key in self.aORFs:
            raise KeyError(f'orf {key} already exists')
        else:
            self.aORFs[key] = value

    
    def sort(self):

        if self.sense == "+":

            if int(self.start) > int(self.end) : self.start, self.end = self.end, self.start
            self._exons = OD(sorted(self._exons.items(), key=lambda x: x[1].start))

        elif self.sense == "-":

            if int(self.end) > int(self.start) : self.start, self.end = self.end, self.start
            self._exons = OD(sorted(self._exons.items(), key=lambda x: x[1].start, reverse=True))


class Exon(GenomicFeature):

    def __init__(self, ID, start, end, gene : Gene, abs_frame : int):
        super().__init__(ID, start, end )
        self.abs_frame = abs_frame
        self.gene = gene
 
class Orf(GenomicFeature):

    def __init__(self, ID, start, end, gene, ribospike : int):
        super().__init__(ID, start, end)
        self.gene = gene
        self.ribospike = ribospike
        self._ribostart = int(self.ribospike + self.start)
        self.ribostartLocalisation = None
        
    def locRiboStart(self):

        if self.gene.sense == "+":
            for exon in self.gene.exons_list:
                if exon.start <= self._ribostart <= exon.end:
                    self.ribostartLocalisation = "exon"
                    return
                
            if self.gene.start <= self._ribostart < self.gene.exons_list[0].start:
                self.ribostartLocalisation = "5UTR"
                return
            
            elif self.gene.exons_list[-1].end < self._ribostart <= self.gene.end:
                self.ribostartLocalisation = "3UTR"
                return

            elif self._ribostart < self.gene.start:
                self.ribostartLocalisation = "upstream"
                return
            
            elif self._ribostart > self.gene.end:
                self.ribostartLocalisation = "downstream"
                return
            
            else:
                self.ribostartLocalisation = "intron"
                return
            
        elif self.gene.sense == "-":
            for exon in self.gene.exons_list:
                if exon.end <= self._ribostart <= exon.start:
                    self.ribostartLocalisation = "exon"
                    return
                
            if self.gene.exons_list[0].start < self._ribostart <= self.gene.start:
                self.ribostartLocalisation = "5UTR"
                return
            
            elif self.gene.end <= self._ribostart < self.gene.exons_list[-1].end:
                self.ribostartLocalisation = "3UTR"
                return

            elif self._ribostart > self.gene.start:
                self.ribostartLocalisation = "upstream"
                return
            
            elif self._ribostart < self.gene.end:
                self.ribostartLocalisation = "downstream"
                return
            
            else:
                self.ribostartLocalisation = "intron"
                return
            
    

    
class GeneStructureError(Exception):
    def __init__(self, message):
        self.message = message
        super().__init__(self.message)


def get_good_column_names(columns):

    import re
    from itertools import chain

    parent_col = [col for col in columns if re.match(r"[Pp]arent", col)]
    name_col = [col for col in columns if re.match(r"[Nn]ame", col)]

    if len(parent_col) == 1 and len(name_col) == 1:

        return str(parent_col[0]), str(name_col[0])
    
    else:

        raise AttributeError("Problem with GFF columns : Parent or Name not found. See get_good_column_names()")
        
def return_gene_infos(gene_infos) -> dict:

    chromosome = gene_infos["Seqid"].unique().to_list()[0]
    multi = gene_infos["Type"].to_list().count("CDS") > 1

    gene_infos = gene_infos.filter(pl.col("Type") == "gene")

    start = gene_infos["Start"].to_list()[0]
    end = gene_infos["End"].to_list()[0]
    sense = gene_infos["Strand"].to_list()[0]
    
    return {

        "chromosome" : chromosome,
        "start" : int(start),
        "end" : int(end),
        "sense" : sense,
        "multi" : multi
    } 

def init_gene_object(gene_id, gff_dataframe):


    """
    
    On veut pour la gene_id donnée initiliaser un objet Gene avec ses attributs
    tirés du gff_dataframe.
    
    """

    pattern = fr".*{gene_id}.*"

    parent, name = get_good_column_names(gff_dataframe.columns)

    # Polars does not support regex filtering : pandas is used instead
    gene_rows = pl.from_pandas(gff_dataframe[
        gff_dataframe[parent].str.contains(pattern, regex=True, na=False) |
        gff_dataframe[name].str.contains(pattern, regex=True, na=False)
    ]) # Get all rows related to the gene being initialized


    
    gene_infos = return_gene_infos(gene_rows)


    gene = Gene(

        ID = gene_id,
        chromosome = gene_infos["chromosome"],
        start = gene_infos["start"],
        end = gene_infos["end"],
        sense = gene_infos["sense"],
        multi = gene_infos["multi"],
        frame = gene_infos["start"] % 3
        
    )
    

    cds_counter = 1
    for exon in gene_rows.filter(pl.col("Type") == "CDS").iter_rows(named = True):

        key = f'{exon["ID"]}-{cds_counter}' if exon["ID"] else f'{exon["Name"]}-{cds_counter}'
        gene.add_exon(

            key = key,
            value = Exon(
                        ID = key,
                        gene = gene,
                        start = int(exon["Start"]),
                        end = int(exon["End"]),
                        abs_frame = int(exon["Start"]) % 3)
        )

        cds_counter = cds_counter + 1

    gene.sort()

    return gene

def check_double_overlap(row : tuple):

    orf = dict(zip(ORF_DF_COLUMNS, row))
    
    overlaps = [match for item in orf["Ovp_with"].split("|") for match in re.findall(r"\b([\w-]+)_mRNA\b", item)]

    if len(overlaps) == 0:

        overlaps = [match for item in orf["Ovp_with"].split("|") for match in re.findall(r"\b([\w-]+)_CDS\b", item)]
    
    buffer = []

    if len(overlaps) != 1:

        for overlap in overlaps:

            
            if GFF_POLARS.filter(
                
                (pl.col("ID") == overlap)
                )["Strand"].unique().to_list()[0] == orf["Strand"]:

                buffer.append(overlap)
                

        if len(buffer) == 1: # Several genes are overlapped by the ORF, but only one is on the same strand

            orf["Ovp_gene"] = buffer[0]
            return tuple(orf.values())
        
    
        elif len(buffer) == 0: # No gene found on the same strand as the ORF

            orf["Ovp_gene"] = "NA"
            return tuple(orf.values())
        
        else: # Several genes found on the same strand as the ORF

            orf["Ovp_gene"] = "Two_or_more_genes"
            return tuple(orf.values())


    else: # If there is only one gene found in the overlapping information given by ORFMine ID

        orf["Ovp_gene"] = overlaps[0]
        return tuple(orf.values())

def multifasta_to_dict(path, genome = False):


    """
    Reads a FASTA file and returns a dictionary where each record in the file
    is a key-value pair with the record identifier as the key and the record sequence (in uppercase) as the value.

    Args:
    path (str): The path to the FASTA file to be read.

    Returns:
    dict: A dictionary where the keys are the record identifiers and the values
          are the corresponding record sequences as Seq objects in uppercase letters.
    """
    
    records = parse(path, "fasta")

    if genome:
        dico = {}
        for record in records:
            dico[record.id] = {}
            dico[record.id]["seq"] = Seq(str(record.seq).upper())
            dico[record.id]["len"] = len(record.seq)
            
        return dico
    
    else:

        return {record.id: Seq(str(record.seq).upper()) for record in records}

In [72]:
pattern = r'\b([\w-]+)_CDS\b'

same_CDS_dframe = (
    orfs
    .filter(pl.col("Type") == "nc_ovp_same-CDS")
    .with_columns([
        pl.col("ID").apply(lambda value: value.split("_")[3]).alias("Phase"), # Extract absolute strand phase from the ID generated by ORFMine
        pl.lit("NA").alias("Ovp_gene") # Create a column for the gene ID that will be filled by check_double_overlap()
    ])
)


same_CDS_dframe = same_CDS_dframe.select(ORF_DF_COLUMNS).apply(check_double_overlap) # Use .apply() method to leverage parallelization

same_CDS_dframe.columns = ORF_DF_COLUMNS

grouped = same_CDS_dframe.groupby("Ovp_gene")





In [207]:
gene_list = list()
FASTA_DICT = multifasta_to_dict("input/Scer.fna", genome = True)

for overlapped_feature_name , data in grouped: # This loop returns the name by which data is groupped, and the data itself as a polars dataframe

    if "gene" in GFF_POLARS.filter(pl.col("ID") == overlapped_feature_name)["Type"].unique().to_list(): # If overlapped feature is not a gene ( = transposable for example ) it's not stored

        gene = init_gene_object(overlapped_feature_name, GFF)

        for row in data.iter_rows(named = True):

            ribospike = 0 # For now, don't know how data will look like

            orf = Orf(
                ID = row["ID"],
                start = int(row["Start"]),
                end = int(row["End"]),
                gene = gene,
                ribospike = ribospike
                )

            orf.locRiboStart()

            gene.add_orf(key = orf.ID, 
                         value = orf)



        gene_list.append(gene)



5UTR
5UTR
5UTR
5UTR
downstream
5UTR
5UTR
5UTR
5UTR
upstream
exon
exon
exon
exon
exon
exon
exon
exon
exon
exon
exon
exon
downstream
5UTR
5UTR
5UTR
5UTR
5UTR
downstream
5UTR
5UTR
5UTR
exon
exon
exon
exon
exon
exon
upstream
exon
exon
exon
upstream
exon
exon
exon
exon
exon
exon
exon
exon
exon
exon
exon
exon
exon
exon
upstream
exon
exon
exon
exon
exon
exon
exon
exon
exon
exon
exon
exon
exon
exon
upstream
exon
exon
exon
exon
exon
exon
exon
exon
exon
exon
exon
exon
exon
exon
upstream
exon
exon
exon
exon
exon
exon
exon
upstream
exon
exon
exon
exon
exon
exon
exon
exon
exon
exon
exon
upstream
exon
exon
exon
downstream
5UTR
5UTR
5UTR
5UTR
5UTR
5UTR
5UTR
5UTR
5UTR
downstream
5UTR
5UTR
5UTR
5UTR
5UTR
5UTR
5UTR
exon
5UTR
5UTR
5UTR
5UTR
5UTR
5UTR
5UTR
5UTR
5UTR
downstream
5UTR
5UTR
5UTR
5UTR
5UTR
5UTR
5UTR
5UTR
5UTR
5UTR
5UTR
5UTR
5UTR
5UTR
5UTR
5UTR
5UTR
5UTR
5UTR
downstream
5UTR
5UTR
5UTR
5UTR
5UTR
5UTR
5UTR
5UTR
5UTR
5UTR
5UTR
5UTR
5UTR
5UTR
5UTR
5UTR
5UTR
5UTR
5UTR
exon
exon
upstream
exon
exon
do

KeyboardInterrupt: 

<class 'int'> <class 'int'>
<class 'int'> <class 'int'>
<class 'int'> <class 'int'>
