# Silver layer
# Passaggio 0: Verifica e caricamento dei dati
Questo primo blocco carica i dati e stampa a schermo la data più vecchia e quella più recente che trova

In [None]:
import pandas as pd
from deltalake import DeltaTable, write_deltalake

import os
from dotenv import load_dotenv

load_dotenv()

pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)


print("Inizio Fase 1: Caricamento e Preparazione...")


# ================== CONFIGURAZIONE ==================
storage_options = { 
    "AWS_ACCESS_KEY_ID": os.getenv("AWS_ACCESS_KEY_ID"),                
    "AWS_SECRET_ACCESS_KEY": os.getenv("AWS_SECRET_ACCESS_KEY"), 
    "AWS_ENDPOINT_URL": os.getenv("AWS_ENDPOINT_URL"), 
    "AWS_ALLOW_HTTP": "true" 
}

try:
    df_misure_raw = DeltaTable("s3a://external/qualita_aria_bronze", storage_options=storage_options).to_pandas()
    df_stazioni_anagrafica = DeltaTable("s3a://external/anagrafica_stazioni_bronze", storage_options=storage_options).to_pandas()
    df_parametri_anagrafica = DeltaTable("s3a://external/anagrafica_parametri_bronze", storage_options=storage_options).to_pandas()
    print("Tabelle Bronze caricate.")
except Exception as e:
    print(f"ERRORE CRITICO: Impossibile caricare le tabelle. Dettagli: {e}")

df_misure_raw.rename(columns={'COD_STAZ': 'id_stazione', 'ID_PARAM': 'id_parametro'}, inplace=True)
df_parametri_anagrafica.rename(columns={'IdParametro': 'id_parametro'}, inplace=True)
df_stazioni_anagrafica.rename(columns={'Cod_staz': 'id_stazione'}, inplace=True)
df_lavoro = pd.merge(df_misure_raw, df_parametri_anagrafica[['id_parametro', 'PARAMETRO']], on='id_parametro', how='left')

df_lavoro['DATA_INIZIO'] = pd.to_datetime(df_lavoro['DATA_INIZIO'])
df_lavoro['VALORE'] = pd.to_numeric(df_lavoro['VALORE'], errors='coerce')
df_lavoro.dropna(subset=['VALORE', 'PARAMETRO'], inplace=True)

df_stazioni_anagrafica['id_stazione'] = pd.to_numeric(df_stazioni_anagrafica['id_stazione'], errors='coerce')


print("df_misure_raw------------------------------------------------------")
df_misure_raw.info()
print("df_parametri_anagrafica------------------------------------------------------")
df_parametri_anagrafica.info()
print("df_stazioni_anagrafica------------------------------------------------------")
df_stazioni_anagrafica.info()
print("df_lavoro------------------------------------------------------")
df_lavoro.info()

print("DataFrame di lavoro pronto.")

Inizio Fase 1: Caricamento e Preparazione...
Tabelle Bronze caricate.


# Passaggio 1: Preparazione del DataFrame di Lavoro
Ora che abbiamo verificato i dati, li prepariamo per la trasformazione. Questo include la pulizia dei nomi, l'unione delle informazioni e la conversione dei tipi di dato.


In [None]:
print("Inizio Fase 1: Preparazione DataFrame di Lavoro...")

# Standardizziamo i nomi delle colonne chiave per unire i dati in modo affidabile
df_misure_raw.rename(columns={'COD_STAZ': 'id_stazione', 'ID_PARAM': 'id_parametro'}, inplace=True)
df_parametri_anagrafica.rename(columns={'IdParametro': 'id_parametro'}, inplace=True)
df_stazioni_anagrafica.rename(columns={'Cod_staz': 'id_stazione'}, inplace=True)

# Uniamo le misurazioni con il nome del parametro per avere un contesto
df_lavoro = pd.merge(df_misure_raw, df_parametri_anagrafica[['id_parametro', 'PARAMETRO']], on='id_parametro', how='left')

# Convertiamo il valore in numerico, gestendo eventuali errori
df_lavoro['VALORE'] = pd.to_numeric(df_lavoro['VALORE'], errors='coerce')
# Rimuoviamo righe dove il valore non è valido o il nome del parametro è mancante
rows_before = len(df_lavoro)
df_lavoro.dropna(subset=['VALORE', 'PARAMETRO'], inplace=True)
rows_after = len(df_lavoro)
print(f"Rimossi {rows_before - rows_after} record con date o valori non validi.")
df_lavoro.info()

