In [2]:
# !pip freeze > requirements.txt
# !pip install -r requirements.txt

In [2]:
from datetime import datetime
import json
import math
import os
import random

import numpy as np
import pytz
import torch
import torch.nn as nn
import torch.optim
from torch.utils.data import Dataset, DataLoader, random_split, Sampler
from torch.utils.tensorboard import SummaryWriter
from torchsummary import summary
from torchvision import models, transforms
from tqdm import tqdm
from PIL import Image
from sklearn.metrics import (
    confusion_matrix,
    accuracy_score,
    precision_score,
    recall_score,
    f1_score,
    roc_auc_score
)
from sklearn.model_selection import train_test_split

import warnings
warnings.filterwarnings("ignore")

# Preprocessing

In [3]:
class PathLabelProcessor:
    def __init__(self,
                 base_path,
                 folder_name):
        self.base_path = base_path
        self.folder_name = folder_name
        
        self.label_images()
        
    def find_folders_by_name(self):
        matching_folders = []

        for root, dirs, files in os.walk(self.base_path):
            for dir_name in dirs:
                if self.folder_name in dir_name:
                    folder_path = os.path.join(root, dir_name)
                    matching_folders.append(folder_path)

        return matching_folders

    def find_image_json_pairs(self, folder_path):
        image_paths = []
        json_paths = []

        for root, dirs, files in os.walk(folder_path):
            for image_file in filter(lambda x: x.lower().endswith(('jpg', 'png')), files):
                image_path = os.path.join(root, image_file)
                json_file = f"{os.path.splitext(image_path)[0]}.json"
                if os.path.isfile(json_file):
                    image_paths.append(image_path)
                    json_paths.append(json_file)

        return image_paths, json_paths

    def label_images(self):
        self.labeled_image_paths = []

        for folder_path in self.find_folders_by_name():
            image_paths, json_paths = self.find_image_json_pairs(folder_path)
            
            for image_path, json_path in zip(image_paths, json_paths):
                with open(json_path) as f:
                    data = json.load(f)

                label = 1 if data['metaData']['lesions'] == 'A7' else 0
                self.labeled_image_paths.append((image_path, label))
            
        symptomatic_count = sum(1 for _, label in self.labeled_image_paths if label == 0)
        asymptomatic_count = sum(1 for _, label in self.labeled_image_paths if label == 1)
        
        weight_class_0 = 1.0 / symptomatic_count
        weight_class_1 = 1.0 / asymptomatic_count
        self.class_weights = torch.tensor([weight_class_0, weight_class_1])

        print(f'Total cases: {len(self.labeled_image_paths)}')
        print(f'Number of symptomatic cases: {symptomatic_count}, Number of asymptomatic cases: {asymptomatic_count}')

In [4]:
%%time
base_path = 'skin/Train'
folder_name = '일반'

processor = PathLabelProcessor(base_path=base_path, folder_name=folder_name)

data = processor.labeled_image_paths
class_weights = processor.class_weights

KeyboardInterrupt: 

In [None]:
class CustomDataset(Dataset):
    def __init__(self, data, transform=None):
        self.data = data
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        image_path, label = self.data[idx]
        image = Image.open(image_path)
        image = self.transform(image)

        return image, label

class ImageDataset():
    def __init__(self,
                 data,
                 transform,
                 test_size,
                 seed,
                 batch_size,
                 shuffle,
                 num_workers):
        dataset = self.make_dataset(data, transform, test_size, seed)
        self.dataloader = self.make_dataloader(dataset, batch_size, shuffle, num_workers)
        
        
    def make_dataset(self, data, transform, test_size=None, seed=42):
        if test_size:
            train_data, val_data = train_test_split(data, 
                                                    test_size=test_size,
                                                    random_state=seed)
            dataset_dict = {'train': train_data,
                            'val': val_data}
        else:
            dataset_dict = {'test' : data}

        dataset = {k: CustomDataset(v, transform[k])
                   for k, v in dataset_dict.items()}
        
        return dataset
        
    def make_dataloader(self, dataset, batch_size, shuffle, num_workers):
        dataloader = {k: DataLoader(dataset=dataset[k],
                                    batch_size=batch_size,
                                    shuffle=shuffle,
                                    num_workers=num_workers,
                                    pin_memory=True)
                      for k in dataset.keys()}
        
        for k, v in dataloader.items():
            self.print_class_distribution(k, v)
        
        return dataloader
    
    def compute_class_counts(self, data_loader):
        counts = torch.zeros(2, dtype=torch.long)
        
        for _, labels in data_loader:
            counts += torch.bincount(labels, minlength=2)
        
        return counts

    def print_class_distribution(self, phase, data_loader):
        print(f"Class Distribution for {phase}:")
        class_counts = self.compute_class_counts(data_loader)
        for class_label, count in enumerate(class_counts):
            print(f"  Class {class_label}: {count} samples")

