In [13]:
import geopandas as gpd

# Alternative input file (currently disabled):
#gdf = gpd.read_file('/home/cmn60/cape_town_segmentation/final_annotations_PV_all_types_5K_cleaned.gpkg')
gdf = gpd.read_file('/home/cmn60/cape_town_segmentation/final_annotations_PV_all_types_balanced_3_cleaned.gpkg')

# A row is kept only if at least one of the three PV flags equals 1.
pv_mask = gdf['PV_normal'].eq(1) | gdf['PV_heater'].eq(1) | gdf['PV_pool'].eq(1)

# Rows whose uncertflag equals 1 are excluded.
uncert_mask = gdf['uncertflag'].ne(1)

# Rows flagged as both heater and pool are excluded (only 2 observations).
# Written in De Morgan form: "not (heater and pool)" == "not heater, or not pool".
heater_pool_overlap_mask = gdf['PV_heater'].ne(1) | gdf['PV_pool'].ne(1)

keep_rows = pv_mask & uncert_mask & heater_pool_overlap_mask
filtered_gdf = gdf.loc[keep_rows].copy()

# Remove the 'uncertflag' column when it is present.
filtered_gdf = filtered_gdf.drop(columns=['uncertflag'], errors='ignore')

# Report how many annotations survived the filters.
print(f"Filtered dataset contains {len(filtered_gdf)} PV-related arrays")

# Write the filtered annotations to a GeoPackage in the working directory.
output_path = "final_annotations_PV_all_types.gpkg"
filtered_gdf.to_file(output_path, driver="GPKG")
print(f"Saved filtered annotations to {output_path}")

# List the columns that remain after filtering.
print("Remaining columns:", filtered_gdf.columns.tolist())

Filtered dataset contains 877 PV-related arrays
Saved filtered annotations to final_annotations_PV_all_types.gpkg
Remaining columns: ['id', 'PV_normal', 'PV_heater', 'PV_pool', 'area', 'annotator', 'centroid_latitude', 'centroid_longitude', 'image_name', 'nw_corner_of_image_latitude', 'nw_corner_of_image_longitude', 'se_corner_of_image_latitude', 'se_corner_of_image_longitude', 'PV_normal_qc', 'PV_heater_qc', 'PV_pool_qc', 'PV_heater_mat_qc', 'uncertflag_qc', 'delete_qc', 'resizing_qc', 'PV_heater_mat_combined', 'geometry']


In [14]:
# Geometry error checker
from shapely.geometry import Polygon

# Re-load the filtered annotations from the working directory.
gdf = gpd.read_file("final_annotations_PV_all_types.gpkg")

# Rows whose geometry is reported as invalid before any repair.
invalid_gdf = gdf[~gdf.geometry.is_valid].copy()
print(f"Invalid geometries found: {len(invalid_gdf)}")

# Error correction: buffer(0) method.
# Missing (None) geometries are passed through unchanged instead of being
# dereferenced, which would raise AttributeError; valid geometries are left as-is.
gdf["geometry"] = gdf["geometry"].apply(
    lambda geom: geom if geom is None or geom.is_valid else geom.buffer(0)
)

# Anything that buffer(0) could not repair.
still_invalid = gdf[~gdf.geometry.is_valid].copy()
print(f"Still invalid after fixing: {len(still_invalid)}")

# Keep only rows with a valid geometry and write them out.
# NOTE(review): missing geometries are expected to be reported as not valid and
# therefore dropped here - confirm against the installed geopandas version.
gdf_valid = gdf[gdf.geometry.is_valid].copy()
gdf_valid.to_file("final_annotations_PV_all_types_cleaned.gpkg", driver="GPKG")

Invalid geometries found: 0
Still invalid after fixing: 0


In [15]:
from shapely.geometry import Polygon, MultiPolygon

