In [None]:
import os

import numpy as np
import pandas as pd

In [None]:
import time
import torch
import torchvision
from glob import glob
import torch.nn as nn
import matplotlib.pyplot as plt
import torch.nn.functional as F
import torchvision.transforms as transform
from torch.utils.data import DataLoader,Dataset
from torchvision.utils import make_grid

from nets import SimpleSegmentationNet
from data import CityscapesDataset
from labels import training_classes, labels

In [None]:
# Run on the GPU when PyTorch reports one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print(device)

In [None]:
# Report the ID of the process that is executing this notebook.
current_pid = os.getpid()
print("Current Process ID:", current_pid)

In [None]:
def _sorted_matches(pattern):
    """Return every path matching ``pattern`` in lexicographic order."""
    return sorted(glob(pattern))


# Label maps (``*labelIds.png``) and camera frames are listed separately per split.
# Both listings are sorted; presumably they are paired by index downstream
# (verify against CityscapesDataset).
train_seg_path = _sorted_matches('/cluster/projects/vc/data/ad/open/Cityscapes/gtFine_trainvaltest/gtFine/train/*/*labelIds.png')
train_img_path = _sorted_matches('/cluster/projects/vc/data/ad/open/Cityscapes/leftImg8bit_trainvaltest/leftImg8bit/train/*/*.png')

val_seg_path = _sorted_matches('/cluster/projects/vc/data/ad/open/Cityscapes/gtFine_trainvaltest/gtFine/val/*/*labelIds.png')
val_img_path = _sorted_matches('/cluster/projects/vc/data/ad/open/Cityscapes/leftImg8bit_trainvaltest/leftImg8bit/val/*/*.png')

# Only the "bielefeld" folder of the test split is listed.
test_seg_path = _sorted_matches('/cluster/projects/vc/data/ad/open/Cityscapes/gtFine_trainvaltest/gtFine/test/bielefeld/*labelIds.png')
test_img_path = _sorted_matches('/cluster/projects/vc/data/ad/open/Cityscapes/leftImg8bit_trainvaltest/leftImg8bit/test/bielefeld/*.png')

In [None]:
# Print how many label files and image files were found for the training split.
print(len(train_seg_path))
print(len(train_img_path))

# Preview the first five test-split entries: label map on the left, image on the right.
fig, ax = plt.subplots(5, 2, figsize=(10, 30))
for row in range(5):
    seg_preview = plt.imread(test_seg_path[row])
    img_preview = plt.imread(test_img_path[row])
    print(test_seg_path[row])
    ax[row, 0].imshow(seg_preview)
    ax[row, 1].imshow(img_preview)

## Dataset and dataloader

In [None]:
# Wrap the (image paths, label paths) listings of the train and val splits in datasets.
traindata, valdata = (
    CityscapesDataset(train_img_path, train_seg_path),
    CityscapesDataset(val_img_path, val_seg_path),
)
# testdata = CityscapesDataset(test_img_path, test_seg_path)

In [None]:
batch_size = 4
# Shuffle the training data: the path listings are sorted, so without shuffling
# every epoch would visit the samples in the same city-by-city order.
train_loader = DataLoader(traindata, batch_size=batch_size, shuffle=True)
# Validation keeps its fixed order so metrics and previews stay comparable between runs.
val_loader = DataLoader(valdata, batch_size=batch_size)
# test_loader = DataLoader(testdata, batch_size)

## Testing the dataloader

# Fetch one batch from the training loader for inspection.
data = next(iter(train_loader))

len(data)
print(data[0].size())

# One row per sample: first batch element on the left, second batch element on the right.
fig, ax = plt.subplots(4, 2, figsize=(10, 30))
batch_images, batch_labels = data[0], data[1]
for row in range(4):
    # Move the leading axis last before plotting (assumes channel-first images; TODO confirm).
    sample_img = batch_images[row].squeeze().permute((1, 2, 0))
    sample_lab = batch_labels[row].squeeze()
    ax[row, 0].imshow(sample_img)
    ax[row, 1].imshow(sample_lab)

## Neural network

