### ModelExplainerクラス shapライブラリで分類結果を分析します。

In [14]:
import shap
import transformers

class ModelExplainer():
    """Explain text-classification predictions with SHAP text plots.

    The model/tokenizer pair is wrapped in a transformers
    ``text-classification`` pipeline, which is then handed to
    ``shap.Explainer``.
    """

    def __init__(self, model, tokenizer, labels):
        """Store the objects needed to build the pipeline.

        Args:
            model: Model passed to ``transformers.pipeline``.
            tokenizer: Tokenizer passed to ``transformers.pipeline``.
            labels: Label names passed to ``shap.Explainer`` as
                ``output_names``.
        """
        self.model = model
        self.tokenizer = tokenizer
        self.labels = labels

    def _pipeline_device(self):
        """Return the integer ``device`` argument for ``transformers.pipeline``.

        The index follows the device the model currently lives on, so a model
        that was deliberately kept on the CPU is not pushed to GPU 0.

        Returns:
            ``-1`` for a non-CUDA device, the CUDA index otherwise (``0`` when
            the index is unspecified). If the model exposes no ``device``
            attribute, ``0`` is returned, which is the value that used to be
            hard-coded.
        """
        device = getattr(self.model, "device", None)
        if device is None:
            return 0
        if getattr(device, "type", None) != "cuda":
            return -1
        index = getattr(device, "index", None)
        return 0 if index is None else index

    def shap_explainer(self, features, answers, predicts):
        """Print and plot a SHAP explanation for every input text.

        Args:
            features: Iterable of input texts; each one is explained separately.
            answers: Iterable of ground-truth labels (only printed).
            predicts: Iterable of predicted labels (only printed).

        The three iterables are consumed in lockstep with ``zip``, so the
        shortest one determines how many texts are explained.
        """
        pred = transformers.pipeline(
            "text-classification",
            model=self.model,
            tokenizer=self.tokenizer,
            device=self._pipeline_device(),
            return_all_scores=True,
        )
        explainer = shap.Explainer(pred, output_names=self.labels)
        print("♦SHAP可視化結果")
        for feature, answer, predict in zip(features, answers, predicts):
            print(f"予測ラベル: {predict} ,正解ラベル: {answer}")
            print(feature)
            # The explainer takes a batch, hence the single-element list.
            shap_values = explainer([feature])
            # Alternative view restricted to the predicted class:
            # shap.plots.text(shap_values[0, :, predict])
            shap.plots.text(shap_values)

### Excelシートの生成

In [4]:
import pandas as pd
import numpy as np
import io
import matplotlib.pyplot as plt
import collections

