# Inference and Test Set Metric Calculation

This notebook runs inference with either the CNN or the Clay model and calculates metrics on the test set.

## Setup

In [None]:
import sys
import warnings

# Adds a path two levels up to the module search path (the `src` package is imported below).
# NOTE(review): this entry is relative, and the working directory is changed a few lines
# further down — confirm that `src` is still resolved from the intended location.
sys.path.append("../../")
# Silences every warning for the whole session.
warnings.filterwarnings("ignore")

# Move the working directory two levels up.
# NOTE(review): not idempotent — re-running this cell moves the cwd up by two more levels.
import os
os.chdir("../../")
root = os.getcwd()
# The suffix is appended unconditionally, so `root` presumably only points at the
# repository on Lightning Studios — confirm before running elsewhere.
root = root + "/workspaces/mine-segmentation" # for lightning studios
print(f"Root directory: {root}")

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import geopandas as gpd
# import leafmap
import torch
import torch.nn.functional as F
# from einops import rearrange
# from matplotlib.colors import ListedColormap
from sklearn.metrics import jaccard_score, f1_score, accuracy_score, recall_score, precision_score, roc_auc_score
from tqdm import tqdm
import matplotlib.pyplot as plt
import contextily as ctx
# from shapely.wkt import loads
# import datetime
from pathlib import Path
import random
import shutil
import os

from src.models.datamodule import MineDataModule
from src.models.cnn.model import MineSegmentorCNN
from src.models.clay.segment.model import MineSegmentor


from src.visualization.visualization_funcs import plot_pred_vs_true_mask

In [None]:
# for development
%load_ext autoreload
%autoreload 2

#### Define paths and parameters

In [None]:
# # 2048 model
# MINESEG_CHECKPOINT_PATH = ("models/cnn/2048_mineseg-cnn_epoch-08_val-iou-0.5017.ckpt")
# CHIP_SIZE = 2048
# TESTSET_BATCH_SIZE = 1
# TRAIN_CHIP_DIR = "data/processed/chips/npy/2048/train/chips/"
# TRAIN_LABEL_DIR = "data/processed/chips/npy/2048/train/labels/"
# VAL_CHIP_DIR = "data/processed/chips/npy/2048/val/chips/"
# VAL_LABEL_DIR = "data/processed/chips/npy/2048/val/labels/"
# TEST_CHIP_DIR = "data/processed/chips/npy/2048/test/chips/"
# TEST_LABEL_DIR = "data/processed/chips/npy/2048/test/labels/"
# METADATA_PATH = "configs/cnn/cnn_segment_metadata.yaml"
# CLAY = False


# # 1024 model
# MINESEG_CHECKPOINT_PATH = ("models/cnn/1024_mineseg-cnn_epoch-15_val-iou-0.5290.ckpt")
# CHIP_SIZE = 1024
# TESTSET_BATCH_SIZE = 4
# TRAIN_CHIP_DIR = "data/processed/chips/npy/1024/train/chips/"
# TRAIN_LABEL_DIR = "data/processed/chips/npy/1024/train/labels/"
# VAL_CHIP_DIR = "data/processed/chips/npy/1024/val/chips/"
# VAL_LABEL_DIR = "data/processed/chips/npy/1024/val/labels/"
# TEST_CHIP_DIR = "data/processed/chips/npy/1024/test/chips/"
# TEST_LABEL_DIR = "data/processed/chips/npy/1024/test/labels/"
# METADATA_PATH = "configs/cnn/cnn_segment_metadata.yaml"
# CLAY = False


# 512 model
# All paths below are relative; a later cell prefixes them with `root`.
# The checkpoint file name is also used to name the report folder.
MINESEG_CHECKPOINT_PATH = ("models/cnn/mineseg-cnn_epoch-16_val-iou-0.5731.ckpt")
# Edge length of one chip; model outputs are resized to CHIP_SIZE x CHIP_SIZE.
CHIP_SIZE = 512
TESTSET_BATCH_SIZE = 16 # 32 for L4, 16 for PC GPU
TRAIN_CHIP_DIR = "data/processed/chips/npy/512/train/chips/"
TRAIN_LABEL_DIR = "data/processed/chips/npy/512/train/labels/"
VAL_CHIP_DIR = "data/processed/chips/npy/512/val/chips/"
VAL_LABEL_DIR = "data/processed/chips/npy/512/val/labels/"
TEST_CHIP_DIR = "data/processed/chips/npy/512/test/chips/"
TEST_LABEL_DIR = "data/processed/chips/npy/512/test/labels/"
# Labels used for the "validated mining areas" evaluation; only this configuration defines it.
TEST_LABEL_VALIDATED_DIR = "data/processed/chips/npy/512/validated/test/labels/"
METADATA_PATH = "configs/cnn/cnn_segment_metadata.yaml"
# False selects the CNN loader and feeds only batch["pixels"] to the model.
CLAY = False


# # CLAY model
# MINESEG_CHECKPOINT_PATH = "models/clay/mineseg-clay-segment_epoch-00_val-iou-0.3155.ckpt"
# CLAY_CHECKPOINT_PATH = "models/clay-v1-base.ckpt"
# CHIP_SIZE = 512
# TESTSET_BATCH_SIZE = 1
# TRAIN_CHIP_DIR = "data/processed/chips/npy/512/train/chips/"
# TRAIN_LABEL_DIR = "data/processed/chips/npy/512/train/labels/"
# VAL_CHIP_DIR = "data/processed/chips/npy/512/val/chips/"
# VAL_LABEL_DIR = "data/processed/chips/npy/512/val/labels/"
# TEST_CHIP_DIR = "data/processed/chips/npy/512/test/chips/"
# TEST_LABEL_DIR = "data/processed/chips/npy/512/test/labels/"
# METADATA_PATH = "configs/clay/clay_segment_metadata.yaml"
# CLAY = True


