# CNN segmentation

In [1]:
import torch 
from torchsummary import summary
import glob  # Import library for finding all files matching a pattern
from PIL import Image  # Import library for image processing
import numpy as np  # Import library for numerical operations (not used here)
import os  # Import library for operating system functionalities
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils
import pandas as pd
from tqdm import tqdm
import scipy
import torch
from datetime import datetime, timedelta  # Import libraries for date and time manipulation
import torchvision
import torch.nn as nn
import torch.optim as optim  # Optimization algorithms for training the model
import torch.nn.functional as F  # Common loss functions and activation functions
from scipy.stats import spearmanr, pearsonr  # Statistical functions for correlation calculation
import itertools  # Utility functions for generating combinations
from torch.optim.lr_scheduler import CosineAnnealingLR  # Learning rate scheduler for training
import matplotlib.pyplot as plt  # Plotting library for visualization
import albumentations as A
from albumentations.pytorch import ToTensorV2


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Print the chosen device for training
print(device)


cpu


In [2]:
class ImageOpenerDataset(Dataset):
    def __init__(self, file_list, image_dir, gt_dir, transform=None):
        
        self.transform = transform
        self.image_paths = np.array([])
        self.mask_paths = np.array([])
        
        with open(file_list, "r") as f:
            self.image_paths = f.readlines()
        
        self.mask_paths = self.image_paths.copy()
        
        for i in range(len(self.image_paths)):
            self.image_paths[i] = os.path.join(image_dir, self.image_paths[i].strip()) + ".jpg"
            self.mask_paths[i] = os.path.join(gt_dir, self.mask_paths[i].strip()) + ".png"
        
        
        self.image_paths.sort()
        self.mask_paths.sort()
    
    def __len__(self):
        return len(self.image_paths)
    
    def __getitem__(self, idx):
        image = np.array(Image.open(self.image_paths[idx]))
        mask = np.array(Image.open(self.mask_paths[idx]))
        
        if self.transform:
            augmented = self.transform(image=image, mask=mask)
            image = augmented['image']
            mask = augmented['mask']
        
        return  image, mask

In [3]:
import cv2

path = '../datasets/VOC_exercise/'
target_size = (256, 256)

aug = A.Compose([
    A.Resize(height=256, width=256, interpolation=cv2.INTER_NEAREST),
    A.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    ),
    ToTensorV2()
])

trainaug = A.Compose([
    A.VerticalFlip(p=0.5),
    A.RandomRotate90(p=0.5),
    A.Resize(height=256, width=256, interpolation=cv2.INTER_NEAREST),
    A.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    ),
    ToTensorV2()
])

trainset = ImageOpenerDataset(file_list=os.path.join(path, 'train.txt'),
                             image_dir=os.path.join(path, 'images'),
                             gt_dir=os.path.join(path, 'targets'),
                             transform=trainaug)
valset = ImageOpenerDataset(file_list=os.path.join(path, 'validation.txt'),
                                image_dir=os.path.join(path, 'images'),
                                gt_dir=os.path.join(path, 'targets'),
                                transform=aug)

testset = ImageOpenerDataset(file_list=os.path.join(path, 'test.txt'),
                                image_dir=os.path.join(path, 'images'),
                                gt_dir=os.path.join(path, 'targets'),
                                transform=aug)

In [4]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class SegmentationNet(nn.Module):
    def __init__(self):
        super(SegmentationNet, self).__init__()
        
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 64, k,ernel_size=3, padding=1)
            nn.GELU(),
            nn.BatchNorm2d(64),
            Conv2d(64, 64, kernel_size=3, padding=1),
            nn.GELU(),
            nn.BatchNorm2d(64),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        self.conv2 = nn.Sequential(
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.GELU(),
            nn.BatchNorm2d(128),
            Conv2d(128, 128, kernel_size=3, padding=1),
            nn.GELU(),
            nn.BatchNorm2d(128),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        # Decoder
        self.upconv1 = nn.Sequential(
            nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2),
            nn.GELU(),
            nn.BatchNorm2d(64),
            )
        self.upconv2 = nn.Sequential(
            nn.ConvTranspose2d(64, 1, kernel_size=2, stride=2),
            nn.Sigmoid(),
        )

    def forward(self, x):
        # Encoder
        x = self.conv1(x)
        x = self.conv2(x)

        # Decoder
        x = self.upconv1(x)
        x = self.upconv2(x)
        

        return x

# Example usage
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = SegmentationNet().to(device)
# Load data and train the model
# ...

In [5]:
import torch

def calculate_iou(outputs, labels, num_classes):
    """
    Calculates the Intersection over Union (IoU) for each class.

    Args:
        outputs (torch.Tensor): Model outputs of shape (batch_size, num_classes, height, width).
        labels (torch.Tensor): Ground truth labels of shape (batch_size, height, width).
        num_classes (int): Number of classes.

    Returns:
        iou (torch.Tensor): IoU for each class of shape (num_classes,).
    """
    outputs = outputs.argmax(dim=1)  # Get the predicted class for each pixel
    iou = torch.zeros(num_classes)

    for cls in range(num_classes):
        true_positives = ((outputs == cls) & (labels == cls)).sum().float()
        false_positives = ((outputs == cls) & (labels != cls)).sum().float()
        false_negatives = ((outputs != cls) & (labels == cls)).sum().float()
        denominator = true_positives + false_positives + false_negatives

        if denominator == 0:
            iou[cls] = 0
        else:
            iou[cls] = true_positives / denominator

    return iou

