# **Análisis exploratorio de calidad datos meteorológicos (AEMET)**

# Índice de Chequeos de Calidad de Datos

## A. Integridad estructural
- A.1 Verificación del esquema de datos
- A.2 Comprobación de tipos de datos
- A.3 Unicidad de la clave primaria (fecha + estación)

## B. Completitud
- B.1 Cobertura temporal (días presentes en el periodo 2017–2025)
- B.2 Cobertura espacial (estaciones del catálogo)
- B.3 Valores requeridos (campos obligatorios sin nulos)
- B.4 Cobertura de variables clave (temperatura, precipitación, etc.)
- B.5 Cobertura de variables opcionales

## C. Validez y rangos de valores
- C.1 Valores de temperatura en rangos razonables
- C.2 Consistencia entre temperaturas (mínima, máxima y media)
- C.3 Precipitación no negativa y códigos especiales correctos
- C.4 Viento: velocidad, racha y dirección en rangos válidos
- C.5 Presión atmosférica en rango válido
- C.6 Humedad relativa en rango válido
- C.7 Insolación en rango válido

## D. Consistencia temporal
- D.1 Formato y rango correcto de las horas
- D.2 Ausencia de fechas duplicadas por estación
- D.3 Validez de valores acumulados (ej. precipitaciones)

## E. Coherencia espacial
- E.1 Coherencia de altitud con el catálogo de estaciones

## F. Homogeneidad y representatividad
- F.1 Distribución regional de las estaciones activas
- F.2 Balance temporal de la cobertura de datos



## Uso de Apache Spark para el análisis exploratorio

El análisis exploratorio de los datos meteorológicos requiere procesar un volumen elevado de información: **2.848.554 registros** correspondientes a observaciones diarias de múltiples estaciones a lo largo de un periodo de casi nueve años.
Ante este escenario, resulta fundamental seleccionar una tecnología que garantice eficiencia, escalabilidad y capacidad de integración con el resto de la arquitectura del *Data Lake*.

Se ha optado por emplear **Apache Spark** en lugar de alternativas como **DuckDB**.

In [5]:
import os, sys, tempfile, pathlib, shutil, glob, subprocess

# --- Java 11 (ajusta si lo tienes en otra ruta) ---
os.environ["JAVA_HOME"] = "/Library/Java/JavaVirtualMachines/temurin-11.jdk/Contents/Home"
os.environ["PATH"] = os.path.join(os.environ["JAVA_HOME"], "bin") + os.pathsep + os.environ.get("PATH","")

# --- Red/IPv4 y limpieza básica ---
os.environ["SPARK_LOCAL_IP"] = "127.0.0.1"
os.environ["PYSPARK_PYTHON"] = sys.executable
for k in ["PYSPARK_SUBMIT_ARGS", "SPARK_SUBMIT_OPTS"]:
    os.environ.pop(k, None)

# --- Directorio local para Spark (evita KeyError) ---
base_tmp = pathlib.Path(tempfile.gettempdir()) / "spark-local"
base_tmp.mkdir(parents=True, exist_ok=True)
# Si quieres forzar una ruta concreta dentro del proyecto, cámbiala aquí:
# base_tmp = pathlib.Path("/Users/inessabate/PycharmProjects/climascan-general/.spark_local"); base_tmp.mkdir(parents=True, exist_ok=True)

# Lo dejamos disponible como variable de entorno (opcional)
os.environ["SPARK_LOCAL_DIRS"] = str(base_tmp)

# Limpia restos antiguos de Spark (opcional, prudente)
for p in glob.glob("/tmp/spark-*"):
    shutil.rmtree(p, ignore_errors=True)

print("JAVA_HOME ->", os.environ["JAVA_HOME"])
print("SPARK_LOCAL_DIRS ->", os.environ["SPARK_LOCAL_DIRS"])
print(subprocess.run(["java","-version"], capture_output=True, text=True).stderr.strip())

JAVA_HOME -> /Library/Java/JavaVirtualMachines/temurin-11.jdk/Contents/Home
SPARK_LOCAL_DIRS -> /var/folders/1p/hh63llf17gqcbswv4kcb9tbr0000gn/T/spark-local
openjdk version "11.0.27" 2025-04-15
OpenJDK Runtime Environment Homebrew (build 11.0.27+0)
OpenJDK 64-Bit Server VM Homebrew (build 11.0.27+0, mixed mode)


In [6]:
from pyspark.sql import SparkSession
import os

# Mostrar el comando de lanzamiento por si fallase
os.environ["SPARK_PRINT_LAUNCH_COMMAND"] = "1"

# Si tuviste una sesión previa en este kernel, intenta pararla
try:
    spark.stop()
except:
    pass

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("smoke-min")
    # Enlaces a loopback para evitar líos de red
    .config("spark.driver.bindAddress", "127.0.0.1")
    .config("spark.driver.host", "127.0.0.1")
    # Evitar conflictos de puertos
    .config("spark.ui.enabled", "false")
    .config("spark.blockManager.port", "0")
    .config("spark.driver.port", "0")
    # Forzar IPv4
    .config("spark.driver.extraJavaOptions", "-Djava.net.preferIPv4Stack=true")
    .config("spark.executor.extraJavaOptions", "-Djava.net.preferIPv4Stack=true")
    # Usar el directorio local que hemos creado (evita el KeyError)
    .config("spark.local.dir", os.environ["SPARK_LOCAL_DIRS"])
    .getOrCreate()
)

spark.sparkContext.setLogLevel("WARN")
print("Spark OK:", spark.version)
spark.range(1).show()
spark.stop()

25/08/25 22:27:10 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).


Spark OK: 3.5.1
+---+
| id|
+---+
|  0|
+---+



In [7]:
from pyspark.sql import SparkSession
import os

DELTA_VERSION = "3.2.0"  # Spark 3.5.x + Scala 2.12

try:
    spark.stop()
except:
    pass

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("EDA_Delta")
    .config("spark.driver.bindAddress","127.0.0.1")
    .config("spark.driver.host","127.0.0.1")
    .config("spark.ui.enabled","false")
    .config("spark.blockManager.port","0")
    .config("spark.driver.port","0")
    .config("spark.driver.extraJavaOptions","-Djava.net.preferIPv4Stack=true")
    .config("spark.executor.extraJavaOptions","-Djava.net.preferIPv4Stack=true")
    .config("spark.local.dir", os.environ["SPARK_LOCAL_DIRS"])
    # Delta Lake
    .config("spark.sql.extensions","io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog","org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.jars.packages", f"io.delta:delta-spark_2.12:{DELTA_VERSION}")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("WARN")
print("Spark OK:", spark.version)
print("Delta:", spark.conf.get("spark.sql.extensions"), "|", spark.sparkContext.getConf().get("spark.jars.packages"))

Spark OK: 3.5.1
Delta: io.delta.sql.DeltaSparkSessionExtension | io.delta:delta-spark_2.12:3.2.0


25/08/25 22:27:24 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).


## Configuración inicial:
 1. Creación de sesión Spark
2. Obtención del dataset

In [12]:
# --- Resolver Java 11 dentro del kernel (robusto en macOS/ARM) ---
import os, sys, subprocess, shutil, pathlib

def pick_java11():
    # 1) Intenta usar el selector del sistema
    try:
        jhome = subprocess.check_output(
            ["/usr/libexec/java_home", "-v", "11"],
            stderr=subprocess.STDOUT
        ).decode().strip()
        if pathlib.Path(jhome).exists():
            return jhome
    except Exception:
        pass
    # 2) Fallback a Homebrew OpenJDK 11 (ruta que ya comprobaste que existe)
    hb = "/opt/homebrew/Cellar/openjdk@11/11.0.27/libexec/openjdk.jdk/Contents/Home"
    if pathlib.Path(hb).exists():
        return hb
    raise RuntimeError("No encuentro Java 11. Ajusta la ruta a tu JDK 11.")

JAVA_HOME = pick_java11()
os.environ["JAVA_HOME"] = JAVA_HOME
os.environ["PATH"] = os.path.join(JAVA_HOME, "bin") + os.pathsep + os.environ.get("PATH","")
os.environ["SPARK_LOCAL_IP"] = "127.0.0.1"
os.environ["PYSPARK_PYTHON"] = sys.executable

print("JAVA_HOME:", os.environ["JAVA_HOME"])
print("java on PATH:", shutil.which("java"))

# Verificación rápida
print(subprocess.run(["java","-version"], capture_output=True, text=True).stderr.strip())

