In [None]:
import string
from copy import deepcopy
from datetime import datetime

import blosum as bl
import numpy as np

from threading import Thread

from multiprocessing.pool import ThreadPool

In [None]:
def score_fun(a: str, b: str, bl_matr):
    """Return the substitution score stored in ``bl_matr`` for the residue pair ``a``, ``b``.

    The matrix is indexed by the two symbols concatenated into one key
    (``a`` first, then ``b``); whatever ``bl_matr`` does for a missing key
    is passed through unchanged.
    """
    pair = a + b
    return bl_matr[pair]

def smith_waterman(seq1: str, seq2: str, gap_score: int = -10):
    """Locally align ``seq1`` and ``seq2`` (Smith-Waterman, linear gap penalty).

    Substitution scores come from ``bl.BLOSUM(62)`` via ``score_fun``; every
    gap position costs ``gap_score``.

    Returns:
        ``(aligned_seq1, aligned_seq2, score)`` where the aligned strings use
        ``'-'`` for gaps and ``score`` is the largest cell of the DP table.
    """
    rows, cols = len(seq1) + 1, len(seq2) + 1
    substitution = bl.BLOSUM(62)

    # Fill the DP table; row 0 and column 0 stay at zero.
    table = np.zeros((rows, cols))
    for r in range(1, rows):
        for c in range(1, cols):
            match = table[r - 1, c - 1] + score_fun(seq1[r - 1], seq2[c - 1], substitution)
            delete = table[r - 1, c] + gap_score
            insert = table[r, c - 1] + gap_score
            table[r, c] = max(match, delete, insert, 0)

    # First occurrence of the maximum in row-major order is the traceback start.
    r, c = (int(idx) for idx in np.unravel_index(np.argmax(table), table.shape))
    score = table[r, c]

    # Walk back until a zero cell is reached, collecting the columns in reverse.
    top, bottom = [], []
    while table[r, c] != 0 and (r > 0 or c > 0):
        here = table[r, c]
        if r > 0 and c > 0 and here == table[r - 1, c - 1] + score_fun(seq1[r - 1], seq2[c - 1], substitution):
            # Residue aligned to residue.
            top.append(seq1[r - 1])
            bottom.append(seq2[c - 1])
            r -= 1
            c -= 1
        elif r > 0 and here == table[r - 1, c] + gap_score:
            # Residue of seq1 aligned to a gap.
            top.append(seq1[r - 1])
            bottom.append('-')
            r -= 1
        elif c > 0 and here == table[r, c - 1] + gap_score:
            # Gap aligned to a residue of seq2.
            top.append('-')
            bottom.append(seq2[c - 1])
            c -= 1
        else:
            # No predecessor matched: emit a gap/gap column and stay in place.
            top.append('-')
            bottom.append('-')

    return ''.join(reversed(top)), ''.join(reversed(bottom)), score


Вход: длина слова (по умолчанию 4, можно изменить) и нуклеотидные последовательности.
Выход: упорядоченный список диагоналей с наибольшим числом совпадений.
Сравнение индексов (длина слова, последовательности).

In [None]:
def to_map(seq: str, k: int):
    """Index every length-``k`` substring of ``seq`` by its start positions.

    Args:
        seq: sequence to index.
        k: word (substring) length.

    Returns:
        A dict mapping each substring ``seq[i:i + k]`` to the ascending list
        of start indices ``i`` at which it occurs. Empty when ``seq`` is
        shorter than ``k``.
    """
    positions = {}
    for start in range(len(seq) - k + 1):
        positions.setdefault(seq[start:start + k], []).append(start)
    return positions

In [None]:
def sub_maps_diff_pos(map_1, map_2):
    """Group shared-word hits by diagonal offset and wrap each group in a ``Diag``.

    For every word present in both maps, each pair of positions
    ``(pos1, pos2)`` contributes ``pos1`` to the bucket keyed by the offset
    ``pos2 - pos1``. Each bucket is then converted with
    ``convert_diag_to_xy`` and turned into a ``Diag``.

    Returns:
        A list of ``Diag`` objects, one per distinct offset, in order of first
        appearance of the offset.
    """
    starts_by_offset = {}
    for word, positions_1 in map_1.items():  # e.g. word 'AA', positions [1, 2, 3]
        positions_2 = map_2.get(word)
        if positions_2 is None:
            continue
        for pos1 in positions_1:
            for pos2 in positions_2:
                starts_by_offset.setdefault(pos2 - pos1, []).append(pos1)

    return [
        Diag(convert_diag_to_xy(offset, starts))
        for offset, starts in starts_by_offset.items()
    ]

def cutt_off(dist, count=1):
    """Keep only the entries of ``dist`` whose value has more than ``count`` items.

    The values are not copied; the returned dict references the same objects.
    """
    return {key: hits for key, hits in dist.items() if len(hits) > count}


