In [1]:
# !pip install pandas
# !pip install watermark
# !pip install seaborn
# !pip install biopython
# !pip install sklearn
import os
import re 
import seaborn as sns
import pandas as pd
import numpy as np 
import watermark
import random 
import math
from tqdm import tqdm
import matplotlib.pyplot as plt
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import OneHotEncoder
# from IPython.core.interactiveshell import InteractiveShell
# InteractiveShell.ast_node_interactivity = 'all'
%load_ext watermark
%watermark
%watermark --iversion

Last updated: 2021-06-10T20:45:27.638165+08:00

Python implementation: CPython
Python version       : 3.8.5
IPython version      : 7.22.0

Compiler    : MSC v.1916 64 bit (AMD64)
OS          : Windows
Release     : 10
Machine     : AMD64
Processor   : Intel64 Family 6 Model 142 Stepping 11, GenuineIntel
CPU cores   : 8
Architecture: 64bit

seaborn   : 0.11.1
numpy     : 1.20.1
watermark : 2.2.0
pandas    : 1.2.4
matplotlib: 3.3.4
re        : 2.2.1



In [2]:
def mkdir(dirs):
    if not os.path.exists(dirs):
        os.makedirs(dirs)
    else:
        pass
mkdir("SVMoutput")
# mkdir("SVMpredict")
left=20
right=20
signal_num = left+right
bases="ACGT"
lower_bases="acgt"

In [3]:
def loadFile(file_dir):
    '''
    Function: Read  All files in the Training Set Folder and Testing Set Folder
    Parameter：file_dir
    Output: file_path,locus_list
    Attention: do not load non-fasta files!
    '''
    file_path = []
    file_locus_list = []
    all_file = tqdm(os.listdir(file_dir), desc=f'LOADING {file_dir}')
    for file_name in all_file:
        suffix = re.findall("\.(.+$)",file_name)[-1].lower()
        # or  suffix = file_name.split(".")[1].lower()
        if suffix != "txt":
            continue
        path = f"{file_dir}/{file_name}"
        file_path.append(path)
    all_file.close()
    return file_path

train_file_path =loadFile('Training Set')
test_file_path =loadFile('Testing Set')
# print(train_file_path)

LOADING Training Set: 100%|██████████████████████████████████████████████████████| 462/462 [00:00<00:00, 231956.96it/s]
LOADING Testing Set: 100%|███████████████████████████████████████████████████████| 570/570 [00:00<00:00, 190543.82it/s]


In [4]:
def extract_donor_signal(file_path,dataset):
    '''
    Parameter：train_file_path|test_file_path [set_dataset]
    Output:file_donor_positions,file_acceptor_positions,file_donor_signals,donor_signal_all
    '''
    print(f'Extract {dataset} Set donor signals'.center(50, '*'))
    donor_positions= [] #1
    acceptor_positions= [] #1
    donor_signals=[]  #1
    acceptor_signals=[]   #1
    all_donor_signal=[]  
    all_acceptor_signal=[]
    length_list = [] #1
    seq_list = [] #1
    exons = [] #1
    locus =[]
    donor_file = []
    files = tqdm(file_path, desc=f'{dataset} Progressing：')
    for file in files:
        f = open(file)
        #  first line: extract gene locus
        first_line =f.readline() 
        if dataset == "test":
            locus.append(re.search(">(.+)$",first_line).group(1))
        elif dataset=="train":
            locus.append(first_line.split()[1]) 
        #  second line: extract  donor and acceptor site positions
        second_line=f.readline()  
        exon_positions_list = re.findall(r'(\d+)\.\.(\d+)',second_line)
        donor_positions_list = [int(pos_set[1])+1 for pos_set in exon_positions_list[:-1]]
        acceptor_positions_list= [int(pos_set[0])-1 for pos_set in exon_positions_list[1:]]
        exons.append(exon_positions_list)
        donor_positions.append(donor_positions_list)
        acceptor_positions.append(acceptor_positions_list)
        seq = ''
        # extract  seq info
        for line in f.readlines():
            seq += line.strip()
        seq_length = len(seq)
        seq_list.append(seq.lower())
        length_list.append(seq_length)
        # extract  donor site signal
        donor_signal=[]
        for pos in donor_positions_list:
            signal_range = seq[pos-1-left:pos-1+right].lower()
            donor_signal.append(signal_range)
            all_donor_signal.append(signal_range)
            donor_file.append(file)
        donor_signals.append(donor_signal)
        # extract  acceptor site signal
        acceptor_signal=[]
        for pos in acceptor_positions_list:
            signal_range = seq[pos-right:pos+left].lower()
            acceptor_signal.append(signal_range)
            all_acceptor_signal.append(signal_range)
        acceptor_signals.append(acceptor_signal)
    df_set_info = pd.DataFrame({'Path':file_path, 'Locus':locus,"Length":length_list,"Exon Num":[ len(exons) for exons  in exons],\
 "Exon Location":exons,"Donor Site":donor_positions,"Acceptor Site":acceptor_positions,"Donor signals":donor_signals,"Acceptor signals":acceptor_signals})
    df_set_info.to_csv(f'SVMoutput/{dataset.capitalize()}_set_info(non-seq).csv',index=None)
    np.savetxt(f'SVMoutput/{dataset.capitalize()}_seq_list.txt',seq_list,delimiter = ',',fmt='%s')
    np.savetxt(f'SVMoutput/{dataset.capitalize()}_donor_signal.txt',all_donor_signal,delimiter = ',',fmt='%s')
    print(f"Extract Dataset {dataset.capitalize()} info Finished!")
    if dataset=="train":
        return all_donor_signal,all_acceptor_signal,seq_list,donor_positions,acceptor_positions
    elif dataset=="test":
        return all_donor_signal,all_acceptor_signal,seq_list,donor_positions,acceptor_positions,donor_file

