<a href="https://colab.research.google.com/github/hzzz15/Soulsync/blob/main/models/soulsync_modeling.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

1. 코랩 환경 설정 및 라이브러리 설치

In [None]:
import torch

# Run on the GPU when CUDA is available; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print("Using device:", device)

In [None]:
# Install or upgrade the Hugging Face and SentencePiece packages in the runtime.
# NOTE(review): no versions are pinned, so a fresh runtime may pull newer releases.
!pip install transformers datasets sentencepiece --upgrade

In [None]:
# Clone the KoBERT repository and install the package found in its kobert_hf directory
# (presumably the source of the kobert_tokenizer module imported below - confirm).
!git clone https://github.com/SKTBrain/KoBERT.git
# NOTE(review): %cd changes the notebook's working directory for every later cell, so
# relative paths used further down (e.g. "./kobert_best.pt") resolve inside KoBERT/kobert_hf.
%cd KoBERT/kobert_hf
# --no-deps installs only this package without pulling in its declared dependencies.
!pip install . --no-deps

In [None]:
# 기본 import
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from transformers import get_cosine_schedule_with_warmup
from torch.optim import AdamW
from kobert_tokenizer import KoBERTTokenizer

In [None]:
# Load the pretrained KoBERT tokenizer and encoder.
# BertModel was referenced here without being imported anywhere in the notebook,
# which raises NameError on a fresh kernel; import it next to its first use.
from transformers import BertModel

tokenizer = KoBERTTokenizer.from_pretrained('skt/kobert-base-v1')
bert_model = BertModel.from_pretrained('skt/kobert-base-v1')

2. 데이터 전처리 및 라벨 인코딩

In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Load the CSV dataset
import pandas as pd

# Path to the CSV file (for a file uploaded directly to the Colab runtime)
file_path = '/content/merged_full_dataset.csv'

# Read the file into a DataFrame and preview the first rows
df = pd.read_csv(file_path)
df.head()

In [None]:
# Map each emotion string to its integer class id.
emotion_label_map = {
    "행복": 0,
    "놀람": 1,
    "분노": 2,
    "공포": 3,
    "혐오": 4,
    "슬픔": 5,
    "중립": 6,
}

# Attach the numeric label, keep only the two columns used for training, and drop
# rows where either is missing (an emotion absent from the map becomes NaN here).
df = (
    df.assign(label=df["Emotion"].map(emotion_label_map))
      .loc[:, ["Sentence", "label"]]
      .dropna()
)
df.head()

In [None]:
# Collect [sentence, label] pairs; labels are cast to plain Python ints.
data_list = [
    [text, int(target)]
    for text, target in zip(df["Sentence"], df["label"])
]

print(f"전체 샘플 수: {len(data_list)}")
print(data_list[:3])

In [None]:
# Split into train and test sets
from sklearn.model_selection import train_test_split

# Hold out 20% of the samples; the fixed random_state makes the split repeatable.
train_data, test_data = train_test_split(data_list, test_size=0.2, random_state=42)
print(f"Train: {len(train_data)}, Test: {len(test_data)}")

3. KoBERT용 Dataset 및 DataLoader 정의

In [None]:
# Definition of BERTSentenceTransform
import numpy as np
from torch.utils.data import Dataset

class BERTSentenceTransform:
    """Convert one raw sentence into BERT input arrays.

    Args:
        tokenizer: object providing ``tokenize(text)`` and
            ``convert_tokens_to_ids(tokens)``.
        max_seq_length: total sequence length, counting ``[CLS]`` and ``[SEP]``.
        pad: when true, right-pad ids and segment ids up to ``max_seq_length``.
        pair: stored on the instance; ``__call__`` does not read it.
    """

    def __init__(self, tokenizer, max_seq_length, pad=True, pair=False):
        self.tokenizer = tokenizer
        self.max_seq_length = max_seq_length
        self.pad = pad
        self.pair = pair

    def __call__(self, line):
        """Encode ``line[0]``.

        Returns:
            Tuple ``(input_ids, valid_length, segment_ids)`` of int32 NumPy
            arrays; ``valid_length`` is the 0-d count of ids before padding.
        """
        # Two positions are reserved for the [CLS] and [SEP] markers.
        room = self.max_seq_length - 2
        pieces = self.tokenizer.tokenize(line[0])[:room]
        wrapped = ['[CLS]', *pieces, '[SEP]']

        input_ids = self.tokenizer.convert_tokens_to_ids(wrapped)
        valid_length = len(input_ids)
        # Single-sentence input: every position belongs to segment 0.
        segment_ids = [0] * len(wrapped)

        if self.pad:
            missing = self.max_seq_length - valid_length
            pad_id = self.tokenizer.convert_tokens_to_ids(['[PAD]'])[0]
            input_ids = input_ids + [pad_id] * missing
            segment_ids = segment_ids + [0] * missing

        return tuple(
            np.array(values, dtype=np.int32)
            for values in (input_ids, valid_length, segment_ids)
        )

