In [None]:
# 기본 라이브러리 import

from tqdm import tqdm
import glob
import os
import PIL
from PIL import Image
import matplotlib.pyplot as plt
import numpy as np
from einops import rearrange
import random

# 모델 학습에 필요한 토치 관련 패키지 import
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
import argparse
from torch.utils.data import Dataset, DataLoader, random_split 
from torch.optim.lr_scheduler import StepLR
from transformers import ViTFeatureExtractor, ViTForImageClassification


In [None]:

# check device

def check_gpu_usage():
    """Print the name of the current CUDA device, or a notice when CUDA is unavailable."""
    # Guard clause: nothing to query when CUDA is not available.
    if not torch.cuda.is_available():
        print('GPU를 사용할 수 없습니다.')
        return

    active_index = torch.cuda.current_device()
    gpu_name = torch.cuda.get_device_name(active_index)
    print(f'GPU 사용: {gpu_name}')


check_gpu_usage()

In [None]:

# Move into the project sub-directory 'sin'.
# The chdir is guarded so the cell can be re-run safely: once the kernel is
# already inside 'sin' there is no nested 'sin/sin' to enter, and an
# unconditional chdir would raise FileNotFoundError on the second run.
current_directory = os.getcwd()
project_directory = os.path.join(current_directory, 'sin')
if os.path.isdir(project_directory):
    os.chdir(project_directory)
elif os.path.basename(current_directory) != 'sin':
    # Neither inside the project directory nor able to reach it: fail loudly
    # here instead of silently building an empty dataset later.
    raise FileNotFoundError(
        f"Project directory not found: {project_directory!r}"
    )
current_directory = os.getcwd()
print("현재 작업 디렉토리:", current_directory)

image_dir_path = './data_l/'  # top-level data directory (relative to the project directory)

# Convert PIL images to float tensors in [0, 1], then normalise per channel as (x - 0.1) / 0.5.
# NOTE(review): a mean of 0.1 is unusual (0.5 is the common choice with std 0.5) — confirm it is intended.
transform = transforms.Compose(
    [transforms.ToTensor(),
    transforms.Normalize((0.1, 0.1, 0.1), (0.5, 0.5, 0.5))])

