In [None]:
# Installs
%pip install biopython
%pip install pandas
%pip install numpy
%pip install scikit-learn
%pip install matplotlib
%pip install seaborn

In [None]:
import pandas as pd
from numpy.lib.stride_tricks import sliding_window_view
import numpy as np
from Bio import SeqIO, motifs, AlignIO
from Bio.Align import MultipleSeqAlignment
from Bio.SeqRecord import SeqRecord
from Bio.Seq import Seq
import os
import random
import time
from IPython.display import Image
from sklearn.preprocessing import label_binarize
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier, AdaBoostClassifier
from sklearn.neural_network import MLPClassifier


In [None]:
# => Utility functions
# Function to read text files of sequences
def read_seq_file(in_path):
    """Read a plain-text file and return its lines as a list of strings.

    Parameters
    ----------
    in_path : str
        Path to the file; a leading ``~`` is expanded and the path is made
        absolute before opening.

    Returns
    -------
    list of str
        The file content split on line boundaries, without line terminators.
    """
    resolved_path = os.path.abspath(os.path.expanduser(in_path))
    with open(resolved_path, "r") as handle:
        contents = handle.read()
    return contents.splitlines()

# Function to generate list of false sequences
def generate_false_sequences(true_seq):
    """Build one shuffled counterpart for every input sequence.

    Each sequence is shuffled character-wise with ``random.shuffle``, so the
    result has the same length and letter composition as its source.

    Parameters
    ----------
    true_seq : iterable of str
        Sequences to shuffle; the input itself is not modified.

    Returns
    -------
    list of str
        Shuffled sequences, in the same order as ``true_seq``.
    """
    def _shuffled(sequence):
        # Shuffle a private copy of the characters and glue them back together
        letters = list(sequence)
        random.shuffle(letters)
        return ''.join(letters)

    return [_shuffled(seq) for seq in true_seq]

# Function to convert classification prediction into genomic intervals
def generate_genomic_locations(classes, proba, window_size):
    """Turn per-window class predictions into interval records.

    Parameters
    ----------
    classes : numpy.ndarray
        One predicted class per window; only windows whose class equals 1
        produce a record.
    proba : numpy.ndarray
        Per-window probability rows, indexed the same way as ``classes``.
    window_size : int
        Window length, used to derive the interval end.

    Returns
    -------
    list of list
        One ``[start, end, *probabilities]`` record per positive window, where
        ``start`` is the window index plus one and ``end`` is the window index
        plus ``window_size``.
    """
    return [
        [idx + 1, idx + window_size, *proba[idx].tolist()]
        for idx in range(classes.shape[0])
        if classes[idx] == 1
    ]


# Function to convert locations into GFF format
def generate_gff_df(locations, seqid, strand, score_filter=0.001):
    """Convert interval records into a GFF-style DataFrame.

    Parameters
    ----------
    locations : list of list
        Records of the form ``[start, end, p_first, p_second]`` where the two
        probabilities are one ``predict_proba`` row. scikit-learn orders those
        columns like ``model.classes_`` (sorted labels), and the training
        labels in this notebook are 0 (false) and 1 (true), so ``p_first`` is
        the false-class probability and ``p_second`` the true-class one.
    seqid : str
        Sequence identifier written to the ``seqid`` column and used as the
        prefix of every feature id.
    strand : str
        ``"+"`` is tagged ``F`` in the feature id; any other value is tagged
        ``R``. The value itself is written to the ``strand`` column.
    score_filter : float, optional
        Upper bound on the false-class probability; records above it are
        dropped.

    Returns
    -------
    pandas.DataFrame
        Columns ``seqid, source, type, start, end, score, strand, phase,
        attribute``. Feature ids use the record's position in ``locations``
        (the index before filtering).
    """
    column_names = ["seqid", "source", "type", "start", "end", "score", "strand", "phase", "attribute"]
    strand_letter = "F" if strand == "+" else "R"

    # The first probability is P(class 0) = false, the second P(class 1) = true.
    # Naming them the other way round made the attributes report swapped values.
    gff_df = pd.DataFrame(locations, columns=["start", "end", "false_proba", "true_proba"])
    # Same rows are kept as before: the filter always ran on the first probability
    gff_df = gff_df[gff_df["false_proba"] <= score_filter].copy()
    gff_df["seqid"] = seqid
    gff_df["source"] = "motif_predictor"
    gff_df["type"] = "motif"
    gff_df["score"] = "."
    gff_df["strand"] = strand
    gff_df["phase"] = "."

    # Build the attribute column in one pass; this also works for an empty frame
    gff_df["attribute"] = [
        f"id={seqid}_{strand_letter}_prom_{idx}"
        f";name={seqid}_{strand_letter}_prom_{idx}"
        f";true_proba={true_p};false_proba={false_p}"
        for idx, true_p, false_p in zip(gff_df.index, gff_df["true_proba"], gff_df["false_proba"])
    ]
    # reindex both orders the columns and drops the helper probability columns
    return gff_df.reindex(columns=column_names)

