# [Practice] Sentiment Analysis Practice 
- 한국 식당 리뷰 감성분석
- 커널 데이터 전처리 부분은 스킵하고 실행해주세요

---

### 데이터 로드

데이터 출처: https://huggingface.co/datasets/leey4n/KR3

In [2]:
from datasets import load_dataset

kr3 = load_dataset("leey4n/KR3", split='train')
kr3 = kr3.remove_columns(['__index_level_0__']) # Original file didn't include this column. Suspect it's a hugging face issue.

# 레이블이 2(모호한 답변)인 리뷰 drop
kr3_binary = kr3.filter(lambda example: example['Rating'] != 2)

In [3]:
import pandas as pd

# 'reviews' 코퍼스와 'ratings' 라벨 저장
reviews = kr3_binary["Review"]
ratings = kr3_binary['Rating']

# 샘플 확인
review_df = pd.DataFrame(kr3_binary, columns=kr3_binary.column_names)
display(review_df)

Unnamed: 0,Rating,Review
0,1,숙성 돼지고기 전문점입니다. 건물 모양 때문에 매장 모양도 좀 특이하지만 쾌적한 편...
1,1,고기가 정말 맛있었어요! 육즙이 가득 있어서 너무 좋았아요 일하시는 분들 너무 친절...
2,1,"잡내 없고 깔끔, 담백한 맛의 순댓국이 순댓국을 안 좋아하는 사람들에게도 술술 넘어..."
3,1,고기 양이 푸짐해서 특 순대국밥을 시킨 기분이 듭니다 맛도 좋습니다 다만 양념장이 ...
4,1,순댓국 자체는 제가 먹어본 순대국밥집 중에서 Top5 안에는 들어요. 그러나 밥 양...
...,...,...
459016,0,"731 배달 시켜 먹었고요, 거리상 1.8km입니다. 배민에서 시켰고 정확히 58만..."
459017,1,송탄 미군부대 근처에 위치한 곳 원래 로컬 맛 집으로 되게 유명했는데 삼대 천왕에 ...
459018,1,집에서 40킬로 정도 떨어져 있는 곳인데도 몇 달에 한 번은 이거 먹으러 일부러 갑...
459019,0,원래 글 안 쓰는데 이거는 정말 다른 분들 위해서 써야 할 것 같네요 방금 포장 주...


In [4]:
review_df['Review'][2]

'잡내 없고 깔끔, 담백한 맛의 순댓국이 순댓국을 안 좋아하는 사람들에게도 술술 넘어갈 듯합니다. 정식 메뉴를 시키면 구성비 좋게 찹쌀순대와 수육이 나오는데 아주 푸짐해요. 해장하러 갔는데 국물이 고소해서 소주를 더 시키게 되는... 여하튼 순댓국이 일품입니다. 송파에선 알아주는 터줏대감인듯해요'

In [5]:
# 긍/부정 비율 확인 (긍정이 압도적으로 많음)
positive_rate = len(review_df[review_df['Rating'] == 1]) / len(review_df)
negative_rate = 1 - positive_rate
print("긍정 리뷰 비율:", positive_rate, "|", "부정 리뷰 비율:", negative_rate)

긍정 리뷰 비율: 0.8455190503266735 | 부정 리뷰 비율: 0.15448094967332648


---

### 데이터 전처리

In [6]:
import re

# 1) 안전 불용어(기능어 중심)
safe_stopwords_ko = set("""
은 는 이 가 을 를 에 의 와 과 도 만 보다 에서 에게 께 부터 까지 마다 한테 처럼 대로 하고 으로 로 으로써 로써
그리고 그러나 하지만 또는 그래서 그런데 혹은 따라서 그러므로 고로
저 나 너 우리 저희 너희 그 이 저기 거기 여기 무엇 무슨 누구 어디 어느 어떤 이것 그것 저것 이런 그런 저런 이번 이쪽 저쪽 그쪽 이때 그때
다 요 죠 네 데 게 지 군 나 니다 합니다 했다 한다 된다 였다
때 시간 동안 시각 무렵 이상 이하 전후 이래 이후
또 또한 그냥 다시 계속 항상 여전히 아직 가끔 자주 거의
아 어 야 응 음 흠 헐 우와 와 오 이야 휴 허 하하 ㅋㅋ ㅎㅎ ㅠㅠ
""".split())

