<a href="https://colab.research.google.com/github/AhmedAboushanab/Genomic-Insights-into-Specialized-and-Primary-Metabolism-Gene-Clusters/blob/main/DNA_Sequence_Extractor.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
pip install biopython pandas

# **Node names are needed (multi-sequence FASTA files)**

In [15]:
import os
from Bio import SeqIO
import pandas as pd
from google.colab import files

def _parse_coordinate(value):
    """Convert one coordinate cell from the CSV into an int.

    Accepts plain integers, strings with thousands separators ("1,234") and
    integer-valued floats ("1234.0"). The float form matters because pandas
    reads an integer column as float as soon as one cell in it is blank.

    Raises:
        ValueError: if the cell is blank, NaN, or not a whole number.
    """
    text = str(value).replace(',', '').strip()
    try:
        return int(text)
    except ValueError:
        number = float(text)  # still raises ValueError for non-numeric text
        if not number.is_integer():  # also rejects NaN and infinities
            raise ValueError(f"non-integer coordinate: {value!r}")
        return int(number)


def _normalize_strand(value):
    """Return the strand as a stripped string, using '+' for blank/NaN cells."""
    if pd.isna(value):
        return '+'
    return str(value).strip() or '+'


def _find_header(headers, node):
    """Return the FASTA record id that best matches ``node``, or None.

    Preference order: exact id; an id that starts with ``node`` followed by a
    non-alphanumeric character (so "NODE_1" picks "NODE_1_length_..." rather
    than "NODE_10_length_..."); finally the first id that merely starts with
    ``node``.
    """
    if node in headers:
        return node
    prefix_hits = [header for header in headers if header.startswith(node)]
    for header in prefix_hits:
        # header != node here, so there is always a character after the prefix.
        if not header[len(node)].isalnum():
            return header
    return prefix_hits[0] if prefix_hits else None


def extract_sequences(fasta_path, coords_csv_path, output_fasta_path, output_csv_path):
    """Cut sub-sequences out of a multi-record FASTA file using a coordinates CSV.

    The sample id is the FASTA file name without its extension. Only CSV rows
    whose "Sample-id" equals that sample id are used. For each such row the
    record named by the "Control" column is looked up among the FASTA record
    ids and the 1-based, inclusive range "From".."To" is sliced out of it; a
    "strand" value of "-" yields the reverse complement.

    Rows that cannot be processed (unknown node, unreadable or out-of-range
    coordinates) are reported with a warning and skipped.

    Args:
        fasta_path: Path of the input FASTA file.
        coords_csv_path: Path of the CSV with columns "Sample-id", "Control",
            "From", "To" and, optionally, "strand".
        output_fasta_path: Where the extracted sequences are written as FASTA.
        output_csv_path: Where the per-sequence table is written as CSV.

    Returns:
        None. Nothing is written when no row yields a sequence.
    """
    # Load all sequences in the FASTA file, keyed by record id
    sequences = SeqIO.to_dict(SeqIO.parse(fasta_path, "fasta"))
    df = pd.read_csv(coords_csv_path)

    fasta_name = os.path.basename(fasta_path)
    sample_id = os.path.splitext(fasta_name)[0]  # e.g., "CA_25"
    df_sample = df[df["Sample-id"] == sample_id]

    if df_sample.empty:
        print(f"⚠️ No matching entries for {sample_id} in {os.path.basename(coords_csv_path)}")
        return

    extracted_records = []
    extracted_fasta = []

    for _, row in df_sample.iterrows():
        node = str(row['Control']).strip()  # the node name lives in the 'Control' column
        try:
            start = _parse_coordinate(row['From']) - 1  # 1-based -> 0-based slice start
            end = _parse_coordinate(row['To'])
        except ValueError:
            # One bad row should not abort the whole sample.
            print(f"⚠️ Unreadable coordinates for {node}: From={row['From']!r}, To={row['To']!r}")
            continue
        strand = _normalize_strand(row.get('strand', '+'))

        matched_id = _find_header(sequences, node)
        if matched_id is None:
            print(f"⚠️ Node {node} not found in {fasta_name}")
            continue

        full_seq = sequences[matched_id].seq
        if end > len(full_seq) or start < 0:
            print(f"⚠️ Invalid coordinates for {matched_id}: {start+1}-{end} exceeds length {len(full_seq)}")
            continue
        if start >= end:
            # From > To would otherwise slice to an empty sequence without any notice.
            print(f"⚠️ Invalid coordinates for {matched_id}: From {start+1} is greater than To {end}")
            continue

        sub_seq = full_seq[start:end]
        if strand == '-':
            sub_seq = sub_seq.reverse_complement()

        sequence_text = str(sub_seq)
        record_id = f"{sample_id}_{node}_{start+1}_{end}_{strand}"
        extracted_fasta.append(f">{record_id}\n{sequence_text}")
        extracted_records.append({
            'record_id': record_id,
            'sample_id': sample_id,
            'node': node,
            'start': start + 1,
            'end': end,
            'strand': strand,
            'sequence': sequence_text
        })

    if not extracted_records:
        print(f"⚠️ No sequences extracted from {fasta_name}.")
        return

    with open(output_fasta_path, "w", encoding="utf-8") as f:
        # End with a newline so the file can be concatenated with other FASTA files.
        f.write("\n".join(extracted_fasta) + "\n")

    pd.DataFrame(extracted_records).to_csv(output_csv_path, index=False)
    print(f"✅ Saved: {output_fasta_path}, {output_csv_path}")


