[4] 학습 준비
- 데이터 로더 준비
- 학습용 / 테스트용 함수
- 모델 클래스
- 학습 관련 변수 => DEVICE, OPTIMIZER, MODEL 인스턴스, EPOCHS, BATCH_SIZE, LOSS_FN

In [45]:
import pandas as pd

csv로 만든 파일 불러오기 (인코딩한 가사, 어휘사전)

In [46]:
# csv로 만든 파일 불러오기.
df = pd.read_csv('encoded.csv', index_col=0) # 0번째 열을 인덱스로 지정.
vocaDF = pd.read_csv('vocab.csv', index_col=0)
df.shape, vocaDF.shape

((760, 1696), (7954, 1))

피처 라벨 분리.

In [47]:
labelSR = df.genre

featureDF = df.iloc[:,:-1]

featureDF.shape, labelSR.shape

((760, 1695), (760,))

In [48]:
import numpy as np

train, test 데이터 분리

In [49]:
from sklearn.model_selection import train_test_split
Xtrn, Xtst, ytrn, ytst = train_test_split(featureDF, labelSR, test_size=0.2, random_state = 10, shuffle = True, stratify = labelSR)

In [50]:
import torch

In [51]:
# 텐서로 변환
# 바로 변환이 안돼서 넘파이로 변환 후 다시 텐서로 변환.
XtrnNP = np.array(Xtrn)
XtstNP = np.array(Xtst)
ytrnNP = np.array(ytrn)
ytstNP = np.array(ytst)

Xtrain = torch.LongTensor(XtrnNP)
Xtest = torch.LongTensor(XtstNP)
ytrain = torch.LongTensor(ytrnNP)
ytest = torch.LongTensor(ytstNP)

In [52]:
print(f'Xtrain = {Xtrain.shape}, {Xtrain.ndim}')
print(f'Xtest = {Xtest.shape}, {Xtest.ndim}')
print(f'ytrain = {ytrain.shape}, {ytrain.ndim}')
print(f'ytest = {ytest.shape}, {ytest.ndim}')

Xtrain = torch.Size([608, 1695]), 2
Xtest = torch.Size([152, 1695]), 2
ytrain = torch.Size([608]), 1
ytest = torch.Size([152]), 1


데이터셋, 로더 만들기

In [53]:
from torch.utils.data import TensorDataset, DataLoader
trainDS = TensorDataset(Xtrain, ytrain)
testDS = TensorDataset(Xtest, ytest)

In [54]:
BATCH_SIZE = 1
trainDL = DataLoader(trainDS, BATCH_SIZE, shuffle = True)
testDL = DataLoader(testDS, BATCH_SIZE, shuffle = True)

모델 만들기

In [55]:
import torch.nn as nn

class LSTM(nn.Module):
    def __init__(self, vocab_size, input_size, hidden_size, num_layers, dropout, output_size):
        super(LSTM, self).__init__()
        self.embedding = nn.Embedding(vocab_size, input_size)
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, dropout=dropout)
        self.fc = nn.Linear(hidden_size, output_size)
        self.init_weights()
    
    # fc 레이어의 가중치(weight)와 편향(bias)을 초기화하는 메서드
    def init_weights(self):
        
        initrange = 0.5
        # uniform_으로 fc 레이어의 가중치를 -initrange에서 initrange 사이의 균등 분포(uniform distribution)를 가지도록 초기화
        self.fc.weight.data.uniform_(-initrange, initrange) 
        # zero_ 메서드를 사용하여 편향을 모두 0으로 초기화
        self.fc.bias.data.zero_() 

    # forward 메서드 : 모델의 순전파(forward pass)
    def forward(self, text): # text : 입력 받은 텍스트 데이터
        embedded = self.embedding(text) # 입력 데이터를 임베딩
        out, _ = self.lstm(embedded) # 임베딩된 데이터를 lstm 레이어에 입력
        out = self.fc(out[:, -1, :]) # out[:, -1, :]를 사용하여 시퀀스의 마지막 시간 단계의 출력을 선택
        return out