train_donor_signal_all_str,train_acceptor_signal_all_str,train_seq_list,\
train_donor_positions,train_acceptor_positions=extract_donor_signal(train_file_path,dataset="train")
test_donor_signal_all_str,test_acceptor_signal_all_str,test_seq_list,\
test_donor_positions,test_acceptor_positions,test_donor_filepath=extract_donor_signal(test_file_path,dataset="test")

train Progressing：:   0%|▏                                                            | 1/462 [00:00<00:46,  9.83it/s]

*********Extract train Set donor signals**********


train Progressing：: 100%|██████████████████████████████████████████████████████████| 462/462 [00:01<00:00, 245.82it/s]
test Progressing：:   6%|███▎                                                        | 32/570 [00:00<00:03, 166.49it/s]

Extract Dataset Train info Finished!
**********Extract test Set donor signals**********


test Progressing：: 100%|███████████████████████████████████████████████████████████| 570/570 [00:01<00:00, 339.32it/s]


Extract Dataset Test info Finished!


In [5]:

import operator
from functools import reduce
test_donor_positions_1d = reduce(operator.add,test_donor_positions)

In [6]:
# save  donor str in Training Setting
# y = np.loadtxt('SVMoutput/donor_signal.txt',delimiter = ',',dtype=str)

In [7]:
# function to convert a DNA sequence string to a numpy array
# converts to lower case, changes any non 'acgt' characters to 'n'
import numpy as np
import re
sub_pattern = re.compile('[^acgt]')
def string_to_array(my_string):
    """
    function to convert a DNA sequence string to a numpy array
    converts to lower case, changes any non 'acgt' characters to 'n'
    like: ['c' 'a' 't' 'g' 'g']
    """
    #my_string = my_string.lower()
    sub_string = sub_pattern.sub('z', my_string)
    return list(sub_string)

def one_hot_encoder(s):
    """
    return onehot code，try 5d?
    """
    code_dict={"a": [1,0,0,0] ,"g":[0,1,0,0],"c":[0,0,1,0],"t":[0,0,0,1],"z":[0,0,0,0]}
    one_hot = []
    for i in s:
        one_hot += code_dict[i] #2d
    return one_hot
def code_all_seq(seqs):
    seqs = tqdm(seqs, desc='code_all_seq:')
    return np.array(list(map(one_hot_encoder, map(string_to_array,seqs))))
# create a label encoder with 'acgtn' alphabet
# from sklearn.preprocessing import LabelEncoder
# label_encoder = LabelEncoder()
# label_encoder.fit(np.array(['a','c','g','t','z']))
# function to encode a DNA sequence string as an ordinal vector
# returns a numpy vector with a=0.25, c=0.50, g=0.75, t=1.00, n=0.00
# def ordinal_encoder(my_array):
#     integer_encoded = label_encoder.transform(my_array)
#     float_encoded = integer_encoded.astype(float)
#     float_encoded[float_encoded == 0] = 0.25 # A
#     float_encoded[float_encoded == 1] = 0.50 # C
#     float_encoded[float_encoded == 2] = 0.75 # G
#     float_encoded[float_encoded == 3] = 1.00 # T
#     float_encoded[float_encoded == 4] = 0.00 # anything else, z
#     return float_encoded
# test_sequence = 'aaccggtn'
# ordinal_encoder(string_to_array(test_sequence))

