# **DATASET GENERATION**

In [1]:
import geopandas as gpd
import numpy as np
import pandas as pd
import rasterio
from shapely.geometry import box
from shapely import Polygon, MultiPolygon
from PIL import Image
import shutil
import os
import glob
from natsort import natsorted
from sklearn.model_selection import train_test_split

In [2]:
from google.colab import drive

# Colab only: mount Google Drive so the "/content/drive/My Drive/..." paths
# used by the following cells resolve.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
import os

# Root of the mounted Google Drive; every input and output path hangs off it.
base = "/content/drive/My Drive"

# Orthophoto raster the patches are cut from.
image_path = os.path.join(base, "Input_data_for_datasat_generation/Orthophotos/merged_RGB.tif")
# Shapefile holding the labelled roof polygons.
labeled_file_path = os.path.join(base, "Input_data_for_datasat_generation/Goslar_roof_labels_GT/labeled_goslar_roofs.shp")
# Folder that receives the generated patches.
output_path = os.path.join(base, "Goslar_dataset/")
# Sanity check: show whether the raster is reachable, and under which path.
print(os.path.exists(image_path))
print(image_path)



True
/content/drive/My Drive/Input_data_for_datasat_generation/Orthophotos/merged_RGB.tif


In [4]:
# Edge length, in pixels, of every square ground-truth patch
# (the images will have a resolution of 128 x 128 px).
cut_size = 128

# Original label IDs that are ignored because the label is unknown or no
# data is available for them.
class_exceptions = [0, 1, 5, 12]

# Original label ID (as a string) -> consecutive label used for the dataset.
new_class_dictionary = {
    '2': 0, '3': 1, '4': 2, '6': 3, '7': 4,
    '8': 5, '9': 6, '10': 7, '11': 8, '13': 9,
}

# Class names; the list position is the consecutive label from the mapping
# above. An earlier four-class variant used the German names
# ['Schieferplatten', 'Tonziegel', 'Zementgebunden', 'Flachdachbelaege'].
classes = [
    'Slate',                # 0
    'Sandstone',            # 1
    'Clay',                 # 2
    'Copper',               # 3
    'Zinc',                 # 4
    'Aluminium',            # 5
    'Tar',                  # 6
    'Cement',               # 7
    'Glass',                # 8
    'Flat_roof_coverings',  # 9
]

# Only labels 0, 2, 7 and 9 have a sufficient number of images, so the
# reduced class list used later is
# ['Slate', 'Clay', 'Cement', 'Flat_roof_coverings'].

# Seed for shuffling.
SEED = 42

In [None]:
def check_bbox(x1, y1, x2, y2, H, W, cut_size):
    """Shift a square crop window so that it starts inside the image.

    Only the top-left corner of the incoming box is used. The bottom-right
    corner is always recomputed as ``corner + cut_size``, so the incoming
    ``x2``/``y2`` are ignored and kept only for call compatibility.

    Args:
        x1, y1: Proposed top-left pixel (column, row) of the window.
        x2, y2: Ignored; recomputed from the adjusted top-left corner.
        H, W: Image height and width in pixels.
        cut_size: Edge length of the square window in pixels.

    Returns:
        Tuple ``(x1, y1, x2, y2)`` with ``x2 - x1 == y2 - y1 == cut_size``.
        ``x1`` and ``y1`` are never negative. If the image is smaller than
        ``cut_size`` along an axis, the window starts at 0 on that axis and
        extends past the image edge.
    """
    # Pull the window back from the right/bottom edge first, then clamp at 0.
    # The final clamp matters when the image is smaller than the window:
    # W - cut_size is negative there, and a negative start would make a
    # numpy slice wrap around and select the wrong region.
    x1 = max(0, min(x1, W - cut_size))
    y1 = max(0, min(y1, H - cut_size))

    return x1, y1, x1 + cut_size, y1 + cut_size