In [None]:
%%time
transform = {'train': transforms.Compose([transforms.Resize((176, 176)),
                                          transforms.RandomHorizontalFlip(),
                                          transforms.RandomVerticalFlip(),
                                          transforms.RandomRotation(degrees=10),
                                          transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
                                          transforms.ToTensor(),
                                          transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])]),
             'val': transforms.Compose([transforms.Resize((232, 232)),
                                        transforms.ToTensor(),
                                        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])])}
test_size = 0.2
seed = 42
batch_size = 64
shuffle = True
num_workers = os.cpu_count()

dataloader = ImageDataset(data=data,
                          transform=transform,
                          test_size=test_size,
                          seed=seed,
                          batch_size=batch_size,
                          shuffle=shuffle,
                          num_workers=num_workers)

Class Distribution for train:


# Modeling

In [None]:
class CosineAnnealingWarmUpRestarts(torch.optim.lr_scheduler._LRScheduler):
    def __init__(self, optimizer, T_0, T_mult=1, eta_max=0.1, T_up=0, gamma=1., last_epoch=-1):
        if T_0 <= 0 or not isinstance(T_0, int):
            raise ValueError("Expected positive integer T_0, but got {}".format(T_0))
        if T_mult < 1 or not isinstance(T_mult, int):
            raise ValueError("Expected integer T_mult >= 1, but got {}".format(T_mult))
        if T_up < 0 or not isinstance(T_up, int):
            raise ValueError("Expected positive integer T_up, but got {}".format(T_up))
        self.T_0 = T_0
        self.T_mult = T_mult
        self.base_eta_max = eta_max
        self.eta_max = eta_max
        self.T_up = T_up
        self.T_i = T_0
        self.gamma = gamma
        self.cycle = 0
        self.T_cur = last_epoch
        super(CosineAnnealingWarmUpRestarts, self).__init__(optimizer, last_epoch)
    
    def get_lr(self):
        if self.T_cur == -1:
            return self.base_lrs
        elif self.T_cur < self.T_up:
            return [(self.eta_max - base_lr)*self.T_cur / self.T_up + base_lr for base_lr in self.base_lrs]
        else:
            return [base_lr + (self.eta_max - base_lr) * (1 + math.cos(math.pi * (self.T_cur-self.T_up) / (self.T_i - self.T_up))) / 2
                    for base_lr in self.base_lrs]

    def step(self, epoch=None):
        if epoch is None:
            epoch = self.last_epoch + 1
            self.T_cur = self.T_cur + 1
            if self.T_cur >= self.T_i:
                self.cycle += 1
                self.T_cur = self.T_cur - self.T_i
                self.T_i = (self.T_i - self.T_up) * self.T_mult + self.T_up
        else:
            if epoch >= self.T_0:
                if self.T_mult == 1:
                    self.T_cur = epoch % self.T_0
                    self.cycle = epoch // self.T_0
                else:
                    n = int(math.log((epoch / self.T_0 * (self.T_mult - 1) + 1), self.T_mult))
                    self.cycle = n
                    self.T_cur = epoch - self.T_0 * (self.T_mult ** n - 1) / (self.T_mult - 1)
                    self.T_i = self.T_0 * self.T_mult ** (n)
            else:
                self.T_i = self.T_0
                self.T_cur = epoch
                
        self.eta_max = self.base_eta_max * (self.gamma**self.cycle)
        self.last_epoch = math.floor(epoch)
        for param_group, lr in zip(self.optimizer.param_groups, self.get_lr()):
            param_group['lr'] = lr