# general setup
PLATFORM = "sentinel-2-l2a"
DATASET = "data/processed/mining_tiles_with_masks_and_bounding_boxes.gpkg"
# Batch size used when plotting individual example chips.
BATCH_SIZE = 1
# More dataloader workers when a GPU is present, fewer on CPU-only machines.
NUM_WORKERS = 16 if torch.cuda.is_available() else 4

In [None]:
# Anchor every configured path at the project root.
# os.path.join returns an already-absolute path unchanged, so re-running this cell
# does not prefix `root` a second time.
if CLAY:
    CLAY_CHECKPOINT_PATH = os.path.join(root, CLAY_CHECKPOINT_PATH)

MINESEG_CHECKPOINT_PATH = os.path.join(root, MINESEG_CHECKPOINT_PATH)
METADATA_PATH = os.path.join(root, METADATA_PATH)
TRAIN_CHIP_DIR = os.path.join(root, TRAIN_CHIP_DIR)
TRAIN_LABEL_DIR = os.path.join(root, TRAIN_LABEL_DIR)
VAL_CHIP_DIR = os.path.join(root, VAL_CHIP_DIR)
VAL_LABEL_DIR = os.path.join(root, VAL_LABEL_DIR)
TEST_CHIP_DIR = os.path.join(root, TEST_CHIP_DIR)
TEST_LABEL_DIR = os.path.join(root, TEST_LABEL_DIR)
DATASET = os.path.join(root, DATASET)

# The validated label directory is only defined by some configurations; when present
# it has to be anchored like every other data directory.
if "TEST_LABEL_VALIDATED_DIR" in globals():
    TEST_LABEL_VALIDATED_DIR = os.path.join(root, TEST_LABEL_VALIDATED_DIR)

In [None]:
# The checkpoint file name (last path component) identifies the model in the reports.
model_name = MINESEG_CHECKPOINT_PATH.rsplit("/", 1)[-1]
print(
    f"Using model {model_name}",
    f"Using chip size {CHIP_SIZE}",
    f"Using test chip dir {TEST_CHIP_DIR}",
    f"Using test label dir {TEST_LABEL_DIR}",
    sep="\n",
)

#### Model Loading

In [None]:
if CLAY:
    def get_model(mineseg_checkpoint_path, clay_checkpoint_path, metadata_path):
        """Load the Clay-based segmentor from its checkpoint and switch it to eval mode.

        Args:
            mineseg_checkpoint_path (str): Checkpoint of the fine-tuned segmentation model.
            clay_checkpoint_path (str): Checkpoint forwarded to the model as `ckpt_path`.
            metadata_path (str): Metadata file forwarded to the model as `metadata_path`.

        Returns:
            MineSegmentor: The restored model in evaluation mode.
        """
        model = MineSegmentor.load_from_checkpoint(
            checkpoint_path=mineseg_checkpoint_path,
            metadata_path=metadata_path,
            ckpt_path=clay_checkpoint_path,
        )
        model.eval()
        return model
else:
    def get_model(checkpoint_path: str) -> MineSegmentorCNN:
        """Load the CNN segmentor from its checkpoint and switch it to eval mode.

        The hyper-parameters stored in the checkpoint are passed back to the model
        constructor, and the checkpoint is mapped onto the device reported below.

        Args:
            checkpoint_path (str): Path to the Lightning checkpoint file.

        Returns:
            MineSegmentorCNN: The restored model in evaluation mode.
        """
        # Prefer the GPU when one is available, otherwise fall back to the CPU.
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(f"Using device: {device}")

        # Read the stored hyper-parameters so the model is rebuilt with the same configuration.
        checkpoint = torch.load(checkpoint_path, map_location=device)
        model_config = checkpoint["hyper_parameters"]

        # Use the same device mapping for the weights as for the raw checkpoint read,
        # so the printed device matches what is actually used.
        model = MineSegmentorCNN.load_from_checkpoint(
            checkpoint_path, map_location=device, **model_config
        )
        model.eval()
        return model

#### Data Preparation

In [None]:
def get_data(
    train_chip_dir,
    train_label_dir,
    val_chip_dir,
    val_label_dir,
    test_chip_dir,
    test_label_dir,
    metadata_path,
    batch_size,
    num_workers,
    platform,
    data_augmentation,
    index=None
):
    """Build the data module for the test stage and return one batch of the test loader.

    Args:
        train_chip_dir, train_label_dir, val_chip_dir, val_label_dir,
        test_chip_dir, test_label_dir (str): Data directories forwarded to MineDataModule.
        metadata_path (str): Metadata file forwarded to MineDataModule.
        batch_size (int): Batch size of the dataloaders.
        num_workers (int): Number of dataloader workers.
        platform (str): Platform name forwarded to MineDataModule.
        data_augmentation (bool): Augmentation flag forwarded to MineDataModule.
        index (int, optional): Zero-based position of the batch to return.
            None returns the first batch.

    Returns:
        tuple: (batch, metadata) where metadata is the data module's `metadata` attribute.

    Raises:
        IndexError: If `index` is negative or the test dataloader has fewer than
            `index + 1` batches.
    """
    from itertools import islice

    batch_position = 0 if index is None else index
    if batch_position < 0:
        raise IndexError(f"Batch index must be non-negative, got {index}")

    dm = MineDataModule(
        train_chip_dir=train_chip_dir,
        train_label_dir=train_label_dir,
        val_chip_dir=val_chip_dir,
        val_label_dir=val_label_dir,
        test_chip_dir=test_chip_dir,
        test_label_dir=test_label_dir,
        metadata_path=metadata_path,
        batch_size=batch_size,
        num_workers=num_workers,
        platform=platform,
        data_augmentation=data_augmentation,
    )

    dm.setup(stage="test")

    # Skip the first `batch_position` batches and take the next one; the sentinel
    # turns an exhausted loader into a descriptive error instead of a bare StopIteration.
    exhausted = object()
    batch = next(islice(dm.test_dataloader(), batch_position, None), exhausted)
    if batch is exhausted:
        raise IndexError(
            f"Test dataloader has no batch at position {batch_position}"
        )

    return batch, dm.metadata

