In [None]:
from sklearn.model_selection import StratifiedKFold
import os
from tqdm import tqdm
import pickle
import numpy as np
import pandas as pd
import os
import typing
from datetime import datetime

from sklearn.base import ClassifierMixin

from sklearn.preprocessing import MinMaxScaler

from sklearn.model_selection import GridSearchCV, StratifiedKFold,StratifiedShuffleSplit
from sklearn.model_selection._search import BaseSearchCV
from skopt import BayesSearchCV

import numpy as np
import pandas as pd
import pickle
import gzip
import pymrmr
from sklearn.base import ClassifierMixin
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier, ExtraTreeClassifier
from sklearn.ensemble import ExtraTreesClassifier, RandomForestClassifier, GradientBoostingClassifier
from sklearn.gaussian_process import GaussianProcessClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.neighbors import KNeighborsClassifier
from sklearn.neural_network import MLPClassifier
from xgboost import XGBClassifier
from lightgbm import LGBMClassifier
from sklearn.ensemble import BaggingClassifier, AdaBoostClassifier
from sklearn.calibration import CalibratedClassifierCV
from skopt.space import Real, Categorical
from skopt import BayesSearchCV
from sklearn.model_selection import cross_val_score, GridSearchCV, RepeatedStratifiedKFold
from sklearn.metrics import roc_curve, confusion_matrix, precision_score, accuracy_score, f1_score, matthews_corrcoef, auc,precision_recall_curve
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, auc
import warnings
warnings.filterwarnings("ignore")


def get_evaluation(label: list, pred: list, pro_cutoff: float = None):
    fpr, tpr, thresholds = roc_curve(label, pred)
    if pro_cutoff is None:
        best_one_optimal_idx = np.argmax(tpr - fpr)
        pro_cutoff = thresholds[best_one_optimal_idx]
    pred_l = [1 if i >= pro_cutoff else 0 for i in pred]
    #后面新增的计算prAUC
    confusion_matrix_1d = confusion_matrix(label, pred_l).ravel()
    confusion_dict = {N: n for N, n in zip(['tn', 'fp', 'fn', 'tp'], list(
        confusion_matrix_1d * 2 / np.sum(confusion_matrix_1d)))}
    
    precision, recall, _ = precision_recall_curve(label, pred)
    pr_auc = auc(recall, precision)
    
    evaluation = {
        "accuracy": accuracy_score(label, pred_l),
        "precision": precision_score(label, pred_l),
        "f1_score": f1_score(label, pred_l),
        "mmc": matthews_corrcoef(label, pred_l),
        "rocAUC": auc(fpr, tpr),
        "prAUC": pr_auc,
        "specificity": confusion_dict['tn'] / (confusion_dict['tn'] + confusion_dict['fp']),
        "sensitivity": confusion_dict['tp'] / (confusion_dict['tp'] + confusion_dict['fn']),
        'pro_cutoff': pro_cutoff
    }
    return evaluation

def plot_roc_curve(target, pred, path_to_: str):
    fpr, tpr, thresholds = roc_curve(target, pred)
    roc_auc = auc(fpr, tpr)

    plt.figure(figsize=(19.2, 10.8))
    plt.plot(fpr, tpr, color='red', lw=2,
             label='ROC curve (area = %0.2f)' % roc_auc)
    plt.plot([0, 1], [0, 1], color='blue', lw=2, linestyle='--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver operating characteristic (ROC) curve')
    plt.legend(loc="lower right")

    plt.savefig(f"{path_to_}")
    plt.clf()
