In [3]:
#imports
import itertools
import os
from PIL import Image
import numpy as np
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR
from torch.utils.data import DataLoader, Dataset
import torchvision.transforms as transforms
from torchvision.models import segmentation

In [None]:
#Code for custon input dataset
import os
from PIL import Image
from torch.utils.data import Dataset

class CustomDataset(Dataset):
    def __init__(self, image_dir, mask_dir, transform=None):
        self.image_dir = image_dir
        self.mask_dir = mask_dir
        self.transform = transform
        self.images = sorted(os.listdir(image_dir))

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        img_name = self.images[idx]
        state, image_index, subimage_index = img_name.split('_')[:3]

        # Construct image and mask file paths
        img_path = os.path.join(self.image_dir, img_name)
        mask_name = f"{state}_raster_mask_{image_index}.tiff"
        mask_path = os.path.join(self.mask_dir, mask_name)

        # Load image and mask
        image = Image.open(img_path).convert("RGB")
        mask = Image.open(mask_path).convert("L")

        if self.transform is not None:
            image = self.transform(image)
            mask = self.transform(mask)

        return image, mask

In [None]:
# Modified DeepLabV3+ with atrous convolution
class ModifiedDeepLabV3Plus(nn.Module):
    def __init__(self, num_classes, in_channels, num_levels, dilation):
        super(ModifiedDeepLabV3Plus, self).__init__()
        self.pyramid_pooling = PyramidPooling(in_channels, num_levels=num_levels)
        self.model = segmentation.deeplabv3plus_xception(pretrained=False, num_classes=num_classes, aux_loss=None)
        
        #Adds atrous convolution to convolutional layer in xception backbone
        for name, module in self.model.named_modules():
            if isinstance(module, nn.Conv2d):
                module.dilation = dilation
                
    def forward(self, x):
        return self.model(x)['out']


In [None]:
class PyramidPooling(nn.Module):
    def __init__(self, in_channels, num_levels, kernel_size):
        super(PyramidPooling, self).__init__()
        
        self.num_levels = num_levels
        
        #number of filters
        out_channels = in_channels // num_levels
        
        #pooling layers
        self.pooling_layers = nn.ModuleList()
        for i in range(num_levels):
            self.pooling_layers.append(nn.AdaptiveAvgPool2d((i+1, i+1)))
        
        #convolutional layers
        self.conv_layers = nn.ModuleList()
        for i in range(num_levels):
            self.conv_layers.append(nn.Conv2d(in_channels, out_channels, kernel_size=kernel_size))
     
    #Forward Step   
    def forward(self, x):
        features = [x]
        
        #Pooling and convolution operations per level
        for i in range(self.num_levels):
            pooled = self.pooling_layers[i](x)
            conv_out = self.conv_layers[i](pooled)
            upsampled = F.interpolate(conv_out, size=x.size()[2:], mode='bilinear', align_corners=False)
            features.append(upsampled)
        
        output = torch.cat(features, dim=1)
        
        return output