JAVA_HOME: /opt/homebrew/Cellar/openjdk@11/11.0.27/libexec/openjdk.jdk/Contents/Home
java on PATH: /opt/homebrew/Cellar/openjdk@11/11.0.27/libexec/openjdk.jdk/Contents/Home/bin/java
openjdk version "11.0.27" 2025-04-15
OpenJDK Runtime Environment Homebrew (build 11.0.27+0)
OpenJDK 64-Bit Server VM Homebrew (build 11.0.27+0, mixed mode)


In [None]:

from pyspark.sql import SparkSession

# Cierra sesión previa si existiera
try:
    spark.stop()
except Exception:
    pass

# Crea sesión

import os
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

os.environ["SPARK_LOCAL_IP"] = "127.0.0.1"

builder = (
    SparkSession.builder
    .master("local[*]")
    .appName("EDA_aemet_landing")
    .config("spark.driver.bindAddress","127.0.0.1")
    .config("spark.driver.host","127.0.0.1")
    .config("spark.ui.enabled","false")
    .config("spark.blockManager.port","0")
    .config("spark.driver.port","0")
    .config("spark.sql.shuffle.partitions","200")
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

spark.sparkContext.setLogLevel("WARN")
print("Spark:", spark.version)

spark.sparkContext.setLogLevel("WARN")
print("Spark:", spark.version)
print("Driver host:", spark.sparkContext.getConf().get("spark.driver.host"))
print("Packages:", spark.sparkContext.getConf().get("spark.jars.packages"))

In [9]:
# Obtección del dataset de observaciones diarias
from pathlib import Path

# Root del proyecto: subir dos niveles desde notebooks/01_data_analytics/
PROJECT_ROOT = Path().resolve().parents[1]
LANDING_PATH = PROJECT_ROOT / "data" / "landing" / "aemet_deltalake"

try:
    df_meteo = spark.read.format("delta").load(str(LANDING_PATH))   # si hay _delta_log
except Exception:
    df_meteo = spark.read.parquet(str(LANDING_PATH / "*.parquet"))  # fallback parquet

# Obtencion del dataset de estaciones
CATALOG_PATH = PROJECT_ROOT / "data" / "landing" / "aemet"
df_estaciones = spark.read.parquet(str(CATALOG_PATH / "*.parquet"))

In [10]:
# Size of df_meteo
print((df_meteo.count(), len(df_meteo.columns)))

25/08/25 22:28:01 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

(2682447, 26)


In [11]:

df_meteo.show(5)

+----------+----------+--------------------+--------------------+-------+----+----+----+--------+----+--------+-----+---------+-----+---------+-------+----+--------+-----+---------+-------+-----------+-------+-----------+----+----+
|     fecha|indicativo|              nombre|           provincia|altitud|tmed|prec|tmin|horatmin|tmax|horatmax|hrMax|horaHrMax|hrMin|horaHrMin|hrMedia| dir|velmedia|racha|horaracha|presMax|horaPresMax|presMin|horaPresMin| sol|year|
+----------+----------+--------------------+--------------------+-------+----+----+----+--------+----+--------+-----+---------+-----+---------+-------+----+--------+-----+---------+-------+-----------+-------+-----------+----+----+
|2017-11-25|     C453I| CITFAGRO_88_GAITERO|STA. CRUZ DE TENE...|   1744|11,4| 0,0| 9,8|   18:00|13,1|  Varias|   91|    17:10|   51|    10:50|     62|NULL|    NULL| NULL|     NULL|   NULL|       NULL|   NULL|       NULL|NULL|2017|
|2017-11-25|     7250C|            ABANILLA|              MURCIA|    174

In [12]:
from pyspark.sql import functions as F
from pyspark.sql import types as T

# Size de df_estaciones
print((df_estaciones.count(), len(df_estaciones.columns)))

# Numero de estaciones (indicativo)
print("Número de estaciones:", df_estaciones.select("indicativo").distinct().count())

# Duplcados en catálogo
dups_cat = (df_estaciones
    .groupBy("indicativo")
    .agg(F.count("*").alias("n"))
    .filter(F.col("n") > 1)
    .orderBy(F.desc("n"))
)
print("Número de estaciones duplicadas en catálogo:", dups_cat.count())

(947, 7)
Número de estaciones: 947
Número de estaciones duplicadas en catálogo: 0


In [13]:
df_estaciones.show(5)

+-------+-------------+-------+----------+-------------------+--------+--------+
|latitud|    provincia|altitud|indicativo|             nombre|indsinop|longitud|
+-------+-------------+-------+----------+-------------------+--------+--------+
|394924N|ILLES BALEARS|    490|     B013X|      ESCORCA, LLUC|   08304| 025309E|
|394744N|ILLES BALEARS|      5|     B051A|     SÓLLER, PUERTO|   08316| 024129E|
|393446N|ILLES BALEARS|     52|     B103B| ANDRATX - SANT ELM|        | 022208E|
|393305N|ILLES BALEARS|     50|     B158X|CALVIÀ, ES CAPDELLÀ|        | 022759E|
|394121N|ILLES BALEARS|     60|     B087X|        BANYALBUFAR|        | 023046E|
+-------+-------------+-------+----------+-------------------+--------+--------+
only showing top 5 rows



In [14]:
# Esquema según metadata (id + data type)
expected_fields = [
    ("fecha",      "string"),  # AAAA-MM-DD (string en origen)
    ("indicativo", "string"),
    ("nombre",     "string"),
    ("provincia",  "string"),
    ("altitud",    "float"),
    ("tmed",       "float"),
    ("prec",       "float"),   # puede venir con 'Ip' o 'Acum' (string)
    ("tmin",       "float"),
    ("horatmin",   "string"),  # HH:MM (UTC) - se valida en D.1
    ("tmax",       "float"),
    ("horatmax",   "string"),
    ("dir",        "float"),   # decenas de grado; 88/99 especiales
    ("velmedia",   "float"),
    ("racha",      "float"),
    ("horaracha",  "string"),
    ("sol",        "float"),
    ("presmax",    "float"),
    ("horapresmax","string"),
    ("presmin",    "float"),
    ("horapresmin","string"),
    ("hrmedia",    "float"),
    ("hrmax",      "float"),
    ("horahrmax",  "string"),
    ("hrmin",      "float"),
    ("horahrmin",  "string"),
]

# Normalización de nombres de columnas en df_meteo a minúsculas para comparar con metadata
def to_lower_cols(df):
    for c in df.columns:
        df = df.withColumnRenamed(c, c.lower())
    return df

df_meteo_norm = to_lower_cols(df_meteo)

# Mapeo de alias frecuentes (tu muestra tenía hrMax, horaHrMax, etc.)
alias_map = {
    "hrmax": "hrmax",
    "horahrmax": "horahrmax",
    "hrmin": "hrmin",
    "horahrmin": "horahrmin",
    "hrmedia": "hrmedia",
    "presmax": "presmax",
    "horapresmax": "horapresmax",
    "presmin": "presmin",
    "horapresmin": "horapresmin",
}
# Detecta y renombra columnas camelCase si están presentes (ej.: 'hrMax' -> 'hrmax').
for camel, target in [("hrmax","hrmax"), ("horahrmax","horahrmax"),
                      ("hrmin","hrmin"), ("horahrmin","horahrmin"),
                      ("hrmedia","hrmedia"), ("presmax","presmax"),
                      ("horapresmax","horapresmax"), ("presmin","presmin"),
                      ("horapresmin","horapresmin")]:
    # busca variantes camel en df_meteo original para renombrar
    for col in df_meteo.columns:
        if col.lower() == camel and col != camel:
            df_meteo_norm = df_meteo_norm.withColumnRenamed(col.lower(), target)
            print(f"Renombrada columna {col} a {target}")

# 4) Listas de columnas esperadas / requeridas (según metadata)
expected_cols = [c for c, _ in expected_fields]
required_cols = ["fecha", "indicativo", "nombre", "provincia", "altitud"]  # requeridos = true

Renombrada columna hrMax a hrmax
Renombrada columna horaHrMax a horahrmax
Renombrada columna hrMin a hrmin
Renombrada columna horaHrMin a horahrmin
Renombrada columna hrMedia a hrmedia
Renombrada columna presMax a presmax
Renombrada columna horaPresMax a horapresmax
Renombrada columna presMin a presmin
Renombrada columna horaPresMin a horapresmin


# A. Integridad estructural

**Motivación.** Garantizar que la estructura del dataset se ajusta a la especificación: campos, tipos y claves.

### A.1 Verificación del esquema de datos
Comprobar que todos los campos definidos en la metadata están presentes en los Parquet de *landing* y no existen campos inesperados.




In [None]:
# Columnas presentes en df
present_cols = [c.lower() for c in df_meteo_norm.columns]

# Faltantes respecto a metadata
missing_from_df = [c for c in expected_cols if c not in present_cols]

# Extras (presentes pero no esperadas)
extra_in_df = [c for c in present_cols if c not in expected_cols]

print("A.1 Esquema:")
print(" - Columnas esperadas:", len(expected_cols))
print(" - Presentes en df_meteo:", len(present_cols))
print(" - Faltantes respecto a metadata:", missing_from_df)
print(" - Extras no contempladas en metadata:", extra_in_df)


Todas las columnas del esperadas están presentes en el dataset. Es correcto que la columna year no se encontrase en la metadata del dataset, ya que se ha añadido al ingestar los datos en landing.

### A.2 Comprobación de tipos de datos
Validar que los tipos de las columnas coinciden con los esperados (fechas como texto/fecha, magnitudes físicas como `float`).


In [None]:
# A.2 · Tipos de datos (diagnóstico de castabilidad) — con totales y porcentajes
from pyspark.sql import functions as F
from pyspark.sql import types as T

def type_check_report(df, spec):
    total = df.count()  # denominador común para porcentajes
    rows = []
    for col, typ in spec:
        if col not in df.columns:
            rows.append((col, typ, "columna no presente", None, None, total, None, None, None))
            continue

        if typ == "float":
            s = F.col(col)
            s_norm = F.regexp_replace(F.col(col).cast("string"), ",", ".")
            casted = s_norm.cast(T.DoubleType())

            agg = (df
                   .select(
                       F.sum(F.when(s.isNull(), 1).otherwise(0)).alias("n_nulls"),
                       F.sum(F.when((s.isNotNull()) & (casted.isNull()), 1).otherwise(0)).alias("n_cast_fail")
                   )
                   .first())
            n_nulls = int(agg["n_nulls"])
            n_cast_fail = int(agg["n_cast_fail"])
            n_valid_non_null = total - n_nulls - n_cast_fail  # no nulo y casteable

            rows.append((
                col, "float", "transformar a float",
                n_cast_fail, n_nulls, total,
                round(100.0 * n_nulls / total, 4),
                round(100.0 * n_cast_fail / total, 4),
                n_valid_non_null
            ))

        elif typ == "string":
            n_nulls = df.select(F.sum(F.when(F.col(col).isNull(), 1).otherwise(0)).alias("n")).first()["n"]
            rows.append((
                col, "string", "nulls",
                None, int(n_nulls), total,
                round(100.0 * n_nulls / total, 4),
                None,
                total - int(n_nulls)  # no nulos (válidos para string)
            ))

        else:
            n_nulls = df.select(F.sum(F.when(F.col(col).isNull(), 1).otherwise(0)).alias("n")).first()["n"]
            rows.append((
                col, typ, "nulls",
                None, int(n_nulls), total,
                round(100.0 * n_nulls / total, 4),
                None,
                total - int(n_nulls)
            ))

    schema_chk_df = spark.createDataFrame(rows, [
        "Columna", "Tipo esperado", "Check aplicado",
        "Filas cast falla", "Nº nulos", "Nº valores totales",
        "% nulos", "% cast falla", "Nº no nulos válidos"
    ])
    return schema_chk_df

type_report = type_check_report(df_meteo_norm, expected_fields)
type_report.orderBy(F.col("% nulos").desc_nulls_last(), F.col("% cast falla").desc_nulls_last()).show(truncate=False)

# Tipo esperado: data type segun metadata
# Check aplicado: test aplicado. Para float, se intenta castear cambiando comas por puntos. Para string, solo se cuentan los valores nulos.
# Nulos: nº de NULL en cada variable en landing.
# Filas cast falla: nº de filas no nulas en origen que no se han podido castear como float tras cambiar coma por punto.

In [None]:
# Diagnóstico detallado de la columna 'prec' (precipitación), tiene muchos fallos de casteo a float
from pyspark.sql import functions as F, types as T

# Normalizamos la coma por punto y casteamos a double
prec_casted = F.regexp_replace(F.col("prec").cast("string"), ",", ".").cast(T.DoubleType())

# Filas que NO se pueden castear (fallan -> cast es NULL, pero la columna original no era NULL)
df_prec_fail = (df_meteo_norm
    .withColumn("prec_casted", prec_casted)
    .filter((F.col("prec").isNotNull()) & (F.col("prec_casted").isNull()))
)

print("Número de filas que fallan el cast:", df_prec_fail.count())
df_prec_fail.select("fecha", "indicativo", "prec").show(truncate=False)

# Filas que SÍ se pueden castear (prec original no nulo y prec_casted no nulo)
df_prec_ok = (df_meteo_norm
    .withColumn("prec_casted", prec_casted)
    .filter((F.col("prec").isNotNull()) & (F.col("prec_casted").isNotNull()))
)

print("Número de filas casteables:", df_prec_ok.count())
df_prec_ok.select("fecha", "indicativo", "prec", "prec_casted").show(truncate=False)

Variables muy incompletas:
- sol (insolación) falta en el 83% de las filas.
- presión (presmin, presmax y horas asociadas) falta en torno al 75%.
- viento (dir, velmedia, racha, horaracha) falta en un 20%.

Variables más críticas (temperaturas y precipitación) están mucho más completas:
- tmin, tmax, tmed sólo tienen ~2% de nulos.
- prec tiene un 2,7% de nulos y un 0,6% de registros no numéricos.
- Los registros no numéricos en prec corresponden a valores 'Ip'. Segun la metadata de AEMET, significa precipitación inferior a 0.1 mm.

Nota sobre 'Ip' en precipitación:
EStimación: Valor "Ip" debe transformarse a un valor numérico coherente a 0.0 mm




### A.3 Unicidad de la clave primaria (fecha + estación)
Garantizar un único registro por combinación `(fecha, indicativo)`.


In [None]:
print("A.3  Unicidad de fecha + indicativo:")

# Normalizamos clave a minúsculas por seguridad
key_cols = ["fecha", "indicativo"]
for k in key_cols:
    assert k in df_meteo_norm.columns, f"Falta columna clave: {k}"

dups_df = (df_meteo_norm
           .groupBy([F.col(k) for k in key_cols])
           .agg(F.count("*").alias("n"))
           .filter(F.col("n") > 1))

n_dups = dups_df.count()
print(f" - Registros con clave duplicada: {n_dups}")

if n_dups > 0:
    display(dups_df.orderBy(F.desc("n")))

No hay duplicados en la clave primaria (fecha + indicativo). Es decir, para cada estación y día, hay un único registro.

# B. Completitud

**Motivación.** La ausencia de datos reduce el poder explicativo y puede introducir sesgos. Medimos completitud por días, por estaciones y por variables.



In [None]:
from pyspark.sql import functions as F

# Parámetros del periodo de estudio
DATE_START = "2017-01-01"
DATE_END   = "2025-06-30"


# Fechas como date
df_meteo_norm = df_meteo_norm.withColumn("fecha_date", F.to_date("fecha"))

# Calendario completo (un día por fila)
days = (F.sequence(F.to_date(F.lit(DATE_START)), F.to_date(F.lit(DATE_END))))
df_calendar = (spark.range(1)
    .select(F.explode(days).alias("fecha_date")))

TOTAL_DAYS = df_calendar.count()

# Helper: condición "informado" para variables de texto/numéricas (no NULL y no cadena vacía)
def informed(colname):
    return (F.col(colname).isNotNull()) & (F.length(F.trim(F.col(colname))) > 0)

### B.1 Cobertura temporal (2017–2025)
Verificar que existen registros para todos los días del periodo objetivo, o documentar los huecos.

In [None]:
# Días presentes en observaciones
df_days_present = df_meteo_norm.select("fecha_date").distinct()

# % días presentes
n_days_present = df_days_present.count()
pct_days_present = round(100.0 * n_days_present / TOTAL_DAYS, 2)

print(f"B.1 Cobertura temporal: {n_days_present}/{TOTAL_DAYS} días ({pct_days_present}%)")

# Fechas faltantes
df_missing_days = df_calendar.join(df_days_present, on="fecha_date", how="left_anti")

# Listado de huecos por año-mes (para documentar en la memoria)
df_missing_by_month = (df_missing_days
    .withColumn("year", F.year("fecha_date"))
    .withColumn("month", F.date_format("fecha_date","yyyy-MM"))
    .groupBy("year","month").agg(F.count("*").alias("dias_faltantes"))
    .orderBy("year","month"))

df_missing_by_month.show(truncate=False)


Faltan datos de todas las estaciones para el día 2024-01. El resto de días comprendidos entre 2017-01-01 y 2025-06-30 tienen al menos un registro.

In [None]:
from pyspark.sql import functions as F

# B.1 Cobertura temporal (optimizada: evita countDistinct costoso)
df_daily = (df_meteo_norm
    .select("fecha_date", "indicativo")
    .dropna(subset=["fecha_date", "indicativo"])
    .dropDuplicates(["fecha_date", "indicativo"])   # evita countDistinct, más eficiente
    .groupBy("fecha_date").agg(F.count("*").alias("n_registros"))
    .orderBy("fecha_date"))

# Cacheamos el resultado para reutilizar sin recalcular
df_daily.cache().count()  # fuerza la materialización en memoria


# Vista rápida de los primeros días
df_daily.show(10)

Con el fin de identificar si los días con menor cobertura están aislados o concentrados en tramos concretos:

Se identifica que a medida que avanza el tiempo la cobertura es mayor. Esto puede deberse a que haya estaciones que se hayan incorporado a la red de estaciones posteriormente a enero de 2017.

In [None]:
# Serie temporal de nº de registros por día con Altair (colores azules + líneas horizontales extra)
import altair as alt
import pandas as pd

# Convertir el df_daily ya calculado y cacheado a Pandas

# Solo calcula si no existe ya en memoria (evita recomputar)
if "pdf_daily" not in globals() or pdf_daily is None:
    pdf_daily = df_daily.toPandas().sort_values("fecha_date")
    pdf_daily["fecha_date"] = pd.to_datetime(pdf_daily["fecha_date"])
    pdf_daily["roll7"] = pdf_daily["n_registros"].rolling(7, min_periods=1, center=True).mean()
else:
    print("Reutilizando pdf_daily ya materializado en memoria")

# Constantes
EXPECTED_STATIONS = 947
EXPECTED_STATIONS_ACTIVE = 918

# Altair setup
alt.data_transformers.disable_max_rows()
brush = alt.selection_interval(encodings=["x"])  # Altair >= 5

# Línea base (nº de registros por día)
line_base = alt.Chart(pdf_daily).mark_line(color="steelblue").encode(
    x=alt.X("fecha_date:T", title="Fecha"),
    y=alt.Y("n_registros:Q", title="Nº de registros (estaciones reportando)")
)

# Línea suavizada (media móvil 7d)
line_roll = alt.Chart(pdf_daily).mark_line(color="darkblue", strokeDash=[4,2]).encode(
    x="fecha_date:T",
    y="roll7:Q",
    tooltip=[
        alt.Tooltip("fecha_date:T", title="Fecha"),
        alt.Tooltip("n_registros:Q", title="Nº registros"),
        alt.Tooltip("roll7:Q", title="Media móvil 7d", format=".1f")
    ]
).transform_filter(brush)

# Líneas horizontales de referencia
rule_target = alt.Chart(pd.DataFrame({"y": [EXPECTED_STATIONS]})).mark_rule(color="blue").encode(y="y:Q")
rule_active = alt.Chart(pd.DataFrame({"y": [EXPECTED_STATIONS_ACTIVE]})).mark_rule(color="#1f77b4").encode(y="y:Q")

# Detalle arriba
detail = (line_base.transform_filter(brush) + line_roll + rule_target + rule_active).properties(height=300)

# Overview abajo (área con tono azul claro)
overview = alt.Chart(pdf_daily).mark_area(opacity=0.4, color="lightblue").encode(
    x=alt.X("fecha_date:T", title="Arrastra para seleccionar un rango"),
    y="n_registros:Q"
).properties(height=80).add_params(brush)

(detail & overview).properties(
    title="Registros diarios por fecha (con líneas horizontales de referencia en azul)"
)

### B.2 Cobertura espacial
Comprobar que las estaciones del catálogo aparecen y con actividad mínima.

In [None]:
from pyspark.sql import functions as F
from pyspark.sql import Window as W
from pyspark import StorageLevel

# 1) Reduce a claves y deduplica UNA vez (fecha, estación)
df_keys = (df_meteo_norm
    .select("fecha_date", "indicativo")
    .dropna(subset=["fecha_date","indicativo"])
    .dropDuplicates(["fecha_date","indicativo"])
    .persist(StorageLevel.MEMORY_ONLY)
)
_ = df_keys.count()  # materializa en memoria

# 2) Estaciones del catálogo (pequeño -> broadcast)
df_cat_est = df_estaciones.select("indicativo").distinct()
from pyspark.sql.functions import broadcast

n_est_cat = df_cat_est.count()

# 3) Estaciones observadas (ya deduplicadas)
df_est_obs = df_keys.select("indicativo").distinct().persist(StorageLevel.MEMORY_ONLY)
n_est_obs = df_est_obs.count()

pct_est_activas = round(100.0 * n_est_obs / n_est_cat, 2) if n_est_cat else None
print(f"B.2 Cobertura espacial: {n_est_obs}/{n_est_cat} estaciones con datos ({pct_est_activas}%)")

# 4) Estaciones del catálogo SIN datos (anti-join con broadcast para evitar shuffle grande)
df_est_sin_datos = broadcast(df_cat_est).join(df_est_obs, on="indicativo", how="left_anti")

df_est_sin_datos.show(truncate=False)

Hay 29 estaciones del catálogo sin ningún registro en el periodo 2017-2025.
<br>
El resto de estaciones tienen al menos un registro.


In [None]:
# 5) Días reportados por estación (usa df_keys ya deduplicado: evita countDistinct)
df_days_per_station = (df_keys
    .groupBy("indicativo")
    .agg(F.count("*").alias("dias_reportados"))
    .persist(StorageLevel.MEMORY_ONLY)
)
# Vista rápida sin ordenar globalmente (evita shuffle extra):
df_days_per_station.orderBy(F.col("dias_reportados").asc()).show(30)


### B.3 Valores requeridos (sin nulos): fecha, indicativo, nombre, provincia, altitud
Asegurar que `fecha`, `indicativo`, `nombre`, `provincia`, `altitud` no presentan nulos.

In [None]:
from pyspark.sql import functions as F, types as T

required_cols = ["fecha", "indicativo", "nombre", "provincia", "altitud"]

def not_informed(c):
    return (F.col(c).isNull()) | (F.length(F.trim(F.col(c))) == 0)

df_req = df_meteo_norm.select(*required_cols)
total_rows = df_req.count()

rows = []
for c in required_cols:
    n_null = df_req.filter(F.col(c).isNull()).count()
    n_empty = df_req.filter((F.col(c).isNotNull()) & (F.length(F.trim(F.col(c))) == 0)).count()
    n_bad  = n_null + n_empty
    completeness = round(100.0 * (total_rows - n_bad) / total_rows, 4)
    rows.append((c, total_rows, n_null, n_empty, n_bad, completeness))

df_b3 = spark.createDataFrame(rows, [
    "columna", "n_total", "n_nulls", "n_vacios", "n_incompletos", "pct_completitud"
])

print("B.3 Completitud de campos obligatorios")
df_b3.orderBy(F.col("pct_completitud").asc()).show()

Los campos obligatorios segun la metadata no contienen nulos.

In [None]:
df_b3.orderBy(F.col("pct_completitud").asc())

### B.4 Cobertura de variables clave por estación (tmax, tmin, tmed, prec)
Para `tmax`, `tmin`, `tmed`, `prec`, calcular el **% de días informados** por estación.


In [None]:
# B.4  Cobertura de variables clave en general
from pyspark.sql import functions as F
key_vars = ["tmax", "tmin", "tmed", "prec", "racha"]
rows = []
for v in key_vars:
    n_informados = df_meteo_norm.filter(informed(v)).count()
    pct = round(100.0 * n_informados / total_rows, 4)
    rows.append((v, n_informados, total_rows, pct))

df_key_overall = spark.createDataFrame(rows, ["Variable", "Nº informados", "Nº total", "% informado"])

print("B.4 Cobertura de variables clave (global):")
df_key_overall.orderBy(F.col("% informado").desc()).show()

In [None]:
# B.4  Cobertura de variables clave por estación
from pyspark.sql import functions as F

key_vars = ["tmax", "tmin", "tmed", "prec"]

# Dataset reducido a campos necesarios
df_keys = df_meteo_norm.select("indicativo", "fecha_date", *key_vars)

# Total de días por estación (sirve de denominador)
df_totals = df_keys.groupBy("indicativo").agg(F.count("*").alias("n_total"))

# Cálculo de informados y nulos por variable
agg_exprs = []
for v in key_vars:
    agg_exprs.append(F.sum(F.when(F.col(v).isNotNull() & (F.length(F.trim(F.col(v).cast("string"))) > 0), 1).otherwise(0)).alias(f"{v}_informados"))
    agg_exprs.append(F.sum(F.when(F.col(v).isNull() | (F.length(F.trim(F.col(v).cast("string"))) == 0), 1).otherwise(0)).alias(f"{v}_nulos"))

df_cov = df_keys.groupBy("indicativo").agg(*agg_exprs)

# Unimos con total de días por estación
df_cov = df_cov.join(df_totals, on="indicativo", how="left")

# Añadimos % de completitud
for v in key_vars:
    df_cov = df_cov.withColumn(f"{v}_pct", F.round(100.0 * F.col(f"{v}_informados") / F.col("n_total"), 2))

# Copbertura de valores clave por estación
df_cov.orderBy(F.col("tmax_pct").asc()).show(truncate=False)

In [None]:
# === Métricas de cobertura: globales y por estación ===
from pyspark.sql import functions as F

# Ajusta esta lista si quieres incluir/excluir variables
key_vars = ["tmax", "tmin", "tmed", "prec", "racha"]

# --- 0) Dataset base ---
# Usamos df_meteo_norm si existe; si no, intentamos usar df (original)
try:
    _ = df_meteo_norm
    base = df_meteo_norm
except NameError:
    base = df

# Nos quedamos con columnas necesarias; si 'fecha' está normalizada en 'fecha_date', no es imprescindible aquí
cols_presentes = [c for c in ["indicativo"] + key_vars if c in base.columns]
if "indicativo" not in cols_presentes:
    raise ValueError("No se encontró la columna 'indicativo' en el DataFrame. Es necesaria para métricas por estación.")
df_keys = base.select(*cols_presentes)

# Función "está informado": no nulo y no cadena vacía (por si los campos aún son string)
def informed(colname):
    col = F.col(colname)
    return F.when(col.isNotNull() & (F.length(F.trim(col.cast("string"))) > 0), 1).otherwise(0)

# --- 1) Cobertura global por variable ---
n_total = df_keys.count()

agg_exprs_global = []
for v in key_vars:
    if v in df_keys.columns:
        agg_exprs_global.append(F.sum(informed(v)).alias(f"{v}_informados"))

global_counts = df_keys.agg(*agg_exprs_global)

# Construimos tabla global “larga”
rows = []
row = global_counts.collect()[0].asDict()
for v in key_vars:
    if f"{v}_informados" in row:
        n_inf = int(row[f"{v}_informados"])
        rows.append((v, n_inf, n_total, n_total - n_inf, 100.0 * n_inf / n_total, 100.0 * (n_total - n_inf) / n_total))
global_df = spark.createDataFrame(rows, ["variable", "n_informados", "n_total", "n_nulos", "pct_informado", "pct_nulo"])

print("=== Cobertura global por variable ===")
global_df.orderBy(F.col("pct_informado").desc()).show(truncate=False)

In [None]:
# --- Cobertura por estación (distribución y percentiles) ---
# Conteo por estación (denominador)
by_station_total = df_keys.groupBy("indicativo").agg(F.count(F.lit(1)).alias("n_total_est"))

# Conteo informados por estación y variable
agg_exprs_station = []
for v in key_vars:
    if v in df_keys.columns:
        agg_exprs_station.append(F.sum(informed(v)).alias(f"{v}_inf"))
by_station_inf = df_keys.groupBy("indicativo").agg(*agg_exprs_station)

# Une y calcula % por estación/variable
by_station = by_station_inf.join(by_station_total, on="indicativo", how="inner")
for v in key_vars:
    if f"{v}_inf" in by_station.columns:
        by_station = by_station.withColumn(f"{v}_pct", F.round(100.0 * F.col(f"{v}_inf") / F.col("n_total_est"), 4))

# Métricas agregadas por variable sobre la distribución por estación
# percentiles aproximados para velocidad
def pct_expr(colname, p):
    return F.expr(f"percentile_approx({colname}, {p})")

stats_rows = []
estaciones_total = by_station.select("indicativo").distinct().count()

for v in key_vars:
    pct_col = f"{v}_pct"
    if pct_col in by_station.columns:
        dist = by_station.select(pct_col).where(F.col(pct_col).isNotNull())
        # percentiles
        p25, p50, p75 = dist.select(
            pct_expr(pct_col, 0.25).alias("p25"),
            pct_expr(pct_col, 0.50).alias("p50"),
            pct_expr(pct_col, 0.75).alias("p75"),
        ).collect()[0]
        # min/max
        minv, maxv = dist.agg(F.min(pct_col), F.max(pct_col)).collect()[0]
        # estaciones por umbral
        ge95 = by_station.where(F.col(pct_col) >= 95).count()
        ge90 = by_station.where(F.col(pct_col) >= 90).count()
        ge80 = by_station.where(F.col(pct_col) >= 80).count()

        stats_rows.append((
            v, estaciones_total, ge95, ge90, ge80,
            float(minv), float(p25), float(p50), float(p75), float(maxv)
        ))

stats_df = spark.createDataFrame(
    stats_rows,
    ["variable", "n_estaciones", "est_ge95", "est_ge90", "est_ge80", "min_pct", "p25_pct", "p50_pct", "p75_pct", "max_pct"]
)

print("\n=== Cobertura por estación: distribución ===")
stats_df.orderBy("variable").show(truncate=False)

In [None]:
# 3) Tabla de estaciones por encima del umbral, por variable ---

UMBRAL_TRUSTED = 90.0
apt_cols = []
for v in key_vars:
    pct_col = f"{v}_pct"
    if pct_col in by_station.columns:
        by_station = by_station.withColumn(f"apt_{v}", (F.col(pct_col) >= UMBRAL_TRUSTED).cast("boolean"))
        apt_cols.append(f"apt_{v}")

apt_summary = None
if apt_cols:
    exprs = [F.sum(F.when(F.col(c) == True, 1).otherwise(0)).alias(c) for c in apt_cols]
    apt_summary = by_station.agg(*exprs)
    print(f"\n=== Nº de estaciones por encima del umbral, por variable (umbral {UMBRAL_TRUSTED:.0f}%) ===")
    apt_summary.show(truncate=False)

##### **Conclusiones del check B.4**

El análisis de cobertura de las variables **tmax**, **tmin**, **tmed** y **prec** se ha realizado sobre un total de **918 estaciones**.
Los resultados obtenidos muestran lo siguiente:

- **Cobertura general muy elevada** en la mayoría de estaciones:
  - El **percentil 50 (mediana)** de cobertura se sitúa en torno al **99,5%** para las tres variables de temperatura y en **99,2%** para precipitación.
  - El **percentil 25** ya supera el **98,6%** en temperaturas y el **97,7%** en precipitación.

- **Cobertura ≥95%**:
  - **tmax**: 847 estaciones (92,3%)
  - **tmin**: 847 estaciones (92,3%)
  - **tmed**: 847 estaciones (92,3%)
  - **prec**: 800 estaciones (87,2%)

- **Cobertura ≥80%**:
  - Temperaturas: 898 estaciones (>97%)
  - Precipitación: 887 estaciones (96,6%)

- **Estaciones deficitarias**:
  Existen algunas estaciones con valores de **0% de cobertura** en ciertas variables, lo que refleja estaciones pluviométricas (que solo informan precipitación) o estaciones con problemas sistemáticos de reporte en una o más variables.

##### **Conclusión principal**
La red de estaciones presenta una **alta completitud en las variables clave**: más del 90% de las estaciones alcanzan coberturas superiores al 95% en temperaturas, y cerca del 87% lo hacen en precipitación.
No obstante, se identifican estaciones con ausencia total en alguna variable, que deberán ser **clasificadas por variable de validez** (ej. estaciones pluviometricas, aptas para precipitación pero no para temperaturas)

### B.5 Cobertura de variables opcionales (ranking global por completitud)
Cuantificar nulos en variables no críticas (e.g., `sol`, `dir`, humedad, presión).


In [None]:
from pyspark.sql import functions as F

def informed_bool(colname: str):
    c = F.col(colname)
    # True si no es nulo y no es cadena vacía (aunque venga como string)
    return c.isNotNull() & (F.length(F.trim(c.cast("string"))) > 0)

def informed_int(colname: str):
    # 1/0 para sumar en agregaciones
    return F.when(informed_bool(colname), 1).otherwise(0)

# Variables opcionales
optional_vars = [
    "sol","dir","velmedia","racha","horaracha",
    "presmax","presmin","horapresmax","horapresmin",
    "hrmedia","hrmax","hrmin","horahrmax","horahrmin"
]

# Filtrar las que existen en el DF normalizado
optional_vars = [v for v in optional_vars if v in df_meteo_norm.columns]

if not optional_vars:
    raise ValueError("Ninguna de las columnas opcionales existe en df_meteo_norm.")

# Total de filas (denominador)
try:
    total_rows
except NameError:
    total_rows = df_meteo_norm.count()

agg_exprs = [F.sum(informed_int(v)).alias(f"{v}_informados") for v in optional_vars]
counts_row = df_meteo_norm.agg(*agg_exprs).collect()[0].asDict()

rows = []
for v in optional_vars:
    n_inf = int(counts_row[f"{v}_informados"])
    pct   = round(100.0 * n_inf / total_rows, 4) if total_rows else 0.0
    rows.append((v, n_inf, total_rows, pct))

df_optional_rank = spark.createDataFrame(
    rows,
    [
        "variable",
        "n_informados", # No nulo y no vacío,
        "n_total",
        "pct_informado"
    ]
)

# Mostrar ordenado por % informado
df_optional_rank.orderBy(F.col("pct_informado").desc()).show(truncate=False)

# C. Validez y rangos de valores

**Motivación.** Identificar valores fuera de rango o incoherentes, con el fin de detectar errores de medición, carga o interpretación de unidades.

In [None]:
# Helpers
from pyspark.sql import functions as F, types as T

# Normalizador robusto de numéricos (soporta coma decimal y espacios)
def to_float(colname: str):
    """Convierte a float una columna que puede venir con coma decimal o espacios."""
    c = F.col(colname).cast("string")
    return F.regexp_replace(F.trim(c), ",", ".").cast("double")

# Informed (no nulo ni vacío)
def informed(colname: str):
    c = F.col(colname)
    return c.isNotNull() & (F.length(F.trim(c.cast("string"))) > 0)

# % helper
def pct(numerador_col, denominador_col):
    return F.round(100.0 * numerador_col / F.when(denominador_col == 0, None).otherwise(denominador_col), 2)


def parse_prec(colname: str, ip_value: float = 0.05):
    s = F.trim(F.col(colname).cast("string"))
    # Ip → valor pequeño; "Acum" (o variantes) → null; resto: numérico con coma
    return F.when(s.rlike(r'(?i)^ip$'), F.lit(ip_value)) \
            .when(s.rlike(r'(?i)^acum'), F.lit(None).cast("double")) \
            .otherwise(F.regexp_replace(s, ",", ".").cast("double"))

# Fecha derivada por año/mes (si no las tienes ya)
df_meteo_norm = df_meteo_norm.withColumn("anio", F.year("fecha_date")) \
                             .withColumn("mes",  F.month("fecha_date"))


### C.1 Temperaturas en rangos razonables
Verificar que `tmax`, `tmin`, `tmed` caen dentro de un rango físico plausible para España.
**Umbrales propuestos:** `tmin ∈ [-30, 35]`, `tmax ∈ [-15, 50]`, `tmed ∈ [-20, 45]`.
Medimr  % fuera de rango y su distribución por estación y mes.

In [None]:
# Umbrales (ajusta si procede)
TMIN_MIN, TMIN_MAX = -30.0, 35.0
TMAX_MIN, TMAX_MAX = -15.0, 50.0
TMED_MIN, TMED_MAX = -20.0, 45.0

tmin_f = to_float("tmin")
tmax_f = to_float("tmax")
tmed_f = to_float("tmed")

out_tmin = informed("tmin") & ((tmin_f < TMIN_MIN) | (tmin_f > TMIN_MAX))
out_tmax = informed("tmax") & ((tmax_f < TMAX_MIN) | (tmax_f > TMAX_MAX))
out_tmed = informed("tmed") & ((tmed_f < TMED_MIN) | (tmed_f > TMED_MAX))

viol_df = (df_meteo_norm
           .withColumn("out_tmin", out_tmin.cast("int"))
           .withColumn("out_tmax", out_tmax.cast("int"))
           .withColumn("out_tmed", out_tmed.cast("int")))

agg = (viol_df.agg(
            F.sum("out_tmin").alias("tmin_fuera"),
            F.sum("out_tmax").alias("tmax_fuera"),
            F.sum("out_tmed").alias("tmed_fuera"),
            F.count("*").alias("n_total"))
        .withColumn("%_tmin_fuera", pct(F.col("tmin_fuera"), F.col("n_total")))
        .withColumn("%_tmax_fuera", pct(F.col("tmax_fuera"), F.col("n_total")))
        .withColumn("%_tmed_fuera", pct(F.col("tmed_fuera"), F.col("n_total"))))

agg.show(truncate=False)


El numero de valores fuera de rango es negligible para las tres variables de temperatura.

In [None]:
# Muestra ejemplos de filas fuera de rango
print("Muestras de out-of-range (tmax/tmin/tmed):")
(viol_df.filter(F.col("out_tmax")==1).select("fecha_date","indicativo","tmax").show(truncate=False))
(viol_df.filter(F.col("out_tmin")==1).select("fecha_date","indicativo","tmin").show(truncate=False))
(viol_df.filter(F.col("out_tmed")==1).select("fecha_date","indicativo","tmed").show(5, truncate=False))

### C.2 Consistencia entre temperaturas

**Objetivo.** Verificar la coherencia interna de las temperaturas diarias.

**Reglas (sin tolerancias):**
1) **Orden lógico:** `tmax ≥ tmin`.
2) **Media acotada:** `tmin ≤ tmed ≤ tmax`.

