In [1]:
from Bio import SeqIO
import random
import csv
import os
import re

In [2]:
def chunk_sequence(seq, chunk_size=512, stride=256):
    """Yield upper-cased, fixed-length windows taken from ``seq``.

    Windows start at offsets 0, ``stride``, 2 * ``stride``, ... and every
    window is exactly ``chunk_size`` characters long. A trailing remainder
    shorter than ``chunk_size`` is never yielded, so an input shorter than
    ``chunk_size`` produces no windows at all.

    Args:
        seq: Sequence string to slice.
        chunk_size: Length of each yielded window.
        stride: Distance between the start offsets of consecutive windows.

    Yields:
        Successive ``chunk_size``-long slices of the upper-cased sequence.
    """
    normalized = seq.upper()
    # Exclusive upper bound for start offsets that still leave a full window.
    stop = len(normalized) - chunk_size + 1
    yield from (
        normalized[begin:begin + chunk_size]
        for begin in range(0, stop, stride)
    )


In [3]:
def clean_sequence(seq):
    """Return ``seq`` upper-cased with every character other than A, C, G, T or N removed."""
    return re.sub(r"[^ACGTN]", "", seq.upper())

In [4]:
def fasta_to_csv(fasta_path, csv_writer, label, name, source="GenBank", min_len=200):
    """Write one CSV row per sufficiently long record of a FASTA file.

    Each record's sequence is passed through ``clean_sequence``; records whose
    cleaned sequence is shorter than ``min_len`` are skipped. The number of
    rows written is printed at the end.

    Args:
        fasta_path: Path handed to ``SeqIO.parse`` with the ``"fasta"`` format.
        csv_writer: Object exposing ``writerow``; receives
            ``[id, name, sequence, label, source, length]`` for each kept record.
        label: Value stored in the label column of every row.
        name: Value stored in the name column of every row.
        source: Value stored in the source column of every row.
        min_len: Minimum cleaned-sequence length for a record to be written.
    """
    count = 0
    for entry in SeqIO.parse(fasta_path, "fasta"):
        cleaned = clean_sequence(str(entry.seq))
        if len(cleaned) >= min_len:
            row = [entry.id, name, cleaned, label, source, len(cleaned)]
            csv_writer.writerow(row)
            count += 1

    print(f"[✓] {count} séquences enregistrees ")