print("DataFrame di lavoro pronto per la Feature Engineering.")

# Passaggio 2: Feature Engineering (Costruzione del Profilo Stazione)
Questa è la fase centrale, dove creiamo le 6 feature che descrivono ogni stazione. Ogni blocco di codice calcola una o più feature specifiche.
è stato dei


### Feature 1 & 2: Media e Variabilità Generale

**Obiettivo:** Calcolare le due statistiche più fondamentali per ogni inquinante misurato da una stazione.
- **Media (`mean`):** Rappresenta il "livello di base" di inquinamento. Ci dice se una stazione si trova, in media, in una zona più o meno inquinata.
- **Deviazione Standard (`std`):** Rappresenta la "variabilità" o "nervosismo" della stazione. Ci dice se i livelli di inquinamento sono stabili o se tendono ad avere forti picchi e crolli.

**Risultato:** Il DataFrame `df_stats` conterrà una riga per ogni stazione e una colonna per la media e la deviazione standard di *ciascun* inquinante (es. `mean_PM10`, `std_PM10`, `mean_NO2_Biossido_di_azoto`, ecc.).

In [None]:
print("1, 2. Calcolo di Media e Deviazione Standard...")
df_stats = df_lavoro.pivot_table(index='id_stazione', columns='PARAMETRO', values='VALORE', aggfunc=['mean', 'std'])
df_stats.columns = [f'{stat}_{param.replace(" ", "_").replace("(", "").replace(")", "")}' for stat, param in df_stats.columns]
df_stats.info()
#print(df_stats)
print("1, 2. Calcolo di Media e Deviazione Standard completato")
# il risultato: ci saranno una colonna per media e dev std per ogni inquinante registrato e una entry per ogni stazione. Il codice stazione non è più una colonna ma l'indice di accesso ad una riga


### Feature 3: Ciclo Settimanale (Impronta Umana)

**Obiettivo:** Quantificare l'impatto delle attività umane (come traffico e lavoro) su ogni inquinante. Lo facciamo calcolando il rapporto tra la concentrazione media nei giorni feriali (Lunedì-Venerdì) e quella nei giorni festivi (Sabato-Domenica).

- Un valore **> 1** suggerisce una forte influenza delle attività settimanali (tipico delle stazioni "da traffico").
- Un valore **~ 1** suggerisce un'influenza scarsa o nulla (tipico delle stazioni "di fondo" o rurali).

**Risultato:** Il DataFrame `df_rapporto_settimanale` conterrà una riga per ogni stazione e una colonna per questo rapporto, calcolato per ogni inquinante (es. `rapporto_feriale_festivo_NO2_Biossido_di_azoto`).

In [None]:
print("3. Calcolo del Ciclo Settimanale per tutti gli inquinanti...")
df_lavoro['feriale'] = df_lavoro['DATA_INIZIO'].dt.dayofweek < 5
df_medie_periodo = df_lavoro.pivot_table(index='id_stazione', columns=['PARAMETRO', 'feriale'], values='VALORE', aggfunc='mean')
df_rapporto_settimanale = pd.DataFrame(index=df_medie_periodo.index)
for param in df_lavoro['PARAMETRO'].unique():
    if (param, True) in df_medie_periodo.columns and (param, False) in df_medie_periodo.columns:
        nome_colonna = f'rapporto_feriale_festivo_{param.replace(" ", "_").replace("(", "").replace(")", "")}'
        df_rapporto_settimanale[nome_colonna] = df_medie_periodo[(param, True)] / df_medie_periodo[(param, False)]

df_medie_periodo.info()
print("------------------------------------------------------")
df_rapporto_settimanale.info()


print("3. Calcolo del Ciclo Settimanale per NO2 completato")


### Feature 4: Ciclo Stagionale (Impronta Climatica)

**Obiettivo:** Capire come il profilo di ogni inquinante cambia con le stagioni. Calcoliamo la concentrazione media per ciascuna delle quattro stagioni (inverno, primavera, estate, autunno). Questo ci permette di identificare pattern legati al clima, come l'accumulo di PM10 in inverno a causa delle condizioni meteorologiche o i picchi di Ozono in estate.