> Estas dos reglas no dependen del método con el que la estación calcule `tmed`; por tanto, no requieren umbrales ni tolerancias.

- Nº y % de violaciones sobre el total de registros.
- Ranking por estación (y por mes) de mayor % de violaciones.
- Muestras de filas problemáticas para depuración.

> (Opcional, si se desea un control más estricto) comparar `tmed` con `(tmax+tmin)/2 ± tolerancia`. No se activa en este bloque.

In [None]:
# C.2 Consistencia entre temperaturas (sin tolerancias) · FIX agg booleanas
from pyspark.sql import functions as F

required = {"tmin","tmax","tmed"}
missing  = [c for c in required if c not in df_meteo_norm.columns]
if missing:
    raise ValueError(f"C.2 requiere columnas {sorted(required)}; faltan: {missing}")

tmin = to_float("tmin")
tmax = to_float("tmax")
tmed = to_float("tmed")

have_minmax = tmin.isNotNull() & tmax.isNotNull()
have_all    = have_minmax & tmed.isNotNull()

# Reglas de consistencia (booleanas)
incumpl_max_menor_min     = have_minmax & (tmax < tmin)                     # incoherente: tmax < tmin
incumpl_tmed_fuera_rango  = have_all & ((tmed < tmin) | (tmed > tmax))      # incoherente: tmed fuera de [tmin, tmax]

