In [3]:
# ---------------------------------------------------------
# 1. SETUP: Imports, Config, and Helpers
# ---------------------------------------------------------
import sys, re, random
from collections import defaultdict
from datetime import datetime
import pandas as pd

# Auto-install if missing
try:
    from Bio import SeqIO
    from Bio.Seq import Seq
    from Bio.SeqRecord import SeqRecord
except ImportError:
    print("üì¶ Installing Biopython...")
    !{sys.executable} -m pip install biopython pandas openpyxl
    from Bio import SeqIO
    from Bio.Seq import Seq
    from Bio.SeqRecord import SeqRecord

# --- Configuration ---
CONFIG = {
    "FASTA": "c_elegans.PRJNA13758.WS297.genomic.fa",
    "GFF3":  "c_elegans.PRJNA13758.WS297.annotations.gff3",
    "CSV":   "gene_set.csv",
    "UPSTREAM_LEN": 2225,
    "WINDOW_SIZE": 250,
    "OVERLAP": 25,
    "MIN_LEN": 100,
    "BLOCK_OPPOSITE": True, # If True, don't overlap CDS on opposite strand
    "SKIP_MT": True,
    "CLIP_MARGIN": 2,
    "FWD_TAIL": "GTCGAGCCGGAACTggtctcttgcc",
    "REV_TAIL": "aaaatgagaccCACGCCGGCTAAAC",
    "SEED": 12345
}
if CONFIG["SEED"]: random.seed(CONFIG["SEED"])

# --- Helper Functions ---
def clean_id(x): return re.sub(r'[^a-z0-9]+', '', str(x).strip().lower()) if x else ""

def get_attr(attrs, key): 
    """Parses GFF3 attribute string for a specific key (Crash-proof version)."""
    if not attrs or key not in attrs: return None
    # Regex searches for 'key=value', ensuring it follows a semicolon or start of line
    # This prevents finding "ID" inside "Protein_ID"
    match = re.search(f"(?:^|;){key}=([^;]+)", attrs)
    if match:
        return match.group(1).replace("Gene:", "")
    return None

def break_bsai(seq_str):
    """Patches BsaI sites (GGTCTC/GAGACC) by mutating 1 random base."""
    s = list(seq_str)
    motifs = ["ggtctc", "gagacc"]
    patched = 0
    while True:
        low = "".join(s).lower()
        # Find earliest motif
        hit = min((low.find(m) for m in motifs if m in low), default=-1)
        if hit == -1: break
        
        # Mutate 1 base in the 6-mer
        k = hit + random.randint(0, 5)
        new = random.choice([b for b in "ACGT" if b != s[k].upper()])
        s[k] = new if s[k].isupper() else new.lower()
        patched += 1
    return "".join(s), patched

print("‚úÖ Setup Complete (Fixed get_attr).")

‚úÖ Setup Complete (Fixed get_attr).


In [6]:
# ---------------------------------------------------------
# 2. PARSING: Efficient GFF3 Processing (Maximum Speed)
# ---------------------------------------------------------
from tqdm import tqdm

print(f"‚è≥ Loading Genome and Parsing GFF3...")

# A. Load Genome
genome = SeqIO.to_dict(SeqIO.parse(CONFIG["FASTA"], "fasta"))

# B. Load Gene List
target_genes = pd.read_csv(CONFIG["CSV"])["Gene"].dropna().astype(str).unique()

# --- Pass 1: Map Gene Names (Aliases -> Canonical ID) ---
name_to_id = {}
print("   Scanning gene aliases...")
with open(CONFIG["GFF3"]) as fh:
    for line in tqdm(fh, desc="Pass 1", unit=" lines"):
        if line.startswith("#") or "\tgene\t" not in line: continue
        
        parts = line.split("\t")
        attrs = parts[8]
        
        gid = None
        if "ID=" in attrs:
            gid = attrs.split("ID=")[1].split(";")[0].replace("Gene:", "")
        if not gid: continue

        names = {gid}
        if "Name=" in attrs: names.add(attrs.split("Name=")[1].split(";")[0])
        if "locus=" in attrs: names.add(attrs.split("locus=")[1].split(";")[0])
        if "sequence_name=" in attrs: names.add(attrs.split("sequence_name=")[1].split(";")[0])
        if "Alias=" in attrs: 
            aliases = attrs.split("Alias=")[1].split(";")[0]
            names.update(aliases.split(","))
        
        for n in names:
            if n: name_to_id[clean_id(n)] = gid

