In [None]:
import tensorflow as tf
import numpy as np
import re

from tensorflow.keras.layers import (
    Embedding, Dense, Dropout, LayerNormalization, Input,
    GlobalAveragePooling1D, TextVectorization
)
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.layers import MultiHeadAttention

print("TensorFlow version:", tf.__version__)


# 1. Download and read the NSMC data
# get_file returns the local path of each fetched file.
path_to_train_file = tf.keras.utils.get_file(
    'train.txt',
    'https://raw.githubusercontent.com/e9t/nsmc/master/ratings_train.txt'
)
path_to_test_file = tf.keras.utils.get_file(
    'test.txt',
    'https://raw.githubusercontent.com/e9t/nsmc/master/ratings_test.txt'
)

# Read raw bytes and decode explicitly as UTF-8 (no newline translation).
# The with-blocks close each file handle as soon as its contents are read,
# instead of leaving the handle open until garbage collection.
with open(path_to_train_file, 'rb') as train_file:
    train_text = train_file.read().decode(encoding='utf-8')
with open(path_to_test_file, 'rb') as test_file:
    test_text = test_file.read().decode(encoding='utf-8')

print('Length of train text:', len(train_text))
print('Length of test text:', len(test_text))


# 2. Extract labels
# The first line is skipped (presumably a header row — confirm against the
# data files) and lines without any tab are ignored. The label is the third
# tab-separated field; wrapping it in a one-element list gives arrays of
# shape (num_rows, 1).
train_Y = np.array([
    [int(row.split('\t')[2])]
    for row in train_text.split('\n')[1:]
    if row.count('\t') > 0
])

test_Y = np.array([
    [int(row.split('\t')[2])]
    for row in test_text.split('\n')[1:]
    if row.count('\t') > 0
])

print("train_Y shape:", train_Y.shape)
print("test_Y shape:", test_Y.shape)


# 3. 텍스트 정제 함수 정의
def clean_str(string):
    """Normalize a raw review string before tokenization.

    Steps, in order:
      1. Replace every character that is not a Hangul syllable, an ASCII
         letter or digit, or one of ``( ) , ! ? ' ``` with a space.
      2. Put a space in front of the English contraction suffixes
         ``'s 've n't 're 'd 'll``.
      3. Surround ``, ! ( ) ?`` with spaces so each becomes its own token.
      4. Collapse runs of two or more whitespace characters to one space.
      5. Remove all apostrophes.
      6. Lower-case the result.

    The result is not stripped, so it may start or end with a single space.

    Args:
        string: The text to clean.

    Returns:
        The cleaned, lower-cased text.
    """
    string = re.sub(r"[^가-힣A-Za-z0-9(),!?\'\`]", " ", string)

    # Detach contraction suffixes from the preceding word.
    string = re.sub(r"\'s", " 's", string)
    string = re.sub(r"\'ve", " 've", string)
    string = re.sub(r"n\'t", " n't", string)
    string = re.sub(r"\'re", " 're", string)
    string = re.sub(r"\'d", " 'd", string)
    string = re.sub(r"\'ll", " 'll", string)

    # Pad punctuation with spaces. The replacements are plain text: the
    # previous " \( ", " \) " and " \? " were invalid escape sequences in
    # non-raw literals (SyntaxWarning) and re.sub copied the backslash into
    # the output.
    string = re.sub(r",", " , ", string)
    string = re.sub(r"!", " ! ", string)
    string = re.sub(r"\(", " ( ", string)
    string = re.sub(r"\)", " ) ", string)
    string = re.sub(r"\?", " ? ", string)

    string = re.sub(r"\s{2,}", " ", string)
    # Squeeze repeated apostrophes, then drop the remaining ones entirely.
    string = re.sub(r"\'{2,}", "'", string)
    string = re.sub(r"\'", "", string)
    return string.lower()


# 4. Preprocess the training/test text
# For every line after the first that contains a tab, take the second
# tab-separated field and run it through clean_str.
train_text_X = [
    clean_str(line.split('\t')[1])
    for line in train_text.split('\n')[1:]
    if '\t' in line
]

test_text_X = [
    clean_str(line.split('\t')[1])
    for line in test_text.split('\n')[1:]
    if '\t' in line
]


# 5. Integer encoding + padding with TextVectorization
VOCAB_SIZE = 2000
MAX_LEN = 25

vectorize_layer = TextVectorization(
    max_tokens=VOCAB_SIZE,
    standardize='lower_and_strip_punctuation',
    split='whitespace',
    output_mode='int',
    output_sequence_length=MAX_LEN,
)

# The vocabulary is built from the training text only.
vectorize_layer.adapt(train_text_X)

train_X = vectorize_layer(train_text_X)
test_X = vectorize_layer(test_text_X)

print("train_X shape:", train_X.shape)
print("test_X shape:", test_X.shape)


# 6. Hyperparameters for the Transformer
EMBEDDING_DIM = 64
NUM_HEADS = 2
FF_DIM = 128
DROPOUT_RATE = 0.1
NUM_BLOCKS = 2
EPOCHS = 5
BATCH_SIZE = 32

# Seed both TensorFlow and NumPy random generators.
tf.random.set_seed(42)
np.random.seed(42)


