# Dataset selection pour notebook 04b

## Imports

In [None]:
import os
import numpy as np
import geopandas as gpd
from shapely.geometry import Polygon
import pandas as pd
from pathlib import Path
from tqdm.auto import tqdm
import logging
import datetime
import matplotlib.pyplot as plt
import seaborn as sns
import subprocess


## Variables

In [None]:
# Run timestamp (YYYYmmdd-HHMMSS); it is embedded in the PNG output folder name
run_started_at = datetime.datetime.now()
todays_date = run_started_at.strftime("%Y%m%d-%H%M%S")

# --- Inputs ---
GEOPARQUET_CLASSIFICATION_PATH = "data/notebook_02/parquet/02_gdf_toiture_7_classification.parquet"
SPLIT_GEOTIFF_1024_PARQUET = "data/notebook_04/geotiff/tile_1024_split/combined_metadata.parquet"
GPKG_CAD_COMMUNE = "data/SITG/CAD_COMMUNE_2024-11-03.gpkg"

# --- Sampling ---
# Maximum number of tiles kept per (dominant class, area bin) group
SAMPLES_PER_CAT = 8

# --- Outputs ---
GRAPHICS_PATH = "data/notebook_06/graphics"
GPKG_SELECTION_DATASET = "data/notebook_06/parquet/06a_02_dataset.gpkg"
PNG_OUTPUT_PATH = f"data/notebook_06/dataset_{todays_date}"

# Switch for the final GeoTIFF -> PNG export step
CONVERT_GEOTIFF_TO_PNG = True

# Only the PNG folder is created here; the other output folders must already exist
os.makedirs(PNG_OUTPUT_PATH, exist_ok=True)

In [None]:
# Fail early when the classification file or one of the output folders is missing
for required_path in (GEOPARQUET_CLASSIFICATION_PATH, GRAPHICS_PATH, PNG_OUTPUT_PATH):
    assert os.path.exists(required_path)

## Charger données

In [None]:
# Load the roof classification table from the GeoParquet file
gdf_toiture_7_classification = gpd.read_parquet(GEOPARQUET_CLASSIFICATION_PATH)

In [None]:
# Every row must carry an egid
assert(gdf_toiture_7_classification["egid"].isna().sum() == 0)
# NOTE(review): this is a bare comparison, not an assert — it only evaluates to
# True/False (the cell output in a notebook). Confirm whether duplicated egid
# values are expected before turning it into a hard check.
gdf_toiture_7_classification["egid"].duplicated().sum() == 0

In [None]:
# Number of distinct egid values in the classification table
len(gdf_toiture_7_classification["egid"].unique())

## Nettoyage et enrichissement des données

### geotiff parquet

In [None]:
# Load the combined metadata table of the split GeoTIFF tiles
df_split_geotiff = pd.read_parquet(SPLIT_GEOTIFF_1024_PARQUET)

In [None]:
# Inspect the column dtypes of the tile metadata table
df_split_geotiff.dtypes

In [None]:
# Inspect the column dtypes of the classification table (compare with the tile metadata before merging)
gdf_toiture_7_classification.dtypes

### sia_cat

In [None]:
# Join keys: every column name present in both tables, except the geometry column
common_columns = [
    column
    for column in gdf_toiture_7_classification.columns
    if column in df_split_geotiff.columns and column != "geometry"
]

# Left join: keep every classification row and attach the matching tile metadata.
# pd.merge does not modify its inputs, so df_split_geotiff is passed directly.
df_split_geotiff_sia_cat = pd.merge(
    gdf_toiture_7_classification,
    df_split_geotiff,
    how="left",
    on=common_columns,
)

In [None]:
# Preview the first rows of the merged table
df_split_geotiff_sia_cat.head(5)

In [None]:
# Ensure all roofs are associated with a tile.
# Rows with a null tile_path are collected (largest roof area first) so that a
# failing check reports which roofs are affected instead of a bare AssertionError.
roofs_without_tile = (
    df_split_geotiff_sia_cat.loc[
        df_split_geotiff_sia_cat["tile_path"].isnull(),
        ["egid", "SHAPE__Area", "globalid", "sia_cat", "tile_id"],
    ]
    .sort_values(by=["SHAPE__Area"], ascending=False)
)
assert roofs_without_tile.empty, (
    f"{len(roofs_without_tile)} roof(s) have no tile_path (largest first):\n"
    f"{roofs_without_tile.head(10)}"
)

