In [None]:
import polars as pl

In [9]:

from typing import Literal

class FileTypeCouldNotBeInferredError(Exception):
    def __init__(self, message="The file type of the supposed gtf or gff3 file could not be inferred."):
        self.message = message
        super().__init__(self.message)

def is_XorY(name:str)->bool:
    """Checks whether a chromosome is X or Y chromosome by its name."""
    return 'x' in name.lower() or 'y' in name.lower()

def infer_file_type(fname:str)-> Literal['gtf','gff3']:
    '''Infer the file type of the gtf or gff file.'''
    file_type = None
    with open(fname,'r') as file:
        for line in file:
            if line.startswith('#'): # ignore headers
                continue
            else:
                attributes = line.split('\t')[-1]
                if "=" in attributes:
                    file_type = 'gff3'
                elif '"' in attributes:
                    file_type = 'gtf'
                else:
                    raise FileTypeCouldNotBeInferredError("The file type of the supposed gtf or gff3 file could not be inferred.")
                
    return file_type


def _extract_line_metadata_gtf(line:str) -> dict:

    # get chromosome name
    chr_name = line.split('\t')[0]
    # get metadata
    metadata = line.split('\t')[-1].split('; ')
    headers = [m.split()[0] for m in metadata]
    values =  [m.split()[1].replace('"','') for m in metadata]
    info = dict(zip(headers,values))
    # add chr name to dict
    info['chr'] = chr_name

    if 'gene_id' not in info:
        info['gene_id'] = ''
    if 'gene_name' not in info:
        info['gene_name'] = ''


    return info


def _extract_line_metadata_gff3(line:str) -> dict:

    # get chromosome name
    chr_name = line.split('\t')[0]
    # get metadata
    metadata = line.split('\t')[-1].split(';') # no spaces for gff3
    headers = [m.split('=')[0] for m in metadata]
    values =  [m.split('=')[1] for m in metadata]
    info = dict(zip(headers,values))
    # add chr name to dict
    info['chr'] = chr_name

    if 'ID' not in info:
        info['ID'] = ''
    if 'Name' not in info:
        info['Name'] = ''


    # rename some keys
    info['gene_id'] = info.pop('ID')
    info['gene_name'] = info.pop('Name')



    return info


def extract_line_metadata(line:str, file_type:Literal['gff3','gtf']) -> dict:

    if file_type == 'gtf':
        info = _extract_line_metadata_gtf(line=line)
    elif file_type == 'gff3':
        info = _extract_line_metadata_gff3(line=line)
    else:
        raise ValueError(f'File type should be either `gff3` or `gtf` not `{file_type}`')

    return info

def parse_gxf(gxf_file:str):
    """Parse gxf file into  tsv file."""
    extension = gxf_file.split(".")[-1]
    tsv_file = gxf_file.replace(extension,'tsv')
    file_type = infer_file_type(fname=gxf_file)
    with open(tsv_file,'w') as tsv:
        tsv.write("chr\tgene_id\tgene_name\tline\n") # header line
        with open(gxf_file,'r') as file:
            for i,line in enumerate(file):
                if line.startswith('#'): # ignore headers
                    continue
                elif line.startswith('\n'): # last line
                    pass
                else:
                    chr_name = line.split('\t')[0]
                    if True:
                        info = extract_line_metadata(line=line,file_type=file_type)
                        to_write = f"{info['chr']}\t{info['gene_id']}\t{info['gene_name']}\t{i}\n"
                        tsv.write(to_write)
    
    return

In [13]:
parse_gxf('data/Homo_sapiens.GRCh38.113.gtf')

In [15]:
df = pl.read_csv('data/Homo_sapiens.GRCh38.113.tsv',separator='\t',schema_overrides={'chr': pl.Utf8})

In [16]:
df

chr,gene_id,gene_name,line
str,str,str,i64
"""1""","""ENSG00000142611""","""PRDM16""",5
"""1""","""ENSG00000142611""","""PRDM16""",6
"""1""","""ENSG00000142611""","""PRDM16""",7
"""1""","""ENSG00000142611""","""PRDM16""",8
"""1""","""ENSG00000142611""","""PRDM16""",9
…,…,…,…
"""KI270718.1""","""ENSG00000309831""",,4114450
"""KI270755.1""","""ENSG00000309258""",,4114451
"""KI270755.1""","""ENSG00000309258""",,4114452
"""KI270755.1""","""ENSG00000309258""",,4114453


In [17]:
df.unique(['chr','gene_id'])

chr,gene_id,gene_name,line
str,str,str,i64
"""14""","""ENSG00000139908""","""TSSK4""",2911899
"""6""","""ENSG00000271793""",,1399640
"""19""","""ENSG00000236483""","""MTND2P40""",3817397
"""19""","""ENSG00000290719""",,3929554
"""9""","""ENSG00000201451""","""Y_RNA""",2066582
…,…,…,…
"""21""","""ENSG00000300030""",,4100022
"""2""","""ENSG00000233426""","""EIF3FP3""",533863
"""16""","""ENSG00000270313""","""COX6CP16""",3252508
"""14""","""ENSG00000202337""","""RNU6-8""",2897367


In [21]:
duplicate_gene_ids = (
    df.group_by("gene_name")
      .agg(pl.col("chr").n_unique().alias("unique_chr_count"))
      .filter(pl.col("unique_chr_count") > 1)
)

In [22]:
duplicate_gene_ids

gene_name,unique_chr_count
str,u32
"""LINC00102""",2
"""PPP2R3B""",2
"""U8""",12
"""SNORA62""",7
"""DDX11L16""",5
…,…
"""LSP1P5""",2
"""P2RY8""",2
"""7SK""",5
"""ASMTL-AS1""",2


In [27]:
multi_chr_genes = (
    df.group_by("gene_name")
      .agg([
          pl.col("chr").n_unique().alias("unique_chr_count"),
          pl.col("chr").unique().alias("chr_list")
      ])
      .filter(pl.col("unique_chr_count") > 1)
)

In [28]:
multi_chr_genes

gene_name,unique_chr_count,chr_list
str,u32,list[str]
"""SNORA72""",4,"[""3"", ""8"", … ""1""]"
"""RNA5SP498""",2,"[""X"", ""Y""]"
"""ELOCP24""",2,"[""Y"", ""X""]"
"""5_8S_rRNA""",7,"[""21"", ""KI270442.1"", … ""14""]"
"""FABP5P13""",2,"[""Y"", ""X""]"
…,…,…
"""LINC00106""",2,"[""Y"", ""X""]"
"""DPH3P2""",2,"[""Y"", ""X""]"
"""SHOX""",2,"[""X"", ""Y""]"
"""SNORA73""",2,"[""18"", ""6""]"


In [None]:
df.group_by("gene_name").agg()

<polars.dataframe.group_by.GroupBy at 0x22140d20920>