# Notebook for development of the script - 2

In [125]:
import random as rd
import numpy as np
import matplotlib.pyplot as plt
from Bio.Seq import *
from Bio.Alphabet.IUPAC import IUPACAmbiguousDNA, IUPACUnambiguousDNA
import reprlib
import time
from sys import getsizeof

In [68]:
class Genome(Seq):
    """Classe Genome"""
    def __init__(self, seq, circular=True):
        Seq.__init__(self, seq, alphabet=IUPACUnambiguousDNA())
        self.circular=circular
        
#     def __str__(self):
#         """To show the attributes and values of the instance"""
#         out=''
#         for key, value in self.__dict__.items():
#             out+='{:20s}  {}\n'.format(key, reprlib.repr(value))
#         return out

    def sequencing(self, read_length=100, reads_nb=5000):
        reads=[]
        for _ in range(reads_nb):
            start = rd.randint(0, len(self)-1)
            read_seq = self._data[start:min(start+100, len(self))] + self._data[0:max(start+100-len(self), 0)]
            read = Read(read_seq)
            reads.append(read)
        return reads


In [69]:
class Read(Seq):
    """Classe Read"""
    def __init__(self, seq, circular=True):
        Seq.__init__(self, seq, alphabet=IUPACUnambiguousDNA())
    
#     def __str__(self):
#         """To show the attributes and values of the instance"""
#         out=''
#         for key, value in self.__dict__.items():
#             out+='{:20s}  {}\n'.format(key, reprlib.repr(value))
#         return out
    
    def generate_kmers(self, kmers_length=30):
        """Returns a list of the k-mers included in the read"""
        kmers=[]
        for i in range(len(self) - kmers_length):
            kmer_seq = self._data[i:i+kmers_length]
            kmer = Kmer(kmer_seq)
            kmers.append(kmer)
        return kmers


In [70]:
class Kmer(Seq):
    """Classe Kmer"""
    def __init__(self, seq):
        Seq.__init__(self, seq, alphabet=IUPACUnambiguousDNA())
        self.prefix = seq[:-1]
        self.suffix = seq[1:]
    
#     def __str__(self):
#         """To show the attributes and values of the instance"""
#         out=''
#         for key, value in self.__dict__.items():
#             out+='{:20s}  {}\n'.format(key, reprlib.repr(value))
#         return out
    

In [71]:
class Graph:
    def __init__(self, km1mers):
        self.nodes = tuple(km1mers)
        self.matrix = np.zeros((len(self.nodes), len(self.nodes)))
        self.eulerian = None
    
    def fill_matrix(self, kmers):
        n = 0
        for kmer in kmers:
            print('K-mer {:5d}'.format(n), end='\r')
            i = self.nodes.index(kmer.prefix)
            j = self.nodes.index(kmer.suffix)
            self.matrix[i, j]+=1
            n+=1
    
    def test_eulerian(self):
        for i in range(len(self.nodes)):
            if self.matrix[i, :].sum() != self.matrix[:, i].sum():
                print("PROBLEM : This graph is not Eulerian.")
                self.eulerian = False
                break
        if self.eulerian != False:
            print('SUCCESS : This graph is Eulerian !!')
            self.eulerian = True
        

In [72]:
# Trouve 1 cycle
def find_eulerian_cycle():
    # attention, ce code ne gere pas les sommets connectés avec eux-memes
    cycle = []
    start, s = np.array(np.where(graph.matrix))[:,0]
    cycle.append(start)
    graph.matrix[start, s]-=1
    while s!= start:
        cycle.append(s)
        t = np.where(graph.matrix[s, :])[0][0]     # s comme sommet, t comme temporary
        graph.matrix[s, t]-=1
        s = t
    return cycle

In [73]:
# Test fin
def test_fin():
    if graph.matrix.sum()==0: return True
    else: return False

In [84]:
# Assemble les cycles
def assemble(cycle, tab):      # cycle a parcourir pour avancer 
                           # & tableau des cycles restant a parcourir
    for s in cycle:        # prenons un sommet du cycle
        seq.append(s)        # ajoutons le a la sequence
        for c in tab:          # puis testons s'il est également connecté à un des cycles restant
            if s in c:
                nextcycle = c[c.index(s)+1:] + c[:c.index(s)+1]     # Si oui, recadrons le prochain cycle pour que le parcours de la liste commence bien au début
                nexttab = tab[tab.index(c)+1:] + tab[:tab.index(c)]   # enlevons également ce cycle du tableau pour le prochain appel de la fonction
                assemble(nextcycle, nexttab)
                tab.remove(c)
                break
                

