In [None]:
import torch
import pandas as pd
import numpy as np

from transformers import pipeline, TrainingArguments, AutoModel, AutoModelForSequenceClassification, Trainer, ElectraTokenizer
from datasets import load_dataset, ClassLabel

from sklearn.metrics import accuracy_score, f1_score, confusion_matrix, ConfusionMatrixDisplay
from sklearn.preprocessing import MinMaxScaler

import matplotlib.pyplot as plt
import matplotlib
from umap import UMAP

In [None]:
# Pick the best available accelerator: CUDA first, then Apple MPS, else CPU.
# `torch.backends.mps.is_available()` is the documented availability check;
# `torch.mps.is_available()` is only present in newer PyTorch releases.
_mps_backend = getattr(torch.backends, 'mps', None)
if torch.cuda.is_available():
    device = torch.device('cuda')
elif _mps_backend is not None and _mps_backend.is_available():
    device = torch.device('mps:0')
else:
    device = torch.device('cpu')
device

In [None]:
# preprocess csv files
# Concatenate both CSV corpora, rewrite English emotion tags to their Korean
# counterparts, encode each emotion as an integer label, and save the result.

# English tag -> Korean tag. Values not listed here are left untouched.
EMOTION_EN_TO_KO = {
    'happiness': '행복',
    'neutral': '중립',
    'sadness': '슬픔',
    'angry': '분노',
    'surprise': '놀람',
    'disgust': '혐오',
    'fear': '공포',
}

df1 = pd.read_csv('data/한국어_단발성_대화_데이터셋.csv')
df2 = pd.read_csv('data/감정 분류를 위한 대화 음성 데이터셋.csv')
df = pd.concat([df1, df2], ignore_index=True)

# Rows without an emotion cannot be labelled, so they are dropped.
df = df.dropna(subset=['emotion'])
df['emotion'] = df['emotion'].map(lambda tag: EMOTION_EN_TO_KO.get(tag, tag))

# Label ids follow the order in which emotions first appear in the frame.
emotion_names = df['emotion'].unique()
# Build the name -> id lookup once instead of rebuilding a list for every row.
label_of = {name: idx for idx, name in enumerate(emotion_names)}
df['label'] = df['emotion'].map(label_of)
df = df.drop('emotion', axis=1)
df.to_csv('data/processed_emotion_data.csv', index=False)
emotion_names

In [None]:
# Load data
# Read the processed CSV back through `datasets` and turn the integer `label`
# column into a ClassLabel feature carrying the emotion names.
emotions_local = load_dataset('csv', data_files='data/processed_emotion_data.csv')
label_names = emotion_names.tolist()
class_label = ClassLabel(names=label_names, num_classes=len(label_names))
emotions = emotions_local.cast_column(column='label', feature=class_label)

In [None]:
import re
import emoji
from soynlp.normalizer import repeat_normalize

# `pattern` matches every run of characters that is NOT in the allowed set:
# a few punctuation marks, the full ASCII range, and Hangul jamo/syllables.
# Emoji are not whitelisted, so they are matched like any other disallowed
# character.
pattern = re.compile('[^ .,?!/@$%~％·∼()\x00-\x7Fㄱ-ㅣ가-힣]+')
# `url_pattern` matches http(s) URLs, including an optional path/query part.
url_pattern = re.compile(
    r'https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*)')

def clean(x):
    """Normalize one raw utterance.

    Steps, in order: replace runs of characters outside the allowed set
    (``pattern``) with a single space, remove emoji, remove URLs
    (``url_pattern``), strip surrounding whitespace, and pass the result
    through ``repeat_normalize`` with ``num_repeats=2``.

    Args:
        x: Raw text. Anything that is not a ``str`` (for example a missing
            text cell, presumably surfaced as ``None`` by the CSV loader)
            is treated as empty text.

    Returns:
        The cleaned string, or ``''`` when ``x`` is not a string.
    """
    if not isinstance(x, str):
        # Regex substitution would raise TypeError on None/NaN; an empty
        # string lets a downstream length filter discard the row instead.
        return ''
    x = pattern.sub(' ', x)
    x = emoji.replace_emoji(x, replace='')  # remove emoji
    x = url_pattern.sub('', x)
    x = x.strip()
    x = repeat_normalize(x, num_repeats=2)
    return x

