In [None]:
import torch
!pip install torchsummary
from torchsummary import summary
import torch.nn as nn
from torchvision import datasets, transforms
from torchvision.transforms import ToTensor
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch.optim as optim
from torchvision import datasets, transforms
import torch.nn.functional as F
from torchvision.transforms import ToTensor
from torch.utils.data import DataLoader ,  Dataset
from PIL import Image
import os

# Model

## U-Net Model

In [None]:
class UNet(nn.Module):
    """Two-level U-Net (one down step, one up step) prepared for eager-mode quantization.

    The encoder block runs at full resolution, the bottleneck block runs at 1/4
    resolution (``MaxPool2d(4, 4)``), and the decoder block consumes the
    up-sampled bottleneck concatenated with the encoder features.

    Input height and width must be divisible by 4; otherwise the up-sampled
    tensor and the skip tensor have different spatial sizes and the
    concatenation fails.

    Args:
        n_class: number of output channels (one logit map per class).
    """

    def __init__(self, n_class = 2):
        super(UNet,self).__init__()

        # Quant/DeQuant stubs mark where tensors enter and leave the quantized
        # region. They are pass-through modules until the model is converted.
        self.quant = torch.ao.quantization.QuantStub()
        self.dequant = torch.ao.quantization.DeQuantStub()

        self.conv_block_down1 = nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=64, kernel_size=3, padding=1, stride=1, bias=False),
            nn.ReLU(inplace = True),
            nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, padding=1, stride=1, bias=False),
            nn.ReLU(inplace = True),
        )

        self.downsample = nn.Sequential(
            nn.MaxPool2d(4,4)
        )

        self.conv_block_down2 = nn.Sequential(
            nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding=1, stride=1, bias=False),
            nn.ReLU(inplace = True),
            nn.Conv2d(in_channels=128, out_channels=128, kernel_size=3, padding=1, stride=1, bias=False),
            nn.ReLU(inplace = True),
        )

        # Decoder input = 128 up-sampled bottleneck channels + 64 skip channels.
        self.conv_block_up1 = nn.Sequential(
            nn.Conv2d(in_channels=128+64, out_channels=64, kernel_size=3, padding=1, stride=1, bias=False),
            nn.ReLU(inplace = True),
            nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, padding=1, stride=1, bias=False),
            nn.ReLU(inplace = True),
        )

        self.upsample = nn.Upsample(scale_factor=4, mode='bilinear', align_corners=True)

        # Module form of torch.cat so the concat output gets its own observer /
        # quantization parameters. It holds no parameters in float mode, so
        # float state_dict keys are unchanged.
        self.skip_cat = torch.ao.nn.quantized.FloatFunctional()

        self.conv_last = nn.Conv2d(64, n_class, 1, bias=False)

    def forward(self,x):
        """Return per-pixel class logits of shape (N, n_class, H, W) for input (N, 3, H, W)."""
        x = self.quant(x)

        conv1 = self.conv_block_down1(x)
        x = self.downsample(conv1)

        x = self.conv_block_down2(x)

        x = self.upsample(x)
        x = self.skip_cat.cat([x, conv1], dim=1)

        x = self.conv_block_up1(x)

        out = self.conv_last(x)

        # Leave the quantized region so callers always receive a regular float
        # tensor, also after conversion to int8.
        return self.dequant(out)
    
    
# Instantiate the network with its default number of classes (lives on the CPU).
model = UNet()

# Per-sample input size: (channels, height, width).
sample_input = ( 3, 16, 16)

# torchsummary defaults to device="cuda" and then builds its dummy input on the
# GPU whenever one is visible, which does not match this CPU-resident model.
# Pin the device so the cell runs the same with and without a GPU.
summary(model, sample_input, device="cpu")

## U-Net model with single convolution layer per block

