In [35]:
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
%config InlineBackend.figure_format = 'svg'
plt.style.use("seaborn")
np.random.seed(42)

In [36]:
class Node:
    def __init__(self, id, value):
        self.id = id
        self.value = value

    def __str__(self) -> str:
        return str(self.value)
    
    def __repr__(self) -> str:
        return f"{self.id}/{self.value}"

def edge_cost(frag1:str, frag2:str):
    if frag1 == frag2:
        return 0
    cost = len(frag1)
    for i in range(len(frag1)):
        sub1 = frag1[i+1:]
        sub2 = frag2[0:-i-1]
        # print(sub1, sub2)
        if sub1 == sub2:
            return i+1
    return cost

def create_cost_matrix(nodes):
    matrix = np.zeros([len(nodes), len(nodes)])
    for frag_from in nodes:
        for frag_to in nodes:
            cost = edge_cost(frag_from.value, frag_to.value)
            matrix[frag_from.id][frag_to.id] = cost
    return matrix

In [37]:
import random
def sequence_length(sequence:list[Node], cost_matrix):
    if len(sequence) == 0:
        return 0
    length = len(sequence[0].value)
    for node1, node2 in zip(sequence, sequence[1:]):
        length += cost_matrix[node1.id][node2.id]
    return length

def random_good_path(nodes, cost_matrix, max_seq_len, fragment_len, initial_sequence = None):
    nodes_local = nodes.copy()
    
    if initial_sequence is None:
        sequence = []
        current = random.choice(nodes_local)
        sequence.append(current)
        nodes_local.remove(current)
        current_seq_len = len(current.value)
    else:
        sequence = initial_sequence
        current = sequence[-1]
        current_seq_len = sequence_length(sequence, cost_matrix)

    can_continue = True
    while can_continue:
        avialble_ids = [x.id for x in nodes_local]
        if len(avialble_ids) == 0:
            can_continue = False
            break
        
        min_distance = min(cost_matrix[current.id][avialble_ids])
        closest_nodes = [ x for x in nodes_local if cost_matrix[current.id][x.id]==min_distance ]
        next_node = random.choice(closest_nodes)
        
        current_seq_len +=  cost_matrix[current.id][next_node.id]
        if current_seq_len > max_seq_len or min_distance == fragment_len:
            can_continue = False
            break
        else:
            sequence.append(next_node)
            nodes_local.remove(next_node)
            current = next_node
               
    return sequence

    
def generate_initial_pop(nodes, population_num, cost_matrix, max_seq_len, fragment_len):
    population = []

    for i in range(population_num):
        population.append(random_good_path(nodes, cost_matrix, max_seq_len, fragment_len))
    return population


def print_sequence(sequence, cost_matrix):
    print(len(sequence), sequence_length(sequence, cost_matrix), sequence)
    

# initial_population = generate_initial_pop(nodes, population_size)
# for sequence in initial_population:
#     print_sequence(sequence)

In [38]:
from numpy.random import choice
from random import choices

def f_score(sequence, max_nodes, max_seq_len, cost_matrix)->float:
        no_nodes = len(sequence) / max_nodes
        length = sequence_length(sequence, cost_matrix) / max_seq_len
        # gives more weight to no_nodes
        score = 1.01 * (no_nodes * length) / (0.01*no_nodes + length)
        return score

def my_score(sequence, population):
    seq_len = len(sequence)
    pop_len_sorted = sorted(population, key=lambda x: len(x))
    min_len = len(pop_len_sorted[0])
    max_len = len(pop_len_sorted[-1])
    score = (seq_len-min_len) / (max_len - min_len)
    return score

def calculate_propabilities(population):
    scores = np.asarray(
        [my_score(x, population) for x in population]
    )
    seq_propabilities = scores / np.sum(scores)
    return seq_propabilities
    
# propabilities = calculate_propabilities(initial_population)

def select(population, selectivity, propabilities):
    selected_num = int(len(population) * selectivity)
    # draw = choice(population, selected_num, p=propabilities).tolist()
    draw = choices(population, weights=propabilities, k=selected_num)
    return draw

def get_best_sequence(population:list[list[Node]]):
    scores = [(my_score(x, population),x) for x in population]
    scores_sorted = sorted(scores, key=lambda x : x[0], reverse=True)
    best_score, best_seq = scores_sorted[0]
    return best_seq
    

