> Import modules: 

In [None]:
# Ground modules
import os
import pandas as pd
import numpy as np
from tqdm import tqdm
from Bio import SeqIO
from itertools import product
import random
from collections import Counter
import warnings
warnings.filterwarnings("ignore", category=UserWarning)

import logging
import subprocess
from multiprocessing.pool import ThreadPool
import joblib
import json

# SCikitlearn modules :
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split, RandomizedSearchCV
from sklearn.metrics import classification_report , roc_auc_score

# Scipy modules : 
from scipy.stats import fisher_exact

> Set path: 

In [None]:
path_tropiseq = "/media/concha-eloko/Linux/PPT_clean/benchmark"

path_db = f"{path_tropiseq}/TropiSeq_0.85.db"
path_tmp =  f"{path_tropiseq}/tmp"


> Set up predictor: 

In [None]:
# Unmark if you want to make blastp database : 

# Run makeblast command :
# fasta_file = f"{path_tropiseq}/0.85.out"

# blast_command = f"makeblastdb -in {fasta_file} -dbtype prot -out {path_seqbased}/TropiSeq/TropiSeq_0.7.db"
# make_blast_process = subprocess.Popen(blast_command, shell =True, stdout = subprocess.PIPE, stderr=subprocess.STDOUT)
# mkblast_out, mkblast_err = make_blast_process.communicate()
# print(mkblast_out , mkblast_err)

In [None]:
# 5% probability
final_annotation = pd.read_csv(f"{path_tropiseq}/labeling_depo_clusters.pred.p_05.tsv", sep = "\t", header = 0)
final_annotation.columns = ['index', 'seq', 'domain_seq', 'depo_cluster', 'Tropiseq_KLtypes','Tropiseq_scores']

# Make the dico: 
final_annotation_tropiseq = final_annotation[final_annotation["Tropiseq_KLtypes"] != "None"]
final_annotation_tropiseq = final_annotation_tropiseq.drop_duplicates(subset = ["depo_cluster", "Tropiseq_KLtypes"])
final_annotation_tropiseq = final_annotation_tropiseq[["depo_cluster", "Tropiseq_KLtypes", "Tropiseq_scores"]]

dico_tropiseq_data = {row["depo_cluster"] : {"KL_types" : row["Tropiseq_KLtypes"], "Scores" : row["Tropiseq_scores"]} for _, row in final_annotation_tropiseq.iterrows()}

dico_pred = json.load(open(f"{path_tropiseq}/prediction_based.labeling.p_05.json"))
dico_cluster = json.load(open(f"{path_tropiseq}/dico_cluster.cdhit__0.85.json"))

dico_cluster_r = {ref_dpo : key_dpo for key_dpo,list_dpo in dico_cluster.items() for ref_dpo in list_dpo}
dico_pred_correct_name = {f"Dpo_cdhit_{cluster.split('_')[1]}":hits  for cluster, hits in dico_pred.items()}

> Relevant functions: 

In [None]:
labels_blast=["qseqid", "sseqid", "pident", "length", "mismatch", "gapopen", "qstart", "qend", "sstart", "send", "evalue", "bitscore"]

def tmp_fasta_file(record , path_tmp) :
    name_file = "_".join(record.description.split(" "))
    path_fasta = f"{path_tmp}/{name_file}.fasta"
    length_seq = len(record.seq)
    with open(path_fasta, "w") as outfile :
        outfile.write(f">{record.description}\n{str(record.seq)}")
    return path_fasta , length_seq

def blast_seq(path_fasta, path_DB, path_tmp) :
    file_name = path_fasta.split("/")[-1]
    command = f"blastp -query {path_fasta} -db {path_DB} -out {path_tmp}/{file_name}.blast_out -outfmt 6 -evalue 1e-10"
    blastp_sub = subprocess.Popen(command ,shell = True, stdout = subprocess.PIPE, stderr = subprocess.STDOUT)
    out , err = blastp_sub.communicate()
    return f"{path_tmp}/{file_name}.blast_out"

def get_best_candidate(path_blast_out, length_seq, bitscore = 75) : 
    winner = 0
    labels_blast=["qseqid", "sseqid", "pident", "length", "mismatch", "gapopen", "qstart", "qend", "sstart", "send", "evalue", "bitscore"]
    blast_df = pd.read_csv(path_blast_out, sep = "\t", names = labels_blast)
    if len(blast_df) > 0 :
        row = blast_df.iloc[0] 
        if row["bitscore"] > bitscore and length_seq/int(row["length"])> 0.8:
            winner = dico_cluster_r[row["sseqid"]]
        else :
            winner = "No hits"
    else :
        winner = "No hits"
    return winner

def get_winner(record , path_tmp) :
    path_func , len_func = tmp_fasta_file(record, path_tmp)
    path_blast_out_func = blast_seq(path_func , path_db, path_tmp)
    winner = get_best_candidate(path_blast_out_func, len_func)
    return winner

> Running test data: 

In [None]:
# Ferriol inferences : 
path_seq = "/media/concha-eloko/Linux/77_strains_phage_project/rbp_work"
dico_seq = {record.description : record.seq for record in SeqIO.parse(f"{path_seq}/77_phages_Dpo_domains.2406.multi.fasta", "fasta") if len(record.seq) >0}
set_records = [record for record in SeqIO.parse(f"{path_seq}/77_phages_Dpo_domains.2406.multi.fasta", "fasta") if len(record.seq) > 0]


ferriol_winners = []
for record in tqdm(set_records) :
    winner = get_winner(record, path_tmp)
    if winner != "No hits" :
        hit = int(winner.split("_")[-1])
        results = dico_pred_correct_name.get(winner, {})
        a = (record.description.split(",")[0] , winner, results)
    else :
        results = "Null"
    a = (record.description.split(",")[0] , winner, results)
    ferriol_winners.append(a)