##### Author: Ceyda Durmaz
##### Date: December 16, 2020

In [1]:
import math
import numpy
import random

def readFNA(filename): 
    ''' Reads in FNA file and outputs 2 strings: first line (as 'name')
        and read (as 'xStr').'''
    # Read in FNA file
    readsFile = open(filename)
    reads, xStr = readsFile.readlines(), ''
    readsFile.close()
    for r in range(0, len(reads)): 
        if r == 0: # Save first line
            name = reads[r]
            name.strip()
        else: # Read each line and append to single string
            rStr = reads[r]
            rStr = rStr.strip()
            xStr = xStr + str(rStr)
    return name, xStr

class HMM(object):
    ''' 
    2-state HMM implementation. 
    Input: Takes in three directories. A and E should have keys with 
    2-character codes. All values should be floats. 
        A: Directory of transition probabilities
        E: Directory of emission probabilities 
        I: Directory of initial probabilities 
    '''
    def __init__(self, A, E, I):
        ''' Initialize A, E, and I. '''
        # Store state labels
        self.Q, self.S = set(), set() # states and symbols
        for a, prob in A.items():
            asrc, adst = a[0], a[1]
            self.Q.add(asrc)
            self.Q.add(adst)
            
        # Store all the symbols
        for e, prob in E.items():
            eq, es = e[0], e[1]
            self.Q.add(eq)
            self.S.add(es)
        self.Q = sorted(list(self.Q))
        self.S = sorted(list(self.S))
        
        # Set maps from state labels / emission symbols to integers
        qmap, smap = {}, {}
        for i in range(len(self.Q)): qmap[self.Q[i]] = i
        for i in range(len(self.S)): smap[self.S[i]] = i
        lenq = len(self.Q)
        
        # Initialize transition probability matrix
        self.A = numpy.zeros(shape=(lenq, lenq), dtype=float)
        for a, prob in A.items():
            asrc, adst = a[0], a[1]
            self.A[qmap[asrc], qmap[adst]] = prob
        self.A /= self.A.sum(axis=1)[:, numpy.newaxis]
        
        # Initialize emission probability matrix
        self.E = numpy.zeros(shape=(lenq, len(self.S)), dtype=float)
        for e, prob in E.items():
            eq, es = e[0], e[1]
            self.E[qmap[eq], smap[es]] = prob
        self.E /= self.E.sum(axis=1)[:, numpy.newaxis]
        
        # Initialize initial probabilities
        self.I = [ 0.0 ] * len(self.Q)
        for a, prob in I.items():
            self.I[qmap[a]] = prob
        self.I = numpy.divide(self.I, sum(self.I))
        
        self.qmap, self.smap = qmap, smap
        
        # Make log-base-2 versions for log-space functions
        self.Alog = numpy.log2(self.A)
        self.Elog = numpy.log2(self.E)
        self.Ilog = numpy.log2(self.I)
    
    def viterbi(self, x):
        ''' Takes string and returns its log2 probability and most 
            likely path. '''
        # Get emission ids
        x = list(map(self.smap.get, x)) 
        
        # Get nrow, ncol dims for matrices
        nrow, ncol = len(self.Q), len(x)
        
        # Create empty arrays for prob mat and backtrack mat
        mat   = numpy.zeros(shape=(nrow, ncol), dtype=float) # prob
        matTb = numpy.zeros(shape=(nrow, ncol), dtype=int)   # backtrace
        
        # Initialize - sum init logp of state i by emiss logp of state i to O at time = 1
        for i in range(0, nrow):
            mat[i, 0] = self.Elog[i, x[0]] + self.Ilog[i]
            
        # Recursion - find max value among res sums
        for j in range(1, ncol): # for time
            for i in range(0, nrow): # for state
                ep = self.Elog[i, x[j]] # emission logp
                mx, mxi = mat[0, j-1] + self.Alog[0, i] + ep, 0
                for i2 in range(1, nrow):
                    pr = mat[i2, j-1] + self.Alog[i2, i] + ep
                    if pr > mx:
                        mx, mxi = pr, i2
                mat[i, j], matTb[i, j] = mx, mxi
                
        # Termination - get max val among all vars at time T
        omx, omxi = mat[0, ncol-1], 0
        for i in range(1, nrow):
            if mat[i, ncol-1] > omx:
                omx, omxi = mat[i, ncol-1], i
                
        # Backtrack
        i, p = omxi, [omxi]
        for j in range(ncol-1, 0, -1):
            i = matTb[i, j]
            p.append(i)
        p = ''.join(map(lambda x: self.Q[x], p[::-1]))
        
        return omx, p # Return log probability and path
    
    def getSegCount(self, path, state1, state2):
        ''' For each provided state, returns the number of contiguous 
            series of states of the same type. '''
        s1, s2, seg, llen = [], [], "", []
        if path[0] == state1: curr = state1
        else: curr = state2
        for i in path: 
            if i == curr: seg = seg + i
            else: 
                if curr == state1:
                    s1.append(seg)
                    curr, seg = state2, ""
                    seg = seg + i
                else: 
                    s2. append(seg)
                    curr, seg = state1, ""
                    seg = seg +i
        if curr == state1: s1. append(seg)
        else: s2.append(seg)
        llen.append(len(s1))
        llen.append(len(s2))
        return llen # Return counts of s1 and s2 segments
    
    def getStateCount(self, path, state):
        ''' Counts number of states in path. '''
        count = 0 
        for i in path: 
            if i == state: 
                count += 1
        return count
    
    def getSegGC(self, x, pi, s2): 
        ''' Returns a list of tuples of GC-rich segments, start index, 
            and end index. '''
        # Initialize lists and counters
        gcr, gcri, i = [], [], 0 
        # Get start and end indices of GC rich segs
        while i < len(pi): 
            if pi[i] == s2: 
                a, piSub, add = i, pi[i:], 0
                for j in piSub: 
                    if j == s2: add += 1
                gcri.append((a, a+add))
                i += add
            else: i += 1
        # Get GC segs and save to list 
        i = 0
        while i < len(gcri): 
            start, end = gcri[i][0], gcri[i][1]
            seg = x[start:end]
            gcr.append((seg, start, end))
            i += 1
        return gcr
    
    def viterbiTrain(self, x):
        ''' 
        Viterbi training estimation: finds improved parameter 
        estimates for transition probabilities, while emission and 
        initiation probabilities are fixed. Iterates 10 times.
        Outputs description of each iteration consisting of: 
            - numbers of states
            - numbers of segments
            - new probability values
        Returns a list of tuples of GC-rich segments.
        '''
        maxProb = [] # Initialize output list
        for i in range(0,10):
            print("\n ----- Iteration " + str(i+1) + " -----")
            # Get training data
            prob, path = self.viterbi(x)
            
            # Update maxProb
            if maxProb: 
                if prob > maxProb[0]: 
                    maxProb[0], maxProb[1] = prob, path
            else: 
                maxProb.append(prob)
                maxProb.append(path)

            # Get nrow, ncol dims for matrices
            nrow, ncol = len(self.Q), len(x)

            # Initialize transition matrix
            At = numpy.zeros(shape=(nrow, nrow), dtype=float)

            # Add pseudocounts to handle 0 counts cases
            for r in range(nrow): 
                for c in range(nrow): 
                    At[r,c] = random.random()

            # Add transition counts to matrix 
            prior, count = '', 0 
            while count < ncol: 
                if count == 0: 
                    init = self.qmap[path[count]]
                    At[init, init] = At[init, init] + 1
                    prior = path[count]
                else:
                    i1 = self.qmap[prior]
                    i2 = self.qmap[path[count]]
                    At[i1, i2] = At[i1,i2] + 1
                    prior = path[count]
                count += 1

            # Get sum of each row (total states)
            row0, row1 = sum(At[0]), sum(At[1])
            s1c = self.getStateCount(path, self.Q[0])
            s2c = self.getStateCount(path, self.Q[1])
            print("\tNumber of States")
            print("\t\tState " + self.Q[0] + ": " + str(s1c))
            print("\t\tState " + self.Q[1] + ": " + str(s2c))
            
            # Get segment counts 
            segs = self.getSegCount(path, self.Q[0], self.Q[1])
            print("\tNumber of Segments")
            print("\t\tState " + self.Q[0] + ": " + str(segs[0]))
            print("\t\tState " + self.Q[1] + ": " + str(segs[1]))

            # Divide each row by total of row
            for r in range(nrow):
                for c in range(nrow): 
                    if r == 0: 
                        At[r,c] = At[r,c] / row0
                    else: 
                        At[r,c] = At[r,c] / row1

            # Update self.A and self.Alog
            self.A = At
            self.Alog = numpy.log2(self.A)
            print("\tNew Probability: " + str(round(prob, 4)))
            
        print("Final Probability: " + str(round(maxProb[0], 4)))
        
        # Make list of GC rich segments (where state=2)
        pi, s2 = maxProb[1], self.Q[1] 
        gcr = self.getSegGC(x, pi, s2)
        return gcr   


