In [None]:
!pip install efficientnet_pytorch

In [None]:
import os
import shutil
import zipfile
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader, ConcatDataset
from PIL import Image
import numpy as np
import torch
from torchvision import transforms
from torchvision import models
import time
import tqdm
import os
import seaborn as sns
from sklearn.metrics import confusion_matrix, f1_score

from torchvision.models import mobilenet_v3_small, mobilenet_v3_large
from efficientnet_pytorch import EfficientNet
import timm  # PyTorch Image Models (timm) 라이브러리
from torch.optim.lr_scheduler import ReduceLROnPlateau

In [None]:
def count_all_files_in_directory(directory):
    total_files = 0
    # os.walk를 사용해 하위 디렉토리까지 모두 탐색
    for root, dirs, files in os.walk(directory):
        total_files += len(files)  # 현재 디렉토리의 파일 개수를 더함
    return total_files

# 디렉토리 경로 설정
directory = '/kaggle/input/intel-image/seg_train/seg_train'

# 파일 개수 출력
file_count = count_all_files_in_directory(directory)
print(f"'{directory}' 이하의 모든 폴더의 파일 개수: {file_count}")

In [None]:
def count_all_files_in_directory(directory):
    total_files = 0
    # os.walk를 사용해 하위 디렉토리까지 모두 탐색
    for root, dirs, files in os.walk(directory):
        total_files += len(files)  # 현재 디렉토리의 파일 개수를 더함
    return total_files

# 디렉토리 경로 설정
directory = '/kaggle/input/intel-image/seg_test/seg_test'

# 파일 개수 출력
file_count = count_all_files_in_directory(directory)
print(f"'{directory}' 이하의 모든 폴더의 파일 개수: {file_count}")

In [None]:
import random
import numpy as np

# 시드 설정 함수 정의
def set_seed(seed):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)  # 여러 GPU 사용 시
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

# 시드 설정
set_seed(42)

In [None]:
def collect_and_shuffle_files(data_dir):
    """
    주어진 디렉토리 내의 모든 파일을 수집하여 랜덤하게 섞어 반환.
    하위 폴더에 있는 파일도 모두 포함.
    """
    all_files = []
    data_dir = '/kaggle/input/intel-image/seg_train/seg_train'
    
    # os.walk를 사용해 하위 폴더 내의 모든 파일을 수집
    for root, dirs, files in os.walk(data_dir):
        for file in files:
            all_files.append(os.path.join(root, file))
    
    # 파일 리스트를 랜덤하게 섞음
    random.shuffle(all_files)
    
    return all_files

def split_data_files(data_dir, output_dir, train_ratio=0.8, seed=42):
    """
    데이터를 학습(train)과 검증(valid)으로 분할하고, 데이터를 새 폴더에 저장합니다.
    data_dir: 원본 데이터가 있는 폴더 경로
    output_dir: 분할된 데이터를 저장할 폴더 경로
    train_ratio: 학습 데이터 비율 (기본값: 0.8)
    seed: 랜덤 시드 값
    """
    # 랜덤 시드 고정
    random.seed(seed)

    # data_intel 폴더 생성
    os.makedirs(output_dir, exist_ok=True)

    # 학습 및 검증 폴더 생성
    train_output_dir = os.path.join(output_dir, 'train')
    valid_output_dir = os.path.join(output_dir, 'valid')

    os.makedirs(train_output_dir, exist_ok=True)
    os.makedirs(valid_output_dir, exist_ok=True)

    # 모든 파일을 수집하고 섞음
    all_files = collect_and_shuffle_files(data_dir)
    total_num = len(all_files)

    # 학습 데이터와 검증 데이터로 분할
    train_num = int(total_num * train_ratio)
    train_files = all_files[:train_num]
    valid_files = all_files[train_num:]

    # 파일을 새로운 폴더로 복사
    for file in train_files:
        # 하위 폴더 구조를 유지하면서 복사
        relative_path = os.path.relpath(file, data_dir)
        train_dest_path = os.path.join(train_output_dir, relative_path)
        os.makedirs(os.path.dirname(train_dest_path), exist_ok=True)
        shutil.copy(file, train_dest_path)

    for file in valid_files:
        # 하위 폴더 구조를 유지하면서 복사
        relative_path = os.path.relpath(file, data_dir)
        valid_dest_path = os.path.join(valid_output_dir, relative_path)
        os.makedirs(os.path.dirname(valid_dest_path), exist_ok=True)
        shutil.copy(file, valid_dest_path)

    print(f"Train files: {len(train_files)}개")
    print(f"Valid files: {len(valid_files)}개")

