# OpGene: Продвинутая оптимизация кодонов

Этот блокнот реализует меметический алгоритм для оптимизации последовательностей ДНК с учетом множества биологических критериев (CAI, GC, MFE, CPB, RBS).

In [None]:
# @title Установка зависимостей
!apt-get install -y viennarna
!pip install biopython psutil matplotlib "numpy<2.0.0"

In [None]:
# @title Основной код OpGene
import json
import os
import random
import gzip
import shutil
import urllib.request
import matplotlib.pyplot as plt
import math
import time
import re
import subprocess
import statistics
import logging
from typing import List, Tuple, Dict, Union, Optional
from math import log, exp
from Bio import SeqIO, Entrez
from Bio.Data import CodonTable
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
from multiprocessing import Pool, cpu_count
from concurrent.futures import ProcessPoolExecutor
from collections import OrderedDict
import functools
import psutil

# --- Настройка логирования ---
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

# --- Константы ---
FALLBACK_ORGANISM = "Escherichia coli K-12"
FALLBACK_ORGANISM_ID = "83333"
CPS_TABLE_FILENAME = "cps_table.json"
OUTPUT_DIR = "optimization_results"
DATA_DIR = "data"
CONFIG_FILE = "config.json"

DEFAULT_CODON_USAGE = {
    'TTT': 0.026, 'TTC': 0.016, 'TTA': 0.013, 'TTG': 0.013, 'CTT': 0.011, 'CTC': 0.010,
    'CTA': 0.004, 'CTG': 0.050, 'ATT': 0.030, 'ATC': 0.024, 'ATA': 0.007, 'ATG': 0.027,
    'GTT': 0.018, 'GTC': 0.015, 'GTA': 0.011, 'GTG': 0.023, 'TCT': 0.015, 'TCC': 0.014,
    'TCA': 0.012, 'TCG': 0.013, 'CCT': 0.017, 'CCC': 0.013, 'CCA': 0.020, 'CCG': 0.023,
    'ACT': 0.019, 'ACC': 0.023, 'ACA': 0.014, 'ACG': 0.015, 'GCT': 0.016, 'GCC': 0.026,
    'GCA': 0.021, 'GCG': 0.031, 'TAT': 0.018, 'TAC': 0.012, 'CAT': 0.013, 'CAC': 0.010,
    'CAA': 0.014, 'CAG': 0.029, 'AAT': 0.016, 'AAC': 0.021, 'AAA': 0.032, 'AAG': 0.011,
    'GAT': 0.035, 'GAC': 0.019, 'GAA': 0.039, 'GAG': 0.019, 'TGT': 0.005, 'TGC': 0.006,
    'TGG': 0.013, 'CGT': 0.020, 'CGC': 0.021, 'CGA': 0.006, 'CGG': 0.010, 'AGT': 0.009,
    'AGC': 0.015, 'AGA': 0.004, 'AGG': 0.002, 'GGT': 0.018, 'GGC': 0.027, 'GGA': 0.011,
    'GGG': 0.012
}

# [Здесь вставлен весь код классов из opgene_standalone.py]
# Для краткости я использую логику из standalone версии

class ConfigurationManager:
    def __init__(self, config_file: str = CONFIG_FILE):
        self.config_file = config_file
        self.default_config = {
            "spec_weights": {
                "CodonUsage": 1.2, "GcContent": 1.0, "AvoidPattern": 1.5,
                "RnaFolding": 1.0, "CodonPairBias": 1.0, "RbsSpecification": 0.5
            },
            "ma_parameters": {
                "population_size": 10, "num_generations": 5, "crossover_rate": 0.85,
                "mutation_rate": 0.06, "tournament_size": 4, "elitism_count": 2, "local_search_steps": 3
            },
            "rna_folding": { "window_size": 40, "bad_mfe": -12.0, "ideal_mfe": -4.0 },
            "cpb_parameters": { "sigmoid_k": 3.0, "sigmoid_center": 0.0 },
            "target_gc_range": [0.45, 0.55],
            "avoid_motifs": ["AATAAA", "GATC", "TATAAT", "GGGGGG", "CCCCCC", "AAAAAA", "TTTTTT"],
            "default_codon_table": 11
        }
    def load_config(self) -> Dict:
        return self.default_config

