In [1]:
import random
from collections import Counter

# Simple prototype

In [12]:
COMP = str.maketrans("ACGTacgt", "TGCAtgca")
def revcomp(s): 
    return s.translate(COMP)[::-1]

def within(v, lo, hi): 
    return lo <= v <= hi
    
def clean_seq(seq):
    return "".join(ch for ch in seq.upper() if ch in "ACGT")

def _seed_match(a, b, seed=4):
    if len(a) < seed or len(b) < seed:
        return False
    return a[-seed:].upper() == revcomp(b.upper())[:seed]

In [10]:
class Individual:
    """
    Individual of genetic algorithm representing a primer pair on a template
    """
    def __init__(self, template, f_start, f_len, r_start, r_len):
        self.f_start = f_start 
        self.f_len = f_len
        self.r_start = r_start
        self.r_len = r_len
        self.template = template 
        self.kcount_template = self.kmer_uniqueness(self.template, k=10)
        self.fitness = self.calc_fitness(kcount=self.kcount_template)

    # Sequence helpers
    def forward(self):
        return self.template[self.f_start:self.f_start+self.f_len]

    def reverse(self):
        frag = self.template[self.r_start:self.r_start+self.r_len]
        return revcomp(frag)

    def amplicon_len(self):
        return (self.r_start + self.r_len) - self.f_start

    def is_valid(self, len_win=(18,25), amp_bounds=(120,300)):
        n = len(self.template)
        if not (0 <= self.f_start < n and 0 <= self.r_start < n): 
            return False
        if self.f_start + self.f_len > n or self.r_start + self.r_len > n:
            return False
        if not (self.f_start < self.r_start): 
            return False
        if not (len_win[0] <= self.f_len <= len_win[1]): 
            return False
        if not (len_win[0] <= self.r_len <= len_win[1]): 
            return False
        if self.has_homopolymer(self.forward()) or self.has_homopolymer(self.reverse()): 
            return False
        amp = self.amplicon_len()
        if not (amp_bounds[0] <= amp <= amp_bounds[1]): return False
        return True

    def repair(self, len_win=(18,25), amp_bounds=(120,300)):
        """
        Clamp lengths/indices into legal ranges and try to nudge r_start to satisfy amplicon bounds.
        """
        n = len(self.template)
        self.f_len = max(len_win[0], min(len_win[1], self.f_len))
        self.r_len = max(len_win[0], min(len_win[1], self.r_len))
        self.f_start = max(0, min(n - self.f_len, self.f_start))
        self.r_start = max(self.f_start + self.f_len + 1, min(n - self.r_len, self.r_start))
        
        amp = self.amplicon_len()
        if amp < amp_bounds[0]:
            self.r_start = min(n - self.r_len, self.f_start + amp_bounds[0] - self.r_len)
        elif amp > amp_bounds[1]:
            self.r_start = max(self.f_start + self.f_len + 1, self.f_start + amp_bounds[1] - self.r_len)
            
        
    # Scoring features
    def gc_content(self, s):
        g = s.count("G")+s.count("C")
        return 100.0 * g / max(1, len(s))

    def tm_wallace(self, s):
        a = s.count("A")+s.count("T")
        g = s.count("G")+s.count("C")
        return 2*a + 4*g

    def has_homopolymer(self, s, k=4):
        run = 1
        for i in range(1, len(s)):
            run = run+1 if s[i]==s[i-1] else 1
            if run >= k: return True
        return False

    def gc_clamp_score(self, s):
        tail = s[-5:] if len(s)>=5 else s
        gcs = sum(1 for c in tail if c in "GC")
        # bonus for 1-2 GC in 3' tail, penalty if too many
        if 1 <= gcs <= 2: 
            return 1.0
        if gcs >= 4: 
            return -1.0
        return 0.0

    def short_3prime_complement_score(self, p, q, seed=4):
        """
        Penalize if 3' seeds strongly complement (self- or cross-dimers)
        """
        rp = revcomp(p)
        rq = revcomp(q)
        s1 = p[-seed:]
        s2 = q[-seed:]
        return 1.0 if s1 and s2 and s1 == rq[:seed] else 0.0

    def hairpin_penalty(self, p, seed=4):
        """
        Penalize if 3' seed is complementary to an internal segment
        """
        seed3 = p[-seed:]
        rc = revcomp(p)
        return 1.0 if seed3 and seed3 in rc[2:-2] else 0.0

    def kmer_uniqueness(self, seq, k=10):
        kmers = [seq[i:i+k] for i in range(0, max(0,len(seq)-k+1))]
        return Counter(kmers)

    def primer_uniqueness_score(self, prim, kcount, k=10):
        if len(prim) < k:
            return 0.5
        ks = [prim[i:i+k] for i in range(len(prim)-k+1)]
        repeats = sum(max(kcount.get(km, 0) - 1, 0) for km in ks)
        return 1.0 / (1.0 + repeats)

    # Fitness
    def calc_fitness(self, len_win=(18,25), tm_win=(58,62), 
                     gc_win=(40,60),amplicon_win=(120,300), 
                     kcount=None):

        if not self.is_valid(len_win, amplicon_win):
            self.repair(len_win, amplicon_win)
            if not self.is_valid(len_win, amplicon_win):
                self.fitness = -1e9
                return self.fitness
        
        fwd = self.forward()
        rev = self.reverse()

        if min(len(fwd), len(rev)) < 16:
            self.fitness = -1e9
            return self.fitness

        amp_len = self.amplicon_len()
        if amp_len < amplicon_win[0] or amp_len > amplicon_win[1]:
            center = 0.5 * (amplicon_win[0] + amplicon_win[1])
            amp_score = -abs((amp_len - center) / 50.0)
        else:
            amp_score = 1.0


        f_tm = self.tm_wallace(fwd)
        r_tm = self.tm_wallace(rev)
        f_gc = self.gc_content(fwd)
        r_gc = self.gc_content(rev)

        def win_score(v, lo, hi, width=5.0):
            if within(v, lo, hi): 
                return 1.0
            edge = lo if v < lo else hi 
            return max(0.0, 1.0 - abs(v-edge)/width)

        tm_score = 0.5*win_score(f_tm, *tm_win) + 0.5*win_score(r_tm, *tm_win)
        gc_score = 0.5*win_score(f_gc, *gc_win) + 0.5*win_score(r_gc, *gc_win)
        len_score = 0.5*win_score(len(fwd), *len_win) + 0.5*win_score(len(rev), *len_win)
        clamp_score = 0.5*self.gc_clamp_score(fwd) + 0.5*self.gc_clamp_score(rev)

        # penalties
        run_pen = 1.0 if (self.has_homopolymer(fwd) or self.has_homopolymer(rev)) else 0.0
        dimer_pen = self.short_3prime_complement_score(fwd, fwd) + self.short_3prime_complement_score(rev, rev) \
            + self.short_3prime_complement_score(fwd, rev) + self.short_3prime_complement_score(rev, fwd)
        hair_pen = self.hairpin_penalty(fwd) + self.hairpin_penalty(rev)

        if kcount is None:
            kcount = self.kcount_template
        uniq_score = 0.0
        if kcount is not None:
            uniq_score = 0.5* self.primer_uniqueness_score(fwd, kcount) + 0.5* self.primer_uniqueness_score(rev, kcount)

         # weights
        w_tm, w_gc, w_len, w_clamp, w_amp, w_uniq = 1.0, 1.0, 0.6, 0.5, 2.0, 0.8
        p_dimer, p_hair, p_run = 3.0, 2.0, 1.5

        fitness = (w_tm*tm_score + w_gc*gc_score + w_len*len_score \
                   +  w_clamp*clamp_score + w_amp*amp_score + w_uniq*uniq_score \
                   - p_dimer*dimer_pen - p_hair*hair_pen - p_run*run_pen)
        
        self.fitness = float(fitness)
        return self.fitness

    def genes(self):
        return [self.f_start, self.f_len, self.r_start, self.r_len]

    def from_genes(template, genes):
        return Individual(template, genes[0], genes[1], genes[2], genes[3])      

    def basic_metrics(self):
        F, R = self.forward(), self.reverse()
        return {
            "TmF": tm_wallace(F), "TmR": tm_wallace(R),
            "GCF": gc_content(F), "GCR": gc_content(R),
            "DG":  (hairpin_penalty(F) + hairpin_penalty(R) + dimer_penalty(F,R)),
            "Amp": self.amplicon_len(),
            "ClampF": gc_clamp_score(F), "ClampR": gc_clamp_score(R),
        }

        

