<a href="https://colab.research.google.com/github/federicOO1/LAB-IA/blob/main/Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## import librerie


In [1]:
import torch
from torch.utils.data import Dataset, DataLoader, random_split
from torchvision import datasets
import matplotlib.pyplot as plt
from torchvision import transforms
import torch.nn as nn
import os
import numpy as np
import PIL
import warnings
from tqdm import tqdm
import torch.optim as optim
import albumentations
warnings.filterwarnings('ignore')

In [2]:
!pip install rasterio
import rasterio
from rasterio.plot import reshape_as_image



In [3]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [4]:
os.chdir("/content/drive/MyDrive/PotsdamDataset")

In [5]:
class PotsdamDataset(Dataset):
    def __init__(self, dataset_folder, transform=None):
        self.dataset_folder = dataset_folder
        self.image_paths = []
        self.world_file_paths = []
        self.mask_paths = []
        self.transform = transform

        # Leggi i percorsi delle immagini, dei file .tfw e delle maschere
        end_RGBIR_folder = os.listdir(dataset_folder)[0]
        end_LABELS_folder = os.listdir(dataset_folder)[1]

        RGBIR_folder = data_folder + '/' + end_RGBIR_folder
        LABELS_folder = data_folder + '/' + end_LABELS_folder

        for file_name in os.listdir(RGBIR_folder):
            if file_name.endswith('.tif'):
                image_path = os.path.join(RGBIR_folder, file_name)
                world_file_path = os.path.join(RGBIR_folder, file_name.replace('.tif', '.tfw'))

                if os.path.exists(world_file_path):
                    self.image_paths.append(image_path)
                    self.world_file_paths.append(world_file_path)


        for label_name in os.listdir(LABELS_folder):
            mask_path = os.path.join(LABELS_folder, label_name)

            if os.path.exists(mask_path):
              self.mask_paths.append(mask_path)
        self.image_paths.sort()
        self.world_file_paths.sort()
        self.mask_paths.sort()

    def __len__(self):
        return len(self.image_paths)

    def get_image_paths(self, indices):
        return [self.image_paths[idx] for idx in indices]

    def get_mask_paths(self, indices):
        return [self.mask_paths[idx] for idx in indices]

    def load_world_file(self, world_file_path):
          lines = open(world_file_path).readlines()
          try:
              parameters = [float(line.strip()) for line in lines if line.strip()]
              if len(parameters) == 6:
                  return parameters
              else:
                  raise ValueError("Il file .tfw non contiene 6 parametri.")
          except Exception as e:
              print(f"Errore durante la lettura dei parametri di georeferenziazione: {str(e)}")
              return None

    def convert_labels_to_tensor(self, rgb_label):
        # Definisci i colori delle classi nella maschera RGB
        colors_to_labels = {
            (255, 255, 255): 0,  # Impervious surfaces -> Classe 0
            (0, 0, 255): 1,      # Building -> Classe 1
            (0, 255, 255): 2,    # Low vegetation -> Classe 2
            (0, 255, 0): 3,      # Tree -> Classe 3
            (255, 255, 0): 4,    # Car -> Classe 4
            (255, 0, 0): 5       # Clutter/background -> Classe 5
        }

        # Trasponi l'array per avere le dimensioni (6000, 6000, 3)
        transposed_label = np.transpose(rgb_label, (1, 2, 0))

        # Crea un array vuoto per le etichette di classe
        class_label = np.zeros((6000, 6000), dtype=np.int64)

        # Confronta ogni pixel con i valori nel dizionario colors_to_labels

        for color, label in colors_to_labels.items():
            mask = np.all(transposed_label == np.array(color).reshape(1, 1, 3), axis=-1)
            class_label[mask] = label

        # Crea il tensore delle etichette di classe
        class_label_tensor = torch.tensor(class_label, dtype=torch.long)

        return class_label_tensor

    def __getitem__(self, idx):
        image_path = self.image_paths[idx]
        world_file_path = self.world_file_paths[idx]
        mask_path = self.mask_paths[idx]

        # Carica l'immagine TIFF utilizzando la libreria rasterio
        image = rasterio.open(image_path).read()

        # Carica i parametri di georeferenziazione dal file .tfw
        world_params = self.load_world_file(world_file_path)

        # Carica la maschera con rasterio
        mask = rasterio.open(mask_path).read()

        # Converti la maschera RGB nel formato appropriato per CrossEntropyLoss
        label = self.convert_labels_to_tensor(mask)

        image = torch.from_numpy(image)

        # Estrai mean e std dalla lista di trasformazione, se presente
        if self.transform and isinstance(self.transform, list) and len(self.transform) == 2:
            mean, std = self.transform
            # Applica la trasformazione di normalizzazione se sono presenti mean e std
            image = self.normalize(image, mean, std)


        # Restituisci l'immagine, la maschera e i parametri di georeferenziazione
        return image, label, world_params

    def normalize(self, image, mean=None, std=None):
        if mean is not None and std is not None:
            # Converte l'immagine in float
            image = image.float()

            # Normalizza per canale
            for c in range(image.size(0)):
                image[c] = (image[c] - mean[c]) / std[c]

        return image

