In [13]:
import numpy as np
from scipy.stats import pearsonr
import networkx as nx
import matplotlib.pyplot as plt
import random
!pip install biopython

from Bio import pairwise2
from Bio import SeqIO
from collections import deque
import cmath
from Bio import SeqIO, Align

# Function to convert a DNA sequence to a complex number vector
def sequence_to_complex(seq):
    return [nucleotide_to_complex[nuc] for nuc in seq]

# Mapping of nucleotides to complex numbers

nucleotide_to_complex = {
    'A': 1 + 1j,
    'C': -1 - 1j,
    'G': -1 + 1j,
    'T': 1 - 1j
}

def calculate_overlap(assembled_seq, reference_seq):
    # Perform local alignment (you can also try global alignment)
    alignments = pairwise2.align.localxx(assembled_seq, reference_seq)

    # Get the best alignment
    best_alignment = alignments[0]

    # The alignment length (start and end positions)
    alignment_length = best_alignment[2]  # This is the end position of the alignment

    # Calculate overlap percentage
    overlap_percentage = (alignment_length / len(reference_seq)) * 100

    return overlap_percentage


# Function to calculate the maximum Pearson correlation coefficient for all shifts
def max_shifted_correlation(seq1, seq2):
    max_corr = 0  # Initialize the maximum correlation as 0

    len1, len2 = len(seq1), len(seq2)
    # Iterate over all possible shifts where seq1 and seq2 overlap
    for shift in range(1 - len2, len1):
        # Define the overlapping region based on the shift
        if shift < 0:
            overlap_seq1 = seq1[:len2 + shift]
            overlap_seq2 = seq2[-shift:]
        else:
            overlap_seq1 = seq1[shift:]
            overlap_seq2 = seq2[:len1 - shift]

        # Calculate Pearson correlation if there's enough overlap
        if len(overlap_seq1) > 1:
            # Separate real and imaginary parts for correlation calculation
            real_part1 = [x.real for x in overlap_seq1]
            real_part2 = [x.real for x in overlap_seq2]
            imag_part1 = [x.imag for x in overlap_seq1]
            imag_part2 = [x.imag for x in overlap_seq2]

            # Compute correlations for both real and imaginary parts
            real_corr = pearsonr(real_part1, real_part2)[0]
            imag_corr = pearsonr(imag_part1, imag_part2)[0]

            # Average real and imaginary correlations and update max correlation
            avg_corr = (real_corr + imag_corr) / 2
            max_corr = max(max_corr, avg_corr)

    return max_corr


def generate_initial_population(cities, population_size):
    population = []
    for _ in range(population_size):
        route = cities.copy() #copy on route list
        random.shuffle(route) #shuffle the route list
        population.append(route) # append the list in the generated population
    return population


# Fitness function
def fitness(route, distance_matrix):
    total_distance = sum(distance_matrix[route[i]][route[i+1]] for i in range(len(route)-1))
    total_distance += distance_matrix[route[-1]][route[0]]  # Return to start
    return 1 / total_distance  # Fitness is inverse of distance


# calculation of the distance of each route
def distance_fitness(route, distance_matrix):
    total_distance = sum(distance_matrix[route[i]][route[i+1]] for i in range(len(route)-1))
    total_distance += distance_matrix[route[-1]][route[0]]  # Return to start
    return total_distance  # Fitness is inverse of distance



# function used on the cumulative normalized matrix to simulte the
# spinning roulette wheel for population selection.
def get_population_index(matrix, no):
    idx = 0
    for i in range(len(matrix)):
        if(matrix[i]>no):
            return idx
        idx+=1


# An order crossover implementation for the TSP.
def order_crossover(par1, par2):
    size = len(par1)

    # Choose two random crossover points
    start, end = sorted(random.sample(range(size), 2))

    # Initialize two children with None values
    child1, child2 = [None] * size, [None] * size

    # Copy the segment from Parent 1 to Child 1, and from Parent 2 to Child 2
    child1[start:end+1] = par1[start:end+1]
    child2[start:end+1] = par2[start:end+1]


    # Fill the remaining positions in Child 1 with cities from Parent 2
    pos1 = end + 1
    for city in par2:
        if city not in child1:
            if pos1 >= size:
                pos1 = 0
            child1[pos1] = city
            pos1 += 1

    # Fill the remaining positions in Child 2 with cities from Parent 1
    pos2 = end + 1
    for city in par1:
        if city not in child2:
            if pos2 >= size:
                pos2 = 0
            child2[pos2] = city
            pos2 += 1

    return child1, child2