class MyOptimitzer:
    def __init__(self, classifier_name: str, classifier_class: ClassifierMixin, classifier_param_dict: dict) -> None:
        self.classifier_name = classifier_name
        self.classifier_class = classifier_class
        self.classifier_param_dict = classifier_param_dict

        self.grid_search: BaseSearchCV = None
        self.train_best_predicted_pair = None
        self.train_best_5C_predicted_pair = None
        self.best_predicted_pair = None
        self.best_5C_predicted_pair = None
        self.start_to_train_time = datetime.now()
        self.end_of_train_time = None
        pass

    def find_best(
        self,
        X: np.ndarray,
        y: np.ndarray,
        validation: tuple,
        search_method: typing.Literal["GridSearchCV", "BayesSearchCV"],
        n_jobs: int = 28
    ):

        

        if search_method == "GridSearchCV":
            self.grid_search = GridSearchCV(
                self.classifier_class(),
                param_grid=self.classifier_param_dict,
                cv=StratifiedKFold(
                    n_splits=5,
                    shuffle=True,
                    random_state=42
                ),
                scoring='roc_auc',
                n_jobs=n_jobs,
                refit=True
            )
        elif search_method == "BayesSearchCV":
            self.grid_search = BayesSearchCV(
                self.classifier_class(),
                search_spaces=self.classifier_param_dict,
                cv=StratifiedKFold(
                    n_splits=5,
                    shuffle=True,
                    random_state=42
                ),
                scoring='roc_auc',
                n_jobs=n_jobs,
                n_points=n_jobs,
                n_iter=5,
                refit=True
            )
        else:
            raise ValueError(
                'search_method: typing.Literal["GridSearchCV", "BayesSearchCV"]'
            )
        y_origin = y
        if self.classifier_name == "LabelPropagation":
            y = y.copy()
            y[
                np.random.choice(
                    a=np.arange(X.shape[0]),
                    size=max(int(X.shape[0] * 0.25), 1)
                )
            ] = -1
        full_X = np.concatenate([
            X, validation[0]
        ])
        full_y = np.concatenate([
            y_origin, validation[1]
        ])

        self.grid_search.fit(full_X, full_y)
        self.best_predicted_pair = [
            np.nan_to_num(self.grid_search.predict_proba(
                X=validation[0]
            ), nan=0.0),
            validation[1]
        ]
        self.train_best_predicted_pair = [
            np.nan_to_num(self.grid_search.predict_proba(
                X=X
            ), nan=0.0),
            y
        ]

        # 5倍交叉验证
        
        # 跑模型
        self.best_5C_predicted_pair = []
        self.train_best_5C_predicted_pair = []
        for Kfold_id, (train_id, test_id) in enumerate(
            StratifiedKFold(
                n_splits=5,
                shuffle=True,
                random_state=42
            ).split(full_X, full_y)
        ):
            

            # 定义模型并加载参数
            fiveC_model = self.classifier_class(
                **self.grid_search.best_params_,
            )
            y_to_train = full_y[train_id].copy()
            if self.classifier_name == "LabelPropagation":
                y_to_train[
                    np.random.choice(
                        a=np.arange(y_to_train.shape[0]),
                        size=max(int(y_to_train.shape[0] * 0.25), 1)
                    )
                ] = -1

            
            fiveC_model.fit(
                full_X[train_id],
                y_to_train
            )

            # 预测并记录
            self.best_5C_predicted_pair.append([
                np.nan_to_num(fiveC_model.predict_proba(
                    X=full_X[test_id]
                ), nan=0.0),
                full_y[test_id]
            ])
            self.train_best_5C_predicted_pair.append([
                np.nan_to_num(fiveC_model.predict_proba(
                    X=full_X[train_id]
                ), nan=0.0),
                y_to_train
            ])

        return self

    def get_summary(self, path_to_dir: str = None):
        os.makedirs(path_to_dir, exist_ok=True)
        model_path = "-"
        

        model_path = f"{path_to_dir}/{self.classifier_name}.pkl"
        if path_to_dir is not None:
            with open(model_path, "bw+") as f:
                pickle.dump(
                    self.grid_search, f
                )
            
        training_testing_performance = get_evaluation(
            label=self.best_predicted_pair[1],
            pred=self.best_predicted_pair[0][:, 1],
        )

        # 计算5C中的平均表现
        FiveFold_result = {}
        for keys in training_testing_performance.keys():
            value_list = []
            for item in self.best_5C_predicted_pair:

                item_performance = get_evaluation(
                    label=item[1],
                    pred=item[0][:, 1],
                )
                value_list.append(item_performance[keys])

            if keys == "pro_cutoff":
                FiveFold_result[keys] = value_list
            else:
                FiveFold_result[keys] = sum(value_list) / len(value_list)

        self.end_of_train_time = datetime.now()

        return pd.Series({
                        "Classifier_Name": self.classifier_name,
                        "Optimitied_Param": dict(self.grid_search.best_params_),
                        "Model_Path": model_path
                    } | FiveFold_result
                        )

