In [1]:
# Version string of the tool; it is also listed in __all__ below.
__version__ = '0.1.0'

# Names exported by a star-import of this module.
# NOTE(review): only 'encode', 'decode', 'charToTwobit', 'twobitToChar' and
# '__version__' are defined in the visible source; the snake_case entries
# ('motif', 'motif_counter', ...) look like submodule names -- confirm they
# exist, since a star-import fails on a listed name that cannot be resolved.
__all__ = [
    'encode',
    'decode',
    'charToTwobit',
    'twobitToChar',
    'motif',
    'motif_counter',
    'motif_count_scanner',
    'motif_space',
    'pos_rank_window',
    'shift_scanner',
    '__version__',
]

import re

def charToTwobit(char: str):
    """Translate one nucleotide letter into its two-bit integer code.

    'A' -> 0, 'C' -> 1, 'G' -> 2, while 'T' and 'U' both map to 3.
    Anything else returns the string "Invalid Nucleotide" (no exception).
    """
    codes = dict(zip('ACGTU', (0x0, 0x1, 0x2, 0x3, 0x3)))
    try:
        return codes[char]
    except KeyError:
        return "Invalid Nucleotide"


def twobitToChar(byte: bytes):
    """Translate a two-character bit string into its nucleotide letter.

    The lookup keys are the strings '00', '01', '10' and '11' (mapping to
    'A', 'C', 'G', 'T'); any other value returns the string "Invalid Value".
    """
    letters = dict(zip(('00', '01', '10', '11'), 'ACGT'))
    if byte in letters:
        return letters[byte]
    return "Invalid Value"

# Pack a motif into an integer, two bits per nucleotide (first nucleotide in
# the most significant position); the integer is used directly as a priority.
def encode(motif):
    """Encode a nucleotide string as an integer using two bits per base.

    'A' -> 00, 'C' -> 01, 'G' -> 10, 'T'/'U' -> 11. 'U' is accepted so that
    RNA motifs encode the same way charToTwobit maps them.

    Args:
        motif: iterable of nucleotide characters (typically a str).

    Returns:
        The packed integer; an empty motif encodes to 0.

    Raises:
        KeyError: if the motif contains a character other than A, C, G, T, U.
    """
    code = {'A': 0b00, 'C': 0b01, 'G': 0b10, 'T': 0b11, 'U': 0b11}
    rank = 0
    for base in motif:
        # Shift the bases seen so far up by one slot and append the new one.
        rank = (rank << 2) | code[base]
    return rank


def decode(rank, width: int = 10):
    """Turn a two-bit-packed integer back into its nucleotide string.

    Args:
        rank: non-negative integer holding two bits per nucleotide, first
            nucleotide in the most significant pair.
        width: minimum number of nucleotides to produce; shorter values are
            left-padded with 'A' (bit pair 00). Defaults to 10, which matches
            the 20-bit padding this function used before it was parameterized.

    Returns:
        A string over 'ACGT'. If `rank` needs more than `width` pairs, the
        result is extended to hold every pair.

    Raises:
        ValueError: if `rank` is negative (e.g. the -1 "no rank" marker).
    """
    if rank < 0:
        raise ValueError('cannot decode a negative rank: %s' % rank)

    # Round the bit length up to whole pairs so an odd bit count cannot
    # misalign the pairs (which happened when pairing the raw binary string).
    needed = (rank.bit_length() + 1) // 2
    length = max(width, needed)

    return ''.join(
        'ACGT'[(rank >> shift) & 0b11]
        for shift in range(2 * (length - 1), -1, -2)
    )

In [2]:
# Export list for the motif data types defined in this cell.
# NOTE(review): if the cells are concatenated into a single module, this
# assignment replaces the `__all__` defined earlier in the file -- confirm
# whether the two lists were meant to live in separate modules.
__all__ = [
    'Features',
    'Motif',
    '__Empty__'
]


class Features:
    """ Features: attributes describing one motif -
        pattern - the motif sequence
        rank - rank/priority/id (must be unique)
        valid - whether the motif is valid (only for universal frequency)
    """

    pattern: str
    rank: int
    valid: bool
    __slots__ = ('pattern', 'rank', 'valid')

    def __init__(self, pat: str, rank: int, valid: bool):
        self.pattern, self.rank, self.valid = pat, rank, valid

    def __repr__(self):
        return f'Pattern={self.pattern}, Rank={self.rank}, Valid={self.valid}'
    