In [None]:
# Inputs
# User-tunable settings for the whole notebook; edit these before running.

# Tab-separated promoter table that is downloaded and read with pandas
promoters_seq_url = "http://regulondb.ccg.unam.mx/menu/download/datasets/files/PromoterSigma24Set.txt"
# Genome to scan, parsed as FASTA
genome_file = "./GCF_000005845.2_ASM584v2_genomic.fa"
#true_bs_file = "./sigE_binding_sites.txt"
# Directory that receives the predicted_promoters_sk_<model>.gff files
save_dir = "./"
# Passed as max_iter to the MLPClassifier
iterations = 500
# Passed as n_jobs to the RandomForestClassifier
threads = 40
# NOTE(review): not referenced by the cells visible here; the per-model
# `thresholds` dictionary is what gets passed to generate_gff_df - confirm
# whether this value is still needed.
score_filter = 0.001

In [None]:
# Download the promoter table, keep well-supported entries and derive the
# training sequences from them
prom_col = ["id", "name", "strand", "tss", "sigma_name", "sequence", "evidence", "confidence"]
promotors_df = (
    pd.read_csv(promoters_seq_url, comment="#", sep="\t", names=prom_col)
    .dropna(subset=["sequence"])  # rows without a sequence cannot be used
)
# Only keep promoters whose confidence level is "Strong" or "Confirmed"
promotors_df = promotors_df.loc[promotors_df["confidence"].isin(["Strong", "Confirmed"])]
# First 60 characters of every sequence, upper-cased
promoters = list(map(lambda prom: prom[: 60].upper(), promotors_df["sequence"]))
# The scanning window is as long as the first training sequence
window_size = len(promoters[0])


In [None]:
# Load input files
save_dir = os.path.abspath(save_dir)
# SeqIO.parse returns a one-shot iterator: once a cell has looped over it, any
# re-run of that cell would see no records and silently build empty inputs.
# Materialising the records as a list keeps the downstream cells re-runnable.
genome_file_parsed = list(SeqIO.parse(os.path.abspath(genome_file), "fasta"))
# Alternative source of positive sequences (disabled):
#true_sequences = read_seq_file(true_bs_file)
true_sequences = promoters
# Negative examples: a character-shuffled copy of every positive sequence
false_sequences = generate_false_sequences(true_sequences)

In [None]:
# One-hot encoder for a single sequence, produces a 1d flattened vector
def vector_func(seq):
    """Binarize every character of ``seq`` against the classes A, T, C, G and
    return the result flattened to one dimension."""
    one_hot_rows = label_binarize(list(seq), classes=list("ATCG"))
    return one_hot_rows.flatten()

In [None]:
# Vectorize the positive and negative training sequences, one sequence at a
# time, and collect each set into an int32 array

true_dataset_arr = np.array([vector_func(seq) for seq in true_sequences], dtype=np.int32)
false_dataset_arr = np.array([vector_func(seq) for seq in false_sequences], dtype=np.int32)
# Stack positives above negatives; labels follow the same order (1 = true, 0 = false)
full_arr = np.concatenate((true_dataset_arr, false_dataset_arr), axis=0)
labels = [1] * len(true_dataset_arr) + [0] * len(false_dataset_arr)

