### Setting

In [None]:
import pandas as pd
import numpy as np
import torch
import os
import random
from sklearn.model_selection import StratifiedKFold, train_test_split
from transformers import AutoModel, AutoTokenizer, AdamW, get_cosine_schedule_with_warmup
from torch.utils.data import Dataset, SubsetRandomSampler
from torch.utils.data import DataLoader
from torch import nn
from tqdm import tqdm

# for graphing
import matplotlib.pyplot as plt

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


### def seed_everything(seed)

In [None]:
#시드 설정
def seed_everything(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True
    print("1. Seed 설정이 완료되었습니다.")

### CFG   

모델 학습 및 설정에 사용되는 여러 하이퍼파라미터 및 구성 옵션들을 저장한 딕셔너리

In [None]:
#모델의 여러 초매개변수 및 구성 설정

CFG = {
    'EPOCHS': 20,
    'LEARNING_RATE':5e-5,
    'BATCH_SIZE':64,
    'SEED':40,
    'MAX_LENGTH': 65,
    'PATIENCE': 5,
    'K': 3,
    'WARMUP_RATIO': 0.1
}

In [None]:
#os.environ["TOKENIZERS_PARALLELISM"] = "false"

### device

GPU 설정

In [None]:
# GPU
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

### sent_type

감정 클래스와 해당 숫자 인덱스를 매핑한 딕셔너리

In [None]:
sent_type = {0:'angry', 1:'fear', 2:'happy', 3:'sad', 4:'neutral'}  # 라벨링

### class sentenceDataset(Dataset)  

텍스트 데이터셋을 PyTorch의 Dataset으로 구현한 클래스.   
데이터를 토큰화하고 레이블을 포함하여 모델 학습에 사용될 수 있는 형식으로 변환.

In [None]:
class sentenceDataset(Dataset):
    def __init__(self, dataframe, sent_col, tokenizer, labels=None):
        texts = dataframe[sent_col].values.tolist()

        # Tokenize each text using the provided tokenizer
        self.texts = [tokenizer(text, padding='max_length', max_length = CFG['MAX_LENGTH'], truncation=True, return_tensors='pt') for text in texts]
        self.labels = labels

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = self.texts[idx]

        if self.labels is not None:
            # If labels are provided, get the corresponding label
            type_tmp = self.labels['type'][idx]
            return text, torch.Tensor(type_tmp).to(device)
        else:
            # If no labels, return a placeholder tensor
            return text, torch.Tensor([-1,-1,-1,-1]).to(device)

### class sentenceClassifier(nn.Module)

텍스트 분류를 위한 PyTorch 모델 클래스.    
 기본 모델을 기반으로 하여 선형 레이어와 소프트맥스 레이어를 사용하여 텍스트를 클래스로 분류.

In [None]:
class sentenceClassifier(nn.Module):               # 텍스트 분류 작업을 수행
    def __init__(self, base_model):
        super().__init__()
        self.klue = base_model # from transformers package

        self.fc1 = nn.Linear(768, 5).to(device) #input : 768차원 output : 5차원 선형 레이어 # 0:'angry', 1:'fear', 2:'happy', 3:'sad', 4:'neutral'
        self.softmax = nn.Softmax(dim = 1).to(device)

    #forward 연산
    def forward(self, input_ids, attention_mask):
        klue_out = self.klue(input_ids= input_ids, attention_mask = attention_mask)[0][:,0]  #주어진 입력 (input_ids 및 attention_mask)을 기반 모델에 전달하고, 그 결과에서 첫 번째 토큰의 출력만 선택

        x = self.fc1(klue_out).to(device)     #선택된 출력을 선형 레이어에 전달하여 클래스별 로직 생성
        x = self.softmax(x).to(device)        # softmax에 통과시켜 확률 분포로 변환

        return x                              # 확률분포로 반환

### def calc_label_accuracy(X, Y)  

모델의 예측과 실제 레이블을 비교하여 각 클래스별 정확도를 계산하는 함수.

In [None]:
def calc_label_accuracy(X,Y):                           #레이블 정확도를 계산하는 함수
    _, max_indices = torch.max(X, 1)                   #모델의 출력 X에서 가장 큰 값을 갖는 인덱스를 찾아 max_indices에 저장. 이것은 모델이 각 샘플에 대해 예측한 클래스
    label = [0 for _ in range(5)]                      #실제 dataset의 각 감정 레이블당 갯수를 저장하기 위한 리스트. 5개의 클래스
    acc = [0 for _ in range(5)]                        #inference 후, 각 감정 레이블당 갯수를 저장하기 위한 리스트. 5개의 클래스

    for i, j in zip(max_indices, Y):                  #모델의 예측값과 실제 레이블을 반복하여 비교
        l_val = ((j == 1).nonzero().flatten().tolist())[0]     #실제 레이블 j에서 값이 1인 인덱스를 찾아서 l_val에 저장, 원-핫 인코딩된 레이블에서 실제 클래스를 찾는 거승로 보임
        label[l_val] += 1                                #실제 데이터셋에서 해당 클래스의 수를 증가
        if i == l_val:                                     #모델이 예측한 클래스가 실제 클래스와 일치하면 해당 클래스에 대한 정확도를 증가
            acc[i] += 1

    ans = [] #각 레이블당 accuracy                         #각 클래스에 대한 정확도를 저장할 빈 리스트를 초기화
    for i in range(5):                                     #각 클래스에 대해 정확도를 계산
      if label[i] != 0:                                    #해당 클래스에 대한 데이터가 존재하는 경우, 정확도를 계산하여 리스트에 추가
        ans.append(np.round(acc[i]/label[i], 2))
      else:                                                 #해당 클래스에 대한 데이터가 없는 경우, 정확도를 0으로 설정
        ans.append(0)

    return ans                                             #계산된 정확도를 반환

### def loss_graph(train_loss, val_loss, file_path)

훈련 손실(train loss)과 검증 손실(validation loss)의 추이를 시각화하여 저장하는 함수

In [None]:
#plot for train loss and validation loss
def loss_graph(train_loss, val_loss, file_path):
    plt.figure(figsize=(13, 6))

    # 전체 Train Loss 그래프
    plt.subplot(1, 2, 1)
    plt.plot(train_loss, label='Train Loss')
    plt.title('Overall Train Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()

    # 전체 Validation Loss 그래프
    plt.subplot(1, 2, 2)
    plt.plot(val_loss, label='Validation Loss')
    plt.title('Overall Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()

    plt.tight_layout()
    plt.savefig(file_path)
    plt.show()

### def accuracy_graph(train_accuracies, val_accuracies, sent_types, file_path)

In [None]:
def accuracy_graph(train_accuracies, val_accuracies, sent_types, file_path):
    plt.figure(figsize=(13, 6))

    # 전체 Train Accuracy 그래프
    plt.subplot(1, 2, 1)
    plt.plot(train_accuracies, label='Train Accuracy')
    plt.title('Overall Train Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()

    # 전체 Validation Accuracy 그래프
    plt.subplot(1, 2, 2)
    plt.plot(val_accuracies, label='Validation Accuracy')
    plt.title('Overall Validation Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()

    plt.tight_layout()
    plt.savefig(file_path)
    plt.show()

###def train(model, train_dataloader, val_dataloader, learning_rate, epochs)

In [None]:
def train(model, train_dataloader, val_dataloader, learning_rate, epochs):
    avg_val_losses, avg_train_losses = [], []                                    #epoch 당 평균 loss를 저장할 리스트
    avg_train_accuracies, avg_val_accuracies = [], []

    criterion = {                                                                #분류 작업에서 사용할 손실 함수를 정의. CrossEntropyLoss를 사용하고, GPU로 이동
        'type' : nn.CrossEntropyLoss().to(device)
        }

    no_decay = ['bias', 'LayerNorm.weight']                                      # 가중치 감쇠를 적용하지 않을 레이어 설정 #옵티마이저와 관련된 설정
    optimizer_grouped_parameters = [                                             # 옵티마이저의 그룹 파라미터 설정
      {'params': [p for n, p in model.named_parameters() if not any(nd in n for nd in no_decay)], 'weight_decay': 0.01},
      {'params': [p for n, p in model.named_parameters() if any(nd in n for nd in no_decay)], 'weight_decay': 0.0}
      ]

    optimizer = AdamW(optimizer_grouped_parameters, lr = learning_rate)          #옵티마이저는 AdamW를 사용, 학습률은 주어진 learning_rate로 설정
    model = model.to(device)                                                     #모델을 GPU로 이동

    t_total = len(train_dataloader) * CFG['EPOCHS']                              # 총 훈련 스텝 계산
    warmup_step = int(t_total * CFG['WARMUP_RATIO'])                             # 워밍업 스텝 계산

    scheduler = get_cosine_schedule_with_warmup(optimizer, num_warmup_steps=warmup_step, num_training_steps=t_total)   # 코사인 스케줄러 설정

    # 에폭 루프
    for epoch in range(epochs):
        val_losses, train_losses = [], []                                        # 에폭마다 손실과 정확도를 저장할 리스트 초기화

        train_accuracy = [0 for _ in range(5)]
        val_accuracy = [0 for _ in range(5)]

        model.train()                                                            # 훈련 모드로 모델 전환

        # 훈련 데이터셋 루프
        print("훈련 루프가 시작되었습니다.")
        for batch_id, (train_input, type_label) in tqdm(enumerate(train_dataloader)):
            attention_mask = train_input['attention_mask'].to(device)            # 입력 데이터를 GPU로 이동
            input_ids = train_input['input_ids'].squeeze(1).to(device)
            type_label = type_label.to(device)
            optimizer.zero_grad()                                                # 옵티마이저 초기화

            type_output = model(input_ids, attention_mask)                       # 모델에 입력 전달하여 예측 얻기

            loss = criterion['type'](type_output, type_label)                    # 손실 계산
            loss.backward()

            optimizer.step()                                                     # 옵티마이저 업데이트
            scheduler.step()                                                     # 스케줄러 업데이트
            train_losses.append(loss.item())                                     # 훈련 손실 기록


            for i in range(5):                                                   # 정확도 계산 및 누적
              acc = calc_label_accuracy(type_output, type_label)
              train_accuracy[i] += acc[i]
            # print(f"현재 배치: {batch_id + 1}, 현재 손실: {loss.item()}")


        for i in range(5):                                                     # 각 클래스별 훈련 정확도 출력
          print(f"⦁ Train acc for label {sent_type[i]}: {train_accuracy[i] / (batch_id+1)}")

        avg_train_accuracies.append(sum(train_accuracy) / len(train_accuracy))



        # 검증 데이터셋 루프
        print("검증 루프가 시작되었습니다.")
        with torch.no_grad():
            model.eval()

            # same process as the above
            for  batch_id, (val_input, vtype_label) in tqdm(enumerate(val_dataloader)):
                attention_mask = val_input['attention_mask'].to(device)          # 입력 데이터를 GPU로 이동
                input_ids = val_input['input_ids'].squeeze(1).to(device)
                vtype_label = vtype_label.to(device)

                vtype_output = model(input_ids, attention_mask)                  # 모델에 입력 전달하여 예측 얻기

                loss = criterion['type'](vtype_output, vtype_label)      # 검증 손실 기록
                val_losses.append(loss.item())

                for i in range(5):
                  acc = calc_label_accuracy(vtype_output, vtype_label)           # 정확도 계산 및 누적
                  val_accuracy[i] += acc[i]

            for i in range(5):                                                   # 각 클래스별 검증 정확도 출력
              print(f"⦁ Val acc for label {sent_type[i]}: {val_accuracy[i] / (batch_id+1)}")

            avg_val_accuracies.append(sum(val_accuracy) / len(val_accuracy))

            train_loss = np.average(train_losses)                                 # 에폭마다 훈련 및 검증 손실 출력
            val_loss = np.average(val_losses)


            avg_train_losses.append(train_loss)
            avg_val_losses.append(val_loss)

            print(f" ▶ epoch {epoch+1} ◀ train_loss: {train_loss:.4f} val_loss: {val_loss:.4f}")
            print()

    loss_graph(avg_train_losses, avg_val_losses,'/content/drive/MyDrive/Emotion/model/loss_plot.png')
    accuracy_graph(avg_train_accuracies, avg_val_accuracies, sent_type,  '/content/drive/MyDrive/Emotion/model/accuracy_plot.png')
    #torch.save(model.state_dict(), '/content/drive/MyDrive/Emotion/model/klueroberta_trained_model.pth')

    return model       # 최종 훈련된 모델 반환

### def encode_label(data, label_col)

감정 레이블 원-핫 인코딩

In [None]:
def encode_label(data, label_col):
    data_tmp =pd.get_dummies(data, columns=[label_col])      #입력 데이터프레임(data)의 label_col 열에 대해 원-핫 인코딩을 수행. 결과로 나온 더미 변수들을 새로운 데이터프레임(data_tmp)으로 저장
    data_labels = {                                          #데이터프레임에서 1부터 4까지의 열을 슬라이싱하여 해당하는 레이블에 대한 원-핫 인코딩된 값들을 가져와 리스트로 변환.
        'type': data_tmp.iloc[:, 1:].values.tolist()        #we have 5 labels  #이 리스트를 딕셔너리에 저장하고, 'type'이라는 키로 반환합니다.
        }
    return data_labels                                       #원-핫 인코딩된 레이블을 담고 있는 딕셔너리를 반환. 이 딕셔너리는 모델의 훈련 시에 사용됨.

### def run_classifier():

 감정 분류 모델을 실행하는 함수

In [None]:
def run_classifier():
    # 데이터 및 모델 설정
    data_path = "/content/drive/MyDrive/Emotion/data/data.csv"
    sent_col = 'Sentence'
    label_col = 'Emotion'
    model_name = 'klue/roberta-small'
    seed_everything(CFG['SEED']) # Seed 고정


     # 데이터 로드 및 훈련/테스트 분리
    data = pd.read_csv(data_path, encoding='CP949')
    train1, test = train_test_split(data, test_size=0.2, random_state=CFG['SEED'])
    train1 = train1.reset_index(drop=True)
    print("2. 데이터가 성공적으로 로드되었습니다.")


    # 모델 및 토크나이저 설정
    base_model = AutoModel.from_pretrained(model_name)
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = sentenceClassifier(base_model)
    #model.load_state_dict(torch.load('/home/ubuntu/model/pre-trained/klueroberta_crossVal_epoch40.pt', map_location = device))


     # K-fold 교차 검증을 위한 설정
    k = CFG['K']
    skf = StratifiedKFold(n_splits = k, shuffle = True, random_state=CFG['SEED'])


    # 각 폴드에 대해 훈련 및 검증 수행
    print("3. 각 폴드에 대해 훈련 및 검증 수행")
    for fold, (train_idx, val_idx) in enumerate(skf.split(train1[sent_col], train1[label_col])):
        print('Fold {}'.format(fold + 1))
        if fold == 0:

            # 훈련 및 검증 레이블 인코딩 및 데이터 샘플 구성
            train_labels = encode_label(train1.loc[train_idx], label_col)
            val_labels = encode_label(train1.loc[val_idx], label_col)
            print(f"train & val encode_label success")

            train_sample = sentenceDataset(train1.loc[train_idx], sent_col, tokenizer, train_labels)
            val_sample = sentenceDataset(train1.loc[val_idx], sent_col, tokenizer, val_labels)


            # 데이터로더 설정
            train_dataloader = DataLoader(train_sample, batch_size=CFG['BATCH_SIZE'], shuffle=True, num_workers=0)
            val_dataloader = DataLoader(val_sample, batch_size=CFG['BATCH_SIZE'], shuffle=True, num_workers=0)

            print("4.  훈련 데이터셋 크기:", len(train_sample), "검증 데이터셋 크기:", len(val_sample) )
            print()

            # 모델 훈련
            model = train(model, train_dataloader, val_dataloader, CFG['LEARNING_RATE'], CFG['EPOCHS'])

    # 테스트 및 평가
    test_labels = encode_label(test, label_col)
    test_accuracy = [0 for _ in range(5)]

    test_set = sentenceDataset(test, sent_col, tokenizer, test_labels)
    test_dataloader = DataLoader(test_set, batch_size=CFG['BATCH_SIZE'], shuffle = True, num_workers = 0)

    with torch.no_grad():
        model.eval()
        for batch_id, (test_input, t_label) in tqdm(enumerate(test_dataloader)):
            attention_mask = test_input['attention_mask'].to(device)
            input_ids = test_input['input_ids'].squeeze(1).to(device)
            t_label = t_label.to(device)

            # 테스트 데이터에 대한 예측 수행
            test_output = model(input_ids, attention_mask) # from the forward function

            # 클래스별 정확도 계산
            for i in range(5):
                acc = calc_label_accuracy(test_output, t_label)
                test_accuracy[i] += acc[i]

        print()
        print(f"￭ 테스트 정확도: {test_accuracy[i] / (batch_id + 1)}")

        # 클래스별 테스트 정확도 출력
        for i in range(5):
            print(f"⦁ Test acc for label {sent_type[i]}: {test_accuracy[i] / (batch_id+1)}")

In [None]:
if __name__ == "__main__":
    run_classifier()

1. Seed 설정이 완료되었습니다.
2. 데이터가 성공적으로 로드되었습니다.


Some weights of RobertaModel were not initialized from the model checkpoint at klue/roberta-small and are newly initialized: ['roberta.pooler.dense.weight', 'roberta.pooler.dense.bias']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


3. 각 폴드에 대해 훈련 및 검증 수행
Fold 1
train & val encode_label success




4.  훈련 데이터셋 크기: 20583 검증 데이터셋 크기: 10292

훈련 루프가 시작되었습니다.


322it [01:54,  2.81it/s]


⦁ Train acc for label angry: 0.6899068322981367
⦁ Train acc for label fear: 0.6982298136645964
⦁ Train acc for label happy: 0.47875776397515546
⦁ Train acc for label sad: 0.0038198757763975156
⦁ Train acc for label neutral: 0.17295031055900614
검증 루프가 시작되었습니다.


161it [00:21,  7.53it/s]


⦁ Val acc for label angry: 0.8244720496894409
⦁ Val acc for label fear: 0.6550310559006213
⦁ Val acc for label happy: 0.8463354037267077
⦁ Val acc for label sad: 0.0004347826086956522
⦁ Val acc for label neutral: 0.4611180124223601
 ▶ epoch 1 ◀ train_loss: 1.3987 val_loss: 1.2713

훈련 루프가 시작되었습니다.


322it [01:52,  2.85it/s]


⦁ Train acc for label angry: 0.7629503105590066
⦁ Train acc for label fear: 0.7028881987577641
⦁ Train acc for label happy: 0.7624223602484469
⦁ Train acc for label sad: 0.15295031055900615
⦁ Train acc for label neutral: 0.5181366459627327
검증 루프가 시작되었습니다.


161it [00:21,  7.46it/s]


⦁ Val acc for label angry: 0.6100000000000003
⦁ Val acc for label fear: 0.7624844720496896
⦁ Val acc for label happy: 0.6921118012422359
⦁ Val acc for label sad: 0.28770186335403725
⦁ Val acc for label neutral: 0.6421739130434783
 ▶ epoch 2 ◀ train_loss: 1.2660 val_loss: 1.2697

훈련 루프가 시작되었습니다.


175it [01:01,  2.90it/s]