In [None]:
# Definition of BERTDataset
class BERTDataset(Dataset):
    """Dataset that encodes every sentence once, at construction time.

    Args:
        dataset: sequence of rows, each indexable by ``sent_idx`` and ``label_idx``.
        sent_idx: position of the sentence inside a row.
        label_idx: position of the label inside a row.
        tokenizer, max_len, pad, pair: forwarded to ``BERTSentenceTransform``.
    """

    def __init__(self, dataset, sent_idx, label_idx, tokenizer, max_len, pad, pair):
        self.transform = BERTSentenceTransform(tokenizer, max_seq_length=max_len, pad=pad, pair=pair)
        encode = self.transform
        # The transform reads element 0 of its argument, so wrap each sentence in a list.
        self.sentences = [encode([row[sent_idx]]) for row in dataset]
        self.labels = [np.int32(row[label_idx]) for row in dataset]

    def __getitem__(self, i):
        """Return the encoded fields of sample ``i`` followed by its label."""
        return tuple(self.sentences[i]) + (self.labels[i],)

    def __len__(self):
        """Number of samples."""
        return len(self.labels)


In [None]:
# Build the datasets and data loaders
from torch.utils.data import DataLoader

# Hyperparameters
max_len = 64
batch_size = 32

# Encode both splits (sentence in column 0, label in column 1, padded to max_len).
train_dataset = BERTDataset(
    train_data, sent_idx=0, label_idx=1, tokenizer=tokenizer,
    max_len=max_len, pad=True, pair=False,
)
test_dataset = BERTDataset(
    test_data, sent_idx=0, label_idx=1, tokenizer=tokenizer,
    max_len=max_len, pad=True, pair=False,
)

# Only the training loader shuffles; the test loader keeps a fixed order.
train_dataloader = DataLoader(
    train_dataset, batch_size=batch_size, shuffle=True, num_workers=2
)
test_dataloader = DataLoader(
    test_dataset, batch_size=batch_size, shuffle=False, num_workers=2
)

In [None]:
# Sanity check: pull one batch and print the shape of each field.
sample = next(iter(train_dataloader))
field_names = ("input_ids", "valid_length", "segment_ids", "labels")
for field_name, field_tensor in zip(field_names, sample):
    print(f"{field_name}:", field_tensor.shape)

4. KoBERT 모델 정의 + Optimizer & Scheduler 설정

In [None]:
# KoBERT classifier definition
import torch.nn as nn
import torch