In [None]:
class UNet(nn.Module):
    """Three-level U-Net with a single convolution per block, prepared for eager-mode quantization.

    Resolution is reduced twice by ``MaxPool2d(4, 4)`` (full -> 1/4 -> 1/16) and
    restored twice by a x4 bilinear up-sampling, with a skip connection at each
    level. Input height and width must therefore be divisible by 16, otherwise
    the skip concatenations fail on mismatching spatial sizes.

    Note: ``conv_block_up1`` is the deeper decoder block (applied first) and
    ``conv_block_up2`` the shallower one (applied last).

    Args:
        n_class: number of output channels (one logit map per class).
    """

    def __init__(self, n_class = 2):
        super(UNet,self).__init__()

        # Quant/DeQuant stubs mark where tensors enter and leave the quantized
        # region. They are pass-through modules until the model is converted.
        self.quant = torch.ao.quantization.QuantStub()
        self.dequant = torch.ao.quantization.DeQuantStub()

        self.conv_block_down1 = nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=64, kernel_size=3, padding=1, stride=1, bias=False),
            nn.ReLU(inplace = True),
        )

        self.downsample = nn.Sequential(
            nn.MaxPool2d(4,4)
        )

        self.conv_block_down2 = nn.Sequential(
            nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding=1, stride=1, bias=False),
            nn.ReLU(inplace = True),
        )

        self.conv_block_down3 = nn.Sequential(
            nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, padding=1, stride=1, bias=False),
            nn.ReLU(inplace = True),
        )

        # Shallow decoder block: 128 decoder channels + 64 skip channels from conv1.
        self.conv_block_up2 = nn.Sequential(
            nn.Conv2d(in_channels=128+64, out_channels=64, kernel_size=3, padding=1, stride=1, bias=False),
            nn.ReLU(inplace = True),
        )

        # Deep decoder block: 256 bottleneck channels + 128 skip channels from conv2.
        self.conv_block_up1 = nn.Sequential(
            nn.Conv2d(in_channels=256+128, out_channels=128, kernel_size=3, padding=1, stride=1, bias=False),
            nn.ReLU(inplace = True),
        )

        self.upsample = nn.Upsample(scale_factor=4, mode='bilinear', align_corners=True)

        # Module form of torch.cat, one instance per concat site so each output
        # gets its own observer / quantization parameters. They hold no
        # parameters in float mode, so float state_dict keys are unchanged.
        self.skip_cat_deep = torch.ao.nn.quantized.FloatFunctional()
        self.skip_cat_shallow = torch.ao.nn.quantized.FloatFunctional()

        self.conv_last = nn.Conv2d(64, n_class, 1)

    def forward(self,x):
        """Return per-pixel class logits of shape (N, n_class, H, W) for input (N, 3, H, W)."""
        x = self.quant(x)

        conv1 = self.conv_block_down1(x)
        x = self.downsample(conv1)

        conv2 = self.conv_block_down2(x)
        x = self.downsample(conv2)

        x = self.conv_block_down3(x)

        x = self.upsample(x)
        x = self.skip_cat_deep.cat([x, conv2], dim=1)

        x = self.conv_block_up1(x)

        x = self.upsample(x)
        x = self.skip_cat_shallow.cat([x, conv1], dim=1)

        x = self.conv_block_up2(x)

        out = self.conv_last(x)

        # Leave the quantized region so callers always receive a regular float
        # tensor, also after conversion to int8.
        return self.dequant(out)
    
    
# Instantiate the network with its default number of classes (lives on the CPU).
model = UNet()

# Per-sample input size: (channels, height, width).
sample_input = ( 3, 16, 16)

# torchsummary defaults to device="cuda" and then builds its dummy input on the
# GPU whenever one is visible, which does not match this CPU-resident model.
# Pin the device so the cell runs the same with and without a GPU.
summary(model, sample_input, device="cpu")

# Dataset

