## ライブラリのインポート

In [1]:
# 基本ライブラリ
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tqdm import tqdm
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from sklearn.metrics import precision_score, recall_score, f1_score
plt.rcParams['font.family'] = 'Meiryo'
from typing import Dict

# 深層学習系
import torch
from torch import nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from datasets import Dataset, DatasetDict
from transformers import TrainingArguments
from transformers import Trainer, EarlyStoppingCallback
from transformers import EvalPrediction
from transformers import default_data_collator
from transformers import pipeline

import shap

# Select the compute device: the first CUDA GPU when one is available, otherwise the CPU.
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")

## データ整形
- 学習データを必要な列のみ取り出し学習・検証に分割する
- transformersで読み込める形式へ変換

In [2]:
# Load the raw training data from the Excel workbook into a DataFrame.
excel_data_path = "./train_data.xlsx"
origin_df = pd.read_excel(excel_data_path)

In [3]:
# データの確認（コメントを外して実行）
# origin_df.head()

In [4]:
# Report how many missing values each column contains.
missing_per_column = origin_df.isna().sum()
print(missing_per_column)

CVE_ID                          0
CVE登録機関                         0
CWE_ID                          4
公開日                             0
最終更新日                           0
description                     0
cvssV3_baseScore                0
cvssV3_baseSeverity             0
cvssV3_attackVector             0
cvssV3_attackComplexity         0
cvssV3_privilegesRequired       0
cvssV3_userInteraction          0
cvssV3_scope                    0
cvssV3_confidentialityImpact    0
cvssV3_integrityImpact          0
cvssV3_availabilityImpact       0
cvssV3_exploitabilityScore      0
cvssV3_impactScore              0
cvssV2_baseScore                0
cvssV2_accessVector             0
cvssV2_accessComplexity         0
cvssV2_authentication           0
cvssV2_confidentialityImpact    0
cvssV2_integrityImpact          0
cvssV2_availabilityImpact       0
A+B                             0
A                               0
B                               0
dtype: int64


In [5]:
# Replace every missing value with an empty string.
origin_df = origin_df.fillna(value="")

In [6]:
# Source column holding the input text, and source column holding the label to predict.
text_column, label_column = "description", "A+B"

In [7]:
# Keep only the text and label columns, renamed to the generic names
# 'text' and 'label' that the rest of the notebook works with.
selected_columns = origin_df[[text_column, label_column]]
dataset_df = selected_columns.rename(
    columns={text_column: "text", label_column: "label"}
)

In [8]:
# Split into training and validation frames (80/20), stratified on the label so
# both splits keep the same class ratio. A fixed random_state makes the split,
# and therefore every downstream metric, reproducible across kernel restarts.
train_df, test_df = train_test_split(
    dataset_df,
    test_size=0.2,
    stratify=dataset_df["label"],
    random_state=42,
)

# Convert the pandas frames into Hugging Face datasets and bundle them.
ds_train = Dataset.from_pandas(train_df)
ds_test = Dataset.from_pandas(test_df)
dataset = DatasetDict({
    "train": ds_train,
    "validation": ds_test,
})

## 学習
- トークナイザー（文字データ⇒数値データへ変換するもの）の定義
- BERTモデルと学習の設定

In [9]:
# Checkpoint names of the candidate encoders, and the number of target classes.
BERT_MODEL_NAME = "bert-base-uncased"
ROBERTA_MODEL_NAME = "roberta-base"
ALBERT_MODEL_NAME = "albert-base-v2"
NUM_LABELS = 2

# Load the pretrained tokenizer of each checkpoint.
bert_tokenizer, roberta_tokenizer, albert_tokenizer = [
    AutoTokenizer.from_pretrained(checkpoint)
    for checkpoint in (BERT_MODEL_NAME, ROBERTA_MODEL_NAME, ALBERT_MODEL_NAME)
]

