# LCZ Modeling

## Import Libraries

In [None]:
import warnings
warnings.filterwarnings('ignore')
warnings.simplefilter('ignore')

import numpy as np
import xarray as xr
import pandas as pd
import rasterio as rio
from rasterio import mask
from rasterio.plot import show
from rasterio.mask import mask
from rasterio.warp import calculate_default_transform, reproject, Resampling
from rasterio.enums import Resampling as ResamplingEnum
from rasterio.windows import Window
from rasterio.enums import Resampling
from rasterio.transform import from_origin

import ipywidgets as widgets

import seaborn as sns
import geopandas as gpd

import xml.dom.minidom

import IPython.display as display
import matplotlib
from matplotlib.ticker import MultipleLocator
import matplotlib.colors as colors
import matplotlib.gridspec as gridspec
import matplotlib.pyplot as plt
import matplotlib.pylab as pl
import matplotlib.ticker as ticker
from plotly import graph_objs as go

import os
from osgeo import gdal, ogr, gdalconst, gdal_array, osr

from scipy.stats import mode
from scipy.stats import linregress
from scipy.ndimage import median_filter
from shapely.geometry import box

from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
from sklearn.inspection import permutation_importance
from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier, GradientBoostingClassifier
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.preprocessing import LabelEncoder

In [None]:
# Import functions and set auto-reload
from functions_S2 import *
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


## Import Parameters

In [None]:
sel_s2_date ='2023-11-17'

formatted_date = sel_s2_date.replace('-', '')


classification_method = 'Random Forest'
sensor_w = 'Sentinel-2'

output_path = r"E:\TESI\LCZ_Geoinf_Proj\MyCode\Output"
input_path = r"E:\TESI\LCZ_Geoinf_Proj\MyCode\Input"

nc_file_path = input_path + '\Sentinel2_Summer.nc' #Netcdf file from which we will extract the .tiff file

selected_s2_image = f'E:\TESI\LCZ_Geoinf_Proj\MyCode\Input\S2_{formatted_date}_20m_masked_clip.tif'


testing_set_path = f'E:\TESI\LCZ_Geoinf_Proj\MyCode\Output\S2_{formatted_date}_20m_testing.tif'

training_polygons = input_path + fr'\training_set_{formatted_date}.gpkg'
testing_polygons = input_path + fr'\testing_set_{formatted_date}.gpkg'

s2_meta = xml.dom.minidom.parse('E:\TESI\LCZ_Geoinf_Proj\Dati_Esempio_Vavassori\LCZ\MTD_MSIL2A.xml') 

roi_new_path = f"E:\TESI\LCZ_Geoinf_Proj\MyCode\Input\S2_{formatted_date}_20m_masked_clip_2.tif"

cmm_folder = 'E:\TESI\BBOX\Definitiva\AOI.shp' # cmm_folder (str): path to the geopackage with the boundaries of the Metropolitan City of Milan

attribute = 'LCZ' # Used to rasterize results
projection = 32632 # Used to rasterize results and staking bands in a GEOTIFF from the netcdf
ucp_path =  input_path + '/UCP_20m/'

# OPTIONALLY
#input_tiff = input_path + '\S2_' + sel_s2_date.replace('-', '') + '_20m_all_bands_clip_NEW.tif' # Path to the file that has bands that needs to be reordered / path of folder including single bands that needs to be stacked
#output_tiff = output_path + '\S2_' + sel_s2_date.replace('-', '') + '_20m_all_bands_clip_NEW_reordered.tif' # Path and name of S2 with reordered bands / stacked bands

# From Datacube to .tiff

In [None]:
datacube = xr.open_dataset(nc_file_path)
print(datacube)
ds = datacube.rename({"B02": "bands"})

## Extract Bands

In [None]:
subset = datacube.sel(time=sel_s2_date)
print(subset)

try:
    datacube = xr.open_dataset(nc_file_path, engine="netcdf4")
except Exception as e:
    print("Error reading NetCDF file:", e)

print("Available Variables in NetCDF:", list(subset.data_vars.keys()))

print("B02 Shape:", subset["bands"].shape)
print("B02 Dimensions:", subset["bands"].dims)
print("B02 Coordinates:", subset["bands"].coords)