In [None]:
class CamVidDataset(Dataset):
    """Image / colour-coded-mask pairs read from two directories.

    Images and masks are paired by position after sorting the file names of
    ``img_dir`` and ``mask_dir``, so both directories must sort into the same
    order. Masks are RGB images whose colours are translated to integer class
    labels via the class dictionary CSV.

    Args:
        img_dir: directory containing the input images.
        mask_dir: directory containing the RGB label images.
        class_dict: path to a CSV with the columns ``name``, ``r``, ``g``, ``b``.
        transform: optional callable applied to the PIL image.
        mask_transform: optional callable applied to the (1, H, W) int64 label tensor.

    Attributes set by the constructor:
        class_mapping: dict mapping an (r, g, b) tuple to its integer label.
        class_name_to_int: dict mapping a class name to its integer label.
    """

    def __init__(self, img_dir, mask_dir, class_dict, transform=None, mask_transform = None):
        self.img_dir = img_dir
        self.mask_dir = mask_dir
        self.transform = transform
        self.mask_transform = mask_transform

        self.class_dict = pd.read_csv(class_dict)

        self.class_mapping = self._create_class_mapping()

        self.img_filenames = sorted(os.listdir(img_dir))
        self.mask_filenames = sorted(os.listdir(mask_dir))

        assert len(self.img_filenames) == len(self.mask_filenames), "Mismatch in number of images and masks"

    def _create_class_mapping(self):
        """Build the RGB -> label mapping; also sets ``self.class_name_to_int``.

        Labels are assigned in order of first appearance of each class name in
        the CSV, starting at 0. Rows that repeat a name reuse its label.
        """
        class_mapping = {}
        class_name_to_int = {}

        for _, row in self.class_dict.iterrows():
            # Plain Python ints keep the keys independent of pandas/numpy scalar types.
            rgb = tuple(int(channel) for channel in row[['r', 'g', 'b']])
            class_name = row['name']

            # First sighting of a name gets the next free label.
            if class_name not in class_name_to_int:
                class_name_to_int[class_name] = len(class_name_to_int)

            class_mapping[rgb] = class_name_to_int[class_name]

        self.class_name_to_int = class_name_to_int

        return class_mapping

    def __len__(self):
        """Number of image/mask pairs."""
        return len(self.img_filenames)

    def __getitem__(self, idx):
        """Return ``(image, mask)`` for position ``idx``.

        ``image`` is the RGB image after ``transform`` (the raw PIL image when
        no transform is given). ``mask`` is an int64 label tensor of shape
        (1, H, W) after ``mask_transform``.
        """
        img_path = os.path.join(self.img_dir, self.img_filenames[idx])
        mask_path = os.path.join(self.mask_dir, self.mask_filenames[idx])

        image = Image.open(img_path).convert('RGB')
        mask = Image.open(mask_path).convert('RGB')

        if self.transform:
            image = self.transform(image)

        # Convert mask from RGB to class labels, then add a leading channel
        # dimension so image-style transforms can be applied to it.
        mask = self._convert_mask(mask)
        mask = mask.unsqueeze(0)

        if self.mask_transform:
            mask = self.mask_transform(mask)

        return image, mask

    def _convert_mask(self, mask):
        """Translate an (H, W, 3) RGB mask into an (H, W) int64 label tensor.

        ``mask`` may be anything ``np.array`` turns into an (H, W, 3) array
        (e.g. a PIL RGB image). Pixels whose colour is not listed in
        ``self.class_mapping`` keep the initial label 0.
        """
        mask_tensor = torch.from_numpy(np.array(mask))
        height, width, _ = mask_tensor.shape

        flat_pixels = mask_tensor.reshape(-1, 3).float()
        class_mask = torch.zeros(height * width, dtype=torch.int64)

        # One vectorised comparison per class colour. Iterating items() keeps
        # colour and label together instead of rebuilding and indexing a list
        # of labels on every iteration.
        for rgb, class_id in self.class_mapping.items():
            colour = torch.tensor(rgb, dtype=torch.float32)
            matches = (flat_pixels == colour).all(dim=1)
            class_mask[matches] = class_id

        return class_mask.view(height, width)



In [None]:
# Images: resize to 16x16 (default bilinear interpolation, antialiased) and
# convert to a tensor.
transform = transforms.Compose([
    transforms.Resize((16, 16), antialias=True),
    transforms.ToTensor()
])

# Masks hold integer class ids, not intensities. Bilinear/antialiased resizing
# would average neighbouring ids into unrelated classes (e.g. ids 2 and 10
# blending into 6), so labels must be resampled with nearest-neighbour.
mask_transform = transforms.Compose([
    transforms.Resize((16, 16), interpolation=transforms.InterpolationMode.NEAREST)
])

## Train

In [None]:
# Locations of the CamVid training images, their RGB label images and the
# class-colour dictionary inside the Kaggle input directory.
img_dir = '/kaggle/input/camvid/CamVid/train'
mask_dir = '/kaggle/input/camvid/CamVid/train_labels'
class_dict_path = '/kaggle/input/camvid/CamVid/class_dict.csv'

# Training split, using the image and mask transforms defined above.
train_dataset  = CamVidDataset(img_dir, mask_dir, class_dict_path, transform=transform, mask_transform = mask_transform)

# Batches of 16 samples, reshuffled on every pass.
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)


In [None]:
# Sanity check: draw one batch from the training loader and print its tensor shapes.
train_loader_iterator = iter(train_loader)

images, masks = next(train_loader_iterator)

print("Image batch shape:", images.shape)
print("Mask batch shape:", masks.shape)


