ifeature 的使用程序。

In [1]:
import os
import hashlib
import pandas as pd
from typing import List, Optional

import log
import utils

def get_config(config_name:str) -> str:
    return {
        "ifeature_path": r".\iFeature-master\iFeature.py",
        "ifeature_workdir": r".\cached\ifeature",
    }[config_name]

# ##############
#
# ifeature
#
# ###############

def make_fasta(sequences:List[str], outfilename:Optional[str]) -> str:
    fastas = [ f">{i}\n{sequence}" for i, sequence in enumerate(sequences) ]
    with open(outfilename, "w") as f:
        f.write('\n'.join(fastas))
    return outfilename

def get_ifeature_descriptors(ifeature_descriptor:str) -> List[str]:
    '''
    从 ifeature的列表里面选择一些特征构成特征组合
    '''
    return {
        "default": [
            "AAC", "GAAC", "GDPC", 
            "Moran", "Geary", "NMBroto",
            "CTDC", "CTDT", "CTDD", 
            "SOCNumber", "PAAC", "APAAC"
        ], # default 1233 dimension
        "binary": ["BINARY"], # binary, (0001)
        "group1": ["AAC", "GAAC", "GDPC"],
        "group2": ["ZSCALE", "AAINDEX"], # for equal length
        "group3": ["TA", "ASA", "DisorderB", "DisorderC"], # for equal length
        "group4": ["CTDC", "CTDT", "CTDD"],
        "group5": ["BLOSUM62"], # BLOSUM62, (abcd)
        "group6": ["EAAC", "EGAAC"], # enhanced composition
        "binary+group3+group5": [
            "BINARY",
            "TA", "ASA", "DisorderB", "DisorderC",
            "BLOSUM62"
        ],
        "full_group1": ["Moran", "Geary", "NMBroto"], # auto-correlation
        
        
        # "group7": ["SOCNumber", "PAAC", "APAAC"],
        # "group4": ["PSSM"], # PSSM # 不可行
        # "autocor": ["Moran", "Geary", "NMBroto"], # 不可行
    }[ifeature_descriptor]


def _ifeature(fasta_file:str, descriptor:str, outfilename:str, verbose:bool=False, *args, **kwargs) -> str:
    cmd = "python {0} --file {1} --type {2} --out {3}".format(
        get_config("ifeature_path"), 
        fasta_file,
        descriptor,
        outfilename,
    )
    with log.Verbose(msg = f"{cmd}", verbose = verbose):
        os.system(cmd)
    return outfilename

def _featurize_ifeature(sequences:List[str], ifeature_descriptor:str, verbose:bool=False, *args, **kwargs) -> pd.DataFrame:
    """
    sequences as the input,
    ifeature_descriptor as input,
    """
    cached_file = os.path.join(
        get_config("ifeature_workdir"),
        utils.md5(f"{''.join(sequences)}{ifeature_descriptor}") + ".csv"
    )
    print(cached_file)
    if not os.path.exists(cached_file):
        with log.Verbose(msg = f"Make the ifeature feature: {cached_file}", verbose = verbose):
            df_feature = pd.DataFrame(index=range(len(sequences)))
            fasta_file = os.path.join(
                get_config("ifeature_workdir"), 
                utils.md5(f"{''.join(sequences)}") + ".fasta"
            )
            if not os.path.exists(fasta_file):
                with log.Verbose(msg = f"Make the fasta file: {fasta_file}", verbose = verbose):
                    make_fasta(sequences, fasta_file)
            print(ifeature_descriptor)
            print(get_ifeature_descriptors(ifeature_descriptor))
            for descriptor in get_ifeature_descriptors(ifeature_descriptor):
                _descriptor_file = os.path.join(
                    get_config("ifeature_workdir"), 
                    utils.md5(f"{''.join(sequences)}{descriptor}") + ".csv"
                )
                print(descriptor)
                print(_descriptor_file)
                if not os.path.exists(_descriptor_file):
                    with log.Verbose(msg = f"Make the ifeature descriptor {_descriptor_file} from fasta file: {fasta_file}", verbose = verbose):
                        _ifeature(fasta_file, descriptor, _descriptor_file, verbose, *args, **kwargs)
                _df = pd.read_csv(_descriptor_file, sep='\t', header=0, index_col=0)
                _df.rename(columns = { column:''.join([column, '_', descriptor]) for column in _df.columns }, inplace = True)
                df_feature = df_feature.join(_df)
            df_feature.to_csv(cached_file)
    else:
        with log.Verbose(msg = f"Read the ifeature feature: {cached_file}", verbose = verbose):
            df_feature = utils.read_csv(cached_file)
    return df_feature



