In [None]:
########## IMPORTS ##########

# common
import rasterio
import numpy as np
import matplotlib.pyplot as plt

# shapefile to mask for stats
import geopandas as gpd
from rasterio.features import rasterize

# stats
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler

# Haralick textures quick computation
from skimage.feature import graycomatrix, graycoprops
from joblib import Parallel, delayed


In [None]:
########## DATA ##########

list_raster_pairs = [(r"C:\Users\gbonlieu\Documents\herramienta\change_detection_tool\data\preprocessed\zta1\zta1_pre.tif",
                       r"C:\Users\gbonlieu\Documents\herramienta\change_detection_tool\data\preprocessed\zta1\zta1_post.tif")]
shp_path = r"C:\Users\gbonlieu\Documents\herramienta\change_detection_tool\data\preprocessed\zta1\zta1_grd_truth\zta1_grd_truth.shp"


In [None]:
########## DERIVED INDICES  ##########

def get_sar_indices_and_profile(input_path: str, output_path: str = None ) -> tuple[dict, np.ndarray]:
    """
    Calculates SAR vegetation and degradation indices from Gamma0 and Coherence bands.
    
    Input band order (expected):
    1: Gamma0 VH
    2: Gamma0 VV
    3: Coherence VH
    4: Coherence VV
    
    Returns:
        tuple: (updated_profile, output_stack)
    """
    with rasterio.open(input_path) as src:
        # Load input bands
        gamma_vh = src.read(1).astype('float32')
        gamma_vv = src.read(2).astype('float32')
        coh_vh = src.read(3).astype('float32')
        coh_vv = src.read(4).astype('float32')
        
        # Copy and prepare the profile for 9 bands
        profile = src.profile.copy()
        
        # Set error handling for division by zero (common in radar shadows) and invalid divisions (e.g. 0/0, inf-inf)
        np.seterr(divide='ignore', invalid='ignore')  # remove warnings linked problematic divisions, sets division by 0 to +-inf and invalid divisions to NaN 

        # --- Gamma0 Derived Indices ---
        # Back-scattering ratio: q = VH / VV
        q_ratio = gamma_vh / gamma_vv
        
        # Radar Vegetation Index: RVI = (4 * VH) / (VV + VH)
        rvi = (4 * gamma_vh) / (gamma_vv + gamma_vh)
        
        # Dual-polarization SAR vegetation index: DPSVI = VV / (VV + VH)
        dpsvi = gamma_vv / (gamma_vv + gamma_vh)
        
        # Radar forest degradation index: RFDI = (VV - VH) / (VV + VH)
        rfdi = (gamma_vv - gamma_vh) / (gamma_vv + gamma_vh)

        # --- Coherence Derived Index ---
        # Coherence ratio: Cq = VH / VV
        cq_ratio = coh_vh / coh_vv

        # Clean up calculation artifacts (set inf and -inf to nan, keep nan as nan)
        # We include input bands in the cleanup to ensure the whole stack is consistent
        all_bands = [gamma_vh, gamma_vv, q_ratio, rvi, dpsvi, rfdi, coh_vh, coh_vv, cq_ratio]
        
        for layer in all_bands:
            # np.isfinite returns False for NaN, Inf and -Inf
            layer[~np.isfinite(layer)] = np.nan

        # --- Stack the final 9 bands in one object of size (9, H, W)---
        output_stack = np.stack(all_bands)

        # Update metadata to reflect the 9-band output
        profile.update({
            "count": 9,
            "dtype": 'float32',
            "nodata": np.nan  
        })
    
        with rasterio.open(output_path, 'w', **profile) as dst:
            dst.write(output_stack)
            dst.descriptions = ('G0_VH', 'G0_VV', 'q_ratio', 'RVI', 'DPSVI', 'RFDI', 'coh_VH', 'coh_VV', 'cq_ratio')

        print(f"Success! Indices saved to: {output_path}")

        return profile, output_stack
    
# Example 