# --- Pass 2: Map Transcripts & CDS ---
tx_map = defaultdict(set)       # gene_id -> {tx_ids}
tx_props = {}                   # tx_id -> {chrom, strand, min, max}
cds_index = defaultdict(list)   # (chrom, strand) -> [(start, end, tx_id)]

print("   Indexing CDS locations...")
with open(CONFIG["GFF3"]) as fh:
    for line in tqdm(fh, desc="Pass 2", unit=" lines"):
        if line.startswith("#"): continue
        parts = line.strip().split("\t")
        if len(parts) != 9: continue
        
        chrom, _, ftype, start, end, _, strand, _, attrs = parts
        
        if ftype != "CDS" and "RNA" not in ftype and "transcript" not in ftype:
            continue

        s, e = int(start), int(end)
        
        if ftype == "CDS":
            if CONFIG["SKIP_MT"] and chrom.startswith("MtDNA"): continue
            if "Parent=" not in attrs: continue
            parents = attrs.split("Parent=")[1].split(";")[0]
            
            for tx in parents.split(","):
                # Update Transcript Props
                if tx not in tx_props:
                    tx_props[tx] = {"chrom": chrom, "strand": strand, "min": s, "max": e, "gid": None}
                else:
                    if s < tx_props[tx]["min"]: tx_props[tx]["min"] = s
                    if e > tx_props[tx]["max"]: tx_props[tx]["max"] = e
                
                # Store tx_id instead of gid for now (faster)
                cds_index[(chrom, strand)].append((s, e, tx))
                if CONFIG["BLOCK_OPPOSITE"]:
                    cds_index[chrom].append((s, e, tx))

        else:
            if "ID=" in attrs and "Parent=" in attrs:
                tx_id = attrs.split("ID=")[1].split(";")[0]
                parent = attrs.split("Parent=")[1].split(";")[0].replace("Gene:", "")
                gid = name_to_id.get(clean_id(parent))
                if gid:
                    tx_map[gid].add(tx_id)
                    if tx_id in tx_props: tx_props[tx_id]["gid"] = gid

# --- Finalize Indices (Optimized) ---
print("   Finalizing indices (Linking Transcripts to Genes)...")

# 1. Create Reverse Map (Massive speedup for missing GIDs)
tx_reverse_map = {tx: gid for gid, txs in tx_map.items() for tx in txs}

final_index = defaultdict(list)
# Use list() to force iteration so tqdm works
keys = list(cds_index.keys())

for key in tqdm(keys, desc="Linking", unit=" chroms"):
    entries = cds_index[key]
    clean_entries = []
    
    for s, e, tx in entries:
        # Fast Lookup 1: Check props
        gid = tx_props[tx].get("gid")
        
        # Fast Lookup 2: Check reverse map
        if not gid:
            gid = tx_reverse_map.get(tx)
            
        if gid:
            clean_entries.append((s, e, gid))
            
    # Sort for binary search usage later
    final_index[key] = sorted(clean_entries, key=lambda x: x[0])

print(f"‚úÖ Parsed {len(tx_props)} transcripts and built collision indices.")

‚è≥ Loading Genome and Parsing GFF3...
   Scanning gene aliases...


Pass 1: 53237586 lines [00:16, 3201680.35 lines/s]


   Indexing CDS locations...


Pass 2: 53237586 lines [00:37, 1436942.10 lines/s]


   Finalizing indices (Linking Transcripts to Genes)...


Linking: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 18/18 [00:00<00:00, 101.82 chroms/s]

‚úÖ Parsed 32010 transcripts and built collision indices.





In [7]:
# ---------------------------------------------------------
# 3. EXTRACTION LOOP
# ---------------------------------------------------------
records, windows, opools, skipped = [], [], [], []

print(f"üöÄ Processing {len(target_genes)} genes...")