In [None]:
# Instantiate the network with `training_classes` as its argument and move it to `device`.
num_classes = training_classes
model = SimpleSegmentationNet(num_classes).to(device)
# Leave the module as the cell's final expression so the notebook displays its repr.
model

## Training

In [None]:
def show(img, output, label):
    """Plot every input image beside its ground-truth and predicted label map.

    One figure row is drawn per batch element, with three columns:
    input image, ``label`` ("Actual Output") and ``output`` ("Predicted Output").

    Args:
        img: batch of images; each element is permuted with ``(1, 2, 0)``
            before plotting (assumes channel-first layout; TODO confirm).
        output: batch of predicted label maps; each element is squeezed.
        label: batch of ground-truth label maps; each element is squeezed.

    Returns:
        None. The figure is displayed with ``plt.show()``.
    """
    img, output, label = img.cpu(), output.cpu(), label.cpu()

    num_rows = len(img)
    if num_rows == 0:
        # Nothing to draw; plt.subplots cannot build a grid with zero rows.
        return

    # squeeze=False keeps `ax` two-dimensional even for a single image, so the
    # ax[row][col] indexing below works for any batch size.
    fig, ax = plt.subplots(num_rows, 3, figsize=(15, 30), squeeze=False)
    cols = ['Input Image', 'Actual Output', 'Predicted Output']

    # Column titles go on the first row only.
    for x, col in zip(ax[0], cols):
        x.set_title(col)

    for i in range(num_rows):
        image = img[i].permute(1, 2, 0)
        predicted_map = output[i].squeeze()
        actual_map = label[i].squeeze()

        ax[i][0].imshow(image)
        ax[i][1].imshow(actual_map, cmap='tab20')     # ground truth
        ax[i][2].imshow(predicted_map, cmap='tab20')  # prediction, same colormap

    plt.tight_layout()
    plt.show()

In [None]:
# Optimisation settings: learning rate, epoch count, loss function and Adam optimiser.
lr, epochs = 0.01, 30
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=lr)

In [None]:
# Per-epoch histories. `train_loss`, `val_loss` and `val_accuracy` are also stored
# inside every checkpoint. NOTE(review): `train_acc` / `val_acc` are never filled here.
train_acc = []
val_acc = []
train_loss = []
val_loss = []
val_accuracy = []

checkpoint_dir = "checkpoints/cnn"
os.makedirs(checkpoint_dir, exist_ok=True)
checkpoint_every = 20  # write a checkpoint every this many epochs (and after the last one)

for i in range(epochs):

    # ---- Training pass ----
    model.train()
    trainloss = 0

    for data in train_loader:
        img, label = data[0].to(device), data[1].to(device)
        # Drop axis 1 and cast to long, the integer type CrossEntropyLoss expects
        # for class-index targets (assumes labels arrive as (N, 1, H, W); TODO confirm).
        label = label.squeeze(1).to(dtype=torch.long)
        optimizer.zero_grad()

        output = model(img)
        loss = criterion(output, label)
        loss.backward()

        optimizer.step()
        trainloss += loss.item()

    # Mean loss per batch for this epoch.
    train_loss.append(trainloss / len(train_loader))

    # ---- Validation pass ----
    model.eval()

    with torch.no_grad():
        valloss = 0
        total_correct = 0
        total_pixels = 0

        for data in val_loader:
            img, label = data[0].to(device), data[1].to(device)
            label = label.squeeze(1).to(dtype=torch.long)
            output = model(img)
            loss = criterion(output, label)
            valloss += loss.item()

            # Pixel accuracy: the predicted class is the arg-max over axis 1 of the output.
            predicted = output.argmax(dim=1)
            total_correct += (predicted == label).sum().item()
            total_pixels += label.nelement()

        # show(img, predicted.unsqueeze(1), label)

    val_loss.append(valloss / len(val_loader))
    val_accuracy.append(total_correct / total_pixels)

    print("Epoch: {} , Train Loss: {} , Valid Loss: {} , Valid Acc: {:.2f}%".format(i, train_loss[-1], val_loss[-1], 100 * val_accuracy[-1]))

    # Checkpoint periodically and always after the final epoch, so the fully
    # trained weights are never lost when `epochs - 1` is not on the interval.
    if i % checkpoint_every == 0 or i == epochs - 1:
        checkpoint = {
            'epoch': i,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'train_loss': train_loss,
            'val_loss': val_loss,
            'val_accuracy': val_accuracy
        }
        torch.save(checkpoint, os.path.join(checkpoint_dir, f'checkpoint_epoch_{i}.pth'))