pre_path, post_path = r"C:\Users\gbonlieu\Documents\herramienta\change_detection_tool\data\preprocessed\zta1\zta1_explore\zta1_pre.tif", r"C:\Users\gbonlieu\Documents\herramienta\change_detection_tool\data\preprocessed\zta1\zta1_explore\zta1_post.tif"
output_path_pre = r"C:\Users\gbonlieu\Documents\herramienta\change_detection_tool\data\preprocessed\zta1\zta1_explore\zta1_pre_indices.tif"
output_path_post = r"C:\Users\gbonlieu\Documents\herramienta\change_detection_tool\data\preprocessed\zta1\zta1_explore\zta1_post_indices.tif"

get_sar_indices_and_profile(pre_path, output_path_pre)
get_sar_indices_and_profile(post_path, output_path_post)


In [None]:
########## TEXTURES ##########


def _compute_row_glcm_features(y: int, padded_img: np.ndarray, window_size: int, mask_row: np.ndarray, levels: int = 32) -> np.ndarray:
    """
    Computes 9 GLCM textures for a single row.
    Standard properties ('contrast', 'dissimilarity', 'homogeneity', 'correlation', 'energy', 'ASM')
    from skimage  + Variance + Entropy + Mean.
    """
    width = mask_row.shape[0]
    # Standard properties from skimage
    properties = ['contrast', 'dissimilarity', 'homogeneity', 'correlation', 'energy', 'ASM']
    n_features = 9 # 6 standard + 3 custom
    row_out = np.full((n_features, width), np.nan, dtype='float32') # creates a nan ouput, where the valid pixels will be filled
    
    # Pre-calculate gray level indices for Variance/Entropy (0 to levels-1)
    i_indices = np.arange(levels).reshape(levels, 1, 1, 1) 
    
    for x in range(width):
        if mask_row[x]: # not a valid pîxel 
            continue
            
        window = padded_img[y : y + window_size, x : x + window_size]
        
        # 1. Compute GLCM (levels, 4 directions) for one pixel 
        # The GLCM is a 4D array with shape: (levels, levels, n_distances, n_angles).
        # - levels: The number of gray levels (e.g., 32). The first two dimensions (rows/cols) 
        #   represent the probability P(i,j) (relative frequency here) of moving from gray level i to j.
        # - n_distances: The number of spatial offsets used (usually 1).
        # - n_angles: The number of directions analyzed (e.g., 0°, 45°, 90°, 135°).
        # Each element glcm[i, j, d, a] represents the probability of a pixel with intensity i having a neighbor with intensity j at distance d and angle a.
        glcm = graycomatrix(window, distances=[1], 
                            angles=[0, np.pi/4, np.pi/2, 3*np.pi/4], 
                            levels=levels, symmetric=True, normed=True)
        
        # 2. Extract standard skimage properties
        for i, prop in enumerate(properties):
            row_out[i, x] = np.mean(graycoprops(glcm, prop)) # mean because four directions in the graycomatrix, we want to detect texture whatever the direction of the shapes
            
        # 3. Manual calculation: Variance (Sum of Squares)
        # It is the weighted sum of gray levels (i) by their probabilities (=frequencies here) P(i,j).
        # axis=(0, 1) sums over rows (i) and columns (j) of the matrix.
        # Variance measures the dispersion of gray levels around the mean.
        # Formula: Sum over i,j of P(i,j) * (i - mu)^2.
        # i_indices - mu uses broadcasting to subtract mu from each gray level i.
        # We use broadcasting to perform calculations across all angles simultaneously without loops:
        # - glcm has shape (32, 32, 1, 4)
        # - i_indices has shape (32, 1, 1, 1)
        # When multiplying (i_indices * glcm), NumPy automatically "stretches" the 1s in i_indices 
        # to match the shape of glcm, allowing for ELEMENT-WISE multiplication across all dimensions.

        mu = np.sum(i_indices * glcm, axis=(0, 1))
        var = np.sum(glcm * (i_indices - mu)**2, axis=(0, 1))
        row_out[6, x] = np.mean(var) # mean to aggregate the directions
        row_out[7, x] = np.mean(mu) # idem
        
        # 4. Manual calculation: Entropy -x*lnx
        ent = -np.sum(glcm * np.log(glcm + 1e-10), axis=(0, 1))  # 1e-10 to avoid log(0)
        row_out[8, x] = np.mean(ent) # mean to aggregate the directions
            
    return row_out

