# Exact matching: better naïve algorithm

P: word

T: There would have been a time for such a word
-- word.

Boyer-Moore: Bad character rule
Upon mismatch, skip alignments until (a) mismatch becomes
a match, or (b) P moves past mismatched character

Step 1:

Step 2:

P-

Step 3:

T: GCTTOTGCTACCTTTTGCGCGCGCGCGGAA
P: COTTTTGC

T: GCTTCTGCTACCTTTTGCGCGCGCGCGGAA
CCTTTTGC

T: GCTTCTGCTACCTTTTGCGCGCGCGCGGAA
CCTTTTGC

P:

Boyer-Moore: Good suffix rule

Let t =substring matched by inner loop; skip until (a) there
are no mismatches between P and t or (b) P moves past t

-

Step 1:

T: CGTGCCTACTTACTTACTTACTTACGCGAA
P: CTTACTTAC

Step 2:

P:

T: CGTGCCTACTTACTTACTTACTTACGCGAA
CTTACTTAC

Step 3:

T: CGTGCCTACTTACTTACTTACTTACGCGAA
P:

CTTACTTAC

In [19]:
#!/usr/bin/env python

"""bm_preproc.py: Boyer-Moore preprocessing."""

__author__ = "Ben Langmead"

import unittest


def z_array(s):
    """ Use Z algorithm (Gusfield theorem 1.4.1) to preprocess s """
    assert len(s) > 1
    z = [len(s)] + [0] * (len(s)-1)

    # Initial comparison of s[1:] with prefix
    for i in range(1, len(s)):
        if s[i] == s[i-1]:
            z[1] += 1
        else:
            break

    r, l = 0, 0
    if z[1] > 0:
        r, l = z[1], 1

    for k in range(2, len(s)):
        assert z[k] == 0
        if k > r:
            # Case 1
            for i in range(k, len(s)):
                if s[i] == s[i-k]:
                    z[k] += 1
                else:
                    break
            r, l = k + z[k] - 1, k
        else:
            # Case 2
            # Calculate length of beta
            nbeta = r - k + 1
            zkp = z[k - l]
            if nbeta > zkp:
                # Case 2a: zkp wins
                z[k] = zkp
            else:
                # Case 2b: Compare characters just past r
                nmatch = 0
                for i in range(r+1, len(s)):
                    if s[i] == s[i - k]:
                        nmatch += 1
                    else:
                        break
                l, r = k, r + nmatch
                z[k] = r - k + 1
    return z


def n_array(s):
    """ Compile the N array (Gusfield theorem 2.2.2) from the Z array """
    return z_array(s[::-1])[::-1]


def big_l_prime_array(p, n):
    """ Compile L' array (Gusfield theorem 2.2.2) using p and N array.
        L'[i] = largest index j less than n such that N[j] = |P[i:]| """
    lp = [0] * len(p)
    for j in range(len(p)-1):
        i = len(p) - n[j]
        if i < len(p):
            lp[i] = j + 1
    return lp


def big_l_array(p, lp):
    """ Compile L array (Gusfield theorem 2.2.2) using p and L' array.
        L[i] = largest index j less than n such that N[j] >= |P[i:]| """
    l = [0] * len(p)
    l[1] = lp[1]
    for i in range(2, len(p)):
        l[i] = max(l[i-1], lp[i])
    return l


def small_l_prime_array(n):
    """ Compile lp' array (Gusfield theorem 2.2.4) using N array. """
    small_lp = [0] * len(n)
    for i in range(len(n)):
        if n[i] == i+1:  # prefix matching a suffix
            small_lp[len(n)-i-1] = i+1
    for i in range(len(n)-2, -1, -1):  # "smear" them out to the left
        if small_lp[i] == 0:
            small_lp[i] = small_lp[i+1]
    return small_lp


def good_suffix_table(p):
    """ Return tables needed to apply good suffix rule. """
    n = n_array(p)
    lp = big_l_prime_array(p, n)
    return lp, big_l_array(p, lp), small_l_prime_array(n)


def good_suffix_mismatch(i, big_l_prime, small_l_prime):
    """ Given a mismatch at offset i, and given L/L' and l' arrays,
        return amount to shift as determined by good suffix rule. """
    length = len(big_l_prime)
    assert i < length
    if i == length - 1:
        return 0
    i += 1  # i points to leftmost matching position of P
    if big_l_prime[i] > 0:
        return length - big_l_prime[i]
    return length - small_l_prime[i]