In [8]:
# 测试编码效果
test_sequence = 'aaccggtn'
np.array(one_hot_encoder(string_to_array(test_sequence)))

array([1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 1,
       0, 0, 0, 0, 0, 1, 0, 0, 0, 0])

In [9]:
len(train_donor_signal_all_str) # 2381
len(set(train_donor_signal_all_str)) # 716对，不去重了，毕竟正样本都没多少，你还去重！！！

2380

In [10]:
np.ones([1,10],dtype=np.int8)

array([[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]], dtype=int8)

In [11]:
donor = "agtgtaagt"
donor[left:left+2]

''

In [12]:
pattern = re.compile("[^acgt]")
def create_pseudoDonor(file_path,seqs_DNA, donor_locations,dataset,ran_num=0):
    '''
    output :pseudo donor signal containing 'gt' in the right position
    要考虑acceptor附近的序列吗
    '''
    nonDonors = []
    nonDonor_file_path = []
    nonDonor_positions = []
    file_num = tqdm(range(len(donor_locations)), desc='Creating Non Donor Signal Sequence:')
    for i in file_num:
        # 每个文件循环
        file_nonDonors= []
        file_seq_DNA = seqs_DNA[i]
        num = len(donor_locations[i])  
        length = len(file_seq_DNA)
        donor_signals_start=[pos-1-left for pos in donor_locations[i]]
#         acceptor_signals_start=[pos-right for pos in acceptor_locations[i] ]
#         signals_start=sorted(donor_signals_start+acceptor_signals_start)
        for index in range(length-signal_num+1):
            if (file_seq_DNA[index+left:index+left+2] =='gt' ) and (index not in donor_signals_start) :
                nonDonor = file_seq_DNA[index:index + signal_num]
                #                 no_known =pattern.search(nonDonor) # 暂时先把未知的位点去掉
#                 no_known =pattern.search(nonDonor)
#                 if no_known:
#                     continue # 这里之前写成break，有问题，这样遇到非正常碱基对的就直接循环中停止了
                file_nonDonors.append(nonDonor)
                nonDonor_file_path.append(file_path[i])
                nonDonor_positions.append(index+1+left)
        if ran_num:
            nonDonors += random.sample(file_nonDonors,ran_num)
        else:
            nonDonors += file_nonDonors
#         nonDonors.append(random.sample(file_nonDonors,ran_num))
    print('Created Non Donor Signal Sequence successful!')
    if dataset == "train":
         return nonDonors
    elif dataset == "test":
        return nonDonors,nonDonor_file_path,nonDonor_positions

# 训练集
train_nonDonor_list=create_pseudoDonor(train_file_path,train_seq_list, train_donor_positions,dataset = "train")
# print(test_nonDonor_list[1:10])
train_nonDonor_len= len(train_nonDonor_list) 
# print(train_nonDonor_len) # #5518899 - > 283780 -> 283607 

Creating Non Donor Signal Sequence:: 100%|██████████████████████████████████████████| 462/462 [00:02<00:00, 177.48it/s]

Created Non Donor Signal Sequence successful!





In [13]:
# train_nonDonor_uniq = set(train_nonDonor_list)
# train_nonDonor_uniq_len=len(train_nonDonor_uniq) 
# print(train_nonDonor_uniq_len)
# Training Dataset extract nonDonor signal
train_nonDonor_list=create_pseudoDonor(train_file_path,train_seq_list, train_donor_positions,dataset = "train")
train_nonDonor_len= len(train_nonDonor_list) 
train_donor_features = code_all_seq(train_donor_signal_all_str)
train_labels=[1]*len(train_donor_signal_all_str)

train_nonDonor_ran_len =int(2079)
random.seed(1)
train_nonDonor_list =random.sample(train_nonDonor_list,train_nonDonor_ran_len)

# Testing Dataset donor code
train_nonDonor_features = code_all_seq(train_nonDonor_list)
train_labels += [0]*train_nonDonor_ran_len
train_labels = np.array(train_labels) # lenth:286161
train_features =np.vstack([train_donor_features,train_nonDonor_features])

Creating Non Donor Signal Sequence:: 100%|██████████████████████████████████████████| 462/462 [00:02<00:00, 202.55it/s]
code_all_seq:: 100%|████████████████████████████████████████████████████████████| 2381/2381 [00:00<00:00, 91826.09it/s]
code_all_seq:: 100%|████████████████████████████████████████████████████████████| 2079/2079 [00:00<00:00, 99255.11it/s]