def process_raster_textures(input_path: str, output_path: str, window_size: int = 7, levels: int = 32,
                            target_band_indices: list = None, custom_labels: list = None):
    """
    Computes GLCM textures for selected or all bands.
    Input: Multi-band raster.
    Output: Multi-band raster containing [Original Bands] + [Generated Textures].
    """
    with rasterio.open(input_path) as src:
        original_data = src.read().astype('float32')
        profile = src.profile.copy()
        # Use existing descriptions, custom labels, or generic names
        input_names = custom_labels if custom_labels else (list(src.descriptions) if src.descriptions[0] else [f"Band_{i+1}" for i in range(src.count)])

    n_bands, h, w = original_data.shape
    pad = window_size // 2
    tex_names = ['contrast', 'dissimilarity', 'homogeneity', 'correlation', 'energy', 'ASM', 'variance', 'mean', 'entropy']
    
    # Define which bands to process (default: all)
    indices_to_process = target_band_indices if target_band_indices is not None else list(range(n_bands))
    
    all_texture_results = []
    output_labels = list(input_names) # Start labels with original band names

    for b_idx in indices_to_process:
        band_data = original_data[b_idx]
        mask = np.isnan(band_data)
        band_name = input_names[b_idx]
        
        print(f"Processing textures for: {band_name}...")

        # 1. Quantization (0-(levels-1))  
        if np.any(~mask):
            b_min, b_max = np.nanmin(band_data), np.nanmax(band_data)
            clean_band = np.nan_to_num(band_data, nan=b_min) # can't process textures on NaN
            # Normalization per band to handle different signal ranges (e.g., VV vs VH)
            band_32 = ((clean_band - b_min) / (b_max - b_min + 1e-6) * (levels-1)).astype(np.uint8)
        else:
            print(f"Skipping empty band {b_idx}")
            continue

        padded_band = np.pad(band_32, pad, mode='reflect') # to handle the sliding window when on the side of the raster

        # 2. Parallel row processing for this specific band
        results = Parallel(n_jobs=-1)(
            delayed(_compute_row_glcm_features)(y, padded_band, window_size, mask[y]) 
            for y in range(h)
        )

        # 3. Reconstruct (9, H, W) from results and store
        band_textures = np.transpose(np.array(results), (1, 0, 2))
        all_texture_results.append(band_textures)
        
        # 4. Generate labels for these textures
        output_labels.extend([f"{band_name}_{t}" for t in tex_names])

    # Final stack: [Original Bands] + [All Generated Textures]
    final_output = np.concatenate([original_data] + all_texture_results, axis=0)

    # 5. Metadata Update
    profile.update(count=final_output.shape[0], dtype='float32', nodata=np.nan)

    with rasterio.open(output_path, 'w', **profile) as dst:
        dst.write(final_output)
        dst.descriptions = tuple(output_labels)
    
    print(f"Success! Generated {final_output.shape[0]} bands in: {output_path}")
    return final_output, output_labels

# Example usage:

# ['G0_VH', 'G0_VV', 'q_ratio', 'RVI', 'DPSVI', 'RFDI', 'coh_VH', 'coh_VV', 'cq_ratio']
pre_path, post_path = r"C:\Users\gbonlieu\Documents\herramienta\change_detection_tool\data\preprocessed\zta1\zta1_pre.tif", r"C:\Users\gbonlieu\Documents\herramienta\change_detection_tool\data\preprocessed\zta1\zta1_post.tif"
output_path = r"C:\Users\gbonlieu\Documents\herramienta\explore\zta1_post_40.tif"
process_raster_textures(post_path, output_path, target_band_indices=[0, 1, 2, 3], custom_labels=['G0_VH', 'G0_VV','coh_VH','coh_VV']) 


In [None]:
########## VARIABLES CORRELATION ##########


