In [1]:
import geopandas as gpd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import GroupKFold
from sklearn.metrics import precision_score, recall_score, f1_score, cohen_kappa_score

# =============================================================================
# 1. Load and preprocess the GeoDataFrame
# =============================================================================

# Load the mesh and derive binary labels for favela classification.
mesh = gpd.read_file("/home/stagiaire/Téléchargements/PR/D/mesh_rj.shp")

# Both classes share the same three filters; only the 'favelas' value decides
# between positive (1) and negative (0). Cells that pass neither rule get NaN.
eligible = (
    (mesh['vegetation'] <= 0.95) &
    (mesh['ghsl'] >= 0.5) &
    (mesh['osm'] <= 0.1)
)
mesh['label'] = np.where(
    eligible & (mesh['favelas'] > 0.9),
    1,
    np.where(eligible & (mesh['favelas'] == 0), 0, np.nan)
)
# Keep only the cells that received a label.
dataset = mesh[mesh['label'].notna()].copy()

# Assign each cell to a spatial zone (for grouped cross-validation).
# NOTE(review): assumes both shapefiles share the same CRS — confirm.
zones = gpd.read_file("/home/stagiaire/Téléchargements/PR/D/zones.shp")
dataset['centroid'] = dataset.geometry.centroid
points_zones = gpd.sjoin(
    dataset.set_geometry('centroid'),
    zones[['fid', 'geometry']],
    how='left',
    predicate='within'
)
# A left join emits one row per (cell, zone) match, so a centroid lying inside
# overlapping zones shows up several times under the same index label. Keep
# the first match so the index-aligned assignment below cannot fail on
# duplicate labels.
points_zones = points_zones[~points_zones.index.duplicated(keep='first')]
dataset['zone'] = points_zones['fid']
dataset = dataset.drop(columns=['centroid'])
# Cells whose centroid fell in no zone have a NaN zone and are dropped.
dataset = dataset[dataset['zone'].notna()].reset_index(drop=True)

# =============================================================================
# 2. Build feature matrix and label vector
# =============================================================================

# Only 9 local features are used; the neighbor slots are filled with zeros.
feature_columns = [
    'vegetation', 'slope', 'profile_co', 'entropy',
    'nodes', 'roads', 'mean_conne', 'min_connex', 'max_connex'
]
num_cell_features = len(feature_columns)
num_neighbors = 8
total_features = num_cell_features * (1 + num_neighbors)  # 9 * 9 = 81

# Zero block appended to every row in place of the neighbor features.
zero_padding = np.zeros(num_cell_features * num_neighbors, dtype=np.float32)

# Build all rows in one vectorized step instead of iterating over the frame
# row by row; this also yields a well-formed (0, 81) matrix for an empty frame.
cell_features = dataset[feature_columns].to_numpy(dtype=np.float32)
neighbor_padding = np.tile(zero_padding, (len(dataset), 1))

X = torch.tensor(np.hstack([cell_features, neighbor_padding]), dtype=torch.float32)
y = torch.tensor(dataset['label'].to_numpy(dtype=np.int64))
# Zone id per row, used as the grouping key for cross-validation.
groups = dataset['zone'].to_numpy(dtype=np.int64)

print("Feature matrix shape:", X.shape)
print("Number of examples:", X.shape[0])

# =============================================================================
# 3. Class balancing by undersampling
# =============================================================================

def undersample_indices(indices, labels_tensor, rng=None):
    """Randomly undersample the majority class within the given indices.

    Args:
        indices: Sequence of integer positions into ``labels_tensor``.
        labels_tensor: 1-D torch tensor of class labels. Only positions
            labelled 0 or 1 can appear in a balanced result.
        rng: Optional NumPy random source exposing ``choice`` and ``shuffle``
            (e.g. ``numpy.random.default_rng(seed)``) for reproducible
            sampling. When ``None`` the global ``numpy.random`` state is used.

    Returns:
        A shuffled int64 array holding equally many class-0 and class-1
        indices. When either class is absent (including empty ``indices``),
        the input indices are returned unchanged as an int64 array.
    """
    # Force an integer dtype: an empty list would otherwise become a float64
    # array, which is not a valid index.
    indices = np.array(indices, dtype=np.int64)
    # Move labels to host memory first so tensors on another device work too.
    labels = labels_tensor.detach().cpu().numpy()[indices]
    class_0 = indices[labels == 0]
    class_1 = indices[labels == 1]

    # Nothing to balance when one of the classes is missing.
    if len(class_0) == 0 or len(class_1) == 0:
        return indices

    choice = np.random.choice if rng is None else rng.choice
    shuffle = np.random.shuffle if rng is None else rng.shuffle

    min_count = min(len(class_0), len(class_1))
    sampled_0 = choice(class_0, min_count, replace=False)
    sampled_1 = choice(class_1, min_count, replace=False)
    balanced_indices = np.concatenate([sampled_0, sampled_1])
    # Shuffle so the two classes are interleaved rather than stacked.
    shuffle(balanced_indices)
    return balanced_indices