class Motif:
    """ Motif: an m-length sequence found in a read.
        position - position in the read
        feature - the Features record (pattern, rank, valid)
    """

    position: int
    feature: Features
    __slots__ = ('position', 'feature')

    def __init__(self, pos: int, feature: Features):
        self.position, self.feature = pos, feature

    def __repr__(self):
        return f'Position={self.position}, Feature=({self.feature})'

# Shared "no motif" sentinel: position -1, empty pattern, rank -1, not valid.
__Empty__ = Motif(-1, Features('', -1, False))

In [3]:
class ShiftScanner:
    """ ShiftScanner: slides a fixed-width window over a read and looks each
        m-mer up in a motif space.
        space - object providing `width` and `priorityOf(motif)`
        width - window width, copied from space.width at construction
    """

    def __init__(self, space):
        # Explicit check rather than `assert`, which is stripped under -O.
        if space.width > 15:
            raise ValueError('motif width must be <= 15, got %s' % space.width)
        self.space = space

        # Kept on the instance: a module-level global would be overwritten by
        # every later scanner and silently change this one's window width.
        self.width = space.width

    def allMatches(self, read: str, tolist: bool = False):
        """ Find all matches in the string.

            With tolist=False (default) returns a generator that yields, for
            every window position in order, a Motif for m-mers known to the
            space, or __Empty__ where priorityOf reports -1.
            With tolist=True returns a list of (m-mer, position) tuples
            without consulting the space.
            A read shorter than the width produces no positions.
        """
        width = self.width
        positions = range(len(read) - (width - 1))

        if tolist:
            return [(read[pos:pos + width], pos) for pos in positions]

        def getMatches(pos, motif: str):
            priority = self.space.priorityOf(motif)
            if priority != -1:
                return Motif(pos, Features(motif, priority, True))
            return __Empty__

        return (getMatches(pos, read[pos:pos + width]) for pos in positions)

In [4]:
class MotifSpace:
    """ MotifSpace: create the priority lookup table from which
        priority/rank of each Motif can be accessed easily

        byPriority - motifs in priority order (index == priority)
        width - motif length, taken from the first motif
        scanner - ShiftScanner bound to this space
        priorityLookUp - table indexed by the encoded motif; -1 means the
                         motif is not part of this space
    """

    __slots__ = ('width', '_maxMotifs', 'scanner', 'byPriority', 'priorityLookUp')
    width: int
    byPriority: list
    scanner: ShiftScanner
    _maxMotifs: int
    priorityLookUp: list

    def __init__(self, byPriority: list):
        # Materialize first: callers pass generators / filter objects.
        self.byPriority = list(byPriority)
        self.width = len(self.byPriority[0])
        self.scanner = ShiftScanner(self)
        # Equals 4 ** width: one table slot per possible motif of this width.
        self._maxMotifs = 4 << (self.width * 2 - 2)
        self.priorityLookUp = [-1]*self._maxMotifs

        for pri, motif in enumerate(self.byPriority):
            self.priorityLookUp[self.motifToInt(motif)] = pri

    def motifToInt(self, m: str) -> int:
        """ Encode a motif as its index into priorityLookUp. """
        return encode(m)

    def priorityOf(self, mk):
        """ Return the priority of motif `mk`, or -1 if it is not in this space.

            Motifs of the wrong length and motifs containing characters the
            encoder cannot map (e.g. 'N' in a read) are reported as -1 rather
            than raising or aliasing onto another table slot.
        """
        if len(mk) != self.width:
            return -1
        try:
            index = self.motifToInt(mk)
        except KeyError:
            return -1
        return self.priorityLookUp[index]

    def create(self, pattern, pos):
        """ Build a Motif for `pattern` at `pos` using this space's priority.

            NOTE(review): the Features record is marked valid even when
            priorityOf returns -1 -- confirm whether that is intended.
        """
        return Motif(pos, Features(pattern, self.priorityOf(pattern), True))


