In [1]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm
from PIL import Image
import torch
import torchvision
import torchvision.transforms as transforms
from torchvision import datasets
import torch.nn.functional as F

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [2]:
class CustomImageDataset(torchvision.datasets.VisionDataset):
    def __init__(self, root, transform=None, target_transform=None):
        super().__init__(root, transform=transform, target_transform=target_transform)
        # Assuming that the images are organized in subfolders representing classes.
        self.classes, self.class_to_idx = self._find_classes(root)
        self.samples = self.make_dataset(root, self.class_to_idx)
        self.targets = [s[1] for s in self.samples]

    def _find_classes(self, dir):
        """
        Finds the class folders in a dataset.
        """
        if not os.path.isdir(dir):
            raise FileNotFoundError(f"Couldn't find directory: {dir}")
        classes = sorted(entry.name for entry in os.scandir(dir) if entry.is_dir())
        class_to_idx = {cls_name: i for i, cls_name in enumerate(classes)}
        classes = classes[0:]
        return classes, class_to_idx

    def make_dataset(self, directory, class_to_idx):
        instances = []
        for target_class in sorted(class_to_idx.keys()):
            class_index = class_to_idx[target_class]
            target_dir = os.path.join(directory, target_class)
            if not os.path.isdir(target_dir):
                continue
            for root, _, fnames in sorted(os.walk(target_dir)):
                for fname in fnames:
                    if fname.lower().endswith(('jpg', 'jpeg', 'png', 'bmp', 'tiff', 'tif')):
                        path = os.path.join(root, fname)
                        instances.append((path, class_index))
        return instances

    def __getitem__(self, index):
        """
        Args:
            index (int): Index

        Returns:
            tuple: (sample, target) where target is class_index of the target class.
        """
        path, target = self.samples[index]
        
        sample = self.loader(path)
            

        if self.transform is not None:
            sample = self.transform(sample)
            mask = np.ones_like(sample)
        if self.target_transform is not None:
            target = self.target_transform(target)
        if self.target_transform is None:
            target = np.eye(len(self.classes))[int(target)]
        return sample, mask, target


    def __len__(self):
        return len(self.samples)


    def loader(self, path):
        try:
            # Open the image file, avoiding automatic resource management to control the closing process
            with open(path, 'rb') as f:
                image = Image.open(path)
                if image.mode in ['I', 'F', 'I;16', 'I;16L', 'I;16B', 'I;16N']:
                    image_array = np.array(image, dtype=np.int32)
                    max_val = image_array.max()
                    if max_val > 0:image = Image.fromarray((image_array / max_val * 255).astype(np.uint8))
                if image.mode != 'RGB': image = image.convert('RGB')

            return image
        except Exception:
            print(f"Failed to load image {path}. Skipping.")
            return None


class ClassificationVisualizeDataset(torch.utils.data.Dataset):

    def __init__(self, data_dir, transforms):
        self.data_dir   = data_dir
        self.transforms = transforms

        # This one-liner basically generates a sorted list of full paths to each image in the test directory
        self.img_paths  = list(map(lambda fname: os.path.join(self.data_dir, fname), sorted(os.listdir(self.data_dir))))

    def __len__(self):
        return len(self.img_paths)

    def __getitem__(self, idx):
        image = Image.open(self.img_paths[idx])
        if image.mode in ['I', 'F', 'I;16', 'I;16L', 'I;16B', 'I;16N']:
            image_array = np.array(image, dtype=np.int32)
            max_val = image_array.max()
            if max_val > 0:image = Image.fromarray((image_array / max_val * 255).astype(np.uint8))
        if image.mode != 'RGB': image = image.convert('RGB')
        mask = np.ones_like(self.transforms(image))
        return self.transforms(image), mask
        # return self.transforms(image)

mean=[0.4195, 0.3118, 0.1418]
std=[0.2289, 0.2239, 0.2249]
train_transforms = torchvision.transforms.Compose([
    torchvision.transforms.RandomHorizontalFlip(),
    torchvision.transforms.RandomAffine(degrees=[-15, 15], scale=(1/1.2, 1.2)),  # Random stretching
    # torchvision.transforms.GaussianBlur(kernel_size=(7,7),sigma=(0.1,0.2)),
    torchvision.transforms.Resize((224, 224)), 
    torchvision.transforms.ToTensor(), 
    torchvision.transforms.Normalize(mean,std)
])
valid_transforms = torchvision.transforms.Compose([
    torchvision.transforms.Resize((224, 224)),
    torchvision.transforms.ToTensor(),
    torchvision.transforms.Normalize(mean,std)
])

TRAIN_DIR = '/ocean/projects/cis230079p/shared/CapStone-VeyTel-2024/datasets/data-clean/train'
VAL_DIR = "/ocean/projects/cis230079p/shared/CapStone-VeyTel-2024/datasets/data-clean/dev"
TEST_DIR = "/ocean/projects/cis230079p/shared/CapStone-VeyTel-2024/datasets/data-clean/test"

