In [190]:
import torch
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data as data
from torch.utils.data import Dataset, DataLoader,random_split
from torch.optim.lr_scheduler import StepLR,ReduceLROnPlateau
import torchmetrics.functional as metrics
import os
import shutil
from torchvision import transforms
from PIL import Image

In [191]:
# train 학습 검증용 데이터
folder_path = '../data/train/train'
target_data = []
img_data = []
for encoding_label,label in enumerate(os.listdir(folder_path)):
    for img in os.listdir(folder_path+'/'+label):
        image_path = os.path.join(folder_path,label,img)
        with open(image_path, 'rb') as file:
            image = Image.open(file)
            # 이미지 크기 확인
            width, height = image.size
            if width == 48 and height == 48:
                image_array = np.array(image)
                
                target_data.append(encoding_label)
                img_data.append(image_array)

In [192]:
# test 테스트용 데이터
folder_path_ = '../data/test'
target_test = []
img_test = []
for encoding_label,label in enumerate(os.listdir(folder_path_)):
    for img in os.listdir(folder_path_+'/'+label):
        image_path = os.path.join(folder_path_,label,img)
        with open(image_path, 'rb') as file:
            image = Image.open(file)
            if width == 48 and height == 48:
                image_array = np.array(image)
                target_test.append(encoding_label)
                img_test.append(image_array)

In [193]:
pd.Series(target_test).value_counts().sort_index()

0     958
1     111
2    1024
3    1774
4    1233
5    1247
6     831
Name: count, dtype: int64

In [194]:
os.listdir(folder_path)

['angry', 'disgust', 'fear', 'happy', 'neutral', 'sad', 'surprise']

In [195]:
pd.Series(target_data).value_counts().sort_index()

0    3995
1     436
2    4097
3    7215
4    4965
5    4830
6    3171
Name: count, dtype: int64

In [196]:
image = Image.open("../data/test/angry/PrivateTest_1488292.jpg")

# 이미지의 너비와 높이 확인
width, height = image.size
print("이미지의 너비:", width)
print("이미지의 높이:", height)


이미지의 너비: 48
이미지의 높이: 48


In [197]:
# 이미지 데이터 정규화
x_data = np.array(img_data)/255.
x_data = x_data.reshape((-1,48*48))
print(x_data.shape)

(28709, 2304)


In [198]:
x_data_test = np.array(img_test)/255.
x_data_test = x_data_test.reshape((-1,48*48))
print(x_data_test.shape)

(7178, 2304)


In [199]:
target_data = pd.Series(target_data).replace({0:3, 5:4, 2:5, 3:2, 6:0, 4:6, 7:1})
target_data.value_counts()

2    7215
6    4965
4    4830
5    4097
3    3995
0    3171
1     436
Name: count, dtype: int64

In [200]:
target_test = pd.Series(target_test).replace({0:3, 5:4, 2:5, 3:2, 6:0, 4:6, 7:1})
target_test.value_counts()

2    1774
4    1247
6    1233
5    1024
3     958
0     831
1     111
Name: count, dtype: int64

In [201]:
# 데이터 클래스 생성
class DLdataset(Dataset):
    
    def __init__(self,x_data,y_data):
        super().__init__()
        self.feature = torch.FloatTensor(x_data)
        self.target = torch.LongTensor(y_data)
        
    def __len__(self):
        return self.target.shape[0]
    
    def __getitem__(self,idx):
        return self.feature[idx], self.target[idx]

In [202]:
# 데이터셋 생성
dataset = DLdataset(x_data,target_data)
dataset_test = DLdataset(x_data_test,target_test)

In [203]:
# 학습용, 검증용 데이터 준비
seed = torch.Generator().manual_seed(42)
trainDS, validDS = random_split(dataset, [0.8,0.2], generator=seed)

In [204]:
# 배치사이즈 32
BATCH = 32
trainDL = DataLoader(trainDS, batch_size=BATCH)
validDL = DataLoader(validDS, batch_size=BATCH)
testDL = DataLoader(target_test, batch_size=BATCH)

In [205]:
import torch.nn as nn
def generate_models():
    models = []
    
    model1 = My_MODEL(48*48,10,[512,256])
    models.append(model1)
    
    model2 = My_MODEL(48*48, 10,[512,256,128] )
    models.append(model2)
    
    model3 = My_MODEL(48*48, 10, [256,128])
    models.append(model3)
    
    model4 = My_MODEL(48*48, 10, [64,32,16])
    models.append(model4)
    
    model5 = My_MODEL(48*48, 10, [884,886,888])
    models.append(model5)
    
    model6 = My_MODEL(48*48, 10, [500, 550, 600, 650, 700])
    models.append(model6)
    
    return models