def encode_dna_signal(reads):
    """
    Encodes a list of DNA reads into a signal using the cumulated phase signal representation.

    Args:
    - reads (list of str): List of DNA reads, each read is a string of characters from {A, C, G, T}.

    Returns:
    - List of floats: The phase (argument) values accumulated for each read.
    """
    encoded_signals = []

    for read in reads:
        signal = []
        cumulated_phase = 0

        for nucleotide in read:
            # Convert nucleotide to complex number
            complex_value = nucleotide_to_complex.get(nucleotide)
            if complex_value:
                # Calculate the phase (argument) of the complex number
                phase = cmath.phase(complex_value)
                cumulated_phase += phase
                signal.append(cumulated_phase)
            else:
                raise ValueError(f"Invalid nucleotide {nucleotide} in read.")

        encoded_signals.append(signal)

    return encoded_signals
# Function to find the best overlap between two sequences in both directions
def find_best_bidirectional_overlap(aligner, seq1, seq2):
    # Align seq1 -> seq2 (forward)
    alignments_fwd = list(aligner.align(seq1, seq2))
    if not alignments_fwd:
        score_fwd = 0
        overlap_len_fwd = 0
        merged_fwd = None
    else:
        alignment_fwd = alignments_fwd[0]
        score_fwd = alignment_fwd.score
        overlap_len_fwd = int(len(seq2) - score_fwd)
        merged_fwd = seq1 + seq2[overlap_len_fwd:]

    # Align seq2 -> seq1 (backward)
    alignments_bwd = list(aligner.align(seq2, seq1))
    if not alignments_bwd:
        score_bwd = 0
        overlap_len_bwd = 0
        merged_bwd = None
    else:
        alignment_bwd = alignments_bwd[0]
        score_bwd = alignment_bwd.score
        overlap_len_bwd = int(len(seq1) - score_bwd)
        merged_bwd = seq2 + seq1[overlap_len_bwd:]

    # Choose the merge with the better score
    if score_fwd >= score_bwd and merged_fwd:
        return merged_fwd
    elif merged_bwd:
        return merged_bwd
    else:
        return None

# Function to assemble sequences based on the ordered reads
def assemble_ordered_reads(ordered_reads):
    aligner = Align.PairwiseAligner()
    aligner.mode = 'local'  # You can experiment with 'global' as well
    aligner.match_score = 4  # Increased reward for matches
    aligner.mismatch_score = -1  # Increased penalty for mismatches
    aligner.open_gap_score = -1  # Reduced penalty for opening a gap
    aligner.extend_gap_score = -0.25  # Reduced penalty for extending a gap
    aligner.internal_open_gap_score = -2  # Reduced penalty for internal gaps
    aligner.internal_extend_gap_score = -1  # Increased penalty for internal gaps# Start with the first read in the ordered list
    assembled_sequence = ordered_reads[0]

    # Iterate through the rest of the ordered reads
    for read in ordered_reads[1:]:
        merged = find_best_bidirectional_overlap(aligner, assembled_sequence, read)
        if merged:
            assembled_sequence = merged
        else:
            print(f"Warning: Could not merge read: {read[:20]}...")

    return assembled_sequence




In [30]:

# File path to the .fq file
fq_file = "output_prefix.fq"

# Read the .fq file and store sequences
sequences = []
for record in SeqIO.parse(fq_file, "fastq"):
    sequences.append(str(record.seq))

# Print the first few sequences
print(sequences[:5])  # Show the first 5 sequences
print(len(sequences))

reads = sequences
complex_reads = [sequence_to_complex(read) for read in reads]

import os
if os.path.exists('pearson_matrix.npy'):
  pearson_matrix = np.load('pearson_matrix.npy')
else:
       # ... (Your existing code to calculate pearson_matrix) ...
  # Create the Pearson correlation matrix
  num_reads = len(reads)
  pearson_matrix = np.zeros((num_reads, num_reads))

  for i in range(num_reads):
      for j in range(num_reads):
          if i != j:
              pearson_matrix[i][j] = max_shifted_correlation(complex_reads[i], complex_reads[j])
          else:
              pearson_matrix[i][j] = 1.0  # Self-correlation is 1
  np.save('pearson_matrix.npy', pearson_matrix)

print(pearson_matrix)


print(pearson_matrix)