def makeClassificationResultSheet(classification_result, report_result, index2label, scores_df):
    """Write the classification results to ``classification_result.xlsx``.

    Three sheets are produced:

    * ``result``: one row per sample, with cells in columns C-E highlighted
      when they equal column B.
    * ``classification_report``: ``report_result`` plus a bar chart of the
      predicted-label distribution.
    * ``classification_scores``: ``scores_df`` with a 3-colour scale applied
      to each row separately.

    Args:
        classification_result: DataFrame with at least the columns
            ``answer_label``, ``predicted_label``, ``2nd_predicted`` and
            ``3rd_predicted``. The highlighting formulas assume these are the
            first four columns in that order (Excel columns B-E).
        report_result: DataFrame written unchanged to ``classification_report``.
        index2label: Mapping whose values are the label names; they define the
            bars of the distribution chart.
        scores_df: DataFrame of per-label scores, one row per sample.

    Raises:
        ZeroDivisionError: If there are more than three labels and
            ``classification_result`` has no rows (top-3 accuracy is undefined).
    """
    label_names = list(index2label.values())

    # Distribution of predicted labels; Counter yields 0 for labels never predicted.
    count = collections.Counter(classification_result['predicted_label'])
    print(count)
    dist_label = [count[label] for label in label_names]

    # Render the chart into memory so it can be embedded in the workbook.
    img = io.BytesIO()
    fig = plt.figure(figsize=(4, 3))
    plt.bar(label_names, dist_label)
    fig.savefig(img, format='png')

    n_rows = len(classification_result.index)

    # The context manager closes (and thereby saves) the workbook.
    # ``ExcelWriter.save()`` and the ``encoding=`` keyword of ``to_excel``
    # were removed in pandas 2.0, so neither is used here.
    with pd.ExcelWriter('classification_result.xlsx', engine='xlsxwriter') as writer:
        classification_result.to_excel(writer, sheet_name='result', freeze_panes=(1, 0))
        report_result.to_excel(writer, sheet_name='classification_report')
        scores_df.to_excel(writer, sheet_name='classification_scores', freeze_panes=(1, 0))

        result_sheet = writer.sheets['result']
        report_sheet = writer.sheets['classification_report']
        scores_sheet = writer.sheets['classification_scores']

        # Fit each column to its longest value (or its header, if longer).
        # Excel column 0 holds the DataFrame index, hence the +1 offset.
        for col_idx, column in enumerate(classification_result.columns):
            longest_value = classification_result.iloc[:, col_idx].astype(str).map(len).max()
            column_length = max(longest_value, len(column))
            result_sheet.set_column(col_idx + 1, col_idx + 1, column_length)

        # Highlight every prediction cell (C, D, E) that equals the answer in B.
        color_format = writer.book.add_format({'bg_color': '#9fff9c'})
        last_row = n_rows + 1
        for col_letter in ('C', 'D', 'E'):
            result_sheet.conditional_format(f'{col_letter}2:{col_letter}{last_row}', {
                'type': 'formula',
                'criteria': f'=$B2=${col_letter}2',
                'format': color_format,
            })

        # Per-row colour scale over all score columns. The range is derived
        # from the frame instead of a fixed "L" so that any number of labels
        # is covered. Sheet row 0 is the header and column 0 is the index.
        last_score_col = len(scores_df.columns)
        if last_score_col:
            for sheet_row in range(1, len(scores_df.index) + 1):
                scores_sheet.conditional_format(
                    sheet_row, 1, sheet_row, last_score_col,
                    {'type': '3_color_scale',
                     'max_color': '#51f569',
                     'mid_color': 'white',
                     'min_color': '#f55151'}
                )

        report_sheet.set_column(0, 0, 13)
        # Embed the chart next to the report table.
        report_sheet.insert_image('G2', 'graph', {'image_data': img})

        # Top-3 accuracy: a sample counts as correct when the answer matches
        # any of the three best predictions. Only reported for > 3 labels.
        if len(label_names) > 3:
            answer = classification_result['answer_label']
            in_top_3 = (
                (answer == classification_result['predicted_label'])
                | (answer == classification_result['2nd_predicted'])
                | (answer == classification_result['3rd_predicted'])
            )
            top_3_accuracy_score = int(in_top_3.sum()) / n_rows
            # Written two rows below the last data row.
            result_sheet.write('A' + str(n_rows + 3), 'Top3:')
            result_sheet.write('B' + str(n_rows + 3), top_3_accuracy_score)

def makeSecurityDetectorResultSheet(classification_result, report_result, index2label, scores_df):
    """Write the detector results to ``classification_result_ml.xlsx``.

    Three sheets are produced: ``result``, ``classification_report`` and
    ``classification_scores`` (with a 3-colour scale applied to each row).

    Args:
        classification_result: DataFrame written to ``result``. Cells in
            Excel column D are highlighted when they equal column B, and
            cells in column E when they equal column C; the meaning of those
            columns depends on the caller's column order.
        report_result: DataFrame written unchanged to ``classification_report``.
        index2label: Not used by this function; kept so the signature matches
            ``makeClassificationResultSheet``.
        scores_df: DataFrame of per-label scores, one row per sample.
    """
    n_rows = len(classification_result.index)

    # The context manager closes (and thereby saves) the workbook.
    # ``ExcelWriter.save()`` and the ``encoding=`` keyword of ``to_excel``
    # were removed in pandas 2.0, so neither is used here.
    with pd.ExcelWriter('classification_result_ml.xlsx', engine='xlsxwriter') as writer:
        classification_result.to_excel(writer, sheet_name='result', freeze_panes=(1, 0))
        report_result.to_excel(writer, sheet_name='classification_report')
        scores_df.to_excel(writer, sheet_name='classification_scores', freeze_panes=(1, 0))

        result_sheet = writer.sheets['result']
        scores_sheet = writer.sheets['classification_scores']

        # Fit each column to its longest value (or its header, if longer).
        # Excel column 0 holds the DataFrame index, hence the +1 offset.
        for col_idx, column in enumerate(classification_result.columns):
            longest_value = classification_result.iloc[:, col_idx].astype(str).map(len).max()
            column_length = max(longest_value, len(column))
            result_sheet.set_column(col_idx + 1, col_idx + 1, column_length)

        workbook = writer.book
        color_format = workbook.add_format({'bg_color': '#9fff9c'})
        color_format2 = workbook.add_format({'bg_color': '#5465ff'})
        last_row = n_rows + 1
        # Column E is highlighted where it equals column C ...
        result_sheet.conditional_format('E2:E' + str(last_row), {
            'type': 'formula',
            'criteria': '=$C2=$E2',
            'format': color_format2,
        })
        # ... and column D where it equals column B.
        result_sheet.conditional_format('D2:D' + str(last_row), {
            'type': 'formula',
            'criteria': '=$B2=$D2',
            'format': color_format,
        })

        # Per-row colour scale over all score columns. The range is derived
        # from the frame instead of a fixed "L" so that any number of labels
        # is covered. Sheet row 0 is the header and column 0 is the index.
        last_score_col = len(scores_df.columns)
        if last_score_col:
            for sheet_row in range(1, len(scores_df.index) + 1):
                scores_sheet.conditional_format(
                    sheet_row, 1, sheet_row, last_score_col,
                    {'type': '3_color_scale',
                     'max_color': '#51f569',
                     'mid_color': 'white',
                     'min_color': '#f55151'}
                )

        writer.sheets['classification_report'].set_column(0, 0, 13)