In [None]:
def sub_maps(map_1, map_2):
    """For every word found in both maps, list all position differences.

    Returns:
        A dict ``word -> [pos2 - pos1, ...]`` covering every combination of a
        position from ``map_1[word]`` (outer loop) and ``map_2[word]``
        (inner loop). Words missing from ``map_2`` are skipped.
    """
    offsets_by_word = {}
    for word, positions_1 in map_1.items():
        positions_2 = map_2.get(word)
        if positions_2 is None:
            continue
        offsets_by_word[word] = [
            pos2 - pos1 for pos1 in positions_1 for pos2 in positions_2
        ]
    return offsets_by_word

In [None]:
def convert_to_matrix(diffs, n1, n2):
    """Render offset buckets as an ``n1`` x ``n2`` dot matrix.

    ``diffs`` maps an offset to a list of row positions; for each position
    ``p`` the cell ``(p, p + offset)`` is set to 1. All other cells are 0.
    """
    grid = np.zeros((n1, n2))
    for offset, starts in diffs.items():
        for row in starts:
            grid[row, row + offset] = 1
    return grid


In [None]:
class Diag:
    """Rectangular region of the dot matrix described by its two corners.

    Built from a 4-item sequence ``[x_min, y_min, x_max, y_max]``. Equality
    and hashing are based on these four coordinates, while ordering
    (``<``, ``<=``) compares ``len()``, i.e. the x-extent plus the y-extent.
    """

    def __init__(self, li):
        self.x_min = li[0]
        self.y_min = li[1]
        self.x_max = li[2]
        self.y_max = li[3]

    def is_straight(self):
        """Return True when the x-extent equals the y-extent."""
        return (self.y_max - self.y_min) == (self.x_max - self.x_min)

    def to_positions(self):
        """Return ``(x_min, y_min, x_max, y_max)``."""
        return self.x_min, self.y_min, self.x_max, self.y_max

    def __add__(self, diag):
        """Return the smallest region that contains both operands."""
        return Diag([
            min(self.x_min, diag.x_min),
            min(self.y_min, diag.y_min),
            max(diag.x_max, self.x_max),
            max(diag.y_max, self.y_max),
        ])

    def __len__(self):
        # Sum of both extents, not the number of cells on the diagonal.
        return self.x_max - self.x_min + self.y_max - self.y_min

    def __getitem__(self, item):
        """Return the single-cell ``Diag`` ``item`` steps along the main diagonal.

        Raises:
            KeyError: if the step leaves the region on either axis.
        """
        x = self.x_min + item
        y = self.y_min + item
        if x > self.x_max or y > self.y_max:
            raise KeyError
        return Diag([x, y, x, y])

    def __eq__(self, other):
        if not isinstance(other, Diag):
            # Let Python fall back to its default handling for foreign types.
            return NotImplemented
        return self.to_positions() == other.to_positions()

    def __le__(self, other):
        return len(self) <= len(other)

    def __lt__(self, other):
        return len(self) < len(other)

    def __hash__(self):
        # Must agree with __eq__: equal coordinates give an equal hash.
        return hash(self.to_positions())

    def __repr__(self):
        return f"Diag([{self.x_min}, {self.y_min}, {self.x_max}, {self.y_max}])"

    def __str__(self):
        return f"({self.x_min}, {self.y_min}) -> ({self.x_max}, {self.y_max}): " + ("straight" if self.is_straight() else "not straight")

In [None]:
def convert_diag_to_xy(gap, poss1):
    """Compute the bounding corners of the hits lying on one diagonal.

    Each position ``p`` in ``poss1`` is an x coordinate whose y coordinate is
    ``p + gap``.

    Returns:
        ``(x_min, y_min, x_max, y_max)``. For an empty ``poss1`` the minima
        are ``+inf`` and the maxima are ``-inf``.
    """
    pos_inf = float('inf')
    neg_inf = float('-inf')
    xs = list(poss1)
    ys = [x + gap for x in xs]
    return (
        min(xs, default=pos_inf),
        min(ys, default=pos_inf),
        max(xs, default=neg_inf),
        max(ys, default=neg_inf),
    )

def get_top_10(dist, diagonal_min_length=5):
    """Return up to ten of the longest candidates, longest first.

    Args:
        dist: iterable of objects supporting ``len()`` (``Diag`` in this file).
        diagonal_min_length: candidates with ``len(d) < 2 * diagonal_min_length``
            are discarded. The factor 2 presumably reflects that
            ``Diag.__len__`` adds the x- and y-extents.

    Returns:
        A list of at most 10 items sorted by descending ``len()``. Ties keep
        their input order.
    """
    threshold = 2 * diagonal_min_length
    long_enough = [d for d in dist if len(d) >= threshold]
    # The previous slice ``[:10:-1]`` skipped the first 11 items instead of
    # keeping the best 10, so short candidate lists came back empty.
    return sorted(long_enough, key=len, reverse=True)[:10]

