This notebook downloads the CloudSEN12 "high" labeled images from Planetary Computer. Make sure you have already downloaded the CloudSEN12 data from Hugging Face first, using the other CloudSEN12 notebook.

The only thing you need to change in this notebook is `base_dataset_dir`, which should point to a local drive with 300 GB of available storage.

In [1]:
import rasterio as rio
from tqdm.auto import tqdm
from pathlib import Path
from multiprocessing import Pool
import re
import logging
from datetime import datetime, timedelta
from typing import Tuple, Optional, List
import rioxarray
import pystac_client
import planetary_computer as pc
from shutil import copyfile
from pystac.item import Item
from pystac_client.stac_api_io import StacApiIO
from urllib3 import Retry


In [None]:
# Root folder for all datasets; the CloudSEN12 input and output folders used in
# the following cells are resolved relative to it. Change this to a local path.
base_dataset_dir = Path("/media/nick/4TB Working 7/Datasets/OCM datasets")

In [None]:
# Destination folder for the imagery fetched from Planetary Computer; created
# up front, together with any missing parent folders.
high_planetary_computer_dir = base_dataset_dir.joinpath(
    "CloudSEN12 high planetary computer"
)
high_planetary_computer_dir.mkdir(parents=True, exist_ok=True)

In [None]:
# Asset keys fetched for every scene; download_pc_bands writes them to the
# output GeoTIFF in this order (first entry becomes band 1).
bands = ["B04", "B03", "B8A"]

In [None]:
# Folder holding the CloudSEN12 "high" files downloaded beforehand; fail early
# with a clear message if it is missing.
high_dir = base_dataset_dir.joinpath("CloudSEN12 high")
if not high_dir.exists():
    missing_high_dir_message = (
        f"High directory {high_dir} does not exist."
        f" Please download the CloudSEN12 dataset from Hugging Face first."
    )
    raise FileNotFoundError(missing_high_dir_message)

In [None]:
# Configure logging: only WARNING and above are emitted, so the logging.info
# calls in the functions below produce no output at this level.
logging.basicConfig(level=logging.WARNING, format="%(levelname)s: %(message)s")


def parse_cloudsen12_filename(filename: str) -> Tuple[str, datetime, str]:
    """Extract the ROI id, first timestamp and MGRS tile from a CloudSEN12 name.

    Args:
        filename: Name containing the pattern
            ``CloudSEN12_ROI_<digits>__<stamp>_<stamp>_<TILE>_`` where each
            stamp has the form ``YYYYMMDDTHHMMSS``.

    Returns:
        ``(roi_id, date_obj, mgrs_tile)``: the ROI digits as a string, the
        first stamp parsed into a naive ``datetime``, and the tile code with
        one leading ``T`` removed if it had one.

    Raises:
        ValueError: If the pattern is not found in ``filename``.
    """
    name_regex = re.compile(
        r"CloudSEN12_ROI_(\d+)__(\d{8}T\d{6})_\d{8}T\d{6}_([A-Z0-9]+)_"
    )
    found = name_regex.search(filename)
    if found is None:
        raise ValueError(f"Cannot parse filename: {filename}")

    roi_id = found.group(1)
    stamp = found.group(2)
    tile = found.group(3)

    # Drop only the single leading "T" of the tile code, if present.
    if tile.startswith("T"):
        tile = tile[1:]

    return roi_id, datetime.strptime(stamp, "%Y%m%dT%H%M%S"), tile