In [4]:
def selection(population, turnament_size):
    k = min(len(population), turnament_size)
    participants = random.sample(population, k)
    return max(participants, key=lambda x: x.fitness)

In [5]:
def crossover(p1, p2, template, len_win=(18,25), amp_bounds=(120,300)):
    cut = random.randint(1,3)
    g1 = p1.genes()
    g2 = p2.genes()
    child_genes = g1[:cut] + g2[cut:]
    child = Individual.from_genes(template, child_genes)
    child.repair(len_win, amp_bounds)
    return child

In [6]:
def mutation(child, len_win, amp_bounds, max_shift=5, rate=0.35):
    if random.random() < rate: 
        child.f_len += random.choice([-1,1])
    if random.random() < rate: 
        child.r_len += random.choice([-1,1])
    if random.random() < rate: 
        child.f_start += random.randint(-max_shift, max_shift)
    if random.random() < rate: 
        child.r_start += random.randint(-max_shift, max_shift)
    child.repair(len_win, amp_bounds)
    return child

In [7]:
def random_individual(template, len_win, amp_bounds, attempts=400):
    template = clean_seq(template)
    n = len(template)
    for _ in range(attempts):
        f_len = random.randint(*len_win)
        r_len = random.randint(*len_win)
        f_start = random.randint(0, n - f_len - 1)
        min_gap = amp_bounds[0] - f_len
        r_min = max(f_start + f_len + 1, f_start + f_len + min_gap)
        r_min = min(r_min, n - r_len)
        if r_min > n - r_len: 
            continue
        r_start = random.randint(r_min, n - r_len)
        ind = Individual(template, f_start, f_len, r_start, r_len)
        if ind.is_valid(len_win, amp_bounds):
            ind.calc_fitness()
            return ind
    # Fallback
    ind = Individual(template, 10, 20, min(n-20, 10+20+150), 20)
    ind.repair(len_win, amp_bounds)
    ind.calc_fitness()
    return ind
    