class BERTClassifier(nn.Module):
    """Linear classification head on top of a BERT encoder's pooled output.

    Args:
        bert: encoder module. It is called with ``input_ids``, ``token_type_ids``,
            ``attention_mask`` and ``return_dict=False`` and must return a pair
            whose second element is the pooled output.
        hidden_size: width of the pooled output fed to the linear layer.
        num_classes: number of output logits.
        dr_rate: dropout probability applied to the pooled output; a falsy
            value (``None`` or ``0``) disables dropout.
    """

    def __init__(self,
                 bert,
                 hidden_size=768,
                 num_classes=7,
                 dr_rate=None):
        super(BERTClassifier, self).__init__()
        self.bert = bert
        self.dr_rate = dr_rate
        self.classifier = nn.Linear(hidden_size, num_classes)
        if dr_rate:
            self.dropout = nn.Dropout(p=dr_rate)

    def gen_attention_mask(self, token_ids, valid_length):
        """Build a float mask with 1.0 on the first ``valid_length[i]`` positions of row i.

        Args:
            token_ids: 2-D tensor (batch, seq_len); only its shape and device are used.
            valid_length: per-row count of real tokens (tensor or sequence of ints).

        Returns:
            float32 tensor of shape (batch, seq_len) on ``token_ids``' device.
        """
        # Vectorized comparison instead of a Python loop over rows: the loop sliced
        # with one tensor scalar per sample, costing a host round-trip each time.
        lengths = torch.as_tensor(valid_length, device=token_ids.device).reshape(-1, 1)
        positions = torch.arange(token_ids.size(1), device=token_ids.device)
        return (positions < lengths).float()

    def forward(self, token_ids, valid_length, segment_ids):
        """Return class logits of shape (batch, num_classes)."""
        attention_mask = self.gen_attention_mask(token_ids, valid_length)
        # return_dict=False makes the encoder return a tuple; only the pooled output is used.
        _, pooled_output = self.bert(
            input_ids=token_ids,
            token_type_ids=segment_ids.long(),
            attention_mask=attention_mask.to(token_ids.device),
            return_dict=False
        )
        if self.dr_rate:
            pooled_output = self.dropout(pooled_output)
        return self.classifier(pooled_output)

In [None]:
# Instantiate the classifier around the pretrained encoder and move it to the selected device
model = BERTClassifier(bert_model, dr_rate=0.5).to(device)
# Cross-entropy loss over the classifier's logits and integer class labels.
loss_fn = nn.CrossEntropyLoss()

In [None]:
# Optimizer setup
from torch.optim import AdamW

# Training hyperparameters; epochs and max_grad_norm are read by the scheduler
# and training-loop cells below.
learning_rate = 5e-5
epochs = 5
max_grad_norm = 1.0

# AdamW over every model parameter (encoder and classification head alike).
optimizer = AdamW(model.parameters(), lr=learning_rate, weight_decay=0.01)

In [None]:
# Scheduler definition
from transformers import get_cosine_schedule_with_warmup

# One scheduler step per training batch, so the total is batches-per-epoch * epochs.
total_steps = len(train_dataloader) * epochs
# The first 10% of the steps are used as warmup.
warmup_steps = int(total_steps * 0.1)

scheduler = get_cosine_schedule_with_warmup(
    optimizer,
    num_warmup_steps=warmup_steps,
    num_training_steps=total_steps
)

5. KoBERT 학습 루프 + 중간 저장 기능(checkpoint)

In [None]:
# Accuracy computation
def calc_accuracy(preds, labels):
    """Return the fraction of rows in ``preds`` whose arg-max class equals ``labels``.

    Args:
        preds: 2-D tensor of per-class scores, one row per sample.
        labels: 1-D tensor of integer class ids.
    """
    predicted_classes = preds.argmax(dim=1)
    num_correct = int(predicted_classes.eq(labels).sum())
    return num_correct / len(labels)

In [None]:
# Checkpoint saving
def save_checkpoint(model, optimizer, epoch, path="./kobert_best.pt"):
    """Serialize the model and optimizer state together with the epoch number.

    Args:
        model: module whose ``state_dict()`` is stored under ``'model_state_dict'``.
        optimizer: optimizer whose ``state_dict()`` is stored under
            ``'optimizer_state_dict'``.
        epoch: value stored under ``'epoch'``.
        path: destination handed to ``torch.save``. Defaults to
            ``"./kobert_best.pt"``, the same default the training cell's
            redefinition of this function uses, so both definitions agree.
    """
    checkpoint = {
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict()
    }
    torch.save(checkpoint, path)

In [None]:
from tqdm import tqdm
import torch.nn.functional as F
import torch

# calc_accuracy and save_checkpoint are defined in the cells above. They are reused
# here instead of being redefined, so this cell cannot silently shadow them.

# Destination of the best-scoring checkpoint (read back by the loading cells below).
best_ckpt_path = "./kobert_best.pt"
best_val_acc = 0.0