**Risultato:** Il DataFrame `df_medie_stagionali` conterrà una riga per ogni stazione e quattro colonne per ogni inquinante, una per la media di ogni stagione (es. `media_inverno_PM10`, `media_primavera_PM10`, ecc.).

In [None]:
print("4. Calcolo del Ciclo Stagionale a 4 stagioni per tutti gli inquinanti...")

# 1. Definiamo i mesi per ogni stagione come da te specificato
stagioni_mapping = {
    1: 'inverno', 2: 'inverno', 3: 'primavera',
    4: 'primavera', 5: 'primavera', 6: 'estate',
    7: 'estate', 8: 'estate', 9: 'autunno',
    10: 'autunno', 11: 'autunno', 12: 'inverno'
}

# 2. Creiamo una colonna 'stagione' nel DataFrame di lavoro
df_lavoro['mese'] = df_lavoro['DATA_INIZIO'].dt.month
df_lavoro['stagione'] = df_lavoro['mese'].map(stagioni_mapping)

# 3. Calcoliamo la media per ogni inquinante in ciascuna delle 4 stagioni.
#    pivot_table è perfetto per questo: creerà una colonna per ogni combinazione
#    di inquinante e stagione (es. PM10-inverno, PM10-primavera, etc.).
df_medie_stagionali = df_lavoro.pivot_table(
    index='id_stazione', 
    columns=['PARAMETRO', 'stagione'], 
    values='VALORE', 
    aggfunc='mean'
)

# 4. Rinominiamo le colonne per renderle pulite e utilizzabili,
#    ad esempio ('PM10', 'inverno') diventa 'media_inverno_PM10'.
df_medie_stagionali.columns = [
    f'media_{stagione}_{param.replace(" ", "_").replace("(", "").replace(")", "")}' 
    for param, stagione in df_medie_stagionali.columns
]

# Il DataFrame 'df_medie_stagionali' ora contiene le feature che ci servono
# e andrà a sostituire il vecchio 'df_rapporto_stagionale' nell'unione finale.
print("4. Calcolo delle medie per 4 stagioni completato.")

### Feature 5: Eventi Estremi (Rischio Sanitario)

**Obiettivo:** Misurare non solo il comportamento medio, ma anche la frequenza degli eventi ad alto inquinamento. Calcoliamo il numero totale di giorni in cui la media giornaliera di PM10 ha superato la soglia di legge di 50 µg/m³. Questa feature è un indicatore diretto del rischio per la salute in quella zona.

**Risultato:** Il DataFrame `df_superamenti` conterrà una riga per ogni stazione e una singola colonna, `giorni_superamento_soglia_PM10`, con il conteggio totale.

In [None]:
print("5. Calcolo dei Superamenti Soglia per PM10...")
param_pm10 = 'PM10' # Assicurati che questo nome sia corretto
soglia_pm10 = 50
df_pm10 = df_lavoro[df_lavoro['PARAMETRO'] == param_pm10]

# --- INIZIO DELLA CORREZIONE ---
# Metodo robusto con .apply() per evitare l'errore di indicizzazione

# 1. Definiamo una funzione che opera su un piccolo DataFrame (i dati di una singola stazione)
def count_daily_exceedances(group):
    # Se il gruppo è vuoto, non ci sono superamenti
    if group.empty:
        return 0
    # Impostiamo la data come indice per poter usare resample
    group = group.set_index('DATA_INIZIO')
    # Calcoliamo la media giornaliera
    daily_mean = group['VALORE'].resample('D').mean()
    # Contiamo quante di queste medie giornaliere superano la soglia
    return (daily_mean > soglia_pm10).sum()

# 2. Applichiamo questa funzione a ogni gruppo di stazioni
#anche se lancia warning non ci interessa perchè non tocco la colonna di raggruppamento
df_superamenti = df_pm10.groupby('id_stazione').apply(count_daily_exceedances, include_groups=False).to_frame(name='giorni_superamento_soglia_PM10')
# --- FINE DELLA CORREZIONE ---

print("5. Calcolo dei Superamenti Soglia per PM10 completato")