In [None]:
# Persist the merged (classification + tile metadata) table as Parquet.
# NOTE(review): the output path is hard-coded here rather than declared with the
# other path constants at the top of the notebook.
df_split_geotiff_sia_cat.to_parquet("data/notebook_06/06a_02_split_geotiff_sia_cat.parquet")

## Sélection du dataset

In [None]:
# Work on a shuffled version of the enriched table: sample(frac=1) returns a new,
# fully permuted frame (fixed seed for reproducibility), re-indexed from 0
df_selection = (
    df_split_geotiff_sia_cat
    .sample(frac=1, random_state=42)
    .reset_index(drop=True)
)

# Quick look at the content and the available columns
display(df_selection.head(5))
display(df_selection.columns)

### Analyse

In [None]:
# Number of distinct egid per tile, largest first
df_selection.groupby("tile_id")["egid"].nunique().reset_index().sort_values(by=["egid"], ascending=False)

In [None]:
# Sum of SHAPE__Area per tile_id, largest first
df_selection.groupby("tile_id")["SHAPE__Area"].sum().reset_index().sort_values(by=["SHAPE__Area"], ascending=False)

In [None]:
# Number of distinct SIA categories (sia_cat) per tile, largest first
df_selection.groupby("tile_id")["sia_cat"].nunique().reset_index().sort_values(by=["sia_cat"], ascending=False)

In [None]:
# Per-tile statistics broadcast back onto every row of the tile:
#   new column -> (source column, reduction applied within each tile_id)
per_tile_statistics = {
    "egid_per_tile": ("egid", "nunique"),                  # distinct egid in the tile
    "sia_cat_per_tile": ("sia_cat", "nunique"),            # distinct SIA categories in the tile
    "SHAPE__Area_sum_per_tile": ("SHAPE__Area", "sum"),    # total SHAPE__Area in the tile
}

for new_column, (source_column, reduction) in per_tile_statistics.items():
    df_selection[new_column] = (
        df_selection.groupby("tile_id")[source_column].transform(reduction)
    )

### Sélection SIA_cat + area_bins

In [None]:
from collections import Counter

# Collapse the row-level table to one row per tile: identifiers, geometries and
# SIA categories are kept as lists, altitudes are averaged, tile attributes take
# the first value of the tile, and SHAPE__Area is summed over the tile.
tile_aggregations = {
    'globalid': list,
    'geometry_x': list,
    'sia_cat': list,
    'altitude_min': 'mean',
    'altitude_max': 'mean',
    'date_leve': 'first',
    'tile_path': 'first',
    'tile_bounds': 'first',
    "SHAPE__Area": 'sum',
}
tile_groups = df_selection.groupby('tile_id').agg(tile_aggregations).reset_index()

# Dominant class = most frequent SIA category among the rows of the tile
tile_groups['dominant_class'] = [
    Counter(categories).most_common(1)[0][0]
    for categories in tile_groups['sia_cat']
]

# Hand-picked area classes applied to the summed SHAPE__Area of each tile
area_bins = [0, 200, 500, 1000, 2000, 5000, np.inf]
area_labels = [
    '0-200', '200-500', '500-1000',
    '1000-2000', '2000-5000', '5000+'
]
tile_groups['area_bin'] = pd.cut(
    tile_groups['SHAPE__Area'],
    bins=area_bins,
    labels=area_labels,
)

print("Manual Bins:")
print(tile_groups['area_bin'].value_counts().sort_index())


In [None]:
# Sample up to SAMPLES_PER_CAT tiles per dominant_class and area_bin
def sample_tiles(group, n_samples):
    """Return at most ``n_samples`` rows of ``group``.

    Groups that already fit within the limit are returned unchanged (same
    object); larger groups are reduced with a seeded random draw
    (random_state=42), so repeated runs pick the same rows.
    """
    if len(group) <= n_samples:
        return group
    return group.sample(n=n_samples, random_state=42)

# Draw the per-group sample for every (dominant_class, area_bin) combination,
# then drop the group index so the result is a flat table indexed from 0
tiles_by_class_and_area = tile_groups.groupby(['dominant_class', 'area_bin'])
sampled_df = tiles_by_class_and_area.apply(sample_tiles, n_samples=SAMPLES_PER_CAT)
sampled_df = sampled_df.reset_index(drop=True)

