In [1]:
import gzip

def loadfasta(filename,verbose=0):
    """ Parses a classically formatted and possibly 
        compressed FASTA file into a dictionary where the key
        for a sequence is the first part of its header without 
        any white space; if verbose is nonzero then the identifiers 
        together with lengths of the read sequences are printed"""
    if (filename.endswith(".gz")):
        fp = gzip.open(filename, 'rt')
    else:
        fp = open(filename, 'r')
    # split at headers
    # data = fp.read().split('>')
    data = fp.read()
    data = data.split('>')
    fp.close()
    # ignore whatever appears before the 1st header
    data.pop(0)     
    # prepare the dictionary
    D = {}
    for sequence in data:
        lines = sequence.split('\n')
        header = lines.pop(0).split()
        key = header[0]
        D[key] = ''.join(lines)
        if verbose:
            print("Sequence %s of length %d read" % (key,len(D[key])))
    return D

seq = loadfasta('MotSam.fasta')
seq.keys()

dict_keys(['Seq0', 'Seq1', 'Seq2', 'Seq3', 'Seq4', 'Seq5', 'Seq6', 'Seq7', 'Seq8', 'Seq9'])

In [2]:
import time
import random
import numpy
from functools import reduce
import operator
import copy
from math import sqrt

random.seed(12345)

indicies = {
    'A': 0,
    'T': 1,
    'C': 2,
    'G': 3
}

def product(iterable):
    return reduce(operator.mul, iterable, 1)

def compute_X(lmers, p, beta=2):
    X = numpy.zeros((4, len(lmers[0]))) # A, T, C, G
    for lmer in lmers:
        for idx in range(len(lmer)):
            X[indicies[lmer[idx]]][idx] += 1
            
    for base in range(4):
        for idx in range(len(lmers[0])):
            X[base][idx] = (X[base][idx]+beta*p[base])/(len(lmers) + beta)
    
    return X

def compute_Q(sequences):
    Q = numpy.zeros((4)) # A, T, C, G
    for seq in sequences:
        for idx in range(len(seq)):
            Q[indicies[seq[idx]]] += 1
    
    n = sum([len(seq) for seq in sequences])
    
    for base in range(4):
            Q[base] /= n
    
    return Q

def weight(lmer, X, Q):
    pX = product([X[indicies[base]][idx] for (idx, base) in enumerate(lmer)])
    pQ = product([Q[indicies[base]] for base in lmer])
    return pX/pQ

def select_random_lmer(options, W):
    r = random.randint(0, W)
    for idx in range(len(options)):
        if options[idx][0] < r:
            return idx
    
    return len(options)-1

def gibbstep(sequences, lmers, p, verbose=False):
    chosenidx = random.randint(0, len(lmers)-1)
    
    chosenlmer = lmers[chosenidx]
    chosenseq = sequences[chosenidx]
    
    otherlmers = lmers[:chosenidx]+lmers[chosenidx+1:]
    othersequences = sequences[:chosenidx]+sequences[chosenidx+1:]
    
    X = compute_X(otherlmers, p)
    Q = compute_Q(othersequences)
    #print(Q)
    #print(X)
    
    l = len(chosenlmer)
    options = []
    W = 0
    for idx in range(len(chosenseq)-l):
        lmer = chosenseq[idx:idx+l]
        w = weight(lmer, X, Q)
        W += int(w*100)
        options.append((W, lmer))
    
    lidx = select_random_lmer(options, W)
    lmers[chosenidx] = options[lidx][1]
    
    return options[lidx][0]

def choose_random_lmer(sequence, l=15):
    idx = random.randint(0,len(sequence)-l)
    return sequence[idx:idx+l]
    
def gibbs(sequences, l=15, verbose=False):
    a = sum([len([i for i in x if i == 'A']) for x in sequences])
    t = sum([len([i for i in x if i == 'T']) for x in sequences])
    g = sum([len([i for i in x if i == 'C']) for x in sequences])
    c = sum([len([i for i in x if i == 'G']) for x in sequences])
    n = a+t+g+c
    
    p = [a/n, t/n, g/n, c/n]
    
    lmers = [choose_random_lmer(seq, l) for seq in sequences]
    if verbose:
        print(lmers)
    
    Mstop = 200 # max(200, l ** 2)
    Mlimit = 10 # max(10, 2 * l)
    M = 0
    W = 0
    bestlmers = copy.deepcopy(lmers)
    while(M < Mstop):
        w = gibbstep(sequences, lmers, p, verbose=verbose)
        if W < w:
            W = w
            M = 0
            bestlmers = copy.deepcopy(lmers)
            if verbose:
                print("  " + str(lmers))
        else:
            M+=1
            if verbose:
                print(lmers)
        
        if M % Mlimit == 0: # every 10th unsuccesful step change one lmer to random one
            lmers = copy.deepcopy(bestlmers) # take the strongest lmers
            idx = random.randint(0, len(sequences) - 1) # pick one lmer
            lmers[idx] = choose_random_lmer(sequences[idx], l) # shift it
            
            
    if verbose:
        print(bestlmers)
        
    return (W, bestlmers)

seqs = list(seq.values())#['ATACGATCGC', 'GCATTCTCGG', 'GATCATTCGA']
l = 15 #4

bestmotifs = []
bestscore = 0

start = time.time()
for i in range(1):
    score, motifs = gibbs(seqs, l=l)
    if score > bestscore:
        bestmotifs = motifs
        bestscore = score        
end = time.time()
motif_search_time = end-start

print("Done in {} seconds.".format(motif_search_time))

Done in 14.330652236938477 seconds.


In [3]:
from math import inf
import copy

def item_with_max_occurences(items):
    d = {}
    print(items)
    for i in items:
        if not i in d:
            d[i] = 0
            
        d[i] += 1
    return max(d, key=lambda x: d[x])

def find_mean_motif_2(motifs):
    lists = [list(x) for x in motifs]
    zipped = zip(*lists)
    resulting_motif = [item_with_max_occurences(x) for x in zipped]
    print(resulting_motif)
    return ''.join(resulting_motif)

def distance(A, B):
    return len(A) - sum([1 if A[i] == B[i] else 0 for (i, _) in enumerate(A)])

def all_mutations(motif):
    res = set([])
    for i in range(len(motif)):
        copy = list(motif)
        copy[i] = 'A'
        res.add(''.join(copy))
        copy[i] = 'T'
        res.add(''.join(copy))
        copy[i] = 'G'
        res.add(''.join(copy))
        copy[i] = 'C'
        res.add(''.join(copy))
        
    return res

def find_mean_motif(motifs, difference=inf, verbose=False):
    if difference == inf:
        return find_mean_motif_2(motifs)
    
    possibilities = set(motifs)
    for i in range(difference):
        new_possibilities = set()
        for motif in possibilities:
            if verbose:
                print(motif)
                print('--->')
                print(all_mutations(motif))
                print()
            new_possibilities |= all_mutations(motif)
        
        possibilities = new_possibilities
        for possibility in possibilities:
            distances = [distance(motif, possibility) for motif in motifs]
            if all([d <= difference for d in distances]):
                return possibility
    
    return None

start = time.time()
mean = find_mean_motif(bestmotifs, difference=4)
end = time.time()
mean_search_time = end - start
print("Done in {} seconds".format(mean_search_time))
print(mean)
if mean != None:
    distances = [distance(motif, mean) for motif in bestmotifs]

Done in 79.27685928344727 seconds
None


In [5]:
distances

NameError: name 'distances' is not defined