# natural_selection = select(initial_population, selectivity, propabilities)
# print(propabilities)
# for sequence in natural_selection:
#     print_sequence(sequence)


In [39]:
def crossover(path1:list[Node], path2:list[Node], cost_matrix, max_seq_len, nodes, fragment_len):
    geneA_idx = random.randint(0, len(path1))
    geneB_idx = random.randint(0, len(path1))
    
    startGene = min(geneA_idx, geneB_idx)
    endGene = max(geneA_idx, geneB_idx)

    if startGene == endGene:
        endGene += 1

    child = [path1[i] for i in range(startGene, endGene)]
    child_len = sequence_length(child, cost_matrix)

    for next_node in path2:
        if next_node in child:
            continue

        child_len += cost_matrix[child[-1].id][next_node.id]
        if child_len <= max_seq_len:
            child.append(next_node)
        else:
            break

    child_len = sequence_length(child, cost_matrix)
    if child_len < max_seq_len:
        nodes_left = [item for item in nodes if item not in child]
        child = random_good_path(nodes_left, cost_matrix, max_seq_len, fragment_len, initial_sequence=child)

    return child



def swap_nodes(sequence:list[Node], mutation_rate:float, max_seq_len, cost_matrix):
    sequence_copy = sequence.copy()
    for swapped in range(len(sequence_copy)):
        swap_with = int(random.random() * len(sequence))

        sequence_copy[swapped], sequence_copy[swap_with] = sequence_copy[swap_with], sequence_copy[swapped]
        if not (sequence_length(sequence_copy, cost_matrix) <= max_seq_len and random.random() < mutation_rate):
            # reverse swap
            sequence_copy[swapped], sequence_copy[swap_with] = sequence_copy[swap_with], sequence_copy[swapped]
    return sequence_copy
        


def insert_nodes(sequence:list[Node], mutation_rate:float, nodes, max_seq_len, cost_matrix):
    sequence_copy = sequence.copy()
    nodes_left = [item for item in nodes if item not in sequence_copy]

    for insert_idx in range(len(sequence_copy)):

        insert_node = random.choice(nodes_left)
        sequence_copy.insert(insert_idx, insert_node)

        if not(sequence_length(sequence_copy, cost_matrix) <= max_seq_len and random.random() < mutation_rate):
            sequence_copy.remove(insert_node)
    return sequence_copy

def breed_population(parents: list[Node], population_size, crossover_p, cost_matrix, max_seq_len, nodes, fragment_len):
    children = []
    for _ in range(population_size):
        if np.random.rand() < crossover_p:
            parent1, parent2 = random.sample(parents, k=2)
            child = crossover(parent1, parent2, cost_matrix, max_seq_len, nodes, fragment_len)
        else:
            child = random.choice(parents)
        children.append(child)
    return children

    
def mutate_population(children: list[Node], mutate_p, max_seq_len, cost_matrix, nodes):
    mutated = []
    for child in children:
        mutated_child = swap_nodes(child, mutate_p, max_seq_len, cost_matrix)
        mutated_child = insert_nodes(mutated_child, mutate_p, nodes, max_seq_len, cost_matrix)
        mutated.append(mutated_child)
    return mutated


# children = breed_population(natural_selection, population_size, 0.5)
# mutated = mutate_population(children, 0.1)
# for sequence in natural_selection:
#     print_sequence(sequence)

# print("Breeded and mutated")
# for sequence in mutated:
#     print_sequence(sequence)

In [64]:
from pathlib import Path
import time

instance_types_names = {
    1: "negatywnymi losowymi",
    2: "negatywnymi wynikającymi z powtórzeń",
    3: "pozytywnymi losowymi",
    4: "pozytywnymi, przekłamania na końcach oligonukleotydów"
}