# 경로 설정
train_data_dir = '/kaggle/input/intel-image/seg_train/seg_train'
output_dir = 'data_intel/split_data'

# 데이터 분할 및 저장
split_data_files(train_data_dir, output_dir, train_ratio=0.8)

In [None]:
def collect_and_shuffle_files(data_dir):
    """
    주어진 디렉토리 내의 모든 파일을 수집하여 랜덤하게 섞어 반환.
    하위 폴더에 있는 파일도 모두 포함.
    """
    all_files = []
    data_dir = '/kaggle/input/intel-image/seg_test/seg_test'
    
    # os.walk를 사용해 하위 폴더 내의 모든 파일을 수집
    for root, dirs, files in os.walk(data_dir):
        for file in files:
            all_files.append(os.path.join(root, file))
    
    # 파일 리스트를 랜덤하게 섞음
    random.shuffle(all_files)
    
    return all_files

def split_test_files(data_dir, num_samples, seed=42):
    """
    테스트 데이터를 랜덤하게 추출합니다.
    data_dir: 원본 테스트 데이터가 있는 폴더 경로
    num_samples: 랜덤하게 추출할 파일 개수
    seed: 랜덤 시드 값
    """
    # 랜덤 시드 고정
    random.seed(seed)

    # 모든 파일을 수집하고 섞음
    all_files = collect_and_shuffle_files(data_dir)

    # 요청된 개수만큼 파일을 랜덤하게 추출
    if num_samples > len(all_files):
        print(f"Warning: 요청된 파일 개수({num_samples})가 실제 파일 수({len(all_files)})보다 큽니다.")
        num_samples = len(all_files)

    selected_files = all_files[:num_samples]

    return selected_files

def copy_files_to_test_dir(selected_files, dest_dir, data_dir):
    """
    선택된 테스트 파일을 지정된 디렉토리로 복사
    selected_files: 선택된 파일 경로 리스트
    dest_dir: 복사할 목적 디렉토리
    data_dir: 원본 데이터 디렉토리 경로
    """
    # 목적 폴더가 없으면 생성
    if not os.path.exists(dest_dir):
        os.makedirs(dest_dir)

    for file in selected_files:
        # 원본 폴더 구조를 유지하면서 파일 복사
        relative_path = os.path.relpath(file, data_dir)
        destination = os.path.join(dest_dir, relative_path)
        
        # 하위 디렉토리 생성
        os.makedirs(os.path.dirname(destination), exist_ok=True)
        
        # 파일 복사
        shutil.copy(file, destination)

    print(f"{len(selected_files)}개의 파일이 '{dest_dir}'에 복사되었습니다.")

# 경로 설정
test_data_dir = '/kaggle/input/intel-image/seg_test/seg_test'  # 원본 테스트 데이터 경로
split_data_test_dir = 'data_intel/split_data/test'  # 테스트 데이터를 복사할 경로

# 테스트 데이터에서 랜덤하게 추출할 파일 수 설정
num_samples = 1000  # 원하는 만큼 설정

# 테스트 데이터에서 랜덤 파일 추출
random_test_files = split_test_files(test_data_dir, num_samples)
print(f"Test files: {len(random_test_files)}개")

# 추출된 파일을 split_data/test 폴더로 복사
copy_files_to_test_dir(random_test_files, split_data_test_dir, test_data_dir)

In [None]:
import os
from PIL import Image
from torch.utils.data import Dataset
import numpy as np

