Imports

In [14]:
# Import necessari
import cv2
import numpy as np
import os
import pickle
import torch
import torch.nn as nn
import torchvision.models as models
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from sklearn.model_selection import train_test_split
import json
import torchvision
from torchvision.models.detection import maskrcnn_resnet50_fpn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

DATASET CLASS

In [15]:
import os
import glob
from PIL import Image
import os
import glob
from PIL import Image
import re

class ParkingLotDataset(Dataset):
    """Dataset yielding (image, binary mask) pairs read from PNG files.

    When ``pairs`` is not given, every ``*.png`` in ``root_img`` is matched
    with ``<base>_SegmentationClass.png`` in ``root_msk``, where ``<base>`` is
    the image file name without extension and without a trailing ``_1`` ..
    ``_9`` suffix. Images without an existing mask file are skipped.

    Args:
        root_img: Directory containing the input images.
        root_msk: Directory containing the segmentation masks.
        pairs: Optional pre-computed list of ``(image_path, mask_path)``
            tuples; when given, no directory scan is performed.
        transforms: Optional callable applied to the image array.
        mask_transforms: Optional callable applied to the binarized mask.
    """

    def __init__(self, root_img, root_msk, pairs=None, transforms=None, mask_transforms=None):
        self.root_img = root_img
        self.root_msk = root_msk
        self.transforms = transforms
        self.mask_transforms = mask_transforms

        if pairs is not None:
            self.pairs = pairs
            return

        # Sorted listings keep the pairing order deterministic.
        self.image_paths = sorted(glob.glob(os.path.join(root_img, '*.png')))
        self.mask_paths = sorted(glob.glob(os.path.join(root_msk, '*.png')))

        self.pairs = []
        for image_path in self.image_paths:
            image_filename = os.path.splitext(os.path.basename(image_path))[0]

            # Several image variants (suffix _1 .. _9) share a single mask.
            image_filename_base = re.sub(r'_[1-9]$', '', image_filename)

            mask_filename = f"{image_filename_base}_SegmentationClass.png"
            mask_path = os.path.join(root_msk, mask_filename)

            if os.path.exists(mask_path):
                self.pairs.append((image_path, mask_path))

    def __len__(self):
        """Return the number of (image, mask) pairs."""
        return len(self.pairs)

    def __getitem__(self, idx, threshold=0.5):
        """Load and return the ``idx``-th (image, mask) pair.

        Args:
            idx: Index into ``self.pairs``.
            threshold: Grayscale mask pixels strictly greater than this value
                become 1.0, all others 0.0.

        Returns:
            Tuple ``(image, mask)`` after the optional transforms.

        Raises:
            FileNotFoundError: If the mask file cannot be read.
        """
        image_path, mask_path = self.pairs[idx]

        # Close the file handle as soon as the pixels are copied into numpy.
        with Image.open(image_path) as image:
            image_array = np.array(image)

        # np.array(PIL image) is (H, W) or (H, W, C): the channel count is the
        # last axis, not the first one (which is the image height).
        self.input_channels = image_array.shape[2] if image_array.ndim == 3 else 1

        if self.transforms:
            image_array = self.transforms(image_array)

        mask_array = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
        if mask_array is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f"Could not read mask image: {mask_path}")
        mask_array = (mask_array > threshold).astype(np.float32)

        if self.mask_transforms:
            mask_array = self.mask_transforms(mask_array)

        return image_array, mask_array
        


Dataset splits and data_augmentation

In [None]:
import os
from PIL import Image
from tqdm import tqdm
import torchvision.transforms as transforms
import torchvision.transforms.functional as F
import torch
import os
import shutil
from PIL import Image
from tqdm import tqdm
import torchvision.transforms as transforms
import torchvision.transforms.functional as F
import torch
import random