Created Non Donor Signal Sequence successful!


In [14]:

# Testing Dataset extract nonDonor signal

test_nonDonor_list,test_nonDonor_filepath,test_nonDonor_positions=create_pseudoDonor(test_file_path,test_seq_list,\
                                                                                      test_donor_positions,dataset = "test")
test_nonDonor_len= len(test_nonDonor_list) 
print(len(test_nonDonor_list)) # 149165

# Testing Dataset donor code
test_donor_features = code_all_seq(test_donor_signal_all_str)
test_labels=[1]*len(test_donor_signal_all_str)

test_nonDonor_features = code_all_seq(test_nonDonor_list)
test_labels += [0]*test_nonDonor_len
test_labels = np.array(test_labels)
test_features =np.vstack([test_donor_features,test_nonDonor_features])
print(test_features.shape) # 151244

Creating Non Donor Signal Sequence:: 100%|██████████████████████████████████████████| 570/570 [00:01<00:00, 449.05it/s]
code_all_seq:: 100%|████████████████████████████████████████████████████████████| 2079/2079 [00:00<00:00, 41692.17it/s]
code_all_seq::   5%|███                                                       | 7913/148357 [00:00<00:01, 78556.02it/s]

Created Non Donor Signal Sequence successful!
148357


code_all_seq:: 100%|████████████████████████████████████████████████████████| 148357/148357 [00:02<00:00, 63514.27it/s]


(150436, 160)


In [15]:
test_df = pd.DataFrame({"Filename":test_donor_filepath+test_nonDonor_filepath,"Donor Site":test_donor_positions_1d +test_nonDonor_positions,\
              "Signal":test_donor_signal_all_str+test_nonDonor_list,"label":test_labels})
test_df.to_csv("SVMoutput/Test_predict.csv",index=None)

In [16]:
from sklearn.model_selection import RepeatedStratifiedKFold
from sklearn.model_selection import GridSearchCV
from sklearn.svm import SVC
# from sklearn.metrics import classification_report
# def save_to_file(file_name, contents,mode='a+'):
#     fh = open(file_name, mode)
#     fh.write(contents)
#     fh.close()
# # param_grid = {'C':[1,2,3,4,5,6,10]}#设置的C可能的值是1，2，5，10，可以自由设置
# param_grid = [
# #   {'C': [1, 10, 100, 1000], 'kernel': ['linear'],'class_weight':[{0:1000000,1:1},{0:100000,1:1},{0:10000,1:1},{0:1000,1:1},{0:100,1:1}, {0:10,1:1},'balanced']},
# #   {'C': [0.1,1, 10, 100, 1000], 'gamma': [0.1,0.01,0.001, 0.0001,'auto','scale'], 'kernel': ['rbf'],'class_weight':[{0:1,1:1000000},{0:1,1:100000},{0:1,1:10000},{0:1,1:1000},{0:1,1:100}, {0:1,1:10},'balanced']},
#     {'C': [0.1,1], 'gamma': [1,'auto','scale'], 'kernel': ['rbf'],'class_weight':[{0:1,1:100}, {0:1,1:10},'balanced']}
# ]
# svc = SVC(probability=True)

# # define evaluation procedure
# # cv = RepeatedStratifiedKFold(n_splits=10, n_repeats=3, random_state=1)
# cv = [(slice(None), slice(None))] #去掉十成交叉
# # cv = 3
# grid = GridSearchCV(svc,param_grid,cv=cv,verbose=3,n_jobs=-1,scoring='f1')
# grid_result=grid.fit(train_features, train_labels)

# # report the best configuration
# print("Best: %f using %s" % (grid_result.best_score_, grid_result.best_params_))
# save_to_file('SVMoutput/SVM_best_predict.txt', "Best: %f using %s \n" % (grid_result.best_score_, grid_result.best_params_),mode='w+') 

# # report all configurations
# means = grid_result.cv_results_['mean_test_score']
# stds = grid_result.cv_results_['std_test_score']
# params = grid_result.cv_results_['params']
# for mean, stdev, param in zip(means, stds, params):
#     print("%f (%f) with: %r" % (mean, stdev, param))
# model=grid_result.best_estimator_

In [17]:
# import joblib
# joblib.dump(model,'SVMoutput/model.pickle') #保存
# # model = joblib.load('SVMoutput/model.pickle') #载入