#读取数据
prot_type = ['T3','T4','T1','T2','T6'][3]
cd_hit = [30,70,50][1]
feature_list = ['18pp','AAC','BPBaac','CTDC','CTDT','CTriad','onehot',
                'PC-PseAAC','ppt25','QSO','SC-PseAAC','CTDD','DPC']
rate = ['1_1'][0]
a = 0
for feature_name in feature_list:
    if feature_name in ['SC-PseAAC', 'PC-PseAAC']:
        neg_df = pd.read_csv(f'/mnt/md0/Public/T3_T4/txseml_addon/out/libfeatureselection/{prot_type}/feature_research_neg/{cd_hit}/{feature_name}/{a}/all_n{prot_type}_{cd_hit}_{rate}.fasta.csv', header=None)
        pos_df = pd.read_csv(f'/mnt/md0/Public/T3_T4/txseml_addon/out/libfeatureselection/{prot_type}/feature_research_pos/{prot_type}_training_{cd_hit}.fasta_{feature_name}.csv', header=None)
    else:
        neg_df = pd.read_csv(f'/mnt/md0/Public/T3_T4/txseml_addon/out/libfeatureselection/{prot_type}/feature_research_neg/{cd_hit}/{feature_name}/{a}/all_n{prot_type}_{cd_hit}_{rate}.fasta.csv')
        pos_df = pd.read_csv(f'/mnt/md0/Public/T3_T4/txseml_addon/out/libfeatureselection/{prot_type}/feature_research_pos/{prot_type}_training_{cd_hit}.fasta_{feature_name}.csv')
    if feature_name == 'BPBaac':
        feature = pd.read_csv(f'/mnt/md0/Public/T3_T4/txseml_addon/out/libfeatureselection/{prot_type}/feature_research_neg/{cd_hit}/{feature_name}/{a}/all_n{prot_type}_{cd_hit}_{rate}.fasta.csv')
    elif feature_name in ['SC-PseAAC', 'PC-PseAAC']:
        pos_df1 = pos_df.iloc[0:,0:]
        neg_df1 = neg_df.iloc[0:,0:]
    else:
        pos_df1 = pos_df.iloc[0:,1:]
        neg_df1 = neg_df.iloc[0:,1:]
    if  feature_name == 'BPBaac':
        feature = feature.iloc[0:,1:]
        num = len(feature)-len(pos_df)
        neg_target = np.zeros((num))
        pos_target = np.ones((len(pos_df)))
        neg_target_series = pd.Series(neg_target)
        pos_target_series = pd.Series(pos_target)
    else:
        feature = pd.concat([pos_df1, neg_df1])
        neg_target = np.zeros((len(neg_df1)))
        pos_target = np.ones((len(pos_df1)))
        neg_target_series = pd.Series(neg_target)
        pos_target_series = pd.Series(pos_target)



    target = pd.concat([pos_target_series, neg_target_series], ignore_index=True)
    print((target == 1).sum())
    if feature_name == 'CTriad':

        feature_ = np.array([eval(row) for row in feature['CTriad']])
        target_ = target.values
    else:
        feature_ = feature.astype("float").values
        target_ = target.values

In [None]:
from Bio.Blast.Applications import NcbiblastpCommandline
from Bio import SeqIO
import os