In [5]:
def build_dataset(fasta_path, output_csv, isGMO=False):
    """Append the records of one FASTA file to the dataset CSV.

    The ``name`` column is derived from the FASTA file name: the extension is
    removed and the first two underscore-separated tokens are kept
    (``Vigna_unguiculata_v2_genomic.fna`` -> ``Vigna_unguiculata``).
    The header row is written only when the CSV is missing or empty, so
    repeated calls keep appending to the same file.

    Args:
        fasta_path: Path of the FASTA file to ingest.
        output_csv: Path of the CSV to create or append to. It may be a bare
            file name without any directory component.
        isGMO: When true rows are labelled 1, otherwise 0.
    """
    # os.makedirs("") raises, so only create a directory when the path has one.
    out_dir = os.path.dirname(output_csv)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    # An existing but empty file (e.g. left by an interrupted run) still needs a header.
    needs_header = not os.path.exists(output_csv) or os.path.getsize(output_csv) == 0

    filename = os.path.basename(fasta_path)
    stem = os.path.splitext(filename)[0]
    seqName = "_".join(stem.split("_")[:2])

    # Explicit encoding keeps the file independent of the platform default.
    with open(output_csv, "a", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        if needs_header:
            writer.writerow(["id", "name", "sequence", "label", "source", "length"])

        fasta_to_csv(
            fasta_path=fasta_path,
            csv_writer=writer,
            label=1 if isGMO else 0,
            name=seqName
        )

    print(f"[✓] Dataset Enregistree vers {output_csv}")

In [10]:
# Append the Gossypium hirsutum genome records to the shared CSV as non-GMO rows.
output_csv = "processed/datas.csv"
hirsutum_fasta = "raw/Gossypium_hirsutum_v2.1_genomic.fna"
build_dataset(hirsutum_fasta, output_csv, isGMO=False)

[✓] 1027 séquences enregistrees 
[✓] Dataset Enregistree vers processed/datas.csv


In [8]:
# Append the Vigna unguiculata genome records to the shared CSV as non-GMO rows.
output_csv = "processed/datas.csv"
unguiculata_fasta = "raw/Vigna_unguiculata_v2_genomic.fna"
build_dataset(unguiculata_fasta, output_csv, isGMO=False)

[✓] 676 séquences enregistrees 
[✓] Dataset Enregistree vers processed/datas.csv


In [6]:
# Append the cry1Ac sequences to the shared CSV as GMO rows.
output_csv = "processed/datas.csv"
cry1ac_fasta = "raw/bt_cry1Ac_sequences.fasta"
build_dataset(cry1ac_fasta, output_csv, isGMO=True)

[✓] 20 séquences enregistrees 
[✓] Dataset Enregistree vers processed/datas.csv


In [7]:
# Append the CP4 EPSPS sequences to the shared CSV as GMO rows.
output_csv = "processed/datas.csv"
cp4epsps_fasta = "raw/bt_cp4epsps_sequences.fasta"
build_dataset(cp4epsps_fasta, output_csv, isGMO=True)

[✓] 11 séquences enregistrees 
[✓] Dataset Enregistree vers processed/datas.csv


In [9]:
# Append the records of raw/cotton_gmo.fna to the shared CSV as GMO rows.
output_csv = "processed/datas.csv"
cotton_gmo_fasta = "raw/cotton_gmo.fna"
build_dataset(cotton_gmo_fasta, output_csv, isGMO=True)

[✓] 3 séquences enregistrees 
[✓] Dataset Enregistree vers processed/datas.csv


Génération de séquences GMO synthétiques

In [11]:
def clean(seq):
    """Return ``seq`` upper-cased with everything except A, C, G and T removed.

    Unlike ``clean_sequence``, the ambiguity code N is stripped as well.
    """
    return re.sub(r"[^ACGT]", "", seq.upper())

def random_subseq(seq, min_len, max_len):
    """Return a random contiguous slice of ``seq``.

    The slice length is drawn uniformly from ``[min_len, max_len]`` (both
    inclusive). When ``seq`` is not longer than the drawn length, the whole
    sequence is returned unchanged; otherwise the start offset is drawn
    uniformly among the positions that leave room for a full slice.

    Args:
        seq: Sequence to sample from.
        min_len: Smallest slice length that may be drawn.
        max_len: Largest slice length that may be drawn.

    Returns:
        The sampled slice, or ``seq`` itself when it is too short.
    """
    window = random.randint(min_len, max_len)
    total = len(seq)
    if window < total:
        offset = random.randint(0, total - window)
        return seq[offset:offset + window]
    return seq

def mutate(seq, rate=0.001):
    """Return a copy of ``seq`` with random point substitutions.

    Each position is substituted independently with probability ``rate``.
    The replacement is always drawn from the bases that differ from the
    current character, so ``rate`` is the real substitution rate (drawing
    from all four bases would silently turn about a quarter of the
    substitutions into no-ops).

    Args:
        seq: Sequence string to mutate.
        rate: Per-position substitution probability.

    Returns:
        The mutated sequence, the same length as ``seq``.
    """
    bases = ("A", "C", "G", "T")
    mutated = list(seq)
    for i, current in enumerate(mutated):
        if random.random() < rate:
            # Exclude the current base so the position really changes.
            mutated[i] = random.choice([b for b in bases if b != current])
    return "".join(mutated)

In [18]:
def build_synthetic_gmo(plant_genomes, promoter_seqs, gene_seqs, terminator_seqs,
        n_samples=1000, flank_min=300, flank_max=2000, mutation_rate=0.0005,
        id_prefix="GMO_ungui_SYN", seed=None):
    """Assemble synthetic GMO records from plant flanks and a transgene cassette.

    Every sample is ``left flank + promoter + gene + terminator + right flank``:
    one plant sequence is picked at random and two random sub-sequences of it
    (via ``random_subseq``) serve as flanks, the three cassette parts are picked
    at random from their pools, and the concatenation is passed through
    ``mutate``.

    Args:
        plant_genomes: Pool of plant sequences used for the flanks.
        promoter_seqs: Pool of promoter sequences.
        gene_seqs: Pool of gene sequences.
        terminator_seqs: Pool of terminator sequences.
        n_samples: Number of records to generate.
        flank_min: Minimum flank length passed to ``random_subseq``.
        flank_max: Maximum flank length passed to ``random_subseq``.
        mutation_rate: Rate passed to ``mutate``.
        id_prefix: Prefix of the generated ids; ids are
            ``f"{id_prefix}_{i:06d}"``. Use a distinct prefix per run (for
            example per plant genome) so ids written to the same CSV do not
            collide.
        seed: When not ``None``, seeds the global ``random`` module before
            generation so the output is reproducible.

    Returns:
        A list of dicts with keys ``id``, ``name``, ``sequence``, ``label``
        (always 1), ``source`` and ``length``.

    Raises:
        IndexError: From ``random.choice`` when a pool is empty and
            ``n_samples`` is greater than zero.
    """
    if seed is not None:
        random.seed(seed)

    samples = []

    for i in range(n_samples):
        plant = random.choice(plant_genomes)
        promoter = random.choice(promoter_seqs)
        gene = random.choice(gene_seqs)
        terminator = random.choice(terminator_seqs)

        # Both flanks come from the same plant sequence but are drawn independently.
        left_flank = random_subseq(plant, flank_min, flank_max)
        right_flank = random_subseq(plant, flank_min, flank_max)

        synthetic = left_flank + promoter + gene + terminator + right_flank
        synthetic = mutate(synthetic, mutation_rate)

        samples.append({
            "id": f"{id_prefix}_{i:06d}",
            "name": "synthetic_GMO",
            "sequence": synthetic,
            "label": 1,
            "source": "me",
            "length": len(synthetic)
        })

    return samples

In [13]:

def load_fasta(path):
    """Read the FASTA file at ``path`` and return its sequences as a list.

    Each record's sequence is converted to ``str`` and passed through
    ``clean``; records are returned in file order.
    """
    return [clean(str(record.seq)) for record in SeqIO.parse(path, "fasta")]

In [14]:
def append_to_csv(samples, csv_path):
    """Append sample dicts to the dataset CSV, creating it when needed.

    The header row is written only when the file is missing or empty. The
    parent directory is created when ``csv_path`` has one; a bare file name
    is written to the current working directory.

    Args:
        samples: Iterable of dicts with the keys ``id``, ``name``,
            ``sequence``, ``label``, ``source`` and ``length``.
        csv_path: Path of the CSV file to create or append to.

    Raises:
        KeyError: When a sample lacks one of the expected keys.
    """
    columns = ["id", "name", "sequence", "label", "source", "length"]

    # os.makedirs("") raises, so only create a directory when the path has one.
    target_dir = os.path.dirname(csv_path)
    if target_dir:
        os.makedirs(target_dir, exist_ok=True)

    # An existing but empty file still needs its header row.
    needs_header = not os.path.exists(csv_path) or os.path.getsize(csv_path) == 0

    # Explicit encoding keeps the file independent of the platform default.
    with open(csv_path, "a", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        if needs_header:
            writer.writerow(columns)

        writer.writerows([s[column] for column in columns] for s in samples)

In [None]:
# Sequence pools used to assemble the synthetic constructs.
# To draw flanks from the other genome instead, load
# "raw/Gossypium_hirsutum_v2.1_genomic.fna" as plant_genomes.
plant_genomes = load_fasta("raw/Vigna_unguiculata_v2_genomic.fna")
promoters = load_fasta("raw/promoters.fasta")
genes = load_fasta("raw/gmo_genes.fasta")
terminators = load_fasta("raw/terminators.fasta")

In [19]:
# Generate the synthetic constructs and append them to the shared dataset CSV.
synthetic_samples = build_synthetic_gmo(
    plant_genomes=plant_genomes,
    promoter_seqs=promoters,
    gene_seqs=genes,
    terminator_seqs=terminators,
    n_samples=5000,
    flank_min=500,
    flank_max=3000,
    mutation_rate=0.0003,
)

append_to_csv(synthetic_samples, csv_path="processed/datas.csv")

print(f"[✓] Generated {len(synthetic_samples)} synthetic GMO sequences")

[✓] Generated 5000 synthetic GMO sequences


Subdivision du dataset en train, test et validation

In [2]:
import pandas as pd
from sklearn.model_selection import train_test_split
import os

In [3]:
def split_dataset(input_csv, output_dir, train_ratio=0.7, val_ratio=0.2, test_ratio=0.1, random_state=42):
    """Split the dataset CSV into stratified train/val/test CSV files.

    The split is done in two passes of ``train_test_split``, both stratified
    on the ``label`` column and using the same ``random_state``: first train
    versus the remainder, then the remainder into val versus test. The three
    parts are written to ``train.csv``, ``val.csv`` and ``test.csv`` inside
    ``output_dir`` (created when missing), and the sizes and label
    proportions of each part are printed.

    Args:
        input_csv: Path of the CSV to split; it must contain a ``label`` column.
        output_dir: Directory receiving the three output files.
        train_ratio: Fraction of rows for the training set.
        val_ratio: Fraction of rows for the validation set.
        test_ratio: Fraction of rows for the test set.
        random_state: Seed forwarded to both ``train_test_split`` calls.

    Raises:
        AssertionError: When the three ratios do not sum to 1 (within 1e-6).
    """
    assert abs(train_ratio + val_ratio + test_ratio - 1.0) < 1e-6

    os.makedirs(output_dir, exist_ok=True)

    full_df = pd.read_csv(input_csv)

    # Pass 1: carve out the training set; everything else is held out.
    holdout_fraction = 1 - train_ratio
    train_df, holdout_df = train_test_split(
        full_df,
        test_size=holdout_fraction,
        stratify=full_df["label"],
        random_state=random_state,
    )

    # Pass 2: val's share is expressed relative to the held-out rows only.
    val_size = val_ratio / (val_ratio + test_ratio)
    test_fraction = 1 - val_size
    val_df, test_df = train_test_split(
        holdout_df,
        test_size=test_fraction,
        stratify=holdout_df["label"],
        random_state=random_state,
    )

    outputs = (("train.csv", train_df), ("val.csv", val_df), ("test.csv", test_df))
    for file_name, part in outputs:
        part.to_csv(os.path.join(output_dir, file_name), index=False)

    print("[✓]Dataset split completed:")
    print(f"  Train: {len(train_df)}")
    print(f"  Val:   {len(val_df)}")
    print(f"  Test:  {len(test_df)}")

    print("\nLabel distribution:")
    print("Train:\n", train_df["label"].value_counts(normalize=True))
    print("Val:\n", val_df["label"].value_counts(normalize=True))
    print("Test:\n", test_df["label"].value_counts(normalize=True))

In [4]:
# Write the 70/20/10 train/val/test CSV files under processed/splits.
split_dataset(
    input_csv="processed/datas.csv",
    output_dir="processed/splits",
    train_ratio=0.7,
    val_ratio=0.2,
    test_ratio=0.1,
    random_state=42,
)

[✓]Dataset split completed:
  Train: 8215
  Val:   2347
  Test:  1175

Label distribution:
Train:
 label
1    0.8549
0    0.1451
Name: proportion, dtype: float64
Val:
 label
1    0.854708
0    0.145292
Name: proportion, dtype: float64
Test:
 label
1    0.855319
0    0.144681
Name: proportion, dtype: float64


In [5]:
# Leakage check: no record id may appear in more than one split.
# Only the "id" column is parsed, so the long "sequence" column is never
# loaded into memory just to be discarded.
# Note: this compares ids only; identical sequences stored under different
# ids would not be detected.
train_ids = set(pd.read_csv("processed/splits/train.csv", usecols=["id"])["id"])
val_ids   = set(pd.read_csv("processed/splits/val.csv", usecols=["id"])["id"])
test_ids  = set(pd.read_csv("processed/splits/test.csv", usecols=["id"])["id"])

assert train_ids.isdisjoint(val_ids), "ids shared between train and val"
assert train_ids.isdisjoint(test_ids), "ids shared between train and test"
assert val_ids.isdisjoint(test_ids), "ids shared between val and test"
print("[✓] Aucune fuite détectée")

[✓] Aucune fuite détectée
