Defining Suffix Tree

In [1]:
from operator import attrgetter
leafEnd = -1

In [2]:
#The Suffix-tree's node
class Node:

    def __init__(self, leaf):
        # self.__identifier = identifier
        self.children = {}
        # for leaf nodes, it stores the index of suffix for the path  from root to leaf
        self.leaf = leaf
        self.suffixIndex = None
        self.start = None
        self.end = None
        self.suffixLink = None

    def __eq__(self, node):
        atg = attrgetter('start', 'end', 'suffixIndex')
        return atg(self) == atg(node)

    def __ne__(self, node):
        atg = attrgetter('start', 'end', 'suffixIndex')
        return atg(self) != atg(node)

    def __getattribute__(self, name):
        if name == 'end':
            if self.leaf:
                return leafEnd
        return super(Node, self).__getattribute__(name)

In [3]:
 #The Suffix-Tree
class SuffixTree:

    def __init__(self, data):
        #Initiate the tree.
        self._string = data
        self.lastNewNode = None
        self.activeNode = None
        #activeEdge is represeted as input string character index (not the character itself).
        self.activeEdge = -1
        self.activeLength = 0
        # remainingSuffixCount tells how many suffixes yet to be added in tree
        self.remainingSuffixCount = 0
        self.rootEnd = None
        self.splitEnd = None
        self.size = -1  # Length of input string
        self.root = None

    def edge_length(self, node):
        return node.end - node.start + 1

    """
        Walk down from current node.
        activePoint change for walk down (APCFWD) using
        Skip/Count Trick  (Trick 1). If activeLength is greater
        than current edge length, set next  internal node as
        activeNode and adjust activeEdge and activeLength
        accordingly to represent same activePoint.
    """
    def walk_down(self, current_node):
        length = self.edge_length(current_node)
        if (self.activeLength >= length):
            self.activeEdge += length
            self.activeLength -= length
            self.activeNode = current_node
            return True
        return False

    """
        For root node, suffixLink will be set to NULL
        For internal nodes, suffixLink will be set to root
        by default in  current extension and may change in
        next extension.
        suffixIndex will be set to -1 by default and
        actual suffix index will be set later for leaves
        at the end of all phases.
    """
    def new_node(self, start, end=None, leaf=False):
        node = Node(leaf)
        node.suffixLink = self.root
        node.start = start
        node.end = end
        node.suffixIndex = -1
        return node

    def extend_suffix_tree(self, pos):
        global leafEnd
        """
            Extension Rule 1, this takes care of extending all
            leaves created so far in the tree.
        """
        leafEnd = pos
        """
            Increment remainingSuffixCount indicating that a
            new suffix added to the list of suffixes yet to be
            added to the tree.
        """
        self.remainingSuffixCount += 1
        """
            Set lastNewNode to None while starting a new phase,
            indicating there is no internal node waiting for
            it's suffix link reset in the current phase.
        """
        self.lastNewNode = None
        
        # Add all suffixes (yet to be added) one by one in tree
        while(self.remainingSuffixCount > 0):
            if (self.activeLength == 0):
                self.activeEdge = pos
            # If there is no outgoing edge starting with activeEdge from activeNode
            if (self.activeNode.children.get(self._string[self.activeEdge]) is None):
                """
                    Extension Rule 2, A new leaf edge is created starting 
                    from an existng node (the current activeNode), and if there 
                    is any internal node waiting for it's suffix link get reset,
                    point the suffix link from that last internal node to current 
                    activeNode. Then set lastNewNode to None indicating no more 
                    node waiting for suffix link reset.
                """
                self.activeNode.children[self._string[self.activeEdge]] = self.new_node(pos, leaf=True)       
                if (self.lastNewNode is not None):
                    self.lastNewNode.suffixLink = self.activeNode
                    self.lastNewNode = None
            #  If there is an outgoing edge starting with activeEdge from activeNode
            else:
                # Get the next node at the end of edge starting with activeEdge
                _next = self.activeNode.children.get(self._string[self.activeEdge])
                if self.walk_down(_next):  # Do walkdown
                    # Start from _next node (the new activeNode)
                    continue
                """
                    Extension Rule 3 (current character being processed
                    is already on the edge)
                """
                if (self._string[_next.start + self.activeLength] == self._string[pos]):
                    """"
                        If a newly created node waiting for it's 
                        suffix link to be set, then set suffix link
                        of that waiting node to curent. active node
                    """
                    if((self.lastNewNode is not None) and (self.activeNode != self.root)):
                        self.lastNewNode.suffixLink = self.activeNode
                        self.lastNewNode = None
                    self.activeLength += 1
                    """
                        STOP all further processing in this phase
                        and move on to _next phase
                    """
                    break
                """
                    We will be here when activePoint is in middle of
                    the edge being traversed and current character
                    being processed is not  on the edge (we fall off
                    the tree). In this case, we add a new internal node
                    and a new leaf edge going out of that new node. This
                    is Extension Rule 2, where a new leaf edge and a new
                    internal node get created
                """
                self.splitEnd = _next.start + self.activeLength - 1
                # New internal node
                split = self.new_node(_next.start, self.splitEnd)
                self.activeNode.children[self._string[self.activeEdge]] = split
                # New leaf coming out of new internal node
                split.children[self._string[pos]] = self.new_node(pos, leaf=True)
                _next.start += self.activeLength
                split.children[self._string[_next.start]] = _next
                """
                    We got a new internal node here. If there is any
                    internal node created in last extensions of same
                    phase which is still waiting for it's suffix link
                    reset, do it now.
                """
                # If suffixLink of lastNewNode points to current newly created internal node
                if (self.lastNewNode is not None):
                    self.lastNewNode.suffixLink = split
                """
                    Make the current newly created internal node waiting
                    for it's suffix link reset (which is pointing to self.root
                    at present). If we come across any other internal node
                    (existing or newly created) in next extension of same
                    phase, when a new leaf edge gets added (i.e. when
                    Extension Rule 2 applies is any of the next extension
                    of same phase) at that point, suffixLink of this node
                    will point to that internal node.
                """
                self.lastNewNode = split
            # One suffix got added in tree, decrement the count of suffixes yet to be added.
            self.remainingSuffixCount -= 1
            if ((self.activeNode == self.root) and (self.activeLength > 0)):
                self.activeLength -= 1
                self.activeEdge = pos - self.remainingSuffixCount + 1
            elif (self.activeNode != self.root):  
                self.activeNode = self.activeNode.suffixLink

    def walk_dfs(self, current):
        start, end = current.start, current.end
        yield self._string[start: end + 1]
        for node in current.children.values():
            if node:
                yield from self.walk_dfs(node)

    """
        Root is a special node with start and end indices as -1,
        as it has no parent from where an edge comes to root.
    """
    
    def build_suffix_tree(self):
        self.size = len(self._string)
        self.rootEnd = -1
        self.root = self.new_node(-1, self.rootEnd)
        # First activeNode will be root
        self.activeNode = self.root  
        for i in range(self.size):
            self.extend_suffix_tree(i)

    def __str__(self):
        return "\n".join(map(str, self.edges.values()))

    #Prints the Suffix Tree
    def print_dfs(self):
        for sub in self.walk_dfs(self.root):
            print(sub)