class customDataset(Dataset):
    def __init__(self, files, root_dir, mode, transform=None, class_to_idx=None):
        """
        files: 파일 리스트
        root_dir: 파일들이 있는 폴더 경로
        mode: 'train' 또는 'test' 모드
        transform: 이미지 변환 파이프라인
        class_to_idx: 클래스 이름을 레이블로 변환하는 딕셔너리 (예: {'buildings': 0, 'forest': 1, ...})
        """
        self.files = files
        self.root_dir = root_dir
        self.mode = mode
        self.transform = transform
        self.class_to_idx = class_to_idx

    def __len__(self):
        return len(self.files)

    def __getitem__(self, idx):
        img_path = os.path.join(self.root_dir, self.files[idx])
        img = Image.open(img_path).convert('RGB')  # RGB로 변환하여 이미지를 읽음

        # 폴더 이름(클래스 이름)을 추출하여 레이블 설정
        folder_name = os.path.basename(os.path.dirname(img_path))
        label = self.class_to_idx[folder_name]

        if self.transform:
            img = self.transform(img)

        # 'train' 모드일 때는 이미지와 레이블 반환
        if self.mode == 'train':
            return img, np.array(label)

        # 'test' 모드일 때는 이미지와 파일 이름 반환 (레이블 없음)
        else:
            return img, self.files[idx]

In [None]:
class CustomTestDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        """
        root_dir: 이미지 파일이 포함된 상위 디렉토리
        transform: 이미지 전처리 및 변환을 위한 torchvision.transforms 객체
        """
        self.root_dir = root_dir
        self.transform = transform

        # 모든 이미지 파일 경로를 저장
        self.image_paths = []
        self.labels = []

        # 클래스 이름을 인덱스로 변환하는 매핑 딕셔너리
        self.class_to_idx = {'buildings': 0, 'forest': 1, 'glacier': 2, 'mountain': 3, 'sea': 4, 'street': 5}

        # 디렉토리 내의 각 이미지 파일 경로와 해당 클래스 레이블을 저장
        for class_name in os.listdir(root_dir):
            class_dir = os.path.join(root_dir, class_name)
            if os.path.isdir(class_dir):
                for img_file in os.listdir(class_dir):
                    if img_file.endswith(('.png', '.jpg', '.jpeg')):
                        self.image_paths.append(os.path.join(class_dir, img_file))
                        self.labels.append(self.class_to_idx[class_name])

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        label = self.labels[idx]
        image = Image.open(img_path).convert('RGB')

        if self.transform:
            image = self.transform(image)

        # 파일 이름 추출
        # filename = os.path.basename(img_path)
        filename = img_path
        
        return image, label,filename  # 이미지와 해당 레이블 반환

In [None]:
# 5. 이미지 전처리 설정
org_size = (256, 256)
img_size = 224
visual_transform = transforms.Compose([
    transforms.Resize(org_size),
    transforms.RandomHorizontalFlip(),
    transforms.RandomCrop(img_size),
    # 평균이 0.5 표준편차 0.5 (0~1사이의 실수)
    transforms.ToTensor(),
    ])
train_transform = transforms.Compose([
    transforms.Resize(org_size),
    transforms.RandomHorizontalFlip(),
    transforms.RandomCrop(img_size),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])

test_transform = transforms.Compose([
    transforms.Resize(org_size),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])

In [None]:
# 데이터셋 인스턴스화

# Intel 데이터셋 경로 설정
train_dir = 'data_intel/split_data/train'
valid_dir = 'data_intel/split_data/valid'
test_dir = 'data_intel/split_data/test'

# 클래스별로 디렉토리가 나뉘어 있기 때문에 각 클래스 디렉토리에서 파일을 읽어와야 함
class_to_idx = {'buildings': 0, 'forest': 1, 'glacier': 2, 'mountain': 3, 'sea': 4, 'street': 5}

# 각 클래스별 파일 목록 수집
train_files = {cls: os.listdir(os.path.join(train_dir, cls)) for cls in class_to_idx}
valid_files = {cls: os.listdir(os.path.join(valid_dir, cls)) for cls in class_to_idx}


# 데이터셋 생성 (클래스별로)
visual_datasets = []
train_datasets = []
valid_datasets = []
test_datasets = []

