In [22]:
from Bio import SeqIO
import re
import pandas as pd
import os
from pathlib import Path
from dataclasses import dataclass

from local.constants import WORKSPACE_ROOT
from local.caching import load, save
ws = Path("./cache/epi300_genotype")
ws.mkdir(exist_ok=True)

In [2]:
k12_gb_path = WORKSPACE_ROOT/"data/references/reference_genomes/k12mg1655/k12mg1655.gb"
k12_ref_path = WORKSPACE_ROOT/"data/references/reference_genomes/k12mg1655/k12mg1655.fna"
for e in SeqIO.parse(k12_gb_path, "genbank"):
    for feature in e.features:
        if feature.type != "rep_origin": continue
        print(feature)
        oriC = str(feature.extract(e.seq))
        print(oriC)

type: rep_origin
location: [3925743:3925975](+)
qualifiers:
    Key: note, Value: ['oriC']

GATCTATTTATTTAGAGATCTGTTCTATTGTGATCTCTTATTAGGATCGCACTGCCCTGTGGATAACAAGGATCCGGCTTTTAAGATCAACAACCTGGAAAGGATCATTAACTGTGAATGATCGGTGATCCTGGACCGTATAAGCTGGGATCAGAATGAGGGGTTATACACAACTCAAAAACTGAACAACAGTTGTTCTTTGGATAACTACCGGTTGATCCAAGCTTCCTGA


In [3]:
epi300_asm_path = WORKSPACE_ROOT/"data/assembly/epi300.fna"
for e in SeqIO.parse(epi300_asm_path, "fasta"):
    epi300_desc = e.description
    epi300_seq = str(e.seq)
    break
n_hits = len(re.findall(oriC, epi300_seq))
loc_oriC = epi300_seq.index(oriC)
print(loc_oriC, n_hits)

163192 1


In [None]:
# mummer_coords = ws/"epi300-k12.coords"
# if not mummer_coords.exists():
#     os.system(f"""\
#         cd {ws}
#         nucmer --threads 14 --prefix=epi300-k12 {k12_ref_path} {epi300_asm_path}
#         show-coords -rcl epi300-k12.delta > {mummer_coords.name}
#         mummerplot --prefix=epi300-k12 -t png epi300-k12.delta
#     """)

In [None]:
# paf_lines = []
# started = False
# with open(mummer_coords, "r") as f:
#     for line in f:
#         if not started:
#             if line.startswith("===="): 
#                 started = True
#             continue
#         parts = [x for x in line.split(" ") if x not in ["", "\n", "|"]]
#         # Extract relevant fields (adjust indices based on your coords.txt)
#         ref_start, ref_end = int(parts[0]), int(parts[1])
#         qry_start, qry_end = int(parts[2]), int(parts[3])
#         ref_len, qry_len = int(parts[7]), int(parts[8])
#         ref_name, qry_name = parts[-2], parts[-1]
#         strand = "+" if qry_start < qry_end else "-"
        
#         # Write PAF line (minimal fields)
#         paf_line = f"{qry_name}\t{qry_len}\t{min(qry_start, qry_end)}\t{max(qry_start, qry_end)}\t{strand}\t{ref_name}\t{ref_len}\t{ref_start}\t{ref_end}\t255\t0"
#         paf_lines.append(paf_line)

# mummer_paf = mummer_coords.with_suffix(".paf")
# # Save to PAF
# with open(mummer_paf, "w") as out:
#     out.write("\n".join(paf_lines))

In [6]:
# F- mcrA Δ(mrr-hsdRMS-mcrBC) φ80dlacZΔM15 ΔlacX74 recA1 endA1 araD139 Δ(ara, leu)7697 galU galK λ- rpsL nupG trfA dhfr
epi300_genotype = [
    "mcrA",
]+["mrr"]+[f"hsd{x}" for x in "RMS"]+[f"mcr{x}" for x in "BC"]+[ # Δ(mrr-hsdRMS-mcrBC)
    "lac", # φ80dlacZΔM15, ΔlacX74
    "recA", # recA1
    "endA", # endA1
    "araD", # araD139
] + ["ara", "leu"] + [ # Δ(ara, leu)7697
    "galU",
    "galK",
    "rpsL",
    "nupG",
    "trfA",
    "dhfr",
]

genotype_k12_sequences = {}
gene2loc = {}
for e in SeqIO.parse(k12_gb_path, "genbank"):
    for feature in e.features:
        if feature.type != "CDS": continue
        gene = feature.qualifiers.get("gene")
        prot = feature.qualifiers.get("product")
        if prot is not None: prot = prot[0]
        if gene is None: continue
        gene = gene[0]
        gene2loc[gene] = gene2loc.get(gene, [])+[(prot, feature.location.start, feature.location.end)]
        for k in epi300_genotype:
            if not gene.startswith(k): continue
            genotype_k12_sequences[k] = genotype_k12_sequences.get(k, [])+[(gene, prot, str(feature.extract(e.seq)))]
