In [1]:
import numpy as np
import math

In [2]:
#fonction qui énumère les différent kmers possibles

def enumerateWords(seq,length):
    numberOfWords = len(seq)-length+1
    words = []
    for i in range(numberOfWords):
        words.append(seq[i:i+length])
    return words

In [3]:
#fonction qui trouve les matches entre un kmer et un mot

def findMatches(kmers,word,seed):
    matches = []
    for i in range(len(kmers)):
        for j in range(len(word)):
            if((seed[j] == '1') and (word[j] != kmers[i][j])):
                break
            if(j == len(word)-1):
                matches.append(i)
    return matches

In [4]:
#fonction qui étend les HSP et retourne les nouveaux indices et le score

def extendHsp(seq1,i,seq2,j,seed,e):
    
    #calculer le score initial
    nbOnes = 0
    for k in range(len(seed)):
        if(seed[k] == "1"):
            nbOnes += 1
    
    score = nbOnes*5 + (len(seed)-nbOnes)*-4
    scoreToReturn = score
    scoreMax = score
    
    #mettre les indices de début et de fin dans des variables
    start1 = i
    end1 = i+len(seed)-1
    start2 = j
    end2 = j+len(seed)-1
    
    #vérifier si on est au début où la fin du séquence
    canExtendLeft = True if (start1 > 0 and start2 > 0) else False
    canExtendRight = True if (end1 < len(seq1)-1 and end2 < len(seq2)-1) else False
    
    #boucler tant qu'on peut étendre et que le score ne baisse pas sous le seuil
    while((canExtendLeft or canExtendRight) and (score > scoreMax-e)):
        scoreLeft = -float("Inf")
        scoreRight = -float("Inf")
        scoreBoth = -float("Inf")
        
        #calculer les trois scores possibles
        if(canExtendLeft):
            scoreLeft = score + (5 if (seq1[start1-1] == seq2[start2-1]) else -4)
            
        if(canExtendRight):
            scoreRight = score + (5 if (seq1[end1+1] == seq2[end2+1]) else -4)
            
        if(canExtendRight and canExtendLeft):
            scoreBoth = scoreLeft + scoreRight -score
            
        #trouver le meilleur score
        scores = [scoreLeft,scoreRight,scoreBoth]
        indexScoreMax = np.argmax(np.array(scores))
        score = max(scoreLeft,scoreRight,scoreBoth)
        
        if(score > scoreMax):
            scoreMax = score
        
        #agrandir le HSP si possible
        if(score > scoreMax-e):
            scoreToReturn = score
            if(indexScoreMax == 0 or indexScoreMax == 2):
                start1 -= 1
                start2 -= 1
            if(indexScoreMax == 1 or indexScoreMax == 2):
                end1 += 1
                end2 += 1
            
        #recalculer la possibilité d'extension
        canExtendLeft = True if (start1 > 0 and start2 > 0) else False
        canExtendRight = True if (end1 < len(seq1)-1 and end2 < len(seq2)-1) else False
        
    return (start1,end1,start2,end2,scoreToReturn)


In [5]:
#calcul du bitscore

def computeBitscore(score):
    return round((0.192*score-math.log(0.176))/math.log(2))

In [6]:
#calcul de la e-value

def computeEvalue(m,n,b):
    return m*n*(2**-b)

In [7]:
#fonction qui retourne le score pour deux sous-séquences

def computeScore(seq1,start1,end1,seq2,start2,end2):
    length = end1 - start1 + 1
    score = 0
    for i in range(length):
        if(seq1[start1+i] == seq2[start2+i]):
            score += 5
        else:
            score -= 4
            
    return score

In [8]:
#fonction retournant une liste de tous les HSP optimalement fusionnés