def compute_mean_difference_correlation_study(list_raster_pairs: list[tuple[str, str]]) -> np.ndarray :
    """
    Computes the absolute difference for each pair of n-band rasters, 
    calculates the correlation matrix for each, and returns the average correlation matrix.
    """
    all_corr_matrices = []
    n_bands = None

    for (path1, path2) in list_raster_pairs:
        with rasterio.open(path1) as src1, rasterio.open(path2) as src2:
            # Check if band counts match
            if src1.count != src2.count:
                print(f"[SKIP] Band mismatch: {path1} and {path2}")

                continue
            
            if n_bands is None:
                n_bands = src1.count
            
            # Read all bands: shape (n_bands, height, width)
            img1 = src1.read().astype(np.float32)
            img2 = src2.read().astype(np.float32)

            # 1) Difference
            diff = img1 - img2

            # 2) Reshape for correlation: (n_bands, total_pixels)
            # We flatten height and width into a single dimension
            diff_flattened = diff.reshape(n_bands, -1)  # -1 asks reshape tu automatically calculate the size of that dimension so as to keep the same number of total element 

            # 3) Identify valid pixels (where no band has a NaN)
            valid_mask = ~np.isnan(diff_flattened).any(axis=0)  # first part eliminates pixels that are NaN on at least a band (such a pixel = False in the mask)

            # Filter to keep only valid pixels
            diff_cleaned = diff_flattened[:, valid_mask]
            
            # 4) Calculate correlation matrix (n_bands x n_bands)
            # np.corrcoef treats each row as a variable (band)
            if diff_cleaned.shape[1] > 1:

                corr_matrix = np.corrcoef(diff_cleaned)

                # Handle potential NaN (e.g. if a band has zero variance in the difference (0/0))
                corr_matrix = np.nan_to_num(corr_matrix)   # will put a 0 in case of a  0/0 (no correlation in this case)
                all_corr_matrices.append(corr_matrix)
            else:
                # Notification if the pair is empty
                print(f"[WARNING] less than 2 (1 or 0) valid pixels found for pair: {path1} and {path2}. Skipping.")

    if not all_corr_matrices:  # list empty
        raise ValueError("No valid correlation matrices were computed.")

    # 4) Compute the mean correlation matrix
    mean_corr_matrix = np.mean(np.abs(all_corr_matrices), axis=0) # axis=0 because mean of the matrices (all_corr_matrices is a 3D block)

    return mean_corr_matrix

def display_correlation_matrix(matrix: np.ndarray, labels: list[str] = None):
    """
    Simple display utility for the correlation matrix.
    """
    
    fig, ax = plt.subplots(figsize=(16, 12))
    im = ax.imshow(matrix, cmap="YlOrRd", vmin= 0, vmax=1)
    plt.colorbar(im)

    if labels:
        ax.set_xticks(np.arange(len(labels)))
        ax.set_yticks(np.arange(len(labels)))
        ax.set_xticklabels(labels)
        ax.set_yticklabels(labels)

    # --- Add numerical values inside the matrix ---
    # We iterate over rows (i) and columns (j)
    for i in range(matrix.shape[0]):
        for j in range(matrix.shape[1]):
            # Display the value formatted to 2 decimal places
            # ha/va are for horizontal and vertical alignment
            val = matrix[i, j]
            color = "white" if abs(val) > 0.7 else "black" # Adjust text color for contrast
            ax.text(j, i, f"{val:.2f}", ha="center", va="center", color=color, fontsize=8)

    ax.set_title("Mean Correlation Matrix of Band Differences")
    plt.show()

# Example:

list_raster_pairs = [(r"C:\Users\gbonlieu\Documents\herramienta\change_detection_tool\data\preprocessed\zta1\zta1_explore\zta1_pre_indices.tif", r"C:\Users\gbonlieu\Documents\herramienta\change_detection_tool\data\preprocessed\zta1\zta1_explore\zta1_post_indices.tif")]
mean_corr = compute_mean_difference_correlation_study(list_raster_pairs)
display_correlation_matrix(mean_corr, labels=[f"{i+1}" for i in range(mean_corr.shape[0])])


In [None]:
########## CPA ##########


