## Cálculo de Métricas de Cobertura con Sedona y NumPy

Este notebook demuestra un flujo de trabajo completo para el análisis geoespacial distribuido. El objetivo es calcular métricas de ecología del paisaje a partir de datos de cobertura del suelo de **MapBiomas Chile**, almacenados en una base de datos PostGIS como un cubo de datos ráster.
El proceso se estructura en un pipeline ETL (Extracción, Transformación, Carga) canónico:
1.  **Extracción (E):** Se realiza una ingesta de datos vía JDBC desde una base de datos PostgreSQL/PostGIS. La clave de esta fase es la conversión al vuelo del tipo de dato `raster` de PostGIS a un formato `GeoTIFF` binario, lo que garantiza la compatibilidad con Apache Sedona.
2.  **Transformación (T):**
    - **Integración:** Se crea un Almacén de Datos Operacional (ODS) uniendo los datos ráster con datos vectoriales (comunas, grillas) para enriquecerlos y filtrarlos según una Región de Interés (ROI).
    - **Cálculo:** Se ejecutan las métricas de paisaje (Composición, Fragmentación, Transición) utilizando **Funciones Definidas por el Usuario (UDFs) de Python**. Estas UDFs están altamente optimizadas, delegando el procesamiento numérico a librerías como **NumPy y SciPy** para un rendimiento superior.
3.  **Carga (L):** Los resultados agregados (Data Marts) se muestran en formato de tabla, listos para su análisis o persistencia.

### 1. Preparación del Entorno: Librerías y Rutas
En esta sección, importamos las librerías necesarias y configuramos las rutas del proyecto. Esto es fundamental para asegurar que el notebook pueda localizar el driver JDBC de PostgreSQL y el archivo de configuración `.env`.

In [None]:
# Importación de librerías principales
from pathlib import Path  # Para el manejo de rutas de sistema de forma agnóstica al SO
from sedona.spark import SedonaContext  # El punto de entrada para las funcionalidades de Sedona
from pyspark.sql import functions as F  # Funciones built-in de Spark para manipulación de DataFrames
from pyspark.sql.types import MapType, IntegerType, LongType, StructType, StructField, DoubleType # Tipos de datos para definir esquemas
import numpy as np  # Librería para computación numérica, clave para el procesamiento de píxeles
from dotenv import dotenv_values  # Para cargar variables de entorno (credenciales) desde el archivo .env
import json # Para manejar la lista de clases en la UDF de fragmentación
import IPython.display as display  # Para mostrar DataFrames de Spark en notebooks Jupyter

# --- Configuración de Rutas ---
# Se asume que el notebook está en la carpeta /notebook del proyecto
notebook_dir = Path.cwd()
project_dir = notebook_dir.parent
env_path = project_dir / '.env' # Ruta al archivo con las credenciales de la BD
jdbc_driver_path = str(project_dir / 'libs' / 'postgresql-42.6.0.jar') # Ruta al conector JDBC de PostgreSQL

print(f'Ruta del driver JDBC: {jdbc_driver_path}')

### 2. Inicialización de Spark y Registro de UDFs
Aquí se configura e inicializa la sesión de Spark con Sedona. La configuración de memoria es crucial para manejar grandes volúmenes de datos geoespaciales. 
A continuación, se definen y registran las UDFs que encapsulan la lógica de negocio para el cálculo de las métricas. Estas funciones convierten los datos ráster a arrays de NumPy para un procesamiento matricial eficiente.

#### 2.1. Configuración de la Sesión de Spark
<small>Se configura la sesión de Spark con Sedona, ajustando la memoria para el procesamiento de datos geoespaciales y añadiendo el driver JDBC necesario. La memoria del `driver` (4g) y la memoria `Off-Heap` (2g) son cruciales para optimizar el manejo de objetos geoespaciales de gran tamaño.</small>