In [56]:
# class SentenceClassifier(nn.Module) :
#     def __init__(self, n_vocab, hidden_dim, embedding_dim, n_layers, dropout=0.5, bidirectional = True) :
#         super().__init__()

#         self.embedding=nn.Embedding(num_embeddings =n_vocab,
#                                     embedding_dim = embedding_dim,
#                                     padding_idx =0)
#         self.model = nn.LSTM(input_size = embedding_dim,
#                              hidden_size=hidden_dim,
#                              num_layers= n_layers,
#                              bidirectional = bidirectional,
#                              dropout = dropout,
#                              batch_first = True)
#         if bidirectional :
#             self.classifier = nn.Linear(hidden_dim*2, 1)
#         else :
#             self.classifier = nn.Linear(hidden_dim, 1)
#         self.dropout = nn.Dropout(dropout)
#     def forward(self, inputs) :
#         embeddings = self.embedding(inputs)
#         output, _ = self.model(embeddings)
#         last_out = output[:, -1, :]
#         last_out = self.dropout(last_out)
#         logits = self.classifier(last_out)
#         return logits

In [57]:
HIDDEN_SIZE = 64
EMBEDD_DIM  = 128
VOCAB_SIZE  = vocaDF.shape[0]
NUM_LAYERS = 1
EPOCHS      = 100
LR          = 0.1
BATCH_SIZE  = 1
DROPOUT     = 0.5
OUTPUT      = labelSR.nunique() # 8

In [58]:
OUTPUT

8

In [59]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [60]:
import torch.optim as optim

mdl = LSTM(VOCAB_SIZE, EMBEDD_DIM, HIDDEN_SIZE, NUM_LAYERS, DROPOUT, OUTPUT)
#mdl = SentenceClassifier(VOCAB_SIZE, HIDDEN_SIZE, EMBEDD_DIM,NUM_LAYERS).to(device)
# CRITERION = nn.BCEWithLogitsLoss().to(device)
# OPTIMIZER = optim.RMSprop(mdl.parameters(), lr=0.001)



In [61]:
CRITERION = nn.CrossEntropyLoss()
OPTIMIZER = optim.AdamW(mdl.parameters(), lr=LR)
SCHEDULER = optim.lr_scheduler.StepLR(OPTIMIZER, 1.0, gamma=0.1)

In [68]:
def train(model, dataloader, optimizer, criterion, epoch):
    model.train()
    
    # 학습 평가 관련 변수들
    total_acc, total_count = 0,0 # 배치 학습 과정에서 정확도(accuracy)를 계산하기 위한 변수
    log_interval=10 # 로그 출력 간격
    
    for idx, (text, label) in enumerate(dataloader):
        #text, label = text.to(device), label.to(device) => GPU로 이동
        predicted_label = model(text) # 모델에 입력 데이터를 전달하여 예측값을 계산
        optimizer.zero_grad() # 기울기(gradient) 초기화

        
        loss = criterion(predicted_label, label) # 손실(loss) 계산
        loss.backward() # 역전파 수행
        
        #함수는 그래디언트(기울기)의 노름(norm)을 클리핑(clipping)하는 유틸리티 함수
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.1) 
        
        # 모델 파라미터 업데이트
        optimizer.step() 
        
        # 배치 학습 평가
        # predicted_label.argmax(1) : 모델이 예측한 각 샘플의 클래스를 나타내는 텐서. argmax(1)은 각 행(row)에서 최대 값의 인덱스(클래스)를 가져옴.
        # (predicted_label.argmax(1) == label) : 예측값과 실제 레이블(label)을 비교하여 일치하는지 여부를 판단하는 불리언(Boolean) 텐서를 생성
        # .sum().item() : 불리언 텐서의 원소들을 합하여 정확하게 예측된 샘플 수를 계산
        # 샘플 수를 total_acc에 누적
        total_acc += (predicted_label.argmax(1) == label).sum().item() 
        
        # 전체 샘플 수 계산 - 레이블(label) 텐서의 첫 번째 차원의 크기를 반환. 배치의 샘플 수.
        # => 현재 배치의 샘플 수를 누적
        total_count += label.size(0)
        
        # log_interval 배치마다 로그를 출력하기 위한 조건
        if idx % log_interval == 0 and idx > 0:
            # 현재 학습 에포크(epoch)와 배치 인덱스(idx), 손실(loss) 값을 출력
            print(f"epoch : {epoch} batch : {idx} loss : {loss.item()}")
            
            # 현재 배치의 정확도(accuracy)를 계산하여 출력
            print(f"Accuracy : {total_acc/total_count}")
            
            # 정확하게 예측된 샘플 수(total_acc)와 전체 샘플 수를 다음 로그 출력을 위해 초기화
            total_acc, total_count = 0,0
            
            break