if "band" in subset["bands"].dims:
    b04_data = subset["bands"].sel(band="B04")
    print("Extracted B04 Shape:", b04_data.shape)
else:
    print("'band' dimension not found in 'bands'")

if "band" in subset["bands"].dims:
    # Iterate over all unique bands
    for band_name in subset["bands"].band.values:
        # Extract data for the current band
        band_data = subset["bands"].sel(band=band_name)      
        band_data.plot.imshow(cmap="viridis")
else:
    print("'band' dimension not found in 'B02'")

## Stack Bands

In [None]:
# Initialize a list to hold the band data
band_arrays = []

# Extract and stack all bands
for band_name in subset["bands"].band.values:
    band_data = subset["bands"].sel(band=band_name).values  # Extract the band data as a numpy array
    print(f"Adding band: {band_name}")  # Print the order
    band_arrays.append(band_data)

# Stack the bands into a 3D numpy array (each band as a slice)
stacked_bands = np.stack(band_arrays, axis=0)  # Shape: (num_bands, lat, lon)

# Get metadata information for the TIFF file (e.g., spatial reference)
transform = from_origin(subset.lon.values[0], subset.lat.values[0], abs(subset.lat.values[1] - subset.lat.values[0]), abs(subset.lon.values[1] - subset.lon.values[0]))

# Create the output TIFF file
with rio.open( 
    input_tiff,
    'w', 
    driver='GTiff', 
    count=stacked_bands.shape[0],  # Number of bands
    width=stacked_bands.shape[2],  # Number of columns (lon)
    height=stacked_bands.shape[1],  # Number of rows (lat)
    dtype=stacked_bands.dtype,  # Data type (e.g., float32 or float64)
    crs= projection,  # Coordinate reference system (can be changed if needed)
    transform=transform  # Geospatial transform
) as dst:
    # Write each band to the .tiff file
    #for i in range(stacked_bands.shape[0]):
    for i, band_name in enumerate(subset["bands"].band.values):
        dst.write(stacked_bands[i], i+1)  # Band indices start from 1 in rasterio
        dst.set_band_description(i+1, band_name)  # ✅ Set band name

print("✅ Saved the stacked bands as a .tiff file.")

## Reorder Bands and Save File

In [None]:
# Current and correct band order
current_order = ["B02", "B03", "B04", "B05", "B06", "B07", "B08", "B11", "B12", "B8A"]  # Incorrect
correct_order = ["B02", "B03", "B04", "B05", "B06", "B07", "B08", "B8A", "B11", "B12"]  # Correct

# Compute the reordering indices
reorder_indices = [current_order.index(band) for band in correct_order]

# Open the input TIFF
with rasterio.open(input_tiff) as src:
    meta = src.meta.copy()  # Copy metadata
    meta.update(count=len(correct_order))  # Update number of bands if needed

    # Read all bands and reorder them
    bands = [src.read(i + 1) for i in reorder_indices]  # Read and reorder bands

    # Write new TIFF with correct order
    with rasterio.open(output_tiff, 'w', **meta) as dst:
        for i, band_data in enumerate(bands):
            dst.write(band_data, i + 1)  # Write bands in new order
            dst.set_band_description(i + 1, correct_order[i])  # Set correct names

print("✅ TIFF file saved with corrected band order.")

✅ TIFF file saved with corrected band order.


In [None]:
with rio.open(output_tiff) as dataset:
    for i in range(1, dataset.count + 1):
        print(f"✅ Band {i}: {dataset.descriptions[i-1]}")
        band = dataset.read(i)  # Read band data

        # Flatten and remove NaNs
        valid_pixels = band[~np.isnan(band)].flatten()

        # Compute the 90th percentile threshold
        p90 = np.percentile(valid_pixels, 90)

        # Filter out extreme values (keep values below 90th percentile)
        filtered_pixels = valid_pixels[valid_pixels <= p90]

        # Compute statistics
        min_val = np.min(filtered_pixels)
        max_val = np.max(filtered_pixels)
        mean_val = np.mean(filtered_pixels)

        # Compute mode (most frequent value)
        mode_val = mode(filtered_pixels, keepdims=False).mode

        print(f"✅ {dataset.descriptions[i-1]} - Min: {min_val}, Max: {max_val}, Mean: {mean_val}, Mode: {mode_val}")

# MAIN CODE

