In [1]:
import os

import numpy as np
import pandas as pd

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

from transformers import AutoTokenizer, AutoModel

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Import dataset
all_df = pd.read_csv('emotion_dataset_raw.csv')
labels_str = all_df["Emotion"].unique().tolist()

print(labels_str)

['neutral', 'joy', 'sadness', 'fear', 'surprise', 'anger', 'shame', 'disgust']


In [3]:
# 큰 데이터셋이라 일부 데이터로 진행
all_df = all_df.loc[:1000]

In [3]:
all_df.head()

Unnamed: 0,Emotion,Text
0,neutral,Why ?
1,joy,Sage Act upgrade on my to do list for tommorow.
2,sadness,ON THE WAY TO MY HOMEGIRL BABY FUNERAL!!! MAN ...
3,joy,Such an eye ! The true hazel eye-and so brill...
4,joy,@Iluvmiasantos ugh babe.. hugggzzz for u .! b...


In [4]:
# Import tokenizer
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

In [19]:
# Make dataset
class TextDataset(Dataset):
    def __init__(self, df, tokenizer, labels_str):
        super(TextDataset, self).__init__()
        raw_text = df["Text"].tolist()
        raw_labels = df["Emotion"].tolist()

        self.texts = []
        for idx, rt in enumerate(raw_text):
            self.texts.append(tokenizer.encode(rt))

        self.max_length = max(len(text) for text in self.texts)

        self.labels = []
        for idx, rl in enumerate(raw_labels):
            self.labels.append(labels_str.index(rl))

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        text = self.texts[idx] # list
        text += [0] * (self.max_length - len(text))

        label = self.labels[idx] # int
        return torch.tensor(text), torch.tensor(label)

In [20]:
from sklearn.model_selection import train_test_split

# Define dataset
train_df, test_df = train_test_split(all_df, test_size=0.2)
train_dataset = TextDataset(train_df, tokenizer, labels_str)
test_dataset = TextDataset(test_df, tokenizer, labels_str)

# Define dataloader
train_dataloader = DataLoader(train_dataset, batch_size=128, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=128, shuffle=False)

In [21]:
print(all_df.loc[0])
print()
print(next(iter(train_dataset)))

Emotion    neutral
Text        Why ? 
Name: 0, dtype: object

(tensor([  101,  1030, 10147,  7447, 21926,  1045,  2293,  5870,  2012,  4268,
         2025,  1996,  2812,  2030, 20716,  2785,  8840,  2140,   102,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
 

In [22]:
# Define RNN model
class RNNModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim):
        super(RNNModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.rnn = nn.RNN(embedding_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)
        
    def forward(self, x):
        x = self.embedding(x)
        _, h_n = self.rnn(x)
        h_n = h_n.squeeze(0)
        out = self.fc(h_n)
        return out

In [23]:
# Make model
vocab_size = tokenizer.vocab_size
embedding_dim = 512
hidden_dim = 512
output_dim = len(labels_str)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = RNNModel(vocab_size, embedding_dim, hidden_dim, output_dim)
model.to(device)

RNNModel(
  (embedding): Embedding(30522, 512)
  (rnn): RNN(512, 512, batch_first=True)
  (fc): Linear(in_features=512, out_features=8, bias=True)
)

In [24]:
# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=0.01)

In [25]:
# Train
num_epochs = 10

for epoch in range(num_epochs):
    for texts, labels in train_dataloader:
        optimizer.zero_grad()

        texts, labels = texts.to(device), labels.to(device)

        outputs = model(texts)

        loss = criterion(outputs, labels)

        loss.backward()
        optimizer.step()
        
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}")

Epoch [1/10], Loss: 1.9915
Epoch [2/10], Loss: 1.7015
Epoch [3/10], Loss: 1.7276
Epoch [4/10], Loss: 1.8660
Epoch [5/10], Loss: 1.9446
Epoch [6/10], Loss: 1.9465
Epoch [7/10], Loss: 1.9043
Epoch [8/10], Loss: 2.0853
Epoch [9/10], Loss: 1.7772
Epoch [10/10], Loss: 2.0977


In [26]:
model.eval()
total_correct = 0
total_samples = 0

with torch.no_grad():
    for texts, labels in test_dataloader:
        texts, labels = texts.to(device), labels.to(device)

        outputs = model(texts)

        _, predicted = torch.max(outputs.data, 1)
        
        total_samples += labels.size(0)
        total_correct += (predicted == labels).sum().item()

accuracy = total_correct / total_samples
print(f"Accuracy: {accuracy * 100:.2f}%")

Accuracy: 32.82%


In [33]:
# Define Transformer model (encoder only)
class Transformer(nn.Module):
    def __init__(self, vocab_size, input_dim, output_dim, hidden_dim, num_layers, num_heads, dropout=0.5):
        super(Transformer, self).__init__()
        
        self.embedding = nn.Embedding(vocab_size, input_dim)

        self.encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model=input_dim, nhead=num_heads, dim_feedforward=hidden_dim, dropout=dropout),
            num_layers=num_layers
        )
        
        self.output_layer = nn.Linear(input_dim, output_dim)
        
        self.init_weights()

    def init_weights(self):
        initrange = 0.1
        self.embedding.weight.data.uniform_(-initrange, initrange)
        self.output_layer.weight.data.uniform_(-initrange, initrange)
        self.output_layer.bias.data.zero_()
        
    def forward(self, text):
        embedded = self.embedding(text) # (batch_size, seq_len, embedding_size)

        encoded = self.encoder(embedded.transpose(0, 1))

        output = self.output_layer(encoded[-1])  # Use only the output of the last layer
        return output