for cls in class_to_idx:
    visual_datasets.append(customDataset(train_files[cls], os.path.join(train_dir, cls), 'train', transform=visual_transform,class_to_idx=class_to_idx))
    train_datasets.append(customDataset(train_files[cls], os.path.join(train_dir, cls), 'train', transform=train_transform,class_to_idx=class_to_idx))
    valid_datasets.append(customDataset(valid_files[cls], os.path.join(valid_dir, cls), 'train', transform=train_transform,class_to_idx=class_to_idx))
    # 테스트 데이터셋 생성
test_dataset = CustomTestDataset(root_dir=test_dir, transform=test_transform)

# 데이터셋 결합 (모든 클래스 통합)
visual_dataset = ConcatDataset(visual_datasets)
train_dataset = ConcatDataset(train_datasets)
valid_dataset = ConcatDataset(valid_datasets)
#test_dataset = ConcatDataset(test_datasets)

# 데이터셋 크기 출력
print(f"visual dataset size: {len(visual_dataset)}")
print(f"Train dataset size: {len(train_dataset)}")
print(f"Valid dataset size: {len(valid_dataset)}")
print(f"Test dataset size: {len(test_dataset)}")

print(class_to_idx)

In [None]:
# 데이터로더 객체생성

from torch.utils.data import DataLoader

batch_size = 32
visual_loader = DataLoader(visual_dataset, batch_size=batch_size, shuffle=True)
train_loader = DataLoader(train_dataset, batch_size = batch_size, shuffle = True)
valid_loader = DataLoader(valid_dataset, batch_size = batch_size, shuffle = False)
test_loader = DataLoader(test_dataset, batch_size = batch_size, shuffle = False)

In [None]:
import matplotlib.pyplot as plt

# 배치사이즈만큼의 이미지를 시각화함함
images, labels = next(iter(visual_loader))
classes ={0:'buildings',1:'forest', 2: 'glacier', 3:'mountain', 4: 'sea', 5: 'street' }

fig = plt.figure(figsize=(14, 8))
for i in range(batch_size):
    #4행 8열
    ax = fig.add_subplot(4, 8, i + 1)
    ax.set_title(classes[labels[i].item()])
    ax.axis('off')
    #컬러 채널 순서를 재정렬
    ax.imshow(images[i].permute(1, 2, 0))

plt.show()

In [None]:
import torch
import torch.nn as nn
import torchvision
from efficientnet_pytorch import EfficientNet

# device 설정
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# He 초기화를 적용하는 함수 정의
def initialize_weights_he(module):
    if isinstance(module, nn.Linear) or isinstance(module, nn.Conv2d):
        torch.nn.init.kaiming_normal_(module.weight, nonlinearity='relu')
        if module.bias is not None:
            torch.nn.init.zeros_(module.bias)

# 분류기 레이어를 수정하는 함수 정의
def modify_classifier(model, num_features, num_classes, model_name):
    """모델의 분류기 레이어를 수정하고 He 초기화를 적용하는 함수"""
    classifier = nn.Sequential(
        nn.Linear(num_features, 512),
        nn.ReLU(),
        nn.Dropout(0.5),
        nn.Linear(512, num_classes),
    ).to(device)
    
    # He 초기화 적용
    classifier.apply(initialize_weights_he)
    
    # 분류기 레이어에 학습 가능하도록 requires_grad 설정
    for param in classifier.parameters():
        param.requires_grad = True

    # 모델의 분류기 부분을 교체
    if model_name in ['efficientNetB2', 'efficientNetB0']:
        model._fc = classifier
    elif model_name in ['MNV3_large', 'MNV3_small']:
        model.classifier = classifier
    elif model_name in ['RESNET50', 'RESNET18']:
        model.fc = classifier

    return model