In [None]:
def find_best_for_diag(diag_scores_old, gap):
    """Run one round of pairwise merging over scored diagonals.

    Args:
        diag_scores_old: dict mapping diagonal objects (exposing ``x_min``,
            ``y_min``, ``x_max``, ``y_max`` and ``+``) to their scores.
        gap: per-unit penalty multiplied by the distance between two
            diagonals (presumably negative; confirm against callers).

    Returns:
        ``(concatted, diag_scores_new)``: ``concatted`` is True when at least
        one pair was merged; ``diag_scores_new`` is the updated score dict.
        Its keys come from a deep copy of the input, not from the caller's
        own objects.
    """
    concatted = False
    diag_scores_new = {}
    # Work on a deep copy so the caller's dict and its keys are left untouched.
    diag_scores = deepcopy(diag_scores_old)
    for d1 in diag_scores:
        # Best combined score found for d1 so far, and the partner producing it.
        MAX = float('-inf')
        best_d2 = None
        for d2 in diag_scores:
            if not d1.__eq__(d2) and d2.x_min >= d1.x_min and d2.y_min >= d1.y_min:
                # so that the second diagonal is neither above nor to the left of the first
                if d2.x_min  <= d1.x_max:
                    # d2 starts within d1's x-range: distance is the y-start difference.
                    dist = d2.y_min - d1.y_min
                elif d2.y_min <= d1.y_max:
                    # d2 starts within d1's y-range: distance is the x-start difference.
                    dist = d2.x_min - d1.x_min
                else:
                    # d2 starts beyond d1 on both axes: sum of both offsets from d1's end.
                    dist = d2.y_min - d1.y_max + d2.x_min - d1.x_max
                res = diag_scores[d1] + diag_scores[d2] + dist * gap
                # Accept only if merging beats both individual scores and the best so far.
                if res > diag_scores[d1] and res > diag_scores[d2] and res > MAX:
                    MAX = res
                    best_d2 = d2

        if best_d2 is not None:
            # Mark both parts as consumed and record the merged region's score.
            diag_scores_new[best_d2] = float('-inf')
            diag_scores_new[d1] = float('-inf')
            concatted = True
            diag_scores_new[(d1 + best_d2)] = MAX
        else:
            # No beneficial partner: carry the score over unchanged.
            # NOTE(review): this can overwrite a -inf written for the same key
            # by an earlier iteration - confirm that is intended.
            diag_scores_new[d1] = diag_scores[d1]
    return concatted, diag_scores_new

In [None]:
def read_fasta(path):
    """Read a FASTA file and split it into the first record and the rest.

    Args:
        path: path of the FASTA file to read.

    Returns:
        ``(first_name, first_seq, others)`` where ``first_name`` is the full
        header line of the first record (without ``'>'``), ``first_seq`` is
        its sequence with line breaks removed, and ``others`` maps the header
        line of every later record to its sequence. Records with an empty
        header line are ignored. If the file has no records the result is
        ``('', '', {})``.
    """
    # The context manager guarantees the handle is closed, even on error.
    with open(path, 'r') as handle:
        records = handle.read().split('>')

    first_name, first_seq = '', ''
    others = {}
    for record in records:
        header, *body = record.split('\n')
        if header == '':
            continue
        sequence = ''.join(body)
        if len(first_name) == 0:
            first_name, first_seq = header, sequence
        else:
            others[header] = sequence
    return first_name, first_seq, others

In [None]:
def get_best_diagonals_concatenation_with_scores(str1, str2, substr_len, gap, blosum_cf, diag_min_len):
    """Score the candidate diagonals of two sequences and merge them greedily.

    Steps:
      1. Index both sequences by words of length ``substr_len`` and take the
         candidates returned by ``get_top_10``.
      2. Score each candidate by summing ``bl.BLOSUM(62)`` values along its
         main diagonal; drop candidates scoring below ``blosum_cf``.
      3. Call ``find_best_for_diag`` repeatedly until it reports no merge.

    Returns:
        The final dict of diagonal -> score produced by the merge loop.
    """
    candidates = get_top_10(
        sub_maps_diff_pos(to_map(str1, substr_len), to_map(str2, substr_len)),
        diagonal_min_length=diag_min_len,
    )
    substitution = bl.BLOSUM(62)

    scored = {}
    for diagonal in candidates:
        total = 0.0
        for step in range(diagonal.x_max - diagonal.x_min):
            cell = diagonal[step]
            total += substitution[str1[cell.x_min] + str2[cell.y_min]]
        # Keep only diagonals that are not below the score cut-off.
        if not total < blosum_cf:
            scored[diagonal] = total

    merged_any = True
    while merged_any:
        merged_any, scored = find_best_for_diag(scored, gap)

    return scored