## Load model

In [None]:
# Rebuild the model, loss and optimiser, then restore their state from a checkpoint.
num_classes = training_classes
model = SimpleSegmentationNet(num_classes)
model.to(device)

lr = 0.01
epochs = 50
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=lr)

checkpoint_dir = "checkpoints/cnn"
# NOTE(review): pick an epoch for which the training cell actually wrote a checkpoint file.
checkpoint_path = os.path.join(checkpoint_dir, 'checkpoint_epoch_15.pth') # Replace X with the epoch number

# Load the checkpoint. map_location remaps the stored tensors onto the current
# device, so a checkpoint saved from a GPU run also loads on a CPU-only machine.
checkpoint = torch.load(checkpoint_path, map_location=device)

# Update model and optimizer states
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

# If you need to resume training from a specific epoch
start_epoch = checkpoint['epoch'] + 1

# If you also need to access the loss and accuracy history
train_loss = checkpoint['train_loss']
val_loss = checkpoint['val_loss']
val_accuracy = checkpoint['val_accuracy']

## Plotting the Training vs. Validation Loss Curve

In [None]:
# Draw both per-epoch loss histories on the same axes: training in blue, validation in red.
for curve_values, curve_color, curve_label in (
    (train_loss, 'b', 'train loss'),
    (val_loss, 'r', 'val_loss'),
):
    plt.plot(curve_values, color=curve_color, label=curve_label)
plt.legend()

## Testing on the test set

import time

# `tqdm` was used below without being imported anywhere in the notebook. Import it
# here and fall back to plain iteration when the package is not installed.
try:
    from tqdm.auto import tqdm
except ImportError:
    def tqdm(iterable, *args, **kwargs):
        """Stand-in for tqdm that returns the iterable unchanged."""
        return iterable

# Set the model to evaluation mode
model.eval()

# Pixel accuracy over `val_loader` (the loader this cell iterates).
with torch.no_grad():
    correct = 0
    total = 0
    total_images_processed = 0  # number of images seen so far

    for data in tqdm(val_loader):
        image, label = data[0].to(device), data[1].to(device)
        label = label.squeeze(1).to(dtype=torch.long)

        # Count images directly from the batch, without overwriting the
        # notebook-level `batch_size` used to configure the loaders.
        total_images_processed += label.size(0)
        output = model(image)

        # The predicted class is the arg-max over axis 1 of the output.
        predicted = output.argmax(dim=1)
        total += label.nelement()
        correct += (predicted == label).sum().item()

accuracy = 100 * correct / total
print('Accuracy of the model on the test images: {:.2f}%'.format(accuracy))

In [None]:
# Move every validation image to the device up front, so the timed loop below
# measures the forward passes only. Labels are not needed for timing.
# NOTE(review): this keeps the whole validation set in device memory at once.
data_list = []
for data in valdata:
    data_list.append(data[0].to(device))


model.eval()

# NOTE(review): images are passed without a batch axis here, unlike the
# custom-image cell which calls unsqueeze(0); confirm the model accepts both.
with torch.no_grad():  # inference only: do not record autograd graphs
    if device == 'cuda':
        # CUDA kernels run asynchronously; drain pending work before starting the clock.
        torch.cuda.synchronize()
    start_time = time.perf_counter()

    for i in range(len(data_list)):
        output = model(data_list[i])

    if device == 'cuda':
        # Wait for the queued kernels to finish so the elapsed time covers the real compute.
        torch.cuda.synchronize()
    end_time = time.perf_counter()


In [None]:
# Elapsed time of the timed loop and the resulting throughput in images per second.
num_timed_images = len(data_list)
total_time = end_time - start_time
fps = num_timed_images / total_time

print(num_timed_images)

In [None]:
# Unrounded duration of the timed inference loop, in seconds.
print(total_time)

In [None]:
# Numbers recorded from an earlier run (CNN):
#   Total inference time: 5.27 seconds
#   FPS: 94.91

