In [2]:
# !wget https://d28rh4a8wq0iu5.cloudfront.net/ads1/data/chr1.GRCh38.excerpt.fasta

In [9]:
# Find the closest match
def editDistanceClosest(pattern, text):
    # Create distance matrix
    D = []
    for _ in range(len(text)+1):
        D.append([0]*(len(pattern)+1))
    # Initialize first row and column of matrix
    for i in range(len(text)+1):
        D[i][0] = 0
    for i in range(len(pattern)+1):
        D[0][i] = i
    # Fill in the rest of the matrix
    for i in range(1, len(text)+1):
        for j in range(1, len(pattern)+1):
            distHor = D[i][j-1] + 1
            distVer = D[i-1][j] + 1
            if text[i-1] == pattern[j-1]:
                distDiag = D[i-1][j-1]
            else:
                distDiag = D[i-1][j-1] + 1
            D[i][j] = min(distHor, distVer, distDiag)
    # Potential offset appears at the cell where edit distance is minimum the value in the bottom row of the matrix
    bottom_row = [column[len(pattern)] for column in D]
    index_match_bottom = bottom_row.index(min(bottom_row))
    # Trace back:
    row, column = trace_back(pattern, text, D, index_match_bottom)
    return D[index_match_bottom][len(pattern)], row, column

def trace_back(pattern, text, D, index_match_bottom):
    i = index_match_bottom
    j = len(pattern)
    while (i > 0 and j > 0):
        # Establish the other 3 coordinates one of which led to the current result in the current cell (i, j)
        above_coor = (i, j-1)
        left_coor = (i-1, j)
        upper_left_coor = (i-1, j-1)
        # Calculate the delta based on the similarity of the suffixes
        delta = 0 if text[i - 1] == pattern[j - 1] else 1
        # Check to see where the current coordinate comes from
        current_edist = D[i][j]
        if current_edist == D[above_coor[0]][above_coor[1]] + 1:
            i = above_coor[0]
            j = above_coor[1]
        elif current_edist == D[left_coor[0]][left_coor[1]] + 1:
            i = left_coor[0]
            j = left_coor[1]
        elif current_edist == D[upper_left_coor[0]][upper_left_coor[1]] + delta:
            i = upper_left_coor[0]
            j = upper_left_coor[1]
    return (i, j)


In [11]:
text = "TATTGGCTATACGGTT"
pattern = "GCGTATGC"
shortest_edist, i, j = editDistanceClosest(pattern, text)
print(shortest_edist)
print(i)

2
5


In [13]:
# Read genome in the fasta file
genome = ""
genome_line_list = []
with open("chr1.GRCh38.excerpt.fasta", "r") as f:
    for line in f:
        if line[0] != ">":
            genome_line_list.append(line.rstrip())
genome = "".join(genome_line_list)

In [14]:
# Question 1
pattern = "GCTGATCGATCGTACG"
shortest_edist, i, j = editDistanceClosest(pattern, genome)
print(shortest_edist)
print(i)

3
380536


In [15]:
# Question 2
pattern = "GATTTACCAGATTGAG"
shortest_edist, i, j = editDistanceClosest(pattern, genome)
print(shortest_edist)
print(i)

2
186719


In [16]:
# Combine k-mer-index approach in approximate matching and then overlap assembly
def overlap(a, b, min_length=3):
    """ Return length of longest suffix of 'a' matching
        a prefix of 'b' that is at least 'min_length'
        characters long.  If no such overlap exists,
        return 0. """
    start = 0  # start all the way at the left
    while True:
        start = a.find(b[:min_length], start)  # look for b's prefix in a
        if start == -1:  # no more occurrences to right
            return 0
        # found occurrence; check for full suffix/prefix match
        if b.startswith(a[start:]):
            return len(a)-start
        start += 1  # move just past previous match

In [28]:
class DenovoAssembly:
    def __init__(self, reads, min_len = 3):
        self.min_len = min_len
        self.reads = reads
        self.k_mer_map = dict()
        self.overlap_map = []

    def analyze_kmer_reads(self):
        for read in reads:
            k_mer_set = set()
            for i in range(len(read) - self.min_len + 1):
                k_mer_set.add(read[i:i+self.min_len])
            self.k_mer_map[read] = k_mer_set

    def assemble(self):
        self.analyze_kmer_reads()
        k_mer_map_keys = list(self.k_mer_map.keys())
        for i in range(len(self.k_mer_map)):
            current_read = k_mer_map_keys[i]
            for j in range(len(self.k_mer_map)):
                compared_read = k_mer_map_keys[j]
                if (compared_read[:self.min_len] not in self.k_mer_map[current_read]) or i == j:
                    continue
                else:
                    overlap_len = overlap(current_read, compared_read, min_length=self.min_len)
                    if overlap_len > 0:
                        self.overlap_map.append((current_read, compared_read))
        return self.overlap_map