# =============================================================================
# 4. Define the MLP model
# =============================================================================

class MLP(nn.Module):
    """Feed-forward network: Linear -> ReLU -> Linear, returning raw scores.

    No softmax is applied in ``forward``; the output has ``output_size``
    values per input row.
    """

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        # Attribute names double as state_dict key prefixes, and the layers
        # are registered in the order they are applied.
        self.fc1 = nn.Linear(in_features=input_size, out_features=hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(in_features=hidden_size, out_features=output_size)

    def forward(self, x):
        """Map a batch of feature rows to one score per output unit."""
        hidden = self.fc1(x)
        activated = self.relu(hidden)
        scores = self.fc2(activated)
        return scores

# =============================================================================
# 5. Training and evaluation loop
# =============================================================================

def train_and_evaluate_mlp_no_neighbors(
    train_idx,
    test_idx,
    *,
    hidden_size=64,
    num_epochs=400,
    batch_size=32,
    learning_rate=0.001,
    log_every=100,
):
    """Train and evaluate the MLP model on one fold with no neighbor info.

    Reads the module-level ``X``, ``y`` and ``device`` and uses the
    module-level ``MLP`` and ``undersample_indices``.

    Args:
        train_idx: Row indices into ``X``/``y`` used for training.
        test_idx: Row indices into ``X``/``y`` used for evaluation.
        hidden_size: Width of the hidden layer.
        num_epochs: Number of full passes over the training split.
        batch_size: Mini-batch size for both data loaders.
        learning_rate: Learning rate passed to Adam.
        log_every: Print a progress line every this many epochs; a falsy
            value disables the progress output.

    Returns:
        Tuple ``(precision, recall, f1, kappa)`` computed on the
        undersampled test split.
    """
    # Both splits go through undersampling, so when both classes are present
    # the metrics describe an equal class mix rather than the raw ratio.
    train_idx_bal = undersample_indices(train_idx, y)
    test_idx_bal = undersample_indices(test_idx, y)

    X_train, y_train = X[train_idx_bal], y[train_idx_bal]
    X_test, y_test = X[test_idx_bal], y[test_idx_bal]

    train_loader = DataLoader(
        TensorDataset(X_train, y_train), batch_size=batch_size, shuffle=True
    )
    test_loader = DataLoader(TensorDataset(X_test, y_test), batch_size=batch_size)

    # A fresh model per call, so folds never share weights.
    model = MLP(input_size=X.shape[1], hidden_size=hidden_size, output_size=2).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    # Training
    model.train()
    for epoch in range(1, num_epochs + 1):
        for batch_x, batch_y in train_loader:
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)
            optimizer.zero_grad()
            outputs = model(batch_x)
            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()

        if log_every and epoch % log_every == 0:
            print(f"Epoch {epoch:03d} complete.")

    # Evaluation
    model.eval()
    all_preds, all_targets = [], []
    with torch.no_grad():
        for batch_x, batch_y in test_loader:
            outputs = model(batch_x.to(device))
            # Predicted class is the index of the larger of the two scores.
            preds = torch.argmax(outputs, dim=1)
            all_preds.append(preds.cpu())
            # Targets never left the CPU, so they can be collected directly.
            all_targets.append(batch_y)

    all_preds = torch.cat(all_preds).numpy()
    all_targets = torch.cat(all_targets).numpy()

    precision = precision_score(all_targets, all_preds, zero_division=0)
    recall = recall_score(all_targets, all_preds, zero_division=0)
    f1 = f1_score(all_targets, all_preds, zero_division=0)
    kappa = cohen_kappa_score(all_targets, all_preds)

    print(f"Precision: {precision:.4f} | Recall: {recall:.4f} | F1: {f1:.4f} | Kappa: {kappa:.4f}")
    return precision, recall, f1, kappa