def process_all_fastas_in_folder(input_folder, coords_csv_path, output_folder):
    """Run ``extract_sequences`` on every FASTA file directly inside a folder.

    The output folder is created if it does not exist. Every entry of
    ``input_folder`` whose name ends in ".fasta" or ".fa" is processed; its
    results go to ``<name>_extracted.fasta`` and ``<name>_extracted.csv`` in
    ``output_folder``, where ``<name>`` is the file name without extension.

    Args:
        input_folder: Folder that is listed for FASTA files.
        coords_csv_path: Coordinates CSV handed to ``extract_sequences``.
        output_folder: Folder that receives the extracted files.
    """
    os.makedirs(output_folder, exist_ok=True)

    for entry in os.listdir(input_folder):
        # Anything that is not a FASTA file by name is ignored.
        if not entry.endswith((".fasta", ".fa")):
            continue

        stem = os.path.splitext(entry)[0]
        print(f"\n🧬 Processing {entry}...")

        extract_sequences(
            fasta_path=os.path.join(input_folder, entry),
            coords_csv_path=coords_csv_path,
            output_fasta_path=os.path.join(output_folder, f"{stem}_extracted.fasta"),
            output_csv_path=os.path.join(output_folder, f"{stem}_extracted.csv"),
        )

In [None]:
# Process every FASTA file in the References folder with the coordinates in
# /content/References.csv and write the results to Output/References.
# NOTE(review): this file defines process_all_fastas_in_folder and
# extract_sequences twice; in a notebook the most recently executed definitions
# are the ones this call uses, so run the intended definition cell first.
# NOTE(review): the /content/drive paths presumably require Google Drive to be
# mounted beforehand; no mount call is visible in this file.
process_all_fastas_in_folder("/content/drive/MyDrive/BGC-Seqs/References", "/content/References.csv", "/content/drive/MyDrive/Output/References")

# **No node names needed (single-sequence FASTA files)**

In [17]:
import os
from Bio import SeqIO
import pandas as pd

def _parse_coordinate(value):
    """Convert one coordinate cell from the CSV into an int.

    Accepts plain integers, strings with thousands separators ("1,234") and
    integer-valued floats ("1234.0"). The float form matters because pandas
    reads an integer column as float as soon as one cell in it is blank.

    Raises:
        ValueError: if the cell is blank, NaN, or not a whole number.
    """
    text = str(value).replace(',', '').strip()
    try:
        return int(text)
    except ValueError:
        number = float(text)  # still raises ValueError for non-numeric text
        if not number.is_integer():  # also rejects NaN and infinities
            raise ValueError(f"non-integer coordinate: {value!r}")
        return int(number)


def _normalize_strand(value):
    """Return the strand as a stripped string, using '+' for blank/NaN cells."""
    if pd.isna(value):
        return '+'
    return str(value).strip() or '+'


