This notebook shows how to load samples from PASTIS using the dataloaders provided in the repository.


In [None]:
# Fill these file paths with the locations on your machine.
# Both paths live under the same checkout, so only the root needs editing.
REPO_ROOT = "/Users/louis.stefanuto.c/Documents/pastis-benchmark-mines2024"
PATH_TO_CODE = REPO_ROOT + "/src/"  # path to the code folder of the repo
PATH_TO_PASTIS = REPO_ROOT + "/PASTIS-mini/TRAIN"  # dataset folder given to the dataset class
# PATH_TO_PASTISR = '/path/PASTIS-R'
# PATH_TO_PASTISR_pixel = '/path/PASTIS-R_PixelSet/'


import sys

# Put the repository's code folder on the import path so that the modules
# imported further down (`baseline_dataset`, `collate`) can be resolved
# (presumably they live in PATH_TO_CODE — adjust the path above if not).
sys.path.append(PATH_TO_CODE)


import torch
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap
import matplotlib

# Build a 21-entry colormap for the label maps: start from matplotlib's
# "tab20" palette, replace its first colour by black and append white.
cm = matplotlib.colormaps.get_cmap("tab20")
def_colors = cm.colors
cus_colors = ["k", *def_colors[1:20], "w"]
cmap = ListedColormap(colors=cus_colors, name="agri", N=21)


def get_rgb(x, batch_index=0, t_show=1):
    """Utility function to get a displayable rgb image
    from a Sentinel-2 time series.

    Args:
        x: mapping holding the Sentinel-2 tensor under the key "S2". The tensor
            is indexed as [batch, time, channel, ...]; channels 2, 1, 0 are
            picked, in that order (presumably to turn B, G, R storage order
            into R, G, B — confirm against the dataset).
        batch_index (int): which element of the batch to show.
        t_show (int): which date of the time series to show.

    Returns:
        np.ndarray: channel-last image; every channel is min-max rescaled
        independently to [0, 1].
    """
    im = x["S2"][batch_index, t_show, [2, 1, 0]].cpu().numpy()
    lo = im.min(axis=(1, 2), keepdims=True)
    hi = im.max(axis=(1, 2), keepdims=True)
    span = hi - lo
    # A constant channel has max == min: dividing by zero would fill it with
    # NaNs (which np.clip does not remove). Dividing by 1 maps it to 0 instead.
    span[span == 0] = 1
    im = (im - lo) / span
    # (C, H, W) -> (H, W, C), the layout expected by imshow.
    im = np.transpose(im, (1, 2, 0))
    return np.clip(im, a_min=0, a_max=1)


def get_radar(x, batch_index=0, t_show=6, orbit="D"):
    """Utility function to get a displayable image
    from a Sentinel-1 time series.

    Args:
        x: mapping holding the Sentinel-1 tensor under the key "S1" + orbit
            (e.g. "S1D"). The tensor is indexed as [batch, time, ...] and the
            selected observation is treated as channel-first.
        batch_index (int): which element of the batch to show.
        t_show (int): which date of the time series to show.
        orbit (str): suffix appended to "S1" to build the dictionary key.

    Returns:
        np.ndarray: channel-last image; every channel is min-max rescaled
        independently to [0, 1].
    """
    im = x[f"S1{orbit}"][batch_index, t_show].cpu().numpy()
    lo = im.min(axis=(1, 2), keepdims=True)
    hi = im.max(axis=(1, 2), keepdims=True)
    span = hi - lo
    # A constant channel has max == min: dividing by zero would fill it with
    # NaNs (which np.clip does not remove). Dividing by 1 maps it to 0 instead.
    span[span == 0] = 1
    im = (im - lo) / span
    # (C, H, W) -> (H, W, C), the layout expected by imshow.
    im = np.transpose(im, (1, 2, 0))
    return np.clip(im, a_min=0, a_max=1)

# Custom Dataloader


In [None]:
from baseline_dataset import BaselineDataset
from collate import pad_collate

# Dataset built from the folder configured at the top of the notebook.
dt = BaselineDataset(PATH_TO_PASTIS)

# Shuffled batches of 32 samples; `pad_collate` is the repository's collate
# function used to assemble individual samples into a batch.
dl = torch.utils.data.DataLoader(
    dataset=dt,
    batch_size=32,
    shuffle=True,
    collate_fn=pad_collate,
)

In [None]:
# Draw a single batch: `x` holds the inputs (indexed by modality, e.g. x["S2"]),
# `y` the targets.
x, y = next(iter(dl))

In [None]:
# Shapes of the Sentinel-2 input batch and of the target batch, one per line.
print(x["S2"].shape, y.shape, sep="\n")

In [None]:
bid = 0  # Which element of the batch to plot

# Side-by-side view: one Sentinel-2 observation (left) and its label map (right).
fig, axes = plt.subplots(1, 2, figsize=(20, 20))

