In [None]:
# Colab에 필요한 라이브러리를 설치합니다.
!pip install transformers datasets



In [None]:
import torch
import torch.nn as nn
from torch.optim import AdamW
from torch.utils.data import DataLoader

from datasets import load_dataset
from transformers import BertTokenizer

# 1. Load the dataset.
# The printed DatasetDict shows that it consists of train, validation, and test splits.
datasets = load_dataset("dair-ai/emotion")
print(datasets)

# 2. Inspect the label names (for reference).
# 0: sadness, 1: joy, 2: love, 3: anger, 4: fear, 5: surprise
label_names = datasets["train"].features["label"].names
print(f"라벨 종류: {label_names}")

# Keep the number of labels in its own variable so later cells can reuse it.
num_labels = len(label_names)


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


README.md: 0.00B [00:00, ?B/s]

split/train-00000-of-00001.parquet:   0%|          | 0.00/1.03M [00:00<?, ?B/s]

split/validation-00000-of-00001.parquet:   0%|          | 0.00/127k [00:00<?, ?B/s]

split/test-00000-of-00001.parquet:   0%|          | 0.00/129k [00:00<?, ?B/s]

Generating train split:   0%|          | 0/16000 [00:00<?, ? examples/s]

Generating validation split:   0%|          | 0/2000 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/2000 [00:00<?, ? examples/s]

DatasetDict({
    train: Dataset({
        features: ['text', 'label'],
        num_rows: 16000
    })
    validation: Dataset({
        features: ['text', 'label'],
        num_rows: 2000
    })
    test: Dataset({
        features: ['text', 'label'],
        num_rows: 2000
    })
})
라벨 종류: ['sadness', 'joy', 'love', 'anger', 'fear', 'surprise']


In [None]:
import random
import numpy as np

# Fix the seed of every random number generator in use (Python, NumPy, PyTorch)
# so that the experiment can be reproduced.
RANDOM_SEED = 42
random.seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(RANDOM_SEED)

# Choose the training device (CUDA when available, otherwise CPU) and print it up front.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"사용할 장치: {DEVICE}")


사용할 장치: cuda


In [None]:
# --- 1. Training settings (hyperparameters) ---
# Every value that controls the fine-tuning run is defined here in one place.
NUM_EPOCHS = 3
TRAIN_BATCH_SIZE = 32
EVAL_BATCH_SIZE = 32
LEARNING_RATE = 5e-5
# Location of the pretrained BERT checkpoint. This constant had been commented
# out while describe_run() and the model-loading cell still read it, which
# raised NameError here and silently skipped the weight load later on.
# NOTE(review): an earlier note says the checkpoint is now fetched from GitHub;
# point this path at wherever that download lands.
PRETRAINED_FILE_PATH = "./save_bert_pretrain.pth"


def describe_run():
    """Print the fine-tuning settings so they can be checked before running.

    Reads the module-level hyperparameters defined in this cell together with
    ``num_labels`` and ``DEVICE`` from earlier cells. Returns nothing.
    """
    print("=== Fine-tune 설정 ===")
    print(f"epochs: {NUM_EPOCHS}")
    print(f"train_batch_size: {TRAIN_BATCH_SIZE}")
    print(f"eval_batch_size: {EVAL_BATCH_SIZE}")
    print(f"learning_rate: {LEARNING_RATE}")
    print(f"pretrained_path: {PRETRAINED_FILE_PATH}")
    print(f"num_labels: {num_labels}")
    print(f"device: {DEVICE}")

describe_run()


In [None]:
# 팀원의 bert_implementation.ipynb에서 가져온 핵심 Config 및 모듈 정의
import json
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

class _MissingConfigKey(AttributeError, KeyError):
    """Raised when a Config attribute has no matching dictionary key.

    Deriving from both exception types keeps existing ``except KeyError``
    handlers working while letting ``hasattr`` / ``getattr(obj, name, default)``
    and the ``copy`` / ``pickle`` machinery see a regular AttributeError.
    """