class _MotifSpace:
    """ _MotifSpace: helpers that enumerate motifs and derive spaces from a template """

    all1mersDNA = ("A", "C", "G", "T")
    all1mersRNA = ("A", "C", "G", "U")

    def motifsOfLength(self, width: int, rna: bool = False) -> iter:
        """ Lazily generate every motif of the given width, in the order of
            the chosen alphabet (RNA alphabet when `rna` is true, else DNA).
        """
        alphabet = self.all1mersRNA if rna else self.all1mersDNA

        def expand(stem, remaining):
            if remaining == 0:
                yield stem
            else:
                for letter in alphabet:
                    yield from expand(stem + letter, remaining - 1)

        return expand("", width)

    def fromTemplateWithValidSet(self, template: MotifSpace, validMers: iter) -> MotifSpace:
        """ Build a new MotifSpace holding only the template motifs that occur
            in `validMers`, keeping the template's priority order.
        """
        allowed = set(validMers)
        kept = (mer for mer in template.byPriority if mer in allowed)
        return MotifSpace(kept)



In [5]:
class MotifExtractor:
    """ Extract all the best motifs from the reads

        scanner - the space's ShiftScanner, used to find motifs in a read
        K - k-mer length
        width - motif length of the space
    """
    __slots__ = ('scanner', 'K', 'width')
    scanner: ShiftScanner
    K: int
    width: int

    def __init__(self, space: MotifSpace, k: int):
        self.scanner = space.scanner
        self.K = k
        self.width = space.width

    def slidingTopMotifs(self, read):
        """ Find all the top (best Motifs within K-length window)

            Yields one item per motif position of the read: the window's top
            after that position has been inserted. Reads shorter than K yield
            nothing.
        """
        if len(read) < self.K:
            # A bare return ends the generator cleanly; `return iter()` raised
            # TypeError because iter() needs an argument.
            return

        windowMotifs = PosRankWindow()
        # Left edge of the window; stays negative until a full k-mer is covered.
        pos = self.width - self.K
        for m in self.scanner.allMatches(read):
            windowMotifs.moveWindowAndInsert(pos, m)
            pos += 1
            yield windowMotifs.top()

    def regionsInRead(self, read):
        """ find the regions of the top motifs in reads

            Yields (motif, start) pairs where `start` is the index of the
            first k-mer of a run of consecutive k-mers sharing the same top
            motif. Yields nothing when the read has no complete k-mer.
        """
        topMotifs = self.slidingTopMotifs(read)
        # Skip the tops of the leading windows that are shorter than a k-mer.
        self.drop(topMotifs, self.K - self.width)

        # A bare next() on an exhausted iterator would surface as RuntimeError
        # inside a generator (PEP 479), so use a default instead.
        lastMotif = next(topMotifs, None)
        if lastMotif is None:
            return

        regionStart = 0
        for index, motif in enumerate(topMotifs, 1):
            if lastMotif == motif:
                continue
            yield lastMotif, regionStart
            lastMotif, regionStart = motif, index

        # for the last motif
        yield lastMotif, regionStart

    def splitRead(self, read):
        """ Return all the Super-mers

            Yields (motif, supermer) pairs; each super-mer spans from its
            region's first k-mer to the last k-mer before the next region,
            and the final one runs to the end of the read.
        """
        readByReg = self.regionsInRead(read)

        current = next(readByReg, None)
        if current is None:
            return

        for following in readByReg:
            # The last k-mer of this region starts at following[1] - 1 and is
            # K long, so the slice ends at following[1] + K - 1.
            yield current[0], read[current[1]: following[1] + (self.K - 1)]
            current = following

        yield current[0], read[current[1]:]

    @staticmethod
    def drop(itr, n):
        """ Advance `itr` by up to `n` items, stopping early if it runs out. """
        try:
            for _ in range(n):
                next(itr)
        except StopIteration:
            return


In [6]:
class MotifCounter:
    """ Main frequency counter

        countArr - one counter per motif of the space, indexed by rank
    """
    def __init__(self, space: MotifSpace):
        sizeOfCounter = len(space.byPriority)
        self.countArr = [0]*sizeOfCounter

    def increment(self, m: Motif):
        """ Increment the count of the corresponding Motif

            Motifs with a negative rank (the -1 "not in the space" marker)
            are ignored: used as a list index, -1 would silently bump the
            counter of the last motif instead.
        """
        rank = m.feature.rank
        if rank < 0:
            return
        self.countArr[rank] += 1

    def motifsWithCounts(self, space: MotifSpace):
        """ Pair each motif of `space` (in priority order) with its count. """
        return zip(space.byPriority, self.countArr)

    def _toSpaceByFrequency(self, counts: list((str, int))):
        """ Build a MotifSpace from an iterable of (pattern, count) pairs,
            ordered by ascending count with ties broken lexicographically,
            so the least frequent motif receives priority 0.
        """
        ordered = sorted(counts, key=lambda pair: (pair[1], pair[0]))
        return MotifSpace(pair[0] for pair in ordered)

    def toSpaceByFrequency(self, oldSpace: MotifSpace):
        """ Make a new space with priority based on frequency and lexicographic ordering """
        pairs = self.motifsWithCounts(oldSpace)
        return self._toSpaceByFrequency(pairs)
        