In [2]:
# Read in FNA file 
name, xStr = readFNA('iec224_chr2.fna')

# Output first line of FNA file
print(name)

# Setup HMM object
A = {'11':0.999, '12':0.001, '22':0.99, '21':0.01}
E = {'1A':0.291, '1C':0.209, '1G':0.209, '1T':0.291, 
     '2A':0.169, '2C':0.331, '2G':0.331, '2T':0.169}
I = {'1':0.996, '2':0.004}
cpgHmm = HMM(A, E, I)

# Get GC rich regions
print(cpgHmm.viterbiTrain(xStr))


>CP003331.1 Vibrio cholerae IEC224 chromosome II, complete sequence


 ----- Iteration 1 -----
	Number of States
		State 1: 1061417
		State 2: 10719
	Number of Segments
		State 1: 48
		State 2: 47
	New Probability: -2150980.3282

 ----- Iteration 2 -----
	Number of States
		State 1: 1067018
		State 2: 5118
	Number of Segments
		State 1: 13
		State 2: 12
	New Probability: -2149594.2876

 ----- Iteration 3 -----
	Number of States
		State 1: 1068265
		State 2: 3871
	Number of Segments
		State 1: 8
		State 2: 7
	New Probability: -2149554.8804

 ----- Iteration 4 -----
	Number of States
		State 1: 1067787
		State 2: 4349
	Number of Segments
		State 1: 8
		State 2: 7
	New Probability: -2149551.8101

 ----- Iteration 5 -----
	Number of States
		State 1: 1068256
		State 2: 3880
	Number of Segments
		State 1: 7
		State 2: 6
	New Probability: -2149551.5203

 ----- Iteration 6 -----
	Number of States
		State 1: 1068256
		State 2: 3880
	Number of Segments
		State 1: 7
		State 2: 6
	New Probability

