This notebook was used to split the dataset for the Late Fusion model into training and testing sets. The starting input is the set of ASM points previously obtained with the R script that uses the blockCV package.

Note that this notebook was run in Colab, as the dataset was originally stored in Google Drive, so running it locally may require installing some packages in the environment and changing the directory paths.

In [None]:
import geopandas as gpd
import rasterio
import shapely.geometry
import os
import numpy as np
import matplotlib.pyplot as plt
import shutil

from shapely.geometry import box
from sklearn.model_selection import train_test_split

In [None]:
# from google.colab import drive
# drive.mount('/content/drive/')

In [None]:
# Identifier of the split to process; it is interpolated into the input paths below.
split_number = '9'

# Load the training and testing ASM points of that split (in this order).
train_points, test_points = (
    gpd.read_file(points_path)
    for points_path in (
        f'/content/drive/MyDrive/mgi_thesis/asm_points_split/split_{split_number}/train_data_split_{split_number}.geojson',
        f'/content/drive/MyDrive/mgi_thesis/asm_points_split/split_{split_number}/test_data_split_{split_number}.geojson',
    )
)

In [None]:
# Input folders: Sentinel-1 images, Planet images and ground-truth rasters.
s1_dir = '/content/drive/MyDrive/mgi_thesis/s1_images_both_orbits'
planet_dir = '/content/drive/MyDrive/mgi_thesis/planet_images'
gt_dir = '/content/drive/MyDrive/mgi_thesis/gt_binary'

# Output root for the current split, with one sub-folder per subset.
base_dir = f'/content/drive/MyDrive/mgi_thesis/asm_dataset_split_{split_number}/fusion'
train_dir, test_dir = (
    os.path.join(base_dir, subset_folder)
    for subset_folder in ('training_data', 'testing_data')
)

In [None]:
def get_image_boundaries(image_path):
    """
    Build a rectangular polygon covering the extent of a raster image.

    The rectangle is what gets intersected with the ASM points to find out
    which sites fall inside each satellite image.

    Parameters:
    image_path (str): Path to the raster image file.

    Returns:
    shapely.geometry.Polygon: Axis-aligned rectangle spanning the raster's
    bounds, in the coordinates reported by the raster itself (no
    reprojection is performed here).
    """
    # Only the four corner coordinates are needed, so read them and let the
    # dataset close before building the polygon.
    with rasterio.open(image_path) as src:
        left, bottom, right, top = src.bounds
    return box(left, bottom, right, top)

In [None]:
# Collect one record (ID, sensor type, footprint polygon) per satellite image.
image_boundaries = []
image_types = ['Sentinel-1', 'Planet']
directories = [s1_dir, planet_dir]

for image_type, directory in zip(image_types, directories):
    print(f"Checking directory: {directory} for {image_type}")
    for image_filename in os.listdir(directory):
        # Keep GeoTIFFs only and skip files named like ground-truth rasters.
        if not image_filename.endswith('.tif') or image_filename.startswith('gt_'):
            continue

        image_path = os.path.join(directory, image_filename)
        boundary = get_image_boundaries(image_path)

        # Strip only the trailing extension. str.replace('.tif', '') would
        # also remove any '.tif' occurring inside the name, and the ID is
        # later turned back into a file name by appending '.tif'.
        image_id = os.path.splitext(image_filename)[0]
        image_boundaries.append({'image_id': image_id, 'type': image_type, 'geometry': boundary})

# No CRS is attached here; the footprints carry the rasters' own coordinates.
images_gdf = gpd.GeoDataFrame(image_boundaries, geometry='geometry')

In [None]:
# Label the image footprints as EPSG:4326. set_crs only assigns the CRS, it
# does not reproject, so this assumes the raster bounds are already expressed
# in EPSG:4326 -- TODO confirm for every input image.
images_gdf = images_gdf.set_crs(epsg=4326)

