# Progetto di Elementi di Bioinformatica 24/25 - Tema 4  
*Christena Attia – Mat. 894887*

In [None]:
import matplotlib.pyplot as plt
from Bio import SeqIO
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
from Bio.SeqIO import write
import statistics as stats
import pandas as pd
import os as os

Di seguito i parametri di input configurabili tra cui: 
1) il `file path` del file FASTQ (già presente nella cartella del progetto)
2) la lunghezza `k` dei k-mer da analizzare
3) la soglia di frequenza `frequency_threshold` minima oltre la quale un k-mer viene considerato significativo

In [None]:
file_path = 'SRR18961685-5000.fastq'  
k = 7
frequency_threshold = 0.001

La seguente funzione `parseFileFASTQ`, innanzitutto, controlla che il file sia presente, altrimenti segnala un errore. In seguito estrae una lista con tutte le sequenze e una lista con le qualità associate a ciascuna base. 

In [None]:
def parseFileFASTQ(file_path):
    
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"Il file '{file_path}' non è stato trovato. Verificare il percorso.")
        
    reads = list(SeqIO.parse(file_path, "fastq"))
    sequences = [str(r.seq) for r in reads]
    qualityScores = [r.letter_annotations["phred_quality"] for r in reads] 
    readLen = len(sequences[0])
    
    print(f"Letti {len(sequences)} reads, lunghi {readLen} basi ciascuno.")
    return reads, sequences, qualityScores, readLen

reads, sequences, qualityScores, readLen = parseFileFASTQ(file_path)

In [None]:
print("\nEsempio di record:")
print(reads[0])

print("\nEsempio di sequenza:")
print(sequences[0])

print("\nPunteggi di qualità (Phred) della prima read:")
print(qualityScores[0])


La funzione `countKmers` scorre ogni sequenza e conta, per ogni posizione possibile, quanti k-mer di lunghezza `k` appaiono.  
Per ciascun k-mer costruisce una lista di conteggi, in cui ogni elemento rappresenta quante volte quel k-mer appare in una determinata posizione tra tutti i reads. 

*Esempio di output:
  'ATGCT': [3, 0, 1, 0, 5, 0, ...],
  'CGTAA': [0, 0, 7, 1, 0, 0, ...],
  ...*

In [None]:
def countKmers(sequences, k, readLen):
    kmerCounts = {}

    for index, seq in enumerate(sequences):
        for pos in range(readLen - k + 1):
            kmer = seq[pos:pos + k]

            if kmer not in kmerCounts:
                kmerCounts[kmer] = [0] * readLen

            kmerCounts[kmer][pos] += 1

    return kmerCounts


In [None]:
kmerDictionary = countKmers(sequences, k, readLen)

print("\n Esempi di distribuzione posizionale di due k-mer distinti")
firstKmer = list(kmerDictionary.keys())[0]
print(f"\n Primo k-mer distinto individuato: '{firstKmer}'")
print(f"    Distribuzione posizionale di '{firstKmer}': {kmerDictionary[firstKmer]}")

secondKmer = list(kmerDictionary.keys())[1]
print(f"\n Secondo k-mer distinto individuato: '{secondKmer}'")
print(f"    Distribuzione posizionale di '{secondKmer}': {kmerDictionary[secondKmer]}")

In [None]:
print(f"\nIn totale ci sono: {len(kmerDictionary)} k-mer distinti.")


La funzione `filterKmers` calcola la frequenza globale di ciascun k-mer nel dataset:  
$$
\text{frequenza} = \frac{\text{occorrenze totali del } k\text{-}mer}{\text{numero totale di posizioni analizzate}}
$$
Tiene solo i k-mer che superano la soglia `frequency_threshold`, eliminando quelli troppo rari per essere significativi. Inotre, crea una tabella riassuntiva mostrando sia i k-mer significativi (ovvero che superano la soglia di frequenza) che quelli meno significativi. 


In [None]:
def filterKmers(kmerDictionary, threshold, totalPos):
    filtered = {}
    rows = []

    for kmer, counts in kmerDictionary.items():
        total_occurrences = sum(counts)
        frequency = total_occurrences / totalPos
        included = frequency >= threshold

        if included:
            filtered[kmer] = counts

        rows.append({
            "k-mer": kmer,
            "Totale occorrenze": total_occurrences,
            "Frequenza": round(frequency, 6),
            "Incluso": "Sì" if included else "No"
        })

    table_filterd = pd.DataFrame(rows)
    table_filterd.sort_values(by="Frequenza", ascending=False, inplace=True)
    table_filterd.reset_index(drop=True, inplace=True)

    display(table_filterd)

    return filtered, rows