df_flags = (df_meteo_norm
            .withColumn("incumpl_max_menor_min", incumpl_max_menor_min)
            .withColumn("incumpl_tmed_fuera_rango", incumpl_tmed_fuera_rango))

# Contar True con count(when(col, True)) para evitar sum(BOOLEAN)
res_global = (df_flags
              .agg(
                  F.count(F.when(F.col("incumpl_max_menor_min"), True)).alias("n_incumpl_max_menor_min"),
                  F.count(F.when(F.col("incumpl_tmed_fuera_rango"), True)).alias("n_incumpl_tmed_fuera_rango"),
                  F.count("*").alias("n_total")
              )
              .withColumn("%_incumpl_max_menor_min",    F.round(100.0 * F.col("n_incumpl_max_menor_min")   / F.col("n_total"), 4))
              .withColumn("%_incumpl_tmed_fuera_rango", F.round(100.0 * F.col("n_incumpl_tmed_fuera_rango")/ F.col("n_total"), 4))
             )

res_global.show(truncate=False)



In [None]:
# Ver registro que incumple la regla tmax < tmin
print("Registro con tmax < tmin:")
(df_meteo_norm
 .filter(to_float("tmax") < to_float("tmin"))
 .select("fecha","indicativo","tmin","tmax","tmed","provincia","nombre")
 .show(truncate=False))

