This notebook is intended to run on a high-end GPU, as it requires a fairly significant amount of computational power.
Moreover, this notebook contains some redundant code compared to the main project. Due to performance constraints, it has been optimized to run on Google Colab.

In [None]:
# path
path_img_s2 = "/content/data/"

In [None]:
# Connect datashare, transfer to the machine and unzip
# mount gdrive to save the weight
from google.colab import drive
drive.mount('/content/drive')

! cp /content/drive/MyDrive/Fac/Master/Remote_sensing/data.zip /content/
# unzip
! cd /content && unzip data.zip
# i don't remove archive to save machine time

#
# do mount point to save output picture
! ln -s /content/drive/MyDrive/Fac/Master/Remote_sensing/output/ /content/output

In [None]:
! pip install torchange rasterio opencv-python numpy pillow

In [None]:
# do the import
from urllib.request import urlretrieve
import os

def checkpoints_downloader(model_type):
    # Dummy function to simulate downloading checkpoints
    print(f"Checking and downloading checkpoints for model type: {model_type} if needed.")
    path_of_model_base = "/content/models/"
    os.makedirs(path_of_model_base, exist_ok=True)
    name_of_model_file = f"vit_{model_type.lower()}.pth"
    if not os.path.exists(os.path.join(path_of_model_base, name_of_model_file)):
        print(f"Model {name_of_model_file} not found. Proceeding to download.")
        # downloading model
        match model_type:
            case "h":
                url = "https://dl.fbaipublicfiles.com/segment_anything/sam_vit_h_4b8939.pth"
            case "l":
                url = "https://dl.fbaipublicfiles.com/segment_anything/sam_vit_l_0b3195.pth"
            case "b":
                url = "https://dl.fbaipublicfiles.com/segment_anything/sam_vit_b_01ec64.pth"
            case _:
                raise ValueError("Invalid model type. Choose from 'h', 'l', or 'b'.")
        urlretrieve(url, os.path.join(path_of_model_base, name_of_model_file))
        print(f"Model {name_of_model_file} downloaded successfully.")
    else:
        print(f"Model {name_of_model_file} already exists. No download needed.")

    return os.path.join(path_of_model_base, name_of_model_file)

# checkpoints_downloader("b")
checkpoints_downloader("h")
# checkpoints_downloader("l")


In [None]:
import matplotlib.pyplot as plt
from skimage.io import imread
from torchange.models.segment_any_change import AnyChange, show_change_masks
from skimage.segmentation import find_boundaries
from torchange.models.segment_any_change.segment_anything.utils.amg import (
    area_from_rle,
    box_xyxy_to_xywh,
    rle_to_mask,
    MaskData
)
import matplotlib.pyplot as plt
import numpy as np
from pathlib import Path
from tqdm import tqdm
import rasterio
import cv2
import os


def aggregate_mask(mask_data):
  # code rearrange from "https://github.com/Z-Zheng/pytorch-change-models/"
    assert isinstance(mask_data, MaskData)
    anns = []
    for idx in range(len(mask_data["rles"])):
        ann_i = {
            "segmentation": rle_to_mask(mask_data["rles"][idx]),
            "area": area_from_rle(mask_data["rles"][idx]),
        }
        if 'boxes' in mask_data._stats:
            ann_i['bbox'] = box_xyxy_to_xywh(mask_data["boxes"][idx]).tolist()
        anns.append(ann_i)

    if len(anns) == 0:
        return
    sorted_anns = sorted(anns, key=(lambda x: x['area']), reverse=True)

    img = np.zeros((sorted_anns[0]['segmentation'].shape[0], sorted_anns[0]['segmentation'].shape[1]), np.int16)

    for ann in sorted_anns:
        m = ann['segmentation']
        img[m] = 255

    return img


def compute_change_mask_AnyChange(city, path_data="/content/data/S2", plot_change=False):
  # initialize AnyChange
  m = AnyChange('vit_h', sam_checkpoint='/content/models/vit_h.pth') # select SAM model

  # customize hyperparameters of SAM mask generator
  m.make_mask_generator(
      points_per_side=16,
      stability_score_thresh=0.90,
  )
  # customize AnyChange hyperparameters
  m.set_hyperparameters(
      change_confidence_threshold=165,
      use_normalized_feature=True,
      bitemporal_match=True,
  )

  imageA = cv2.imread(os.path.join(path_data, f"{city}/pair/img1.png"), 1)
  imageB = cv2.imread(os.path.join(path_data, f"{city}/pair/img2.png"), 1)
  # convert image to RGB for SAM
  imageA_ = cv2.cvtColor(imageA, cv2.COLOR_BGR2RGB)
  imageB = cv2.cvtColor(imageB, cv2.COLOR_BGR2RGB)

  # compute change mask with AnyChange
  changemasks, _, _ = m.forward(imageA, imageB)

  if plot_change :

    fig, axes = show_change_masks(imageA, imageB, changemasks)
    plt.show()

  # Compute a binary mask from the aggregation of the different masks.
  change_mask = aggregate_mask(changemasks)
  return change_mask

