# In [None]:
import pandas as pd
import numpy as np
import math  
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import f1_score
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, Embedding, GlobalMaxPool1D, LayerNormalization, Dropout, MultiHeadAttention
from tensorflow.keras.models import Model
from tensorflow.keras.layers import TextVectorization
import matplotlib.pyplot as plt

# 1. Data preparation
# Load the merged training CSV and expose its 'class' column as 'target'.
df = pd.read_csv('train_merged.csv').rename(columns={'class': 'target'})

# Label encoding: replace each class label with an integer id. The fitted
# encoder is kept in `le` so `le.classes_` can be read when the model's
# output layer is sized.
le = LabelEncoder()
df['target'] = le.fit_transform(df['target'])

# Data split: hold out 20% for validation, stratified on the encoded label,
# with a fixed seed so the split is reproducible.
X = df['conversation'].astype(str).values
y = df['target'].values
X_train, X_val, y_train, y_val = train_test_split(
    X,
    y,
    test_size=0.2,
    stratify=y,
    random_state=42,
)

# 2. Text vectorization
VOCAB_SIZE = 20000  # passed to the vectorizer as max_tokens
MAX_LEN = 128       # passed to the vectorizer as output_sequence_length

# Turn raw strings into integer token-id sequences. The vocabulary is learned
# from the training split only, so validation text does not influence it.
vectorize_layer = TextVectorization(
    output_mode='int',
    max_tokens=VOCAB_SIZE,
    output_sequence_length=MAX_LEN,
)
vectorize_layer.adapt(X_train)