In [34]:
# Make model
vocab_size = tokenizer.vocab_size
embedding_dim = 256
hidden_dim = 1024
output_dim = len(labels_str)
num_layers = 2
num_heads = 8

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = Transformer(vocab_size, embedding_dim, output_dim, hidden_dim, num_layers, num_heads)
model.to(device)

Transformer(
  (embedding): Embedding(30522, 256)
  (encoder): TransformerEncoder(
    (layers): ModuleList(
      (0-1): 2 x TransformerEncoderLayer(
        (self_attn): MultiheadAttention(
          (out_proj): NonDynamicallyQuantizableLinear(in_features=256, out_features=256, bias=True)
        )
        (linear1): Linear(in_features=256, out_features=1024, bias=True)
        (dropout): Dropout(p=0.5, inplace=False)
        (linear2): Linear(in_features=1024, out_features=256, bias=True)
        (norm1): LayerNorm((256,), eps=1e-05, elementwise_affine=True)
        (norm2): LayerNorm((256,), eps=1e-05, elementwise_affine=True)
        (dropout1): Dropout(p=0.5, inplace=False)
        (dropout2): Dropout(p=0.5, inplace=False)
      )
    )
  )
  (output_layer): Linear(in_features=256, out_features=8, bias=True)
)

In [35]:
# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=1e-3)

In [36]:
# Train
num_epochs = 10

for epoch in range(num_epochs):
    for texts, labels in train_dataloader:
        optimizer.zero_grad()

        texts, labels = texts.to(device), labels.to(device)

        outputs = model(texts)
        
        loss = criterion(outputs, labels)

        loss.backward()
        optimizer.step()
        
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}")

Epoch [1/10], Loss: 1.2199
Epoch [2/10], Loss: 0.9899
Epoch [3/10], Loss: 0.8057
Epoch [4/10], Loss: 0.9782
Epoch [5/10], Loss: 0.6984
Epoch [6/10], Loss: 0.2349
Epoch [7/10], Loss: 0.2536
Epoch [8/10], Loss: 0.1302
Epoch [9/10], Loss: 0.1382
Epoch [10/10], Loss: 0.1109


In [37]:
model.eval()
total_correct = 0
total_samples = 0