In [None]:
class FocalLossWithLabelSmoothing(nn.Module):
    def __init__(self, gamma=2, alpha=None, smoothing=0.1, num_classes=2, reduction='mean'):
        super(FocalLossWithLabelSmoothing, self).__init__()
        self.gamma = gamma
        self.alpha = alpha
        self.smoothing = smoothing
        self.num_classes = num_classes
        self.reduction = reduction

    def forward(self, inputs, targets):
        targets = targets.to(inputs.device)

        target_one_hot = torch.zeros_like(inputs)
        target_one_hot.scatter_(1, targets.unsqueeze(1), 1 - self.smoothing)
        target_one_hot.scatter_(1, (targets.unsqueeze(1) + 1) % self.num_classes, self.smoothing)

        ce_loss = nn.functional.cross_entropy(inputs, target_one_hot, reduction='none')
        pt = torch.exp(-ce_loss)
        focal_loss = (1 - pt) ** self.gamma * ce_loss

        if self.alpha is not None:
            self.alpha = self.alpha.to(inputs.device)
            focal_loss = self.alpha[targets] * focal_loss

        if self.reduction == 'mean':
            return focal_loss.mean()
        elif self.reduction == 'sum':
            return focal_loss.sum()
        elif self.reduction == 'none':
            return focal_loss
        else:
            raise ValueError("Invalid reduction option")

In [None]:
class ModelTrainer:
    def __init__(self,
                 model,
                 device,
                 dataloader,
                 criterion,
                 optimizer,
                 scheduler):
        self.device = device
        self.model = model.to(self.device)
        model_name = model.__class__.__name__
        self.dataloader = dataloader
        self.criterion = criterion.to(device)
        self.optimizer = optimizer
        self.scheduler = scheduler
        self.best_f1_score = 0.0
        korea = pytz.timezone('Asia/Seoul')
        now = datetime.now(korea)
        start_time = now.strftime('%Y%m%d-%H%M%S')
        self.name = f'{start_time}_{model_name}_symptoms.pth'
        self.writer = SummaryWriter(log_dir=f'runs/{self.name}')

    def calculate_f1_score(self, predicted, labels):
        return f1_score(labels, predicted, average='binary')

    def calculate_auc_roc(self, predicted, labels):
        return roc_auc_score(labels, predicted)

    def run_epoch(self, epoch, num_epochs):
        for phase in ['train', 'val']:
            self.model.train() if phase == 'train' else self.model.eval()
            dataloader = self.dataloader[phase]

            total_loss = 0.0
            correct = 0
            total = 0
            all_predicted = []
            all_labels = []

            for inputs, labels in tqdm(dataloader, desc=f'{phase.capitalize()} Epoch {epoch + 1}/{num_epochs}', unit='batch'):
                inputs, labels = inputs.to(self.device), labels.to(self.device)

                with torch.set_grad_enabled(phase == 'train'):
                    outputs = self.model(inputs)
                    loss = self.criterion(outputs, labels)

                    if phase == 'train':
                        loss.backward()
                        self.optimizer.step()
                        self.optimizer.zero_grad()

                    total_loss += loss.item()

                    _, predicted = torch.max(outputs, 1)
                    total += labels.size(0)
                    correct += (predicted == labels).sum().item()

                    all_predicted.extend(predicted.cpu().numpy())
                    all_labels.extend(labels.cpu().numpy())

            avg_loss = total_loss / len(dataloader)
            accuracy = correct / total
            self.writer.add_scalar(f'Loss/{phase}', avg_loss, epoch)
            self.writer.add_scalar(f'Accuracy/{phase}', accuracy, epoch)

            if phase == 'val':
                current_f1_score = self.calculate_f1_score(np.array(all_predicted), np.array(all_labels))
                current_auc_roc = self.calculate_auc_roc(np.array(all_predicted), np.array(all_labels))

                self.writer.add_scalar('F1 Score/valid', current_f1_score, epoch)
                self.writer.add_scalar('AUC-ROC/valid', current_auc_roc, epoch)

                if current_f1_score > self.best_f1_score:
                    self.best_f1_score = current_f1_score
                    torch.save(self.model, self.name)

        lr_value = self.scheduler.get_lr()[0]
        self.writer.add_scalar('LearningRate', lr_value, epoch)

    def train(self, num_epochs):
        for epoch in range(num_epochs):
            self.run_epoch(epoch, num_epochs)

        self.writer.close()