In [18]:
# train_predict = model.predict(train_features)
# report =classification_report(train_labels, train_predict)
# print(report)

In [19]:
len(train_features)

4460

In [None]:
# model = SVC(kernel='rbf',C=1,gamma=1,class_weight='balanced',probability=True)
# model.fit(train_features, train_labels)
# train_predict = model.predict(train_features)
# report =classification_report(train_labels, train_predict)

# print(report)
model = SVC(kernel='rbf',C=0.1,gamma=1,probability=True)
model.fit(test_features, test_labels)
# train_predict = model.predict(test_features)
# report =classification_report(test_labels, test_predict)
# print(report)

In [None]:
# test_predict = model.predict(test_features)
# report_dict =classification_report(test_labels, test_predict,target_names=['pseudo donor','real donor'],digits=4,output_dict=True)
# report_df = pd.DataFrame(report_dict).T
# report_df.to_csv("SVMoutput/SVM_report.csv")
pred = model.predict_proba(test_features)
# report_df
pred 

In [None]:
1

In [None]:
df = pd.read_csv("SVMoutput/SVM_predict_scores.csv")
df['RBF']=pred[:,1]
df.to_csv("SVMoutput/SVM_predict_scores.csv")

In [None]:
pred

In [None]:
# report_dict =classification_report(test_labels, test_predict,target_names=['pseudo donor','real donor'],digits=4,output_dict=True)
# report_df = pd.DataFrame(report_dict).T
# report_df.to_csv("SVMoutput/SVM_report.csv")

In [None]:
# from notify_run import Notify
# n = Notify()
# n.send("Finished")

In [None]:
# test_df

In [None]:
# save predict info
# test_df["Score"] = pred[:,1]
# test_df["predict"] = test_predict
# test_df = test_df.sort_values(by=['Filename','label'],ascending=[True, False]).reset_index(drop=True)
# test_df.to_csv("SVMoutput/Test_predict.csv",index=None)
# test_df

In [None]:
model_l = SVC(kernel='linear',C=1,class_weight='balanced',probability=True)
model_l.fit(train_features, train_labels)
# train_predict_l = model_l.predict(train_features)
# print(classification_report(train_labels, train_predict_l))

In [None]:
test_predict_l = model_l.predict(test_features)
report_dict_l =classification_report(test_labels, test_predict_l,target_names=['pseudo donor','real donor'],digits=4,output_dict=True)
report_df_l = pd.DataFrame(report_dict_l).T
report_df_l.to_csv("SVMoutput/SVM_report_l.csv")
pred_l = model_l.predict_proba(test_features)
report_df_l

In [None]:
# 0.01: real donor	0.146730	0.960558	0.254573	2079.000000
model_p = SVC(kernel='poly',C=1,degree=4,gamma='scale',class_weight='balanced',probability=True)
model_p.fit(train_features, train_labels)
# train_predict_p = model_p.predict(train_features)
# print(classification_report(train_labels, train_predict_p))

In [None]:
test_predict_p = model_p.predict(test_features)
report_dict_p =classification_report(test_labels, test_predict_p,target_names=['pseudo donor','real donor'],digits=4,output_dict=True)
report_df_p = pd.DataFrame(report_dict_p).T
report_df_p.to_csv("SVMoutput/SVM_report_p.csv")
pred_p = model_p.predict_proba(test_features)
report_df_p

In [None]:
import joblib
joblib.dump(model,'SVMoutput/model_r.pickle') #save
joblib.dump(model_l,'SVMoutput/model_l.pickle') 
joblib.dump(model_p,'SVMoutput/model_p.pickle') 
# model = joblib.load('SVMoutput/model.pickle') #load

In [None]:
pred

In [None]:
SVM_predict_scores = pd.DataFrame(data={"RBF":pred[:,1],"Linear":pred_l[:,1],"Poly":pred_p[:,1]})
SVM_predict_scores.to_csv("SVMoutput/SVM_predict_scores.csv",index=None)
SVM_predict_scores

In [None]:
predict_probs = [pred[:,1],pred_l[:,1],pred_p[:,1]]