# 7. Token + Positional Embedding 레이어
class TokenAndPositionalEmbedding(tf.keras.layers.Layer):
    """Layer that adds a learned position embedding to a token embedding.

    Args:
        maxlen: Number of rows in the position-embedding table.
        vocab_size: Number of rows in the token-embedding table.
        embed_dim: Output width of both embedding tables.
    """

    def __init__(self, maxlen, vocab_size, embed_dim):
        super().__init__()
        self.token_emb = Embedding(input_dim=vocab_size, output_dim=embed_dim)
        self.pos_emb = Embedding(input_dim=maxlen, output_dim=embed_dim)

    def call(self, x):
        """Return the token embeddings of ``x`` plus position embeddings."""
        # Position indices run from 0 to (size of x's last axis) - 1.
        seq_len = tf.shape(x)[-1]
        position_ids = tf.range(start=0, limit=seq_len, delta=1)
        position_vectors = self.pos_emb(position_ids)   # (seq_len, embed_dim)
        # The position vectors carry no batch axis; the addition broadcasts
        # them over the batch, giving (batch, seq_len, embed_dim).
        return self.token_emb(x) + position_vectors


# 8. Transformer Block 정의
class TransformerBlock(tf.keras.layers.Layer):
    """Self-attention followed by a feed-forward network.

    Each of the two sub-layers is followed by dropout, a residual addition
    and layer normalization, in that order.

    Args:
        embed_dim: Width of the block's input and output features.
        num_heads: Number of attention heads.
        ff_dim: Hidden width of the feed-forward network.
        rate: Dropout rate used after both sub-layers.
    """

    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1):
        super().__init__()
        # Per-head key size: the embedding width integer-divided by the
        # number of heads.
        per_head_dim = embed_dim // num_heads
        self.att = MultiHeadAttention(num_heads=num_heads, key_dim=per_head_dim)
        # Feed-forward network: widen to ff_dim with ReLU, project back to
        # embed_dim.
        self.ffn = Sequential([
            Dense(ff_dim, activation="relu"),
            Dense(embed_dim),
        ])
        # One LayerNormalization / Dropout pair per sub-layer.
        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)
        self.dropout1 = Dropout(rate)
        self.dropout2 = Dropout(rate)

    def call(self, inputs, training=None):
        """Apply the block to ``inputs``; ``training`` is passed to dropout."""
        # Self-attention: the inputs are passed as both query and value.
        attended = self.dropout1(self.att(inputs, inputs), training=training)
        attn_normed = self.layernorm1(inputs + attended)

        # Feed-forward network on the normalized attention result.
        ffn_out = self.dropout2(self.ffn(attn_normed), training=training)
        return self.layernorm2(attn_normed + ffn_out)


# 9. Define the Transformer-based sentiment classification model
inputs = Input(shape=(MAX_LEN,))
embedding_layer = TokenAndPositionalEmbedding(MAX_LEN, VOCAB_SIZE, EMBEDDING_DIM)
x = embedding_layer(inputs)

# Stack NUM_BLOCKS independent Transformer blocks.
for _ in range(NUM_BLOCKS):
    block = TransformerBlock(EMBEDDING_DIM, NUM_HEADS, FF_DIM, DROPOUT_RATE)
    x = block(x)

# Average over the sequence axis, apply dropout, then a single sigmoid unit.
x = GlobalAveragePooling1D()(x)
x = Dropout(0.1)(x)
outputs = Dense(1, activation='sigmoid')(x)

transformer_model = Model(inputs=inputs, outputs=outputs)
transformer_model.summary()


# 10. Compile and train
transformer_model.compile(
    loss='binary_crossentropy',
    optimizer='adam',
    metrics=['accuracy'],
)

# The last 20% of the training data is held out for validation.
history = transformer_model.fit(
    x=train_X,
    y=train_Y,
    batch_size=BATCH_SIZE,
    epochs=EPOCHS,
    validation_split=0.2,
    verbose=1,
)


# 11. Evaluate on the test data
# evaluate returns the loss followed by the compiled metrics (accuracy).
test_metrics = transformer_model.evaluate(x=test_X, y=test_Y, verbose=0)
test_loss, test_acc = test_metrics
print(f"[Transformer] Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}")


# 12. Sentiment classification of example sentences
example_sentences = [
    "이 영화는 정말 재미있고, 스토리도 흥미진진하며 배우들의 연기까지 완벽했다.",
    "전체적으로 너무 지루하고 스토리가 늘어져서 끝까지 보기 힘들었다."
]

# Apply the same cleaning and vectorization used for the training data.
example_sentences_clean = list(map(clean_str, example_sentences))
example_seq = vectorize_layer(example_sentences_clean)
pred = transformer_model.predict(example_seq)

for sentence, row in zip(example_sentences, pred):
    # Each prediction row holds the single sigmoid output.
    positive_prob = row[0]
    if positive_prob > 0.5:
        verdict = "긍정"
    else:
        verdict = "부정"
    print(f"문장: {sentence}")
    print(f"긍정 확률: {positive_prob:.4f}")
    print("결과:", verdict)


TensorFlow version: 2.19.0
Downloading data from https://raw.githubusercontent.com/e9t/nsmc/master/ratings_train.txt


  string = re.sub(r"\(", " \( ", string)
  string = re.sub(r"\)", " \) ", string)
  string = re.sub(r"\?", " \? ", string)


[1m14628807/14628807[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step
Downloading data from https://raw.githubusercontent.com/e9t/nsmc/master/ratings_test.txt
[1m4893335/4893335[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step
Length of train text: 6937271
Length of test text: 2318260
train_Y shape: (150000, 1)
test_Y shape: (50000, 1)
train_X shape: (150000, 25)
test_X shape: (50000, 25)


Epoch 1/5
[1m1175/3750[0m [32m━━━━━━[0m[37m━━━━━━━━━━━━━━[0m [1m1:33[0m 36ms/step - accuracy: 0.6349 - loss: 0.6241