# Edit Distance, Assembly, and Overlaps



In [None]:
%matplotlib inline

In [1]:
from pathlib import Path
from typing import Literal, TypeAlias
import unittest

from Bio.SeqIO import parse, SeqRecord
import numpy as np
import numpy.typing as npt

In [None]:
ArrayLikeInt: TypeAlias = npt.ArrayLike
"""NumPy array of data type int"""


def min_distance(
    dist: ArrayLikeInt
) -> int:
    """Calculate the minimum distance between two strings using a distance matrix.

    Parameters
    ----------
    dist : ArrayLikeInt
        Prepopulated distance array. The rows match to the shorter string.
    Returns
    -------
    int
        Edit distance
    """
    return np.min(dist[-1])


def build_dist_matrix(
    s1: str,
    s2: str,
    method: Literal["Edit", "Approx"],
    verbose: bool = False
) -> np.ndarray:
    """Calculate the edit distance matrix between to strings

    The edit distance is defined as the number of substitutions,
    insertions, and deletions required to align them. For equal-
    weights.

    Parameters
    ----------
    s1 : str
        First string
    s2 :
        Second string
    verbose : bool
        Enable verbosity
    method : Literal["Edit", "Approx"]
        The method to use.

    Returns
    -------
    np.ndarray
        The edit distance matrix

    """
    if method not in ("Edit", "Approx"):
        raise ValueError(f'method "{method} is not supported"')
    if len(s1) > len(s2):
        shorter_str = s2
        longer_str = s1
        shape = (len(s2) + 1, len(s1) + 1)
    else:
        shorter_str = s1
        longer_str = s2
        shape = (len(s1) + 1, len(s2) + 1)
    dist = np.zeros(shape=shape, dtype=int)
    # Initialize first column
    for index in range(1, shape[0]):
        dist[index][0] = index
    # Initialize first row
    if method == "Edit":
        for index in range(1, shape[1]):
            dist[0][index] = index
    counter = 0
    total_iter = (shape[0] - 1) * (shape[1] - 1)
    for index_i in range(1, shape[0]):
        for index_j in range(1, shape[1]):
            if verbose and (counter / total_iter * 100) % 1 == 0:
                print("%d%%" % (counter / total_iter * 100))
            counter += 1
            dist_hor = dist[index_i][index_j - 1] + 1
            dist_ver = dist[index_i - 1][index_j] + 1
            dist_diag = dist[index_i - 1][index_j - 1]
            if shorter_str[index_i - 1] != longer_str[index_j - 1]:
                dist_diag += 1
            dist[index_i][index_j] = np.min([dist_hor, dist_ver, dist_diag])
    if verbose:
        print("100%")
    return dist


def build_edit_dist_matrix(
    s1: str,
    s2: str,
) -> np.ndarray:
    return build_dist_matrix(s1, s2, "Edit")


def build_approx_match_dist_matrix(
    s1: str,
    s2: str,
) -> np.ndarray:
    return build_dist_matrix(s1, s2, "Approx")


class DistanceMatrixTestCase(unittest.TestCase):

    def test_edit_distance(self):
        p_1 = "EFG"
        t_1 = "ABCD"
        edit_dist_mat_1 = build_edit_dist_matrix(p_1, t_1)
        self.assertEqual(min_distance(edit_dist_mat_1), 3)

        p_2 = "GCGTATGC"
        t_2 = "TATTGGCTATACGGTT"
        edit_dist_mat_2 = build_edit_dist_matrix(p_2, t_2)
        self.assertEqual(min_distance(edit_dist_mat_2), 5)

    def test_approx_distance(self):
        p_1 = "EFG"
        t_1 = "ABCD"
        approx_match_dist_mat_1 = build_approx_match_dist_matrix(p_1, t_1)
        self.assertEqual(
            min_distance(approx_match_dist_mat_1),
            3
        )

        p_2 = "GCGTATGC"
        t_2 = "TATTGGCTATACGGTT"
        approx_match_dist_mat_2 = build_approx_match_dist_matrix(p_2, t_2)
        self.assertEqual(
            min_distance(approx_match_dist_mat_2),
            2
        )

res = unittest.main(argv=[''], verbosity=3, exit=False)
assert len(res.result.failures) == 0

In [None]:
!wget https://d28rh4a8wq0iu5.cloudfront.net/ads1/data/chr1.GRCh38.excerpt.fasta
!mkdir -p week3hw
!mv chr1.GRCh38.excerpt.fasta week3hw

