In [1]:
import gzip
import time
import networkx as nx
import numpy as np


def load_fasta(filename, verbose=0):
    """
    Parses a classically formatted and possibly
    compressed FASTA file into a dictionary where the key
    for a sequence is the first part of its header without
    any white space; if verbose is nonzero then the identifiers
    together with lengths of the read sequences are printed
    """
    if filename.endswith(".gz"):
        fp = gzip.open(filename, 'rt')
    else:
        fp = open(filename, 'r')
    # split at headers
    # data = fp.read().split('>')
    data = fp.read()
    data = data.split('>')
    fp.close()
    # ignore whatever appears before the 1st header
    data.pop(0)
    # prepare the dictionary
    D = {}
    for sequence in data:
        lines = sequence.split('\n')
        header = lines.pop(0).split()
        key = header[0]
        D[key] = ''.join(lines)
        if verbose:
            print("Sequence %s of length %d read" % (key, len(D[key])))
    return D


nucleotides = {'A': 0, 'C': 1, 'G': 2, 'T': 3}

In [2]:
def char_diff(s1, s2):
    """

    :param s1: first string sequence
    :param s2: second string sequence
    :return: number of pairwise different characters between string s1 and string s2
    """
    err = 0
    for c1, c2 in zip(s1, s2):
        if c1 != c2:
            err += 1
    return err


def score(motifs):
    """
    :param motifs: a list of motifs, where each motif is represented as a string
    :return: sum of number of different characters between all possible pairs of motifs
    """
    err = 0
    d = len(motifs)

    for i in range(d):
        for j in range(i + 1, d):
            err += char_diff(motifs[i], motifs[j])

    return err


def connected_components(motifs, mutations):
    """

    :param motifs: a list of motifs, where each motif is represented as a string
    :param mutations: integer of allowed mutations in each motif
    :return: boolean if motifs can form a connected graph
    """

    d = len(motifs)
    adjacency_matrix = np.zeros((d, d))
    for i in range(d):
        for j in range(d):
            adjacency_matrix[i, j] = char_diff(motifs[i], motifs[j]) <= mutations

    G = nx.from_numpy_matrix(adjacency_matrix)
    return nx.number_connected_components(G)

In [3]:
def get_profile(motifs, beta=1):
    """

    :param motifs: a list of motifs, where each motif is represented as a string
    :param beta: laplace correction value
    :return: a profile of size lx4 where l is the length of a motif and 4 represents the number of amino acids
    """
    l = len(motifs[0])
    profile = np.zeros((l, 4))
    profile += beta
    for count in range(l):
        for motif in motifs:
            x = nucleotides[motif[count]]
            profile[count][x] += 1

    return profile / np.linalg.norm(profile, axis=1, ord=1).reshape((l, 1))


def get_background(DNAs, l):
    """
    
    :param DNAs:  list of strings where each string represents a DNA
    :param l: length of motif
    :return: background probabilities profile Q 
    """
    t = len(DNAs)
    Q = [np.zeros((l, 4)) for _ in range(t)]

    for i in range(t):
        for j in range(t):
            if i != j:
                for n in range(len(DNAs[j]) - l + 1):
                    for m in range(l):
                        ch = DNAs[j][n + m]
                        Q[i][m][nucleotides[ch]] += 1
        Q[i] /= np.linalg.norm(Q[i], axis=1, ord=1).reshape((l, 1))
    return Q


def new_motif(motifs, DNA, Q):
    """

    :param motifs: a list of motifs, where each motif is represented as a string
    :param DNA:  string
    :return: newly randomly chosen motif, probability of motif being chose depends on  calculated weights
    """

    l = len(motifs[0])
    profile = get_profile(motifs)

    weights = np.zeros(len(DNA) - l + 1)
    for i in range(len(DNA) - l + 1):
        pr = 1
        pr_q = 1
        for j in range(l):
            x = nucleotides[DNA[i + j]]
            pr *= profile[j][x]
            pr_q *= Q[j][x]
        weights[i] = pr/pr_q

    weights = weights / np.sum(weights)
    i = np.random.choice(list(range(len(weights))), p=weights)

    return DNA[i:i + l]

def gibbs_sampling(DNAs, l, n, mutations):
    """

    :param DNAs: a list of strings where each string represents a DNA
    :param l: length of motif
    :param n: number of iterations of gibbs sampling
    :param mutations:
    :return: best found motifs
    """
    t = len(DNAs)
    optimal = False
    Q = get_background(DNAs, l)

    motifs = []
    for DNA in DNAs:
        i = np.random.randint(len(DNA) - l + 1)
        motifs.append(DNA[i:i + l])

    motifs = list(motifs)
    best_score = score(motifs)
    for _ in range(n):
        h = np.random.randint(t)
        other_motifs = motifs[0:h] + motifs[h + 1:t]
        motifs[h] = new_motif(other_motifs, DNAs[h], Q[h])

        if score(motifs) < best_score:
            motifs = list(motifs)
            best_score = score(motifs)
            if connected_components(motifs, mutations) == 1:
                optimal = True
                break
    return motifs, optimal