def convert_gpkg_to_pixel_coords(gpkg_path, image_dir):
    """Convert annotation polygons from map coordinates to per-image pixel coordinates.

    The annotations are read from ``gpkg_path``. For every distinct value of the
    ``image_name`` column, ``<image_dir>/<image_name>.tif`` is opened to obtain its
    affine transform; the inverse of that transform is applied to each geometry
    belonging to the image. MultiPolygons are split into their member polygons;
    geometries that are neither Polygon nor MultiPolygon after the transform are
    skipped, as are missing or empty geometries and images that do not exist on disk.

    Args:
        gpkg_path: Path of the annotation file readable by ``gpd.read_file``.
        image_dir: Directory containing the ``.tif`` images named after ``image_name``.

    Returns:
        A ``pandas.DataFrame`` with one row per polygon and the columns
        ``image_name``, ``geometry`` (transformed polygon),
        ``polygon_vertices_pixels`` (array of the exterior ring coordinates; interior
        rings are not included), ``centroid_latitude_pixels`` (centroid y),
        ``centroid_longitude_pixels`` (centroid x), ``PV_normal``, ``PV_heater`` and
        ``PV_pool``. The columns are present even when no polygon was converted.
    """
    output_columns = [
        'image_name',
        'geometry',
        'polygon_vertices_pixels',
        'centroid_latitude_pixels',
        'centroid_longitude_pixels',
        'PV_normal',
        'PV_heater',
        'PV_pool',
    ]

    gdf = gpd.read_file(gpkg_path)
    pixel_rows = []

    for image_name in gdf['image_name'].unique():
        image_path = os.path.join(image_dir, f"{image_name}.tif")
        if not os.path.exists(image_path):
            print(f"Image not found: {image_path}")
            continue

        # Only the transform is needed, so the dataset is closed immediately.
        with rasterio.open(image_path) as src:
            inv_transform = ~src.transform

        # Coefficients in the order handed to shapely's affine_transform.
        matrix = [inv_transform.a, inv_transform.b, inv_transform.d,
                  inv_transform.e, inv_transform.xoff, inv_transform.yoff]

        image_gdf = gdf[gdf['image_name'] == image_name]
        for _, row in image_gdf.iterrows():
            geom = row.geometry
            # The None test must come first: None has no `is_empty` attribute.
            if geom is None or geom.is_empty:
                continue

            geom_px = affine_transform(geom, matrix)

            if isinstance(geom_px, Polygon):
                polygons = [geom_px]
            elif isinstance(geom_px, MultiPolygon):
                polygons = list(geom_px.geoms)
            else:
                continue

            for poly in polygons:
                centroid = poly.centroid
                pixel_rows.append({
                    'image_name': image_name,
                    'geometry': poly,
                    'polygon_vertices_pixels': np.array(poly.exterior.coords),
                    'centroid_latitude_pixels': centroid.y,
                    'centroid_longitude_pixels': centroid.x,
                    'PV_normal': row['PV_normal'],
                    'PV_heater': row['PV_heater'],
                    'PV_pool': row['PV_pool']
                })

    # Explicit columns keep the schema stable when nothing was converted, so
    # callers can still index the result by 'image_name'.
    return pd.DataFrame(pixel_rows, columns=output_columns)


In [None]:
import cv2

def create_multiclass_mask(image_shape, polygons_with_classes):
    """Build a single-channel uint8 label mask from (polygon, class_id) pairs.

    The mask takes its height and width from the first two entries of
    ``image_shape`` and starts out as all zeros. Each polygon's vertices are cast
    to int32 and filled with its class id, in the order given, so a later polygon
    overwrites an earlier one where they overlap.
    """
    height_width = image_shape[:2]
    label_mask = np.zeros(height_width, dtype="uint8")
    for vertices, label in polygons_with_classes:
        int_vertices = vertices.astype(np.int32)
        cv2.fillPoly(label_mask, [int_vertices], label)
    return label_mask

