In [None]:
# --- Import librerie ---

import csv
import pandas as pd
from tabulate import tabulate # type: ignore
import gffutils # type: ignore
import bisect
from pathlib import Path

In [None]:
# --- Lettura del file ---

project_dir = Path().resolve()

gtf_file = project_dir / "Annotazione_GTF" / "Homo_sapiens.GRCh38.113.gtf.gz"

df = pd.read_csv(gtf_file, sep='\t', comment='#', header=None, low_memory=False, compression='gzip')

df.columns = ["Seqname", "Source", "Feature", "Start", "End", "Score", "Strand", "Frame", "Attribute"]

In [None]:
# --- Selezione del cromosoma e creazione del dataframe ridotto ---

cromosomi_canonici = [str(i) for i in range(1, 23)] + ['X', 'Y', 'MT']

seqname = df['Seqname'].unique()
cromosomi = [c for c in seqname if c in cromosomi_canonici]

cromosoma = input("Inserisci il nome del cromosoma da analizzare: ")

while cromosoma not in cromosomi:
    print("Cromosoma non valido o non presente nel file. Riprova.")
    cromosoma = input("Inserisci un cromosoma valido da analizzare: ")

df_chr = df[df['Seqname'] == cromosoma].copy()

print(f"\nSelezionati {len(df_chr)} record per il cromosoma {cromosoma}")

In [None]:
# --- Salva un DataFrame come file GTF e crea un database gffutils associato ---

def save_gtf_and_create_db(df_chr, output_dir, gtf_name="filtered_chr.gtf", db_name="filtered_chr.db"):
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    gtf_path = output_dir / gtf_name
    db_path = output_dir / db_name

    df_chr.to_csv(gtf_path, sep='\t', index=False, header=False, quoting=csv.QUOTE_NONE)

    gffutils.create_db(
        str(gtf_path),
        dbfn=str(db_path),
        force=True,
        keep_order=True,
        merge_strategy='merge',
        sort_attribute_values=True,
        disable_infer_transcripts=True,
        disable_infer_genes=True
    )
    return gtf_path, db_path

output_dir = project_dir / "Annotazione_GTF"
gtf_path, db_path = save_gtf_and_create_db(df_chr, output_dir)

In [None]:
# --- Funzioni di supporto ---

# Ottengo i trascritti di riferimento per ogni gene. 
# Trascritto di riferimento --> maggiore intervallo genomico (end - start)
def get_reference_transcripts(db):
    ref_transcripts = []

    for gene in db.features_of_type('gene'):
        gene_id = gene.attributes.get('gene_id', [None])[0]

        transcripts = list(db.children(gene, featuretype='transcript'))

        if transcripts:
            max_length_transcript = max(transcripts, key=lambda x: x.end - x.start)
            transcript_id = max_length_transcript.attributes.get('transcript_id', [None])[0]

            ref_transcripts.append({
                "Gene_Id": gene_id,
                "Ref_Transcript_Id": transcript_id,
                "Start": max_length_transcript.start,
                "End": max_length_transcript.end,
                "Length": max_length_transcript.end - max_length_transcript.start + 1,
                "Seqname": max_length_transcript.seqid,
                "Feature": "transcript",
                "Strand": max_length_transcript.strand
            })

    return ref_transcripts

# Prelevo gli esoni di un trascritto. Considerato lo strand
def get_exons(db, transcript_id):
    exons = []
    for exon in db.children(transcript_id, featuretype='exon'):
        exons.append((exon.start, exon.end, exon.strand))
    if exons and exons[0][2] == '-':
        exons.sort(key=lambda x: x[0], reverse=True)
    else:
        exons.sort(key=lambda x: x[0])
        
    return [(start, end) for start, end, _ in exons]

In [None]:
# --- Estrazione dei trascritti ---

db = gffutils.FeatureDB(str(db_path))

ref_transcripts = get_reference_transcripts(db)

transcripts = set()

for feature in db.features_of_type('transcript'):
    transcripts.add(feature.id)

print(f"\nSelezionati {len(transcripts)} trascritti")
print(f"\nSelezionati {len(ref_transcripts)} trascritti di riferimento\n")
print(tabulate(ref_transcripts, headers="keys", tablefmt="psql", showindex=False))

In [None]:
# --- Preparazione dei dati per il conteggio globale ---

# Dizionario per il conteggio degli eventi di splicing
splicing_event_counts = {
    "Exon Skipping": 0,
    "Intron Retention": 0,
    "5' Competing": 0,
    "3' Competing": 0,
    "Mutually Exclusive Exons": 0
}