In [None]:
def _exterior_ring_coords(geometry):
    """Return the exterior-ring x and y coordinates of a Polygon or MultiPolygon.

    For a MultiPolygon the exterior rings of all parts are concatenated in
    order. Returns ``None`` for any other geometry type so the caller can skip it.
    """
    if isinstance(geometry, Polygon):
        xs, ys = geometry.exterior.coords.xy
        return list(xs), list(ys)
    if isinstance(geometry, MultiPolygon):
        xs, ys = [], []
        for part in geometry.geoms:
            part_xs, part_ys = part.exterior.coords.xy
            xs.extend(part_xs)
            ys.extend(part_ys)
        return xs, ys
    return None


def create_masks_and_polygons(image_path, labeled_path, output_dir, cut_size, class_exceptions, class_mapping):
    """Cut one square RGB patch per labelled roof polygon and record its outline.

    For every usable row of the vector file, a ``cut_size`` x ``cut_size``
    window centred on the polygon centroid (adjusted by ``check_bbox`` near the
    raster border) is cut from the first three raster bands and saved as
    ``<output_dir>/<label>/class_<label>_<x1>_<y1>_<x2>_<y2>.png``. The class
    folder is created if it does not exist yet.

    A row is skipped when its ``Klasse`` value is missing, is listed in
    ``class_exceptions`` or has no entry in ``class_mapping``, or when its
    geometry is missing, empty or neither a Polygon nor a MultiPolygon.

    Args:
        image_path: Path of the raster opened with rasterio.
        labeled_path: Path of the vector file read with geopandas; it must
            provide the columns ``Klasse`` and ``geometry``.
        output_dir: Directory that receives one sub-folder per label.
        cut_size: Edge length of the square patch in pixels.
        class_exceptions: Original class IDs (ints) to ignore.
        class_mapping: Dict mapping the original class ID as a string to the
            label used for the folder and file name.

    Returns:
        pandas.DataFrame with one row per saved patch and the columns
        ``filename``, ``class``, ``original_polygon_points`` (exterior ring in
        raster pixel coordinates) and ``polygon_points_in_patch`` (the same
        points relative to the patch origin; they can fall outside the patch
        when the roof is larger than ``cut_size``).
    """
    gdf = gpd.read_file(labeled_path)
    results = []
    skipped = 0

    with rasterio.open(image_path) as src:
        # The whole raster is read once; patches are sliced from this array.
        bands = src.read()
        print(src.height, src.width, bands.shape[0])

        # World -> pixel transform; loop-invariant, so invert it only once.
        to_pixel = ~src.transform

        for _, row in gdf.iterrows():
            raw_class = row['Klasse']
            # Check for a missing label BEFORE int(): int(None) / int(nan) raise.
            if pd.isna(raw_class):
                skipped += 1
                continue
            original_class = int(raw_class)
            if original_class in class_exceptions:
                continue

            # Labels must be consecutive and ascending, hence the remapping.
            class_label = class_mapping.get(str(original_class))
            if class_label is None:
                # Unmapped class: do not write it into a folder called "None".
                skipped += 1
                continue

            polygon = row['geometry']
            if polygon is None or polygon.is_empty:
                skipped += 1
                continue
            ring = _exterior_ring_coords(polygon)
            if ring is None:
                skipped += 1
                continue
            xx, yy = ring

            # Centre of the roof polygon in pixel coordinates.
            centroid = polygon.centroid
            centroid_pixel = to_pixel * (centroid.x, centroid.y)
            xc, yc = int(centroid_pixel[0]), int(centroid_pixel[1])

            # Window of size cut_size around the centre, kept inside the raster.
            x1 = int(xc - cut_size / 2)
            y1 = int(yc - cut_size / 2)
            x1, y1, x2, y2 = check_bbox(
                x1, y1, x1 + cut_size, y1 + cut_size, src.height, src.width, cut_size
            )

            # bands is (band, row, col); keep the first three bands as 8-bit
            # values and reorder to (row, col, band) for PIL.
            image_cutout = bands[:, y1:y2, x1:x2]
            rgb_image = np.clip(image_cutout[:3, :, :], 0, 255).astype(np.uint8)
            rgb_image = np.transpose(rgb_image, (1, 2, 0))

            # The file name carries the label and the window coordinates.
            class_folder = os.path.join(output_dir, str(class_label))
            os.makedirs(class_folder, exist_ok=True)
            output_filename = os.path.join(
                class_folder, f"class_{class_label}_{x1}_{y1}_{x2}_{y2}.png"
            )
            Image.fromarray(rgb_image).save(output_filename)

            # Polygon outline in raster pixels and relative to the patch origin.
            polygon_pixels = []
            for x, y in zip(xx, yy):
                px_x, px_y = to_pixel * (x, y)
                polygon_pixels.append((int(px_x), int(px_y)))
            polygon_pixels_in_patch = [(px - x1, py - y1) for px, py in polygon_pixels]

            results.append({
                'filename': output_filename,
                'class': class_label,
                'original_polygon_points': polygon_pixels,
                'polygon_points_in_patch': polygon_pixels_in_patch
            })

    if skipped:
        print(f"Skipped {skipped} rows with a missing/unmapped class or unusable geometry")

    # Explicit columns keep the schema stable even when nothing was saved.
    results_df = pd.DataFrame(
        results,
        columns=['filename', 'class', 'original_polygon_points', 'polygon_points_in_patch'],
    )
    return results_df