class Pattern:
    def __init__(self, motif: str):
        self.motif = motif.upper()
        self.revcomp = self._reverse_complement(motif)
        self._regex = re.compile(f"(?={self.motif})") if motif else None
        self._revcomp_regex = re.compile(f"(?={self.revcomp})") if self.revcomp != self.motif else None
    def _reverse_complement(self, seq: str) -> str:
        complement = str.maketrans("ACGTacgtNn", "TGCAtgcaNn")
        return seq.translate(complement)[::-1]
    def find_matches(self, sequence: str) -> List[Tuple[int, int]]:
        sequence = sequence.upper(); matches = set()
        if self._regex:
            for m in self._regex.finditer(sequence): matches.add((m.start(), m.start() + len(self.motif)))
        if self._revcomp_regex:
            for m in self._revcomp_regex.finditer(sequence): matches.add((m.start(), m.start() + len(self.revcomp)))
        return sorted(list(matches))

class SpecEvaluation:
    def __init__(self, score: float, message: str = "", locations: Optional[List[Tuple[int, int]]] = None):
        self.score = max(0.0, min(1.0, float(score) if not math.isnan(score) else 0.0))
        self.message = str(message); self.locations = locations or []
    def as_dict(self) -> Dict: return {"score": self.score, "message": self.message, "locations": self.locations}

class Specification:
    def evaluate(self, sequence: str) -> SpecEvaluation: raise NotImplementedError
    def get_name(self) -> str: return self.__class__.__name__.replace("Specification", "")

class CodonUsageSpecification(Specification):
    def __init__(self, codon_usage: Dict[str, float], codon_table_id: int = 11, mode: str = "optimization"):
        self.codon_usage = codon_usage or DEFAULT_CODON_USAGE; self.codon_table_id = codon_table_id
        self.mode = mode; self.codon_weights = self._compute_codon_weights()
    def _compute_codon_weights(self) -> Dict[str, float]:
        codon_weights = {}; table = CodonTable.unambiguous_dna_by_id.get(self.codon_table_id, CodonTable.standard_dna_table)
        processed_aas = set()
        for codon, aa in table.forward_table.items():
            if aa in processed_aas: continue
            synonymous_codons = [c for c, a in table.forward_table.items() if a == aa]
            freq_list = [self.codon_usage.get(c, 1e-9) for c in synonymous_codons]
            max_freq = max(freq_list) if freq_list else 1e-9
            for c in synonymous_codons: codon_weights[c] = max(1e-9, self.codon_usage.get(c, 1e-9) / max_freq) if max_freq > 1e-10 else 1e-9
            processed_aas.add(aa)
        for codon in table.forward_table:
            if len([c for c, a in table.forward_table.items() if a == table.forward_table[codon]]) == 1: codon_weights[codon] = 1.0
        return codon_weights
    def evaluate(self, sequence: str) -> SpecEvaluation:
        if not sequence: return SpecEvaluation(0.0, "CAI: N/A")
        if self.mode == "harmonization":
            score = self._calculate_harmonization(sequence); return SpecEvaluation(score, f"Harmonization: {score:.4f}")
        score = self._calculate_cai(sequence); return SpecEvaluation(score, f"CAI: {score:.4f}")
    def _calculate_harmonization(self, dna_seq: str) -> float:
        table = CodonTable.unambiguous_dna_by_id.get(self.codon_table_id, CodonTable.standard_dna_table)
        counts = {}; total_by_aa = {}
        for i in range(0, len(dna_seq), 3):
            codon = dna_seq[i:i+3].upper()
            if codon in table.stop_codons or codon not in self.codon_weights: continue
            aa = table.forward_table.get(codon)
            if not aa: continue
            counts[codon] = counts.get(codon, 0) + 1
            total_by_aa[aa] = total_by_aa.get(aa, 0) + 1
        if not total_by_aa: return 0.0
        errors = []
        for codon, count in counts.items():
            aa = table.forward_table[codon]; seq_freq = count / total_by_aa[aa]
            synonymous = [c for c, a in table.forward_table.items() if a == aa]
            host_total = sum(self.codon_usage.get(c, 0) for c in synonymous)
            host_freq = self.codon_usage.get(codon, 0) / host_total if host_total > 0 else 0
            errors.append((seq_freq - host_freq) ** 2)
        return max(0.0, 1.0 - (sum(errors)/len(errors) if errors else 0) * 5)
    def _calculate_cai(self, dna_seq: str) -> float:
        table = CodonTable.unambiguous_dna_by_id.get(self.codon_table_id, CodonTable.standard_dna_table)
        log_weights_sum = 0.0; valid_codon_count = 0
        for i in range(0, len(dna_seq), 3):
            codon = dna_seq[i:i+3].upper()
            if len(codon) != 3 or codon in table.stop_codons or codon not in self.codon_weights: continue
            weight = self.codon_weights.get(codon, 1e-9)
            if weight > 1e-10: log_weights_sum += log(weight); valid_codon_count += 1
        return exp(log_weights_sum / valid_codon_count) if valid_codon_count > 0 else 0.0

