In [1]:
## Hyperparameter Tuning for Multi-Resolution Solar Photovoltaic System Segmentation Paper

In [2]:
import os
from PIL import Image
import numpy as np

import matplotlib.pyplot as plt

import torch
from torch.utils.data import Dataset, DataLoader

from torchvision.utils import make_grid
from torchvision import transforms
from torchvision.transforms import functional as F, ToTensor, InterpolationMode
import torchvision.models as models
from torchvision.models.segmentation import deeplabv3_resnet101, DeepLabV3_ResNet101_Weights 

import torch.nn as nn
import torch.optim as optim

class SegmentationDataset(Dataset):
    """Paired image/mask dataset read from one or more root folders.

    Every entry of ``root_dirs`` must contain an ``image`` and a ``mask``
    sub-directory; an image and its mask share the same file name.

    Args:
        root_dirs: Iterable of root folder paths.
        resize: Target size passed to ``transforms.Resize`` (as ``[resize]``).
        crop_size: Size passed to ``transforms.CenterCrop``.

    Each item is an ``(image, mask)`` tuple of tensors. The image is loaded
    as RGB; the mask is loaded as single-channel ('L') and its leading
    channel dimension is squeezed away.
    """

    def __init__(self, root_dirs, resize=256, crop_size=256):
        self.root_dirs = root_dirs
        self.image_dirs = [os.path.join(root_dir, 'image') for root_dir in root_dirs]
        self.mask_dirs = [os.path.join(root_dir, 'mask') for root_dir in root_dirs]
        self.image_filenames = self.collect_image_filenames()
        # Image pipeline: bilinear interpolation is appropriate for RGB data.
        self.transforms = transforms.Compose([
            transforms.CenterCrop(crop_size),
            transforms.Resize([resize], interpolation=InterpolationMode.BILINEAR),
            transforms.ToTensor()
        ])
        # Mask pipeline: same geometry, but nearest-neighbour interpolation so
        # that resizing never blends label values into intermediate greys.
        self.mask_transforms = transforms.Compose([
            transforms.CenterCrop(crop_size),
            transforms.Resize([resize], interpolation=InterpolationMode.NEAREST),
            transforms.ToTensor()
        ])

    def collect_image_filenames(self):
        """Return the file names of all images, grouped by root folder.

        Names are sorted within each folder so the sample order is
        reproducible, and entries that are not regular files (for example a
        ``.ipynb_checkpoints`` directory) are skipped. As a side effect the
        index of the originating folder is recorded for every name, so that
        identical file names in different roots stay distinguishable.
        """
        image_filenames = []
        sample_dir_indices = []
        for dir_index, image_dir in enumerate(self.image_dirs):
            names = sorted(
                name for name in os.listdir(image_dir)
                if os.path.isfile(os.path.join(image_dir, name))
            )
            image_filenames += names
            sample_dir_indices += [dir_index] * len(names)
        self._sample_dir_indices = sample_dir_indices
        return image_filenames

    def __len__(self):
        return len(self.image_filenames)

    def __getitem__(self, idx):
        image_path, mask_path = self._resolve_paths(idx)

        image = self.load_image(image_path)
        mask = self.load_mask(mask_path)

        return image, mask

    def _resolve_paths(self, idx):
        """Return ``(image_path, mask_path)`` for sample ``idx``.

        The folder recorded while collecting the file names is used, so a
        name that occurs in several roots maps to the right root. If that
        bookkeeping is missing or out of sync (e.g. ``image_filenames`` was
        replaced from outside) the first-match search is used instead.

        Raises:
            FileNotFoundError: If the image or its mask does not exist.
        """
        image_name = self.image_filenames[idx]
        dir_indices = getattr(self, '_sample_dir_indices', None)
        if dir_indices is None or len(dir_indices) != len(self.image_filenames):
            return self.find_image_path(image_name), self.find_mask_path(image_name)

        dir_index = dir_indices[idx]
        image_path = os.path.join(self.image_dirs[dir_index], image_name)
        mask_path = os.path.join(self.mask_dirs[dir_index], image_name)
        if not os.path.exists(image_path):
            raise FileNotFoundError(f"Image file not found: {image_name}")
        if not os.path.exists(mask_path):
            raise FileNotFoundError(f"Mask file not found: {image_name}")
        return image_path, mask_path

    def find_image_path(self, image_name):
        """Return the first existing ``<image_dir>/<image_name>`` path.

        Raises:
            FileNotFoundError: If no image folder contains ``image_name``.
        """
        for image_dir in self.image_dirs:
            image_path = os.path.join(image_dir, image_name)
            if os.path.exists(image_path):
                return image_path
        raise FileNotFoundError(f"Image file not found: {image_name}")

    def find_mask_path(self, image_name):
        """Return the first existing ``<mask_dir>/<image_name>`` path.

        Raises:
            FileNotFoundError: If no mask folder contains ``image_name``.
        """
        for mask_dir in self.mask_dirs:
            mask_path = os.path.join(mask_dir, image_name)
            if os.path.exists(mask_path):
                return mask_path
        raise FileNotFoundError(f"Mask file not found: {image_name}")

    def load_image(self, path):
        """Open ``path`` as RGB and apply the image transform pipeline."""
        image = Image.open(path).convert('RGB')
        image = self.transforms(image)
        return image

    def load_mask(self, path):
        """Open ``path`` as single-channel and apply the mask pipeline."""
        mask = Image.open(path).convert('L')
        mask = self.mask_transforms(mask)
        mask = torch.squeeze(mask, dim=0)  # drop the channel dimension
        return mask