class Config(dict):
    """Dictionary whose entries can also be read and written as attributes.

    ``cfg.d_hidn`` is equivalent to ``cfg["d_hidn"]`` and ``cfg.x = 1`` is
    equivalent to ``cfg["x"] = 1``.
    """

    def __getattr__(self, name):
        """Return ``self[name]``; called only when normal attribute lookup fails.

        Raises:
            _MissingConfigKey: if ``name`` is not a key. It is both an
                AttributeError and a KeyError.
        """
        try:
            return self[name]
        except KeyError:
            # A bare KeyError here would escape hasattr()/getattr(default) and
            # break copy.deepcopy, which probes for optional dunder attributes.
            raise _MissingConfigKey(name) from None

    # Attribute assignment stores the value as a dictionary entry.
    __setattr__ = dict.__setitem__

    @classmethod
    def load(cls, file):
        """Build a config from the JSON object stored in ``file``.

        Args:
            file: path of a JSON file whose top level is an object.

        Returns:
            An instance of ``cls`` (so subclasses get their own type back).
        """
        with open(file, 'r', encoding='utf-8') as f:
            return cls(json.load(f))

def create_padding_mask(seq_q, seq_k, pad_idx):
    """Build a boolean mask marking the padding positions of ``seq_k``.

    Args:
        seq_q: 2-D tensor of query token ids; only its second dimension
            (query length) is used.
        seq_k: 2-D tensor of key token ids.
        pad_idx: token id that denotes padding.

    Returns:
        Boolean tensor of shape (batch, len_q, len_k) that is True wherever the
        key token equals ``pad_idx``. The result is an expanded view, not a copy.
    """
    _, query_len = seq_q.size()
    batch, key_len = seq_k.size()
    # One row of pad flags per sequence, broadcast across every query position.
    key_is_pad = (seq_k == pad_idx)[:, None, :]
    return key_is_pad.expand(batch, query_len, key_len)

class ScaledDotProductAttention(nn.Module):
    """Attention weights from scaled Q·Kᵀ scores, applied to V.

    Reads ``config.d_head`` (for the 1/sqrt(d_head) scale) and
    ``config.dropout`` (dropout probability applied to the attention weights).
    """

    def __init__(self, config):
        super().__init__()
        self.dropout = nn.Dropout(config.dropout)
        self.scale = 1 / (config.d_head ** 0.5)

    def forward(self, Q, K, V, mask):
        """Return ``(context, attn)``.

        ``mask`` is a boolean tensor; positions where it is True receive a
        score of -1e9 before the softmax so they get (near-)zero weight.
        """
        scores = (Q @ K.transpose(-2, -1)) * self.scale
        scores = scores.masked_fill(mask, -1e9)
        attn = self.dropout(scores.softmax(dim=-1))
        return attn @ V, attn

class MultiHeadSelfAttention(nn.Module):
    """Multi-head attention: project Q/K/V, attend per head, merge, project back.

    Reads ``config.d_hidn``, ``config.n_head``, ``config.d_head`` and
    ``config.dropout``.
    """

    def __init__(self, config):
        super().__init__()
        self.config = config
        heads_width = config.n_head * config.d_head
        self.W_Q = nn.Linear(config.d_hidn, heads_width)
        self.W_K = nn.Linear(config.d_hidn, heads_width)
        self.W_V = nn.Linear(config.d_hidn, heads_width)
        self.scaled_attn = ScaledDotProductAttention(config)
        self.linear = nn.Linear(heads_width, config.d_hidn)
        self.dropout = nn.Dropout(config.dropout)

    def _split_heads(self, projected, batch):
        """Reshape (batch, seq, n_head * d_head) into (batch, n_head, seq, d_head)."""
        per_head = projected.view(batch, -1, self.config.n_head, self.config.d_head)
        return per_head.transpose(1, 2)

    def forward(self, Q, K, V, mask):
        """Return ``(output, attn)`` for the given inputs and 3-D boolean mask.

        The mask is copied once per head before being handed to the scaled
        dot-product attention.
        """
        batch = Q.size(0)
        n_head = self.config.n_head

        q_s = self._split_heads(self.W_Q(Q), batch)
        k_s = self._split_heads(self.W_K(K), batch)
        v_s = self._split_heads(self.W_V(V), batch)

        head_mask = mask.unsqueeze(1).repeat(1, n_head, 1, 1)
        context, attn = self.scaled_attn(q_s, k_s, v_s, head_mask)

        # Put the heads back next to each other along the feature dimension.
        merged = context.transpose(1, 2).contiguous().view(
            batch, -1, n_head * self.config.d_head
        )
        return self.dropout(self.linear(merged)), attn