for epoch in range(epochs):
    # ---- Training pass ----
    model.train()
    total_loss = 0.0
    train_acc = 0.0
    train_samples = 0

    for batch in tqdm(train_dataloader):
        token_ids, valid_length, segment_ids, label = [x.to(device) for x in batch]
        label = label.long()  # CrossEntropyLoss requires int64 class labels

        optimizer.zero_grad()
        out = model(token_ids, valid_length, segment_ids)

        loss = loss_fn(out, label)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_grad_norm)
        optimizer.step()
        scheduler.step()  # the schedule is defined per batch, not per epoch

        # Weight each batch by its size so a smaller final batch does not count
        # as much as a full one in the epoch averages.
        n_in_batch = label.size(0)
        total_loss += loss.item() * n_in_batch
        train_acc += calc_accuracy(out, label) * n_in_batch
        train_samples += n_in_batch

    avg_train_loss = total_loss / max(train_samples, 1)
    avg_train_acc = train_acc / max(train_samples, 1)
    print(f"\n[Epoch {epoch+1}] Train loss: {avg_train_loss:.4f}, Train acc: {avg_train_acc:.4f}")

    # ---- Validation pass ----
    # NOTE(review): the held-out test split doubles as the validation set here, so the
    # checkpoint is selected on the same data it is later reported on.
    model.eval()
    val_loss = 0.0
    val_acc = 0.0
    val_samples = 0

    with torch.no_grad():
        for batch in test_dataloader:
            token_ids, valid_length, segment_ids, label = [x.to(device) for x in batch]
            label = label.long()

            out = model(token_ids, valid_length, segment_ids)
            loss = loss_fn(out, label)

            n_in_batch = label.size(0)
            val_loss += loss.item() * n_in_batch
            val_acc += calc_accuracy(out, label) * n_in_batch
            val_samples += n_in_batch

    avg_val_loss = val_loss / max(val_samples, 1)
    avg_val_acc = val_acc / max(val_samples, 1)
    print(f"[Epoch {epoch+1}] Val loss: {avg_val_loss:.4f}, Val acc: {avg_val_acc:.4f}")

    # ---- Checkpoint: keep only the best validation accuracy seen so far ----
    if avg_val_acc > best_val_acc:
        best_val_acc = avg_val_acc
        # The path is passed explicitly so the call works whether or not
        # save_checkpoint declares a default for it.
        save_checkpoint(model, optimizer, epoch+1, best_ckpt_path)
        print(f"Best model saved at epoch {epoch+1}")


In [None]:
# Restore model and optimizer state from the saved checkpoint
# map_location=device remaps the stored tensors onto the currently selected device.
checkpoint = torch.load("./kobert_best.pt", map_location=device)
model.load_state_dict(checkpoint["model_state_dict"])
optimizer.load_state_dict(checkpoint["optimizer_state_dict"])
# Epoch number stored with the checkpoint (the training loop saves epoch + 1).
start_epoch = checkpoint["epoch"]

6. 문장 -> 감정 예측 함수

In [None]:
# Class id -> emotion string; a label's position in the list is its class id.
id2label = dict(enumerate([
    "행복",
    "놀람",
    "분노",
    "공포",
    "혐오",
    "슬픔",
    "중립",
]))

In [None]:
def predict_emotion(sentence, model, tokenizer, max_len=64):
    """Predict the emotion label of a single sentence.

    Args:
        sentence: raw input text.
        model: classifier called as ``model(input_ids, valid_length, segment_ids)``;
            it must return logits for a batch of one.
        tokenizer: tokenizer handed to ``BERTSentenceTransform``.
        max_len: padded sequence length used when encoding the sentence.

    Returns:
        The emotion string that ``id2label`` maps the arg-max logit to. The same
        string is also printed.
    """
    model.eval()

    # Encode the sentence into KoBERT input arrays.
    transform = BERTSentenceTransform(tokenizer, max_seq_length=max_len, pad=True, pair=False)
    input_ids, valid_length, segment_ids = transform([sentence])

    # Add the batch dimension with unsqueeze(0) rather than torch.tensor([ndarray]):
    # wrapping a NumPy array in a Python list sends PyTorch down a slow
    # element-by-element copy and triggers a UserWarning.
    input_ids = torch.as_tensor(input_ids).unsqueeze(0).to(device)
    valid_length = torch.tensor([int(valid_length)]).to(device)
    segment_ids = torch.as_tensor(segment_ids).unsqueeze(0).to(device)

    # Inference only: no gradients are needed.
    with torch.no_grad():
        output = model(input_ids, valid_length, segment_ids)
    predicted = torch.argmax(output, dim=1).item()
    emotion = id2label[predicted]

    print(f"예측된 감정: {emotion}")
    return emotion