In [206]:
class My_MODEL(nn.Module):
    def __init__(self, in_dim, out_dim, hidden_dims):
        super().__init__()
        self.in_dim = in_dim
        self.out_dim = out_dim
        self.hidden_dims = hidden_dims  # 은닉층의 차원들을 담은 리스트
        self.layers = nn.ModuleList()   # 모듈을 담는 리스트
        
        prev_dim = self.in_dim
        for hidden_dim in hidden_dims:
            self.layers.append(nn.Linear(prev_dim, hidden_dim))
            self.relu = nn.ReLU()
            prev_dim = hidden_dim
        self.layers.append(nn.Linear(prev_dim, self.out_dim))
        self.relu = nn.ReLU()
        
    def forward(self, x):
        for layer in self.layers[:-1]:
            x = self.relu(layer(x)) # 은닉층 계산하고 바로 AF 계산
        x = self.layers[-1](x)
        return x

In [207]:
# models = generate_models()

In [208]:
'''for idx, model in enumerate(models):
    print(f'Model{idx+1}:')
    print(model)
    print()'''

"for idx, model in enumerate(models):\n    print(f'Model{idx+1}:')\n    print(model)\n    print()"

In [209]:
# DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
# 
# EPOCHS = 100
# 
# IN = dataset.feature.shape[1]
# OUT = pd.Series(target_data).nunique()
# 
# # 손실함수
# LF = nn.CrossEntropyLoss().to(DEVICE)
# 
# # 옵티마이저
# model01_OPTIMIZER = torch.optim.Adam( models[0].parameters())
# model02_OPTIMIZER = torch.optim.Adam(models[1].parameters())
# model03_OPTIMIZER = torch.optim.Adam(models[2].parameters())
# model04_OPTIMIZER = torch.optim.Adam(models[3].parameters())
# model05_OPTIMIZER = torch.optim.Adam(models[4].parameters())
# model06_OPTIMIZER = torch.optim.Adam(models[5].parameters())
# 
# # 스케줄러
# model01_SCHEDULER = ReduceLROnPlateau(model01_OPTIMIZER, mode = 'min', patience = 3)
# model02_SCHEDULER = ReduceLROnPlateau(model02_OPTIMIZER, mode = 'min', patience = 3)
# model03_SCHEDULER = ReduceLROnPlateau(model03_OPTIMIZER, mode = 'min', patience = 3)
# model04_SCHEDULER = ReduceLROnPlateau(model04_OPTIMIZER, mode = 'min', patience = 3)
# model05_SCHEDULER = ReduceLROnPlateau(model05_OPTIMIZER, mode = 'min', patience = 3)
# model06_SCHEDULER = ReduceLROnPlateau(model06_OPTIMIZER, mode = 'min', patience = 3)

In [210]:
# def training(dataLoader,model):
#     
#     for model in models:
#         model.train()
#     train_report=[[],[],[]]
#     for idx, (feature, target)  in enumerate(dataLoader):
#         # 배치크기만큼의 학습 데이터 준비
#         feature, target = feature.to(DEVICE), target.to(DEVICE)
#         
#         # 학습
#         pre_target = model(feature)
#         
#         # 손실계산
#         loss = LF(pre_target, target)
#         train_report[0].append(loss)
#   
#         # 성능 평가
#         acc = metrics.accuracy(pre_target.argmax(dim=1), target, task = 'multiclass',num_classes=OUT)
#         train_report[1].append(acc)
# 
#         f1 = metrics.f1_score(pre_target.argmax(dim=1), target, task='multiclass', num_classes=OUT)
#         train_report[2].append(f1)
#         
#         # W,b업데이트
#         OPTIMIZER.zero_grad()
#         loss.backward()
#         OPTIMIZER.step()
#     
#     loss_score = sum(train_report[0])/len(train_report[0])
#     acc_score = sum(train_report[1])/len(train_report[1])
#     print(f'[Train loss] ==> {loss_score}    [Train Accuracy] ==> {acc_score}')
#     return loss_score, acc_score

In [211]:
training(trainDL,model)

TypeError: training() takes 1 positional argument but 2 were given

In [None]:
# def testing(dataLoader,model):
#     
#     model.eval()
#     
#     with torch.no_grad():
#         test_report=[[], [], []]
#         for (feature, target) in dataLoader:
#             feature, target = feature.to(DEVICE), target.to(DEVICE)
#             
#             # 학습
#             pre_target = model(feature)
#             
#             # 손실계산
#             loss = LF(pre_target, target)
#             test_report[0].append(loss)
#             
#             # 성능평가
#             acc = metrics.accuracy(pre_target.argmax(dim=1), target, task = 'multiclass',num_classes=OUT)
#             test_report[1].append(acc)
#             
#         loss_score = sum(test_report[0])/len(test_report[0])
#         acc_score = sum(test_report[1])/len(test_report[1])
#         
#     print(f'[Test loss] ==> {loss_score}    [Test Accuracy] ==> {acc_score}')
#     return loss_score, acc_score

