In [22]:
import random
import cv2
import os
import time
import pandas as pd
import numpy as np

from tqdm.auto import tqdm
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, classification_report
from sklearn.model_selection import train_test_split, StratifiedShuffleSplit

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.datasets as datasets
import torch.utils.data as data

from torchvision import transforms
from PIL import Image, ImageFile, ImageOps
from torch.utils import data as data_utils
from torchvision import datasets, models, transforms
from torch.utils.data import DataLoader, random_split, Dataset, SubsetRandomSampler

In [23]:
def Histogram_Equalization_pil(image):
    r, g, b = image.split()
    r_eq = ImageOps.equalize(r)
    g_eq = ImageOps.equalize(g)
    b_eq = ImageOps.equalize(b)
    histogram_image = Image.merge("RGB", (r_eq, g_eq, b_eq))
    return np.array(histogram_image)

def load_image_pil(path):
    image = Image.open(path).convert("RGB")
    return image

def load_and_preprocess_image_pil(path):
    image = load_image_pil(path)
    image = Histogram_Equalization_pil(image)
    return Image.fromarray(image)

class CustomDataset(Dataset):
    def __init__(self, data_path, transform=None):
        self.data_path = data_path
        self.transform = transform
        self.classes = os.listdir(data_path)
        self.images = self.load_images()

    def load_images(self):
        images = []
        for class_name in self.classes:
            class_path = os.path.join(self.data_path, class_name)
            for image_name in os.listdir(class_path):
                image_path = os.path.join(class_path, image_name)
                images.append((image_path, class_name))
        return images

    def __len__(self):
        return len(self.images)

    def __getitem__(self, index):
        image_path, class_name = self.images[index]
        image = load_and_preprocess_image_pil(image_path)

        if self.transform:
            image = self.transform(image)

        return image, self.classes.index(class_name)

train_transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.Grayscale(),
    transforms.RandomRotation(1),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    transforms.ToTensor(),
    transforms.Normalize((0.5), (0.5))  
])

test_transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.Grayscale(),
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])

train_data_path = r'C:\Users\dlwks\OneDrive\바탕 화면\VSCode\BTS_2023\remove_dataset\train'
train_dataset = CustomDataset(train_data_path, transform=train_transform)

test_data_path = r'C:\Users\dlwks\OneDrive\바탕 화면\VSCode\BTS_2023\remove_dataset\test'
test_dataset = CustomDataset(test_data_path, transform=test_transform) 

image, label = train_dataset[0]
image.shape

torch.Size([1, 256, 256])

In [24]:
train_size = int(0.8 * len(train_dataset))
val_size = len(train_dataset) - train_size

## StartifiedShuufleSplit
sss = StratifiedShuffleSplit(n_splits=5, test_size=val_size, random_state=42)
train_indices, val_indices = next(sss.split(train_dataset, [label for _, label in train_dataset.images]))
train_sampler = SubsetRandomSampler(train_indices)
val_sampler = SubsetRandomSampler(val_indices)

In [25]:
train_loader = DataLoader(train_dataset, batch_size = 32, sampler = train_sampler)
val_loader = DataLoader(train_dataset, batch_size = 32, sampler = val_sampler)
test_loader = DataLoader(test_dataset, batch_size = 32, shuffle = False)

