**TO DO**

-> Implement function to get consensus sequence

-> Implement the Viterbi algorithm to score sequences

-> Add scoring function

-> Compare performance to pyHMMER

In [7]:
import pandas as pd
import numpy as np

In [8]:
# Important Global Values
# One-letter amino-acid codes (20 entries); profileHMM uses this list as its
# emission alphabet when seqType == "AA".
proteinCodes = ["A", "R", "N", "D", "C", "E", "Q", "G", "H", "I", "L", "K", "M", "F", "P", "S", "T", "W", "Y", "V"]
# Nucleotide codes; profileHMM uses this list as its emission alphabet
# when seqType == "NA".
nucleotides = ["A", "C", "T", "G"]

In [9]:
# This class lets us store an HMM
# It contains HMMState objects that make up the  HMM
# We can generate samples from it and find the most likely sequence
class profileHMM:
  """Container for a profile HMM built from HMMState objects.

  The model holds an ordered list of match states plus one global insertion
  state and one global deletion state. Insertion and deletion lengths are
  drawn from the empirical size lists `iSizes` / `dSizes`.

  Attributes:
    seqType: "AA" (amino acids) or "NA" (nucleotides).
    codes: the emission alphabet selected by `seqType`.
    initial_tMat: DataFrame with columns "State"/"Prob" giving the
      probabilities of starting in Match, Insert or Delete.
    mStates: list of match states, in column order.
    iState / iSizes: global insertion state and observed insertion lengths.
    dState / dSizes: global deletion state and observed deletion lengths.
  """

  # Initializes the HMM
  def __init__(self, seqType):
    # Fail early on an unknown alphabet instead of leaving `codes` unset.
    if seqType == "AA": self.codes = proteinCodes
    elif seqType == "NA": self.codes = nucleotides
    else:
      raise ValueError('seqType must be "AA" or "NA", got ' + repr(seqType))
    self.seqType = seqType

    self.initial_tMat = None

    self.mStates = [] # List of match states

    self.iState = None # The global insertion state
    self.iSizes = None # Distribution of insertion sizes

    self.dState = None # The global deletion state
    self.dSizes = None # Distribution of deletion sizes

  # Sets the probabilities of starting in Match / Insert / Delete
  # tProbs is ordered [Match, Insert, Delete]
  def setInitial_tMat(self, tProbs):
    self.initial_tMat = pd.DataFrame({"State": ["Match", "Insert", "Delete"], "Prob": tProbs})

  # Adds match states to the existing HMM
  def add_mState(self, sProbs, tProbs):
    mHMMState = HMMState(sProbs, tProbs, self.codes)
    self.mStates.append(mHMMState)

  # Sets the global insertion state to the HMM
  # We use a global insertion state because it
  # simplifies insertions
  def set_insertionInfo(self, sProbs, tProbs, iSizes):
    iHMMState = HMMState(sProbs, tProbs, self.codes)
    self.iState = iHMMState
    self.iSizes = iSizes

  # Sets the global deletion state info.
  # We use arbitrary state probabilities, because a deletion always
  # removes codes and so we don't need to generate random codes.
  def set_deletionInfo(self, tProbs, dSizes):
    dHMMState = HMMState(np.repeat(0.05, len(self.codes)).tolist(), tProbs, self.codes)
    self.dState = dHMMState
    self.dSizes = dSizes

  # Turns a value drawn by a state (a plain string or a 1-element
  # array, as returned by np.random.choice(..., size = 1)) into a str
  @staticmethod
  def _asScalar(value):
    return str(np.asarray(value).reshape(-1)[0])

  # Runs one forward step of `state` and returns (code, nextState) as strings
  def _step(self, state):
    code, transition = state.forward()
    return self._asScalar(code), self._asScalar(transition)

  # Consumes match states for as long as the pending transition is "Delete".
  # Every deleted position skips exactly one match state and contributes one
  # deleteChar, and a deletion never runs past the last match state.
  # Returns the updated (mStateIDX, nextState).
  def _sampleDeletions(self, nextState, mStateIDX, deleteChar, pieces):
    nStates = len(self.mStates)
    while nextState == "Delete" and mStateIDX < nStates:
      deletionAmount = int(np.random.choice(self.dSizes))
      skipped = max(0, min(deletionAmount, nStates - mStateIDX))
      pieces.append(deleteChar * skipped)
      mStateIDX = mStateIDX + skipped
      # Only the transition of the deletion state matters, its code is unused
      nextState = self._step(self.dState)[1]
    return mStateIDX, nextState

  # Emits inserted codes for as long as the pending transition is "Insert".
  # The transition drawn with the last inserted code decides whether another
  # insertion follows. Returns the transition that ended the insertions.
  def _sampleInsertions(self, nextState, pieces):
    while nextState == "Insert":
      insertionAmount = int(np.random.choice(self.iSizes))
      if insertionAmount <= 0:
        break # Nothing to emit; avoids looping forever on a bad size
      for _ in range(insertionAmount):
        code, nextState = self._step(self.iState)
        pieces.append(code)
    return nextState

  # Randomly generates a sequence from the existing HMM
  def sample(self, showDeletes):
    """Draw one random sequence from the model.

    Args:
      showDeletes: if true, every deleted match position is written as "-";
        otherwise deleted positions are simply omitted.

    Returns:
      The sampled sequence as a str ("" if the model has no match states
      and no initial insertion is drawn).

    Before the first match state and after each emitted match state, any
    pending deletions are applied first and then any pending insertions. A
    "Delete" transition drawn at the end of an insertion is not acted on.
    """
    deleteChar = "-" if showDeletes else ""

    # Collected fragments of the sequence, joined once at the end
    pieces = []

    # Index of the next match state that has not been emitted or deleted
    mStateIDX = 0

    nextState = self._asScalar(
      np.random.choice(a = self.initial_tMat["State"], size = 1, p = self.initial_tMat["Prob"]))

    # Checking for an initial Deletion / Insertion State
    mStateIDX, nextState = self._sampleDeletions(nextState, mStateIDX, deleteChar, pieces)
    self._sampleInsertions(nextState, pieces)

    # Iterates through the mStates
    # We use a while loop instead of a for loop
    # because deletions move the index forward themselves
    while mStateIDX < len(self.mStates):
      code, nextState = self._step(self.mStates[mStateIDX])
      mStateIDX = mStateIDX + 1

      # Adds the match state value
      pieces.append(code)

      # Deletions first, then insertions
      mStateIDX, nextState = self._sampleDeletions(nextState, mStateIDX, deleteChar, pieces)
      self._sampleInsertions(nextState, pieces)

    return "".join(pieces)

  # Finds the consensus sequence
  # Not implemented yet (currently returns None)
  def mostLikely(self):
    ...

  # Implements the Viterbi algorithm to
  # calculate a p-score for an observed sequence
  # Not implemented yet (currently returns None)
  def scoreObs(self, obs):
    ...