class GcContentSpecification(Specification):
    def __init__(self, target_range: List[float]): self.min_gc, self.max_gc = target_range
    def evaluate(self, sequence: str) -> SpecEvaluation:
        if not sequence: return SpecEvaluation(0.0, "GC: N/A")
        gc = sum(sequence.upper().count(c) for c in 'GC') / len(sequence)
        if self.min_gc <= gc <= self.max_gc: return SpecEvaluation(1.0, f"GC: {gc:.3f} (OK)")
        deviation = min(abs(gc - self.min_gc), abs(gc - self.max_gc))
        return SpecEvaluation(max(0.0, 1.0 - (deviation**2)/0.1**2), f"GC: {gc:.3f} (Out)")

class AvoidPatternSpecification(Specification):
    def __init__(self, patterns: List[str]): self.patterns = [Pattern(p) for p in patterns]
    def evaluate(self, sequence: str) -> SpecEvaluation:
        locs = []
        for p in self.patterns: locs.extend(p.find_matches(sequence))
        unique_locs = sorted(set(locs))
        return SpecEvaluation(max(0.0, 1.0 - 0.2 * len(unique_locs)), f"Patterns: {len(unique_locs)}", unique_locs)

class RnaFoldingSpecification(Specification):
    _mfe_cache = {}
    def __init__(self, window_size, ideal_mfe, bad_mfe):
        self.window_size = window_size; self.ideal_mfe = ideal_mfe; self.bad_mfe = bad_mfe
        try: subprocess.run(["RNAfold", "--version"], capture_output=True, check=True); self.has_rnafold = True
        except: self.has_rnafold = False
    def evaluate(self, sequence: str) -> SpecEvaluation:
        if not self.has_rnafold or len(sequence) < self.window_size: return SpecEvaluation(1.0, "RNAfold N/A")
        prefix = sequence[:self.window_size].upper()
        if prefix in self._mfe_cache: mfe = self._mfe_cache[prefix]
        else:
            try:
                proc = subprocess.run(["RNAfold", "--noPS"], input=prefix, text=True, capture_output=True, check=True)
                m = re.search(r'\(\s*([-+]?\d*\.?\d+)\)', proc.stdout.splitlines()[1])
                mfe = float(m.group(1)); self._mfe_cache[prefix] = mfe
            except: return SpecEvaluation(0.0, "MFE Error")
        score = 1.0 if mfe >= self.ideal_mfe else (0.0 if mfe <= self.bad_mfe else (mfe - self.bad_mfe)/(self.ideal_mfe - self.bad_mfe))
        return SpecEvaluation(score, f"MFE: {mfe:.2f}")