In [3]:
# Set to False to force CPU execution even when a GPU is present.
use_cuda = True

if not use_cuda:
    # CPU explicitly requested.
    print("Info: will use cpu!")
    device = torch.device('cpu')
elif torch.cuda.is_available():
    # GPU requested and present.
    print(f"Info: Devices: {torch.cuda.device_count()} {torch.cuda.get_device_name(0)} GPU available, will use gpu!")
    device = torch.device('cuda')
else:
    # GPU requested but missing: fall back to CPU.
    print("Error: cuda requested but not available, will use cpu instead!")
    device = torch.device('cpu')

print(f"Number of CPU cores: {os.cpu_count()}")

Info: Devices: 4 A100-SXM4-40GB GPU available, will use gpu!
Number of CPU cores: 128


In [4]:
from torch.utils.tensorboard import SummaryWriter

import torch.nn as nn # nn.CrossEntropyLoss, nn.MSELoss, nn.BCEWithLogitsLoss, nn.HuberLoss
import torchvision.ops as ops # ops.complete_box_iou_loss, ops.distance_box_iou, ops.generalized_box_iou
import torch.optim as optim # optim.Adagrad, Adam, RMSprop, SGD

In [5]:
from torchmetrics.classification import BinaryAccuracy, BinaryRecall, BinaryPrecision, BinaryF1Score, BinaryJaccardIndex
# Binary classification metric objects; each one is later called as
# ``metric(predicted_mask, mask)`` during evaluation.
(
    calculate_accuracy,
    calculate_precision,
    calculate_recall,
    calculate_f1_score,
    calculate_iou,
) = (
    BinaryAccuracy(),
    BinaryPrecision(),
    BinaryRecall(),
    BinaryF1Score(),
    BinaryJaccardIndex(),
)

In [6]:
# Fixed parameters (shared by every run of the search)
batch_size = 16
num_epochs = 100

# Search-space parameters: the grid search tries every combination of the
# three lists below.
# NOTE(review): the model head is built with a single output channel;
# confirm that nn.CrossEntropyLoss is meaningful for that output layout.
search_space = {
    'learning_rate': [1e-3, 1e-4, 1e-5],
    'loss_function': [
        nn.HuberLoss,
        nn.BCEWithLogitsLoss,
        nn.MSELoss,
        nn.CrossEntropyLoss,
    ],
    'optimiser': [
        optim.Adagrad,
        optim.Adam,
        optim.RMSprop,
        optim.SGD,
    ],
    # Currently disabled dimensions:
    # 'batch_size': [2, 4, 8, 12, 16, 18, 24, 32, 48, 64],
    # 'stride': [2, 3, 4, 5]
}

In [7]:
import itertools
import csv

def _evaluate_segmentation(model, eval_dataloader, model_device, threshold=0.5):
    """Return the batch-averaged binary metrics of ``model`` on ``eval_dataloader``.

    The result is a dict with the keys ``accuracy``, ``precision``,
    ``recall``, ``f1_score`` and ``iou``; every value is a Python float.

    Raises:
        ValueError: If the data loader yields no batch.
    """
    metric_fns = {
        'accuracy': calculate_accuracy,
        'precision': calculate_precision,
        'recall': calculate_recall,
        'f1_score': calculate_f1_score,
        'iou': calculate_iou,
    }
    totals = {name: 0.0 for name in metric_fns}
    num_batches = 0

    model.eval()
    with torch.no_grad():
        for images, masks in eval_dataloader:
            logits = model(images.to(model_device))['out']

            # squeeze(1) removes only the channel axis; a plain squeeze()
            # would also drop the batch axis when a batch holds one sample.
            predicted_mask = (torch.sigmoid(logits) > threshold).int().squeeze(1).cpu()
            mask = (masks >= threshold).int().cpu()

            for name, metric_fn in metric_fns.items():
                totals[name] += metric_fn(predicted_mask, mask)
            num_batches += 1

    if num_batches == 0:
        raise ValueError("Evaluation dataset is empty; cannot compute metrics.")

    return {name: float(total) / num_batches for name, total in totals.items()}