# 2) 보호 단어(감성 핵심)
protect_sentiment_words = set("""
별로 좋아 좋다 최고 최악 실망 만족 불만 추천 재방문
맛있다 맛없다 싱겁다 짜다 달다 매콤 맵다 느끼하다 담백하다 신선 싱싱 비린 눅눅 바삭 촉촉 부드럽다 질기다
양 많다 적다 푸짐 부족 비싸다 싸다 가성비 가심비
빠르다 느리다 한산 복잡 붐빈다 대기 기다리다
깨끗 청결 더럽 위생 지저분 냄새 쾌적 시끄럽다 조용하다
서비스 응대 친절 불친절 상냥 무례 거칠다
정말 진짜 너무 매우 아주 꽤 완전
안 못 없다 없음 아니라 비추 다시는 다신
""".split())

# 3) 1글자 화이트리스트(도메인 핵심)
whitelist_1char = set(list("맛밥국탕면롤육쌈김술밥빵밥밥양밥밥")) | {"맛","밥","국","양","술","김"}

neg_prefix = {"안","못","안함","않","못함","무","미","불","무척"}  # 필요시 조정
emphasis_words = {"정말","진짜","너무","매우","아주","완전","꽤"}

def normalize_token(t: str) -> str:
    # 숫자 통일 (선택)
    if re.fullmatch(r"\d+", t):
        return "NUM"
    # ㅋㅋㅋ, ㅎㅎㅎ 축약 (선택)
    t = re.sub(r"(ㅋ)\1+", r"\1\1", t)
    t = re.sub(r"(ㅎ)\1+", r"\1\1", t)
    return t

def join_negation(tokens):
    """'안/못/없다' + (다음 감성 단어) -> 결합 토큰 생성"""
    joined = []
    i = 0
    while i < len(tokens):
        t = tokens[i]
        if t in {"안","못","없다","없음","아니다","별로"} and i+1 < len(tokens):
            nxt = tokens[i+1]
            # 결합 토큰 만들기
            joined.append(f"{t}_{nxt}")
            i += 2
            continue
        joined.append(t)
        i += 1
    return joined

def filter_tokens(tokens,
                  stopwords=safe_stopwords_ko,
                  protect=protect_sentiment_words,
                  min_len=1):
    out = []
    tokens = [normalize_token(t) for t in tokens]
    tokens = join_negation(tokens)

    for t in tokens:
        if t in protect or t in emphasis_words:
            out.append(t)                         # 보호
            continue
        if len(t) < min_len and t not in whitelist_1char:
            continue                              # 1글자 필터 (화이트리스트 제외)
        if t in stopwords:
            continue                              # 불용어 제거
        out.append(t)
    return out

In [6]:
from konlpy.tag import Okt
from nltk import word_tokenize, sent_tokenize
from tqdm import tqdm

okt = Okt()

preprocessed_sentences = []

for review in tqdm(reviews):
    review = re.sub(r"[^가-힣0-9\s]", "", review)   # 구두점, 특수문자, 영문 제거
    review = re.sub(r"\s+", " ", review).strip()   # 여러 공백을 하나의 공백으로
    tokens = okt.morphs(review, stem=True, norm=True) # 토큰화
    tokens = filter_tokens(tokens)
    preprocessed_sentences.append(tokens)

100%|██████████| 459021/459021 [2:22:51<00:00, 53.55it/s]   


preprocessed_sentences 저장 & 로드 
- 커널 재시작시 시간 줄이기용

In [8]:
import json, gzip

# 저장 (리뷰마다 한 줄)
with gzip.open("preprocessed_sentences.jsonl.gz", "wt", encoding="utf-8") as f:
    for toks in preprocessed_sentences:
        json.dump({"tokens": toks}, f, ensure_ascii=False)
        f.write("\n")

---

토큰화 및 전처리 완료된 corpus 로드

In [7]:
import json, gzip

# preprocessed_sentences 로드
preprocessed_sentences = []
with gzip.open("preprocessed_sentences.jsonl.gz", "rt", encoding="utf-8") as f:
    for line in f:
        preprocessed_sentences.append(json.loads(line)["tokens"])

print(preprocessed_sentences[:2])