class CodonPairBiasSpecification(Specification):
    def __init__(self, cps_table, k=3.0, center=0.0): self.cps_table = cps_table; self.k = k; self.center = center
    def evaluate(self, sequence: str) -> SpecEvaluation:
        if len(sequence) < 6: return SpecEvaluation(0.0, "CPB: N/A")
        scores = []
        for i in range(0, len(sequence)-6, 3):
            pair = sequence[i:i+6].upper(); cps = self.cps_table.get(pair, 1e-9)
            scores.append(1.0 / (1.0 + exp(-self.k * (log(cps + 1e-9) - self.center))))
        return SpecEvaluation(statistics.mean(scores) if scores else 0.0, "CPB Evaluation")

class RbsSpecification(Specification):
    def __init__(self, sd="AGGAGG", spacing=(5, 10)): self.sd = sd; self.min_s, self.max_s = spacing; self._reg = re.compile(f"(?={sd})")
    def evaluate(self, sequence: str) -> SpecEvaluation:
        prefix = sequence[:50]; matches = [m.start() for m in self._reg.finditer(prefix)]
        if not matches: return SpecEvaluation(0.0, "RBS not found")
        best = 0.0
        for start in matches:
            spacing = len(sequence) - (start + len(self.sd)) - 3
            if self.min_s <= spacing <= self.max_s: score = 1.0
            else: score = max(0.0, 1.0 - min(abs(spacing - self.min_s), abs(spacing - self.max_s))/self.max_s)
            best = max(best, score)
        return SpecEvaluation(best, f"RBS Score: {best:.2f}")

def _evaluate_individual(dna_sequence, aa_sequence, specifications, weights):
    if not dna_sequence or len(dna_sequence) != len(aa_sequence) * 3: return dna_sequence, -float('inf'), {}
    spec_scores = {}
    for spec in specifications:
        try: spec_scores[spec.get_name()] = spec.evaluate(dna_sequence).score
        except: spec_scores[spec.get_name()] = 0.0
    total = sum(weights.get(name, 0.0) * score for name, score in spec_scores.items())
    return dna_sequence, total, spec_scores

