In [1]:
import os
import pandas as pd
from Bio import motifs
from Bio import SeqIO
import re

In [2]:
import numpy as np

def ppm(C):
    """Return the position probability matrix (PPM) for ``C``.

    This is a pass-through: ``C`` is handed back unchanged and no
    normalisation is performed here.
    """
    matrix = C
    return matrix

def s(C, B):
    """Return the base-2 logarithm of ``ppm(C)`` divided by the background ``B``."""
    odds = ppm(C) / B
    return np.log2(odds)

def ic_total(C):
    """Return ``log2(len(C))``, the value ``ic_final`` subtracts the entropy from."""
    n_symbols = len(C)
    return np.log2(n_symbols)

def u(C):
    """Return the Shannon entropy (in bits) of ``ppm(C)``, summed along axis 0.

    Entries that are exactly zero contribute nothing to the sum, following the
    convention ``0 * log2(0) = 0``. Evaluating that product literally gives
    ``0 * -inf = NaN``, which would turn the whole entropy into NaN as soon as
    a single entry is zero.

    Parameters
    ----------
    C : array-like
        Values passed through ``ppm``; the result of ``ppm(C)`` is what the
        entropy is computed from.

    Returns
    -------
    Scalar for one-dimensional input; for two-dimensional input one value per
    column (the reduction runs over axis 0).
    """
    p = ppm(C)
    p_values = np.asarray(p, dtype=float)
    # Swap exact zeros for 1 inside the logarithm only: log2(1) == 0, so the
    # matching term becomes 0 * 0 == 0. Every other entry (including NaN or
    # negative input) goes through log2 untouched.
    log_p = np.log2(np.where(p_values == 0, 1.0, p_values))
    return -np.sum(p * log_p, axis=0)

def ic_final(C):
    """Return ``ic_total(C)`` minus the entropy ``u(C)``."""
    upper_bound = ic_total(C)
    entropy = u(C)
    return upper_bound - entropy

def ic(C):
    """Return ``ppm(C)`` scaled by ``ic_final(C)``."""
    probabilities = ppm(C)
    information = ic_final(C)
    return probabilities * information

def calc_ic_df(pfm_df):
    """Return the summed information content of every row of ``pfm_df``.

    ``ic`` is applied to each row, the resulting values are added up within
    the row, and each sum is rounded to two decimals.

    Parameters
    ----------
    pfm_df : pandas.DataFrame
        One row per motif position. Rows are handed to ``ic`` unchanged, so
        they presumably already hold probabilities (one column per
        nucleotide) -- confirm against the PWM files being loaded.

    Returns
    -------
    list
        Rounded information content, one entry per row, in row order.
    """
    # Only the per-row IC table feeds the result; no background log-odds
    # scores are needed, which also avoids log2(0) warnings on zero entries.
    ic_df = pfm_df.apply(ic, axis=1)
    # Summing across the columns of each row equals transposing and summing
    # down the columns.
    return ic_df.sum(axis=1).round(2).to_list()

def trim_motif(motif, ic_t, reverse, threshold=0.5):
    """Trim low-information flanks off a motif.

    A position is informative when its value in ``ic_t`` is strictly greater
    than ``threshold``. Everything after the last informative position is
    removed; unless ``reverse`` is true, everything before the first
    informative position is removed as well. Positions between the two are
    always kept, whatever their information content (including NaN), so the
    result is a contiguous stretch of the input.

    Parameters
    ----------
    motif : str
        Motif characters, one per position.
    ic_t : sequence of float
        Information content per position; must provide at least
        ``len(motif)`` entries (an ``IndexError`` is raised otherwise).
    reverse : bool
        ``False`` trims both ends. ``True`` trims the trailing end only.
    threshold : float, optional
        Cut-off applied to BOTH ends. The same value is used for the leading
        and the trailing flank.

    Returns
    -------
    tuple (str, list)
        The trimmed motif and the matching slice of ``ic_t``. ``("", [])``
        when no position exceeds ``threshold``.
    """
    informative = [i for i in range(len(motif)) if ic_t[i] > threshold]
    if not informative:
        return "", []

    first = 0 if reverse else informative[0]
    last = informative[-1]
    kept = range(first, last + 1)
    return "".join(motif[i] for i in kept), [ic_t[i] for i in kept]

