In [2]:
pip install sentencepiece

Collecting sentencepiece
  Downloading sentencepiece-0.2.1-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (10 kB)
Downloading sentencepiece-0.2.1-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl (1.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.4/1.4 MB[0m [31m23.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: sentencepiece
Successfully installed sentencepiece-0.2.1
Note: you may need to restart the kernel to use updated packages.


In [3]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, random_split
import torch.optim as optim
import torch.optim.lr_scheduler as lr_scheduler
import sentencepiece as spm
import seaborn as sns

import math
import os
import re
import urllib.request
import zipfile
import copy
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

In [4]:
# Use the GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print(device)

cuda


In [5]:
# Load the training data.
# expanduser("~") resolves the home directory even when the HOME environment
# variable is unset; os.getenv('HOME') would return None in that case and
# os.path.join would then raise TypeError.
home_dir = os.path.expanduser("~")
train_data_path = os.path.join(home_dir, "aiffel/dktc/data/train.csv")
train_data = pd.read_csv(train_data_path)
train_data.head()

Unnamed: 0,idx,class,conversation
0,0,협박 대화,지금 너 스스로를 죽여달라고 애원하는 것인가?\n 아닙니다. 죄송합니다.\n 죽을 ...
1,1,협박 대화,길동경찰서입니다.\n9시 40분 마트에 폭발물을 설치할거다.\n네?\n똑바로 들어 ...
2,2,기타 괴롭힘 대화,너 되게 귀여운거 알지? 나보다 작은 남자는 첨봤어.\n그만해. 니들 놀리는거 재미...
3,3,갈취 대화,어이 거기\n예??\n너 말이야 너. 이리 오라고\n무슨 일.\n너 옷 좋아보인다?...
4,4,갈취 대화,저기요 혹시 날이 너무 뜨겁잖아요? 저희 회사에서 이 선크림 파는데 한 번 손등에 ...


In [7]:
%pip install -U scikit-learn
from sklearn.preprocessing import LabelEncoder


Collecting scikit-learn
  Downloading scikit_learn-1.7.2-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata (11 kB)
Downloading scikit_learn-1.7.2-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.whl (9.5 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m9.5/9.5 MB[0m [31m55.7 MB/s[0m eta [36m0:00:00[0m00:01[0m
[?25hInstalling collected packages: scikit-learn
  Attempting uninstall: scikit-learn
    Found existing installation: scikit-learn 1.7.0
    Uninstalling scikit-learn-1.7.0:
      Successfully uninstalled scikit-learn-1.7.0
Successfully installed scikit-learn-1.7.2
Note: you may need to restart the kernel to use updated packages.


In [8]:
CLASS_NAMES = ['협박 대화', '갈취 대화', '직장 내 괴롭힘 대화', '기타 괴롭힘 대화']

# Note: the integer ids do not follow the order of CLASS_NAMES. The frame
# displayed after this cell shows '갈취 대화' -> 0, '기타 괴롭힘 대화' -> 1 and
# '협박 대화' -> 3.
encoder = LabelEncoder()
encoder.fit(CLASS_NAMES)

# Encode only while the column still holds the raw string labels, so that
# re-running this cell does not fail on the already-encoded integers.
if not pd.api.types.is_numeric_dtype(train_data['class']):
    train_data['class'] = encoder.transform(train_data['class'])

corpus = train_data["conversation"]

In [9]:
train_data.head()

Unnamed: 0,idx,class,conversation
0,0,3,지금 너 스스로를 죽여달라고 애원하는 것인가?\n 아닙니다. 죄송합니다.\n 죽을 ...
1,1,3,길동경찰서입니다.\n9시 40분 마트에 폭발물을 설치할거다.\n네?\n똑바로 들어 ...
2,2,1,너 되게 귀여운거 알지? 나보다 작은 남자는 첨봤어.\n그만해. 니들 놀리는거 재미...
3,3,0,어이 거기\n예??\n너 말이야 너. 이리 오라고\n무슨 일.\n너 옷 좋아보인다?...
4,4,0,저기요 혹시 날이 너무 뜨겁잖아요? 저희 회사에서 이 선크림 파는데 한 번 손등에 ...


In [None]:
#전처리 함수
#===============================================================================================================================#
# 전처리
import re, unicodedata

STOPWORDS = ['의','가','이','은','들','는','좀','잘','걍','과','도','를','으로','자','에','와','한','하다']
_re_keep_ko_digit = re.compile(r'[^가-힣0-9\s]+')
_re_multi_space   = re.compile(r'\s+')

def preprocess_sentence(s: str) -> str:
    s = unicodedata.normalize('NFKC', '' if s is None else str(s))
    s = _re_keep_ko_digit.sub(' ', s)
    s = _re_multi_space.sub(' ', s).strip()
    return s



In [10]:
#데이터 분석 함수
#===============================================================================================================================#
def below_threshold_len(max_len, data, features):
    """Print, per feature, the share of samples whose word count is within a threshold.

    Args:
        max_len: Either a single threshold applied to every feature, or a
            sequence of thresholds. A sequence longer than ``features`` is
            truncated; a shorter one is padded by repeating its last value.
        data: Mapping-like object (e.g. a DataFrame) where ``data[feature]``
            is a sized iterable of strings.
        features: Names of the columns to inspect.

    Raises:
        ValueError: If ``max_len`` is an empty sequence while ``features`` is not.
    """
    # Normalise max_len to exactly one threshold per feature.
    try:
        thresholds = list(max_len)
    except TypeError:
        # A scalar threshold: use it for every feature.
        thresholds = [max_len] * len(features)
    else:
        thresholds = thresholds[:len(features)]
        if len(thresholds) < len(features):
            if not thresholds:
                raise ValueError("max_len must contain at least one threshold")
            thresholds += [thresholds[-1]] * (len(features) - len(thresholds))

    for idx, feature in enumerate(features):
        samples = data[feature]
        total = len(samples)
        # Length is measured in whitespace-separated tokens.
        cnt = sum(1 for s in samples if len(s.split()) <= thresholds[idx])
        # An empty column reports 0.0 instead of dividing by zero.
        ratio = (cnt / total) if total else 0.0
        print('전체 %s 샘플 중 길이가 %s 이하인 샘플의 비율: %s'%(feature ,thresholds[idx], ratio))

#데이터 길이 시각화 함수
def DataLengthVisualization(data, features, bins=40):
    """Report and plot the word-count distribution of each text feature.

    For every feature this prints the minimum, maximum and mean number of
    whitespace-separated tokens per sample, draws one box plot per feature
    side by side in a single figure, and then shows one histogram per feature.

    Args:
        data: Mapping-like object where ``data[feature]`` iterates over strings.
        features: Names of the columns to analyse.
        bins: Number of histogram bins.
    """
    # Token counts per feature, keyed by name because the lists may differ in length.
    lengths_by_feature = {}

    for position, feature in enumerate(features, start=1):
        lengths = [len(sample.split()) for sample in data[feature]]
        lengths_by_feature[feature] = lengths

        print(f'{feature}의 최소 길이 : {np.min(lengths)}')
        print(f'{feature}의 최대 길이 : {np.max(lengths)}')
        print(f'{feature}의 평균 길이 : {np.mean(lengths)}')

        # One box plot per feature, laid out in a single row.
        plt.subplot(1, len(features), position)
        plt.boxplot(lengths)
        plt.title(feature)

    plt.tight_layout()
    plt.show()

    # A separate histogram figure for each feature.
    for feature in features:
        plt.title(feature)
        plt.hist(lengths_by_feature[feature], bins=bins)
        plt.xlabel('length of samples')
        plt.ylabel('number of samples')
        plt.show()

#희귀 단어 파악
def spase_word(data, features, threshold=7):
    """Print rare-word statistics for each text feature.

    A word is "rare" when it occurs fewer than ``threshold`` times within the
    feature. Words are whitespace-separated tokens.

    Args:
        data: Object where ``data[feature]`` exposes ``.tolist()`` returning
            strings (e.g. a pandas DataFrame).
        features: Names of the columns to analyse.
        threshold: Minimum occurrence count for a word not to be rare.
    """
    # Imported locally: the notebook's import cell does not bring Counter into scope.
    from collections import Counter

    for feature in features:
        # Count how often each word occurs in this feature.
        word_counter = Counter()
        for text in data[feature].tolist():
            word_counter.update(text.split())

        total_cnt = len(word_counter)            # number of distinct words
        total_freq = sum(word_counter.values())  # total number of word occurrences
        rare_counts = [count for count in word_counter.values() if count < threshold]
        rare_cnt = len(rare_counts)              # number of distinct rare words
        rare_freq = sum(rare_counts)             # total occurrences of rare words

        # An empty corpus reports 0% instead of dividing by zero.
        rare_vocab_pct = (rare_cnt / total_cnt)*100 if total_cnt else 0.0
        rare_freq_pct = (rare_freq / total_freq)*100 if total_freq else 0.0

        print('대상 feature : ', feature)
        print('단어 집합(vocabulary)의 크기 :', total_cnt)
        print('등장 빈도가 %s번 이하인 희귀 단어의 수: %s'%(threshold - 1, rare_cnt))
        print('단어 집합에서 희귀 단어를 제외시킬 경우의 단어 집합의 크기 %s'%(total_cnt - rare_cnt))
        print("단어 집합에서 희귀 단어의 비율:", rare_vocab_pct)
        print("전체 등장 빈도에서 희귀 단어 등장 빈도 비율:", rare_freq_pct)
        print("=======================================================================")

In [11]:
#Transformer model 설계
#============================================================================================================================================================================#

def create_padding_mask(x):
    """Return a float mask that is 1.0 wherever ``x`` equals 0 and 0.0 elsewhere.

    Two singleton axes are inserted after the first axis, so a
    (batch_size, seq_len) input yields a (batch_size, 1, 1, seq_len) mask.
    """
    is_zero = x.eq(0).float()
    # Equivalent to unsqueeze(1) followed by unsqueeze(2).
    return is_zero[:, None, None]

def create_look_ahead_mask(x):
    """Combine a causal (future-token) mask with the padding mask of ``x``.

    Args:
        x: Token ids; ``x.size(1)`` is taken as the sequence length.

    Returns:
        A float mask that is 1 where attention must be blocked: either the key
        position lies after the query position, or the key token is padding
        (id 0). For a (batch_size, seq_len) input the result broadcasts to
        (batch_size, 1, seq_len, seq_len).
    """
    seq_len = x.size(1)

    # Ones strictly above the diagonal: each position may see itself and
    # earlier positions, but nothing after it.
    future_mask = torch.triu(torch.ones((seq_len, seq_len)), diagonal=1)
    # (seq_len, seq_len) -> (1, 1, seq_len, seq_len), on the same device as x.
    future_mask = future_mask[None, None].to(x.device)

    # (batch_size, 1, 1, seq_len)
    padding_mask = create_padding_mask(x)

    # A position is masked when either mask flags it.
    return torch.max(future_mask, padding_mask)

class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal positional encodings to a sequence of embeddings.

    Args:
        position: Number of positions to precompute (maximum usable sequence length).
        d_model: Size of the last (feature) dimension of the inputs.
    """

    def __init__(self, position, d_model):
        super(PositionalEncoding, self).__init__()
        self.d_model = d_model
        self.position = position

        # Registered as a non-persistent buffer: the table follows the module
        # through .to()/.cuda() but is kept out of state_dict, so checkpoints
        # have the same keys as when this was a plain attribute.
        self.register_buffer(
            "pos_encoding",
            self._build_pos_encoding(position, d_model),
            persistent=False,
        )

    def _get_angles(self, position, i, d_model):
        # angle(pos, i) = pos / 10000^(2 * (i // 2) / d_model)
        return 1.0 / (10000.0 **((2.0 * (i // 2))/d_model)) * position

    def _build_pos_encoding(self, position, d_model):
        """Return the (1, position, d_model) table of sine/cosine encodings."""
        pos = torch.arange(position, dtype=torch.float32).unsqueeze(1)
        i = torch.arange(d_model, dtype=torch.float32).unsqueeze(0)

        angle_rads = self._get_angles(pos, i, d_model)
        # Even feature indices carry sines, odd feature indices carry cosines.
        sines = torch.sin(angle_rads[:, 0::2])
        cosines = torch.cos(angle_rads[:, 1::2])

        pos_encoding = torch.zeros(position, d_model)
        pos_encoding[:, 0::2] = sines
        pos_encoding[:, 1::2] = cosines

        # Leading axis of size 1 so the table broadcasts over the batch.
        pos_encoding = pos_encoding.unsqueeze(0)
        return pos_encoding

    def forward(self, x):
        """Return ``x`` plus the encodings of its first ``x.size(1)`` positions.

        Raises:
            ValueError: If the sequence is longer than the precomputed table.
        """
        seq_len = x.size(1)
        if seq_len > self.position:
            raise ValueError(
                f"sequence length {seq_len} exceeds the {self.position} precomputed positions"
            )
        # Follow the input's device rather than a notebook-level global.
        return x + self.pos_encoding[:, :seq_len, :].to(x.device)

def scaled_dot_product_attention(query, key, value, mask=None):
    """Compute softmax(Q K^T / sqrt(depth) + mask * -1e9) V.

    Args:
        query, key, value: Tensors whose last two axes are (seq_len, depth).
        mask: Optional tensor broadcastable to the attention logits; entries
            equal to 1 mark positions to exclude.

    Returns:
        A tuple ``(output, attention_weights)``.
    """
    # Similarity scores between queries and keys, scaled by sqrt(depth),
    # where depth is the size of the key's last axis.
    scale = math.sqrt(key.size(-1))
    scores = torch.matmul(query, key.transpose(-1, -2)) / scale

    # Masked positions receive a large negative offset so that their softmax
    # weight becomes negligible.
    if mask is not None:
        scores = scores + (mask * -1e9)

    # Normalise over the key axis, then take the weighted sum of the values.
    attention_weights = F.softmax(scores, dim=-1)
    return torch.matmul(attention_weights, value), attention_weights

class MultiHeadAttention(nn.Module):
    """Multi-head attention built on ``scaled_dot_product_attention``.

    Args:
        d_model: Feature size of the inputs and outputs; must be divisible by
            ``num_heads``.
        num_heads: Number of attention heads.
        name: Accepted but not used by this class.
    """

    def __init__(self, d_model, num_heads, name="multi_head_attention"):
        super().__init__()
        self.num_heads = num_heads
        self.d_model = d_model

        # Each head works on an equal slice of the feature dimension.
        assert d_model % num_heads == 0

        self.depth = d_model // num_heads

        # Input projections for Q, K and V, then the output projection.
        self.query_dense = nn.Linear(d_model, d_model)
        self.key_dense = nn.Linear(d_model, d_model)
        self.value_dense = nn.Linear(d_model, d_model)

        self.out_dense = nn.Linear(d_model, d_model)

    def split_heads(self, x, batch_size):
        """Reshape (batch_size, seq_len, d_model) to (batch_size, num_heads, seq_len, depth)."""
        per_head = x.view(batch_size, -1, self.num_heads, self.depth)
        return per_head.permute(0, 2, 1, 3)

    def forward(self, query, key, value, mask=None):
        """Attend from ``query`` over ``key``/``value``.

        Args:
            query, key, value: Tensors of shape (batch_size, seq_len, d_model).
            mask: Optional mask broadcastable to
                (batch_size, num_heads, query_len, key_len).

        Returns:
            Tensor of shape (batch_size, query_len, d_model).
        """
        batch_size = query.size(0)

        # Project, then split the feature axis into heads.
        q_heads = self.split_heads(self.query_dense(query), batch_size)
        k_heads = self.split_heads(self.key_dense(key), batch_size)
        v_heads = self.split_heads(self.value_dense(value), batch_size)

        # The attention weights are not needed here.
        attended, _ = scaled_dot_product_attention(q_heads, k_heads, v_heads, mask)

        # (batch_size, num_heads, seq_len, depth) -> (batch_size, seq_len, d_model)
        attended = attended.permute(0, 2, 1, 3).contiguous()
        merged = attended.view(batch_size, -1, self.d_model)

        return self.out_dense(merged)

class EncoderLayer(nn.Module):
    """One encoder block: self-attention and a feed-forward network, each
    followed by dropout, a residual connection and LayerNorm.

    Args:
        d_model: Feature size of the inputs and outputs.
        num_heads: Number of attention heads.
        ff_dim: Hidden size of the feed-forward network.
        dropout: Dropout probability applied to each sub-layer output.
    """

    def __init__(self, d_model, num_heads, ff_dim, dropout=0.1):
        super().__init__()
        # Self-attention sub-layer.
        self.mha = MultiHeadAttention(d_model, num_heads)
        self.dropout1 = nn.Dropout(dropout)
        self.norm1 = nn.LayerNorm(d_model, eps=1e-6)

        # Position-wise feed-forward sub-layer (Linear -> ReLU -> Linear).
        self.ffn = nn.Sequential(
            nn.Linear(d_model, ff_dim),
            nn.ReLU(),
            nn.Linear(ff_dim, d_model),
        )
        self.dropout2 = nn.Dropout(dropout)
        self.norm2 = nn.LayerNorm(d_model, eps=1e-6)

    def forward(self, x, mask=None):
        """Return the transformed sequence, same shape as ``x``."""
        # Self-attention, then residual + LayerNorm.
        attended = self.dropout1(self.mha(x, x, x, mask))
        hidden = self.norm1(x + attended)

        # Feed-forward, then residual + LayerNorm.
        transformed = self.dropout2(self.ffn(hidden))
        return self.norm2(hidden + transformed)

class Encoder(nn.Module):
    """Token embedding plus positional encoding, followed by a stack of
    ``EncoderLayer`` blocks.

    Args:
        vocab_size: Number of token ids; also used as the number of
            precomputed positions in the positional encoding.
        num_layers: Number of encoder layers.
        ff_dim: Hidden size of each layer's feed-forward network.
        d_model: Embedding / feature size.
        num_heads: Number of attention heads per layer.
        dropout: Dropout probability.
    """

    def __init__(self, vocab_size, num_layers, ff_dim, d_model, num_heads, dropout=0.1):
        super().__init__()
        self.d_model = d_model

        self.embedding = nn.Embedding(vocab_size, d_model)

        # The positional table has vocab_size rows, so that is the longest
        # sequence it can cover.
        self.pos_encoding = PositionalEncoding(position=vocab_size, d_model=d_model)

        self.dropout = nn.Dropout(dropout)

        stack = []
        for _ in range(num_layers):
            stack.append(EncoderLayer(d_model, num_heads, ff_dim, dropout))
        self.enc_layers = nn.ModuleList(stack)

    def forward(self, x, mask=None):
        """Encode token ids ``x``; ``mask`` is passed to every layer's attention."""
        # Scale the embeddings by sqrt(d_model) before adding positions.
        hidden = self.embedding(x) * math.sqrt(self.d_model)
        hidden = self.dropout(self.pos_encoding(hidden))

        for encoder_layer in self.enc_layers:
            hidden = encoder_layer(hidden, mask)

        return hidden

class DecoderLayer(nn.Module):
    """One decoder block: masked self-attention, encoder-decoder attention and
    a feed-forward network, each followed by dropout, a residual connection
    and LayerNorm.

    Args:
        d_model: Feature size of the inputs and outputs.
        num_heads: Number of attention heads.
        ff_dim: Hidden size of the feed-forward network.
        dropout: Dropout probability applied to each sub-layer output.
    """

    def __init__(self, d_model, num_heads, ff_dim, dropout=0.1):
        super().__init__()

        # Sub-layer 1: self-attention over the decoder input.
        self.self_mha = MultiHeadAttention(d_model, num_heads)
        self.norm1 = nn.LayerNorm(d_model, eps=1e-6)

        # Sub-layer 2: attention from the decoder over the encoder outputs.
        self.encdec_mha = MultiHeadAttention(d_model, num_heads)
        self.norm2 = nn.LayerNorm(d_model, eps=1e-6)

        # Sub-layer 3: position-wise feed-forward (Linear -> ReLU -> Linear).
        self.ffn = nn.Sequential(
            nn.Linear(d_model, ff_dim),
            nn.ReLU(),
            nn.Linear(ff_dim, d_model),
        )
        self.norm3 = nn.LayerNorm(d_model, eps=1e-6)

        # One dropout per sub-layer.
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.dropout3 = nn.Dropout(dropout)

    def forward(self, x, enc_outputs, look_ahead_mask=None, padding_mask=None):
        """Run the three sub-layers in order.

        Args:
            x: Decoder hidden states.
            enc_outputs: Encoder outputs used as keys and values in the second sub-layer.
            look_ahead_mask: Mask for the decoder self-attention.
            padding_mask: Mask for the encoder-decoder attention.
        """
        # 1) Decoder self-attention.
        attended = self.dropout1(self.self_mha(x, x, x, mask=look_ahead_mask))
        hidden = self.norm1(x + attended)

        # 2) Encoder-decoder attention: queries from the decoder, keys/values from the encoder.
        cross = self.dropout2(
            self.encdec_mha(hidden, enc_outputs, enc_outputs, mask=padding_mask)
        )
        hidden = self.norm2(hidden + cross)

        # 3) Feed-forward network.
        transformed = self.dropout3(self.ffn(hidden))
        return self.norm3(hidden + transformed)

class Decoder(nn.Module):
    """Token embedding plus positional encoding, followed by a stack of
    ``DecoderLayer`` blocks.

    Args:
        vocab_size: Number of token ids; also used as the number of
            precomputed positions in the positional encoding.
        num_layers: Number of decoder layers.
        ff_dim: Hidden size of each layer's feed-forward network.
        d_model: Embedding / feature size.
        num_heads: Number of attention heads per layer.
        dropout: Dropout probability.
    """

    def __init__(self, vocab_size, num_layers, ff_dim, d_model, num_heads, dropout=0.1):
        super().__init__()
        self.d_model = d_model

        self.embedding = nn.Embedding(vocab_size, d_model)

        # The positional table has vocab_size rows, so that is the longest
        # sequence it can cover.
        self.pos_encoding = PositionalEncoding(position=vocab_size, d_model=d_model)

        self.dropout = nn.Dropout(dropout)

        stack = []
        for _ in range(num_layers):
            stack.append(DecoderLayer(d_model, num_heads, ff_dim, dropout))
        self.dec_layers = nn.ModuleList(stack)

    def forward(self, x, enc_outputs, look_ahead_mask=None, padding_mask=None):
        """Decode token ids ``x`` against ``enc_outputs``; both masks are passed to every layer."""
        # Scale the embeddings by sqrt(d_model) before adding positions.
        hidden = self.embedding(x) * math.sqrt(self.d_model)
        hidden = self.dropout(self.pos_encoding(hidden))

        for decoder_layer in self.dec_layers:
            hidden = decoder_layer(hidden, enc_outputs, look_ahead_mask, padding_mask)

        return hidden

class Transformer(nn.Module):
    """Encoder-decoder Transformer that maps token ids to vocabulary logits.

    The encoder and the decoder are built with the same hyperparameters.

    Args:
        vocab_size: Vocabulary size (used for embeddings and the output layer).
        num_layers: Number of layers in each of the encoder and the decoder.
        units: Hidden size of the feed-forward networks.
        d_model: Embedding / feature size.
        num_heads: Number of attention heads.
        dropout: Dropout probability.
    """

    def __init__(self, vocab_size, num_layers, units, d_model, num_heads, dropout=0.1):
        super().__init__()

        # Both stacks share one configuration.
        stack_config = dict(
            vocab_size=vocab_size,
            num_layers=num_layers,
            ff_dim=units,
            d_model=d_model,
            num_heads=num_heads,
            dropout=dropout,
        )
        self.encoder = Encoder(**stack_config)
        self.decoder = Decoder(**stack_config)

        # Output projection: d_model -> vocab_size.
        self.final_linear = nn.Linear(d_model, vocab_size)

    def forward(self, inputs, dec_inputs):
        """Return logits of shape (batch_size, tgt_seq_len, vocab_size).

        Args:
            inputs: Source token ids, (batch_size, src_seq_len); id 0 is treated as padding.
            dec_inputs: Decoder input token ids, (batch_size, tgt_seq_len).
        """
        # Padding mask over the source, used by the encoder self-attention.
        enc_padding_mask = create_padding_mask(inputs)
        # Causal + padding mask over the decoder input.
        look_ahead_mask = create_look_ahead_mask(dec_inputs)
        # Padding mask over the source, used when the decoder attends to the encoder outputs.
        dec_padding_mask = create_padding_mask(inputs)

        # (batch_size, src_seq_len, d_model)
        enc_outputs = self.encoder(x=inputs, mask=enc_padding_mask)

        # (batch_size, tgt_seq_len, d_model)
        dec_outputs = self.decoder(
            x=dec_inputs,
            enc_outputs=enc_outputs,
            look_ahead_mask=look_ahead_mask,
            padding_mask=dec_padding_mask,
        )

        return self.final_linear(dec_outputs)

In [None]:
#RoPE 설계
#============================================================================================================================================================================#




In [None]:
#Transformer decoder 기반 model 설계
#============================================================================================================================================================================#


In [None]:
# 학습 함수
#=======================================================================================================================================================================
def train_model(model, train_dataset,vocab_size=8000,
                num_layers=2, units=512, d_model=256,
                num_heads=8, dropout=0.1, train_ratio=0.8,
                warmup_steps=4000, criterion = 'CE', optimize='Adam',
                batch_size=256, epochs=50,lr = 0.001,
                verbose = 1, patience=4, max_len = 40):
    """Build a model, train it with early stopping and return the best checkpoint.

    Args:
        model: Callable (e.g. the ``Transformer`` class) invoked with
            ``vocab_size``, ``num_layers``, ``units``, ``d_model``,
            ``num_heads`` and ``dropout`` keyword arguments to build the network.
        train_dataset: Dataset whose batches unpack into
            ``(enc_input, dec_input, target)`` tensors.
        vocab_size, num_layers, units, d_model, num_heads, dropout:
            Hyperparameters forwarded to ``model``.
        train_ratio: Fraction of ``train_dataset`` used for training; the rest
            is used for validation.
        warmup_steps: Warmup length of the learning-rate schedule.
        criterion: Accepted but ignored; ``CrossEntropyLoss(ignore_index=0)``
            is always used.
        optimize: ``'AdamW'``, ``'Adam'`` or ``'SGD'``; any other value falls
            back to Adam.
        batch_size: Batch size of both data loaders.
        epochs: Maximum number of epochs.
        lr: Base learning rate handed to the optimizer.
        verbose: Progress is printed every ``verbose * 100`` training steps and
            every ``verbose`` epochs; a value <= 0 disables progress printing.
        patience: Number of consecutive epochs without validation-loss
            improvement before training stops.
        max_len: Accepted but not used.

    Returns:
        The trained model, restored to the weights with the lowest validation loss.

    Raises:
        ValueError: If the train or validation split would be empty.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    # Build the network from this function's own arguments rather than from
    # notebook-level globals.
    model = model(
        vocab_size=vocab_size,
        num_layers=num_layers,
        units=units,
        d_model=d_model,
        num_heads=num_heads,
        dropout=dropout
    )

    model.to(device)

    # Loss: targets equal to 0 (padding) do not contribute.
    criterion = nn.CrossEntropyLoss(ignore_index=0)

    if optimize=='AdamW':
        optimizer=optim.AdamW(model.parameters(), lr=lr)
    elif optimize=="SGD":
        optimizer=optim.SGD(model.parameters(), lr=lr)
    else:
        # 'Adam' and any unrecognised name.
        optimizer=optim.Adam(model.parameters(), betas = (0.9, 0.98), eps=1e-9, lr=lr)

    # NOTE(review): LambdaLR multiplies `lr` by the schedule value. With the
    # defaults (lr=0.001, d_model=256, warmup_steps=4000) the peak effective
    # learning rate is roughly 1e-6 -- confirm this is intended.
    scheduler = lr_scheduler.LambdaLR(
        optimizer, lr_lambda = get_lr_lambda(d_model, warmup_steps = warmup_steps)
    )

    train_losses = []
    val_losses = []
    train_acc_list = []
    val_acc_list = []

    train_size = int(len(train_dataset) * train_ratio)
    val_size = len(train_dataset) - train_size
    if train_size == 0 or val_size == 0:
        raise ValueError(
            f"train_ratio={train_ratio} leaves an empty split "
            f"(train={train_size}, val={val_size})"
        )

    # Random train/validation split.
    tr_dataset, val_dataset = random_split(train_dataset, [train_size, val_size])

    # The loaders are loop-invariant; shuffle=True still reshuffles on every epoch.
    train_loader = DataLoader(tr_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

    best_param = None
    best_val_loss = float('inf')
    early_stop_counter = 0

    for epoch in range(epochs):
        model.train()
        total_loss, total_acc = 0.0, 0.0

        for step, batch in enumerate(train_loader):
            # train_step zeroes the gradients itself.
            loss, acc = train_step(model, batch, optimizer, criterion, device)
            acc = float(acc)  # plain float: safe to accumulate and to plot
            total_loss += loss
            total_acc += acc

            if verbose > 0 and (step+1) % (verbose*100) == 0:
                print(f"[Epoch {epoch+1}, Step {step+1}] Loss: {loss:.4f}, Acc: {acc:.4f}")

            # The schedule advances once per optimisation step, not per epoch.
            scheduler.step()

        avg_loss = total_loss / len(train_loader)
        avg_acc = total_acc / len(train_loader)
        train_losses.append(avg_loss)
        train_acc_list.append(avg_acc)

        # Validation pass.
        model.eval()
        val_loss, val_acc = 0.0, 0.0
        with torch.no_grad():
            for batch in val_loader:
                loss, acc = eval_step(model, batch, criterion, device)

                val_loss += loss
                val_acc += float(acc)

        val_loss /= len(val_loader)
        val_acc /= len(val_loader)
        val_losses.append(val_loss)
        val_acc_list.append(val_acc)

        if verbose > 0 and (epoch+1)%verbose==0:
            print(f"Epoch {epoch+1}/{epochs} | Train Loss: {avg_loss:.4f}, Train Acc: {avg_acc:.4f} | Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

        # Early stopping: keep a copy of the best weights seen so far.
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_param = copy.deepcopy(model.state_dict())
            early_stop_counter = 0
        else:
            early_stop_counter += 1

        if early_stop_counter >= patience:
            print(f"Early stopping triggered at epoch {epoch+1}")
            break

    # Nothing to restore if no epoch ever improved the validation loss
    # (e.g. epochs == 0 or a NaN loss).
    if best_param is not None:
        model.load_state_dict(best_param)

    Loss_Visualization(train_losses, val_losses)
    Acc_Visualization(train_acc_list, val_acc_list)
    return model

def train_step(model, batch, optimizer, loss_function, device):
    """Run one optimisation step on a single batch.

    Args:
        model: Network called as ``model(enc_input, dec_input)``.
        batch: Iterable of three tensors ``(enc_input, dec_input, target)``.
        optimizer: Optimizer updating ``model``'s parameters.
        loss_function: Loss taking ``(batch_size, vocab_size, seq_len)`` scores
            and ``(batch_size, seq_len)`` targets.
        device: Device the batch tensors are moved to.

    Returns:
        ``(loss, accuracy)``: the loss as a Python float and the value returned
        by ``accuracy_function``.
    """
    model.train()
    enc_input, dec_input, target = [tensor.to(device) for tensor in batch]

    optimizer.zero_grad()

    # Forward pass: (batch_size, seq_len, vocab_size).
    logits = model(enc_input, dec_input)

    # Move the vocabulary axis to dim 1 for the loss.
    class_scores = logits.permute(0, 2, 1)
    loss = loss_function(class_scores, target)

    # Backward pass and parameter update.
    loss.backward()
    optimizer.step()

    batch_loss = loss.item()
    # NOTE(review): `sp` is not defined in this cell -- presumably a
    # SentencePiece processor created elsewhere in the notebook; confirm it
    # exists before training and that its pad id matches the loss's ignore_index.
    batch_acc = accuracy_function(logits, target, pad_id=sp.pad_id())
    return batch_loss, batch_acc

def eval_step(model, batch, loss_function, device):
    """Evaluate the model on a single batch without updating it.

    Gradient tracking is not disabled here; wrap the call in
    ``torch.no_grad()`` when gradients are not needed.

    Args:
        model: Network called as ``model(enc_input, dec_input)``.
        batch: Iterable of three tensors ``(enc_input, dec_input, target)``.
        loss_function: Loss taking ``(batch_size, vocab_size, seq_len)`` scores
            and ``(batch_size, seq_len)`` targets.
        device: Device the batch tensors are moved to.

    Returns:
        ``(loss, accuracy)``: the loss as a Python float and the value returned
        by ``accuracy_function``.
    """
    model.eval()
    enc_input, dec_input, target = [tensor.to(device) for tensor in batch]

    # Forward pass: (batch_size, seq_len, vocab_size).
    logits = model(enc_input, dec_input)

    # Move the vocabulary axis to dim 1 for the loss.
    class_scores = logits.permute(0, 2, 1)
    loss = loss_function(class_scores, target)

    batch_loss = loss.item()
    # NOTE(review): `sp` is not defined in this cell -- presumably a
    # SentencePiece processor created elsewhere in the notebook; confirm.
    batch_acc = accuracy_function(logits, target, pad_id=sp.pad_id())
    return batch_loss, batch_acc

In [None]:
# 시각화 및 보조 함수/ 기타
#=================================================================================================================================================================================

def Loss_Visualization(train_losses, val_losses):
    """Plot training and validation loss per epoch on one figure and show it.

    Args:
        train_losses: Sequence of per-epoch training losses.
        val_losses: Sequence of per-epoch validation losses.
    """
    train_epochs = range(len(train_losses))
    val_epochs = range(len(val_losses))

    # Solid blue line for training, dashed red line for validation.
    plt.plot(train_epochs, train_losses, 'b-', label='Train Loss')
    plt.plot(val_epochs, val_losses, 'r--', label='Validation Loss')

    plt.title("Training and Validation Loss")
    plt.xlabel("Epochs")
    plt.ylabel("Loss")
    plt.legend()
    plt.show()

def Acc_Visualization(train_acc, val_acc):
    """Plot training and validation accuracy per epoch on one figure and show it.

    Args:
        train_acc: Sequence of per-epoch training accuracies.
        val_acc: Sequence of per-epoch validation accuracies.
    """
    train_epochs = range(len(train_acc))
    val_epochs = range(len(val_acc))

    # Solid blue line for training, dashed red line for validation.
    plt.plot(train_epochs, train_acc, 'b-', label='Train Acc')
    plt.plot(val_epochs, val_acc, 'r--', label='Validation Acc')

    plt.title("Training and Validation Acc")
    plt.xlabel("Epochs")
    plt.ylabel("Acc")
    plt.legend()
    plt.show()

def accuracy_function(y_pred, y_true, pad_id=0):
    """Token-level accuracy over the non-padding positions.

    Args:
        y_pred: Scores of shape (batch_size, seq_len, vocab_size).
        y_true: Target ids of shape (batch_size, seq_len).
        pad_id: Target id to exclude from the accuracy.

    Returns:
        A 0-dim tensor: correct non-padding predictions divided by the number
        of non-padding targets, or 0 when every target is padding.
    """
    preds = y_pred.argmax(dim=-1)
    mask = (y_true != pad_id)
    correct = (preds == y_true) & mask
    # Clamp the denominator so an all-padding batch yields 0 instead of NaN
    # (a NaN here would poison the running epoch average).
    valid = mask.float().sum().clamp(min=1.0)
    acc = correct.float().sum() / valid
    return acc

def get_lr_lambda(d_model, warmup_steps = 4000):
    """Build the warmup-then-decay learning-rate multiplier.

    The returned function maps a 0-based step index to
    ``d_model^-0.5 * min(s^-0.5, s * warmup_steps^-1.5)`` with ``s = step + 1``.

    Args:
        d_model: Model feature size.
        warmup_steps: Number of steps over which the multiplier ramps up.

    Returns:
        A callable ``lr_lambda(step) -> float``.
    """
    d_model = float(d_model)

    def lr_lambda(step):
        # Shift to 1-based so that step 0 does not evaluate 0 ** -0.5.
        current = step + 1
        decay_term = current ** -0.5
        warmup_term = current * (warmup_steps ** -1.5)
        return (d_model ** -0.5) * min(decay_term, warmup_term)

    return lr_lambda