class PositionwiseFeedForward(nn.Module):
    """Two-layer feed-forward network: Linear -> GELU -> Dropout -> Linear.

    Reads ``config.d_hidn`` (input/output width), ``config.d_ff`` (inner width)
    and ``config.dropout``.
    """

    def __init__(self, config):
        super().__init__()
        self.linear1 = nn.Linear(config.d_hidn, config.d_ff)
        self.linear2 = nn.Linear(config.d_ff, config.d_hidn)
        self.activation = nn.GELU()
        self.dropout = nn.Dropout(config.dropout)

    def forward(self, x):
        """Apply the network to the last dimension of ``x``."""
        hidden = self.linear1(x)
        hidden = self.activation(hidden)
        hidden = self.dropout(hidden)
        return self.linear2(hidden)

class SelfAttentionEncoderBlock(nn.Module):
    """One encoder layer: self-attention and feed-forward sub-layers.

    Each sub-layer's output is added to its input and then layer-normalised
    (normalisation happens after the residual addition).
    """

    def __init__(self, config):
        super().__init__()
        norm_eps = config.layer_norm_epsilon
        self.multi_head_attn = MultiHeadSelfAttention(config)
        self.norm_after_attn = nn.LayerNorm(config.d_hidn, eps=norm_eps)
        self.feed_forward = PositionwiseFeedForward(config)
        self.norm_after_ffn = nn.LayerNorm(config.d_hidn, eps=norm_eps)

    def forward(self, x, attn_mask):
        """Return ``(hidden_states, attention_weights)`` for this layer."""
        # Self-attention: the same tensor serves as query, key and value.
        attended, attn_weights = self.multi_head_attn(x, x, x, attn_mask)
        after_attn = self.norm_after_attn(x + attended)

        transformed = self.feed_forward(after_attn)
        after_ffn = self.norm_after_ffn(after_attn + transformed)
        return after_ffn, attn_weights

class TransformerEncoder(nn.Module):
    """Token + position + segment embeddings followed by a stack of encoder blocks.

    Position ids start at 1; index 0 of the position embedding is used for
    padding tokens, which is why the table has ``n_enc_seq + 1`` rows.
    """

    def __init__(self, config):
        super().__init__()
        self.config = config
        self.token_embedding = nn.Embedding(config.n_enc_vocab, config.d_hidn)
        self.position_embedding = nn.Embedding(config.n_enc_seq + 1, config.d_hidn)
        self.segment_embedding = nn.Embedding(config.n_seg_type, config.d_hidn)
        blocks = [SelfAttentionEncoderBlock(config) for _ in range(config.n_layer)]
        self.encoder_blocks = nn.ModuleList(blocks)

    def forward(self, token_ids, segment_ids):
        """Encode ``token_ids``.

        Returns:
            ``(hidden_states, all_attention_weights)`` where the second item is
            a list holding the attention weights of every block, in order.
        """
        pad_idx = self.config.i_pad
        seq_length = token_ids.size(1)

        # Positions 1..seq_length for real tokens, 0 wherever the token is padding.
        positions = torch.arange(
            1, seq_length + 1, dtype=torch.long, device=token_ids.device
        )
        position_ids = positions.unsqueeze(0).expand_as(token_ids)
        position_ids = position_ids.masked_fill(token_ids.eq(pad_idx), 0)

        hidden = (
            self.token_embedding(token_ids)
            + self.position_embedding(position_ids)
            + self.segment_embedding(segment_ids)
        )

        attn_mask = create_padding_mask(token_ids, token_ids, pad_idx)
        all_attention_weights = []
        for encoder_block in self.encoder_blocks:
            hidden, layer_attn = encoder_block(hidden, attn_mask)
            all_attention_weights.append(layer_attn)
        return hidden, all_attention_weights

class BERTModel(nn.Module):
    """Transformer encoder plus a tanh-activated projection of the first token."""

    def __init__(self, config):
        super().__init__()
        self.config = config
        self.encoder = TransformerEncoder(self.config)
        self.linear_cls = nn.Linear(config.d_hidn, config.d_hidn)
        self.activation_cls = torch.tanh

    def forward(self, input_ids, segment_ids):
        """Return ``(encoder_output, cls_output, attention_weights)``.

        ``cls_output`` is the hidden state at sequence position 0 passed
        through ``linear_cls`` and ``tanh``.
        """
        encoder_output, attention_weights = self.encoder(input_ids, segment_ids)
        first_token = encoder_output[:, 0].contiguous()
        cls_output = self.activation_cls(self.linear_cls(first_token))
        return encoder_output, cls_output, attention_weights