In [None]:
# --- Configuración de la Sesión de Spark ---
# La memoria del driver (driver.memory) se ha aumentado a 4g para poder manejar operaciones 
# que colectan datos al driver, como el join espacial a nivel nacional y la visualización con .toPandas().
# La memoria Off-Heap se habilita para que Sedona maneje geometrías y rásters fuera del Heap de la JVM, reduciendo la presión sobre el Garbage Collector.
spark = SedonaContext.builder() \
    .appName("Sedona_Multizone_Nacional_ETL") \
    .master("local[*]") \
    .config("spark.executor.memory", "2g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.memory.offHeap.enabled", "true") \
    .config("spark.memory.offHeap.size", "2g") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.jars", f"file://{jdbc_driver_path}") \
    .getOrCreate()
    
# Reducir verbosidad de logs para una salida más limpia
spark.sparkContext.setLogLevel("ERROR")

#### 2.2. Inicializar Sedona
<small>Crea el contexto de Sedona, un paso crucial que habilita todas las funciones y tipos de datos geoespaciales específicos de Sedona dentro del entorno de Spark.</small>

In [None]:
# Inicializa el contexto de Sedona para registrar y utilizar sus funciones y tipos específicos
sedona = SedonaContext.create(spark)

#### 2.3. UDF - Composición de Cobertura (Histograma)

<small>Esta UDF calcula el histograma de valores de píxeles para una tesela de ráster. Es el pilar para la métrica de composición, ya que permite un conteo eficiente de las clases de cobertura a nivel de cada tesela.</small>

In [None]:
# UDF: composición (histograma de clases)
def get_composition_udf(raster):
    try:
        data = raster.as_numpy()[0]
        unique, counts = np.unique(data, return_counts=True)
        return {int(k): int(v) for k, v in zip(unique, counts) if not np.isnan(k)}
    except Exception:
        return {}

#### 2.4. UDF - Fragmentación (NP, MPS)
<small>Esta UDF utiliza `scipy.ndimage.label` para identificar componentes conectados (parches) y así calcular el Número de Parches (NP) y el Tamaño Medio del Parche (MPS) para un conjunto de clases de interés.</small>

In [None]:
# UDF: fragmentación (NP, MPS)
def get_fragmentation_udf(raster, class_list_json):
    try:
        from scipy import ndimage
        data = raster.as_numpy()[0]
        classes = json.loads(class_list_json)
        mask = np.isin(data, classes).astype(int)
        _, num_features = ndimage.label(mask)
        total_px = np.sum(mask)
        mps = float(total_px) / num_features if num_features > 0 else 0.0
        return {"NP": int(num_features), "MPS_px": mps}
    except Exception:
        return {"NP": 0, "MPS_px": 0.0}

#### 2.5. UDF - Matriz de Transición de Cobertura

<small>Calcula una matriz de transición entre dos teselas de ráster (tiempo 1 y tiempo 2). Codifica cada transición de píxel en un número único (`origen * 100 + destino`) para facilitar la agregación posterior.</small>

In [None]:
# UDF: transiciones entre dos rásters
def get_transition_udf(raster_t1, raster_t2):
    try:
        d1 = raster_t1.as_numpy()[0].astype(np.int32)
        d2 = raster_t2.as_numpy()[0].astype(np.int32)
        trans = (d1 * 100) + d2
        unique, counts = np.unique(trans, return_counts=True)
        return {int(k): int(v) for k, v in zip(unique, counts) if not np.isnan(k)}
    except Exception:
        return {}

#### 2.6. Registro de UDFs en Spark
<small>Este paso hace que las funciones de Python definidas anteriormente estén disponibles en el entorno SQL de Spark. Se especifica el nombre de la función y el esquema de datos que devuelve, lo que permite a Spark optimizar su ejecución.</small>

In [None]:
# --- Registro de UDFs en Spark ---
spark.udf.register('get_composition', get_composition_udf, MapType(IntegerType(), LongType()))
fragmentation_schema = StructType([
    StructField("NP", IntegerType(), True),
    StructField("MPS_px", DoubleType(), True)
])
spark.udf.register('get_fragmentation', get_fragmentation_udf, fragmentation_schema)
spark.udf.register('get_transition', get_transition_udf, MapType(IntegerType(), LongType()))
print('Spark y UDFs (get_composition, get_fragmentation, get_transition) registrados correctamente.')

### 3. Fase de Extracción (Carga de Raw Data)

