In [None]:
# Install the third-party packages this notebook relies on.
# NOTE(review): versions are not pinned, so a later run may resolve different releases.
!pip install transformers
!pip install sentencepiece

# Fetch the NLTK "punkt" data package.
import nltk
nltk.download("punkt")

Make sure to restart the runtime after installing `sentencepiece`, so that the newly installed package is picked up.

In [None]:
# Print the Python version of the runtime for reference.
!python --version

Python 3.7.15


In [None]:
import os 
import re
from google.colab import drive

import torch
import pandas as pd
from tqdm import tqdm
from torch.utils.data import Dataset
from transformers import Trainer, TrainingArguments, T5Tokenizer, T5ForConditionalGeneration
from sklearn.metrics import accuracy_score, f1_score
from sklearn.model_selection import train_test_split 

In [None]:
# Mount Google Drive in the Colab runtime; every project path is resolved relative to CWD.
drive.mount("/content/drive")
CWD = "/content/drive/MyDrive/DACON"

def join_path(*args):
    """Return the path obtained by appending ``args`` to the project root ``CWD``."""
    segments = (CWD, *args)
    return os.path.join(*segments)

# Use the GPU when one is visible to torch, otherwise fall back to the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Input data locations under the project root.
TRAIN_CSV = join_path("data", "train.csv")
TEST_CSV = join_path("data", "test.csv")

# Pretrained checkpoint name; the same string names the output sub-directory.
MODEL = MODEL_DIR = "t5-base"

# Training / tokenization hyper-parameters.
MAX_LENGTH = 128
BATCH_SIZE = 32
EPOCHS = 30

# Hyper-parameters handed to the Hugging Face `Trainer`.
# Outputs go to the MODEL_DIR folder under the project root; evaluation runs once per epoch.
# NOTE(review): `save_strategy="no"` presumably disables intermediate checkpoints,
# which would make `save_total_limit=1` a no-op — confirm against the installed
# transformers version.
TRAIN_ARGS = TrainingArguments(
    output_dir=join_path(MODEL_DIR),
    num_train_epochs=EPOCHS,
    per_device_train_batch_size=BATCH_SIZE,  
    per_device_eval_batch_size=BATCH_SIZE,
    learning_rate=7e-6,
    warmup_steps=500,
    weight_decay=1e-5,
    dataloader_num_workers=0,
    save_total_limit=1,
    save_strategy="no",
    evaluation_strategy="epoch",
    run_name=f"[DACON]{MODEL_DIR}"
)

Mounted at /content/drive


In [None]:
# Shared NLTK tweet tokenizer used to normalize every utterance.
# Per the NLTK TweetTokenizer options: lower-case the text, drop @handles and
# shorten over-long character runs.
twt_tokenizer = nltk.tokenize.TweetTokenizer(
    preserve_case=False, 
    strip_handles=True, 
    reduce_len=True
)

def shorten_repeated_words(tokens):
    """Collapse stuttered, hyphen-joined repetitions inside each token.

    A token such as ``"i-i-i"`` becomes ``"i"``. Only *consecutive* duplicate
    fragments are merged, so legitimate compounds like ``"back-to-back"`` or
    ``"face-to-face"`` are left intact (de-duplicating across the whole token
    would truncate them to ``"back-to"`` / ``"face-to"``).

    Args:
        tokens: List of string tokens. The list is updated in place.

    Returns:
        The same list object, with hyphenated tokens rewritten.
    """
    for i, token in enumerate(tokens):
        if "-" not in token:
            continue
        parts = token.split("-")
        # Keep a fragment only when it differs from the one right before it.
        collapsed = [
            part for idx, part in enumerate(parts)
            if idx == 0 or part != parts[idx - 1]
        ]
        tokens[i] = "-".join(collapsed)
    return tokens

def decode_tokens(tokens):
    """Join tokens into a sentence and re-attach punctuation to the preceding word.

    Tokens are joined with single spaces. Every "whitespace + non-word character
    + optional whitespace" run found in that joined text is then rewritten:
    apostrophes (straight or curly) lose the whitespace on both sides, any other
    mark only loses the whitespace in front of it.
    """
    apostrophes = ("'", "’")
    text = " ".join(tokens)
    # The matches are collected once from the initial text; each one is then
    # substituted everywhere it still occurs, in order of discovery.
    for found in re.findall(r"\s\W\s*", text):
        bare = found.strip()
        replacement = bare if bare in apostrophes else found.lstrip()
        text = text.replace(found, replacement)
    return text

def twt_tokenize(sentence):
    """Normalize one utterance: tweet-tokenize, collapse repeats, re-join as text."""
    tokens = shorten_repeated_words(twt_tokenizer.tokenize(sentence))
    return decode_tokens(tokens)

In [None]:
# Inspect the data
# Load the training csv and normalize every utterance with twt_tokenize before previewing it.
train_csv = pd.read_csv(TRAIN_CSV)
train_csv["Utterance"] = train_csv["Utterance"].map(twt_tokenize)
train_csv.head()