### ライブラリのインポート、ファイルパスの設定

In [5]:
from tqdm import tqdm
from IPython.display import display
import pandas as pd
import numpy as np
import shutil
import glob
import os
import argparse
import random

import torch
from torch.utils.data import DataLoader
from transformers import BertForSequenceClassification, BertJapaneseTokenizer
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import classification_report
import pytorch_lightning as pl
from pytorch_lightning.callbacks.early_stopping import EarlyStopping
from pytorch_lightning.callbacks import StochasticWeightAveraging

# Name of the pretrained checkpoint handed to `from_pretrained` for both the
# model and the tokenizer. The commented-out line is an alternative checkpoint.
#MODEL_NAME = 'cl-tohoku/bert-base-japanese-whole-word-masking'
MODEL_NAME = 'cl-tohoku/bert-base-japanese-v2'

# Training and test files; the main cell passes them to loadDatasets.
TRAIN_PATH = 'datasets/cocoa/trainJP_nfr.txt'
TEST_PATH = 'datasets/cocoa/testJP_nfr.txt'
# Number of splits used for StratifiedKFold in trainingTaskKFold.
FOLD = 10
# Directory of the saved transformers model.
# NOTE(review): the functions visible in this file use the literal
# './model_transformers/' instead of this constant - confirm before changing it.
PYTORCH_MODEL_DIR = './model_transformers/'

### PyTorch Lightning のクラス

In [7]:
class BertClassifier_pl(pl.LightningModule):
    """LightningModule that fine-tunes ``BertForSequenceClassification``."""

    def __init__(self, model_name, num_labels, lr, train_batch_size = 32):
        """Load the pretrained classifier.

        Args:
            model_name: Transformers model name passed to ``from_pretrained``.
            num_labels: Number of output labels.
            lr: Learning rate; read back through ``self.hparams.lr`` when the
                optimizer is created.
            train_batch_size: Batch size of the training data. Not read by any
                method of this class.
        """
        super().__init__()

        self.save_hyperparameters()

        # Both dropout probabilities are set to 0.2 for fine-tuning.
        self.bert_sc = BertForSequenceClassification.from_pretrained(
            model_name,
            num_labels=num_labels,
            attention_probs_dropout_prob=0.2,
            hidden_dropout_prob=0.2,
        )

    @staticmethod
    def _batch_accuracy(logits, targets):
        """Return the fraction of rows whose arg-max logit equals the target."""
        hits = (logits.argmax(-1) == targets).sum().item()
        return hits / targets.size(0)

    def training_step(self, batch, batch_idx):
        """Return (and log as ``train_loss``) the loss of one training batch."""
        loss = self.bert_sc(**batch).loss
        self.log('train_loss', loss)
        return loss

    def validation_step(self, batch, batch_idx):
        """Log and return the loss and accuracy of one validation batch."""
        # The labels stay in the batch for the forward pass so that the model
        # output carries a loss; they are removed from the batch afterwards.
        result = self.bert_sc(**batch)
        targets = batch.pop('labels')
        val_loss = result.loss
        val_acc = self._batch_accuracy(result.logits, targets)
        self.log('val_loss', val_loss)
        self.log('val_acc',val_acc)
        return {"val_loss": val_loss,"val_acc": val_acc}

    def test_step(self, batch, batch_idx):
        """Log the accuracy of one test batch as ``accuracy``."""
        # Labels are removed before the forward pass; only logits are needed.
        targets = batch.pop('labels')
        logits = self.bert_sc(**batch).logits
        self.log('accuracy', self._batch_accuracy(logits, targets))

    def configure_optimizers(self):
        """Return a single Adam optimizer (``weight_decay=1e-2``) in a list."""
        adam = torch.optim.Adam(
            self.parameters(),
            lr=self.hparams.lr,
            weight_decay=1e-2,
        )
        return [adam]