En esta fase, nos conectamos a la base de datos PostgreSQL y cargamos las tablas dimensionales que necesitaremos para el análisis:
- **`mapbiomas_clases`**: La leyenda de clases de cobertura.
- **`dpa_comuna_subdere`**: Las geometrías de las comunas de Chile.
- **`mapbiomas_grid`**: La grilla de teselas que cubre el área de estudio.

Los datos geométricos se leen como WKB (Well-Known Binary) y se convierten al tipo `Geometry` de Sedona usando `ST_GeomFromWKB`.

<small>Los datos fueron extraídos de sus fuentes e integrados a PostgreSQL mediante la función `raster2psql` previamente y se generó una grilla vectorial a partir de los tiles 512x512 con la función PostGIS `ST_Envelope()`<small>

In [None]:
# Carga de credenciales desde el archivo .env
config = dotenv_values(env_path)
jdbc_url = f"jdbc:postgresql://{config.get('POSTGRES_HOST')}:{config.get('POSTGRES_PORT')}/{config.get('POSTGRES_DB')}"
jdbc_props = {'user': config.get('POSTGRES_USER'), 'password': config.get('POSTGRES_PASSWORD'), 'driver': 'org.postgresql.Driver'}

# Carga de la tabla de clases de MapBiomas. Se cachea porque es pequeña y se usa en múltiples joins.
df_clases = spark.read.jdbc(jdbc_url, 'uso_suelo.mapbiomas_clases', properties=jdbc_props).cache()

# Carga de las geometrías de las comunas
df_comunas = spark.read.jdbc(jdbc_url, 'dpa_limites.dpa_comuna_subdere', properties=jdbc_props)
df_comunas.createOrReplaceTempView('raw_comunas')
df_comunas = spark.sql('SELECT comuna_id, comuna, ST_GeomFromWKB(geometria) as geometria FROM raw_comunas')

# Carga de la grilla de referencia
df_grid = spark.read.jdbc(jdbc_url, 'uso_suelo.mapbiomas_grid', properties=jdbc_props)
df_grid.createOrReplaceTempView('raw_grid')
df_grid = spark.sql('SELECT grid_id, ST_GeomFromWKB(geometria) as geometria FROM raw_grid')

print('Raw Data: Tablas dimensionales cargadas.')

### 4. Data Store (ODS) Curada
El ODS es un DataFrame intermedio que integra todos los datos necesarios para el análisis. Los pasos son:
1.  **Definir una Región de Interés (ROI):** Se especifica una lista de comunas para acotar el análisis.
2.  **Join Espacial:** Se cruzan las geometrías de las comunas del ROI con la grilla completa para identificar solo las teselas (`grid_id`) que se intersectan con nuestra ROI.
3.  **Ingesta de Rásters:** Se construye una consulta SQL dinámica para leer desde PostGIS únicamente las teselas ráster que pertenecen a la ROI. **La función `ST_AsGDALRaster(rast, 'GTiff')` es fundamental aquí**, ya que convierte el tipo `raster` de PostGIS a un `byte array` en formato GeoTIFF, que Sedona puede interpretar.
4.  **Recargar:** La función `RS_FromGeoTiff` de Sedona convierte el `byte array` en un objeto ráster de Sedona, listo para ser procesado por las UDFs.

In [None]:
# 1. Definir la Región de Interés (ROI) por nombre de comuna
ROI = ['Ñiquén', 'Penco', 'Nacimiento', 'Laja', 'Tomé']
test_mode = False # Poner en True para limitar a 1 tesela para pruebas rápidas

# Filtrar las comunas de interés
target = df_comunas.filter(F.col('comuna').isin(ROI)).cache()

# 2. Join espacial para encontrar las teselas que se intersectan con el ROI
roi_grids = df_grid.alias('g').join(target.alias('c'), F.expr('ST_Intersects(g.geometria, c.geometria)')).select('g.grid_id', 'c.comuna_id', 'c.comuna')

if test_mode:
    print("MODO TEST: Limitando a 1 tesela.")
    roi_grids = roi_grids.limit(1)

roi_grids = roi_grids.cache()