In [None]:
# min_loss = 100.0  # 초기 최소 손실 설정
# cnt = 0
# train_score = [[],[],[],[],[],[],[],[],[],[],[],[]]
# val_score =  [[],[],[],[],[],[],[],[],[],[],[],[]]
# 
# for eps in range(EPOCHS):
#     print(f'[{eps+1}/{EPOCHS}]')
#     
#     # 각 모델에 대해 학습 및 검증 수행
#     for model in models:
#         train_loss, train_acc = training(trainDL, model)
#         val_loss, val_acc = testing(validDL, model)
#         if models.index(model)== 0: # model 1
#             train_score[0].append(train_loss)
#             train_score[1].append(train_acc)
#             val_score[0].append(val_loss)
#             val_score[1].append(val_acc)
#             
#         if models.index(model)== 1: # model 2
#             train_score[2].append(train_loss)
#             train_score[3].append(train_acc)
#             val_score[2].append(val_loss)
#             val_score[3].append(val_acc)
#             
#         if models.index(model)== 2: # model 3
#             train_score[4].append(train_loss)
#             train_score[5].append(train_acc)
#             val_score[4].append(val_loss)
#             val_score[5].append(val_acc)
#             
#         if models.index(model)== 3: # model 4 
#             train_score[6].append(train_loss)
#             train_score[7].append(train_acc)
#             val_score[6].append(val_loss)
#             val_score[7].append(val_acc)
#         
#         if models.index(model)== 4: # model 5
#             train_score[8].append(train_loss)
#             train_score[9].append(train_acc)
#             val_score[8].append(val_loss)
#             val_score[9].append(val_acc)
#             
#         if models.index(model)== 5: # model 6
#             train_score[10].append(train_loss)
#             train_score[11].append(train_acc)
#             val_score[10].append(val_loss)
#             val_score[11].append(val_acc)
#     
#         # 최소 손실 업데이트
#         if val_loss < min_loss:
#             min_loss = val_loss
#             cnt = 0
#         else:
#             cnt += 1
# 
#     # 조기 종료 기능 => 조건 : val_loss가 지정된 횟수 이상 개선이 안되면 학습 종료
#     if SCHEDULER.num_bad_epochs >= SCHEDULER.patience or cnt >= 50:
#         print(f"Early stopping at epoch {eps}")
#         break

In [213]:
models = []

model1 = My_MODEL(48*48,10,[512,256])
models.append(model1)

model2 = My_MODEL(48*48, 10,[512,256,128] )
models.append(model2)

model3 = My_MODEL(48*48, 10, [256,128])
models.append(model3)

model4 = My_MODEL(48*48, 10, [64,32,16])
models.append(model4)

model5 = My_MODEL(48*48, 10, [884,886,888])
models.append(model5)

model6 = My_MODEL(48*48, 10, [500, 550, 600, 650, 700])
models.append(model6)

In [219]:
from torch.optim.lr_scheduler import ReduceLROnPlateau
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

EPOCHS = 100

IN = dataset.feature.shape[1]
OUT = pd.Series(target_data).nunique()

# 손실함수
LF = nn.CrossEntropyLoss().to(DEVICE)

train_report=[[],[],[]]
final_train_report=[[],[]] #loss, acc   

test_report=[[],[],[]]
final_test_report=[[],[]] #loss, acc

def training(dataLoader):

  
    OPTIMIZER = torch.optim.Adam( model.parameters())
    SCHEDULER = ReduceLROnPlateau(OPTIMIZER, mode = 'min', patience = 3)  
    
    model.train()

    for idx, (feature, target)  in enumerate(dataLoader):
        # 배치크기만큼의 학습 데이터 준비
        feature, target = feature.to(DEVICE), target.to(DEVICE)
        
        # 학습
        pre_target = model(feature)
        
        # 손실계산
        loss = LF(pre_target, target)
        train_report[0].append(loss)
  
        # 성능 평가
        acc = metrics.accuracy(pre_target.argmax(dim=1), target, task = 'multiclass',num_classes=OUT)
        train_report[1].append(acc)

        f1 = metrics.f1_score(pre_target.argmax(dim=1), target, task='multiclass', num_classes=OUT)
        
        
        final_train_report.append(train_report[0])
        final_train_report.append(train_report[1])
        # print(final_train_report)

        # W,b업데이트
        OPTIMIZER.zero_grad()
        loss.backward()
        OPTIMIZER.step()
    
        loss_score = sum(train_report[0])/len(train_report[0])
        acc_score = sum(train_report[1])/len(train_report[1])
       
        print(f'[Train loss] ==> {loss_score}    [Train Accuracy] ==> {acc_score}')
        torch.save(model.state_dict(), "my_trained_model "+str(idx)+".pth")

        return loss_score, acc_score