In [None]:
model = models.convnext_large(weights='DEFAULT')
for name, param in model.named_parameters():
    if "last_layer" not in name:
        param.requires_grad = False
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
num_ftrs = model.classifier[2].in_features
model.classifier[2] = nn.Linear(num_ftrs, 2)
criterion = FocalLossWithLabelSmoothing(gamma=2, alpha=class_weights, reduction='sum')
optimizer = torch.optim.AdamW(model.parameters(), lr=0, weight_decay=1e-5)
scheduler = CosineAnnealingWarmUpRestarts(optimizer, T_0=10, T_mult=1, eta_max=0.1,  T_up=10, gamma=0.5)

trainer = ModelTrainer(model=model,
                       device=device,
                       dataloader=dataloader.dataloader,
                       criterion=criterion,
                       optimizer=optimizer,
                       scheduler=scheduler)

In [None]:
trainer.train(50)

Train Epoch 1/30: 100%|██████████| 823/823 [01:57<00:00,  7.00batch/s]
Val Epoch 1/30: 100%|██████████| 206/206 [00:27<00:00,  7.55batch/s]
Train Epoch 2/30: 100%|██████████| 823/823 [01:57<00:00,  7.03batch/s]
Val Epoch 2/30: 100%|██████████| 206/206 [00:27<00:00,  7.61batch/s]
Train Epoch 3/30: 100%|██████████| 823/823 [01:56<00:00,  7.07batch/s]
Val Epoch 3/30: 100%|██████████| 206/206 [00:27<00:00,  7.56batch/s]
Train Epoch 4/30: 100%|██████████| 823/823 [02:07<00:00,  6.44batch/s]
Val Epoch 4/30: 100%|██████████| 206/206 [00:32<00:00,  6.33batch/s]
Train Epoch 5/30: 100%|██████████| 823/823 [03:13<00:00,  4.26batch/s]
Val Epoch 5/30: 100%|██████████| 206/206 [00:55<00:00,  3.69batch/s]
Train Epoch 6/30: 100%|██████████| 823/823 [03:57<00:00,  3.47batch/s]
Val Epoch 6/30: 100%|██████████| 206/206 [00:56<00:00,  3.68batch/s]
Train Epoch 7/30: 100%|██████████| 823/823 [03:57<00:00,  3.47batch/s]
Val Epoch 7/30: 100%|██████████| 206/206 [00:56<00:00,  3.66batch/s]
Train Epoch 8/30: 10

# Evaluation

In [10]:
class ModelTester:
    def __init__(self, path, device, dataloader):
        self.device = device
        self.dataloader = dataloader
        self.load_model(path)
        self.evaluate()

    def load_model(self, path):
        self.model = torch.load(path)
        self.model.to(self.device)

    def classify(self):
        self.model.eval()
        predictions = []
        labels = []
        probabilities = []

        with torch.no_grad():
            for inputs, targets in tqdm(self.dataloader):
                inputs, targets = inputs.to(self.device), targets.to(self.device)
                outputs = self.model(inputs)
                
                _, predicted = torch.max(outputs, 1)

                predictions.extend(predicted.cpu().numpy())
                labels.extend(targets.cpu().numpy())
                probabilities.extend(torch.nn.functional.softmax(outputs, dim=1).cpu().numpy())

        return predictions, labels, probabilities

    def calculate_prob_stats(self, probabilities):
        probabilities = np.array(probabilities)
        min_probs = np.min(probabilities)
        max_probs = np.max(probabilities)
        std_probs = np.std(probabilities)
        mean_probs = np.mean(probabilities)

        return min_probs, max_probs, std_probs, mean_probs

    def calculate_percentage(self, value):
        return f'{value*100:.2f}%'

    def evaluate(self):
        predictions, labels, probabilities = self.classify()
        cm = confusion_matrix(labels, predictions)
        accuracy = accuracy_score(labels, predictions)
        f1 = f1_score(labels, predictions, average='weighted')

        min_probs, max_probs, std_probs, mean_probs = self.calculate_prob_stats(probabilities)

        print('Evaluation Results:')
        print(f'Confusion Matrix:\n{cm}')
        print(f'Accuracy: {self.calculate_percentage(accuracy)}')
        print(f'F1 Score: {self.calculate_percentage(f1)}')
        print(f'Mean Probability: {self.calculate_percentage(mean_probs)}')
        print(f'Max Probability: {self.calculate_percentage(max_probs)}')
        print(f'Min Probability: {self.calculate_percentage(min_probs)}')
        print(f'Standard Deviation of Probabilities: {std_probs:.4f}')