### Feature 6: Tipo di Area (Contesto Urbanistico)

**Obiettivo:** Classificare ogni stazione in base al contesto urbano in cui si trova. Abbiamo definito una feature categorica che distingue tra stazioni situate in un **Capoluogo di provincia** e quelle in altri comuni (**Provincia**). Questa distinzione ci aiuta a separare i pattern delle grandi aree metropolitane da quelli delle zone meno densamente popolate.

Per rendere questa informazione utilizzabile dal modello di machine learning, è stata trasformata in colonne numeriche (es. `Tipo_Area_Capoluogo` con valore 1 o 0) tramite One-Hot Encoding.

**Risultato:** Il DataFrame `df_area_tipo` contiene, per ogni stazione, queste nuove colonne binarie che ne descrivono il contesto urbanistico.

In [None]:
print("--- Calcolo Feature 6: Tipo di Area ---")

# 1. Definiamo i capoluoghi di provincia dell'Emilia-Romagna, basandoci sulla lista fornita
capoluoghi = [
    'BOLOGNA', 'FERRARA', 'FORLI\'', 'CESENA', 'MODENA', 'PARMA', 
    'PIACENZA', 'RAVENNA', 'REGGIO NELL\'EMILIA', 'RIMINI'
    # Nota: Cesena è capoluogo insieme a Forlì, ma potrebbe apparire come comune a sé
]

# 2. Creiamo una tabella pulita con le informazioni anagrafiche, se non già esistente
if 'df_stazioni_info' not in locals():
    colonne_anagrafiche = ['id_stazione', 'Stazione', 'COMUNE', 'PROVINCIA', 'Coord_X', 'Coord_Y', 'LON_GEO', 'LAT_GEO']
    df_stazioni_info = df_stazioni_anagrafica[colonne_anagrafiche].drop_duplicates(subset=['id_stazione'])

# 3. Creiamo la feature categorica 'Tipo_Area'
# Usiamo .str.upper() per rendere il confronto insensibile a maiuscole/minuscole
df_stazioni_info['Tipo_Area'] = df_stazioni_info['COMUNE'].str.upper().apply(
    lambda comune: 'Capoluogo' if comune in capoluoghi else 'Provincia'
)

# 4. Applichiamo One-Hot Encoding
df_area_tipo = pd.get_dummies(
    df_stazioni_info[['id_stazione', 'Tipo_Area']], 
    columns=['Tipo_Area'], 
    prefix='Tipo_Area'
).astype(int)

print("Calcolo 'Tipo di Area' completato.")
print("Anteprima del nuovo DataFrame di feature:")
display(df_area_tipo.head())

### Feature 7: Densità di Stazioni Vicine (Contesto della Rete)

**Obiettivo:** Misurare il grado di "isolamento" di una stazione all'interno della rete di monitoraggio. Per ogni stazione, abbiamo calcolato quante altre stazioni si trovano entro un **raggio di 20 km**, utilizzando le loro coordinate geografiche (`LON_GEO`, `LAT_GEO`).

- Un valore **alto** indica che la stazione fa parte di una rete fitta, tipicamente in un'area di grande interesse o criticità (es. un'area urbana).
- Un valore **basso (o zero)** indica una stazione più isolata, probabilmente con funzione di monitoraggio di fondo, rurale o montano.

**Risultato:** Il DataFrame `df_densita` contiene, per ogni stazione, la colonna `stazioni_vicine_20km` con il conteggio delle stazioni limitrofe.

In [None]:
from sklearn.metrics.pairwise import haversine_distances
import numpy as np

print("--- Calcolo Feature 7: Densità di Stazioni Vicine ---")

# 1. Usiamo le coordinate geografiche pulite dal nostro DataFrame anagrafico
df_coords = df_stazioni_info[['id_stazione', 'LAT_GEO', 'LON_GEO']].dropna()

# 2. Convertiamo i gradi in radianti per il calcolo matematico
df_coords['lat_rad'] = np.radians(df_coords['LAT_GEO'])
df_coords['lon_rad'] = np.radians(df_coords['LON_GEO'])

# 3. Calcoliamo la matrice delle distanze (in km) tra ogni coppia di stazioni
#    haversine_distances è lo strumento perfetto per questo.
dist_matrix_km = haversine_distances(df_coords[['lat_rad', 'lon_rad']]) * 6371  # Raggio medio della Terra in km

