# Deep learning for dynamic network analysis (DLDNA) - Project

Dolphins: R. ARNAUD M. DELPLANQUE A. KARILA-COHEN A. RAMPOLDI

Comprehensive soil classification dataset: https://www.kaggle.com/datasets/ai4a-lab/comprehensive-soil-classification-datasets/code

CNN puis GAN puis CyGAN

Binary classification : Binary CrossEntropy Loss $ \mathcal{L}_{\text{BCE}}(y,\hat y)
= - \left[ y \log(\hat y) + (1-y)\log(1-\hat y) \right]
 $ 

Rappel: le learning rate $ \alpha $ est le pas de mise à jour lors de la descente de gradient. Formule de la descente de gradient $ L(\theta_{n+1})= L(\theta_{n})-\alpha \nabla  L(\theta_{n}) $

Params loaded. Device: cpu


In [16]:
root_dir = '..'
data_dir = os.path.join(root_dir, 'data')
images_dir = os.path.join(data_dir, 'Orignal-Dataset')
root_dir, data_dir, images_dir

('..', '..\\data', '..\\data\\Orignal-Dataset')

In [17]:
# Function to load all images from a folder

def load_images_from_folder(folder_path, resize=None):
    """
    Load all images from a folder
    
    Args:
        folder_path: path to the folder
        resize: tuple (width, height) to resize images, None to skip resizing
    
    Returns:
        list: list of tuples (image, filename)
    """
    images = []
    if not os.path.exists(folder_path):
        print(f"Folder {folder_path} does not exist")
        return images
    
    for filename in os.listdir(folder_path):
        if filename.lower().endswith(('.jpg', '.jpeg', '.png', '.bmp', '.tiff')):
            filepath = os.path.join(folder_path, filename)
            try:
                # Use PIL to load image and convert to numpy array (BGR format for OpenCV compatibility)
                img = Image.open(filepath).convert('RGB')
                img_array = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR)
                
                if resize:
                    img_array = cv2.resize(img_array, resize)
                images.append((img_array, filename))
            except Exception as e:
                print(f"Failed to load {filename}: {e}")
    
    return images

# Load images from Orignal-Dataset
soil_types = ['Alluvial_Soil', 'Arid_Soil', 'Black_Soil', 'Laterite_Soil', 
              'Mountain_Soil', 'Red_Soil', 'Yellow_Soil']

images_dict = {}
for soil_type in soil_types:
    folder_path = os.path.join(data_dir, 'Orignal-Dataset', soil_type)
    images_dict[soil_type] = load_images_from_folder(folder_path)
    print(f"{soil_type}: {len(images_dict[soil_type])} images loaded")

Alluvial_Soil: 51 images loaded
Arid_Soil: 284 images loaded
Black_Soil: 255 images loaded
Laterite_Soil: 219 images loaded
Mountain_Soil: 201 images loaded
Red_Soil: 109 images loaded
Yellow_Soil: 69 images loaded


In [24]:
import cv2
import numpy as np

soil_types = [
    'Alluvial_Soil', 'Arid_Soil', 'Black_Soil',
    'Laterite_Soil', 'Mountain_Soil',
    'Red_Soil', 'Yellow_Soil'
]

IMG_W, IMG_H = 80, 80
PAD = 10
LABEL_W = 180
FONT = cv2.FONT_HERSHEY_SIMPLEX

rows = []