def adjust_polygon_coordinates(polygons, x_offset, y_offset):
    """Shift every polygon by (-x_offset, -y_offset), keeping its class label.

    ``polygons`` is an iterable of (vertices, class) pairs; a new list of
    (shifted_vertices, class) pairs is returned in the same order.
    """
    offset = np.array([x_offset, y_offset])
    shifted = []
    for vertices, label in polygons:
        shifted.append((vertices - offset, label))
    return shifted

def save_tile_and_mask(tile, mask, tile_index_pixels, tile_dir, mask_dir, image_name):
    """Write one image tile and its label mask as PNG files.

    The tile is saved as ``<tile_dir>/i_<image_name>_<tile_index_pixels>.png`` after
    an RGB -> BGR channel swap; the mask is saved unchanged as
    ``<mask_dir>/m_<image_name>_<tile_index_pixels>.png``.

    Raises:
        OSError: If either file could not be written. The write result used to be
            ignored, which could silently leave a tile without its mask (or vice
            versa) and produce unmatched pairs downstream.
    """
    tile_path = os.path.join(tile_dir, f"i_{image_name}_{tile_index_pixels}.png")
    mask_path = os.path.join(mask_dir, f"m_{image_name}_{tile_index_pixels}.png")

    if not cv2.imwrite(tile_path, cv2.cvtColor(tile, cv2.COLOR_RGB2BGR)):
        raise OSError(f"Failed to write tile image: {tile_path}")
    if not cv2.imwrite(mask_path, mask):
        raise OSError(f"Failed to write mask image: {mask_path}")