In [None]:
from transformers import BertTokenizer  # AutoTokenizer 대신 명시적으로 사용

# Vocabulary file of the team's custom tokenizer.
# NOTE(review): this path assumes a local `./bert-implementation/` checkout, but
# no visible cell creates it — confirm how the repo is fetched on a fresh runtime.
VOCAB_FILE = "./bert-implementation/data-preprocessing/mini_emotion_tokenizer_7k.txt"
# NOTE(review): from_pretrained() is given a single vocab file rather than a
# directory or model id; presumably deprecated in recent transformers — verify.
tokenizer = BertTokenizer.from_pretrained(VOCAB_FILE, do_lower_case=True)

def preprocess_function(examples):
    """Tokenize the ``text`` field of a batch of examples.

    Every text is truncated and padded to ``max_length=128`` tokens, so all
    encoded sequences have the same length.
    """
    return tokenizer(examples["text"], padding="max_length", truncation=True, max_length=128)

print("커스텀 7k 토크나이저 로드 완료")
# Tokenize every split in batches; the tokenizer outputs are added as new columns.
tokenized_datasets = datasets.map(preprocess_function, batched=True)
# Expose only the listed columns, returned as torch tensors.
tokenized_datasets.set_format("torch", columns=["input_ids", "attention_mask", "label"])
print("전처리 후 데이터 샘플:")
print(tokenized_datasets["train"][0])


In [None]:
class BERTForSequenceClassification(nn.Module):
    """BERT encoder with a single linear classification layer on the CLS output.

    Args:
        config: model configuration passed through to ``BERTModel``; its
            ``d_hidn`` is the classifier's input width.
        num_labels: number of output classes.
    """

    def __init__(self, config, num_labels):
        super().__init__()
        self.bert = BERTModel(config)
        self.classifier = nn.Linear(config.d_hidn, num_labels)

    def forward(self, input_ids, segment_ids):
        """Return unnormalised class scores (logits), one row per input sequence."""
        # BERTModel returns (encoder_output, cls_output, attention_weights);
        # only the pooled CLS vector is needed for classification.
        pooled = self.bert(input_ids, segment_ids)[1]
        return self.classifier(pooled)

print("BERTForSequenceClassification 정의 완료.")


In [None]:
# --- 2. Load the team's BERT model ---
# Initialise the BERT classifier with the pretraining configuration.
config = Config({
    "n_enc_vocab": 7000,
    "n_enc_seq": 256,
    "n_seg_type": 2,
    "n_layer": 6,
    "d_hidn": 256,
    "i_pad": 0,
    "d_ff": 1024,
    "n_head": 4,
    "d_head": 64,
    "dropout": 0.1,
    "layer_norm_epsilon": 1e-12
})
config.device = DEVICE

model = BERTForSequenceClassification(config, num_labels=num_labels).to(DEVICE)

# Records whether the pretrained weights really ended up in the model. Without
# it a failed load is only a single log line and the following cells would
# fine-tune a randomly initialised network with nothing to tell the two apart.
pretrained_loaded = False
try:
    # NOTE(security): torch.load is pickle-based; only load checkpoints from a
    # trusted source (consider weights_only=True if the file holds tensors only).
    checkpoint = torch.load(PRETRAINED_FILE_PATH, map_location=DEVICE)
    # Accept both a raw state dict and a {"state_dict": ...} wrapper.
    state_dict = checkpoint.get("state_dict", checkpoint)
    model.bert.load_state_dict(state_dict)
    pretrained_loaded = True
    print(f"[불러오기 완료] '{PRETRAINED_FILE_PATH}' 로드 성공")
except FileNotFoundError:
    print(f"[경고] '{PRETRAINED_FILE_PATH}' 파일을 찾지 못했습니다. 경로를 확인하세요.")
except Exception as e:
    # Kept best-effort on purpose (the run continues), but this branch also
    # catches e.g. an undefined path variable or mismatched checkpoint keys.
    print(f"[오류] 모델 로드 실패: {e}")

