# Key Points for Pattern Differentiation of Heart Disorders (心病辨证要点)

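### Heart Qi Deficiency (心气虚证)
- Palpitations (including severe, fearful palpitations) occurring together with qi-deficiency signs
- Qi deficiency pattern: chiefly fatigue and lack of strength, shortness of breath with reluctance to speak, a weak pulse, and symptoms that worsen on exertion
- Palpitations, chest oppression, shortness of breath, mental fatigue, possibly spontaneous sweating, pale complexion, pale tongue, weak pulse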

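### Heart Blood Deficiency (心血虚证)
- Palpitations, insomnia, and dream-disturbed sleep occurring together with blood-deficiency signs
- Blood deficiency pattern: chiefly pallor of the face, eyelids, lips, and tongue, with a thin pulse
- Palpitations, dizziness and blurred vision, insomnia, dream-disturbed sleep, forgetfulness, pale or sallow complexion, pale tongue, thin and weak pulse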

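### Heart Yin Deficiency (心阴虚证)
- Palpitations, vexation, and insomnia occurring together with deficiency-heat signs
- Yin deficiency pattern: dry mouth and throat, five-center heat (五心烦热), tidal fever and night sweats, flushed cheeks, red tongue with scant coating, thin and rapid pulse
- Vexation, palpitations, insomnia, dream-disturbed sleep, dry mouth and throat, emaciation, possibly heat in the palms and soles, tidal fever and night sweats, flushed cheeks, red tongue with scant and dry coating, thin and rapid pulse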


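### Heart Yang Deficiency (心阳虚证)

- Palpitations, or pain in the heart and chest, occurring together with yang-deficiency signs
- Yang deficiency pattern: aversion to cold with cold limbs, clear and copious urine, bright-pale complexion (面色㿠白)
- Palpitations, oppression or pain in the heart and chest, shortness of breath, spontaneous sweating, aversion to cold with cold limbs, fatigue and lack of strength, pale complexion, or bluish-purple face and lips, pale and enlarged or dusky purple tongue, white slippery coating, pulse weak, knotted, or intermittent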

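### Heart Blood Stasis Obstruction (心血瘀阻证)

- **Palpitations and oppressive pain in the heart and chest occurring together with blood-stasis signs are the main basis for identifying this pattern**
- Blood stasis pattern: pain, masses, and bleeding occurring together with bluish-purple skin and tongue
- Palpitations and oppressive pain in the heart and chest, radiating to the shoulder, back, and inner arm, coming and going; or predominantly stabbing pain, with a dusky tongue or bluish-purple spots and a pulse that is thin, choppy, knotted, or intermittent; or predominantly chest oppression, with obesity and copious phlegm, heaviness and lethargy, white greasy coating, and a deep-slippery or deep-choppy pulse; or pain that worsens sharply with cold and eases with warmth, with aversion to cold and cold limbs, pale tongue with white coating, and a deep-slow or deep-tight pulse; or predominantly distending pain related to emotional changes, with frequent sighing, pale red tongue, and wiry pulse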


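### Heart Fire Hyperactivity (心火亢盛证)

- Vexation and insomnia, a red tongue with sores, vomiting of blood or nosebleed, and dark urine occurring together with excess-heat signs
- Fever, thirst, vexation, insomnia, constipation, yellow urine, red face, crimson tongue tip, yellow coating, rapid and forceful pulse. In severe cases, sores and painful ulcers of the mouth and tongue; or scanty dark urine with burning, difficult, painful urination; or vomiting of blood or nosebleed; or mania, delirious speech, and clouded consciousness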


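### Phlegm Misting the Heart Spirit (痰蒙心神证)

- Depressed mood, mental confusion, dementia, or coma occurring together with phlegm-turbidity signs
- Fever, thirst, chest oppression, coarse breathing, expectoration of yellow phlegm, phlegm rattling in the throat, vexation, insomnia, in severe cases clouded consciousness with delirious speech, or manic agitation, hitting people and smashing objects, failing to distinguish relatives from strangers, incoherent speech, unpredictable crying and laughing, red face, red tongue, yellow greasy coating, slippery and rapid pulse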


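### Stasis Obstructing the Brain Collaterals (瘀阻脑络证)

- **Headache and dizziness occurring together with blood-stasis signs are the main basis for identifying this pattern**
- Long-standing dizziness and headache that does not resolve, pain like being pierced by an awl and fixed in location, or forgetfulness, insomnia, palpitations, or loss of consciousness after head trauma, dusky complexion, dusky purple tongue or tongue with stasis spots, thin and choppy pulse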


### Notes
- Yang deficiency patterns often coexist with qi deficiency patterns
### [Reference source](https://www.med66.com/zhongyineikezhuzhiyi/fudaoziliao/ha2111057194.shtml)

In [3]:
import collections
import warnings
from collections import Counter
from datetime import datetime
from pprint import pprint

# Silence warnings before the heavier third-party imports, as in the original cell order
warnings.filterwarnings('ignore')

import matplotlib.pyplot as plt
import numpy as np
import optuna
import pandas as pd
import seaborn as sns
import shap
import xgboost as xgb
from imblearn.combine import SMOTETomek
from imblearn.over_sampling import SMOTE, RandomOverSampler, ADASYN
from imblearn.under_sampling import RandomUnderSampler, TomekLinks
from loguru import logger
from sklearn.feature_selection import chi2, SelectKBest, f_classif, mutual_info_classif
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.model_selection import train_test_split, cross_val_score, KFold, StratifiedKFold
from sklearn.preprocessing import LabelEncoder
from tqdm import tqdm

from utils import readJSON, preprocess

optuna.logging.set_verbosity(optuna.logging.CRITICAL)

print(f'{datetime.today()}')

2022-10-09 13:57:06.870197


In [4]:
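plt.rcParams['font.sans-serif'] = ['SimHei']  # SimHei, a font that includes CJK glyphs
plt.rcParams['axes.unicode_minus'] = False  # keep the minus sign rendering correctly with this font
sns.set(font='SimHei', font_scale=0.8)  # make Seaborn use the same font for Chinese labels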
print(f'{datetime.now()}')

2022-10-09 13:57:06.885820


In [5]:
# Class-balancing method to use
USE_randomDownSample = False
USE_Tomek_links = False

USE_ADASYN = False
USE_randomOverSample = False
USE_SMOTE = False

USE_SMOTETomek = True

# Feature selection
USE_chi2 = False
USE_f_classif = False
USE_mutual_info_classif = False

# LIME(Local Interpretable Model-Agnostic Explanations)
USE_LIME = True
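
The boolean flags above are consumed by if/elif chains in later cells, so if more than one flag is True only the first matching branch runs. A minimal sketch of a stricter alternative, assuming a hypothetical helper `pick_enabled` (not part of the notebook or its `utils` module) that insists on exactly one enabled option:

```python
def pick_enabled(flags):
    """Return the name of the single enabled flag in a {name: bool} mapping.

    Hypothetical helper for illustration; the notebook itself uses if/elif chains.
    """
    enabled = [name for name, on in flags.items() if on]
    if len(enabled) != 1:
        raise ValueError(f'expected exactly one enabled option, got {enabled}')
    return enabled[0]


# Same settings as the flags in this cell, written as a mapping
balancing_flags = {
    'randomDownSample': False,
    'Tomek_links': False,
    'ADASYN': False,
    'randomOverSample': False,
    'SMOTE': False,
    'SMOTETomek': True,
}
chosen = pick_enabled(balancing_flags)
print(chosen)
```

Failing loudly on zero or several enabled flags avoids silently running a different method from the one intended.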

In [6]:
xinzongbiao = pd.read_excel('./input/心总表.xlsx', sheet_name='总表')
xinzongbiao

Unnamed: 0,证名,病案号,性别,年龄,S1,S2,S3,S4,S5,S6,...,S116,S117,S118,S119,S120,S121,S122,S123,S124,S125
0,4.0,,1.0,31.0,0.000000,0.000000,0.000000,1.000000,0.000000,0.000000,...,1.00000,0.000000,0.000000,0.000000,0.00000,0.000000,0.000000,0.000000,0.000000,0.000000
1,1.0,,1.0,93.0,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,...,0.00000,1.000000,1.000000,0.000000,0.00000,0.000000,0.000000,0.000000,0.000000,0.000000
2,1.0,,1.0,10.0,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,...,0.00000,1.000000,1.000000,0.000000,0.00000,0.000000,0.000000,0.000000,0.000000,0.000000
3,3.0,,1.0,10.0,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,...,0.00000,1.000000,1.000000,0.000000,0.00000,0.000000,0.000000,0.000000,0.000000,0.000000
4,1.0,,1.0,37.0,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,...,1.00000,0.000000,0.000000,0.000000,0.00000,0.000000,0.000000,0.000000,1.000000,0.000000
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1738,4.0,152769.0,1.0,71.0,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,...,0.00000,0.000000,0.000000,0.000000,0.00000,0.000000,0.000000,0.000000,1.000000,0.000000
1739,3.0,130013.0,2.0,70.0,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,...,0.00000,0.000000,0.000000,0.000000,0.00000,0.000000,0.000000,1.000000,0.000000,0.000000
1740,4.0,176082.0,1.0,73.0,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,...,1.00000,0.000000,0.000000,1.000000,0.00000,0.000000,0.000000,0.000000,1.000000,0.000000
1741,,,,,5.000000,9.000000,18.000000,8.000000,8.000000,4.000000,...,696.00000,147.000000,140.000000,193.000000,146.00000,15.000000,50.000000,260.000000,788.000000,100.000000


In [7]:
from sklearn.manifold import TSNE
from sklearn.preprocessing import StandardScaler

zhenghou2id = readJSON('./input/zhenghou2id.json')
# TSNE rejects NaN, and the raw sheet has missing 病案号 values plus a trailing totals row
# with no 证名. Drop the non-symptom columns first, then any row that still contains NaN.
clean = xinzongbiao.drop(columns=['病案号', '性别', '年龄']).dropna()
X = StandardScaler().fit_transform(clean.drop(columns='证名'))
y = clean['证名'].astype(int).to_numpy()
tsne = TSNE(n_components=2, init='pca', random_state=64)
X_tsne = tsne.fit_transform(X)
plt.figure(figsize=(8, 8), dpi=200)
for i in range(1, 9):
    # `color` takes one RGBA value per series; `cmap` only applies when `c` holds numeric data
    plt.scatter(X_tsne[y == i][:, 0], X_tsne[y == i][:, 1], color=plt.cm.Set3(i), marker=i, label=zhenghou2id[str(i)])
plt.legend()
# plt.savefig('1.png')

In [None]:
xinzongbiao.drop(columns=['病案号', '性别', '年龄'], inplace=True)
xinzongbiao.dropna(inplace=True)
xinzongbiao = xinzongbiao.astype(int)  # astype returns a new frame, so assign it back
id2feature = readJSON('./input/id2feature.json')
xinzongbiao = xinzongbiao.rename(columns=id2feature)
xinzongbiao['证名'].value_counts()

In [None]:
# Total frequency of each symptom across the whole 心总表 sheet
xinzongbiao.drop(columns='证名').reset_index(drop=True).sum().sort_values(ascending=False)

In [None]:
# 1. Heart Qi Deficiency (心气虚证): symptom counts, 621 records
c1 = xinzongbiao[xinzongbiao['证名'] == 1].drop(columns='证名').reset_index(drop=True).sum().sort_values(
    ascending=False)
pd.DataFrame(data=[c1, np.round(c1 / xinzongbiao['证名'].value_counts()[1], 2)])
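
Each per-pattern cell in this part of the notebook follows the same recipe: keep the rows with one 证名 code, sum every 0/1 symptom column, and divide by the number of records in that class. A plain-Python sketch of that recipe on made-up toy records (the symptom names and values are illustrative, not taken from 心总表):

```python
def symptom_stats(records, label):
    """Count and proportion of each symptom among records with the given label."""
    rows = [r for r in records if r['label'] == label]
    symptoms = [k for k in rows[0] if k != 'label']
    counts = {s: sum(r[s] for r in rows) for s in symptoms}
    # Most frequent symptom first, like sort_values(ascending=False)
    ordered = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)
    return [(s, c, round(c / len(rows), 2)) for s, c in ordered]


toy = [
    {'label': 1, 'palpitations': 1, 'fatigue': 1, 'insomnia': 0},
    {'label': 1, 'palpitations': 1, 'fatigue': 0, 'insomnia': 0},
    {'label': 2, 'palpitations': 1, 'fatigue': 0, 'insomnia': 1},
    {'label': 1, 'palpitations': 1, 'fatigue': 1, 'insomnia': 1},
]
stats = symptom_stats(toy, label=1)
for name, count, share in stats:
    print(name, count, share)
```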

In [None]:
# 2. Heart Blood Deficiency (心血虚证): 70 records
c2 = xinzongbiao[xinzongbiao['证名'] == 2].drop(columns='证名').reset_index(drop=True).sum().sort_values(
    ascending=False)
pd.DataFrame(data=[c2, np.round(c2 / xinzongbiao['证名'].value_counts()[2], 2)])

In [None]:
# 3. Heart Yin Deficiency (心阴虚证): 184 records
c3 = xinzongbiao[xinzongbiao['证名'] == 3].drop(columns='证名').reset_index(drop=True).sum().sort_values(
    ascending=False)
pd.DataFrame(data=[c3, np.round(c3 / xinzongbiao['证名'].value_counts()[3], 2)])

In [None]:
# 4. Heart Yang Deficiency (心阳虚证): 121 records
c4 = xinzongbiao[xinzongbiao['证名'] == 4].drop(columns='证名').reset_index(drop=True).sum().sort_values(
    ascending=False)
pd.DataFrame(data=[c4, np.round(c4 / xinzongbiao['证名'].value_counts()[4], 2)])

In [None]:
# 5. Heart Blood Stasis Obstruction (心血瘀阻证): 548 records
c5 = xinzongbiao[xinzongbiao['证名'] == 5].drop(columns='证名').reset_index(drop=True).sum().sort_values(
    ascending=False)
pd.DataFrame(data=[c5, np.round(c5 / xinzongbiao['证名'].value_counts()[5], 2)])

In [None]:
# 6. Heart Fire Hyperactivity (心火亢盛证): 53 records
c6 = xinzongbiao[xinzongbiao['证名'] == 6].drop(columns='证名').reset_index(drop=True).sum().sort_values(
    ascending=False)
pd.DataFrame(data=[c6, np.round(c6 / xinzongbiao['证名'].value_counts()[6], 2)])

In [None]:
# 7. Phlegm Misting the Heart Spirit (痰蒙心神证): 112 records
c7 = xinzongbiao[xinzongbiao['证名'] == 7].drop(columns='证名').reset_index(drop=True).sum().sort_values(
    ascending=False)
pd.DataFrame(data=[c7, np.round(c7 / xinzongbiao['证名'].value_counts()[7], 2)])

In [None]:
# 8. Stasis Obstructing the Brain Collaterals (瘀阻脑络证): 32 records
c8 = xinzongbiao[xinzongbiao['证名'] == 8].drop(columns='证名').reset_index(drop=True).sum().sort_values(
    ascending=False)
pd.DataFrame(data=[c8, np.round(c8 / xinzongbiao['证名'].value_counts()[8], 2)])

In [None]:
# Heart Qi Deficiency (心气虚) vs. Heart Blood Stasis Obstruction (心血瘀阻)
import collections
from collections import Counter
from datetime import datetime
from pprint import pprint
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt
from imblearn.combine import SMOTETomek
import warnings

warnings.filterwarnings('ignore')
import shap
from sklearn.preprocessing import LabelEncoder
import pandas as pd
from tqdm import tqdm
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.model_selection import train_test_split, cross_val_score, KFold
import xgboost as xgb
import optuna
from utils import readJSON, preprocess
from sklearn.model_selection import StratifiedKFold
import seaborn as sns
from loguru import logger
from imblearn.over_sampling import SMOTE, RandomOverSampler, ADASYN
from imblearn.under_sampling import RandomUnderSampler, TomekLinks
import pandas as pd
from sklearn.preprocessing import StandardScaler

optuna.logging.set_verbosity(optuna.logging.CRITICAL)
from sklearn.feature_selection import chi2, SelectKBest, f_classif, mutual_info_classif
id2feature = readJSON('./input/id2feature.json')
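plt.rcParams['font.sans-serif'] = ['SimHei']  # SimHei, a font that includes CJK glyphs
plt.rcParams['axes.unicode_minus'] = False  # keep the minus sign rendering correctly with this font
sns.set(font='SimHei', font_scale=0.8)  # make Seaborn use the same font for Chinese labels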
logger.add('./log/{time}.log')
X, y = preprocess(path='./input/心总表.xlsx', sheet_name='总表')
X.columns = id2feature.values()
X.reset_index(drop=True, inplace=True)
y.reset_index(drop=True,inplace=True)
USE_chi2, USE_f_classif, USE_mutual_info_classif, USE_LSVC = False, False, False, True
if USE_chi2:
    logger.info('使用 卡方检验 进行特征筛选')
    chi2_model = SelectKBest(chi2, k=80)
    X = pd.DataFrame(chi2_model.fit_transform(X, y), columns=chi2_model.get_feature_names_out())
elif USE_f_classif:
    logger.info('使用 F检验 进行特征筛选')
    f_classif_model = SelectKBest(f_classif, k=80)
    X = pd.DataFrame(f_classif_model.fit_transform(X, y), columns=f_classif_model.get_feature_names_out())
elif USE_mutual_info_classif:
    logger.info('使用 互信息法 进行特征筛选')
    mutual_info_classif_model = SelectKBest(mutual_info_classif, k=80)
    X = pd.DataFrame(mutual_info_classif_model.fit_transform(X, y),
                     columns=mutual_info_classif_model.get_feature_names_out())
elif USE_LSVC:
    from sklearn.svm import LinearSVC
    from sklearn.feature_selection import SelectFromModel
    import category_encoders as ce
    # encoder = ce.TargetEncoder()
    encoder = ce.CatBoostEncoder()
    # Encode first, then standardize the encoded frame (not the raw X), keeping column names
    X_ce = encoder.fit_transform(X, y)
    scaler = StandardScaler()
    X_ce = pd.DataFrame(scaler.fit_transform(X_ce), columns=X_ce.columns)
    lsvc = LinearSVC(C=0.009, penalty='l2', dual=False, random_state=64).fit(X_ce, y)
    model = SelectFromModel(lsvc, prefit=True)
    # Drop from X every column the selector did not keep
    col = [c for c, keep in zip(X_ce.columns, model.get_support()) if not keep]
    X.drop(columns=col, inplace=True)
    # This branch uses an L2-penalized LinearSVC, not LASSO, so the log message says so
    logger.info(f'使用 LinearSVC (L2) 进行特征选择,剩余{X.shape[1]}个特征')
Accuracy = []
Precision = []
Recall = []
F1 = []
le = LabelEncoder()
y = le.fit_transform(y)
data = pd.concat([pd.DataFrame(X), pd.DataFrame(y, columns=['证名'])], axis=1)
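xqx = data[data['证名'] == 0].copy()  # Heart Qi Deficiency (encoded 0), 621 records
xxyz = data[data['证名'] == 4].copy()  # Heart Blood Stasis Obstruction (encoded 4), 547 records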
# xqx = xqx.sample(180, random_state=64)
# xxyz = xxyz.sample(120, random_state=64)
xqx['证名'] = 0
xxyz['证名'] = 1
tmp = pd.concat([xqx, xxyz], axis=0).sample(frac=1).reset_index(drop=True)
X = tmp.drop(columns='证名')
y = tmp['证名']

# TSNE
tsne = TSNE(n_components=2, init='pca', random_state=64)
X_tsne = tsne.fit_transform(X)
plt.figure(figsize=(4, 4), dpi=200)
for i in range(0, 2):
    plt.scatter(X_tsne[y == i][:, 0], X_tsne[y == i][:, 1], color=plt.cm.Set1(i), marker=i, label=str(i))
plt.legend()
plt.title('心气虚和心血瘀阻TSNE')
plt.show()

# Class balancing
USE_randomDownSample, USE_Tomek_links, USE_ADASYN, USE_randomOverSample, USE_SMOTE, USE_SMOTETomek = False, False, False, False, False, True
if USE_randomDownSample:
    logger.info('使用 randomDownSample 下采样')
    sampler = RandomUnderSampler(random_state=64)
elif USE_Tomek_links:
    logger.info('使用 TomekLinks 下采样')
    sampler = TomekLinks()
elif USE_ADASYN:
    logger.info('使用 ADASYN 上采样')
    sampler = ADASYN(random_state=64)
elif USE_randomOverSample:
    logger.info('使用 randomOverSample 上采样')
    sampler = RandomOverSampler(random_state=64)
elif USE_SMOTE:
    logger.info('使用 SMOTE 上采样')
    sampler = SMOTE(random_state=64)
elif USE_SMOTETomek:
    logger.info('使用 SMOTETomek 混合采样')
    sampler = SMOTETomek(random_state=64)
else:
    assert False, '没有平衡数据'
X_resampled, y_resampled = sampler.fit_resample(X, y)
logger.info(f'采样前{list(Counter(y).items())},采样后{list(Counter(y_resampled).items())}')


def objective(trial):
    train_x, valid_x, train_y, valid_y = train_test_split(X_resampled, y_resampled, test_size=0.1, random_state=64)
    param = {
        'verbosity': 0,
        'eval_metric': 'logloss',
        'objective': 'binary:logistic',
        # 'n_estimators':trial.suggest_int('n_estimators',3500,3600),
        'max_depth': trial.suggest_int("max_depth", 3, 12, step=1),
        'grow_policy': trial.suggest_categorical("grow_policy", ['depthwise', 'lossguide']),
        'learning_rate': trial.suggest_float("learning_rate", 1e-8, 1.0, log=True),
        'tree_method': 'exact',
        # 'booster': 'dart',
        'gamma': trial.suggest_float("gamma", 1e-8, 1.0, log=True),
        'reg_lambda': trial.suggest_float('reg_lambda', 1e-8, 1.0, log=True),
        'reg_alpha': trial.suggest_float('reg_alpha', 1e-8, 1.0, log=True),
        'subsample': trial.suggest_float('subsample', 0.2, 1.0),
        'colsample_bytree': trial.suggest_float('colsample_bytree', 0.2, 1.0),
        # 'eta': trial.suggest_float("eta", 1e-8, 1.0, log=True),
        'random_state': 42
    }
    model = xgb.XGBClassifier(**param).fit(train_x, train_y)
    preds = model.predict(valid_x)
    return accuracy_score(valid_y, preds)


study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=100, timeout=600)
# logger.info(study.best_value)
logger.info(study.best_params)
kf = StratifiedKFold(n_splits=10, shuffle=True, random_state=64)
modelcv = xgb.XGBClassifier(**study.best_params)
# modelcv.save_model(f'./output/model/{datetime.now()}.json')
from sklearn.model_selection import cross_validate

cv = cross_validate(modelcv, X_resampled, y_resampled, scoring=['accuracy', 'precision', 'recall', 'f1'], cv=kf)
logger.warning(cv)
for train_index, test_index in kf.split(X_resampled, y_resampled):
    # print(Counter(y_resampled.loc[train_index]),Counter(y_resampled.loc[test_index]))
    model = xgb.XGBClassifier(**study.best_trial.params).fit(X_resampled.loc[train_index], y_resampled.loc[train_index])
    preds = model.predict(X_resampled.loc[test_index])
    accuracy = accuracy_score(y_resampled[test_index], preds)
    Accuracy.append(accuracy)
    precision = precision_score(y_resampled[test_index], preds)
    Precision.append(precision)
    recall = recall_score(y_resampled[test_index], preds)
    Recall.append(recall)
    f1 = f1_score(y_resampled[test_index], preds)
    F1.append(f1)
    logger.warning(f'{round(np.mean(accuracy), 3)}\t{round(np.mean(precision), 3)}\t'
                   f'{round(np.mean(recall), 3)}\t{round(np.mean(f1), 3)}')
    explainer = shap.TreeExplainer(model)
    shap_values_XGBoost_train = explainer.shap_values(X_resampled.loc[train_index])
    shap.summary_plot(shap_values_XGBoost_train, X_resampled.iloc[train_index])
logger.warning(f'accuracy\t\tmean:{round(np.mean(Accuracy), 3)}\tstd:{round(np.std(Accuracy), 3)}')
logger.warning(f'precision\t\tmean:{round(np.mean(Precision), 3)}\tstd:{round(np.std(Precision), 3)}')
logger.warning(f'recall\t\tmean:{round(np.mean(Recall), 3)}\tstd:{round(np.std(Recall), 3)}')
logger.warning(f'f1\t\tmean:{round(np.mean(F1), 3)}\tstd:{round(np.std(F1), 3)}')
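
The subsets in this cell are picked with encoded labels 0 and 4 rather than the original 证名 codes 1 and 5. That follows from `LabelEncoder`, which assigns indices to the sorted distinct labels. A small plain-Python sketch of the same mapping, assuming the sheet uses the codes 1 to 8 in the order of the per-pattern cells earlier in the notebook:

```python
def encode_labels(labels):
    """Map each distinct label to its rank in sorted order, as LabelEncoder does."""
    classes = sorted(set(labels))
    to_index = {label: i for i, label in enumerate(classes)}
    return [to_index[label] for label in labels], to_index


# Assumed 证名 codes, following the order of the per-pattern cells in this notebook
names = {1: '心气虚', 2: '心血虚', 3: '心阴虚', 4: '心阳虚',
         5: '心血瘀阻', 6: '心火亢盛', 7: '痰蒙心神', 8: '瘀阻脑络'}
encoded, to_index = encode_labels(list(names))
for code, name in names.items():
    print(f'{name}: original {code} -> encoded {to_index[code]}')
```

If a class were missing from the data, every later code would shift down by one, so it is worth checking `le.classes_` before hard-coding encoded values.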

In [None]:
# Heart Yin Deficiency (心阴虚) vs. Heart Yang Deficiency (心阳虚)
X, y = preprocess(path='./input/心总表.xlsx', sheet_name='总表')
X.columns = id2feature.values()
X.reset_index(drop=True, inplace=True)
USE_chi2, USE_f_classif, USE_mutual_info_classif, USE_LASSO = False, False, False, True
if USE_chi2:
    logger.info('使用 卡方检验 进行特征筛选')
    chi2_model = SelectKBest(chi2, k=50)
    X = pd.DataFrame(chi2_model.fit_transform(X, y), columns=chi2_model.get_feature_names_out())
elif USE_f_classif:
    logger.info('使用 F检验 进行特征筛选')
    f_classif_model = SelectKBest(f_classif, k=80)
    X = pd.DataFrame(f_classif_model.fit_transform(X, y), columns=f_classif_model.get_feature_names_out())
elif USE_mutual_info_classif:
    logger.info('使用 互信息法 进行特征筛选')
    mutual_info_classif_model = SelectKBest(mutual_info_classif, k=80)
    X = pd.DataFrame(mutual_info_classif_model.fit_transform(X, y),
                     columns=mutual_info_classif_model.get_feature_names_out())
elif USE_LASSO:
    scaler = StandardScaler()
    X = scaler.fit_transform(X, y)
    X = pd.DataFrame(X, columns=scaler.feature_names_in_)
    from sklearn.svm import LinearSVC
    from sklearn.feature_selection import SelectFromModel

    lsvc = LinearSVC(C=0.01, penalty='l1', dual=False, random_state=64).fit(X, y)
    model = SelectFromModel(lsvc, prefit=True)
    X_new = model.transform(X)
    col = [c for c, i in zip(X.columns, model.get_support()) if not i]
    X.drop(columns=col, inplace=True)
    logger.info(f'使用 LASSO 进行特征选择,剩余{X.shape[1]}个特征')
Accuracy = []
Precision = []
Recall = []
F1 = []
le = LabelEncoder()
y = le.fit_transform(y)
data = pd.concat([pd.DataFrame(X), pd.DataFrame(y, columns=['证名'])], axis=1)
xqx = data[data['证名'] == 2].copy()  # Heart Yin Deficiency (encoded 2), 184 records
xxyz = data[data['证名'] == 3].copy()  # Heart Yang Deficiency (encoded 3), 121 records
xqx['证名'] = 0
xxyz['证名'] = 1
tmp = pd.concat([xqx, xxyz], axis=0).reset_index(drop=True)
X = tmp.drop(columns='证名')
y = tmp['证名']
tsne = TSNE(n_components=2, init='pca', random_state=64)
X_tsne = tsne.fit_transform(X)
plt.figure(figsize=(4, 4), dpi=200)
for i in range(0, 2):
    plt.scatter(X_tsne[y == i][:, 0], X_tsne[y == i][:, 1], color=plt.cm.Set1(i), marker=i, label=str(i))
plt.legend()
plt.title('心阴虚和心阳虚TSNE')
plt.show()
if USE_randomDownSample:
    logger.info('使用 randomDownSample 下采样')
    sampler = RandomUnderSampler(random_state=64)
elif USE_Tomek_links:
    logger.info('使用 TomekLinks 下采样')
    sampler = TomekLinks()
elif USE_ADASYN:
    logger.info('使用 ADASYN 上采样')
    sampler = ADASYN(random_state=64)
elif USE_randomOverSample:
    logger.info('使用 randomOverSample 上采样')
    sampler = RandomOverSampler(random_state=64)
elif USE_SMOTE:
    logger.info('使用 SMOTE 上采样')
    sampler = SMOTE(random_state=64)
elif USE_SMOTETomek:
    logger.info('使用 SMOTETomek 混合采样')
    sampler = SMOTETomek(random_state=64)
else:
    assert False, '没有平衡数据'
X_resampled, y_resampled = sampler.fit_resample(X, y)
logger.info(f'采样前{list(Counter(y).items())},采样后{list(Counter(y_resampled).items())}')
X

In [10]:
# Heart Yin Deficiency (心阴虚) vs. Heart Yang Deficiency (心阳虚)
import collections
from collections import Counter
import matplotlib.pyplot as plt
from imblearn.combine import SMOTETomek
import warnings

warnings.filterwarnings('ignore')
import shap
from sklearn.preprocessing import LabelEncoder
import pandas as pd
from tqdm import tqdm
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.model_selection import train_test_split, cross_val_score, KFold
import xgboost as xgb
import optuna
from utils import readJSON, preprocess
from sklearn.model_selection import StratifiedKFold
import seaborn as sns
from loguru import logger
from imblearn.over_sampling import SMOTE, RandomOverSampler, ADASYN
from imblearn.under_sampling import RandomUnderSampler, TomekLinks
import pandas as pd
from sklearn.preprocessing import StandardScaler

optuna.logging.set_verbosity(optuna.logging.CRITICAL)
from sklearn.feature_selection import chi2, SelectKBest, f_classif, mutual_info_classif
from sklearn.manifold import TSNE

plt.rcParams["font.sans-serif"] = ["SimHei"]  # set a font that includes CJK glyphs
plt.rcParams["axes.unicode_minus"] = False
id2feature = readJSON('./input/id2feature.json')
X, y = preprocess(path='./input/心总表.xlsx', sheet_name='总表')
X.columns = id2feature.values()
X.reset_index(drop=True, inplace=True)
USE_chi2, USE_f_classif, USE_mutual_info_classif, USE_LASSO = False, False, False, False
if USE_chi2:
    logger.info('使用 卡方检验 进行特征筛选')
    chi2_model = SelectKBest(chi2, k=40)
    X = pd.DataFrame(chi2_model.fit_transform(X, y), columns=chi2_model.get_feature_names_out())
elif USE_f_classif:
    logger.info('使用 F检验 进行特征筛选')
    f_classif_model = SelectKBest(f_classif, k=80)
    X = pd.DataFrame(f_classif_model.fit_transform(X, y), columns=f_classif_model.get_feature_names_out())
elif USE_mutual_info_classif:
    logger.info('使用 互信息法 进行特征筛选')
    mutual_info_classif_model = SelectKBest(mutual_info_classif, k=80)
    X = pd.DataFrame(mutual_info_classif_model.fit_transform(X, y),
                     columns=mutual_info_classif_model.get_feature_names_out())
elif USE_LASSO:
    scaler = StandardScaler()
    X = scaler.fit_transform(X, y)
    X = pd.DataFrame(X, columns=scaler.feature_names_in_)
    from sklearn.svm import LinearSVC
    from sklearn.feature_selection import SelectFromModel

    lsvc = LinearSVC(C=0.01, penalty='l1', dual=False, random_state=64).fit(X, y)
    model = SelectFromModel(lsvc, prefit=True)
    X_new = model.transform(X)
    col = [c for c, i in zip(X.columns, model.get_support()) if not i]
    X.drop(columns=col, inplace=True)
    logger.info(f'使用 LASSO 进行特征选择,剩余{X.shape[1]}个特征')
Accuracy = []
Precision = []
Recall = []
F1 = []
le = LabelEncoder()
y = le.fit_transform(y)
data = pd.concat([pd.DataFrame(X), pd.DataFrame(y, columns=['证名'])], axis=1)
xqx = data[data['证名'] == 2].copy()  # Heart Yin Deficiency (encoded 2), 184 records
xxyz = data[data['证名'] == 3].copy()  # Heart Yang Deficiency (encoded 3), 121 records
# xqx = xqx.sample(120, random_state=1024)
# xxyz = xxyz.sample(100, random_state=1024)
xqx['证名'] = 0
xxyz['证名'] = 1
tmp = pd.concat([xqx, xxyz], axis=0).reset_index(drop=True)
tmp = tmp.sample(frac=1)
X = tmp.drop(columns='证名')
y = tmp['证名']
# tsne = TSNE(n_components=2, init='pca', random_state=64)
# X_tsne = tsne.fit_transform(X)
# plt.figure(figsize=(4, 4), dpi=200)
# for i in range(0, 2):
#     plt.scatter(X_tsne[y == i][:, 0], X_tsne[y == i][:, 1], cmap=plt.cm.Set1(i), marker=i, label=str(i))
# plt.legend()
# plt.title('心阴虚和心阳虚TSNE')
# plt.show()
if USE_randomDownSample:
    logger.info('使用 randomDownSample 下采样')
    sampler = RandomUnderSampler(random_state=64)
elif USE_Tomek_links:
    logger.info('使用 TomekLinks 下采样')
    sampler = TomekLinks()
elif USE_ADASYN:
    logger.info('使用 ADASYN 上采样')
    sampler = ADASYN(random_state=64)
elif USE_randomOverSample:
    logger.info('使用 randomOverSample 上采样')
    sampler = RandomOverSampler(random_state=64)
elif USE_SMOTE:
    logger.info('使用 SMOTE 上采样')
    sampler = SMOTE(random_state=64)
elif USE_SMOTETomek:
    logger.info('使用 SMOTETomek 混合采样')
    sampler = SMOTETomek(random_state=64)
else:
    assert False, '没有平衡数据'
X_resampled, y_resampled = sampler.fit_resample(X, y)
logger.info(f'采样前{list(Counter(y).items())},采样后{list(Counter(y_resampled).items())}')


def objective(trial):
    train_x, valid_x, train_y, valid_y = train_test_split(X_resampled, y_resampled, test_size=0.1, random_state=64)
    dtrain = xgb.DMatrix(train_x, label=train_y)
    dvalid = xgb.DMatrix(valid_x, label=valid_y)
    param = {
        'verbosity': 0,
        'objective': 'binary:logistic',
        'tree_method': 'exact',
        'booster': 'gbtree',
        # 'enable_categorical':True,
        'lambda': trial.suggest_float('lambda', 1e-8, 1.0, log=True),
        'alpha': trial.suggest_float('alpha', 1e-8, 1.0, log=True),
        'subsample': trial.suggest_float('subsample', 0.2, 1.0),
        'colsample_bytree': trial.suggest_float('colsample_bytree', 0.2, 1.0),
        'max_depth': trial.suggest_int("max_depth", 3, 12, step=1),
        'min_child_weight': trial.suggest_int("min_child_weight", 2, 10),
        'eta': trial.suggest_float("eta", 1e-8, 1.0, log=True),
        'gamma': trial.suggest_float("gamma", 1e-8, 1.0, log=True),
        'grow_policy': trial.suggest_categorical("grow_policy", ["depthwise", "lossguide"])
    }
    bst = xgb.train(param, dtrain)
    preds = bst.predict(dvalid)
    preds[preds >= 0.5] = 1
    preds[preds < 0.5] = 0
    return precision_score(valid_y, preds)


study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=100, timeout=600)
kf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
for train_index, test_index in kf.split(X_resampled, y_resampled):
    dtrain = xgb.DMatrix(X_resampled.loc[train_index], label=y_resampled.loc[train_index])
    dvalid = xgb.DMatrix(X_resampled.loc[test_index], label=y_resampled.loc[test_index])
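    # best_trial.params holds only the tuned values, so add back the fixed settings from objective();
    # without 'objective' the booster would fall back to the default regression objective
    params = {'verbosity': 0, 'objective': 'binary:logistic', 'tree_method': 'exact', 'booster': 'gbtree',
              **study.best_trial.params}
    model = xgb.train(params, dtrain)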
    preds = model.predict(dvalid)
    preds[preds >= 0.5] = 1
    preds[preds < 0.5] = 0
    accuracy = accuracy_score(y_resampled.loc[test_index], preds)
    Accuracy.append(accuracy)
    precision = precision_score(y_resampled.loc[test_index], preds)
    Precision.append(precision)
    recall = recall_score(y_resampled.loc[test_index], preds)
    Recall.append(recall)
    f1 = f1_score(y_resampled.loc[test_index], preds)
    F1.append(f1)
    logger.warning(f'{round(np.mean(accuracy), 3)}\t{round(np.mean(precision), 3)}\t'
                   f'{round(np.mean(recall), 3)}\t{round(np.mean(f1), 3)}')
    # explainer = shap.TreeExplainer(model)
    # shap_values_XGBoost_train = explainer.shap_values(X_resampled.loc[train_index])
    # shap.summary_plot(shap_values_XGBoost_train, X_resampled.iloc[train_index])
logger.warning(f'accuracy\t\tmean:{round(np.mean(Accuracy), 3)}\tstd:{round(np.std(Accuracy), 3)}')
logger.warning(f'precision\t\tmean:{round(np.mean(Precision), 3)}\tstd:{round(np.std(Precision), 3)}')
logger.warning(f'recall\t\tmean:{round(np.mean(Recall), 3)}\tstd:{round(np.std(Recall), 3)}')
logger.warning(f'f1\t\tmean:{round(np.mean(F1), 3)}\tstd:{round(np.std(F1), 3)}')

2022-10-09 14:40:47.352 | INFO     | __main__:<cell line: 89>:105 - 使用 SMOTETomek 混合采样
2022-10-09 14:40:47.368 | INFO     | __main__:<cell line: 110>:110 - 采样前[(0, 184), (1, 121)],采样后[(0, 177), (1, 177)]
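
The fold loop in this cell thresholds the booster's predicted probabilities at 0.5 and then scores accuracy, precision, recall, and F1. For reference, a self-contained sketch of those four metrics computed from confusion counts, using toy labels and probabilities invented for illustration:

```python
def binary_metrics(y_true, probs, threshold=0.5):
    """Accuracy, precision, recall, and F1 for the positive class (label 1)."""
    preds = [1 if p >= threshold else 0 for p in probs]
    tp = sum(1 for t, p in zip(y_true, preds) if t == 1 and p == 1)
    fp = sum(1 for t, p in zip(y_true, preds) if t == 0 and p == 1)
    fn = sum(1 for t, p in zip(y_true, preds) if t == 1 and p == 0)
    tn = sum(1 for t, p in zip(y_true, preds) if t == 0 and p == 0)
    accuracy = (tp + tn) / len(y_true)
    # Guard against empty denominators when no positives are predicted or present
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return accuracy, precision, recall, f1


y_true = [1, 0, 1, 1, 0, 0]
probs = [0.9, 0.6, 0.4, 0.7, 0.2, 0.1]
accuracy, precision, recall, f1 = binary_metrics(y_true, probs)
print(round(accuracy, 3), round(precision, 3), round(recall, 3), round(f1, 3))
```

Here label 1 is whichever pattern the cell recoded as 1, so precision and recall describe that pattern only.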


In [None]:
# Heart Yin Deficiency (心阴虚) vs. Heart Blood Stasis Obstruction (心血瘀阻)
X, y = preprocess(path='./input/心总表.xlsx', sheet_name='总表')
X.columns = id2feature.values()
X.reset_index(drop=True, inplace=True)
if USE_chi2:
    logger.info('使用 卡方检验 进行特征筛选')
    chi2_model = SelectKBest(chi2, k=80)
    X = pd.DataFrame(chi2_model.fit_transform(X, y), columns=chi2_model.get_feature_names_out())
elif USE_f_classif:
    logger.info('使用 F检验 进行特征筛选')
    f_classif_model = SelectKBest(f_classif, k=80)
    X = pd.DataFrame(f_classif_model.fit_transform(X, y), columns=f_classif_model.get_feature_names_out())
elif USE_mutual_info_classif:
    logger.info('使用 互信息法 进行特征筛选')
    mutual_info_classif_model = SelectKBest(mutual_info_classif, k=80)
    X = pd.DataFrame(mutual_info_classif_model.fit_transform(X, y),
                     columns=mutual_info_classif_model.get_feature_names_out())
else:
    logger.info('不进行特征选择')
Accuracy = []
Precision = []
Recall = []
F1 = []
le = LabelEncoder()
y = le.fit_transform(y)
data = pd.concat([pd.DataFrame(X), pd.DataFrame(y, columns=['证名'])], axis=1)
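xqx = data[data['证名'] == 2].copy()  # Heart Yin Deficiency (encoded 2), 184 records
xxyz = data[data['证名'] == 4].copy()  # Heart Blood Stasis Obstruction (encoded 4), 547 records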
xqx['证名'] = 0
xxyz['证名'] = 1
tmp = pd.concat([xqx, xxyz], axis=0).reset_index(drop=True)
X = tmp.drop(columns='证名')
y = tmp['证名']
if USE_randomDownSample:
    logger.info('使用 randomDownSample 下采样')
    sampler = RandomUnderSampler(random_state=64)
elif USE_Tomek_links:
    logger.info('使用 TomekLinks 下采样')
    sampler = TomekLinks()
elif USE_ADASYN:
    logger.info('使用 ADASYN 上采样')
    sampler = ADASYN(random_state=64)
elif USE_randomOverSample:
    logger.info('使用 randomOverSample 上采样')
    sampler = RandomOverSampler(random_state=64)
elif USE_SMOTE:
    logger.info('使用 SMOTE 上采样')
    sampler = SMOTE(random_state=64)
elif USE_SMOTETomek:
    logger.info('使用 SMOTETomek 混合采样')
    sampler = SMOTETomek(random_state=64)
else:
    assert False, '没有平衡数据'
X_resampled, y_resampled = sampler.fit_resample(X, y)
logger.info(f'采样前{list(Counter(y).items())},采样后{list(Counter(y_resampled).items())}')


def objective(trial):
    train_x, valid_x, train_y, valid_y = train_test_split(X_resampled, y_resampled, test_size=0.1, random_state=64)
    dtrain = xgb.DMatrix(train_x, label=train_y)
    dvalid = xgb.DMatrix(valid_x, label=valid_y)
    param = {
        'verbosity': 0,
        'objective': 'binary:logistic',
        'tree_method': 'exact',
        'booster': 'gbtree',
        # 'enable_categorical':True,
        'lambda': trial.suggest_float('lambda', 1e-8, 1.0, log=True),
        'alpha': trial.suggest_float('alpha', 1e-8, 1.0, log=True),
        'subsample': trial.suggest_float('subsample', 0.2, 1.0),
        'colsample_bytree': trial.suggest_float('colsample_bytree', 0.2, 1.0),
        'max_depth': trial.suggest_int("max_depth", 3, 12, step=1),
        'min_child_weight': trial.suggest_int("min_child_weight", 2, 10),
        'eta': trial.suggest_float("eta", 1e-8, 1.0, log=True),
        'gamma': trial.suggest_float("gamma", 1e-8, 1.0, log=True),
        'grow_policy': trial.suggest_categorical("grow_policy", ["depthwise", "lossguide"])
    }
    bst = xgb.train(param, dtrain)
    preds = bst.predict(dvalid)
    preds[preds >= 0.5] = 1
    preds[preds < 0.5] = 0
    return precision_score(valid_y, preds)


study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=100, timeout=600)
kf = StratifiedKFold(n_splits=10, shuffle=True, random_state=42)
for train_index, test_index in kf.split(X_resampled, y_resampled):
    dtrain = xgb.DMatrix(X_resampled.loc[train_index], label=y_resampled.loc[train_index])
    dvalid = xgb.DMatrix(X_resampled.loc[test_index], label=y_resampled.loc[test_index])
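    # best_trial.params holds only the tuned values, so add back the fixed settings from objective();
    # without 'objective' the booster would fall back to the default regression objective
    params = {'verbosity': 0, 'objective': 'binary:logistic', 'tree_method': 'exact', 'booster': 'gbtree',
              **study.best_trial.params}
    model = xgb.train(params, dtrain)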
    preds = model.predict(dvalid)
    preds[preds >= 0.5] = 1
    preds[preds < 0.5] = 0
    accuracy = accuracy_score(y_resampled.loc[test_index], preds)
    Accuracy.append(accuracy)
    precision = precision_score(y_resampled.loc[test_index], preds)
    Precision.append(precision)
    recall = recall_score(y_resampled.loc[test_index], preds)
    Recall.append(recall)
    f1 = f1_score(y_resampled.loc[test_index], preds)
    F1.append(f1)
    logger.warning(f'{round(np.mean(accuracy), 3)}\t{round(np.mean(precision), 3)}\t'
                   f'{round(np.mean(recall), 3)}\t{round(np.mean(f1), 3)}')
    explainer = shap.TreeExplainer(model)
    shap_values_XGBoost_train = explainer.shap_values(X_resampled.loc[train_index])
    shap.summary_plot(shap_values_XGBoost_train, X_resampled.iloc[train_index])
logger.warning(f'均值：\t{round(np.mean(Accuracy), 3)}\t{round(np.mean(Precision), 3)}\t'
               f'{round(np.mean(Recall), 3)}\t{round(np.mean(F1), 3)}')
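
`StratifiedKFold` keeps the class ratio roughly equal in every fold, which matters when the two patterns being compared differ in size. The sketch below is a simplified stand-in (round-robin assignment within each class, without the shuffling that scikit-learn performs) meant only to show the idea on toy labels:

```python
from collections import Counter, defaultdict


def stratified_folds(labels, n_splits):
    """Assign each sample index to a fold, dealing out each class round-robin."""
    by_class = defaultdict(list)
    for idx, label in enumerate(labels):
        by_class[label].append(idx)
    folds = [[] for _ in range(n_splits)]
    for indices in by_class.values():
        for position, idx in enumerate(indices):
            folds[position % n_splits].append(idx)
    return folds


labels = [0] * 12 + [1] * 6
folds = stratified_folds(labels, n_splits=3)
for k, test_idx in enumerate(folds):
    # Each fold should hold the same 2:1 class ratio as the full label list
    print(k, Counter(labels[i] for i in test_idx))
```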

In [None]:
# Heart Qi Deficiency (心气虚) vs. Heart Yin Deficiency (心阴虚)
X, y = preprocess(path='./input/心总表.xlsx', sheet_name='总表')
X.columns = id2feature.values()
X.reset_index(drop=True, inplace=True)
USE_chi2, USE_f_classif, USE_mutual_info_classif, USE_LASSO = False, False, False, True
if USE_chi2:
    logger.info('使用 卡方检验 进行特征筛选')
    chi2_model = SelectKBest(chi2, k=80)
    X = pd.DataFrame(chi2_model.fit_transform(X, y), columns=chi2_model.get_feature_names_out())
elif USE_f_classif:
    logger.info('使用 F检验 进行特征筛选')
    f_classif_model = SelectKBest(f_classif, k=80)
    X = pd.DataFrame(f_classif_model.fit_transform(X, y), columns=f_classif_model.get_feature_names_out())
elif USE_mutual_info_classif:
    logger.info('使用 互信息法 进行特征筛选')
    mutual_info_classif_model = SelectKBest(mutual_info_classif, k=80)
    X = pd.DataFrame(mutual_info_classif_model.fit_transform(X, y),
                     columns=mutual_info_classif_model.get_feature_names_out())
elif USE_LASSO:
    scaler = StandardScaler()
    X = scaler.fit_transform(X, y)
    X = pd.DataFrame(X, columns=scaler.feature_names_in_)
    from sklearn.svm import LinearSVC
    from sklearn.feature_selection import SelectFromModel

    lsvc = LinearSVC(C=0.01, penalty='l1', dual=False, random_state=64).fit(X, y)
    model = SelectFromModel(lsvc, prefit=True)
    X_new = model.transform(X)
    col = [c for c, i in zip(X.columns, model.get_support()) if not i]
    X.drop(columns=col, inplace=True)
    logger.info(f'使用 LASSO 进行特征选择,剩余{X.shape[1]}个特征')
Accuracy = []
Precision = []
Recall = []
F1 = []
le = LabelEncoder()
y = le.fit_transform(y)
data = pd.concat([pd.DataFrame(X), pd.DataFrame(y, columns=['证名'])], axis=1)
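# .copy() avoids pandas' SettingWithCopyWarning when the labels are overwritten below.
xqx = data[data['证名'] == 0].copy()  # heart qi deficiency (心气虚), 621 cases
xxyz = data[data['证名'] == 2].copy()  # heart yin deficiency (心阴虚), 184 cases
xqx['证名'] = 0
xxyz['证名'] = 1
xqx = xqx.sample(150, random_state=1024)  # downsample the majority class to 150 rows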
tmp = pd.concat([xqx, xxyz], axis=0).reset_index(drop=True)
X = tmp.drop(columns='证名')
y = tmp['证名']

# TSNE
# tsne = TSNE(n_components=2, init='pca', random_state=64)
# X_tsne = tsne.fit_transform(X)
# plt.figure(figsize=(4, 4), dpi=200)
# for i in range(0, 2):
#     plt.scatter(X_tsne[y == i][:, 0], X_tsne[y == i][:, 1], cmap=plt.cm.Set1(i), marker=i, label=str(i))
# plt.legend()
# plt.title('心气虚和心阴虚TSNE')
# plt.show()

# Class balancing
if USE_randomDownSample:
    logger.info('Resampling: random under-sampling')
    sampler = RandomUnderSampler(random_state=64)
elif USE_Tomek_links:
    logger.info('Resampling: Tomek links under-sampling')
    sampler = TomekLinks()
elif USE_ADASYN:
    logger.info('Resampling: ADASYN over-sampling')
    sampler = ADASYN(random_state=64)
elif USE_randomOverSample:
    logger.info('Resampling: random over-sampling')
    sampler = RandomOverSampler(random_state=64)
elif USE_SMOTE:
    logger.info('Resampling: SMOTE over-sampling')
    sampler = SMOTE(random_state=64)
elif USE_SMOTETomek:
    logger.info('Resampling: SMOTETomek combined over- and under-sampling')
    sampler = SMOTETomek(random_state=64)
else:
    raise ValueError('No resampling strategy selected')
X_resampled, y_resampled = sampler.fit_resample(X, y)
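from sklearn.manifold import TSNE  # the first import cell does not include it

# Project the resampled data to 2-D with t-SNE to inspect class separation.
tsne = TSNE(n_components=2, init='pca', random_state=64)

X_tsne = tsne.fit_transform(X_resampled)
plt.figure(figsize=(4, 4), dpi=200)
for i in range(0, 2):
    mask = np.asarray(y_resampled == i)
    # color= takes the RGBA tuple returned by Set1(i); cmap= expects a colormap object instead.
    plt.scatter(X_tsne[mask][:, 0], X_tsne[mask][:, 1], color=plt.cm.Set1(i), marker=i,
                label=str(i))
plt.legend()
plt.title('t-SNE: heart qi deficiency vs. heart yin deficiency')
plt.show()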

logger.info(f'Before resampling: {list(Counter(y).items())}; after: {list(Counter(y_resampled).items())}')


def objective(trial):
    train_x, valid_x, train_y, valid_y = train_test_split(X_resampled, y_resampled, test_size=0.1, random_state=64)
    dtrain = xgb.DMatrix(train_x, label=train_y)
    dvalid = xgb.DMatrix(valid_x, label=valid_y)
    param = {
        'verbosity': 0,
        'objective': 'binary:logistic',
        'tree_method': 'exact',
        'booster': 'gbtree',
        # 'enable_categorical':True,
        'lambda': trial.suggest_float('lambda', 1e-8, 1.0, log=True),
        'alpha': trial.suggest_float('alpha', 1e-8, 1.0, log=True),
        'subsample': trial.suggest_float('subsample', 0.2, 1.0),
        'colsample_bytree': trial.suggest_float('colsample_bytree', 0.2, 1.0),
        'max_depth': trial.suggest_int("max_depth", 3, 12, step=1),
        'min_child_weight': trial.suggest_int("min_child_weight", 2, 10),
        'eta': trial.suggest_float("eta", 1e-8, 1.0, log=True),
        'gamma': trial.suggest_float("gamma", 1e-8, 1.0, log=True),
        'grow_policy': trial.suggest_categorical("grow_policy", ["depthwise", "lossguide"])
    }
    bst = xgb.train(param, dtrain)
    preds = bst.predict(dvalid)
    preds[preds >= 0.5] = 1
    preds[preds < 0.5] = 0
    return precision_score(valid_y, preds)


study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=100, timeout=600)
kf = StratifiedKFold(n_splits=10, shuffle=True, random_state=42)
# study.best_trial.params holds only the tuned values, so the fixed settings are added back;
# without 'objective', xgb.train falls back to its default regression objective.
best_params = {'verbosity': 0, 'objective': 'binary:logistic', 'tree_method': 'exact', 'booster': 'gbtree',
               **study.best_trial.params}
for train_index, test_index in kf.split(X_resampled, y_resampled):
    # kf.split yields positional indices, so index with .iloc throughout.
    X_train, y_train = X_resampled.iloc[train_index], y_resampled.iloc[train_index]
    X_test, y_test = X_resampled.iloc[test_index], y_resampled.iloc[test_index]
    dtrain = xgb.DMatrix(X_train, label=y_train)
    dvalid = xgb.DMatrix(X_test, label=y_test)
    model = xgb.train(best_params, dtrain)
    preds = (model.predict(dvalid) >= 0.5).astype(int)  # probabilities to 0/1 labels
    accuracy = accuracy_score(y_test, preds)
    Accuracy.append(accuracy)
    precision = precision_score(y_test, preds)
    Precision.append(precision)
    recall = recall_score(y_test, preds)
    Recall.append(recall)
    f1 = f1_score(y_test, preds)
    F1.append(f1)
    logger.warning(f'{round(accuracy, 3)}\t{round(precision, 3)}\t'
                   f'{round(recall, 3)}\t{round(f1, 3)}')
    explainer = shap.TreeExplainer(model)
    shap_values_XGBoost_train = explainer.shap_values(X_train)
    shap.summary_plot(shap_values_XGBoost_train, X_train)
logger.warning(f'Mean:\taccuracy:{round(np.mean(Accuracy), 3)}\tprecision:{round(np.mean(Precision), 3)}\t'
               f'recall:{round(np.mean(Recall), 3)}\tf1:{round(np.mean(F1), 3)}')
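
The loop collects one score per fold and reports only the mean. With ten folds on a few hundred rows, the spread across folds is often as informative as the mean. The sketch below summarizes per-fold lists as mean and population standard deviation (the same convention as `np.std` with its default `ddof=0`). The helper `summarize_folds` and the scores are hypothetical, shown only to illustrate the aggregation.

```python
from statistics import mean, pstdev


def summarize_folds(scores):
    """Return {metric: (mean, population std)} for per-fold score lists."""
    return {name: (mean(vals), pstdev(vals)) for name, vals in scores.items()}


# Illustrative per-fold scores, not results from this notebook.
fold_scores = {'accuracy': [0.80, 0.90, 0.85], 'f1': [0.70, 0.80, 0.90]}
summary = summarize_folds(fold_scores)
for name, (m, s) in summary.items():
    print(f'{name}\tmean:{m:.3f}\tstd:{s:.3f}')
```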

In [None]:
# Heart qi deficiency (心气虚) vs. heart yang deficiency (心阳虚)
X, y = preprocess(path='./input/心总表.xlsx', sheet_name='总表')
X.columns = id2feature.values()
X.reset_index(drop=True, inplace=True)
if USE_chi2:
    logger.info('Feature selection: chi-squared test')
    chi2_model = SelectKBest(chi2, k=80)
    X = pd.DataFrame(chi2_model.fit_transform(X, y), columns=chi2_model.get_feature_names_out())
elif USE_f_classif:
    logger.info('Feature selection: ANOVA F-test')
    f_classif_model = SelectKBest(f_classif, k=80)
    X = pd.DataFrame(f_classif_model.fit_transform(X, y), columns=f_classif_model.get_feature_names_out())
elif USE_mutual_info_classif:
    logger.info('Feature selection: mutual information')
    mutual_info_classif_model = SelectKBest(mutual_info_classif, k=80)
    X = pd.DataFrame(mutual_info_classif_model.fit_transform(X, y),
                     columns=mutual_info_classif_model.get_feature_names_out())
else:
    logger.info('Feature selection: none')
Accuracy = []
Precision = []
Recall = []
F1 = []
le = LabelEncoder()
y = le.fit_transform(y)
data = pd.concat([pd.DataFrame(X), pd.DataFrame(y, columns=['证名'])], axis=1)
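# Note: despite its name, xqx holds the heart yang deficiency rows in this cell.
xqx = data[data['证名'] == 3].copy()  # heart yang deficiency (心阳虚), 121 cases
xxyz = data[data['证名'] == 0].copy()  # heart qi deficiency (心气虚), 621 cases
xqx['证名'] = 0
xxyz['证名'] = 1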
tmp = pd.concat([xqx, xxyz], axis=0).reset_index(drop=True)
X = tmp.drop(columns='证名')
y = tmp['证名']
if USE_randomDownSample:
    logger.info('Resampling: random under-sampling')
    sampler = RandomUnderSampler(random_state=64)
elif USE_Tomek_links:
    logger.info('Resampling: Tomek links under-sampling')
    sampler = TomekLinks()
elif USE_ADASYN:
    logger.info('Resampling: ADASYN over-sampling')
    sampler = ADASYN(random_state=64)
elif USE_randomOverSample:
    logger.info('Resampling: random over-sampling')
    sampler = RandomOverSampler(random_state=64)
elif USE_SMOTE:
    logger.info('Resampling: SMOTE over-sampling')
    sampler = SMOTE(random_state=64)
elif USE_SMOTETomek:
    logger.info('Resampling: SMOTETomek combined over- and under-sampling')
    sampler = SMOTETomek(random_state=64)
else:
    raise ValueError('No resampling strategy selected')
X_resampled, y_resampled = sampler.fit_resample(X, y)
logger.info(f'Before resampling: {list(Counter(y).items())}; after: {list(Counter(y_resampled).items())}')


def objective(trial):
    train_x, valid_x, train_y, valid_y = train_test_split(X_resampled, y_resampled, test_size=0.1, random_state=64)
    dtrain = xgb.DMatrix(train_x, label=train_y)
    dvalid = xgb.DMatrix(valid_x, label=valid_y)
    param = {
        'verbosity': 0,
        'objective': 'binary:logistic',
        'tree_method': 'exact',
        'booster': 'gbtree',
        # 'enable_categorical':True,
        'lambda': trial.suggest_float('lambda', 1e-8, 1.0, log=True),
        'alpha': trial.suggest_float('alpha', 1e-8, 1.0, log=True),
        'subsample': trial.suggest_float('subsample', 0.2, 1.0),
        'colsample_bytree': trial.suggest_float('colsample_bytree', 0.2, 1.0),
        'max_depth': trial.suggest_int("max_depth", 3, 12, step=1),
        'min_child_weight': trial.suggest_int("min_child_weight", 2, 10),
        'eta': trial.suggest_float("eta", 1e-8, 1.0, log=True),
        'gamma': trial.suggest_float("gamma", 1e-8, 1.0, log=True),
        'grow_policy': trial.suggest_categorical("grow_policy", ["depthwise", "lossguide"])
    }
    bst = xgb.train(param, dtrain)
    preds = bst.predict(dvalid)
    preds[preds >= 0.5] = 1
    preds[preds < 0.5] = 0
    return precision_score(valid_y, preds)


study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=100, timeout=600)
kf = StratifiedKFold(n_splits=10, shuffle=True, random_state=42)
# study.best_trial.params holds only the tuned values, so the fixed settings are added back;
# without 'objective', xgb.train falls back to its default regression objective.
best_params = {'verbosity': 0, 'objective': 'binary:logistic', 'tree_method': 'exact', 'booster': 'gbtree',
               **study.best_trial.params}
for train_index, test_index in kf.split(X_resampled, y_resampled):
    # kf.split yields positional indices, so index with .iloc throughout.
    X_train, y_train = X_resampled.iloc[train_index], y_resampled.iloc[train_index]
    X_test, y_test = X_resampled.iloc[test_index], y_resampled.iloc[test_index]
    dtrain = xgb.DMatrix(X_train, label=y_train)
    dvalid = xgb.DMatrix(X_test, label=y_test)
    model = xgb.train(best_params, dtrain)
    preds = (model.predict(dvalid) >= 0.5).astype(int)  # probabilities to 0/1 labels
    accuracy = accuracy_score(y_test, preds)
    Accuracy.append(accuracy)
    precision = precision_score(y_test, preds)
    Precision.append(precision)
    recall = recall_score(y_test, preds)
    Recall.append(recall)
    f1 = f1_score(y_test, preds)
    F1.append(f1)
    logger.warning(f'{round(accuracy, 3)}\t{round(precision, 3)}\t'
                   f'{round(recall, 3)}\t{round(f1, 3)}')
    explainer = shap.TreeExplainer(model)
    shap_values_XGBoost_train = explainer.shap_values(X_train)
    shap.summary_plot(shap_values_XGBoost_train, X_train)
logger.warning(f'Mean:\t{round(np.mean(Accuracy), 3)}\t{round(np.mean(Precision), 3)}\t'
               f'{round(np.mean(Recall), 3)}\t{round(np.mean(F1), 3)}')
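
When the tuned values from `study.best_trial.params` are passed to `xgb.train`, the fixed settings from the objective function have to be supplied alongside them. Optuna records only the values drawn through `trial.suggest_*`, so entries such as `'objective': 'binary:logistic'` are not part of `best_trial.params`, and XGBoost would otherwise use its default regression objective. The sketch below shows the merge with plain dictionaries; `merge_params` and the `tuned` values are hypothetical stand-ins.

```python
# Fixed settings copied from the objective function in this notebook.
FIXED_PARAMS = {
    'verbosity': 0,
    'objective': 'binary:logistic',
    'tree_method': 'exact',
    'booster': 'gbtree',
}


def merge_params(fixed, tuned):
    """Combine fixed settings with tuned values; tuned values win on key clashes."""
    return {**fixed, **tuned}


tuned = {'max_depth': 5, 'eta': 0.1}  # hypothetical stand-in for study.best_trial.params
final_params = merge_params(FIXED_PARAMS, tuned)
print(final_params)
```

The merged dictionary can then be passed wherever the tuned parameters alone were used, for example as the first argument to `xgb.train`.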

In [None]:
import lightgbm as lgb
import sklearn
import collections
from collections import Counter
from datetime import datetime
from pprint import pprint

import matplotlib.pyplot as plt
from imblearn.combine import SMOTETomek
import warnings

warnings.filterwarnings('ignore')
import shap
from sklearn.preprocessing import LabelEncoder
import pandas as pd
from tqdm import tqdm
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.model_selection import train_test_split, cross_val_score, KFold
import xgboost as xgb
import optuna
from utils import readJSON, preprocess
from sklearn.model_selection import StratifiedKFold
import seaborn as sns
from loguru import logger
from imblearn.over_sampling import SMOTE, RandomOverSampler, ADASYN
from imblearn.under_sampling import RandomUnderSampler, TomekLinks
import pandas as pd
from sklearn.preprocessing import StandardScaler

optuna.logging.set_verbosity(optuna.logging.CRITICAL)
from sklearn.feature_selection import chi2, SelectKBest, f_classif, mutual_info_classif

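plt.rcParams['font.sans-serif'] = ['SimHei']  # SimHei font, so Chinese labels render
plt.rcParams['axes.unicode_minus'] = False  # render the minus sign correctly with this font
sns.set(font='SimHei', font_scale=0.8)  # same font for Seaborn, so Chinese text displays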
X, y = preprocess(path='./input/心总表.xlsx', sheet_name='总表')
id2feature = readJSON('./input/id2feature.json')
X.columns = id2feature.values()
X.reset_index(drop=True, inplace=True)
USE_chi2, USE_f_classif, USE_mutual_info_classif, USE_LASSO = False, False, False, True
if USE_chi2:
    chi2_model = SelectKBest(chi2, k=80)
    X = pd.DataFrame(chi2_model.fit_transform(X, y), columns=chi2_model.get_feature_names_out())
    logger.info(f'Feature selection: chi-squared test, {X.shape[1]} features kept')
elif USE_f_classif:
    f_classif_model = SelectKBest(f_classif, k=80)
    X = pd.DataFrame(f_classif_model.fit_transform(X, y), columns=f_classif_model.get_feature_names_out())
    logger.info(f'Feature selection: ANOVA F-test, {X.shape[1]} features kept')
elif USE_mutual_info_classif:
    mutual_info_classif_model = SelectKBest(mutual_info_classif, k=80)
    X = pd.DataFrame(mutual_info_classif_model.fit_transform(X, y),
                     columns=mutual_info_classif_model.get_feature_names_out())
    logger.info(f'Feature selection: mutual information, {X.shape[1]} features kept')
elif USE_LASSO:
    # "LASSO" here is an L1-penalized LinearSVC used as a sparse feature selector.
    from sklearn.svm import LinearSVC
    from sklearn.feature_selection import SelectFromModel

    scaler = StandardScaler()
    X = scaler.fit_transform(X, y)
    X = pd.DataFrame(X, columns=scaler.feature_names_in_)
    lsvc = LinearSVC(C=0.01, penalty='l1', dual=False, random_state=64).fit(X, y)
    model = SelectFromModel(lsvc, prefit=True)
    # Drop every column the selector did not keep.
    col = [c for c, i in zip(X.columns, model.get_support()) if not i]
    X.drop(columns=col, inplace=True)
    logger.info(f'Feature selection: L1-penalized LinearSVC, {X.shape[1]} features kept')
# Accuracy = []
# Precision = []
# Recall = []
# F1 = []
le = LabelEncoder()
y = le.fit_transform(y)
data = pd.concat([pd.DataFrame(X), pd.DataFrame(y, columns=['证名'])], axis=1)
xqx = data[data['证名'] == 0]  # 621
xxyz = data[data['证名'] == 4]  # 547
# xqx= xqx.sample(180,random_state=64)
# xxyz = xxyz.sample(180,random_state=64)
xqx['证名'] = 0
xxyz['证名'] = 1
tmp = pd.concat([xqx, xxyz], axis=0).sample(frac=1).reset_index(drop=True)
X = tmp.drop(columns='证名')
y = tmp['证名']


# tsne = TSNE(n_components=2,init='pca',random_state=64)
# X_tsne = tsne.fit_transform(X)
# plt.figure(figsize=(4,4),dpi=200)
# for i in range(0,2):
#     plt.scatter(X_tsne[y==i][:,0],X_tsne[y==i][:,1],cmap=plt.cm.Set1(i),marker=i,label=str(i))
# plt.legend()
# plt.show()
# if USE_randomDownSample:
#     logger.info('使用 randomDownSample 下采样')
#     sampler = RandomUnderSampler(random_state=64)
# elif USE_Tomek_links:
#     logger.info('使用 TomekLinks 下采样')
#     sampler = TomekLinks()
# elif USE_ADASYN:
#     logger.info('使用 ADASYN 上采样')
#     sampler = ADASYN(random_state=64)
# elif USE_randomOverSample:
#     logger.info('使用 randomOverSample 上采样')
#     sampler = RandomOverSampler(random_state=64)
# elif USE_SMOTE:
#     logger.info('使用 SMOTE 上采样')
#     sampler = SMOTE(random_state=64)
# elif USE_SMOTETomek:
#     logger.info('使用 SMOTETomek 混合采样')
#     sampler = SMOTETomek(random_state=64)
# else:
#     assert False, '没有平衡数据'
# X_resampled, y_resampled = sampler.fit_resample(X, y)
# logger.info(f'采样前{list(Counter(y).items())},采样后{list(Counter(y_resampled).items())}')
def objective(trial):
    train_x, test_x, train_y, test_y = train_test_split(X, y, test_size=0.25, random_state=1024)
    param = {
        'verbosity': -1,  # quiet training; replaces the deprecated 'silent' flag
        'objective': 'binary',
        'metric': 'binary_logloss',
        # suggest_float replaces the deprecated suggest_loguniform / suggest_uniform
        'lambda_l1': trial.suggest_float('lambda_l1', 1e-8, 10.0, log=True),
        'lambda_l2': trial.suggest_float('lambda_l2', 1e-8, 10.0, log=True),
        'num_leaves': trial.suggest_int('num_leaves', 2, 256),
        'feature_fraction': trial.suggest_float('feature_fraction', 0.4, 1.0),
        'bagging_fraction': trial.suggest_float('bagging_fraction', 0.4, 1.0),
        'bagging_freq': trial.suggest_int('bagging_freq', 1, 7),
        'min_child_samples': trial.suggest_int('min_child_samples', 5, 100),
    }
    gbm = lgb.LGBMClassifier(**param)
    gbm.fit(train_x, train_y)
    preds = gbm.predict(test_x)
    # Score the trial by accuracy on the hold-out split.
    return accuracy_score(test_y, preds)


study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=500)

print('Number of finished trials:', len(study.trials))
print('Best trial:', study.best_trial.params)
print(study.best_value)
# kf = StratifiedKFold(n_splits=10,shuffle=True,random_state=64)
# modelcv = lgb.LGBMClassifier(**study.best_params)
# # modelcv.save_model(f'./output/model/{datetime.now()}.json')
# from sklearn.model_selection import cross_validate
# cv = cross_validate(modelcv,X,y,scoring=['accuracy','precision','recall','f1'],cv=kf)
# pprint(cv)
# modelkf = lgb.LGBMClassifier(**study.best_trial.params)
# for train_index, test_index in kf.split(X, y):
#     # print(Counter(y_resampled.loc[train_index]),Counter(y_resampled.loc[test_index]))
#     model =modelkf.fit(X.loc[train_index],y.loc[train_index])
#     preds = model.predict(X.loc[test_index])
#     accuracy = accuracy_score(y[test_index], preds)
#     Accuracy.append(accuracy)
#     precision = precision_score(y[test_index], preds)
#     Precision.append(precision)
#     recall = recall_score(y[test_index], preds)
#     Recall.append(recall)
#     f1 = f1_score(y[test_index], preds)
#     F1.append(f1)
#     logger.warning(f'{round(np.mean(accuracy), 3)}\t{round(np.mean(precision), 3)}\t'
#                    f'{round(np.mean(recall), 3)}\t{round(np.mean(f1), 3)}')
#     explainer = shap.TreeExplainer(model)
#     shap_values_XGBoost_train = explainer.shap_values(X.loc[train_index])
#     shap.summary_plot(shap_values_XGBoost_train, X.iloc[train_index])
# logger.warning(f'accuracy\t\tmean:{round(np.mean(Accuracy), 3)}\tstd:{round(np.std(Accuracy), 3)}')
# logger.warning(f'precision\t\tmean:{round(np.mean(Precision), 3)}\tstd:{round(np.std(Precision), 3)}')
# logger.warning(f'recall\t\tmean:{round(np.mean(Recall), 3)}\tstd:{round(np.std(Recall), 3)}')
# logger.warning(f'f1\t\tmean:{round(np.mean(F1), 3)}\tstd:{round(np.std(F1), 3)}')
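
The objective in this cell scores every trial on the same hold-out split, and `study.best_value` is the best of those scores, so it tends to be an optimistic estimate of performance on unseen data. One common remedy is to keep a third partition that tuning never touches. The sketch below produces such a split over row positions; `three_way_split` and its default fractions are hypothetical, and the split is not stratified, which a real run on imbalanced classes would probably want.

```python
import random


def three_way_split(n_samples, valid_frac=0.2, test_frac=0.2, seed=1024):
    """Shuffle row positions and cut them into train / validation / test lists."""
    indices = list(range(n_samples))
    random.Random(seed).shuffle(indices)
    n_test = int(n_samples * test_frac)
    n_valid = int(n_samples * valid_frac)
    test_idx = indices[:n_test]
    valid_idx = indices[n_test:n_test + n_valid]
    train_idx = indices[n_test + n_valid:]
    return train_idx, valid_idx, test_idx


train_idx, valid_idx, test_idx = three_way_split(100)
print(len(train_idx), len(valid_idx), len(test_idx))
```

Under this scheme the Optuna objective would fit on the training rows and score on the validation rows, and the test rows would be used once, after tuning, for the reported figure.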

In [None]:
# MLPClassifier
from sklearn.svm import SVC
import collections
from collections import Counter
from datetime import datetime
from pprint import pprint

import matplotlib.pyplot as plt
from imblearn.combine import SMOTETomek
import warnings

warnings.filterwarnings('ignore')
import shap
from sklearn.preprocessing import LabelEncoder
import pandas as pd
from tqdm import tqdm
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.model_selection import train_test_split, cross_val_score, KFold
import xgboost as xgb
import optuna
from utils import readJSON, preprocess
from sklearn.model_selection import StratifiedKFold
import seaborn as sns
from loguru import logger
from imblearn.over_sampling import SMOTE, RandomOverSampler, ADASYN
from imblearn.under_sampling import RandomUnderSampler, TomekLinks
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.neural_network import MLPClassifier

optuna.logging.set_verbosity(optuna.logging.CRITICAL)
from sklearn.feature_selection import chi2, SelectKBest, f_classif, mutual_info_classif

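plt.rcParams['font.sans-serif'] = ['SimHei']  # SimHei font, so Chinese labels render
plt.rcParams['axes.unicode_minus'] = False  # render the minus sign correctly with this font
sns.set(font='SimHei', font_scale=0.8)  # same font for Seaborn, so Chinese text displays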
logger.add('./log/{time}.log')
X, y = preprocess(path='./input/心总表.xlsx', sheet_name='总表')
X.columns = id2feature.values()
X.reset_index(drop=True, inplace=True)
USE_chi2, USE_f_classif, USE_mutual_info_classif, USE_LSVC = False, False, False, True
if USE_chi2:
    logger.info('Feature selection: chi-squared test')
    chi2_model = SelectKBest(chi2, k=80)
    X = pd.DataFrame(chi2_model.fit_transform(X, y), columns=chi2_model.get_feature_names_out())
elif USE_f_classif:
    logger.info('Feature selection: ANOVA F-test')
    f_classif_model = SelectKBest(f_classif, k=80)
    X = pd.DataFrame(f_classif_model.fit_transform(X, y), columns=f_classif_model.get_feature_names_out())
elif USE_mutual_info_classif:
    logger.info('Feature selection: mutual information')
    mutual_info_classif_model = SelectKBest(mutual_info_classif, k=80)
    X = pd.DataFrame(mutual_info_classif_model.fit_transform(X, y),
                     columns=mutual_info_classif_model.get_feature_names_out())
elif USE_LSVC:
    from sklearn.svm import LinearSVC
    from sklearn.feature_selection import SelectFromModel

    scaler = StandardScaler()
    X = scaler.fit_transform(X, y)
    X = pd.DataFrame(X, columns=scaler.feature_names_in_)
    lsvc = LinearSVC(C=0.01, penalty='l1', dual=False, random_state=64).fit(X, y)
    model = SelectFromModel(lsvc, prefit=True)
    # Drop every column the selector did not keep.
    col = [c for c, i in zip(X.columns, model.get_support()) if not i]
    X.drop(columns=col, inplace=True)
    logger.info(f'Feature selection: L1-penalized LinearSVC, {X.shape[1]} features kept')
Accuracy = []
Precision = []
Recall = []
F1 = []
le = LabelEncoder()
y = le.fit_transform(y)
data = pd.concat([pd.DataFrame(X), pd.DataFrame(y, columns=['证名'])], axis=1)
xqx = data[data['证名'] == 0]  # 621
xxyz = data[data['证名'] == 4]  # 547
xqx = xqx.sample(300, random_state=64)
xxyz = xxyz.sample(300, random_state=64)
xqx['证名'] = 0
xxyz['证名'] = 1
tmp = pd.concat([xqx, xxyz], axis=0).sample(frac=1).reset_index(drop=True)
X = tmp.drop(columns='证名')
y = tmp['证名']

# TSNE
# tsne = TSNE(n_components=2,init='pca',random_state=64)
# X_tsne = tsne.fit_transform(X)
# plt.figure(figsize=(4,4),dpi=200)
# for i in range(0,2):
#     plt.scatter(X_tsne[y==i][:,0],X_tsne[y==i][:,1],cmap=plt.cm.Set1(i),marker=i,label=str(i))
# plt.legend()
# plt.title('心气虚和心血瘀阻TSNE')
# plt.show()

# Class balancing
USE_randomDownSample, USE_Tomek_links, USE_ADASYN, USE_randomOverSample, USE_SMOTE, USE_SMOTETomek = False, False, False, False, False, True
if USE_randomDownSample:
    logger.info('Resampling: random under-sampling')
    sampler = RandomUnderSampler(random_state=64)
elif USE_Tomek_links:
    logger.info('Resampling: Tomek links under-sampling')
    sampler = TomekLinks()
elif USE_ADASYN:
    logger.info('Resampling: ADASYN over-sampling')
    sampler = ADASYN(random_state=64)
elif USE_randomOverSample:
    logger.info('Resampling: random over-sampling')
    sampler = RandomOverSampler(random_state=64)
elif USE_SMOTE:
    logger.info('Resampling: SMOTE over-sampling')
    sampler = SMOTE(random_state=64)
elif USE_SMOTETomek:
    logger.info('Resampling: SMOTETomek combined over- and under-sampling')
    sampler = SMOTETomek(random_state=64)
else:
    raise ValueError('No resampling strategy selected')
X_resampled, y_resampled = sampler.fit_resample(X, y)
logger.info(f'Before resampling: {list(Counter(y).items())}; after: {list(Counter(y_resampled).items())}')


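def objective(trial):
    # Note: tuning uses X, y; the X_resampled, y_resampled computed above are not used here.
    # A fixed random_state scores every trial on the same hold-out split.
    train_x, test_x, train_y, test_y = train_test_split(X, y, test_size=0.25, random_state=1024)
    param = {
        'hidden_layer_sizes': (100, 100, 100),
        'activation': trial.suggest_categorical('activation', ['identity', 'logistic', 'tanh', 'relu']),
        'solver': trial.suggest_categorical('solver', ['lbfgs', 'sgd', 'adam']),
        # Named 'alpha' (not 'gamma') so study.best_params maps directly onto MLPClassifier.
        'alpha': trial.suggest_float('alpha', 1e-8, 1.0, log=True),
        'random_state': 1024
    }
    clf = MLPClassifier(**param)
    clf.fit(train_x, train_y)
    preds = clf.predict(test_x)
    return accuracy_score(test_y, preds)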


study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=400)
print(study.best_params)
print(study.best_value)
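
When a resampler such as SMOTETomek is part of the workflow, a common practice is to split first and balance only the training portion, so that duplicated or synthetic rows never reach the evaluation split. The sketch below illustrates the order of operations with a simplified random over-sampler in pure Python. It is a stand-in for the idea behind `RandomOverSampler`, not imbalanced-learn's implementation, and `oversample_minority` and the toy rows are hypothetical.

```python
import random
from collections import Counter


def oversample_minority(rows, labels, seed=64):
    """Duplicate randomly chosen rows of smaller classes until every class matches the largest."""
    rng = random.Random(seed)
    counts = Counter(labels)
    target = max(counts.values())
    out_rows, out_labels = list(rows), list(labels)
    for cls, n in counts.items():
        pool = [r for r, lab in zip(rows, labels) if lab == cls]
        for _ in range(target - n):
            out_rows.append(rng.choice(pool))
            out_labels.append(cls)
    return out_rows, out_labels


# Training portion only; the evaluation rows are assumed to be held out before this step.
train_rows = [[0.1], [0.2], [0.3], [0.4], [0.9]]
train_labels = [0, 0, 0, 0, 1]
bal_rows, bal_labels = oversample_minority(train_rows, train_labels)
print(Counter(bal_labels))
```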

In [None]:
from sklearn.svm import SVC
import collections
from collections import Counter
from datetime import datetime
from pprint import pprint

import matplotlib.pyplot as plt
from imblearn.combine import SMOTETomek
import warnings

warnings.filterwarnings('ignore')
import shap
from sklearn.preprocessing import LabelEncoder
import pandas as pd
from tqdm import tqdm
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.model_selection import train_test_split, cross_val_score, KFold
import xgboost as xgb
import optuna
from utils import readJSON, preprocess
from sklearn.model_selection import StratifiedKFold
import seaborn as sns
from loguru import logger
from imblearn.over_sampling import SMOTE, RandomOverSampler, ADASYN
from imblearn.under_sampling import RandomUnderSampler, TomekLinks
import pandas as pd
from sklearn.preprocessing import StandardScaler

optuna.logging.set_verbosity(optuna.logging.CRITICAL)
from sklearn.feature_selection import chi2, SelectKBest, f_classif, mutual_info_classif

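plt.rcParams['font.sans-serif'] = ['SimHei']  # SimHei font, so Chinese labels render
plt.rcParams['axes.unicode_minus'] = False  # render the minus sign correctly with this font
sns.set(font='SimHei', font_scale=0.8)  # same font for Seaborn, so Chinese text displays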
logger.add('./log/{time}.log')
X, y = preprocess(path='./input/心总表.xlsx', sheet_name='总表')
X.columns = id2feature.values()
X.reset_index(drop=True, inplace=True)
USE_chi2, USE_f_classif, USE_mutual_info_classif, USE_LSVC = False, False, False, True
if USE_chi2:
    logger.info('Feature selection: chi-squared test')
    chi2_model = SelectKBest(chi2, k=80)
    X = pd.DataFrame(chi2_model.fit_transform(X, y), columns=chi2_model.get_feature_names_out())
elif USE_f_classif:
    logger.info('Feature selection: ANOVA F-test')
    f_classif_model = SelectKBest(f_classif, k=80)
    X = pd.DataFrame(f_classif_model.fit_transform(X, y), columns=f_classif_model.get_feature_names_out())
elif USE_mutual_info_classif:
    logger.info('Feature selection: mutual information')
    mutual_info_classif_model = SelectKBest(mutual_info_classif, k=80)
    X = pd.DataFrame(mutual_info_classif_model.fit_transform(X, y),
                     columns=mutual_info_classif_model.get_feature_names_out())
elif USE_LSVC:
    from sklearn.svm import LinearSVC
    from sklearn.feature_selection import SelectFromModel

    scaler = StandardScaler()
    X = scaler.fit_transform(X, y)
    X = pd.DataFrame(X, columns=scaler.feature_names_in_)
    lsvc = LinearSVC(C=0.01, penalty='l1', dual=False, random_state=64).fit(X, y)
    model = SelectFromModel(lsvc, prefit=True)
    # Drop every column the selector did not keep.
    col = [c for c, i in zip(X.columns, model.get_support()) if not i]
    X.drop(columns=col, inplace=True)
    logger.info(f'Feature selection: L1-penalized LinearSVC, {X.shape[1]} features kept')
Accuracy = []
Precision = []
Recall = []
F1 = []
le = LabelEncoder()
y = le.fit_transform(y)
data = pd.concat([pd.DataFrame(X), pd.DataFrame(y, columns=['证名'])], axis=1)
xqx = data[data['证名'] == 0]  # 621
xxyz = data[data['证名'] == 4]  # 547
# xqx= xqx.sample(180,random_state=64)
# xxyz = xxyz.sample(120,random_state=64)
xqx['证名'] = 0
xxyz['证名'] = 1
tmp = pd.concat([xqx, xxyz], axis=0).sample(frac=1).reset_index(drop=True)
X = tmp.drop(columns='证名')
y = tmp['证名']

# TSNE
# tsne = TSNE(n_components=2,init='pca',random_state=64)
# X_tsne = tsne.fit_transform(X)
# plt.figure(figsize=(4,4),dpi=200)
# for i in range(0,2):
#     plt.scatter(X_tsne[y==i][:,0],X_tsne[y==i][:,1],cmap=plt.cm.Set1(i),marker=i,label=str(i))
# plt.legend()
# plt.title('心气虚和心血瘀阻TSNE')
# plt.show()

# Class balancing
USE_randomDownSample, USE_Tomek_links, USE_ADASYN, USE_randomOverSample, USE_SMOTE, USE_SMOTETomek = False, False, False, False, False, True
if USE_randomDownSample:
    logger.info('Resampling: random under-sampling')
    sampler = RandomUnderSampler(random_state=64)
elif USE_Tomek_links:
    logger.info('Resampling: Tomek links under-sampling')
    sampler = TomekLinks()
elif USE_ADASYN:
    logger.info('Resampling: ADASYN over-sampling')
    sampler = ADASYN(random_state=64)
elif USE_randomOverSample:
    logger.info('Resampling: random over-sampling')
    sampler = RandomOverSampler(random_state=64)
elif USE_SMOTE:
    logger.info('Resampling: SMOTE over-sampling')
    sampler = SMOTE(random_state=64)
elif USE_SMOTETomek:
    logger.info('Resampling: SMOTETomek combined over- and under-sampling')
    sampler = SMOTETomek(random_state=64)
else:
    raise ValueError('No resampling strategy selected')
X_resampled, y_resampled = sampler.fit_resample(X, y)
logger.info(f'Before resampling: {list(Counter(y).items())}; after: {list(Counter(y_resampled).items())}')


def objective(trial):
    # Note: tuning uses X, y; the X_resampled, y_resampled computed above are not used here.
    train_X, test_X, train_y, test_y = train_test_split(X, y, test_size=0.1, random_state=1024)
    param = {
        # C must be strictly positive, so the lower bound is a small positive value rather than 0.
        'C': trial.suggest_float('C', 1e-3, 1.0),
        'kernel': trial.suggest_categorical('kernel', ['linear', 'poly', 'rbf', 'sigmoid']),
        'gamma': trial.suggest_categorical('gamma', ['scale', 'auto'])
    }
    model = SVC(**param)
    model.fit(train_X, train_y)
    preds = model.predict(test_X)
    return accuracy_score(test_y, preds)


study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=200)
print(study.best_params)
print(study.best_value)
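
The LSVC branch in this cell keeps only the columns that `SelectFromModel` marks as supported. As a rough sketch of that step, assume the selector keeps a feature when the summed absolute value of its coefficients across classes reaches a small threshold; scikit-learn documents `1e-5` as the default threshold for L1-penalized estimators. The helper `select_nonzero_features`, the feature names, and the coefficients below are hypothetical.

```python
def select_nonzero_features(names, coef_rows, threshold=1e-5):
    """Keep features whose summed |coefficient| over all classes is at least `threshold`."""
    importances = [sum(abs(row[j]) for row in coef_rows) for j in range(len(names))]
    return [name for name, imp in zip(names, importances) if imp >= threshold]


# One coefficient row per class; the L1 penalty has zeroed the first feature in both rows.
names = ['f1', 'f2', 'f3']
coef_rows = [
    [0.0, 0.3, 0.0],
    [0.0, -0.1, 0.2],
]
kept = select_nonzero_features(names, coef_rows)
print(kept)
```

A smaller `C` in `LinearSVC` strengthens the L1 penalty, which zeroes more coefficients and therefore leaves fewer columns.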

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset
from sklearn.preprocessing import LabelEncoder, StandardScaler
import pandas as pd
import warnings
from sklearn.model_selection import train_test_split

warnings.filterwarnings('ignore')
n_epoch = 50
batch_size = 20


def preprocess(path, sheet_name):
    """
    Preprocess the heart-disease master table (心总表).
    :param path: path to the Excel workbook
    :param sheet_name: name of the sheet to read
    :return: X, y
    """

    xzb = pd.read_excel(path, sheet_name=sheet_name)
    xzb.drop(columns=['病案号'], inplace=True)
    xzb.drop(labels=[1742, 1741], axis=0, inplace=True)
    xzb.drop(labels=xzb[xzb['性别'].isna()].index, inplace=True)
    xzb = xzb.sample(frac=1).astype(int)
    y = xzb['证名']
    X = xzb.drop(labels=['证名', '性别', '年龄'], axis=1)

    from sklearn.svm import LinearSVC
    from sklearn.feature_selection import SelectFromModel

    scaler = StandardScaler()
    X = scaler.fit_transform(X, y)
    X = pd.DataFrame(X, columns=scaler.feature_names_in_)
    lsvc = LinearSVC(C=0.01, penalty='l1', dual=False, random_state=64).fit(X, y)
    model = SelectFromModel(lsvc, prefit=True)
    col = [c for c, i in zip(X.columns, model.get_support()) if not i]
    X.drop(columns=col, inplace=True)

    le = LabelEncoder()
    y = le.fit_transform(y)
    data = pd.concat([pd.DataFrame(X), pd.DataFrame(y, columns=['证名'])], axis=1)
    xqx = data[data['证名'] == 0]  # 621
    xxyz = data[data['证名'] == 4]  # 547
    xqx['证名'] = 0
    xxyz['证名'] = 1
    tmp = pd.concat([xqx, xxyz], axis=0).sample(frac=1).reset_index(drop=True)
    X = tmp.drop(columns='证名')
    y = tmp['证名']
    return X, y


X, y = preprocess(path='./input/心总表.xlsx', sheet_name='总表')
print(X.shape)


class XZB(Dataset):
    def __init__(self, X, y, train=True):
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, random_state=1024)
        if train:
            X_train.reset_index(drop=True, inplace=True)
            y_train.reset_index(drop=True, inplace=True)
            self.data, self.labels = X_train, y_train
        else:
            X_test.reset_index(drop=True, inplace=True)
            y_test.reset_index(drop=True, inplace=True)
            self.data, self.labels = X_test, y_test

    def __len__(self):
        # Return the full sample count; subtracting 1 would silently drop the last row.
        return len(self.labels)

    def __getitem__(self, index):
        return self.data.loc[index].values, self.labels.loc[index]


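class MLP(nn.Module):
    def __init__(self, in_features=73):
        # in_features must equal X.shape[1]; 73 is the value hard-coded in the original run.
        super(MLP, self).__init__()
        self.fc1 = nn.Linear(in_features, 40)
        self.fc2 = nn.Linear(40, 20)
        self.fc3 = nn.Linear(20, 2)

    def forward(self, input):
        out = F.relu(self.fc1(input.float()))
        out = F.relu(self.fc2(out))
        # Apply fc3 once and return raw logits: nn.CrossEntropyLoss applies log-softmax itself.
        return self.fc3(out)


model = MLP(in_features=X.shape[1])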

train_dataset = XZB(X, y, train=True)
test_dataset = XZB(X, y, train=False)
train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size)
test_dataloader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size)
lossfunc = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(params=model.parameters(), lr=0.01)
for epoch in range(n_epoch):
    train_loss = 0.0
    for data, target in train_dataloader:
        optimizer.zero_grad()
        output = model(data)
        loss = lossfunc(output, target)
        loss.backward()
        optimizer.step()
        train_loss += loss.item() * data.size(0)
    train_loss = train_loss / len(train_dataloader.dataset)
    print('Epoch:  {}  \tTraining Loss: {:.6f}'.format(epoch + 1, train_loss))
    correct, total = 0, 0
    with torch.no_grad():
        for data, labels in test_dataloader:
            output = model(data)
            _, predicted = torch.max(output.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    print('Accuracy of the network on the test set: %d %%' % (
            100 * correct / total))
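
`nn.CrossEntropyLoss` applies log-softmax to its input internally, so it expects raw logits rather than probabilities. The pure-Python sketch below shows why feeding already-softmaxed outputs into the loss is a problem: the second softmax compresses the distribution, so even a confident, correct prediction keeps a sizable loss and the gradients shrink. The function names and the logits are hypothetical, chosen only for illustration.

```python
import math


def softmax(xs):
    """Numerically stable softmax over a list of scores."""
    m = max(xs)
    exps = [math.exp(x - m) for x in xs]
    total = sum(exps)
    return [e / total for e in exps]


def cross_entropy(scores, target):
    """Per-sample cross-entropy: softmax over the scores, then -log of the target's probability."""
    return -math.log(softmax(scores)[target])


logits = [4.0, 0.0]  # confident and correct for class 0
loss_from_logits = cross_entropy(logits, 0)
loss_from_probs = cross_entropy(softmax(logits), 0)  # softmax applied twice
print(round(loss_from_logits, 4), round(loss_from_probs, 4))
```

With two classes, probabilities differ by at most 1, so the double-softmax loss cannot fall below `log(1 + exp(-1))`, roughly 0.313, no matter how well the network fits.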

In [None]:
# Heart yang deficiency (心阳虚) vs. phlegm misting the heart spirit (痰蒙心神证)
import collections
import warnings
from collections import Counter
from datetime import datetime
from pprint import pprint

warnings.filterwarnings('ignore')

import matplotlib.pyplot as plt
import numpy as np
import optuna
import pandas as pd
import seaborn as sns
import shap
import xgboost as xgb
from imblearn.combine import SMOTETomek
from imblearn.over_sampling import SMOTE, RandomOverSampler, ADASYN
from imblearn.under_sampling import RandomUnderSampler, TomekLinks
from loguru import logger
from sklearn.feature_selection import chi2, SelectKBest, f_classif, mutual_info_classif
from sklearn.manifold import TSNE
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.model_selection import train_test_split, cross_val_score, KFold, StratifiedKFold
from sklearn.preprocessing import LabelEncoder, StandardScaler
from tqdm import tqdm
from utils import readJSON, preprocess

optuna.logging.set_verbosity(optuna.logging.CRITICAL)
id2feature = readJSON('./input/id2feature.json')
plt.rcParams['font.sans-serif'] = ['SimHei']  # SimHei (黑体), a font with CJK glyphs
plt.rcParams['axes.unicode_minus'] = False  # render the minus sign correctly with this font
sns.set(font='SimHei', font_scale=0.8)  # make Seaborn use the same CJK-capable font
logger.add('./log/{time}.log')
X, y = preprocess(path='./input/心总表.xlsx', sheet_name='总表')
X.columns = id2feature.values()
X.reset_index(drop=True, inplace=True)
USE_chi2, USE_f_classif, USE_mutual_info_classif, USE_LSVC = False, False, False, True
if USE_chi2:
    logger.info('Selecting features with the chi-squared test')
    chi2_model = SelectKBest(chi2, k=80)
    X = pd.DataFrame(chi2_model.fit_transform(X, y), columns=chi2_model.get_feature_names_out())
elif USE_f_classif:
    logger.info('Selecting features with the ANOVA F-test')
    f_classif_model = SelectKBest(f_classif, k=80)
    X = pd.DataFrame(f_classif_model.fit_transform(X, y), columns=f_classif_model.get_feature_names_out())
elif USE_mutual_info_classif:
    logger.info('Selecting features with mutual information')
    mutual_info_classif_model = SelectKBest(mutual_info_classif, k=80)
    X = pd.DataFrame(mutual_info_classif_model.fit_transform(X, y),
                     columns=mutual_info_classif_model.get_feature_names_out())
elif USE_LSVC:
    from sklearn.svm import LinearSVC
    from sklearn.feature_selection import SelectFromModel

    # Standardize, then keep the features that get a non-zero weight
    # in an L1-penalized linear SVM.
    scaler = StandardScaler()
    X = pd.DataFrame(scaler.fit_transform(X), columns=scaler.feature_names_in_)
    lsvc = LinearSVC(C=0.01, penalty='l1', dual=False, random_state=64).fit(X, y)
    model = SelectFromModel(lsvc, prefit=True)
    X = X.loc[:, model.get_support()]
    logger.info(f'Selected features with an L1-penalized LinearSVC; {X.shape[1]} features remain')
Accuracy = []
Precision = []
Recall = []
F1 = []
le = LabelEncoder()
y = le.fit_transform(y)
data = pd.concat([pd.DataFrame(X), pd.DataFrame(y, columns=['证名'])], axis=1)
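# NOTE: the cell header and the plot title name 痰蒙心神 as the second class, while the
# comment on label 6 says 心血瘀阻; confirm the mapping with le.classes_.
xqx = data[data['证名'] == 3].copy()  # heart yang deficiency (心阳虚)
xxyz = data[data['证名'] == 6].copy()  # heart blood stasis obstruction (心血瘀阻)
# xqx = xqx.sample(180, random_state=64)
# xxyz = xxyz.sample(120, random_state=64)
# Relabel the two selected patterns as a binary target
xqx['证名'] = 0
xxyz['证名'] = 1
tmp = pd.concat([xqx, xxyz], axis=0).sample(frac=1, random_state=64).reset_index(drop=True)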
X = tmp.drop(columns='证名')
y = tmp['证名']

# t-SNE projection of the two classes
tsne = TSNE(n_components=2, init='pca', random_state=64)
X_tsne = tsne.fit_transform(X)
plt.figure(figsize=(4, 4), dpi=200)
for i in range(0, 2):
    # `cmap` only applies when `c` is given; pass the Set1 colour directly instead
    plt.scatter(X_tsne[y == i][:, 0], X_tsne[y == i][:, 1], color=plt.cm.Set1(i), marker=i, label=str(i))
plt.legend()
plt.title('t-SNE: heart yang deficiency vs.\nphlegm misting the heart-mind')
plt.show()

# Class balancing
USE_randomDownSample, USE_Tomek_links, USE_ADASYN, USE_randomOverSample, USE_SMOTE, USE_SMOTETomek = False, False, False, False, False, True
if USE_randomDownSample:
    logger.info('Undersampling with RandomUnderSampler')
    sampler = RandomUnderSampler(random_state=64)
elif USE_Tomek_links:
    logger.info('Undersampling with TomekLinks')
    sampler = TomekLinks()
elif USE_ADASYN:
    logger.info('Oversampling with ADASYN')
    sampler = ADASYN(random_state=64)
elif USE_randomOverSample:
    logger.info('Oversampling with RandomOverSampler')
    sampler = RandomOverSampler(random_state=64)
elif USE_SMOTE:
    logger.info('Oversampling with SMOTE')
    sampler = SMOTE(random_state=64)
elif USE_SMOTETomek:
    logger.info('Combined over- and undersampling with SMOTETomek')
    sampler = SMOTETomek(random_state=64)
else:
    assert False, 'No resampling method selected'
X_resampled, y_resampled = sampler.fit_resample(X, y)
logger.info(f'Class counts before resampling: {list(Counter(y).items())}, after: {list(Counter(y_resampled).items())}')


def objective(trial):
    train_x, valid_x, train_y, valid_y = train_test_split(X_resampled, y_resampled, test_size=0.1, random_state=64)
    param = {
        'verbosity': 0,
        'eval_metric': 'logloss',
        'objective': 'binary:logistic',
        # 'n_estimators':trial.suggest_int('n_estimators',3500,3600),
        'max_depth': trial.suggest_int("max_depth", 3, 12, step=1),
        'grow_policy': trial.suggest_categorical("grow_policy", ['depthwise', 'lossguide']),
        'learning_rate': trial.suggest_float("learning_rate", 1e-8, 1.0, log=True),
        'tree_method': 'exact',
        'booster': 'dart',
        'gamma': trial.suggest_float("gamma", 1e-8, 1.0, log=True),
        'reg_lambda': trial.suggest_float('reg_lambda', 1e-8, 1.0, log=True),
        'reg_alpha': trial.suggest_float('reg_alpha', 1e-8, 1.0, log=True),
        'subsample': trial.suggest_float('subsample', 0.2, 1.0),
        'colsample_bytree': trial.suggest_float('colsample_bytree', 0.2, 1.0),
        # 'eta': trial.suggest_float("eta", 1e-8, 1.0, log=True),
        'random_state': 42
    }
    model = xgb.XGBClassifier(**param).fit(train_x, train_y)
    preds = model.predict(valid_x)
    return accuracy_score(valid_y, preds)


study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=100, timeout=600)
# logger.info(study.best_value)
logger.info(study.best_params)
kf = StratifiedKFold(n_splits=10, shuffle=True, random_state=64)
# study.best_params contains only the values suggested by Optuna, so the fixed
# settings from objective() are passed again to evaluate the tuned configuration.
fixed_params = {'verbosity': 0, 'eval_metric': 'logloss', 'objective': 'binary:logistic',
                'tree_method': 'exact', 'booster': 'dart', 'random_state': 42}
best_params = {**fixed_params, **study.best_params}
modelcv = xgb.XGBClassifier(**best_params)
# modelcv.save_model(f'./output/model/{datetime.now()}.json')
from sklearn.model_selection import cross_validate

cv = cross_validate(modelcv, X_resampled, y_resampled, scoring=['accuracy', 'precision', 'recall', 'f1'], cv=kf)
logger.warning(cv)
for train_index, test_index in kf.split(X_resampled, y_resampled):
    # kf.split yields positional indices, so use .iloc throughout
    X_train, y_train = X_resampled.iloc[train_index], y_resampled.iloc[train_index]
    X_test, y_test = X_resampled.iloc[test_index], y_resampled.iloc[test_index]
    model = xgb.XGBClassifier(**best_params).fit(X_train, y_train)
    preds = model.predict(X_test)
    accuracy = accuracy_score(y_test, preds)
    Accuracy.append(accuracy)
    precision = precision_score(y_test, preds)
    Precision.append(precision)
    recall = recall_score(y_test, preds)
    Recall.append(recall)
    f1 = f1_score(y_test, preds)
    F1.append(f1)
    logger.warning(f'{round(accuracy, 3)}\t{round(precision, 3)}\t'
                   f'{round(recall, 3)}\t{round(f1, 3)}')
    # SHAP summary of feature contributions on this fold's training data
    explainer = shap.TreeExplainer(model)
    shap_values_XGBoost_train = explainer.shap_values(X_train)
    shap.summary_plot(shap_values_XGBoost_train, X_train)
logger.warning(f'accuracy\t\tmean:{round(np.mean(Accuracy), 3)}\tstd:{round(np.std(Accuracy), 3)}')
logger.warning(f'precision\t\tmean:{round(np.mean(Precision), 3)}\tstd:{round(np.std(Precision), 3)}')
logger.warning(f'recall\t\tmean:{round(np.mean(Recall), 3)}\tstd:{round(np.std(Recall), 3)}')
logger.warning(f'f1\t\tmean:{round(np.mean(F1), 3)}\tstd:{round(np.std(F1), 3)}')
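
The cell above resamples the whole dataset before splitting it into folds, so synthetic or duplicated samples can end up in the validation and test folds. One common alternative is to balance only the training part of each fold and score on untouched data. The following is a minimal sketch in plain Python that uses random oversampling as a stand-in for SMOTETomek and a simple round-robin split instead of `StratifiedKFold`; `fold_indices` and `oversample` are hypothetical helpers and the labels are made up:

```python
import random
from collections import Counter


def fold_indices(n_samples, n_splits):
    """Yield (train, test) index lists for a round-robin K-fold split (not stratified)."""
    for k in range(n_splits):
        test = [i for i in range(n_samples) if i % n_splits == k]
        train = [i for i in range(n_samples) if i % n_splits != k]
        yield train, test


def oversample(indices, labels, rng):
    """Duplicate randomly chosen minority-class indices until all classes have equal size."""
    by_class = {}
    for i in indices:
        by_class.setdefault(labels[i], []).append(i)
    target = max(len(members) for members in by_class.values())
    balanced = []
    for members in by_class.values():
        extra = [rng.choice(members) for _ in range(target - len(members))]
        balanced.extend(members + extra)
    return balanced


labels = [0] * 16 + [1] * 4  # made-up imbalanced binary labels
rng = random.Random(64)
for train, test in fold_indices(len(labels), n_splits=4):
    train_balanced = oversample(train, labels, rng)
    # Only the training indices were resampled; the test fold keeps its original samples.
    print(Counter(labels[i] for i in train_balanced), Counter(labels[i] for i in test))
```

With imbalanced-learn, the same idea is usually expressed by placing the sampler and the classifier in an `imblearn.pipeline.Pipeline` and passing that pipeline to `cross_validate`, since the pipeline applies the sampler only while fitting.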

In [None]:
import pandas as pd
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt
import warnings
import optuna
import xgboost as xgb
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split,StratifiedKFold
from collections import Counter
warnings.filterwarnings('ignore')
heart = pd.read_csv('./input/heart/heart.csv')
X = heart.drop(columns='target')
y = heart['target']
print(Counter(y))
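# t-SNE projection of the two classes
plt.rcParams['font.sans-serif'] = ['SimHei']  # SimHei (黑体), a font with CJK glyphs
plt.rcParams['axes.unicode_minus'] = False  # render the minus sign correctly with this font
tsne = TSNE(n_components=2, init='pca', random_state=64)
X_tsne = tsne.fit_transform(X)
plt.figure(figsize=(4, 4), dpi=200)
for i in range(0, 2):
    # `cmap` only applies when `c` is given; pass the Set1 colour directly instead
    plt.scatter(X_tsne[y == i][:, 0], X_tsne[y == i][:, 1], color=plt.cm.Set1(i), marker=i, label=str(i))
plt.legend()
plt.title('Heart disease')
plt.show()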
def objective(trial):
    train_x, valid_x, train_y, valid_y = train_test_split(X, y, test_size=0.1, random_state=64)
    param = {
        'verbosity': 0,
        'eval_metric': 'logloss',
        'objective': 'binary:logistic',
        # 'n_estimators':trial.suggest_int('n_estimators',3500,3600),
        'max_depth': trial.suggest_int("max_depth", 3, 12, step=1),
        'grow_policy': trial.suggest_categorical("grow_policy", ['depthwise', 'lossguide']),
        'learning_rate': trial.suggest_float("learning_rate", 1e-8, 1.0, log=True),
        'tree_method': 'exact',
        'booster': 'dart',
        'gamma': trial.suggest_float("gamma", 1e-8, 1.0, log=True),
        'reg_lambda': trial.suggest_float('reg_lambda', 1e-8, 1.0, log=True),
        'reg_alpha': trial.suggest_float('reg_alpha', 1e-8, 1.0, log=True),
        'subsample': trial.suggest_float('subsample', 0.2, 1.0),
        'colsample_bytree': trial.suggest_float('colsample_bytree', 0.2, 1.0),
        # 'eta': trial.suggest_float("eta", 1e-8, 1.0, log=True),
        'random_state': 42
    }
    model = xgb.XGBClassifier(**param).fit(train_x, train_y)
    preds = model.predict(valid_x)
    return accuracy_score(valid_y, preds)


study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=100, timeout=600)
print(study.best_value)
kf = StratifiedKFold(n_splits=10, shuffle=True, random_state=64)
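# study.best_params holds only the Optuna-suggested values, so the fixed settings
# from objective() are passed again.
modelcv = xgb.XGBClassifier(verbosity=0, eval_metric='logloss', objective='binary:logistic',
                            tree_method='exact', booster='dart', random_state=42, **study.best_params)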
# modelcv.save_model(f'./output/model/{datetime.now()}.json')
from sklearn.model_selection import cross_validate

cv = cross_validate(modelcv, X, y, scoring=['accuracy', 'precision', 'recall', 'f1'], cv=kf)
print(cv)
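
`cross_validate` returns a dictionary of per-fold arrays keyed as `test_accuracy`, `test_precision`, and so on, alongside `fit_time` and `score_time`, which is hard to read when printed raw. A minimal sketch of condensing such a dictionary to a mean and standard deviation per metric; the numbers are made up and `summarize_cv` is a hypothetical helper:

```python
from statistics import mean, pstdev


def summarize_cv(cv_results):
    """Map each 'test_<metric>' entry to (mean, population std), rounded to 3 decimals."""
    summary = {}
    for key, scores in cv_results.items():
        if key.startswith('test_'):
            scores = [float(s) for s in scores]
            summary[key[len('test_'):]] = (round(mean(scores), 3), round(pstdev(scores), 3))
    return summary


cv_example = {  # made-up values in the shape cross_validate returns
    'fit_time': [0.5, 0.6, 0.4],
    'score_time': [0.01, 0.01, 0.01],
    'test_accuracy': [0.8, 0.9, 1.0],
    'test_f1': [0.7, 0.7, 0.7],
}
summary = summarize_cv(cv_example)
for metric, (avg, std) in summary.items():
    print(f'{metric}\tmean:{avg}\tstd:{std}')
```

`pstdev` is the population standard deviation, which matches the default of the `np.std` calls used for the fold summaries earlier in the notebook.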