bac_type = ['T3','T4','T1','T2','T6'][3]
folder_path = f'/mnt/md0/Public/T3_T4/data/new_{bac_type}/val_tofeature'
files = os.listdir(folder_path)
stand_ = ['strict','lossen']
for stand in stand_:
    for fasta_file in files:
        negative_sequences = []
    # 读取阴性蛋白质序列数据
        for seq_record in SeqIO.parse(f"{folder_path}/{fasta_file}", "fasta"):
            negative_sequences.append(seq_record)
            # 执行blast比对
        output_file = f"{stand}_{fasta_file}.xml"
        blastp_cline = NcbiblastpCommandline(cmd='blastp', query=f"/mnt/md0/Public/T3_T4/data/new_T2/pos/T2_training_30.fasta", subject=f"{folder_path}/{fasta_file}", outfmt=5, out=output_file)
        blastp_cline()

            # 解析blast结果
        from Bio.Blast import NCBIXML
        blast_results = NCBIXML.parse(open(output_file))
        filtered_sequences = []  # 存储经筛选后的阴性蛋白质序列

        for result in blast_results:
            for alignment in result.alignments:
                for hsp in alignment.hsps:
                    if stand == 'strict':
                        if hsp.identities / alignment.length >= 0.7 and hsp.align_length / alignment.length >= 0.9:
                            filtered_sequences.append(alignment.title.split()[1])
                    if stand == 'lossen':
                        if hsp.identities / alignment.length >= 0.3 and hsp.align_length / alignment.length >= 0.5:
                            filtered_sequences.append(alignment.title.split()[1])
        
        # 输出筛选后的阴性蛋白质序列
        # with open(f"{fasta_file}_nonT4", "w") as output_handle:
        #     for seq_record in negative_sequences:
        #         if seq_record.id not in filtered_sequences:
        #             SeqIO.write(seq_record, output_handle, "fasta")
        
        with open(f"{stand}_{fasta_file}", "w") as output_handle:
            for seq_record in negative_sequences:
                if seq_record.id in filtered_sequences:
                    SeqIO.write(seq_record, output_handle, "fasta")
        print(0)
    

In [None]:
from Bio import SeqIO
import pandas as pd
import numpy as np
import pickle
from sklearn.metrics import roc_curve, confusion_matrix, precision_score, accuracy_score, f1_score, matthews_corrcoef, recall_score,auc,precision_recall_curve
import os
feature_name_list = ['18pp','AAC','BPBaac','CTDC','CTDT','CTriad',
                'PC-PseAAC','ppt25','QSO','SC-PseAAC']
data = {'feature': '','batch':'','bac_name':'','stand':'', 'model': '', 'rate':'','rocAUC': '', 'prAUC': '', 'MCC': '', 'F1': '', 
        'Precision': '', 'Accuracy': '', 'Sensitivity': '', 'Specificity': '', 'FPR': '', 'Recall': '','pro_cutoff':''}
df = pd.DataFrame(columns=data.keys())

rate_list = ['1_1','1_10','1_30','1_50','1_80','1_100']

bac_name = ['Ralstonia_pseudosolanacearum_GMI1000','Salmonella_LT2',
            'Coxiella_burnetii_RSA_331','new_Pseudomonas_sp.MIS38',
            'new_Burkholderia_mallei_ATCC_23344','new_Pseudomonas_aeruginosa_PAO1',
            'new1_Coxiella_burnetii_RSA_331','new_Burkholderia_pseudomallei_1710b'][7]
stand_ = ['lossen','strict']

bac_type = ['T3','T4','T1','T2','T6'][3]

cd_hit_ = [30,70]
for cd_hit in cd_hit_:
    for stand in stand_:
        fasta_file =f"/mnt/md0/Public/T3_T4/data/new_{bac_type}/val_data/{stand}_{bac_name}.fasta"
        protein_ids = []
        for seq_record in SeqIO.parse(fasta_file, "fasta"):
            protein_id = seq_record.id
            protein_ids.append(protein_id)
        for feature_name in feature_name_list:
            batch = 0
            while batch <5:
                for model_name in ["XGBClassifier", "GaussianNB", "GradientBoostingClassifier",   
                                        "SVC","KNeighborsClassifier", 
                                        "RandomForestClassifier"]:
                    for rate in rate_list:
                        model_save_dir = f"/mnt/md0/Public/T3_T4/model/{bac_type}/{cd_hit}_model/{feature_name}/{rate}/{batch}"

                        val_df = pd.read_csv(f'/mnt/md0/Public/T3_T4/txseml_addon/out/libfeatureselection/{bac_type}/val_data/{bac_name}.fasta_{feature_name}.csv')
                        val_df1 = val_df.iloc[0:, 1:]
                        
                        target_list = val_df['protein_id']
                        target = []
                        for a in range(len(target_list)):
                            if target_list[a] in protein_ids:
                                target.append(1)
                            else:
                                target.append(0)
                        feature = pd.DataFrame(val_df1)
                        if feature_name == 'CTriad':
                            feature_ = np.array([eval(row) for row in feature['CTriad']])
                        else:
                            feature_ = feature.astype("float").values
                            target_ = np.reshape(target, (len(target), 1))