In [3]:
A = {'11':0.80, '12':0.20, '22':0.80, '21':0.20}
E = {'1A':0.291, '1C':0.209, '1G':0.209, '1T':0.291, 
     '2A':0.169, '2C':0.331, '2G':0.331, '2T':0.169}
I = {'1':0.996, '2':0.004}
cpgHmmTest = HMM(A, E, I)
x = 'ATATATACGCGCGCGCGCGCGATATATATATATA'
print(cpgHmmTest.viterbiTrain(x))


 ----- Iteration 1 -----
	Number of States
		State 1: 20
		State 2: 14
	Number of Segments
		State 1: 2
		State 2: 1
	New Probability: -72.5789

 ----- Iteration 2 -----
	Number of States
		State 1: 20
		State 2: 14
	Number of Segments
		State 1: 2
		State 2: 1
	New Probability: -69.0648

 ----- Iteration 3 -----
	Number of States
		State 1: 20
		State 2: 14
	Number of Segments
		State 1: 2
		State 2: 1
	New Probability: -68.8977

 ----- Iteration 4 -----
	Number of States
		State 1: 20
		State 2: 14
	Number of Segments
		State 1: 2
		State 2: 1
	New Probability: -69.2522

 ----- Iteration 5 -----
	Number of States
		State 1: 20
		State 2: 14
	Number of Segments
		State 1: 2
		State 2: 1
	New Probability: -68.9403

 ----- Iteration 6 -----
	Number of States
		State 1: 20
		State 2: 14
	Number of Segments
		State 1: 2
		State 2: 1
	New Probability: -68.9152

 ----- Iteration 7 -----
	Number of States
		State 1: 20
		State 2: 14
	Number of Segments
		State 1: 2
		State 2: 1
	New Probabi