# Functions to test in the early phase

From a sequence, choose positions at random; from each chosen position n take the next m bases, creating in this way a read
starting at position n, with length m. The number of reads is: $ \lfloor \text{len(seq)} / \text{length\_reads} \rfloor \times \text{coverage} $

In [29]:
import random

# Example DNA sequence; it is the input passed to comstum_reads() further down.
seq = "ATTCAGCTGATCGGTAGCAGCTGAGTCGATCGATCGGTCGGATCGATCGTAGAGAGCACCATCGTGGTCACATCGGAAGCTAGGCTAGGGACACTCGGATCGGACCACTTATATAGCGCGCGTATACAGTCGATCCAGCTGATACCAGCTAGCCATCGATGC"
def comstum_reads(seq: str, length_reads = 10, coverage = 5) -> list:
    """Simulate sequencing reads by sampling substrings of ``seq``.

    Start positions are drawn at random over the whole sequence and each read
    is the slice of up to ``length_reads`` bases beginning at its start
    position. A read whose start lies closer than ``length_reads`` to the end
    of ``seq`` is truncated at the end of the sequence.

    seq: sequence to sample the reads from
    length_reads: maximum number of bases per read (must be positive)
    coverage: multiplier applied to ``len(seq) // length_reads`` to obtain the
    number of reads

    output:
    list with ``(len(seq) // length_reads) * coverage`` reads, in the order in
    which their start positions were drawn

    raises:
    ValueError if ``length_reads`` is not positive
    """
    if length_reads <= 0:
        raise ValueError("length_reads must be a positive integer")

    number_of_reads = (len(seq) // length_reads) * coverage
    positions = range(len(seq))
    if number_of_reads <= len(seq):
        # Distinct start positions, as long as there are enough of them.
        starting_pos = random.sample(positions, number_of_reads)
    else:
        # More reads requested than positions available: random.sample would
        # raise ValueError, so draw the start positions with replacement.
        starting_pos = random.choices(positions, k=number_of_reads)

    # Slicing clamps at the end of the string, so a read that would overrun
    # the sequence still starts at its own position and is simply shorter.
    return [seq[num:num + length_reads] for num in starting_pos]

# Build the test reads from `seq` with length_reads=15 and coverage=2.
reads = comstum_reads(seq, length_reads=15, coverage=2)


In [27]:
from Bio import pairwise2
from Bio.Seq import Seq
from inspyred import swarm
from inspyred import ec
from inspyred.ec import selectors
import numpy as np
import itertools
import random

def eval_allign(reads: list, par=(3, -1, -2, -1)):
    """Score every pair of reads with a local alignment.

    reads: list of read sequences -> ["ATTAG", ...]; the position of a read
    in the list is its node index in the returned matrix

    par: sequence with the four parameters used to perform the alignment,
    in this order (the values shown are the defaults):
    match_score = 3;
    mismatch_penalty = -1;
    gap_open_penalty = -2;
    gap_extension_penalty = -1;

    output:
    square matrix (len(reads) x len(reads)) whose entry [i][j] is the best
    local alignment score between read i and read j; the diagonal is 0
    """
    n_reads = len(reads)
    storage = np.zeros((n_reads, n_reads))

    # The same gap penalties apply to both sequences, so the local alignment
    # score does not depend on the order of the pair: align each unordered
    # pair once and mirror the result.
    for i, j in itertools.combinations(range(n_reads), 2):
        # score_only=True makes pairwise2 return the best score directly,
        # skipping the traceback and the need to index into a list of
        # alignments (which fails if that list is empty).
        score = pairwise2.align.localms(
            Seq(reads[i]), Seq(reads[j]),
            par[0], par[1], par[2], par[3],
            score_only=True,
        )
        storage[i][j] = score
        storage[j][i] = score

    return storage


class Assembly_problem():
    """Ant-colony problem definition over a square matrix of node weights.

    Every ordered pair of distinct nodes (i, j) is a trail component. A
    candidate is a list of components forming a path that visits every node
    once; the evaluator closes the path back to its first node.

    NOTE(review): component values and fitness both use the reciprocal of
    the weights, which rewards small weights. If the weights are similarity
    (alignment) scores, confirm that this is the intended direction.
    """

    def __init__(self, weights):
        """Store the weights and build the trail components.

        weights: square matrix; weights[i][j] is the weight of going from
        node i to node j
        """
        self.weights = weights  # matrix of weights
        # One component per ordered pair of distinct nodes, valued 1 / weight.
        # NOTE(review): a zero weight makes this reciprocal degenerate.
        self.components = [
            swarm.TrailComponent((i, j), value=(1 / weights[i][j]))
            for i, j in itertools.permutations(range(len(weights)), 2)
        ]
        # Probability threshold in constructor() for taking the maximum
        # feasible component instead of a fitness-proportionate draw.
        self.bias = 0.5
        self.bounder = ec.DiscreteBounder(list(range(len(weights))))
        self.maximize = True

    def constructor(self, random, args):
        """Return a candidate solution for an ant colony optimization.

        The candidate is grown one component at a time until it holds
        len(self.weights) - 1 components. Each new component must start at
        the node where the previous one ended and must lead to a node that
        is not yet part of the candidate.

        random: random number generator (provides ``random()``)
        args: keyword-argument dictionary supplied by the caller (unused)
        """
        candidate = []
        while len(candidate) < len(self.weights) - 1:
            # Find feasible components
            if not candidate:
                feasible_components = self.components
            else:
                last = candidate[-1]
                already_visited = {c.element[0] for c in candidate}
                already_visited.update(c.element[1] for c in candidate)
                feasible_components = [
                    c for c in self.components
                    if c.element[0] == last.element[1]
                    and c.element[1] not in already_visited
                ]
            if not feasible_components:
                # Dead end: discard the partial path and start over.
                candidate = []
            else:
                # Choose a feasible component
                if random.random() <= self.bias:
                    next_component = max(feasible_components)
                else:
                    next_component = selectors.fitness_proportionate_selection(random, feasible_components, {'num_selected': 1})[0]
                candidate.append(next_component)
        return candidate

    def evaluator(self, candidates, args):
        """Return the fitness values for the given candidates.

        The fitness of a candidate is 1 / total, where total is the sum of
        the weights of its components plus the weight of the edge from the
        end of its last component back to the start of its first one.

        candidates: list of candidates as produced by constructor()
        args: keyword-argument dictionary supplied by the caller (unused)
        """
        fitness = []
        for candidate in candidates:
            total = sum(self.weights[c.element[0]][c.element[1]] for c in candidate)
            # Closing edge: end of the last component -> start of the first.
            closing_from = candidate[-1].element[1]
            closing_to = candidate[0].element[0]
            total += self.weights[closing_from][closing_to]
            fitness.append(1 / total)
        return fitness



In [1]:
import inspyred
from utils.utils_07.exercise_1 import *
from utils.utils_07.plot_utils import *

import collections
# Expose the ABCs from `collections.abc` under the `collections.*` names as
# well; presumably a compatibility shim for a library that still looks them
# up there (TODO confirm which one needs it).
collections.Iterable = collections.abc.Iterable
collections.Sequence = collections.abc.Sequence

# common parameters
pop_size = 100
max_generations = 50
seed = 10
# Seeded generator handed to the ACS constructor below. `Random` is not
# imported by name in this cell; presumably it comes from one of the star
# imports above (TODO confirm).
prng = Random(seed)
display = True
# ACS specific parameters
evaporation_rate = 0.4
learning_rate = 0.1

# Extra keyword arguments forwarded to ac.evolve() through **args.
args = {}
args["fig_title"] = "ACS"

# run ACS
# Assembly_problem, eval_allign and reads are not defined in this cell; they
# must already exist in the session when it runs.
problem = Assembly_problem(eval_allign(reads))
ac = inspyred.swarm.ACS(prng, problem.components)
ac.observer = [plot_observer]
ac.terminator = inspyred.ec.terminators.generation_termination


final_pop = ac.evolve(generator=problem.constructor, 
                      evaluator=problem.evaluator, 
                      bounder=problem.bounder,
                      maximize=problem.maximize, 
                      pop_size=pop_size,
                      max_generations=max_generations,
                      evaporation_rate=evaporation_rate,
                      learning_rate=learning_rate,**args)
# Keep the largest member of the archive (as ordered by max()) as the result.
best_ACS = max(ac.archive)



NameError: name 'Assembly_problem' is not defined