# Ver registro que incumple la regla tmed fuera de [tmin, tmax]
print("Registro con tmed fuera de [tmin, tmax]:")
(df_meteo_norm
 .filter((to_float("tmed") < to_float("tmin")) | (to_float("tmed") > to_float("tmax")))
 .select("fecha","indicativo","tmin","tmax","tmed","provincia","nombre")
 .show(truncate=False))

Se detecta un registro incoherente (Fuengirola, 20/09/2023, con `tmin=50°C` y `tmax=-50°C`).
<br>
Probablemente un error de carga o de medición.

### C.3 Precipitación (no negativa y códigos especiales)
Asegura `prec ≥ 0`.
<br>
Interpreta `Ip` como traza (0.05 mm por defecto) y “Acum” como nulos (o ajustar según negocio).
Mide recuento de negativos, presencia de `Ip/Acum` y conversión correcta a numérico.

In [None]:
prec_f = parse_prec("prec", ip_value=0.05)  # ajusta el valor de traza si procede

prec_df = df_meteo_norm.withColumn("prec_num", prec_f)

# Negativos informados
neg = informed("prec") & (F.col("prec_num") < 0)

# Marcas para Ip/Acum en crudo (texto original), por curiosidad
prec_raw = F.trim(F.col("prec").cast("string"))
is_ip   = prec_raw.rlike(r'(?i)^ip$')
is_acum = prec_raw.rlike(r'(?i)^acum')