In [None]:
# Draw a fresh (shuffled) training batch and show its first image next to its label mask.
train_loader_iterator = iter(train_loader)
images, masks = next(train_loader_iterator)

plt.figure(figsize=(10, 5))

plt.subplot(1, 2, 1)
plt.imshow(images[0].permute(1, 2, 0).cpu())  # Permute dimensions to show the RGB image correctly
plt.title("Image 1")
plt.axis("off")

plt.subplot(1, 2, 2)
# Class ids are rendered as grey levels; the grey value itself carries no meaning.
plt.imshow(masks[0].permute(1, 2, 0).cpu(), cmap='gray')  # Use gray colormap for masks
plt.title("Mask 1")
plt.axis("off")

plt.show()


## Test

In [None]:
# Locations of the CamVid test images, their RGB label images and the
# class-colour dictionary inside the Kaggle input directory.
test_img_dir = '/kaggle/input/camvid/CamVid/test'
test_mask_dir = '/kaggle/input/camvid/CamVid/test_labels'
class_dict_path = '/kaggle/input/camvid/CamVid/class_dict.csv'

# Test split, using the same transforms as the training split.
test_dataset  = CamVidDataset(test_img_dir, test_mask_dir, class_dict_path, transform=transform, mask_transform = mask_transform)

# NOTE(review): shuffle=True is not needed for evaluation; it only changes which
# samples the preview cells below happen to show.
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=True)


In [None]:
# Display the class-name -> integer-label mapping built by CamVidDataset._create_class_mapping.
test_dataset.class_name_to_int

In [None]:
# Sanity check: draw one batch from the test loader and print its tensor shapes.
test_loader_iterator = iter(test_loader)

test_images, test_masks = next(test_loader_iterator)

print("Image batch shape:", test_images.shape)
print("Mask batch shape:", test_masks.shape)


In [None]:
# Draw a fresh (shuffled) test batch and show its first image next to its label mask.
test_loader_iterator = iter(test_loader)
test_images, test_masks = next(test_loader_iterator)


plt.figure(figsize=(10, 5))

plt.subplot(1, 2, 1)
plt.imshow(test_images[0].permute(1, 2, 0).cpu())  # Permute dimensions to show the RGB image correctly
plt.title("Image 1")
plt.axis("off")

plt.subplot(1, 2, 2)
# Class ids are rendered as grey levels; the grey value itself carries no meaning.
plt.imshow(test_masks[0].permute(1, 2, 0).cpu(), cmap='gray')  # Use gray colormap for masks
plt.title("Mask 1")
plt.axis("off")

plt.show()


## Val

In [None]:
# Locations of the CamVid validation images, their RGB label images and the
# class-colour dictionary inside the Kaggle input directory.
val_img_dir = '/kaggle/input/camvid/CamVid/val'
val_mask_dir = '/kaggle/input/camvid/CamVid/val_labels'
class_dict_path = '/kaggle/input/camvid/CamVid/class_dict.csv'

# Validation split, using the same transforms as the training split.
val_dataset  = CamVidDataset(val_img_dir, val_mask_dir, class_dict_path, transform=transform, mask_transform = mask_transform)

# NOTE(review): shuffle=True is not needed for validation; the per-epoch loss
# averaged in the training loop does not depend on batch order in a meaningful way.
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=True)


In [None]:
# Sanity check: draw one batch from the validation loader and print its tensor shapes.
val_loader_iterator = iter(val_loader)

val_images, val_masks = next(val_loader_iterator)

print("Image batch shape:", val_images.shape)
print("Mask batch shape:", val_masks.shape)


In [None]:
# Sample one batch from the validation loader
val_loader_iterator = iter(val_loader)
val_images, val_masks = next(val_loader_iterator)


# Display the first image of the batch and its label mask
plt.figure(figsize=(10, 5))

plt.subplot(1, 2, 1)
plt.imshow(val_images[0].permute(1, 2, 0).cpu())  # Permute dimensions to show the RGB image correctly
plt.title("Image 1")
plt.axis("off")

plt.subplot(1, 2, 2)
# Class ids are rendered as grey levels; the grey value itself carries no meaning.
plt.imshow(val_masks[0].permute(1, 2, 0).cpu(), cmap='gray')  # Use gray colormap for masks
plt.title("Mask 1")
plt.axis("off")

plt.show()


# Training loop and testing

## Quantization-aware training

In [None]:
from torch.quantization import prepare_qat, convert, get_default_qat_qconfig


