# Smith-Waterman genome application

We will apply the Smith-Waterman algorithm to two specific sequences. In this example, we align the NC_000017.11 Seq 1 scaffold with the whole gorilla genome.

In [4]:
import pandas as pd
import time
import numpy as np
from enum import IntEnum
from datetime import datetime as dt
import os
import seaborn as sns
import xlsxwriter
from matplotlib import pyplot as plt
import swco

In [2]:
# -*- coding: utf-8 -*-

"""

LIFE97011 - Computing
Python Programming - Assessed Exercise No. 3
Task: Smith Waterman local alignment
@Author: Slaviana Pavlovich

"""

# Assigning the constants for the scores
class Score(IntEnum):
    # order by value it should be
    MATCH = 10      
    CONS_GAP = -2   # Want to penalize worst a first gap than a secong gap
    FIRST_GAP = -4
    MISMATCH = -3
    #INV = 0
    DUP = 1

# Match: 10 , First_gap: -4, cons_gap: -1, mismatch: -5

# Assigning the constant values for the traceback
class Trace(IntEnum):
    STOP = 0
    LEFT = 1 
    UP = 2
    DIAGONAL = 3

def excel_reader(specie):
    data = pd.read_excel('../Data/Raw/Tables_Filtered_IK.xlsx', specie)
    
    data['Gene'] = data['Locus'].str.split('(\d+)').str[0] + data['Strand']
    data['Gene_non_or'] = data['Locus'].str.split('(\d+)').str[0]
    data.reset_index(inplace= True)

    data = data[data['Gene'].str.contains('LOC') == False]  

    return data

# Implementing the Smith Waterman local alignment
def smith_waterman(seq_1, seq_2):

    # Data structure:
    # 0: index
    # 1: gen + orientation
    # 2: gen

    row = len(seq_1) + 1
    col = len(seq_2) + 1
    matrix = np.zeros(shape=(row, col), dtype= int)  
    tracing_matrix = np.zeros(shape=(row, col), dtype= int)  

    # Initialising the variables to find the highest scoring cell
    max_score = -1
    max_index = (-1, -1)

    diagonal_score = 0
    vertical_score = 0
    horizontal_score = 0
    
    # Calculating the scores for all cells in the matrix

    for i in range(1, row):
        for j in range(1, col):
            
            # Calculating the diagonal score (match score)
            # If there is a match, always win? And then, we can skip all further comparations?
            match_value = Score.MATCH if seq_1.iloc[i - 1, 2] == seq_2.iloc[j - 1, 2] else Score.MISMATCH
            #Score.INV if seq_1.iloc[i - 1, 2] == seq_2.iloc[j - 1, 2] else  --> No sense, it will make sense if the overall is inversed and then one of them is turned around. So, first, check the overall waz and then compare them.
                
            diagonal_score = matrix[i - 1, j - 1] + match_value

            # Calculating the vertical gap score, penalizing less consecutive gaps and identifying duplicates
            #if seq_2.iloc[j-1, 2] == seq_2.iloc[j - 2, 2]:
                #vertical_score = matrix[i - 1, j] + Score.DUP elif
            if matrix[i - 1, j] == vertical_score:
                vertical_score = matrix[i - 1, j] + Score.CONS_GAP
            else: vertical_score = matrix[i - 1, j] + Score.FIRST_GAP
            
            # Calculating the vertical gap score, penalizing consecutive gaps and identifying duplicates
            #if seq_1.iloc[i-1, 2] == seq_1.iloc[i-2, 2]:
                #horizontal_score = matrix[i, j - 1] + Score.DUP elif
            if matrix[i, j - 1] == horizontal_score:
                horizontal_score = matrix[i, j - 1] + Score.CONS_GAP
            else: horizontal_score = matrix[i, j - 1] + Score.FIRST_GAP
            
            # Taking the highest score 
            matrix[i, j] = max(0, diagonal_score, vertical_score, horizontal_score)
            
            # Tracking where the cell's value is coming from    
            if matrix[i, j] == 0: 
                tracing_matrix[i, j] = Trace.STOP
                
            elif matrix[i, j] == horizontal_score: 
                tracing_matrix[i, j] = Trace.LEFT
                
            elif matrix[i, j] == vertical_score: 
                tracing_matrix[i, j] = Trace.UP
                
            elif matrix[i, j] == diagonal_score: 
                tracing_matrix[i, j] = Trace.DIAGONAL 
                
            # Tracking the cell with the maximum score
            # If we want different strings, here we can define a threshold and keep track of all the higher values
            if matrix[i, j] >= max_score:
                max_index = (i,j)
                max_score = matrix[i, j]


    # Initialising the variables for tracing
    aligned_seq_1 = [ ]
    aligned_seq_2 = [ ]
    index_seq_1 = [ ]
    index_seq_2 = [ ]
    (max_i, max_j) = max_index

    # Tracing and computing the pathway with the local alignment
    #if max_score > 3 * Score.MATCH
    while tracing_matrix[max_i, max_j] != Trace.STOP:
        if tracing_matrix[max_i, max_j] == Trace.DIAGONAL:
            aligned_seq_1.insert(0, seq_1.iloc[max_i - 1, 2])
            aligned_seq_2.insert(0, seq_2.iloc[max_j - 1, 2])

            index_seq_1.insert(0, seq_1.iloc[max_i - 1, 0])
            index_seq_2.insert(0, seq_2.iloc[max_j - 1, 0])

            max_i = max_i - 1
            max_j = max_j - 1
            

        elif tracing_matrix[max_i, max_j] == Trace.UP:
            aligned_seq_1.insert(0, seq_1.iloc[max_i - 1, 2])
            aligned_seq_2.insert(0, '-')

            index_seq_1.insert(0, seq_1.iloc[max_i - 1, 0])
            index_seq_2.insert(0, '-')

            max_i = max_i - 1    
            

        elif tracing_matrix[max_i, max_j] == Trace.LEFT:
            aligned_seq_1.insert(0, '-')
            aligned_seq_2.insert(0, seq_2.iloc[max_j - 1, 2])

            index_seq_1.insert(0, '-')
            index_seq_2.insert(0, seq_2.iloc[max_j - 1, 0])
            
            max_j = max_j - 1

    return aligned_seq_1, aligned_seq_2, index_seq_1, index_seq_2, max_score, max_index, max_i, max_j, matrix