# 모델 생성 및 수정 함수
def create_model(model_name, num_classes):
    """모델 생성 및 수정"""
    if model_name == 'efficientNetB2':
        model = EfficientNet.from_pretrained('efficientnet-b2')
        num_features = model._fc.in_features
    elif model_name == 'efficientNetB0':
        model = EfficientNet.from_pretrained('efficientnet-b0')
        num_features = model._fc.in_features
    elif model_name == 'MNV3_large':
        model = torchvision.models.mobilenet_v3_large(pretrained=True)
        num_features = model.classifier[0].in_features
    elif model_name == 'MNV3_small':
        model = torchvision.models.mobilenet_v3_small(pretrained=True)
        num_features = model.classifier[0].in_features
    elif model_name == 'RESNET50':
        model = torchvision.models.resnet50(pretrained=True)
        num_features = model.fc.in_features
    elif model_name == 'RESNET18':
        model = torchvision.models.resnet18(pretrained=True)
        num_features = model.fc.in_features
    else:
        raise ValueError(f"Model '{model_name}' not recognized.")
    
    # 모든 파라미터 고정
    for param in model.parameters():
        param.requires_grad = False

    # 분류기 레이어 수정 및 학습 가능하게 설정
    model = modify_classifier(model, num_features, num_classes, model_name).to(device)

    # 모델을 GPU로 이동
    model = model.to(device)

    return model

# 선택 가능한 모델 출력
models = ['efficientNetB2', 'efficientNetB0', 'MNV3_large', 'MNV3_small', 'RESNET50', 'RESNET18']
print("사용 가능한 모델:")
for model_name in models:
    print(f" - {model_name}")

# 다중 분류를 위한 클래스 수 설정
num_classes = 6  # 예시: 6개의 클래스를 분류한다고 가정

# 모델 선택
selected_model_name = "RESNET50"  # 선택하려는 모델 이름

# 모델 생성
model = create_model(selected_model_name, num_classes)  # selected_model_name을 사용
print(f"선택된 모델: {selected_model_name}")

In [None]:
# EarlyStopping 클래스
class EarlyStopping:
    def __init__(self, patience=7, verbose=False, delta=0, path='checkpoint.pt'):
        """
        Args:
            patience (int): 성능 개선이 없을 때 몇 번의 에포크까지 기다릴지.
            verbose (bool): True일 경우 개선될 때마다 메시지 출력.
            delta (float): 성능 개선으로 간주될 최소 변화량.
        """
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        self.val_loss_min = float('inf')
        self.delta = delta
        self.path = path

    def __call__(self, val_loss, model):
        score = -val_loss
        # 처음에 호출됐을때는 best_score가 None이라서 초기값을 설정
        if self.best_score is None:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
        # 지금까지의 best_scor
        elif score < self.best_score + self.delta:
            self.counter += 1
            if self.verbose:
                print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
            self.counter = 0

    def save_checkpoint(self, val_loss, model):
        '''검증 손실이 감소하면 모델을 저장합니다.'''
        if self.verbose:
            print(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}).  Saving model ...')
            torch.save(model.state_dict(), self.path)  # 모델 상태 저장
        self.val_loss_min = val_loss

In [None]:
# 7. 손실함수 및 옵티마이저 정의
lr = 0.00001
epochs = 15
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=lr)

# 학습률 조정(loss를 기준으로 한다면 mode가 min/ accuracy를 기준으로 한다면 max로 수정한다.)
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor = 0.5, patience = 3)

# EarlyStopping 인스턴스 생성 (patience=10)
path = f"{selected_model_name}_best.pth"
early_stopping = EarlyStopping(patience=10, verbose=True, path=path)

In [None]:
import time
import matplotlib.pyplot as plt
import numpy as np
from tqdm import tqdm

def imshow(img):
    # 이미지 정규화를 해제하기 위해 역변환
    img = img / 2 + 0.5  # (0.5, 0.5, 0.5)로 정규화된 이미지일 경우
    np_img = img.numpy()
    plt.imshow(np.transpose(np_img, (1, 2, 0)))
    plt.show()

