# In [None]:
import json
import numpy as np
import torch
import pandas as pd
import evaluate
from datasets import Dataset
from tqdm.auto import tqdm
from torch.optim import AdamW
from transformers import AutoTokenizer, AutoModelForSequenceClassification, get_scheduler
from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, ConfusionMatrixDisplay


############################################################################################################
# Model name: any sequence-classification checkpoint id from the Hugging Face hub.
model_name = "cl-tohoku/bert-base-japanese-v3"
# Emoji that get merged away (sources); position k here pairs with position k in to_emoji.
from_emoji = ["😂", "💕", "☺️", "😃", "😆", "😁", "🥲", "👍", "✨", "☀️", "😅", "🥺"]
# Emoji that the sources are merged into (targets).
to_emoji = ["🤣", "🥰", "😊", "😊", "😊", "😊", "😭", "😊", "😊", "😊", "🤣", "😭"]
# Labels used for classification; only the key order matters below.
label2id_temp = {"😊": 0, "🤣": 1, "😭": 2, "🎉": 3, "🥰": 4, "😇": 5, "💦": 6, "🤔": 7}
############################################################################################################

# label -> id, renumbered 0..N-1 in the key order of label2id_temp
# (the values written in label2id_temp are not used).
label2id = {emoji: idx for idx, emoji in enumerate(label2id_temp)}
# Label names, ordered by id.
label = list(label2id)
# Number of distinct labels.
num_labels = len(label)
# id -> label (inverse of label2id).
id2label = {idx: emoji for emoji, idx in label2id.items()}


# Load a JSON file into a pandas DataFrame
def load_json_to_pandas(json_path):
    """Read ``json_path`` with ``pd.read_json`` and merge emoji labels.

    Each value of the ``label`` column that appears in ``from_emoji`` is
    replaced by the entry at the same position in ``to_emoji``; every other
    label is kept as is. Nothing is merged when ``from_emoji`` is empty or
    the loaded frame has no rows.

    Args:
        json_path: Location of the JSON data, passed straight to
            ``pd.read_json``.

    Returns:
        The loaded DataFrame with its ``label`` column merged.

    Raises:
        ValueError: If ``to_emoji`` has fewer entries than ``from_emoji``.
        KeyError: If the loaded (non-empty) data has no ``label`` column.
    """
    data = pd.read_json(json_path)

    # Emoji merge
    if from_emoji and not data.empty:
        if len(to_emoji) < len(from_emoji):
            raise ValueError(
                "to_emoji must provide a target for every entry of from_emoji "
                f"({len(to_emoji)} targets for {len(from_emoji)} sources)"
            )

        # setdefault keeps the first target when a source emoji is listed twice,
        # which is what a list.index() lookup would pick.
        merge_map = {}
        for source, target in zip(from_emoji, to_emoji):
            merge_map.setdefault(source, target)

        # One vectorized pass over the column instead of a per-row iterrows()/.at loop.
        data["label"] = data["label"].map(lambda emoji: merge_map.get(emoji, emoji))

    return data


# Turn a pandas DataFrame into a Dataset object
def pandas_to_Dataset(dataset):
    """Return the result of ``Dataset.from_pandas`` for the given DataFrame."""
    return Dataset.from_pandas(dataset)


# Replace the labels of a DataFrame dataset with integer ids
def label_to_int(dataset):
    """Convert the ``label`` column from label names to ids via ``label2id``.

    The column is replaced on the DataFrame that was passed in, and that same
    DataFrame is returned. A frame without rows is returned untouched.

    Args:
        dataset: DataFrame whose ``label`` column holds keys of ``label2id``.

    Returns:
        The same DataFrame, with ``label`` holding integer ids.

    Raises:
        KeyError: If a label is not a key of ``label2id``, or a non-empty
            frame has no ``label`` column.
    """
    if dataset.empty:
        return dataset

    # Index label2id directly (instead of passing the dict to map) so an unknown
    # label raises KeyError rather than silently turning into NaN.
    dataset["label"] = dataset["label"].map(lambda name: label2id[name])

    return dataset


# Tokenization
# Tokenizers already loaded, keyed by model name, so repeated tokenize() calls reuse them.
_TOKENIZER_CACHE = {}


def tokenize(data):
    """Tokenize ``data["text"]`` with the tokenizer of ``model_name``.

    The tokenizer is loaded with ``AutoTokenizer.from_pretrained`` on the
    first call and reused afterwards; ``Dataset.map(..., batched=True)``
    invokes this function once per batch, so reloading it each time would
    repeat the same work for every batch.

    Args:
        data: Mapping with a ``"text"`` entry holding the text(s) to encode.

    Returns:
        The tokenizer output, padded/truncated to 128 tokens.
    """
    tokenizer = _TOKENIZER_CACHE.get(model_name)
    if tokenizer is None:
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        _TOKENIZER_CACHE[model_name] = tokenizer

    return tokenizer(data["text"], padding="max_length", truncation=True, max_length=128)