In [None]:
# PROGRAMME PRINCIPAL
# Parameters
genome_length= 10000
read_length=100
reads_nb=5000
kmers_length=30

# Genome generation
genome= Genome(''.join(rd.choices(["A", "T", "G", "C"], k=genome_length)), circular=True)
# genome = Genome('ATGGCGTGCA')

# Reads generation
reads = genome.sequencing(read_length=read_length, reads_nb=reads_nb)

# K-mers generation
kmers=set()
for read in reads:
    kmers.update(read.generate_kmers(kmers_length=kmers_length))

# (K-1)-mers generation
km1mers=set()
for kmer in kmers:
    km1mers.update([kmer.prefix, kmer.suffix])
    
# Graph generation
graph = Graph(km1mers)
graph.fill_matrix(kmers)
graph.test_eulerian()
print(reprlib.repr(graph.nodes))

# Trouve tous les cycles
cycles = []
while test_fin() == False:
    cycles.append(find_eulerian_cycle())
print(str(len(cycles))+' cycles trouvés')

# Assemble les cycles
if len(cycles) == 1:
    seq = cycles[0]
else:
    seq=[]
    assemble(cycles[0], cycles[1:])
print(reprlib.repr(seq))





K-mer     0K-mer     1K-mer     2K-mer     3K-mer     4K-mer     5K-mer     6K-mer     7K-mer     8K-mer     9K-mer    10K-mer    11K-mer    12K-mer    13K-mer    14K-mer    15K-mer    16K-mer    17K-mer    18K-mer    19K-mer    20K-mer    21K-mer    22K-mer    23K-mer    24K-mer    25K-mer    26K-mer    27K-mer    28K-mer    29K-mer    30K-mer    31K-mer    32K-mer    33K-mer    34K-mer    35K-mer    36K-mer    37K-mer    38K-mer    39K-mer    40K-mer    41K-mer    42K-mer    43K-mer    44K-mer    45K-mer    46K-mer    47K-mer    48K-mer    49K-mer    50K-mer    51K-mer    52K-mer    53K-mer    54K-mer    55K-mer    56K-mer    57K-mer    58K-mer    59K-mer    60K-mer    61K-mer    62K-mer    63K-mer    64K-mer    65K-mer    66K-mer    67K-mer    68K-mer    69K-mer    70K-mer    71K-mer    72K-mer    73K-mer    74K-mer    75K-mer    76K-mer    77K-mer    78K-mer    79K-mer    80K-mer    81K-mer    82K-me

In [122]:
# Recréé la sequence
genome_assembly = ''
for s in seq:
    genome_assembly+=graph.nodes[s][0]
genome_assembly

# test genome de départ == genome assemblé
offset = (genome_assembly*2).index(genome._data)
genome_assembly[offset:]+genome_assembly[:offset] == genome._data

True

---
## Tests

In [None]:
def assemble(cycle, tab):      # cycle a parcourir pour avancer 
                           # & tableau des cycles restant a parcourir
    for s in cycle:        # prenons un sommet du cycle
        seq.append(s)        # ajoutons le a la sequence
        for c in tab:          # puis testons s'il est également connecté à un des cycles restant
            if s in c:
                nextcycle = c[c.index(s)+1:] + c[:c.index(s)]     # Si oui, recadrons le prochain cycle pour que le parcours de la liste commence bien au début
                nexttab = tab[tab.index(c)+1:] + tab[:tab.index(c)]   # enlevons également ce cycle du tableau pour le prochain appel de la fonction
                
                

In [None]:
def assemble(cycle, tab):      # cycle a parcourir pour avancer 
                           # & tableau des cycles restant a parcourir
    for s in cycle:        # prenons un sommet du cycle
        seq.append(s)        # ajoutons le a la sequence
        for j, c in enumerate(tab):          # puis testons s'il est également connecté à un des cycles restant
            for i, s1 in enumerate(c):
                if s == s1:
                    nextcycle = c[i+1:] + c[:i]     # Si oui, recadrons le prochain cycle pour que le parcours de la liste commence bien au début
                    nexttab = tab[j+1:] + tab[:j]   # enlevons également ce cycle du tableau pour le prochain appel de la fonction
            
            
            
            
            if s in c:
                nextcycle = c[c.index(s)+1:] + c[:c.index(s)]     # Si oui, recadrons le prochain cycle pour que le parcours de la liste commence bien au début
                nexttab = tab[j+1:] + tab[:j]   # enlevons également ce cycle du tableau pour le prochain appel de la fonction
                
                

In [158]:
getsizeof(np.zeros(12, dtype=bool))

108