# Test Notebook

In [None]:
import sys
import os
sys.path.append(os.path.abspath('..'))

In [None]:
# In a Jupyter notebook or IPython environment, run this in the first cell
%load_ext autoreload
%autoreload 2

In [None]:
%reload_ext autoreload

In [None]:
from mpl_toolkits.basemap import Basemap
import matplotlib.pyplot as pltbasemap

In [None]:
from mpl_toolkits.basemap import Basemap
import matplotlib.pyplot as plt
import numpy as np
# set up orthographic map projection with
# perspective of satellite looking down at 45N, 100W.
# use low resolution coastlines.
map = Basemap(projection='ortho',lat_0=45,lon_0=-100,resolution='l')
# draw coastlines, country boundaries, fill continents.
map.drawcoastlines(linewidth=0.25)
map.drawcountries(linewidth=0.25)
map.fillcontinents(color='coral',lake_color='aqua')
# draw the edge of the map projection region (the projection limb)
map.drawmapboundary(fill_color='aqua')
# draw lat/lon grid lines every 30 degrees.
map.drawmeridians(np.arange(0,360,30))
map.drawparallels(np.arange(-90,90,30))
# make up some data on a regular lat/lon grid.
nlats = 73; nlons = 145; delta = 2.*np.pi/(nlons-1)
lats = (0.5*np.pi-delta*np.indices((nlats,nlons))[0,:,:])
lons = (delta*np.indices((nlats,nlons))[1,:,:])
wave = 0.75*(np.sin(2.*lats)**8*np.cos(4.*lons))
mean = 0.5*np.cos(2.*lats)*((np.sin(2.*lats))**2 + 2.)
# compute native map projection coordinates of lat/lon grid.
x, y = map(lons*180./np.pi, lats*180./np.pi)
# contour data over the map.
cs = map.contour(x,y,wave+mean,15,linewidths=1.5)
plt.title('contour lines over filled continent background')
plt.show()

In [None]:
from datasets.utils import split_and_save_images

for folder_name in ["train", "val", "test"]: 
        split_and_save_images(
                input_dir=f"../data/Levir-cd/{folder_name}", 
                output_dir=f"../data/Levir-cd-256/{folder_name}", 
                patch_size=256, 
                images_folder_names=["A", "B"], 
                label_folder_name = "label"
        )


In [None]:
from models import TinyCD, SiameseResNetUNet

model = SiameseResNetUNet()

In [None]:
from torch.utils.data import DataLoader
train_dl = DataLoader(dataset=train_data, shuffle=True, batch_size=16, pin_memory=True)
val_dl = DataLoader(dataset=val_data, shuffle=True, batch_size=16, pin_memory=True)
test_dl = DataLoader(dataset=test_data, shuffle=False, batch_size=16, pin_memory=True)

In [None]:
from training import train, testing
from torch.utils.data import DataLoader
import torch.nn as nn
import torch 
from metrics import iou_score, f1_score, precision, recall
from losses import DiceLoss, Ensemble, FocalLoss
import torch.optim as optim
from training.utils import define_weighted_random_sampler

mode = "multiclass"
#_ , class_weights_dict = define_weighted_random_sampler(dataset=train_data, mask_key="mask", subset_size=200)
class_weights = [1.0, 20.0] # [v for _ , v in class_weights_dict.items()]
#print("Computed Class Weights : ", class_weights)

nb_epochs = 2
#ce = torch.nn.CrossEntropyLoss(weight=torch.tensor(class_weights), reduction='mean').to("cuda")
#focal_loss = FocalLoss(mode=mode, gamma=2)
#dice_loss = DiceLoss(mode=mode)
#criterion = Ensemble(list_losses=[ce, dice_loss], weights = [0.7, 0.3])
criterion =  torch.nn.CrossEntropyLoss(weight=torch.tensor(class_weights), reduction='mean').to("cuda")

metrics = [f1_score, iou_score, precision, recall]

optimizer = optim.AdamW(params=filter(lambda p: p.requires_grad, model.parameters()), lr=1e-3, weight_decay=1e-2,amsgrad=False)
# scheduler for the lr of the optimizer
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=100)
early_stopping_params = {"patience": 10, "trigger_times": 0}