def good_suffix_match(small_l_prime):
    """ Given a full match of P to T, return amount to shift as
        determined by good suffix rule. """
    return len(small_l_prime) - small_l_prime[1]


def dense_bad_char_tab(p, amap):
    """ Given pattern string and list with ordered alphabet characters, create
        and return a dense bad character table.  Table is indexed by offset
        then by character. """
    tab = []
    nxt = [0] * len(amap)
    for i in range(0, len(p)):
        c = p[i]
        assert c in amap
        tab.append(nxt[:])
        nxt[amap[c]] = i+1
    return tab


class BoyerMoore(object):
    """ Encapsulates pattern and associated Boyer-Moore preprocessing. """

    def __init__(self, p, alphabet='ACGT'):
        # Create map from alphabet characters to integers
        self.amap = {alphabet[i]: i for i in range(len(alphabet))}
        # Make bad character rule table
        self.bad_char = dense_bad_char_tab(p, self.amap)
        # Create good suffix rule table
        _, self.big_l, self.small_l_prime = good_suffix_table(p)

    def bad_character_rule(self, i, c):
        """ Return # skips given by bad character rule at offset i """
        assert c in self.amap
        assert i < len(self.bad_char)
        ci = self.amap[c]
        return i - (self.bad_char[i][ci]-1)

    def good_suffix_rule(self, i):
        """ Given a mismatch at offset i, return amount to shift
            as determined by (weak) good suffix rule. """
        length = len(self.big_l)
        assert i < length
        if i == length - 1:
            return 0
        i += 1  # i points to leftmost matching position of P
        if self.big_l[i] > 0:
            return length - self.big_l[i]
        return length - self.small_l_prime[i]

    def match_skip(self):
        """ Return amount to shift in case where P matches T """
        return len(self.small_l_prime) - self.small_l_prime[1]