In [None]:
class DeepLabV3PlusModel(pl.LightningModule):
    def __init__(self, num_classes, in_channels, learning_rate, lr_step_size, lr_gamma):
        super(DeepLabV3PlusModel, self).__init__()

        self.model = ModifiedDeepLabV3Plus(num_classes, in_channels)
        self.criterion = nn.CrossEntropyLoss()

        self.learning_rate = learning_rate
        self.lr_step_size = lr_step_size
        self.lr_gamma = lr_gamma
    
    #Forwatd Step   
    def forward(self, x):
        return self.model(x)
    
    #Model Training
    def training_step(self, batch):
        images, masks = batch
        outputs = self(images)
        
        loss_ce = self.criterion(outputs, masks)
        loss_fwio = self.calculate_fwio_loss(outputs, masks)
        loss = loss_ce + loss_fwio
        
        self.log('train_loss_ce', loss_ce)
        self.log('train_loss_fwio', loss_fwio)
        self.log('train_loss_total', loss)
        
        return loss
    
    #Model Validation
    def validation_step(self, batch):
        images, masks = batch
        outputs = self(images)
        
        loss_ce = self.criterion(outputs, masks)
        loss_fwio = self.calculate_fwio_loss(outputs, masks)
        loss = loss_ce + loss_fwio
        iou = self.calculate_iou(outputs, masks)

        self.log('val_loss_ce', loss_ce)
        self.log('val_loss_fwio', loss_fwio)
        self.log('val_loss_total', loss)
        self.log('val_iou', iou)
        
        
        
    def configure_optimizers(self):
        optimizer = optim.Adam(self.parameters(), lr=self.learning_rate)
        scheduler = StepLR(optimizer, step_size=self.lr_step_size, gamma=self.lr_gamma)
        return [optimizer], [scheduler]
    
    def calculate_fwio_loss(self, outputs, masks):
        #calculates loss
        num_classes = outputs.size(1)
        fwio_loss = 0.0
        
        for cls in range(num_classes):
            pred = outputs[:, cls, :, :]
            gt = (masks == cls).float()
            
            intersection = torch.sum(pred * gt)
            union = torch.sum(pred) + torch.sum(gt) - intersection
            
            fwio_loss += (intersection / union)
        
        fwio_loss /= num_classes
        return 1.0 - fwio_loss
    
    def calculate_iou(self, outputs, masks):
        #Calculates intersection over union
        preds = torch.argmax(outputs, dim=1)
        intersection = torch.sum(preds & masks)
        union = torch.sum(preds | masks)
        iou = intersection.float() / union.float()
        return iou


In [None]:

image_dir = '/Users/andrew/Documents/Spring 2023/Intro to Deep Learning/trail_data/image_dir'
mask_dir = '/Users/andrew/Documents/Spring 2023/Intro to Deep Learning/trail_data/mask_dir'

#Transforms
transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
])

#Instance of Dataset custom class
dataset = CustomDataset(image_dir, mask_dir, transform=transform)

#Dataset Splitting 
train_ratio = 0.8  # Ratio for training data
train_data, val_data = train_test_split(dataset, train_size=train_ratio, test_size=1 - train_ratio)

#DataLoaders for training and validation
train_dataloader = DataLoader(train_data, batch_size=4, shuffle=True)
val_dataloader = DataLoader(val_data, batch_size=4, shuffle=True)

#Grid Search
# Hyperparameter Ranges
learning_rate_range = [1e-5, 1e-4, 1e-3]
batch_size_range = [16, 32, 64, 128, 256]
epochs_range = [50, 100, 150, 200]
network_depth_range = [16, 34, 50, 101, 152]
num_levels_range = [2, 3, 4, 5]
dilation_range = [(1, 1), (2, 2), (4, 4)]
lr_step_size_range = [10, 20, 30]
lr_gamma_range = [0.1, 0.5, 0.9]
kernel_size_range = [2, 3, 4, 5]

#validation metrics and models for each hyperparameter
validation_metric_dict = {
    'learning_rate': ['mean_intersection_over_union', 'loss'],
    'batch_size': ['mean_intersection_over_union', 'loss'],
    'epochs': ['mean_intersection_over_union', 'loss'],
    'network_depth': ['mean_intersection_over_union', 'loss'],
    'num_levels': ['mean_intersection_over_union', 'loss'],
    'dilation': ['mean_intersection_over_union', 'loss'],
    'lr_stepsize': ['mean_intersection_over_union', 'loss'], 
    'lr_gamma': ['mean_intersection_over_union', 'loss'],  
    'kernel_size': ['mean_intersection_over_union', 'loss'] 
}

model_dict = {
    'learning_rate': ['DeepLabV3+'],
    'batch_size': ['DeepLabV3+'],
    'epochs': ['DeepLabV3+'],
    'network_depth': ['DeepLabV3+'],
    'num_levels': ['DeepLabV3+'],
    'dilation': ['DeepLabV3+'],
    'lr_stepsize': ['DeepLabV3+'], 
    'lr_gamma': ['DeepLabV3+'], 
    'kernel_size': ['DeepLabV3+']  
}