In [None]:
totalPos = (readLen - k + 1) * len(sequences)

filteredDict, table_rows = filterKmers(kmerDictionary, frequency_threshold, totalPos)

In [None]:
print(f"Rimasti {len(filteredDict)} k-mer dopo il filtro su {len(kmerDictionary)} totali.\n")

La funzione `find_maxKmer` identifica il k-mer che tra tutti ha il numero massimo di occorrenze in una singola posizione.  
Restituisce:
- il nome del k-mer dominante
- la posizione in cui raggiunge il suo picco massimo
- Il valore delle occorrenze in quella posizione

In [None]:
def find_maxKmer(filteredDict):
    bestKmer, bestKmer_pos, max_count = None, None, 0
    
    for kmer, counts in filteredDict.items():
        for i, count in enumerate(counts):
            if count > max_count:
                max_count = count
                bestKmer = kmer
                bestKmer_pos = i
    
    if bestKmer is None:
        print("Nessun k-mer dominante trovato.")
    return bestKmer, bestKmer_pos, max_count


In [None]:
bestKmer, bestKmer_pos, max_count = find_maxKmer(filteredDict)  

table_best_kmer = pd.DataFrame({
    "k-mer dominante": [bestKmer],
    "Posizione massima": [bestKmer_pos],
    "Occorrenze massime": [max_count]
})

display(table_best_kmer)

Di seguito la funzione crea una tabella con i 5 k-mer che hanno avuto il valore massimo di occorrenze più alto in una singola posizione. Per ogni k-mer della lista filtrata viene mostrato:
- Il numero totale di occorrenze 
- La posizione con più occorrenze
- Il numero di occorrenze trovate per quella posizione

In [None]:
def top5_bestKmers(filteredDict):
    table = []
    
    for kmer, counts in filteredDict.items():
        max_count = max(counts)
        bestPos = counts.index(max_count)
        total = sum(counts)
        table.append((kmer, total, bestPos, max_count))  
    
    df = pd.DataFrame(table, columns=["k-mer", "Totale occorrenze", "Posizione max", "Occorrenze max"])
    top5_table = df.sort_values(by="Occorrenze max", ascending=False).head(5).reset_index(drop=True)
    display(top5_table)
    return top5_table

top5_table = top5_bestKmers(filteredDict)


Questa funzione disegna un grafico a barre che mostra l’andamento delle occorrenze del k-mer dominante rispetto alla posizione. La barra corrispondente alla posizione in cui il k-mer è più frequente viene tratteggiata di rosso.

In [None]:
def distributionDiagram(kmer, counts, bestKmer_pos):
    plt.figure(figsize=(15, 6))
    plt.bar(range(len(counts)), counts, color='lightgreen')
    plt.axvline(x=bestKmer_pos, color='red', linestyle='--')
    plt.xlabel("Posizione")
    plt.ylabel("Occorrenze")
    plt.title(f"Distribuzione posizionale del k-mer '{kmer} con occorrenze massime alla posizione {bestKmer_pos}")
    plt.tight_layout()
    plt.show()

if not filteredDict:
    print("Nessun k-mer supera la soglia di frequenza da analizzare. Cambiare i parametri di configurazione.")
else:
    distributionDiagram(bestKmer, filteredDict[bestKmer], bestKmer_pos)


Questa funzione estrae e salva in un file FASTA tutte le reads che contengono il k-mer dominante esattamente nella posizione in cui ha avuto il picco massimo. Per ogni read salvata, nell'header viene inserita anche la qualità media (media dei punteggi Phred) del read stesso.

In [None]:
def fastaOutput(reads, qualityScores, bestKmer, bestKmer_pos, k, output_filename):
    output = []
    
    for r, qual in zip(reads, qualityScores):
        seq = str(r.seq)
        
        if seq[bestKmer_pos:bestKmer_pos+k] == bestKmer:
            average = round(stats.mean(qual), 2)
            newRecord = SeqRecord(Seq(seq), id=f"{r.id} {average}", description="")
            output.append(newRecord)
    
    write(output, output_filename, "fasta")
    print(f"Salvati {len(output)} reads in '{output_filename}'.")

if bestKmer is not None and bestKmer_pos is not None:
    fastaOutput(reads, qualityScores, bestKmer, bestKmer_pos, k, "output.fasta")
else:
    print("Nessun k-mer dominante valido trovato. File FASTA non generato.")