In [None]:
# ROC plot
# problem ： 如何画不同核的图在一个图里
from sklearn.metrics import roc_curve, auc  ###计算roc和auc
from mpl_toolkits.axes_grid1.inset_locator import mark_inset
from mpl_toolkits.axes_grid1.inset_locator import inset_axes
import matplotlib as mpl
# def Find_Optimal_Cutoff(TPR, FPR, threshold):
#     y = TPR - FPR
#     Youden_index = np.argmax(y)  # Only the first occurrence is returned.
#     optimal_threshold = threshold[Youden_index]
#     point = [FPR[Youden_index], TPR[Youden_index]]
#     return optimal_threshold, point
# def ROC(label, y_prob):
#     """
#     Receiver_Operating_Characteristic, ROC
#     :param label: (n, )
#     :param y_prob: (n, )
#     :return: fpr, tpr, roc_auc, optimal_th, optimal_point
#     """
#     fpr, tpr, thresholds = roc_curve(label, y_prob)
#     roc_auc = auc(fpr, tpr)
#     optimal_th, optimal_point = Find_Optimal_Cutoff(TPR=tpr, FPR=fpr, threshold=thresholds)
#     return fpr, tpr, roc_auc, optimal_th, optimal_point
# def ROC_plot(label, y_prob):
#     fpr, tpr, roc_auc, optimal_th, optimal_point = ROC(label, y_prob)
#     plt.figure()
#     plt.plot(fpr, tpr, label=f"AUC = {roc_auc:.3f}")
#     plt.plot([0, 1], [0, 1], linestyle="--")
#     plt.plot(optimal_point[0], optimal_point[1], marker='o', color='r')
#     plt.text(optimal_point[0], optimal_point[1], f'Threshold:{optimal_th:.2f}')
#     plt.title("Receiver Operating Characteristic Curve")
#     plt.xlabel("False Positive Rate")
#     plt.ylabel("True Positive Rate")
#     plt.legend(loc="lower right")
#     f = plt.gcf()  #获取当前图像
#     f.savefig('SVMoutput/ROC_plot.png',dpi=120)
#     plt.show()
#     return optimal_th
# pred  = model.decision_function(test_features)


def plot_roc(labels, predict_probs, titles):
    color = ['r', 'g', 'b', 'y']                                                                 
    linestyles = ['-','--',':','-.']

    fig, ax = plt.subplots(1, 1, figsize=(10, 8))
    
    for idx, predict_prob in enumerate(predict_probs):
        false_positive_rate,true_positive_rate,thresholds=roc_curve(labels, predict_prob)
        roc_auc=auc(false_positive_rate, true_positive_rate)
        c = color[idx%len(color)]                                                                     
        l =linestyles[idx%len(linestyles)]
        ax.plot(false_positive_rate, true_positive_rate,'b',label='AUC: {} = {:.4}'.format(titles[idx], roc_auc), color=c, linestyle=l, alpha=0.9,markevery=20)  
        ax.legend(loc='lower right')
    # 
    axins = inset_axes(ax, width="40%", height="30%",loc='upper center',
                   bbox_to_anchor=(0.05, -0.2, 1, 1),
                   bbox_transform=ax.transAxes)
    for idx, predict_prob in enumerate(predict_probs):
        false_positive_rate,true_positive_rate,thresholds=roc_curve(labels, predict_prob)
        roc_auc=auc(false_positive_rate, true_positive_rate)
        c = color[idx%len(color)]                                                                     
        l =linestyles[idx%len(linestyles)]
        axins.plot(false_positive_rate, true_positive_rate,'b',label='AUC: {} = {:.4}'.format(titles[idx], roc_auc), color=c, linestyle=l, alpha=0.9,markevery=20)  

    axins.set_xlim(0.03, 0.13)
    axins.set_ylim(0.89, 0.99)
    #
    mark_inset(ax, axins, loc1=3,loc2=1, fc="none", ec='k', lw=1)
    ax.plot([0,1],[0,1],'k--',alpha=0.5)
    ax.set_title("Receiver Operating Characteristic Curve",fontsize=16)
    ax.set_xlabel("False Positive Rate")
    ax.set_ylabel("True Positive Rate")
    ax.legend(loc="lower right")

    ax.spines['left'].set_color('k')
    [axins.spines[loc_axis].set_color('k') for loc_axis in ['top','right','bottom','left']]
    plt.savefig('SVMoutput/ROC_plot.png',dpi=120)
    plt.show()

# plt.style.use('classic')

# mpl.style.use('fivethirtyeight')
# mpl.style.use('classic')
mpl.style.use('default')

# sns.set_style('darkgrid', {'axes.linewidth': 2, 'axes.edgecolor':'black'})
# rc = {"axes.spines.bottom" : False,
#       "axes.spines.top"    : False,
#       "axes.spines.right"  : False,
#       "axes.edgecolor"     : "black"}
# plt.style.use(("ggplot", rc))
# plt.style.use(("ggplot"))
# mpl.style.use(("fast"))
predict_probs = [pred[:,1],pred_l[:,1],pred_p[:,1]]
plot_roc(test_labels, predict_probs,titles=["RBF","Linear","Poly"])