#### Prediction

In [None]:
def run_prediction(model, batch, is_clay=False):
    """Run a gradient-free forward pass and resize the result to the chip size.

    Args:
        model: The segmentation model to call.
        batch: The batch returned by the test dataloader.
        is_clay (bool): If True the whole batch is passed to the model, otherwise
            only its "pixels" entry.

    Returns:
        The model output, bilinearly interpolated to (CHIP_SIZE, CHIP_SIZE).
    """
    # The Clay model receives the full batch, the CNN only the pixel tensor.
    model_input = batch if is_clay else batch["pixels"]

    with torch.no_grad():
        raw_outputs = model(model_input)

    return F.interpolate(
        raw_outputs,
        size=(CHIP_SIZE, CHIP_SIZE),
        mode="bilinear",
        align_corners=False,
    )

#### Post-Processing

In [None]:
def post_process(batch, outputs, metadata, index=0):
    """Convert one sample of a predicted batch into plot-ready numpy arrays.

    Args:
        batch (dict): Batch with the tensors "pixels" and "label".
        outputs (torch.Tensor): Raw model outputs (logits) for the batch.
        metadata: Unused; kept so existing callers keep working.
        index (int): Position of the sample inside the batch.

    Returns:
        tuple: (image, label, prob_mask, pred_mask) for the selected sample.
            image, prob_mask and pred_mask are transposed from channel-first to
            channel-last; pred_mask is the probability mask thresholded at 0.5.
    """
    # detach() keeps this usable even when the outputs still carry gradients.
    prob_mask = outputs.sigmoid().detach().cpu().numpy()
    pred_mask = (prob_mask > 0.5).astype(float)
    labels = batch["label"].detach().cpu().numpy()
    pixels = batch["pixels"].detach().cpu().numpy()

    # Min-max normalise over the whole batch. A constant batch has zero range and
    # would otherwise divide by zero and produce NaNs, so it is mapped to zeros.
    lowest, highest = pixels.min(), pixels.max()
    value_range = highest - lowest
    if value_range > 0:
        pixels = (pixels - lowest) / value_range
    else:
        pixels = np.zeros_like(pixels)
    pixels = np.clip(pixels, 0, 1)

    # Select the sample and move the channel axis last for plotting.
    image = pixels[index].transpose((1, 2, 0))
    label = labels[index]
    sample_prob_mask = prob_mask[index].transpose((1, 2, 0))
    sample_pred_mask = pred_mask[index].transpose((1, 2, 0))

    return image, label, sample_prob_mask, sample_pred_mask

#### Plotting

In [None]:
def plot_predictions(images, labels, probas, preds):
    """Show image, label, prediction, probability and overlay side by side.

    Args:
        images: Image array to display in the first panel.
        labels: Ground-truth mask.
        probas: Predicted probability mask.
        preds: Predicted binary mask.
    """
    fig, axes = plt.subplots(1, 5, figsize=(15, 6))

    # Masks share a fixed 0..1 colour range; the image uses matplotlib's defaults.
    unit_range = {"vmin": 0, "vmax": 1}
    panels = (
        ("Image", images, {}),
        ("Actual", labels, unit_range),
        ("Pred", preds, unit_range),
        ("Proba", probas, unit_range),
    )
    for ax, (title, data, imshow_kwargs) in zip(axes, panels):
        ax.imshow(data, **imshow_kwargs)
        ax.axis("off")
        ax.set_title(title, fontsize=12)

    # Last panel: prediction overlaid on the true mask.
    overlay_ax = axes[4]
    plot_pred_vs_true_mask(images, labels, preds.squeeze(), ax=overlay_ax, add_legend=False)
    overlay_ax.set_title("Pred vs True", fontsize=12)

    plt.tight_layout()
    plt.show()

In [None]:
# Load model
# The Clay loader additionally needs the Clay checkpoint and the metadata file.
loader_args = (
    (MINESEG_CHECKPOINT_PATH, CLAY_CHECKPOINT_PATH, METADATA_PATH)
    if CLAY
    else (MINESEG_CHECKPOINT_PATH,)
)
model = get_model(*loader_args)

## Plot example predictions

Plot 4 chips with mining area, and 4 without mining area. 

In [None]:
test_chips = os.listdir(TEST_CHIP_DIR)
test_chips_indices = list(range(len(test_chips)))

# Split the chip positions by the "nominearea" marker in the file name.
# enumerate() avoids a list.index() lookup per chip.
indices_wo_miningarea = [i for i, chip in enumerate(test_chips) if "nominearea" in chip]
indices_w_miningarea = [i for i, chip in enumerate(test_chips) if "nominearea" not in chip]
test_chips_wo_miningarea = [test_chips[i] for i in indices_wo_miningarea]
test_chips_w_miningarea = [test_chips[i] for i in indices_w_miningarea]

print(f"Number of all test chips: {len(test_chips)}")
print(f"Number of test chips without mining area: {len(test_chips_wo_miningarea)}")
print(f"Number of test chips with mining area: {len(test_chips_w_miningarea)}")

# NOTE(review): these indices are positions in os.listdir() order and are later used as
# batch positions in the test dataloader — confirm that both orders agree.

