#### 도로 표지판 분류 모델 만들기 <hr>


In [43]:
# ---------------------------------------------------------------------
# 모델링 관련 모듈 로딩
# ---------------------------------------------------------------------
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim

from torchmetrics.classification import MulticlassF1Score
import torch.optim.lr_scheduler as lr_scheduler

import torchvision.models as models

# ---------------------------------------------------------------------
# 데이터 분석 관련 모듈 로딩
# ---------------------------------------------------------------------
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# ---------------------------------------------------------------------
# 이미지 관련 모듈 로딩
# ---------------------------------------------------------------------
from torchvision import transforms

# ---------------------------------------------------------------------
# 기타 모듈 로딩
# ---------------------------------------------------------------------
import os
import pickle

# Report the versions of the core packages in use
for pkg_label, pkg in (('torch', torch), ('pandas', pd), ('numpy', np)):
    print(f'{pkg_label} Ver.:{pkg.__version__}')


torch Ver.:2.4.1+cu118
pandas Ver.:2.0.3
numpy Ver.:1.24.4


In [44]:
# Pick the compute device: the GPU when CUDA is usable, otherwise the CPU
if torch.cuda.is_available():
    DEVICE = 'cuda'
else:
    DEVICE = 'cpu'


*   data0.pickle - 셔플링
*   data1.pickle - 셔플링, /255.0 정규화
*   data2.pickle - 셔플링, /255.0 + 평균 정규화
*   data3.pickle - 셔플링, /255.0 + 평균 + STD 정규화
*   data4.pickle - 회색조, 셔플링
*   data5.pickle - 회색조, 셔플링, 로컬 히스토그램 평준화
*   data6.pickle - 회색조, 셔플링, 로컬 히스토그램 평준화, /255.0 정규화
*   data7.pickle - 회색조, 셔플링, 로컬 히스토그램 평준화, /255.0 + 평균 정규화
*   data8.pickle - 회색조, 셔플링, 로컬 히스토그램 평준화, /255.0 + 평균 + STD 정규화


<hr>


In [45]:
# Dataset directory and the pickle file to read from it
BASE_DIR = r"C:\Users\Administrator\Desktop\장재웅\traffic_sign_data"
file_path = os.path.join(BASE_DIR, 'data1.pickle')

# Deserialize the dataset dictionary.
# NOTE(review): pickle.load can execute arbitrary code embedded in the file;
# only load pickles that come from a trusted source.
with open(file_path, mode='rb') as pickle_fh:
    data = pickle.load(pickle_fh)

# List the entries available in the loaded dictionary
print('keys:', data.keys())


keys: dict_keys(['x_train', 'y_test', 'x_test', 'x_validation', 'labels', 'y_train', 'y_validation'])


In [46]:
# Pull the train/test images and labels out of the loaded dictionary.
x_train = data['x_train']  # shape as loaded: (86989, 3, 32, 32)
y_train = data['y_train']  # expected shape: (86989,)

x_test = data['x_test']    # presumably (12630, 3, 32, 32), same layout as x_train — TODO confirm
y_test = data['y_test']    # expected shape: (12630,)


In [47]:
x_train.shape


(86989, 3, 32, 32)

In [48]:
# Exchange axes 1 and 2 of both image arrays: (N, 3, 32, 32) -> (N, 32, 3, 32)
x_train = x_train.swapaxes(1, 2)
x_test = x_test.swapaxes(1, 2)


In [49]:
x_train.shape


(86989, 32, 3, 32)

In [50]:
# Move the channel axis to the end so every sample is laid out (H, W, C),
# which is the layout transforms.ToPILImage expects for NumPy input.
# At this point the arrays are (N, 32, 3, 32); assuming the pickle stores
# images as (N, C, H, W) — TODO confirm — that is (N, H, C, W).
# The previous transpose(0, 3, 1, 2) produced (N, W, H, C): the shape looked
# right, but height and width were swapped, so every image was fed to the
# model spatially transposed. Swapping only the last two axes keeps H in place.
x_train = x_train.transpose(0, 1, 3, 2)  # (N, H, C, W) -> (N, H, W, C)
x_test = x_test.transpose(0, 1, 3, 2)

print("Final shape:", x_train.shape)


Final shape: (86989, 32, 32, 3)


In [51]:
# class ImageDataset(Dataset):
#     def __init__(self, features, targets, transform=None):
#         super().__init__()
#         self.features = features
#         self.targets = targets
#         self.n_rows = features.shape[0]
#         self.transform = transform