train(
    model = model,
    train_dl = train_dl,
    valid_dl = val_dl,
    loss_fn = criterion,
    optimizer = optimizer, 
    scheduler = scheduler, 
    metrics = metrics,
    nb_epochs = nb_epochs,
    experiment_name = "Tiny_CD",
    log_dir="../runs",
    model_dir="../models",
    resume_path=None,
    early_stopping_params = early_stopping_params,
    image_key = "post_image",
    mask_key = "mask",
    num_classes = len(class_weights), 
    verbose = False,  # Adding verbose flag
    checkpoint_interval = 5,  # Add checkpoint interval parameter
    debug = False,  # Add debug flag for memory logging, 
    training_log_interval = 2, 
    is_mixed_precision=True,
    reduction= "weighted",
    class_weights = class_weights,
    siamese=True,
    
)

In [None]:
from metrics import compute_model_class_performance

compute_model_class_performance(
    model=model,
    dataloader=test_dl,
    num_classes=2,
    device='cuda',
    class_names=["No Change", "Change"], 
    siamese=True,
    image_key="image",
    mask_key="mask",
    average_mode="macro",
    output_file="../outputs/class_performance.txt"
)

### Test Ensemble Methods 

In [None]:
from torch.utils.data import DataLoader 
test_dl = DataLoader(
    dataset=test_dl,
    batch_size=8,
    shuffle=False,
    num_workers=8,
    pin_memory=False
)

In [None]:
def benchmark_models(
    models,
    models_names,
    test_dataloader,
    loss_fn,
    metrics,
    image_key="post_image",
    mask_key="post_mask",
    verbose=True,
    is_mixed_precision=True,
    num_classes=5,
    reduction="weighted",
    class_weights=None,
    tta=False,
    siamese=False,
    device="cuda"
):
    """
    Benchmark multiple models on a test dataloader.

    Args:
        models (list): List of models to evaluate.
        models_names (list): List of model's name
        test_dataloader (DataLoader): PyTorch DataLoader with test data.
        loss_fn (callable): Loss function.
        metrics (dict): Dictionary of metric functions to evaluate.
        image_key (str): Key to access image data from the dataloader batch.
        mask_key (str): Key to access mask/label data from the dataloader batch.
        verbose (bool): If True, print detailed logs for each model.
        is_mixed_precision (bool): Use mixed precision during evaluation.
        num_classes (int): Number of classes in the task.
        reduction (str): Reduction method for metrics (e.g., "weighted").
        class_weights (torch.Tensor): Class weights for loss computation.
        tta (bool): Apply test-time augmentation if True.
        siamese (bool): Use Siamese model logic if True.
        device (str) : cuda or cpu

    Returns:
        dict: Dictionary containing test losses and metrics for each model.
    """
    results = {}

    for i, model in enumerate(models):
        if verbose:
            print(f"/nEvaluating Model {i+1}/{len(models)}: {models_names[i]}")
        
        # Test the model using the provided testing function
        epoch_tloss, test_metrics = testing(
            model=model.to(device),
            test_dataloader=test_dataloader,
            loss_fn=loss_fn,
            metrics=metrics,
            image_key=image_key,
            mask_key=mask_key,
            verbose=verbose,
            is_mixed_precision=is_mixed_precision,
            num_classes=num_classes,
            reduction=reduction,
            class_weights=class_weights,
            tta=tta,
            siamese=siamese,
        )

        # Store results
        results[models_names[i]] = {
            "test_loss": epoch_tloss,
            "test_metrics": test_metrics,
        }

    return results


In [None]:
display_semantic_predictions_batch(
    images=post_image, 
    mask_predictions=outputs.argmax(dim=1),
    mask_labels=inputs["post_mask"],
    normalized={
        "mean" : (0.485, 0.456, 0.406),
        "std" : (0.229, 0.224, 0.225)
        }
)

In [None]:
display_semantic_predictions_batch(
    images=pre_image, 
    mask_predictions=tta_predictions,
    mask_labels=inputs["post_mask"],
    normalized={
        "mean" : (0.485, 0.456, 0.406),
        "std" : (0.229, 0.224, 0.225)
        }
)

### Test Segformer 

In [None]:
from models import Segformer, Unet, ResNet_Unet
model_name = "nvidia/segformer-b0-finetuned-ade-512-512"
label2id = {"building": 1, "background": 0 } #{"cloud": 1, "no_cloud": 0 }
id2label = {v: k for k,v in label2id.items()}
num_labels = 2
freeze_encoder = True