# Dizionario per i geni in cui si verificano gli eventi
genes_per_event = {
    "Exon Skipping": set(),
    "Intron Retention": set(),
    "5' Competing": set(),
    "3' Competing": set(),
    "Mutually Exclusive Exons": set()
}

# Dizionario per i trascritti di ciascun gene per ogni evento
transcripts_per_event = {
    "Exon Skipping": {},
    "Intron Retention": {},
    "5' Competing": {},
    "3' Competing": {},
    "Mutually Exclusive Exons": {}
}

# Mappa i colori ai tipi di evento
event_colors = {
    "Exon Skipping": "#008000",  # Verde
    "Intron Retention": "#CCCCFF",  # Viola
    "5' Competing": "#FF4040",  # Rosso
    "3' Competing": "#DAA520",  # Giallo
    "Mutually Exclusive Exons": "#AFEEEE"  # Ciano
}

# Cache per memorizzare gli esoni di ogni trascritto
exons_cache = {}

# Raggruppa i trascritti alternativi per gene
transcripts_by_gene = {}
for transcript in db.features_of_type('transcript'):
    gene = transcript.attributes.get('gene_id', [None])[0]
    if gene not in transcripts_by_gene:
        transcripts_by_gene[gene] = []
    transcripts_by_gene[gene].append(transcript)

In [None]:
# --- Funzioni di supporto ---

# Aggiornamento dei dizionari
def add_splicing_event(event_type, gene_id, transcript_id):
    seen_transcripts = transcripts_per_event[event_type].get(gene_id, set())
    if transcript_id not in seen_transcripts:
        splicing_event_counts[event_type] += 1
    genes_per_event[event_type].add(gene_id)
    transcripts_per_event[event_type].setdefault(gene_id, set()).add(transcript_id)

# Controllo dell'esone saltato
def is_exon_skipped(ref_exon, alt_exons):
    return ref_exon not in alt_exons

# Controllo sovrapposizione
def overlap(exon1, exon2):
    return not (exon1[1] < exon2[0] or exon2[1] < exon1[0])

In [None]:
# --- Funzioni principali di riconoscimento degli eventi di splicing alternativo ---

# Exon Skipping
def detect_exon_skipping(gene_id, ref_middle_exons, trans_middle_exons, transcript_id):
    trans_middle_exons_set = set(trans_middle_exons)
    for exon in ref_middle_exons:
        if is_exon_skipped(exon, trans_middle_exons_set):
            add_splicing_event("Exon Skipping", gene_id, transcript_id)

# Intron Retention
def detect_intron_retention(gene_id, ref_exons, transcript_exons, transcript_id):
    if not transcript_exons:
        return

    transcript_exons_sorted = sorted(transcript_exons, key=lambda x: x[0])
    starts = [exon[0] for exon in transcript_exons_sorted]

    for i in range(len(ref_exons) - 1):
        intron_start = ref_exons[i][1] + 1
        intron_end = ref_exons[i + 1][0] - 1
        idx = bisect.bisect_right(starts, intron_start)
        if idx > 0:
            candidate_exon = transcript_exons_sorted[idx - 1]
            if candidate_exon[0] <= intron_start and candidate_exon[1] >= intron_end:
                add_splicing_event("Intron Retention", gene_id, transcript_id)
                break

# 5' e 3' Competing
def detect_competing_sites(gene_id, ref_middle_exons, trans_middle_exons, transcript_id):
    for exon in ref_middle_exons:
        for t_exon in trans_middle_exons:
            # 5' Competing
            if exon[0] == t_exon[0] and exon[1] != t_exon[1]:
                add_splicing_event("5' Competing", gene_id, transcript_id)
                break
            # 3' Competing
            if exon[1] == t_exon[1] and exon[0] != t_exon[0]:
                add_splicing_event("3' Competing", gene_id, transcript_id)
                break

# Mutually Exclusive Exons
def detect_mutually_exclusive_exons(gene_id, ref_middle_exons, trans_middle_exons, transcript_id):
    trans_middle_exons_set = set(trans_middle_exons)
    for exon in ref_middle_exons:
        if exon not in trans_middle_exons_set:
            for alt_exon in trans_middle_exons_set:
                if abs((alt_exon[1] - alt_exon[0]) - (exon[1] - exon[0])) < 30:
                    if overlap(exon, alt_exon):
                        add_splicing_event("Mutually Exclusive Exons", gene_id, transcript_id)
                        break


In [None]:
# --- Riconoscimento degli eventi ---