agg = (prec_df.agg(
            F.sum(neg.cast("int")).alias("prec_negativos"),
            F.sum(is_ip.cast("int")).alias("prec_Ip_crudo"),
            F.sum(is_acum.cast("int")).alias("prec_Acum_crudo"),
            F.count("*").alias("n_total"))
       .withColumn("%_negativos", pct(F.col("prec_negativos"), F.col("n_total")))
)

print("C.3 Precipitación:")
agg.show(truncate=False)


El check C.3 confirma que no existen valores negativos de precipitación (0% de los registros), lo que indica consistencia básica.
<br>
Se han identificado 16 087 casos de traza (`Ip`) y 348 de acumulación (`Acum`).

In [None]:
from pyspark.sql import functions as F

print("Registros con precipitación marcada como 'Acum':")
(df_meteo_norm
 .filter(F.trim(F.col("prec").cast("string")).rlike(r'(?i)^acum$'))
 .select("fecha","indicativo","nombre","provincia","prec")
 .show(10, truncate=False))

### C.4 Viento: velocidad, racha y dirección
Validar `velmedia ≥ 0`, `racha ≥ velmedia`,
y `dir ∈ [0,360]` (aceptando códigos especiales si aplican, p. ej. `88/99`).

In [None]:
vel_f  = to_float("velmedia")
racha_f = to_float("racha")
dir_s   = F.trim(F.col("dir").cast("string"))
dir_f   = F.regexp_replace(dir_s, ",", ".").cast("double")

