<a href="https://colab.research.google.com/github/JensH-2157843/AML_Project/blob/main/src/neural_networks/NN1(segmentation).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Library imports

In [1]:
!pip install segmentation-models==1.0.1 albumentations==1.3.1 --quiet
import os
import numpy as np
from PIL import Image
from glob import glob
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
import torch
import torchvision.transforms as T
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm
import tensorflow as tf
import matplotlib.pyplot as plt
from transformers import SegformerFeatureExtractor

import time
import copy
from torchvision import transforms

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/125.7 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m125.7/125.7 kB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
[?25h[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/50.7 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m50.7/50.7 kB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
[?25h

# Dataset import

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
## DATASET IMPORT ##
deepglobe_dir = "/content/drive/MyDrive/train"
import os

deepglobe_images = sorted(glob(os.path.join(deepglobe_dir, '*_sat.jpg')))
deepglobe_masks = sorted(glob(os.path.join(deepglobe_dir, '*_mask.png')))

for tile in sorted(os.listdir(deepglobe_dir)):
    tile_path = os.path.join(deepglobe_dir, tile)
    if not os.path.isdir(tile_path):
        continue
    img_folder = os.path.join(tile_path, "images")
    mask_folder = os.path.join(tile_path, "masks")
    deepglobe_images.extend(sorted(glob(os.path.join(img_folder, '*.jpg'))))
    deepglobe_masks.extend(sorted(glob(os.path.join(mask_folder, '*.png'))))

all_images = deepglobe_images
all_masks = deepglobe_masks

train_imgs, val_imgs, train_masks, val_masks = train_test_split(
    all_images, all_masks, test_size=0.2, random_state=42
)

In [7]:
IMG_SIZE = (256, 256)

def rgb_to_binary_mask(mask_image, suitable_rgbs):
    mask = np.array(mask_image)
    binary_mask = np.zeros((mask.shape[0], mask.shape[1]), dtype=np.int64)
    for rgb in suitable_rgbs:
        matches = np.all(mask == rgb, axis=-1)
        binary_mask[matches] = 1
    return binary_mask

image_transforms = transforms.Compose([
    transforms.Resize(IMG_SIZE),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

mask_transforms = transforms.Compose([
    transforms.Resize(IMG_SIZE, interpolation=transforms.InterpolationMode.NEAREST)
])

In [4]:
# --- Define Your PyTorch Dataset (This IS your combined loader + preprocessor) ---
class SolarPanelDataset(Dataset):
    def __init__(self, img_paths, mask_paths, suitable_rgbs, img_transform=None, mask_transform=None):
        self.img_paths = img_paths
        self.mask_paths = mask_paths
        self.suitable_rgbs = suitable_rgbs
        self.img_transform = img_transform
        self.mask_transform = mask_transform

    def __len__(self):
        return len(self.img_paths)

    def __getitem__(self, idx):
        # --- Start of Preprocessing Logic (PyTorch version) ---
        img_path = self.img_paths[idx]
        mask_path = self.mask_paths[idx]
        image = Image.open(img_path).convert("RGB")
        mask_rgb = Image.open(mask_path).convert("RGB")
        if self.img_transform:
            image = self.img_transform(image) # Applies resize, ToTensor, Normalize
        if self.mask_transform:
            mask_rgb = self.mask_transform(mask_rgb) # Applies resize (NEAREST)
        mask_binary = rgb_to_binary_mask(mask_rgb, self.suitable_rgbs) # Converts mask
        mask = torch.from_numpy(mask_binary) # To PyTorch Tensor
        # --- End of Preprocessing Logic ---
        return image, mask

In [5]:
# RGB values for classes we consider 'Suitable' (Class 1)
SUITABLE_RGB_VALUES = [
    (255, 255, 0),  # Agriculture land
    (255, 0, 255),  # Rangeland
    (255, 255, 255),# Barren land
    (60, 16, 152),  # Building
    (132, 41, 246)  # Unpaved land
]

In [11]:
val_loader = SolarPanelDataset(train_imgs, train_masks, SUITABLE_RGB_VALUES,  image_transforms, mask_transforms)
train_loader = SolarPanelDataset(val_imgs, val_masks, SUITABLE_RGB_VALUES,  image_transforms, mask_transforms)

val_loader = DataLoader(val_loader, batch_size=20, shuffle=False)
train_loader = DataLoader(train_loader, batch_size=20, shuffle=True)

# Model

In [9]:
## ARCHITECTURE ##
class ConvBlock(nn.Module):
    """
    Convolutional Block: (Conv -> BN -> ReLU) * 2
    """
    def __init__(self, in_channels, out_channels):
        super(ConvBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu1 = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.relu2 = nn.ReLU(inplace=True)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu1(x)
        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu2(x)
        return x

class EncoderBlock(nn.Module):
    """
    Encoder Block: ConvBlock -> MaxPool
    Returns both ConvBlock output (skip) and MaxPool output.
    """
    def __init__(self, in_channels, out_channels):
        super(EncoderBlock, self).__init__()
        self.conv_block = ConvBlock(in_channels, out_channels)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

    def forward(self, x):
        skip = self.conv_block(x)
        pooled = self.pool(skip)
        return skip, pooled

class DecoderBlock(nn.Module):
    """
    Decoder Block: ConvTranspose -> Concat -> ConvBlock
    """
    def __init__(self, in_channels, out_channels):
        super(DecoderBlock, self).__init__()
        # Upsamples by a factor of 2, halving the channels.
        self.upconv = nn.ConvTranspose2d(in_channels, out_channels, kernel_size=2, stride=2)
        # ConvBlock takes concatenated input (skip + upconv), so its input channels
        # will be out_channels (from skip) + out_channels (from upconv).
        self.conv_block = ConvBlock(out_channels * 2, out_channels)

    def forward(self, x, skip_connection):
        x = self.upconv(x)

        # Ensure spatial dimensions match before concatenating.
        # If input sizes are powers of 2, they should match.
        # If not, cropping (from skip) or padding (to x) might be needed.
        # Here we assume they match or crop the skip connection if necessary.
        if x.shape != skip_connection.shape:
            # Simple center-cropping (adjust if needed)
            diffY = skip_connection.size()[2] - x.size()[2]
            diffX = skip_connection.size()[3] - x.size()[3]
            skip_connection = skip_connection[:, :, diffY // 2 : skip_connection.size()[2] - diffY // 2 - diffY % 2,
                                                diffX // 2 : skip_connection.size()[3] - diffX // 2 - diffX % 2]

        x = torch.cat([x, skip_connection], dim=1) # Concatenate along channel dimension (dim=1)
        x = self.conv_block(x)
        return x

class DeepUnet(nn.Module):

    def __init__(self, in_channels=3, out_classes=11):
        """
        Initializes the DeepUnet model.

        Args:
            in_channels (int): Number of input channels (e.g., 3 for RGB).
            out_classes (int): Number of output segmentation classes.
        """
        super(DeepUnet, self).__init__()
        self.in_channels = in_channels
        self.out_classes = out_classes

        # Encoder Path
        self.enc1 = EncoderBlock(in_channels, 64)
        self.enc2 = EncoderBlock(64, 128)
        self.enc3 = EncoderBlock(128, 256)
        self.enc4 = EncoderBlock(256, 512)

        # Bottleneck
        self.bottleneck = ConvBlock(512, 1024)

        # Decoder Path
        self.dec1 = DecoderBlock(1024, 512)
        self.dec2 = DecoderBlock(512, 256)
        self.dec3 = DecoderBlock(256, 128)
        self.dec4 = DecoderBlock(128, 64)

        # Output Layer
        self.output_conv = nn.Conv2d(64, out_classes, kernel_size=1)

        # Optional: Softmax layer. Often omitted if using CrossEntropyLoss,
        # which combines LogSoftmax and NLLLoss.
        # self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        """
        Defines the forward pass of the U-Net.

        Args:
            x (Tensor): The input tensor (N, C, H, W).

        Returns:
            Tensor: The output segmentation map (N, out_classes, H, W).
        """
        # Encoder path
        s1, p1 = self.enc1(x)
        s2, p2 = self.enc2(p1)
        s3, p3 = self.enc3(p2)
        s4, p4 = self.enc4(p3)

        # Bottleneck
        b1 = self.bottleneck(p4)

        # Decoder path
        d1 = self.dec1(b1, s4)
        d2 = self.dec2(d1, s3)
        d3 = self.dec3(d2, s2)
        d4 = self.dec4(d3, s1)

        # Output
        outputs = self.output_conv(d4)

        # Optional: Apply softmax
        # outputs = self.softmax(outputs)

        return outputs

# Learning algorithm

In [None]:
# --- Configuration & Constants ---
IMG_SIZE = (256, 256)
LEARNING_RATE = 1e-4
NUM_EPOCHS = 50 # A good starting point, adjust as needed
IN_CHANNELS = 3
OUT_CLASSES = 2 # 0: Not Suitable, 1: Suitable
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# EarlyStopping Configuration
EARLY_STOPPING_PATIENCE = 7 # Number of epochs to wait for improvement before stopping
EARLY_STOPPING_MIN_DELTA = 0.0001 # Minimum change in monitored quantity to qualify as improvement

# --- 4. Model, Loss, Optimizer ---
print("Setting up model, loss, and optimizer...")
model = DeepUnet(in_channels=IN_CHANNELS, out_classes=OUT_CLASSES).to(DEVICE)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

# --- 5. Training Loop with EarlyStopping and History ---
print("Starting training...")
history = {'train_loss': [], 'val_loss': []}
best_val_loss = float('inf')
epochs_no_improve = 0
best_model_weights = copy.deepcopy(model.state_dict()) # Store best model

for epoch in range(NUM_EPOCHS):
    start_time = time.time()
    model.train()
    running_train_loss = 0.0
    for i, (images, masks) in enumerate(train_loader):
        images, masks = images.to(DEVICE), masks.to(DEVICE)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, masks)
        loss.backward()
        optimizer.step()
        running_train_loss += loss.item()
        if (i + 1) % 20 == 0: # Print training progress more frequently
             print(f"Epoch [{epoch+1}/{NUM_EPOCHS}], Step [{i+1}/{len(train_loader)}], Batch Loss: {loss.item():.4f}")

    avg_train_loss = running_train_loss / len(train_loader)
    history['train_loss'].append(avg_train_loss)

    # Validation
    model.eval()
    running_val_loss = 0.0
    with torch.no_grad():
        for images, masks in val_loader:
            images, masks = images.to(DEVICE), masks.to(DEVICE)
            outputs = model(images)
            loss = criterion(outputs, masks)
            running_val_loss += loss.item()
    avg_val_loss = running_val_loss / len(val_loader)
    history['val_loss'].append(avg_val_loss)
    epoch_time = time.time() - start_time

    print(f"--- Epoch {epoch+1}/{NUM_EPOCHS} Finished ---")
    print(f"Train Loss: {avg_train_loss:.4f} | Val Loss: {avg_val_loss:.4f}")
    print(f"Epoch Duration: {epoch_time:.2f} seconds")

    # EarlyStopping Check
    if avg_val_loss < best_val_loss - EARLY_STOPPING_MIN_DELTA:
        best_val_loss = avg_val_loss
        epochs_no_improve = 0
        best_model_weights = copy.deepcopy(model.state_dict())
        print(f"Validation loss improved. Saving model weights.")
    else:
        epochs_no_improve += 1
        print(f"Validation loss did not improve for {epochs_no_improve} epoch(s).")

    if epochs_no_improve >= EARLY_STOPPING_PATIENCE:
        print(f"Early stopping triggered after {epoch+1} epochs.")
        model.load_state_dict(best_model_weights) # Restore best model weights
        break
    print("-" * 30)

print("Training finished!")
if epoch < NUM_EPOCHS -1 and epochs_no_improve < EARLY_STOPPING_PATIENCE : # If not early stopped
    print("Completed all epochs.")
    model.load_state_dict(best_model_weights) # Ensure best model is loaded if early stopping wasn't triggered but patience was > 0

# --- 6. Print Loss History ---
print("\n--- Training History ---")
for i in range(len(history['train_loss'])):
    print(f"Epoch {i+1}: Train Loss = {history['train_loss'][i]:.4f}, Val Loss = {history['val_loss'][i]:.4f}")

Setting up model, loss, and optimizer...
Starting training...
--- Epoch 1/50 Finished ---
Train Loss: 0.7818 | Val Loss: 0.6749
Epoch Duration: 613.67 seconds
Validation loss improved. Saving model weights.
------------------------------
--- Epoch 2/50 Finished ---
Train Loss: 0.6248 | Val Loss: 0.6331
Epoch Duration: 179.33 seconds
Validation loss improved. Saving model weights.
------------------------------
--- Epoch 3/50 Finished ---
Train Loss: 0.5680 | Val Loss: 0.5611
Epoch Duration: 179.05 seconds
Validation loss improved. Saving model weights.
------------------------------
--- Epoch 4/50 Finished ---
Train Loss: 0.5083 | Val Loss: 0.5130
Epoch Duration: 174.86 seconds
Validation loss improved. Saving model weights.
------------------------------
--- Epoch 5/50 Finished ---
Train Loss: 0.5259 | Val Loss: 0.4906
Epoch Duration: 177.66 seconds
Validation loss improved. Saving model weights.
------------------------------
--- Epoch 6/50 Finished ---
Train Loss: 0.4511 | Val Loss:

In [None]:
# Define the path where you want to save the model
model_save_path = "solar_unet_model.pth"

# Save only the model's state dictionary (recommended for inference/retraining)
torch.save(model.state_dict(), model_save_path)

print(f"Neural network model saved successfully to {model_save_path}")