In [None]:
human_genome_file = Path("week3hw/chr1.GRCh38.excerpt.fasta")
with human_genome_file.open("r") as fh:
    human_genome_seg: SeqRecord = list(parse(fh, human_genome_file.suffix.strip(".")))[0]

In [None]:
human_genome_seg, len(human_genome_seg.seq)

# Quiz

## Q1 and Q2 Preamble

We saw how to adapt dynamic programming to find approximate occurrences of a pattern in a text. Recall that:

 1. Rows of the dynamic programming matrix are labeled with bases from P and columns with bases from T
 2. Elements in the first row are set to 0
 3. Elements in the first column are set to 0, 1, 2, ..., as for edit distance
 4. Other elements are set in the same way as elements of a standard edit distance matrix
 5. The minimal value in the bottom row is the edit distance of the closest match between P and T


First, download the provided excerpt of human chromosome 1

https://d28rh4a8wq0iu5.cloudfront.net/ads1/data/chr1.GRCh38.excerpt.fasta

Second, parse it using the readGenome function we wrote before.

Third, adapt the editDistance function we saw in practical (copied below) to answer questions 1 and 2 below. Your function should take arguments p (pattern), t (text) and should return the edit distance of the match between P and T with the fewest edits.

## Q1

What is the edit distance of the best match between pattern GCTGATCGATCGTACG and the excerpt of human chromosome 1?  (Don't consider reverse complements.)

In [None]:
p_1 = "GCTGATCGATCGTACG"
t_1 = str(human_genome_seg.seq)

try:
    assert edit_mat_1 is not None
except NameError:
    edit_mat_1 = build_dist_matrix(p_1, t_1, "Approx", verbose=True)

In [None]:
edit_dist_1 = min_distance(edit_mat_1)

In [None]:
edit_dist_1

The correct answer is __3__!

## Q2

What is the edit distance of the best match between pattern GATTTACCAGATTGAG and the excerpt of human chromosome 1?  (Don't consider reverse complements.)

In [None]:
p_2 = "GATTTACCAGATTGAG"
t_2 = t_1

try:
    assert edit_mat_2 is not None
except NameError:
    edit_mat_2 = build_dist_matrix(p_2, t_2, "Approx", verbose=True)


In [None]:
edit_dist_2 = min_distance(edit_mat_2)

In [None]:
edit_dist_2

The correct answer is __2__!

## Q3 and Q4 Preamble

In a practical, we saw a function for finding the longest exact overlap (suffix/prefix match) between two strings. The function is copied below.

```python
def overlap(a, b, min_length=3):
    """ Return length of longest suffix of 'a' matching
        a prefix of 'b' that is at least 'min_length'
        characters long.  If no such overlap exists,
        return 0. """
    start = 0  # start all the way at the left
    while True:
        start = a.find(b[:min_length], start)  # look for b's prefix in a
        if start == -1:  # no more occurrences to right
            return 0
        # found occurrence; check for full suffix/prefix match
        if b.startswith(a[start:]):
            return len(a)-start
        start += 1  # move just past previous match
```

Say we are concerned only with overlaps that (a) are exact matches (no differences allowed), and (b) are at least k bases long. To make an overlap graph, we could call `overlap(a,b,min_length=k)` on every possible pair of reads from the dataset. Unfortunately, that will be very slow!

Consider this: Say we are using k=6, and we have a read `a` whose length-6 suffix is GTCCTA. Say GTCCTA does not occur in any other read in the dataset. In other words, the 6-mer GTCCTA occurs at the end of read `a` and nowhere else. It follows that `a`'s suffix cannot possibly overlap the prefix of any other read by 6 or more characters.

Put another way, if we want to find the overlaps involving a suffix of read
`a` and a prefix of some other read, we can ignore any reads that don't contain the length-k suffix of `a`. This is good news because it can save us a lot of work!

Here is a suggestion for how to implement this idea. You don't have to do it this way, but this might help you. Let every k-mer in the dataset have an associated Python set object, which starts out empty. We use a Python dictionary to associate each k-mer with its corresponding set. (1) For every k-mer in a read, we add the read to the set object corresponding to that k-mer. If our read is GATTA and k=3, we would add GATTA to the set objects for GAT, ATT and TTA. We do this for every read so that, at the end, each set contains all reads containing the corresponding k-mer. (2) Now, for each read `a`, we find all overlaps involving a suffix of `a`. To do this, we take `a`'s length-k suffix, find all reads containing that k-mer (obtained from the corresponding set) and call `overlap(a,b min_length=k)` for each.

The most important point is that we do not call `overlap(a,b,min_length=k)` if `b` does not contain the length-k suffix of `a`.

Download and parse the read sequences from the provided Phi-X FASTQ file. We'll just use their base sequences, so you can ignore read names and base qualities. Also, no two reads in the FASTQ have the same sequence of bases. This makes things simpler.

https://d28rh4a8wq0iu5.cloudfront.net/ads1/data/ERR266411_1.for_asm.fastq

Next, find all pairs of reads with an exact suffix/prefix match of length at least 30. Don't overlap a read with itself; if a read has a suffix/prefix match to itself, ignore that match. Ignore reverse complements.
 * Hint 1: Your function should not take much more than 15 seconds to run on this 10,000-read dataset, and maybe much less than that. (Our solution takes about 3 seconds.) If your function is much slower, there is a problem somewhere.
 * Hint 2: Remember not to overlap a read with itself. If you do, your answers will be too high.
 * Hint 3: You can test your implementation by making up small examples, then checking that (a) your implementation runs quickly, and (b) you get the same answer as if you had simply called `overlap(a,b,min_length=k)` on every pair of reads. We also have provided a couple examples you can check against.

Picture the overlap graph corresponding to the overlaps just calculated.

In [2]:
def overlap(a: str, b: str, min_length=3) -> int:
    """Return length of the longest suffix of 'a' matching
    a prefix of 'b' that is at least 'min_length' characters
    long. If no such overlap exists, return 0.

    Parameters
    ----------
    a : str
        String to test its suffix
    b : str
        String to test its prefix
    min_length : int
        Minimum length of match

    Returns
    -------
    int
        Longest overlap between suffix of 'a' with prefix of 'b'. Zero (0) otherwise
    """
    start = 0  # start all the way at the left
    # MGH addition, min length must be positive
    if min_length < 1:
        raise ValueError("min_length must be positive definite")
    # MGH addition, edge case if len(b) < min_length, then should return 0
    if len(b) < min_length:
        return 0
    while True:
        start = a.find(b[:min_length], start)  # look for b's prefix in a
        if start == -1:  # no more occurrences to right
            return 0
        # found occurrence; check for full suffix/prefix match
        if b.startswith(a[start:]):
            return len(a)-start
        start += 1  # move just past previous match


class OverlapTestCase(unittest.TestCase):

    def test_overlap(self):
        r1 = "ABCDEF"
        #       ||||  <-- four overlaps
        r2 =   "CDEFGH"
        n_overlaps = 4
        for i in range(1, n_overlaps + 1):
            self.assertEqual(
                overlap(r1, r2 ,i),
                n_overlaps
            )

    def test_no_overlap(self):

        r1 = "ABCDEF"
        #       ||||  <-- four overlaps
        r2 =   "CDEFGH"
        n_overlaps = 4
        for i in range(n_overlaps + 1, n_overlaps + 3):
            self.assertEqual(
                overlap(r1, r2 ,i),
                0
            )

        # Edge case when pattern is shorter than min length
        self.assertEqual(
            overlap(
                "ABCDEF",
                #    ||  <-- two overlaps
                    "EF",
                3),   #  <-- min 3-length
            0
        )


res = unittest.main(argv=[''], verbosity=3, exit=False)
assert len(res.result.failures) == 0

test_no_overlap (__main__.OverlapTestCase) ... ok
test_overlap (__main__.OverlapTestCase) ... ok

----------------------------------------------------------------------
Ran 2 tests in 0.006s

OK


In [None]:
!wget https://d28rh4a8wq0iu5.cloudfront.net/ads1/data/ERR266411_1.for_asm.fastq
!mkdir -p week3hw
!mv ERR266411_1.for_asm.fastq week3hw

In [3]:
fq_file = Path("week3hw/ERR266411_1.for_asm.fastq")

with fq_file.open("r") as fq_handle:
    fastq_reads: list[SeqRecord] = list(parse(fq_handle, fq_file.suffix.strip(".")))
    fastq_reads_str: list[str] = [str(read.seq) for read in fastq_reads]

In [4]:
len(fastq_reads_str)

10000

In [47]:
from collections import defaultdict

Kmer: TypeAlias = str
ReadIndex: TypeAlias = int


def overlap_all_pairs(
    reads: list[str],
    kmer_len: int
) -> list[tuple[ReadIndex, ReadIndex]]:
    """Find all overlapping exact pairs of reads between (suffix, prefix)

    Parameters
    ----------
    reads : list[str]
        All the reads
    kmer_len : int
        Length of kmers

    Returns
    -------
    list[tuple[ReadIndex, ReadIndex]]
        Overlap pairs
    """
    kmers_to_reads: dict[Kmer, set[ReadIndex]] = defaultdict(set)
    overlap_pairs: list[tuple[ReadIndex, ReadIndex]] = []

    # Final all kmers in the reads
    for read_index in range(len(reads)):
        read = reads[read_index]
        for index in range(len(read) - kmer_len + 1):
            kmer = read[index: index + kmer_len]
            kmers_to_reads[kmer].add(read_index)

    # Check each read for overlap
    for read_index_a in range(len(reads)):
        read_a = reads[read_index_a]
        kmer_suffix_a = read_a[-kmer_len:]
        possible_reads = kmers_to_reads[kmer_suffix_a]
        for read_index_b in possible_reads:
            if read_index_a == read_index_b:
                continue
            read_b = reads[read_index_b]
            overlap_ab = overlap(read_a, read_b, kmer_len)
            if overlap_ab >= kmer_len:
                overlap_pairs.append((read_a, read_b))
    return overlap_pairs


class OverlapPairsTestCase(unittest.TestCase):

    def test_pairs(self):
        test_reads_1 = ['ABCDEFG', 'EFGHIJ', 'HIJABC']
        pairs_1 = sorted(overlap_all_pairs(test_reads_1, 3))
        expected_pairs_1 = sorted(
            [('ABCDEFG', 'EFGHIJ'),
             ('EFGHIJ', 'HIJABC'),
             ('HIJABC', 'ABCDEFG')]
        )
        for p1, ex_p1 in zip(pairs_1, expected_pairs_1):
            self.assertListEqual(
                list(p1),
                list(ex_p1)
            )

        test_reads_2 = ['CGTACG', 'TACGTA', 'GTACGT', 'ACGTAC', 'GTACGA', 'TACGAT']
        pairs_2 = sorted(overlap_all_pairs(test_reads_2, 4))
        expected_pairs_2 = sorted([
            ('CGTACG', 'TACGTA'),
            ('CGTACG', 'GTACGT'),
            ('CGTACG', 'GTACGA'),
            ('CGTACG', 'TACGAT'),
            ('TACGTA', 'ACGTAC'),
            ('TACGTA', 'CGTACG'),
            ('GTACGT', 'TACGTA'),
            ('GTACGT', 'ACGTAC'),
            ('ACGTAC', 'GTACGA'),
            ('ACGTAC', 'GTACGT'),
            ('ACGTAC', 'CGTACG'),
            ('GTACGA', 'TACGAT')
        ])
        for p2, ex_p2 in zip(pairs_2, expected_pairs_2):
            self.assertListEqual(
                list(p2),
                list(ex_p2)
            )

        pairs_3 = sorted(overlap_all_pairs(test_reads_2, 5))
        expected_pairs_3 = sorted(
            [('CGTACG', 'GTACGT'),
             ('CGTACG', 'GTACGA'),
             ('TACGTA', 'ACGTAC'),
             ('GTACGT', 'TACGTA'),
             ('ACGTAC', 'CGTACG'),
             ('GTACGA', 'TACGAT')]
        )
        for p3, ex_p3 in zip(pairs_3, expected_pairs_3):
            self.assertListEqual(
                list(p3),
                list(ex_p3)
            )



res = unittest.main(argv=[''], verbosity=3, exit=False)
assert len(res.result.failures) == 0

test_pairs (__main__.OverlapPairsTestCase) ... ok
test_no_overlap (__main__.OverlapTestCase) ... ok
test_overlap (__main__.OverlapTestCase) ... ok

----------------------------------------------------------------------
Ran 3 tests in 0.020s

OK


In [29]:
overlap_all_pairs(test_reads_2, 5)

[('CGTACG', 'GTACGT'),
 ('CGTACG', 'GTACGA'),
 ('TACGTA', 'ACGTAC'),
 ('GTACGT', 'TACGTA'),
 ('ACGTAC', 'CGTACG'),
 ('GTACGA', 'TACGAT')]

How many edges are in the graph? In other words, how many distinct pairs of reads overlap?

In [40]:
fastq_pairs = overlap_all_pairs(fastq_reads_str, 30)

In [41]:
print(len(fastq_pairs))

904746


The answer is __904746__. Answer verified!

## Q4

Picture/ the overlap graph corresponding to the overlaps computed for the previous question. How many nodes in this graph have at least one outgoing edge?  (In other words, how many reads have a suffix involved in an overlap?)

In [42]:
nodes = set()
for pair in fastq_pairs:
    nodes.add(pair[0])
    # nodes.add(pair[1])
print(len(nodes))

7161


The answer is __7161__.