no_refs = [x for x in epi300_genotype if not any(k.startswith(x) for k in genotype_k12_sequences.keys())]

_todo = list(genotype_k12_sequences.items())
genotype_k12_sequences = {}
for k, ss in _todo:
    for gene, prot, seq in ss:
        kk = gene, prot
        genotype_k12_sequences[kk] = seq

seen = {}
genotype_k12_sequences_fasta = Path("./cache/genotype_k12_sequences.fna")
with open(genotype_k12_sequences_fasta, "w") as f:
    for (gene, prot), seq in genotype_k12_sequences.items():
        i = seen.get(gene, 1)
        f.write(f">{gene}{i if i > 1 else ""} [{prot}] n={len(seq)}\n")
        f.write(f"{seq}\n")
        seen[gene] = i+1

no_refs

['trfA', 'dhfr']

In [7]:
COLS = "qseqid sseqid qstart qend sstart send nident qlen slen".split(" ")
blast_result = ws/"temp.result.tsv"
# blast_result = ws/"temp.result.blast"
if not blast_result.exists() or True:
    os.system(f"""\
        cd {ws}
        makeblastdb \
            -dbtype nucl \
            -in {genotype_k12_sequences_fasta.absolute()} \
            -out ./db/genotype_k12_ref

        echo "blasting"
        blastn \
            -num_threads 14 \
            -evalue 999 \
            -query {epi300_asm_path} \
            -outfmt "6 {' '.join(COLS)}" \
            -db ./db/genotype_k12_ref \
            -out {blast_result.name}
    """)



Building a new DB, current time: 08/24/2025 13:46:54
New DB name:   /home/tony/workspace/projects/model_strains/main/genome_annoucements/cache/epi300_genotype/db/genotype_k12_ref
New DB title:  /home/tony/workspace/projects/model_strains/main/genome_annoucements/cache/genotype_k12_sequences.fna
Sequence type: Nucleotide
Deleted existing Nucleotide BLAST database named /home/tony/workspace/projects/model_strains/main/genome_annoucements/cache/epi300_genotype/db/genotype_k12_ref
Keep MBits: T
Maximum file size: 3000000000B
Adding sequences from FASTA; added 35 sequences in 0.00376582 seconds.


blasting


In [8]:
df_blast = pd.read_csv(blast_result, sep="\t", header=None)
df_blast.columns = COLS
df_blast = df_blast.sort_values("sseqid")
df_blast

Unnamed: 0,qseqid,sseqid,qstart,qend,sstart,send,nident,qlen,slen
15,C1,araC,3389176,3390054,879,1,879,4691561,879
5,C1,araE,3906059,3907477,1419,1,1419,4691561,1419
10,C1,araF,2904266,2905255,990,1,990,4691561,990
4,C1,araG,2902682,2904196,1515,1,1515,4691561,1515
13,C1,araH,2901681,2902667,987,1,987,4691561,987
7,C1,araJ,1179170,1180354,1185,1,1185,4691561,1185
16,C1,endA,4016841,4017548,1,708,707,4691561,708
11,C1,galK,1670741,1671728,1149,162,988,4691561,1149
12,C1,galK,1673058,1673225,168,1,168,4691561,1149
14,C1,galU,2161016,2161924,1,909,909,4691561,909


In [9]:
_hits = set(df_blast.sseqid.values)
for k, d in genotype_k12_sequences:
    if k in _hits: continue
    print(k, d)

araD L-ribulose-5-phosphate 4-epimerase AraD
araA L-arabinose isomerase
araB ribulokinase
leuD 3-isopropylmalate dehydratase subunit LeuD
leuC 3-isopropylmalate dehydratase subunit LeuC
leuB 3-isopropylmalate dehydrogenase
leuA 2-isopropylmalate synthase
leuL leu operon leader peptide
leuO DNA-binding transcriptional dual regulator LeuO
mcrA Type IV methyl-directed restriction enzyme EcoKMcrA
mcrC Type IV methyl-directed restriction enzyme EcoKMcrBC subunit
mcrB 5-methylcytosine-specific restriction enzyme subunit McrB
mcrB McrBS
hsdS type I restriction enzyme EcoKI specificity subunit
hsdM type I restriction enzyme EcoKI methylase subunit
hsdR type I restriction enzyme EcoKI endonuclease subunit
mrr Type IV methyl-directed restriction enzyme EcoKMrr


In [10]:
df_blast[df_blast.slen == df_blast.nident]