In [17]:
def process_geotiff(image_name, image_path, tile_size, df, tile_dir, mask_dir):
    """Cut one GeoTIFF into square tiles and save every tile that contains a label.

    The raster is read, zero-padded on the bottom/right to a multiple of
    ``tile_size``, and each annotation in ``df`` is assigned to the single tile
    that contains its pixel-space centroid. For every tile that received at least
    one polygon, a class mask is rasterised (1 = PV_normal, 2 = PV_heater,
    3 = PV_pool, checked in that order) and the tile/mask pair is saved when the
    mask has any non-zero pixel.

    Args:
        image_name: Name used in the output file names.
        image_path: Path of the GeoTIFF to read.
        tile_size: Tile edge length in pixels.
        df: Annotation rows for this image, with the columns
            ``centroid_longitude_pixels``, ``centroid_latitude_pixels``,
            ``polygon_vertices_pixels``, ``PV_normal``, ``PV_heater``, ``PV_pool``.
        tile_dir: Output directory for image tiles.
        mask_dir: Output directory for mask tiles.

    NOTE(review): because assignment is by centroid only, the part of a polygon
    that extends into a neighbouring tile is not drawn in that neighbour's mask.
    Confirm this is intended for the segmentation labels.
    """
    # Only the pixel array is needed, so the dataset is closed right after reading.
    with rasterio.open(image_path) as src:
        # Move the first axis of the array returned by read() to the end.
        img = np.transpose(src.read(), (1, 2, 0))
    h, w = img.shape[:2]

    pad_h = (tile_size - h % tile_size) % tile_size
    pad_w = (tile_size - w % tile_size) % tile_size
    padded = np.pad(img, ((0, pad_h), (0, pad_w), (0, 0)), mode='constant')
    padded_h, padded_w = padded.shape[:2]

    # Bucket the polygons by tile in one pass over the rows, instead of
    # rescanning every row for every tile.
    polygons_by_tile = {}
    for _, row in df.iterrows():
        cx, cy = row['centroid_longitude_pixels'], row['centroid_latitude_pixels']
        # Centroids outside the padded image (or NaN) belong to no tile.
        if not (0 <= cx < padded_w and 0 <= cy < padded_h):
            continue

        if row['PV_normal'] == 1:
            cls = 1
        elif row['PV_heater'] == 1:
            cls = 2
        elif row['PV_pool'] == 1:
            cls = 3
        else:
            continue

        tile_key = (int(cy // tile_size), int(cx // tile_size))
        polygons_by_tile.setdefault(tile_key, []).append(
            (row['polygon_vertices_pixels'], cls)
        )

    # Sorted keys visit tiles row by row, left to right.
    for tile_row, tile_col in sorted(polygons_by_tile):
        y = tile_row * tile_size
        x = tile_col * tile_size
        tile = padded[y:y + tile_size, x:x + tile_size]

        adj_polygons = adjust_polygon_coordinates(polygons_by_tile[(tile_row, tile_col)], x, y)
        mask = create_multiclass_mask(tile.shape, adj_polygons)

        if np.any(mask > 0):
            tile_idx = f"{tile_row}_{tile_col}"
            save_tile_and_mask(tile, mask, tile_idx, tile_dir, mask_dir, image_name)


In [18]:
def process_all_images(image_dir, annotations_df, tile_size, tile_dir, mask_dir):
    """Tile every annotated image found in ``image_dir``.

    Both output directories are created if needed. For each distinct
    ``image_name`` in ``annotations_df`` a progress line is printed; when
    ``<image_dir>/<image_name>.tif`` exists, its annotation rows are passed to
    ``process_geotiff``. Images missing on disk are skipped.
    """
    for out_dir in (tile_dir, mask_dir):
        os.makedirs(out_dir, exist_ok=True)

    unique_names = annotations_df['image_name'].unique()
    for idx, name in enumerate(unique_names):
        print(f"[{idx}] Processing {name}")
        tif_path = os.path.join(image_dir, f"{name}.tif")
        if os.path.exists(tif_path):
            rows_for_image = annotations_df[annotations_df['image_name'] == name]
            process_geotiff(name, tif_path, tile_size, rows_for_image, tile_dir, mask_dir)


In [19]:
import shutil

def copy_masks_with_target(src_folder, dst_folder):
    """Copy every mask that contains at least one non-zero pixel.

    ``dst_folder`` is created if needed. Each entry of ``src_folder`` is read as a
    grayscale image; entries that cannot be read as an image (``cv2.imread``
    returns ``None`` for them, e.g. sub-directories or stray non-image files) are
    reported and skipped instead of raising ``TypeError`` on the comparison.
    """
    os.makedirs(dst_folder, exist_ok=True)
    for f in os.listdir(src_folder):
        src_path = os.path.join(src_folder, f)
        mask = cv2.imread(src_path, cv2.IMREAD_GRAYSCALE)
        if mask is None:
            print(f"Skipping unreadable mask file: {src_path}")
            continue
        if np.any(mask > 0):
            shutil.copy(src_path, dst_folder)

def copy_corresponding_images(mask_folder, image_folder, dst_folder):
    """Copy the image tile that matches each mask file.

    ``dst_folder`` is created if needed. For every entry in ``mask_folder`` the
    first character of its name is replaced by ``"i"`` to form the image file
    name; that file is copied from ``image_folder`` when it exists and ignored
    otherwise.
    """
    os.makedirs(dst_folder, exist_ok=True)
    for mask_name in os.listdir(mask_folder):
        image_name = "i" + mask_name[1:]
        candidate = os.path.join(image_folder, image_name)
        if not os.path.exists(candidate):
            continue
        shutil.copy(candidate, dst_folder)


In [21]:
import os
import geopandas as gpd
import rasterio
from shapely.affinity import affine_transform
from shapely.geometry import Polygon, MultiPolygon
import cv2
import pandas as pd
import numpy as np
# --- Pipeline configuration ---
# Cleaned annotation file, read relative to the working directory.
gpkg_path = "final_annotations_PV_all_types_cleaned.gpkg"
# Directory holding the source .tif images (absolute, machine-specific path).
image_dir = "/home/il72/cape_town_annotation_checker/1.db_pipeline/download/images"
# Output folders for the image tiles and their masks.
tile_dir = "tiles_320_1k_new"
mask_dir = "masks_320_1k_new"
# Output folders for masks with at least one non-zero pixel and their images.
target_mask_dir = "masks_target_1k_new"
target_image_dir = "images_target_1k_new"
# Tile edge length in pixels.
tile_size = 320

# Step 1: convert the annotation polygons to per-image pixel coordinates.
annotations_df = convert_gpkg_to_pixel_coords(gpkg_path, image_dir)

# Step 2: tile each image and write the tile/mask pairs.
process_all_images(image_dir, annotations_df, tile_size, tile_dir, mask_dir)

# Step 3: collect the non-empty masks, then the image tiles that match them.
copy_masks_with_target(mask_dir, target_mask_dir)
copy_corresponding_images(target_mask_dir, tile_dir, target_image_dir)


[0] Processing 2023_RGB_8cm_W57B_8
[1] Processing 2023_RGB_8cm_W24A_17
[2] Processing 2023_RGB_8cm_W25C_16


In [None]:
# Unstratified dataset split: train/val/test (see below for stratified)
import os
import shutil
import random

# Set your input directories
images_dir = '/home/cmn60/cape_town_segmentation/images_target_1k_new'
masks_dir = '/home/cmn60/cape_town_segmentation/masks_target_1k_new'

# Set your output base directory
output_dir = '/home/cmn60/cape_town_segmentation/output1k_new'

# Fixed seed so that re-running this cell reproduces the same split.
# NOTE(review): files are copied into existing folders; if the inputs change
# between runs, copies from an earlier run remain and may sit in a different
# split. Clear `output_dir` before re-running in that case.
SPLIT_SEED = 42

# os.listdir() returns entries in arbitrary order, so sort before shuffling;
# otherwise the seeded shuffle would still depend on the filesystem.
image_files = sorted(f for f in os.listdir(images_dir) if f.endswith(('.png', '.jpg', '.tif')))
image_suffixes = [f[2:] for f in image_files]  # Remove 'i_' prefix

# Shuffle with a dedicated, seeded generator (leaves the global RNG untouched)
random.Random(SPLIT_SEED).shuffle(image_suffixes)

# Split 70% / 15% / remainder
total = len(image_suffixes)
train_end = int(0.7 * total)
val_end = train_end + int(0.15 * total)

splits = {
    'train': image_suffixes[:train_end],
    'val': image_suffixes[train_end:val_end],
    'test': image_suffixes[val_end:]
}

# Create directories
for split in splits:
    os.makedirs(os.path.join(output_dir, split, 'images'), exist_ok=True)
    os.makedirs(os.path.join(output_dir, split, 'masks'), exist_ok=True)

# Copy files; a tile is copied only when both its image and its mask exist
for split_name, suffix_list in splits.items():
    for suffix in suffix_list:
        image_file = f"i_{suffix}"
        mask_file = f"m_{suffix}"

        src_image = os.path.join(images_dir, image_file)
        src_mask = os.path.join(masks_dir, mask_file)
        dst_image = os.path.join(output_dir, split_name, 'images', image_file)
        dst_mask = os.path.join(output_dir, split_name, 'masks', mask_file)

        if os.path.exists(src_image) and os.path.exists(src_mask):
            shutil.copy(src_image, dst_image)
            shutil.copy(src_mask, dst_mask)
        else:
            print(f"Warning: Missing pair for {suffix}")

print("✅ Dataset split completed.")

✅ Dataset split completed.


In [2]:
# Stratified dataset split
from collections import defaultdict
from sklearn.model_selection import train_test_split
import pandas as pd
import cv2
import numpy as np
import os

images_dir = '/home/cmn60/cape_town_segmentation/images_target_1k_new'
masks_dir = '/home/cmn60/cape_town_segmentation/masks_target_1k_new'
output_dir = '/home/cmn60/cape_town_segmentation/output1k_new_stratified'

# Step 1: Load all mask files and assign class combination label
# The listing is sorted because os.listdir() order is arbitrary; without it the
# row order of `df`, and therefore the split, could differ between machines even
# with a fixed random_state.
data = []
for fname in sorted(os.listdir(masks_dir)):
    if fname.endswith('.png'):
        mask = cv2.imread(os.path.join(masks_dir, fname), cv2.IMREAD_GRAYSCALE)
        if mask is None:
            # An unreadable file would otherwise be labelled as if it had no class.
            print(f"⚠️ Could not read mask {fname}; skipping")
            continue
        classes = set(np.unique(mask)) - {0}  # exclude background

        # Label format: "<has class 1>_<has class 2>_<has class 3>", each 0 or 1
        combo = f"{int(1 in classes)}_{int(2 in classes)}_{int(3 in classes)}"
        suffix = fname[2:]  # strip 'm_' prefix
        data.append({'suffix': suffix, 'combo': combo})

df = pd.DataFrame(data)

# Step 2: Stratified split
# 10% is held out for test; 0.1111 of the remaining 90% is then held out for
# validation, which is about 10% of the full set.
train_val_df, test_df = train_test_split(df, test_size=0.10, stratify=df['combo'], random_state=42)
train_df, val_df = train_test_split(train_val_df, test_size=0.1111, stratify=train_val_df['combo'], random_state=42)

splits = {
    'train': train_df['suffix'].tolist(),
    'val': val_df['suffix'].tolist(),
    'test': test_df['suffix'].tolist()
}

# Step 3: Copy files to output structure
# NOTE(review): files are copied into existing folders; copies left by an
# earlier run with different inputs are not removed.
import shutil

for split in splits:
    os.makedirs(os.path.join(output_dir, split, 'images'), exist_ok=True)
    os.makedirs(os.path.join(output_dir, split, 'masks'), exist_ok=True)

for split_name, suffixes in splits.items():
    for suffix in suffixes:
        img_file = f"i_{suffix}"
        mask_file = f"m_{suffix}"

        src_img = os.path.join(images_dir, img_file)
        src_mask = os.path.join(masks_dir, mask_file)
        dst_img = os.path.join(output_dir, split_name, 'images', img_file)
        dst_mask = os.path.join(output_dir, split_name, 'masks', mask_file)

        # A tile is copied only when both its image and its mask exist
        if os.path.exists(src_img) and os.path.exists(src_mask):
            shutil.copy(src_img, dst_img)
            shutil.copy(src_mask, dst_mask)
        else:
            print(f"⚠️ Missing pair for {suffix}")


In [None]:
# Report the class-combination counts of each split.
# Label format is "<PV normal>_<PV heater>_<PV pool>": 0_1_0 means the image has
# (no) PV normal, (yes) PV heater, and (no) PV pool.
for split_name in splits:
    in_split = df['suffix'].isin(splits[split_name])
    combo_counts = df.loc[in_split, 'combo'].value_counts()
    print(f"\n🔹 {split_name.upper()} combo distribution:")
    print(combo_counts)


🔹 TRAIN combo distribution:
combo
0_1_0    132
0_0_1    107
1_0_0     83
0_1_1     20
1_1_0     20
1_0_1     17
1_1_1      6
Name: count, dtype: int64

🔹 VAL combo distribution:
combo
0_1_0    17
0_0_1    14
1_0_0    10
1_1_0     3
1_0_1     2
0_1_1     2
1_1_1     1
Name: count, dtype: int64

🔹 TEST combo distribution:
combo
0_1_0    17
0_0_1    14
1_0_0    10
0_1_1     3
1_1_0     2
1_0_1     2
1_1_1     1
Name: count, dtype: int64


In [None]:
# Number of annotations in `gdf_valid` whose flag equals 1, per PV type.
num_normal = gdf_valid['PV_normal'].eq(1).sum()
num_heater = gdf_valid['PV_heater'].eq(1).sum()
num_pool = gdf_valid['PV_pool'].eq(1).sum()

for label, count in (
    ("PV_normal", num_normal),
    ("PV_heater", num_heater),
    ("PV_pool", num_pool),
):
    print(f"{label}: {count}")

PV_normal: 334
PV_heater: 307
PV_pool: 236
