In [1]:
# カーネルリスタートの時はこのセルを実行しなくてもOK
!wget https://bootstrap.pypa.io/get-pip.py
!python get-pip.py
%pip install tokenizers fugashi ipadic accelerate==0.20.3 seaborn
%pip install transformers datasets scikit-learn
!wget https://github.com/ids-cv/wrime/raw/master/wrime-ver1.tsv

In [3]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import torch
from torch import nn
from datasets import Dataset, load_metric
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix
from sklearn.metrics import precision_recall_fscore_support
from transformers import get_linear_schedule_with_warmup
from transformers import AutoTokenizer, AutoModelForSequenceClassification, TrainingArguments, Trainer, AutoConfig, AdamW, get_linear_schedule_with_warmup

In [4]:
df_wrime = pd.read_table('wrime-ver1.tsv')
emotion_names = ['Joy', 'Sadness', 'Anticipation', 'Surprise', 'Anger', 'Fear', 'Disgust', 'Trust']
emotion_names_jp = ['喜び', '悲しみ', '期待', '驚き', '怒り', '恐れ', '嫌悪', '信頼']
num_labels = len(emotion_names)

df_wrime['readers_emotion_intensities'] = df_wrime.apply(lambda x: [x['Avg. Readers_' + name] for name in emotion_names], axis=1)

# removing samples with less emotion intensities
# (max.readers_emotion_intensities must be 2 or more)
is_target = df_wrime['readers_emotion_intensities'].map(lambda x: max(x) >= 2)
df_wrime_target = df_wrime[is_target]

In [5]:
# Divide into train, validation, and test sets
train_data, test_valid_data = train_test_split(df_wrime_target, test_size=0.4, random_state=42)
valid_data, test_data = train_test_split(test_valid_data, test_size=0.5, random_state=42)

print('train:', len(train_data))
print('valid:', len(valid_data))
print('test:', len(test_data))

train: 10942
valid: 3647
test: 3648


In [6]:
# 使用するモデルを指定して、Tokenizerを読み込む
checkpoint = 'cl-tohoku/bert-base-japanese-whole-word-masking'
tokenizer = AutoTokenizer.from_pretrained(checkpoint)

In [8]:
# # 前処理関数: tokenize_function
# # 感情強度の正規化（総和=1）も同時に実施する
def tokenize_function(batch):
    tokenized_batch = tokenizer(batch['Sentence'], truncation=True, padding='max_length', return_tensors="pt")
    tokenized_batch['labels'] = [x / np.sum(x) for x in batch['readers_emotion_intensities']]
    return tokenized_batch

# Transformers用のデータセット形式に変換
# pandas.DataFrame -> datasets.Dataset
target_columns = ['Sentence', 'readers_emotion_intensities']
train_dataset = Dataset.from_pandas(train_data[target_columns])
valid_dataset = Dataset.from_pandas(valid_data[target_columns])
test_dataset = Dataset.from_pandas(test_data[target_columns])

# 前処理（tokenize_function） を適用
train_tokenized_dataset = train_dataset.map(tokenize_function, batched=True)
valid_tokenized_dataset = valid_dataset.map(tokenize_function, batched=True)
test_tokenized_dataset = test_dataset.map(tokenize_function, batched=True)



Map:   0%|          | 0/10942 [00:00<?, ? examples/s]

Map:   0%|          | 0/3647 [00:00<?, ? examples/s]

Map:   0%|          | 0/3648 [00:00<?, ? examples/s]

In [10]:
# https://huggingface.co/docs/transformers/training
metric = load_metric("accuracy")
# categorical_accuracy
def compute_metrics(eval_pred):
    logits, labels = eval_pred
    predictions = np.argmax(logits, axis=-1)
    label_ids = np.argmax(labels, axis=-1)
    return metric.compute(predictions=predictions, references=label_ids)

  metric = load_metric("accuracy")
You can avoid this message in future by passing the argument `trust_remote_code=True`.
Passing `trust_remote_code=True` will be mandatory to load this metric from the next major release of `datasets`.


In [11]:
model = AutoModelForSequenceClassification.from_pretrained(checkpoint, num_labels=num_labels)

  return self.fget.__get__(instance, owner)()
Some weights of BertForSequenceClassification were not initialized from the model checkpoint at cl-tohoku/bert-base-japanese-whole-word-masking and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [12]:
# %pip install accelerate transformers[torch] -U


In [13]:
# Transformers の Trainer を用いる
# https://huggingface.co/docs/transformers/v4.21.1/en/main_classes/trainer#transformers.TrainingArguments

# 訓練時にerror → 上のコードセルをコメントアウト → 実行 → (仮想環境をdeactivate + restart vscode)もしくは(カーネルリスタート) → 上のコードセルをコメントアウト → run all the cells againで解決

# 訓練時の設定を修正
training_args = TrainingArguments(
    output_dir="test_trainer",
    per_device_train_batch_size=8, # originally 8
    num_train_epochs=1.0, # originally 1
    evaluation_strategy="steps", eval_steps=200)  # 200ステップ毎に検証データで評価する

# Trainerを生成
newtrainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_tokenized_dataset,
    eval_dataset=valid_tokenized_dataset,  # 検証データを使用する
    compute_metrics=compute_metrics,
)

# 訓練を実行
newtrainer.train()

Step,Training Loss,Validation Loss,Accuracy
200,No log,0.288242,0.534138
400,No log,0.27014,0.614478
600,0.296600,0.260736,0.618042
800,0.296600,0.252921,0.646285
1000,0.258900,0.243772,0.673704
1200,0.258900,0.241411,0.66822


TrainOutput(global_step=1368, training_loss=0.26929717036018597, metrics={'train_runtime': 261.7104, 'train_samples_per_second': 41.81, 'train_steps_per_second': 5.227, 'total_flos': 2879116261933056.0, 'train_loss': 0.26929717036018597, 'epoch': 1.0})