# Convert tile_bounds to Polygon geometry
def convert_bounds_to_polygon(bounds):
    """Build the rectangular Polygon described by ``bounds``.

    ``bounds`` is either a sequence ``(minx, miny, maxx, maxy)`` or its string
    form ``"(minx, miny, maxx, maxy)"`` (comma-separated, optional parentheses).

    Raises:
        ValueError: if the bounds do not hold exactly four values, or if a
            string component cannot be parsed as a float.
    """
    if isinstance(bounds, str):
        coords = tuple(float(part) for part in bounds.strip("()").split(","))
    else:
        coords = bounds

    if len(coords) != 4:
        raise ValueError(f"Invalid bounds format: {coords}")

    minx, miny, maxx, maxy = coords
    # Corner order: lower-left, lower-right, upper-right, upper-left
    corners = [(minx, miny), (maxx, miny), (maxx, maxy), (minx, maxy)]
    return Polygon(corners)

# Footprint polygon of each sampled tile, derived from its tile_bounds
sampled_df['geometry'] = sampled_df['tile_bounds'].map(convert_bounds_to_polygon)

# Wrap the sample in a GeoDataFrame (EPSG:2056) and write it to the
# "sampled_tiles" layer of the selection GeoPackage
sampled_gdf = gpd.GeoDataFrame(sampled_df, crs='EPSG:2056', geometry='geometry')
sampled_gdf.to_file(GPKG_SELECTION_DATASET, layer="sampled_tiles", driver="GPKG")


## Visualisation
### Graphiques

In [None]:
# Number of tiles for each dominant SIA class
class_counts = tile_groups['dominant_class'].value_counts().reset_index()
class_counts.columns = ['Classe SIA', 'Nombre de tiles']
total_tiles = class_counts['Nombre de tiles'].sum()

sns.set_style("whitegrid")
plt.figure(figsize=(6.5, 5.5))

# One bar per class, ordered by class label
plot = sns.barplot(
    data=class_counts.sort_values(by='Classe SIA'),
    x='Classe SIA',
    y='Nombre de tiles',
    palette='pastel',
)

# Slant the class names so that long labels do not overlap
plot.set_xticklabels(plot.get_xticklabels(), rotation=45, horizontalalignment='right')

plot.set_title("Distribution des classes SIA dominantes par tuile", fontsize=12, pad=10, fontweight='bold')
plot.set_xlabel("Classe SIA", fontsize=10)
plot.set_ylabel("Nombre de tuiles", fontsize=10)
plt.xticks(fontsize=10)
plt.yticks(fontsize=10)

# Write each bar's count just above its top edge
for bar in plot.patches:
    bar_height = bar.get_height()
    bar_center = bar.get_x() + bar.get_width() / 2.
    plot.annotate(
        f'{int(bar_height)}',
        (bar_center, bar_height),
        xytext=(0, 5),
        textcoords='offset points',
        ha='center',
        va='bottom',
        fontsize=10,
        color='black',
    )

sns.despine()
plt.tight_layout()
plt.grid(False)

# Save before show so the rendered figure is what gets written to disk
plt.savefig(os.path.join(GRAPHICS_PATH, "06a_01_dominant_class_distribution.png"), dpi=300, bbox_inches='tight')
plt.show()


In [None]:
plt.close()

# Number of tiles in each area bin, in bin order
area_counts = tile_groups['area_bin'].value_counts().reset_index()
area_counts.columns = ['Area Bin', 'Nombre de tiles']
area_counts = area_counts.sort_values(by='Area Bin')
total_tiles = area_counts['Nombre de tiles'].sum()

sns.set_style("whitegrid")
plt.figure(figsize=(6.5, 4.5))

plot = sns.barplot(
    data=area_counts,
    x='Area Bin',
    y='Nombre de tiles',
    palette='pastel',
)

plot.set_title("Distribution des tuiles par intervalle de surface", fontsize=12, fontweight='bold', pad=10)
plot.set_xlabel("Somme des surfaces par tuile en m²", fontsize=10)
plot.set_ylabel("Nombre de tuiles", fontsize=10)
plot.tick_params(axis='both', labelsize=10)
# Keep any axis offset text at the same size as the tick labels
for axis in (plot.xaxis, plot.yaxis):
    axis.get_offset_text().set_fontsize(10)