#     def __len__(self):
#         return self.n_rows

#     def __getitem__(self, index):
#         # 변환된 이미지와 타겟 반환
#         featureTS = self.features[index]
#         targetTS = torch.tensor(self.targets[index], dtype=torch.long)  # torch.uint8 -> torch.long

#         # 이미지 전처리(정규화 및 텐서 변환)
#         if self.transform:
#             featureTS = self.transform(featureTS)

#         return featureTS, targetTS

class ImageDataset(Dataset):
    """Dataset pairing an indexable feature array with its target labels.

    Args:
        features: Array-like exposing ``.shape`` and integer indexing; the
            first dimension is the number of samples.
        targets: Indexable collection of labels, one per sample.
        transform: Optional callable applied to each feature on access.
    """

    def __init__(self, features, targets, transform=None):
        super().__init__()
        self.features, self.targets = features, targets
        self.transform = transform
        # Sample count is taken from the leading dimension of the features
        self.n_rows = features.shape[0]

    def __len__(self):
        """Return the number of samples."""
        return self.n_rows

    def __getitem__(self, index):
        """Return ``(feature, label)`` for ``index``; the label is a long tensor."""
        sample = self.features[index]  # kept as-is (no tensor conversion here)
        raw_label = self.targets[index]

        # Run the optional preprocessing on the feature only
        if self.transform:
            sample = self.transform(sample)

        label = torch.tensor(raw_label, dtype=torch.long)
        return sample, label


# Preprocessing pipeline applied to every sample by the dataset.
# Per-channel statistics used for the final normalisation step
# (presumably the standard ImageNet values, matching the pretrained weights).
_CHANNEL_MEAN = [0.485, 0.456, 0.406]
_CHANNEL_STD = [0.229, 0.224, 0.225]

transform = transforms.Compose([
    transforms.ToPILImage(),         # array/tensor sample -> PIL image
    transforms.Resize((256, 256)),   # upscale to 256x256
    transforms.CenterCrop(224),      # keep the central 224x224 region
    transforms.ToTensor(),           # PIL image -> tensor
    transforms.Normalize(mean=_CHANNEL_MEAN, std=_CHANNEL_STD),  # per-channel normalisation
])


In [52]:
## Wrap the train/test arrays in datasets that share the same preprocessing
train_dataset, test_dataset = (
    ImageDataset(images, labels, transform=transform)
    for images, labels in ((x_train, y_train), (x_test, y_test))
)

# Batch loaders (64 samples per batch); only the training data is shuffled
train_loader = DataLoader(dataset=train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=64, shuffle=False)


#### 사전 훈련 모델 들고 오기 <hr>


In [53]:
### Load VGG19 initialised with the torchvision IMAGENET1K_V1 weights
pretrained_weights = models.VGG19_Weights.IMAGENET1K_V1
model = models.vgg19(weights=pretrained_weights)


In [54]:
### Freeze every parameter of the pretrained model, listing each one on the way
for param_name, tensor in model.named_parameters():
    print(f'[{param_name}] - {tensor.shape}')
    # Exclude the parameter from gradient updates during backpropagation
    tensor.requires_grad_(False)