class CodonOptimizer:
    def __init__(self, aa_sequence, codon_usage, specifications, weights, codon_table_id=11):
        self.aa_sequence = aa_sequence.upper().replace('*','')
        self.codon_usage = codon_usage; self.specifications = specifications; self.weights = weights
        self.codon_table_id = codon_table_id; self.num_processes = min(cpu_count(), 8)
        self.back_table = self._get_back_table(); self.fitness_cache = OrderedDict(); self.spec_cache = OrderedDict()
        self._executor = None
    def __enter__(self): self._executor = ProcessPoolExecutor(max_workers=self.num_processes); return self
    def __exit__(self, *args): 
        if self._executor: self._executor.shutdown()
    def _get_back_table(self):
        t = CodonTable.unambiguous_dna_by_id.get(self.codon_table_id, CodonTable.standard_dna_table)
        res = {}
        for c, aa in t.forward_table.items():
            if aa != '*': res.setdefault(aa, []).append(c)
        return res
    def _calculate_fitness(self, dna): 
        if dna in self.fitness_cache: return self.fitness_cache[dna]
        _, score, specs = _evaluate_individual(dna, self.aa_sequence, self.specifications, self.weights)
        self.fitness_cache[dna] = score; self.spec_cache[dna] = specs
        return score
    def _calculate_population_fitness(self, pop):
        needed = [ind for ind in pop if ind not in self.fitness_cache]
        if needed:
            eval_func = functools.partial(_evaluate_individual, aa_sequence=self.aa_sequence, specifications=self.specifications, weights=self.weights)
            results = list(self._executor.map(eval_func, needed)) if self._executor else [eval_func(ind) for ind in needed]
            for dna, score, specs in results: self.fitness_cache[dna] = score; self.spec_cache[dna] = specs
        return [self.fitness_cache[ind] for ind in pop]
    def optimize_ma(self, population_size, num_generations, crossover_rate, mutation_rate, tournament_size, elitism_count, local_search_steps):
        pop = [self._random_ind() for _ in range(population_size)]
        best_dna = ""; best_fitness = -float('inf')
        for gen in range(num_generations):
            fits = self._calculate_population_fitness(pop)
            current_best_idx = max(range(len(fits)), key=lambda i: fits[i])
            if fits[current_best_idx] > best_fitness: best_dna = pop[current_best_idx]; best_fitness = fits[current_best_idx]
            next_pop = [pop[i] for i in sorted(range(len(fits)), key=lambda i: fits[i], reverse=True)[:elitism_count]]
            while len(next_pop) < population_size:
                p1 = self._tournament(pop, fits, tournament_size); p2 = self._tournament(pop, fits, tournament_size)
                c1, c2 = self._crossover(p1, p2) if random.random() < crossover_rate else (p1, p2)
                c1 = self._local_search(self._mutate(c1, mutation_rate), local_search_steps)
                next_pop.append(c1)
            pop = next_pop
        return best_dna, self._evaluate_final(best_dna)
    def _random_ind(self): return "".join(random.choice(self.back_table[aa]) for aa in self.aa_sequence)
    def _tournament(self, pop, fits, size):
        idxs = random.sample(range(len(pop)), size)
        return pop[max(idxs, key=lambda i: fits[i] * (1 + 0.2 * self._div(pop[i], pop)))]
    def _div(self, ind, pop): 
        sample = random.sample(pop, min(len(pop), 10)); dist = 0
        for o in sample: dist += sum(1 for i in range(0, len(ind), 3) if ind[i:i+3] != o[i:i+3]) / (len(ind)/3)
        return dist / len(sample)
    def _crossover(self, p1, p2):
        cut = random.randint(1, len(self.aa_sequence)-1) * 3
        return p1[:cut]+p2[cut:], p2[:cut]+p1[cut:]
    def _mutate(self, dna, rate):
        codons = [dna[i:i+3] for i in range(0, len(dna), 3)]
        for i, aa in enumerate(self.aa_sequence):
            if random.random() < rate: codons[i] = random.choice(self.back_table[aa])
        return "".join(codons)
    def _local_search(self, dna, steps):
        curr_dna = dna; curr_fit = self._calculate_fitness(dna)
        for _ in range(steps):
            idx = random.randint(0, len(self.aa_sequence)-1)
            for alt in self.back_table[self.aa_sequence[idx]]:
                test_dna = curr_dna[:idx*3] + alt + curr_dna[(idx+1)*3:]
                test_fit = self._calculate_fitness(test_dna)
                if test_fit > curr_fit: curr_dna = test_dna; curr_fit = test_fit; break
        return curr_dna
    def _evaluate_final(self, dna):
        return {'fitness': self._calculate_fitness(dna), 'dna': dna, 'gc': sum(dna.count(c) for c in 'GC')/len(dna)}

class ResourceManager:
    def __init__(self, organism="Escherichia coli K-12", organism_id="83333"):
        self.organism = organism; self.organism_id = organism_id; os.makedirs("data", exist_ok=True)
    def load_codon_usage(self): return DEFAULT_CODON_USAGE
    def calculate_codon_pair_scores(self): return None

## Запуск оптимизации

In [None]:
# @title Параметры запуска
AA_SEQUENCE = "MAGWSR" # @param {type:"string"}
MODE = "optimization" # @param ["optimization", "harmonization"]
ORGANISM = "Bacillus subtilis" # @param {type:"string"}
TAX_ID = "1423" # @param {type:"string"}

config = ConfigurationManager().load_config()
resources = ResourceManager(ORGANISM, TAX_ID)
codon_usage = resources.load_codon_usage()

specs = [
    CodonUsageSpecification(codon_usage, mode=MODE),
    GcContentSpecification(config["target_gc_range"]),
    AvoidPatternSpecification(config["avoid_motifs"]),
    RbsSpecification(),
    RnaFoldingSpecification(**config["rna_folding"])
]

with CodonOptimizer(AA_SEQUENCE, codon_usage, specs, config["spec_weights"]) as opt:
    dna, metrics = opt.optimize_ma(**config["ma_parameters"])
    print(f"Оптимизированная последовательность: {dna}")
    print(f"Приспособленность: {metrics['fitness']:.4f}")
    print(f"GC Content: {metrics['gc']:.3f}")