def find_best_score(diagonals_scores):
    """Return ``(key, score)`` of the highest-scoring entry.

    The first entry wins on ties. When the dict is empty, or no score is
    greater than ``-inf``, the result is ``(None, float('-inf'))``.
    """
    best_key, best_score = None, float('-inf')
    for key, value in diagonals_scores.items():
        if value > best_score:
            best_key, best_score = key, value
    return best_key, best_score

def operate(seq1, seq2, substr_len, gap, blosum_cf, diag_min_len):
    """Align the two sequences inside the best-scoring merged diagonal region.

    Returns:
        The ``(aligned_seq1, aligned_seq2, score)`` triple from
        ``smith_waterman`` run on the selected sub-sequences, or
        ``('', '', -100)`` when no region was found.
    """
    scored = get_best_diagonals_concatenation_with_scores(
        seq1, seq2, substr_len, gap, blosum_cf, diag_min_len
    )
    region, _ = find_best_score(scored)
    if region is not None:
        x_lo, y_lo, x_hi, y_hi = region.to_positions()
        # Both ends of the region are inclusive, hence the + 1.
        return smith_waterman(seq1[x_lo:x_hi + 1], seq2[y_lo:y_hi + 1], gap_score=gap)
    return '', '', -100

def operate_for_multi(seq1, seq2, substr_len, gap, blosum_cf, diag_min_len, threshold_score, s2_name):
    """Worker variant of ``operate``: align and print the result if it scores high enough.

    Nothing is returned. The alignment against ``s2_name`` is printed only
    when a region was found and its score reaches ``threshold_score``.
    """
    scored = get_best_diagonals_concatenation_with_scores(
        seq1, seq2, substr_len, gap, blosum_cf, diag_min_len
    )
    region = find_best_score(scored)[0]
    if region is None:
        return

    x_lo, y_lo, x_hi, y_hi = region.to_positions()
    # Both ends of the region are inclusive, hence the + 1.
    al_1, al_2, score = smith_waterman(seq1[x_lo:x_hi + 1], seq2[y_lo:y_hi + 1], gap_score=gap)
    if score >= threshold_score:
        print(f"\tto { s2_name}\n\n\t{al_1}\n\t{al_2}\n\t\tScore:{score}\n{'-' * 15}")


def operate_fasta(path, substr_len=2, gap=-10, blosum_cf=5, threshold_score=100, diag_min_len=5):
    """Compare the first record of a FASTA file against every other record.

    Prints the first record's name, then one alignment report for each other
    record whose score from ``operate`` reaches ``threshold_score``.
    """
    query_name, query_seq, others = read_fasta(path)
    print(query_name)
    for s2_name, target_seq in others.items():
        if s2_name == query_name:
            continue
        al_1, al_2, score = operate(query_seq, target_seq, substr_len, gap, blosum_cf, diag_min_len)
        if score >= threshold_score:
            print(f"\tto { s2_name}\n\n\t{al_1}\n\t{al_2}\n\t\tScore:{score}\n{'-' * 15}")


def operate_fasta_multi(path, substr_len=2, gap=-10, blosum_cf=5, threshold_score=100, diag_min_len=5):
    """Thread-pool variant of ``operate_fasta``.

    Prints the first record's name, then submits one ``operate_for_multi``
    task per other record to a pool of 7 threads. Each task prints its own
    report, so reports may appear in any order.

    Raises:
        Any exception raised inside a worker is re-raised here by ``get()``.
    """
    s1_name, s1, m = read_fasta(path)
    print(s1_name)
    # The context manager shuts the pool down once all results are collected.
    with ThreadPool(processes=7) as pool:
        # Submit everything first. Calling get() right after each apply_async
        # would block on every task and make the pool run sequentially.
        pending = [
            pool.apply_async(
                operate_for_multi,
                args=(s1, s2, substr_len, gap, blosum_cf, diag_min_len, threshold_score, s2_name),
            )
            for s2_name, s2 in m.items()
            if s2_name != s1_name
        ]
        for task in pending:
            task.get()

In [None]:
# Time the sequential run. total_seconds() reports the whole elapsed time,
# while .seconds is only the seconds component and wraps after one day.
start = datetime.now()
operate_fasta('uniprot_sprot.fasta', substr_len=2, gap=-10, blosum_cf=5, threshold_score=300, diag_min_len=10)
print((datetime.now() - start).total_seconds())

In [None]:
# Time the thread-pool run. total_seconds() reports the whole elapsed time,
# while .seconds is only the seconds component and wraps after one day.
start = datetime.now()
operate_fasta_multi('uniprot_sprot.fasta', substr_len=2, gap=-10, blosum_cf=10, threshold_score=300, diag_min_len=10)
print((datetime.now() - start).total_seconds())