## テストデータにモデルをアプライ、混合行列を作成・F1スコアを各感情ラベルについて計算

In [252]:
# https://www.delftstack.com/ja/howto/numpy/numpy-softmax/
def np_softmax(x):
    f_x = np.exp(x) / np.sum(np.exp(x))
    return f_x

In [253]:
# テキストを感情解析する関数
def analyze_emotion(text):
    # 推論モード
    model.eval()

    # 入力データ変換 + 推論
    tokens = tokenizer(text, truncation=True, return_tensors="pt")
    tokens.to(model.device)
    preds = model(**tokens)
    prob = np_softmax(preds.logits.cpu().detach().numpy()[0])
    out_dict = {n: p for n, p in zip(emotion_names_jp, prob)}
    out_list = list(out_dict.values())
    return out_list

In [254]:
# 結果を保存する空のリストを作成
predicted_labels = []

# test_tokenized_datasetからSentenceカラムのデータを取得
sentences = test_tokenized_dataset['Sentence']

# 各テキストにanalyze_emotion関数を適用し、結果をリストに保存
for text in sentences:
    result = analyze_emotion(text)
    predicted_labels.append(result)

true_labels = test_tokenized_dataset['labels']

In [255]:
# 予測結果と真のラベルをDataFrameに変換
predicted_df = pd.DataFrame(predicted_labels, columns=emotion_names_jp)
true_df = pd.DataFrame(true_labels, columns=emotion_names_jp)

In [256]:
# DataFrameの各行を更新して、最大値に1、それ以外に0を持つようにする
def update_dataframe(df):
    for index, row in df.iterrows():
        max_value = row.max()
        df.loc[index] = (row == max_value).astype(int)
    return df

In [257]:
predicted_process_values = update_dataframe(predicted_df)
true_process_values = update_dataframe(true_df)

In [258]:
# 各DataFrameから最大の感情を抽出
def get_max_emotions(df):
    max_emotions = []
    for index, row in df.iterrows():
        max_emotions.append(row.index[row == 1].tolist())
    return pd.DataFrame({'Emotions': max_emotions})

predicted_emotions = get_max_emotions(predicted_df)
true_emotions = get_max_emotions(true_df)

In [259]:
# true_emotionsのリストサイズが2以上の場合、以下の2つを実行
# 1:predリストの感情がtrueリストにある場合は、predリストと一致する感情を除いてtrueリスト内の感情を削除 
# 2:predリストの感情がtrueリストにない場合は、両リストの感情をすべて削除して空リストにする→混同行列・F1スコア計算には含まないエントリとして扱う
def remove_extra_emotions(predicted_emotions, true_emotions):
    for idx, (pred, true) in zip(predicted_emotions.index, zip(predicted_emotions['Emotions'], true_emotions['Emotions'])):
        if len(true) >= 2:
            true_emotions.at[idx, 'Emotions'] = [emotion for emotion in true if emotion in pred] if any(emotion in pred for emotion in true) else []

remove_extra_emotions(predicted_emotions, true_emotions)

In [262]:
# 混同行列の作成
confusion_matrix_data = pd.DataFrame(0, index=emotion_labels, columns=emotion_labels)
for pred, true in zip(predicted_emotions['Emotions'], true_emotions['Emotions']):
    for pred_label in pred:
        if pred_label in emotion_labels:
            for true_label in true:
                if true_label in emotion_labels:
                    confusion_matrix_data.at[true_label, pred_label] += 1

# 各列と各行に合計値を追加
confusion_matrix_data['合計'] = confusion_matrix_data.sum(axis=1)
confusion_matrix_data.loc['合計'] = confusion_matrix_data.sum()

# 混同行列に明記
confusion_matrix_data.index.name = '予測値'
confusion_matrix_data.columns.name = '正解値'

In [263]:
confusion_matrix_data

正解値,喜び,悲しみ,期待,驚き,怒り,恐れ,嫌悪,信頼,合計
予測値,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
喜び,782,43,65,56,0,8,5,0,959
悲しみ,22,496,32,27,0,24,32,0,633
期待,73,40,724,36,0,10,20,0,903
驚き,42,42,31,298,0,11,17,0,441
怒り,1,9,2,4,0,2,26,0,44
恐れ,10,84,29,31,0,166,25,0,345
嫌悪,9,45,13,15,0,10,152,0,244
信頼,5,0,2,3,0,0,0,0,10
合計,944,759,898,470,0,231,277,0,3579


In [261]:
# 各感情ラベルのPrecision、Recall、F1スコアを計算
f1_scores = {}
for emotion_label in emotion_names_jp:
    tp = confusion_matrix_data.at[emotion_label, emotion_label]
    fp = confusion_matrix_data.loc[emotion_label, '合計'] - tp
    fn = confusion_matrix_data.loc['合計', emotion_label] - tp
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0
    f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
    f1_scores[emotion_label] = {'Precision': precision, 'Recall': recall, 'F1 Score': f1_score}

# 結果を表示
f1_scores_df = pd.DataFrame.from_dict(f1_scores, orient='index')
print("各感情ラベルのPrecision、Recall、F1スコア:")
print(f1_scores_df)

各感情ラベルのPrecision、Recall、F1スコア:
     Precision    Recall  F1 Score
喜び    0.815433  0.828390  0.821860
悲しみ   0.783570  0.653491  0.712644
期待    0.801772  0.806236  0.803998
驚き    0.675737  0.634043  0.654226
怒り    0.000000  0.000000  0.000000
恐れ    0.481159  0.718615  0.576389
嫌悪    0.622951  0.548736  0.583493
信頼    0.000000  0.000000  0.000000