# Define synchronized transformation class
class SynchronizedTransform:
    """Apply one random augmentation jointly to an image and its mask.

    The geometric steps (resize, flips, rotation, resized crop) use the same
    random parameters for both inputs so they stay aligned. Colour jitter is
    applied to the image only. Every geometric step on the mask uses
    nearest-neighbour interpolation so mask values are never blended.

    Args:
        size: Output ``(height, width)`` of the augmented image and mask.
    """

    def __init__(self, size=(512, 512)):
        self.size = list(size)
        self.color_transforms = transforms.ColorJitter(
            brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1
        )
        self.to_tensor = transforms.ToTensor()
        self.resize = transforms.Resize(size=self.size)
        # The default Resize interpolation is bilinear, which would create
        # in-between mask values; masks get a nearest-neighbour resize.
        self.mask_resize = transforms.Resize(
            size=self.size, interpolation=F.InterpolationMode.NEAREST
        )
        self.to_pil = transforms.ToPILImage()

    def __call__(self, image, mask):
        """Return an augmented ``(image, mask)`` pair as PIL images."""
        image = self.to_tensor(image)
        mask = self.to_tensor(mask)

        image = self.resize(image)
        mask = self.mask_resize(mask)

        # Random horizontal flip (same decision for image and mask).
        if torch.rand(1) < 0.5:
            image = F.hflip(image)
            mask = F.hflip(mask)

        # Random vertical flip (same decision for image and mask).
        if torch.rand(1) < 0.5:
            image = F.vflip(image)
            mask = F.vflip(mask)

        # Random rotation in [-10, 10] degrees, one angle for both.
        angle = transforms.RandomRotation.get_params([-10, 10])
        image = F.rotate(image, angle)
        mask = F.rotate(mask, angle, interpolation=F.InterpolationMode.NEAREST)

        # Random resized crop, one crop window for both.
        i, j, h, w = transforms.RandomResizedCrop.get_params(
            image, scale=(0.8, 1.0), ratio=(3.0 / 4.0, 4.0 / 3.0)
        )
        image = F.resized_crop(image, i, j, h, w, size=self.size)
        mask = F.resized_crop(
            mask, i, j, h, w, size=self.size,
            interpolation=F.InterpolationMode.NEAREST,
        )

        # Photometric changes must not touch the labels.
        image = self.color_transforms(image)

        image = self.to_pil(image)
        mask = self.to_pil(mask)

        return image, mask

# Module-level augmentation instance; augment_and_save() calls it once per
# augmented sample.
sync_transform = SynchronizedTransform()

def save_image(image, path, filename):
    """Save ``image`` as ``filename`` inside the directory ``path``.

    ``image`` only needs to expose a ``save(destination)`` method.
    """
    destination = os.path.join(path, filename)
    image.save(destination)

def augment_and_save(image_path, mask_path, save_image_path, save_mask_path, num_augmentations=3):
    """Save one image/mask pair plus ``num_augmentations`` augmented copies.

    The original pair is written with index 0 and the augmented pairs with
    indices 1 .. ``num_augmentations``. Images are named ``<index>.png`` and
    masks ``<index>_SegmentationClass.png``.

    Args:
        image_path: Path of the source image (converted to RGB).
        mask_path: Path of the source mask (converted to 8-bit grayscale).
        save_image_path: Output directory for images; created if missing.
        save_mask_path: Output directory for masks; created if missing.
        num_augmentations: Number of augmented pairs to generate.
    """
    os.makedirs(save_image_path, exist_ok=True)
    os.makedirs(save_mask_path, exist_ok=True)

    # convert() returns a new, fully loaded image, so the source files can be
    # closed right away.
    with Image.open(image_path) as raw_img, Image.open(mask_path) as raw_mask:
        img = raw_img.convert("RGB")
        mask = raw_mask.convert("L")

    current_index = 0

    # Save original image and mask
    save_image(img, save_image_path, f"{current_index}.png")
    save_image(mask, save_mask_path, f"{current_index}_SegmentationClass.png")
    # Advance the index so the first augmentation does not overwrite the
    # original pair that was just written.
    current_index += 1

    for _ in range(num_augmentations):
        augmented_img, augmented_mask = sync_transform(img, mask)

        save_image(augmented_img, save_image_path, f"{current_index}.png")
        save_image(augmented_mask, save_mask_path, f"{current_index}_SegmentationClass.png")
        current_index += 1