model = Segformer(model_name=model_name,
                  label2id=label2id,
                  num_labels=num_labels,
                  freeze_encoder=freeze_encoder
                  )

In [None]:
from datasets import  Puerto_Rico_Building_Dataset
from torch.utils.data import DataLoader
import albumentations as A
from albumentations.pytorch import ToTensorV2
from training.augmentations import get_val_augmentation_pipeline

# Define Albumentations transforms with normalization
transform = get_val_augmentation_pipeline(image_size=(512, 512), max_pixel_value=1, mean=(0,0,0), std=(1,1,1))

### Test Dataset 

In [None]:
from models import AutoEncoder
from training.augmentations import get_val_augmentation_pipeline
import torch.nn as nn

cloud_filter_params = {"model_class": AutoEncoder(num_input_channel=3,base_channel_size=64), 
                        "device": "cuda", 
                        "file_path": "../models/AutoEncoder_Cloud_Detector_0.001297.pth",
                        "threshold": 0.001297, 
                        "loss": nn.MSELoss,
                        "batch_size": 32
                        }

In [None]:
data_puerto = Puerto_Rico_Building_Dataset(
    base_dir="../data/Puerto_Rico_dataset/tiff_tiles",
    pre_disaster_dir="Pre_Event_Grids_In_TIFF",
    post_disaster_dir="Post_Event_Grids_In_TIFF",
    mask_dir="Post_Event_Grids_In_TIFF_mask",
    transform=transform,
    extension="tif",
    cloud_filter_params=cloud_filter_params,
    preprocessing_mode="online",
    filtered_list_path=None
    )

In [None]:
from models import ResNet_UNET

model = ResNet_UNET(
        in_channels=3,
        out_channels=2,
        backbone_name="resnet18",
        pretrained=True,
        freeze_backbone=True,
    )
model = model.load(file_path="../models/xDB_ResNet18_Unet_20241130-201241_best_model.pth").to("cuda")

In [None]:
data_dl = DataLoader(dataset=data_puerto, batch_size=16, shuffle=True)

In [None]:
inputs = next(iter(data_dl))
outputs = model.predict(inputs["pre_image"].to("cuda"))

In [None]:
from utils import display_semantic_predictions_batch

display_semantic_predictions_batch(images=inputs["pre_image"], 
                                    mask_predictions=outputs,
                                    mask_labels=inputs["mask"], 
                                    normalized=None, 
                                    folder_path=None
                                    )

In [None]:
from models import Maskrcnn
# Initialize the Mask R-CNN model
from models import Maskrcnn
maskrcnn = Maskrcnn(num_classes=2, hidden_layer_dim=256, pretrained=False)
maskrcnn.load("../models/xDB_ResNet50_MaskRCNN_checkpoint.pth")

In [None]:
from utils import display_semantic_predictions_batch
import numpy as np
import torch
maskrcnn.eval()
with torch.no_grad():
    images = inputs["pre_image"].to("cuda")
    mask_predictions = maskrcnn.predict_sem_seg(images = images)

In [None]:
display_semantic_predictions_batch(images=inputs["pre_image"], 
                                    mask_predictions=mask_predictions,
                                    mask_labels=inputs["mask"], 
                                    normalized=None, 
                                    folder_path=None
                                    )

In [None]:
from utils import display_instance_predictions_batch
maskrcnn.eval()
with torch.no_grad():
    images = inputs["pre_image"].to("cuda")
    instances_predictions = maskrcnn.predict(images = images)
    
display_instance_predictions_batch(images, mask_predictions, score_threshold=0.6, max_images=len(images), display=["mask","boxes"])

### Post Processing techniques 

In [None]:
from models import SiameseResNetUNet
model = SiameseResNetUNet(
    in_channels=3,
    out_channels=5,
    backbone_name="resnet18",
    pretrained=True,
    freeze_backbone=False,
    mode="conc")
model = model.load("../models/Siamese_ResNet18_Unet_20241206-152757_best_model.pth")
model = model.to("cuda")

In [None]:
file_paths = [f"../data/xDB/tier3/images/joplin-tornado_0000000{i}_pre_disaster.png" for i in range(0,5)]
pre_images = [Image.open(file_path).convert('RGB') for file_path in file_paths]
file_paths = [f"../data/xDB/tier3/images/joplin-tornado_0000000{i}_post_disaster.png" for i in range(0,5)]
post_images = [Image.open(file_path).convert('RGB') for file_path in file_paths]