# 3. Construir filtro SQL para leer solo los rásters necesarios
grid_ids = [str(r['grid_id']) for r in roi_grids.select('grid_id').distinct().collect()]
grid_filter = f"WHERE grid_id IN ({','.join(grid_ids)})" if grid_ids else "WHERE 1=0"
raster_query = f"(SELECT grid_id, year, ST_AsGDALRaster(rast, 'GTiff') as gdal_rast FROM uso_suelo.mapbiomas_raster {grid_filter}) AS sub"

# Leer los datos ráster crudos desde la base de datos
df_raster_raw = spark.read.jdbc(jdbc_url, raster_query, properties=jdbc_props)

# 4. Unir y recargar los rásters para crear el ODS final
df_raster_roi = df_raster_raw.join(roi_grids, on='grid_id', how='inner') \
    .select('grid_id', F.expr('RS_FromGeoTiff(gdal_rast)').alias('rast'), 'year', 'comuna').cache()
print(f'ODS Creado: {df_raster_roi.count()} teselas cargadas para las comunas {ROI}.')

### 5. Data Mart 1: Métrica de Composición de Cobertura
Esta métrica calcula el área total (en hectáreas) para cada clase de cobertura de suelo (agregada a Nivel 2 de la leyenda de MapBiomas) por comuna y año.

El flujo es el siguiente:
1.  Se aplica la UDF `get_composition` a cada tesela para obtener un mapa de `{id_clase: conteo_pixeles}`.
2.  Se usa `F.explode` para convertir el mapa en filas, creando una tabla larga (una fila por clase por tesela).
3.  Se agrupan los resultados por `comuna`, `año` y `id_clase` para sumar los píxeles.
4.  Se convierte el conteo de píxeles a hectáreas (asumiendo píxeles de 30x30m, 0.09 ha/píxel).
5.  Se hace un join con `df_clases` y se extrae el nombre de la clase de Nivel 2 del campo JSON para la agregación final.

In [None]:
print('Calculando Composición de Cobertura por Nivel 2...')
# 1. Aplicar UDF y explotar el mapa resultante
df_composition_base = df_raster_roi.select('year', 'comuna', F.explode(F.expr('get_composition(rast)')).alias('id_clase', 'px')) \
    .groupBy('year', 'comuna', 'id_clase').agg(F.sum('px').alias('total_px')) \
    .withColumn('area_ha', F.col('total_px') * 0.09) \
    .join(df_clases, on='id_clase', how='left')

# 2. Agregar a Nivel 2 de la leyenda
df_composition_n2 = df_composition_base.filter(F.get_json_object('clase', '$.nivel_2').isNotNull()) \
    .groupBy('comuna', 'year', F.get_json_object('clase', '$.nivel_2').alias('clase_n2')) \
    .agg(F.sum('total_px').alias('total_px'), F.sum('area_ha').alias('area_ha')) \
    .orderBy('comuna', 'year', 'clase_n2', F.desc('area_ha'))

print('Composición de Cobertura Nivel 2 calculada:')
display.display(df_composition_n2.toPandas())

### 6. Data Mart 2: Métrica de Fragmentación de Bosque Nativo
Esta sección calcula la fragmentación específicamente para las clases que componen el **Bosque Nativo (Nivel 3)**: Bosque Primario, Bosque Secundario y Bosque Achaparrado.

Las métricas calculadas son:
-   **Número de Parches (NP):** El número total de fragmentos de una clase.
-   **Tamaño Medio del Parche (MPS):** El área promedio de esos fragmentos.

El proceso es más complejo que el anterior porque se calcula una métrica para cada tipo de bosque por separado y luego se unifican los resultados:
1.  Se define la lista de IDs de clase para el bosque nativo.
2.  Se aplica la UDF `get_fragmentation` de forma dinámica para cada ID, generando columnas separadas para los resultados de cada tipo de bosque (ej. `frag_59`, `frag_60`).
3.  Se extraen las métricas (NP, MPS) de las columnas de tipo `Struct`.
4.  Se agrupan los resultados por comuna y año.
5.  Finalmente, se utiliza la función `stack` de Spark SQL para "des-pivotear" la tabla, convirtiendo las múltiples columnas de métricas en filas, lo que resulta en una tabla final limpia y normalizada.