class TestBoyerMoorePreproc(unittest.TestCase):

    def test_z_1(self):
        s = 'abb'
        #    -00
        z = z_array(s)
        self.assertEqual([3, 0, 0], z)

    def test_z_2(self):
        s = 'abababab'
        #    00604020
        z = z_array(s)
        self.assertEqual([8, 0, 6, 0, 4, 0, 2, 0], z)

    def test_z_3(self):
        s = 'abababab'
        #    00604020
        z = z_array(s)
        self.assertEqual([8, 0, 6, 0, 4, 0, 2, 0], z)

    def test_n_1(self):
        s = 'abb'
        #    01-
        n = n_array(s)
        self.assertEqual([0, 1, 3], n)

    def test_n_2(self):
        s = 'abracadabra'
        #    1004010100-
        n = n_array(s)
        self.assertEqual([1, 0, 0, 4, 0, 1, 0, 1, 0, 0, 11], n)

    def test_n_3(self):
        s = 'abababab'
        #    0204060-
        n = n_array(s)
        self.assertEqual([0, 2, 0, 4, 0, 6, 0, 8], n)

    def test_big_l_prime_1(self):
        s = 'abb'
        #    001
        big_l_prime = big_l_prime_array(s, n_array(s))
        self.assertEqual([0, 0, 2], big_l_prime)

    def test_big_l_prime_2(self):
        s = 'abracadabra'
        #    01234567890
        # L' 00000003007
        # L  00000003337
        big_l_prime = big_l_prime_array(s, n_array(s))
        self.assertEqual([0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 8], big_l_prime)

    def test_small_l_prime_1(self):
        s = 'abracadabra'
        # N  1004010100-
        # l'           1
        # l'        4
        # l' 44444444111
        small_l_prime = small_l_prime_array(n_array(s))
        self.assertEqual([11, 4, 4, 4, 4, 4, 4, 4, 1, 1, 1], small_l_prime)

    def test_good_suffix_match_mismatch_1(self):
        p = 'GGTAGGT'
        big_l_prime, big_l, small_l_prime = good_suffix_table(p)
        self.assertEqual([0, 0, 0, 0, 3, 0, 0], big_l_prime)
        self.assertEqual([0, 0, 0, 0, 3, 3, 3], big_l)
        self.assertEqual([7, 3, 3, 3, 3, 0, 0], small_l_prime)
        self.assertEqual(0, good_suffix_mismatch(6, big_l_prime, small_l_prime))
        self.assertEqual(0, good_suffix_mismatch(6, big_l, small_l_prime))
        #  t:      xT
        #  p: GGTAGGT
        # L': -000300
        #  L: -000333
        self.assertEqual(7, good_suffix_mismatch(5, big_l_prime, small_l_prime))
        self.assertEqual(4, good_suffix_mismatch(5, big_l, small_l_prime))
        #  t:     xGT
        #  p: GGTAGGT
        # L': -000300
        #  L: -000333
        self.assertEqual(7, good_suffix_mismatch(4, big_l_prime, small_l_prime))
        self.assertEqual(4, good_suffix_mismatch(4, big_l, small_l_prime))
        #  t:    xGGT
        #  p: GGTAGGT
        # L': -000300
        #  L: -000333
        self.assertEqual(4, good_suffix_mismatch(3, big_l_prime, small_l_prime))
        self.assertEqual(4, good_suffix_mismatch(3, big_l, small_l_prime))
        #  t:   xAGGT
        #  p: GGTAGGT
        # L': -000300
        #  L: -000333
        self.assertEqual(4, good_suffix_mismatch(2, big_l_prime, small_l_prime))
        self.assertEqual(4, good_suffix_mismatch(2, big_l, small_l_prime))
        #  t:  xTAGGT
        #  p: GGTAGGT
        # L': -000300
        #  L: -000333
        self.assertEqual(4, good_suffix_mismatch(1, big_l_prime, small_l_prime))
        self.assertEqual(4, good_suffix_mismatch(1, big_l, small_l_prime))
        #  t: xGTAGGT
        #  p: GGTAGGT
        # L': -000300
        #  L: -000333
        self.assertEqual(4, good_suffix_mismatch(0, big_l_prime, small_l_prime))
        self.assertEqual(4, good_suffix_mismatch(0, big_l, small_l_prime))

    def test_good_suffix_table_1(self):
        s = 'abb'
        #    001
        big_l_prime, big_l, small_l_prime = good_suffix_table(s)
        self.assertEqual([0, 0, 2], big_l_prime)
        self.assertEqual([0, 0, 2], big_l)
        self.assertEqual([3, 0, 0], small_l_prime)

    def test_good_suffix_table_2(self):
        s = 'abracadabra'
        #    01234567890
        # L' 00000003007
        # L  00000003337
        # l' -4444444111
        big_l_prime, big_l, small_l_prime = good_suffix_table(s)
        self.assertEqual([0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 8], big_l_prime)
        self.assertEqual([0, 0, 0, 0, 0, 0, 0, 4, 4, 4, 8], big_l)
        self.assertEqual([11, 4, 4, 4, 4, 4, 4, 4, 1, 1, 1], small_l_prime)

if __name__ == '__main__':
    unittest.main()


E
ERROR: C:\Users\barba\AppData\Roaming\jupyter\runtime\kernel-6abfeabd-387c-4df8-8759-d216d3c88057 (unittest.loader._FailedTest.C:\Users\barba\AppData\Roaming\jupyter\runtime\kernel-6abfeabd-387c-4df8-8759-d216d3c88057)
----------------------------------------------------------------------
AttributeError: module '__main__' has no attribute 'C:\Users\barba\AppData\Roaming\jupyter\runtime\kernel-6abfeabd-387c-4df8-8759-d216d3c88057'

----------------------------------------------------------------------
Ran 1 test in 0.003s

FAILED (errors=1)


SystemExit: 1

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


In [20]:
def boyer_moore(p, p_bm, t):
    """ Do Boyer-Moore matching. p=pattern, t=text,
        p_bm=BoyerMoore object for p """
    i = 0
    occurrences = []
    while i < len(t) - len(p) + 1:
        shift = 1
        mismatched = False
        for j in range(len(p)-1, -1, -1):
            if p[j] != t[i+j]:
                skip_bc = p_bm.bad_character_rule(j, t[i+j])
                skip_gs = p_bm.good_suffix_rule(j)
                shift = max(shift, skip_bc, skip_gs)
                mismatched = True
                break
        if not mismatched:
            occurrences.append(i)
            skip_gs = p_bm.match_skip()
            shift = max(shift, skip_gs)
        i += shift
    return occurrences