# Source folders: three image folders and one mask folder used for all three.
image_path1 = 'Resize_no_padding_1'
image_path2 = 'C:/Users/loren/Documents/GitHub/Parking-lot/Resize_no_padding_2'
image_path3 = 'C:/Users/loren/Documents/GitHub/Parking-lot/Resize_no_padding_3'
mask_path = 'Resizedmasks_nopad'

# Destination folders for the train / validation / test splits.
train_image_path = 'train_images_2'
train_mask_path = 'train_masks'
val_image_path = 'val_images'
val_mask_path = 'val_masks'
test_image_path = 'test_images'
test_mask_path = 'test_masks'

# Create every destination folder up front (no error if it already exists).
for _split_dir in (
    train_image_path,
    train_mask_path,
    val_image_path,
    val_mask_path,
    test_image_path,
    test_mask_path,
):
    os.makedirs(_split_dir, exist_ok=True)

# Both images and masks are only converted to tensors at this stage.
transform = transforms.Compose([transforms.ToTensor()])
mask_transforms = transforms.Compose([transforms.ToTensor()])

# One dataset per image folder; all three look up masks in the same folder.
dataset1 = ParkingLotDataset(image_path1, mask_path, transforms=transform, mask_transforms=mask_transforms)
dataset2 = ParkingLotDataset(image_path2, mask_path, transforms=transform, mask_transforms=mask_transforms)
dataset3 = ParkingLotDataset(image_path3, mask_path, transforms=transform, mask_transforms=mask_transforms)

# Split proportions. The test split takes whatever is left after train and
# validation, so test_ratio is informational only.
train_ratio = 0.7
val_ratio = 0.2
test_ratio = 0.1
total_size = sum(len(_ds) for _ds in (dataset1, dataset2, dataset3))
print(total_size)

# Shuffle each dataset's pair list in place, independently of the others.
# NOTE(review): the three folders share one mask directory; if they hold
# variants of the same scenes, independent shuffles could place the same scene
# in different splits -- confirm this is intended.
for _ds in (dataset1, dataset2, dataset3):
    random.shuffle(_ds.pairs)

# Per-dataset split sizes.
train_size1 = int(train_ratio * len(dataset1))
val_size1 = int(val_ratio * len(dataset1))
train_size2 = int(train_ratio * len(dataset2))
val_size2 = int(val_ratio * len(dataset2))
train_size3 = int(train_ratio * len(dataset3))
val_size3 = int(val_ratio * len(dataset3))

# Training split: the leading slice of every shuffled list.
train_pairs = [
    *dataset1.pairs[:train_size1],
    *dataset2.pairs[:train_size2],
    *dataset3.pairs[:train_size3],
]

# Validation split: the slice that follows the training slice.
val_pairs = [
    *dataset1.pairs[train_size1:train_size1 + val_size1],
    *dataset2.pairs[train_size2:train_size2 + val_size2],
    *dataset3.pairs[train_size3:train_size3 + val_size3],
]

# Test split: everything that remains.
test_pairs = [
    *dataset1.pairs[train_size1 + val_size1:],
    *dataset2.pairs[train_size2 + val_size2:],
    *dataset3.pairs[train_size3 + val_size3:],
]

# Augment the training pairs; each pair gets its own numbered sub-folder.
# The loop variables are named src_* so they do not overwrite the module-level
# `mask_path` directory defined above.
for i, (src_image, src_mask) in enumerate(tqdm(train_pairs, desc="Augmenting train dataset")):
    augment_and_save(src_image, src_mask, os.path.join(train_image_path, str(i)), os.path.join(train_mask_path, str(i)))