In [3]:
def _build_iupac_union_table():
    """Build the lookup table for combining two IUPAC nucleotide codes.

    ``table[x][y]`` is the IUPAC code whose base set is the union of the base
    sets of ``x`` and ``y``.
    """
    bases_of = {
        "A": "A", "C": "C", "G": "G", "T": "T",
        "M": "AC", "R": "AG", "W": "AT", "S": "CG", "Y": "CT", "K": "GT",
        "V": "ACG", "H": "ACT", "D": "AGT", "B": "CGT",
        "N": "ACGT",
    }
    code_of = {frozenset(bases): code for code, bases in bases_of.items()}
    return {
        first: {
            second: code_of[frozenset(bases_of[first] + bases_of[second])]
            for second in bases_of
        }
        for first in bases_of
    }


# dict storing rules for combining two IUPAC letters
IUPAC_TO_IUPAC = _build_iupac_union_table()
def motif_contains(m1, m2):
    """
    Searches IUPAC motif m1 for a window that IUPAC motif m2 fits into.

    Windows of length len(m2) are tried from left to right. A window matches
    when, at every position where m2 is not 'N', the m1 character is not 'N'
    and combining it with the m2 character through IUPAC_TO_IUPAC gives the
    m1 character again (the m2 code is covered by the m1 code). An 'N' in m2
    matches any m1 character.

    Returns a tuple (idx, ident) for the first matching window, where
    idx is the start index of the window in m1 and
    ident is True if the window is character-for-character equal to m2.
    Returns False if m2 is longer than m1 or no window matches.

    Examples:
    motif_contains("CATC", "ATC") -> (1, True)
    motif_contains("CATS", "ATC") -> (1, False)
    motif_contains("CANC", "ATC") -> False
    """
    width = len(m2)
    if width > len(m1):
        return False
    for start in range(len(m1) - width + 1):
        identical = True
        fits = True
        for c1, c2 in zip(m1[start:start + width], m2):
            if c1 != c2:
                identical = False
            if c2 == 'N':
                continue
            # an 'N' in m1 is a TypeI motif gap and must not absorb a
            # concrete m2 character
            if c1 == 'N' or IUPAC_TO_IUPAC[c1][c2] != c1:
                fits = False
                break
        if fits:
            return start, identical
    return False

In [None]:
# IUPAC nucleotide code -> the bases it stands for (used to build regex
# character classes)
iupac = {
    # any base
    'N': 'ATCG',
    # three-base codes
    'V': 'ACG',
    'H': 'ACT',
    'D': 'AGT',
    'B': 'CGT',
    # two-base codes
    'M': 'AC',
    'K': 'GT',
    'W': 'AT',
    'S': 'GC',
    'Y': 'CT',
    'R': 'AG',
    # plain bases
    'A': 'A',
    'T': 'T',
    'G': 'G',
    'C': 'C',
}

## postprocessing of PWMs of Xylella Nanopore sequencing data

In [None]:
# Map "<sample><suffix>" to the FASTA file of the same name inside that
# sample's peaks/ directory, for every sample and every known suffix.
samples_dir = '../../samples'
fasta_files_dict = {}
for dataname in os.listdir(samples_dir):
    peaks_prefix = samples_dir + '/' + dataname + '/peaks/' + dataname
    for suffix in ('_peaks_uBH_0.001_peak_dist_2_min_cov_20_min_dist_20_k_3_kmer_quantile_0.25',
                   '_peaks_uBH_0.001_peak_dist_2_min_cov_20_min_dist_20',
                   '_peaks_uBH_0.001'):
        fasta_files_dict[dataname + suffix] = peaks_prefix + suffix + '.fasta'