def _clean_batch(batch):
    """Apply ``clean`` to every entry of the batch's ``text`` column."""
    return {'text': list(map(clean, batch['text']))}


def _has_multiple_words(example):
    """Keep only examples whose text splits into more than one token."""
    return len(example['text'].split()) > 1


emotions = emotions.map(_clean_batch, batch_size=None, batched=True)
emotions = emotions.filter(_has_multiple_words)


In [None]:
# Hold out 20% of the examples as a test split; the seed fixes the shuffle.
emotions = emotions["train"].train_test_split(seed=42, test_size=0.2)

train_ds = emotions['train']
labels = train_ds.features['label'].names
emotions

In [None]:
# check frequency of classes
# Switch the dataset to pandas output so the train split can be sliced into
# a DataFrame.
emotions.set_format(type="pandas")
df = emotions['train'][:]


def label_int2str(row):
    """Translate an integer class id into its label name.

    Uses the ``label`` feature of the train split of ``emotions``.
    """
    label_feature = emotions['train'].features['label']
    return label_feature.int2str(row)


df['label_name'] = df['label'].map(label_int2str)

# Font able to render the Korean class names; keep the minus sign drawable.
matplotlib.rcParams['axes.unicode_minus'] = False
matplotlib.rc('font', family='Malgun Gothic')

class_counts = df['label_name'].value_counts(ascending=True)
class_counts.plot.barh()
plt.title('Frequency of Classes')
plt.show()

In [None]:
# context size
# Whitespace-token count per sentence, shown as one box per emotion class.
word_lists = df['text'].str.split()
df['Words Per Sentence'] = word_lists.map(len)
df.boxplot(column='Words Per Sentence', by='label_name',
           grid=False, showfliers=False, color='black')
plt.suptitle('')
plt.xlabel('')
plt.show()

# Undo the pandas output format set for the exploratory plots.
emotions.reset_format()

In [None]:
model_ckpt = "monologg/koelectra-base-v3-discriminator"
tokenizer = ElectraTokenizer.from_pretrained(model_ckpt)


def tokenize(batch):
    """Tokenize the ``text`` field of ``batch`` with padding and truncation enabled."""
    texts = batch["text"]
    return tokenizer(texts, truncation=True, padding=True)


# batched=True with batch_size=None: `tokenize` receives whole batches rather
# than single examples.
emotions_encoded = emotions.map(tokenize, batch_size=None, batched=True)

In [None]:
model = AutoModel.from_pretrained(model_ckpt)
model = model.to(device)


def extract_hidden_states(batch):
    """Return the last hidden state at sequence position 0 for each example.

    Only the entries of ``batch`` named in ``tokenizer.model_input_names`` are
    moved to ``device`` and fed to ``model``; gradients are disabled.

    Returns:
        ``{"hidden_state": ndarray}`` holding ``last_hidden_state[:, 0]``
        copied back to the CPU.
    """
    model_inputs = {}
    for name, tensor in batch.items():
        if name in tokenizer.model_input_names:
            model_inputs[name] = tensor.to(device)
    with torch.no_grad():
        outputs = model(**model_inputs)
    first_position_state = outputs.last_hidden_state[:, 0]
    return {"hidden_state": first_position_state.cpu().numpy()}


# The model needs tensors, so expose the relevant columns as torch tensors.
emotions_encoded.set_format('torch', columns=['input_ids', 'attention_mask', 'label'])
emotions_hidden = emotions_encoded.map(extract_hidden_states, batched=True)