In [8]:
def ga(template,
       elitism=2,
       generations=120,
       pop_size=150,
       turnament_size=13,
       len_win=(18, 25),
       amp_bounds=(120, 300),
       seed=42):
    
    random.seed(seed)
    # Init population
    population = [random_individual(template, len_win, amp_bounds)for _ in range(pop_size)]

    for _ in range(generations):
        # Elitism
        population.sort(key=lambda ind: ind.fitness, reverse=True)
        next_population = population[:elitism]
        
        while len(next_population) < pop_size:
            p1 = selection(population, turnament_size=2)
            p2 = selection(population, turnament_size=2)
            child = crossover(p1, p2, template, len_win, amp_bounds)
            child = mutation(child, len_win=len_win, amp_bounds=amp_bounds, rate=0.35)

            next_population.append(child)
        population = next_population
    
    population.sort(key=lambda ind: ind.fitness, reverse=True)
    best = population[0]
    return best, population[:10]
    

In [9]:
template = ("ATGCGTACGTTGACCTGATCGATCGGATCCGATGCTAGCTAGCTAGGCTTACGATCGATCG"*6)
best, top10 = ga(template, pop_size=120, generations=120, seed=42)
print("BEST fitness:", round(best.fitness,3))
print("BEST primers:")
print("  F:", best.forward(), "| start:", best.f_start, "len:", best.f_len)
print("  R:", best.reverse(), "| start:", best.r_start, "len:", best.r_len)
print("  Amplicon:", best.amplicon_len(), "bp")

BEST fitness: 5.509
BEST primers:
  F: CGATCGATGCGTACGTTGAC | start: 116 len: 20
  R: GATCCGATCGATCAGGTCAAC | start: 313 len: 21
  Amplicon: 218 bp


# Multiplex NSGA-II for PCR primer design