def train_with_hyperparameters(model, dataset, dataset_eval, num_epochs, batch_size, learning_rate, save_path, log_dir, dataset_paths, loss_function, optimizer, eval_every=10):
    """Train ``model`` with one hyperparameter combination and report its best metrics.

    The model is evaluated on ``dataset_eval`` every ``eval_every`` epochs.
    The metrics of the evaluation with the highest mean IoU are kept, and
    training stops early at the first evaluation whose IoU is lower than the
    best one seen so far.

    Args:
        model: Network whose forward pass returns a dict with an ``'out'`` entry.
        dataset: Training dataset yielding ``(image, mask)`` pairs.
        dataset_eval: Evaluation dataset yielding ``(image, mask)`` pairs.
        num_epochs: Maximum number of training epochs.
        batch_size: Training batch size.
        learning_rate: Learning rate passed to the optimizer as ``lr``.
        save_path, log_dir, dataset_paths: Accepted for interface
            compatibility; not used by this function.
        loss_function: Loss class, instantiated without arguments.
        optimizer: Optimizer class, called as ``optimizer(params, lr=...)``.
        eval_every: Evaluate after every ``eval_every``-th epoch.

    Returns:
        Tuple ``(accuracy, precision, recall, f1_score, iou, best_epoch)``.
        The five metrics are floats; all values are 0 when no evaluation ran.
    """
    # Use the device the model actually lives on, so inputs always match it.
    model_device = next(model.parameters()).device

    # Define the loss function and optimizer
    criterion = loss_function()
    optimizer = optimizer(model.parameters(), lr=learning_rate)

    # Create the data loaders for training and evaluation
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
    eval_dataloader = DataLoader(dataset_eval, batch_size=100, shuffle=False)

    # Best-so-far bookkeeping lives outside the epoch loop; resetting it per
    # epoch would make every evaluation look like a new best.
    best_metrics = {'accuracy': 0.0, 'precision': 0.0, 'recall': 0.0, 'f1_score': 0.0, 'iou': 0.0}
    best_epoch = 0

    # Training loop
    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch + 1, num_epochs))
        print('-' * 10)

        running_loss = 0.0
        model.train()

        for images, masks in dataloader:
            images = images.to(model_device)
            masks = masks.to(model_device)

            optimizer.zero_grad()

            # Forward pass; drop only the channel axis so the batch axis
            # survives even for a batch of size one.
            outputs = model(images)['out']
            outputs = torch.squeeze(outputs, dim=1)

            loss = criterion(outputs, masks)

            # Backward pass and optimization
            loss.backward()
            optimizer.step()

            running_loss += loss.item() * images.size(0)

        epoch_loss = running_loss / len(dataset)
        print('Loss: {:.4f}'.format(epoch_loss))

        if (epoch + 1) % eval_every != 0:
            continue

        metrics = _evaluate_segmentation(model, eval_dataloader, model_device)

        if metrics['iou'] >= best_metrics['iou']:
            # New best evaluation: remember its metrics and epoch.
            best_metrics = metrics
            best_epoch = epoch + 1
        else:
            # IoU got worse: stop training this combination early.
            break

    print('Accuracy: {:.4f}'.format(best_metrics['accuracy']))
    print('Precision: {:.4f}'.format(best_metrics['precision']))
    print('Recall: {:.4f}'.format(best_metrics['recall']))
    print('F1 Score: {:.4f}'.format(best_metrics['f1_score']))
    print('IoU: {:.4f}'.format(best_metrics['iou']))

    return (
        best_metrics['accuracy'],
        best_metrics['precision'],
        best_metrics['recall'],
        best_metrics['f1_score'],
        best_metrics['iou'],
        best_epoch,
    )

