In [1]:
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
from Bio import SeqIO

from collections import Counter
import os
import csv

In [2]:
def loadRepeats(folder):
    """Collect the distinct repeat strings listed in the files of ``folder``.

    Every non-hidden entry of ``folder`` is read as a text file of
    comma-separated lines; the second field of each line that has at least
    two fields is taken as a repeat.

    Parameters
    ----------
    folder : str
        Directory holding the repeat files (a trailing slash is accepted).

    Returns
    -------
    list of str
        The unique, non-empty repeats in sorted order. The order is
        deterministic, so feature column ``repeat_{i}`` refers to the same
        repeat on every run and in every cell that calls this function.
    """
    unique_repeats = set()

    # os.listdir returns entries in arbitrary order; sort for reproducibility.
    for name in sorted(os.listdir(folder)):
        # Skip hidden entries (names starting with a dot).
        if name.startswith("."):
            continue

        # Read data from each file
        with open(os.path.join(folder, name), "r") as f:
            for line in f:
                parts = line.strip().split(",")
                # An empty second field is not a usable repeat: counting ""
                # in a sequence does not measure anything meaningful.
                if len(parts) >= 2 and parts[1]:
                    unique_repeats.add(parts[1])

    # A set has no stable order across interpreter runs (string hashing is
    # randomised), so sort before returning.
    return sorted(unique_repeats)

In [3]:
def extract_repeat_counts(sequence, repeat_list):
    """Count how often each repeat occurs in ``sequence``.

    Returns a list with one entry per element of ``repeat_list``, in the same
    order, where each entry is the value of ``sequence.count(repeat)``.
    """
    counts = []
    for repeat in repeat_list:
        counts.append(sequence.count(repeat))
    return counts

In [4]:
# Build the training feature table: one CSV row per FASTA record, holding the
# record's label followed by the count of every known repeat in its sequence.
folder = "../trainSet"

known_repeats = loadRepeats("../proba2/direct/")

# Skip hidden entries; sort because os.listdir order is arbitrary and the row
# order of the CSV should be reproducible.
files = [
    os.path.join(folder, name)
    for name in sorted(os.listdir(folder))
    if not name.startswith(".")
]

fieldnames = ['label'] + [f'repeat_{i}' for i in range(len(known_repeats))]