# The qconfig below targets the qnnpack backend. The engine that executes the
# converted int8 kernels should match it, so select it when this build supports it.
if 'qnnpack' in torch.backends.quantized.supported_engines:
    torch.backends.quantized.engine = 'qnnpack'


# 32 output channels, one per class id produced by the dataset's class dictionary.
# NOTE(review): assumes class_dict.csv defines at most 32 classes; confirm.
model = UNet(n_class=32)


# Quantization-aware training and the int8 conversion both run on the CPU.
model_fp32 = model.to('cpu')
model_fp32.train()  # prepare_qat requires a model in training mode


qconfig = get_default_qat_qconfig('qnnpack')
model_fp32.qconfig = qconfig


# Inserts fake-quantization modules in place. `model` and `model_fp32` refer to
# the same object from here on.
model_fp32 = prepare_qat(model_fp32, inplace=True)


# Build the optimizer only after prepare_qat has swapped the modules, so it is
# guaranteed to track the parameters of the model that is actually trained.
optimizer = optim.Adam(model_fp32.parameters(), lr=0.001)


criterion = nn.CrossEntropyLoss()


num_epochs = 30
for epoch in range(num_epochs):
    # --- training pass ---
    model_fp32.train()
    running_loss = 0.0

    for images, masks in train_loader:

        images, masks = images.to('cpu'), masks.to('cpu')

        # (N, 1, H, W) -> (N, H, W): CrossEntropyLoss expects class-index targets
        # without a channel dimension.
        masks = masks.squeeze(1)

        optimizer.zero_grad()

        outputs = model_fp32(images)
        loss = criterion(outputs, masks)

        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # Mean of the per-batch losses.
    epoch_loss = running_loss / len(train_loader)

    # --- validation pass ---
    model_fp32.eval()
    val_running_loss = 0.0

    with torch.no_grad():
        for val_images, val_masks in val_loader:

            val_images, val_masks = val_images.to('cpu'), val_masks.to('cpu')

            val_masks = val_masks.squeeze(1)

            val_outputs = model_fp32(val_images)

            val_loss = criterion(val_outputs, val_masks)

            val_running_loss += val_loss.item()

    val_epoch_loss = val_running_loss / len(val_loader)

    print(f"Epoch {epoch + 1}/{num_epochs}, Training Loss: {epoch_loss:.4f}, Validation Loss: {val_epoch_loss:.4f}")

# Convert the QAT-trained model to a fully quantized model.
# Conversion is done in eval mode; set it explicitly instead of relying on the
# last loop iteration having ended with the validation pass.
model_fp32.eval()
model_int8 = model_fp32.to('cpu')  # Moving the QAT model to CPU
# In-place conversion: `model`, `model_fp32` and `model_int8` are the same
# (now quantized) object afterwards.
model_int8 = convert(model_int8, inplace=True)  # Convert to a quantized model


In [None]:
model_int8 = model_int8.to('cpu')
model_int8.eval()


# confusion[t, p] counts pixels of true class t that were predicted as class p,
# accumulated over the whole test set so that a smaller last batch is weighted
# by its pixel count rather than as a full batch.
num_classes = None
confusion = None

with torch.no_grad():
    for images, masks in test_loader:

        images = images.to('cpu', dtype=torch.float32)

        # (N, 1, H, W) -> (N, H, W). Without this, comparing against the
        # (N, H, W) predictions broadcasts to (N, N, H, W) and scores every
        # prediction against every mask in the batch.
        masks = masks.to('cpu').squeeze(1).long()

        outputs = model_int8(images)

        # A converted model may hand back a quantized tensor; metrics are
        # computed on regular float logits.
        if outputs.is_quantized:
            outputs = outputs.dequantize()

        predicted_classes = outputs.argmax(dim=1)

        if confusion is None:
            num_classes = outputs.shape[1]
            confusion = torch.zeros(num_classes, num_classes, dtype=torch.int64)

        # Ignore labels outside the model's class range instead of corrupting the counts.
        valid = (masks >= 0) & (masks < num_classes)
        pair_index = masks[valid] * num_classes + predicted_classes[valid]
        confusion += torch.bincount(pair_index, minlength=num_classes ** 2).view(num_classes, num_classes)

if confusion is None:
    raise RuntimeError("test_loader yielded no batches; nothing to evaluate")

true_positive = confusion.diag()
# |pred == c| + |mask == c| - |both| for every class c.
union = confusion.sum(dim=0) + confusion.sum(dim=1) - true_positive

