In [249]:
import ee
import json

In [None]:
ee.Authenticate()

In [251]:
import rasterio
from rasterio.crs import CRS
from rasterio.windows import from_bounds
from rasterio.warp import transform_bounds
from rasterio.warp import transform
import numpy as np
from rasterio.warp import reproject, calculate_default_transform, Resampling
import cv2

In [252]:
from pyproj import Transformer

In [None]:
SHANGHAI = 'CAPELLA_C09_SP_GEO_HH_20240512190416_20240512190441'
AFRICA = 'CAPELLA_C06_SS_GEO_HH_20230607164219_20230607164233'
GHSL_SHANGHAI = 'GHS_BUILT_S_E2025_GLOBE_R2023A_54009_100_V1_0_R6_C30'
GHSL_AFRICA = 'GHS_BUILT_S_E2025_GLOBE_R2023A_54009_100_V1_0_R8_C21'
site_name = AFRICA
ghsl_mask = GHSL_AFRICA
img_path = f'../data/{site_name}.tif'
ghsl_path = f'../data/{ghsl_mask}.tif'
print(img_path)

In [254]:
ghsl = rasterio.open(ghsl_path)
img = rasterio.open(img_path)

In [None]:
TILE_WIDTH = 10 # 10 degrees
def find_ghsl_tile(sar):
    # First get the SAR image bounds in its native CRS
    bounds = sar.bounds
    
    # Convert bounds to WGS84 (EPSG:4326) as GHSL uses a global grid
    wgs84_bounds = transform_bounds(sar.crs, 'EPSG:4326',  
                                    bounds.left, bounds.bottom, 
                                    bounds.right, bounds.top)
    
    # GHSL uses a 5x5 degree tile system
    # Calculate the tile coordinates
    lon_min, lat_min, lon_max, lat_max = wgs84_bounds
    
    # Floor division to get tile numbers
    tile_lon_min = int(lon_min // TILE_WIDTH) * TILE_WIDTH
    tile_lat_min = int(lat_min // TILE_WIDTH) * TILE_WIDTH
    
    # Format according to GHSL naming convention
    # GHSL tiles are named like: GHS_BUILT_S_E<epoch>_GLOBE_R2023A_54009_100_V1_0_R6_C<col>_R<row>
    # where col and row are based on the 5-degree grid
    
    col = int((tile_lon_min + 180) // TILE_WIDTH)
    row = int((90 - tile_lat_min) // TILE_WIDTH)
    

    print(f"SAR Image bounds (WGS84): {wgs84_bounds}") # should be somewhat around 31.3906703, 121.5065900
    print(f"Required GHSL tile(s):")
    print(f"Column: {col}, Row: {row}")
    print(f"This corresponds to the area: {tile_lon_min}°E to {tile_lon_min+5}°E, {tile_lat_min}°N to {tile_lat_min+5}°N")
    
    return col, row

col, row = find_ghsl_tile(img)
print(col, row)

In [None]:
def check_multiple_tiles(sar):
    bounds = sar.bounds
    wgs84_bounds = transform_bounds(sar.crs, 'EPSG:4326',
                                  bounds.left, bounds.bottom,
                                  bounds.right, bounds.top)
    
    lon_min, lat_min, lon_max, lat_max = wgs84_bounds
    
    # Find which tile each corner belongs to
    corners = [
        (lon_min, lat_min),  # bottom left
        (lon_min, lat_max),  # top left
        (lon_max, lat_min),  # bottom right
        (lon_max, lat_max)   # top right
    ]
    
    # Get unique tile numbers for each corner
    unique_tiles = set()
    for lon, lat in corners:
        tile_lon = int(lon // 5) * 5
        tile_lat = int(lat // 5) * 5
        unique_tiles.add((tile_lon, tile_lat))
    
    spans_multiple = len(unique_tiles) > 1
    
    print(f"SAR Image bounds (WGS84): {wgs84_bounds}")
    print(f"Number of tiles needed: {len(unique_tiles)}")
    
    if spans_multiple:
        print("\nRequired tiles:")
        for tile_lon, tile_lat in unique_tiles:
            print(f"Tile covering {tile_lon}°E to {tile_lon+5}°E, {tile_lat}°N to {tile_lat+5}°N")
    
    return spans_multiple, unique_tiles

check_multiple_tiles(img)

In [257]:
def detect_buildings(img, ghsl, threshold = 0.1):
    # Convert GHSL to WGS84 (EPSG:4326)
    # Higher resolution since we're dealing with a large area
    transform_wgs84, width_wgs84, height_wgs84 = calculate_default_transform(
        ghsl.crs,
        'EPSG:4326',
        ghsl.width,
        ghsl.height,
        *ghsl.bounds,
        resolution=(0.001, 0.001)  # ~100m at the equator
    )
    
    ghsl_wgs84 = np.zeros((height_wgs84, width_wgs84), dtype=ghsl.dtypes[0])
    
    # Reproject GHSL to WGS84
    reproject(
        source=ghsl.read(1),
        destination=ghsl_wgs84,
        src_transform=ghsl.transform,
        src_crs=ghsl.crs,
        dst_transform=transform_wgs84,
        dst_crs='EPSG:4326',
        resampling=Resampling.nearest
    )
    
    # Now create the final array matching SAR dimensions
    reproj_ghsl = np.zeros((img.height, img.width), dtype=ghsl.dtypes[0])
    
    # Project from WGS84 to UTM, using the SAR image's transform
    reproject(
        source=ghsl_wgs84,
        destination=reproj_ghsl,
        src_transform=transform_wgs84,
        src_crs='EPSG:4326',
        dst_transform=img.transform,
        dst_crs=img.crs,
        resampling=Resampling.nearest
    )

    reproj_ghsl = reproj_ghsl.astype(np.float32)  # Convert to float for fraction calculations
    no_data_mask = reproj_ghsl == ghsl.nodata
    reproj_ghsl[no_data_mask] = -1

    # **Create binary mask for settlement areas**
    mask = reproj_ghsl > threshold
    return reproj_ghsl, mask

reproj_ghsl, mask = detect_buildings(img, ghsl)

In [None]:
# Check if any buildings are present in the GHSL mask
buildings_present = np.any(mask > 0)
valid_pixel_count = np.count_nonzero(reproj_ghsl >= 0)
print("Buildings present in GHSL mask:", buildings_present)
# Compute the percentage of image covered by buildings
building_coverage = np.sum(mask > 0) / valid_pixel_count 
print(f"Building coverage: {building_coverage}")

In [266]:
from rasterio.windows import Window

MAX_RATIO = 0.7 # Maximum ratio of black pixels to non-black pixels for patch to be considered

def too_black(patch, max_ratio = MAX_RATIO):
    black_ratio = np.sum(patch == 0) /patch.size
    return black_ratio > max_ratio

def cut_patches(img, patch_size = 256, max_ratio = MAX_RATIO):
    img_width, img_height = img.shape
    patches = []
    for i in range(0, img_height, patch_size):
        for j in range(0, img_width, patch_size):
            window = Window(j, i, patch_size, patch_size)
            patch = img.read(1, window = window)
            if patch.shape != (patch_size, patch_size):
                patch = np.pad(patch, ((0, patch_size - patch.shape[0]), 
                                       (0, patch_size - patch.shape[1])), mode='constant', constant_values=0)
                
            # If the patch doesn't have too many black pixels, add it to the list
            if np.sum(patch == 0) / patch.size < max_ratio:
                patches.append(patch)

    return patches

In [None]:
import random
from skimage import exposure

PATCH_SIZE = 1024
# Visualize a patch
def contrast_stretch(img, lower_percent=2, upper_percent=99.9):
    # Clip pixel values based on lower/upper percentiles (exclude extreme pixel values)
    lower, upper = np.percentile(img, (lower_percent, upper_percent))
    # Apply contrast stretch
    new_img = exposure.rescale_intensity(img, in_range=(lower, upper))
    # Apply CLAHE - tile the image and apply contrast stretching on each tile (local contrast stretching)
    eq_img = exposure.equalize_adapthist(new_img, clip_limit=0.03)
    # Make image a bit brighter (gamma > 1 = brighter)
    gamma_cor_img = exposure.adjust_gamma(eq_img, gamma=1.2)
    return gamma_cor_img

def visualize_patches(patches, res = PATCH_SIZE, num_patches=1):
    """Visualize a specified number of random patches."""
    selected_patches = random.sample(patches, num_patches)
    
    for patch in selected_patches:
        plt.figure(figsize=(10, 10))
        plt.imshow(contrast_stretch(patch), cmap='gray')
        plt.colorbar()
        plt.title(f'Image Patch, {res}x{res}')
        plt.xlabel('X')
        plt.ylabel('Y')
        plt.show()

patches = cut_patches(img, patch_size = PATCH_SIZE)
print(len(patches))
visualize_patches(patches, num_patches=5)