# **PRIMER MACHINE LEARNING MODEL**

## **PREPARATION**

In [19]:
# Library
from collections import deque
import math
import numpy as np
import pandas as pd
import os

# random -> for testing
import random


In [20]:
# DEFINING VALUES (for future development)
SHORT_LOWER_BOUND = 80
SHORT_UPPER_BOUND = 100
MEDIUM_LOWER_BOUND = 100
MEDIUM_UPPER_BOUND = 150
LONG_LOWER_BOUND = 200
LONG_UPPER_BOUND = 250
PRIMER_LENGTH_LOWER_BOUND = 18
PRIMER_LENGTH_UPPER_BOUND = 22
OPTIMAL_TM_LOWER_BOUND = 58
OPTIMAL_TM_UPPER_BOUND = 60
CPG_THRESHOLD = 5

# DEFINING VALUES (for current development)
EXACTA_SHORT_LENGTH = 90
EXACTA_MEDIUM_LENGTH = 150
EXACTA_LONG_LENGTH = 250
EXACTA_PRIMER_LENGTH = 22

# DEFINING VALUES (for easier development)
DICT_GENOME_KEY = ["A","C","G","T"]
DICT_GENOME = {"A":0,"C":0,"G":0,"T":0}
KEY_CPG = "CG"

### **Class** (Kalau perlu, rasanya tidak)

In [21]:
class Primer:
    KEY_CPG = "CG"

    def __init__(self, category: str, start: int, end: int, genome: str):
        self.category = category
        self.start = start
        self.end = end
        self.genome = genome

    def get_cpg_count(self) -> int:
        return sum(1 for i in range(len(self.genome) - 1) if self.genome[i:i+2] == self.KEY_CPG)

    def __repr__(self):
        return f"Primer(category={self.category}, start={self.start}, end={self.end}, cpg_count={self.get_cpg_count()})"

    def toDictionary(self):
        return {
            "Category": self.category,
            "Start": self.start,
            "End": self.end,
            "CpG": self.get_cpg_count()
        }


In [22]:
class PrimerSet:
    def __init__(self, f1: Primer, r1: Primer, r2: Primer, r3: Primer):
        self.f1 = f1
        self.r1 = r1
        self.r2 = r2
        self.r3 = r3

    def get_all_cpg_counts(self):
        """Return CpG counts for each primer."""
        return {
            "f1": self.f1.get_cpg_count(),
            "r1": self.r1.get_cpg_count(),
            "r2": self.r2.get_cpg_count(),
            "r3": self.r3.get_cpg_count()
        }

    def total_cpg(self):
        """Total CpG across all primers."""
        return sum(self.get_all_cpg_counts().values())

    def toDictionary(self):
        """Return a combined dictionary of all primers."""
        return {
            "f1": self.f1.toDictionary(),
            "r1": self.r1.toDictionary(),
            "r2": self.r2.toDictionary(),
            "r3": self.r3.toDictionary(),
            "TotalCpG": self.total_cpg()
        }

    def __repr__(self):
        return f"WholeBase(f1={self.f1}, r1={self.r1}, r2={self.r2}, r3={self.r3}, totalCpG={self.total_cpg()})"


## **TOOLS**

### **TESTCASE**

In [23]:
def create_big_test_case():
  # Make clusters of CpGs + some scattered points
  cpg_positions = []
  # Cluster 1: dense CpGs between 100–300
  cpg_positions += [random.randint(100, 300) for _ in range(50)]
  # Cluster 2: dense CpGs between 1000–1500
  cpg_positions += [random.randint(1000, 1500) for _ in range(120)]
  # Cluster 3: scattered CpGs in 5000–6000
  cpg_positions += [random.randint(5000, 6000) for _ in range(40)]
  # Cluster 4: very dense CpGs 8000–8050
  cpg_positions += [random.randint(8000, 8050) for _ in range(200)]
  # Add some random noise across the genome
  cpg_positions += [random.randint(0, 10000) for _ in range(200)]
  # Sort positions
  cpg_positions = sorted(cpg_positions)
  return cpg_positions

### **TM CALCULATOR**

In [24]:
def calculateTM(primer):
  counter = DICT_GENOME.copy()
  for i in range(len(primer)):
    counter[primer[i]]+=1
  print(counter)
  return ((4*(counter[DICT_GENOME_KEY[2]]+counter[DICT_GENOME_KEY[1]]))+(2*(counter[DICT_GENOME_KEY[0]]+counter[DICT_GENOME_KEY[3]])))