def _prepare_dataset(json_path, max_samples):
    """Load one JSON dataset and return it tokenized and torch-formatted.

    Steps: read the file (including the emoji merge), convert labels to ids,
    build a ``Dataset``, tokenize ``text``, drop ``text``, rename ``label`` to
    ``labels``, then shuffle with seed 42 and keep at most ``max_samples`` rows.
    """
    frame = label_to_int(load_json_to_pandas(json_path))
    dataset = pandas_to_Dataset(frame)

    dataset = dataset.map(tokenize, batched=True)
    # The raw text is not a model input once it has been tokenized.
    dataset = dataset.remove_columns(["text"])
    # The model call below receives the batch as keyword arguments and expects "labels".
    dataset = dataset.rename_column("label", "labels")
    dataset.set_format("torch")

    # Cap the request at the dataset size so select() cannot index past the end.
    keep = min(max_samples, len(dataset))
    return dataset.shuffle(seed=42).select(range(keep))


def _train(model, dataloader, optimizer, lr_scheduler, device, num_epoch):
    """Fine-tune ``model`` in place for ``num_epoch`` passes over ``dataloader``."""
    progress_bar = tqdm(total=num_epoch * len(dataloader))

    # from_pretrained() returns the model in evaluation mode, so training mode
    # has to be enabled explicitly for dropout to be active.
    model.train()
    for _ in range(num_epoch):
        for batch in dataloader:
            batch = {key: value.to(device) for key, value in batch.items()}
            loss = model(**batch).loss
            loss.backward()

            optimizer.step()
            lr_scheduler.step()
            optimizer.zero_grad()
            progress_bar.update(1)

    progress_bar.close()


def _evaluate(model, dataloader, device, top_k):
    """Run ``model`` over ``dataloader`` and print accuracy, macro F1 and per-class F1.

    Returns:
        ``(all_predictions, all_refs)``: for every example, the list of its
        ``top_k`` highest-scoring label ids (best first) and its reference id.
    """
    metric_acc = evaluate.load("accuracy")
    metric_f1 = evaluate.load("f1")
    metric_f1_each = evaluate.load("f1")

    all_predictions = []
    all_refs = []

    model.eval()
    for batch in dataloader:
        batch = {key: value.to(device) for key, value in batch.items()}
        with torch.no_grad():
            logits = model(**batch).logits

        predictions = torch.argmax(logits, dim=-1)
        topk_ids = torch.topk(input=logits, k=top_k, dim=-1, sorted=True).indices

        all_predictions.extend(topk_ids.tolist())
        all_refs.extend(batch["labels"].tolist())

        metric_acc.add_batch(predictions=predictions, references=batch["labels"])
        metric_f1.add_batch(predictions=predictions, references=batch["labels"])
        metric_f1_each.add_batch(predictions=predictions, references=batch["labels"])

    result_acc = metric_acc.compute()
    result_f1 = metric_f1.compute(average="macro")
    result_f1_each = metric_f1_each.compute(average=None)
    print(result_acc)
    print(result_f1)
    print(result_f1_each)

    return all_predictions, all_refs


def main():
    """Fine-tune ``model_name`` on the training set and report test-set metrics.

    Prints accuracy, macro F1, per-class F1, top-k accuracy and a
    classification report, then builds a confusion-matrix display.
    """
    ############################################################################################################
    # Parameter settings
    # Path of the training data
    train_data_path = "./dataset/train_cls/R8.json"
    # Path of the test data
    test_data_path = "./dataset/test/T8h.json"
    # Number of training examples to use
    num_of_train_data = 313770
    # Number of test examples to use
    num_of_test_data = 360
    # Batch size
    batch_size = 16
    # Number of epochs
    num_epoch = 2
    # Learning rate
    learning_rate = 1e-5
    ############################################################################################################

    # torch.topk cannot return more entries than there are labels.
    top_k = min(5, num_labels)

    # Load, label-encode, tokenize and subsample both splits.
    train_data = _prepare_dataset(train_data_path, num_of_train_data)
    test_data = _prepare_dataset(test_data_path, num_of_test_data)

    # Build the dataloaders (the rows were already shuffled once with a fixed seed).
    train_dataloader = DataLoader(train_data, shuffle=False, batch_size=batch_size)
    eval_dataloader = DataLoader(test_data, batch_size=batch_size)

    # Load the model
    model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=num_labels)

    # Optimizer and learning rate
    optimizer = AdamW(model.parameters(), lr=learning_rate)

    # Linear learning-rate schedule over all training steps, without warm-up
    num_training_steps = num_epoch * len(train_dataloader)
    lr_scheduler = get_scheduler(
        name="linear",
        optimizer=optimizer,
        num_warmup_steps=0,
        num_training_steps=num_training_steps,
    )

    # Use the GPU when one is available
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    # Training
    _train(model, train_dataloader, optimizer, lr_scheduler, device, num_epoch)

    # Evaluation
    all_predictions, all_refs = _evaluate(model, eval_dataloader, device, top_k)

    # Top-k accuracy: the reference label is among the k best predictions.
    num_true = sum(ref in preds for preds, ref in zip(all_predictions, all_refs))
    print(f"A@{top_k}: {num_true} / {len(all_predictions)} = {num_true / len(all_predictions)}")

    y_true = np.array([id2label[ref] for ref in all_refs])
    # The first entry of each sorted top-k list is the best-scoring label.
    y_pred = np.array([id2label[preds[0]] for preds in all_predictions])

    print(classification_report(y_true=y_true, y_pred=y_pred, labels=label, zero_division=0.0))

    ConfusionMatrixDisplay.from_predictions(y_true=y_true, y_pred=y_pred, labels=label)


# Run the training/evaluation pipeline only when this file is executed directly.
if __name__ == "__main__":
    main()