# take a sample of the chips with mining area
# (capped at the number of available chips so small test sets do not raise)
# random.seed(42)  # uncomment for a reproducible sample
n_samples = 4
sample_indices_w_ma = random.sample(
    indices_w_miningarea, min(n_samples, len(indices_w_miningarea))
)
print(f"Sample indices of chips with mining area: {sample_indices_w_ma}")
if not CHIP_SIZE == 2048:
    sample_indices_wo_ma = random.sample(
        indices_wo_miningarea, min(n_samples, len(indices_wo_miningarea))
    )
    print(f"Sample indices of chips without mining area: {sample_indices_wo_ma}")

In [None]:
def plot_sample_predictions(sample_indices):
    """Predict and plot one test chip per entry of `sample_indices`.

    Uses the notebook-level `model` and path/configuration constants.

    Args:
        sample_indices (list[int]): Batch positions in the test dataloader
            (loaded with BATCH_SIZE) to predict and plot.
    """
    # Inputs must live on the same device as the model's weights. Reading the device
    # from the model avoids a mismatch when a GPU exists but the model is on the CPU.
    model_device = next(model.parameters()).device

    for index in sample_indices:

        # Get data
        batch, metadata = get_data(
            TRAIN_CHIP_DIR,
            TRAIN_LABEL_DIR,
            VAL_CHIP_DIR,
            VAL_LABEL_DIR,
            TEST_CHIP_DIR,
            TEST_LABEL_DIR,
            METADATA_PATH,
            BATCH_SIZE,
            NUM_WORKERS,
            PLATFORM,
            data_augmentation=False,
            index=index
        )

        # Move the tensors of the batch to the model's device
        batch = {
            k: (v.to(model_device) if torch.is_tensor(v) else v)
            for k, v in batch.items()
        }

        # Run prediction
        outputs = run_prediction(model, batch, is_clay=CLAY)

        # Post-process the results
        images, labels, probas, preds = post_process(batch, outputs, metadata)

        # Plot the predictions
        plot_predictions(images, labels, probas, preds)

#### Predictions on chips with mining area: 

In [None]:
plot_sample_predictions(sample_indices_w_ma)

#### Predictions on chips without mining area:

In [None]:
if not CHIP_SIZE == 2048:
    plot_sample_predictions(sample_indices_wo_ma)

## Metric calculation on test dataset

To calculate the metric on the test dataset, we need to predict the masks for all the chips in the test dataset and then calculate an aggregate metric.


By default, the test set includes only chips with mining area. However, how much of each tile these chips cover depends on the chip size. Therefore, we also calculate the metrics on the full test set, which includes all chips from the test tiles. This makes models trained with different chip sizes comparable.

### Define Functions

In [None]:
def copy_files_to_temp_directory(tile_id, test_chip_dir, test_label_dir, temp_chip_dir, temp_label_dir):
    """
    Copies files with the specified tile_id from the test directories to the temporary directories.

    Both temporary directories are emptied first. A file belongs to a tile when the
    part of its name before the first "_" is the integer tile_id; files whose prefix
    is not an integer (e.g. hidden files) are ignored.

    Args:
        tile_id (int): The tile ID to filter the files.
        test_chip_dir (str): The directory containing the test chip files.
        test_label_dir (str): The directory containing the test label files.
        temp_chip_dir (str): The temporary directory to copy the chip files.
        temp_label_dir (str): The temporary directory to copy the label files.

    Returns:
        str: The common name stem of the copied files, i.e. their first seven
            "_"-separated fields joined by "_".

    Raises:
        AssertionError: If no file matches or the copied files do not share one stem.
    """
    def _belongs_to_tile(file_name):
        # A non-numeric prefix cannot be a tile id, so such files never match.
        try:
            return int(file_name.split("_")[0]) == tile_id
        except ValueError:
            return False

    # Remove any existing files so chips of a previous tile cannot leak into this one
    for temp_directory in (temp_chip_dir, temp_label_dir):
        for stale_file in os.listdir(temp_directory):
            os.remove(os.path.join(temp_directory, stale_file))

    # Copy chips, then labels, collecting the name stem of every copied file
    stems = set()
    for source_dir, target_dir in ((test_chip_dir, temp_chip_dir), (test_label_dir, temp_label_dir)):
        for file_name in os.listdir(source_dir):
            if not _belongs_to_tile(file_name):
                continue
            shutil.copy(os.path.join(source_dir, file_name), os.path.join(target_dir, file_name))
            # file name without extension and nominearea addition
            stems.add("_".join(file_name.split("_")[0:7]))

    # make sure that the filenames are all the same
    assert len(stems) == 1, "Filenames are not all the same"

    return next(iter(stems))

