In [6]:
# data preprocessing
import os
import numpy as np
from PIL import Image
import torch
from torch.utils.data import Dataset
from torchvision.transforms import Compose, ToTensor, Normalize


class CityscapesDataset(Dataset):
    def __init__(self, image_dir, mask_dir, transforms=None):
        self.transforms = transforms
        self.images = []
        self.masks = []

        # Walk through the image directory
        for root, _, files in os.walk(image_dir):
            for file in files:
                if file.endswith('.png'):
                    img_path = os.path.join(root, file)
                    mask_path = os.path.join(mask_dir, root.split('/')[-1], file.replace('leftImg8bit', 'gtFine_labelIds'))
                    
                    # Check if mask exists
                    if os.path.exists(mask_path):
                        self.images.append(img_path)
                        self.masks.append(mask_path)

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        img_path = self.images[idx]
        mask_path = self.masks[idx]
        image = Image.open(img_path).convert("RGB")
        mask = Image.open(mask_path).convert("L")  # Convert to grayscale

        if self.transforms:
            image = self.transforms(image)
            mask = ToTensor()(mask)  # Ensure mask is also a tensor

        return image, mask

# Define transforms
transforms = Compose([
    ToTensor(),
    Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Setup dataset
train_dataset = CityscapesDataset(
    image_dir='/home/maith/Desktop/cityscapes/leftImg8bit/train',
    mask_dir='/home/maith/Desktop/cityscapes/gtFine/train',
    transforms=transforms
)

val_dataset = CityscapesDataset(
    image_dir='/home/maith/Desktop/cityscapes/leftImg8bit/val',
    mask_dir='/home/maith/Desktop/cityscapes/gtFine/val',
    transforms=transforms
)

In [7]:
#Data Loader
from torch.utils.data import DataLoader

def get_dataloader(batch_size, train_dataset, val_dataset, shuffle=True, num_workers=4):
    train_loader = DataLoader(
        train_dataset, 
        batch_size=batch_size, 
        shuffle=shuffle, 
        num_workers=num_workers
    )
    
    val_loader = DataLoader(
        val_dataset, 
        batch_size=batch_size, 
        shuffle=False, 
        num_workers=num_workers
    )
    
    return train_loader, val_loader

# Define batch size
batch_size = 4

# Setup the validation dataset using the correct paths
val_dataset = CityscapesDataset(
    image_dir='/home/maith/Desktop/cityscapes/leftImg8bit/val',
    mask_dir='/home/maith/Desktop/cityscapes/gtFine/val',
    transforms=transforms
)

# Create data loaders
train_loader, val_loader = get_dataloader(batch_size, train_dataset, val_dataset)

print(f"Train loader length (number of batches): {len(train_loader)}")
print(f"Validation loader length (number of batches): {len(val_loader)}")

Train loader length (number of batches): 744
Validation loader length (number of batches): 125


In [11]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class InitialBlock(nn.Module):
    def __init__(self, out_channels, inp_relu=True, batch_norm=True):
        super(InitialBlock, self).__init__()
        self.conv = nn.Conv2d(3, out_channels - 3, kernel_size=3, stride=2, padding=1, bias=False)
        self.maxpool = nn.MaxPool2d(2, stride=2, return_indices=True)
        self.batch_norm = nn.BatchNorm2d(out_channels) if batch_norm else None
        self.relu = nn.ReLU(inplace=True) if inp_relu else None

    def forward(self, x):
        x1 = self.conv(x)
        x2, indices = self.maxpool(x)
        x = torch.cat((x1, x2), 1)
        if self.batch_norm:
            x = self.batch_norm(x)
        if self.relu:
            x = self.relu(x)
        print(f'InitialBlock: x.shape = {x.shape}, indices.shape = {indices.shape}')
        return x, indices, x2.size()

class Bottleneck(nn.Module):
    def __init__(self, in_channels, out_channels, dilated=False, downsample=False, upsample=False, relu=True):
        super(Bottleneck, self).__init__()
        self.relu = nn.ReLU(inplace=True) if relu else None
        internal_channels = out_channels // 4

        self.conv1 = nn.Conv2d(in_channels, internal_channels, 1, bias=False)
        self.bn1 = nn.BatchNorm2d(internal_channels)

        if downsample:
            self.conv2 = nn.Conv2d(internal_channels, internal_channels, 3, stride=2, padding=1, bias=False)
        elif dilated:
            self.conv2 = nn.Conv2d(internal_channels, internal_channels, 3, stride=1, padding=2, dilation=2, bias=False)
        else:
            self.conv2 = nn.Conv2d(internal_channels, internal_channels, 3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(internal_channels)

        self.conv3 = nn.Conv2d(internal_channels, out_channels, 1, bias=False)
        self.bn3 = nn.BatchNorm2d(out_channels)
        self.dropout = nn.Dropout2d(0.1)

        self.downsample = nn.MaxPool2d(2, stride=2, return_indices=True) if downsample else None
        self.upsample = nn.MaxUnpool2d(2, stride=2) if upsample else None

    def forward(self, x, indices=None, output_size=None):
        residual = x

        x = self.conv1(x)
        x = self.bn1(x)
        if self.relu:
            x = self.relu(x)
        
        if self.downsample is not None:
            x, new_indices = self.downsample(x)
            print(f'Bottleneck (downsample): x.shape = {x.shape}, new_indices.shape = {new_indices.shape}')
        elif self.upsample is not None:
            x = self.upsample(x, indices, output_size=output_size)
            print(f'Bottleneck (upsample): x.shape = {x.shape}, indices.shape = {indices.shape}, output_size = {output_size}')
        else:
            x = self.conv2(x)
            x = self.bn2(x)
            if self.relu:
                x = self.relu(x)

        x = self.conv3(x)
        x = self.bn3(x)
        x = self.dropout(x)

        if self.relu:
            x = self.relu(x)
        
        return x, new_indices if self.downsample else indices, x.size()

class ENet(nn.Module):
    def __init__(self):
        super(ENet, self).__init__()
        self.initial_block = InitialBlock(16)
        self.bottleneck1_0 = Bottleneck(16, 64, downsample=True)
        self.bottleneck1_1 = Bottleneck(64, 64)
        self.bottleneck2_0 = Bottleneck(64, 128, downsample=True)
        self.upsample1 = Bottleneck(128, 64, upsample=True)
        self.upsample2 = Bottleneck(64, 16, upsample=True)

    def forward(self, x):
        x, initial_indices, initial_size = self.initial_block(x)
        x, indices1, size1 = self.bottleneck1_0(x)
        x, _, _ = self.bottleneck1_1(x)
        x, indices2, size2 = self.bottleneck2_0(x)
        x, _, _ = self.upsample1(x, indices=indices2, output_size=size2)
        x, _, _ = self.upsample2(x, indices=indices1, output_size=size1)
        print(f'ENet forward final: x.shape = {x.shape}')
        return x

# Instantiate the model
enet_model = ENet()
print(enet_model)

ENet(
  (initial_block): InitialBlock(
    (conv): Conv2d(3, 13, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
    (maxpool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (batch_norm): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU(inplace=True)
  )
  (bottleneck1_0): Bottleneck(
    (relu): ReLU(inplace=True)
    (conv1): Conv2d(16, 16, kernel_size=(1, 1), stride=(1, 1), bias=False)
    (bn1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (conv2): Conv2d(16, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
    (bn2): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (conv3): Conv2d(16, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
    (bn3): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (dropout): Dropout2d(p=0.1, inplace=False)
    (downsample): MaxPool2

In [9]:
import torch.optim as optim

# Setup the loss function and optimizer
criterion = nn.CrossEntropyLoss()  # Suitable for classification with multiple classes
optimizer = optim.Adam(enet_model.parameters(), lr=0.001)  # Using Adam optimizer

# Check if CUDA is available and move the model to GPU if possible
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
enet_model.to(device)
print(f"Training on {device}")

Training on cuda


In [14]:
import torch.optim as optim

# Setup the loss function and optimizer
criterion = nn.CrossEntropyLoss()  # Suitable for classification with multiple classes
optimizer = optim.Adam(enet_model.parameters(), lr=0.001)  # Using Adam optimizer

# Check if CUDA is available and move the model to GPU if possible
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
enet_model.to(device)
print(f"Training on {device}")

# Function to train the model
def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=25):
    model.train()  # Set the model to training mode

    for epoch in range(num_epochs):
        torch.cuda.empty_cache()  # Clear GPU cache at the start of each epoch
        running_loss = 0.0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            # Zero the parameter gradients
            optimizer.zero_grad()

            # Forward pass
            outputs = model(inputs)
            loss = criterion(outputs, labels.long())

            # Backward pass and optimize
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        epoch_loss = running_loss / len(train_loader)
        print(f'Epoch {epoch}/{num_epochs - 1}, Loss: {epoch_loss:.4f}')

        # Validation step
        val_loss = validate_model(model, val_loader, criterion)
        print(f'Validation Loss: {val_loss:.4f}')

    print('Training complete')

# Function to validate the model
def validate_model(model, val_loader, criterion):
    model.eval()  # Set the model to evaluation mode
    val_loss = 0.0
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            outputs = model(inputs)
            loss = criterion(outputs, labels.long())
            val_loss += loss.item()

    val_loss /= len(val_loader)
    model.train()  # Set the model back to training mode
    return val_loss

# Train the model
num_epochs = 25
train_model(enet_model, train_loader, val_loader, criterion, optimizer, num_epochs)

Training on cuda
InitialBlock: x.shape = torch.Size([4, 16, 512, 1024]), indices.shape = torch.Size([4, 3, 512, 1024])
Bottleneck (downsample): x.shape = torch.Size([4, 16, 256, 512]), new_indices.shape = torch.Size([4, 16, 256, 512])
Bottleneck (downsample): x.shape = torch.Size([4, 32, 128, 256]), new_indices.shape = torch.Size([4, 32, 128, 256])


ValueError: invalid output_size "torch.Size([128, 256])" (dim 0 must be between 254 and 258)