In [7]:
class MotifCountingScanner:
    """ Scans reads for motifs of a space and tallies their occurrences
        across the whole dataset
    """
    def __init__(self, space: MotifSpace):
        self.scanner = space.scanner
        self.motifCount = MotifCounter(space)

    def scanRead(self, counter: MotifCounter, read):
        """ Count every valid motif found in one read into `counter`. """
        validHits = (hit for hit in self.scanner.allMatches(read) if hit.feature.valid)
        for hit in validHits:
            counter.increment(hit)

    def scanGroup(self, counter: MotifCounter, rs):
        """ Scan each read of `rs`, with newline characters stripped from both ends. """
        for line in rs:
            read = line.strip('\n')
            self.scanRead(counter, read)


In [8]:
from typing import Any

class PositionNode:
    """ PositionNode: a link of the doubly linked list used to track and
        find the best Motifs (lesser the rank, higher the priority)
        prevPos / nextPos - neighbouring nodes; both default to '_'
    """
    __slots__ = ['prevPos', 'nextPos']
    prevPos: Any
    nextPos: Any

    def __init__(self, p = '_', n = '_'):
        self.prevPos, self.nextPos = p, n

    def removeNode(self):
        """ Unlink this node from its neighbours and return the node that followed it. """
        follower = self.nextPos
        predecessor = self.prevPos
        predecessor.nextPos = follower
        follower.prevPos = predecessor
        return follower

    def linkPos(self, before, after):
        """ Splice this node in between `before` and `after`. """
        before.nextPos = self
        self.prevPos, self.nextPos = before, after
        after.prevPos = self


class MotifContainer(PositionNode):
    """ MotifContainer: Store the Motif and is the Node in PosRankWindow (doubly linked-list)
        pos - position of the Motif in the read
        motif - Motif
        rank - rank of the Motif
    """
    __slots__ = ['pos', 'motif', 'rank']
    pos: int
    motif: Motif
    rank: int

    def __init__(self, motif: Motif):
        self.pos = motif.position
        self.rank = motif.feature.rank
        self.motif = motif

    def dropUntilPosition(self, pos: int):
        """ Unlink this node and the nodes after it while their position is
            below `pos`.

            Stops at the first node that is not a MotifContainer: removeNode
            returns the following node, which can be a plain PositionNode
            sentinel that has neither `pos` nor this method (recursing onto
            it raised AttributeError). Iterative so a long run of stale nodes
            cannot exhaust the recursion limit.
        """
        node = self
        while isinstance(node, MotifContainer) and node.pos < pos:
            node = node.removeNode()

    def __repr__(self):
        return '[%s, %s]' % (self.pos, self.motif)

        
class PosRankWindow:
    """ PosRankWindow: A doubly linked-list that makes easier to find the best Motif within a
                         window of k-length
        start / end - sentinel PositionNodes bracketing the MotifContainer nodes
    """
    start: PositionNode
    end: PositionNode

    def __init__(self):
        # Sentinels are created per instance. As class attributes they were
        # shared by every window, so constructing a second window re-linked
        # (and thereby emptied) the list of the first one.
        self.start = PositionNode()
        self.end = PositionNode()

        # None <= start <=> end => None
        self.start.nextPos = self.end
        self.start.prevPos = None
        self.end.prevPos = self.start
        self.end.nextPos = None

    def moveWindowAndInsert(self, pos: int, insertRight):
        """ Moves the window and insert the new Node to the right and removes from the left

            pos - left edge of the window; nodes positioned before it are dropped
            insertRight - Motif entering the window; only inserted when valid
        """
        new_node = MotifContainer(insertRight)
        if new_node.motif.feature.valid:
            self.appendMonotonic(new_node, self.end)

        # Evict from the left, stopping at the end sentinel: when nothing valid
        # was inserted every remaining node may be stale, and the sentinel has
        # no `pos` to compare.
        node = self.start.nextPos
        while node is not self.end and node.pos < pos:
            node = node.removeNode()

    def appendMonotonic(self, insertNode, search):
        """ Potentially insert the Motifs to the correct location

            Walks left from `search` past every node whose rank is greater
            than the new node's rank, then links the new node between the
            node it stopped at and the end sentinel; the skipped nodes are
            thereby unlinked from the list.
        """
        while search.prevPos is not self.start and insertNode.rank < search.prevPos.rank:
            search = search.prevPos
        insertNode.linkPos(search.prevPos, self.end)

    def top(self):
        """ Return the best motif in each k-length window (__Empty__ when the window holds none) """
        return __Empty__ if self.start.nextPos==self.end else self.start.nextPos.motif

    def showWindow(self):
        """ For the testing purpose to check what is happening in the window """
        temp = self.start.nextPos
        while temp != self.end:
            print('window-pos=',temp.pos, temp.motif)
            temp = temp.nextPos