In [None]:
# Prepare the data for prediction: one array of sliding windows per sequence
# record, for the forward strand and for the reverse complement
f_data_to_predict = {}
r_data_to_predict = {}
for seq_rec in genome_file_parsed:
    print(f"==> Preparing: {seq_rec.id}")
    # Vectorize the genomic sequence forward and reverse.
    # Upper-case first, as is done for the training promoters: lowercase
    # (e.g. soft-masked) bases would not match any of the "ATCG" classes.
    f_chrom_vector = vector_func(str(seq_rec.seq).upper())
    r_chrom_vector = vector_func(str(seq_rec.reverse_complement().seq).upper())
    # Convert vectorized genome to array of sliding windows.
    # Every base occupies 4 consecutive values in the flattened vector, so a
    # window spans window_size * 4 values and taking every 4th window moves
    # the window start by exactly one base.
    f_data_to_predict[seq_rec.id] = sliding_window_view(f_chrom_vector, window_size * 4)[::4]
    r_data_to_predict[seq_rec.id] = sliding_window_view(r_chrom_vector, window_size * 4)[::4]
    del f_chrom_vector, r_chrom_vector # drop the names; the window views keep the data alive

In [None]:
# Classifiers to compare; each one is trained on the same training arrays
models = {"RandomForest": RandomForestClassifier(n_jobs=threads),
          "GradientBoosting": GradientBoostingClassifier(),
          "AdaBoost": AdaBoostClassifier(),
          "MultiLayerPerceptron": MLPClassifier(max_iter=iterations)}
# Per-model cut-off handed to generate_gff_df as its score_filter
thresholds = {"RandomForest": 0.1,
              "GradientBoosting": 0.05,
              "AdaBoost": 0.43,
              "MultiLayerPerceptron": 0.005}

for model_name, model in models.items():
    t = time.time()
    # Train
    print(f"==> Training {model_name} model")
    model.fit(full_arr, labels)
    # Collect the per-sequence frames and concatenate once at the end;
    # DataFrame.append no longer exists in pandas 2.x
    gff_frames = []
    for seqid in f_data_to_predict.keys():
        print(f"===> Predicting for: {seqid} using {model_name} model")
        # Get classification probabilities
        f_proba = model.predict_proba(f_data_to_predict[seqid])
        r_proba = model.predict_proba(r_data_to_predict[seqid])
        # Get classes from probabilities (vectorised lookup of the most likely class)
        f_predict_classes = model.classes_[np.argmax(f_proba, axis=1)]
        r_predict_classes = model.classes_[np.argmax(r_proba, axis=1)]
        # Make genomic coordinates from the classification result
        f_locations = generate_genomic_locations(f_predict_classes, f_proba, window_size)
        # Flip the arrays upside down so reverse-strand windows are indexed
        # from the forward-strand start
        r_locations = generate_genomic_locations(np.flipud(r_predict_classes), np.flipud(r_proba), window_size)
        # Convert to GFF format, labelled with the sequence being predicted.
        # The previous code used the leaked `seq_rec.id`, which always points
        # at the last record of the FASTA file.
        f_gff = generate_gff_df(f_locations, seqid, "+", thresholds[model_name])
        r_gff = generate_gff_df(r_locations, seqid, "-", thresholds[model_name])
        gff_frames.extend([f_gff, r_gff])
    save_df = pd.concat(gff_frames, ignore_index=True)

    print(f"Time elapsed for {model_name} model: {round((time.time() - t) / 60, 2)} minutes")

    print(f"Predicted motifs count: {save_df.shape[0]} at threshold {thresholds[model_name]}")
    save_df = save_df.sort_values(by=["seqid", "start", "end"])
    print("Saving GFF")
    save_df.to_csv(os.path.abspath(os.path.join(save_dir, f"predicted_promoters_sk_{model_name}.gff")),
                   sep="\t", header=False, index=False)