In [2]:
sequence_Q51559 = "MRRESLLVSVCKGLRVHVERVGQDPGRSTVMLVNGAMATTASFARTCKCLAEHFNVVLFDLPFAGQSRQHNPQRGLITKDDEVEILLALIERFEVNHLVSASWGGISTLLALSRNPRGIRSSVVMAFAPGLNQAMLDYVGRAQALIELDDKSAIGHLLNETVGKYLPQRLKASNHQHMASLATGEYEQARFHIDQVLALNDRGYLACLERIQSHVHFINGSWDEYTTAEDARQFRDYLPHCSFSRVEGTGHFLDLESKLAAVRVHRALLEHLLKQPEPQRAERAAGFHEMAIGYA"
# -第一列称为“序列”，对应不同突变位点的组合。6个字母表示6个位点的氨基酸突变的组合，这6个位点分别是氨基酸的第74，101，143，148，173，176号氨基酸，是我们实际测验出来最有效的突变位点。
# pdb = 
def _sequence(sequence:str, full_sequence="default") -> str:
    sequence_new = sequence
    if full_sequence == "default":
        pass
    elif full_sequence == "full" or full_sequence == "activity":
        list_sequence_new = list(sequence_Q51559)
        for i, seq_i in zip((74, 101, 143, 148, 173, 176), list(sequence)):
            list_sequence_new[i-1] = seq_i
        sequence_new = "".join(list_sequence_new)
        if full_sequence == "activity":
            sequence_new = sequence_new[59:228]
    return sequence_new

def ifeature_featurize(pd:pd.DataFrame, ifeature_descriptor="cksaap", full_sequence="default", **kwargs) -> pd.DataFrame:
    sequence_list = [ _sequence(seq, full_sequence) for seq in pd['Sequence'].to_list() ]
    pd_ifeature =  _featurize_ifeature(
        sequence_list, 
        ifeature_descriptor,
    )
    return pd_ifeature