In [10]:
# This class represents our HMM States that make up our HMM
class HMMState:
  """A single HMM state: an emission table plus a transition table.

  Attributes:
    sMat: DataFrame with columns "Code"/"Prob" (emission probabilities).
    tMat: DataFrame with columns "State"/"Prob" giving the probabilities of
      moving to "Match", "Insert" or "Delete", in that order.
  """

  def __init__(self, sProbs, tProbs, codes):
    # Emission table: one row per code with its probability
    emissionTable = {"Code": codes, "Prob": sProbs}
    self.sMat = pd.DataFrame(emissionTable)

    # Transition table: one row per possible next state type
    transitionTable = {"State": ["Match", "Insert", "Delete"], "Prob": tProbs}
    self.tMat = pd.DataFrame(transitionTable)

  # Does a forward step through this state:
  # first draws a code according to sProbs, then draws the type of the
  # next state according to tProbs.
  # Both draws use size = 1, so each returned item is a 1-element array.
  def forward(self):
    emissions = self.sMat
    transitions = self.tMat
    drawnCode = np.random.choice(a = emissions["Code"], size = 1, p = emissions["Prob"])
    drawnState = np.random.choice(a = transitions["State"], size = 1, p = transitions["Prob"])
    return [drawnCode, drawnState]

In [11]:

# Function that takes a list of probabilities
# and slightly adjusts the probabilities
# so that there are no zero values. This is
# to combat overfitting on trained data, and allows
# for more unique codes and considers substitution mutations