for name in target_genes:
    # 1. Resolve ID
    gid = name_to_id.get(clean_id(name))
    if not gid:
        skipped.append({"Gene": name, "Reason": "Not found in GFF"})
        continue

    # 2. Get Isoform Boundaries
    isoforms = set()
    for tx in tx_map.get(gid, []):
        p = tx_props.get(tx)
        if p: isoforms.add((p["chrom"], p["strand"], p["min"] if p["strand"] == "+" else p["max"]))
    
    if not isoforms:
        skipped.append({"Gene": name, "Reason": "No CDS found"})
        continue

    # 3. Extract & Clip Upstreams
    seen_seqs = set()
    for i, (chrom, strand, b1) in enumerate(sorted(isoforms)):
        try:
            seq_obj = genome[chrom].seq
            
            # --- CLIPPING LOGIC ---
            # Finds nearest neighbor to stop extraction early
            limit = 0 if strand == "+" else len(seq_obj)
            
            # Check same strand neighbors
            neighbors = final_index.get((chrom, strand), [])
            # Check opposite strand neighbors (if configured)
            if CONFIG["BLOCK_OPPOSITE"]: neighbors += final_index.get(chrom, [])
            
            if strand == "+":
                # Looking upstream (left) on + strand
                # Find max END of a neighbor that is < boundary
                valid_ends = [e for s, e, g in neighbors if g != gid and e < (b1 - CONFIG["CLIP_MARGIN"])]
                floor = max(valid_ends) if valid_ends else 0
                
                start = max(floor, (b1 - 1) - CONFIG["UPSTREAM_LEN"])
                upstream = seq_obj[start : b1 - 1] # Stop before ATG
                
            else:
                # Looking upstream (right) on - strand
                # Find min START of a neighbor that is > boundary
                valid_starts = [s for s, e, g in neighbors if g != gid and s > (b1 + CONFIG["CLIP_MARGIN"])]
                ceil = min(valid_starts) - 1 if valid_starts else len(seq_obj)
                
                end = min(ceil, b1 + CONFIG["UPSTREAM_LEN"])
                upstream = seq_obj[b1 : end].reverse_complement() # Start after ATG
            
            # --- CHECKS ---
            if len(upstream) < CONFIG["MIN_LEN"]: 
                skipped.append({"Gene": f"{name}_iso{i}", "Reason": "Too short after clipping"})
                continue
                
            if str(upstream) in seen_seqs: continue
            seen_seqs.add(str(upstream))
            
            # --- SAVE FULL UPSTREAM ---
            tag = f"{name}" if len(isoforms) == 1 else f"{name}_iso{i+1}"
            records.append(SeqRecord(upstream, id=tag, description=""))

            # --- WINDOWING & OPOOLS ---
            up_len = len(upstream)
            step = CONFIG["WINDOW_SIZE"] - CONFIG["OVERLAP"]
            # Generate start coordinates (proximal -> distal)
            starts = [s for s in range(up_len - CONFIG["WINDOW_SIZE"], -1, -step)]
            
            for w_idx, s in enumerate(starts, 1):
                insert = upstream[s : s + CONFIG["WINDOW_SIZE"]]
                if len(insert) != CONFIG["WINDOW_SIZE"]: continue
                
                # Raw Window
                win_tag = f"{tag}|win{w_idx}"
                windows.append(SeqRecord(insert, id=win_tag, description=""))
                
                # oPool (Patched + Tails)
                patched_seq, mutations = break_bsai(str(insert))
                final_seq = CONFIG["FWD_TAIL"] + patched_seq + CONFIG["REV_TAIL"]
                
                opool_tag = f"{win_tag}|oPool" + (f"|BsaI_mut{mutations}" if mutations else "")
                opools.append(SeqRecord(Seq(final_seq), id=opool_tag, description=f"len={len(final_seq)}"))

        except Exception as e:
            skipped.append({"Gene": name, "Reason": str(e)})

print(f"‚úÖ Complete! Generated {len(opools)} oligos.")

üöÄ Processing 424 genes...
‚úÖ Complete! Generated 1947 oligos.


In [8]:
# ---------------------------------------------------------
# 4. EXPORT
# ---------------------------------------------------------
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
f_up   = f"01_upstream_{ts}.fa"
f_win  = f"02_windows_{ts}.fa"
f_pool = f"03_opools_{ts}.fa"
f_xlsx = f"skipped_{ts}.xlsx"

SeqIO.write(records, f_up, "fasta")
SeqIO.write(windows, f_win, "fasta")
SeqIO.write(opools,  f_pool, "fasta")

# Excel Report
df_skip = pd.DataFrame(skipped)
if not df_skip.empty:
    with pd.ExcelWriter(f_xlsx) as writer:
        df_skip[df_skip["Reason"].str.contains("Not found")].to_excel(writer, sheet_name="Not Found", index=False)
        df_skip.to_excel(writer, sheet_name="All Skipped", index=False)

print(f"üíæ Saved:\n1. {f_up}\n2. {f_win}\n3. {f_pool}\n4. {f_xlsx}")

üíæ Saved:
1. 01_upstream_20260117_191542.fa
2. 02_windows_20260117_191542.fa
3. 03_opools_20260117_191542.fa
4. skipped_20260117_191542.xlsx