# Per-class IoU, averaged over the classes that occur in the predictions or the
# ground truth (classes with an empty union would otherwise divide by zero).
present = union > 0
average_iou = (true_positive[present].float() / union[present].float()).mean().item()

# Fraction of all evaluated pixels whose predicted class equals the label.
average_accuracy = (true_positive.sum().float() / confusion.sum().float()).item()

print(f"Average IoU: {average_iou:.4f}")
print(f"Average Accuracy: {average_accuracy:.4f}")


# Output masks

In [None]:
# Qualitative check: run the converted model on one test batch and plot, for the
# first `num_plots` samples, the input image, the predicted mask and the ground truth.
model_int8.eval()
model_int8 = model_int8.to('cpu')

test_loader_iterator = iter(test_loader)
images, masks = next(test_loader_iterator)

images, masks = images.to('cpu'), masks.to('cpu')

images = images.to(torch.float32) 
masks = masks.to(torch.uint8) 

with torch.no_grad():
    outputs = model_int8(images)

# torch.max over the class dimension returns (values, indices); the indices are
# the predicted class ids.
_, predicted_classes = torch.max(outputs, dim=1)

# Number of samples (columns) to plot; must not exceed the batch size.
num_plots = 4

plt.figure(figsize=(15, 10))


# Grid layout: row 1 = inputs, row 2 = predictions, row 3 = ground truth.
for i in range(num_plots):

    plt.subplot(3, num_plots, i + 1)
    plt.imshow(images[i].permute(1, 2, 0).cpu().numpy())  # Convert image shape from (C, H, W) to (H, W, C)
    plt.title(f"Input Image {i + 1}")
    plt.axis("off")


    plt.subplot(3, num_plots, i + num_plots + 1)
    plt.imshow(predicted_classes[i].cpu().numpy(), cmap='gray')
    plt.title(f"Predicted Mask {i + 1}")
    plt.axis("off")


    plt.subplot(3, num_plots, i + 2 * num_plots + 1)
    plt.imshow(masks[i].permute(1,2,0).cpu().numpy(), cmap='gray')
    plt.title(f"Ground Truth Mask {i + 1}")
    plt.axis("off")

plt.tight_layout()

plt.show()


In [None]:
# Write the dequantized weights (and biases, when present) of every quantized
# Conv2d layer of the converted model to a text file.
file_path = 'test_3.txt'

# Print arrays in full (no "..." truncation) with 6 decimal places.
# Note: this changes NumPy's print options for the rest of the session.
np.set_printoptions(threshold=np.inf, precision=6)

with open(file_path, 'w') as f:

    for name, module in model_int8.named_modules():

        # Only quantized convolution layers are dumped.
        if isinstance(module, torch.ao.nn.quantized.modules.conv.Conv2d):

            # NOTE(review): `_packed_params` is a private attribute of the
            # quantized module; unpack() is relied on to return (weight, bias).
            packed_params = module._packed_params
            
            weights, biases = packed_params.unpack()
            
            f.write(f"Module name: {name}\n")
            
            # Convert the quantized weights to float values before printing.
            dequantized_weights = torch.dequantize(weights)
            
            weights_np = dequantized_weights.detach().numpy()
            f.write(f"Weights:\n{weights_np}\n")
            
            # Layers created with bias=False have no bias to report.
            if biases is not None:
                dequantized_biases = torch.dequantize(biases)
                biases_np = dequantized_biases.detach().numpy()
                f.write(f"Biases:\n{biases_np}\n")
            else:
                f.write("No biases\n")

print(f"Model parameters have been written to {file_path}.")


In [None]:
# Print the minimum and maximum weight of every quantized Conv2d layer of the
# converted model.
for name, module in model_int8.named_modules():
    
    if isinstance(module, torch.ao.nn.quantized.modules.conv.Conv2d):
        
        # NOTE(review): `_packed_params` is a private attribute of the quantized
        # module; unpack() is relied on to return (weight, bias).
        packed_params = module._packed_params

        weights, biases = packed_params.unpack()
        
        # Despite the `_np` suffix this is still a torch tensor, not a NumPy array.
        # NOTE(review): the weights are not dequantized here, so min/max are
        # presumably reported on the quantized tensor; confirm that is intended.
        param_np = weights.detach().cpu()

        param_min = param_np.min()
        param_max = param_np.max()

        print(f"Parameter Name: {name}, Min: {param_min}, Max: {param_max}")