for ref_transcript in ref_transcripts:
    gene_id = ref_transcript["Gene_Id"]
    ref_transcript_id = ref_transcript["Ref_Transcript_Id"]

    if ref_transcript_id not in exons_cache:
        exons_cache[ref_transcript_id] = get_exons(db, ref_transcript_id)
    ref_exons = exons_cache[ref_transcript_id]
    ref_middle_exons = ref_exons[1:-1]

    if gene_id not in transcripts_by_gene:
        continue

    for transcript in transcripts_by_gene[gene_id]:
        transcript_id = transcript.attributes.get('transcript_id', [None])[0]
        if transcript_id == ref_transcript_id:
            continue

        if transcript_id not in exons_cache:
            exons_cache[transcript_id] = get_exons(db, transcript_id)
        transcript_exons = exons_cache[transcript_id]
        trans_middle_exons = transcript_exons[1:-1]

        detect_exon_skipping(gene_id, ref_middle_exons, trans_middle_exons, transcript_id)
        detect_intron_retention(gene_id, ref_exons, transcript_exons, transcript_id)
        detect_competing_sites(gene_id, ref_middle_exons, trans_middle_exons, transcript_id)
        detect_mutually_exclusive_exons(gene_id, ref_middle_exons, trans_middle_exons, transcript_id)


In [None]:
# --- Report Xlsx Finale ---

rows = []
for event_type, count in splicing_event_counts.items():
    genes = sorted(genes_per_event.get(event_type, []))
    if genes:
        for gene in genes:
            trascritti = transcripts_per_event.get(event_type, {}).get(gene, set())
            trascritti_ordinati = ", ".join(sorted(trascritti)) if trascritti else "Nessun trascritto"
            num_trascritti   = len(trascritti)
            rows.append([event_type, count, gene, num_trascritti, trascritti_ordinati])
    else:
        rows.append([event_type, count, "Nessun gene", 0, "Nessun trascritto"])

df_report = pd.DataFrame(
    rows,
    columns=["Tipo Evento", "Numero di Eventi", "Gene", "Numero di Trascritti", "Trascritti"]
)

parent_dir = project_dir.parent
xlsx_path = parent_dir / "report_splicing.xlsx"
with pd.ExcelWriter(xlsx_path, engine="xlsxwriter") as writer:
    df_report.to_excel(writer, sheet_name="Splicing Report", index=False)
    workbook = writer.book
    worksheet = writer.sheets["Splicing Report"]
    header_format = workbook.add_format({'bold': True, 'bg_color': '#D3D3D3', 'align': 'center', 'valign': 'vcenter'})
    worksheet.set_row(0, None, header_format)

    event_formats = {}
    for event, color in event_colors.items():
        event_formats[event] = workbook.add_format({
            'bg_color': color,
            'bold': True,
            'align': 'center',
            'valign': 'vcenter',
            'border': 1 
        })

    for row_num, event in enumerate(df_report["Tipo Evento"]):
        event_format = event_formats.get(event, workbook.add_format({'bg_color': '#FFFFFF'}))
        worksheet.write(row_num + 1, 0, event, event_format)

    worksheet.set_column("A:A", 22)
    align_format = workbook.add_format({'align': 'center', 'valign': 'vcenter'})
    worksheet.set_column("B:B", 15, align_format)
    worksheet.set_column("C:C", 20, align_format)
    worksheet.set_column("D:D", 17, align_format)

    header_len = len("Trascritti")
    
    if not df_report['Trascritti'].dropna().empty:
        max_content_len = max(df_report['Trascritti'].dropna().apply(lambda x: len(str(x))))
    else:
        max_content_len = 0 
    max_len_trascritti = max(header_len, max_content_len)

    final_width = min(max_len_trascritti + 3, 70)
    wrap_format = workbook.add_format({'text_wrap': True, 'valign': 'top'})

    worksheet.set_column('E:E', final_width, wrap_format)

print("Il report è stato salvato come report_splicing.xlsx \n")

In [None]:
# --- Report Numerico Finale ---

print("\n--- Riepilogo Generale ---\n")
print(f"Numero totale di record per il cromosoma {cromosoma}: {len(df_chr)}\n")
print(f"Numero totale di trascritti analizzati: {len(transcripts)}\n")
print(f"Numero di trascritti di riferimento: {len(ref_transcripts)}\n")
print(f"Numero totale di eventi di splicing rilevati: {sum(splicing_event_counts.values())}\n")
print("Dettaglio per tipo di evento di splicing:")
for event_type, count in splicing_event_counts.items():
    print(f"  - {event_type}: {count} eventi")
print("\n-------------------------\n")
print(tabulate(df_report, headers='keys', tablefmt='fancy_grid'))