In [21]:
p = 'TCAA'
p_bm = BoyerMoore(p)
p_bm.bad_character_rule(2, 'T')

2

In [22]:
p = 'ACTA'
p_bm = BoyerMoore(p)
p_bm.good_suffix_rule(0)

3

In [23]:
p = 'ACAC'
p_bm = BoyerMoore(p)
p_bm.match_skip()

2

In [28]:
t = 'GCTACGATCTAGAATCTA'
P = 'TCTA'
p_bm = BoyerMoore(P)

In [32]:
boyer_moore(P, p_bm, t)

[7, 14]

In [33]:
t[14:18]

'TCTA'

In [None]:
T = 'CGTGCGTGCTT'

Index of T
CGTGC: 0,4
GCGTG: 3
GTGCC: 1
GTGCT: 5
TGCCT: 2
TGCTT: 6

5-mer index

In [None]:
def find_bad_cycle(qualities):
    n_cycles = len(qualities[0])
    total_scores = [0] * n_cycles
    
    for qual in qualities:
        for i in range(n_cycles):
            total_scores[i] += phred33ToQ(qual[i])
    
    avg_scores = [total / len(qualities) for total in total_scores]

    # Print average scores to see what's going on
    for i, score in enumerate(avg_scores):
        print(f"Cycle {i}: Average quality = {score:.2f}")
    
    worst_cycle = avg_scores.index(min(avg_scores))
    print(f"\nWorst cycle is {worst_cycle} (lowest average quality).")
    return worst_cycle
    
bad_cycle = find_bad_cycle(quals)
print(f"Bad sequencing cycle: {bad_cycle}")

#### Hash table

In [None]:
t ='GTGCGTGTGGGGG'
table = {'GTG':[0, 4, 6], 'TGC':[1],
'GCG': [2], 'CGT': [3], 'TGT':[5],
'TGG':[7], 'GGG':[8, 9, 10]}

In [None]:
table['GGG']
table['CGT']

In [46]:
import bisect
class Index(object):
    def __init__(self, t, k):
        self.k = k
        self.index = []
        for i in range(len(t) - k + 1):
            self.index.append((t[i:i+k], i))
        self.index.sort()

    def query(self, p):
        kmer = p[:self.k]
        i = bisect.bisect_left(self.index, (kmer, -1))
        hits = []
        while i < len(self.index):
            if self.index[i][0] != kmer:
                break
            hits.append(self.index[i][1])
            i += 1
        return hits

In [47]:
def queryIndex(p, t, index):
    k = index.k
    offsets = []
    for i in index.query(p):
        if p[k:] == t[i+k:i+len(p)]:  # corrected comparison
            offsets.append(i)
    return offsets

In [49]:
t = 'GCTACGATCTAGAATCTA'
p = 'TCTA'

index = Index(t, 2)
print(queryIndex(p, t, index))

[7, 14]


In [50]:
t[14:18]

'TCTA'

In [None]:
def reverseComplement(s):
    complement = {'A':'T', 'T':'A', 'G':'C','C':'G', 'N':'N'}
    t=''
    for base in s:
        t = complement[base] + t
    return t
reverseComplement('GTGTATAGGATT')

In [None]:
def readGenome(filename):
    genome=''
    with open(filename, 'r') as f:
        for line in f:
            if not line[0] == '>':
                genome += line.rstrip()
    return genome

In [None]:
def readFastq(filename):
    sequences =[]
    qualities=[]
    with open(filename) as fh:
        while True:
            fh.readline()                 # Line 1: header → not stored, # Skip the header (@...)
            seq = fh.readline().rstrip()  # Line 2: sequence → store in `sequences`
            fh.readline()                 # Line 3: plus sign → not stored
            qual = fh.readline().rstrip() # Line 4: quality → store in `qualities`, # Skip the plus line (+)
            if len(seq) == 0:
                break
            sequences.append(seq)
            qualities.append(qual)
    return sequences, qualities

In [None]:
def naive(p, t):                         # pattern and text; p = x and t = y
    occurrences = []                     # track all indexes p matches agains t
    for i in range(len(t) - len(p) + 1): # Loop over alignments
        match = True
        for j in range(len(p)):          # Loop over characters of the pattern (1 can match or more)
            if t[i+j] != p[j]:           # compare characters
                match = False            # mismatch; reject alignment
                break
        if match:                        # What would go wrong if you moved if match: inside the loop? 
                                         # This would cause occurrences.append(i) to run after the first character matches, 
                                         # not after all characters match
            occurrences.append(i)        # alL chars matched; record
    return occurrences