In [None]:
def calculate_metrics(
        model, 
        test_chip_dir, 
        test_label_dir,
        testset_batch_size, 
        calculate_per_tile=False
    ):
    """
    Calculates various metrics for evaluating the performance of a model on a test dataset.

    In chip mode (default) one row is produced per chip. In per-tile mode the chips
    and labels of each tile are staged in a temporary directory, the first batch of
    that directory is predicted, and one row is produced per tile from the flattened
    masks of the whole batch.

    Args:
        model (torch.nn.Module): The model to evaluate.
        test_chip_dir (str): The directory containing the test dataset chips.
        test_label_dir (str): The directory containing the test dataset labels.
        testset_batch_size (int): The batch size for processing the test dataset.
        calculate_per_tile (bool): If True, the metrics will be calculated for each tile in the test dataset.
        
    Returns:
        pandas.DataFrame: A DataFrame containing the calculated metrics for each image in the test dataset.
            The DataFrame has the following columns: 'image', 'file_name', 'iou', 'f1', 'accuracy', 'recall', 'precision'.
    """
    result_columns = ['image', "file_name", 'iou', 'f1', 'accuracy', 'recall', 'precision']

    # Get the list of test chips
    test_chips = os.listdir(test_chip_dir)
    test_masks = os.listdir(test_label_dir)

    # Move model to GPU
    if torch.cuda.is_available():
        model = model.to("cuda")

    if calculate_per_tile:
        # Create a "temp" directory three dirname() levels above test_chip_dir
        greatgrandparent_dir = os.path.dirname(os.path.dirname(os.path.dirname(test_chip_dir)))
        temp_dir = os.path.join(greatgrandparent_dir, "temp")
        Path(os.path.join(temp_dir, "chips")).mkdir(parents=True, exist_ok=True)
        Path(os.path.join(temp_dir, "labels")).mkdir(parents=True, exist_ok=True)

        temp_chip_dir = os.path.join(temp_dir, "chips/")
        temp_label_dir = os.path.join(temp_dir, "labels/")
        print(f"Temporary chip directory: {temp_chip_dir}")
        print(f"Temporary label directory: {temp_label_dir}")

        # Get unique tile ids (the part of the file name before the first "_")
        unique_tile_ids_chips = {chip.split("_")[0] for chip in test_chips}
        unique_tile_ids_masks = {mask.split("_")[0] for mask in test_masks}

        assert unique_tile_ids_chips == unique_tile_ids_masks
        unique_tile_ids = sorted(unique_tile_ids_chips)
        print(f"Number of unique tile ids: {len(unique_tile_ids)}")

        # One step per tile. Deriving the step count from the chip count would skip
        # tiles or overrun the id list whenever a tile does not have exactly
        # `testset_batch_size` chips.
        num_steps = len(unique_tile_ids)
    else:
        # One step per batch of chips
        num_steps = int(np.ceil(len(test_chips) / testset_batch_size))

    # Rows are collected in a list and turned into a DataFrame once at the end,
    # instead of concatenating a new DataFrame per row.
    records = []

    for i in tqdm(range(num_steps)):
        
        if calculate_per_tile:
            tile_id = int(unique_tile_ids[i])

            file_name = copy_files_to_temp_directory(
                tile_id, 
                test_chip_dir, 
                test_label_dir, 
                temp_chip_dir, 
                temp_label_dir
                )
            # The temp directory only holds this tile, so its first batch is the tile
            index = 0
            chip_dir = temp_chip_dir
            label_dir = temp_label_dir
            file_names = [file_name]
        else:
            # Get the file names for the current batch
            # NOTE(review): assumes the dataloader yields chips in os.listdir() order — confirm
            file_names = test_chips[i * testset_batch_size : (i + 1) * testset_batch_size]
            index = i
            chip_dir = test_chip_dir
            label_dir = test_label_dir

        # Get data
        batch, _ = get_data(
            TRAIN_CHIP_DIR,
            TRAIN_LABEL_DIR,
            VAL_CHIP_DIR,
            VAL_LABEL_DIR,
            chip_dir,
            label_dir,
            METADATA_PATH,
            testset_batch_size,
            NUM_WORKERS,
            PLATFORM,
            data_augmentation=False,
            index=index
        )
        
        # Move batch to GPU
        if torch.cuda.is_available():
            batch = {k: v.to("cuda") for k, v in batch.items()}

        # Run prediction
        outputs = run_prediction(model, batch, is_clay=CLAY)

        labels = batch["label"].detach().cpu().numpy()
        probas = outputs.sigmoid().detach().cpu().numpy()
        preds = (probas > 0.5).astype(float)
            
        # Calculate the metrics for each image in the batch
        for j in range(len(file_names)):

            if calculate_per_tile:
                # All chips of the tile are scored together
                true_mask = labels.flatten()
                pred_mask = preds.flatten()
            else:
                true_mask = labels[j].flatten()
                pred_mask = preds[j].flatten()

            records.append(
                {
                    'image': i * testset_batch_size + j,
                    'file_name': file_names[j],
                    'iou': jaccard_score(true_mask, pred_mask),
                    'f1': f1_score(true_mask, pred_mask),
                    'accuracy': accuracy_score(true_mask, pred_mask),
                    'recall': recall_score(true_mask, pred_mask),
                    'precision': precision_score(true_mask, pred_mask),
                }
            )

    return pd.DataFrame(records, columns=result_columns)

In [None]:
def print_grouped_metrics(results_grouped):
    """Print the row count and the mean of every metric per group.

    The table is printed three times: grouped by "minetype1", by "minetype2" and
    by "preferred_dataset".

    Args:
        results_grouped (pandas.DataFrame): Metrics table containing the three
            grouping columns and the columns "iou", "f1", "accuracy", "recall", "precision".
    """
    metric_columns = ["iou", "f1", "accuracy", "recall", "precision"]
    sections = (
        ("Metrics per mine type (minetype1):", "minetype1"),
        ("Metrics per mine type (minetype2):", "minetype2"),
        ("Metrics per dataset (preferred_dataset):", "preferred_dataset"),
    )
    last_position = len(sections) - 1

    for position, (heading, group_column) in enumerate(sections):
        print(heading)
        by_group = results_grouped.groupby(group_column)

        # Number of rows per group, shown in front of the metric means
        group_sizes = by_group[["iou"]].count()
        group_sizes.columns = ["count"]

        print(pd.concat([group_sizes, by_group[metric_columns].mean()], axis=1))

        # Separator between sections, none after the last one
        if position != last_position:
            print("-----------------------------------------------")