def find_sentinel2_item(mgrs_tile: str, target_date: datetime) -> Optional[Item]:
    """Look up the Sentinel-2 L2A item matching an MGRS tile and timestamp.

    Searches the Planetary Computer STAC API for ``sentinel-2-l2a`` items on
    ``mgrs_tile`` within three days either side of ``target_date``, then keeps
    only the items whose ``s2:datatake_id`` property contains the timestamp
    formatted as ``YYYYMMDDTHHMMSS``.

    Args:
        mgrs_tile: Tile code compared for equality against ``s2:mgrs_tile``.
        target_date: Timestamp the item's datatake id must contain.

    Returns:
        The first matching item, or ``None`` when the search returns nothing
        or no returned item carries the timestamp in its datatake id.
    """
    # HTTP layer that retries up to 5 times on 502/503/504 responses.
    retrying_io = StacApiIO(
        max_retries=Retry(
            total=5,
            backoff_factor=1,
            status_forcelist=[502, 503, 504],
            allowed_methods=None,
        )
    )
    client = pystac_client.Client.open(
        "https://planetarycomputer.microsoft.com/api/stac/v1",
        modifier=pc.sign_inplace,
        stac_io=retrying_io,
    )

    # Timestamp in the form it is looked for inside the datatake id.
    timestamp_str = target_date.strftime("%Y%m%dT%H%M%S")

    # Broad search window of three days on each side of the target date.
    window = timedelta(days=3)
    start_date = (target_date - window).strftime("%Y-%m-%d")
    end_date = (target_date + window).strftime("%Y-%m-%d")

    candidates = list(
        client.search(
            collections=["sentinel-2-l2a"],
            datetime=f"{start_date}/{end_date}",
            query={"s2:mgrs_tile": {"eq": mgrs_tile}},
            limit=100,
        ).items()
    )
    if not candidates:
        logging.warning(
            f"No items found for MGRS {mgrs_tile} in "
            f"date range {start_date} to {end_date}"
        )
        return None

    # Narrow the candidates down to those whose datatake id holds the timestamp.
    matching_items = [
        candidate
        for candidate in candidates
        if timestamp_str in candidate.properties.get("s2:datatake_id", "")
    ]
    if not matching_items:
        logging.info(
            f"No items found with timestamp {timestamp_str} in "
            f"datatake_id for MGRS {mgrs_tile}"
        )
        return None

    if len(matching_items) > 1:
        logging.info(
            f"Found {len(matching_items)} processing versions for "
            f"timestamp {timestamp_str}, using first"
        )
    # Several matches are possible for one timestamp; the first one is used.
    return matching_items[0]


def download_pc_bands(
    item,
    output_path: Path,
    reference_file: Path,
    bands: List[str],
) -> None:
    """Download bands from a Planetary Computer item into one GeoTIFF.

    Each requested band is clipped to the bounds of ``reference_file`` and
    reprojected onto its CRS, transform and shape. The bands are then written,
    in the order given, to an LZW-compressed GeoTIFF.

    The GeoTIFF is first written to a temporary sibling of ``output_path`` and
    renamed into place only once it is complete, so a failed or interrupted
    write never leaves a partial file at ``output_path`` (``process_file``
    treats an existing output as already processed).

    Args:
        item: STAC item whose ``assets`` mapping holds the band assets.
        output_path: Destination GeoTIFF path.
        reference_file: Existing raster that defines the output grid.
        bands: Asset keys to download; also the output band order.

    Raises:
        ValueError: If a requested band is not among the item's assets.
    """
    # Read the target grid from the reference raster.
    with rio.open(reference_file) as ref:
        ref_crs = ref.crs
        ref_transform = ref.transform
        ref_bounds = ref.bounds
        ref_shape = (ref.height, ref.width)

    band_arrays = []

    for band_name in bands:
        if band_name not in item.assets:
            raise ValueError(f"Band {band_name} not available in item")

        signed_url = pc.sign(item.assets[band_name].href)

        # The context manager closes the remote dataset handle once the band
        # has been read, instead of leaking one open handle per band.
        with rioxarray.open_rasterio(signed_url) as band_da:  # type: ignore
            # Clip to the reference bounds first so only that window is warped.
            clipped = band_da.rio.clip_box(  # type: ignore
                minx=ref_bounds.left,
                miny=ref_bounds.bottom,
                maxx=ref_bounds.right,
                maxy=ref_bounds.top,
                crs=ref_crs,
            )

            reprojected = clipped.rio.reproject(
                ref_crs,
                shape=ref_shape,
                transform=ref_transform,
            )

            # Materialise the first (only) band as a NumPy array while the
            # source is still open.
            band_arrays.append(reprojected[0].values)

    # Write next to the final path, then rename into place.
    tmp_path = output_path.with_name(output_path.name + ".tmp")
    try:
        with rio.open(
            tmp_path,
            "w",
            driver="GTiff",
            height=ref_shape[0],
            width=ref_shape[1],
            count=len(bands),
            dtype=band_arrays[0].dtype,
            crs=ref_crs,
            transform=ref_transform,
            compress="lzw",
        ) as dst:
            for index, band in enumerate(band_arrays, start=1):
                dst.write(band, index)
        tmp_path.replace(output_path)
    except BaseException:
        # Remove the incomplete temporary file, then let the error propagate.
        if tmp_path.exists():
            tmp_path.unlink()
        raise


