ModelV1: changed the final few layers, plus a few other changes.

In [1]:
import os
import numpy as np
import pandas as pd
import cv2
import matplotlib.pyplot as plt
from tqdm import tqdm

# libs
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.autograd import Variable
from torchvision import transforms

In [10]:
# Dataset root. Set the DEEPGLOBE_ROOT environment variable to point the
# notebook at another location instead of editing the paths below.
DATA_ROOT = os.environ.get("DEEPGLOBE_ROOT", r"E:\THIRD YEAR\Datasets\deepglobe")

TRAIN_DIR = os.path.join(DATA_ROOT, "smallSizeTrain")
# NOTE(review): VALID_DIR is the same folder as TRAIN_DIR, so any validation
# run on it would re-use the training tiles -- confirm this is intended.
VALID_DIR = os.path.join(DATA_ROOT, "smallSizeTrain")
TEST_DIR = os.path.join(DATA_ROOT, "smallSizeTest")
COLOR_CODES = os.path.join(DATA_ROOT, "class_dict.csv")

# Spatial size shared by images and masks so they stay pixel-aligned.
IMAGE_SIZE = (256, 256)

# Only photometric augmentation is applied here. Geometric augmentations
# (flips, rotations) were removed because this pipeline never sees the mask:
# moving image pixels without moving the mask the same way corrupts the
# labels. Re-introduce them only as a joint image+mask transform.
image_transform = transforms.Compose([
    transforms.Resize(IMAGE_SIZE),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.05),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# NEAREST keeps class ids intact while resizing (no blended in-between values).
# PILToTensor keeps the raw integer ids; ToTensor would rescale them to [0, 1]
# and a later cast to long would collapse every label to 0.
mask_transform = transforms.Compose([
    transforms.Resize(IMAGE_SIZE, interpolation=transforms.InterpolationMode.NEAREST),
    transforms.PILToTensor()
])

# Quick sanity check that the configured paths exist on this machine.
print(os.path.exists(COLOR_CODES))
print(os.path.exists(TRAIN_DIR))


False
False


In [9]:
# Build {row index: [r, g, b]} from the colour-code CSV, one entry per row.
df = pd.read_csv(COLOR_CODES)

# Hard-coded alternative, kept commented out for reference:
# label_map = {
#     0: [0, 255, 255],    # class1
#     1: [255, 255, 0],    # class2
#     2: [255, 0, 255],    # class3
#     3: [0, 255, 0],      # class4
#     4: [0, 0, 255],      # class5
#     5: [255, 255, 255],  # class6
#     6: [0, 0, 0],        # class7
# }

label_map = {
    index: [row["r"], row["g"], row["b"]]
    for index, row in df.iterrows()
}
label_map

FileNotFoundError: [Errno 2] No such file or directory: 'E:\\THIRD YEAR\\Datasets\\deepglobe\\class_dict.csv'

In [None]:
class Segmentation_Dataset(Dataset):
    """Paired satellite-image / colour-mask dataset read from one directory.

    Images are the files ending in ``_sat.jpg`` and masks the files ending in
    ``_mask.png``; a pair shares the file-name prefix before that suffix.

    Args:
        image_dir: Directory holding both the images and the masks.
        label_map: Mapping whose values are ``[r, g, b]`` colours. The position
            of a colour in ``label_map.values()`` is the class index written
            into the label mask.
        image_transform: Optional callable applied to the PIL RGB image.
        mask_transform: Optional callable applied to the PIL single-channel
            label mask.

    Raises:
        AssertionError: If the number of images and masks differs.
        ValueError: If an image and the mask at the same position do not share
            the same file-name prefix.
    """

    IMAGE_SUFFIX = '_sat.jpg'
    MASK_SUFFIX = '_mask.png'

    def __init__(self, image_dir, label_map, image_transform=None, mask_transform=None):
        self.image_dir = image_dir
        self.image_transform = image_transform
        self.mask_transform = mask_transform
        self.label_map = label_map

        file_names = os.listdir(self.image_dir)
        self.images_name = self._sorted_by_id(file_names, self.IMAGE_SUFFIX)
        self.targets_name = self._sorted_by_id(file_names, self.MASK_SUFFIX)

        print(f"Number of images: {len(self.images_name)}")
        print(f"Number of masks: {len(self.targets_name)}")

        assert len(self.images_name) == len(self.targets_name), "Mismatch in images and masks count!"

        # Equal counts alone do not prove that position i in both lists is the
        # same tile, so compare the file-name prefixes explicitly.
        for image_name, mask_name in zip(self.images_name, self.targets_name):
            image_id = image_name[:-len(self.IMAGE_SUFFIX)]
            mask_id = mask_name[:-len(self.MASK_SUFFIX)]
            if image_id != mask_id:
                raise ValueError(
                    f"Image '{image_name}' is paired with mask '{mask_name}', "
                    "but their file-name prefixes differ."
                )

    @staticmethod
    def _sorted_by_id(file_names, suffix):
        """Return the names ending in ``suffix``, ordered by their prefix.

        Purely numeric prefixes are ordered numerically (so ``2`` sorts before
        ``10``); any other prefix is ordered as plain text after them.
        """
        def sort_key(name):
            stem = name[:-len(suffix)]
            if stem.isdecimal():
                return (0, int(stem), stem)
            return (1, 0, stem)

        return sorted((f for f in file_names if f.endswith(suffix)), key=sort_key)

    def __len__(self):
        return len(self.images_name)

    def __getitem__(self, idx):
        """Load, convert and transform the image/mask pair at ``idx``.

        Returns:
            ``(image, mask)`` after the optional transforms; without transforms
            these are a PIL RGB image and a PIL single-channel image of class
            indices.

        Raises:
            IndexError: If ``idx`` is past the end of the dataset.
            FileNotFoundError: If OpenCV cannot read the image or the mask.
        """
        if idx >= len(self.images_name):
            raise IndexError(f"Index {idx} out of range for dataset length {len(self.images_name)}.")

        image_path = os.path.join(self.image_dir, self.images_name[idx])
        image = cv2.imread(image_path, cv2.IMREAD_COLOR)  # Load as BGR
        if image is None:  # cv2.imread signals failure by returning None
            raise FileNotFoundError(f"Could not read image file: {image_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)  # Convert to RGB

        # The mask encodes classes as colours, so it must be read in colour and
        # converted to RGB to be comparable with the [r, g, b] entries of label_map.
        mask_path = os.path.join(self.image_dir, self.targets_name[idx])
        mask = cv2.imread(mask_path, cv2.IMREAD_COLOR)
        if mask is None:
            raise FileNotFoundError(f"Could not read mask file: {mask_path}")
        mask = cv2.cvtColor(mask, cv2.COLOR_BGR2RGB)

        mask = self.colormap_to_labelmap(mask)

        # torchvision's converter is used so no extra imaging import is needed.
        to_pil = transforms.ToPILImage()
        image = to_pil(image)
        mask = to_pil(mask.astype(np.uint8))
        if self.image_transform:
            image = self.image_transform(image)
        if self.mask_transform:
            mask = self.mask_transform(mask)

        return image, mask

    def colormap_to_labelmap(self, mask):
        """Convert an ``(H, W, 3)`` RGB colour mask into an ``(H, W)`` class-index mask.

        Each pixel gets the position of its colour within
        ``self.label_map.values()``; pixels whose colour is not listed get 0.

        Raises:
            ValueError: If ``mask`` is not a 3-D array with 3 channels.
        """
        mask = np.asarray(mask)
        if mask.ndim != 3 or mask.shape[2] != 3:
            raise ValueError(f"Expected an (H, W, 3) colour mask, got shape {mask.shape}.")

        labelmap = np.zeros(mask.shape[:2], dtype=np.int64)
        for class_idx, color in enumerate(self.label_map.values()):
            matches = np.all(mask == np.asarray(color), axis=-1)
            labelmap[matches] = class_idx
        return labelmap

In [None]:
# modelV1
import torch
import torch.nn as nn
import torch.nn.functional as F

# 🔹 Fixed Residual Block
class ResidualBlock(nn.Module):
    """Residual unit: two 3x3 conv + batch-norm layers plus a skip connection.

    Args:
        in_c: Number of input channels.
        out_c: Number of output channels.
        stride: Stride of the first convolution (and of the projection
            shortcut when one is needed).

    The skip path is an empty ``nn.Sequential`` (a pass-through) unless the
    stride or the channel count changes, in which case a 1x1 convolution with
    batch norm brings the input to the shape of the main path.
    """

    def __init__(self, in_c, out_c, stride=1):
        super().__init__()

        # Main path: conv -> bn -> relu -> conv -> bn
        self.conv1 = nn.Conv2d(in_c, out_c, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_c)
        self.conv2 = nn.Conv2d(out_c, out_c, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_c)

        # Skip path: project only when the main path changes the tensor shape.
        shape_changes = stride != 1 or in_c != out_c
        if shape_changes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_c, out_c, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_c)
            )
        else:
            self.shortcut = nn.Sequential()

    def forward(self, x):
        """Return ``relu(main_path(x) + shortcut(x))``."""
        skip = self.shortcut(x)

        out = self.conv1(x)
        out = self.bn1(out)
        out = F.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        out += skip  # Skip connection
        return F.relu(out)