In [None]:
def plot_iou_on_test_tiles(results_grouped, add_tile_id=False):
    """Draw the buffered tile geometries on a basemap, coloured by their IoU.

    Args:
        results_grouped (geopandas.GeoDataFrame): Per-tile results with the columns
            "iou" and "geometry" (and "tile_id" when `add_tile_id` is True).
        add_tile_id (bool): If True, each geometry is annotated with its tile id.
    """
    fig, ax = plt.subplots(1, 1, figsize=(20, 25))

    # Declare the geometries as EPSG:4326, then reproject to Web Mercator (EPSG:3857)
    projected = results_grouped.set_crs(epsg=4326).to_crs(epsg=3857)
    plot_df = projected.copy()

    # Enlarge the polygons so they stay visible on the map
    # (170000 in CRS units — presumably metres in EPSG:3857; confirm)
    plot_df['geometry'] = plot_df.geometry.buffer(170000)

    plot_df.plot(column='iou', cmap='RdYlGn', linewidth=0.8, ax=ax, edgecolor='0.8', alpha=0.8)
    ctx.add_basemap(ax, source=ctx.providers.OpenStreetMap.Mapnik)

    ax.axis('off')
    ax.set_title('IoU on test tiles', fontdict={'fontsize': '25', 'fontweight' : '3'})

    # Horizontal colorbar spanning the observed IoU range, used as the legend
    iou_values = plot_df['iou']
    colour_scale = plt.Normalize(vmin=min(iou_values), vmax=max(iou_values))
    sm = plt.cm.ScalarMappable(cmap='RdYlGn', norm=colour_scale)
    sm._A = []
    cbar = fig.colorbar(sm, ax=ax, orientation='horizontal', fraction=0.03, pad=0.04)
    cbar.ax.tick_params(labelsize=8)  # size of the colorbar labels

    if add_tile_id:
        # Label every geometry with its tile id, slightly offset from the centroid
        for _, row in plot_df.iterrows():
            centre = row['geometry'].centroid
            ax.annotate(
                text=row["tile_id"],
                xy=(centre.x, centre.y),
                xytext=(5, 5),  # shift annotation to the top right
                textcoords='offset points',
                horizontalalignment='center',
                fontsize=8,
                color='black',
                rotation=45,  # angle the text by 45 degrees
            )
    plt.show()

Create the new directory for the report and the data files

In [None]:
# Reports are grouped by model family, then by checkpoint name without its extension.
# NOTE(review): joining the dot-separated parts with '' also removes dots inside the
# name (e.g. the one in the IoU value) — confirm this is intended.
model_family = "clay" if CLAY else "cnn"
checkpoint_stem = ''.join(model_name.split('.')[:-1])
output_dir = root + f"/reports/{model_family}/{checkpoint_stem}"
Path(output_dir).mkdir(parents=True, exist_ok=True)
print(f"Saving results to {output_dir}")

### 1. On the test set chips (mining area chips only)

Copy Chips and Labels with mining area to a separate directory: 

In [None]:
# List all files in the test chip and label directories
test_chips = os.listdir(TEST_CHIP_DIR)
test_masks = os.listdir(TEST_LABEL_DIR)

# Climb three dirname() levels from TEST_CHIP_DIR and place "test_minearea_only" there
parent_dir = os.path.dirname(TEST_CHIP_DIR)
grandparent_dir = os.path.dirname(parent_dir)
greatgrandparent_dir = os.path.dirname(grandparent_dir)
test_minearea_only_dir = os.path.join(greatgrandparent_dir, "test_minearea_only")

# Target directories (with trailing slash) for the chips and labels with mining area
minearea_chip_dir = os.path.join(test_minearea_only_dir, "chips/")
minearea_label_dir = os.path.join(test_minearea_only_dir, "labels/")
os.makedirs(minearea_chip_dir, exist_ok=True)
os.makedirs(minearea_label_dir, exist_ok=True)

In [None]:
# Copy every chip and label whose file name does NOT contain "nominearea",
# i.e. the files with mining area, to the respective directories
for _names, _source_dir, _target_dir in (
    (test_chips, TEST_CHIP_DIR, minearea_chip_dir),
    (test_masks, TEST_LABEL_DIR, minearea_label_dir),
):
    for _name in tqdm(_names):
        if "nominearea" in _name:
            continue
        shutil.copy(os.path.join(_source_dir, _name), os.path.join(_target_dir, _name))

print(f"Copied chips with mining area to {minearea_chip_dir}")
print(f"Copied labels with mining area to {minearea_label_dir}")
print(f"Number of chips with mining area: {len(os.listdir(minearea_chip_dir))}")

Calculate the metrics: 

In [None]:
# results = calculate_metrics(model, minearea_chip_dir, minearea_label_dir, TESTSET_BATCH_SIZE)

**Metrics on the Test Set (Average over all chips with mining area):**

In [None]:
# # load the processed dataset
# tiles = gpd.read_file(DATASET, layer="tiles")
# test_tiles = tiles[tiles["split"] == "test"]

# # extract the tile_id from the file_name
# results["tile_id"] = results["file_name"].apply(lambda x: x.split("_")[0])

# # convert tile_id to int
# results['tile_id'] = results['tile_id'].astype(int)

# # Merge the results with the test_tiles
# results_merged = results.merge(test_tiles[["tile_id", "preferred_dataset", "minetype1", "minetype2"]], on='tile_id')

# # # save as csv in the reports folder
# output_path = output_dir + "/testset_metrics_per_chip.csv"
# results_merged.to_csv(output_path, index=False)
# print(f"Saved per-chip metrics to {output_path}")

# # calculate overall metrics
# iou = results_merged["iou"].mean()
# f1 = results_merged["f1"].mean()
# accuracy = results_merged["accuracy"].mean()
# recall = results_merged["recall"].mean()
# precision = results_merged["precision"].mean()

# # print the results
# print(f"Mean IoU: {iou}")
# print(f"Mean F1: {f1}")
# print(f"Mean Accuracy: {accuracy}")
# print(f"Mean Recall: {recall}")
# print(f"Mean Precision: {precision}")