In [None]:
# Build a classifier instance and load the best checkpoint's weights into it
# NOTE(review): this wraps the same bert_model object that was used during training,
# not a freshly loaded encoder; load_state_dict then overwrites its weights.
model = BERTClassifier(bert_model, dr_rate=0.5).to(device)
checkpoint = torch.load("./kobert_best.pt", map_location=device)
model.load_state_dict(checkpoint['model_state_dict'])

In [None]:
# Examples: each call prints the predicted emotion; the last call's return value
# is also shown as the cell output.
predict_emotion("이 노래 너무 좋아서 눈물이 나요", model, tokenizer)
predict_emotion("뭐야 이게 왜 이래?", model, tokenizer)

7. 저장

In [None]:
# Save the model weights in Colab
# Only the state_dict is stored, so loading it requires the BERTClassifier class definition.
torch.save({
    'model_state_dict': model.state_dict(),
}, 'soulsync_model.pt')

In [None]:
# Model architecture definition, kept alongside the saved weights
# NOTE(review): this cell only redefines the class in the kernel; nothing here
# writes it to a soulsync_model.py file.
import torch.nn as nn
import torch

class BERTClassifier(nn.Module):
    """Linear classification head on top of a BERT encoder's pooled output.

    Args:
        bert: encoder module. It is called with ``input_ids``, ``token_type_ids``,
            ``attention_mask`` and ``return_dict=False`` and must return a pair
            whose second element is the pooled output.
        hidden_size: width of the pooled output fed to the linear layer.
        num_classes: number of output logits.
        dr_rate: dropout probability applied to the pooled output; a falsy
            value (``None`` or ``0``) disables dropout.
    """

    def __init__(self, bert, hidden_size=768, num_classes=7, dr_rate=None):
        super(BERTClassifier, self).__init__()
        self.bert = bert
        self.dr_rate = dr_rate
        self.classifier = nn.Linear(hidden_size, num_classes)
        if dr_rate:
            self.dropout = nn.Dropout(p=dr_rate)

    def gen_attention_mask(self, token_ids, valid_length):
        """Build a float mask with 1.0 on the first ``valid_length[i]`` positions of row i.

        Args:
            token_ids: 2-D tensor (batch, seq_len); only its shape and device are used.
            valid_length: per-row count of real tokens (tensor or sequence of ints).

        Returns:
            float32 tensor of shape (batch, seq_len) on ``token_ids``' device.
        """
        # Vectorized comparison instead of a Python loop over rows: the loop sliced
        # with one tensor scalar per sample, costing a host round-trip each time.
        lengths = torch.as_tensor(valid_length, device=token_ids.device).reshape(-1, 1)
        positions = torch.arange(token_ids.size(1), device=token_ids.device)
        return (positions < lengths).float()

    def forward(self, token_ids, valid_length, segment_ids):
        """Return class logits of shape (batch, num_classes)."""
        attention_mask = self.gen_attention_mask(token_ids, valid_length)
        # return_dict=False makes the encoder return a tuple; only the pooled output is used.
        _, pooled_output = self.bert(input_ids=token_ids,
                                     token_type_ids=segment_ids.long(),
                                     attention_mask=attention_mask.to(token_ids.device),
                                     return_dict=False)
        if self.dr_rate:
            pooled_output = self.dropout(pooled_output)
        return self.classifier(pooled_output)

In [None]:
# Download the exported artifacts to the local machine (Colab only)
import os
from google.colab import files

for artifact in ('soulsync_model.pt', 'soulsync_model.py'):
    if os.path.exists(artifact):
        files.download(artifact)
    else:
        # No cell in this notebook visibly writes soulsync_model.py, so report a
        # missing file instead of letting the download call fail the cell.
        print(f"Skipping download: '{artifact}' not found in {os.getcwd()}")