[['숙성', '돼지고기', '전문점', '이다', '건물', '모양', '때문', '매장', '모양', '좀', '특이하다', '쾌적하다', '편이', '고', '살짝', '레트로', '감성', '분위기', '잡다', '모든', '직원', '분들', '께서', '전부', '가능하다', '멘트', '치다', '고기', '초반', '커팅', '까지는', '굽다', '가격', '저렴하다', '편', '아니다_맛', '준수', '하다', '등심', '덧', '살이', '인상', '깊다', '구이', '별로_일', '줄', '알다', '육', '향', '짙다', '얇다', '며', '뻑뻑', '하다', '않다', '하이라이트', '된장찌개', '진짜', '굿', '이다', '버터', '간장', '밥', '골뱅이', '국수', '등', '나중', '더', '맛보다', '하다', '것', '들', '남기다', '두다'], ['고기', '정말', '맛있다', '육즙', '가득', '있다', '너무', '좋다', '일', '하다', '분들', '너무', '친절하다', '좋다', '가격', '조금', '있다', '그만하다', '맛', '이라고', '생각']]


단어사전 만들기

In [8]:
from collections import Counter
import json

SPECIALS = ["<pad>", "<unk>"]  # 0: pad, 1: unk

class SimpleVocab:
    def __init__(self, itos, unk_token="<unk>"):
        self._itos = list(itos)                       # index -> token
        self._stoi = {t:i for i,t in enumerate(itos)} # token -> index
        self.unk = unk_token
        self.unk_id = self._stoi.get(unk_token, 1)

    def __len__(self): return len(self._itos)
    def lookup_indices(self, tokens): return [self._stoi.get(t, self.unk_id) for t in tokens]
    def lookup_token(self, idx): return self._itos[idx]
    def get_stoi(self): return self._stoi
    def get_itos(self): return self._itos

    def save(self, path):
        with open(path, "w", encoding="utf-8") as f:
            json.dump({"itos": self._itos, "unk": self.unk}, f, ensure_ascii=False)

    @staticmethod
    def load(path):
        with open(path, "r", encoding="utf-8") as f:
            obj = json.load(f)
        return SimpleVocab(obj["itos"], unk_token=obj.get("unk","<unk>"))

def build_vocab_simple(tokenized_sentences, min_freq=2, max_size=None, specials=SPECIALS):
    """
    min_freq: 최소 등장 빈도 (희귀어를 <unk>로 처리)
    max_size: 특수토큰 포함한 전체 vocab 최대 크기 (None이면 제한 없음)
    정렬: 빈도 descending, 동률이면 사전순
    """
    counter = Counter()
    for sent in tokenized_sentences:
        counter.update(sent)

    # 빈도 필터
    items = [(w, c) for w, c in counter.items() if c >= min_freq]
    # 빈도 내림차순, 동률 사전순
    items.sort(key=lambda x: (-x[1], x[0]))

    # 특수토큰 먼저, 그 뒤 단어들
    words = [w for w, _ in items]
    itos = list(specials) + words

    # max_size 제한 (특수토큰 포함하여 자르기)
    if max_size is not None and len(itos) > max_size:
        itos = itos[:max_size]

    return SimpleVocab(itos, unk_token="<unk>")

In [9]:
vocab = build_vocab_simple(preprocessed_sentences, min_freq=2, max_size=30000)
ids = vocab.lookup_indices(preprocessed_sentences[0])
print(len(vocab), ids[:20], [vocab.lookup_token(i) for i in ids[:10]])

30000 [929, 604, 701, 9, 592, 1177, 160, 244, 1177, 38, 492, 1306, 217, 41, 190, 3565, 1172, 43, 388, 364] ['숙성', '돼지고기', '전문점', '이다', '건물', '모양', '때문', '매장', '모양', '좀']


vocab 만든거 저장

In [10]:
vocab.save("vocab.json")

vocab 다시 로드해서 사용

In [11]:
vocab = SimpleVocab.load("vocab.json")

In [12]:
# 시퀀싱
sequenced_tokens = [vocab.lookup_indices(tokens) for tokens in preprocessed_sentences]

패딩

In [14]:
ratings = list(kr3_binary["Rating"])   # 정답 라벨 (0/1)

# 입력-라벨 동기화 (빈 시퀀스 제거)
filtered_pairs = [(seq, y) for seq, y in zip(sequenced_tokens, ratings) if len(seq) > 0]