# =============================================================================
# 6. Spatial cross-validation using GroupKFold
# =============================================================================

# Use the GPU when one is available; the training function reads `device`.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

num_iterations = 10
num_folds = 5
num_metrics = 4  # [Precision, Recall, F1, Kappa]
# One row of metrics per (iteration, fold) pair.
metrics_all = np.zeros((num_iterations, num_folds, num_metrics))

for run in range(num_iterations):
    print(f"\n=== Iteration {run + 1}/{num_iterations} ===")
    # Grouping by zone keeps every cell of a zone on the same side of a split.
    splitter = GroupKFold(n_splits=num_folds)
    fold_splits = splitter.split(X.numpy(), y.numpy(), groups)
    for fold_number, (train_rows, test_rows) in enumerate(fold_splits, start=1):
        print(f"\n--- Fold {fold_number}/{num_folds} ---")
        fold_scores = train_and_evaluate_mlp_no_neighbors(train_rows, test_rows)
        metrics_all[run, fold_number - 1, :] = fold_scores

# =============================================================================
# 7. Aggregate results and print
# =============================================================================

# Labels carry their own padding so the printed columns line up; their order
# matches the last axis of metrics_all.
metric_labels = ("Precision: ", "Recall:    ", "F1:        ", "Kappa:     ")

# Collapse the iteration axis: one mean and one std per (fold, metric).
mean_per_fold = metrics_all.mean(axis=0)
std_per_fold = metrics_all.std(axis=0)

print(f"\n--- Per-zone results over {num_iterations} iterations ---")
# NOTE(review): rows are cross-validation folds; they match individual zones
# only if every fold holds exactly one zone — confirm.
for fold_number, (fold_mean, fold_std) in enumerate(zip(mean_per_fold, std_per_fold), start=1):
    print(f"\nZone {fold_number}")
    for label, average, spread in zip(metric_labels, fold_mean, fold_std):
        print(f"{label}{average:.2f} ± {spread:.2f}")

# The global spread is the std of the per-fold means, i.e. variation between
# folds, not between individual runs.
global_mean = mean_per_fold.mean(axis=0)
global_std = mean_per_fold.std(axis=0)

print("\n--- Global averages across all zones ---")
for label, average, spread in zip(metric_labels, global_mean, global_std):
    print(f"{label}{average:.2f} ± {spread:.2f}")


Dimensions of X: torch.Size([16969, 81])
Number of examples: 16969

=== Iteration 1/10 ===

--- Fold 1/5 ---
Epoch 100 | Loss: 0.5090
Epoch 200 | Loss: 0.4591
Epoch 300 | Loss: 0.4869
Epoch 400 | Loss: 0.4183
Metrics | Precision: 0.8817 | Recall: 0.8723 | F1: 0.8770 | Kappa: 0.7553

--- Fold 2/5 ---
Epoch 100 | Loss: 0.4086
Epoch 200 | Loss: 0.3749
Epoch 300 | Loss: 0.3799
Epoch 400 | Loss: 0.3899
Metrics | Precision: 0.7313 | Recall: 0.8448 | F1: 0.7840 | Kappa: 0.5345

--- Fold 3/5 ---
Epoch 100 | Loss: 0.5226
Epoch 200 | Loss: 0.3823
Epoch 300 | Loss: 0.3776
Epoch 400 | Loss: 0.3743
Metrics | Precision: 0.8600 | Recall: 0.5375 | F1: 0.6615 | Kappa: 0.4500

--- Fold 4/5 ---
Epoch 100 | Loss: 0.4273
Epoch 200 | Loss: 0.4196
Epoch 300 | Loss: 0.4025
Epoch 400 | Loss: 0.3787
Metrics | Precision: 0.8529 | Recall: 0.7785 | F1: 0.8140 | Kappa: 0.6443

--- Fold 5/5 ---
Epoch 100 | Loss: 0.5350
Epoch 200 | Loss: 0.4594
Epoch 300 | Loss: 0.4106
Epoch 400 | Loss: 0.4061
Metrics | Precision: 0.