# Import

In [1]:
import matplotlib.pyplot as plt
import numpy as np
import subprocess
import RNA
from Bio import Entrez, SeqIO
import pandas as pd
from base64 import b64encode
import json
from urllib import request, parse
import requests
import os

%matplotlib inline

# primer generation and alignment

In [34]:
class PrimerAnalyzer:
    """
    Design PCR primers for a gene and screen them for specificity.

    The pipeline downloads a sequence from NCBI through Entrez, asks the
    external ``primer3_core`` binary for candidate primer pairs, runs every
    primer through a local ``blastn`` search against ``reference_db`` and
    reports Tm / GC content for the pairs that produced no BLAST hits.
    """

    # Number of primer pairs requested from Primer3 (PRIMER_NUM_RETURN) and,
    # consequently, the number of pair indices scanned in its output.
    NUM_PRIMER_PAIRS = 5

    def __init__(self, email, reference_db, primer3_path="primer3_core"):
        """
        :param email: Contact address; also assigned to ``Entrez.email``.
        :param reference_db: Value passed to ``blastn -db``.
        :param primer3_path: Path or name of the ``primer3_core`` executable.
        """
        self.email = email
        self.reference_db = reference_db
        self.primer3_path = primer3_path
        Entrez.email = email

    # ---------- Step 0: download the sequence from NCBI ----------
    def fetch_gene_sequence(self, gene_name, database='nucleotide'):
        """
        Return the sequence of the first Entrez search hit for ``gene_name``.

        :param gene_name: Search term passed to ``Entrez.esearch``.
        :param database: Entrez database name.
        :returns: The sequence of the first hit as a plain string.
        :raises RuntimeError: On any failure, including "no hit found"; the
            original exception is attached as ``__cause__``.
        """
        try:
            handle = Entrez.esearch(db=database, term=gene_name, retmax=1)
            try:
                search_result = Entrez.read(handle)
            finally:
                # Close the handle even when parsing the reply fails.
                handle.close()

            if not search_result['IdList']:
                raise ValueError(f"Ген '{gene_name}' не найден в базе данных {database}.")

            gene_id = search_result['IdList'][0]
            handle = Entrez.efetch(db=database, id=gene_id, rettype='fasta', retmode='text')
            try:
                record = SeqIO.read(handle, 'fasta')
            finally:
                handle.close()
        except Exception as e:
            raise RuntimeError(f'Ошибка при загрузке последовательности: {e}') from e

        print(f"Последовательность гена '{gene_name}' успешно загружена.")
        return str(record.seq)

    # ---------- Step 1: primer generation with Primer3 ----------
    def run_primer3(self, sequence, target_start=300, target_length=500):
        """
        Run ``primer3_core`` on ``sequence`` and return its raw output text.

        The Boulder-IO record is written to ``primer3_input.txt`` and the
        result is read back from ``primer3_output.txt``; both files are left
        in the current working directory.

        :param sequence: Template sequence (at least 50 characters).
        :param target_start: Accepted for backward compatibility; currently
            NOT forwarded to Primer3.
        :param target_length: Accepted for backward compatibility; currently
            NOT forwarded to Primer3.
        :returns: Contents of the Primer3 output file.
        :raises ValueError: If ``sequence`` is shorter than 50 characters.
        :raises subprocess.CalledProcessError: If ``primer3_core`` exits with
            a non-zero status.
        """
        if len(sequence) < 50:
            raise ValueError('Длина последовательности слишком мала для генерации праймеров!')

        # NOTE(review): the original author flagged these parameters as the
        # part that still needs tuning.
        boulder_record = "\n".join([
            "SEQUENCE_ID=example",
            f"SEQUENCE_TEMPLATE={sequence}",
            "PRIMER_PRODUCT_SIZE_RANGE=100-250",
            "PRIMER_OPT_SIZE=20",
            "PRIMER_MIN_SIZE=18",
            "PRIMER_MAX_SIZE=25",
            "PRIMER_MIN_TM=63.0",
            "PRIMER_OPT_TM=64.0",
            "PRIMER_MAX_TM=65.0",
            "PRIMER_MIN_GC=40",
            "PRIMER_MAX_GC=65",
            f"PRIMER_NUM_RETURN={self.NUM_PRIMER_PAIRS}",
            "PRIMER_MAX_HAIRPIN_TH=5.0",
            "PRIMER_MAX_SELF_ANY_TH=5.0",
            "PRIMER_MAX_SELF_END_TH=5.0",
            "=",
        ])
        with open('primer3_input.txt', 'w') as f:
            f.write(boulder_record)

        # check=True: without it a failed run would silently fall through to
        # reading whatever primer3_output.txt an earlier run left behind.
        subprocess.run(
            [self.primer3_path, 'primer3_input.txt', '-output', 'primer3_output.txt'],
            check=True,
        )

        with open('primer3_output.txt', 'r') as f:
            return f.read()

    @staticmethod
    def _extract_field(primer_output, key):
        """Return the text following ``key`` on the first line starting with it, else None."""
        for line in primer_output.splitlines():
            if line.startswith(key):
                return line[len(key):].strip()
        return None

    @classmethod
    def _extract_product_size(cls, primer_output, pair_index):
        """Return PRIMER_PAIR_<i>_PRODUCT_SIZE as an int, or None when absent."""
        value = cls._extract_field(primer_output, f'PRIMER_PAIR_{pair_index}_PRODUCT_SIZE=')
        return int(value) if value and value.isdigit() else None

    def extract_primer(self, primer_output, pair_index, side):
        """
        Pull one primer sequence out of Primer3 output.

        :param primer_output: Raw text returned by :meth:`run_primer3`.
        :param pair_index: Zero-based index of the primer pair.
        :param side: ``'LEFT'`` or ``'RIGHT'``.
        :returns: The primer sequence, or ``None`` if the tag is not present.
        """
        return self._extract_field(primer_output, f'PRIMER_{side}_{pair_index}_SEQUENCE=')

    # ---------- Step 2: primer check with BLAST ----------
    def run_local_blast(self, primer_sequence, evalue=0.01):
        """
        Search ``primer_sequence`` against ``self.reference_db`` with ``blastn``.

        Temporary files ``primer.fasta`` and ``blast_output.txt`` are created
        in the current working directory and removed afterwards, also when
        the search fails.

        NOTE(review): ``blastn`` is invoked with its default task; confirm
        that this is sensitive enough for primer-length queries.

        :param primer_sequence: Query sequence.
        :param evalue: E-value threshold passed to ``-evalue``.
        :returns: One list of tab-separated fields per hit (``-outfmt 6``);
            an empty list means no hits.
        :raises subprocess.CalledProcessError: If ``blastn`` exits with a
            non-zero status (a failed search must not look like "no hits").
        """
        blast_input = 'primer.fasta'
        blast_output = 'blast_output.txt'

        try:
            with open(blast_input, 'w') as f:
                f.write(f">primer\n{primer_sequence}\n")

            subprocess.run(
                [
                    'blastn', '-query', blast_input, '-db', self.reference_db,
                    '-evalue', str(evalue), '-outfmt', '6', '-out', blast_output,
                ],
                check=True,
            )

            with open(blast_output, 'r') as f:
                # Blank lines are skipped so they cannot be mistaken for hits.
                return [line.strip().split('\t') for line in f if line.strip()]
        finally:
            for path in (blast_input, blast_output):
                try:
                    os.remove(path)
                except FileNotFoundError:
                    pass

    # ---------- Additional calculations ----------
    def calculate_tm(self, sequence):
        """
        Estimate Tm as ``2 * (A + T) + 4 * (G + C)``.

        Base counting is case-insensitive.
        """
        bases = sequence.upper()
        at_count = bases.count('A') + bases.count('T')
        gc_count = bases.count('G') + bases.count('C')
        return 2 * at_count + 4 * gc_count

    def calculate_gc_content(self, sequence):
        """
        Return the GC percentage of ``sequence`` rounded to two decimals.

        Base counting is case-insensitive; an empty sequence yields ``0.0``
        instead of dividing by zero.
        """
        if not sequence:
            return 0.0
        bases = sequence.upper()
        gc_count = bases.count('G') + bases.count('C')
        return round((gc_count / len(bases)) * 100, 2)

    # ---------- Full analysis ----------
    def analyze_primer_for_gene(self, gene_name):
        """
        Run the whole pipeline for ``gene_name`` and print a summary.

        :param gene_name: Search term for :meth:`fetch_gene_sequence`.
        :returns: A list with one dict per primer pair whose primers both
            produced no BLAST hits. Keys: ``'forward'`` and ``'reverse'``
            (each with ``'sequence'``, ``'Tm'``, ``'GC'``) and
            ``'product_length'`` (amplicon size reported by Primer3, or
            ``None`` if Primer3 did not report it).
        """
        print(f"Шаг 0: Загрузка последовательности для гена '{gene_name}'...")
        sequence = self.fetch_gene_sequence(gene_name)

        print('\nШаг 1: Генерация праймеров с помощью Primer3...')
        primer_output = self.run_primer3(sequence)

        print('\nШаг 2: Извлечение и проверка праймеров...')
        candidates = []
        for i in range(self.NUM_PRIMER_PAIRS):
            forward = self.extract_primer(primer_output, i, 'LEFT')
            reverse = self.extract_primer(primer_output, i, 'RIGHT')
            if forward and reverse:
                product_size = self._extract_product_size(primer_output, i)
                candidates.append((forward, reverse, product_size))

        print('\nШаг 3: Проверка специфичности с помощью BLAST...')
        valid_candidates = []
        for forward, reverse, product_size in candidates:
            # A pair is kept only when neither primer has any BLAST hit; the
            # second search is skipped once the first one already disqualifies it.
            if self.run_local_blast(forward) or self.run_local_blast(reverse):
                continue
            valid_candidates.append((forward, reverse, product_size))

        print('\nШаг 4: Расчет параметров праймеров...')
        primer_report = [
            {
                'forward': {
                    'sequence': forward,
                    'Tm': self.calculate_tm(forward),
                    'GC': self.calculate_gc_content(forward),
                },
                'reverse': {
                    'sequence': reverse,
                    'Tm': self.calculate_tm(reverse),
                    'GC': self.calculate_gc_content(reverse),
                },
                # Amplicon size from Primer3, not the length of the template.
                'product_length': product_size,
            }
            for forward, reverse, product_size in valid_candidates
        ]

        print('\n=== Итоговые валидные пары праймеров ===')
        for report in primer_report:
            fwd = report['forward']
            rev = report['reverse']
            print('Пара праймеров:')
            print(f"  Forward: {fwd['sequence']} (Tm: {fwd['Tm']}°C, GC: {fwd['GC']}%)")
            print(f"  Reverse: {rev['sequence']} (Tm: {rev['Tm']}°C, GC: {rev['GC']}%)")
            print(f"  Длина продукта амплификации: {report['product_length']} нуклеотидов\n")

        return primer_report