In [25]:
# TEST Calculate_Tm
def test_calculateTM():
  res = calculateTM("ACGTACGTACGTACGTAAAACCCGGTT")
  print(res);

test_calculateTM()

{'A': 8, 'C': 7, 'G': 6, 'T': 6}
80


### **PRUNING**

In [26]:
def find_Cpg(genome: str):
  position = []
  for i in range(len(genome) - 1):
    if genome[i:i+2] == KEY_CPG:
      position.append(i)
  return position

In [27]:
def test_findCpg():
  genome1 = "ATCGATCGCG"   # CG at 2, 6, 8
  genome2 = "AAAAA"        # no CG
  genome3 = "CGCGCG"       # overlapping CGs: 0, 2, 4
  genome4 = "C"            # too short, should return []
  genome5 = "ATCCGGATC"    # CG at 3

  print(find_Cpg(genome1))  # [2, 6, 8]
  print(find_Cpg(genome2))  # []
  print(find_Cpg(genome3))  # [0, 2, 4]
  print(find_Cpg(genome4))  # []
  print(find_Cpg(genome5))  # [3]

test_findCpg()

[2, 6, 8]
[]
[0, 2, 4]
[]
[3]


In [28]:
def find_cpg_intervals(cpg_positions,max_span, min_threshold=CPG_THRESHOLD,start_point = PRIMER_LENGTH_LOWER_BOUND):
  window_queue = deque()
  intervals = []

  for pos in cpg_positions:
    if pos > start_point:
      window_queue.append(pos)

    while window_queue and window_queue[0] < pos - (max_span):
      window_queue.popleft()

    if len(window_queue) >= min_threshold:
      start = window_queue[0]
      end = pos
      intervals.append({"start":start,"end": end,"length": (end-start+1),"cpg_count":len(window_queue)})

  return intervals


In [38]:
def test_find_cpg_intervals():
  cpg_positions = [5, 20, 35, 50, 60, 62, 65, 120, 130, 135]
  short_cpg_span = SHORT_UPPER_BOUND-(2*PRIMER_LENGTH_LOWER_BOUND)
  result = find_cpg_intervals(cpg_positions,short_cpg_span)
  print(result)

test_find_cpg_intervals()

[{'start': 20, 'end': 62, 'length': 43, 'cpg_count': 5}, {'start': 20, 'end': 65, 'length': 46, 'cpg_count': 6}]


In [37]:
def find_nested_intervals(inner, outer):
  # seng iki yo vi
  # gapaham aku sing ini
  # aku mikirnya dari semua kemungkinan cluster CpG mu itu
  # difilter lagi pake kondisi yg s 90, m 150, l 250
  # trs jmlh CpG yg s 5, m 10, l 15
  # tp ini masih bisa munculin semua kemungkinan yang memenuhi syarat itu
  # lek uda, baru kasih syarat lagi yg kek Tm dkk buat milih 1 yg paling optimal

  pass #lek ws diisi comment

In [50]:
# ini coba pake function mu tapi dimodif
def find_cpg_intervals(cpg_positions, max_length, min_cpg):
    intervals = []
    for i in range(len(cpg_positions)):
        for j in range(i+1, len(cpg_positions)):
            start = cpg_positions[i]
            end = cpg_positions[j]
            cpg_count = j - i + 1
            length = end - start + 1
            if length <= max_length and cpg_count >= min_cpg:
                intervals.append({
                    "start": start,
                    "end": end,
                    "length": length,
                    "cpg_count": cpg_count
                })
    return intervals

In [54]:
def filter_exact_products(cpg_positions):
    results = {"short": [], "medium": [], "long": []}

    # short product (5 CpG)
    short_candidates = find_cpg_intervals(cpg_positions, max_length=999999, min_cpg=CPG_THRESHOLD)
    for interval in short_candidates:
        if interval["cpg_count"] == 5:
            interval["length"] = EXACTA_SHORT_LENGTH  # set ke 90
            results["short"].append(interval)

    # medium product (10 CpG)
    medium_candidates = find_cpg_intervals(cpg_positions, max_length=999999, min_cpg=CPG_THRESHOLD*2)
    for interval in medium_candidates:
        if interval["cpg_count"] == 10:
            interval["length"] = EXACTA_MEDIUM_LENGTH  # set ke 150
            results["medium"].append(interval)

    # long product (15 CpG)
    long_candidates = find_cpg_intervals(cpg_positions, max_length=999999, min_cpg=CPG_THRESHOLD*3)
    for interval in long_candidates:
        if interval["cpg_count"] == 15:
            interval["length"] = EXACTA_LONG_LENGTH  # set ke 250
            results["long"].append(interval)

    return results