# 4. Per ogni stazione (riga della matrice), contiamo quante altre sono nel raggio definito
raggio_km = 20
# La condizione (row > 0) è fondamentale per escludere la distanza di una stazione con se stessa.
stazioni_vicine = [np.sum((row > 0) & (row <= raggio_km)) for row in dist_matrix_km]

# 5. Creiamo il DataFrame finale con i risultati
df_densita = pd.DataFrame({
    'id_stazione': df_coords['id_stazione'],
    f'stazioni_vicine_{raggio_km}km': stazioni_vicine
})

print("Calcolo 'Densità di Stazioni Vicine' completato.")
print("Anteprima del nuovo DataFrame di feature:")
display(df_densita.head())

### Fase Finale: Unione, Pulizia e Salvataggio del Silver Layer

**Obiettivo:** Assemblare tutti i DataFrame di feature calcolati in un'unica, grande tabella `df_silver_profiles`. Questo passaggio finale include:
1.  **Concatenazione** di tutte le feature (statistiche, cicli temporali, correlazioni e geografiche).
2.  **Gestione dei valori nulli (`NaN`)** che possono emergere durante i calcoli.
3.  **Arricchimento** con i metadati anagrafici (nomi delle stazioni, comuni, province).
4.  **Pulizia finale** per rimuovere eventuali stazioni per cui non sono state trovate informazioni anagraf

In [None]:
# ===================================================================
# FASE FINALE: UNIONE COMPLETA, PULIZIA E SALVATAGGIO
# ===================================================================
print("--- Inizio Fase Finale: Unione, Pulizia e Salvataggio ---")

# --- 1. Unione di TUTTE le 8 categorie di feature calcolate ---
print("\n[Passaggio 1] Unione di tutti i DataFrame di feature...")
# Mettiamo tutti i DataFrame (inclusi quelli geografici) nella lista
df_list = [
    df_stats, 
    df_rapporto_settimanale, 
    df_medie_stagionali, 
    df_superamenti, 
    df_area_tipo.set_index('id_stazione'), 
    df_densita.set_index('id_stazione')
]
df_silver_profiles = pd.concat(df_list, axis=1)
print(f"Tabella concatenata creata con {df_silver_profiles.shape} colonne di feature.")

# --- 2. Gestione dei valori NaN (numerici) ---
print("[Passaggio 2] Gestione dei valori nulli nelle feature numeriche...")
# Riempiamo i valori mancanti come prima
df_silver_profiles.fillna(0, inplace=True)
for col in df_silver_profiles.columns:
    if 'rapporto' in col:
        df_silver_profiles[col] = df_silver_profiles[col].replace(0, 1)
    if 'corr' in col:
        if '_vs_' in col and col.split('_vs_') == col.split('_vs_'):
            df_silver_profiles[col] = df_silver_profiles[col].replace(0, 1)
        else:
            df_silver_profiles[col] = df_silver_profiles[col].replace(0, 0)
print("Valori nulli numerici gestiti.")

# --- 3. Arricchimento con i metadati anagrafici ---
print("[Passaggio 3] Unione con i metadati anagrafici...")
# Trasformiamo l'indice 'id_stazione' in una colonna per il merge
df_silver_profiles.reset_index(inplace=True)

# Eseguiamo il merge con le informazioni delle stazioni
df_silver_profiles = pd.merge(df_silver_profiles, df_stazioni_info, on='id_stazione', how='left')
print("Merge con anagrafica completato.")

# --- 4. Pulizia finale: gestione delle stazioni senza anagrafica ---
print("[Passaggio 4] Pulizia delle stazioni con anagrafica mancante...")
righe_con_nan_anagrafica = df_silver_profiles['COMUNE'].isnull().sum()
if righe_con_nan_anagrafica > 0:
    print(f"Trovate {righe_con_nan_anagrafica} stazioni con metadati mancanti. Verranno rimosse dall'analisi finale.")
    df_silver_profiles.dropna(subset=['COMUNE'], inplace=True)
    print(f"Stazioni rimosse. La tabella finale ora contiene {len(df_silver_profiles)} stazioni.")
    display(df_silver_profiles)