# Write each bar's count just above its top edge
for bar in plot.patches:
    bar_height = bar.get_height()
    bar_center = bar.get_x() + bar.get_width() / 2.
    plot.annotate(
        f'{int(bar_height)}',
        (bar_center, bar_height),
        xytext=(0, 5),
        textcoords='offset points',
        ha='center',
        va='bottom',
        fontsize=10,
        color='black',
    )

sns.despine()
plt.tight_layout()
plt.grid(False)

# Save before show so the rendered figure is what gets written to disk
plt.savefig(os.path.join(GRAPHICS_PATH, "06a_02_area_bin_distribution.png"), dpi=300, bbox_inches='tight')
plt.show()


In [None]:
plt.close()

sns.set_style("whitegrid")

# Sampled tiles per (dominant class, area bin): rows = classes, columns = bins
area_order = ['0-200', '200-500', '500-1000', '1000-2000', '2000-5000', '5000+']
stacked_data = (
    sampled_df.groupby(['dominant_class', 'area_bin'])
    .size()
    .unstack()
    .fillna(0)
    # fill_value=0 keeps a bin that is absent from the sample at 0 instead of
    # NaN, which would otherwise break the running `bottom` sum further down
    .reindex(columns=area_order, fill_value=0)
)

colors = sns.color_palette("pastel", len(stacked_data.columns))

plt.figure(figsize=(6.5, 5.5))

# Keep the Axes of THIS figure: the axis tweaks below must not touch the axes
# object left over from a previously drawn chart
plot = stacked_data.plot(
    kind='bar',
    stacked=True,
    color=colors,
    ax=plt.gca()
)

plt.xticks(rotation=45, ha='right')

total_samples = stacked_data.sum().sum()
plt.title(f"Distribution du dataset des classes SIA dominantes\npar intervalle de surface (n={int(total_samples)})",
          fontsize=12, pad=20, fontweight='bold')
plt.xlabel("Classe SIA dominante", fontsize=10)
plt.ylabel("Nombre de tuiles", fontsize=10)
plt.tick_params(axis='both', labelsize=10)
plot.xaxis.get_offset_text().set_fontsize(10)
plot.yaxis.get_offset_text().set_fontsize(10)

plt.legend(
    title="Surface totale\n par tuile (m²)",
    bbox_to_anchor=(1.05, 1),
    loc='upper left',
    frameon=True,
    fancybox=True,
    shadow=True,
    fontsize=10,
    title_fontsize=10,
    reverse=True
)

# A bar holds at most SAMPLES_PER_CAT tiles per area bin; the extra 7 units leave
# room for the total written above the bar (0-55 with 8 samples and 6 bins)
plt.ylim(0, SAMPLES_PER_CAT * len(area_order) + 7)
sns.despine()
plt.grid(False)

# Annotate total per bar
for bar_index, total in enumerate(stacked_data.sum(axis=1)):
    plt.text(bar_index, total + 0.5, f'{int(total)}',
             ha='center', va='bottom', fontsize=10, color='black')

# Annotate segments that did not reach the per-group quota (SAMPLES_PER_CAT).
# Labels alternate right/left of the bar so neighbouring segments do not collide.
label_offset = 0.4
for bar_index, (_, row) in enumerate(stacked_data.iterrows()):
    bottom = 0
    annotation_count = 0
    for count in row:
        if 0 < count < SAMPLES_PER_CAT:
            segment_middle = bottom + count / 2
            side = 1 if annotation_count % 2 == 0 else -1
            plt.text(bar_index + side * label_offset, segment_middle, f'{int(count)}',
                     ha='center', va='center', fontsize=9,
                     color='black', fontweight='demibold')
            annotation_count += 1
        bottom += count

plt.tight_layout()
plt.savefig(os.path.join(GRAPHICS_PATH, "06a_03_stacked.png"), dpi=300, bbox_inches='tight')
plt.show()


### Vue en plan