def fit(model, criterion, optimizer, epochs, train_loader, valid_loader):
    # 훈련 시작시 초기 lr을 로딩
    pre_lr = optimizer.param_groups[0]['lr']
    model.train()

    train_loss = 0
    train_acc = 0
    train_correct = 0

    # 그래프를 출력하기 위해 리스트에 누적
    train_losses = []
    train_accuracies = []
    valid_losses = []
    valid_accuracies = []

    for epoch in range(3):
        start = time.time()

        # tqdm을 사용하여 각 에포크의 진행률 표시
        train_loader_tqdm = tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs} [Train]", leave=False)

        # 1 epoch 중에 train에 해당하는 for 문
        for train_x, train_y in train_loader_tqdm:
            model.train()
            train_x, train_y = train_x.to(device), train_y.to(device).long()
            
            # 레이블의 차원이 1D인지 확인 및 변환
            # if train_y.dim() > 1:
            #     train_y = train_y.squeeze()  # 레이블을 1D로 변환
            # 기울기 초기화
            optimizer.zero_grad()
            # 예측
            pred = model(train_x)
            # 손실계산
            loss = criterion(pred, train_y).squeeze()
            # 오차역전파
            loss.backward()
            # 진행
            optimizer.step()
            train_loss += loss.item()

            # 소프트맥스
            y_pred = pred.argmax(dim=1).cpu()
            # y_pred와 train_y값이 같으면 train_correct 추가
            train_correct += y_pred.eq(train_y.cpu()).int().sum()

        # validation data check
        valid_loss = 0
        valid_acc = 0
        valid_correct = 0

        # tqdm을 사용하여 각 에포크의 validation 진행률 표시
        valid_loader_tqdm = tqdm(valid_loader, desc=f"Epoch {epoch+1}/{epochs} [Validation]", leave=False)

        # 1 epoch 중에 validation에 해당하는 for 문
        for valid_x, valid_y in valid_loader_tqdm:
            with torch.no_grad():
                model.eval()
                valid_x, valid_y = valid_x.to(device), valid_y.to(device).long()
                # 레이블의 차원이 1D인지 확인 및 변환
                # if valid_y.dim() > 1:
                #     valid_y = valid_y.squeeze()  # 레이블을 1D로 변환
                pred = model(valid_x)
                loss = criterion(pred, valid_y).squeeze()
            valid_loss += loss.item()

            # 소프트맥스
            y_pred = pred.argmax(dim=1).cpu()
            valid_correct += y_pred.eq(valid_y.cpu()).int().sum()

        train_acc = train_correct / len(train_loader.dataset)
        valid_acc = valid_correct / len(valid_loader.dataset)

        print(f'{time.time() - start:.3f}sec : [Epoch {epoch+1}/{epochs}] -> train loss: {train_loss/len(train_loader):.4f}, train acc: {train_acc*100:.3f}% / valid loss: {valid_loss/len(valid_loader):.4f}, valid acc: {valid_acc*100:.3f}%')
        print('reserved:', round(torch.cuda.memory_reserved(0)/1024**2))
        train_losses.append(train_loss/len(train_loader))
        train_accuracies.append(train_acc)
        valid_losses.append(valid_loss/len(valid_loader))
        valid_accuracies.append(valid_acc)

        train_loss = 0
        train_acc = 0
        train_correct = 0
        
            # ReduceLROnPlateau 스케줄러를 사용하여 검증 손실에 따라 학습률을 조정
        scheduler.step(valid_loss)

        # 현재 학습률 출력
        now_lr = optimizer.param_groups[0]['lr']
        if now_lr != pre_lr:
            pre_lr = now_lr
            lr_str = ', LR changed!!'
        else:
            lr_str = ''

        print(f'learning_rate {epoch+1}: {now_lr:.8f}'+lr_str)

        # EarlyStopping을 호출하여 학습 중단 여부 확인
        early_stopping(valid_loss, model)

        # 학습 중단 조건을 충족하면 break
        if early_stopping.early_stop:
            print("Early stopping triggered.")
            break
        print('-' * 70)
        
    # 학습 및 검증 손실/정확도 시각화
    plt.plot(train_losses, label='Train Loss')
    plt.plot(valid_losses, label='Valid Loss')
    plt.legend()
    plt.title('Loss')
    plt.show()

    plt.plot(train_accuracies, label='Train Accuracy')
    plt.plot(valid_accuracies, label='Valid Accuracy')
    plt.legend()
    plt.title('Accuracy')
    plt.show()


In [None]:
from tqdm import tqdm

# 클래스 인덱스와 이름 매핑
class_to_idx = {'buildings': 0, 'forest': 1, 'glacier': 2, 'mountain': 3, 'sea': 4, 'street': 5}
idx_to_class = {v: k for k, v in class_to_idx.items()}