In [None]:
# Materialise hidden states (x) and labels (y) of both splits as numpy arrays.
train_hidden, test_hidden = emotions_hidden['train'], emotions_hidden['test']
x_train = np.array(train_hidden['hidden_state'])
y_train = np.array(train_hidden['label'])
x_test = np.array(test_hidden['hidden_state'])
y_test = np.array(test_hidden['label'])
x_train.shape, x_test.shape

In [None]:
# Project the training-set hidden states to 2-D with UMAP and draw one
# hexbin density panel per emotion class.
x_scaled = MinMaxScaler().fit_transform(x_train)
# Fixed random_state so the projection is reproducible across runs.
mapper = UMAP(n_components=2, metric='cosine', random_state=42).fit(x_scaled)
df_emb = pd.DataFrame(mapper.embedding_, columns=["X", "Y"])
df_emb['label'] = y_train

fig, axes = plt.subplots(3, 3, figsize=(7, 5))
axes = axes.flatten()
# One distinct colormap per class. 'Grays' is avoided: it is either an alias
# of 'Greys' (already used) or an unknown name on older Matplotlib releases.
cmaps = ['Greys', 'Blues', 'Oranges', 'Reds', 'Purples', 'Greens', 'YlOrBr']
labels = emotions['train'].features['label'].names

for i, (label, cmap) in enumerate(zip(labels, cmaps)):
    df_emb_sub = df_emb.query(f'label == {i}')
    axes[i].hexbin(df_emb_sub['X'], df_emb_sub['Y'], cmap=cmap, gridsize=20, linewidths=(0,))
    axes[i].set_title(label)
    axes[i].set_xticks([])
    axes[i].set_yticks([])

# The 3x3 grid has more cells than plotted classes; hide the unused ones.
for unused_ax in axes[min(len(labels), len(cmaps)):]:
    unused_ax.set_visible(False)

plt.tight_layout()
plt.show()

In [None]:
# Classification head sized to the number of emotion classes.
num_labels = len(emotion_names)
model = AutoModelForSequenceClassification.from_pretrained(
    model_ckpt,
    num_labels=num_labels,
)
model = model.to(device)

In [None]:
def compute_metrics(pred):
    """Compute accuracy and weighted F1 for a prediction object.

    Args:
        pred: Object exposing ``label_ids`` (true labels) and ``predictions``
            (scores; the argmax over the last axis is taken as the class).

    Returns:
        Dict with keys ``"accuracy"`` and ``"f1"``.
    """
    y_true = pred.label_ids
    y_hat = pred.predictions.argmax(-1)
    return {
        "accuracy": accuracy_score(y_true, y_hat),
        "f1": f1_score(y_true, y_hat, average='weighted'),
    }

In [None]:
model_name = 'koelectra-base-v3-finetuned-emotion'
batch_size = 16
# Number of full batches in the training split, used as the logging interval.
logging_steps = len(emotions_encoded['train']) // batch_size

training_args = TrainingArguments(
    # Output location / hub upload
    output_dir=model_name,
    push_to_hub=True,
    # Optimisation
    num_train_epochs=4,
    learning_rate=2e-5,
    weight_decay=0.01,
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    # Evaluate and checkpoint every epoch; keep the checkpoint with the best F1
    eval_strategy='epoch',
    save_strategy='epoch',
    load_best_model_at_end=True,
    metric_for_best_model="f1",
    greater_is_better=True,
    # Logging / progress display
    logging_steps=logging_steps,
    log_level='error',
    disable_tqdm=False,
)

In [None]:
# Fine-tune on the train split, evaluating on the test split.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=emotions_encoded['train'],
    eval_dataset=emotions_encoded['test'],
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
)

trainer.train()

# Publish the trained model to the Hub with a descriptive commit message.
trainer.push_to_hub(commit_message="combined two dataset test(conversation, discrete)")

In [None]:
# Run the trained model over the test split and display its metrics.
test_split = emotions_encoded['test']
preds_output = trainer.predict(test_split)
preds_output.metrics