In [11]:
%%time
base_path = 'skin/Valid'
folder_name = '일반'

processor = PathLabelProcessor(base_path=base_path, folder_name=folder_name)

data = processor.labeled_image_paths

Total cases: 13808
Number of symptomatic cases: 138, Number of asymptomatic cases: 13670
CPU times: user 844 ms, sys: 316 ms, total: 1.16 s
Wall time: 1.17 s


In [12]:
%%time
transform = {'test': transforms.Compose([transforms.Resize((232, 232)),
                                        transforms.ToTensor(),
                                        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])])}
test_size = None
seed = 42
batch_size = 32
shuffle = False
num_workers = os.cpu_count()

dataloader = ImageDataset(data=data,
                          transform=transform,
                          test_size=test_size,
                          seed=seed,
                          batch_size=batch_size,
                          shuffle=shuffle,
                          num_workers=num_workers)

path = '20231215-141835_EfficientNet_개_안검염.pth'
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

ModelTester(path=path, device=device, dataloader=dataloader.dataloader['test'])

Class Distribution for test:


  Class 0: 138 samples
  Class 1: 13670 samples


100%|██████████| 432/432 [00:41<00:00, 10.35it/s]

Evaluation Results:
Confusion Matrix:
[[  128    10]
 [ 1931 11739]]
Accuracy: 85.94%
F1 Score: 91.56%
Mean Probability: 50.00%
Max Probability: 99.98%
Min Probability: 0.02%
Standard Deviation of Probabilities: 0.3726
CPU times: user 46.1 s, sys: 7.23 s, total: 53.4 s
Wall time: 60 s





<__main__.ModelTester at 0x7fbb0e4a1d20>

In [13]:
%%time
dataloader = ImageDataset(data=[item for item in data if item[1] == 0],
                          transform=transform,
                          test_size=test_size,
                          seed=seed,
                          batch_size=batch_size,
                          shuffle=shuffle,
                          num_workers=num_workers)

ModelTester(path=path, device=device, dataloader=dataloader.dataloader['test'])

Class Distribution for test:
  Class 0: 138 samples
  Class 1: 0 samples


100%|██████████| 5/5 [00:01<00:00,  3.04it/s]

Evaluation Results:
Confusion Matrix:
[[128  10]
 [  0   0]]
Accuracy: 92.75%
F1 Score: 96.24%
Mean Probability: 50.00%
Max Probability: 91.68%
Min Probability: 8.32%
Standard Deviation of Probabilities: 0.2129
CPU times: user 1.37 s, sys: 1.41 s, total: 2.78 s
Wall time: 3.45 s





<__main__.ModelTester at 0x7fb9b04b03d0>

In [14]:
%%time
dataloader = ImageDataset(data=[item for item in data if item[1] == 1],
                          transform=transform,
                          test_size=test_size,
                          seed=seed,
                          batch_size=batch_size,
                          shuffle=shuffle,
                          num_workers=num_workers)

ModelTester(path=path, device=device, dataloader=dataloader.dataloader['test'])

Class Distribution for test:


  Class 0: 0 samples
  Class 1: 13670 samples


100%|██████████| 428/428 [00:43<00:00,  9.85it/s]


Evaluation Results:
Confusion Matrix:
[[    0     0]
 [ 1931 11739]]