In [6]:
# Definisci il percorso della cartella contenente i dati .tif e .tfw
data_folder = "/content/drive/MyDrive/PotsdamDataset"

# Crea un'istanza del dataset
dataset = PotsdamDataset(data_folder)

size = len(dataset)
train_size = int(0.7 * size)
val_size = int(0.15 * size)
test_size = int(size - train_size - val_size)

train_dataset, val_dataset, test_dataset = random_split(dataset, [train_size, val_size, test_size])

batch_size = 2

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [7]:
##
num_channels = 4  # RGBIR has 4 channels
dtype = torch.float32  # Imposta il tipo di dato desiderato

# placeholders
psum = torch.zeros(num_channels, dtype=dtype)
psum_sq = torch.zeros(num_channels, dtype=dtype)
count = 0

# loop through images
for inputs, labels, georeference_info in tqdm(train_loader):
    # Converti il tensore in tipo di dato a precisione maggiore
    inputs = inputs.to(dtype)

    psum += inputs.sum(axis = [0, 2, 3])
    psum_sq += (inputs ** 2).sum(axis = [0, 2, 3])

    # Conteggio dei pixel
    count += inputs.size(0) * inputs.size(2) * inputs.size(3)

total_mean = psum / count

# Calcola la varianza per canale
total_var = (psum_sq / count) - (total_mean ** 2)

# Calcola la deviazione standard per canale
total_std = torch.sqrt(total_var)
# output
print('mean per channel: ' + str(total_mean))
print('std per channel: ' + str(total_std))


100%|██████████| 13/13 [02:23<00:00, 11.00s/it]

mean per channel: tensor([86.1248, 92.1221, 85.4812, 96.6683])
std per channel: tensor([35.3596, 34.9417, 36.3786, 35.6384])





In [8]:
dataset_with_transform = PotsdamDataset(data_folder, transform=[total_mean, total_std])

In [9]:

size_with_transform = len(dataset_with_transform)

# Specifica la percentuale di divisione tra training, validation e test
train_percentage = 0.7
val_percentage = 0.15
test_percentage = 0.15

# Calcola le dimensioni dei set di addestramento, validazione e test
train_size_with_transform = int(train_percentage * size_with_transform)
val_size_with_transform = int(val_percentage * size_with_transform)
test_size_with_transform = size_with_transform - train_size_with_transform - val_size_with_transform

# Crea i set di addestramento, validazione e test
train_dataset_with_transform, val_dataset_with_transform, test_dataset_with_transform = random_split(
    dataset_with_transform, [train_size_with_transform, val_size_with_transform, test_size_with_transform]
)

# Definisci la dimensione del batch
batch_size = 2

# Crea i DataLoader per i set di addestramento, validazione e test
train_loader_with_transform = DataLoader(train_dataset_with_transform, batch_size=batch_size, shuffle=True)
val_loader_with_transform = DataLoader(val_dataset_with_transform, batch_size=batch_size, shuffle=False)
test_loader_with_transform = DataLoader(test_dataset_with_transform, batch_size=batch_size, shuffle=False)