In [None]:
# Driver cell: configure a PrimerAnalyzer and run the primer pipeline for one gene.
email = 'your_email@example.com'
reference_db = 'data/gencode_transcripts'
primer3_path = 'primer3_core'

analyzer = PrimerAnalyzer(email=email, reference_db=reference_db, primer3_path=primer3_path)

gene_name = 'MT-ND1'

target_start = 300
target_length = 500

# NOTE(review): `sequence` and `primer_output` are not used further down in
# this cell; analyze_primer_for_gene() below fetches the sequence and runs
# Primer3 again on its own.
print('Шаг 0: Загрузка последовательности для гена...')
sequence = analyzer.fetch_gene_sequence(gene_name)

print('\nШаг 1: Генерация праймеров...')
primer_output = analyzer.run_primer3(sequence, target_start=target_start, target_length=target_length)

print('\nШаг 2: Извлечение и проверка праймеров...')
primer_report = analyzer.analyze_primer_for_gene(gene_name)

# The replacement fields use a different quote character than the enclosing
# f-string so the cell also parses on interpreters older than Python 3.12.
print('\n=== Результаты анализа праймеров ===')
for idx, report in enumerate(primer_report):
    fwd = report['forward']
    rev = report['reverse']
    print(f'Пара {idx + 1}:')
    print(f"  Прямой праймер (Forward): {fwd['sequence']} "
          f"(Tm: {fwd['Tm']}°C, GC: {fwd['GC']}%)")
    print(f"  Обратный праймер (Reverse): {rev['sequence']} "
          f"(Tm: {rev['Tm']}°C, GC: {rev['GC']}%)")
    print(f"  Длина продукта амплификации: {report['product_length']} нуклеотидов\n")