axes[0].imshow(get_rgb(x, batch_index=bid))
# vmin/vmax pin the label values 0..20 onto the 21 colours of the custom colormap.
axes[1].imshow(y[bid].squeeze(), cmap=cmap, vmin=0, vmax=20)

axes[0].set_title("One S2 observation.")
axes[1].set_title("Semantic labels.")

plt.show()

# Metrics


In [None]:
from torchmetrics.segmentation import MeanIoU

NUM_CLASSES = 4
BATCH_SHAPE = (3, 256, 256)

# Create two random label maps, one for the preds, one for the targets.
# torch.randint samples from [low, high): `high` is exclusive, so it must be
# NUM_CLASSES (not NUM_CLASSES - 1) for the last class to ever be drawn.
preds = torch.randint(low=0, high=NUM_CLASSES, size=BATCH_SHAPE, dtype=torch.int64)
target = torch.randint(low=0, high=NUM_CLASSES, size=BATCH_SHAPE, dtype=torch.int64)

# NOTE(review): some torchmetrics releases expose an `input_format` argument on
# MeanIoU — confirm that index (non one-hot) tensors are interpreted as intended
# by the installed version.
miou = MeanIoU(num_classes=NUM_CLASSES)
miou(preds, target)

# Submission

This section shows you how to submit predictions on Kaggle.

Your submission must be in the CSV format. It should have two columns:
- **ID**: the ID of the image
- **MASKS**: each 2D segmentation mask, flattened to 1D and written as a single string of space-separated class labels

To generate the `MASKS` column, we provide you with a `masks_to_str` function. We also provide the decoding function, so that you can see exactly how we will process your submission.

In [None]:
import numpy as np
import pandas as pd


def masks_to_str(predictions: np.ndarray) -> list[str]:
    """
    Convert a batch of segmentation masks into their string representation.

    Every mask is flattened and its values are written out separated by
    single spaces.

    Args:
        predictions (np.ndarray): predictions as a 3D batch (B, H, W)

    Returns:
        list[str]: a list of B strings, each string is a flattened stringified prediction mask
    """
    encoded_masks = []
    for mask in predictions:
        flat_labels = np.ravel(mask)
        encoded_masks.append(" ".join(f"{label}" for label in flat_labels))
    return encoded_masks


def decode_masks(
    masks: list[str],
    target_shape: tuple[int, int] = (128, 128),
) -> np.ndarray:
    """
    Convert each string in masks back to a 2D mask of integer labels.

    Args:
        masks (list[str]): list of stringified masks (space-separated labels)
        target_shape (tuple[int, int]): (H, W) shape every decoded mask is
            reshaped to. Defaults to (128, 128).

    Returns:
        np.ndarray: reconstructed batch of masks, uint8 array of shape
        (len(masks), H, W). An empty list gives an array of shape (0, H, W).

    Raises:
        ValueError: if a decoded mask does not contain exactly H * W values.
    """
    # Pre-allocating keeps the (B, H, W) layout and the uint8 dtype even when
    # `masks` is empty (np.array([]) would be a float array of shape (0,)).
    decoded = np.empty((len(masks), *target_shape), dtype=np.uint8)
    for index, mask in enumerate(masks):
        flat = np.fromstring(mask, sep=" ", dtype=np.uint8)
        decoded[index] = flat.reshape(target_shape)
    return decoded

- generate a random submission (and solution)

To help you, here is how to generate a random submission.

In [None]:
# Random label maps standing in for real predictions: 16 masks of 128x128.
X = np.random.randint(0, NUM_CLASSES, size=(16, 128, 128), dtype=np.uint8)
masks = masks_to_str(X)

# Is in the public test set or in the private test set ?
# The first 10 rows are tagged "Public", the remaining ones "Private".
usages = ["Public"] * 10 + ["Private"] * (len(X) - 10)

submission = pd.DataFrame({"ID": range(len(X)), "MASKS": masks})
# The solution holds the same rows plus the Usage column, placed between ID and MASKS.
solution = submission.assign(Usage=usages)[["ID", "Usage", "MASKS"]]

# Note that the index=False argument is important.
submission.to_csv("submission.csv", index=False)
solution.to_csv("solution.csv", index=False)

- decode

This section shows you how our automatic evaluation pipeline reads your CSV to compute the mIOU.

In [None]:
# Read the submission file back from disk.
df = pd.read_csv("submission.csv")

# Verify the shape of the restored array
mask_strings = df["MASKS"].tolist()
X_restored = decode_masks(mask_strings)
print(X_restored.shape)

In [None]:
# Reconstruction test: every decoded value must equal the value that was encoded.
np.all(X == X_restored)

Let's compute the mIOU between the original prediction batch tensor and its restored version.

If everything went well, the cell should return `1.0`.