# results_merged

Grouped metrics: 

In [None]:
# print_grouped_metrics(results_merged)

Histogram of the IoU: 

In [None]:
# # print a histogram of the iou scores
# results_merged["iou"].hist(bins=20)
# plt.show()

Boxplots: 

In [None]:
# # create boxplots of the iou scores
# results_merged.boxplot(column=["iou", "f1", "accuracy", "precision", "recall"], figsize=(10, 6), rot=45, grid=False)
# plt.show()

### 2. On the whole test set tile (including chips without mining area)
This allows us to compare the metrics between models with different chip sizes.

Calculate the metrics: 

In [None]:
# One batch has to hold every chip of a tile: (2048 / CHIP_SIZE) chips per side, squared
chips_per_side = 2048 / CHIP_SIZE
batch_size_tile = int(chips_per_side ** 2)
print(f"Batch size: {batch_size_tile}")

results = calculate_metrics(
    model,
    TEST_CHIP_DIR,
    TEST_LABEL_DIR,
    batch_size_tile,
    calculate_per_tile=True,
)

Per-Tile metrics: 

In [None]:
# load the processed dataset and keep the tiles of the test split
tiles = gpd.read_file(DATASET, layer="tiles")
test_tiles = tiles[tiles["split"] == "test"]

# the tile id is the part of the file name before the first "_", stored as int
results["tile_id"] = results["file_name"].map(lambda name: name.split("_")[0]).astype(int)

# attach dataset, mine types and geometry of each tile to its metrics
tile_attributes = test_tiles[["tile_id", "preferred_dataset", "minetype1", "minetype2", "geometry"]]
results_merged = results.merge(tile_attributes, on='tile_id')

# save as csv in the reports folder
output_path = output_dir + "/testset_metrics_per_tile.csv"
results_merged.to_csv(output_path, index=False)
print(f"Saved aggregated metrics to {output_path}")

# overall metrics: unweighted mean over all tiles
iou, f1, accuracy, recall, precision = (
    results_merged[metric].mean()
    for metric in ("iou", "f1", "accuracy", "recall", "precision")
)

# print the results
print(
    f"Mean IoU: {iou}",
    f"Mean F1: {f1}",
    f"Mean Accuracy: {accuracy}",
    f"Mean Recall: {recall}",
    f"Mean Precision: {precision}",
    sep="\n",
)
results_merged[['tile_id', 'preferred_dataset', 'minetype1', 'minetype2', 'iou', 'f1', 'accuracy', 'recall', 'precision']]

Grouped metrics: 

In [None]:
print_grouped_metrics(results_merged)

Histogram: 

In [None]:
# print a histogram of the iou scores
results_merged["iou"].hist(bins=20)
plt.show()

Boxplots: 

In [None]:
# create boxplots of the iou scores
results_merged.boxplot(column=["iou", "f1", "accuracy", "precision", "recall"], figsize=(10, 6), rot=45, grid=False)
plt.show()

IoU per tile on the map: 

In [None]:
results_merged_gdf = gpd.GeoDataFrame(results_merged, geometry=results_merged["geometry"])

plot_iou_on_test_tiles(results_merged_gdf, add_tile_id = True)

### 3. On the whole test set tile, using validated mining areas

This allows us to see whether the model performs better or worse on a set of validated polygons.

Calculate the metrics: 

In [None]:
# calculate the batch size, so that one batch includes all chips for a tile
batch_size_tile = int((2048/CHIP_SIZE)**2)
print(f"Batch size: {batch_size_tile}")

# The validated label directory is configured relative to the project root and may
# still be relative at this point. os.path.join leaves an already-absolute path
# unchanged, so anchoring it here is safe either way.
validated_label_dir = os.path.join(root, TEST_LABEL_VALIDATED_DIR)

results = calculate_metrics(model, TEST_CHIP_DIR, validated_label_dir, batch_size_tile, calculate_per_tile=True)

Per-Tile metrics: 

In [None]:
# load the processed dataset
tiles = gpd.read_file(DATASET, layer="tiles")
test_tiles = tiles[tiles["split"] == "test"]

# extract the tile_id from the file_name
results["tile_id"] = results["file_name"].apply(lambda x: x.split("_")[0])

# convert tile_id to int
results['tile_id'] = results['tile_id'].astype(int)

# Merge the results with the test_tiles
results_merged = results.merge(test_tiles[["tile_id", "preferred_dataset", "minetype1", "minetype2", "geometry"]], on='tile_id')

# save as csv in the reports folder
# A separate file name keeps this evaluation from overwriting the per-tile metrics
# that the unvalidated evaluation stores in "testset_metrics_per_tile.csv".
output_path = output_dir + "/testset_metrics_per_tile_validated.csv"
results_merged.to_csv(output_path, index=False)
print(f"Saved aggregated metrics to {output_path}")

# calculate overall metrics
iou = results_merged["iou"].mean()
f1 = results_merged["f1"].mean()
accuracy = results_merged["accuracy"].mean()
recall = results_merged["recall"].mean()
precision = results_merged["precision"].mean()

# print the results
print(f"Mean IoU: {iou}")
print(f"Mean F1: {f1}")
print(f"Mean Accuracy: {accuracy}")
print(f"Mean Recall: {recall}")
print(f"Mean Precision: {precision}")
results_merged[['tile_id', 'preferred_dataset', 'minetype1', 'minetype2', 'iou', 'f1', 'accuracy', 'recall', 'precision']]

Grouped metrics: 

In [None]:
print_grouped_metrics(results_merged)

Histogram: 

In [None]:
# print a histogram of the iou scores
results_merged["iou"].hist(bins=20)
plt.show()

Boxplots: 