big_image_pre = merge_images(pre_images)
big_image_post = merge_images(post_images)

inference = Inference(
        model=model, 
        pre_image=big_image_pre, 
        post_image=big_image_post,
        window_size=512, 
        num_classes=5,
        stride=100, 
        device ='cuda', 
        mode="siamese",
        transform=transforms.Compose([
            v2.ToImage(),
            v2.ToDtype(torch.float32, scale=True),
            v2.Normalize(mean=[0.349, 0.354, 0.268], std=[0.114, 0.102, 0.094]),
        ])
    )

prediction = inference.infer().argmax(axis=0)

In [None]:
damage_colors = {
        0: (0, 0, 0),      # Black for background
        1: (0, 255, 0),    # Green for no-damage
        2: (255, 255, 0),  # Yellow for minor-damage
        3: (255, 126, 0),  # Orange for major-damage
        4: (255, 0, 0)     # Red for destroyed
    }

plot_results_building(image=big_image_post, prediction=prediction, color_dict = damage_colors)

In [None]:
from datasets import Levir_cd_dataset

In [None]:
data.L_paths

In [None]:
data.display_data(list_indices=[0])

In [None]:
from training.augmentations import (
    get_val_augmentation_pipeline )

transform = get_val_augmentation_pipeline(image_size = (512, 512))
data = Levir_cd_dataset(origin_dir="../data/data_samples/Levir-cd", type="test", transform=transform)

In [None]:
import re

def extract_coordinates(filename, folderpath : str = None):
    """
    Extracts the x and y coordinates from a filename like 'train_1_0_256.png'.
    
    Returns:
        (x, y): Tuple of integers representing coordinates.
    """
    filename = os.path.join(folderpath if folderpath is not None else "", filename)
    # Use regex to extract all numeric components
    parts = filename.replace('.png', '').split('_')
    print(parts)
    x = int(parts[2])
    y = int(parts[3])
    return (x, y)


# Example usage
filename = "train_1_0_256.png"
coords = extract_coordinates(filename)
print(coords)  # Output: (0, 256)

In [None]:
import rasterio
import matplotlib.pyplot as plt
bands = [1, 2, 3]
image_file = "tile_0_0.tif"
with rasterio.open(f"../data/processed_data/Post_Event_San_Juan_sample/{image_file}") as dataset:
    base_image = np.transpose(dataset.read(bands), (1, 2, 0))

with rasterio.open(f"../data/predictions/{image_file}") as dataset:
    preds = np.transpose(dataset.read([1, 2, 3, 4]), (1, 2, 0))

In [None]:
# with rasterio 
# access to 
# bounds : dst.bounds.left, top , right, bottom
# affine transform (map image pixels to CRS (a projected reference in a specific location (expressed in meters)) with an affine transformation)
# crs (here : EPSG:32619)
# image bands => .read => numpy array 

In [None]:
# writing mode : 
# driver# width and height
# count : number of bands 
# dtype : data type of dataset
# crs : coordinate system 
# transform affine transformation
# nodata 

# Every band in a dataset get a mask = src.read_masks(num_band)
#   0 : nodata regiion , 255 : valid data region 
# some nodata value can appear in the valid data region because of issue conversion 
# if dataset have some 0 

In [None]:
import os
import re

i_min, i_max = 0, 10
j_min, j_max = 0, 10
folder_path = "../data/predictions"

def extract_tile_ij(filename: str):
    pattern = r"^tile_(\d+)_(\d+)\.tif$"
    match = re.search(pattern, filename)
    if match:
        return int(match.group(1)), int(match.group(2))
    return None, None

def filter_list(filename: str):
    i, j = extract_tile_ij(filename)
    if i is None or j is None:
        return False
    return (i_min <= i <= i_max) and (j_min <= j <= j_max)

def build_list_file(folder: str):
    absolute_path = os.path.abspath(folder)
    file_list = [
        os.path.join(absolute_path, f)
        for f in os.listdir(absolute_path)
        if filter_list(f)
    ]
    return file_list

# Example usage:
filtered_files = build_list_file(folder_path)
print(filtered_files)