# We use an epsilon value that gets split
# across absent codes - more likely for NAs
# 0.005 (0.5%) for AAs
# 0.01 (1%) for NAs
# The probability vector is then normalized
def makeNonZero(probs, seqType):
  """Replace zero probabilities with a small pseudo-probability and renormalize.

  A total mass of epsilon (0.005 for "AA", 0.01 for "NA") is split evenly
  across the entries that are exactly zero; the vector is then rescaled to
  sum to 1. A vector without zeros is only renormalized.

  Args:
    probs: sequence of probabilities, one per code.
    seqType: "AA" or "NA"; selects epsilon.

  Returns:
    numpy array of the adjusted probabilities (same length as `probs`).

  Raises:
    ValueError: if `seqType` is neither "AA" nor "NA".
  """
  if seqType == "AA": epsilon = 0.005
  elif seqType == "NA": epsilon = 0.01
  else:
    raise ValueError('seqType must be "AA" or "NA", got ' + repr(seqType))

  probs = np.asarray(probs, dtype = float)
  isZero = probs == 0
  countZero = int(isZero.sum())

  # Only share out epsilon when there is something to fill in; dividing by
  # countZero unconditionally fails for vectors that have no zero entries.
  if countZero > 0:
    probs = np.where(isZero, epsilon / countZero, probs)

  # Normalizes our values (nothing to normalize for an empty vector)
  total = probs.sum()
  if total == 0:
    return probs
  return probs / total


# This function builds our HMM
# SeqDF is the given MSA where each row is a
# sequence, and each column is either an AA or NA

# seqType specifies if the sequence is AAs or NAs
# Takes either "AA" or "NA"

# noZeroes specifies if we assume non-zero probs
# for absent codes - combats overfitting.
def buildHMM(seqDF, seqType, noZeroes = True):
  """Build a profileHMM from a multiple sequence alignment.

  Args:
    seqDF: DataFrame holding the alignment; each row is a sequence, each
      column an alignment position, and "-" marks a gap.
    seqType: "AA" or "NA"; passed to profileHMM and makeNonZero.
    noZeroes: if true, emission vectors that contain a zero are smoothed
      with makeNonZero so no code has probability zero.

  Returns:
    A profileHMM with its initial transition table, global insertion and
    deletion states, and one match state per match column.

  Raises:
    ValueError: if seqDF has no rows or no columns.

  Notes:
    * A column with more than half gaps is an insertion column ("I"),
      otherwise a match column ("M").
    * Emission probabilities are computed over the residues that belong to
      the model's alphabet (gaps are excluded), so they sum to 1 whenever
      at least one such residue is present. With noZeroes = False a
      vector built from no residues at all is all zeros.
    * The last match column has no following column, so it gets the base
      insertion/deletion probabilities.
  """

  # The HMM we output
  HMM = profileHMM(seqType)

  nseq, ncol = seqDF.shape
  if nseq == 0 or ncol == 0:
    raise ValueError("seqDF must contain at least one sequence and one column")

  # Positional views of the alignment, so nothing below depends on the
  # DataFrame's row/column labels.
  msa = seqDF.values
  isGap = (seqDF == "-").values
  gapCounts = isGap.sum(axis = 0) # Number of gaps in each column

  # ----- Column Labelling -----

  # If more than half of the sequences contain a gap we assume those
  # without had an insertion. Otherwise we assume a regular (match) state,
  # and the sequences that have the gap had a deletion event.
  colStates = ["I" if gapCounts[col] > (nseq / 2) else "M" for col in range(ncol)]
  isMatchCol = np.array([state == "M" for state in colStates])
  isInsertCol = ~isMatchCol

  # ----- Deletion & Insertion Counting -----

  # Since one longer event is more likely than several back to back
  # events, consecutive positions are counted as a single deletion
  # (gaps in match columns) or insertion (residues in insertion columns).
  # Any other kind of position ends the current run.
  deletionSizes = [] # Stores the sizes of each deletion
  insertionSizes = [] # Stores the sizes of each insertion

  # We need to look at each sequence individually
  for seq in range(nseq):
    deletionSizes.extend(_runLengths(isGap[seq] & isMatchCol))
    insertionSizes.extend(_runLengths(~isGap[seq] & isInsertCol))

  # Base probabilities: number of events divided by the number of places
  # an event could occur (match states across all sequences).
  deletionSites = int(isMatchCol.sum())
  totalSites = deletionSites * nseq
  baseDeletionProb = len(deletionSizes) / totalSites if totalSites else 0.0
  baseInsertionProb = len(insertionSizes) / totalSites if totalSites else 0.0

  # Emission probabilities of each code among `values`
  def emissionProbs(values):
    counts = [values.count(code) for code in HMM.codes]
    total = sum(counts)
    probs = [count / total if total else 0.0 for count in counts]
    # Adjusts probability to combat overfitting. Smoothing only has
    # work to do when some code is absent.
    if noZeroes and any(prob == 0 for prob in probs):
      probs = makeNonZero(probs, seqType)
    return probs

  # [Match, Insert, Delete] probabilities for moving into column `col`.
  # Past the last column only the base probabilities are available.
  def transitionInto(col):
    iProb = baseInsertionProb
    dProb = baseDeletionProb
    if col < ncol:
      if colStates[col] == "I":
        # Proportion of sequences that have a residue in the insertion column
        iProb = (nseq - int(gapCounts[col])) / nseq
      else:
        # Proportion of sequences that have a gap in the match column
        dProb = int(gapCounts[col]) / nseq
    return [1-iProb-dProb, iProb, dProb]

  # ----- Global Insertion & Deletion States -----

  # Emission probabilities pooled over every residue in an insertion column
  insertionValues = [value for value in msa[:, isInsertCol].flatten().tolist() if value != "-"]
  insertion_sProbs = emissionProbs(insertionValues)

  # Insertion State Transition Probabilities, use the default
  # probabilities because there is no location specific data
  insertion_tProbs = [1-baseInsertionProb-baseDeletionProb, baseInsertionProb, baseDeletionProb]

  # Creating the HMM's Insertion Info
  HMM.set_insertionInfo(insertion_sProbs, insertion_tProbs, insertionSizes)

  # Adding the deletion info to the HMM (same transition probabilities)
  deletion_tProbs = insertion_tProbs
  HMM.set_deletionInfo(deletion_tProbs, deletionSizes)

  # ----- Calculating the initial transition probs -----
  HMM.setInitial_tMat(transitionInto(0))

  # ----- Calculating the Match States -----
  # Insertion columns are skipped; each match column becomes one match
  # state whose transitions describe the column that follows it.
  for col in range(ncol):
    if colStates[col] != "M":
      continue
    sProbs = emissionProbs(msa[:, col].tolist())
    tProbs = transitionInto(col + 1)

    # Creates an mState corresponding to the current match column
    # Adds it to the model
    HMM.add_mState(sProbs, tProbs)

  return HMM