best_val_loss = float('inf')
best_val_iou = float('inf')
best_hyperparameters = {}

for learning_rate, batch_size, epochs, network_depth, num_levels, dilation, lr_step_size, lr_gamma, kernel_size in itertools.product(
    learning_rate_range, batch_size_range, epochs_range, network_depth_range,
    num_levels_range, dilation_range, lr_step_size_range, lr_gamma_range, kernel_size_range
):
    #DeepLabV3PlusModel
    num_classes = 2  # Trail or not trail
    in_channels = 1  # Because each channel of the normalized image was saved each image only has one channel
    
    model = DeepLabV3PlusModel(num_classes, in_channels)

    # Assign number of levels and dilation rate
    model.model.pyramid_pooling.num_levels = num_levels
    for name, module in model.model.model.named_modules():
        if isinstance(module, nn.Conv2d):
            module.dilation = dilation
    
    #optimizer and scheduler
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    scheduler = StepLR(optimizer, step_size=lr_step_size, gamma=lr_gamma)
    
    #Training
    for epoch in range(epochs):
        model.train()
        for batch_images, batch_masks in train_dataloader:
            optimizer.zero_grad()
            outputs = model(batch_images)
            loss_ce = model.criterion(outputs, batch_masks)
            loss_fwio = model.calculate_fwio_loss(outputs, batch_masks)
            loss = loss_ce + loss_fwio
            loss.backward()
            optimizer.step()
    
        #Validation
        model.eval()
        val_loss = 0.0
        intersection = 0
        union = 0
        with torch.no_grad():
            for batch_images, batch_masks in val_dataloader:
                outputs = model(batch_images)
                loss_ce = model.criterion(outputs, batch_masks)
                loss_fwio = model.calculate_fwio_loss(outputs, batch_masks)
                loss = loss_ce + loss_fwio
                val_loss += loss.item()

                #Calculate intersection and union
                predicted_masks = torch.argmax(outputs, dim=1)
                intersection += torch.sum(predicted_masks * batch_masks)
                union += torch.sum(predicted_masks) + torch.sum(batch_masks) - intersection

        #Loss
        val_loss /= len(val_dataloader)

        #Intersection over Union
        iou = intersection / union

        # Log losses and IoU
        print(f"Epoch {epoch + 1} - Train Loss: {loss.item()} - Val Loss: {val_loss} - IoU: {iou}")

# Check if the current hyperparameters result in the best validation loss
if val_loss < best_val_loss:
    best_val_loss = val_loss
    best_hyperparameters_loss = {
        'learning_rate': learning_rate,
        'batch_size': batch_size,
        'epochs': epochs,
        'network_depth': network_depth,
        'num_levels': num_levels,
        'dilation': dilation,
        'lr_step_size': lr_step_size,
        'lr_gamma': lr_gamma,
        'kernel_size': kernel_size
    }

# Check if the current hyperparameters result in the best IoU
if iou > best_val_iou:
    best_val_iou = iou
    best_hyperparameters_iou = {
        'learning_rate': learning_rate,
        'batch_size': batch_size,
        'epochs': epochs,
        'network_depth': network_depth,
        'num_levels': num_levels,
        'dilation': dilation,
        'lr_step_size': lr_step_size,
        'lr_gamma': lr_gamma,
        'kernel_size': kernel_size
    }


# Print the best hyperparameters and validation loss based on loss
print("Best Hyperparameters (based on Loss):")
print(best_hyperparameters_loss)
print("Best Validation Loss:")
print(best_val_loss)

# Print the best hyperparameters and validation loss based on IoU
print("Best Hyperparameters (based on IoU):")
print(best_hyperparameters_iou)
print("Best Intersection over Union (IoU):")
print(best_val_iou)