### データセットの読み込み

In [8]:
def loadDatasets(train_path, test_path):
    """Read the train/test CSV files and convert them into BERT inputs.

    Both files are read as cp932-encoded CSV and must provide the columns
    ``label`` and ``feature``. Label indices are assigned by enumerating
    ``train_df['label'].unique()``.

    Args:
        train_path: Path of the training CSV.
        test_path: Path of the test CSV.

    Returns:
        A dict with the keys

        * ``num_labels``: number of distinct training labels,
        * ``index2label`` / ``label2index``: index <-> label-name mappings,
        * ``dataset``: encoded training samples, grouped by label index,
        * ``dataset_idx``: label index of every entry in ``dataset``,
        * ``test_data``: encoded test samples, grouped by label index,
        * ``test_data_answer``: test labels in file order,
        * ``test_data_list``: test texts in file order.

        Each encoded sample is a dict of tensors produced by the tokenizer
        plus a ``labels`` entry holding the label index.
    """
    def read_csv(path):
        return pd.read_csv(f'{path}', encoding='cp932')

    train_df = read_csv(train_path)
    test_df = read_csv(test_path)

    index2label = dict(enumerate(train_df['label'].unique()))
    label2index = {name: idx for idx, name in index2label.items()}

    max_length = 128  # number of tokens per sample (padded / truncated)
    tokenizer = BertJapaneseTokenizer.from_pretrained(MODEL_NAME)

    def encode(text, label_id):
        """Tokenize one text and attach its label index, all as tensors."""
        encoded = tokenizer(
            text,
            max_length=max_length,
            padding='max_length',
            truncation=True
        )
        encoded['labels'] = label_id
        return {key: torch.tensor(value) for key, value in encoded.items()}

    train_samples = []
    test_samples = []
    dataset_idx = []  # label index of each training sample

    for label_id, category in enumerate(tqdm(index2label.values())):
        train_texts = train_df[train_df['label'] == category]['feature']
        # Test rows whose label never occurs in the training file match no
        # category here and are therefore absent from ``test_data``.
        test_texts = test_df[test_df['label'] == category]['feature']

        train_samples.extend(encode(text, label_id) for text in train_texts)
        test_samples.extend(encode(text, label_id) for text in test_texts)
        dataset_idx.extend([label_id] * len(train_texts))

    return {
        'num_labels': len(index2label),
        'index2label': index2label,
        'label2index': label2index,
        'dataset': train_samples,
        'dataset_idx': dataset_idx,
        'test_data': test_samples,
        'test_data_answer': test_df['label'].tolist(),
        'test_data_list': test_df['feature'].tolist(),
    }

### ファインチューニング