# 🔹 Fixed ResNet Classifier
class ModelV1(nn.Module):
    """ResNet-style network ending in a global-average-pool + linear head.

    Layout: a 7x7 stride-2 stem with max-pooling, four stride-2 residual
    stages (64 -> 128 -> 256 -> 512 -> 1024 channels), adaptive average
    pooling to 1x1, and a linear layer producing ``n_classes`` scores.

    Args:
        n_classes: Size of the output score vector.

    ``forward`` takes a batch of 3-channel images and returns a tensor of
    shape ``(batch, n_classes)`` -- one score vector per image.

    NOTE(review): the training loop in this notebook squeezes labels to
    per-pixel masks and reports pixel accuracy, while this head yields one
    prediction per image -- confirm which of the two is intended.
    """

    def __init__(self, n_classes):
        super().__init__()

        stem_c, c1, c2, c3, c4 = 64, 128, 256, 512, 1024

        # Stem: aggressive early down-sampling (conv stride 2, then pool stride 2).
        self.h1 = nn.Sequential(
            nn.Conv2d(3, stem_c, kernel_size=7, stride=2, padding=3, bias=False),
            nn.BatchNorm2d(stem_c),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        )

        # Each residual stage halves the spatial size and doubles the channels.
        self.layer1 = ResidualBlock(stem_c, c1, stride=2)
        self.layer2 = ResidualBlock(c1, c2, stride=2)
        self.layer3 = ResidualBlock(c2, c3, stride=2)
        self.layer4 = ResidualBlock(c3, c4, stride=2)

        # Collapse the remaining spatial grid to a single 1x1 cell per channel.
        self.pooled = nn.AdaptiveAvgPool2d((1, 1))

        # Final layer: flattened 1024-d feature vector -> n_classes scores.
        self.fc = nn.Linear(c4, n_classes)

    def forward(self, x):
        """Run the stem and residual stages, pool, flatten and classify."""
        stages = (self.h1, self.layer1, self.layer2, self.layer3, self.layer4)
        for stage in stages:
            x = stage(x)

        features = torch.flatten(self.pooled(x), 1)
        return self.fc(features)