In [9]:
class Col:
    """ ANSI escape sequences used to style terminal output.
        Y - YELLOW | B - BOLD | G - GREEN | U - UNDERLINE
        V - PURPLE | C - CYAN | R - RED | GR - GREY
        W - SGR 0, i.e. reset all attributes (labelled WHITE in the legend)
    """
    # text attributes
    B = '\033[1m'
    U = '\033[4m'
    W = '\033[0m'

    # foreground colours
    Y = '\033[93m'
    G = '\033[92m'
    R = '\033[91m'
    V = '\033[95m'
    C = '\033[38;2;0;200;255m'
    GR = '\u001b[30;1m'

In [44]:
import os

class DiscountCustomError(Exception):
    """ Error raised for invalid input; renders as a coloured
        "<msg> | <has> ==> <req>" line.
        msg - description of the problem
        has - the offending value that was supplied
        req - what is required instead
    """
    def __init__(self, msg, arg, req):
        super().__init__(msg, arg, req)
        self.msg, self.req, self.has = msg, req, arg

    def __str__(self):
        arrow = u" \u2550"u"\u2550> "
        pieces = (
            Col.B, Col.R, self.msg,
            Col.G, " | ",
            Col.C, self.has,
            Col.V, arrow,
            Col.C, self.req,
            Col.W,
        )
        return ''.join(pieces)

def verify(args):
    """ Validate the parsed command-line arguments before any work is done.

        args - namespace providing `f` (dataset path), `o` (ordering),
               `minimizers` and `output`

        Raises DiscountCustomError when the dataset does not end in '.fasta',
        when ordering 'ufreq' is requested without minimizers, or when the
        directory that should receive the output file does not exist.
    """
    print(Col.G, 'Verifying the input...', Col.W)

    # Check for the fasta file format
    if not args.f.endswith('.fasta'):
        raise DiscountCustomError("FileFormatError : required .fasta file", args.f, "XXX.fasta")

    # Check for ordering : if universal frequency minimizer must be entered
    # NOTE(review): the argument parser in this file only offers 'lex' and
    # 'freq' for -o, so 'ufreq' cannot arrive from the CLI -- confirm.
    if args.o == 'ufreq' and not args.minimizers:
        raise DiscountCustomError("UniversalFrequencyOrdering : required -minimizers", args.o, "-minimizers")

    # Check if the given output dir exist or not.
    # Test the file's parent directory: looking only at the first '/'-separated
    # component rejected absolute POSIX paths (first component is '') and bare
    # filenames, and accepted paths whose deeper directories were missing.
    if args.output:
        outputDir = os.path.dirname(args.output) or os.curdir
        if not os.path.isdir(outputDir):
            raise DiscountCustomError("No such file or directory:", '', args.output)