def repeated_gibbs_sampling(DNAs, l, n=1000, repeats=20, mutations=4):
    """

    :param DNAs: a list of strings where each string represents a DNA
    :param l: length of motif
    :param n: number of iterations of gibbs sampling
    :param repeats: number of repeated gibbs samplings
    :param mutations:
    :return: best found motifs
    """

    results, optimal = gibbs_sampling(DNAs, l, n, mutations)
    best_score = score(results)
    motifs = results[:]
    for i in range(repeats - 1):
        print(best_score)
        if optimal:
            break
        results, optimal = gibbs_sampling(DNAs, l, n, mutations)

        if score(results) < best_score:
            best_score = score(results)
            motifs = results[:]

        
    return motifs


In [4]:

l = 15
D = load_fasta('MotSam.fasta')
DNAs = []
for d, DNA in D.items():
    DNAs.append(DNA)
start_time=time.time()
results = repeated_gibbs_sampling(DNAs, l, mutations=4)
end_time=time.time()-start_time

print()
print("Obtained motifs:")
for result in results:
    print(result)
print()
print("Time:"+str(end_time))


391
200

Obtained motifs:
GTGACACATTATGTC
GTTAGACAATATGTT
GTGATATGTTATGAC
CTGAAACACTATGTC
CTGACACGTTATGTC
GTGACACATTATGTC
GAGACTCATAATGTC
GGGACACATAATGTC
GTGAGAGACGATGTC
GTGACACATGGGGTC

Time:13.050220489501953


#### 1. What motif of length 15 have you found and using which parameters of your program in the sequences from the file MotSam.fasta?

I obtained the following motifs:  
GTGATATGTTATGAC  
GTGACACATTATGTC  
GTTAGACAATATGTT  
CTGAAACACTATGTC  
CTGACACGTTATGTC  
GTGACACATTATGTC  
GTGAGAGACGATGTC  
GAGACTCATAATGTC  
GTGACACATGGGGTC  
GGGACACATAATGTC  
  
Parameters:  
- repeats of Gibbs sampling : 20  
- number of iterations in Gibbs sampling: 1000  
- Laplace correction beta: 1

Laplace formula:  

\begin{equation}
\theta = \cfrac{f_i+\beta}{l+4\beta},~~ i=\{A,C,G,T\}
\end{equation}  

#### 2. What was the running time of your program and what can be seen during the execution of your program?

Running time is usually under 2 minutes. We can see the number of diffrent characters between "best" motif of each sequence.  


In [5]:
def motif_score(motifs, mutation):
    scores = [0]*len(motifs)
    for i, motif1 in enumerate(motifs):
        for j, motif2 in enumerate(motifs):
            err = char_diff(motif1, motif2)
            if err>mutation:
                scores[i] = float('Inf')
                break
            scores[i] += err
    best_score_motif = min(scores)
    return motifs[scores.index(best_score_motif)],best_score_motif

motif, m_score = motif_score(results, 4)
print(motif)
print(m_score)

GTGACACATTATGTC
25


####  3. What is the score of the found motif, i.e. its distance to all sequences?

Found motif is GTGACACATTATGTC with score 25.

#### 4. How can we improve the performance of Gibbs sampling in order to be more reliable, faster, etc?

We can impove reliability by repeating Gibbs sampling as we did in our implementation. The other option ("phase shifts") would be to shift candidate motifs when we get stuck in local optima with a hope of getting better results. We could also use information about amino acids.

In [6]:
l = 9
D = load_fasta('MotSamShort.fasta')
DNAs = []
for d, DNA in D.items():
    DNAs.append(DNA)
results = repeated_gibbs_sampling(DNAs, l, mutations=4)
print(results)

225
['GTTTTCCCA', 'GTCTTCGCG', 'TGTCTAGCT', 'AGTTACGCT', 'TGTTTCGGT', 'TTATTCACT', 'CTACGCGGA', 'AGTTTAACA', 'CTTCTCCCA', 'GTATTTCCT']


#### 5. What results are produced by your program when searching for motif of length 9 with at most 4 mutations within sequences in the file  MotSamShort.fasta? How does the computation differ from the computation with the first set of sequences?  
We have more motifs that fit the criteria of having at most 4 mutations and because length is only 9 we have more candidates. After repeating Gibbs sampling we don't obtain the same best motifs.