def mergeHsps(hsps,seq1,seq2):
    
    merged_hsps_new = hsps
    merged_hsps = []
    
    #continuer à essayer de fusionner les HSP tant que la dernière et l'avant-dernière liste
    #n'ont pas la même taille (tous fusionnés)
    while(len(merged_hsps) != len(merged_hsps_new)):
        merged_hsps = merged_hsps_new
        merged_hsps_new = []
        
        #garder une liste des HSP fusionnés pour ne pas les réinclure dans la liste
        hasMerged = np.zeros((len(merged_hsps)))
        
        #regarder chaque paire de HSP
        for i in range(len(merged_hsps)-1):
            for j in range(i+1,len(merged_hsps)):
                #indices du HSP de la 1ere séquence
                start1_1 = merged_hsps[i][0]
                end1_1 = merged_hsps[i][1]
                start2_1 = merged_hsps[i][2]
                end2_1 = merged_hsps[i][3]
                
                #2e séquence, mutatis mutandis
                start1_2 = merged_hsps[j][0]
                end1_2 = merged_hsps[j][1]
                start2_2 = merged_hsps[j][2]
                end2_2 = merged_hsps[j][3]
                                
                #cas où la 1ere commence avant la 2e
                if(start1_1 < start1_2 and end1_1 > start1_2):
                    overlap = end1_1 - start1_2
                    #vérifier s'il y a vraiment overlap
                    if(end2_1 - overlap  == start2_2):
                        hasMerged[i] = 1
                        hasMerged[j] = 1
                        score = computeScore(seq1,start1_1,end1_2,seq2,start2_1,end2_2)
                        newHsp = (start1_1, end1_2, start2_1, end2_2, score)
                        #ajouter la fusion à la nouvelle liste
                        merged_hsps_new.append(newHsp)
                #cas inverse
                elif(start1_1 > start1_2 and end1_2 > start1_1):
                    overlap = end1_2 - start1_1
                    if(end2_2 - overlap == start2_1):
                        hasMerged[i] = 1
                        hasMerged[j] = 1
                        score = computeScore(seq1,start1_2,end1_1,seq2,start2_2,end2_1)
                        newHsp = (start1_2, end1_1, start2_2, end2_1, score)
                        merged_hsps_new.append(newHsp)
        
        #ajouter les HSP non fusionnés à la nouvelle liste
        for k in range(len(merged_hsps)):
            if(hasMerged[k] == 0):
                merged_hsps_new.append(merged_hsps[k])
                
    return merged_hsps_new
                

In [9]:
#fonction qui retourne les indices (pour les deux séquences) et le score du meilleur HSP

def getBestHsp(seq1,seq2,e,ss,seed):
    #séquence à chercher 
    kmersSearched = enumerateWords(seq1,len(seed))
    #séquence dans la BD
    kmersSearching = enumerateWords(seq2,len(seed))
    
    #trouver les kmers qui match selon la graine
    hsps = []
    for i in range(len(kmersSearched)):
        matches = findMatches(kmersSearching,kmersSearched[i],seed)
        
        if(matches != []):
            for j in range(len(matches)):
                hsps.append((matches[j],i))
         
    #retourner None si on ne trouve aucun HSP
    if(len(hsps) == 0):
        return None
                
    #étendre les HSP     
    extended_hsps = []
    for k in range(len(hsps)):
        extended_hsps.append(extendHsp(seq2,hsps[k][0],seq1,hsps[k][1],seed,e))
        
    #supprimer les doublons        
    extended_hsps_cleaned = list(dict.fromkeys(extended_hsps))
    
    #merger les HSP
    merged_hsps = mergeHsps(extended_hsps_cleaned,seq2,seq1)

    #trouver le meilleur HSP
    best = 0
    scoreMax = 0
    for l in range(len(merged_hsps)):
        if(merged_hsps[l][4] > scoreMax):
            scoreMax = merged_hsps[l][4]
            best = l
    
    
    return merged_hsps[best]
    

In [10]:
# Alignement Smith-Waterman