In [None]:
# Create class folders 0–9
# for i in range(10):
#     class_dir = os.path.join(output_path, str(i))
#     os.makedirs(class_dir, exist_ok=True)

# results_df = create_masks_and_polygons(image_path, labeled_file_path, output_path, cut_size, class_exceptions, new_class_dictionary)
# results_df

20000 20000 3


Unnamed: 0,filename,class,original_polygon_points,polygon_points_in_patch
0,/content/gdrive/My Drive/Goslar_dataset/2/clas...,2,"[(10425, 13053), (10437, 13050), (10445, 13061...","[(10, 35), (22, 32), (30, 43), (41, 34), (55, ..."
1,/content/gdrive/My Drive/Goslar_dataset/9/clas...,9,"[(12674, 12531), (12683, 12530), (12694, 12529...","[(53, 58), (62, 57), (73, 56), (75, 68), (56, ..."
2,/content/gdrive/My Drive/Goslar_dataset/2/clas...,2,"[(12640, 12609), (12610, 12621), (12600, 12597...","[(78, 57), (48, 69), (38, 45), (66, 33), (78, ..."
3,/content/gdrive/My Drive/Goslar_dataset/7/clas...,7,"[(12679, 12494), (12641, 12499), (12639, 12475...","[(81, 58), (43, 63), (41, 39), (78, 35), (81, ..."
4,/content/gdrive/My Drive/Goslar_dataset/7/clas...,7,"[(12734, 12624), (12718, 12575), (12727, 12572...","[(72, 88), (56, 39), (65, 36), (71, 54), (81, ..."
...,...,...,...,...
2524,/content/gdrive/My Drive/Goslar_dataset/7/clas...,7,"[(10460, 11592), (10397, 11622), (10386, 11600...","[(95, 47), (32, 77), (21, 55), (47, 43), (47, ..."
2525,/content/gdrive/My Drive/Goslar_dataset/2/clas...,2,"[(10851, 11410), (10893, 11392), (10904, 11419...","[(41, 71), (83, 53), (94, 80), (70, 90), (53, ..."
2526,/content/gdrive/My Drive/Goslar_dataset/0/clas...,0,"[(9810, 10164), (9979, 10034), (9984, 10041), ...","[(-108, 169), (61, 39), (66, 46), (79, 62), (7..."
2527,/content/gdrive/My Drive/Goslar_dataset/2/clas...,2,"[(9993, 11010), (9982, 11044), (9952, 11007), ...","[(62, 58), (51, 92), (21, 55), (45, 36), (62, ..."


# **TRAINING / VALIDATION / TEST SPLIT (dealing with class imbalance)**