#### 6.1. Fragmentación: Definición de Clases y Filtrado Temporal
<small>Se definen las clases de bosque nativo que serán el foco del análisis y se filtra el DataFrame para el rango de años de interés (2000-2024).</small>

In [None]:
print('Calculando Fragmentación para las clases de Bosque Nativo (Nivel 3)...')

# 1. Definir clases de interés (Nivel 3: Bosques Nativos)
nivel3_bosques_ids = [59, 60, 67] # 59: Primario, 60: Secundario, 67: Achaparrado
clase_names = {
    59: "Bosque Primario",
    60: "Bosque Secundario",
    67: "Bosque Achaparrado"
}
df_frag_cols = df_raster_roi.filter((F.col("year") >= 2000) & (F.col("year") <= 2024))
display.display(df_frag_cols.toPandas())

#### 6.2. Fragmentación: Aplicación Dinámica de UDF
<small>Para cada clase de bosque, se invoca la UDF de fragmentación. Esto crea una tabla ancha donde cada columna (`frag_59`, `frag_60`, etc.) contiene un `Struct` con las métricas NP y MPS para una clase específica.</small>

In [None]:
# 2. Crear una expresión de UDF para cada clase de bosque y aplicarlas en una sola pasada
frag_expressions = []
for target_id in nivel3_bosques_ids:
    target_classes_json = json.dumps([target_id])
    frag_expressions.append(F.expr(f"get_fragmentation(rast, '{target_classes_json}')").alias(f"frag_{target_id}"))

# Aplicar todas las UDFs en una sola pasada, creando una columna por métrica
df_frag_multi_col = df_frag_cols.select("comuna", "year", *frag_expressions)
display.display(df_frag_multi_col.toPandas())

#### 6.3. Fragmentación: Extracción de Métricas
<small>Una vez calculadas, las métricas anidadas dentro de los `Structs` se extraen a columnas de primer nivel para facilitar la posterior agregación (ej. `frag_59.NP` se convierte en una columna `NP_59`).</small>

In [None]:
# 3. Extraer las métricas del struct. Ej: df.select('frag_59.NP')
select_expr = ["comuna", "year"]
for target_id in nivel3_bosques_ids:
    select_expr.append(F.col(f"frag_{target_id}.NP").alias(f"NP_{target_id}"))
    select_expr.append(F.col(f"frag_{target_id}.MPS_px").alias(f"MPS_px_{target_id}"))
df_metrics_expanded = df_frag_multi_col.select(*select_expr)
display.display(df_metrics_expanded.toPandas())

#### 6.5. Fragmentación: Agregación por Comuna

<small>Con las métricas en columnas separadas, se realiza una agregación a nivel de comuna y año. El número de parches (NP) se suma, mientras que para el tamaño medio del parche (MPS) se calcula un promedio.</small>

In [None]:
# 4. Agregar los resultados (sumar NP, promediar MPS) por comuna y año
agg_expressions = []
for target_id in nivel3_bosques_ids:
    agg_expressions.append(F.sum(f"NP_{target_id}").alias(f"total_parches_{target_id}"))
    agg_expressions.append(F.round(F.avg(f"MPS_px_{target_id}"), 2).alias(f"tamano_medio_parche_px_{target_id}"))
    agg_expressions.append(F.sum(f"MPS_px_{target_id}").alias(f"Total_area_px_{target_id}"))

df_aggregated_metrics = df_metrics_expanded.groupBy("comuna", "year").agg(*agg_expressions)
print('Métricas de Fragmentación calculadas y agregadas:')
display.display(df_aggregated_metrics.toPandas())

#### 6.6. Fragmentación: Normalización y Formato Final
<small>El paso final consiste en usar la expresión `stack` de Spark SQL para des-pivotear la tabla (convertir las columnas de métricas en filas). Esto normaliza la estructura de datos, produciendo una tabla final limpia, con una fila por tipo de bosque, comuna y año.</small>