In [None]:
def bounds_to_polygon(bounds_str):
    """Convert tile bounds to a rectangular Polygon.

    Args:
        bounds_str: either the string form ``"(minx, miny, maxx, maxy)"``
            (comma-separated, spaces and parentheses optional) or a sequence of
            at least four numbers. Only the first four values are used.

    Returns:
        The Polygon with corners (minx, miny), (maxx, miny), (maxx, maxy),
        (minx, maxy), or ``None`` when the bounds cannot be interpreted. The
        failure is logged and never raised, so a whole-column ``apply`` does
        not abort on a single bad value.
    """
    try:
        if isinstance(bounds_str, str):
            # Split on "," only: float() tolerates the surrounding whitespace
            raw_values = bounds_str.strip("()").split(",")
        else:
            raw_values = bounds_str
        coords = [float(value) for value in raw_values]
        if len(coords) < 4:
            raise ValueError("fewer than four bounds values")
    except (TypeError, ValueError):
        logging.error(f"Error converting bounds to polygon: {bounds_str}")
        return None

    minx, miny, maxx, maxy = coords[:4]
    return Polygon([
        (minx, miny),
        (maxx, miny),
        (maxx, maxy),
        (minx, maxy)
    ])

sns.set_style("white")
sns.set_context("notebook", font_scale=1.2)

# Add geometry column (tile footprint derived from tile_bounds)
tile_groups['geometry'] = tile_groups['tile_bounds'].apply(bounds_to_polygon)

# Create GeoDataFrame
gdf = gpd.GeoDataFrame(tile_groups, geometry='geometry', crs="EPSG:2056")

# One fixed colour per dominant class, shared by the tiles and the legend
unique_classes = sorted(gdf['dominant_class'].unique())
palette = sns.color_palette('tab20', n_colors=len(unique_classes))
color_dict = dict(zip(unique_classes, palette))

# Plot tiles by dominant class.
# Only `color` is passed: combining it with `column=`/`legend=` makes geopandas
# ignore the column (with a warning), and the legend is built by hand below.
ax = gdf.plot(
    figsize=(15, 15),
    edgecolor='black',
    linewidth=0.3,
    alpha=0.7,
    color=gdf['dominant_class'].map(color_dict)
)

# Overlay commune boundaries
cad_commune = gpd.read_file(GPKG_CAD_COMMUNE, layer="CAD_COMMUNE")
cad_commune = cad_commune.to_crs("EPSG:2056")
cad_commune.plot(ax=ax, color="none", edgecolor="black", linewidth=0.2)

# Custom legend: one round marker per class, in the class colour
ax.legend(
    handles=[plt.Line2D([0], [0], marker='o', color='w', label=cls,
                        markerfacecolor=color_dict[cls], markersize=10) for cls in unique_classes],
    title="Classe SIA dominante",
    loc='upper left',
    bbox_to_anchor=(1, 1)
)

ax.set_title(f"Tile par catégorie SIA dominante (total {len(tile_groups)} tiles)", fontsize=16, pad=20)
ax.set_axis_off()
plt.tight_layout()

# Save BEFORE show: once plt.show() has returned, the figure may already be
# released and savefig would then write an empty image
plt.savefig(os.path.join(GRAPHICS_PATH, "06a_04_map_tiles_group.png"), dpi=300, bbox_inches='tight')
plt.show()
plt.close()


In [None]:
plt.close()

# Geometry from bounds
sampled_df['geometry'] = sampled_df['tile_bounds'].apply(bounds_to_polygon)

# GeoDataFrame
gdf = gpd.GeoDataFrame(sampled_df, geometry='geometry', crs="EPSG:2056")

unique_classes = sorted(gdf['dominant_class'].unique())

sns.set_style("whitegrid")

# One colour per class, in sorted class order.
# NOTE(review): only 12 colours are defined; a 13th class would get no colour.
vibrant_colors = [
    '#FF6B6B', '#4ECDC4', '#45B7D1', '#96CEB4', '#FECA57',
    '#FF9FF3', '#54A0FF', '#5F27CD', '#00D2D3', '#FF9F43',
    '#A3CB38', '#C44569'
]
# zip stops at the shorter input, so no explicit slicing of the palette is needed
color_dict = dict(zip(unique_classes, vibrant_colors))

fig, ax = plt.subplots(figsize=(6.5, 6.5), dpi=300)

# Plot tiles by dominant class.
# Only `color` is passed: geopandas ignores `column=` (with a warning) when both
# are given. Edge and face share the class colour so small tiles stay visible.
tile_colors = gdf['dominant_class'].map(color_dict)
gdf.plot(
    ax=ax,
    edgecolor=tile_colors,
    linewidth=2,
    alpha=1,
    color=tile_colors
)