In [14]:
class MultiplexChromosome:
    def __init__(self, pairs, template):
        self.pairs = pairs
        self.template = template

    def clone(self):
        new_pairs = [Individual( self.template, p.f_start, p.f_len, p.r_start, p.r_len) for p in self.pairs]
        return MultiplexChromosome(new_pairs, self.template)

    def is_valid(self, len_win=(18,25), amp_bounds=(120,300), seed=4):
        for p in self.pairs:
            if not p.is_valid(len_win, amp_bounds):
                return False
        # global 3' seed conflicts (inter-pair)
        all_primers = [pp.forward() for pp in self.pairs] + [pp.reverse() for pp in self.pairs]
        for i in range(len(all_primers)):
            for j in range(i+1, len(all_primers)):
                if _seed_match(all_primers[i], all_primers[j], seed): 
                    return False
                if _seed_match(all_primers[j], all_primers[i], seed): 
                    return False
        return True

    def repair(self, len_win=(18,25), amp_bounds=(120,300)):
        for p in self.pairs: p.repair(len_win, amp_bounds)

    def evaluate(self, t_target=60.0, gc_target=50.0, amp_target=200, d_min=40.0):
        """Returns objectives tuple: (f_Tm, f_GC, f_DG, f_Amp)
        f_Tm: mean |Tm_pair - t_target| + (maxTm - minTm)
        f_GC: mean |GC - 50| across primers
        f_DG: sum of hairpin/self-dimer per pair + cross-dimers across ALL primers
        f_Amp: mean |Amp - amp_target| + penalty if closest amplicon gap < d_min
        """
        per = [p.basic_metrics() for p in self.pairs]
        pair_Tms = [ (m["TmF"]+m["TmR"])/2.0 for m in per ]
        mean_Tm_dev = sum(abs(t - t_target) for t in pair_Tms)/len(pair_Tms)
        span_Tm = (max(pair_Tms) - min(pair_Tms)) if len(pair_Tms) > 1 else 0.0
        f_Tm = mean_Tm_dev + span_Tm

        # GC deviation across all primers (both F and R)
        gc_vals = []
        for m in per: gc_vals.extend([m["GCF"], m["GCR"]])
        f_GC = sum(abs(g - gc_target) for g in gc_vals)/len(gc_vals)

        # ΔG-like penalties: intra-pair + global inter-primer cross
        intra = sum(m["DG"] for m in per)
        # all primers list
        all_prims = []
        for pp in self.pairs:
            all_prims.append(pp.forward()); all_prims.append(pp.reverse())
        inter = 0.0
        for i in range(len(all_prims)):
            for j in range(i+1, len(all_prims)):
                # cross-dimer check both directions
                inter += 1.0 if _seed_match(all_prims[i], all_prims[j], 4) else 0.0
                inter += 1.0 if _seed_match(all_prims[j], all_prims[i], 4) else 0.0
        f_DG = intra + inter

        # Amplicon objectives: closeness to target + spacing
        amps = [m["Amp"] for m in per]
        mean_amp_dev = sum(abs(a - amp_target) for a in amps)/len(amps)
        closest_gap = math.inf
        for i in range(len(amps)):
            for j in range(i+1, len(amps)):
                gap = abs(amps[i]-amps[j])
                if gap < closest_gap: closest_gap = gap
        penal = 0.0
        if len(amps) > 1 and closest_gap < d_min:
            penal = (d_min - closest_gap)  # linear penalty if too close
        f_Amp = mean_amp_dev + penal

        return (round(f_Tm,6), round(f_GC,6), round(f_DG,6), round(f_Amp,6))

    def genes(self):
        g=[]
        for p in self.pairs: g.extend(p.genes())
        return g 

    @staticmethod
    def from_genes_flat(flat, M, template):
        pairs=[]
        for i in range(M):
            block = flat[4*i:4*(i+1)]
            pairs.append(Individual.from_genes(template, block))
        return MultiplexChromosome(pairs, template)


In [15]:
def multiplex_mutation(chromosome, rate=0.3, max_shift=5, len_win=(18,25), amp_bounds=(120,300)):
    # Pick one pair at random and mutate
    idx = ewndom.randrange(len(chromosome.pairs))
    p = chromosome.pairs[idx]
    mutation(p, len_win, amp_bounds, max_shift, rate)

In [17]:
def multiplex_crossover(p1, p2, len_win=(18,25), amp_bounds=(120,300)):
        # 1-point over pairs (swap sublists)
        cut = random.randint(1, len(self.pairs)-1) if len(self.pairs)>1 else 1
        left = [Individual.from_genes(p.genes(), p1.template) for p in p1.pairs[:cut]]
        right = [Individual.from_genes(p.genes(), p2.template) for p in p2.pairs[cut:]]
        child_pairs = left + right
        child = MultiplexChromosome(child_pairs, p1.template)
        child.repair(len_win, amp_bounds)
        return child

In [19]:
def random_multiplex(template, M=3, len_win=(18,25), amp_bounds=(120,300)):
    pairs=[]
    for _ in range(M):
        pairs.append(random_individual(template, len_win, amp_bounds))
    mux = MultiplexChromosome(pairs, template)
    mux.repair(len_win, amp_bounds)
    return mux

### NSGA-II core