In [4]:
class Base:
    def __init__(self, tree):
        self.tree = tree
        self.main_string = tree._string
        self.root = tree.root


class CheckSubString(Base):
    def __init__(self, tree, sub_string, findall=False):
        super(CheckSubString, self).__init__(tree)
        self.sub_string = sub_string
        self.latest_index = 0
        self.findall = findall
        self.continue_flag = False
        self.sub_length = len(sub_string)

    def traverse(self, node, sub_string):
        """
            Since each child starts with a unique character we 
            will pursue the process for the child that sub-string
            Starts with the frist character of this edge.
        """
        if sub_string:
            item = next(((char, child) for char, child in node.children.items() if sub_string.startswith(char)), None)

            # If the edge is equal with sub-string returns the index
            if item:
                char, child = item
                start, end = child.start, child.end
                if self.main_string[start: end + 1].startswith(sub_string):
                    if self.findall:
                        return self.find_all_match(child, len(sub_string))
                    return start - (self.sub_length - len(sub_string))
                """
                    Sub-string starts with the frist character of our edge 
                    but is not equal with it. So call the travese for the rest 
                    of sub-string (from the lenght of previous edge).
                """
                return self.traverse(child, sub_string[end - start + 1:])
            # At this level there were no edge that sub-string starts with its leading character.
            else:
                return -1
        if self.findall:
            return self.find_all_match(node, len(sub_string))
        return node.start - (self.sub_length - len(sub_string))

    def check(self):
        if self.root is None:
            return -1
        if not isinstance(self.sub_string, type(str)):
            return -1
        # Every string starts with an empty string
        if not self.sub_string:
            return 0

        return self.traverse(self.root, self.sub_string)

    def find_all_match(self, node, sub_length):

        def inner(node, traversed_edges):
            for char, child in node.children.items():
                if child.leaf:
                    yield child.start - traversed_edges
                else:
                    start, end = child.start, child.end
                    sub_length = end - start + 1
                    yield from inner(child, traversed_edges + sub_length)

        if node.leaf:
            first = node.start - (self.sub_length - sub_length)
            return [first, *inner(node, self.sub_length)]
        else:
            return list(inner(node, self.sub_length))