# Save the validation split unchanged; files are closed after each copy.
for i, (src_image, src_mask) in enumerate(tqdm(val_pairs, desc="Saving val dataset")):
    with Image.open(src_image) as image, Image.open(src_mask) as mask:
        save_image(image, val_image_path, f"{i}.png")
        save_image(mask, val_mask_path, f"{i}_SegmentationClass.png")

# Save the test split unchanged; files are closed after each copy.
for i, (src_image, src_mask) in enumerate(tqdm(test_pairs, desc="Saving test dataset")):
    with Image.open(src_image) as image, Image.open(src_mask) as mask:
        save_image(image, test_image_path, f"{i}.png")
        save_image(mask, test_mask_path, f"{i}_SegmentationClass.png")


DATASET SPLIT

In [None]:
import random
from torch.utils.data import random_split
from torchvision.transforms import functional as F

# TODO: maybe rework the data loading to use two folders.
train_path = '/kaggle/input/big-ddd/Dataset_splittato/train_images'
train_mask_path = '/kaggle/input/big-ddd/Dataset_splittato/train_masks'

val_path = '/kaggle/input/big-ddd/Dataset_splittato/val_images'
val_mask_path = '/kaggle/input/big-ddd/Dataset_splittato/val_masks'

test_path = '/kaggle/input/big-ddd/Dataset_splittato/test_images'
test_mask_path = '/kaggle/input/big-ddd/Dataset_splittato/test_masks'

# Images and masks are only converted to tensors; no normalization is applied.
transform = transforms.Compose([
    transforms.ToTensor(),
])

mask_transforms = transforms.Compose([
    transforms.ToTensor(),
])

# One dataset per split.
train_dataset = ParkingLotDataset(train_path, train_mask_path, transforms=transform, mask_transforms=mask_transforms)
val_dataset = ParkingLotDataset(val_path, val_mask_path, transforms=transform, mask_transforms=mask_transforms)
test_dataset = ParkingLotDataset(test_path, test_mask_path, transforms=transform, mask_transforms=mask_transforms)

# Only the training loader shuffles. The validation loss is averaged per
# batch, so a fixed batch composition keeps it comparable between epochs
# (it drives both checkpoint selection and the LR scheduler).
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)

Model