# Plot commune boundaries
cad_commune = gpd.read_file(GPKG_CAD_COMMUNE, layer="CAD_COMMUNE")
cad_commune = cad_commune.to_crs("EPSG:2056")
cad_commune.plot(ax=ax, color="none", edgecolor="grey", linewidth=0.8, alpha=0.6, ls='--')

# Shortened labels for legend (class name -> display label)
class_labels = {
    'I habitat collectif': 'I Habitat collectif',
    'II habitat individuel': 'II Habitat individuel',
    'III administration': 'III Administration',
    'IV écoles': 'IV Écoles',
    'IX industrie': 'IX Industrie',
    'V commerce': 'V Commerce',
    'VI restauration': 'VI Restauration',
    'VII lieux de rassemblement': 'VII Lieux rassemblement',
    'VIII hôpitaux': 'VIII Hôpitaux',
    'X dépôts': 'X Dépôts',
    'XI installations sportives': 'XI Installations sportives',
    'XII piscines couvertes': 'XII Piscines couvertes'
}

# One square marker per class; unknown classes fall back to their raw name and
# labels longer than 25 characters are truncated with an ellipsis
legend_elements = []
for cls in unique_classes:
    label = class_labels.get(cls, cls)
    if len(label) > 25:
        label = label[:22] + "..."
    legend_elements.append(
        plt.Line2D([0], [0], marker='s', color='w', label=label,
                   markerfacecolor=color_dict[cls], markersize=8,
                   markeredgecolor='black', markeredgewidth=0.5)
    )

# Legend
legend = ax.legend(
    handles=legend_elements,
    title="Classe SIA dominante",
    loc='upper right',
    bbox_to_anchor=(0.35, 0.95),
    fontsize=9,
    title_fontsize=10,
)

ax.set_title(f"Tuiles par catégorie SIA dominante\n(total {len(sampled_df)} tuiles)", fontsize=12, pad=-10, fontweight='bold')
ax.set_axis_off()
plt.tight_layout()
plt.subplots_adjust(left=0.02, right=0.98, top=0.95, bottom=0.02)

# Save before show so the rendered figure is what gets written to disk
plt.savefig(os.path.join(GRAPHICS_PATH, "06a_05_map_sampled_df.png"), dpi=300, bbox_inches='tight')
plt.show()
plt.close()

In [None]:
import folium
import pyproj

# Coordinate systems
swiss_cs = pyproj.CRS("EPSG:2056")
wgs84_cs = pyproj.CRS("EPSG:4326")
# always_xy=True fixes the axis order to (east/lon, north/lat) on both sides
transformer = pyproj.Transformer.from_crs(swiss_cs, wgs84_cs, always_xy=True)


def swiss_to_wgs84(east, north):
    """Convert Swiss coordinates to WGS84 (lat, lon)."""
    lon, lat = transformer.transform(east, north)
    return [lat, lon]


def _parse_tile_bounds(bounds):
    """Return (minx, miny, maxx, maxy) from a bounds string or sequence.

    Strings are expected as comma-separated numbers, optionally wrapped in
    parentheses. Returns None when fewer than four values are available; extra
    values are ignored.
    """
    if isinstance(bounds, str):
        bounds = tuple(
            float(x) for x in bounds.strip('()').replace(' ', '').split(',') if x
        )
    if len(bounds) < 4:
        return None
    return tuple(bounds[:4])


# Single pass over the sampled tiles: parse each tile's bounds once and keep its
# four corners in WGS84. tile_id is unique per row here (the sample is drawn
# from a table grouped by tile_id), so rows can be iterated directly instead of
# being looked up again by id.
tile_corners = []  # (tile_id, sw, nw, ne, se)
for tile_id, raw_bounds in zip(sampled_df['tile_id'], sampled_df['tile_bounds']):
    parsed_bounds = _parse_tile_bounds(raw_bounds)
    if parsed_bounds is None:
        continue
    minx, miny, maxx, maxy = parsed_bounds
    tile_corners.append((
        tile_id,
        swiss_to_wgs84(minx, miny),  # south-west
        swiss_to_wgs84(minx, maxy),  # north-west
        swiss_to_wgs84(maxx, maxy),  # north-east
        swiss_to_wgs84(maxx, miny),  # south-east
    ))