Reading the Dataset

In [5]:
file = open (r'genomic.fna','r')
f = file.read()
file.close()
# First 1000 characters from the file
f[:1000]

'>JQ765563.1 Human coronavirus NL63 strain NL63/DEN/2009/9, complete genome\nACTTTGTGTCTACTCCTCTCAACTAAACGAAATTTTTCTAGTGCTGTCATTTGTTATGGCAGTCCTAGTG\nTAATTGAAATTTCGTCAAGTTTGTAAACTGGTTAGGCAAGTGTTGTATTTTCTGTGTTTAAGCACTGGTG\nGTTCTGTCCACTAGTGCACACATTGATACTTAAGTGGTGTTCTGTCACTGCTTATTGTGGAAGCAACGTT\nCTGTCGTTGTGGAAACCAATAACTGCTAACCATGTTTTACAATCAAGTGACACTTGCTGTTGCAAGTGAT\nTCGGAAATTTCAGGTTTTGGTTTTGCCATTCCTTCTGTAGCCGTTCGCGCTTATAGCGAAGCCGCTGCAC\nAAGGTTTTCAGGCATGCCGCTTTGTTGCTTTTGGCTTACAGGATTGTGTAACCGGTATTAATGATGACGA\nTTATGTCATTGCATTGACTGGTACTAATCAGCTTTGTGCCAAAATTTTACTTTTTTCTGATAGACCTCTT\nAATTTGCGAGGTTGGCTCATTTTTTCTAACAGCAATTATGTTCTTCAGGACTTTGATGTTGTTTTTGGCC\nATGGTGCAGGAAGTGTGGTTTTTGTGGATAAGTATATGTGTGGTTTTGATGGTAAACCTGTGTTACCTAA\nAAACATGTGGGAATTTAGAGATTACTTTAATGATAATACTGATAGTATTGTTATTGATGGTGTCACTTAT\nCAATTAGCATGGGATGTTATACGTAAAGACCTTTCTTATGAACAGCAAAATGTTTTAGCTATTGAGAGCA\nTTCATTATCTTGGTACTACAGGTCATACTTTGAAGTCTGGTTGCAAACTCATTAATGCCAAGCCGCCTAA\nATATTCTTCTAAGGTTGTTTTGAGTGGTGAATGGAATGCTGTGTATAAGGCGTTTGGTT