In [None]:
def plot_confusion_matrix(y_preds, y_true, labels):
    """Draw a confusion matrix normalized over the true labels.

    Args:
        y_preds: Predicted class ids.
        y_true: True class ids.
        labels: Display names for the matrix axes.
    """
    normalized_cm = confusion_matrix(y_true, y_preds, normalize='true')
    fig, ax = plt.subplots(figsize=(6, 6))
    display = ConfusionMatrixDisplay(
        confusion_matrix=normalized_cm,
        display_labels=labels,
    )
    display.plot(ax=ax, cmap='Blues', values_format='.2f', colorbar=False)
    plt.title('Normalized confusion matrix')
    plt.show()


# Predicted class = index of the highest score for each test example.
y_preds = preds_output.predictions.argmax(axis=1)
plot_confusion_matrix(y_preds, y_test, labels)

In [None]:
from torch.nn.functional import cross_entropy

def forward_pass_with_label(batch):
    """Run ``model`` on a batch and return per-example loss and predicted class.

    Only the entries of ``batch`` named in ``tokenizer.model_input_names`` are
    fed to the model; ``batch['label']`` supplies the targets. Gradients are
    disabled.

    Returns:
        Dict with ``'loss'`` (unreduced cross-entropy, one value per example)
        and ``'predicted_label'`` (argmax of the logits), both as numpy arrays.
    """
    model_inputs = {
        name: tensor.to(device)
        for name, tensor in batch.items()
        if name in tokenizer.model_input_names
    }
    with torch.no_grad():
        logits = model(**model_inputs).logits
        targets = batch['label'].to(device)
        per_example_loss = cross_entropy(logits, targets, reduction='none')
        pred_label = logits.argmax(dim=-1)
    return {
        'loss': per_example_loss.cpu().numpy(),
        'predicted_label': pred_label.cpu().numpy(),
    }


emotions_encoded.set_format('torch', columns=['input_ids', 'attention_mask', 'label'])
emotions_encoded['test'] = emotions_encoded['test'].map(
    forward_pass_with_label, batch_size=16, batched=True)

# Back to pandas so losses can be inspected next to the text and label names.
emotions_encoded.set_format('pandas')
cols = ['text', 'label', 'predicted_label', 'loss']
df_test = emotions_encoded['test'][:][cols]
for col in ('label', 'predicted_label'):
    df_test[col] = df_test[col].apply(label_int2str)

In [None]:
# Ten test examples with the highest loss.
df_test.sort_values(by='loss', ascending=False).head(n=10)

In [None]:
# Ten test examples with the lowest loss.
df_test.sort_values(by='loss', ascending=True).head(n=10)

In [None]:
from transformers import pipeline, ElectraTokenizer
import pandas as pd
import matplotlib.pyplot as plt

# Font able to render Korean text in titles; keep the minus sign drawable.
matplotlib.rcParams['axes.unicode_minus'] = False
matplotlib.rc('font', family='Malgun Gothic')

model_ckpt = "monologg/koelectra-base-v3-discriminator"
model_id = "daniel604/koelectra-base-v3-finetuned-emotion"
tokenizer = ElectraTokenizer.from_pretrained(model_ckpt)
classifier = pipeline(task="text-classification", model=model_id, tokenizer=tokenizer)


def predict(text):
    """Classify ``text`` and show the per-class scores as a bar chart.

    NOTE(review): the bars are named with the module-level ``labels`` list, so
    this assumes the classifier returns scores in that same order — confirm.
    """
    all_scores = classifier(text, return_all_scores=True)[0]
    percentages = 100 * pd.DataFrame(all_scores)['score']
    plt.bar(labels, percentages, color='C0')
    plt.title(f'"{text}"')
    plt.ylabel("Class probability (%)")
    plt.show()

In [None]:
# Try the classifier on an ad-hoc sentence.
custom_text = '미치도록 사랑했던 지겹도록 다투었던'
predict(text=custom_text)