def process_file(
    input_file: Path, output_dir: Path = high_planetary_computer_dir
) -> bool:
    """Fetch the Planetary Computer counterpart of one CloudSEN12 L2A file.

    The output name is the input name with ``_l2a.tif`` replaced by
    ``_l2a_PC.tif``. Nothing is downloaded when that output already exists.

    Args:
        input_file: CloudSEN12 L2A GeoTIFF; also passed on as the reference
            raster for the download.
        output_dir: Folder the output file is written to.

    Returns:
        ``True`` if the output already existed or was written. ``False`` if no
        matching item was found or any exception was raised; both cases are
        logged as errors instead of propagating.
    """
    try:
        parsed = parse_cloudsen12_filename(input_file.name)
        target_date, mgrs_tile = parsed[1], parsed[2]

        output_path = output_dir / input_file.name.replace(
            "_l2a.tif", "_l2a_PC.tif"
        )

        # Only query and download when the output is not there yet.
        if not output_path.exists():
            item = find_sentinel2_item(mgrs_tile, target_date)
            if item is None:
                logging.error(
                    f"No matching Sentinel-2 data found for {input_file.name}"
                )
                return False

            download_pc_bands(item, output_path, input_file, bands)

        return True

    except Exception as e:
        # Worker boundary: report the failure and carry on with other files.
        logging.error(f"Error processing {input_file.name}: {e}")
        return False

In [None]:
# Repeats the logging configuration from the cell that defines the helper
# functions (same level and format).
logging.basicConfig(level=logging.WARNING, format="%(levelname)s: %(message)s")

In [None]:
# Find all L2A files in the CloudSEN12 "high" folder; the bare `total_count`
# on the last line displays the count as the cell output.
l2a_files = list(high_dir.glob("*_l2a.tif"))
total_count = len(l2a_files)
total_count

In [None]:
# Process the files across 32 worker processes with a progress bar. The
# per-file success flags returned by process_file are kept so that failures
# are reported instead of being silently discarded.
with Pool(32) as pool:
    results = list(tqdm(pool.imap(process_file, l2a_files), total=total_count))

failed_count = results.count(False)
if failed_count:
    logging.warning(f"{failed_count} of {total_count} files could not be processed")

In [None]:
# Gather the label rasters from the "high" folder so they can be copied over to
# the Planetary Computer folder; the bare `len(...)` displays how many were found.
high_labels = list(high_dir.glob("*label*.tif"))
len(high_labels)

In [None]:
# Copy every label into the Planetary Computer folder under a name in which
# ".tif" is replaced by "_PC.tif", mirroring the "_PC" suffix of the images.
for high_label in tqdm(high_labels):
    file_name = high_label.name.replace(".tif", "_PC.tif")
    copyfile(high_label, high_planetary_computer_dir / file_name)