# Convert Pearson correlation matrix to a distance matrix suitable for TSP
distance_matrix = 1 - pearson_matrix  # Higher correlation -> lower distance
distance_matrix *= 1000  # Scale to larger values for TSP solver
distance_matrix = distance_matrix.astype(int)  # Convert to integers

# Add an artificial vertex with zero distance to all other vertices
num_reads += 1
distance_matrix = np.pad(distance_matrix, ((1, 0), (1, 0)), mode='constant', constant_values=0)


['TAGACCGGTGTAAAATCCCTTAAACATTTACTTAAAATTTAAGGAGAGGGTATCAAGCACATTAAAATAGCTTAAGACAC', 'CTTAATATTTATCACTGCTGAGTCCCGTGGGGGTGTGGCTAGGCAAGGTGTCTTAAGCTATTTTAATGTGCTTGATACCC', 'CTCCATAGACCGGGGTAAAATCCCTTAAACATTTACTTAAAATTTAAGGTGAGGGTATCAAGCACATTAAAATAGCTTAA', 'TGATTATTCCAAGCACACTTTCCAGTATGCTTACCTTTTTACGACTTATCTCCTCTCATAAACGGATGTCTAGAAATTAA', 'CTTATAATTAATTAGAGGTAAAATTACACATGCAAACCTCCATAGACCGGTGTAAAATCCCTTAAACATTTACTTAAAAT']
22
[[1.         0.53745748 0.95749028 0.38891059 1.         0.44638501
  1.         0.4979798  0.35355339 1.         0.70710678 0.4472136
  0.41702952 0.78867513 0.57716019 0.75       0.75       0.75
  0.78867513 0.38951955 0.75       0.31361622]
 [0.53745748 1.         1.         0.62112448 0.88729833 0.39630291
  0.66666667 0.47718367 0.55       1.         0.78867513 0.49952531
  0.45674334 0.4888328  0.36229178 0.53745748 0.54397502 0.83333333
  1.         0.52571722 0.32361648 0.83333333]
 [0.95749028 1.         1.         0.40608195 0.951869   0.70412415
  0.81622777 

In [29]:
import os
if os.path.exists('pearson_matrix.npy'):
  pearson_matrix = np.load('pearson_matrix.npy')
else:
       # ... (Your existing code to calculate pearson_matrix) ...
  np.save('pearson_matrix.npy', pearson_matrix)

print(pearson_matrix)

[[1.         0.53745748 0.95749028 0.38891059 1.         0.44638501
  1.         0.4979798  0.35355339 1.         0.70710678 0.4472136
  0.41702952 0.78867513 0.57716019 0.75       0.75       0.75
  0.78867513 0.38951955 0.75       0.31361622]
 [0.53745748 1.         1.         0.62112448 0.88729833 0.39630291
  0.66666667 0.47718367 0.55       1.         0.78867513 0.49952531
  0.45674334 0.4888328  0.36229178 0.53745748 0.54397502 0.83333333
  1.         0.52571722 0.32361648 0.83333333]
 [0.95749028 1.         1.         0.40608195 0.951869   0.70412415
  0.81622777 0.47855339 0.51031036 0.47434165 0.48219461 0.29587585
  0.49479919 0.57380715 0.4736068  0.31622777 0.83333333 0.68898224
  0.60355339 0.51228193 0.63900965 0.58333333]
 [0.38891059 0.62112448 0.40608195 1.         0.68044298 0.46531409
  0.57735027 0.75       0.57560683 0.59660989 0.53745748 1.
  0.39329326 1.         1.         0.37971834 0.51031036 0.39036003
  0.47337062 0.3921868  0.2664135  0.28643571]
 [1.       

In [15]:

############## TSP part #########################
epochs = 500
mutation_rate = 0.01
population_size = 32
cities = list(range(distance_matrix.shape[0]))
number_of_cities = len(cities)
print("cities: ",cities)

population = generate_initial_population(cities, population_size)
print("initial population created:")
for i in range(0,len(population)):
    print(population[i])

minimum_per_stage = []
minimum_path = []


cities:  [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22]
initial population created:
[10, 14, 3, 15, 5, 17, 18, 16, 7, 12, 9, 4, 6, 11, 0, 2, 20, 13, 8, 21, 1, 19, 22]
[12, 2, 8, 17, 6, 13, 5, 0, 22, 15, 21, 4, 14, 11, 19, 7, 9, 16, 3, 18, 1, 10, 20]
[21, 18, 7, 16, 10, 0, 8, 2, 22, 17, 4, 13, 5, 9, 11, 15, 19, 6, 3, 14, 20, 12, 1]
[10, 18, 15, 16, 4, 2, 5, 11, 21, 20, 19, 6, 17, 12, 0, 7, 14, 8, 3, 1, 22, 9, 13]
[1, 11, 20, 6, 3, 5, 8, 0, 4, 22, 15, 14, 2, 13, 17, 21, 10, 9, 18, 7, 16, 12, 19]
[21, 16, 7, 2, 18, 12, 13, 10, 3, 0, 19, 6, 15, 5, 14, 22, 20, 8, 11, 4, 1, 17, 9]
[13, 9, 4, 12, 15, 11, 8, 6, 1, 16, 14, 20, 3, 17, 18, 10, 0, 7, 21, 22, 5, 2, 19]
[9, 1, 14, 10, 18, 4, 21, 5, 6, 20, 12, 16, 2, 22, 7, 13, 11, 19, 0, 8, 17, 15, 3]
[1, 15, 17, 22, 18, 9, 10, 7, 16, 8, 14, 20, 12, 6, 5, 19, 4, 3, 13, 0, 21, 2, 11]
[15, 18, 3, 7, 9, 6, 14, 19, 1, 4, 0, 8, 2, 22, 17, 10, 21, 5, 16, 20, 13, 12, 11]
[12, 19, 13, 7, 5, 6, 17, 15, 2, 10, 21, 1, 11, 18

In [32]:
for epoch in range(epochs):
    #evaluation of the objective function
    fitness_scores = [fitness(route, distance_matrix) for route in population]
    distance_scores = [distance_fitness(route, distance_matrix) for route in population]
    #print("starting generation population  GENERATION["+str(epoch)+"] :")
    #for i in range(0,len(population)):
        #print(population[i])
    # now normalization of the function scores happens.
    normalized_function_scores = fitness_scores/sum(fitness_scores)
    new_population = [] # empty it.
    minimum_per_stage.append(min(distance_scores))
    if(min(minimum_per_stage)>= min(distance_scores)):
        for i in range(0,len(population)):
            if(min(distance_scores) == distance_fitness(population[i],distance_matrix)):
                minimum_path = population[i]
                break

    #print("distances per route")
    #for i in range(len(population)):
        #print(distance_scores[i])

    #print("normalized_f_sc")
    #print(normalized_function_scores)

    cumulative_normalized_function_scores = normalized_function_scores.copy()
    for i in range(len(cumulative_normalized_function_scores)):
        for j in range(0,i):
            cumulative_normalized_function_scores[i]+=normalized_function_scores[j]

    #print("cumulative_n_f_s")
    #print(cumulative_normalized_function_scores)

    #selection of the new population.
    for i in range(population_size):
        random_number = random.random() #get a random number.(like spinning the roulette wheel)
        #print("random no "+str(random_number))
        ret = get_population_index(cumulative_normalized_function_scores,random_number)
        #print("assign me to "+str(ret))
        new_population.append(population[ret])

#
#    print("New population: ")
#    for i in range(0,len(new_population)):
#        print(new_population[i])

    # because of the nature of the traveling salesman problem crossover
    # in its general form can not be used, so we implement order crossover.
    # we keep the rule for 0.75 3 out of 4 crossover.
    for i in range(0,round(population_size*0.75),2):
        new_population[i], new_population[i+1] = order_crossover(new_population[i], new_population[i+1])

#    print("New population after order crossover: ")
#    for i in range(0,len(new_population)):
#        print(new_population[i])

    #Mutation step.
    # the nature of the application does not allows just a single flip.
    # so a mutation will be a 1 to 1 swap of two random cities in the mutated population.

    for i in range(population_size):
        #print("check mutation on "+str(i))
        #print(new_population[i])
        mutate = random.random() #this is a number between [0,1)
        #print("mutate value: "+str(mutate))
        #print(1-mutate)
        #print("rate of mutation "+str(mutation_rate))
#mutation_rate = 0.008
        if( (1-mutate) < mutation_rate): #occurance of mutation
            #print("MUTATION")
            sp = random.randint(1,number_of_cities-2)
            ep = random.randint(1,number_of_cities-1)
            while(sp >= ep):
                ep = random.randint(1,number_of_cities-1)
            #print(sp)
            #print(ep)
            tmp = new_population[i][sp]
            new_population[i][sp] = new_population[i][ep]
            new_population[i][ep] = tmp
            #print("mutated now: ")
            #print(new_population[i])

    # set it as the new generation surviving for the next epoch.
    population = new_population


print("Minimum path and distance")
print(minimum_path)
print(distance_fitness(minimum_path,distance_matrix))


Minimum path and distance
[4, 15, 13, 8, 20, 11, 9, 22, 6, 12, 3, 17, 7, 2, 10, 18, 21, 19, 0, 5, 1, 16, 14]
3701


In [33]:


# Fix the solution, remove the  extra artificial vertex
ordered_reads = [reads[i - 1] for i in minimum_path if i != 0]
print("Optimal order of DNA reads for assembly:", ordered_reads)

final_assembled_sequence = assemble_ordered_reads(ordered_reads)
print("Final assembled DNA sequence of lenght: "+str(len(final_assembled_sequence))+ " nucleotides")

##test how good we went !
# Path to the chrM.fa file
fasta_file = "chrM.fa"

# Read the FASTA file and store the sequence
record = SeqIO.read(fasta_file, "fasta")

# The sequence of the chromosome
chrM_sequence = str(record.seq)

# Example usage
overlap_percentage = calculate_overlap(final_assembled_sequence, chrM_sequence)

print(f"Overlap percentage: {overlap_percentage:.2f}%")


Optimal order of DNA reads for assembly: ['TGATTATTCCAAGCACACTTTCCAGTATGCTTACCTTTTTACGACTTATCTCCTCTCATAAACGGATGTCTAGAAATTAA', 'CCCTTGCGGTACTAGTTCTATAGCTCCTAGATGTACGAATTTCTTTCTCCAATACTTTTAGTAGGATAAATGTTTTGATT', 'TTAGTTCATTATGCAAAAGGTACAAGGTTTAATCTTTGCTTGTTCTTACTTTTAATTAGTCTTTCATCTTTCCCTTGCGG', 'TCTTAGGGTTGGTAAATTTCGTGCCAGCCACCGCGGTCATACGATTAACCCAAACTAATTATCTTCGGCGTAAAACGTGT', 'AGGGCTAGGGCTAGGATTAGTTCAGAGTGTTCATTGGTCATGAAATCTTCTGGGTGTAGGCCAGATGCTTTAATATTAAG', 'AGCCACACCCCCACGGGACTCAGCAGTGATAAATATTAAGCAATAAACGAAAGTTTGACTAAGTTATACCTCTTAGGGTT', 'CGAAACCAAACGAGCTACCTAAAAACAATTTTATGAATCAACTCGTCTATGTGGCAAAATAGTGAGAAGATTTTTAGGTA', 'CTATAATCGATAAACCCCGCTCTACCTCACCATCTCTTGCTAATTCAGCCTATATACCGCCATCTTCAGCAAACCCTAAA', 'ATAAATAAATAGAATTAAAATCCAACTTATATGTGAAAATTCATTGTTAGGACCTAAACTCAATAACGAAAGTAATTCTA', 'CTATTCTTAATTTACTACTAAATCCTCCTTAGTCCTTTAGTTTCATAAAGGGTATAGTAATGTTCTTTTATAAGAAAATG', 'CTCCATAGACCGGGGTAAAATCCCTTAAACATTTACTTAAAATTTAAGGTGAGGGTATCAAGCACATTAAAATAGCTTAA', 'CATAATTAATTTCTAGACATCCGTTTATGAGAG

In [23]:
# Function to assemble sequences based on the ordered reads
def assemble_ordered_reads(ordered_reads):
    aligner = Align.PairwiseAligner()
    aligner.mode = 'local'  # You can experiment with 'global' as well
    aligner.match_score = 8  # Increased reward for matches
    aligner.mismatch_score = -1  # Increased penalty for mismatches
    aligner.open_gap_score = -4  # Reduced penalty for opening a gap
    aligner.extend_gap_score = -1 # Reduced penalty for extending a gap
    aligner.internal_open_gap_score = -2  # Reduced penalty for internal gaps
    aligner.internal_extend_gap_score = -1  # Increased penalty for internal gaps# Start with the first read in the ordered list
    assembled_sequence = ordered_reads[0]

    # Iterate through the rest of the ordered reads
    for read in ordered_reads[1:]:
        merged = find_best_bidirectional_overlap(aligner, assembled_sequence, read)
        if merged:
            assembled_sequence = merged
        else:
            print(f"Warning: Could not merge read: {read[:20]}...")

    return assembled_sequence