In [4]:
# Reduced four-class list; this rebinds the ten-name `classes` list defined
# earlier in the notebook. The list position is the class-folder name.
#           0,       1,        2 ,        3
classes= ['Slate', 'Clay', 'Cement', 'Flat_roof_coverings']
def count_images_by_class(output_path, classes, image_ext=".png"):
    """Print how many images each class folder contains, plus the total.

    The class at position ``i`` of ``classes`` is looked up in the sub-folder
    ``<output_path>/<i>``. Files are matched by extension, ignoring case.

    Args:
        output_path: Dataset root containing the numbered class folders.
        classes: Class names; the list position is the folder name.
        image_ext: File extension to count (default ``".png"``).

    Returns:
        None. The distribution is only printed.
    """
    total_images = 0
    # File names are lowercased before matching, so the extension must be too.
    image_ext = image_ext.lower()

    print("\n Dataset Image Distribution:\n")

    for class_id, class_name in enumerate(classes):
        class_folder = os.path.join(output_path, str(class_id))

        # isdir rather than exists: a plain file with that name cannot be listed.
        if not os.path.isdir(class_folder):
            print(f"{class_name:25s} : Folder not found")
            continue

        num_images = sum(
            1 for entry in os.listdir(class_folder)
            if entry.lower().endswith(image_ext)
        )
        total_images += num_images

        # Print the class id itself so the number always matches the folder
        # name, even when an earlier class folder is missing.
        print(f"{class_id} {class_name:25s} : {num_images}")

    print("\n----------------------------------")
    print(f"Total images in dataset : {total_images}\n")


# Print the per-class image counts for the four classes listed in `classes`.
count_images_by_class("/content/drive/My Drive/Goslar_dataset", classes)



 Dataset Image Distribution:

0 Slate                     : 502
1 Clay                      : 1215
2 Cement                    : 420
3 Flat_roof_coverings       : 211

----------------------------------
Total images in dataset : 2348



In [5]:
import os
import random
import shutil
import glob


from tqdm import tqdm

import numpy as np
import cv2 as cv


