In [None]:
from sklearn import svm
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn import metrics
import pandas as pd
import numpy as np
from sklearn.metrics import accuracy_score
import matplotlib.pyplot as plt
import xgboost as xgb
from scipy import stats
import shap
import seaborn as sns

def SHAP(bst, X_train_df):
    # SHAP値を計算
    explainer = shap.TreeExplainer(bst)
    shap_values = explainer.shap_values(X_train_df)
    
    # SHAP値をプロット
    shap.summary_plot(shap_values, X_train_df, max_display=X_train_df.shape[1])
    return shap_values

def BFI(bst):
    # ビルトインのフィーチャーインポータンスを取得
    feature_importances = bst.get_score(importance_type='weight')

    # 特徴量の名前とインポータンスをプリント
    for key, value in feature_importances.items():
        print(f"Feature: {key}, Importance: {value}")

    # フィーチャーインポータンスをプロット
    xgb.plot_importance(bst)
    plt.show()
    return

def XGBoost(X, y):
    # データを訓練用とテスト用に分割
    X_train_val, X_test, y_train_val, y_test = train_test_split(X, y, test_size=0.2, random_state=0)

    # 訓練データをさらに訓練用と検証用に分割
    X_train, X_val, y_train, y_val = train_test_split(X_train_val, y_train_val, test_size=0.25, random_state=0)

    # モデルのパラメータを設定(CPU)
    params = {
        'objective': 'binary:logistic',
        'eval_metric': 'logloss',
        'eta': 0.1,
        'max_depth': 4,
        'seed': 0
    }
    
    """   # モデルのパラメータを設定(GPU)
    params = {
        'objective': 'multi:softmax',
        'eval_metric': 'mlogloss',
        'num_class' : 4,
        'eta': 0.1,
        'max_depth': 12,
        'seed': 0,
        'tree_method': 'gpu_hist',
        'predictor': 'gpu_predictor'
    }"""

    # データをpandas.DataFrame形式で保存
    X_train_df = pd.DataFrame(X_train, columns=column_names)

    # 訓練データと検証データをXGBoostのDMatrix形式に変換
    dtrain = xgb.DMatrix(X_train, label=y_train)
    dval = xgb.DMatrix(X_val, label=y_val)

    # 訓練データと検証データのセットをリストに格納
    evals = [(dtrain, 'train'), (dval, 'eval')]

    # モデルを訓練
    bst = xgb.train(params, dtrain, num_boost_round=10000, evals=evals, early_stopping_rounds=100)

    # テストデータをDMatrix形式に変換
    dtest = xgb.DMatrix(X_test)

    # 訓練データで予測
    y_train_pred = bst.predict(dtrain)

    # テストデータで予測
    y_pred = bst.predict(dtest)

    # 評価
    train_accuracy = accuracy_score(y_train, y_train_pred.round())
    print(f"Training Accuracy: {train_accuracy*100:.2f}%")

    # 評価
    test_accuracy = accuracy_score(y_test, y_pred.round())
    print(f"Test Accuracy: {test_accuracy*100:.2f}%")

    #SHAP(bst, X_train_df)
    # SHAP値を計算
    explainer = shap.TreeExplainer(bst)
    shap_values = explainer.shap_values(X_train_df)
    
    # SHAP値をプロット
    shap.summary_plot(shap_values, X_train_df, max_display=X_train_df.shape[1])
    
    #BFI(bst)
    return shap_values, y_train_pred, X_train_df, explainer

def normalize_per_state(df, start, end):
    scaler = StandardScaler()
    df.iloc[start:end, :] = scaler.fit_transform(df.iloc[start:end, :])
    return df


In [None]:
raw = 0 #rawデータならば１、Bandなら０,

# 1秒ごとのデータに分割
n_samples_per_second = 256  # 256Hzのサンプリングレート
total_seconds = 10  # 全体の秒数

# データの読み込み
if(raw):
    df = pd.read_csv('Raw.csv')
    #df = pd.read_csv('Raw_ICA.csv')
else:
    #df = pd.read_csv('Band.csv')
    df = pd.read_csv('emotion.csv')


# チャンネルごとに正規化
#scaler = StandardScaler()
#for column in df.columns[:-1]:  # 'State'列を除くすべての列
#    df[column] = scaler.fit_transform(df[column].values.reshape(-1, 1))


In [None]:
# Hzごと
# 特徴量と目標変数を抽出する
X = df.drop('State', axis=1)  # 'State'以外の列すべてを特徴量とします
y = df['State']  # 'State'を目標変数とします

# 列名のリストを定義
column_names = ['AF3', 'F7', 'F3', 'FC5', 'T7', 'P7', 'O1', 'O2', 'P8', 'T8', 'FC6', 'F4', 'F8', 'AF4']
#XGBoost
#XGBoost(X, y)

In [None]:
X_train_val, X_test, y_train_val, y_test = train_test_split(X, y, test_size=0.3, random_state=0)

# 訓練データをさらに訓練用と検証用に分割
X_train, X_val, y_train, y_val = train_test_split(X_train_val, y_train_val, test_size=0.3, random_state=0)
# モデルのパラメータを設定(CPU)