if not pretrained_loaded:
    print("[경고] 사전학습 가중치가 적용되지 않았습니다. 무작위 초기화 상태에서 학습이 진행됩니다.")

optimizer = AdamW(model.parameters(), lr=LEARNING_RATE)
criterion = nn.CrossEntropyLoss()
# Build the DataLoaders with the batch size configured for each split.
train_dataloader = DataLoader(tokenized_datasets["train"], shuffle=True, batch_size=TRAIN_BATCH_SIZE)
eval_dataloader = DataLoader(tokenized_datasets["validation"], batch_size=EVAL_BATCH_SIZE)
print("모델, 옵티마이저, 데이터로더 준비 완료.")


In [None]:
from tqdm.auto import tqdm

# Repeat training and validation for the configured number of epochs.
for epoch in range(NUM_EPOCHS):
    # The header string used to contain a raw line break inside the f-string,
    # which is a SyntaxError; the intended blank line is written as "\n".
    print(f"\n--- Epoch {epoch + 1}/{NUM_EPOCHS} ---")

    # --- 1. Training ---
    model.train()
    train_loss = 0
    for batch in tqdm(train_dataloader, desc="Training"):
        input_ids = batch["input_ids"].to(DEVICE)
        labels = batch["label"].to(DEVICE)
        # All-zero segment ids; zeros_like already allocates on input_ids' device.
        segment_ids = torch.zeros_like(input_ids)

        logits = model(input_ids=input_ids, segment_ids=segment_ids)
        loss = criterion(logits, labels)
        train_loss += loss.item()

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # Mean of the per-batch losses.
    avg_train_loss = train_loss / len(train_dataloader)
    print(f"평균 학습 손실(Loss): {avg_train_loss:.4f}")

    # --- 2. Validation ---
    model.eval()
    val_loss = 0
    correct_predictions = 0
    with torch.no_grad():
        for batch in tqdm(eval_dataloader, desc="Validation"):
            input_ids = batch["input_ids"].to(DEVICE)
            labels = batch["label"].to(DEVICE)
            segment_ids = torch.zeros_like(input_ids)

            logits = model(input_ids=input_ids, segment_ids=segment_ids)
            loss = criterion(logits, labels)
            val_loss += loss.item()

            preds = torch.argmax(logits, dim=1)
            correct_predictions += (preds == labels).sum().item()

    avg_val_loss = val_loss / len(eval_dataloader)
    # Accuracy is computed over the number of validation examples, not batches.
    accuracy = correct_predictions / len(tokenized_datasets["validation"])
    print(f"평균 검증 손실(Loss): {avg_val_loss:.4f} | 정확도: {accuracy:.4f}")

print("--- 학습 완료! ---")


In [None]:
import torch.nn.functional as F

def predict_emotion(text):
    """Print the softmax probability of every emotion label for ``text``.

    Uses the module-level ``model``, ``tokenizer``, ``DEVICE`` and
    ``label_names``. The model is switched to eval mode and inference runs
    without gradient tracking.

    Args:
        text: a single input sentence.

    Returns:
        None; the per-label probabilities and the predicted label are printed.
    """
    print(f'입력 문장: "{text}"')
    model.eval()
    inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=128)
    input_ids = inputs["input_ids"].to(DEVICE)
    # All-zero segment ids; zeros_like already allocates on input_ids' device.
    segment_ids = torch.zeros_like(input_ids)

    with torch.no_grad():
        logits = model(input_ids=input_ids, segment_ids=segment_ids)

    # One sentence in, so take row 0 of the (1, num_labels) probability matrix.
    probabilities = F.softmax(logits, dim=1)[0]
    results = {label: probabilities[i].item() for i, label in enumerate(label_names)}

    print("--- 6개 라벨 Softmax 확률 값 ---")
    for label, prob in results.items():
        print(f"{label:10}: {prob:.4f} ( {prob*100:6.2f} % )")

    predicted_label_index = torch.argmax(probabilities).item()
    predicted_label = label_names[predicted_label_index]
    # "\n" (not "\\n"): emit a blank line instead of a literal backslash-n.
    print(f"\n=> 예측된 감정: {predicted_label}")

predict_emotion("I feel so happy and excited today!")
print("-" * 30)
predict_emotion("This is so frustrating and makes me angry.")