def evaluate(net, dataloader, train, criterion, optimizer, scheduler):
    try:
        pbar.close()
    except:
        pass

    num_classes = 21

    running_loss = []
    running_iou = torch.zeros(num_classes)
    correct_pixels = 0
    total_pixels = 0

    pbar = tqdm(total=len(dataloader), desc=f"{'Train' if train else 'Validation'}", leave=True)
    for data in dataloader:
        inputs, labels = data
        inputs, labels = inputs.to(device), labels.to(device)

        if train:
            outputs = net(inputs)
        else:
            with torch.no_grad():
                outputs = net(inputs)

        loss = criterion(outputs, labels)
        running_loss.append(loss.item())

        iou = calculate_iou(outputs, labels, num_classes)
        running_iou += iou

        correct_pixels += (outputs.argmax(dim=1) == labels).sum().item()
        total_pixels += torch.numel(labels)

        if train:
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            scheduler.step()

        pbar.set_description(
            f"{'Train' if train else 'Validation'} Loss: {np.mean(running_loss):.6f}, Acc: {correct_pixels / total_pixels:.4f}, mIoU: {running_iou.mean():.4f}"
        )
        pbar.update(1)

    pbar.close()
    return correct_pixels / total_pixels, running_iou.mean()



def train(net, trainloader, valloader, epochs, criterion, optimizer, scheduler, continue_training=''):
    previous_epoch = 0  # Initialize previous_epoch to 0

    try:
        print(f"Found best model, calculating acc...")
        
        checkpoint = torch.load(os.path.join(path, 'net_best.pth'))
        best_model = checkpoint['model']
        best_criterion = checkpoint['loss']
        best_scheduler = checkpoint['scheduler']
        best_optimizer = checkpoint['optimizer']
        best_model.load_state_dict(checkpoint['model_state_dict'])
        best_criterion.load_state_dict(checkpoint['criterion_state_dict'])
        best_scheduler.load_state_dict(checkpoint['scheduler_state_dict'])
        best_optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        
        best_acc = evaluate(net=best_model, dataloader=valloader, train=False, criterion=best_criterion, optimizer=best_optimizer, scheduler=best_scheduler)
        
        print(f"SROCC best model: {best_acc:.3f}")
        del best_model, best_optimizer, best_criterion, best_scheduler, checkpoint, gt_labels, pr_labels
    except Exception as e:
        best_acc = -1
        print(e)
        print("No best model found, starting from scratch")

    if continue_training != '':
        try:
            checkpoint = torch.load(os.join(path, f'net_{continue_training}.pth'))
            net = checkpoint['model']
            criterion = checkpoint['loss']
            scheduler = checkpoint['scheduler']
            optimizer = checkpoint['optimizer']
            net.load_state_dict(checkpoint['model_state_dict'])
            criterion.load_state_dict(checkpoint['criterion_state_dict'])
            scheduler.load_state_dict(checkpoint['scheduler_state_dict'])
            optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
            previous_epoch = checkpoint['epoch']  # Update previous_epoch
            epochs += previous_epoch  # Update total number of epochs to train
            
            del checkpoint
            print(f"Continuing training of {continue_training} model, checkpoint at epoch {previous_epoch}")
        except Exception as e:
            print(e)
            print(f"No {continue_training} checkpoint found, starting from scratch")

    for epoch in range(previous_epoch, epochs):  # Loop over the dataset for multiple epochs
        print(f"Epoch {epoch}/{epochs}: ")
        
        net.train()  # Set model to training mode
        evaluate(net=net,
                dataloader=trainloader,
                train=True,
                criterion=criterion,
                optimizer=optimizer,
                scheduler=scheduler)
        
        net.eval()  # Set model to evaluation mode
        acc = evaluate(net = net,
                dataloader = valloader,
                train=False,
                criterion=criterion,
                optimizer=optimizer,
                scheduler=scheduler)

        if acc > best_acc:
            best_acc = acc
            torch.save({
                'epoch': epoch,
                'model_state_dict': net.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'scheduler_state_dict': scheduler.state_dict(),
                'criterion_state_dict': criterion.state_dict(),
                'model' : net,
                'loss': criterion,
                'optimizer': optimizer,
                'scheduler': scheduler,
                }, '../datasets/imdb/net_best.pth')
            print(f"New best model saved with SROCC: {sp:.3f}")
            
        torch.save({
            'epoch': epoch,
            'model_state_dict': net.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'scheduler_state_dict': scheduler.state_dict(),
            'criterion_state_dict': criterion.state_dict(),
            'model' : net,
            'loss': criterion,
            'optimizer': optimizer,
            'scheduler': scheduler,
            }, f'../datasets/imdb/net_last.pth')

In [6]:
bs = 64
model = SegmentationNet().to(device)

trainloader = DataLoader(trainset, batch_size=bs, shuffle=True)
valloader = DataLoader(valset, batch_size=bs, shuffle=False)
testloader = DataLoader(testset, batch_size=bs, shuffle=False)

In [7]:

train(model, trainloader, valloader, 10, nn.CrossEntropyLoss(), optim.Adam(model.parameters(), lr=0.001), CosineAnnealingLR(optim.Adam(model.parameters(), lr=0.001), T_max=10), continue_training='best') # Train the model for 10 epochs

Found best model, calculating acc...
[Errno 2] No such file or directory: '../datasets/VOC_exercise/net_best.pth'
No best model found, starting from scratch
Epoch 0/10: 


Train:   0%|          | 0/23 [00:00<?, ?it/s]

RuntimeError: Given transposed=1, weight of size [64, 32, 2, 2], expected input[64, 128, 256, 256] to have 64 channels, but got 128 channels instead