def testing(dataLoader):
   

    model.eval()

    with torch.no_grad():
        test_report=[[], [], []]
        for (feature, target) in dataLoader:
            feature, target = feature.to(DEVICE), target.to(DEVICE)
            
            # 학습
            pre_target = model(feature)
            
            # 손실계산
            loss = LF(pre_target, target)
            test_report[0].append(loss)
            
            # 성능평가
            acc = metrics.accuracy(pre_target.argmax(dim=1), target, task = 'multiclass',num_classes=OUT)
            test_report[1].append(acc)
            
            final_test_report.append(test_report[0])
            final_test_report.append(test_report[1])
                # print(final_test_report)
        loss_score = sum(test_report[0])/len(test_report[0])
        acc_score = sum(test_report[1])/len(test_report[1])
            
        print(f'[Test loss] ==> {loss_score}    [Test Accuracy] ==> {acc_score}')
    return loss_score, acc_score

train_losses = []
train_accuracies = []
val_losses = []
val_accuracies = []

min_loss = 100.0  # 초기 최소 손실 설정
cnt = 0
idx = 1
for model in models:
    OPTIMIZER = torch.optim.Adam( model.parameters())
    SCHEDULER = ReduceLROnPlateau(OPTIMIZER, mode = 'min', patience = 3) 
    
    train_losses_epoch = []
    train_accuracies_epoch = []
    val_losses_epoch = []
    val_accuracies_epoch = []
    
    
    for eps in range(EPOCHS):
        print(f'[{eps+1}/{EPOCHS}]')
    
        train_loss, train_acc = training(trainDL)
        val_loss, val_acc = testing(validDL)
        
        train_losses_epoch.append(train_loss.item())  # 스칼라 값으로 변환
        train_accuracies_epoch.append(train_acc.item())  # 스칼라 값으로 변환
        val_losses_epoch.append(val_loss.item())  # 스칼라 값으로 변환
        val_accuracies_epoch.append(val_acc.item())
        

    # 최소 손실 업데이트
        if val_loss < min_loss:
            min_loss = val_loss
        # 모델 저장
            torch.save(model.state_dict(), "my_trained_model "+str(idx)+".pth")
    

    # 조기 종료 기능 => 조건 : val_loss가 지정된 횟수 이상 개선이 안되면 학습 종료
        if SCHEDULER.num_bad_epochs >= SCHEDULER.patience:
            print(f"Early stopping at epoch {eps}")
            break
    train_losses.append(train_losses_epoch)
    train_accuracies.append(train_accuracies_epoch)
    val_losses.append(val_losses_epoch)
    val_accuracies.append(val_accuracies_epoch)       
    
    idx +=1


[1/100]
[Train loss] ==> 0.6059849262237549    [Train Accuracy] ==> 0.8125
[Test loss] ==> 10.343918800354004    [Test Accuracy] ==> 0.1509481817483902
[2/100]
[Train loss] ==> 0.88130784034729    [Train Accuracy] ==> 0.765625
[Test loss] ==> 10.068485260009766    [Test Accuracy] ==> 0.2516292929649353
[3/100]
[Train loss] ==> 0.7892290949821472    [Train Accuracy] ==> 0.78125
[Test loss] ==> 10.362847328186035    [Test Accuracy] ==> 0.1509481817483902
[4/100]
[Train loss] ==> 0.8808475136756897    [Train Accuracy] ==> 0.765625
[Test loss] ==> 10.077004432678223    [Test Accuracy] ==> 0.25284454226493835
[5/100]
[Train loss] ==> 0.8255714178085327    [Train Accuracy] ==> 0.7749999761581421
[Test loss] ==> 10.398311614990234    [Test Accuracy] ==> 0.1509481817483902
[6/100]
[Train loss] ==> 0.8804194331169128    [Train Accuracy] ==> 0.765625
[Test loss] ==> 10.16835880279541    [Test Accuracy] ==> 0.25267094373703003
[7/100]
[Train loss] ==> 0.8408309817314148    [Train Accuracy] ==> 0.

KeyboardInterrupt: 