In [None]:
# create boxplots of the iou scores
results_merged.boxplot(column=["iou", "f1", "accuracy", "precision", "recall"], figsize=(10, 6), rot=45, grid=False)
plt.show()

IoU per tile on the map: 

In [None]:
results_merged_gdf = gpd.GeoDataFrame(results_merged, geometry=results_merged["geometry"])

plot_iou_on_test_tiles(results_merged_gdf, add_tile_id = True)

## Plot individual tiles and predictions
You can check the dataframe above (results_merged), and indicate the index that should be displayed. 

In [None]:
# Climb three dirname() levels from TEST_CHIP_DIR and place the "temp" directory there
parent_dir = os.path.dirname(TEST_CHIP_DIR)
grandparent_dir = os.path.dirname(parent_dir)
greatgrandparent_dir = os.path.dirname(grandparent_dir)
temp_dir = os.path.join(greatgrandparent_dir, "temp")

# Staging directories (with trailing slash) for the chips and labels of a single tile
temp_chip_dir = os.path.join(temp_dir, "chips/")
temp_label_dir = os.path.join(temp_dir, "labels/")
os.makedirs(temp_chip_dir, exist_ok=True)
os.makedirs(temp_label_dir, exist_ok=True)

# One batch has to hold every chip of a tile
chips_per_side = 2048 / CHIP_SIZE
batch_size_tile = int(chips_per_side ** 2)
print(f"Batch size: {batch_size_tile}")

In [None]:
tile_id = 1100

file_name = copy_files_to_temp_directory(tile_id, TEST_CHIP_DIR, TEST_LABEL_DIR, temp_chip_dir, temp_label_dir)
print(f"Using tile_id {tile_id}, with filename {file_name}")

In [None]:
# Load the first batch of the temp directory, i.e. the chips staged for the tile
batch, metadata = get_data(
    train_chip_dir=TRAIN_CHIP_DIR,
    train_label_dir=TRAIN_LABEL_DIR,
    val_chip_dir=VAL_CHIP_DIR,
    val_label_dir=VAL_LABEL_DIR,
    test_chip_dir=temp_chip_dir,
    test_label_dir=temp_label_dir,
    metadata_path=METADATA_PATH,
    batch_size=batch_size_tile,
    num_workers=NUM_WORKERS,
    platform=PLATFORM,
    data_augmentation=False,
    index=0,
)

# Move batch to GPU
if torch.cuda.is_available():
    batch = {name: tensor.to("cuda") for name, tensor in batch.items()}

# Run prediction
outputs = run_prediction(model, batch, is_clay=CLAY)

In [None]:
# list all files in the temp directory
temp_chips = os.listdir(temp_chip_dir)

# the chip number is the 10th "_"-separated field of the file name, without the extension
chip_num = [int(chip.split("_")[9].split(".")[0]) for chip in temp_chips]

# Post-process the batch entries in ascending chip-number order.
# NOTE(review): assumes the batch is ordered like os.listdir(temp_chip_dir) — confirm
ordered_results = [
    post_process(batch, outputs, metadata, index=chip_num.index(number))
    for number in range(len(chip_num))
]

images = [image for image, _, _, _ in ordered_results]
labels = [label for _, label, _, _ in ordered_results]
probas = [proba.squeeze() for _, _, proba, _ in ordered_results]
preds = [pred.squeeze() for _, _, _, pred in ordered_results]

# Combine the chips of one tile into a single tile-sized image
def put_np_together(images, channels=3, chip_size=512, tile_size=2048):
    """Assemble square chips into one square image.

    Chips are consumed in order and placed block-column by block-column: the first
    `tile_size // chip_size` chips fill the left-most block column from top to
    bottom, the next ones the column to its right, and so on.

    Args:
        images (sequence): Chips in placement order, each of shape
            (chip_size, chip_size) or (chip_size, chip_size, channels).
        channels (int): 1 produces a 2-D result; any other value produces a
            result with that many channels in the last axis.
        chip_size (int): Edge length of one chip.
        tile_size (int): Edge length of the assembled image. Defaults to 2048.

    Returns:
        numpy.ndarray: Array of shape (tile_size, tile_size) or
            (tile_size, tile_size, channels), zero where no chip was placed.
    """
    if channels == 1:
        big_image = np.zeros((tile_size, tile_size))
    else:
        big_image = np.zeros((tile_size, tile_size, channels))

    chips_per_side = tile_size // chip_size

    chip_number = 0
    for block_col in range(chips_per_side):
        for block_row in range(chips_per_side):
            row_start = block_row * chip_size
            col_start = block_col * chip_size
            # Slicing the two leading axes covers both the 2-D and the multi-channel case
            big_image[
                row_start:row_start + chip_size,
                col_start:col_start + chip_size,
            ] = images[chip_number]
            chip_number += 1

    return big_image

big_image = put_np_together(images, channels=3, chip_size=CHIP_SIZE)
big_label = put_np_together(labels, channels=1, chip_size=CHIP_SIZE)
big_proba = put_np_together(probas, channels=1, chip_size=CHIP_SIZE)
big_pred = put_np_together(preds, channels=1, chip_size=CHIP_SIZE)

plot_predictions(big_image, big_label, big_proba, big_pred)

In [None]:
plot_pred_vs_true_mask(big_image, big_label, big_pred, add_legend=False)

In [None]:
# REMEMBER TO SAVE THE NOTEBOOK BEFORE RUNNING THIS CELL!!!!
nb_path = root + "/notebooks/cnn/cnn_inference.ipynb"
output_dir_report = output_dir + "/"
print(f"Exporting notebook to {output_dir_report}")

# export the notebook to reports
!jupyter nbconvert --to html $nb_path --output-dir=$output_dir_report --output="CNN_testset_evaluation.html" --no-input --no-prompt