all_sw_points = [sw for _, sw, _, _, _ in tile_corners]
all_ne_points = [ne for _, _, _, ne, _ in tile_corners]

# Centre the map on the mean of all SW/NE corners; fall back to a fixed
# location when no tile could be parsed
if tile_corners:
    avg_lat = np.mean([pt[0] for pt in all_sw_points + all_ne_points])
    avg_lon = np.mean([pt[1] for pt in all_sw_points + all_ne_points])
    print(f"Center of all tiles (lat, lon): {avg_lat}, {avg_lon}")
    m = folium.Map(location=[avg_lat, avg_lon], zoom_start=14)
else:
    print("No valid coordinates found")
    m = folium.Map(location=[46.2044, 6.1432], zoom_start=12)

# One rectangle per tile, with the tile id as tooltip
tiles_added = 0
for tile_id, sw, nw, ne, se in tile_corners:
    folium.Polygon(
        locations=[sw, nw, ne, se],
        color='blue',
        weight=1,
        fill=True,
        fill_opacity=0.4,
        tooltip=f"Tile ID: {tile_id}"
    ).add_to(m)
    tiles_added += 1

print(f"Added {tiles_added} tiles to the map")
m


## GeoTIFF to PNG for Roboflow

In [None]:
# Display the sampled tiles table before exporting it
sampled_df

In [None]:
def convert_tiff_to_png(sample_df, output_dir):
    """Export the GeoTIFF tiles listed in ``sample_df['tile_path']`` as PNG files.

    A fresh ``PNG_dataset_roboflow_<timestamp>`` folder is created inside
    ``output_dir``. Each distinct, non-null tile path is converted with the
    ``gdal_translate`` command-line tool. Missing sources, already existing
    targets and failed conversions are logged and skipped; they never abort the
    run. ``sample_df`` is finally written next to the images as
    ``sampled_tiles.csv``.

    Args:
        sample_df: table with a ``tile_path`` column holding GeoTIFF paths.
        output_dir: parent folder of the timestamped export folder.

    Returns:
        Path of the export folder that was created.
    """
    run_stamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
    output_dir = Path(output_dir) / f"PNG_dataset_roboflow_{run_stamp}"
    output_dir.mkdir(parents=True, exist_ok=True)
    logging.info(f"Output directory created: {output_dir}")

    # Distinct source files, normalised to Path objects
    tiff_files = [
        Path(entry) for entry in sample_df["tile_path"].dropna().unique().tolist()
    ]
    logging.info(f"Found {len(tiff_files)} unique TIFF files to process")

    for tiff_file in tqdm(tiff_files, desc="Converting to PNG"):
        if not tiff_file.exists():
            logging.warning(f"Source file not found: {tiff_file}")
            continue

        png_file = output_dir / f"{tiff_file.stem}.png"
        if png_file.exists():
            logging.info(f"Skipping {tiff_file.name} (already exists)")
            continue

        # NOTE(review): PREDICTOR and COMPRESS look like GeoTIFF creation
        # options — confirm that the PNG driver actually honours them.
        gdal_command = [
            "gdal_translate",
            "-of", "PNG",
            "-co", "ZLEVEL=9",
            "-co", "PREDICTOR=2",
            "-ot", "Byte",
            "-r", "nearest",
            "-co", "COMPRESS=PNG",
            str(tiff_file),
            str(png_file),
        ]
        try:
            subprocess.run(gdal_command, check=True, capture_output=True, text=True)
            logging.info(f"Converted {tiff_file.name}")
        except subprocess.CalledProcessError as e:
            # gdal_translate ran but returned a non-zero exit status
            logging.error(f"Error converting {tiff_file}: {e.stderr}")
        except Exception as e:
            # Anything else (e.g. the executable could not be started)
            logging.error(f"Unexpected error processing {tiff_file}: {str(e)}")

    logging.info(f"Conversion complete. Files saved to {output_dir}")
    sample_df.to_csv(output_dir / "sampled_tiles.csv", index=False)
    return output_dir


In [None]:
# Convert the sampled GeoTIFF tiles to PNG when the CONVERT_GEOTIFF_TO_PNG switch is on.
# The export folder path returned by convert_tiff_to_png is not kept.
if CONVERT_GEOTIFF_TO_PNG:
    convert_tiff_to_png(
        sampled_df,
        PNG_OUTPUT_PATH,
    )