In [26]:
def seed_everything(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.bencmark = True

seed_everything(42)

device = 'cuda' if torch.cuda.is_available() else 'cpu'
device

'cpu'

In [27]:
class Mish(nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, x):
        return x * torch.tanh(nn.functional.softplus(x))

class GIGAJINI(torch.nn.Module):

    def __init__(self):
        super(GIGAJINI, self).__init__()

        self.layer1 = torch.nn.Sequential(
            nn.Conv2d(1, 32, kernel_size = 3, stride = 1, padding = 1),
            nn.BatchNorm2d(32),
            Mish(),
            nn.MaxPool2d(kernel_size = 2, stride = 2)
        )

        self.layer2 = torch.nn.Sequential(
            nn.Conv2d(32, 64, kernel_size = 3, stride =1, padding = 1),
            nn.BatchNorm2d(64),
            Mish(),
            nn.MaxPool2d(kernel_size = 2, stride = 2)
        )

        self.layer3 = torch.nn.Sequential(
            nn.Conv2d(64, 128, kernel_size = 3, stride = 1, padding = 1),
            nn.BatchNorm2d(128),
            Mish(),
            nn.MaxPool2d(kernel_size = 2, stride = 2)
        )

        self.layer4 = torch.nn.Sequential(
            nn.Conv2d(128, 128, kernel_size = 3, stride = 1, padding = 1),
            nn.BatchNorm2d(128),
            Mish(),
            nn.MaxPool2d(kernel_size = 2, stride = 2)
        )

        self.layer5 = torch.nn.Sequential(
            nn.Conv2d(128, 256, kernel_size = 3, stride = 1, padding = 1),
            nn.BatchNorm2d(256),
            Mish(),
            nn.MaxPool2d(kernel_size = 2, stride= 2)
        )

        self.layer6 = torch.nn.Sequential(
            nn.Conv2d(256, 512, kernel_size = 3, stride = 1, padding = 1),
            nn.BatchNorm2d(512),
            Mish(),
            nn.MaxPool2d(kernel_size = 2, stride = 2)
        )

        self.layer7 = torch.nn.Sequential(
            nn.Conv2d(512, 512, kernel_size = 3, stride = 1, padding = 1),
            nn.BatchNorm2d(512),
            Mish(),
            nn.MaxPool2d(kernel_size = 2, stride = 2)
        )
        
        self.fc =torch.nn.Sequential(
            nn.Linear(512 * 2 * 2, 2, bias = True),            
            Mish()
        )        
    
    def forward(self, x):
        out = self.layer1(x)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = self.layer5(out)
        out = self.layer6(out)
        out = self.layer7(out)
        out = out.view(out.size(0), -1)
        out = self.fc(out)
        return out

In [28]:
model = GIGAJINI().to(device)
learning_rate = 1e-4
epochs = 50
batch_size = 16
class_weight = torch.tensor([0.1, 0.9])
criterion = torch.nn.CrossEntropyLoss(weight = class_weight).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr = learning_rate)

In [29]:
def train(model, train_loader, val_loader, epochs, learning_rate, patience):

    best_loss = float('inf')
    best_model = None
    epochs_without_importvement = 0

    for epoch in range(epochs):
        model.train()
        avg_loss = 0

        for X, Y in train_loader:
            X = X.to(device)
            Y = Y.to(device)

            optimizer.zero_grad()
            output = model(X)
            loss = criterion(output, Y)
            loss.backward()
            optimizer.step()

            avg_loss += loss.item()

        avg_loss /= len(train_loader)

        val_loss = evaluate(model, val_loader) 

        print(f'Epoch : {epoch + 1}, Train Loss : {avg_loss:.4f}, Validation Loss: {val_loss:.4f}')

        if val_loss < best_loss:
            best_loss = val_loss
            best_model = model.state_dict().copy()
            print('Model Saved')
            epochs_without_importvement = 0

        else:
            epochs_without_importvement += 1

        if epochs_without_importvement >= patience:
            print(f'Early stopping: No improvement in validation loss for {patience} epochs.')
            break

    return best_model

def evaluate(model, dataloader):
    model.eval()
    total_loss = 0

    with torch.no_grad():
        for X, Y in dataloader:
            X = X.to(device)
            Y = Y.to(device)
            output = model(X)
            loss = criterion(output, Y)
            total_loss += loss.item()

    avg_loss = total_loss / len(dataloader)
    return avg_loss

In [30]:
patience = 5
best_model = train(model, train_loader, val_loader, epochs, learning_rate, patience)
model.load_state_dict(best_model)

model.eval()
test_loss = 0
correct = 0
all_preds = []
all_labels = []

with torch.no_grad():
    for X, Y in test_loader:
        X = X.to(device)
        Y = Y.to(device)
        output = model(X)
        test_loss += criterion(output, Y).item()
        pred = output.argmax(dim=1, keepdim=True)
        correct += pred.eq(Y.view_as(pred)).sum().item()

        all_preds.extend(pred.cpu().numpy())
        all_labels.extend(Y.cpu().numpy())

test_loss /= len(test_loader.dataset)
accuracy = correct / len(test_loader.dataset)

print(f"Test Loss: {test_loss:.4f}, Accuracy: {accuracy:.2%}")