In [None]:
# For every training run: collect trimmed consensus motifs from its recovered
# PWMs, drop redundant ones, count their occurrences in the run's FASTA file
# and write the table to motifs_xylella/<run>.csv.
for filename in os.listdir('training_xylella'):
    # os.path.join supplies the separator between the training directory and
    # the run name; bare concatenation gives 'training_xylella<run>', which is
    # not a path inside the training directory.
    filepath = os.path.join('training_xylella', filename)
    fasta_file = fasta_files_dict[filename]
    csv_file = 'motifs_xylella/' + filename + '.csv'
    pwm_directory = os.path.join(filepath, 'recover_PWM_500')

    # trim each PWM based on IC and keep only motifs longer than 3 nt
    trimmed_motifs = set()
    # a separate loop name keeps the run name in `filename` intact
    for pwm_filename in os.listdir(pwm_directory):
        f = os.path.join(pwm_directory, pwm_filename)
        pwm = pd.read_csv(f, delimiter=" ", names=["A","C","G","T"])
        motif = motifs.Motif(alphabet="ACGT", counts=pwm)
        motif_trim = trim_motif(str(motif.degenerate_consensus), calc_ic_df(pwm), False, 1.5)[0]
        if motif_trim and len(motif_trim) > 3:
            trimmed_motifs.add(motif_trim)
    # sorted so the output row order does not depend on set iteration order
    motif_list_trim = sorted(trimmed_motifs)

    # drop every motif that contains another motif of the list as an
    # identical stretch (the shorter motif already represents it)
    motif_list_combined = list()
    for motif1 in motif_list_trim:
        flag = True
        for motif2 in motif_list_trim:
            if motif1 != motif2:
                cont = motif_contains(motif1, motif2)
                if cont and cont[1]:
                    print(motif1, motif2)
                    flag = False
                    break
        if flag:
            motif_list_combined.append(motif1)

    # count occurrences of motifs; the FASTA file is read once and each motif
    # regex is compiled once. re.findall counts non-overlapping matches on
    # the given strand only.
    sequences = [str(rec.seq) for rec in SeqIO.parse(fasta_file, "fasta")]
    motif_counts = []
    for motif in motif_list_combined:
        regex = re.compile("".join(f"[{iupac[cc]}]" for cc in motif), re.IGNORECASE)
        motif_counts.append(sum(len(regex.findall(seq)) for seq in sequences))

    df = pd.DataFrame({'motif' : motif_list_combined, 'frequency' : motif_counts})
    # csv_file already ends in '.csv'; do not append the extension again
    df.to_csv(csv_file)


## postprocessing of PWMs of artificial datasets with controlled TPR

In [None]:
# Map every entry of the artificial data directory to "<entry>.fasta" in
# that same directory.
artificial_dir = '../../../artificial_Riv19/'
fasta_files_dict = {}
for dataname in os.listdir(artificial_dir):
    fasta_files_dict[dataname] = f'{artificial_dir}{dataname}.fasta'

In [None]:
# For every training run: collect trimmed consensus motifs from its recovered
# PWMs, drop redundant ones, count their occurrences in the run's FASTA file
# and write the table to motifs_artificial/<run>.csv.
for filename in os.listdir('training_artificial'):
    # os.path.join supplies the separator between the training directory and
    # the run name; bare concatenation gives 'training_artificial<run>', which
    # is not a path inside the training directory.
    filepath = os.path.join('training_artificial', filename)
    fasta_file = fasta_files_dict[filename]
    csv_file = 'motifs_artificial/' + filename + '.csv'
    pwm_directory = os.path.join(filepath, 'recover_PWM_500')

    # trim each PWM based on IC and keep only motifs longer than 3 nt
    trimmed_motifs = set()
    # a separate loop name keeps the run name in `filename` intact
    for pwm_filename in os.listdir(pwm_directory):
        f = os.path.join(pwm_directory, pwm_filename)
        pwm = pd.read_csv(f, delimiter=" ", names=["A","C","G","T"])
        motif = motifs.Motif(alphabet="ACGT", counts=pwm)
        motif_trim = trim_motif(str(motif.degenerate_consensus), calc_ic_df(pwm), False, 1.5)[0]
        if motif_trim and len(motif_trim) > 3:
            trimmed_motifs.add(motif_trim)
    # sorted so the output row order does not depend on set iteration order
    motif_list_trim = sorted(trimmed_motifs)

    # drop every motif that contains another motif of the list as an
    # identical stretch (the shorter motif already represents it)
    motif_list_combined = list()
    for motif1 in motif_list_trim:
        flag = True
        for motif2 in motif_list_trim:
            if motif1 != motif2:
                cont = motif_contains(motif1, motif2)
                if cont and cont[1]:
                    print(motif1, motif2)
                    flag = False
                    break
        if flag:
            motif_list_combined.append(motif1)

    # count occurrences of motifs; the FASTA file is read once and each motif
    # regex is compiled once. re.findall counts non-overlapping matches on
    # the given strand only.
    sequences = [str(rec.seq) for rec in SeqIO.parse(fasta_file, "fasta")]
    motif_counts = []
    for motif in motif_list_combined:
        regex = re.compile("".join(f"[{iupac[cc]}]" for cc in motif), re.IGNORECASE)
        motif_counts.append(sum(len(regex.findall(seq)) for seq in sequences))

    df = pd.DataFrame({'motif' : motif_list_combined, 'frequency' : motif_counts})
    # csv_file already ends in '.csv'; do not append the extension again
    df.to_csv(csv_file)