In [None]:
from sklearn.metrics import precision_recall_curve


def plot_pr(labels, predict_probs, titles):
    color = ['r', 'g', 'b', 'y']                                                                 
    linestyles = ['-','--','-.']
    plt.figure(figsize=(10, 8))
    f_scores = np.linspace(0.2, 0.8, num=4)
    for f_score in f_scores:
        x = np.linspace(0.01, 1)
        y = f_score * x / (2 * x - f_score)
        l, = plt.plot(x[y >= 0], y[y >= 0], color='gray',linestyle='--' ,alpha=0.2)
        plt.annotate('f1={0:0.1f}'.format(f_score), xy=(0.9, y[45] + 0.02))
    for idx, predict_prob in enumerate(predict_probs):
        precision, recall, thresholds = precision_recall_curve(labels, predict_prob)
        pr_auc=auc(recall, precision)
        c = color[idx%len(color)]                                                                     

        l =linestyles[idx%len(linestyles)]
        plt.plot(recall,precision,'b', label='AUC: {} = {:.4}'.format(titles[idx], pr_auc),color=c, linestyle=l, alpha=0.9,markevery=20)  
        plt.legend(loc='lower right')
    plt.ylim(0,1)
    # plt.plot([0,1],[1,0],color='gray',linestyle='--',alpha=0.5)
    plt.title('Precision/Recall Curve',fontsize=16)# give plot a title
    plt.xlabel('Recall')# make axis labels
    plt.ylabel('Precision')
    plt.legend(loc="upper right")
    f = plt.gcf()  
    f.savefig('SVMoutput/PR_plot.png',dpi=120)
    plt.show()

plot_pr(test_labels, predict_probs, titles=["RBF","Linear","Poly"])

In [None]:
# 混淆矩阵,，
from sklearn.metrics import confusion_matrix
def save_to_file(file_name, contents,mode='a+'):
    fh = open(file_name, mode)
    fh.write(contents)
    fh.close()
def draw_confusion_matrix(labels,predicts,name,size=(6, 6),mode='a+',threshold="Default"):
    mat = confusion_matrix(labels,predicts)
    TP = mat[1][1]
    FP = mat[0][1]
    TN = mat[0][0]
    FN = mat[1][0]
    recall  = TP/(FN+TP)
    precision = TP/(FP+TP)
    acc = (TP+TN)/np.sum(mat)
    fpr =  FP/(TN+ FP)
    f1_score = 2*precision*recall/(precision+recall)
    print(f"Precision :{precision}, Recall:{recall},f1_score:{f1_score},Acc:{acc},FPR:{fpr},Sp:{1-fpr}")
    save_to_file('SVMoutput/SVM_predict.txt', f"\nPrecision :{precision}, Recall:{recall},f1_score:{f1_score},Acc:{acc} \n",mode)

    f, ax = plt.subplots(figsize=size)
    sns.set()
    sns.heatmap(mat, square=True, annot=True, fmt='d',cmap='Blues' ,cbar=False,
                xticklabels=['pseudo donor','real donor'],
                yticklabels=['pseudo donor','real donor'],)
    ax.set_xlabel(f'Predicted label\nPrecision :{precision:.2f}, Recall:{recall:.2f},F1_score:{f1_score:.2f}')
    ax.set_ylabel(f'True label');
    plt.title(f"Confusion Matrix(Threshold:{threshold})\n",fontsize=15)
    f.savefig(f"SVMoutput/{name}.png",dpi=120)

draw_confusion_matrix(test_labels, test_predict,name="confusion_matrix_1",mode='w+')

In [None]:
best_predict_label =[1 if i >0.65 else 0 for i in pred_p[:,1] ]
draw_confusion_matrix(test_labels, best_predict_label,name="confusion_matrix_p_0.65",threshold=0.65)
best_predict_label =[1 if i >0.07 else 0 for i in pred_p[:,1] ]
draw_confusion_matrix(test_labels, best_predict_label,name="confusion_matrix_p_0.07",threshold=0.07)
best_predict_label =[1 if i >0.2 else 0 for i in pred_p[:,1] ]
draw_confusion_matrix(test_labels, best_predict_label,name="confusion_matrix_p_0.2",threshold=0.2)
best_predict_label =[1 if i >0.9 else 0 for i in pred_p[:,1] ]
draw_confusion_matrix(test_labels, best_predict_label,name="confusion_matrix_p_0.9",threshold=0.9)