In [None]:
# duplicate from main.py and utils.py
def get_city_names_from_folder(folder_path: Path) -> list[str]:
    city_names = []
    for city_dir in sorted([p for p in folder_path.iterdir() if p.is_dir()]):
        city_names.append(city_dir.name)
    return city_names

def evaluate_change_map(change_map: np.ndarray, ground_truth: np.ndarray)->dict:
    """
    Évalue la carte de changement en calculant les métriques de performance.

    Args:
        change_map: Carte de changement (0->255)
        ground_truth: Ground truth (0 ou 1)
    """
    #On transforme la carte de changement (0->255) en une image binaire (0 ou 1)
    pred = (change_map > 0).astype(np.uint8)
    gt = (ground_truth > 0).astype(np.uint8)
    TP = np.sum((pred == 1) & (gt == 1))
    TN = np.sum((pred == 0) & (gt == 0))
    FP = np.sum((pred == 1) & (gt == 0))
    FN = np.sum((pred == 0) & (gt == 1))

    # Métriques
    accuracy = (TP + TN) / (TP + TN + FP + FN)
    precision = TP / (TP + FP) if (TP + FP) > 0 else 0
    recall = TP / (TP + FN) if (TP + FN) > 0 else 0
    f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

    # IoU (Intersection over Union) / Jaccard Index
    iou = TP / (TP + FP + FN) if (TP + FP + FN) > 0 else 0

    # Kappa de Cohen (corrige le hasard)
    po = accuracy  # Accord observé
    pe = ((TP + FP) * (TP + FN) + (FN + TN) * (FP + TN)) / ((TP + TN + FP + FN) ** 2)
    kappa = (po - pe) / (1 - pe) if (1 - pe) > 0 else 0

    return {
        'TP': TP, 'TN': TN, 'FP': FP, 'FN': FN,
        'Accuracy': accuracy,
        'Precision': precision,
        'Recall': recall,
        'F1-Score': f1_score,
        'IoU': iou,
        'Kappa': kappa
    }

def mean_metric(metrics_list, key):
    return np.round(np.mean([m[key] for m in metrics_list]), 4)

In [None]:
def bulk_process_change_masks(path_img):
  path_s2 = os.path.join(path_img, "S2")
  city_name = sorted(get_city_names_from_folder(Path(path_s2)))

  metrics = []

  for city in tqdm(city_name):
    change_mask = compute_change_mask_AnyChange(city, path_data=path_s2, plot_change=False)

    with rasterio.open(os.path.join(path_img, f"ground_truth/{city}/cm/{city}-cm.tif")) as src:
        ground_truth =  src.read()[0]


    metric = evaluate_change_map(change_mask, ground_truth)
    metrics.append(metric)

  return metrics


In [None]:
def compute_several_distrib(metrics, key):
  mean = np.round(np.mean([m[key] for m in metrics]), 4)
  std = np.round(np.std([m[key] for m in metrics]), 4)
  min = np.round(np.min([m[key] for m in metrics]), 4)
  max = np.round(np.max([m[key] for m in metrics]), 4)
  print(f"{key}\n Mean: {mean}, Std: {std}, Min: {min}, Max: {max}")

#### Compute Metrics for Analysis

In [None]:
print(
    "Accuracy: ", mean_metric(metrics, 'Accuracy'),
    "Precision: ", mean_metric(metrics, 'Precision'),
    "Recall: ", mean_metric(metrics, 'Recall'),
    "F1-Score: ", mean_metric(metrics, 'F1-Score'),
    "IoU: ", mean_metric(metrics, 'IoU'),
    "Kappa: ", mean_metric(metrics, 'Kappa')
)

In [None]:
compute_several_distrib(metrics, 'F1-Score')
compute_several_distrib(metrics, 'IoU')
compute_several_distrib(metrics, 'Kappa')

In [None]:
# get image for the report
changemask = compute_change_mask_AnyChange("beirut", plot_change=False)
img_1 = cv2.imread("/content/data/S2/beirut/pair/img1.png", 1)
img_2 = cv2.imread("/content/data/S2/beirut/pair/img2.png", 1)
img1 = cv2.cvtColor(img_1, cv2.COLOR_BGR2RGB)
img2 = cv2.cvtColor(img_2, cv2.COLOR_BGR2RGB)
ground_truth = cv2.imread("/content/data/ground_truth/beirut/cm/cm.png", 0)


In [None]:
# print images for the report
fig, axes = plt.subplots(1, 4, figsize=(12, 4), sharex=True, sharey=True)
axes[0].imshow(img1)
axes[0].set_title("Image before (t1)")

axes[1].imshow(img2)
axes[1].set_title("Image after (t2)")


axes[2].imshow(changemask)
axes[2].set_title("Change map")

axes[3].imshow(ground_truth)
axes[3].set_title("Ground truth")

for ax in axes:
    ax.axis('off')