[features.0.weight] - torch.Size([64, 3, 3, 3])
[features.0.bias] - torch.Size([64])
[features.2.weight] - torch.Size([64, 64, 3, 3])
[features.2.bias] - torch.Size([64])
[features.5.weight] - torch.Size([128, 64, 3, 3])
[features.5.bias] - torch.Size([128])
[features.7.weight] - torch.Size([128, 128, 3, 3])
[features.7.bias] - torch.Size([128])
[features.10.weight] - torch.Size([256, 128, 3, 3])
[features.10.bias] - torch.Size([256])
[features.12.weight] - torch.Size([256, 256, 3, 3])
[features.12.bias] - torch.Size([256])
[features.14.weight] - torch.Size([256, 256, 3, 3])
[features.14.bias] - torch.Size([256])
[features.16.weight] - torch.Size([256, 256, 3, 3])
[features.16.bias] - torch.Size([256])
[features.19.weight] - torch.Size([512, 256, 3, 3])
[features.19.bias] - torch.Size([512])
[features.21.weight] - torch.Size([512, 512, 3, 3])
[features.21.bias] - torch.Size([512])
[features.23.weight] - torch.Size([512, 512, 3, 3])
[features.23.bias] - torch.Size([512])
[features.25.we

In [55]:
# Swap the last classifier layer for a small head ending in 43 outputs
# (the number of target classes)
head_layers = [
    nn.Linear(4096, 512),
    nn.ReLU(),
    nn.Dropout(0.5),
    nn.Linear(512, 43),
]
model.classifier[6] = nn.Sequential(*head_layers)


In [56]:
# Display the classifier stack to inspect its layers
print(model.classifier)


Sequential(
  (0): Linear(in_features=25088, out_features=4096, bias=True)
  (1): ReLU(inplace=True)
  (2): Dropout(p=0.5, inplace=False)
  (3): Linear(in_features=4096, out_features=4096, bias=True)
  (4): ReLU(inplace=True)
  (5): Dropout(p=0.5, inplace=False)
  (6): Sequential(
    (0): Linear(in_features=4096, out_features=512, bias=True)
    (1): ReLU()
    (2): Dropout(p=0.5, inplace=False)
    (3): Linear(in_features=512, out_features=43, bias=True)
  )
)


In [57]:
# Re-enable gradients for the parameters under classifier[6] only
for param_name, tensor in model.classifier[6].named_parameters():
    print(f'[{param_name}] - {tensor.shape}')
    tensor.requires_grad_(True)


[0.weight] - torch.Size([512, 4096])
[0.bias] - torch.Size([512])
[3.weight] - torch.Size([43, 512])
[3.bias] - torch.Size([43])


In [58]:
### Locations used when saving model files
import os

# Directory that receives the saved files
SAVE_PATH = 'traffic_sign_model/'
# Checkpoint file name
SAVE_FILE = f'{SAVE_PATH}model_train_wbs.pth'
# File name for saving the full model (structure and parameters)
SAVE_MODEL = f'{SAVE_PATH}model_all.pth'


In [59]:
# Make sure the output directory exists before anything is saved.
# makedirs(exist_ok=True) replaces the exists()/mkdir() pair: it does not fail
# if the directory appears between the check and the creation, and it also
# creates missing parent directories.
os.makedirs(SAVE_PATH, exist_ok=True)


In [60]:
# Use a single source of truth for the compute device.
# `device` is what the training loop uses to move each batch, while the model
# was previously moved with the separately computed DEVICE; binding one to the
# other guarantees the model and the batches always end up on the same device.
device = DEVICE
model = model.to(device)


In [61]:
# Optimizer: only the parameters of the replaced head (classifier[6]) are updated
optimizer = torch.optim.Adam(model.classifier[6].parameters(), lr=0.01)

# Loss function for the multi-class classification task
loss_func = torch.nn.CrossEntropyLoss()

# Learning-rate scheduler; mode='max' means the monitored value is a score
# that should increase (pass the validation score to scheduler.step()).
# The deprecated `verbose` argument is no longer passed; read the current
# learning rate with scheduler.get_last_lr() if it needs to be logged.
scheduler = lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', patience=15)


In [62]:
# Per-epoch history of loss and score: index 0 = train, index 1 = validation
LOSS_HISTORY, SCORE_HISTROY = [[], []], [[], []]

EPOCH = 10
f1_score_metric = MulticlassF1Score(num_classes=43).to(device)

# NOTE(review): test_loader is used as the validation set, so the "best"
# checkpoint is selected on test data. The loaded dictionary also contains
# 'x_validation' / 'y_validation'; consider validating on those instead.

for epoch in range(EPOCH):
    # Put the model in training mode
    model.train()

    # Running totals for this epoch (summed per batch, averaged afterwards)
    train_loss_total, train_score_total = 0, 0
    val_loss_total, val_score_total = 0, 0

    for featureTS, targetTS in train_loader:
        # Move the batch to the compute device
        featureTS = featureTS.to(device)
        targetTS = targetTS.to(device)

        # Forward pass
        pre_y = model(featureTS)

        # Loss for this batch
        loss = loss_func(pre_y, targetTS)
        train_loss_total += loss.item()

        # F1 score for this batch (epoch score is the mean of per-batch values)
        score = f1_score_metric(torch.argmax(pre_y, dim=1), targetTS)
        train_score_total += score.item()

        # Backward pass and parameter update
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # Validation pass at the end of every epoch, without gradient tracking
    model.eval()
    with torch.no_grad():
        for val_featureTS, val_targetTS in test_loader:
            # Move the batch to the compute device
            val_featureTS = val_featureTS.to(device)
            val_targetTS = val_targetTS.to(device)

            # Inference
            pre_val = model(val_featureTS)

            # Loss for this batch
            loss_val = loss_func(pre_val, val_targetTS)
            val_loss_total += loss_val.item()

            # F1 score for this batch
            score_val = f1_score_metric(torch.argmax(pre_val, dim=1), val_targetTS)
            val_score_total += score_val.item()

    # Record the per-epoch averages
    LOSS_HISTORY[0].append(train_loss_total / len(train_loader))
    SCORE_HISTROY[0].append(train_score_total / len(train_loader))

    LOSS_HISTORY[1].append(val_loss_total / len(test_loader))
    SCORE_HISTROY[1].append(val_score_total / len(test_loader))


    print(f'{epoch}/{EPOCH} => [TRAIN] LOSS: {LOSS_HISTORY[0][-1]} SCORE: {SCORE_HISTROY[0][-1]}')
    print(f'\t=> [VAL] LOSS: {LOSS_HISTORY[1][-1]} SCORE: {SCORE_HISTROY[1][-1]}')

    # Checkpoint naming uses this epoch's validation averages. The original
    # divided by len(val_loader), a name that is never defined (NameError at
    # the end of the first epoch); validation runs over test_loader, and its
    # averages are already stored in the history lists.
    average_val_loss = LOSS_HISTORY[1][-1]
    average_val_score = SCORE_HISTROY[1][-1]

    SAVE_MODEL = f'loss({average_val_loss:.5f})_score({average_val_score:.5f}).pth'

    # Save the whole model on the first epoch, and afterwards only when the
    # validation score beats every earlier epoch
    previous_scores = SCORE_HISTROY[1][:-1]
    if not previous_scores or average_val_score > max(previous_scores):
        torch.save(model, SAVE_PATH + SAVE_MODEL)


# X axis: one point per recorded epoch. Using the history length instead of
# EPOCH keeps the plot working when training stopped before all epochs ran
# (x and y lengths must match for plt.plot).
epoch_range = range(1, len(LOSS_HISTORY[0]) + 1)

# Loss curves (left panel)
plt.figure(figsize=(10, 4))
plt.subplot(1, 2, 1)
plt.plot(epoch_range, LOSS_HISTORY[0], 'r-', label='Train')
plt.plot(epoch_range, LOSS_HISTORY[1], 'b-', label='Valid')
plt.grid()
plt.title('Loss over EPOCH')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()

# F1 score curves (right panel)
plt.subplot(1, 2, 2)
plt.plot(epoch_range, SCORE_HISTROY[0], 'r-', label='Train')
plt.plot(epoch_range, SCORE_HISTROY[1], 'b-', label='Valid')
plt.grid()
plt.title('F1_Score over Epochs')
plt.xlabel('Epochs')
plt.ylabel('F1_Score')
plt.legend()

plt.show()



    # 최적화 스케쥴러 인스턴스 업데이트
    # scheduler.step(loss_val)
    # print(f'scheduler.num_bad_epochs => {scheduler.num_bad_epochs}', end=' ')
    # print(f'scheduler.patience => {scheduler.patience}')

    # 손실 감소(또는 성능 개선)이 안되는 경우 조기종료
    # if scheduler.num_bad_epochs >= scheduler.patience:
    #     print(f'{scheduler.patience}EPOCH 성능 개선이 없어서 조기종료함')
    #     break


In [None]:
# def train_model(model, loss_func, optimizer, DEVICE, num_epochs=1, is_train=True):
#     f1score_func = MulticlassF1Score(num_classes=43).to(DEVICE)
#     since = time.time()
#     loss_history = [],[]
#     acc_history = [],[]
#     best_acc = 0.0
#     # num_epochs = 5

#     for epoch in range(num_epochs):
#         # 학습 모드로 모델 설정
#         model.train()

#         print('Epoch {}/{}'.format(epoch+1, num_epochs))
#         print('-' * 10)

#         running_corrects = 0
#         running_loss = 0.0
        
#         score_total = 0

#         for featureTS, targetTS in train_loader:

#             # DEVICE : CPU/GPU 사용
#             featureTS=featureTS.to(DEVICE)
#             targetTS=targetTS.to(DEVICE)
            
#             # 학습 진행
#             pre_y = model(featureTS)

#             # 손실 계산
#             loss = loss_func(pre_y, targetTS)

#             # # 예제 점수계산법
#             _, preds = torch.max(pre_y, 1)

#             # 성능평가 계산
#             score=f1score_func(pre_y, targetTS.reshape(-1))
#             score_total += score.item()

#             # 최적화 진행
#             optimizer.zero_grad()
#             loss.backward()
#             optimizer.step()

#             running_corrects += torch.sum(preds == targetTS.data)
#             # running_loss += loss.item() * featureTS.size(0) # loss값 * batch_size
#             running_loss += loss.item() 

#         # 검증 모드로 모델 설정
#         model.eval()

#         running_val_corrects = 0
#         running_val_loss = 0.0
        
#         with torch.no_grad():
#             # 검증 데이터셋
#             for feature, target in test_loader:
                
#                 # CPU/GPU 사용
#                 feature=feature.to(DEVICE)
#                 target=target.to(DEVICE)

#                 # 평가
#                 pre_val=model(feature)

#                 # 손실
#                 loss_val=loss_func(pre_val, target.reshape(-1).long())

#                 # 성능평가
#                 score_val=f1score_func(pre_val, target.reshape(-1))
            
#             running_val_corrects+=score_val.item()
#             running_val_loss+=loss_val.item()

#         epoch_acc = running_corrects.double() / len(train_loader)
#         epoch_loss = running_loss / len(train_loader)

#         print('[TRAIN] LOSS: {:.4f} SCORE: {:.4f}'.format(epoch_loss, score_total/len(train_loader))) 
#         print('[VAL] LOSS: {:.4f} SCORE: {:.4f}'.format(running_val_loss, running_val_corrects))

#         if epoch_acc > best_acc:
#             best_acc = epoch_acc

#         loss_history[0].append(epoch_loss)        
#         acc_history[0].append(score_total/len(train_loader))

#         loss_history[1].append(running_val_loss)
#         acc_history[1].append(running_val_corrects)

#         ### 모델 저장 부분
#         # 끝나는 시간 저장
#         end_time = time.strftime('%y.%m.%d..%H_%M_%S')

#         # 모델 파라미터 저장
#         if len(acc_history[1]) == 1:
#             torch.save(model.state_dict(), os.path.join('./models/', '{0:0=2d}_{1}.pth'.format(epoch+1, end_time)))
#         else:
#             if acc_history[1][-1] > max(acc_history[1][:-1]):
#                 torch.save(model.state_dict(), os.path.join('./models/', '{0:0=2d}_{1}.pth'.format(epoch+1, end_time)))
#         print()

#         # 모델 경로 지정
#         SAVE_PATH = './models'
#         SAVE_MODEL = f'/model_num_loss({running_val_loss:.4f})_score({running_val_corrects:.4f}).pth'
        
#         # 모델 전체 저장
#         if len(acc_history[1]) == 1:
#             torch.save(model, SAVE_PATH+SAVE_MODEL)
#         else:
#             if acc_history[1][-1] > max(acc_history[1][:-1]):
#                 torch.save(model, SAVE_PATH+SAVE_MODEL)

#         # 최적화 스케쥴러 인스턴스 업데이트
#         scheduler.step(loss_val)
#         print(f'scheduler.num_bad_epochs => {scheduler.num_bad_epochs}', end=' ')
#         print(f'scheduler.patience => {scheduler.patience}')

#         # # 손실 감소(또는 성능 개선)이 안되는 경우 조기종료
#         # if scheduler.num_bad_epochs >= scheduler.patience:
#         #     print(f'{scheduler.patience}EPOCH 성능 개선이 없어서 조기종료함')
#         #     break

#     time_elapsed = time.time() - since
#     print('모델 학습 시간: {:.0f}분 {:.0f}초'.format(time_elapsed // 60, time_elapsed % 60))
    
# 	### 학습 & 검증 결과 그래프로 출력
#     epochs = range(1, len(acc_history[1]) + 1)
    
# 	# Loss
#     plt.figure(figsize=(10, 4))
#     plt.subplot(1, 2, 1)
#     plt.plot(epochs, loss_history[0], 'r-', label='Train')
#     plt.plot(epochs, loss_history[1], 'b-', label='Valid')
#     plt.grid()
#     plt.title('Loss over Epochs')
#     plt.xlabel('Epochs')
#     plt.ylabel('Loss')
#     plt.legend()

#     # Accuracy
#     plt.subplot(1, 2, 2)
#     plt.plot(epochs, acc_history[0], 'r-', label='Train')
#     plt.plot(epochs, acc_history[1], 'b-', label='Valid')
#     plt.grid()
#     plt.title('F1_Score over Epochs')
#     plt.xlabel('Epochs')
#     plt.ylabel('F1_Score')
#     plt.legend()