sequenced_tokens, labels = map(list, zip(*filtered_pairs))

In [15]:
print(len(sequenced_tokens), len(labels))
print(sequenced_tokens[0])  
print(labels[0])         

455822 455822
[929, 604, 701, 9, 592, 1177, 160, 244, 1177, 38, 492, 1306, 217, 41, 190, 3565, 1172, 43, 388, 364, 82, 161, 688, 828, 288, 4788, 697, 39, 2623, 4838, 1580, 183, 26, 231, 211, 2413, 3284, 2, 641, 8094, 711, 422, 476, 298, 4714, 116, 150, 707, 93, 4068, 495, 1055, 2750, 2, 13, 4736, 801, 37, 484, 9, 340, 466, 54, 3508, 367, 173, 783, 29, 269, 2, 10, 14, 637, 306]
1


In [16]:
def pad(ids, max_len, pad_id=0):
    return ids[:max_len] + [pad_id] * max(0, max_len - len(ids))

max_len = 20  # 메모리를 위한 양 
pad_id = vocab.get_stoi()["<pad>"]

In [17]:
from tqdm import tqdm
padded_sequences = [pad(seq, max_len, pad_id) for seq in tqdm(sequenced_tokens)]

100%|██████████| 455822/455822 [00:01<00:00, 382018.45it/s]


In [18]:
print(padded_sequences[0])
print(len(padded_sequences[0]))     # max_len = 130 확인
print("pad id:", pad_id)        

[929, 604, 701, 9, 592, 1177, 160, 244, 1177, 38, 492, 1306, 217, 41, 190, 3565, 1172, 43, 388, 364]
20
pad id: 0


Torch Tensor 변환

In [19]:
import torch

inputs = torch.tensor(padded_sequences, dtype=torch.long)           # shape: (N, T=130)
labels = torch.tensor(labels, dtype=torch.float32).unsqueeze(1)     # shape: [N, 1]

---

### 모델 생성 및 학습

모델 정의

In [20]:
inputs

tensor([[  929,   604,   701,  ...,    43,   388,   364],
        [   39,    22,     4,  ...,  4080,     5,   243],
        [  388, 17317,   458,  ...,    53,     2,   507],
        ...,
        [   18,     8,  5092,  ...,   598,     2,    18],
        [  341,   745,  2118,  ...,    30,   143,   136],
        [  915,   768,    18,  ...,   404,  8189,  1480]])

In [21]:
labels

tensor([[1.],
        [1.],
        [1.],
        ...,
        [1.],
        [0.],
        [1.]])

모델 인스턴스 생성

In [22]:
import torch
import torch.nn as nn

PAD_ID = 0  # your PAD token id

class RNNClassifier(nn.Module):
    def __init__(self, vocab_size, embed_size, hidden_size, num_classes):
        super().__init__()
        # keep PAD embedding fixed at zeros
        self.embedding = nn.Embedding(vocab_size, embed_size, padding_idx=PAD_ID)
        self.rnn_cell = nn.RNNCell(embed_size, hidden_size)
        self.fc = nn.Linear(hidden_size, num_classes)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        # x: (B, L) LongTensor
        B, L = x.size()
        device = x.device

        # lengths of each sequence (exclude PAD)
        lengths = x.ne(PAD_ID).sum(dim=1)            # (B,)

        # init hidden
        h = torch.zeros(B, self.rnn_cell.hidden_size, device=device)

        # only update hidden for timesteps < length for each sample
        max_T = int(lengths.max().item()) if L > 0 else 0
        for t in range(max_T):
            xt = self.embedding(x[:, t])            # (B, E)
            h_new = self.rnn_cell(xt, h)            # (B, H)
            mask = (t < lengths).unsqueeze(1)       # (B, 1) bool
            h = torch.where(mask, h_new, h)         # keep h for padded samples

        logits = self.fc(h)                          # (B, 1)
        return self.sigmoid(logits)                  # BCELoss-compatible


모델 학습

In [23]:
# 모델 생성
VOCAB_SIZE = len(vocab) + 1
EMBED_SIZE = 64
HIDDEN_SIZE = 64
NUM_CLASSES = 1

model = RNNClassifier(
    vocab_size=VOCAB_SIZE, 
    embed_size=EMBED_SIZE, 
    hidden_size=HIDDEN_SIZE, 
    num_classes=NUM_CLASSES
)