In [69]:
def evaluate(model, dataloader, criterion):
    # 평가 모드
    model.eval()
    
    total_acc, total_count = 0,0
    
    with torch.no_grad():
        for _, (text, label) in enumerate(dataloader):
            predicted_label = model(text)
            loss = criterion(predicted_label, label)
            total_acc += (predicted_label.argmax(1) == label).sum().item()
            total_count += label.size(0)
            
    return total_acc/total_count, loss.item()


In [73]:
# def predict(model, text, text_pipeline):
#     with torch.no_grad():
#         text = torch.tensor(text_pipeline(text), dtype=torch.int64).to(device)
#         text = text.unsqueeze(0)
#         offsets = torch.tensor([0]).to(device)
#         predicted_label = model(text, offsets)
#         return predicted_label.argmax(1).item() + 1

In [71]:
for epoch in range(1, EPOCHS+1):
    train(mdl, trainDL, OPTIMIZER, CRITERION, epoch)
    accu_val, loss = evaluate(mdl, testDL, CRITERION)
    print(f"test - [epoch] : {epoch} [Accuracy] : {accu_val}  [loss] : {loss}\n")
    SCHEDULER.step()

epoch : 1 batch : 10 loss : 2.179597854614258
Accuracy : 0.18181818181818182
test - [epoch] : 1 [Accuracy] : 0.13157894736842105  [loss] : 2.29099178314209

epoch : 2 batch : 10 loss : 1.8449842929840088
Accuracy : 0.0
test - [epoch] : 2 [Accuracy] : 0.13157894736842105  [loss] : 1.8449842929840088

epoch : 3 batch : 10 loss : 1.8449842929840088
Accuracy : 0.18181818181818182
test - [epoch] : 3 [Accuracy] : 0.13157894736842105  [loss] : 2.205381393432617

epoch : 4 batch : 10 loss : 2.1557817459106445
Accuracy : 0.2727272727272727
test - [epoch] : 4 [Accuracy] : 0.13157894736842105  [loss] : 1.793044090270996

epoch : 5 batch : 10 loss : 2.179597854614258
Accuracy : 0.18181818181818182
test - [epoch] : 5 [Accuracy] : 0.13157894736842105  [loss] : 2.205381393432617

epoch : 6 batch : 10 loss : 1.8449842929840088
Accuracy : 0.09090909090909091
test - [epoch] : 6 [Accuracy] : 0.13157894736842105  [loss] : 1.9659347534179688

epoch : 7 batch : 10 loss : 1.9659347534179688
Accuracy : 0.0909

In [72]:
# 학습 진행
# for epoch in range(1, EPOCHS+1):
#     train(mdl, trainDL, OPTIMIZER, CRITERION, epoch)
#     accu_val = evaluate(mdl, testDL, CRITERION)
#     print(f"epoch : {epoch} Accuracy : {accu_val}")
#     SCHEDULER.step()

In [67]:
#torch.save(mdl, 'model.pth')