In [25]:
##Importing Packages
import numpy as np
import random
import time
from typing import Tuple, Dict, Any

In [26]:
class ScoringSystem:
    def __init__(self, match: int = 2, mismatch: int = -1, gap: int = -2, quality_weights: Dict[str, int] = None) -> None:
        self.match = match
        self.mismatch = mismatch
        self.gap = gap
        self.quality_weights = quality_weights if quality_weights else {'-': 3, '|': 2, '7': 2}

    def _fuzzy_similarity(self, a: str, b: str, quality_a: str) -> float:
        weight_a = self.quality_weights.get(quality_a, 1)
        return weight_a

    def _default_scoring(self, a: str, b: str, quality_a: str) -> float:
        if a == '-' or b == '-':
            return self.gap
        fuzzy_score = self._fuzzy_similarity(a, b, quality_a)
        if a != b:
            score = self.mismatch * (1 - fuzzy_score)
        else:
            score = self.match * fuzzy_score
        return score

    def score(self, a: str, b: str, quality_a: str) -> float:
        return self._default_scoring(a, b, quality_a)

In [27]:
class SequencesAnalyzer:
    traceback_symbols = {0: '↖', 1: '↑', 2: '←'}

    def __init__(self, read_seq: str, read_quality: str, match: int = 2, mismatch: int = -1, gap: int = -2, quality_weights: Dict[str, int] = None) -> None:
        self.seq_a = read_seq
        self.quality_a = read_quality
        self.scoring_sys = ScoringSystem(match, mismatch, gap, quality_weights)

    def generate_random_reference(self, fixed_part: str, total_length: int) -> str:
        random_part = ''.join(random.choices('AGCT', k=total_length - len(fixed_part)))
        return fixed_part + random_part

    def smith_waterman_algorithm(self, seq_b: str) -> Dict[str, Any]:
        rows, cols = len(self.seq_a) + 1, len(seq_b) + 1
        H = np.zeros(shape=(rows, cols), dtype=int)
        traceback = np.zeros(shape=(rows, cols), dtype=np.dtype('U5'))

        max_score = 0
        max_pos = (0, 0)

        for row in range(1, rows):
            for col in range(1, cols):
                a = self.seq_a[row - 1]
                b = seq_b[col - 1]
                qa = self.quality_a[row - 1] if row - 1 < len(self.quality_a) else '-'

                score_diag = H[row - 1, col - 1] + self.scoring_sys.score(a, b, qa)
                score_up = H[row - 1, col] + self.scoring_sys.gap
                score_left = H[row, col - 1] + self.scoring_sys.gap

                H[row, col] = max(0, score_diag, score_up, score_left)

                if H[row, col] == score_diag:
                    traceback[row, col] = self.traceback_symbols[0]
                elif H[row, col] == score_up:
                    traceback[row, col] = self.traceback_symbols[1]
                elif H[row, col] == score_left:
                    traceback[row, col] = self.traceback_symbols[2]

                if H[row, col] > max_score:
                    max_score = H[row, col]
                    max_pos = (row, col)

        return {'result_matrix': H, 'traceback_matrix': traceback, 'score': max_score, 'score_pos': max_pos}

    def run_alignment(self, N: int, K: int, total_length: int, output_filename: str) -> None:
        fixed_part = self.seq_a[:N]
        with open(output_filename, "w", encoding='utf-8') as f:
            for i in range(K):
                seq_b = self.generate_random_reference(fixed_part, total_length)
                result = self.smith_waterman_algorithm(seq_b)
                alignment_a, alignment_b, matches = self._traceback(
                    result_matrix=result['result_matrix'],
                    traceback_matrix=result['traceback_matrix'],
                    start_pos=result['score_pos'],
                    seq_b=seq_b 
                )

                f.write(f"Run {i + 1}\n")
                f.write(f"Input Sequence A: {self.seq_a}\n")
                f.write(f"Generated Sequence B: {seq_b}\n")
                f.write(f"Alignment Score: {result['score']}\n\n")
                f.write("Alignment:\n")
                f.write(f"{alignment_a}\n")
                f.write(f"{matches}\n")
                f.write(f"{alignment_b}\n\n")
                f.write(f"Result Matrix:\n {result['result_matrix']}\n\n")
                f.write(f"Traceback Matrix:\n {result['traceback_matrix']}\n\n")

    def _traceback(self, result_matrix, traceback_matrix, start_pos: Tuple[int, int], seq_b: str) -> Tuple[str, str, str]:
        seq_a_aligned = ''
        seq_b_aligned = ''
        matches = ''

        row, col = start_pos

        while result_matrix[row, col] > 0:
            symbol = traceback_matrix[row, col]
            if symbol == '↖':
                seq_a_aligned += self.seq_a[row - 1]
                seq_b_aligned += seq_b[col - 1]
                matches += '|' if self.seq_a[row - 1] == seq_b[col - 1] else ' '
                row -= 1
                col -= 1
            elif symbol == '↑':
                seq_a_aligned += self.seq_a[row - 1]
                seq_b_aligned += '-'
                matches += ' '
                row -= 1
            elif symbol == '←':
                seq_a_aligned += '-'
                seq_b_aligned += seq_b[col - 1]
                matches += ' '
                col -= 1
            else:
                break

        return seq_a_aligned[::-1], seq_b_aligned[::-1], matches[::-1]

In [28]:
read_seq = "AGCTTAGCTA"
read_quality = "|-|77|7-|"

In [29]:
analyzer = SequencesAnalyzer(read_seq, read_quality, match=3, mismatch=-1, gap=-2)
analyzer.run_alignment(N=4, K=5, total_length=10, output_filename='alignmentfu2_output.txt')