In [29]:
reads = ['ABCDEFG', 'EFGHIJ', 'HIJABC']
assembly_obj = DenovoAssembly(reads, min_len=3)
print(assembly_obj.assemble())

[('ABCDEFG', 'EFGHIJ'), ('EFGHIJ', 'HIJABC'), ('HIJABC', 'ABCDEFG')]


In [32]:
reads = ['CGTACG', 'TACGTA', 'GTACGT', 'ACGTAC', 'GTACGA', 'TACGAT']
assembly_obj = DenovoAssembly(reads, min_len=5)
[print(element) for element in assembly_obj.assemble()]

('CGTACG', 'GTACGT')
('CGTACG', 'GTACGA')
('TACGTA', 'ACGTAC')
('GTACGT', 'TACGTA')
('ACGTAC', 'CGTACG')
('GTACGA', 'TACGAT')


[None, None, None, None, None, None]

In [33]:
# !wget https://d28rh4a8wq0iu5.cloudfront.net/ads1/data/ERR266411_1.for_asm.fastq

--2025-02-16 13:27:15--  https://d28rh4a8wq0iu5.cloudfront.net/ads1/data/ERR266411_1.for_asm.fastq
13.227.44.144, 13.227.44.207, 13.227.44.91, ...a8wq0iu5.cloudfront.net)... 
Connecting to d28rh4a8wq0iu5.cloudfront.net (d28rh4a8wq0iu5.cloudfront.net)|13.227.44.144|:443... connected.
200 OKequest sent, awaiting response... 
Length: 2562951 (2.4M) [application/octet-stream]
Saving to: ‘ERR266411_1.for_asm.fastq’


2025-02-16 13:27:16 (19.0 MB/s) - ‘ERR266411_1.for_asm.fastq’ saved [2562951/2562951]



In [34]:
def read_fastq(filename):
    sequences = []
    qualities = []
    with open(filename, "r") as fh:
        while True:
            fh.readline()
            seq = fh.readline().rstrip()
            fh.readline()
            Q_encoded = fh.readline().rstrip()
            if len(seq) == 0:
                break
            sequences.append(seq)
            qualities.append(Q_encoded)
    return sequences, qualities

In [35]:
reads, qualities = read_fastq("ERR266411_1.for_asm.fastq")

In [38]:
# Question 3
assembly_obj = DenovoAssembly(reads, min_len=30)
overlap_map = assembly_obj.assemble()
print(len(overlap_map))

904746


In [39]:
overlap_map[:5]

[('TAAACAAGCAGTAGTAATTCCTGCTTTATCAAGATAATTTTTCGACTCATCAGAAATATCCGAAAGTGTTAACTTCTGCGTCATGGAAGCGATAAAACTC',
  'AACAAGCAGTAGTAATTCCTGCTTTATCAAGATAATTTTTCGACTCATCAGAAATATCCGAAAGTGTTAACTTCTGCGTCATGGAAGCGATAAAACTCTG'),
 ('TAAACAAGCAGTAGTAATTCCTGCTTTATCAAGATAATTTTTCGACTCATCAGAAATATCCGAAAGTGTTAACTTCTGCGTCATGGAAGCGATAAAACTC',
  'AAACAAGCAGTAGTAATTCCTGCTTTATCAAGATAATTTTTCGACTCATCAGAAATATCCGAAAGTGTTAACTTCTGCGTCATGGAAGCGATAAAACTCT'),
 ('AGCCGACGTTTTGGCGGCGCAACCTGTGACGACAAATCTGCTCAAATTTATGCGCGCTTCGATAAAAATGATTGGCGTATCCAACCTGCAGAGTTTTATC',
  'GACAAATCTGCTCAAATTTATGCGCGCTTCGATAAAAATGATTGGCGTATCCAACCTGCAGAGTTTTATCGCTTCCATGACGCAGAAGTTAACACTTTCG'),
 ('AGCCGACGTTTTGGCGGCGCAACCTGTGACGACAAATCTGCTCAAATTTATGCGCGCTTCGATAAAAATGATTGGCGTATCCAACCTGCAGAGTTTTATC',
  'CTGTGACGACAAATCTGCTCAAATTTATGCGCGCTTCGATAAAAATGATTGGCGTATCCAACCTGCAGAGTTTTATCGCTTCCATGACGCAGAAGTTAAC'),
 ('AGCCGACGTTTTGGCGGCGCAACCTGTGACGACAAATCTGCTCAAATTTATGCGCGCTTCGATAAAAATGATTGGCGTATCCAACCTGCAGAGTTTTATC',
  'CAAATCTGCTCAAATTTATGCGCGCTTCGATAAAAATGA

In [45]:
# Question 4
nodes = set()
all_nodes_testing = dict()
for edge in overlap_map:
    all_nodes_testing[edge[0]] = 0 if edge[0] not in all_nodes_testing else all_nodes_testing[edge[0]] + 1
for node in all_nodes_testing.keys():
    if all_nodes_testing[node] >= 0:
        nodes.add(node)
print(len(nodes))

7161