In [None]:
# 如果根据阈值来重新生成混淆矩阵？
best_predict_label =[1 if i >0.07 else 0 for i in pred[:,1] ]
draw_confusion_matrix(test_labels, best_predict_label,name="confusion_matrix_0.07",threshold=0.07)
best_predict_label =[1 if i >0.2 else 0 for i in pred[:,1] ]
draw_confusion_matrix(test_labels, best_predict_label,name="confusion_matrix_0.2",threshold=0.2)
best_predict_label =[1 if i >0.9 else 0 for i in pred[:,1] ]
draw_confusion_matrix(test_labels, best_predict_label,name="confusion_matrix_0.9",threshold=0.9)

In [None]:
def sum_threshold_info(test_labels,pred):
    TPs  = []
    FPs = []
    TNs = []
    FNs = []
    for threshold in thresholds:
        predict_label =[1 if i >= threshold else 0 for i in pred ]
        mat = confusion_matrix(test_labels,predict_label)
        TP = mat[1][1]
        FP = mat[0][1]
        TN = mat[0][0]
        FN = mat[1][0]

        TPs.append(TP)
        FPs.append(FP)
        TNs.append(TN)
        FNs.append(FN)
    SVMpredict_df = pd.DataFrame({"Threshold":thresholds,"TP":TPs,"FP":FPs,"TN":TNs,"FN":FNs})
    SVMpredict_df["Recall"] = SVMpredict_df["TP"] /(SVMpredict_df["TP"]+SVMpredict_df["FN"])
    SVMpredict_df["Precision"] = SVMpredict_df["TP"] /(SVMpredict_df["TP"]+SVMpredict_df["FP"])
    SVMpredict_df["F1-Score"] = 2*SVMpredict_df["Recall"]*SVMpredict_df["Precision"] /(SVMpredict_df["Recall"]+SVMpredict_df["Precision"])
    SVMpredict_df["Sn"] = SVMpredict_df["TN"] /(SVMpredict_df["TN"]+SVMpredict_df["FP"])
    SVMpredict_df["FPR"] =  1- SVMpredict_df["Sn"]
    SVMpredict_df["Acc"] =  (SVMpredict_df["TP"] + SVMpredict_df["TN"]) / (SVMpredict_df["TP"] + SVMpredict_df["TN"]+SVMpredict_df["FP"] + SVMpredict_df["FN"])
    return SVMpredict_df
def plot_PRF(SVMpredict_df):
    f, ax = plt.subplots(figsize=(8,8))
    alpha=0.5
    plt.plot(thresholds, SVMpredict_df["Recall"], color='green', label='Recall',alpha=alpha)
    plt.plot(thresholds, SVMpredict_df["Precision"], color='red', label='Precison',alpha=alpha)
    plt.plot(thresholds, SVMpredict_df["F1-Score"], color='#2196f3', label='F1-score',alpha=alpha)
    plt.xlabel("Thresholds")
    plt.ylabel("Score")
    plt.xticks(np.arange(0,1.1,0.1))
    plt.yticks(np.arange(0,1.1,0.1))
    plt.xlim([-0.05,0.9])
    plt.legend(loc="lower right") # 图标在外侧
    plt.grid(linestyle='-.',alpha=0.7)
    plt.show()
    return f
thresholds = list(np.arange(0,1.05,0.05)) 
SVMpredict_df_r=sum_threshold_info(test_labels,pred[:,1])
SVMpredict_df_r.to_csv("SVMoutput/SVM_predict_info_r.csv",index=None)
f = plot_PRF(SVMpredict_df_r)
f.savefig("SVMoutput/Different Threshold Predictions_r.png",dpi=400,bbox_inches = 'tight')

SVMpredict_df_r

In [None]:
SVMpredict_df_p=sum_threshold_info(test_labels,pred_p[:,1])
SVMpredict_df_p.to_csv("SVMoutput/SVM_predict_info_p.csv",index=None)
f = plot_PRF(SVMpredict_df_p)
f.savefig("SVMoutput/Different Threshold Predictions_p.png",dpi=400,bbox_inches = 'tight')
# sns.lineplot(data=[SVMpredict_df["Recall"],SVMpredict_df["Precision"],SVMpredict_df["F1-Score"]],palette="tab10",ax=ax)

SVMpredict_df_p

In [None]:
1

In [None]:
pred[:,1].shape

In [None]:
from notify_run import Notify
n = Notify()
n.send("Finished")

In [None]:
1