# Load each pretrained checkpoint with a sequence-classification head sized
# for NUM_LABELS classes; attention outputs are requested from the model.
bert_classification_model, roberta_classification_model, albert_classification_model = [
    AutoModelForSequenceClassification.from_pretrained(
        checkpoint,
        output_attentions=True,
        num_labels=NUM_LABELS,
    )
    for checkpoint in (BERT_MODEL_NAME, ROBERTA_MODEL_NAME, ALBERT_MODEL_NAME)
]

Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at roberta-base and are newly initialized: ['classifier.out_proj.weight', 'classifier.dense.bias', 'classifier.out_proj.bias', 'classifier.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Some weights of AlbertForSequenceClassification were not initialized from the model checkpoint at albert-base-v2 and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [10]:
# Choose which tokenizer/model pair the rest of the notebook trains and evaluates.
tokenizer, classification_model = albert_tokenizer, albert_classification_model

In [11]:
# Convert a batch of dataset rows into tokenizer output plus labels.
def preprocess_function(data):
    """Tokenize a batch of texts and attach their labels.

    Written for ``dataset.map(..., batched=True)``; it uses the module-level
    ``tokenizer``.

    Args:
        data: Batch mapping with a "text" entry (iterable of strings) and a
            "label" entry (iterable of class ids).

    Returns:
        The tokenizer output with an added "labels" entry holding the class
        ids of the batch.
    """
    texts = [text.strip() for text in data["text"]]
    inputs = tokenizer(
        texts,
        max_length=512,
        truncation=True,
        # Pad every example to the same fixed length. padding=True would pad
        # only to the longest text of the current map() chunk, so rows from
        # different chunks could end up with different lengths and a shuffled
        # training batch could not be stacked by a collator that does not pad.
        padding="max_length",
    )
    # A plain list is enough here; no tensor conversion is needed at this stage.
    inputs["labels"] = list(data["label"])
    return inputs


# Tokenize every split; batched=True passes preprocess_function batches of rows.
tokenized_data = dataset.map(function=preprocess_function, batched=True)

Map:   0%|          | 0/17752 [00:00<?, ? examples/s]

Map:   0%|          | 0/4439 [00:00<?, ? examples/s]

In [12]:
# Custom evaluation metrics for the Trainer.
def custom_compute_metrics(res: EvalPrediction) -> Dict:
    """Compute macro-averaged precision, recall and F1.

    Args:
        res: Evaluation result exposing ``predictions`` (model outputs) and
            ``label_ids`` (reference class ids).

    Returns:
        Dict with the keys 'precision', 'recall' and 'f1'.
    """
    predictions = res.predictions
    # The model is loaded with output_attentions=True, so unless the extra
    # outputs are filtered out with ignore_keys, predictions arrives as a
    # tuple rather than a single array. The logits are expected to be its
    # first element (TODO confirm if further outputs are ever enabled).
    if isinstance(predictions, (tuple, list)):
        predictions = predictions[0]

    # Predicted class = index of the largest score in each row.
    pred = predictions.argmax(axis=1)
    target = res.label_ids

    precision = precision_score(target, pred, average='macro')
    recall = recall_score(target, pred, average='macro')
    f1 = f1_score(target, pred, average='macro')
    return {
        'precision': precision,
        'recall': recall,
        'f1': f1
    }

In [13]:
# Subclass Trainer to train with a class-weighted loss that counters label imbalance.
class CustomTrainer(Trainer):
    """Trainer whose loss is a class-weighted cross-entropy."""

    # Per-class loss weights, indexed by class id: class 1 counts 100x class 0.
    class_weights = (1.0, 100.0)

    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        """Compute the weighted cross-entropy loss for one batch.

        Args:
            model: Model to run the forward pass with.
            inputs: Batch mapping passed to the model; must contain "labels".
            return_outputs: When true, return ``(loss, outputs)`` instead of
                only the loss.
            **kwargs: Additional keyword arguments that newer Trainer versions
                pass (e.g. ``num_items_in_batch``); accepted and ignored so the
                override keeps working across versions.

        Returns:
            The loss tensor, or a ``(loss, outputs)`` tuple.
        """
        labels = inputs.get("labels")
        # forward pass
        outputs = model(**inputs)
        logits = outputs.get("logits")
        # Build the weight on the same device/dtype as the logits rather than
        # on a globally chosen device, so the loss works wherever the Trainer
        # placed the model.
        weight = torch.tensor(self.class_weights, dtype=logits.dtype, device=logits.device)
        loss_fct = nn.CrossEntropyLoss(weight=weight)
        loss = loss_fct(logits.view(-1, self.model.config.num_labels), labels.view(-1))
        return (loss, outputs) if return_outputs else loss

In [14]:
# Training hyperparameters. Evaluation and checkpointing both run once per
# epoch, and the best checkpoint is reloaded when training ends.
training_args = TrainingArguments(
    output_dir="./out/",
    num_train_epochs=1,
    learning_rate=2e-5,
    weight_decay=0.01,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    evaluation_strategy="epoch",
    save_strategy="epoch",
    save_total_limit=1,
    load_best_model_at_end=True,
)

# Collator used to assemble batches from the tokenized rows.
data_collator = default_data_collator

In [15]:
# Assemble the trainer using the custom (class-weighted loss) Trainer subclass.
trainer = CustomTrainer(
    model=classification_model,
    tokenizer=tokenizer,
    args=training_args,
    train_dataset=tokenized_data["train"],
    eval_dataset=tokenized_data["validation"],
    data_collator=data_collator,
    compute_metrics=custom_compute_metrics,
    callbacks=[EarlyStoppingCallback(early_stopping_patience=40)],
)
# Alternative kept for reference: the stock Trainer (unweighted loss).
# trainer = Trainer(
#     model=classification_model,
#     args=training_args,
#     compute_metrics=custom_compute_metrics,
#     train_dataset=tokenized_data["train"],
#     eval_dataset=tokenized_data["validation"],
#     data_collator=data_collator,
#     tokenizer=tokenizer,
#     callbacks=[EarlyStoppingCallback(early_stopping_patience=30)],
# )

In [None]:
# Run training. The listed model outputs are excluded when predictions are
# gathered during evaluation.
eval_ignore_keys = ['last_hidden_state', 'hidden_states', 'attentions']
trainer.train(ignore_keys_for_eval=eval_ignore_keys)



Epoch,Training Loss,Validation Loss


In [None]:
# Save the trained model to the given directory.
# NOTE(review): the directory name says "roberta", but the model selected for
# training earlier in this notebook is the ALBERT one; confirm the intended name.
trainer.save_model("./2308070015_roberta")

In [None]:
# Run prediction on the validation split, dropping the listed model outputs so
# that `predictions` holds only the class scores.
pred_result = trainer.predict(
    tokenized_data["validation"],
    ignore_keys=['loss', 'last_hidden_state', 'hidden_states', 'attentions'],
)

# test_df came out of train_test_split, so writing a column into it in place can
# raise SettingWithCopyWarning. assign() builds a new frame instead and keeps
# this cell safe to re-run.
test_df = test_df.assign(predict=pred_result.predictions.argmax(axis=1).tolist())

print(classification_report(test_df['label'], test_df['predict'], target_names=["OK", "NG"]))

In [None]:
# 途中から学習を始める場合
## https://github.com/huggingface/transformers/issues/7198
# trainer.train("./save230806", ignore_keys_for_eval=['last_hidden_state', 'hidden_states', 'attentions'])

## SHAPの算出

In [None]:
# Build an inference pipeline around the trained model and classify one sample text.
# NOTE(review): the sample sentence appears twice back-to-back in the string
# below; this looks like an accidental paste, so confirm it is intended.
sample_data = "Possible buffer overflow to improper validation of hash segment of file while allocating memory in Snapdragon Connectivity, Snapdragon MobilePossible buffer overflow to improper validation of hash segment of file while allocating memory in Snapdragon Connectivity, Snapdragon Mobile"
# NOTE(review): .cpu() is called on the trainer's own model object, so the
# trainer shares the moved model afterwards.
pipe = pipeline(task='text-classification', model=trainer.model.cpu(), tokenizer=tokenizer)
sample_prediction = pipe(sample_data)
print(sample_prediction)

In [None]:
# Wrap the classification pipeline in a SHAP explainer and compute SHAP values
# for the sample text (passed as a one-element list).
explainer = shap.Explainer(pipe)
shap_value = explainer([sample_data])


In [None]:
# Render the SHAP text plot for the computed values, i.e. the basis of the prediction.
shap.plots.text(shap_value)