def find_genome(input_path:Path):
    fragment_len = 10 
    # max_seq_len = 209 # changes with different files
    file_info = input_path.name.split('_')
    filename = file_info[0]
    max_seq_len = int(file_info[1])
    instance_type = instance_types_names[int(file_info[2])]

    print("Sequencing", filename, max_seq_len, instance_type)

    population_size = 50
    selectivity = 0.2
    crossover_p = 0.5
    mutate_p = 0.3
    iterations = 5

    fragments = []
    with open(input_path) as f:
        fragments = f.read().splitlines()

    nodes =[ Node(id, frag) for id,frag in enumerate(fragments)]
    cost_matrix = create_cost_matrix(nodes)
    max_nodes = len(nodes)

    history = []
    best_sequence = None

    start = time.time()

    population = generate_initial_pop(nodes, population_size, cost_matrix, max_seq_len, fragment_len)
    best_sequence = get_best_sequence(population)
    history.append(best_sequence)
    for _ in range(iterations):
        propabilities = calculate_propabilities(population)
        parents = select(population, selectivity, propabilities)
        children = breed_population(parents, population_size, crossover_p, cost_matrix, max_seq_len, nodes, fragment_len)
        mutated_children = mutate_population(children, mutate_p, max_seq_len, cost_matrix, nodes)
        population = mutated_children
        best_sequence = get_best_sequence(population)
        history.append(best_sequence)

    end = time.time()

    for seq in history:
        print_sequence(seq, cost_matrix)

    return (filename, max_seq_len, instance_type, end-start, len(best_sequence), sequence_length(best_sequence, cost_matrix), len(history[0]), sequence_length(history[0], cost_matrix), export_sequence(best_sequence, cost_matrix) )

def export_sequence(sequence_list:list[Node], cost_matrix) -> str:
    sequence_str = sequence_list[0].value

    for node1, node2 in zip(sequence_list, sequence_list[1:]):
        length = int(cost_matrix[node1.id][node2.id])
        sequence_str += node2.value[-length:]
    return sequence_str
        



In [77]:
import csv
test_files = Path("instances").iterdir()
outputs = []



header = ["Filename", "Max Sequence length", "Instance type", "Time",  "No. of nodes", "Sequence length", "Initial no. of nodes", "Initial sequence length", "Sequence"]
with open('results.csv', 'a', encoding='UTF8', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(header)

    for test_file in Path("instances").iterdir():
        out = find_genome(test_file)
        writer.writerow(out)
        # writer.writerows(out)
        # outputs.append(out)



Sequencing 62.400+160 409 pozytywnymi losowymi
396 409.0 [454/GTTGGTCTCG, 497/TCTCGCTGCC, 269/CTCGCTGCCG, 488/TCGCTGCCGT, 237/CGCTGCCGTC, 384/GCTGCCGTCC, 285/CTGCCGTCCA, 516/TGCCGTCCAG, 358/GCCGTCCAGG, 203/CCGTCCAGGT, 249/CGTCCAGGTG, 438/GTCCAGGTGC, 484/TCCAGGTGCA, 186/CCAGGTGCAC, 164/CAGGTGCACC, 99/AGGTGCACCG, 427/GGTGCACCGC, 445/GTGCACCGCC, 510/TGCACCGCCT, 343/GCACCGCCTG, 145/CACCGCCTGC, 43/ACCGCCTGCC, 199/CCGCCTGCCT, 234/CGCCTGCCTC, 365/GCCTGCCTCT, 217/CCTGCCTCTC, 287/CTGCCTCTCA, 518/TGCCTCTCAG, 361/GCCTCTCAGC, 210/CCTCTCAGCA, 271/CTCTCAGCAG, 496/TCTCAGCAGG, 265/CTCAGCAGGA, 480/TCAGCAGGAT, 158/CAGCAGGATG, 77/AGCAGGATGG, 347/GCAGGATGGA, 160/CAGGATGGAC, 89/AGGATGGACG, 398/GGATGGACGT, 336/GATGGACGTG, 118/ATGGACGTGA, 524/TGGACGTGAT, 394/GGACGTGATG, 317/GACGTGATGG, 58/ACGTGATGGA, 251/CGTGATGGAT, 444/GTGATGGATG, 507/TGATGGATGG, 337/GATGGATGGC, 120/ATGGATGGCT, 528/TGGATGGCTG, 399/GGATGGCTGC, 339/GATGGCTGCC, 121/ATGGCTGCCA, 529/TGGCTGCCAG, 409/GGCTGCCAGT, 382/GCTGCCAGTT, 283/CTGCCAGTTC, 513

In [76]:
len(list(Path("instances").iterdir()))
list(Path("instances").glob("*62*"))

[WindowsPath('instances/62.400+160_409_3'),
 WindowsPath('instances/62.400+40_409_4'),
 WindowsPath('instances/62.400-160_409_1'),
 WindowsPath('instances/62.400-80_409_1')]