Unnamed: 0,qseqid,sseqid,qstart,qend,sstart,send,nident,qlen,slen
15,C1,araC,3389176,3390054,879,1,879,4691561,879
5,C1,araE,3906059,3907477,1419,1,1419,4691561,1419
10,C1,araF,2904266,2905255,990,1,990,4691561,990
4,C1,araG,2902682,2904196,1515,1,1515,4691561,1515
13,C1,araH,2901681,2902667,987,1,987,4691561,987
7,C1,araJ,1179170,1180354,1185,1,1185,4691561,1185
14,C1,galU,2161016,2161924,1,909,909,4691561,909
20,C1,lacA,2210104,2210715,612,1,612,4691561,612
8,C1,lacI,2215190,2216272,1083,1,1083,4691561,1083
6,C1,lacY,2210781,2212034,1254,1,1254,4691561,1254


In [11]:
df_blast[df_blast.sseqid=="galK"]

Unnamed: 0,qseqid,sseqid,qstart,qend,sstart,send,nident,qlen,slen
11,C1,galK,1670741,1671728,1149,162,988,4691561,1149
12,C1,galK,1673058,1673225,168,1,168,4691561,1149


In [13]:
import numpy as np
from local.figures.template import BaseFigure, ApplyTemplate, go

from local.figures.base.layout import Canvas, Panel, Transform
from local.figures.base.coordinates import to_cart
from local.figures.base.geometry import Brush
from local.figures.base.text import TextPlotter
from local.figures.colors import COLORS, Color, Palettes, XColor, ListOfXColor

In [14]:
k12toEpi300 = 827981
genome_length = len(epi300_seq)
genome_length

4691561

In [16]:
df_rrna = pd.read_csv(WORKSPACE_ROOT/"data/ecoli/annotations/epi300.barrnap.gff", sep="\t", comment="#", header=None)
rrnas = []
for _, r in df_rrna.iterrows():
    loc = r[4]
    meta = r[8].split(";")
    meta = {m.split("=")[0]: m.split("=")[1] for m in meta}
    rrnas.append((meta["product"].split(" ")[0], loc))
rrnas

[('16S', 182133),
 ('23S', 185391),
 ('5S', 185599),
 ('16S', 275856),
 ('23S', 279207),
 ('5S', 279416),
 ('16S', 407761),
 ('23S', 411105),
 ('5S', 411313),
 ('16S', 449249),
 ('23S', 452507),
 ('5S', 452716),
 ('16S', 1028733),
 ('23S', 1032083),
 ('5S', 1032292),
 ('5S', 3649374),
 ('23S', 3652372),
 ('16S', 3654344),
 ('5S', 4352708),
 ('5S', 4352953),
 ('23S', 4355951),
 ('16S', 4357929)]

In [92]:
rrna_annots = []
for rna, loc in rrnas:
    found = False
    for seen_loc, lst in rrna_annots:
        if abs(seen_loc-loc) < 10000:
            lst.append(rna)
            found = True
            break
    if not found:
        rrna_annots.append((loc, [rna]))
rrna_annots

[(182133, ['16S', '23S', '5S']),
 (275856, ['16S', '23S', '5S']),
 (407761, ['16S', '23S', '5S']),
 (449249, ['16S', '23S', '5S']),
 (1028733, ['16S', '23S', '5S']),
 (3649374, ['5S', '23S', '16S']),
 (4352708, ['5S', '5S', '23S', '16S'])]

In [70]:
from Bio.SeqFeature import ExactPosition, CompoundLocation

def get_pos(e):
    if isinstance(e.location, CompoundLocation):
        parts = list(e.location.parts)
        
        # Calculate total length and find middle
        total_length = sum(len(part) for part in parts)
        middle_offset = total_length // 2
        
        # Find which part contains the middle
        current_pos = 0
        for part in parts:
            part_length = len(part)
            if current_pos + part_length > middle_offset:
                # Middle is in this part
                middle_in_part = middle_offset - current_pos
                middle_position = part.start + middle_in_part
                
                # Handle circular wrap-around
                if middle_position >= genome_length:
                    middle_position -= genome_length
                break
            current_pos += part_length
    else:
        # Simple linear feature
        middle_position = (e.location.start + e.location.end) // 2
    return middle_position

cds_locations = {}
for g in SeqIO.parse(WORKSPACE_ROOT/"data/ecoli/reference_genomes/EPI300/epi300.pgap.gb", "genbank"):
    for e in g.features:
        if e.type != "CDS": continue
        _q = e.qualifiers
        key = None
        for k in ["gene", "product"]:
            if k in _q:
                key = _q[k][0]
                break
        assert key is not None
        pos = get_pos(e)
        cds_locations[key] = pos
len(cds_locations)

4159