# TODO: copy and paste the remaining code from the original files.

In [None]:
import numpy as np
from sklearn.metrics import precision_score, recall_score, f1_score
from tqdm import tqdm

class Train():
    """Minimal training loop that reports loss, accuracy, precision, recall and F1 per epoch.

    Args:
        model: Network to train; moved to ``device`` when training starts.
        dataloader: Iterable yielding ``(inputs, labels)`` batches.
        optimizer: Optimizer already bound to ``model``'s parameters.
        num_epochs: Number of passes over ``dataloader``.
        device: Device the model and every batch are moved to.
        loss: Callable ``loss(predictions, labels)`` returning a scalar tensor.
    """

    def __init__(self, model, dataloader, optimizer, num_epochs, device, loss):
        self.model = model
        self.dataloader = dataloader
        self.optimizer = optimizer
        self.num_epochs = num_epochs
        self.device = device
        self.loss = loss

    def trainModel(self):
        """Train for ``num_epochs`` epochs, printing a metric summary after each.

        Labels are cast to ``long`` and a 4-D label batch has its dimension 1
        squeezed out before the loss is computed. Predicted classes are taken
        with ``argmax`` over dimension 1 of the model output.

        Raises:
            ValueError: If the dataloader yields no batches in an epoch.
        """
        self.model.to(self.device)
        self.model.train()

        for epoch in range(self.num_epochs):
            total_loss = 0.0
            total_samples = 0
            correct_pixels = 0
            total_pixels = 0
            # One flat array per batch; joined once per epoch instead of
            # growing a Python list by one element per pixel.
            batch_preds = []
            batch_labels = []

            loop = tqdm(self.dataloader, unit="batch", desc=f"Epoch {epoch+1}/{self.num_epochs}")
            for inputs, labels in loop:
                inputs, labels = inputs.to(self.device), labels.to(self.device)
                labels = labels.long()
                if labels.dim() == 4:
                    labels = labels.squeeze(1)

                predictions = self.model(inputs)
                loss = self.loss(predictions, labels)

                # Backpropagation
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()

                # Accumulate the loss weighted by batch size so the running
                # value is a per-sample mean even when the last batch is smaller.
                # Assumes the loss is averaged over the batch -- TODO confirm.
                n_samples = inputs.size(0)
                total_loss += loss.item() * n_samples
                total_samples += n_samples

                # Compute pixel accuracy
                preds_flat = predictions.argmax(dim=1).cpu().numpy().ravel()
                labels_flat = labels.cpu().numpy().ravel()

                batch_preds.append(preds_flat)
                batch_labels.append(labels_flat)

                correct_pixels += int((preds_flat == labels_flat).sum())
                total_pixels += preds_flat.size

                # Update tqdm progress bar
                loop.set_postfix({
                    "Loss": total_loss / total_samples,  # Mean loss per sample
                    "Accuracy": correct_pixels / total_pixels
                })

            if total_samples == 0 or total_pixels == 0:
                raise ValueError("The dataloader yielded no batches; nothing to train on.")

            all_preds = np.concatenate(batch_preds)
            all_labels = np.concatenate(batch_labels)

            # Compute final epoch metrics
            epoch_loss = total_loss / total_samples  # Mean loss per sample
            epoch_accuracy = correct_pixels / total_pixels
            epoch_precision = precision_score(all_labels, all_preds, average='weighted', zero_division=1)
            epoch_recall = recall_score(all_labels, all_preds, average='weighted', zero_division=1)
            epoch_f1 = f1_score(all_labels, all_preds, average='weighted', zero_division=1)

            # Print epoch summary
            print(f"\n[Epoch {epoch + 1}/{self.num_epochs}] Loss: {epoch_loss:.4f}, Accuracy: {epoch_accuracy:.4f}")
            print(f"Precision: {epoch_precision:.4f}, Recall: {epoch_recall:.4f}, F1 Score: {epoch_f1:.4f}")


In [None]:
# Training setup: hyperparameters, data pipeline, model, optimizer and loss.
from torch.nn import CrossEntropyLoss

# Hyperparameters
num_classes = len(label_map.keys())
lr = 0.001
batch_size = 4
num_epochs = 2
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Data
train_dataset = Segmentation_Dataset(TRAIN_DIR, label_map, image_transform, mask_transform)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# The model has to exist before the optimizer is built, because the optimizer
# captures model.parameters(); ModelV1's keyword is `n_classes`.
model = ModelV1(n_classes=num_classes).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
loss = CrossEntropyLoss()

In [None]:
# Hand every piece to the trainer by keyword, then run the training loop.
trainer = Train(
    model=model,
    dataloader=train_dataloader,
    optimizer=optimizer,
    num_epochs=num_epochs,
    device=device,
    loss=loss,
)
trainer.trainModel()