with torch.no_grad():
    for texts, labels in test_dataloader:
        texts, labels = texts.to(device), labels.to(device)

        outputs = model(texts)

        _, predicted = torch.max(outputs.data, 1)
        
        total_samples += labels.size(0)
        total_correct += (predicted == labels).sum().item()

accuracy = total_correct / total_samples
print(f"Accuracy: {accuracy * 100:.2f}%")

Accuracy: 60.07%


In [32]:
# Transformer model with pre=trained embedding
class Transformer(nn.Module):
    def __init__(self, output_dim, hidden_dim, num_layers, num_heads, dropout=0.5):
        super(Transformer, self).__init__()
        
        self.embedding = AutoModel.from_pretrained("huawei-noah/TinyBERT_General_4L_312D")
        input_dim = self.embedding.config.hidden_size

        self.encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model=input_dim, nhead=num_heads, dim_feedforward=hidden_dim, dropout=dropout),
            num_layers=num_layers
        )
        
        self.output_layer = nn.Linear(input_dim, output_dim)
        
        self.init_weights()

    def init_weights(self):
        initrange = 0.1
        self.output_layer.weight.data.uniform_(-initrange, initrange)
        self.output_layer.bias.data.zero_()
        
    def forward(self, input_ids):
        attention_mask = torch.where(input_ids == 0, torch.tensor(0), torch.tensor(1))

        bert_output = self.embedding(input_ids, attention_mask)
        embedded = bert_output.last_hidden_state

        encoded = self.encoder(embedded.transpose(0, 1))

        output = self.output_layer(encoded[-1])  # Use only the output of the last layer
        return output

In [15]:
# Make model
vocab_size = tokenizer.vocab_size
embedding_dim = 256
hidden_dim = 1024
output_dim = len(labels_str)
num_layers = 2
num_heads = 8

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = Transformer(output_dim, hidden_dim, num_layers, num_heads)
model.to(device)



Transformer(
  (embedding): BertModel(
    (embeddings): BertEmbeddings(
      (word_embeddings): Embedding(30522, 312, padding_idx=0)
      (position_embeddings): Embedding(512, 312)
      (token_type_embeddings): Embedding(2, 312)
      (LayerNorm): LayerNorm((312,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): BertEncoder(
      (layer): ModuleList(
        (0-3): 4 x BertLayer(
          (attention): BertAttention(
            (self): BertSelfAttention(
              (query): Linear(in_features=312, out_features=312, bias=True)
              (key): Linear(in_features=312, out_features=312, bias=True)
              (value): Linear(in_features=312, out_features=312, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): BertSelfOutput(
              (dense): Linear(in_features=312, out_features=312, bias=True)
              (LayerNorm): LayerNorm((312,), eps=1e-12, elementwise_af

In [16]:
# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=1e-5)

In [17]:
# Train
num_epochs = 10

for epoch in range(num_epochs):
    for texts, labels in train_dataloader:
        optimizer.zero_grad()

        texts, labels = texts.to(device), labels.to(device)

        outputs = model(texts)
        
        loss = criterion(outputs, labels)

        loss.backward()
        optimizer.step()
        
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}")

Epoch [1/10], Loss: 1.2266
Epoch [2/10], Loss: 1.1327
Epoch [3/10], Loss: 1.1792
Epoch [4/10], Loss: 0.8962
Epoch [5/10], Loss: 0.8137
Epoch [6/10], Loss: 0.9422
Epoch [7/10], Loss: 0.6990
Epoch [8/10], Loss: 0.5932
Epoch [9/10], Loss: 0.5069
Epoch [10/10], Loss: 0.5703


In [18]:
model.eval()
total_correct = 0
total_samples = 0

with torch.no_grad():
    for texts, labels in test_dataloader:
        texts, labels = texts.to(device), labels.to(device)

        outputs = model(texts)

        _, predicted = torch.max(outputs.data, 1)
        
        total_samples += labels.size(0)
        total_correct += (predicted == labels).sum().item()

accuracy = total_correct / total_samples
print(f"Accuracy: {accuracy * 100:.2f}%")

Accuracy: 68.11%