else:
    print("Nessuna stazione con anagrafica mancante. Ottimo.")

### Fase Finale: Test Estensivi di Qualità e Coerenza del Silver Layer

**Obiettivo:** Eseguire una serie di controlli approfonditi sulla tabella finale `df_silver_profiles` per validarne la qualità prima di procedere con il machine learning. Questi test vanno oltre la semplice presenza di dati, verificando:
- **Completezza:** Assenza di valori nulli non gestiti.
- **Integrità Strutturale:** Unicità degli identificativi delle stazioni.
- **Coerenza Logica:** I valori calcolati rispettano vincoli logici (es. deviazioni standard non negative).
- **Validità di Dominio:** I valori rientrano in range plausibili (es. correlazioni tra -1 e 1).
- **Coerenza Geografica:** Le coordinate delle stazioni sono plausibili per la regione di studio.

Superare questi test ci dà un'alta fiducia nella qualità del dataset che useremo per il clustering.

In [None]:
print("--- INIZIO DEI TEST SULLA TABELLA SILVER FINALE ---")

# Assicurati che df_silver_profiles sia il tuo DataFrame finale e completo
# (già unito con tutte le feature, inclusi i metadati)

# Test 1: Controllo dei Valori Nulli (Completezza)
print("\n[Test 1] Controllo Valori Nulli...")
valori_nulli = df_silver_profiles.isnull().sum().sum()
if valori_nulli == 0:
    print("PASSATO: La tabella non contiene valori nulli (NaN). Il dataset è completo.")
else:
    print(f"FALLITO: Sono stati trovati {valori_nulli} valori nulli. Ispezionare le seguenti colonne:")
    print(df_silver_profiles.isnull().sum()[df_silver_profiles.isnull().sum() > 0])

# Test 2: Unicità delle Stazioni (Integrità Strutturale)
print("\n[Test 2] Controllo Unicità Stazioni...")
if df_silver_profiles['id_stazione'].is_unique:
    print(f"PASSATO: Ogni riga rappresenta una stazione unica. L'ID è una chiave primaria valida.")
else:
    print(f"FALLITO: Sono presenti ID di stazione duplicati. Questo è un problema critico.")
    duplicati = df_silver_profiles[df_silver_profiles['id_stazione'].duplicated()]['id_stazione']
    print(f"ID duplicati: {duplicati.tolist()}")

# Test 3: Coerenza Logica dei Valori Calcolati
print("\n[Test 3] Controllo Coerenza Logica (Valori Non Negativi)...")
# Deviazioni standard, rapporti e conteggi non possono essere negativi.
colonne_da_testare = [c for c in df_silver_profiles.columns if c.startswith('std_') or c.startswith('rapporto_') or c.startswith('giorni_')]
min_valori = df_silver_profiles[colonne_da_testare].min()
if (min_valori >= 0).all():
    print("PASSATO: Tutte le deviazioni standard, i rapporti e i conteggi sono correttamente maggiori o uguali a zero.")
else:
    print("FALLITO: Trovati valori negativi dove non dovrebbero esserci. Dettaglio:")
    print(min_valori[min_valori < 0])

# Test 4: Validità di Dominio delle Correlazioni
print("\n[Test 4] Controllo Range Valori di Correlazione...")
colonne_corr = [col for col in df_silver_profiles.columns if 'corr_' in str(col)]
min_corr = df_silver_profiles[colonne_corr].min().min()
max_corr = df_silver_profiles[colonne_corr].max().max()
if min_corr >= -1 and max_corr <= 1:
    print(f"PASSATO: Tutti i valori di correlazione sono correttamente compresi tra -1 e 1.")
else:
    print(f"FALLITO: Trovati valori di correlazione non validi. Min: {min_corr}, Max: {max_corr}")

# Test 5: Coerenza tra Feature Correlate
print("\n[Test 5] Controllo Coerenza tra Media e Deviazione Standard...")
# Se la media di un inquinante è 0, anche la sua deviazione standard deve essere 0.
colonne_mean = [c for c in df_silver_profiles.columns if c.startswith('mean_')]
incoerenze = 0
for col_mean in colonne_mean:
    col_std = col_mean.replace('mean_', 'std_')
    if col_std in df_silver_profiles.columns:
        # Trova le righe dove la media è 0 ma la std non lo è
        stazioni_incoerenti = df_silver_profiles[(df_silver_profiles[col_mean] == 0) & (df_silver_profiles[col_std] != 0)]
        if not stazioni_incoerenti.empty:
            incoerenze += 1
            print(f"  - FALLITO per {col_mean}: {len(stazioni_incoerenti)} stazioni hanno media 0 ma std != 0.")