# IDT API

In [16]:
class OligoAnalyzerAPI:
    """
    Thin client for the IDT OligoAnalyzer REST API.

    Call :meth:`get_access_token` first; :meth:`analyze_dimer` refuses to run
    while ``api_token`` is unset.
    """

    TOKEN_URL = 'https://eu.idtdna.com/Identityserver/connect/token'
    ANALYSIS_URLS = {
        'self_dimer': 'https://eu.idtdna.com/restapi/v1/OligoAnalyzer/SelfDimer',
        'hetero_dimer': 'https://eu.idtdna.com/restapi/v1/OligoAnalyzer/HeteroDimer',
    }
    # Seconds to wait for the server; without a timeout a stalled connection
    # would block the caller indefinitely.
    REQUEST_TIMEOUT = 30

    def __init__(self, client_id, client_secret, username, password):
        """Store the credentials; no network access happens here."""
        self.client_id = client_id
        self.client_secret = client_secret
        self.username = username
        self.password = password
        self.api_token = None

    def get_access_token(self):
        """
        Request an access token and store it in ``self.api_token``.

        The client id/secret are sent as HTTP Basic credentials and the user
        name/password as a form-encoded ``password`` grant. On any failure,
        including a reply that carries no ``access_token``, an error is
        printed and ``self.api_token`` is left as ``None``; nothing is raised.
        """
        self.api_token = None
        try:
            auth_str = b64encode(f'{self.client_id}:{self.client_secret}'.encode()).decode()
            headers = {
                'Content-Type': 'application/x-www-form-urlencoded',
                'Authorization': f'Basic {auth_str}',
            }
            data = parse.urlencode({
                'grant_type': 'password',
                'scope': 'test',
                'username': self.username,
                'password': self.password,
            }).encode()

            token_request = request.Request(self.TOKEN_URL, data=data, headers=headers, method='POST')
            # The context manager closes the HTTP response once it has been read.
            with request.urlopen(token_request, timeout=self.REQUEST_TIMEOUT) as response:
                response_data = json.loads(response.read().decode())

            token = response_data.get('access_token')
            if not token:
                # Do not report success for a reply that has no token in it.
                raise ValueError('в ответе отсутствует access_token')

            self.api_token = token
            print('Токен успешно получен!')
        except Exception as e:
            # Deliberately broad: this method is best-effort and callers
            # check ``api_token`` instead of catching exceptions.
            print(f'Ошибка при получении токена: {e}')
            self.api_token = None

    def analyze_dimer(self, analysis_type, params):
        """
        Run a dimer analysis and return the decoded JSON reply.

        :param analysis_type: Key of ``ANALYSIS_URLS`` (``'self_dimer'`` or
            ``'hetero_dimer'``).
        :param params: Query parameters for the request.
        :returns: The parsed JSON response, or ``None`` when no token is
            available or the request fails.
        :raises KeyError: If ``analysis_type`` is not a key of ``ANALYSIS_URLS``.
        """
        if not self.api_token:
            print('Нет токена доступа!')
            return None

        try:
            response = requests.post(
                self.ANALYSIS_URLS[analysis_type],
                headers={'Authorization': f'Bearer {self.api_token}'},
                params=params,
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f'Ошибка при анализе {analysis_type}: {e}')
            return None