print(model)

RNNClassifier(
  (embedding): Embedding(30001, 64, padding_idx=0)
  (rnn_cell): RNNCell(64, 64)
  (fc): Linear(in_features=64, out_features=1, bias=True)
  (sigmoid): Sigmoid()
)


In [24]:
# 모델 학습
import torch.optim as optim
from tqdm import tqdm

criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)
epochs = 50

for epoch in range(epochs):
    model.train() 
    optimizer.zero_grad()
    outputs = model(inputs)

    loss = criterion(outputs, labels)
    loss.backward()
    optimizer.step()

    print(f'Epoch {epoch + 1}/{epochs} | Loss: {loss.item():.4f}')

Epoch 1/50 | Loss: 0.7130
Epoch 2/50 | Loss: 0.5841
Epoch 3/50 | Loss: 0.4802
Epoch 4/50 | Loss: 0.4220
Epoch 5/50 | Loss: 0.4530
Epoch 6/50 | Loss: 0.4375
Epoch 7/50 | Loss: 0.4122
Epoch 8/50 | Loss: 0.4152
Epoch 9/50 | Loss: 0.4196
Epoch 10/50 | Loss: 0.4077
Epoch 11/50 | Loss: 0.3959
Epoch 12/50 | Loss: 0.3919
Epoch 13/50 | Loss: 0.3903
Epoch 14/50 | Loss: 0.3844
Epoch 15/50 | Loss: 0.3730
Epoch 16/50 | Loss: 0.3619
Epoch 17/50 | Loss: 0.3572
Epoch 18/50 | Loss: 0.3534
Epoch 19/50 | Loss: 0.3429
Epoch 20/50 | Loss: 0.3280
Epoch 21/50 | Loss: 0.3156
Epoch 22/50 | Loss: 0.3068
Epoch 23/50 | Loss: 0.2982
Epoch 24/50 | Loss: 0.2885
Epoch 25/50 | Loss: 0.2820
Epoch 26/50 | Loss: 0.2776
Epoch 27/50 | Loss: 0.2690
Epoch 28/50 | Loss: 0.2598
Epoch 29/50 | Loss: 0.2546
Epoch 30/50 | Loss: 0.2515
Epoch 31/50 | Loss: 0.2458
Epoch 32/50 | Loss: 0.2380
Epoch 33/50 | Loss: 0.2329
Epoch 34/50 | Loss: 0.2302
Epoch 35/50 | Loss: 0.2250
Epoch 36/50 | Loss: 0.2192
Epoch 37/50 | Loss: 0.2162
Epoch 38/5

# 테스트

In [27]:
import re
from konlpy.tag import Okt


def sentence_preprocessing(test_input_sentence):
    okt = Okt()

    # 인풋 처리
    test_input = re.sub(r"[^가-힣0-9\s]", "", test_input_sentence)
    test_inut = re.sub(r"\s+", " ", test_input_sentence).strip()
    test_tokens = tokens = okt.morphs(test_input_sentence, stem=True, norm=True)
    test_tokens = filter_tokens(tokens)
    #print(test_tokens)
  
    sequenced_tokens = vocab.lookup_indices(test_tokens) # 시퀀싱
    #print(sequenced_tokens)

    # 패딩
    padded_tokens = [pad(sequenced_tokens, 20, 0)] # 패딩
    #print(padded_tokens)

    return torch.tensor(padded_tokens, dtype=torch.long)

In [31]:
while True:
    test_input_sentence = input("감정 분석을 할 문장을 입력해주세요: ")
    
    # 종료
    if test_input_sentence == "X":
        break
    
    print("리뷰:", test_input_sentence)
    test_input = sentence_preprocessing(test_input_sentence)
    
    model.eval()
    with torch.no_grad():
        output = model(test_input)
        print("확률:", float(output[0][0]))
        if float(output[0][0]) > 0.5:
            print("긍정")
        else:
            print("부정")
    
    print()

리뷰: 사장님이 미쳤나 싶을 정도입니다
확률: 0.9522188305854797
긍정

리뷰: 고기가 너무 질겨요...
확률: 0.17332422733306885
부정

리뷰: 이 집은 진짜 꼭 가야합니다
확률: 0.9739217162132263
긍정

리뷰: 다시는 이 집에 가지 마세요
확률: 0.05102381855249405
부정