Accuracy: 85.87%
F1 Score: 92.40%
Mean Probability: 50.00%
Max Probability: 99.98%
Min Probability: 0.02%
Standard Deviation of Probabilities: 0.3739
CPU times: user 47.4 s, sys: 7.16 s, total: 54.6 s
Wall time: 54.8 s


<__main__.ModelTester at 0x7fbab7225d80>

In [None]:
class PreModelTester:
    def __init__(self, path, device, dataloader):
        self.device = device
        self.dataloader = dataloader
        self.model = models.vgg16_bn(pretrained=True)
        self.load_model(path)
        self.evaluate()

    def load_model(self, path):
        self.model = models.vgg16_bn(pretrained=True)
        nr_filters = self.model.classifier[0].in_features
        self.model.classifier = nn.Linear(nr_filters, 1)
        state_dict = torch.load(path, map_location=torch.device("cpu"))
        model_dict = self.model.state_dict()
        state_dict = {k: v for k, v in state_dict.items() if k in model_dict}
        model_dict.update(state_dict)
        self.model.load_state_dict(model_dict)
        self.model = self.model.to(self.device)

    def classify(self):
        self.model.eval()
        predictions = []
        labels = []
        probabilities = []

        with torch.no_grad():
            for inputs, targets in tqdm(self.dataloader):
                inputs = inputs.to(self.device)
                targets = targets.to(self.device)
                outputs = self.model(inputs)
                
                probs = torch.nn.functional.softmax(outputs, dim=1)
                _, predicted = torch.max(outputs, 1)

                predictions.extend(predicted.cpu().numpy())
                labels.extend(targets.cpu().numpy())
                probabilities.extend(probs.max(dim=1).values.cpu().numpy())

        return predictions, labels, probabilities

    def calculate_prob_stats(self, probabilities):
        probabilities = np.array(probabilities)
        min_probs = np.min(probabilities)
        max_probs = np.max(probabilities)
        std_probs = np.std(probabilities)
        mean_probs = np.mean(probabilities)

        return min_probs, max_probs, std_probs, mean_probs

    def evaluate(self):
        predictions, labels, probabilities = self.classify()
        cm = confusion_matrix(labels, predictions)
        accuracy = accuracy_score(labels, predictions)
        f1 = f1_score(labels, predictions, average='weighted')

        min_probs, max_probs, std_probs, mean_probs = self.calculate_prob_stats(probabilities)

        print("Evaluation Results:")
        print(f"Confusion Matrix:\n{cm}")
        print(f"Accuracy: {accuracy:.4f}")
        print(f"F1 Score: {f1:.4f}")
        print(f"Mean Probability: {mean_probs:.4f}")
        print(f"Max Probability: {max_probs:.4f}")
        print(f"Min Probability: {min_probs:.4f}")        
        print(f"Standard Deviation of Probabilities: {std_probs:.4f}")

In [None]:
%%time
transform = {'test': transforms.Compose([transforms.Resize((240, 240)),
                                        transforms.ToTensor(),
                                        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])])}
test_size = None
seed = 42
batch_size = 32
shuffle = false
num_workers = os.cpu_count()

dataloader = ImageDataset(data=data,
                          transform=transform,
                          test_size=test_size,
                          seed=seed,
                          batch_size=batch_size,
                          shuffle=shuffle,
                          num_workers=num_workers)

path = '숙대모델'
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

ModelTester(path=path, device=device, dataloader=dataloader.dataloader['test'])

In [None]:
%%time
dataloader = ImageDataset(data=[item for item in data if item[1] == 0],
                          transform=transform,
                          test_size=test_size,
                          seed=seed,
                          batch_size=batch_size,
                          shuffle=shuffle,
                          num_workers=num_workers)

ModelTester(path=path, device=device, dataloader=dataloader.dataloader['test'])

In [None]:
%%time
dataloader = ImageDataset(data=[item for item in data if item[1] == 1],
                          transform=transform,
                          test_size=test_size,
                          seed=seed,
                          batch_size=batch_size,
                          shuffle=shuffle,
                          num_workers=num_workers)

ModelTester(path=path, device=device, dataloader=dataloader.dataloader['test'])