def traceback(startIndex, matrix, chemins, read1, read2):
    
    tracebackList1 = []
    tracebackList2 = []
    
    i = startIndex[0]
    j = startIndex[1]
    
    while(chemins[i,j] != -1):
        if(chemins[i,j] == 0):
            tracebackList1.append("-")
            tracebackList2.append(read2[i-1])
            i = i-1
        elif(chemins[i,j] == 1):
            tracebackList1.append(read1[j-1])
            tracebackList2.append("-")
            j = j-1
        else:
            tracebackList1.append(read1[j-1])
            tracebackList2.append(read2[i-1])
            i = i-1
            j = j-1
            
    tracebackList1.reverse()
    tracebackList2.reverse()
    
    return(tracebackList1, tracebackList2)

def smithWaterman(seq1, seq2):
    matrix = np.ones((len(seq2)+1, len(seq1)+1))
    matrix[:,0] = 0
    matrix[0] = 0

    chemins = np.zeros((len(seq2)+1, len(seq1)+1))
    chemins = chemins - 1

    for i in range(1, len(seq2)+1):
        for j in range(1, len(seq1)+1):
            diagoValue = -1
            if(seq1[j-1] == seq2[i-1]):
                diagoValue = 1

            matrix[i,j] = max(0, matrix[i-1,j] - 1, matrix[i,j-1] - 1, matrix[i-1,j-1] + diagoValue)
            if(matrix[i,j] == 0):
                chemins[i,j] = -1
            elif(matrix[i,j] == matrix[i-1,j] - 1):
                chemins[i,j] = 0
            elif(matrix[i,j] == matrix[i,j-1] - 1):
                chemins[i,j] = 1
            else:
                chemins[i,j] = 2
    
    scoreMax = matrix.max()
    indice = np.unravel_index(np.argmax(matrix, axis=None), matrix.shape)
                        
    response = traceback(indice, matrix, chemins, seq1, seq2)
    
    # Pourcentage d'identité
    match = 0
    for i in range (len(response[0])):
        if (response[0][i] == response[1][i]):
            match += 1;
    identity = match / len(response[0]) * 100
    
    return(scoreMax, response, len(response[0]), identity)

In [11]:
#fonction pour transformer une liste en string

def listToString(l):  
    
    string = ""  
    for ele in l:  
        string += ele  
    
    return string  

In [12]:
#fonction de quicksort pour trier les résultats

def sort(array):
    less = []
    equal = []
    greater = []

    if len(array) > 1:
        pivot = array[0][0]
        for x in array:
            if x[0] < pivot:
                less.append(x)
            elif x[0] == pivot:
                equal.append(x)
            elif x[0] > pivot:
                greater.append(x)
        return sort(less)+equal+sort(greater) 
    else:  
        return array

In [13]:
#fonction pour exécuter pblast sur avec la seqNum-ième séquence du fichier sequences
#la fonction utilise les valeurs par défaut