In [None]:
wvl_s = get_prisma_s2_wvl(prisma_meta = 0, s2_meta = s2_meta)
print(wvl_s)
wvl_s = [492.7, 559.8, 664.6, 704.1, 740.5, 782.8, 832.8, 864.7, 1613.7, 2202.4]

[492.7, 559.8, 664.6, 704.1, 740.5, 782.8, 864.7, 1613.7, 2202.4]


In [90]:
with rio.open(selected_s2_image) as src_s:
    data_s = src_s.read()

In [91]:
band_threshold = 1e-8
data = data_s[~np.all(data_s <= band_threshold, axis=(1,2))]

In [None]:
plot_signature_widgets(selected_s2_image, wvl = 0, wvl_s = wvl_s, data = data, data_s = data_s)

## Training sample spectral signature

In [93]:
legend = {
    2: ['Compact mid-rise', '#D10000'],
    3: ['Compact low-rise', '#CD0000'],
    5: ['Open mid-rise', '#FF6600'],
    6: ['Open low-rise', '#FF9955'],
    8: ['Large low-rise', '#BCBCBC'],
    101: ['Dense trees', '#006A00'],
    102: ['Scattered trees', '#00AA00'],
    104: ['Low plants', '#B9DB79'],
    105: ['Bare rock or paved', '#545454'],
    106: ['Bare soil or sand', '#FBF7AF'],
    107: ['Water', '#6A6AFF']
}

In [None]:
with rasterio.open(selected_s2_image) as src:
    print(src)

In [None]:
training, m, shapes = plot_training_samples(training_polygons, cmm_folder, legend)

In [None]:
spectral_sign_s, spectral_sign_std_s = compute_spectral_signature(selected_s2_image, legend, shapes)

In [None]:
selected_LCZ_names = [value[0] for value in legend.values()]
selected_classes = [key for key, value in legend.items() if value[0] in selected_LCZ_names]
print(selected_LCZ_names)
print(selected_classes)

In [None]:
plot_interactive_spectral_sign(wvl=0, wvl_s=wvl_s, spectral_sign=0, spectral_sign_s=spectral_sign_s, legend=legend)

## Training sample spectral signature

In [None]:
testing, m_testing, shapes_testing = plot_training_samples(training_polygons, cmm_folder, legend)

In [None]:
spectral_sign_test_s, spectral_sign_test_std_s = compute_spectral_signature(selected_s2_image, legend, shapes_testing)

# Classification of S2 imagery

In [None]:
with rasterio.open(selected_s2_image) as src:
    # Read RGB bands (B4=Red, B3=Green, B2=Blue)
    img = src.read(indexes=[4, 3, 2]).astype('float32')  # 1-based indexing
    nodata = src.nodata

# Replace nodata values or any NaNs with 0
img[np.isnan(img)] = 0
if nodata is not None:
    img[img == nodata] = 0

# Normalize by the 99th percentile to reduce outlier impact
percentile = np.percentile(img, 99)
img = np.clip(img / percentile, 0, 1)

# Convert from (3, H, W) to (H, W, 3) for matplotlib
img = np.transpose(img, (1, 2, 0))

# Plot
plt.figure(figsize=(10, 10))
plt.imshow(img)
plt.title("Sentinel-2 RGB Composite")
plt.axis('off')
plt.show()

# Training

In [None]:
vector_LCZ_path = training_area(sel_s2_date, legend, training_polygons)

## 2. Rasterize training samples

In [None]:
driver = gdal.GetDriverByName('GTiff')
if driver is None:
    raise ValueError("GTiff driver is not available.")

import os
if not os.path.isdir(os.path.dirname(output_path)):
    raise ValueError(f"Output directory does not exist: {os.path.dirname(output_path)}")

print(os.getcwd())

In [None]:
output_training_file_name = output_path + '\S2_' + sel_s2_date.replace('-', '') + '_20m_training.tif' # Save here rasterized image
rasterized_result = rasterize_training(selected_s2_image, vector_LCZ_path, output_training_file_name, attribute, projection)

In [None]:
bbox = prisma_bbox(selected_s2_image, sel_s2_date, output_path)

In [None]:
mask_s2 = mask_s2_image(selected_s2_image)