In [24]:
skew = load("epi300_skew")
skew.shape # step=500, window=1000

recovering & decompressing cached data from [{WORKSPACE}/main/genome_annoucements/cache/epi300_skew.pkl.gz]


(9383,)

In [93]:
fig = BaseFigure()

BLACK = Color.Hex("212121")
@dataclass
class An:
    desc: str
    location: int # nucleotide location in k12 ref
    rad_start: float = 0
    rad_end: float = 0.1
    fontsize: int = 12
    color: XColor = BLACK
    dx: float=0
    dy: float=0

    def GetAngle(self):
        # loc = self.location + k12toEpi300 - loc_oriC
        loc = self.location - loc_oriC
        while loc < 0: loc += genome_length
        while loc > genome_length: loc -= genome_length
        angle = loc/genome_length * 2*np.pi
        return angle

def get_loc(genes):
    locs = [cds_locations[g] for g in genes]
    return ExactPosition(sum(locs)/len(locs))

nudges = {
    "φ80dlacZΔM15<br>ΔlacX74": (-0.02, -0.02),
    "galU": (0, +0.01),
}

# F- mcrA Δ(mrr-hsdRMS-mcrBC) φ80dlacZΔM15 ΔlacX74 recA1 endA1 araD139 Δ(ara, leu)7697 galU galK λ- rpsL nupG trfA DHFR
r_offset = 0.1
annotations: list[An] = [
    An(name, get_loc(ks)) for ks, name in [
        (["yajD"],          "mcrA"),
        (["opgB", "nanC"],  "Δ(mrr-hsdRMS-mcrBC)"),
        (["lacZ"],          "φ80dlacZΔM15<br>ΔlacX74"),
        (["recA"],          "recA1"),
        (["endA", "nupG"],  "endA1<br>nupG"),
        (["folA"],          "DHFR (folA)<br>araD139<br>Δ(ara, leu)7697"),
        (["galU"],          "galU"),
        (["galK"],          "galK"),
        (["rpsL"],          "rpsL"),
        (["trfA"],          "trfA"),
    ]
] + [
    An("", l, rad_end=-0.03, color=COLORS.RED) for l, ns in rrna_annots
] + [
    An("", l, rad_end=-0.03, color=COLORS.RED) for l, ns in rrna_annots
]

for a in annotations:
    if a.desc in nudges:
        x, y = nudges[a.desc]
        a.dx = x
        a.dy = y

root = Canvas()
text = TextPlotter(fig)
panel = root.NewPanel(Transform(sx=8/6))
def text_xy(x, y):
    return panel.ApplyTransforms(np.array([[x, y]]))[0]
brush_main = Brush(BLACK)

rad_chr = 1/4

triangle_w = 0.02
brush_main._pts.append(np.array([
    [0, rad_chr],
    [0+triangle_w/2, rad_chr+triangle_w],
    [0-triangle_w/2, rad_chr+triangle_w],
]))
brush_main._cmds.append("MLL")
x, y = text_xy(0, rad_chr+triangle_w)
text.Write("oriC", x, y, size=12, color=BLACK.color_value, yanchor="bottom")

brush_main.EllipticalArc(x_rad=rad_chr, end_angle=2*np.pi, width=0.005)
brushes: dict[str, Brush] = {}
for a in annotations:
    c = a.color
    k =  str(c)
    if k in brushes: continue
    brushes[k] = Brush(c)

for a in annotations:
    l_start = rad_chr + a.rad_start
    l_end = rad_chr + a.rad_end
    l_c = (l_start+l_end)/2
    l_w = abs(l_end-l_start)
    r = a.GetAngle()
    x, y = to_cart(r, l_end)
    x, y = text_xy(x, y)
    left = x < 0
    inside = a.rad_end < 0
    aln = "left" if left == inside else "right"
    text.Write(
        a.desc, x+a.dx, y+0.015+a.dy, size=a.fontsize, color=BLACK.color_value,
        xanchor=aln, yanchor="top",
        align=aln,
    )

    b = brushes[str(a.color)]
    b.EllipticalArc(
        x_rad=l_c,
        start_angle=r,
        end_angle=r+0.005,
        width=l_w,
    )

panel.AddElement(brush_main)
for b in brushes.values():
    panel.AddElement(b)

fig = root.Render(fig=fig, debug=False)
da = dict(showticklabels=False, ticks=None, linecolor=COLORS.TRANSPARENT)
W, H = 800, 600
fig = ApplyTemplate(
    fig, 
    default_xaxis=da, default_yaxis=da,
    layout=dict(
        font=dict(
            family="Arial",
        ),
        width=W, height=H,
    ),
)
fig.write_image(ws/"epi300_genotype.svg")
fig.show(config=dict(
    scrollZoom=False
))