def pblast(db="tRNAs.fasta",sequences="unknown.fasta",e=4,ss=0.001,seed="11111111111",seqNum=1):
    
    #lignes de tRNAs.fasta
    readsFile = open(db, "r")
    readsLines = readsFile.readlines()
    readsFile.close()

    reads = []
    for i in range(0, len(readsLines)):
        reads.append(readsLines[i][:-1])

    #lignes de unknown.fasta
    readsFile2 = open(sequences, "r")
    readsLines2 = readsFile2.readlines()
    readsFile2.close()

    reads2 = []
    for i in range(0, len(readsLines2)):
        reads2.append(readsLines2[i][:-1])

    #assigner la séquence à blaster
    sequence = reads2[seqNum*2-1]

    #trouver le meilleur HSP pour chaque séquence de la BD
    results = []
    for i in range(0,len(reads),2):

        #trouver le meilleur HSP
        bestHsp = getBestHsp(sequence,reads[i+1],e,ss,seed)
        if(bestHsp == None):
            continue

        #calculer les scores
        bitscore = computeBitscore(bestHsp[4])
        evalue = computeEvalue(len(reads[i+1]),len(sequence),bitscore)

        #ne pas tenir compte du HSP s'il est au-dessus du 2e seuil
        if(evalue > ss):
            continue

        #trouver l'alignement local
        alignment = smithWaterman(sequence,reads[i+1])

        resultString = ""
        resultString += ("%s  score: %.2f  ident: %.2f\n" % (reads[i],alignment[0],alignment[3]))
        resultString += (listToString(alignment[1][1])+"\n")
        resultString += (listToString(alignment[1][0])+"\n")
        resultString += ("\n# Best HSP score: %.2f, bitscore: %.2f, evalue: %.2e\n" % (bestHsp[4],bitscore,evalue))
        resultString += ("%d  %s  %d\n" % (bestHsp[2], sequence[int(bestHsp[2]):int(bestHsp[3])+1], bestHsp[3]))
        resultString += ("%d  %s  %d\n\n\n" % (bestHsp[0], reads[i+1][int(bestHsp[0]):int(bestHsp[1])+1], bestHsp[1]))
        resultString += ("----------------------------------------\n")

        results.append((alignment[3],resultString))

    #trier les résultats et les afficher en ordre décroissant de % d'identité de l'alignement local
    results = (sort(results))

    out = ""
    for j in range(len(results)-1,-1,-1):
        out += results[j][1]
    out += "Total: "+ str(len(results))
    
    return out

In [14]:
#Permet d'appeler pBLAST avec différents paramètre
#1.On n'affiche les résultat pour une seule séquence du fichier "unknown.fasta" à la fois. Celui-ci
#doit être décidé avec l'argument "seqNum" (1 à 4)
#2.Les paramètre pouvant être changés sont:
# - db: le fichier de base de données (doit être dans le même dossier que le code)
# - sequences: le fichier de séquences à rechercher (doit être dans le même dossier que le code)
# - e: le 1er seuil
# - ss: le 2e seuil
# - seed: la graine (composée de 1 et de 0)
# - seqNum: le choix de la séquence à blaster

print(pblast(seqNum=4))

>R|tct|Marchantia_polymorpha  score: 69.00  ident: 97.26
GCATTCTTAGCTCAGTTGGATAGAGCAACAACCTTCTAAGTTGAAGGTCACAGGTTCAAATCCTGTAGAATGC
GCATTCTTAGCTCAGCTGGATAGAGCAACAACCTTCTAAGTTGAAGGTCACAGGTTCAAATCCTGTAGGATGC

# Best HSP score: 260.00, bitscore: 75.00, evalue: 1.43e-19
16  TGGATAGAGCAACAACCTTCTAAGTTGAAGGTCACAGGTTCAAATCCTGTAG  67
16  TGGATAGAGCAACAACCTTCTAAGTTGAAGGTCACAGGTTCAAATCCTGTAG  67


----------------------------------------
>R|tcg|Marchantia_polymorpha  score: 67.00  ident: 95.89
GCATTCTTAGCTCAGTTGGATAGAGCAACAACCTTCGAAGTTGATGGTCACAGGTTCAAATCCTGTAGGATGC
GCATTCTTAGCTCAGCTGGATAGAGCAACAACCTTCTAAGTTGAAGGTCACAGGTTCAAATCCTGTAGGATGC

# Best HSP score: 140.00, bitscore: 41.00, evalue: 2.46e-09
45  GGTCACAGGTTCAAATCCTGTAGGATGC  72
45  GGTCACAGGTTCAAATCCTGTAGGATGC  72


----------------------------------------
>R|tct|Mesostigma_viride  score: 59.00  ident: 91.55
CATTCTTAGCTCAGTTGGATAGAGCAACGGCCTTCTAAGCTGTAGGTCACAGGTTCAAATCCTGTAGAATG
CATTCTTAGCTCAGCTGGATAGAGCAACAACCTTCTAAGTTGAAGGTCACAGGTTCAAATC