In [None]:
def categorize_images(images_gdf, train_points_gdf, test_points_gdf):
    """
    Assign each image to the training or testing subset by majority of points.

    Every image footprint is spatially joined with the training points and
    with the testing points. An image is labelled 'training' when it
    intersects strictly more training points than testing points, and
    'testing' otherwise (ties and images without any point included).

    Parameters:
    images_gdf (geopandas.GeoDataFrame): Image footprints with an 'image_id'
        column.
    train_points_gdf (geopandas.GeoDataFrame): Points of the training split.
    test_points_gdf (geopandas.GeoDataFrame): Points of the testing split.

    Returns:
    dict: Mapping of image_id to 'training' or 'testing'.
    """
    def _points_per_image(points_gdf):
        # An inner join keeps only real (image, point) matches. A left join
        # would keep one NaN-filled row for every image without points, which
        # groupby().size() would then count as one point.
        # The join predicate is left at its default ('intersects') because
        # the keyword was renamed from `op` to `predicate` across geopandas
        # releases; omitting it works with both.
        matches = gpd.sjoin(images_gdf, points_gdf, how='inner')
        return matches.groupby('image_id').size()

    train_counts = _points_per_image(train_points_gdf)
    test_counts = _points_per_image(test_points_gdf)

    # Images absent from a count series intersect no point of that split.
    image_category = {}
    for image_id in images_gdf['image_id']:
        n_train = train_counts.get(image_id, 0)
        n_test = test_counts.get(image_id, 0)
        image_category[image_id] = 'training' if n_train > n_test else 'testing'

    return image_category

# Decide, for every image, whether it belongs to the training or testing subset.
image_category = categorize_images(
    images_gdf=images_gdf,
    train_points_gdf=train_points,
    test_points_gdf=test_points,
)

In [None]:
def validate_and_copy(src_path, dest_path, file_type):
    """
    Copy a file to its destination, reporting when the source is missing.

    Parameters:
    src_path (str): Path of the file to copy.
    dest_path (str): Path (file or existing directory) to copy it to.
    file_type (str): Human-readable label used in the printed messages.

    Returns:
    None
    """
    if not os.path.exists(src_path):
        # Make missing inputs visible instead of skipping them silently, so
        # an incomplete dataset can be noticed from the notebook output.
        print(f"Skipping {file_type}: source file not found: {src_path}")
        return

    # Announce the copy before performing it, so a failure is attributable.
    print(f"Copying from {src_path} to {dest_path} for {file_type}")
    shutil.copy(src_path, dest_path)

In [None]:
# Copy every categorised image, together with its ground-truth raster, into
# the training/testing folder structure under base_dir.
for image_id, category in image_category.items():
    print(f"Processing image ID: {image_id}, Category: {category}")

    # Resolve the sensor once: it determines the ID prefix to strip, the
    # source folder and the destination sub-folder.
    is_s1 = 's1' in image_id
    if is_s1:
        print(image_id)
        id_prefix, src_image_dir, sensor_subdir = 's1_', s1_dir, 's1'
    elif 'nicfi' in image_id:
        id_prefix, src_image_dir, sensor_subdir = 'nicfi_', planet_dir, 'planet'
    else:
        # neither 's1' nor 'nicfi' appears in the ID: report it and move on
        print(f"Unknown image type for image ID: {image_id}")
        continue

    # file names: the ground truth shares the part of the ID after the prefix
    src_image_filename = image_id + '.tif'
    src_gt_filename = 'nicfi_gt_' + image_id.split(id_prefix)[1] + '.tif'

    # where the files come from
    src_image_path = os.path.join(src_image_dir, src_image_filename)
    src_gt_path = os.path.join(gt_dir, src_gt_filename)

    # where the files go; create the folders on first use
    dest_image_dir = os.path.join(base_dir, f"{category}_data/", sensor_subdir)
    dest_gt_dir = os.path.join(base_dir, f"{category}_data/gt")
    for target_dir in (dest_image_dir, dest_gt_dir):
        os.makedirs(target_dir, exist_ok=True)

    dest_image_path = os.path.join(dest_image_dir, src_image_filename)
    dest_gt_path = os.path.join(dest_gt_dir, src_gt_filename)

    # print the resolved paths so the run can be checked by eye
    print(f"Source image path: {src_image_path}")
    print(f"Destination image path: {dest_image_path}")
    print(f"Source GT path: {src_gt_path}")
    print(f"Destination GT path: {dest_gt_path}")

    # copy the image first, then its ground truth
    validate_and_copy(src_image_path, dest_image_path, "Image")
    validate_and_copy(src_gt_path, dest_gt_path, "Ground Truth")