train_dataset   = CustomImageDataset(TRAIN_DIR, transform= train_transforms)
valid_dataset   = CustomImageDataset(VAL_DIR, transform= valid_transforms)
test_dataset    = ClassificationVisualizeDataset(TEST_DIR, transforms =valid_transforms)

train_loader = torch.utils.data.DataLoader(
    dataset     = train_dataset,
    batch_size  = 32,
    shuffle     = True,
    num_workers = 10,
    pin_memory = True
)

valid_loader = torch.utils.data.DataLoader(
    dataset     = valid_dataset,
    batch_size  = 32,
    shuffle     = False,
    num_workers = 10,
    pin_memory = True
)

test_loader = torch.utils.data.DataLoader(
    dataset     = test_dataset,
    batch_size  = 32,
    shuffle     = False,
    num_workers = 5,
    pin_memory  = True
)



In [3]:
print("Number of classes    : ", len(train_dataset.classes))
print("No. of train images  : ", train_dataset.__len__())
print("Shape of image       : ", train_dataset[0][0].shape)
print("Batch size           : ", train_loader.batch_size)
print("Train batches        : ", train_loader.__len__())

Number of classes    :  3
No. of train images  :  19588
Shape of image       :  torch.Size([3, 224, 224])
Batch size           :  32
Train batches        :  613


In [4]:
print("Number of classes    : ", len(valid_dataset.classes))
print("No. of valid images  : ", valid_dataset.__len__() )
print("Shape of image       : ", valid_dataset[0][0].shape)
print("Batch size           : ", valid_loader.batch_size)
print("Valid batches        : ", valid_loader.__len__())

Number of classes    :  3
No. of valid images  :  1224
Shape of image       :  torch.Size([3, 224, 224])
Batch size           :  32
Valid batches        :  39


In [5]:
print("No. of test images  : ", test_dataset.__len__() )
print("Shape of image      : ", test_dataset[0][0].shape)
print("Batch size          : ", test_loader.batch_size)
print("Test batches        : ", test_loader.__len__())

No. of test images  :  3674
Shape of image      :  torch.Size([3, 224, 224])
Batch size          :  32
Test batches        :  115


In [6]:
import gc
torch.cuda.empty_cache()
gc.collect()

0

In [7]:
!git clone https://github.com/PedroRASB/ISNet.git
import sys
sys.path.append('./ISNet')
from ISNetLightning import IsDense121Lightning
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

pretrained_model = torch.load('COVIDIsNet.pt', map_location=DEVICE)
old_stdout = sys.stdout
sys.stdout = open(os.devnull, 'w')
model = IsDense121Lightning(multiLabel=False, multiMask=False, classes=3, pretrained=True, LR=5e-6)
pretrained_model.eval()
model.load_state_dict(pretrained_model.state_dict(), strict=False)

model.to(DEVICE)
sys.stdout.close()
sys.stdout = old_stdout

fatal: destination path 'ISNet' already exists and is not an empty directory.


In [9]:
import pytorch_lightning as pl
from pytorch_lightning import Trainer
from pytorch_lightning.loggers import TensorBoardLogger
from pytorch_lightning.callbacks import ModelCheckpoint, EarlyStopping

logger = TensorBoardLogger("isnet_logs/", name="log")

trainer = Trainer(
    logger=logger,
    max_epochs=20,
    accelerator='gpu',
    devices=1,
    precision=16 if torch.cuda.is_available() else 32,
    callbacks=[
        ModelCheckpoint(dirpath='isnet_checkpoints', monitor='val_loss', save_top_k=3, mode='min'),
        EarlyStopping(monitor='val_loss', patience=3, mode='min')
    ]
)

trainer.fit(model, train_loader, valid_loader)

Using 16bit None Automatic Mixed Precision (AMP)
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name      | Type               | Params
-------------------------------------------------
0 | DenseNet  | DenseNet           | 7.0 M 
1 | LRPBlock  | _LRPDenseNet       | 6.9 M 
2 | criterion | CrossEntropyLoss   | 0     
3 | acc       | MulticlassAccuracy | 0     
-------------------------------------------------
7.0 M     Trainable params
0         Non-trainable params
7.0 M     Total params
13.914    Total estimated model params size (MB)


Sanity Checking: 0it [00:00, ?it/s]

Training: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

  image_array = np.array(image, dtype=np.int32)
  image_array = np.array(image, dtype=np.int32)


Failed to load image /ocean/projects/cis230079p/shared/CapStone-VeyTel-2024/datasets/data-clean/dev/2/18326_bimcv_neg.png. Skipping.Failed to load image /ocean/projects/cis230079p/shared/CapStone-VeyTel-2024/datasets/data-clean/dev/1/23843_bimcv_pos.png. Skipping.