In [9]:
def trainingTaskKFold(data, generalization = False):
    """Fine-tune BERT on stratified K-fold splits and evaluate on the test set.

    Any existing ``model/`` directory is deleted first. For every fold that is
    run, the checkpoint with the lowest validation loss is kept under
    ``model/`` and evaluated on ``data['test_data']``.

    Args:
        data: Dict as returned by ``loadDatasets``; the keys ``dataset``,
            ``dataset_idx``, ``test_data`` and ``num_labels`` are read.
        generalization: When exactly ``True``, all ``FOLD`` folds are trained
            and the average test accuracy is printed. Otherwise only the first
            fold is trained and its best checkpoint is exported to
            ``./model_transformers/`` in transformers format.

    Side effects:
        Writes ``./model/training_data_amounts.csv`` with the columns
        ``amount`` (training samples per fold) and ``val_amount`` (validation
        samples per fold). The same column names are used in both modes.
    """
    if os.path.exists('model/'):
        shutil.rmtree('model/')

    run_all_folds = generalization is True

    dataloader_test = DataLoader(data['test_data'], batch_size=256)

    # Split into training and validation data with stratified K-fold.
    accuracy = []
    # Collected per fold; not consumed anywhere in this function.
    best_model_paths = []
    training_data_amounts = []
    val_data_amounts = []
    skf = StratifiedKFold(n_splits=FOLD)
    for fold, (train_index, val_index) in enumerate(skf.split(X=data['dataset'],y=data['dataset_idx'])):

        train_data = [data['dataset'][i] for i in train_index]
        val_data = [data['dataset'][i] for i in val_index]
        random.shuffle(train_data)
        training_data_amounts.append(len(train_data))
        val_data_amounts.append(len(val_data))
        dataloader_train = DataLoader(train_data, batch_size=32, shuffle=True)
        dataloader_val = DataLoader(val_data, batch_size=256)

        model = BertClassifier_pl(
            MODEL_NAME, num_labels=data['num_labels'], lr=1e-5,
        )
        # Keep only the checkpoint with the lowest validation loss.
        checkpoint = pl.callbacks.ModelCheckpoint(
            filename=f'fold={fold+1}'+'-{epoch}-{step}-{val_loss:.1f}',
            monitor='val_loss',
            mode='min',
            save_top_k=1,
            save_weights_only=True,
            dirpath='model/',
        )
        # Stop once val_loss has not improved for 3 validation checks.
        early_stop = EarlyStopping(
            monitor='val_loss',
            patience=3,
            mode='min'
        )
        weight_averaging = StochasticWeightAveraging(swa_lrs=1e-5)
        # NOTE(review): the `gpus` argument was removed in PyTorch Lightning
        # 2.0; switch to accelerator/devices when upgrading - confirm against
        # the installed version.
        trainer = pl.Trainer(
            gpus=1,
            max_epochs=40,
            log_every_n_steps=10,
            callbacks=[checkpoint, early_stop, weight_averaging]
        )
        # Fine-tuning
        trainer.fit(model,train_dataloaders=dataloader_train,val_dataloaders=dataloader_val)

        print('best model: ', checkpoint.best_model_path)
        print('val_loss: ', checkpoint.best_model_score)

        best_model_path = checkpoint.best_model_path

        test = trainer.test(dataloaders=dataloader_test,ckpt_path=best_model_path)
        print(f'Accuracy: {test[0]["accuracy"]:.3f}')
        accuracy.append(test[0]["accuracy"])

        if not run_all_folds:
            # Single-run mode: export the best checkpoint and stop after fold 1.
            model = BertClassifier_pl.load_from_checkpoint(best_model_path)
            model.bert_sc.save_pretrained('./model_transformers/')
            break
        best_model_paths.append(best_model_path)

    if run_all_folds:
        print(f'Average accuracy: {np.mean(accuracy):.3f}')
        print('Starting weight averaging task ...')

    # Written once for both modes so the column names cannot drift apart.
    tmp_df = pd.DataFrame({
        'amount': training_data_amounts,
        'val_amount': val_data_amounts
    })
    tmp_df.to_csv('./model/training_data_amounts.csv')
        #print(tmp_df)

### 予測、結果の出力