class CoreConf:
    """ CoreConf: run configuration built from the parsed arguments
        K - k-mer length (args.k)
        WIDTH - minimizer width (args.m)
        MINIMIZERS - args.minimizers
        DATASET - args.f
        ORDER - args.o
        OUTPUT - args.output
        TEMPLATESPACE - MotifSpace over every motif of length WIDTH
    """
    __slots__ = ['K','WIDTH','MINIMIZERS','DATASET','ORDER','TEMPLATESPACE', 'OUTPUT']
    K: int
    WIDTH: int
    MINIMIZERS: str
    DATASET: str
    ORDER: str
    TEMPLATESPACE: MotifSpace
    OUTPUT: str

    def __init__(self, args):
        # Arguments are validated first; verify raises on bad input.
        verify(args)
        print(Col.Y, 'Verification done successfully')
        print(Col.V, 'Executing...', Col.W)

        self.K, self.WIDTH = args.k, args.m
        self.DATASET, self.MINIMIZERS = args.f, args.minimizers
        self.ORDER, self.OUTPUT = args.o, args.output

        allMotifs = _MotifSpace().motifsOfLength(width=self.WIDTH)
        self.TEMPLATESPACE = MotifSpace(allMotifs)

        summary = (self.K, self.WIDTH, self.DATASET, self.MINIMIZERS, self.ORDER)
        print(*summary)

In [45]:
class ReadSplitConf(CoreConf):
    """ ReadSplitConf: configuration that can read the dataset and build
        the splitter used to cut reads into super-mers
    """
    def __init__(self, args):
        super().__init__(args)

    def getInputSequences(self) -> iter:
        """ Lazily yield every line of DATASET that does not start with '>',
            with newline characters stripped from both ends.
        """
        with open(self.DATASET) as handle:
            for line in handle:
                if line.startswith('>'):
                    continue
                yield line.strip('\n')

    def getFrequencySpace(self, validMotifs) -> MotifSpace:
        """ Count the valid motifs over the whole dataset and return a space
            ordered by those counts.
        """
        reads = self.getInputSequences()
        restricted = _MotifSpace().fromTemplateWithValidSet(self.TEMPLATESPACE, validMotifs)
        tally = MotifCounter(restricted)
        MotifCountingScanner(restricted).scanGroup(tally, reads)
        return tally.toSpaceByFrequency(restricted)

    def getSplitter(self) -> MotifExtractor:
        """ Build the MotifExtractor for this run.

            The valid motifs are the lines of the MINIMIZERS file when one is
            given, otherwise every motif of the template space.
            NOTE(review): with ORDER == 'lex' the template space is used as-is
            and the valid-motif set is never consulted -- confirm intended.
        """
        def minimizerLines():
            with open(self.MINIMIZERS) as handle:
                for line in handle:
                    yield line.strip('\n')

        template = self.TEMPLATESPACE
        if self.MINIMIZERS:
            validMotifs = minimizerLines()
        else:
            validMotifs = template.byPriority

        if self.ORDER == 'lex':
            useSpace = template
        else:
            useSpace = self.getFrequencySpace(validMotifs)

        return MotifExtractor(useSpace, self.K)

def printSup(s, k):
    """ Print one super-mer with its minimizer highlighted.

        s - (motif, supermer) pair; the motif provides `position` and
            `feature.pattern` / `feature.rank`
        k - k-mer length, used to report how many k-mers the super-mer holds

        The first occurrence of the pattern in the super-mer is coloured.
        When the pattern is empty (the "no motif" sentinel) or does not occur
        in the super-mer, the super-mer is printed unhighlighted instead of
        raising ValueError.
    """
    motif, supermer = s[0], s[1]
    pattern = motif.feature.pattern
    print(f'{pattern} (pos {motif.position}, rank {motif.feature.rank}, len {len(supermer)-k+1} k-mers)', end=' ')

    if pattern and pattern in supermer:
        before, _, after = supermer.partition(pattern)
        print(f'{before}{Col.V}{pattern}{Col.W}{after}')
    else:
        print(supermer)


def readSplitDemo(args):
    """ Split every read of the dataset into super-mers and print them;
        when an output path is configured, also write one
        "<minimizer>\\t<super-mer>" line per super-mer to that file.
    """
    print(args)
    print(Col.C, 'Running Discount...')

    conf = ReadSplitConf(args)
    spl = conf.getSplitter()

    if not conf.OUTPUT:
        # Console-only mode.
        for read in conf.getInputSequences():
            print('Read :', read)
            for sup in spl.splitRead(read):
                printSup(sup, spl.K)
        return

    with open(conf.OUTPUT, 'w+') as sink:
        for read in conf.getInputSequences():
            print('Read:', read)
            for sup in spl.splitRead(read):
                minimizer, supermer = sup[0], sup[1]
                sink.write('{0}\t{1}\n'.format(minimizer.feature.pattern, supermer))
                printSup(sup, spl.K)

    

In [46]:
import argparse