class DimerVisualizer:
    """
    Text renderer for dimer structures.
    """

    @staticmethod
    def visualize(sequence, bonds, delta_g, base_pairs, secondary_sequence=None, **kwargs):
        """
        Print a three-line text picture of a dimer.

        The top line shows ``sequence`` labelled 5'→3', the bottom line shows
        the second strand labelled 3'→5', and the line in between marks each
        position with ``|`` (bond value 2), ``.`` (bond value 1) or a space.

        :param sequence: Top strand.
        :param bonds: Iterable of per-position bond codes.
        :param delta_g: Value printed after ``Delta G:``.
        :param base_pairs: Value printed after ``Base Pairs:``.
        :param secondary_sequence: Bottom strand exactly as it should be
            printed; defaults to ``sequence`` reversed.
        :param kwargs: Optional ``start_position``, ``top_padding``,
            ``bond_padding``, ``bottom_padding`` (numbers of leading spaces,
            default 0) and ``title``.
        """
        secondary_sequence = secondary_sequence or sequence[::-1]
        start_position = kwargs.get('start_position', 0)
        top_padding = kwargs.get('top_padding', 0)
        bond_padding = kwargs.get('bond_padding', 0)
        bottom_padding = kwargs.get('bottom_padding', 0)
        title = kwargs.get('title', 'Dimer Visualization')

        top_line = ' ' * (top_padding + start_position) + f"5'  {sequence}  3'"
        # The extra 4 columns skip the "5'  " label so that the bond symbols
        # line up with the bases of the top strand.
        connection_line = ' ' * (4 + bond_padding + start_position) + ''.join(
            '|' if b == 2 else '.' if b == 1 else ' ' for b in bonds
        )
        bottom_line = ' ' * (bottom_padding + start_position) + f"3'  {secondary_sequence}  5'"

        max_length = max(len(top_line), len(connection_line), len(bottom_line))
        print(f'\n{title}\nDelta G: {delta_g} ккал/моль | Base Pairs: {base_pairs}')
        print(top_line.ljust(max_length))
        print(connection_line.ljust(max_length))
        print(bottom_line.ljust(max_length))
        print('-' * max_length)