In [None]:
class SmallUNet_RGB(nn.Module):
    """Small U-Net style encoder/decoder for 3-channel input.

    Three 2x max-pool stages are mirrored by three 2x bilinear upsampling
    stages with skip connections, so the input height and width should be
    divisible by 8 for the concatenations to line up. The network returns one
    output channel of raw logits (no sigmoid is applied here).
    """

    def __init__(self):
        super(SmallUNet_RGB, self).__init__()
        # Encoder
        self.conv1 = nn.Conv2d(3, 32, 3, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.dropout1 = nn.Dropout(0.2)
        self.conv2 = nn.Conv2d(32, 64, 3, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.dropout2 = nn.Dropout(0.2)
        self.conv3 = nn.Conv2d(64, 128, 3, padding=1)
        self.bn3 = nn.BatchNorm2d(128)
        self.dropout3 = nn.Dropout(0.2)
        self.conv4 = nn.Conv2d(128, 256, 3, padding=1)
        self.bn4 = nn.BatchNorm2d(256)
        self.dropout4 = nn.Dropout(0.2)
        self.maxpool = nn.MaxPool2d(2)

        # Decoder: input channels = upsampled features + skip connection
        # (256 + 128, 128 + 64, 64 + 32).
        self.upconv3 = nn.Conv2d(384, 128, 3, padding=1)
        self.bn5 = nn.BatchNorm2d(128)
        self.dropout5 = nn.Dropout(0.2)
        self.upconv2 = nn.Conv2d(192, 64, 3, padding=1)
        self.bn6 = nn.BatchNorm2d(64)
        self.dropout6 = nn.Dropout(0.2)
        self.upconv1 = nn.Conv2d(96, 32, 3, padding=1)
        self.bn7 = nn.BatchNorm2d(32)
        self.dropout7 = nn.Dropout(0.2)
        self.final_conv = nn.Conv2d(32, 1, 1)
        self.upsample = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)

    def forward(self, x):
        """Return logits of shape (N, 1, H, W) for input of shape (N, 3, H, W)."""
        # torch.relu is used instead of the alias `F`: in this file `F` is
        # bound to torchvision.transforms.functional in some cells and to
        # torch.nn.functional in others, and only the latter provides relu.

        # Encoder
        conv1 = self.dropout1(torch.relu(self.bn1(self.conv1(x))))
        x = self.maxpool(conv1)
        conv2 = self.dropout2(torch.relu(self.bn2(self.conv2(x))))
        x = self.maxpool(conv2)
        conv3 = self.dropout3(torch.relu(self.bn3(self.conv3(x))))
        x = self.maxpool(conv3)
        x = self.dropout4(torch.relu(self.bn4(self.conv4(x))))

        # Decoder
        x = self.upsample(x)
        x = torch.cat([x, conv3], dim=1)
        x = self.dropout5(torch.relu(self.bn5(self.upconv3(x))))
        x = self.upsample(x)
        x = torch.cat([x, conv2], dim=1)
        x = self.dropout6(torch.relu(self.bn6(self.upconv2(x))))
        x = self.upsample(x)
        x = torch.cat([x, conv1], dim=1)
        x = self.dropout7(torch.relu(self.bn7(self.upconv1(x))))
        out = self.final_conv(x)

        return out


In [None]:
import torch.nn.functional as F

def dice_loss(pred, target, smooth=1e-6):
    """Return ``1 - mean Dice coefficient`` of ``pred`` against ``target``.

    Both tensors are summed over dimensions 2 and 3, giving one Dice value
    per leading (dim 0, dim 1) entry; ``smooth`` is added to numerator and
    denominator to avoid division by zero.
    """
    def _sum_last_two(tensor):
        return tensor.sum(dim=2).sum(dim=2)

    pred, target = pred.contiguous(), target.contiguous()

    overlap = _sum_last_two(pred * target)
    denominator = _sum_last_two(pred) + _sum_last_two(target) + smooth
    dice = (2. * overlap + smooth) / denominator

    return 1 - dice.mean()

class CombinedLoss(nn.Module):
    """Weighted sum of BCE-with-logits and Dice loss.

    ``forward`` expects raw logits: the BCE term consumes them directly and
    the Dice term is computed on their sigmoid.
    """

    def __init__(self, weight_dice=0.5, weight_bce=0.5):
        super().__init__()
        self.weight_bce = weight_bce
        self.weight_dice = weight_dice

    def forward(self, outputs, targets):
        """Return ``weight_bce * BCE + weight_dice * Dice`` for one batch."""
        probabilities = torch.sigmoid(outputs)
        bce_term = self.weight_bce * F.binary_cross_entropy_with_logits(outputs, targets)
        dice_term = self.weight_dice * dice_loss(probabilities, targets)
        return bce_term + dice_term


Train

In [None]:
from torch.utils.data import DataLoader
from torch import optim
import torch.nn.functional as F
import wandb
from torch.optim.lr_scheduler import StepLR
from torch import save
from torch.optim.lr_scheduler import ReduceLROnPlateau

# Start a new run

# SECURITY: never commit the Weights & Biases API key. wandb reads it from the
# WANDB_API_KEY environment variable (e.g. injected through the platform's
# secret store) or from a previous `wandb login`. The key that used to be
# hard-coded here is exposed in version history and should be revoked.

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Single source of truth for the learning rate (optimizer and logged config).
learning_rate = 1e-2

model = SmallUNet_RGB().to(device)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=5)
criterion = CombinedLoss()
epochs = 100
# Best validation loss seen so far. Starting at +inf guarantees that the first
# epoch always writes a checkpoint, even if its loss is >= 1.
update_loss = float('inf')