if incoerenze == 0:
    print("PASSATO: La deviazione standard è coerente con la media per tutte le feature.")

# Test 6: Coerenza Geografica (Bounding Box)
print("\n[Test 6] Controllo Coerenza Geografica delle Coordinate...")
# Definiamo un "recinto" geografico approssimativo per l'Emilia-Romagna
lat_min, lat_max = 43.7, 45.1
lon_min, lon_max = 9.2, 12.7
lat_fuori_range = df_silver_profiles[(df_silver_profiles['LAT_GEO'] < lat_min) | (df_silver_profiles['LAT_GEO'] > lat_max)]
lon_fuori_range = df_silver_profiles[(df_silver_profiles['LON_GEO'] < lon_min) | (df_silver_profiles['LON_GEO'] > lon_max)]
if lat_fuori_range.empty and lon_fuori_range.empty:
    print("PASSATO: Tutte le coordinate delle stazioni rientrano nei confini geografici attesi per l'Emilia-Romagna.")
else:
    print("FALLITO: Trovate stazioni con coordinate geografiche non plausibili.")
    if not lat_fuori_range.empty:
        print(f"Stazioni con latitudine fuori range: {lat_fuori_range['id_stazione'].tolist()}")
    if not lon_fuori_range.empty:
        print(f"Stazioni con longitudine fuori range: {lon_fuori_range['id_stazione'].tolist()}")


# Test 7 (Finale): Ispezione Manuale delle Statistiche
print("\n[Test 7] Analisi Statistiche Descrittive (Campione) per Ispezione Manuale...")
print("Controllare manualmente la colonna 'std' (deviazione standard) per valori molto alti che potrebbero indicare anomalie o outlier.")
colonne_campione = [
    'mean_PM10', 'std_PM10', 
    'rapporto_feriale_festivo_NO2_Biossido_di_azoto',
    'media_inverno_PM10', 'media_estate_PM10',
    'corr_NO2_Biossido_di_azoto_vs_PM10',
    'giorni_superamento_soglia_PM10',
    f'stazioni_vicine_20km',
    'Tipo_Area_Capoluogo'
]
colonne_campione_esistenti = [col for col in colonne_campione if col in df_silver_profiles.columns]
display(df_silver_profiles[colonne_campione_esistenti].describe().T)

print("\n--- TEST COMPLETATI ---")

## FASE FINALE: SALVATAGGIO LOCALE DEL SILVER LAYER

In [None]:
# ===================================================================
# FASE FINALE: SALVATAGGIO LOCALE DEL SILVER LAYER
# ===================================================================
print("--- Inizio Fase di Salvataggio Locale ---")

# 1. Definiamo il percorso locale dove salvare la tabella Silver
#    Assicurati che la cartella 'delta_tables_silver' esista allo stesso livello di 'scripts' e 'notebooks'
#    Se non esiste, creala con il comando 'mkdir delta_tables_silver' dal terminale.
local_silver_path = "../delta_tables/profili_stazioni_silver"

# 2. Ci assicuriamo che l'indice sia una colonna normale, come richiesto da Delta Lake
#    Se df_silver_profiles ha ancora 'id_stazione' come indice, questa riga è necessaria.
#    Se è già una colonna, non darà errore.
if 'id_stazione' not in df_silver_profiles.columns:
    df_silver_profiles.reset_index(inplace=True)

# 3. Salviamo il DataFrame in formato Delta Lake nella cartella locale
try:
    write_deltalake(
        local_silver_path,
        df_silver_profiles,
        mode="overwrite"
    )
    print(f"\nTabella Silver salvata con successo nel percorso locale:")
    print(local_silver_path)
    print("\nOra puoi eseguire lo script di upload per caricarla su MinIO.")

except Exception as e:
    print(f"\nERRORE durante il salvataggio locale: {e}")