def extract_sequences(fasta_path, coords_csv_path, output_fasta_path, output_csv_path):
    """Cut sub-sequences out of the first record of a FASTA file.

    The sample id is the FASTA file name without its extension. Only CSV rows
    whose "Sample-id" equals that sample id are used. For each such row the
    1-based, inclusive range "From".."To" is sliced out of the first sequence
    in the file; a "strand" value of "-" yields the reverse complement. If the
    file holds more than one record a warning is printed and the others are
    ignored.

    Rows with unreadable or out-of-range coordinates are reported with a
    warning and skipped.

    Args:
        fasta_path: Path of the input FASTA file.
        coords_csv_path: Path of the CSV with columns "Sample-id", "From",
            "To" and, optionally, "strand".
        output_fasta_path: Where the extracted sequences are written as FASTA.
        output_csv_path: Where the matching CSV rows, extended with
            "record_id" and "sequence" columns, are written.

    Returns:
        None. Nothing is written when no row yields a sequence.
    """
    # Load sequences from FASTA
    sequences = list(SeqIO.parse(fasta_path, "fasta"))
    if not sequences:
        print(f"⚠️ No sequences found in {fasta_path}")
        return
    if len(sequences) > 1:
        print(f"⚠️ Multiple sequences found in {fasta_path}. Using the first one only.")

    full_seq = sequences[0].seq  # only the first record is ever used
    sample_id = os.path.splitext(os.path.basename(fasta_path))[0]

    # Load coordinates
    df = pd.read_csv(coords_csv_path)
    df_sample = df[df["Sample-id"] == sample_id].copy()

    if df_sample.empty:
        print(f"⚠️ No matching entries for {sample_id} in coordinates CSV.")
        return

    extracted_records = []
    extracted_fasta = []

    for _, row in df_sample.iterrows():
        try:
            start = _parse_coordinate(row['From']) - 1  # 1-based -> 0-based slice start
            end = _parse_coordinate(row['To'])
        except ValueError:
            # One bad row should not abort the whole sample.
            print(f"⚠️ Unreadable coordinates for {sample_id}: From={row['From']!r}, To={row['To']!r}")
            continue
        strand = _normalize_strand(row.get('strand', '+'))

        if end > len(full_seq) or start < 0:
            print(f"⚠️ Invalid coordinates: {start+1}-{end} > {len(full_seq)}")
            continue
        if start >= end:
            # From > To would otherwise slice to an empty sequence without any notice.
            print(f"⚠️ Invalid coordinates: From {start+1} is greater than To {end}")
            continue

        sub_seq = full_seq[start:end]
        if strand == '-':
            sub_seq = sub_seq.reverse_complement()

        sequence_text = str(sub_seq)
        record_id = f"{sample_id}_{start+1}_{end}_{strand}"
        extracted_fasta.append(f">{record_id}\n{sequence_text}")

        # Keep every original CSV column and append the two derived ones.
        updated_row = row.copy()
        updated_row['record_id'] = record_id
        updated_row['sequence'] = sequence_text
        extracted_records.append(updated_row)

    if not extracted_records:
        print(f"⚠️ No sequences extracted for {sample_id}.")
        return

    # Save output
    with open(output_fasta_path, "w", encoding="utf-8") as f:
        # End with a newline so the file can be concatenated with other FASTA files.
        f.write("\n".join(extracted_fasta) + "\n")
    pd.DataFrame(extracted_records).to_csv(output_csv_path, index=False)

    print(f"✅ Saved: {output_fasta_path}, {output_csv_path}")

def process_all_fastas_in_folder(input_folder, coords_csv_path, output_folder):
    """Apply ``extract_sequences`` to each FASTA file found in ``input_folder``.

    Creates ``output_folder`` when missing, then handles every directory entry
    named ``*.fasta`` or ``*.fa``. For an input ``<name>.fa`` the outputs are
    ``<name>_extracted.fasta`` and ``<name>_extracted.csv`` inside
    ``output_folder``.

    Args:
        input_folder: Folder that is listed for FASTA files.
        coords_csv_path: Coordinates CSV forwarded to ``extract_sequences``.
        output_folder: Destination folder for the extracted files.
    """
    os.makedirs(output_folder, exist_ok=True)

    fasta_names = [
        name for name in os.listdir(input_folder)
        if name.endswith((".fasta", ".fa"))
    ]

    for name in fasta_names:
        stem, _extension = os.path.splitext(name)
        source_path = os.path.join(input_folder, name)
        fasta_target = os.path.join(output_folder, f"{stem}_extracted.fasta")
        csv_target = os.path.join(output_folder, f"{stem}_extracted.csv")

        print(f"\n🧬 Processing {name}...")
        extract_sequences(
            fasta_path=source_path,
            coords_csv_path=coords_csv_path,
            output_fasta_path=fasta_target,
            output_csv_path=csv_target,
        )


In [None]:
# Process every FASTA file in the References folder with the coordinates in
# /content/References-NRPS.csv and write the results to Output/References/NRPS.
# NOTE(review): the function names used here are defined twice in this file;
# the most recently executed definition cell determines which variant runs.
process_all_fastas_in_folder("/content/drive/MyDrive/BGC-Seqs/References", "/content/References-NRPS.csv", "/content/drive/MyDrive/Output/References/NRPS")

In [None]:
# Process the same References folder again, this time with the coordinates in
# /content/References-NI-Siderophore.csv, writing to
# Output/References/NI-Siderophore.
# NOTE(review): the function names used here are defined twice in this file;
# the most recently executed definition cell determines which variant runs.
process_all_fastas_in_folder("/content/drive/MyDrive/BGC-Seqs/References", "/content/References-NI-Siderophore.csv", "/content/drive/MyDrive/Output/References/NI-Siderophore")