##Implementazione UNET

In [10]:
import torch
import torch.nn as nn

class DoubleConvolution(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(DoubleConvolution, self).__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True)
        )

    def forward(self, x):
        return self.conv(x)

class UNet(nn.Module):
    def __init__(self, in_channels, out_channels, features=[64,128,256,512]):
        super(UNet, self).__init__()

        self.ups = nn.ModuleList()
        self.downs = nn.ModuleList()
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

        for feature in features:
          self.downs.append(DoubleConvolution(in_channels, feature))
          in_channels = feature

        for feature in reversed(features):
          self.ups.append(nn.ConvTranspose2d(feature*2, feature, kernel_size=2, stride=2))
          self.ups.append(DoubleConvolution(feature*2, feature))

        self.bottleneck = DoubleConvolution(features[-1], features[-1]*2)
        self.final_conv = nn.Conv2d(features[0], out_channels, kernel_size=1)


    def forward(self, x):

      skip_connections = []
      for down in self.downs:
        x = down(x)
        skip_connections.append(x)
        x = self.pool(x)

      x = self.bottleneck(x)
      skip_connections = skip_connections[::-1]

      for i in range(0,len(self.ups),2):
        x = self.ups[i](x)
        skip_connection = skip_connections[i//2]

        if x.shape != skip_connection.shape:
          x = transforms.functional.resize(x, size=skip_connection.shape[2:])

        concat_skip = torch.cat((skip_connection,x), dim=1)
        x = self.ups[i+1](concat_skip)

      return self.final_conv(x)



In [None]:
model = UNet(in_channels=4, out_channels=6)
preds = model(i)
print(preds.shape)
print(i.shape)
assert preds.shape == i.shape

##Training part

In [None]:
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

def train_function(loader, model, optimizer, loss_function, scaler):
  for idx, batch in enumerate(tqdm(loader)):
    images, labels, geo_inf = batch
    images = images.to(DEVICE)
    labels = labels.to(DEVICE)

    with torch.cuda.amp.autocast():
      predictions = model(images)
      loss = loss_function(predictions, labels)

    optimizer.zero_grad()
    scaler(loss).backward()
    scaler.step(optimizer)
    scaler.update()

In [63]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, random_split
from tqdm import tqdm

# Imposta i parametri di riduzione delle dimensioni
image_height, image_width = 256, 256  # Dimensioni ridotte
subset_percentage = 1

# Carica il dataset originale
dataset = PotsdamDataset(data_folder,transform=[total_mean, total_std])

# Riduci la dimensione del dataset
subset_size = int(len(dataset) * subset_percentage)
subset_dataset, _ = random_split(dataset, [subset_size, len(dataset) - subset_size])

# Carica il subset del dataset
train_size = int(0.7 * len(subset_dataset))
val_size = int(0.15 * len(subset_dataset))
test_size = len(subset_dataset) - train_size - val_size

train_dataset, val_dataset, test_dataset = random_split(subset_dataset, [train_size, val_size, test_size])

# Crea i DataLoader
batch_size = 2
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

resize_transform = transforms.Resize((image_height, image_width))  # Imposta new_height e new_width desiderati



In [64]:
resize_transform = transforms.Resize((image_height, image_width))  # Imposta new_height e new_width desiderati

DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Definisci il modello, il criterio di perdita e l'ottimizzatore
model = UNet(in_channels=4, out_channels=6)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Imposta il numero di epoche
num_epochs = 5

# Ciclo di addestramento
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0

    for inputs, labels, _ in tqdm(train_loader):
        inputs, labels = inputs.to(DEVICE), labels.to(DEVICE)
        inputs_resized = torch.stack([resize_transform(img) for img in inputs])
        labels_resized = torch.stack([resize_transform(lbl.unsqueeze(0)).squeeze(0) for lbl in labels])
        optimizer.zero_grad()
        outputs = model(inputs_resized)
        loss = criterion(outputs, labels_resized)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    print(f'Epoch {epoch + 1}/{num_epochs}, Loss: {running_loss / len(train_loader)}')


100%|██████████| 13/13 [05:32<00:00, 25.54s/it]


Epoch 1/5, Loss: 1.3190110646761382


100%|██████████| 13/13 [05:31<00:00, 25.51s/it]


Epoch 2/5, Loss: 1.1055783675267146


100%|██████████| 13/13 [05:34<00:00, 25.73s/it]


Epoch 3/5, Loss: 1.0236014769627497


100%|██████████| 13/13 [05:32<00:00, 25.57s/it]


Epoch 4/5, Loss: 0.9454069504371057


100%|██████████| 13/13 [05:28<00:00, 25.27s/it]

Epoch 5/5, Loss: 0.9460139641394982





In [65]:
model.eval()
# Lista per memorizzare le predizioni
all_predictions = []

# Ciclo attraverso i dati di test
with torch.no_grad():
    for inputs, _, _ in tqdm(test_loader):
        inputs = inputs.to(DEVICE)
        inputs_resized = torch.stack([resize_transform(img) for img in inputs])
        outputs = model(inputs_resized)

        # Aggiungi le predizioni alla lista
        all_predictions.append(outputs.cpu().numpy())

# Concatena le predizioni lungo l'asse del batch
all_predictions = np.concatenate(all_predictions, axis=0)

100%|██████████| 4/4 [00:53<00:00, 13.50s/it]


In [41]:
np.argmax(all_predictions, axis=1).reshape(-1)


array([3, 3, 3, ..., 3, 3, 3])

In [66]:
from sklearn.metrics import accuracy_score

all_labels_resized = []

# Ciclo attraverso i dati di test
with torch.no_grad():
    for _, labels, _ in tqdm(test_loader):
        labels_resized = torch.stack([resize_transform(lbl.unsqueeze(0)).squeeze(0) for lbl in labels])
        all_labels_resized.append(labels_resized.numpy().flatten())

# Concatena le etichette ridotte lungo l'asse del batch
flat_labels_resized = np.concatenate(all_labels_resized)

# Converte le predizioni in classi (argmax lungo l'asse delle classi)
predicted_classes = np.argmax(all_predictions, axis=1)

# Flatten delle predizioni e delle etichette reali
flat_predictions = predicted_classes.reshape(-1)

# Calcolo dell'accuratezza
accuracy = accuracy_score(flat_labels_resized, flat_predictions)
print(f'\nAccuracy: {accuracy}')



100%|██████████| 4/4 [00:43<00:00, 10.92s/it]


Accuracy: 0.6360495431082589





In [53]:
all_labels_resized[0][:10]

array([2, 2, 2, 2, 2, 2, 2, 2, 2, 2])

In [57]:
predicted_classes[0]

array([[3, 3, 3, ..., 3, 3, 3],
       [3, 3, 3, ..., 3, 3, 3],
       [3, 2, 2, ..., 3, 3, 3],
       ...,
       [3, 3, 3, ..., 3, 3, 3],
       [3, 3, 3, ..., 3, 3, 3],
       [3, 3, 3, ..., 3, 3, 3]])

In [None]:
def main():
  train_transform = albumentations.Compose([
      albumentations.Normalize(mean=total_mean, std=total_std, max_pixel_value=255.0),
      albumentations.ToTensorV2()
  ])
  val_transform = albumentations.Compose([
      albumentations.Normalize(mean=total_mean, std=total_std, max_pixel_value=255.0),
      albumentations.ToTensorV2()
  ])
  test_transform = albumentations.Compose([
      albumentations.Normalize(mean=total_mean, std=total_std, max_pixel_value=255.0),
      albumentations.ToTensorV2()
  ])
  model = UNet(in_channels=4, out_channels=6).to(DEVICE)
  loss_function = nn.CrossEntropyLoss()

In [None]:
if __name__ == "__main__":
  main()

AttributeError: module 'albumentations' has no attribute 'ToTensorV2'