In [3]:
def assign_score(df:pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    def _assign_score(percentile):
        if percentile <= 0.005:
            return 5
        elif 0.005 < percentile <= 0.02:
            return 1
        elif 0.02 < percentile <= 0.1:
            return 0.1
        else:
            return 0
    df['Fitness'] = df['Activity'] * df['Selectivity']
    percentiles = df['Fitness'].rank(pct=True, ascending=False)
    df['score'] = percentiles.apply(_assign_score)
    return df

训练

In [4]:
kwargs = {
    "ifeature_descriptor": "default",
    "full_sequence": "default",
}

In [5]:
# from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.pipeline import Pipeline

from sklearn.preprocessing import StandardScaler
from sklearn.feature_selection import VarianceThreshold
from sklearn.decomposition import PCA, TruncatedSVD

from sklearn.linear_model import LinearRegression, Ridge, Lasso
from sklearn.svm import SVR
from sklearn.neighbors import KNeighborsRegressor
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.neural_network import MLPRegressor
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF

from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import make_scorer, mean_squared_error, mean_absolute_error
from scipy.stats import spearmanr
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import time
from datetime import datetime

# 记录模型
best_models_record = {}

# 记录结果
records = []

# 读取excel文件
train_data = pd.read_csv('./kaggle/input/siatprotein2023/training.csv')
train_data = assign_score(train_data)

# 提取氨基酸序列并特征化
# for i_d in ["binary"] + [f"group{i}" for i in range(1,7)]:
# for i_d in ["full_group1"]:
for i_d in ["group3"]:
# for i_d in ["binary+group3+group5"]:
    kwargs["ifeature_descriptor"] = i_d

    # sequences = df.iloc[:, 1].tolist()
    pd_ifeature =  ifeature_featurize(train_data, **kwargs)
    print(kwargs)

    # 为预测的目标指标创建标签
    activity_labels = train_data.iloc[:, 2].tolist()
    selectivity_labels = train_data.iloc[:, 3].tolist()
    fitness_labels = train_data.iloc[:, 4].tolist()
    score_labels = train_data.iloc[:, 5].tolist()


    # 定义评分函数
    scorer = make_scorer(mean_squared_error, greater_is_better=False)

    # 划分训练集和测试集
    X_train, X_test, y_train_activity, y_test_activity = train_test_split(pd_ifeature, activity_labels, test_size=0.2, random_state=42)
    _, _, y_train_selectivity, y_test_selectivity = train_test_split(pd_ifeature, selectivity_labels, test_size=0.2, random_state=42)
    _, _, y_train_fitness, y_test_fitness = train_test_split(pd_ifeature, fitness_labels, test_size=0.2, random_state=42)
    _, _, y_train_score, y_test_score = train_test_split(pd_ifeature, score_labels, test_size=0.2, random_state=42)



    # 定义不同的回归器和它们的参数网格
    regressors = [
        ('lr', LinearRegression(), {}),
        ('ridge', Ridge(), {'regressor__alpha': [0.01, 0.1, 1.0]}),
        ('lasso', Lasso(), {'regressor__alpha': [0.01, 0.1, 1.0]}),
        ('svr', SVR(), {
            'regressor__C': [0.1, 1, 10],  # 正则化参数
        }),
        ('rf', RandomForestRegressor(random_state=40), {'regressor__n_estimators': [100]}),
        ('knn', KNeighborsRegressor(), {'regressor__n_neighbors': [3, 5, 7], 'regressor__weights': ["uniform", "distance"]}),
        ('gbr', GradientBoostingRegressor(random_state=40), {'regressor__n_estimators': [10, 50, 100], 'regressor__learning_rate': [0.01, 0.1, 0.2]}),
    ]

    regressors.append(('mlp', MLPRegressor(max_iter=500), {
        'regressor__hidden_layer_sizes': [(100, 50, 25)],
        'regressor__activation': ['relu', 'tanh'],
        'regressor__solver': ['sgd'],
    }))

    # 添加 GaussianProcessRegressor 到 regressors 列表
    # regressors.append(('gpr', GaussianProcessRegressor(random_state=42), {
    #     'regressor__kernel': [1.0 * RBF()], #, 2.0 * RBF()],
    #     'regressor__alpha': [0.01],
    #     'regressor__n_restarts_optimizer': [2] #, 10],
    # }))


    best_models = {}
    # 为活性和选择性分别进行网格搜索和模型训练
    for regressor_type, regressor, params in regressors:
        trained_models = {}  # 存储训练好的模型
        # 定义Pipeline
        pipeline = Pipeline([
            ('scaler', StandardScaler()),
            # ('variance_threshold', VarianceThreshold(threshold=0.1)),
            # ('pca', PCA(n_components=20)), # fast
            ('regressor', regressor) # 模型选择
        ])

        for y_train, y_test, label in [
            (y_train_activity, y_test_activity, "Activity"), 
            (y_train_selectivity, y_test_selectivity, "Selectivity"),
            (y_train_fitness, y_test_fitness, "Fitness"),
            (y_train_score, y_test_score, "Score"),
        ]:

            start_time = time.time()
            grid_search = GridSearchCV(pipeline, params, cv=5, scoring=scorer)
            grid_search.fit(X_train, y_train)
            end_time = time.time()

            # 输出运算时间
            execution_time = end_time - start_time
            print(f"代码执行时间: {execution_time} 秒")

            # 输出回归器类型
            print(f"{regressor_type}")
            # 输出最佳参数
            print(f"Best parameters for {label}: ", grid_search.best_params_)
            # 使用最佳模型进行预测
            preds = grid_search.predict(X_test)
            # 输出预测结果的RMSE
            rmse = np.sqrt(mean_squared_error(y_test, preds))
            # plt.scatter(preds, y_test)
            print(f"{label} RMSE: {rmse}")
            # 计算平均绝对误差
            mae = mean_absolute_error(y_test, preds)
            print(f"{label} MAE: {mae}")
            # 计算Spearman秩相关系数
            spearman = spearmanr(y_test, preds)[0]
            print(f"{label} Spearman: {spearman}\n")

            # 存储训练好的模型
            trained_models[label] = grid_search.best_estimator_

            # 记录
            records.append(
                [
                    kwargs["ifeature_descriptor"],
                    execution_time, 
                    regressor_type, 
                    label,
                    rmse,
                    mae,
                    spearman
                ]
            )
        best_models[regressor_type] = trained_models
    best_models_record[i_d] = best_models

# pd_records = pd.DataFrame(records, columns = ['feature', 'time(s)', 'regressor', 'target', 'rmse', 'mae', 'spearman'])
# current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
# pd_records.to_csv(f'{current_time}_records.tsv', sep='\t')


.\cached\ifeature\913cddebbb178ceb59f1acada0678041.csv
{'ifeature_descriptor': 'group3', 'full_sequence': 'default'}
代码执行时间: 2.871272087097168 秒
lr
Best parameters for Activity:  {}
Activity RMSE: 1859262030391.55
Activity MAE: 230384854970.7816
Activity Spearman: 0.690648043964441

代码执行时间: 2.24788761138916 秒
lr
Best parameters for Selectivity:  {}
Selectivity RMSE: 3431139749749.672
Selectivity MAE: 383418614230.1825
Selectivity Spearman: 0.5256092044104564

代码执行时间: 2.161593198776245 秒
lr
Best parameters for Fitness:  {}
Fitness RMSE: 4505280763263.6455
Fitness MAE: 499011915494.7757
Fitness Spearman: 0.7122896176000428

代码执行时间: 2.1820638179779053 秒
lr
Best parameters for Score:  {}
Score RMSE: 560887590742.843
Score MAE: 66310810175.77509
Score Spearman: 0.15595106219038024

代码执行时间: 3.041175127029419 秒
ridge
Best parameters for Activity:  {'regressor__alpha': 1.0}
Activity RMSE: 0.31950455267374067
Activity MAE: 0.23097179887967453
Activity Spearman: 0.7673384242861789

代码执行时间: 3.037

In [6]:
# 用于中断之后保存, try-error
pd_records = pd.DataFrame(records, columns = ['feature', 'time(s)', 'regressor', 'target', 'rmse', 'mae', 'spearman'])
current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
pd_records.to_csv(f'{current_time}_records.tsv', sep='\t')


分析

In [7]:
best_models_record['group3']['rf']

{'Activity': Pipeline(steps=[('scaler', StandardScaler()),
                 ('regressor', RandomForestRegressor(random_state=40))]),
 'Selectivity': Pipeline(steps=[('scaler', StandardScaler()),
                 ('regressor', RandomForestRegressor(random_state=40))]),
 'Fitness': Pipeline(steps=[('scaler', StandardScaler()),
                 ('regressor', RandomForestRegressor(random_state=40))]),
 'Score': Pipeline(steps=[('scaler', StandardScaler()),
                 ('regressor', RandomForestRegressor(random_state=40))])}

In [None]:
import matplotlib.pyplot as plt
plt.scatter(train_data['score'], train_data['Fitness'])

In [None]:
train_data[train_data['score'] == 5.0]

In [None]:
train_data[train_data['score'] == 1.0]

全集数据的再训练与预测

In [14]:
best_model_selected = {
    "Activity": best_models_record['group3']['rf']['Activity'],
    "Selectivity": best_models_record['group3']['rf']['Selectivity'],
}

In [10]:
X = {
    "Activity": pd_ifeature,
    "Selectivity": pd_ifeature,
}
y = {
    "Activity": activity_labels,
    "Selectivity": selectivity_labels,
}

In [15]:
# 读取测试数据
test_data = pd.read_csv("./kaggle/input/siatprotein2023/test.csv")
# 提取测试数据的氨基酸序列
# test_sequences = test_data['Sequence'].tolist()
pd_ifeature_test = ifeature_featurize(test_data, **kwargs)
# 使用训练好的模型进行预测
result_df = pd.DataFrame()
result_df['SequenceID'] = test_data['SequenceID']

for label in ["Activity", "Selectivity"]:
    best_model_selected[label].fit(X[label], y[label])
    preds = best_model_selected[label].predict(pd_ifeature_test)
    result_df[label] = preds

# 将结果保存为csv文件
result_df.to_csv('predictions.csv', index=False)
print("Your submission was successfully saved!")
#查看结果，确保格式正确后提交
print(result_df)

.\cached\ifeature\dc8bf6b46d453b2a91a36e3023715e2f.csv
Your submission was successfully saved!
     SequenceID  Activity  Selectivity
0          1594   1.58674      3.53621
1          1595   1.18953      2.80674
2          1596   0.63047      2.35620
3          1597   0.87162      3.51218
4          1598   0.61104      3.71211
..          ...       ...          ...
920        2514   0.38774      4.16898
921        2515   0.27967      3.44643
922        2516   0.07755      2.62161
923        2517   0.32952      3.70893
924        2518   0.12366      3.03918

[925 rows x 3 columns]


In [28]:
test_data

Unnamed: 0,SequenceID,Sequence
0,1594,AACCAD
1,1595,AACCTQ
2,1596,AADCSM
3,1597,AADQTQ
4,1598,AADTLQ
...,...,...
920,2514,YMQLSQ
921,2515,YQQLSQ
922,2516,YRQLSQ
923,2517,YWQLSQ


In [17]:
result_df["Fitness"] = result_df["Activity"] * result_df["Selectivity"]

In [31]:
result_df.sort_values(by='Fitness', ascending=False)[:96].merge(test_data, on='SequenceID')[["Sequence", "Activity", "Selectivity"]].to_csv("eval.csv", index=False, sep=',')