Manipulating the Dataset to obtain a list of the Genomes

In [6]:
l = f.split('\n')
# First 30 elements from the list in which Dataset is stored
l[:30]

['>JQ765563.1 Human coronavirus NL63 strain NL63/DEN/2009/9, complete genome',
 'ACTTTGTGTCTACTCCTCTCAACTAAACGAAATTTTTCTAGTGCTGTCATTTGTTATGGCAGTCCTAGTG',
 'TAATTGAAATTTCGTCAAGTTTGTAAACTGGTTAGGCAAGTGTTGTATTTTCTGTGTTTAAGCACTGGTG',
 'GTTCTGTCCACTAGTGCACACATTGATACTTAAGTGGTGTTCTGTCACTGCTTATTGTGGAAGCAACGTT',
 'CTGTCGTTGTGGAAACCAATAACTGCTAACCATGTTTTACAATCAAGTGACACTTGCTGTTGCAAGTGAT',
 'TCGGAAATTTCAGGTTTTGGTTTTGCCATTCCTTCTGTAGCCGTTCGCGCTTATAGCGAAGCCGCTGCAC',
 'AAGGTTTTCAGGCATGCCGCTTTGTTGCTTTTGGCTTACAGGATTGTGTAACCGGTATTAATGATGACGA',
 'TTATGTCATTGCATTGACTGGTACTAATCAGCTTTGTGCCAAAATTTTACTTTTTTCTGATAGACCTCTT',
 'AATTTGCGAGGTTGGCTCATTTTTTCTAACAGCAATTATGTTCTTCAGGACTTTGATGTTGTTTTTGGCC',
 'ATGGTGCAGGAAGTGTGGTTTTTGTGGATAAGTATATGTGTGGTTTTGATGGTAAACCTGTGTTACCTAA',
 'AAACATGTGGGAATTTAGAGATTACTTTAATGATAATACTGATAGTATTGTTATTGATGGTGTCACTTAT',
 'CAATTAGCATGGGATGTTATACGTAAAGACCTTTCTTATGAACAGCAAAATGTTTTAGCTATTGAGAGCA',
 'TTCATTATCTTGGTACTACAGGTCATACTTTGAAGTCTGGTTGCAAACTCATTAATGCCAAGCCGCCTAA',
 'ATATTCTTCTAAGGTTGTT

In [7]:
#Remove all strings that are not Genomes
genomes = []
d = '$'
for i in l:
    if i != "":
        if i[0] in ['A','T','G','C']:
            i = i + d
            genomes.append(i)
"""
First 30 elements from the list which contains 
the final Dataset used for the Project
"""
genomes[:30]

['ACTTTGTGTCTACTCCTCTCAACTAAACGAAATTTTTCTAGTGCTGTCATTTGTTATGGCAGTCCTAGTG$',
 'TAATTGAAATTTCGTCAAGTTTGTAAACTGGTTAGGCAAGTGTTGTATTTTCTGTGTTTAAGCACTGGTG$',
 'GTTCTGTCCACTAGTGCACACATTGATACTTAAGTGGTGTTCTGTCACTGCTTATTGTGGAAGCAACGTT$',
 'CTGTCGTTGTGGAAACCAATAACTGCTAACCATGTTTTACAATCAAGTGACACTTGCTGTTGCAAGTGAT$',
 'TCGGAAATTTCAGGTTTTGGTTTTGCCATTCCTTCTGTAGCCGTTCGCGCTTATAGCGAAGCCGCTGCAC$',
 'AAGGTTTTCAGGCATGCCGCTTTGTTGCTTTTGGCTTACAGGATTGTGTAACCGGTATTAATGATGACGA$',
 'TTATGTCATTGCATTGACTGGTACTAATCAGCTTTGTGCCAAAATTTTACTTTTTTCTGATAGACCTCTT$',
 'AATTTGCGAGGTTGGCTCATTTTTTCTAACAGCAATTATGTTCTTCAGGACTTTGATGTTGTTTTTGGCC$',
 'ATGGTGCAGGAAGTGTGGTTTTTGTGGATAAGTATATGTGTGGTTTTGATGGTAAACCTGTGTTACCTAA$',
 'AAACATGTGGGAATTTAGAGATTACTTTAATGATAATACTGATAGTATTGTTATTGATGGTGTCACTTAT$',
 'CAATTAGCATGGGATGTTATACGTAAAGACCTTTCTTATGAACAGCAAAATGTTTTAGCTATTGAGAGCA$',
 'TTCATTATCTTGGTACTACAGGTCATACTTTGAAGTCTGGTTGCAAACTCATTAATGCCAAGCCGCCTAA$',
 'ATATTCTTCTAAGGTTGTTTTGAGTGGTGAATGGAATGCTGTGTATAAGGCGTTTGGTTCACCATTTATT$',
 'ACAAATGGTA

Implementing Suffix Tree using Dataset

In [10]:
while True:
    pos = int(input("Enter position at which genome is present : "))
    searchString = input("Enter Substring to be searched for in the genome : ")
    str = genomes[pos]
    print("\nSuffix tree for Genome",str,"at postion",pos)
    t = SuffixTree(str)
    t.build_suffix_tree()
    a = CheckSubString(t, searchString, findall=True)
    t.print_dfs()    
    print('Substring',searchString,'can be found at positions :')
    print(a.check())
    ch = int(input("Do you want to continue?\nYes -> 0\nNo -> 1\n"))
    if ch == 1:
        print('The Program has ended.')
        break

Enter position at which genome is present : 7
Enter Substring to be searched for in the genome : GAGG

Suffix tree for Genome AATTTGCGAGGTTGGCTCATTTTTTCTAACAGCAATTATGTTCTTCAGGACTTTGATGTTGTTTTTGGCC$ at postion 7

A
T
GTT
GTTTTTGGCC$
CTTCAGGACTTTGATGTTGTTTTTGGCC$
T
ATGTTCTTCAGGACTTTGATGTTGTTTTTGGCC$
T
TTTCTAACAGCAATTATGTTCTTCAGGACTTTGATGTTGTTTTTGGCC$
GCGAGGTTGGCTCATTTTTTCTAACAGCAATTATGTTCTTCAGGACTTTGATGTTGTTTTTGGCC$
A
CAGCAATTATGTTCTTCAGGACTTTGATGTTGTTTTTGGCC$
TT
ATGTTCTTCAGGACTTTGATGTTGTTTTTGGCC$
TGCGAGGTTGGCTCATTTTTTCTAACAGCAATTATGTTCTTCAGGACTTTGATGTTGTTTTTGGCC$
G
CAATTATGTTCTTCAGGACTTTGATGTTGTTTTTGGCC$
G
ACTTTGATGTTGTTTTTGGCC$
TTGGCTCATTTTTTCTAACAGCAATTATGTTCTTCAGGACTTTGATGTTGTTTTTGGCC$
C
TTTGATGTTGTTTTTGGCC$
AGCAATTATGTTCTTCAGGACTTTGATGTTGTTTTTGGCC$
T
G
GC
C$
TCATTTTTTCTAACAGCAATTATGTTCTTCAGGACTTTGATGTTGTTTTTGGCC$
CGAGGTTGGCTCATTTTTTCTAACAGCAATTATGTTCTTCAGGACTTTGATGTTGTTTTTGGCC$
TT
GTTTTTGGCC$
CTTCAGGACTTTGATGTTGTTTTTGGCC$
TTTGGCC$
ATGTTGTTTTTGGCC$
T
G
GC
C$
TCATTTTTTCTAACAGCAATTATGT