In [None]:
# `X` was sampled with labels in [0, NUM_CLASSES), so the metric has to be built
# with that same NUM_CLASSES: lowering it here would leave labels out of range.
miou = MeanIoU(num_classes=NUM_CLASSES)

# Compare the original masks with the ones decoded from the CSV file.
A = torch.tensor(X, dtype=torch.int64)
A_restored = torch.tensor(X_restored, dtype=torch.int64)

iou = miou(A, A_restored)
iou

- The evaluation pipeline in Kaggle

In [None]:
"""
TODO: Enter any documentation that only people updating the metric should read here.

All columns of the solution and submission dataframes are passed to your metric, except for the Usage column.

Your metric must satisfy the following constraints:
- You must have a function named score. Kaggle's evaluation system will call that function.
- You can add your own arguments to score, but you cannot change the first three (solution, submission, and row_id_column_name).
- All arguments for score must have type annotations.
- score must return a single, finite, non-null float.
"""

import numpy as np
import pandas as pd
import pandas.api.types
import torch
from torchmetrics.segmentation import MeanIoU


class ParticipantVisibleError(Exception):
    """Error whose message is shown to participants.

    If you want an error message to be shown to participants, you must raise
    the error as a ParticipantVisibleError. All other errors will only be shown
    to the competition host. This helps prevent unintentional leakage of
    solution data.
    """


def decode_masks(
    masks: list[str],
    target_shape: tuple[int, int] = (128, 128),
) -> np.ndarray:
    """
    Convert each string in masks back to a 2D mask of integer labels.

    Args:
        masks (list[str]): list of stringified masks (space-separated labels)
        target_shape (tuple[int, int]): (H, W) shape every decoded mask is
            reshaped to. Defaults to (128, 128).

    Returns:
        np.ndarray: reconstructed batch of masks, uint8 array of shape
        (len(masks), H, W). An empty list gives an array of shape (0, H, W).

    Raises:
        ValueError: if a decoded mask does not contain exactly H * W values.
    """
    # Pre-allocating keeps the (B, H, W) layout and the uint8 dtype even when
    # `masks` is empty (np.array([]) would be a float array of shape (0,)).
    decoded = np.empty((len(masks), *target_shape), dtype=np.uint8)
    for index, mask in enumerate(masks):
        flat = np.fromstring(mask, sep=" ", dtype=np.uint8)
        decoded[index] = flat.reshape(target_shape)
    return decoded


def score(
    solution: pd.DataFrame,
    submission: pd.DataFrame,
    row_id_column_name: str,
    num_classes: int = 3,
) -> float:
    """
    Compute the mean IoU between the submitted masks and the solution masks.

    Args:
        solution (pd.DataFrame): ground truth, with the masks in a "MASKS" column.
        submission (pd.DataFrame): participant file, must hold "ID" and "MASKS".
        row_id_column_name (str): name of the row-id column (part of the
            required signature; not used by the computation).
        num_classes (int): number of classes given to MeanIoU. Defaults to 3.

    Returns:
        float: the mean IoU over the decoded masks.

    Raises:
        ParticipantVisibleError: if a required column is missing, the masks
            column is not a string column, the number of rows differs from the
            solution, or a submitted mask cannot be decoded.
    """
    # Check submission files
    COL_MASK = "MASKS"
    for col in ("ID", COL_MASK):
        if col not in submission.columns:
            raise ParticipantVisibleError(
                f"Required column: {col} not found in the submission dataframe. Check your column names."
            )

    if not pandas.api.types.is_string_dtype(submission[COL_MASK]):
        # Name the column explicitly rather than through the leftover loop variable.
        raise ParticipantVisibleError(
            f"Submission column {COL_MASK} must be an object (str) column"
        )

    if len(submission) != len(solution):
        raise ParticipantVisibleError(
            f"The submission has {len(submission)} rows but {len(solution)} rows are expected."
        )

    # Parse and decode the masks. A malformed submitted mask (wrong number of
    # values) makes the reshape fail: report it to the participant instead of
    # letting a host-only ValueError through. The solution is decoded outside
    # the try block so that its errors are never shown to participants.
    try:
        masks_submission = decode_masks(submission[COL_MASK].to_list())
    except ValueError as exc:
        raise ParticipantVisibleError(
            f"Could not decode column {COL_MASK}: every mask must be a space-separated "
            "list of integer labels with the expected number of pixels."
        ) from exc
    masks_solution = decode_masks(solution[COL_MASK].to_list())

    # NOTE(review): rows are compared by position; this assumes submission and
    # solution arrive in the same row order — confirm against the platform.
    masks_submission = torch.tensor(masks_submission, dtype=torch.int64)
    masks_solution = torch.tensor(masks_solution, dtype=torch.int64)

    # Compute metrics
    miou = MeanIoU(num_classes=num_classes)

    # Convert and return the tensor as a float
    return miou(masks_submission, masks_solution).item()