class CustomArgparser(argparse.ArgumentParser):
    """ ArgumentParser that reports usage errors with coloured output """

    def error(self, message):
        """ Print the error and a hint to run with -h, then terminate.

            Exits through ArgumentParser.exit with status 2, the status the
            stock argparse error handler uses. The builtin exit() is only
            injected by the site module and reported status 0 on an error.
        """
        print(Col.R + Col.B + 'ERROR : %s\n' % message)
        print(Col.W + 'For more details run :' + Col.Y + ' discount ' + Col.GR + '-h' + Col.W)
        self.exit(2)


def main(args):
    """ Command-line entry point: parse `args` (a list of argument strings)
        and run the read-splitting demo with the resulting namespace.
    """
    parser = CustomArgparser(prog="discount", description="\n\tDiscountPy : A k-mer counting tool")
    parser.version = Col.G + 'DiscountPy version ' + Col.C + __version__ + Col.W

    # (flags, keyword settings) in the order they should appear in the help.
    options = (
        (("-k",), dict(metavar='', type=int, help="Length of the k-mers", required=True)),
        (("-m",), dict(metavar='', type=int, help="Width of the minimizers (default 10)", default=10)),
        (("-f",), dict(metavar='', type=str, help="Dataset (.fasta)", required=True)),
        (("-o",), dict(type=str, choices=["lex", "freq"], default="freq",
                       help="Ordering {lex | lexicographic, freq | frequency} (default freq)")),
        (("--minimizers",), dict(metavar="", type=str, help="Valid minimizers sets")),
        (('--output',), dict(metavar='', type=str, help='Generates output of Super-mers with minimizers')),
        (("-v", '--version'), dict(action='version', help="Version of the tool")),
    )
    for flags, settings in options:
        parser.add_argument(*flags, **settings)

    namespace = parser.parse_args(args)
    readSplitDemo(namespace)


In [47]:
# Hard-coded demo inputs for this notebook cell; the absolute paths only
# exist on the original author's machine.
# NOTE(review): `file` shadows no builtin in Python 3 but is a generic name.
file = 'C:/Users/umesh/Desktop/Discount-In-Python/Discount/data/testData.fasta' 
# Minimizer set passed via --minimizers (presumably a PASHA-generated set for
# k=28, m=10, judging by the file name -- confirm).
pasha = 'C:/Users/umesh/Desktop/Discount-In-Python/Discount/PASHA/pasha_all_28_10.txt'
# File that receives the "<minimizer>\t<super-mer>" lines.
out = 'C:/Users/umesh/Desktop/Discount-In-Python/Discount/output' + '/out.txt'
# Argument list as it would be typed on the command line; -m and -o are left
# at their parser defaults.
args = ['-k','28', '--minimizers', pasha,'-f', file, '--output', out]

# Runs the whole pipeline as a side effect of executing this cell.
main(args)

Namespace(k=28, m=10, f='C:/Users/umesh/Desktop/Discount-In-Python/Discount/data/testData.fasta', o='freq', minimizers='C:/Users/umesh/Desktop/Discount-In-Python/Discount/PASHA/pasha_all_28_10.txt', output='C:/Users/umesh/Desktop/Discount-In-Python/Discount/output/out.txt')
[38;2;0;200;255m Running Discount...
[92m Verifying the input... [0m
[93m Verification done successfully
[95m Executing... [0m
28 10 C:/Users/umesh/Desktop/Discount-In-Python/Discount/data/testData.fasta C:/Users/umesh/Desktop/Discount-In-Python/Discount/PASHA/pasha_all_28_10.txt freq
Read: TGGCGTCATTTTCCCCAATCATAGACTGTTGAAAGTGAACAAAACTGATGACTATCGTGCCACATTGATTGTTGTCGGTGCCTCGCTTCTGAACGACCCCT
AATCATAGAC (pos 16, rank 167060, len 13 k-mers) TGGCGTCATTTTCCCC[95mAATCATAGAC[0mTGTTGAAAGTGAAC
AAAGTGAACA (pos 31, rank 167040, len 19 k-mers) CCCAATCATAGACTGTTG[95mAAAGTGAACA[0mAAACTGATGACTATCGTG
AACTGATGAC (pos 42, rank 167048, len 11 k-mers) AAGTGAACAA[95mAACTGATGAC[0mTATCGTGCCACATTGATT
TATCGTGCCA (pos 52, rank 16

'output'