Unnamed: 0,ID,Utterance,Speaker,Dialogue_ID,Target
0,TRAIN_0000,also i was the point person on my company’s tr...,Chandler,0,neutral
1,TRAIN_0001,you must’ve had your hands full.,The Interviewer,0,neutral
2,TRAIN_0002,that i did. that i did.,Chandler,0,neutral
3,TRAIN_0003,so let’s talk a little bit about your duties.,The Interviewer,0,neutral
4,TRAIN_0004,my duties? all right.,Chandler,0,surprise


In [None]:
# Distinct emotion labels present in the training data, and how many there are.
targets = train_csv["Target"].unique()
target_size = len(targets)
target_size

7

In [None]:
# Keep only the model input and the label.
train_csv = train_csv.loc[:, ["Utterance", "Target"]]

# A fixed seed makes the split (and therefore the reported scores) reproducible
# across kernel restarts; stratifying keeps the class ratios equal in both parts.
df_train, df_eval = train_test_split(
    train_csv,
    test_size=0.2,
    random_state=42,
    stratify=train_csv["Target"],
)
# EmotionDataset is fed frames with a fresh 0..n-1 index.
df_train = df_train.reset_index(drop=True)
df_eval = df_eval.reset_index(drop=True)
df_train.head()

Unnamed: 0,Utterance,Target
0,i know! why don’t you get drunk! that worked f...,anger
1,"yeah, that chandler cracks me up.",joy
2,"y'know, hey! you’re the one who ended it, reme...",anger
3,"oh, be-because of the leather pants.",sadness
4,how long have we been home?,neutral


In [None]:
class EmotionDataset(Dataset):
    """Pre-tokenized utterances (and, outside test mode, emotion labels).

    Every row of ``df`` is tokenized once at construction time and padded to a
    fixed length, so ``__getitem__`` only has to reshape cached tensors.

    Args:
        df: Frame with an "Utterance" column and, outside test mode, a "Target"
            column. Rows are read by position, so any index is accepted.
        tokenizer: Callable tokenizer returning ``input_ids`` / ``attention_mask``
            tensors; must expose ``pad_token_id``.
        target_size: Maximum token length used when encoding the target text.
        mode: ``"test"`` to skip targets. Any other value keeps targets. When
            left as ``None`` and ``df`` has no target column, test mode is
            selected automatically instead of failing with ``KeyError``.
        max_len: Maximum token length used when encoding the utterance.
    """

    def __init__(self, df, tokenizer, target_size=7, mode=None, max_len=512):
        self.data_col = "Utterance"
        self.target_col = "Target"
        # A frame without labels can only be used for inference.
        if mode is None and self.target_col not in df.columns:
            mode = "test"
        self.mode = str(mode).strip().lower()
        self.tokenizer = tokenizer
        self.target_size = target_size
        self.max_len = max_len
        self.data = self._tokenize(df)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        data, target = self.data[index]
        # squeeze(0) drops only the leading batch axis added by return_tensors="pt";
        # a bare squeeze() would also drop a length-1 sequence axis.
        data_ids = data["input_ids"].squeeze(0)
        data_mask = data["attention_mask"].squeeze(0)

        if self.mode == "test":
            return {
                "input_ids": data_ids,
                "attention_mask": data_mask
            }

        target_mask = target["attention_mask"].squeeze(0)
        target_ids = target["input_ids"].squeeze(0)
        # Padding positions are marked with -100 on a fresh tensor, leaving the
        # cached token ids untouched.
        labels = target_ids.masked_fill(
            target_ids == self.tokenizer.pad_token_id, -100
        )
        return {
            "input_ids": data_ids,
            "attention_mask": data_mask,
            "decoder_attention_mask": target_mask,
            "labels": labels
        }

    def _encode(self, text, max_length):
        """Tokenize one string, padded/truncated to exactly ``max_length`` tokens."""
        return self.tokenizer(
            [text],
            max_length=max_length,
            padding="max_length",
            truncation=True,
            return_tensors="pt"
        )

    def _tokenize(self, df):
        """Return a list of ``[tokenized_utterance, tokenized_target_or_None]``."""
        # Plain lists give positional access, independent of the frame's index.
        utterances = df[self.data_col].tolist()
        if self.mode == "test":
            targets = [None] * len(utterances)
        else:
            targets = df[self.target_col].tolist()

        res = list()
        for utterance, target in zip(utterances, targets):
            if self.mode == "test":
                tokenized_target = None
            else:
                tokenized_target = self._encode(target, self.target_size)
            tokenized_data = self._encode(utterance, self.max_len)
            res.append([tokenized_data, tokenized_target])
        return res

In [None]:
# `model_max_length` is the tokenizer option that records the maximum input
# length; a `max_length` keyword is not a tokenizer init option.
tokenizer = T5Tokenizer.from_pretrained(
    MODEL,
    model_max_length=MAX_LENGTH
)

In [None]:
# Both splits carry labels, so both are built in "train" mode with identical settings.
train_set, eval_set = (
    EmotionDataset(
        frame,
        tokenizer,
        target_size=target_size,
        mode="train",
        max_len=MAX_LENGTH
    )
    for frame in (df_train, df_eval)
)