# newline="" is required by the csv module; without it, platforms that
# translate "\n" (e.g. Windows) end up with a blank line after every row.
with open("X_train.csv", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(fieldnames)

    for fasta_path in files:
        # The label is the file name without its directory and ".fasta" part;
        # it is the same for every record of the file, so compute it once.
        label = os.path.basename(fasta_path).replace(".fasta", "")
        for record in SeqIO.parse(fasta_path, "fasta"):
            counts_list = extract_repeat_counts(record.seq, known_repeats)
            writer.writerow([label] + counts_list)

In [None]:
# Build the test feature table: one CSV row per FASTA record, holding the
# record's label followed by the count of every known repeat in its sequence.
folder = "../testSet"

known_repeats = loadRepeats("../proba2/direct/")

# Skip hidden entries; sort because os.listdir order is arbitrary and the row
# order of the CSV should be reproducible.
files = [
    os.path.join(folder, name)
    for name in sorted(os.listdir(folder))
    if not name.startswith(".")
]

fieldnames = ['label'] + [f'repeat_{i}' for i in range(len(known_repeats))]

# newline="" is required by the csv module; without it, platforms that
# translate "\n" (e.g. Windows) end up with a blank line after every row.
with open("X_test.csv", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(fieldnames)

    for fasta_path in files:
        # The label is the file name without its directory and ".fasta" part;
        # it is the same for every record of the file, so compute it once.
        label = os.path.basename(fasta_path).replace(".fasta", "")
        for record in SeqIO.parse(fasta_path, "fasta"):
            counts_list = extract_repeat_counts(record.seq, known_repeats)
            writer.writerow([label] + counts_list)

In [5]:
import pandas as pd
from sklearn.linear_model import SGDClassifier  # or any estimator with partial_fit
from sklearn.metrics import accuracy_score, classification_report
import numpy as np
from itertools import islice

In [None]:
CLASSES = ["SARS-CoV", "MERS-CoV", "feline-CoV", "canine-CoV", "porcine-CoV", "bat-CoV", "bovine-CoV", "proba100000", "hedgehog-CoV"]  # replace with your actual labels

# Make your model. A fixed seed keeps the stochastic training reproducible
# across kernel restarts.
clf = SGDClassifier(random_state=42)

# --- Test set: load everything once and turn it into feature/label arrays ---
files = []
folder = "../testSet"

sequencesTest = []
labelTest = []

# Skip hidden entries; sort because os.listdir order is arbitrary.
for file in sorted(os.listdir(folder)):
    if file.startswith("."):
        continue
    files.append(folder + "/" + file)

known_repeats = loadRepeats("../proba2/direct")
# Read data from each file; every record is labelled with its file's name
# (directory and ".fasta" removed).
for file in files:
    records = SeqIO.parse(file, "fasta")
    for record in records:
        sequencesTest.append(record.seq)
        labelTest.append(os.path.basename(file).replace(".fasta", ""))

X_test = np.array([extract_repeat_counts(seq, known_repeats) for seq in sequencesTest])
# np.array copies the labels. This must happen BEFORE labelTest is cleared:
# binding y_test to the list itself and then clearing the list would leave
# y_test empty while X_test still has one row per record.
y_test = np.array(labelTest)

# The arrays above hold everything needed; release the intermediate lists.
labelTest.clear()
sequencesTest.clear()

# --- Training set: only list the files here, they are read in batches later ---
files = []
folder = "../trainSet"

sequencesTrain = []
labelTrain = []

for file in sorted(os.listdir(folder)):
    if file.startswith("."):
        continue
    files.append(folder + "/" + file)

def fasta_batch_iterator(fasta_file, batch_size):
    """Generate successive lists of at most ``batch_size`` records.

    The records come from parsing ``fasta_file`` in FASTA format; iteration
    stops as soon as a chunk comes back empty.
    """
    parsed = SeqIO.parse(fasta_file, "fasta")

    def next_chunk():
        # Pull up to batch_size further records from the shared parser.
        return list(islice(parsed, batch_size))

    # Two-argument iter(): keep calling next_chunk until it returns [].
    yield from iter(next_chunk, [])

batch_size = 1000  # adjust as needed

# Train incrementally: each FASTA file is read in batches, every batch is
# turned into repeat-count features and fed to clf.partial_fit, and the model
# is scored on the test set after each batch.
# NOTE(review): every batch contains a single class because files are
# processed one after another - confirm this suits the incremental learner.
for fasta_file in files:
    print("File: " + fasta_file)

    # The label comes from the file being read right now: its name without
    # directory and ".fasta". (Using a loop variable left over from an earlier
    # listing loop would give every record the same, wrong label.)
    label = os.path.basename(fasta_file).replace(".fasta", "")
    if label not in CLASSES:
        # Fail early with a clear message instead of training on a label the
        # classifier was not told about.
        raise ValueError(f"Label {label!r} from {fasta_file} is not in CLASSES")

    for batch in fasta_batch_iterator(fasta_file, batch_size):
        print(f"Processing batch of size: {len(batch)}")

        # Fresh lists per batch: nothing is shared with a buffer that gets
        # cleared before partial_fit sees the data.
        X_train = [extract_repeat_counts(record.seq, known_repeats) for record in batch]
        y_train = [label] * len(batch)
        clf.partial_fit(X_train, y_train, classes=CLASSES)

        # Evaluate:
        y_pred = clf.predict(X_test)
        acc = accuracy_score(y_test, y_pred)
        print(f"Current accuracy: {acc:.4f}")


# Release the repeat list; cells that need it call loadRepeats again.
known_repeats.clear()