# Print both figures rounded to two decimals, one per line.
print(
    f"Total inference time: {total_time:.2f} seconds",
    f"FPS: {fps:.2f}",
    sep="\n",
)

In [None]:
# Visualise predictions for the first few validation batches.
c = 0  # counter checked after each batch is shown
with torch.no_grad():
    for img, label in val_loader:
        img, label = img.to(device), label.to(device)
        output = model(img)
        # Index of the largest value along axis 1 is the predicted class.
        predicted = torch.max(output.data, 1).indices
        show(img, predicted.unsqueeze(1), label)

        # The check runs after the display, so the loop stops once the
        # seventh batch (c == 6) has been shown.
        if c > 5:
            break
        c += 1

In [None]:
import torch
import torchvision.transforms as transforms
from PIL import Image
import matplotlib.pyplot as plt

# List the custom images. glob returns matches in arbitrary order, so sort them
# (as the Cityscapes listings do) to keep the order stable between runs.
image_paths = sorted(glob("/cluster/home/bendikgh/cityscapes_semantic_segmentation/trondheim_images/*"))
print(image_paths)

In [None]:
from torchvision.transforms import functional as F

def center_crop_to_aspect_ratio(img):
    """Crop the centre of ``img`` to a 2:1 width-to-height ratio.

    The crop is the largest centred 2:1 rectangle that fits inside the image.
    All coordinates are computed with integer arithmetic, so the crop box never
    depends on how fractional pixel positions would be rounded.

    Args:
        img: an object exposing ``size`` as ``(width, height)`` and a
            ``crop((left, top, right, bottom))`` method, e.g. a PIL image.

    Returns:
        Whatever ``img.crop`` returns for the computed box. When the width is
        odd and is the limiting dimension, the height is ``width // 2``.
    """
    original_width, original_height = img.size

    if original_width >= 2 * original_height:
        # Height is the limiting dimension: keep it all and take twice as many columns.
        target_height = original_height
        target_width = 2 * original_height
    else:
        # Width is the limiting dimension: keep it all and take half as many rows.
        target_width = original_width
        target_height = original_width // 2

    # Centre the box; right/bottom are derived from left/top so the box has
    # exactly the target size.
    left = (original_width - target_width) // 2
    top = (original_height - target_height) // 2
    right = left + target_width
    bottom = top + target_height

    return img.crop((left, top, right, bottom))

# Tensor conversion only; swap in the transformations used during your model's training.
img_transform = transforms.Compose([transforms.ToTensor()])


In [None]:

def process(img):
    """Turn an image into a tensor sized for the model.

    Steps, in order: convert to RGB, centre-crop with
    ``center_crop_to_aspect_ratio``, resize to ``(1024, 2048)``, convert to a tensor.

    Args:
        img: image object providing ``convert`` (e.g. a PIL image).

    Returns:
        The result of ``transform.ToTensor()`` applied to the resized image.
    """
    rgb = img.convert("RGB")
    cropped = center_crop_to_aspect_ratio(rgb)
    resized = transform.Resize((1024, 2048))(cropped)
    return transform.ToTensor()(resized)

# NOTE(review): these two names are not read in this cell; `process` hard-codes its size.
target_width, target_height = 2048, 1024

# Open each file in a `with` block so its handle is closed once `process` has
# produced the tensor, instead of staying open until garbage collection.
images = []
for img_path in image_paths:
    with Image.open(img_path) as pil_img:
        images.append(process(pil_img))

# Append the first image of the first validation batch to the list.
img_val = next(iter(val_loader))[0][0]
images.append(img_val)

print(len(images))
# print(images)

In [None]:
# Evaluation mode for inference.
model.eval()

# Run the model on every collected image with gradient tracking switched off.
with torch.no_grad():
    for sample in images:
        # Add a leading batch axis of size one and move the tensor to the device.
        img = sample.unsqueeze(0).to(device)

        outputs = model(img)
        predicted = torch.max(outputs.data, 1).indices

        print(img.size())
        print(predicted.size())

        # The prediction is passed in both the output and the label slot of `show`.
        prediction_map = predicted.unsqueeze(1)
        show(img, prediction_map, prediction_map)