In [6]:
def augment_image(img):
    """Return a randomly rotated, optionally mirrored, brightness-shifted copy.

    Three draws are taken from the global ``random`` generator, in this order:
    a rotation angle in [-20, 20] degrees, a coin flip for horizontal
    mirroring, and an integer brightness offset in [-30, 30].

    Args:
        img: Image array as returned by ``cv.imread``.

    Returns:
        Array of dtype ``uint8`` with the same height and width as ``img``.
    """
    height, width = img.shape[:2]

    # Rotate about the (integer) image centre; the border is filled by
    # reflection.
    rotation_deg = random.uniform(-20, 20)
    rotation = cv.getRotationMatrix2D((width // 2, height // 2), rotation_deg, 1.0)
    augmented = cv.warpAffine(img, rotation, (width, height), borderMode=cv.BORDER_REFLECT)

    # Mirror left-right in about half of the cases.
    mirror = random.random() > 0.5
    if mirror:
        augmented = cv.flip(augmented, 1)

    # Shift brightness in a wider integer type so the sum cannot wrap, then
    # clamp back into the 8-bit range.
    offset = random.randint(-30, 30)
    shifted = augmented.astype(np.int16) + offset
    return np.clip(shifted, 0, 255).astype(np.uint8)


In [7]:
def split_and_balance_goslar_dataset(
    input_root,
    output_root,
    n_test=50,
    n_val=50,
    n_train=500,
    seed=42
):
    """Split every class folder into train/val/test and balance the train set.

    Each sub-directory of ``input_root`` is treated as one class. Its ``*.png``
    files are sorted, shuffled with the seeded global ``random`` generator and
    cut into three disjoint slices: the first ``n_test`` files go to test, the
    next ``n_val`` to validation, and the rest form the training pool. Up to
    ``n_train`` pool images are copied to train. If the pool is smaller, the
    remainder is produced with ``augment_image`` from randomly chosen pool
    images, so augmented images are only ever derived from training images.

    Output goes to ``<output_root>/<split>/<class>/``. Files already present
    there are overwritten when names collide but are otherwise not removed.

    Args:
        input_root: Directory with one sub-directory per class.
        output_root: Directory that receives the ``train``/``val``/``test`` trees.
        n_test: Number of test images per class.
        n_val: Number of validation images per class.
        n_train: Number of training images per class after balancing.
        seed: Seed passed to ``random.seed``; note that this reseeds the
            module-level generator.

    Raises:
        ValueError: If a class has fewer than ``n_test + n_val`` images, or has
            no image left for training although ``n_train`` is positive.
        OSError: If an image cannot be read for, or written after, augmentation.
    """
    random.seed(seed)
    os.makedirs(output_root, exist_ok=True)

    for split in ("train", "val", "test"):
        os.makedirs(os.path.join(output_root, split), exist_ok=True)

    class_dirs = sorted(
        d for d in os.listdir(input_root)
        if os.path.isdir(os.path.join(input_root, d))
    )

    for cls in class_dirs:
        print(f"\n📂 Processing class {cls}")

        # Sort before shuffling so the result depends only on the seed, not on
        # the order in which the file system lists the files.
        images = sorted(glob.glob(os.path.join(input_root, cls, "*.png")))
        random.shuffle(images)

        if len(images) < (n_test + n_val):
            raise ValueError(
                f"Class {cls} has only {len(images)} images — "
                f"not enough for test+val"
            )

        # Disjoint slices of the shuffled list: test, validation, training pool.
        test_imgs = images[:n_test]
        val_imgs = images[n_test:n_test + n_val]
        train_pool = images[n_test + n_val:]

        # Fail before anything is written: with an empty pool there is nothing
        # to copy or augment, and random.choice([]) would raise IndexError.
        if n_train > 0 and not train_pool:
            raise ValueError(
                f"Class {cls} has only {len(images)} images — "
                f"none left for training after test+val"
            )

        train_dir = os.path.join(output_root, "train", cls)
        val_dir = os.path.join(output_root, "val", cls)
        test_dir = os.path.join(output_root, "test", cls)
        for directory in (train_dir, val_dir, test_dir):
            os.makedirs(directory, exist_ok=True)

        # Test and validation images are copied unchanged.
        for i, img in enumerate(test_imgs):
            shutil.copy(img, os.path.join(test_dir, f"{cls}_{i:04d}.png"))

        for i, img in enumerate(val_imgs):
            shutil.copy(img, os.path.join(val_dir, f"{cls}_{i:04d}.png"))

        # Original training images, at most n_train of them.
        train_count = 0
        for img_path in train_pool[:max(n_train, 0)]:
            shutil.copy(
                img_path,
                os.path.join(train_dir, f"{cls}_{train_count:05d}.png")
            )
            train_count += 1

        # Top up with augmented copies; numbering continues after the originals.
        while train_count < n_train:
            src_img_path = random.choice(train_pool)
            img = cv.imread(src_img_path)
            # cv.imread signals an unreadable file by returning None.
            if img is None:
                raise OSError(f"Could not read image for augmentation: {src_img_path}")

            save_path = os.path.join(
                train_dir, f"{cls}_aug_{train_count:05d}.png"
            )
            # cv.imwrite signals failure through its boolean return value.
            if not cv.imwrite(save_path, augment_image(img)):
                raise OSError(f"Could not write augmented image: {save_path}")

            train_count += 1

        print(
            f" Class {cls} → "
            f"Train: {train_count}, "
            f"Val: {len(val_imgs)}, "
            f"Test: {len(test_imgs)}"
        )




In [8]:
# Source: the per-class patch folders. Target: a sibling folder that receives
# the train/val/test trees.
input_root = "/content/drive/My Drive/Goslar_dataset"
output_root = "/content/drive/My Drive/Goslar_dataset_split"

# Run with the function defaults: 50 test, 50 val and 500 train images per
# class, seed 42.
split_and_balance_goslar_dataset(
    input_root=input_root,
    output_root=output_root
)



📂 Processing class 0
✅ Class 0 → Train: 500, Val: 50, Test: 50

📂 Processing class 1
✅ Class 1 → Train: 500, Val: 50, Test: 50

📂 Processing class 2
✅ Class 2 → Train: 500, Val: 50, Test: 50

📂 Processing class 3
✅ Class 3 → Train: 500, Val: 50, Test: 50

🎉 DATASET READY — NO LEAKAGE, PERFECTLY BALANCED