for soil in soil_types:
    imgs = images_dict.get(soil, [])
    row_imgs = []

    # zone label à gauche
    label_img = np.zeros((IMG_H, LABEL_W, 3), dtype=np.uint8)
    cv2.putText(
        label_img,
        soil,
        (10, IMG_H // 2),
        FONT,
        0.5,
        (255, 255, 255),
        1,
        cv2.LINE_AA
    )

    row_imgs.append(label_img)

    for i in range(min(5, len(imgs))):
        img, _ = imgs[i]
        img_resized = cv2.resize(img, (IMG_W, IMG_H))
        row_imgs.append(img_resized)

    # padding images manquantes
    while len(row_imgs) < 1 + 5:
        row_imgs.append(np.zeros((IMG_H, IMG_W, 3), dtype=np.uint8))

    # ajouter padding horizontal
    padded_row = []
    for im in row_imgs:
        padded_row.append(im)
        padded_row.append(np.zeros((IMG_H, PAD, 3), dtype=np.uint8))

    rows.append(cv2.hconcat(padded_row[:-1]))  # enlever dernier padding

# padding vertical entre lignes
final_rows = []
for r in rows:
    final_rows.append(r)
    final_rows.append(np.zeros((PAD, r.shape[1], 3), dtype=np.uint8))

grid = cv2.vconcat(final_rows[:-1])

cv2.imshow("Soil types – 5 images per category", grid)
cv2.waitKey(0)
cv2.destroyAllWindows()


Penser à convertir les X et y en tenseur torch avant de procéder au datasplit via ``sklearn.model_selection.train_test_split`` <br>
```X = torch.FloatTensor(X) ``` et ```y = torch.FloatTensor(y) ``` <br

In [None]:
# from sklearn.model_selection import train_test_split

# # First split: separate out test set (30% of total)
# X_train, X_temp, y_train, y_temp = train_test_split(
#     X, y, test_size=0.3, random_state=SEED  # 70% train, 30% temp
# )
# # Second split: split temp into validation (10%) and test (20%)
# X_val, X_test, y_val, y_test = train_test_split(
#     X_temp, y_temp, test_size=2/3, random_state=SEED  # 10% val, 20% test
# )
# print(f"Training samples: {len(X_train)}")  # Show split sizes
# print(f"Validation samples: {len(X_val)}")
# print(f"Test samples: {len(X_test)}")

**Notes**
Standardizer les data pour RGB et passer de 0 à 255 à 0 à 1

**My first CNN**

In [28]:
class SimpleCNN(nn.Module):
    def __init__(self):
        super().__init__()
        # First convolutional layer: 3 input channels -> 32 output channels
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
        # Second convolutional layer: 32 -> 64 channels
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        # Max pooling layer: reduces spatial dimensions by 2
        self.pool = nn.MaxPool2d(2)
        # First fully connected layer: 64*32*3 flattened features -> 128
        self.fc1 = nn.Linear(64 * 32 * 32, 128)
        # Output layer: 128 -> 10 classes
        self.fc2 = nn.Linear(128, 7)
        
    def forward(self, x):
        # First conv block: apply conv -> relu -> pool
        x = self.pool(F.relu(self.conv1(x)))
        # Second conv block: apply conv -> relu -> pool
        x = self.pool(F.relu(self.conv2(x)))
        # Flatten for fully connected layers: (batch, 64, 32, 32) -> (batch, 64*32*32)
        x = x.view(x.size(0), -1)
        # First FC layer with relu activation
        x = F.relu(self.fc1(x))
        # Output layer (no activation - CrossEntropyLoss handles softmax)
        return self.fc2(x)

**Definition of the MLP**

In [41]:
model = SimpleCNN().to(device)  # Create model and move to device
regression_loss_fn = nn.CrossEntropyLoss()  # Binary Cross Entropy loss
regression_optimizer = Adam(model.parameters(), lr=LEARNING_RATE)  # Adam optimizer

**Grid search (hyperparameters optimisation for Adam GD): hidden size and learning rate**

If the learning rate is too big, the gradient descent cannot be stable. In a contrary if it is too small, the learning can be slow

**Training with validation**

In [None]:
def train_with_validation(model, train_loader, val_loader, optimizer, loss_fn, device, epochs=100):
    """Train the model with validation monitoring."""
    train_losses = []  # Track training losses
    val_losses = []  # Track validation losses
    
    for epoch in range(1, epochs + 1):
        # Training phase
        model.train()  # Set to training mode
        train_loss = 0.0  # Reset epoch loss
        for xb, yb in train_loader:
            xb, yb = xb.to(device),  yb.to(device)  # Move to device
            optimizer.zero_grad()  # Reset gradients
            preds = model(xb)  # Forward pass
            loss = loss_fn(preds, yb)  # Compute loss
            loss.backward()  # Backward pass
            optimizer.step()  # Update weights
            train_loss += loss.item()  # Accumulate loss
        train_loss /= len(train_loader)  # Average loss
        train_losses.append(train_loss)  # Store loss
        
        # Validation phase
        model.eval()  # Set to evaluation mode
        val_loss = 0.0  # Reset validation loss
        with torch.no_grad():  # Disable gradient computation
            for xb, yb in val_loader:
                xb, yb = xb.to(device),  yb.to(device)  # Move to device
                preds = model(xb)  # Forward pass
                val_loss += loss_fn(preds, yb).item()  # Compute loss
            val_loss /= len(val_loader)  # Average loss
            val_losses.append(val_loss)  # Store loss
            
        if epoch % 10 == 0 or epoch == 1:  # Print every 10 epochs
            print(f"Epoch {epoch:03d} | Training Loss: {train_loss:.4f} | Validation Loss: {val_loss:.4f}")
    
    return model, train_losses, val_losses

**Test model**

In [None]:
def test_model(model, test_loader, loss_fn, device):
    """Perform final testing on the model using the held-out test set."""
    model.eval()  # Set to evaluation mode
    test_loss = 0.0  # Initialize test loss
    all_preds = []  # Store all predictions
    all_actuals = []  # Store all actual values
    
    with torch.no_grad():  # Disable gradient computation
        for xb, yb in test_loader:
            xb = xb.to(device)  # Move batch to device
            yb = yb.to(device)  # Move labels to device
            preds = model(xb)  # Forward pass
            test_loss += loss_fn(preds, yb).item()  # Accumulate loss
            all_preds.append(preds)  # Store predictions
            all_actuals.append(yb)  # Store actuals
            
    test_loss /= len(test_loader)  # Average loss
    all_preds = torch.cat(all_preds)  # Concatenate all predictions
    all_actuals = torch.cat(all_actuals)  # Concatenate all actuals
    
    return test_loss, all_preds, all_actuals

**Test function**

In [None]:
learning_rates = [1e-1, 1e-2, 1e-3, 1e-4, 5e-4] 
hidden_sizes_options = [(32,16),(64,32), (128,64)]

def grid_search_hyperparameters(
    train_loader,
    val_loader,
    learning_rates,
    hidden_sizes_options,
    device,
    epochs=10,
    base_model=None,
    model_fn=None,
    save_path="best_model.pth",
):
    """
    Grid search over learning rates and hidden layer sizes.
    - model_fn: callable taking hidden_sizes (e.g., (h1, h2)) and returning an nn.Module (on CPU).
    - hidden_sizes_options: list of tuples like [(64,32), (128,64), ...]
    Saves the globally best model (by validation accuracy) to 'save_path'.
    Returns: (results_list, best_cfg_dict, best_model_loaded)
    """
    assert model_fn is not None, "Provide model_fn(hidden_sizes) -> nn.Module"

    results = []  # Store all results
    best_val_acc = -1.0  # Track best validation accuracy
    best_cfg = None  # Track best configuration
    best_state = None  # Track best model state

    for lr in learning_rates:  # Loop over learning rates
        for hidden_sizes in hidden_sizes_options:  # Loop over architectures
            print(f"Testing: lr={lr}, hidden_sizes={hidden_sizes}")

            model = model_fn(hidden_sizes).to(device)  # Create model
            optimizer = torch.optim.Adam(model.parameters(), lr=lr)  # Create optimizer
            loss_fn = nn.CrossEntropyLoss()  # Define loss function

            # Train model
            _, _, _, train_accuracies, val_accuracies = train_with_validation(
                model=model,  # Model to train
                train_loader=train_loader,  # Training data
                val_loader=val_loader,  # Validation data
                optimizer=optimizer,  # Optimizer
                loss_fn=loss_fn,  # Loss function
                device=device,  # Device
                epochs=epochs,  # Number of epochs
                task_type='classification'  # Task type
            )

            cur_best_val = max(val_accuracies)  # Get best validation accuracy

            # Store results
            results.append({
                'lr': lr,  # Learning rate
                'hidden_sizes': hidden_sizes,  # Architecture
                'best_val_acc': cur_best_val,  # Best validation accuracy
                'final_train_acc': train_accuracies[-1],  # Final training accuracy
                'final_val_acc': val_accuracies[-1]  # Final validation accuracy
            })

            print(f"Best validation accuracy: {cur_best_val:.2f}%")

            # Update best model if this is better
            if cur_best_val > best_val_acc:
                best_val_acc = cur_best_val  # Update best accuracy
                best_cfg = {'lr': lr, 'hidden_sizes': hidden_sizes}  # Update best config
                best_state = {k: v.cpu() for k, v in model.state_dict().items()}  # Save state to CPU
                if save_path is not None:
                    torch.save(best_state, save_path)  # Save to disk
                    print(f"Saved new best model to: {save_path}")

            del model  # Free memory
            if torch.cuda.is_available():
                torch.cuda.empty_cache()  # Clear CUDA cache

    # Rebuild best model
    best_model = None
    if best_state is not None:
        best_model = model_fn(best_cfg['hidden_sizes']).to(device)  # Create model
        best_model.load_state_dict(best_state)  # Load best weights


    return results, best_cfg, best_model


**Train validate and test**

In [None]:
# Step 1: Train the model with validation monitoring
model, train_losses, val_losses = train_with_validation(
    model=regression_model,  # Model to train
    train_loader=train_loader,  # Training data
    val_loader=val_loader,  # Validation data
    optimizer=regression_optimizer,  # Optimizer
    loss_fn=regression_loss_fn,  # Loss function
    device=device,  # Device (CPU/GPU)
    epochs=100  # Number of epochs
)

# Step 2: Perform final testing on held-out test set
test_loss, test_preds, test_actuals = test_model(
    model=model,  # Trained model
    test_loader=test_loader,  # Test data
    loss_fn=regression_loss_fn,  # Loss function
    device=device  # Device
)

# Print evaluation metrics
avg_train_loss = sum(train_losses) / len(train_losses)  # Calculate average training loss
avg_val_loss = sum(val_losses) / len(val_losses)  # Calculate average validation loss
print(f"Average Training MSE: {avg_train_loss:.4f} | "
      f"Average Validation MSE: {avg_val_loss:.4f} | "
      f"Test MSE: {test_loss:.4f}")

**Training and validation visualization**

In [None]:
# Plot the training loss and validation loss over epochs
def plot_training_results(train_losses, val_losses):
    plt.figure(figsize=(10, 6))  # Create figure
    plt.plot(train_losses, label='Training Loss', color='blue', linewidth=2)  # Plot training
    plt.plot(val_losses, label='Validation Loss', color='red', linewidth=2)  # Plot validation
    plt.xlabel('Epoch')  # X-axis label
    plt.ylabel('Loss')  # Y-axis label
    plt.title('Training and Validation Loss Over Epochs')  # Title
    plt.legend()  # Show legend
    plt.grid(True, alpha=0.3)  # Add grid
    plt.show()  # Display plot

plot_training_results(train_losses, val_losses)  # Plot results

In [32]:
# Standardize the data

def standardize_image(image):
    """
    Standardize an image to have zero mean and unit variance per channel.
    
    Args:
        image: numpy array of shape (H, W, C)

    Returns:
        standardized_image: numpy array of same shape as input
    """
    standardized_image = np.zeros_like(image, dtype=np.float32)
    for c in range(image.shape[2]):
        channel = image[:, :, c]
        mean = np.mean(channel)
        std = np.std(channel)
        if std > 0:
            standardized_image[:, :, c] = (channel - mean) / std
        else:
            standardized_image[:, :, c] = channel - mean  # Avoid division by zero
    return standardized_image

# Standardize all images in the dataset
standardized_images_dict = {}
for soil_type, images in images_dict.items():
    standardized_images = []
    for img, filename in images:
        standardized_img = standardize_image(img)
        standardized_images.append((standardized_img, filename))
    standardized_images_dict[soil_type] = standardized_images

In [33]:
# Create train set, test set and validation set

def split_dataset(images_dict, train_ratio=0.7, val_ratio=0.1, test_ratio=0.2):
    """
    Split the dataset into training, validation, and test sets.
    
    Args:
        images_dict: dictionary with soil types as keys and lists of (image, filename) tuples as values
        train_ratio: proportion of data to use for training
        val_ratio: proportion of data to use for validation
        test_ratio: proportion of data to use for testing
    Returns:
        train_set, val_set, test_set: dictionaries with the same structure as images_dict
    """
    train_set = {}
    val_set = {}
    test_set = {}
    
    for soil_type, images in images_dict.items():
        np.random.shuffle(images)  # Shuffle images to ensure randomness
        total_images = len(images)
        train_end = int(total_images * train_ratio)
        val_end = train_end + int(total_images * val_ratio)
        
        train_set[soil_type] = images[:train_end]
        val_set[soil_type] = images[train_end:val_end]
        test_set[soil_type] = images[val_end:]
    
    return train_set, val_set, test_set

train_set, val_set, test_set = split_dataset(standardized_images_dict)

In [34]:
batch_size = 32
train_loader = torch.utils.data.DataLoader(
    train_set, batch_size=batch_size, shuffle=True
)
val_loader = torch.utils.data.DataLoader(
    val_set, batch_size=batch_size, shuffle=False
)

In [35]:
# Create a custom Dataset class for PyTorch

from torch.utils.data import Dataset

class SoilDataset(Dataset):
    def __init__(self, images_dict):
        """
        Args:
            images_dict: dictionary with soil types as keys and lists of (image, filename) tuples
        """
        self.data = []
        self.labels = []
        
        # Create a mapping from soil type to index
        self.soil_type_to_idx = {soil_type: idx for idx, soil_type in enumerate(images_dict.keys())}
        
        # Flatten the dictionary into lists
        for soil_type, images in images_dict.items():
            label = self.soil_type_to_idx[soil_type]
            for img, filename in images:
                self.data.append(img)
                self.labels.append(label)
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        # Convert image from (H, W, C) to (C, H, W) for PyTorch
        img = self.data[idx]
        img = np.transpose(img, (2, 0, 1))  # Convert to (C, H, W)
        img_tensor = torch.FloatTensor(img)
        label = self.labels[idx]
        return img_tensor, label

# Create datasets
train_dataset = SoilDataset(train_set)
val_dataset = SoilDataset(val_set)
test_dataset = SoilDataset(test_set)

print(f"Train set size: {len(train_dataset)}")
print(f"Validation set size: {len(val_dataset)}")
print(f"Test set size: {len(test_dataset)}")

Train set size: 828
Validation set size: 115
Test set size: 245


In [43]:
# =========================
# 8) TRAIN / EVAL LOOPS
# =========================
def batch_accuracy(logits, y):
    preds = logits.argmax(dim=1)
    return (preds == y).float().mean().item()

def run_epoch(loader, train=True):
    model.train() if train else model.eval()
    total_loss, total_acc, n_batches = 0.0, 0.0, 0

    with torch.set_grad_enabled(train):
        for x, y in loader:
            x = x.to(DEVICE, non_blocking=True)
            y = y.to(DEVICE, non_blocking=True)

            logits = model(x)
            loss = regression_loss_fn(logits, y)

            if train:
                regression_optimizer.zero_grad()
                loss.backward()
                regression_optimizer.step()

            total_loss += loss.item()
            total_acc  += batch_accuracy(logits, y)
            n_batches  += 1

    return total_loss / n_batches, total_acc / n_batches


In [44]:
# =========================
# 9) ENTRAÎNEMENT + VALIDATION
# =========================
for epoch in range(1, EPOCHS + 1):
    tr_loss, tr_acc = run_epoch(train_loader, train=True)
    va_loss, va_acc = run_epoch(val_loader, train=False)

    print(f"Epoch {epoch:02d} | "
          f"train loss {tr_loss:.4f} acc {tr_acc:.3f} | "
          f"val loss {va_loss:.4f} acc {va_acc:.3f}")


KeyError: 0

In [None]:
# =========================
# 10) TEST FINAL
# =========================
te_loss, te_acc = run_epoch(test_loader, train=False)
print(f"TEST | loss {te_loss:.4f} acc {te_acc:.3f}")


In [None]:
# =========================
# 12) (OPTION) CHECK RAPIDE D'UN BATCH
# =========================
x, y = next(iter(train_loader))
print("Batch x:", x.shape, x.dtype, x.min().item(), x.max().item())  # (B,3,128,128), float, [0,1]
print("Batch y:", y.shape, y.dtype, y.min().item(), y.max().item())  # (B,), long, [0..6]