# 3. Positional encoding class
class PositionalEncoding(tf.keras.layers.Layer):
    """Add a fixed sinusoidal position signal to a sequence of embeddings.

    The table is computed once in the constructor and has shape
    ``(1, max_len, embed_dim)``. Along the last axis the sine components come
    first and the cosine components second (they are concatenated, not
    interleaved).

    Args:
        max_len: Number of positions to precompute.
        embed_dim: Size of the last axis of the table.
        **kwargs: Standard ``tf.keras.layers.Layer`` arguments (``name``,
            ``dtype``, ...). Accepting them lets the layer be rebuilt from
            its config.
    """

    def __init__(self, max_len, embed_dim, **kwargs):
        super().__init__(**kwargs)
        # Kept so get_config() can report the constructor arguments.
        self.max_len = max_len
        self.embed_dim = embed_dim
        self.pos_encoding = self.positional_encoding(max_len, embed_dim)

    def get_angles(self, position, i, embed_dim):
        """Return ``position / 10000 ** (2 * (i // 2) / embed_dim)``.

        ``i // 2`` makes each consecutive pair of channel indices share the
        same rate.
        """
        angles = 1 / tf.pow(10000.0, (2 * (i // 2)) / tf.cast(embed_dim, tf.float32))
        return position * angles

    def positional_encoding(self, max_len, embed_dim):
        """Build the float32 encoding table of shape (1, max_len, embed_dim)."""
        angle_rads = self.get_angles(
            position=tf.range(max_len, dtype=tf.float32)[:, tf.newaxis],
            i=tf.range(embed_dim, dtype=tf.float32)[tf.newaxis, :],
            embed_dim=embed_dim
        )
        # Sine of the even-indexed columns, cosine of the odd-indexed ones.
        sines = tf.math.sin(angle_rads[:, 0::2])
        cosines = tf.math.cos(angle_rads[:, 1::2])
        pos_encoding = tf.concat([sines, cosines], axis=-1)
        # Leading axis of size 1 so the table broadcasts over the batch.
        pos_encoding = pos_encoding[tf.newaxis, ...]
        return tf.cast(pos_encoding, tf.float32)

    def call(self, inputs):
        """Add the first ``seq_len`` rows of the table to ``inputs``.

        ``seq_len`` is read from axis 1 of ``inputs``.
        NOTE(review): assumes inputs are (batch, seq_len, embed_dim) with
        seq_len <= max_len -- confirm against callers.
        """
        seq_len = tf.shape(inputs)[1]
        return inputs + self.pos_encoding[:, :seq_len, :]

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config.update({
            'max_len': self.max_len,
            'embed_dim': self.embed_dim,
        })
        return config

# 4. Transformer decoder block
class TransformerDecoderBlock(tf.keras.layers.Layer):
    """One decoder block: causal self-attention, cross-attention, feed-forward.

    Each of the three sub-layers is followed by dropout, a residual add and
    layer normalization.

    Args:
        embed_dim: Output width of the feed-forward sub-layer; also passed to
            both attention layers as their ``key_dim``.
        num_heads: Number of attention heads in both attention layers.
        ff_dim: Hidden width of the feed-forward sub-layer.
        rate: Dropout rate applied to each sub-layer's output.
        **kwargs: Standard ``tf.keras.layers.Layer`` arguments (``name``,
            ``dtype``, ...). Accepting them lets the layer be rebuilt from
            its config.
    """

    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1, **kwargs):
        super().__init__(**kwargs)
        # Kept so get_config() can report the constructor arguments.
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.rate = rate

        self.self_attn = MultiHeadAttention(num_heads, embed_dim)
        self.enc_attn = MultiHeadAttention(num_heads, embed_dim)
        self.ffn = tf.keras.Sequential([
            Dense(ff_dim, activation="relu"),
            Dense(embed_dim)
        ])
        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)
        self.layernorm3 = LayerNormalization(epsilon=1e-6)
        self.dropout1 = Dropout(rate)
        self.dropout2 = Dropout(rate)
        self.dropout3 = Dropout(rate)

    def call(self, inputs, enc_output, training=False):
        """Run the block.

        Args:
            inputs: Sequence fed to the self-attention sub-layer.
            enc_output: Sequence the cross-attention sub-layer attends to.
            training: Forwarded to the dropout layers.

        Returns:
            The layer-normalized output of the feed-forward sub-layer.
        """
        # Self-attention with a causal mask, so a position cannot attend to
        # later positions.
        attn1 = self.self_attn(inputs, inputs, use_causal_mask=True)
        attn1 = self.dropout1(attn1, training=training)
        out1 = self.layernorm1(inputs + attn1)

        # Cross-attention: queries come from this block, values from enc_output.
        attn2 = self.enc_attn(out1, enc_output)
        attn2 = self.dropout2(attn2, training=training)
        out2 = self.layernorm2(out1 + attn2)

        # Position-wise feed-forward network.
        ffn_output = self.ffn(out2)
        ffn_output = self.dropout3(ffn_output, training=training)
        return self.layernorm3(out2 + ffn_output)

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config.update({
            'embed_dim': self.embed_dim,
            'num_heads': self.num_heads,
            'ff_dim': self.ff_dim,
            'rate': self.rate,
        })
        return config

# 5. Model construction function
def build_transformer_model(embed_dim=128, num_heads=4, key_dim=64, ff_dim=256,
                            num_layers=2, dropout_rate=0.3):
    """Build and compile the string-in, class-probabilities-out classifier.

    Reads the module-level ``vectorize_layer``, ``VOCAB_SIZE``, ``MAX_LEN``
    and ``le`` (the output layer has ``len(le.classes_)`` units). Called with
    no arguments it uses the sizes that were previously hard-coded.

    Args:
        embed_dim: Embedding width, also the width of every encoder/decoder
            sub-layer output.
        num_heads: Attention heads in the encoder and decoder blocks.
        key_dim: Per-head size of the encoder attention layers.
        ff_dim: Hidden width of the feed-forward sub-layers.
        num_layers: Number of encoder blocks and of decoder blocks.
        dropout_rate: Dropout applied to the encoder attention output.

    Returns:
        A compiled ``Model`` (Adam, sparse categorical cross-entropy,
        accuracy metric).
    """
    inputs = Input(shape=(1,), dtype=tf.string)

    # Vectorization: raw string -> token ids -> embeddings + position signal
    x = vectorize_layer(inputs)
    x = Embedding(VOCAB_SIZE, embed_dim)(x)
    x = PositionalEncoding(MAX_LEN, embed_dim)(x)

    # Encoder: each sub-layer's output is added back to its input (residual
    # connection) before normalization, so the block refines `x` instead of
    # replacing it.
    for _ in range(num_layers):
        attn = MultiHeadAttention(num_heads=num_heads, key_dim=key_dim)(x, x)
        attn = Dropout(dropout_rate)(attn)
        x = LayerNormalization(epsilon=1e-6)(x + attn)

        ffn = Dense(ff_dim, activation='relu')(x)
        ffn = Dense(embed_dim)(ffn)
        x = LayerNormalization(epsilon=1e-6)(x + ffn)
    enc_output = x

    # Decoder: starts from the encoder output and cross-attends to it.
    dec_output = enc_output
    for _ in range(num_layers):
        dec_output = TransformerDecoderBlock(embed_dim, num_heads, ff_dim)(dec_output, enc_output)

    # Output: max-pool over the sequence axis, then one softmax unit per class
    pooled = GlobalMaxPool1D()(dec_output)
    outputs = Dense(len(le.classes_), activation='softmax')(pooled)

    model = Model(inputs, outputs)
    model.compile(optimizer='adam',
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])
    return model