# F1 score 계산
f1_micro = f1_score(all_labels, all_preds, average='micro')
f1_macro = f1_score(all_labels, all_preds, average='macro')
f1_weighted = f1_score(all_labels, all_preds, average='weighted')
print(f"F1 Score (Micro): {f1_micro:.8f}")
print(f"F1 Score (Macro): {f1_macro:.8f}")
print(f"F1 Score (Weighted): {f1_weighted:.8f}")

# Precision 계산
precision_micro = precision_score(all_labels, all_preds, average='micro')
precision_macro = precision_score(all_labels, all_preds, average='macro')
precision_weighted = precision_score(all_labels, all_preds, average='weighted')
print(f"Precision (Micro): {precision_micro:.8f}")
print(f"Precision (Macro): {precision_macro:.8f}")
print(f"Precision (Weighted): {precision_weighted:.8f}")

# Recall 계산
recall_micro = recall_score(all_labels, all_preds, average='micro')
recall_macro = recall_score(all_labels, all_preds, average='macro')
recall_weighted = recall_score(all_labels, all_preds, average='weighted')
print(f"Recall (Micro): {recall_micro:.8f}")
print(f"Recall (Macro): {recall_macro:.8f}")
print(f"Recall (Weighted): {recall_weighted:.8f}")

# Confusion Matrix 계산
cm = confusion_matrix(all_labels, all_preds)
print('Confusion Matrix:')
print(cm)

# 분류 리포트 출력
class_names = [str(num) for num in torch.arange(2).tolist()]
classification_rep = classification_report(all_labels, all_preds, target_names=class_names)
print('Classification Report:')
print(classification_rep)

Epoch : 1, Train Loss : 0.8248, Validation Loss: 0.6893
Model Saved
Epoch : 2, Train Loss : 0.5569, Validation Loss: 0.6880
Model Saved
Epoch : 3, Train Loss : 0.7223, Validation Loss: 0.6897
Epoch : 4, Train Loss : 0.6172, Validation Loss: 0.6923
Epoch : 5, Train Loss : 0.4446, Validation Loss: 0.6962
Epoch : 6, Train Loss : 0.4842, Validation Loss: 0.6999
Epoch : 7, Train Loss : 0.5040, Validation Loss: 0.7049
Early stopping: No improvement in validation loss for 5 epochs.
Test Loss: 0.1407, Accuracy: 60.00%
F1 Score (Micro): 0.60000000
F1 Score (Macro): 0.37500000
F1 Score (Weighted): 0.45000000
Precision (Micro): 0.60000000
Precision (Macro): 0.30000000
Precision (Weighted): 0.36000000
Recall (Micro): 0.60000000
Recall (Macro): 0.50000000
Recall (Weighted): 0.60000000
Confusion Matrix:
[[3 0]
 [2 0]]
Classification Report:
              precision    recall  f1-score   support

           0       0.60      1.00      0.75         3
           1       0.00      0.00      0.00         

  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


In [16]:
## 1로 예측했는데 0인 것들
# class_1_predictions = []
# for i, (X, Y) in enumerate(test_dataset):
#     X = X.to(device)
#     output = model(X.unsqueeze(0))  # 배치 차원을 추가하여 예측합니다.
#     pred = output.argmax(dim=1, keepdim=True)
#     if pred.item() == 1:
#         class_1_predictions.append((X.cpu(), Y, pred.item()))

# num_to_display = min(len(class_1_predictions), 10)

# for i in range(num_to_display):
#     image, true_label, predicted_label = class_1_predictions[i]
#     image = (image.squeeze().numpy() + 1) / 2  # 이미지를 원래 범위로 변환합니다.
#     plt.imshow(image, cmap='gray')
#     plt.title(f"True Label: {true_label}, Predicted Label: {predicted_label}")
#     plt.show()

In [31]:
class_1_predictions = []
for i, (X, Y) in enumerate(test_dataset):
    X = X.to(device)
    output = model(X.unsqueeze(0))  # 배치 차원을 추가하여 예측합니다.
    pred = output.argmax(dim=1, keepdim=True)
    if pred.item() == 1 and Y == 1: 
        class_1_predictions.append((X.cpu(), Y, pred.item()))

num_to_display = min(len(class_1_predictions), 10)

for i in range(num_to_display):
    image, true_label, predicted_label = class_1_predictions[i]
    image = (image.squeeze().numpy() + 1) / 2  # 이미지를 원래 범위로 변환합니다.

In [32]:
PATH = r'AD_CNN_gray_256.pth'

torch.save(model, PATH)