def duplicate(all):
    for i in range(1, len(all)):
        if all.loc[i, 'Result'] == 'Gap':
            if (all.loc[i-1, 'Result'] == 'Match') | (all.loc[i-1, 'Result'] == 'Inversion'):
                if (all.loc[i-1,'Query Sequence Gene'] == all.loc[i,'Query Sequence Gene']) | (all.loc[i-1,'Target Sequence Gene'] == all.loc[i,'Target Sequence Gene']):
                    all.loc[i, 'Result'] = 'Duplicate'

                    # Check for consecutives duplicates
                    j = i+1
                    while((j<len(all)) & (all.loc[j, 'Result'] == 'Gap') & (all.loc[i,'Query Sequence Gene'] == all.loc[j,'Query Sequence Gene']) & (all.loc[i,'Target Sequence Gene'] == all.loc[j,'Target Sequence Gene'])):
                        all.loc[j, 'Result'] = 'Duplicate'
                        j+=1
                    i=j-1
    return all

def merge(seq_1, seq_2, aligned_seq_1, index_seq_1, aligned_seq_2, index_seq_2):
    align_seq_1 = pd.DataFrame({'Original Position' : index_seq_1, 'Gene': aligned_seq_1}).merge(seq_1[['index', '#Replicon Name', 'Replicon Accession', 'Strand']], left_on = 'Original Position', right_on = 'index', how = 'left')
    align_seq_2 = pd.DataFrame({'Original Position' : index_seq_2, 'Gene': aligned_seq_2}).merge(seq_2[['index', '#Replicon Name', 'Replicon Accession', 'Strand']], left_on = 'Original Position', right_on = 'index', how = 'left')

    align_seq_1.rename(columns={'Original Position': 'Query Sequence Original Position', 'Gene' : 'Query Sequence Gene', '#Replicon Name': 'Query Sequence Replicon Name', 'Replicon Accession' : 'Query Sequence Replicon Accession', 'Strand': 'Query Sequence Orientation'}, inplace = True)
    align_seq_2.rename(columns={'Original Position': 'Target Sequence Original Position', 'Gene' : 'Target Sequence Gene', '#Replicon Name': 'Target Sequence Replicon Name', 'Replicon Accession' : 'Target Sequence Replicon Accession', 'Strand': 'Target Sequence Orientation'}, inplace = True)

    all = align_seq_1.loc[:, align_seq_1.columns != 'index'].merge(align_seq_2.loc[:, align_seq_2.columns != 'index'], right_index = True, left_index = True)

    all['Result'] = np.where(all['Query Sequence Gene'] == all['Target Sequence Gene'],  
                    np.where(all['Query Sequence Orientation'] == all['Target Sequence Orientation'], 'Match', 'Inversion'),
                    np.where(((all['Query Sequence Gene'] == '-') | (all['Target Sequence Gene'] == '-')),
                      'Gap', 'Mismatch'))
    
    all = all.reindex(
            columns = ['Query Sequence Original Position', 'Query Sequence Replicon Name',
            'Query Sequence Replicon Accession', 'Query Sequence Orientation', 'Query Sequence Gene', 'Result', 'Target Sequence Gene', 'Target Sequence Orientation',
            'Target Sequence Replicon Accession', 'Target Sequence Replicon Name', 'Target Sequence Original Position'])

    return all


