In [154]:
import polars as pl
import os
import re
from rich import print
from rich.console import Console
from pathlib import Path
import csv
from itertools import islice

console = Console()


In [155]:
def get_df(file_path):
  '''Carga un archivo, detecta su separador y devuelve un dataframe.

  Args:
      file_path(str): Ruta del archivo.

  Returns:
      pl.DataFrame: DataFrame con los datos.
  '''

  try:

    file_path = Path(file_path).resolve()
    print(f"[yellow]:warning:[/] Se utilizará el siguiente archivo [bold green]{file_path.name}[/]")

    with open(file_path, 'r', newline='', encoding='utf-8') as f:

        sample = ''.join(islice(f, 5)) # Lee (solo) las 5 primeras lineas y las concatena en una cadena ('' hace que no se meta nada en el medio) (Problema csvfile.readlines(): leería todo el archivo y despues se cogerian las 5 primeras)

        delimiter = csv.Sniffer().sniff(sample).delimiter # La clase sniffer detecta el formato de un csv. Sniff es un metodo que devuelve el objeto Dialect. delimiter es un atributo de dialect que contiene el separador
        print(f"[yellow]:warning:[/] Se ha detectado el separador: [bold yellow]{repr(delimiter)}[/]")

        f.seek(0) # Para devolver el puntero al principio del archivo

        df = pl.read_csv(f, separator=delimiter, infer_schema_length=10000) # Polars lee todo el archivo y devuelve el dataframe, aumentamos el infer_schema_length para que detecte bien los tipos de dato de la columna prob

    #with console.pager(styles=True): # Para que se abra la tabla como un scrolleable (esto lo puse por probar a usarlo porque polars te limita las lineas)
        #console.print(f"[bold] :bar_chart: Dataframe: [/] \n {df}")
    return df

  except FileNotFoundError:
      console.print(f":x: Archivo no encontrado en la ruta: {file_path}")

In [156]:
df_annotations=get_df(r"merge_tables\BeiRNA\BeiRNA_JoinedAnnotations.tsv")
df_counts=get_df(r"merge_tables\BeiRNA\countsfilteredBeiRNA_BagBrown_vs_countsfilteredBeiRNA_SeaWater.txt")

print(df_annotations)
print(df_counts)



## Validar la tabla de annotations 
 En este caso tengo que ver si la tabla tiene 15 o 35 columnas, si tiene solo 15 devuelve un df con estas tres X.ID", "GENENAME", "DESCRIPTION". Si la tabla tiene 35 se quedas solo con estas 12 ("ORF.ID", "Gene.name", "Gene.length", "ORF.length", "ORF.start",
  "ORF.end", "Strand", "Protein.sequence", "Pfam", "InterPro",
  "GENENAME", "DESCRIPTION").

Si tiene cualquier otro número de columnas lanzar un error e indicar que hay que usar una con 15 o 35

In [157]:
def validate_annotation_table(df, df_name = "df_annotations"):
    if df.width == 15:
        header = ["X ID", "GENENAME", "DESCRIPTION"]
        missing_columns = []

        console.print(f"El DataFrame {"df"} contiene [bold green]15[/] columnas.")

        for column in header:
            if column not in df.columns:
                missing_columns.append(column)    

        if len(missing_columns)!= 0:
            console.print(f"[bold red]:x:[/] La(s) columna(s) [italic purple]{missing_columns}[/] no está(n) presente(s) en el DataFrame {"df"}.")

        else:        
            console.print(f"[bold yellow]:warning: Se seleccionarán las siguientes columnas del DataFrame: [italic green]{header}[/]")
            return df.select([pl.col("X ID").alias("ID"),
                               "GENENAME",
                                 "DESCRIPTION"])

    elif df.width == 35:
        header = ["ORF ID", "Gene name", "Gene length", "ORF length", "ORF start", "ORF end", 
                  "Strand", "Protein sequence", "Pfam", "InterPro", "GENENAME", "DESCRIPTION"]
        missing_columns = []

        console.print(f"El DataFrame {df_name}contiene [bold green]35[/] columnas.")

        for column in header:
            if column not in df.columns:
                missing_columns.append(column)    

        if len(missing_columns)!= 0:
            console.print(f"[bold red]:x:[/] La(s) columna(s) [italic purple]{missing_columns}[/] no está(n) presente(s) en el DataFrame {"df"}.")

        else:        
            console.print(f"[bold yellow]:warning: Se seleccionarán las siguientes columnas del DataFrame: [italic green]{header}[/]")
            return df.select([pl.col("ORF ID").alias("ID"), "Gene name", "Gene length", "ORF length", "ORF start", "ORF end", 
                  "Strand", "Protein sequence", "Pfam", "InterPro", "GENENAME", "DESCRIPTION"])
    
    else: 
        console.print(f"[bold red]:x:[/] DataFrame no válido. Debe contener 15 o 35 columnas.")

In [158]:
df_annotations = validate_annotation_table(df_annotations)
print(df_annotations.columns)
print(df_annotations)

In [None]:
def validate_counts_table(df, df_name = "df_counts"):
    if df.width == 6:

        if df.columns[0] == '':
            df = df.rename({ '': "old_ID" }) #el fallo puede ser por esto?

        console.print(f"[bold yellow]:warning:[/] Se han cambiado los nombres de las columnas a: [italic green] ID, countsfiltered_ControlDMSO_mean, countsfiltered_DEHP_mean, theta, prob y log2FC[/].")
    
        df = df.select([
            pl.col(df.columns[0]).alias("old_ID"),
            pl.col("old_ID").str.slice(offset = pl.col("old_ID").str.find("~~") + 2).alias("new_ID"),
            pl.col(df.columns[1]).alias("countsfiltered_ControlDMSO_mean"),
            pl.col(df.columns[2]).alias("countsfiltered_DEHP_mean"),
            pl.col(df.columns[3]).alias("theta"),
            pl.col(df.columns[4]).alias("prob"),
            pl.col(df.columns[5]).alias("log2FC"),
            ])
        
        return df
    
    else:
        console.print(f"[bold red]:x:[/] DataFrame no válido. Debe contener 6 columnas.")
    

In [160]:
print(df_counts.columns)
df_counts = validate_counts_table(df_counts)
print(df_counts)

In [161]:
df_counts = df_counts.select(
        pl.col("old_ID")
        .str.slice(offset = pl.col("old_ID").str.find("~~") + 2)
        .alias("new_ID"),
        pl.all()
        )

print(df_counts)

#con esto consigo que me coja solo lo que va despues de los "~~" y 
#se crea una tabla nueva con esa y todas las columnas anteriores, se puede hacer con withcolumns pero aparece new_ID al final en vez de al principio


DuplicateError: the name 'new_ID' is duplicate

It's possible that multiple expressions are returning the same default column name. If this is the case, try renaming the columns with `.alias("new_name")` to avoid duplicate column names.

In [None]:
def merge_tables (annotation_path, count_path, output_path, output_name):

    df_annotations = get_df(r"annotation_path")
    df_annotations = validate_annotation_table(df_annotations)

    df_counts = get_df(r"count_path")
    df_counts = validate_counts_table(df_counts)

    if df_annotations.width == 3:
        

    
    elif df_annotations == 12: 

    
    

IndentationError: expected an indented block after 'if' statement on line 9 (396386831.py, line 13)