# 6. Create the model
model = build_transformer_model()

# 7. F1 callback
class F1Callback(tf.keras.callbacks.Callback):
    """Compute the macro-averaged F1 score on a validation set after each epoch.

    The score is printed, appended to ``self.f1_scores`` and, when Keras
    supplies a ``logs`` dict, stored under ``'val_f1'`` so that callbacks
    reading ``logs`` afterwards can use it.

    Args:
        X_val: Validation inputs, passed unchanged to ``model.predict``.
        y_val: True labels, passed unchanged to ``f1_score``.
    """

    def __init__(self, X_val, y_val):
        super().__init__()
        self.X_val = X_val
        self.y_val = y_val
        # One entry per completed epoch, in order.
        self.f1_scores = []

    def on_epoch_end(self, epoch, logs=None):
        """Predict on the validation set and record the macro F1 score."""
        # verbose=0: avoid a second progress bar inside fit()'s own output.
        y_pred = self.model.predict(self.X_val, verbose=0).argmax(axis=1)
        f1 = f1_score(self.y_val, y_pred, average='macro')

        self.f1_scores.append(float(f1))
        if logs is not None:
            logs['val_f1'] = float(f1)

        print(f'\nVal F1: {f1:.4f}')

# 8. Run training
# Convert the splits to tensors: text as tf.string, labels as tf.int32.
X_train, X_val = (
    tf.convert_to_tensor(texts, dtype=tf.string) for texts in (X_train, X_val)
)
y_train, y_val = (
    tf.convert_to_tensor(labels, dtype=tf.int32) for labels in (y_train, y_val)
)

history = model.fit(
    X_train,
    y_train,
    validation_data=(X_val, y_val),
    epochs=10,
    batch_size=32,
    # The callback receives y_val converted to a NumPy array.
    callbacks=[F1Callback(X_val, y_val.numpy())],
)

# 9. Visualization code (revised)
def plot_metrics(history):
    """Plot the per-epoch training curves side by side.

    Panels are Accuracy and Loss (train and validation) plus a macro-F1 panel
    when a ``'val_f1'`` series has been recorded. The subplot grid is sized to
    the panels actually drawn, so no empty axes are left over. Series missing
    from the history are skipped.

    Args:
        history: Object whose ``history`` attribute maps metric names to
            per-epoch value sequences (e.g. the return value of ``model.fit``).
    """
    curves = history.history

    # (panel title, [(history key, legend label), ...])
    wanted = [
        ('Accuracy', [('accuracy', 'Train'), ('val_accuracy', 'Val')]),
        ('Loss', [('loss', 'Train'), ('val_loss', 'Val')]),
        ('F1 (macro)', [('val_f1', 'Val')]),
    ]

    # Keep only the series that were recorded, and only panels with data.
    panels = []
    for title, series in wanted:
        present = [(key, label) for key, label in series if key in curves]
        if present:
            panels.append((title, present))

    if not panels:
        return

    plt.figure(figsize=(15, 5))
    for position, (title, series) in enumerate(panels, start=1):
        plt.subplot(1, len(panels), position)
        for key, label in series:
            plt.plot(curves[key], label=label)
        plt.title(title)
        plt.legend()

    plt.tight_layout()
    plt.show()

# Draw the curves for the training run recorded in `history`.
plot_metrics(history)