In [None]:
# The emotion is produced as generated text, so the pretrained text-to-text
# model is loaded as-is; classification-head options such as `num_labels` do
# not apply to it.
model = T5ForConditionalGeneration.from_pretrained(MODEL)

In [None]:
def eval(model, tokenizer, eval_set):
    """Generate a label for every example in ``eval_set`` and score the result.

    Reference labels are decoded from each example's own ``labels`` tensor, so
    the score always matches the dataset that was passed in rather than a
    module-level frame. Note that this name shadows Python's built-in ``eval``;
    it is kept so existing calls continue to work.

    Args:
        model: Seq2seq model exposing ``generate``; it is moved to ``DEVICE``.
        tokenizer: Tokenizer used to decode generated ids and reference labels.
        eval_set: Iterable of dicts holding ``input_ids``, ``attention_mask``
            and ``labels`` tensors, with label padding marked as -100.

    Returns:
        dict with "accuracy" and macro-averaged "f1".
    """
    model.to(DEVICE)
    # Switch to inference mode so dropout does not perturb the generated text.
    model.eval()
    pred_values = list()
    real_values = list()

    for data in eval_set:
        output = model.generate(
            input_ids=data["input_ids"].unsqueeze(0).to(DEVICE),
            attention_mask=data["attention_mask"].unsqueeze(0).to(DEVICE),
            max_length=512
        )
        # skip_special_tokens drops markers such as <pad> and </s> while decoding.
        pred = tokenizer.decode(output[0], skip_special_tokens=True).strip()
        pred_values.append(pred)

        # Restore the pad id where the loss mask (-100) was written, then decode.
        labels = data["labels"]
        label_ids = labels.masked_fill(labels == -100, tokenizer.pad_token_id)
        real = tokenizer.decode(label_ids, skip_special_tokens=True).strip()
        real_values.append(real)

    acc = accuracy_score(real_values, pred_values)
    f1 = f1_score(real_values, pred_values, average="macro")
    return {
        "accuracy": acc,
        "f1": f1
    }

In [None]:
# Wire the model, the training arguments and both datasets into the Trainer.
# NOTE(review): no `compute_metrics` callback is passed, so the per-epoch
# evaluation presumably reports loss only — confirm.
trainer = Trainer(
    model=model,
    args=TRAIN_ARGS,
    train_dataset=train_set,
    eval_dataset=eval_set
)
# Release cached GPU memory before training starts.
torch.cuda.empty_cache()

In [None]:
# Fine-tune the model with the settings from TRAIN_ARGS.
trainer.train()

In [None]:
# Save the model
# The weights are written to the MODEL_DIR folder under the project root.
trainer.save_model(join_path(MODEL_DIR))
torch.cuda.empty_cache()

In [None]:
# Score the fine-tuned model on the held-out split and report both metrics.
score = eval(model, tokenizer, eval_set)
print(f"Accuracy: {score['accuracy']:.5f}")
print(f"F1-macro: {score['f1']:.5f}")

In [None]:
def classify(model, test_set, tokenizer=None):
    """Generate one predicted label string per example in ``test_set``.

    Args:
        model: Seq2seq model exposing ``generate``; it is moved to ``DEVICE``.
        test_set: Iterable of dicts holding ``input_ids`` and ``attention_mask``
            tensors.
        tokenizer: Tokenizer used to decode the generated ids. When omitted, the
            tokenizer stored on the dataset (``test_set.tokenizer``) is used, so
            the function no longer depends on a module-level variable.

    Returns:
        List of decoded predictions, in dataset order.
    """
    if tokenizer is None:
        tokenizer = test_set.tokenizer

    model.to(DEVICE)
    # Switch to inference mode so dropout does not perturb the generated text.
    model.eval()
    pred_values = list()

    for data in test_set:
        output = model.generate(
            input_ids=data["input_ids"].unsqueeze(0).to(DEVICE),
            attention_mask=data["attention_mask"].unsqueeze(0).to(DEVICE),
            max_length=512
        )
        # skip_special_tokens drops markers such as <pad> and </s> while decoding.
        pred = tokenizer.decode(output[0], skip_special_tokens=True).strip()
        pred_values.append(pred)

    return pred_values

In [None]:
# Read the test split from the path configured in TEST_CSV.
test_csv = pd.read_csv(TEST_CSV)
test_csv.head()

In [None]:
# Apply the same normalization the training utterances received, so the model
# sees test text in the form it was trained on.
df_test = test_csv["Utterance"].map(twt_tokenize).to_frame()

# The test file has no "Target" column, so the dataset must be built in test
# mode; the sequence length matches the one used for training.
test_set = EmotionDataset(
    df_test,
    tokenizer,
    mode="test",
    max_len=MAX_LENGTH
)

preds = classify(model, test_set)
test_csv["Target"] = preds

submit = test_csv.loc[:, ["ID", "Target"]]
submit.head()

In [None]:
# Write the submission file into the same folder the model was saved to.
submit.to_csv(join_path(MODEL_DIR, "submit-16.csv"), index=False)