In [5]:
# OligoAnalyzerAPI settings.
# Credentials are read from the environment so that no secret is stored in
# the notebook. Any secret that was ever committed in plain text should be
# rotated.
client_id = os.environ.get('IDT_CLIENT_ID', '')
client_secret = os.environ.get('IDT_CLIENT_SECRET', '')
username = os.environ.get('IDT_USERNAME', '')
password = os.environ.get('IDT_PASSWORD', '')

# API initialisation
api = OligoAnalyzerAPI(client_id, client_secret, username, password)
api.get_access_token()

if api.api_token:
    # Example primers to analyse
    forward_primer = 'ACCTCCTCCTCTTCTGGTTGGT'
    reverse_primer = 'TTCCGCTCCTGGATGTCCCTTG'

    # Self-dimer analysis for each primer plus the hetero-dimer of the pair
    results = {
        'Forward Self-Dimer': api.analyze_dimer('self_dimer', {'primary': forward_primer}),
        'Reverse Self-Dimer': api.analyze_dimer('self_dimer', {'primary': reverse_primer}),
        'Hetero-Dimer': api.analyze_dimer('hetero_dimer', {'primary': forward_primer, 'secondary': reverse_primer}),
    }

    # Render the results
    visualizer = DimerVisualizer()
    for title, result in results.items():
        if not result:
            continue

        is_hetero = 'Hetero' in title
        # The hetero-dimer request uses the forward primer as `primary`, so it
        # is the top strand there as well as for the forward self-dimer.
        top_strand = forward_primer if (is_hetero or 'Forward' in title) else reverse_primer
        # The bottom line is labelled 3'→5', hence the reverse primer is
        # passed reversed; self-dimers fall back to the reversed top strand.
        bottom_strand = reverse_primer[::-1] if is_hetero else None

        for dimer in result:
            visualizer.visualize(
                sequence=top_strand,
                secondary_sequence=bottom_strand,
                bonds=dimer['Bonds'],
                delta_g=dimer['DeltaG'],
                base_pairs=dimer['BasePairs'],
                start_position=dimer.get('StartPosition', 0),
                top_padding=dimer.get('TopLinePadding', 0),
                bond_padding=dimer.get('BondLinePadding', 0),
                bottom_padding=dimer.get('BottomLinePadding', 0),
                title=title,
            )
else:
    print('Токен доступа не получен. Проверьте настройки API.')

Токен успешно получен!

Forward Self-Dimer
Delta G: -4.41 ккал/моль | Base Pairs: 3
    5'  ACCTCCTCCTCTTCTGGTTGGT  3'
        |||            ...        
3'  TGGTTGGTCTTCTCCTCCTCCA  5'    
----------------------------------

Forward Self-Dimer
Delta G: -4.41 ккал/моль | Base Pairs: 3
5'  ACCTCCTCCTCTTCTGGTTGGT  3'
    |||  .          .  ...    
3'  TGGTTGGTCTTCTCCTCCTCCA  5'
------------------------------

Forward Self-Dimer
Delta G: -3.07 ккал/моль | Base Pairs: 2
 5'  ACCTCCTCCTCTTCTGGTTGGT  3'
      .  ||         ..  .      
3'  TGGTTGGTCTTCTCCTCCTCCA  5' 
-------------------------------

Forward Self-Dimer
Delta G: -3.07 ккал/моль | Base Pairs: 2
5'  ACCTCCTCCTCTTCTGGTTGGT  3'  
        .  ||      ..  .        
  3'  TGGTTGGTCTTCTCCTCCTCCA  5'
--------------------------------

Forward Self-Dimer
Delta G: -3.07 ккал/моль | Base Pairs: 2
5'  ACCTCCTCCTCTTCTGGTTGGT  3'   
        ||  .       .  ..        
   3'  TGGTTGGTCTTCTCCTCCTCCA  5'
---------------------------------

Forward Sel