run = wandb.init(
    # Project where this run will be logged
    project="Parking_lot_zones",
    # Track hyperparameters and run metadata
    config={
        "learning_rate": learning_rate,
        "epochs": epochs,
    },
    entity='occelli-2127855'
)

# Train for `epochs` epochs; after each epoch evaluate on the validation set,
# keep the best checkpoint, log both losses and update the LR scheduler.
for epoch in range(epochs):
    model.train()
    train_loss = 0
    for images, masks in train_loader:
        images = images.to(device)
        masks = masks.to(device, dtype=torch.float32)

        optimizer.zero_grad()

        outputs = model(images)
        loss = criterion(outputs, masks)
        loss.backward()

        # Gradient clipping
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

        optimizer.step()

        train_loss += loss.item()

    # Mean of the per-batch losses.
    train_loss /= len(train_loader)

    model.eval()
    val_loss = 0
    with torch.no_grad():
        for images, masks in val_loader:
            images = images.to(device)
            masks = masks.to(device, dtype=torch.float32)

            outputs = model(images)
            loss = criterion(outputs, masks)
            val_loss += loss.item()

    val_loss /= len(val_loader)

    # Keep only the weights with the lowest validation loss so far.
    if val_loss < update_loss:
        torch.save(model.state_dict(), '/kaggle/working/best_model_1_e2_no_labeled.pth')
        print("model saved")
        update_loss = val_loss

    # Log both losses in one call so they share the same wandb step; separate
    # calls would each advance the step and misalign the two curves.
    wandb.log({"Train Loss": train_loss, "Validation Loss": val_loss})

    scheduler.step(val_loss)

    print(f"Epoch {epoch+1}, Training Loss: {train_loss:.4f}, Validation Loss: {val_loss:.4f}")

Metrics definition

In [None]:
import numpy as np

def iou_score(pred, target):
    """Return the intersection-over-union of two binary masks.

    Non-zero entries count as foreground. When both masks are empty the union
    is zero; that case is treated as perfect agreement and returns 1.0 instead
    of dividing by zero (which would yield NaN and poison any average).
    """
    union = np.sum(np.logical_or(pred, target))
    if union == 0:
        return 1.0
    intersection = np.sum(np.logical_and(pred, target))
    return intersection / union

def dice_coefficient(pred, target):
    """Return the Dice coefficient ``2*|A.B| / (|A| + |B|)`` of two masks.

    When both masks sum to zero the denominator is zero; that case is treated
    as perfect agreement and returns 1.0 instead of producing NaN.
    """
    denominator = np.sum(pred) + np.sum(target)
    if denominator == 0:
        return 1.0
    intersection = np.sum(pred * target)
    return (2. * intersection) / denominator

def precision_score(pred, target):
    """Return true positives divided by predicted positives.

    Returns 0 when ``pred`` has no positive entries.
    """
    predicted_positive = np.sum(pred)
    if not predicted_positive > 0:
        return 0
    hits = np.sum(np.logical_and(pred, target))
    return hits / predicted_positive

def recall_score(pred, target):
    """Return true positives divided by actual positives.

    Returns 0 when ``target`` has no positive entries.
    """
    actual_positive = np.sum(target)
    if not actual_positive > 0:
        return 0
    hits = np.sum(np.logical_and(pred, target))
    return hits / actual_positive

Test

In [None]:
import torch
import os
from PIL import Image
import numpy as np

# Evaluate the best checkpoint on the test set, save every input / ground
# truth / prediction as PNG and log the aggregate metrics to wandb.

# Create directories to save images
save_dir = '/kaggle/working/segmentation_results_e2_no_label'
os.makedirs(os.path.join(save_dir, 'input_images'), exist_ok=True)
os.makedirs(os.path.join(save_dir, 'ground_truth'), exist_ok=True)
os.makedirs(os.path.join(save_dir, 'predictions'), exist_ok=True)