# AEMET (en algunas series) usa 88/99 como códigos; aquí los aceptamos como válidos especiales.
dir_valid = (dir_f >= 0) & (dir_f <= 360) | dir_s.isin("88","99")

viol_vel_neg   = informed("velmedia") & (vel_f < 0)
viol_racha_rel = informed("velmedia") & informed("racha") & (racha_f < vel_f)
viol_dir_range = informed("dir") & (~dir_valid)

wind_df = (df_meteo_norm
           .withColumn("viol_vel_neg", viol_vel_neg.cast("int"))
           .withColumn("viol_racha_rel", viol_racha_rel.cast("int"))
           .withColumn("viol_dir_range", viol_dir_range.cast("int")))

agg = (wind_df.agg(F.sum("viol_vel_neg").alias("vel_neg"),
                   F.sum("viol_racha_rel").alias("racha_menor_vel"),
                   F.sum("viol_dir_range").alias("dir_fuera_rango"),
                   F.count("*").alias("n_total"))
       .withColumn("%_vel_neg", pct(F.col("vel_neg"), F.col("n_total")))
       .withColumn("%_racha_menor_vel", pct(F.col("racha_menor_vel"), F.col("n_total")))
       .withColumn("%_dir_fuera_rango", pct(F.col("dir_fuera_rango"), F.col("n_total"))))

print("C.4 Viento:")
agg.show(truncate=False)


Todos los regstros cumplen las reglas de validación para viento.

# D. Consistencia temporal

**Motivación.** La correcta localización temporal es esencial para relacionar eventos y construir series.

### D.1 Validez de valores acumulados
Si aparecen acumulaciones (p. ej., precipitación), comprobar su plausibilidad y consistencia con periodos adyacentes.

In [None]:
from pyspark.sql import functions as F, Window
from pyspark.sql.types import DoubleType

# 1) Detectar "Acum" y parsear precipitación (usa tu parse_prec si ya lo tienes)
prec_raw = F.trim(F.col("prec").cast("string"))
is_acum  = prec_raw.rlike(r'(?i)^acum$')

def parse_prec(colname: str, ip_value: float = 0.05):
    c = F.trim(F.col(colname).cast("string"))
    return (F.when(c.rlike(r'(?i)^ip$'), F.lit(ip_value))
             .when(c.rlike(r'(?i)^acum$'), F.lit(None).cast(DoubleType()))
             .otherwise(F.regexp_replace(c, ",", ".").cast(DoubleType())))

prec_num = parse_prec("prec", ip_value=0.05)

dfp = (df_meteo_norm
       .withColumn("is_acum", is_acum)
       .withColumn("prec_num", prec_num)
       .withColumn("fecha_date", F.to_date("fecha")))

# 2) Coger un (indicativo, fecha) con Acum (el primero)
sample = (dfp.filter(F.col("is_acum"))
            .select("indicativo","fecha_date")
            .orderBy("indicativo","fecha_date")
            .limit(1)
            .collect())

if not sample:
    print("No se han encontrado registros con 'Acum'.")
else:
    s_indic, s_fecha = sample[0]["indicativo"], sample[0]["fecha_date"]

    # 3) Ventana ±5 días para ese indicativo
    ctx = (dfp.filter(F.col("indicativo") == s_indic)
              .withColumn("diff_days", F.datediff(F.col("fecha_date"), F.lit(s_fecha)))
              .filter((F.col("diff_days") >= -5) & (F.col("diff_days") <= 5))
              .select("fecha_date","indicativo","prec","prec_num","is_acum")
              .orderBy("fecha_date"))

    print(f"Contexto ±5 días alrededor de {s_fecha} en {s_indic}:")
    ctx.show(30, truncate=False)

# E. Coherencia espacial

**Motivación.** Estaciones cercanas deben mostrar comportamientos compatibles; la ubicación y altitud influyen en las magnitudes.

### E.1 Coherencia de altitud con el catálogo
Verificar que la altitud por `indicativo` coincide con el catálogo y no cambia en el tiempo.
**Qué medir:** discrepancias y su magnitud.

In [20]:
# E.1 Coherencia de altitud con el catálogo (usando df_meteo ya cargado)
from pyspark.sql import functions as F, Window

# Dataset de catálogo de estaciones (Parquet)
df_cat = spark.read.parquet(
    "/Users/inessabate/PycharmProjects/climascan-general/data/landing/aemet/aemet_stations.parquet"
).withColumn("alt_cat", F.col("altitud").cast("float")) \
 .select("indicativo", "alt_cat")

# Cast de altitud en df_meteo
df_alt = df_meteo.withColumn("altitud_num", F.col("altitud").cast("float"))

# 1. Altitud cambiante: más de un valor distinto en observaciones
alt_counts = (df_alt.groupBy("indicativo")
              .agg(F.countDistinct("altitud_num").alias("n_alt_distintos")))
estaciones_altitud_cambiante = alt_counts.filter(F.col("n_alt_distintos") > 1).count()

# 2. Discrepancias vs catálogo (usando moda observada)
alt_moda = (df_alt.groupBy("indicativo", "altitud_num")
            .count()
            .withColumn("rn", F.row_number().over(
                Window.partitionBy("indicativo").orderBy(F.col("count").desc())
            ))
            .filter(F.col("rn") == 1)
            .select("indicativo", F.col("altitud_num").alias("alt_obs")))

alt_join = alt_moda.join(df_cat, on="indicativo", how="inner")
discrepancia_vs_catalogo = alt_join.filter(F.col("alt_obs") != F.col("alt_cat")).count()

# Output final
res = spark.createDataFrame([(
    estaciones_altitud_cambiante,
    discrepancia_vs_catalogo
)], ["estaciones_altitud_cambiante", "discrepancia_vs_catalogo"])

print("E.1 Coherencia de altitud con el catálogo")
res.show(truncate=False)

E.1 · Coherencia de altitud con el catálogo
+----------------------------+------------------------+
|estaciones_altitud_cambiante|discrepancia_vs_catalogo|
+----------------------------+------------------------+
|0                           |0                       |
+----------------------------+------------------------+