In [5]:
human = swco.excel_reader('Human')
#gorilla = excel_reader('Gorilla')
xen = pd.read_csv('../Data/Intermediate/xenopus_genome_proc.csv')

query_specie = 'human'
target_specie = 'xenopus'

In [6]:
# Order the accessions per number of locus

human_acc = human.groupby('Replicon Accession', as_index = False)['Locus'].count()
human_acc.rename(columns={'Locus':'# Locus'}, inplace=True)
query_acc = human_acc.loc[human_acc['# Locus'] > 9].sort_values('# Locus', ascending = False)
query_acc = query_acc['Replicon Accession']
query_acc = query_acc.reset_index()

# Just consider two accessions to make a test
#query_acc = human_acc.loc[(human_acc['Locus'] == 853) | (human_acc['Locus'] == 958), 'Replicon Accession']

#query_acc = human_acc.loc[(human_acc['Replicon Accession'].str.contains('NC_000021.9')), 'Replicon Accession'] # |(human_acc['Replicon Accession'].str.contains('NT_167244.2'))
#query_acc = query_acc.reset_index()



In [7]:
xen.rename(columns={'Change':'index'}, inplace=True)

In [8]:
query_acc.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 73 entries, 0 to 72
Data columns (total 2 columns):
 #   Column              Non-Null Count  Dtype 
---  ------              --------------  ----- 
 0   index               73 non-null     int64 
 1   Replicon Accession  73 non-null     object
dtypes: int64(1), object(1)
memory usage: 1.3+ KB


In [6]:
# query_acc.loc[query_acc['Replicon Accession'].str.contains('NC_000024.10'), 'index']

In [9]:
target_seq = xen.loc[:,['index', 'Gene', 'Gene_non_or']]
target_seq_name = 'xen' 

In [10]:
query_acc.iloc[:,1]

0     NC_000001.11
1     NC_000019.10
2     NC_000002.12
3     NC_000011.10
4     NC_000017.11
          ...     
68     NT_187517.1
69     NT_187640.1
70     NT_187661.1
71     NT_187668.1
72     NT_187686.1
Name: Replicon Accession, Length: 73, dtype: object

In [13]:
# Save our results afterwards
path = '../Data/S_W_Intermediate/' # + query_specie + '_' + target_specie + '_' + dt.now().strftime('%Y%m%d_%H%M%S') + '.xlsx'
timestamp = dt.now().strftime('%Y%m%d_%H%M%S')
writer = pd.ExcelWriter(path  + query_specie + '_' + target_specie + '_' + timestamp + '.xlsx', engine = 'xlsxwriter')

#os.makedirs(path)

# Parameters
params = []
params = pd.DataFrame({
    'Description': ['', 'Parameters', 'Match', 'Mismatch', 'First gap', 'Second gap'], 
    'Value': ['', '', swco.Score.MATCH, swco.Score.MISMATCH, swco.Score.FIRST_GAP, swco.Score.CONS_GAP]})

results = []
results = pd.DataFrame(data = {'Accession':'', '% Covered Target sequence':'', '% Covered Scaffold': '', 'Number of consecutive genes': ''})