In [None]:
imperv_new_path = ucp_path + 'IMD.tif' # imperviousness
perc_build_new_path = ucp_path + 'BSF.tif' #building fraction
svf_new_path = ucp_path + 'SVF.tif'
canopy_height_new_path = ucp_path + 'TCH.tif'
buildings_new_path = ucp_path + 'BH.tif'

In [110]:
def reproj_match(infile, match, outfile):
    """Reproject a file to match the shape, alignment, extent, and projection of an existing raster.

    Parameters
    ----------
    infile : (string) path to input file to reproject
    match : (string) path to raster with desired shape, alignment, extent, and projection
    outfile : (string) path to output file tif
    """
    # Open the input file
    with rasterio.open(infile) as src:
        src_transform = src.transform
        src_crs = src.crs

        # Open the match file
        with rasterio.open(match) as match_raster:
            match_crs = match_raster.crs
            match_transform = match_raster.transform
            match_width = match_raster.width
            match_height = match_raster.height

        # Update the metadata to match the `match` raster
        dst_meta = src.meta.copy()
        dst_meta.update({
            "crs": match_crs,
            "transform": match_transform,
            "width": match_width,
            "height": match_height,
        })

        # Create the output file
        with rasterio.open(outfile, "w", **dst_meta) as dst:
            for i in range(1, src.count + 1):
                reproject(
                    source=rasterio.band(src, i),
                    destination=rasterio.band(dst, i),
                    src_transform=src_transform,
                    src_crs=src_crs,
                    dst_transform=match_transform,
                    dst_crs=match_crs,
                    resampling=Resampling.nearest,
                )


reproj_match(perc_build_new_path, selected_s2_image, ucp_path + 'building_fraction_20m_align.tif')
reproj_match(imperv_new_path, selected_s2_image, ucp_path + 'imperviousness_20m_align.tif')
reproj_match(svf_new_path, selected_s2_image, ucp_path + 'SVF_20m_align.tif')
reproj_match(canopy_height_new_path, selected_s2_image, ucp_path + 'canopy_height_20m_align.tif')
reproj_match(buildings_new_path, selected_s2_image, ucp_path + 'building_height_20m_align.tif')

In [None]:
imperv = open_layer(ucp_path + 'canopy_height_20m_align.tif', mask_s2)
perc_build = open_layer(ucp_path + 'building_fraction_20m_align.tif', mask_s2)
svf = open_layer(ucp_path + 'SVF_20m_align.tif', mask_s2)
canopy_height = open_layer( ucp_path + 'canopy_height_20m_align.tif', mask_s2)
buildings = open_layer(ucp_path + 'building_height_20m_align.tif', mask_s2)

In [None]:
plot_ucl(imperv, perc_build, svf, canopy_height, buildings)

In [None]:
study_area = gpd.read_file(bbox)

print(output_training_file_name)
img, roi = clip_training_sample(selected_s2_image, output_training_file_name, sel_s2_date, study_area, roi_new_path)

In [None]:
imperv, perc_build, svf, canopy_height, buildings, roi = check_layers_dimension(imperv, perc_build, svf, canopy_height, buildings, roi, img)

In [None]:
img = np.dstack((imperv, perc_build, svf, canopy_height, buildings, img))
print(f"The stacked array shape is --> {img.shape}")

In [None]:
labels = np.unique(roi[roi > 0])
print(f'The training data include {labels.size} classes: {labels}')

In [None]:
X = img[roi > 0, :] 
y = roi[roi > 0]
print(f'X matrix size: {X.shape}')
print(f'y array size: {y.shape}')

## 4. Classifier training

In [118]:
# Split data into training and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=0)

The following function performs hyperparameter tuning for the selected classification algorithm, and returns the best hyperparameter. The selection is done by computing the accuracy score:

$ {Accuracy Score} = {(TP+TN)\over(TP+FN+TN+FP)}$

where $TP$ and $TN$ are true positives/negatives, $FP$ abd $FN$ are false positives/negatives.

In [None]:
best_params, best_score, cv_results = parameter_tuning(classification_method, X_train, y_train)

In [None]:
best_params
best_score
cv_results

In [121]:
best_params = {'criterion': best_params['criterion'], 'max_features': best_params['max_features'], 'n_estimators': best_params['n_estimators']}

In [122]:
y_pred, clc = classification(classification_method, best_params, X_train, y_train, X_test)

