<h1><center> Training a prompt model

# 1- Dataset

In [2]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
import numpy as np
import rasterio
import os 
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms


In [3]:
data_dir = "split"

train_img_dir = os.path.join(data_dir, 'train', 'img')
test_img_dir = os.path.join(data_dir, 'val', 'img')
train_msk_dir = os.path.join(data_dir, 'train', 'msk')
test_msk_dir = os.path.join(data_dir, 'val', 'msk')

In [14]:
def calculate_ndwi(img):
    # Sentinel-2 image bands
    # {"Blue": 0, "Green": 1, "Red": 2, "NIR": 3, "SWIR1": 4, "SWIR2": 5}
    # Extract bands
    red_band = img[2]
    nir_band = img[3]
    
    # Calculate NDVI
    ndvi = (nir_band.astype(float) - red_band.astype(float)) / (nir_band + red_band)

    # Handle division by zero
    ndvi = np.nan_to_num(ndvi, nan=0.0)

    return ndvi

In [39]:
import os
import numpy as np
import rasterio
import tqdm 

def load_images_and_masks(img_folder, mask_folder):
    images = []
    masks = []
    img_filenames = [f for f in os.listdir(img_folder) if f.endswith('.tif') and "img" in f]
    
    for filename in tqdm.tqdm(img_filenames):
        # Charger l'image
        img_path = os.path.join(img_folder, filename)
        with rasterio.open(img_path) as src:
            image = src.read()
            image = calculate_ndwi(image)
        
        mask_filename = filename.replace("img", "msk")  # Supposition sur la convention de nommage
        mask_path = os.path.join(mask_folder, mask_filename)
        with rasterio.open(mask_path) as src:
            mask = src.read()
        
        images.append(image)
        masks.append(mask)
                
    # Empiler les images et les masques pour obtenir des arrays numpy
    images_stacked = np.stack(images)
    masks_stacked = np.stack(masks)
    
    return images_stacked, masks_stacked


X_train, Y_train = load_images_and_masks(train_img_dir, train_msk_dir)

X_test, Y_test = load_images_and_masks(test_img_dir, test_msk_dir)



y_train = Y_train.squeeze()
y_test = Y_test.squeeze()


  dataset = DatasetReader(path, driver=driver, sharing=sharing, **kwargs)
100%|██████████| 13158/13158 [01:37<00:00, 134.74it/s]
100%|██████████| 6113/6113 [00:43<00:00, 140.62it/s]


In [38]:
print("X_train shape:", X_train.shape)
print("X_test shape:", X_test.shape)
print("y_train shape:", Y_train.shape)
print("y_test shape:", Y_test.shape)

X_train shape: (13158, 256, 256)
X_test shape: (6113, 256, 256)
y_train shape: (13158, 1, 256, 256)
y_test shape: (6113, 1, 256, 256)


In [46]:
from torch.utils.data import DataLoader, TensorDataset
import torch

X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.long) 

X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.long)
X_train_tensor = X_train_tensor.unsqueeze(1)
X_test_tensor = X_test_tensor.unsqueeze(1)

train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

KeyboardInterrupt: 

In [49]:
print("train_loader:", len(train_loader))
print(train_loader.dataset.tensors[1].shape)

train_loader: 412
torch.Size([13158, 256, 256])


# 2- Model

In [41]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms


In [42]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class SimpleUNet(nn.Module):
    def __init__(self, in_channels=1, out_channels=1):
        super(SimpleUNet, self).__init__()

        # Encodeur
        self.enc1 = self.contract_block(in_channels, 32, 3, 1)
        self.enc2 = self.contract_block(32, 64, 3, 1)

        # Partie centrale
        self.bottleneck = nn.Sequential(
            nn.Conv2d(64, 128, 3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(128, 64, 3, padding=1),
            nn.ReLU(inplace=True)
        )

        # Décodeur
        self.dec1 = self.expand_block(128, 32, 3, 1)
        self.final = nn.Sequential(
            nn.Conv2d(64, 32, 3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(32, out_channels, 3, padding=1)
        )

    def forward(self, x):
        # Encodeur
        e1 = self.enc1(x)
        e2 = self.enc2(e1)

        # Bottleneck
        b = self.bottleneck(e2)

        # Décodeur
        d1 = torch.cat([e2, F.interpolate(b, scale_factor=2, mode='bilinear', align_corners=True)], 1)
        d2 = self.dec1(d1)
        out = self.final(torch.cat([e1, F.interpolate(d2, scale_factor=2, mode='bilinear', align_corners=True)], 1))
        return out

    def contract_block(self, in_channels, out_channels, kernel_size, padding):
        contract = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=kernel_size, stride=2, padding=padding),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=kernel_size, padding=padding),
            nn.ReLU(inplace=True),
        )
        return contract

    def expand_block(self, in_channels, out_channels, kernel_size, padding):
        expand = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size, padding=padding),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=kernel_size, padding=padding),
            nn.ReLU(inplace=True)
        )
        return expand


In [43]:
def train_model(model, train_loader, criterion, optimizer, num_epochs=10):
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        for images, labels in train_loader:
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        print(f'Epoch {epoch+1}/{num_epochs}, Loss: {running_loss/len(train_loader)}')


In [45]:
model = SimpleUNet(in_channels=32, out_channels=1)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

train_model(model, train_loader, criterion, optimizer)


NotImplementedError: Got 3D input, but bilinear mode needs 4D input