"""
params = {
'objective': 'binary:logistic',
'eval_metric': 'logloss',
'eta': 0.1,
'max_depth': 4,
'seed': 0
}
      
"""
# モデルのパラメータを設定(GPU)
params = {
'objective': 'multi:softmax',
'eval_metric': 'mlogloss',
'num_class' : 4,
'eta': 0.1,
'max_depth': 12,
'seed': 0,
'tree_method': 'gpu_hist',
'predictor': 'gpu_predictor'
}

# データをpandas.DataFrame形式で保存
X_train_df = pd.DataFrame(X_train, columns=column_names)

# 訓練データと検証データをXGBoostのDMatrix形式に変換
dtrain = xgb.DMatrix(X_train, label=y_train)
dval = xgb.DMatrix(X_val, label=y_val)

# 訓練データと検証データのセットをリストに格納
evals = [(dtrain, 'train'), (dval, 'eval')]

# モデルを訓練
bst = xgb.train(params, dtrain, num_boost_round=10000, evals=evals, early_stopping_rounds=100)

# テストデータをDMatrix形式に変換
dtest = xgb.DMatrix(X_test)

# 訓練データで予測
y_train_pred = bst.predict(dtrain)

# テストデータで予測
y_pred = bst.predict(dtest)

# 評価
train_accuracy = accuracy_score(y_train, y_train_pred.round())
print(f"Training Accuracy: {train_accuracy*100:.2f}%")

# 評価
test_accuracy = accuracy_score(y_test, y_pred.round())
print(f"Test Accuracy: {test_accuracy*100:.2f}%")



In [None]:
#SHAP(bst, X_train_df)
# SHAP値を計算 (バッチ処理)
explainer = shap.TreeExplainer(bst)
batch_size = 64  # バッチサイズの設定
shap_values = []

y_train_df = pd.DataFrame(y_train, columns=column_names)
# バッチごとにSHAP値を計算
"""
for i in range(0, X_train_df.shape[0], batch_size): #X_train_df.shape[0]
    batch_start = i
    batch_end = min(i + batch_size, X_train_df.shape[0])
    batch = X_train_df.iloc[batch_start:batch_end]
    batch_y = y_train_df.iloc[batch_start:batch_end]
    #print(batch.shape)
    shap_values.append(explainer.shap_values(batch,batch_y,tree_limit=2,check_additivity=False))
"""
shap_values = explainer.shap_values(X,y,tree_limit=5,check_additivity=False)
#一つのリストに結合
#shap_values = np.concatenate(shap_values, axis=0)
    
# SHAP値をプロット
#shap.summary_plot(shap_values, X_train_df, max_display=X_train_df.shape[1])

In [None]:
shap.summary_plot(shap_values[2], X)

In [None]:
X100 = shap.utils.sample(X, 100)
explainer = shap.TreeExplainer(bst,X100)
shap = explainer(X)
shap.plots.waterfall(shap[0])

In [None]:
count_label = 4

#データフレームに変換
y_train_df = pd.DataFrame(y_train)
y_test_df = pd.DataFrame(y_test, columns = ["State"])
y_train_df = pd.DataFrame(y_train, columns = ["State"])

#該当する目的変数を持ったインデックスを格納するリスト
DataIndex = []
# ↑に対応するtrainデータのラベルインデックス
PredictedLabel_index = []
# ラベルごとのSHAP値
SHAP_EachLabel = []

for i in range(count_label):
    DataIndex.append(list(y_train_df[y_train_df["State"]==i].index))
    PredictedLabel_index.append(X_train.drop(DataIndex[i]))
    SHAP_EachLabel.append(explainer.shap_values(PredictedLabel_index[i]))

#各ｃｈの電圧値とSHAP値の線形グラフ
#shap.dependence_plot(ind = "O2", shap_values=shap_values_0_train, features=X_train_0 )

In [None]:
#各ｃｈの電圧値とSHAP値の相関係数
# 空のデータフレームを作成
correlation_df = pd.DataFrame(columns=['feature', 'correlation_0', 'correlation_1', 'correlation_2', 'correlation_3'])
data = []

# すべての特徴についてループ
for feature in X_train.columns:
    # 分類結果が0のときのSHAP値と特徴量の値を取得
    shap_values_0 = shap_values_0_train[:, X_train_0.columns.get_loc(feature)]
    feature_values_0 = X_train_0[feature]
    # 分類結果が1のときのSHAP値と特徴量の値を取得
    shap_values_1 = shap_values_1_train[:, X_train_1.columns.get_loc(feature)]
    feature_values_1 = X_train_1[feature]
    
    # NumPyのcorrcoef関数を用いて相関係数を計算
    correlation_coefficient_0 = np.corrcoef(shap_values_0, feature_values_0)[0, 1]
    correlation_coefficient_1 = np.corrcoef(shap_values_1, feature_values_1)[0, 1]
    
    # リストに相関係数を追加
    data.append({'feature': feature, 
                 'correlation_0': correlation_coefficient_0,
                 'correlation_1': correlation_coefficient_1})

# pandas.concatを用いてデータフレームを作成
correlation_df = pd.concat([correlation_df, pd.DataFrame(data)], ignore_index=True).T

# データフレームを表示
correlation_df