In [None]:
# 5. Usar 'stack' para des-pivotear y finalizar la tabla
stack_expr_parts = []
for target_id in nivel3_bosques_ids:
    stack_expr_parts.append(repr(clase_names[target_id]))
    stack_expr_parts.append(f"total_parches_{target_id}")
    stack_expr_parts.append(f"tamano_medio_parche_px_{target_id}")
    stack_expr_parts.append(f"Total_area_px_{target_id}")

stack_expression = f'stack({len(nivel3_bosques_ids)}, {", ".join(stack_expr_parts)}) as (clase, total_parches, tamano_medio_parche_px, Total_area_px_raw)'
df_final_frag = df_aggregated_metrics.selectExpr("comuna", "year", stack_expression)
df_final_frag = df_final_frag     .withColumn("area_total_ha", F.round(F.col("Total_area_px_raw") * 0.09, 2))     .select("comuna", "year", "clase", "total_parches", "tamano_medio_parche_px", "area_total_ha")     .orderBy("comuna", "year", "clase")
display.display(df_final_frag.toPandas())

### 7. Data Mart 3: Métrica de Transición de Cobertura
Esta métrica construye una matriz de cambio que muestra cómo ha cambiado la cobertura del suelo entre dos años (en este caso, 2000 y 2024). 

Pasos del proceso:
1.  Se filtran los datos del ODS para obtener dos DataFrames, uno para el año de inicio (`t1`) y otro para el año final (`t2`).
2.  Se realiza un `join` entre `t1` y `t2` usando `grid_id` y `comuna` como clave. Esto empareja cada tesela con su versión correspondiente en el otro año.
3.  Se aplica la UDF `get_transition` a cada par de teselas. La UDF codifica la transición de cada píxel en un número único (`origen * 100 + destino`).
4.  Se explota el mapa de transiciones y se agrupa para obtener el conteo total de píxeles para cada tipo de transición (`origen -> destino`).
5.  Se decodifican los IDs de clase de origen y destino.
6.  Se realiza un doble join con la tabla de clases para obtener los nombres de las clases de origen y destino, y se agregan los resultados a Nivel 2.

In [None]:
print('Calculando Transiciones de Cobertura (2000 -> 2024) por Nivel 2...')
# 1. Filtrar los años de inicio y fin
t1 = df_raster_roi.filter(F.col('year') == 2000).alias('t1')
t2 = df_raster_roi.filter(F.col('year') == 2024).alias('t2')

# 2. Unir las teselas por su ID, aplicar la UDF de transición, explotar y agregar los resultados
df_trans_base = t1.join(t2, on=['grid_id', 'comuna']) \
    .select("comuna", F.expr("get_transition(t1.rast, t2.rast)").alias("transicion_data")) \
    .select("comuna", F.explode("transicion_data").alias("codigo_transicion", "total_px_transicion")) \
    .groupBy("comuna", "codigo_transicion").agg(F.sum("total_px_transicion").alias("total_px")) \
    .withColumn("from_id_clase", (F.col("codigo_transicion") / 100).cast('integer')) \
    .withColumn("to_id_clase", (F.col("codigo_transicion") % 100).cast('integer')) \
    .withColumn("area_ha", F.round(F.col("total_px") * 0.09, 2)) \
    .join(df_clases.alias("clase_origen"), F.col("from_id_clase") == F.col("clase_origen.id_clase"), "left") \
    .join(df_clases.alias("clase_destino"), F.col("to_id_clase") == F.col("clase_destino.id_clase"), "left")

# 3. Agregar a Nivel 2 y mostrar
df_trans_n2 = df_trans_base.filter(
        F.get_json_object(F.col("clase_origen.clase"), '$.nivel_2').isNotNull() & \
        F.get_json_object(F.col("clase_destino.clase"), '$.nivel_2').isNotNull()
    ) \
    .groupBy(
        "comuna", 
        F.get_json_object(F.col("clase_origen.clase"), '$.nivel_2').alias("clase_origen"), 
        F.get_json_object(F.col("clase_destino.clase"), '$.nivel_2').alias("clase_transicion")
    ) \
    .agg(F.sum('total_px').alias('total_px_transicion'), F.sum('area_ha').alias('area_ha_transicion')) \
    .orderBy("comuna", F.desc("area_ha_transicion"))

display.display(df_trans_n2.toPandas())