def perform_global_pca_analysis(list_raster_pairs, list_shp_paths, n_components=2, labels: list = None):
    """
    Performs Global PCA and correlation circle visualizatioon on the (pre-post) difference of multiple (pre, post) pairs using specific shapefiles for each pair.
    Calculates the mean position of each class.
    """
    all_pixels_list = [] 
    all_labels_list = [] 

    for i, ((path1, path2), shp_path) in enumerate(zip(list_raster_pairs, list_shp_paths)):
        with rasterio.open(path1) as src1, rasterio.open(path2) as src2:
            
            # 1) CHECK PROFILE CONSISTENCY
            # We compare the metadata
            profiles_match = {k: v for k, v in src1.profile.items() if k != 'nodata'} == {k: v for k, v in src2.profile.items() if k != 'nodata'}
            if not profiles_match: # if we compare nodata, np.nan == np.nan systematically returns false 
                print(f"[WARNING] Profile mismatch in pair {i+1}:")
                print(f"  - Pre:  {src1.profile}")
                print(f"  - Post: {src2.profile}")
                # You might want to raise an error or continue with caution
            
            # Read images and compute difference
            n_bands = src1.count
            diff = (src1.read().astype(np.float32) - src2.read().astype(np.float32))
            diff_flat = diff.reshape(n_bands, -1).T 

            # Rasterize Ground Truth
            gdf = gpd.read_file(shp_path)
            
            if gdf.crs != src1.crs: # Adding a clear notification about the CRS mismatch
                print(f"[INFO] CRS mismatch for {shp_path}. Reprojecting from {gdf.crs} to {src1.crs}")
                gdf = gdf.to_crs(src1.crs)
            
            shapes = [(geom, 1) for geom in gdf.geometry if geom is not None]

            # Rasterize using reference metadata (shape and transform)
            gt_mask = rasterize(shapes=shapes, out_shape=src1.shape, 
                                transform=src1.transform, fill=0, dtype="uint8")
            gt_flat = gt_mask.ravel()  # efficient way (no copy) to flatten the 2D mask (H x W) into a 1D vector
            
            # Clean NaNs
            valid_mask = ~np.isnan(diff_flat).any(axis=1)

            # Collect results
            all_pixels_list.append(diff_flat[valid_mask])
            all_labels_list.append(gt_flat[valid_mask])
            
            print(f"Pair {i+1}/{len(list_raster_pairs)} processed: {np.sum(valid_mask)} valid pixels.")

    # 2) Global Concatenation and Scaling (bands might have different scales/units, shift expected value to 0 and standard deviation to 1)
    X_global = np.vstack(all_pixels_list) # stacks the arrays vertically (TotalPixels, Bands)
    y_global = np.concatenate(all_labels_list)


    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X_global)
    # operating in two steps 
    # fit goes through each column and calculates its expected value and standard deviation
    # transforms each pixel with the formula stored in scaler  

    # 3) PCA Execution
    pca = PCA(n_components=n_components)
    X_pca = pca.fit_transform(X_scaled)
    # first looks for eigenvectors containing maximum variance, then projects pixels on these axes
    exp_var = pca.explained_variance_ratio_

    # 4) Calculate centroids (mean position of each class) and separability
    # Mean of PC1 and PC2 for 'No Change' (0) and 'Deforestation' (1)
    centroid_no_change = np.mean(X_pca[y_global == 0], axis=0)
    centroid_change = np.mean(X_pca[y_global == 1], axis=0)

    dist_centroids = np.linalg.norm(centroid_change - centroid_no_change)
    
    std_0 = np.mean(np.std(X_pca[y_global == 0], axis=0))
    std_1 = np.mean(np.std(X_pca[y_global == 1], axis=0))
    
    # Separability Ratio formula: S = d / (sigma0 + sigma1)
    separability_ratio = dist_centroids / (std_0 + std_1)
    

   # 5) Visualization: PC1 vs PC2
    plt.figure(figsize=(12, 8))

    # Plot background points
    scatter = plt.scatter(X_pca[:, 0], X_pca[:, 1], c=y_global, 
                         cmap='coolwarm', s=1, alpha=0.3)
    # X_pca[:, 0/1] takes all the pixels from PC1/PC2, 
    # c=y_global colors each point according to its value in y_global (0 or 1)
    # s=1 size of points, alpha=0,5 transparency 

    # Plot Centroids with high visibility
    plt.scatter(centroid_no_change[0], centroid_no_change[1], color='blue', 
                marker='X', s=200, edgecolors='white', label='Mean: No Change')
    plt.scatter(centroid_change[0], centroid_change[1], color='red', 
                marker='X', s=200, edgecolors='white', label='Mean: Deforestation')

    plt.xlabel(f'PC1 ({exp_var[0]:.2%})')
    plt.ylabel(f'PC2 ({exp_var[1]:.2%})')
    plt.title('Global PCA Space with Class Centroids')
    plt.legend(loc='lower right', markerscale=1)  # si we let plt.legend(), by default (loc= 'best'), it calculates the location of all pixels to know where there's some space, which can be long
    plt.colorbar(scatter, label='Ground Truth (0: Stable, 1: Change)')
    plt.grid(True, linestyle='--', alpha=0.5)
    stats_text = f'Separability Ratio: {separability_ratio:.2f}'
    plt.gca().text(0.05, 0.95, stats_text, transform=plt.gca().transAxes, 
                   fontsize=12, verticalalignment='top', bbox=dict(boxstyle='round', facecolor='white', alpha=0.5))
    plt.show()

    print(f"Separation distance in PCA space: {dist_centroids:.4f}")
    print(f"Separability Ratio (S): {separability_ratio:.4f}") 

    # 6) Correlation Circle
    # Calculate the matrix of the coordinates of the variables (Corr(X_i, PC_j) = Component_ij * sqrt(eigenvalue_j))
    
    loadings = pca.components_.T * np.sqrt(pca.explained_variance_)  # pca.components_.T (n_features, n_components)

    # Define variable names 
    feature_names = labels if labels is not None else [f'Band {i+1}' for i in range(X_global.shape[1])]

    plt.figure(figsize=(8, 8))
    # Plot the circle
    circle = plt.Circle((0, 0), 1, color='blue', fill=False, linestyle='--')
    plt.gca().add_artist(circle)

    # Plot arrows for each variable
    for i in range(len(feature_names)):
        plt.arrow(0, 0, loadings[i, 0], loadings[i, 1], 
                  color='red', alpha=0.8, head_width=0.05, length_includes_head=True)
        plt.text(loadings[i, 0] * 1.15, loadings[i, 1] * 1.15, 
                 feature_names[i], color='black', ha='center', va='center', fontweight='bold')

    # Formatting the circle plot
    plt.xlim(-1.1, 1.1)
    plt.ylim(-1.1, 1.1)
    plt.axhline(0, color='black', linewidth=1)
    plt.axvline(0, color='black', linewidth=1)
    plt.xlabel(f'PC1 ({exp_var[0]:.2%})')
    plt.ylabel(f'PC2 ({exp_var[1]:.2%})')
    plt.title('Correlation Circle (Variable Importance)')
    plt.grid(True, linestyle=':', alpha=0.6)
    plt.gca().set_aspect('equal')
    plt.show()

    return pca, X_pca, y_global

# Example 
labels = ['G0', 'contrast', 'dissimilarity', 'homogeneity', 'correlation', 'energy', 'ASM', 'variance', 'entropy']
shp_path = r"C:\Users\gbonlieu\Documents\herramienta\change_detection_tool\data\preprocessed\zta1\zta1_grd_truth\zta1_grd_truth.shp"
list_raster_pairs = [(r"C:\Users\gbonlieu\Documents\herramienta\change_detection_tool\data\preprocessed\zta1\zta1_explore\zta1_pre_indices.tif", r"C:\Users\gbonlieu\Documents\herramienta\change_detection_tool\data\preprocessed\zta1\zta1_explore\zta1_post_indices.tif")]
pca_model, pca_data, labels = perform_global_pca_analysis(list_raster_pairs, [shp_path])