class WeatherDataset(Dataset):
    """Image-folder dataset with four weather classes.

    ``image_path`` is expected to contain the sub-directories ``Hazy``,
    ``Normal``, ``Rainy`` and ``Snowy``. Every ``*.jpg`` file directly inside
    them becomes one sample, labelled 0, 1, 2 and 3 respectively.

    Args:
        image_path: Root directory that holds the four class folders.
        transform: Optional callable applied to each RGB ``PIL.Image`` before
            it is returned from ``__getitem__``.

    Attributes:
        img_list: File paths of all samples, grouped by class and sorted
            within each class.
        class_list: Integer label of each entry in ``img_list``.
    """

    def __init__(self, image_path, transform=None):
        self.image_path = image_path
        self.transform = transform

        self.hazy_path = os.path.join(image_path, 'Hazy')
        self.normal_path = os.path.join(image_path, 'Normal')
        self.rainy_path = os.path.join(image_path, 'Rainy')
        self.snowy_path = os.path.join(image_path, 'Snowy')

        # glob returns files in arbitrary, filesystem-dependent order. Sorting
        # keeps sample indices stable across runs and machines, which a seeded
        # train/val split and index-based error inspection both rely on.
        self.hazy_img_list = sorted(glob.glob(os.path.join(self.hazy_path, '*.jpg')))
        self.normal_img_list = sorted(glob.glob(os.path.join(self.normal_path, '*.jpg')))
        self.rainy_img_list = sorted(glob.glob(os.path.join(self.rainy_path, '*.jpg')))
        self.snowy_img_list = sorted(glob.glob(os.path.join(self.snowy_path, '*.jpg')))

        per_class_lists = [
            self.hazy_img_list,
            self.normal_img_list,
            self.rainy_img_list,
            self.snowy_img_list,
        ]
        self.img_list = [path for paths in per_class_lists for path in paths]
        # Label i is repeated once for every image found in the i-th class folder.
        self.class_list = [
            label for label, paths in enumerate(per_class_lists) for _ in paths
        ]

    def __len__(self):
        """Return the total number of samples across all four classes."""
        return len(self.img_list)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for sample ``idx``.

        Raises:
            IndexError: If ``idx`` is negative or not smaller than ``len(self)``.
        """
        if idx < 0 or idx >= len(self):
            raise IndexError("Index out of range")

        img_path = self.img_list[idx]
        label = self.class_list[idx]
        # convert() returns a new, fully loaded image, so the file handle can
        # be released as soon as the with-block ends.
        with Image.open(img_path) as source:
            img = source.convert('RGB')

        if self.transform is not None:
            img = self.transform(img)

        return img, label

dataset = WeatherDataset(image_path=image_dir_path, transform=transform)
print(len(dataset))
# Fail early with a readable message instead of an obscure sampler error below.
assert len(dataset) > 0, f"No .jpg images found under {image_dir_path!r}"

# Train / validation split
train_ratio = 0.8
val_ratio = 1 - train_ratio

train_size = int(train_ratio * len(dataset))
val_size = len(dataset) - train_size
# A seeded generator makes the split identical on every run, so validation
# numbers from different runs are comparable.
split_generator = torch.Generator().manual_seed(42)
train_dataset, val_dataset = random_split(
    dataset, [train_size, val_size], generator=split_generator
)

batch_size = 32

# NOTE(review): default collation stacks the image tensors, so every image must
# have the same size; `transform` contains no Resize — confirm the data is pre-sized.
train_dataloader = DataLoader(dataset=train_dataset,
                        batch_size=batch_size,
                        shuffle=True,
                        drop_last=False)  # keep the final, possibly smaller, batch

val_dataloader = DataLoader(dataset=val_dataset,
                        batch_size=batch_size,
                        shuffle=False,
                        drop_last=False)

dataloader_iterator = iter(train_dataloader)
first_batch = next(dataloader_iterator)

# Summarise the first batch instead of dumping the whole tensor into the output.
print(first_batch[0].shape)  # (batch, channels, height, width)
# Reuse the batch that was already loaded rather than pulling a second one.
train_features, train_labels = first_batch
print(f"Feature batch shape: {train_features.size()}")
print(f"Labels batch shape: {train_labels.size()}")

# Show one sample
img = train_features[0].squeeze()
label = train_labels[0]
print(type(img))  # <class 'torch.Tensor'>

# Invert Normalize(mean=0.1, std=0.5) applied by `transform` so the pixel values
# are back in [0, 1]; imshow would otherwise clip the out-of-range values.
display_img = (img * 0.5 + 0.1).clamp(0, 1)
plt.imshow(display_img.permute(1, 2, 0).numpy())  # (C, H, W) -> (H, W, C)
plt.show()
print(f"Label: {label}")


In [None]:
# 새로 바꾼 모델! 

class CNNModel(nn.Module):
    """Small CNN classifier: two conv/ReLU/max-pool stages and two linear layers.

    Each stage keeps the spatial size in its 3x3 convolution (padding 1) and
    halves it in its 2x2 max-pool, so an ``(H, W)`` input reaches the first
    linear layer as ``32 * (H // 4) * (W // 4)`` features.

    Args:
        num_classes: Number of output logits.
        input_channels: Number of channels of the input images (default 3).
        input_size: ``(height, width)`` of the input images (default
            ``(224, 224)``, which gives the ``32 * 56 * 56`` features the
            first linear layer was originally sized for).

    Raises:
        ValueError: If ``input_size`` is too small to survive both pooling steps.
    """

    def __init__(self, num_classes, input_channels=3, input_size=(224, 224)):
        super(CNNModel, self).__init__()
        height, width = input_size
        # Two floor-halving pools are equivalent to one integer division by 4.
        pooled_height, pooled_width = height // 4, width // 4
        if pooled_height < 1 or pooled_width < 1:
            raise ValueError(
                f"input_size {input_size!r} is too small; both sides must be at least 4"
            )
        self.input_size = (height, width)

        self.conv1 = nn.Conv2d(in_channels=input_channels, out_channels=16, kernel_size=3, stride=1, padding=1)
        self.relu1 = nn.ReLU()
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, stride=1, padding=1)
        self.relu2 = nn.ReLU()
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.fc1 = nn.Linear(32 * pooled_height * pooled_width, 128)
        self.relu3 = nn.ReLU()
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x):
        """Map a ``(batch, channels, height, width)`` tensor to ``(batch, num_classes)`` logits.

        Raises:
            RuntimeError: If the spatial size of ``x`` does not match the
                ``input_size`` the model was built for.
        """
        x = self.pool1(self.relu1(self.conv1(x)))
        x = self.pool2(self.relu2(self.conv2(x)))

        # Flatten everything except the batch dimension.
        x = x.flatten(start_dim=1)
        if x.size(1) != self.fc1.in_features:
            # Same exception type as the matmul shape error this replaces,
            # but with a message that points at the actual cause.
            raise RuntimeError(
                f"CNNModel was built for input_size={self.input_size} "
                f"({self.fc1.in_features} flattened features) but received "
                f"{x.size(1)} features; resize the images or pass a matching input_size"
            )

        x = self.relu3(self.fc1(x))
        return self.fc2(x)


In [None]:
# ViT 모델 로드
class ViTModel(nn.Module):
    """ViT-Base/16 backbone with a fresh linear head for ``num_classes`` classes.

    Args:
        num_classes: Number of output logits produced by ``self.fc``.
        pretrained: When True (default) load the published checkpoint weights;
            when False build the same architecture with random weights.

    Attributes:
        feature_extractor: Hugging Face pre-processor of the checkpoint. It is
            kept for callers that want to pre-process raw images (and for its
            ``image_mean`` / ``image_std``); ``forward`` does not apply it.
        vit: ``ViTForImageClassification`` instance; only its inner backbone
            (``self.vit.vit``) is used in ``forward``.
        fc: Linear head mapping the [CLS] embedding to ``num_classes`` logits.
    """

    # 'google/vit-base-patch16' is not a published model id; the 224x224
    # ViT-Base/16 checkpoint is published as 'google/vit-base-patch16-224'.
    CHECKPOINT = 'google/vit-base-patch16-224'

    def __init__(self, num_classes, pretrained=True):
        super(ViTModel, self).__init__()
        self.feature_extractor = ViTFeatureExtractor.from_pretrained(self.CHECKPOINT)
        if pretrained:
            self.vit = ViTForImageClassification.from_pretrained(self.CHECKPOINT)
        else:
            # Local import: only this branch needs the config class.
            from transformers import ViTConfig
            self.vit = ViTForImageClassification(ViTConfig.from_pretrained(self.CHECKPOINT))
        self.fc = nn.Linear(self.vit.config.hidden_size, num_classes)

    def forward(self, x):
        """Return ``(batch, num_classes)`` logits for a batch of pixel tensors.

        ``x`` must already be a resized and normalised float tensor of shape
        ``(batch, 3, height, width)``. The feature extractor is deliberately not
        called here: it would pre-process the tensor a second time and, without
        ``return_tensors``, hand the model a non-tensor.
        """
        hidden_states = self.vit.vit(pixel_values=x).last_hidden_state
        # Position 0 of the sequence is the [CLS] token, the same embedding
        # ViTForImageClassification feeds to its own classifier. Using self.fc
        # instead of that 1000-class ImageNet head makes num_classes effective.
        cls_embedding = hidden_states[:, 0]
        return self.fc(cls_embedding)

num_classes = 4
model = ViTModel(num_classes)
model.eval()  # inference only: switch off dropout and other train-time behaviour

# Pre-processing for a single image. The normalisation statistics are taken
# from the checkpoint's own feature extractor so they match what the backbone
# was trained with, instead of assuming the ImageNet mean/std.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(model.feature_extractor.image_mean,
                         model.feature_extractor.image_std)
])

# Pre-process the image before handing it to the model. convert('RGB')
# guarantees three channels even for greyscale or RGBA files.
with Image.open('example.jpg') as example_file:
    image = transform(example_file.convert('RGB')).unsqueeze(0)  # add the batch dimension

# No gradients are needed for a prediction.
with torch.no_grad():
    outputs = model(image)

# Inspect the prediction.
# NOTE(review): the index is only meaningful once the classification head has
# been trained for the weather classes — confirm before relying on it.
predictions = torch.argmax(outputs, dim=1)
print("Predicted class:", predictions.item())

In [None]:

# 이미지 채널, 크기 설정
input_channels = 3
input_size = (224, 224)
# Derive the class count from the dataset labels. len(set(tensor)) must not be
# used for this: tensors hash by identity, so it counts the batch elements
# (the batch size), not the distinct labels — and a single batch need not
# contain every class anyway.
num_classes = max(dataset.class_list) + 1

# Seed the global RNG so weight initialisation and batch shuffling repeat across runs.
torch.manual_seed(42)

# Instantiate the model defined above.
model = CNNModel(num_classes)

# Hyper-parameters
learning_rate =  0.0001
num_epochs = 100
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = model.to(device)
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Early stopping: stop after this many consecutive epochs without a new best validation loss.
early_stopping_patience = 10
early_stopping_counter = 0
best_val_loss = float('inf')

model_checkpoint_dir = 'checkpoints'
os.makedirs(model_checkpoint_dir, exist_ok=True)
model_checkpoint_prefix = os.path.join(model_checkpoint_dir, 'model_epoch_')
best_checkpoint_path = None  # path of the checkpoint with the lowest validation loss so far


# Training loop
for epoch in range(num_epochs):
    print('Epoch:', epoch + 1)

    epoch_loss = 0
    epoch_correct = 0
    model.train()

    for images, labels in tqdm(train_dataloader, unit="batch"):
        x, y = images.to(device), labels.long().to(device)

        optimizer.zero_grad()
        pred = model(x)
        loss = loss_fn(pred, y)
        loss.backward()
        optimizer.step()

        # Weight by the real batch length: the last batch can be shorter than batch_size.
        epoch_loss += loss.item() * len(images)
        epoch_correct += pred.argmax(dim=1).eq(y).sum().item()

    print("Train Loss:", epoch_loss / len(train_dataset))
    print("Train Acc:", (epoch_correct / len(train_dataset)) * 100)

    val_loss = 0
    val_correct = 0

    model.eval()  # evaluation mode

    with torch.no_grad():
        for images, labels in tqdm(val_dataloader, unit="batch"):
            x, y = images.to(device), labels.long().to(device)

            pred = model(x)
            loss = loss_fn(pred, y)

            val_loss += loss.item() * len(images)  # summed per sample, averaged below
            val_correct += pred.argmax(dim=1).eq(y).sum().item()

    print("Val Loss:", val_loss / len(val_dataset))  # normalised by validation-set size
    print("Val Acc:", (val_correct / len(val_dataset)) * 100)

    # Early-stopping check
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        early_stopping_counter = 0
        # Save the new best model; the message reports the file actually written.
        best_checkpoint_path = model_checkpoint_prefix + str(epoch+1) + '.pth'
        torch.save(model.state_dict(), best_checkpoint_path)
        print(epoch+1, "번째", best_checkpoint_path, "가 best val_loss 를 경신해서 저장을 완료했다!>.<")
    else:
        early_stopping_counter += 1

    # Stop once patience is exhausted.
    if early_stopping_counter >= early_stopping_patience:
        print("Early stopping triggered.")
        break

# After training, restore the best weights before writing the final file.
# Otherwise the final file would hold the last epoch, which after early
# stopping is `early_stopping_patience` epochs past the best one.
if best_checkpoint_path is not None:
    model.load_state_dict(torch.load(best_checkpoint_path, map_location=device))
torch.save(model.state_dict(), 'model_0920_fin.pth')




In [None]:
#테스트, 평가
# NOTE(review): absolute, machine-specific path — consider a path relative to the project directory.
test_data_dir = 'd:/sin/data_test'
test_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1, 0.1, 0.1), (0.5, 0.5, 0.5))
])

test_dataset = WeatherDataset(image_path=test_data_dir, transform=test_transform)

# DataLoader for the test data. shuffle=False keeps the batches in dataset
# order, which the global-index bookkeeping below depends on.
test_dataloader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False, drop_last=False)

# Load the trained weights; map_location lets a GPU-saved file load on a CPU-only machine.
model.load_state_dict(torch.load('model_0920_fin.pth', map_location=device))
model.eval()  # evaluation mode

# Dataset-level indices of misclassified samples, plus what was predicted for each.
wrong_predictions = []
wrong_details = {}  # dataset index -> (class-probability array, predicted label)
batch_start = 0     # dataset index of the first sample in the current batch

with torch.no_grad():
    for images, labels in test_dataloader:
        x, y = images.to(device), labels.to(device)

        pred = model(x)
        probabilities = torch.softmax(pred, dim=1)  # logits -> probabilities
        predicted_labels = pred.argmax(dim=1)

        # nonzero(as_tuple=True)[0] is always 1-D, so zero or one wrong sample
        # in a batch is handled the same way as many.
        wrong_in_batch = (predicted_labels != y).nonzero(as_tuple=True)[0].tolist()
        for local_index in wrong_in_batch:
            # Convert the within-batch position into a dataset index.
            dataset_index = batch_start + local_index
            wrong_predictions.append(dataset_index)
            wrong_details[dataset_index] = (
                probabilities[local_index].cpu().numpy(),
                int(predicted_labels[local_index]),
            )

        batch_start += len(images)

if len(test_dataset) > 0:
    print("Test Acc:", (1 - len(wrong_predictions) / len(test_dataset)) * 100)

# Visualise a random subset of the misclassified samples.
samples = random.sample(wrong_predictions, min(15, len(wrong_predictions)))

for n in samples:
    sample_probabilities, sample_prediction = wrong_details[n]
    print("예측확률분포:", sample_probabilities)
    print("라벨:", test_dataset.class_list[n], ", 예측결과:", sample_prediction)

    # Show the original image: a normalised tensor has values outside [0, 1]
    # and would be clipped by imshow.
    with Image.open(test_dataset.img_list[n]) as image_file:
        img = image_file.convert('RGB')
    plt.imshow(img)
    plt.show()