In [None]:
result = permutation_importance(
    clc, X_test, y_test, n_repeats=10, random_state=0, n_jobs=2
)
feature_names = ["Imp", "BSF", "SVF", "CH", "BH", "B02", "B03", "B04", "B05", "B06", "B07", "B08", "B8A", "B11", "B12"]
forest_importances = pd.Series(result.importances_mean, index=feature_names)

In [None]:
forest_importances

In [None]:
fig, ax = plt.subplots()
forest_importances.plot.bar(yerr=result.importances_std, ax=ax)
ax.set_title("Feature importances using permutation on full model")
ax.set_ylabel("Mean accuracy decrease")
fig.tight_layout()
plt.show()

In [126]:
importances = clc.feature_importances_
std = np.std([tree.feature_importances_ for tree in clc.estimators_], axis=0)

In [127]:
feature_names = [f"feature {i}" for i in range(X.shape[1])]

In [None]:
importances

In [None]:
forest_importances = pd.Series(importances, index=feature_names)
fig, ax = plt.subplots()
forest_importances.plot.bar(yerr=std, ax=ax)
ax.set_title("Feature importances using MDI")
ax.set_ylabel("Mean decrease in impurity")
fig.tight_layout()

## 5.Accuracy assessment


In [None]:
accuracy = accuracy_score(y_test, y_pred)
print(f"Accuracy: {accuracy:.3f}")

In [None]:
print_metrics(y_test, y_pred)

## 6.Classified image filtering and export

In [None]:
export_classified_map(img, clc, X, selected_s2_image, classification_method, sel_s2_date, output_path) 

In [None]:
img = np.nan_to_num(img)

print(f'Using {classification_method}')
# reshape into long 2d array (nrow * ncol, nband) for classification
new_shape = (img.shape[0] * img.shape[1], img.shape[2])

img_as_array = img[:, :, :X.shape[1]].reshape(new_shape)
print('Reshaped from {o} to {n}'.format(o=img.shape,n=img_as_array.shape))

In [134]:
# Now predict for each pixel
class_prediction = clc.predict(img_as_array)

In [135]:
# Reshape our classification map
class_prediction = class_prediction.reshape(img[:, :, 0].shape)

# LCZ classification accuracy assessment

# Part 1: Classification accuracy on testing samples

## 1.Import testing samples

In [137]:
legend = {
    2: ['Compact mid-rise', '#D10000'],
    3: ['Compact low-rise', '#CD0000'],
    5: ['Open mid-rise', '#FF6600'],
    6: ['Open low-rise', '#FF9955'],
    8: ['Large low-rise', '#BCBCBC'],
    101: ['Dense trees', '#006A00'],
    102: ['Scattered trees', '#00AA00'],
    104: ['Low plants', '#B9DB79'],
    105: ['Bare rock or paved', '#545454'],
    106: ['Bare soil or sand', '#FBF7AF'],
    107: ['Water', '#6A6AFF']
}

In [None]:
testing, m, shapes = plot_training_samples(testing_polygons, cmm_folder, legend)

# Testing

In [None]:
vector_LCZ_path = testing_area(sel_s2_date, legend, testing_polygons)

## 2. Rasterize testing samples

In [None]:
output_testing_file_name = output_path + '\S2_' + sel_s2_date.replace('-', '') + '_20m_testing.tif' # Save here rasterized image
rasterized_result = rasterize_training(selected_s2_image, vector_LCZ_path, output_testing_file_name, attribute, projection)

In [None]:
bbox = prisma_bbox(selected_s2_image, sel_s2_date, output_path)

In [None]:
mask_prisma = mask_prisma_image(selected_s2_image)

## 3.Import the classified image to be assessed

In [None]:
print(f'Selected classification method: {classification_method}')

In [None]:
classified_image_path = output_path + '\classified_' + classification_method + '_' + sel_s2_date.replace('-', '') + '_medianfilter_20m.tif' # Classified image path
classified_image = rasterio.open(classified_image_path)
print(f"Selected image shape: {classified_image.shape}")

## 4. Assess classification accuracy on testing samples

In [None]:
accuracy, confusion, report, report_df = print_accuracy_s2(classification_method, sel_s2_date, sel_s2_date, legend, classified_image_path, testing_set_path )
print(sel_s2_date)

In [None]:
confusion

In [None]:
report_df