model = SmallUNet_RGB().to(device)
# map_location lets a checkpoint saved on GPU be loaded on a CPU-only machine.
model.load_state_dict(
    torch.load('/kaggle/working/best_model_1_e2_no_labeled.pth', map_location=device)
)
model.eval()

total = 0
correct = 0
total_iou = 0
total_dice = 0
total_precision = 0
total_recall = 0
num_samples = 0

with torch.no_grad():
    for batch_idx, batch in enumerate(test_loader):
        images, masks = batch[:2]
        images = images.to(device)
        masks = masks.to(device, dtype=torch.float32)
        outputs = model(images)
        # The model returns raw logits (it is trained with BCE-with-logits),
        # so convert to probabilities before thresholding at 0.5.
        predicted = (torch.sigmoid(outputs) > 0.5).float()

        # Pixel accuracy accumulators.
        total += masks.numel()
        correct += (predicted == masks).sum().item()

        # Per-sample metrics and image export.
        for i in range(images.size(0)):
            pred_np = predicted[i][0].cpu().numpy()
            mask_np = masks[i][0].cpu().numpy()

            total_iou += iou_score(pred_np, mask_np)
            total_dice += dice_coefficient(pred_np, mask_np)
            total_precision += precision_score(pred_np, mask_np)
            total_recall += recall_score(pred_np, mask_np)
            num_samples += 1

            # Save input image (CHW tensor -> HWC uint8).
            input_img = Image.fromarray((images[i].permute(1, 2, 0).cpu().numpy() * 255).astype(np.uint8))
            input_img.save(os.path.join(save_dir, 'input_images', f'input_{batch_idx}_{i}.png'))

            # Save ground truth mask
            gt_mask = Image.fromarray((mask_np * 255).astype(np.uint8))
            gt_mask.save(os.path.join(save_dir, 'ground_truth', f'gt_{batch_idx}_{i}.png'))

            # Save predicted mask
            pred_mask = Image.fromarray((pred_np * 255).astype(np.uint8))
            pred_mask.save(os.path.join(save_dir, 'predictions', f'pred_{batch_idx}_{i}.png'))

if num_samples == 0:
    # Fail with a clear message instead of a bare ZeroDivisionError below.
    raise RuntimeError("test_loader produced no samples; cannot compute metrics")

accuracy = 100 * correct / total
mean_iou = total_iou / num_samples
mean_dice = total_dice / num_samples
mean_precision = total_precision / num_samples
mean_recall = total_recall / num_samples

print(f"Accuracy on test set: {accuracy:.2f}%")
print(f"Mean IoU: {mean_iou:.4f}")
print(f"Mean Dice Coefficient: {mean_dice:.4f}")
print(f"Mean Precision: {mean_precision:.4f}")
print(f"Mean Recall: {mean_recall:.4f}")

# Log metrics to wandb
wandb.log({
    "Test Accuracy": accuracy,
    "Mean IoU": mean_iou,
    "Mean Dice Coefficient": mean_dice,
    "Mean Precision": mean_precision,
    "Mean Recall": mean_recall
})

print(f"Images saved in {save_dir}")

wandb.finish()
import os
import zipfile

def zipdir(path, ziph):
    """Add every file under ``path`` (recursively) to the open archive ``ziph``.

    Archive names are relative to the parent of ``path``, so the top-level
    folder name is kept inside the archive.
    """
    parent = os.path.join(path, '..')
    for folder, _subdirs, filenames in os.walk(path):
        for filename in filenames:
            full_path = os.path.join(folder, filename)
            ziph.write(full_path, os.path.relpath(full_path, parent))

# Directory to download, packed into a single zip archive.
output_dir = '/kaggle/working/segmentation_results_e2_no_label'
# The context manager finalizes and closes the archive even if zipdir raises.
with zipfile.ZipFile('/kaggle/working/segmentation_results_no_label.zip', 'w', zipfile.ZIP_DEFLATED) as zipf:
    zipdir(output_dir, zipf)