In [None]:
## Grid generation 

In [None]:
import geopandas as gpd
import rasterio
from shapely.geometry import Polygon


class GridGenerator:
    """
    A class to generate a grid of polygons over a raster image.

    Attributes:
        image_path (str): Path to the raster image.
        taille_carreau (float): Size of each grid cell in the same units as the raster's CRS.
        output_grid_path (str): Path to save the generated grid shapefile.
    """

    def __init__(self, image_path, output_grid_path=None, taille_carreau=150):
        """
        Initializes the GridGenerator with the given parameters.

        Args:
            image_path (str): Path to the raster image.
            output_grid_path (str, optional): Path to save the generated grid shapefile.
            taille_carreau (float): Size of each grid cell in the same units as the raster's CRS.
        """
        self.image_path = image_path
        self.taille_carreau = taille_carreau
        if output_grid_path is None:
            output_grid_path = f"{image_path.rsplit('.', 1)[0]}_grid.shp"
        self.output_grid_path = output_grid_path

    def generate_grid(self):
        """
        Generates a grid of polygons over the raster image and saves it to a shapefile.
        """
        with rasterio.open(self.image_path) as src:
            bounds = src.bounds
            crs = src.crs

        xmin, ymin, xmax, ymax = bounds
        width = xmax - xmin
        height = ymax - ymin

        cols = int(width // self.taille_carreau)
        rows = int(height // self.taille_carreau)

        polygons = []
        for i in range(cols):
            for j in range(rows):
                x_left = xmin + i * self.taille_carreau
                x_right = xmin + (i + 1) * self.taille_carreau
                y_top = ymax - j * self.taille_carreau
                y_bottom = ymax - (j + 1) * self.taille_carreau

                polygon = Polygon([
                    (x_left, y_top),
                    (x_right, y_top),
                    (x_right, y_bottom),
                    (x_left, y_bottom)
                ])
                polygons.append(polygon)

        grid_gdf = gpd.GeoDataFrame(geometry=polygons, crs=crs)
        grid_gdf.to_file(self.output_grid_path)


image_path = "/path/to/your/image/S2.tif"
grid_generator = GridGenerator(image_path)
grid_generator.generate_grid()

In [None]:
## Entropy calculation 

In [None]:
import torch
import numpy as np
import rasterio
from skimage.exposure import rescale_intensity


def normalize_band(band: np.ndarray) -> np.ndarray:
    """
    Normalize a band between 0 and 1.
    """
    return rescale_intensity(band, out_range=(0, 1))


def compute_entropy_block(block: torch.Tensor, epsilon: float = 1e-10) -> torch.Tensor:
    """
    Compute entropy for a 15x15 pixel block.
    """
    hist = torch.histc(block, bins=256, min=0.0, max=1.0)
    probs = hist / hist.sum()
    probs = torch.clamp(probs, min=epsilon)
    entropy = -torch.sum(probs * torch.log(probs))
    return entropy


# File paths
sentinel2_path = "/path/to/your/data/S2.tif"
entropy_output_path = "/path/to/your/output/entropy.tif"

# Load Sentinel-2 image
with rasterio.open(sentinel2_path) as src:
    meta = src.meta.copy()
    meta.update({
        'height': src.height // 15,
        'width': src.width // 15,
        'count': 1,
        'dtype': 'float32',
        'transform': src.transform * src.transform.scale(15, 15)
    })

    entropy_avg = np.zeros((meta['height'], meta['width']), dtype=np.float32)

    for i in range(1, 13):
        band = src.read(i)
        band_normalized = normalize_band(band)
        band_tensor = torch.tensor(band_normalized, dtype=torch.float32)

        for y in range(0, band_tensor.size(0) - 15 + 1, 15):
            for x in range(0, band_tensor.size(1) - 15 + 1, 15):
                block = band_tensor[y:y+15, x:x+15]
                entropy = compute_entropy_block(block)
                entropy_avg[y // 15, x // 15] += entropy.item()

    entropy_avg /= 12.0

    with rasterio.open(entropy_output_path, 'w', **meta) as dst:
        dst.write(entropy_avg, 1)

print(f"Entropy TIFF file generated: {entropy_output_path}")

In [None]:
## DEM feature calculation 

In [None]:
import numpy as np
import rasterio
from scipy.ndimage import sobel


def calculate_slope(dem: np.ndarray, resolution: float) -> np.ndarray:
    """
    Compute the slope from the Digital Elevation Model (DEM).

    Parameters:
        dem (numpy.ndarray): 2D array representing the DEM.
        resolution (float): Spatial resolution of the DEM (pixel size in meters).

    Returns:
        numpy.ndarray: 2D array containing slope values in degrees.
    """
    dx = sobel(dem, axis=1) / (8.0 * resolution)  # East-West gradient
    dy = sobel(dem, axis=0) / (8.0 * resolution)  # North-South gradient

    slope_rad = np.arctan(np.sqrt(dx**2 + dy**2))
    slope_deg = np.degrees(slope_rad)

    return slope_deg


def calculate_profile_convexity(dem: np.ndarray, resolution: float) -> np.ndarray:
    """
    Compute the profile convexity from the Digital Elevation Model (DEM).

    Parameters:
        dem (numpy.ndarray): 2D array representing the DEM.
        resolution (float): Spatial resolution of the DEM (pixel size in meters).

    Returns:
        numpy.ndarray: 2D array containing profile convexity values.
    """
    dzdx = sobel(dem, axis=1) / (8.0 * resolution)
    dzdy = sobel(dem, axis=0) / (8.0 * resolution)

    aspect = np.arctan2(dzdy, dzdx)

    d2zdx2 = sobel(dzdx, axis=1) / (8.0 * resolution)
    d2zdy2 = sobel(dzdy, axis=0) / (8.0 * resolution)

    profile_convexity = d2zdx2 * np.cos(aspect) ** 2 + d2zdy2 * np.sin(aspect) ** 2

    return profile_convexity


dem_path = "/path/to/your/data/cop_dem.tif"
slope_output_path = "/path/to/your/output/slope_output.tif"
convexity_output_path = "/path/to/your/output/profile_convexity_output.tif"

with rasterio.open(dem_path) as dem_dataset:
    dem = dem_dataset.read(1)
    resolution = dem_dataset.transform[0]

    slope = calculate_slope(dem, resolution)
    profile_convexity = calculate_profile_convexity(dem, resolution)

    profile = dem_dataset.profile
    profile.update(dtype=rasterio.float32, count=1, nodata=None)

    with rasterio.open(slope_output_path, "w", **profile) as slope_dataset:
        slope_dataset.write(slope.astype(rasterio.float32), 1)

    with rasterio.open(convexity_output_path, "w", **profile) as convexity_dataset:
        convexity_dataset.write(profile_convexity.astype(rasterio.float32), 1)

print(f"Slope file saved as: {slope_output_path}")
print(f"Profile convexity file saved as: {convexity_output_path}")

In [None]:
## OSM feature integration 

In [None]:
import geopandas as gpd
import numpy as np
from tqdm import tqdm


# Load the shapefile grid
grid = gpd.read_file("/path/to/your/data/grid.gpkg")
print(f"Grid loaded: {len(grid)} cells")

# Load OSM shapefiles for nodes and edges
osm_nodes = gpd.read_file("/path/to/your/data/osm_nodes.shp")
osm_edges = gpd.read_file("/path/to/your/data/osm_edges.shp")

print(f"OSM nodes loaded: {len(osm_nodes)}")
print(f"OSM edges loaded: {len(osm_edges)}")

# Convert CRS of OSM data to match the grid
osm_nodes = osm_nodes.to_crs(grid.crs)
osm_edges = osm_edges.to_crs(grid.crs)

# Add columns for statistics in the grid
grid["nodes"] = 0
grid["roads"] = 0.0
grid["mean_connections"] = 0.0
grid["min_connections"] = 0.0
grid["max_connections"] = 0.0

# Compute statistics for each grid cell
for idx, row in tqdm(grid.iterrows(), total=len(grid)):
    geometry = row["geometry"]

    # Filter nodes inside the cell
    nodes_in_cell = osm_nodes[osm_nodes.intersects(geometry)]
    grid.at[idx, "nodes"] = len(nodes_in_cell)

    # Filter roads inside the cell
    roads_in_cell = osm_edges[osm_edges.intersects(geometry)]

    # Compute total road length inside the cell
    total_length = sum(
        route.geometry.intersection(geometry).length for _, route in roads_in_cell.iterrows()
    )
    grid.at[idx, "roads"] = total_length

    # Compute connection statistics
    if len(nodes_in_cell) > 0:
        connections = nodes_in_cell["street_cou"].values  
        grid.at[idx, "mean_connections"] = connections.mean()
        grid.at[idx, "min_connections"] = connections.min()
        grid.at[idx, "max_connections"] = connections.max()

# Save updated grid
grid.to_file("grid.shp")
print("Grid saved with road network statistics!")

In [None]:
## Raster feature integration 

In [None]:
import geopandas as gpd
import rasterio
from rasterio.features import geometry_mask
import numpy as np
import torch
from tqdm import tqdm


def compute_cell_mean(geometry, affine, raster_tensor, device):
    """
    Calculate the mean pixel value inside a grid cell.

    Parameters:
        geometry (shapely.geometry.Polygon): Grid cell geometry.
        affine (Affine): Raster transformation matrix.
        raster_tensor (torch.Tensor): Raster values on device.
        device (torch.device): Device (CPU or GPU).

    Returns:
        float: Mean pixel value within the cell.
    """
    mask = geometry_mask(
        [geometry], transform=affine, invert=True, out_shape=raster_tensor.shape
    ).astype(np.uint8)

    mask_tensor = torch.tensor(mask, dtype=torch.bool, device=device)
    cell_pixels = raster_tensor[mask_tensor]

    if cell_pixels.numel() > 0:
        return torch.nanmean(cell_pixels).item()
    return np.nan


def save_grid(grid, index):
    """
    Save the grid with computed entropy values as a shapefile.

    Parameters:
        grid (GeoDataFrame): Grid data.
        index (str): Suffix for output filename.
    """
    filename = f"grid_{index}.shp"
    print(f"Saving: {filename}")
    grid.to_file(filename)


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Load the shapefile grid
grid = gpd.read_file("/path/to/your/data/grid.gpkg")

# Open the raster file
with rasterio.open("/path/to/your/data/entropy.tif") as src:
    affine = src.transform
    raster_data = src.read(1)
    raster_nodata = src.nodata

# Convert raster data to tensor and move to device
raster_tensor = torch.tensor(raster_data, dtype=torch.float32, device=device)

if raster_nodata is not None:
    raster_tensor[raster_tensor == raster_nodata] = float("nan")

print(f"Raster converted to tensor with shape {raster_tensor.shape}")

# Initialize new column
grid["entropy"] = np.nan

print("Computing mean pixel values for each grid cell...")

for idx, row in tqdm(grid.iterrows(), total=len(grid)):
    geometry = row["geometry"]
    mean_value = compute_cell_mean(geometry, affine, raster_tensor, device)
    grid.at[idx, "entropy"] = mean_value

save_grid(grid, "final")
print("Grid saved with computed entropy values!")

In [None]:
## Vector feature integration 

In [None]:
import geopandas as gpd
import pandas as pd
from tqdm import tqdm


def compute_intersection_proportions(grid: gpd.GeoDataFrame, dataset: gpd.GeoDataFrame, label: str) -> pd.DataFrame:
    """
    Compute the proportion of each grid cell area intersecting with a dataset.

    Parameters:
        grid (GeoDataFrame): Grid containing cell geometries.
        dataset (GeoDataFrame): Dataset to intersect.
        label (str): Name of the column storing intersection proportions.

    Returns:
        DataFrame: Aggregated intersection proportions for each grid cell.
    """
    intersections = gpd.sjoin(grid, dataset, how="left", predicate="intersects").reset_index()

    tqdm.pandas(desc=f"Computing proportions for {label}")
    intersections["intersection_surface"] = intersections.progress_apply(
        lambda row: (
            row["geometry"].intersection(dataset.loc[row["index_right"], "geometry"]).area
            / row["cell_surface"]
            if pd.notnull(row["index_right"])
            else 0
        ),
        axis=1,
    )

    return (
        intersections.groupby("index")
        .agg({"intersection_surface": "sum"})
        .rename(columns={"intersection_surface": label})
    )


# Load shapefiles
grid = gpd.read_file("/path/to/your/data/grid.shp")
vegetation = gpd.read_file("/path/to/your/data/vegetation.shp")
ghsl = gpd.read_file("/path/to/your/data/ghsl.shp")
favelas = gpd.read_file("/path/to/your/data/favelas.shp")
osm = gpd.read_file("/path/to/your/data/open_street_map.shp")

# Ensure CRS consistency
if favelas.crs != grid.crs:
    favelas = favelas.to_crs(grid.crs)

# Compute surface area of each cell
grid["cell_surface"] = grid.geometry.area

# Compute intersection proportions
vegetation_agg = compute_intersection_proportions(grid, vegetation, "vegetation")
ghsl_agg = compute_intersection_proportions(grid, ghsl, "ghsl")
favelas_agg = compute_intersection_proportions(grid, favelas, "favelas")
osm_agg = compute_intersection_proportions(grid, osm, "osm")

# Merge results into grid
grid = (
    grid.reset_index()
    .merge(vegetation_agg, on="index", how="left")
    .merge(ghsl_agg, on="index", how="left")
    .merge(favelas_agg, on="index", how="left")
    .merge(osm_agg, on="index", how="left")
)

# Fill missing values
for col in ["vegetation", "ghsl", "favelas", "osm"]:
    grid[col] = grid[col].fillna(0)

# Save final result
grid.to_file("data.gpkg", driver="GPKG")

In [None]:
## Grid cleaning 

In [None]:
import geopandas as gpd
from tqdm import tqdm


# Load the shapefiles
grid = gpd.read_file("/path/to/your/data/data.gpkg")
boundary = gpd.read_file("/path/to/your/data/city_boundary.shp")

# Ensure both datasets share the same CRS
boundary = boundary.to_crs(grid.crs)

# Add an ID column to the grid
grid["id"] = grid.index

# Initialize tqdm for progress tracking
tqdm.pandas()

# Spatial join: keep only grid cells intersecting the boundary
grid_filtered = gpd.sjoin(grid, boundary, how="inner", predicate="intersects")

# Drop extra columns from the spatial join
grid_filtered = grid_filtered[grid.columns]

# Save the filtered grid
output_path = "/path/to/your/output/data.gpkg"
grid_filtered.to_file(output_path, driver="GPKG")

print(f"Remaining cells: {len(grid_filtered)}")

In [None]:
## Dataset statistics 

In [None]:
import geopandas as gpd
import numpy as np


# Load shapefile
grid = gpd.read_file("/path/to/your/data/data.gpkg")

# Assign labels based on defined conditions
grid["label"] = np.where(
    (grid["vegetation"] <= 0.95)
    & (grid["ghsl"] >= 0.5)
    & (grid["osm"] <= 0.5)
    & (grid["favelas"] > 0.9),
    1,
    np.where(
        (grid["vegetation"] <= 0.95)
        & (grid["ghsl"] >= 0.5)
        & (grid["osm"] <= 0.5)
        & (grid["favelas"] == 0),
        0,
        np.nan,
    ),
)

# Keep only labeled data
dataset = grid[grid["label"].notna()].copy()

# Load zones shapefile
zones = gpd.read_file("/path/to/your/data/zones.shp")

# Assign each grid cell to a zone
dataset["centroid"] = dataset.geometry.centroid
points_zones = gpd.sjoin(
    dataset.set_geometry("centroid"),
    zones[["fid", "geometry"]],
    how="left",
    predicate="within",
)
dataset["zone"] = points_zones["fid"]
dataset = dataset.drop(columns=["centroid"])
dataset = dataset[dataset["zone"].notna()]

# Compute statistics per zone
for label in [1, 0]:
    print(f"\nStatistics for label {label}:")
    total = len(dataset[dataset["label"] == label])
    for i in dataset["zone"].unique():
        count = len(dataset[(dataset["zone"] == i) & (dataset["label"] == label)])
        ratio = count / total if total > 0 else 0
        print(f"Zone {int(i)}: {count} ({ratio:.2f})")

# Print total counts of each label
print("\nTotal labeled samples:")
print(f"Label 0: {len(dataset[dataset['label'] == 0])}")
print(f"Label 1: {len(dataset[dataset['label'] == 1])}")