In [57]:
# Testing dari GPT buat mastiin function e bener ta ga

# Forward primer mulai dari 100
F1_start = 100

# Interval short: 46 bp (harus ada 5 CpG)
short_interval = [125, 130, 135, 140, 145]

# Interval medium: 38 bp (tambahan 5 CpG, total 10 dari F1)
medium_interval = [175, 180, 185, 190, 195]

# Interval long: 78 bp (tambahan 5 CpG, total 15 dari F1)
long_interval = [230, 240, 250, 260, 270]

# Gabungkan jadi cpg_positions
cpg_positions = sorted(short_interval + medium_interval + long_interval)

In [58]:
results = filter_exact_products(cpg_positions)
print(results)

{'short': [{'start': 125, 'end': 145, 'length': 90, 'cpg_count': 5}, {'start': 130, 'end': 175, 'length': 90, 'cpg_count': 5}, {'start': 135, 'end': 180, 'length': 90, 'cpg_count': 5}, {'start': 140, 'end': 185, 'length': 90, 'cpg_count': 5}, {'start': 145, 'end': 190, 'length': 90, 'cpg_count': 5}, {'start': 175, 'end': 195, 'length': 90, 'cpg_count': 5}, {'start': 180, 'end': 230, 'length': 90, 'cpg_count': 5}, {'start': 185, 'end': 240, 'length': 90, 'cpg_count': 5}, {'start': 190, 'end': 250, 'length': 90, 'cpg_count': 5}, {'start': 195, 'end': 260, 'length': 90, 'cpg_count': 5}, {'start': 230, 'end': 270, 'length': 90, 'cpg_count': 5}], 'medium': [{'start': 125, 'end': 195, 'length': 150, 'cpg_count': 10}, {'start': 130, 'end': 230, 'length': 150, 'cpg_count': 10}, {'start': 135, 'end': 240, 'length': 150, 'cpg_count': 10}, {'start': 140, 'end': 250, 'length': 150, 'cpg_count': 10}, {'start': 145, 'end': 260, 'length': 150, 'cpg_count': 10}, {'start': 175, 'end': 270, 'length': 15

## Logic

### Input (Biarin dulu)

In [31]:
# input (ON GOING)
"""
directory_address = ""
for file in os.listdir(directory_address):
    if file.endswith(".fasta"):
        file_path = os.path.join(directory_address, file)
"""

'\ndirectory_address = ""\nfor file in os.listdir(directory_address):\n    if file.endswith(".fasta"):\n        file_path = os.path.join(directory_address, file)\n'

In [None]:
genome = ""

### Process

In [32]:
# cpg_positions = find_Cpg(genome)
cpg_positions = create_big_test_case()
short = find_cpg_intervals(cpg_positions,max_span=SHORT_UPPER_BOUND-(2*PRIMER_LENGTH_LOWER_BOUND),min_threshold=CPG_THRESHOLD)
medium = find_cpg_intervals(cpg_positions,max_span=MEDIUM_UPPER_BOUND-(2*PRIMER_LENGTH_LOWER_BOUND),min_threshold=2*CPG_THRESHOLD)
long = find_cpg_intervals(cpg_positions,max_span=LONG_UPPER_BOUND-(2*PRIMER_LENGTH_LOWER_BOUND),min_threshold=3*CPG_THRESHOLD)

In [33]:
df_short = pd.DataFrame(short)
df_medium = pd.DataFrame(medium)
df_long = pd.DataFrame(long)

#### Display Intervals

In [34]:
df_short

Unnamed: 0,start,end,length,cpg_count
0,93,115,23,5
1,93,117,25,6
2,93,127,35,7
3,93,133,41,8
4,93,135,43,9
...,...,...,...,...
413,8000,8049,50,199
414,8000,8050,51,200
415,8000,8050,51,201
416,8013,8077,65,157


In [35]:
df_medium

Unnamed: 0,start,end,length,cpg_count
0,21,133,113,10
1,21,135,115,11
2,93,138,46,10
3,93,143,51,11
4,93,144,52,12
...,...,...,...,...
379,7938,8050,113,203
380,7979,8077,99,203
381,8000,8112,113,203
382,8028,8142,115,101


In [36]:
df_long

Unnamed: 0,start,end,length,cpg_count
0,21,145,125,15
1,21,151,131,16
2,21,153,133,17
3,21,161,141,18
4,21,162,142,19
...,...,...,...,...
375,7938,8142,205,206
376,7979,8158,180,206
377,7979,8170,192,207
378,8041,8255,215,45