# 혼동 행렬(Confusion Matrix) 그리는 함수
def plot_confusion_matrix(labels, predictions, class_names):
    cm = confusion_matrix(labels, predictions)
    plt.figure(figsize=(10, 7))
    sns.heatmap(cm, annot=True, fmt='d', xticklabels=class_names, yticklabels=class_names)
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.show()
    
# 테스트 데이터에서 성능을 평가하고, 예측이 잘못된 이미지를 시각화하는 함수
def evaluate_and_visualize(model, test_loader, criterion):
    test_loss = 0
    test_correct = 0
    test_total = 0
    incorrect_labels = []
    incorrect_preds = []
    incorrect_filenames = []
    all_labels = []  # 전체 실제 라벨 저장
    all_preds = []   # 전체 예측 라벨 저장
    
    model.eval()
    with torch.no_grad():
        for test_x, test_y, filenames in tqdm(test_loader):
            test_x, test_y = test_x.to(device), test_y.to(device).long()
            pred = model(test_x)
            loss = criterion(pred, test_y)
            test_loss += loss.item()

            # 예측 결과 처리
            y_pred = pred.argmax(dim=1).cpu()  # 다중 분류를 위한 argmax 사용

            # 정확도 계산
            test_correct += y_pred.eq(test_y.cpu()).int().sum().item()
            test_total += test_y.size(0)
            
            # 전체 라벨과 예측값 저장 (혼동 행렬을 위한 데이터)
            all_labels.extend(test_y.cpu().numpy())
            all_preds.extend(y_pred.cpu().numpy())

            
            # 잘못된 예측 저장 (각 요소별로 비교)
            for i in range(len(y_pred)):
                if y_pred[i] != test_y[i]:  # 예측과 실제가 다른 경우에만 저장
                    incorrect_labels.append(test_y.cpu()[i].item())
                    incorrect_preds.append(y_pred.cpu()[i].item())
                    incorrect_filenames.append(filenames[i])
                    

    test_accuracy = test_correct / test_total
    print(f"test_correct: {test_correct}, test_total: {test_total}")
    print(f'Test Loss: {test_loss / len(test_loader):.4f}, Test Accuracy: {test_accuracy * 100:.2f}%')
    
    # 혼동 행렬 시각화
    class_names = [idx_to_class[i] for i in range(len(class_to_idx))]
    plot_confusion_matrix(all_labels, all_preds, class_names)
    
    # 잘못 예측된 파일 이름만 추출
    just_filenames = [os.path.basename(path) for path in incorrect_filenames]

    # 예측이 잘못된 이미지 시각화
    fig = plt.figure(figsize=(12, 12))
    for i in range(min(16, len(incorrect_filenames))):
        ax = fig.add_subplot(4, 4, i + 1)
        # 잘못된 예측된 이미지 불러오기
        image = Image.open(incorrect_filenames[i])
        ax.imshow(image)

        # 실제 라벨과 예측 라벨 가져오기
        true_label = idx_to_class[incorrect_labels[i]]
        pred_label = idx_to_class[incorrect_preds[i]]
        ax.set_title(f'Pred: {pred_label}, Label: {true_label}, {just_filenames[i]}', fontsize=10)
        ax.axis('off')

    plt.tight_layout()
    plt.show()

In [None]:
fit(model, criterion, optimizer, epochs, train_loader, valid_loader)

In [None]:
# base 디렉토리 설정
base_dir = os.getcwd()  # 현재 작업 디렉토리

# base 디렉토리 아래에 'model' 폴더 경로 설정
modelPath = os.path.join(base_dir, 'model_intel')

# 폴더가 존재하지 않으면 생성
if not os.path.exists(modelPath):
    os.makedirs(modelPath)

# 모델 파일 이름 설정
# selected_model_name에 .pth 확장자를 추가
selected_model_name = f"{selected_model_name}.pth" if not selected_model_name.endswith(".pth") else selected_model_name

# 모델 저장
torch.save(model.state_dict(), os.path.join(modelPath, selected_model_name))

print(f"Model saved to {os.path.join(modelPath, selected_model_name)}")

In [None]:
evaluate_and_visualize(model, test_loader, criterion)