La altitud reportada en las observaciones es consistente con la altitud del catálogo de estaciones.

# F. Homogeneidad y representatividad

**Motivación.** El análisis de riesgos requiere cobertura suficiente y balanceada en el tiempo, el espacio y las variables clave.

### F.1 Distribución regional de estaciones activas
Contar estaciones activas por provincia (tras filtrar por cobertura/variables clave) para evaluar representatividad territorial.

In [None]:
from pyspark.sql import functions as F, Window
from datetime import date

EXPECTED_DAYS_TOTAL = (date(2025,6,30) - date(2017,1,1)).days + 1
COVERAGE_THRESHOLD_PCT = 90.0

KEY_VARS = ["tmax","tmin","tmed","prec"]

def informed(colname: str):
    c = F.col(colname).cast("string")
    return c.isNotNull() & (F.length(F.trim(c)) > 0)

# --- Cobertura global por estación (nº de días con registro) ---
by_station = (df_meteo
              .groupBy("indicativo")
              .agg(F.count(F.lit(1)).alias("n_registros")))

by_station = by_station.withColumn("coverage_pct_total",
                                   F.round(100.0 * F.col("n_registros")/F.lit(EXPECTED_DAYS_TOTAL), 2))

# --- Cobertura por variable clave (por estación) ---
agg_exprs = []
for v in KEY_VARS:
    agg_exprs.append(F.sum(F.when(informed(v), 1).otherwise(0)).alias(f"{v}_ok"))
df_k = df_meteo.groupBy("indicativo").agg(*agg_exprs)

for v in KEY_VARS:
    df_k = df_k.withColumn(f"{v}_pct", F.round(100.0 * F.col(f"{v}_ok")/F.lit(EXPECTED_DAYS_TOTAL), 2))

# --- Provincia por estación (toma la moda/valor más frecuente informado) ---
prov_rank = (df_meteo
             .where(informed("provincia"))
             .groupBy("indicativo","provincia")
             .count()
             .withColumn("rn", F.row_number().over(
                 Window.partitionBy("indicativo").orderBy(F.col("count").desc())
             ))
             .filter(F.col("rn")==1)
             .select("indicativo", F.col("provincia").alias("prov_est")))

# --- Estaciones "activas" (cobertura mínima) ---
df_active = (by_station
             .join(df_k, on="indicativo", how="left")
             .join(prov_rank, on="indicativo", how="left")
             .withColumn("ok_total", F.col("coverage_pct_total") >= COVERAGE_THRESHOLD_PCT))

# criterio: todas las claves con ≥ 90% (puedes flexibilizar a “al menos 3 de 4”, etc.)
for v in KEY_VARS:
    df_active = df_active.withColumn(f"ok_{v}", F.col(f"{v}_pct") >= COVERAGE_THRESHOLD_PCT)

df_active = df_active.withColumn(
    "activa",
    F.col("ok_total") & F.col("ok_tmax") & F.col("ok_tmin") & F.col("ok_tmed") & F.col("ok_prec")
)

# --- Conteo por provincia y % sobre el total de activas ---
actives = df_active.filter(F.col("activa")==True)

by_prov = (actives.groupBy("prov_est")
           .agg(F.count("*").alias("n_estaciones_activas"))
           .withColumn("pct_sobre_activas",
                       F.round(100.0 * F.col("n_estaciones_activas")/F.sum("n_estaciones_activas").over(Window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)), 2))
           .orderBy(F.col("n_estaciones_activas").desc()))

by_prov.show(truncate=False)



In [25]:
# --- Mapa de España con Altair: provincias + estaciones ---
import altair as alt
import pandas as pd
import numpy as np
import json, os
from pathlib import Path

# 0) Asegurar renderer para PyCharm / notebooks fuera de JupyterLab
try:
    alt.renderers.enable('html')
except Exception:
    pass

# 1) Helpers para convertir coordenadas AEMET ("394924N", "025309E") a grados decimales
def dms_to_decimal(dms: str) -> float | None:
    if dms is None or not isinstance(dms, str) or not dms.strip():
        return None
    s = dms.strip().upper()
    # latitud: DDMMSSN  (7 u 8 chars), longitud: DDDMM SSE  (8 u 9 chars)
    # Tomamos últimos 1 char como hemisferio
    hemi = s[-1]
    body = s[:-1]
    # detectamos si es lat (2 grados) o lon (3 grados) por longitud del cuerpo
    # lat suele venir como 6 (DDMMSS) o 7 (0DDMMSS); lon como 7 (DDDMMSS) u 8
    if len(body) in (6,7):  # latitud
        # Aseguramos 6 dígitos
        body = body[-6:].rjust(6, "0")
        dd = int(body[0:2]); mm = int(body[2:4]); ss = int(body[4:6])
    else:                    # longitud
        body = body[-7:].rjust(7, "0")
        # si viene con 7, interpretamos DDDMMSS; si viniera con 6, asumimos DDMMSS
        if len(body) == 7:
            dd = int(body[0:3]); mm = int(body[3:5]); ss = int(body[5:7])
        else:
            dd = int(body[0:2]); mm = int(body[2:4]); ss = int(body[4:6])
    dec = dd + mm/60 + ss/3600
    if hemi in ("S","W","O"):  # O = Oeste (ES)
        dec = -dec
    return dec

# 2) Partimos del DF de estaciones en Spark (si no lo tienes aún, lee el parquet)
#    df_est = spark.read.parquet("/Users/inessabate/PycharmProjects/climascan-general/data/landing/aemet/aemet_stations.parquet")

est_pd = df_estaciones.select("indicativo","nombre","provincia","latitud","longitud").toPandas()

# 3) Convertir coordenadas a decimal
est_pd["lat"] = est_pd["latitud"].apply(dms_to_decimal)
est_pd["lon"] = est_pd["longitud"].apply(dms_to_decimal)

# 4) Unir, si la tienes ya calculada, la cobertura por estación (df_by_station con columnas: indicativo, registros, coverage_pct, grupo_cobertura)
#    Si no la tienes en Spark, puedes saltarte este merge y colorear todos igual.
try:
    cov_pd = df_by_station.toPandas()  # debe incluir 'indicativo' y 'coverage_pct'/'grupo_cobertura'
except NameError:
    cov_pd = pd.DataFrame(columns=["indicativo","coverage_pct","grupo_cobertura"])

est_pd = est_pd.merge(cov_pd[["indicativo","coverage_pct","grupo_cobertura"]], on="indicativo", how="left")

# 5) Filtrar puntos claramente fuera de España (rango aproximado)
est_pd = est_pd[(est_pd["lat"].between(27, 44)) & (est_pd["lon"].between(-19, 5))].copy()

# 6) Cargar GeoJSON provincial si existe (cámbialo si lo tienes en otra ruta)
PROJECT_ROOT = Path().resolve().parents[1] if Path().resolve().name == "01_data_analytics" else Path().resolve().parents[0]
geo_path = PROJECT_ROOT / "data" / "external" / "spain_provinces.geojson"

have_provinces = os.path.exists(geo_path)

if have_provinces:
    with open(geo_path, "r", encoding="utf-8") as f:
        provinces_geo = json.load(f)

# 7) Construir capas Altair
base = alt.Chart().properties(width=700, height=700)

if have_provinces:
    # Capa de provincias (relleno claro, contorno fino)
    provinces = alt.Chart(alt.Data(values=provinces_geo))\
        .mark_geoshape(strokeOpacity=0.6, strokeWidth=0.3)\
        .project(type="mercator")\
        .properties(title="Estaciones AEMET por provincia (Altair)")
else:
    # Sin provincias, solo puntos (Altair aún necesita una proyección)
    provinces = alt.Chart(pd.DataFrame({"dummy":[0]})).mark_geoshape()\
        .project(type="mercator")\
        .properties(title="Estaciones AEMET")

# Puntos de estaciones
# Si no tienes 'grupo_cobertura', puedes usar tamaño constante y color por provincia, por ejemplo.
color_field = "grupo_cobertura" if "grupo_cobertura" in est_pd.columns else "provincia"

points = alt.Chart(est_pd).mark_circle(opacity=0.85).encode(
    longitude="lon:Q",
    latitude="lat:Q",
    tooltip=["indicativo","nombre","provincia","coverage_pct"],
    size=alt.value(30),
    color=alt.Color(f"{color_field}:N", legend=alt.Legend(title=color_field.replace("_"," ").title()))
).project(type="mercator")

chart = provinces + points
chart