# How many possible alignments? y-x+1
# How many number of characters possible comparison? x(y-x+1)
# Least number of characters possible? y-x+1 (first character can be a mismatch)

In [None]:
def naive_2mm(p, t):                         # pattern and text; p = x and t = y
    occurrences = []                     # track all indexes p matches agains t
    for i in range(len(t) - len(p) + 1): # Loop over alignments
        match = True
        mismatches = 0
        for j in range(len(p)):          # Loop over characters of the pattern (1 can match or more)
            if t[i+j] != p[j]: 
                mismatches += 1 
                if mismatches > 2:
                    match = False
                    break
        if match:                  
            occurrences.append(i)        # alL chars matched; record
    return occurrences

In [None]:
def naiveHamming(p, t, maxDistance):                         # pattern and text; p = x and t = y
    occurrences = []                     # track all indexes p matches agains t
    for i in xrange(len(t) - len(p) + 1): # Loop over alignments
        nnm = 0
        match = True
        for j in xrange(len(p)):          # Loop over characters of the pattern (1 can match or more)
            if t[i+j] != p[j]:
                nmm += 1
                if nnm > maxDistance:
                    break
        if nnm <= maxDistance
            occurrences.append(i)        # alL chars matched; record
    return occurrences

# How many possible alignments? y-x+1
# How many number of characters possible comparison? x(y-x+1)
# Least number of characters possible? y-x+1 (first character can be a mismatch)

In [54]:
def approximate_match(p, t, n):
    segment_length = int(round(len(p) / (n + 1)))
    all_matches = set()
    for i in range(n + 1):
        start = i * segment_length
        end = min((i + 1) * segment_length, len(p))
        p_bm = BoyerMoore(p[start:end], alphabet="ACGT")
        matches = boyer_moore(p[start:end], p_bm, t)

        for m in matches:
            if m < start or m - start + len(p) > len(t):
                continue

            mismatches = 0
            for j in range(0, start):
                if not p[j] == t[m - start + j]:
                    mismatches += 1
                    if mismatches > n:
                        break

            for j in range(end, len(p)):
                if not p[j] == t[m - start + j]:
                    mismatches += 1
                    if mismatches > n:
                        break

            if mismatches <= n:
                all_matches.add(m - start)
    return list(all_matches)

In [55]:
p = 'AACTTG'
t = 'CACTTAATTTG'
print(approximate_match(p, t, 2)) # up to 2 mismatches

[0, 5]


In [56]:
print(t[5:])

AATTTG


In [None]:
class Index(object):
    def __init__(self, t, k):
        ''' Create index from all substrings of size 'length' '''
        self.k = k  # k-mer length (k)
        self.index = []
        for i in range(len(t) - k + 1):  # for each k-mer
            self.index.append((t[i:i+k], i))  # add (k-mer, offset) pair
        self.index.sort()  # alphabetize by k-mer
    
    def query(self, p):
        ''' Return index hits for first k-mer of P '''
        kmer = p[:self.k]  # query with first k-mer
        i = bisect.bisect_left(self.index, (kmer, -1))  # binary search
        hits = []
        while i < len(self.index):  # collect matching index entries
            if self.index[i][0] != kmer:
                break
            hits.append(self.index[i][1])
            i += 1
        return hits

In [None]:
def approximate_match_subseq(p, genome, index, n):
    segment_length = int(round(len(p) / (n + 1)))
    total_index_hits = 0
    matches = set()

    for i in range(n + 1):
        start = i * segment_length
        segment = p[start:start + index.span:index.ival]
        hits = index.query(p[start:])
        total_index_hits += len(hits)

        for m in hits:
            if m < start or m - start + len(p) > len(genome):
                continue

            mismatches = 0
            for j in range(0, start):
                if p[j] != genome[m - start + j]:
                    mismatches += 1
                    if mismatches > n:
                        break
            for j in range(start + index.span, len(p)):
                if p[j] != genome[m - start + j]:
                    mismatches += 1
                    if mismatches > n:
                        break

            if mismatches <= n:
                matches.add(m - start)

    return total_index_hits, list(matches)