In [10]:
def predictAndEvaluate(data,mode='gpu'):
    """Predict the test set with the saved model, then report and explain.

    The model is loaded from ``./model_transformers/``. The results are
    printed, displayed as a classification report, written to Excel through
    ``makeClassificationResultSheet`` and explained with ``ModelExplainer``.

    Args:
        data: Dict as returned by ``loadDatasets``; the keys
            ``test_data_list``, ``test_data_answer``, ``index2label`` and
            ``label2index`` are read.
        mode: ``'gpu'`` moves the model and inputs to CUDA; any other value
            keeps them where they were loaded.

    Raises:
        IndexError: If the model has fewer than three labels, because the
            2nd and 3rd ranked predictions are always extracted.
        KeyError: If a test label is missing from ``data['label2index']``.
    """
    # Load the saved model.
    bert_sc = BertForSequenceClassification.from_pretrained(
        './model_transformers/'
    )

    tokenizer = BertJapaneseTokenizer.from_pretrained(MODEL_NAME)
    # Encode the whole test set as one batch. Truncation keeps over-length
    # texts within the tokenizer's maximum length instead of failing in the
    # model's forward pass.
    encoding = tokenizer(
        data['test_data_list'],
        padding='longest',
        truncation=True,
        return_tensors='pt'
    )
    if mode == 'gpu':
        bert_sc = bert_sc.cuda()
        encoding = {k: v.cuda() for k, v in encoding.items()}
    else:
        encoding = dict(encoding.items())

    # Predict.
    with torch.no_grad():
        output = bert_sc(**encoding)
    scores = output.logits  # classification scores
    print(scores)

    # One ascending sort yields the 2nd and 3rd best label indices.
    ranking = scores.argsort(dim=-1)
    labels_predicted = scores.argmax(-1).cpu()
    labels_predicted_2nd = ranking[:, -2].cpu()
    labels_predicted_3rd = ranking[:, -3].cpu()
    scores = scores.cpu()

    # Convert the predicted indices to label names.
    index2label = data['index2label']
    predicted_ids = labels_predicted.tolist()
    predicted = [index2label[i] for i in predicted_ids]
    predicted_2nd = [index2label[i] for i in labels_predicted_2nd.tolist()]
    predicted_3rd = [index2label[i] for i in labels_predicted_3rd.tolist()]

    print('予測ラベル: ',predicted)
    print('正解ラベル: ',data['test_data_answer'])

    target_names = list(index2label.values())
    label_ids = list(index2label.keys())

    # Convert the answer labels to indices.
    label2index = data['label2index']
    ans_labels = [label2index[label] for label in data['test_data_answer']]

    # Plain Python lists are passed so sklearn does not have to convert a tensor.
    report = classification_report(
        ans_labels, predicted_ids, labels=label_ids, target_names=target_names, output_dict=True, zero_division=0
    )

    report_result = pd.DataFrame(report).T
    display(report_result)

    # Samples are numbered from 1 in both result frames.
    sample_index = np.arange(1, len(predicted) + 1)
    classification_result = pd.DataFrame({
        'answer_label': data['test_data_answer'],
        'predicted_label': predicted,
        '2nd_predicted': predicted_2nd,
        '3rd_predicted': predicted_3rd,
        'text': data['test_data_list'],
    }, index=sample_index)

    scores_df = pd.DataFrame(scores.numpy(), columns=target_names, index=sample_index)

    makeClassificationResultSheet(classification_result,report_result,index2label,scores_df)

    modelExplainer = ModelExplainer(model=bert_sc,tokenizer=tokenizer,labels=target_names)
    modelExplainer.shap_explainer(data['test_data_list'],data['test_data_answer'],predicted)

def predict(features):
    """Return the classification logits of the saved model for ``features``.

    The model is loaded from ``./model_transformers/`` on every call. It runs
    on CUDA when available and on the CPU otherwise.

    Args:
        features: Iterable of input texts; it is materialised with ``list``.

    Returns:
        The logits as a CPU tensor, one row per input text.
    """
    # Load the saved model.
    bert_sc = BertForSequenceClassification.from_pretrained(
        './model_transformers/'
    )
    tokenizer = BertJapaneseTokenizer.from_pretrained(MODEL_NAME)
    # Truncation keeps over-length texts within the tokenizer's maximum
    # length instead of failing in the model's forward pass.
    encoding = tokenizer(
        list(features),
        padding='longest',
        truncation=True,
        return_tensors='pt'
    )
    # Fall back to the CPU instead of crashing when CUDA is unavailable.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    bert_sc.to(device)
    encoding = {k: v.to(device) for k, v in encoding.items()}
    with torch.no_grad():
        output = bert_sc(**encoding)
    return output.logits.cpu()

## メインタスク

In [15]:
# Report whether PyTorch can see a CUDA device.
print('Is CUDA available?: ',torch.cuda.is_available())

# Read and tokenize the training and test files.
data = loadDatasets(TRAIN_PATH, TEST_PATH)

# Evaluate with the already saved model. trainingTaskKFold is not called in
# this cell, so './model_transformers/' must already contain a model.
predictAndEvaluate(data)