for acc in query_acc.iloc[:,1]:

    print(acc)
    
    query_seq = []
    query_seq = human.loc[human['Replicon Accession'].str.contains(acc), ['index', 'Gene', 'Gene_non_or']]
    query_seq_name = 'human_' + acc
    
    # Executing the Smith Waterman local alignment algorithm
    aligned_query_seq = []
    aligned_target_seq = []
    index_query_seq = []
    index_target_seq = []
    matrix = [[]]

    tic = time.perf_counter()
    aligned_query_seq, aligned_target_seq, index_query_seq, index_target_seq, max_score, max_index, max_i, max_j, matrix = swco.smith_waterman(query_seq, target_seq)
    toc = time.perf_counter()

    # Time
    d = toc - tic
    print('Computed in %s'%time.strftime('%H:%M:%S', time.gmtime(d)))

    all = []

    all = swco.merge(human, xen, aligned_query_seq, index_query_seq, aligned_target_seq, index_target_seq)

    all = swco.duplicate(all)

    index_query_seq_no_gaps = []
    index_target_seq_no_gaps = []
    index_query_seq_no_gaps = list(filter(lambda c: c!= '-', index_query_seq))
    index_target_seq_no_gaps = list(filter(lambda c: c!= '-', index_target_seq))

    summary = []
    summary = pd.DataFrame({
        'Description' : ['Query Sequence', 'Length Query Sequence', 'Target Sequence', 'Length Target Sequence', 'Score', '', 'Aligned Query Sequence initial position', 'Aligned Query Sequence final position', 'Aligned Query Sequence length', '', 'Aligned Target Sequence initial position', 'Aligned Target Sequence final position', 'Aligned Target Sequence length'],
        'Value': [query_seq_name, len(query_seq), target_seq_name, len(target_seq), max_score, '', index_query_seq_no_gaps[0], 
                    index_query_seq_no_gaps[-1], index_query_seq_no_gaps[-1] - index_query_seq_no_gaps[0], '', index_target_seq_no_gaps[0], 
                    index_target_seq_no_gaps[-1], index_target_seq_no_gaps[-1] - index_target_seq_no_gaps[0]]
    })

    summary = pd.concat([summary, params])

    aligned_query_seq_no_gaps = []
    aligned_target_seq_no_gaps = []
    aligned_query_seq_no_gaps = list(filter(lambda c: c!= '-', aligned_query_seq))
    aligned_target_seq_no_gaps = list(filter(lambda c: c!= '-', aligned_target_seq))

    #KPIs
    perc_align_query_seq = len(aligned_query_seq_no_gaps) / len(query_seq)
    perc_align_target_seq = len(aligned_target_seq_no_gaps) / len(target_seq)

    kpi = []
    kpi = pd.DataFrame({
        'Description': ['','KPIs','% Covered Target sequence', '% Covered Scaffold', 'Number of consecutive genes', ''], #include where duplicates occur
        'Value': ['','', perc_align_target_seq, perc_align_query_seq, len(all), '']
    })

    result_perc = all.groupby('Result').count() / all.shape[0]
    result_perc = result_perc.reset_index().iloc[:,[0,1]]
    result_perc.rename(columns = {'Result':'Description', 'Query Sequence Original Position': 'Value'}, inplace=True)

    summary = pd.concat([summary, kpi, result_perc]) # summary.append(kpi)

    summary.to_excel(writer, sheet_name = 'Summary_' + acc, index = False, header=False)
    all.to_excel(writer, sheet_name = 'Comparison_' + acc, index = False)

    actual_results = pd.DataFrame(data = {'Accession': acc, '% Covered Target sequence': perc_align_target_seq, '% Covered Scaffold': perc_align_query_seq, 'Number of consecutive genes': len(all)}, index=query_acc.loc[query_acc['Replicon Accession'].str.contains(acc), 'index'])
    results = pd.concat([results, actual_results])
    
    heatmap = sns.heatmap(matrix)
    plt.title('Heatmap of ' + target_seq_name + ' vs. ' + query_seq_name, fontsize = 12)
    plt.savefig(path + 'Images/' + timestamp + '_' + acc + '.png')
    plt.clf()

    workbook  = writer.book
    worksheet = writer.sheets['Summary_' + acc]
    worksheet.insert_image('H2', path + 'Images/' + timestamp + '_' + acc + '.png')

    # Remove match parts and clean variables
    prev = []
    after = []
    prev = target_seq.loc[target_seq['index'] < index_target_seq_no_gaps[0] + 1]
    after = target_seq.loc[index_target_seq_no_gaps[-1] - 1 < target_seq['index']]

    target_seq = pd.concat([prev, after])
    del heatmap, perc_align_target_seq, perc_align_query_seq, result_perc

results.to_excel(writer, sheet_name = 'Results', index = False)
writer.save()    

ValueError: If using all scalar values, you must pass an index

In [10]:
aligned_query_seq

[]

In [15]:
writer

<pandas.io.excel._xlsxwriter.XlsxWriter at 0x22153a675b0>

In [16]:
#results.to_excel(path + 'human_gorilla_20221204_145134.xlsx', sheet_name = 'Results', index = False)


In [13]:
writer.save()

  warn("Calling close() on already closed file.")