def train_segmentation_network(model, dataset, dataset_eval, num_epochs, batch_size, save_path, log_dir, dataset_paths):
    """Run the grid search over ``search_space`` and log one CSV row per combination.

    For every (learning rate, loss function, optimiser) combination a fresh
    model is created with ``load_model()`` and trained with
    ``train_with_hyperparameters``. Results are written to
    ``hyper_training_results_add.csv`` inside ``save_path``.

    Args:
        model: Not used for training; it is replaced by a freshly loaded
            model for every combination.
        dataset: Training dataset.
        dataset_eval: Evaluation dataset.
        num_epochs: Maximum number of epochs per combination.
        batch_size: Training batch size.
        save_path: Directory that receives the results CSV.
        log_dir, dataset_paths: Forwarded unchanged to the training function.
    """
    # Generate all combinations of hyperparameters
    hyperparameter_combinations = list(itertools.product(
        search_space['learning_rate'],
        search_space['loss_function'],
        search_space['optimiser'],
    ))

    results_file = os.path.join(save_path, 'hyper_training_results_add.csv')

    # The context manager closes the file even if a run raises, and each row
    # is flushed immediately so finished results survive an interrupted search.
    with open(results_file, 'w', newline='') as csv_file:
        writer = csv.writer(csv_file)
        writer.writerow(['Loss Function', 'Optimiser', 'Learning Rate', 'Accuracy', 'Precision', 'Recall', 'F1 Score', 'IoU', 'best_epoch'])
        csv_file.flush()

        for learning_rate, loss_function, optimiser in hyperparameter_combinations:
            # Print the current hyperparameters
            print('Training with hyperparameters:')
            print('Learning Rate:', learning_rate)
            print('Loss Function:', loss_function.__name__)
            print('Optimiser:', optimiser.__name__)
            print('-' * 10)

            # Start every combination from a freshly loaded model
            model = load_model()

            # Train the model with the current hyperparameters
            accuracy, precision, recall, f1_score, iou, best_epoch = train_with_hyperparameters(model, dataset, dataset_eval, num_epochs, batch_size, learning_rate, save_path, log_dir, dataset_paths, loss_function, optimiser)

            # Write the results to the CSV file
            writer.writerow([loss_function.__name__, optimiser.__name__, learning_rate, accuracy, precision, recall, f1_score, iou, best_epoch])
            csv_file.flush()

In [8]:
# load and prepare model
def load_model(num_classes=1, device=None):
    """Build a DeepLabV3-ResNet101 segmentation model and move it to a device.

    The model is created with the ``COCO_WITH_VOC_LABELS_V1`` weights, its
    first backbone convolution and its classifier head are replaced, and the
    result is moved to ``device``.

    Args:
        num_classes: Number of output channels of the new classifier head
            (default 1, i.e. binary segmentation).
        device: Target ``torch.device``. When ``None`` the GPU is used if
            one is available, otherwise the CPU.

    Returns:
        The prepared model on the target device.
    """
    model = models.segmentation.deeplabv3_resnet101(weights=DeepLabV3_ResNet101_Weights.COCO_WITH_VOC_LABELS_V1)

    # NOTE(review): this installs a newly initialised first convolution, so
    # the weights loaded for that layer are not kept. The arguments look like
    # the stock ResNet stem - confirm the replacement is intended.
    model.backbone.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)

    # New segmentation head with the requested number of output channels
    model.classifier = models.segmentation.deeplabv3.DeepLabHead(2048, num_classes)

    # Set up the device (CPU or GPU) unless the caller chose one
    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    return model.to(device)

In [9]:
# Base folder of the data; results and logs are written here as well.
dataset_path = '/share/data1/mkleebauer/pv_segmentation/'
save_path = log_dir = dataset_path

# Training and validation folders
dataset_path1 = '/share/data1/mkleebauer/pv_segmentation/PV01_train'
val_path1 = '/share/data1/mkleebauer/pv_segmentation/PV01_val'

# Alternative "mini" folders, kept for reference:
# dataset_path1 = '/share/data1/mkleebauer/pv_segmentation/mini_train'
# val_path1 = '/share/data1/mkleebauer/pv_segmentation/mini_eval'

In [10]:
# Root-folder lists for the training and evaluation datasets
dataset_paths = [dataset_path1]
# dataset_paths = [dataset_path1, dataset_path2, dataset_path3]
dataset_path_eval = [val_path1]

# Build both datasets with the default resize / crop settings
dataset = SegmentationDataset(root_dirs=dataset_paths)
dataset_eval = SegmentationDataset(root_dirs=dataset_path_eval)

# Initial model instance handed to the grid-search entry point
model = load_model()

In [None]:
# Start the hyperparameter grid search with the objects prepared above
train_segmentation_network(
    model=model,
    dataset=dataset,
    dataset_eval=dataset_eval,
    num_epochs=num_epochs,
    batch_size=batch_size,
    save_path=save_path,
    log_dir=log_dir,
    dataset_paths=dataset_paths,
)