# Lengths of the runs of consecutive true values in `flags`,
# including a run that reaches the end of the sequence.
def _runLengths(flags):
  lengths = []
  current = 0
  for flag in flags:
    if flag:
      current = current + 1
    elif current:
      lengths.append(current)
      current = 0
  if current:
    lengths.append(current)
  return lengths

In [12]:
# Sample Sequences
# Aligned amino-acid strings; "-" marks a gap in the alignment.
kinAseq = "VAVKTLKDELG--LFRIVN--FPNVGDLKPQKLSDFGLAIEAAK"
kinBseq = "VVIRTLKDELG--VFKLVH--YPNVDDLKPQKLSDFGLSVESVK"
kinCseq = "IAVKALKGDLG--LFRVTDG-LHPGVDLKPQKLSDFGFAIEAVR"
kinDseq = "VVIKALKDELG--LYKVVN--HPNVDVLKPQKLSDFGLAIEAIR"
kinEseq = "LALKVLKDELGNVLFRVVNEEYPNVNDLKPQKLSDFGLAIEAAR"
kinFseq = "-VLRNLRDDLG--LYKIVN--HPNVEELRPQKLSDFGIAIDAVK"

# Combining them into an iterable dataframe
# (one row per sequence, one column per alignment position)
seqs = [kinAseq, kinBseq, kinCseq, kinDseq, kinEseq, kinFseq]
seqDF = pd.DataFrame(list(seq) for seq in seqs)

# Creating the HMM
myHMM = buildHMM(seqDF, seqType = "AA", noZeroes=True)

# Sampling it
# The draw is random, so the sequence differs from run to run.
myHMM.sample(showDeletes = True)

'VVIRNLKDELGLFKLVNYPNVNVLKPQKLSDFGLAIESVK'