In [0]:
# ============================================================
# 1. Imports
# ============================================================

import unicodedata
import re
from functools import reduce

from pyspark.sql import DataFrame
from pyspark.sql.functions import (
    col, regexp_replace, when, trim,
    min, max, count, lit, coalesce
)


# ============================================================
# 2. Função: Normalizar nomes das colunas
# ============================================================

def normalizar_colunas(df: DataFrame) -> DataFrame:
    novas_colunas = []

    for c in df.columns:
        c_norm = (
            unicodedata.normalize("NFKD", c)
            .encode("ASCII", "ignore")
            .decode("utf-8")
        )
        c_norm = c_norm.lower()
        c_norm = re.sub(r"[^a-z0-9]+", "_", c_norm)
        c_norm = c_norm.strip("_")

        novas_colunas.append(c_norm)

    return df.toDF(*novas_colunas)


# ============================================================
# 3. Leitura das tabelas INMET (2021–2025)
# ============================================================

tabelas = [
    "workspace.default.inmet_ne_se_a_409_aracaju_01_01_2021_a_31_12_2021",
    "workspace.default.inmet_ne_se_a_409_aracaju_01_01_2022_a_31_12_2022",
    "workspace.default.inmet_ne_se_a_409_aracaju_01_01_2023_a_31_12_2023",
    "workspace.default.inmet_ne_se_a_409_aracaju_01_01_2024_a_31_12_2024",
    "workspace.default.inmet_ne_se_a_409_aracaju_01_01_2025_a_31_08_2025",
]

dfs_normalizados = [
    normalizar_colunas(spark.table(t))
    for t in tabelas
]


# ============================================================
# 4. Consolidação dos DataFrames
# ============================================================

df_consolidado = reduce(
    lambda a, b: a.unionByName(b, allowMissingColumns=True),
    dfs_normalizados
)


# ============================================================
# 5. Conversão de colunas numéricas
# ============================================================

colunas_nao_numericas = {"data", "hora_utc"}

def converter_para_double(df: DataFrame) -> DataFrame:
    df_out = df

    for c in df.columns:
        if c not in colunas_nao_numericas:
            df_out = df_out.withColumn(
                c,
                when(
                    col(c).isNull()
                    | (trim(col(c).cast("string")) == "")
                    | (col(c).cast("string").isin("-9999", "////", "NaN")),
                    None
                ).otherwise(
                    regexp_replace(
                        col(c).cast("string"), ",", "."
                    ).cast("double")
                )
            )

    return df_out


df_numerico = converter_para_double(df_consolidado)

df_numerico.printSchema()


# ============================================================
# 6. Adequação / Harmonização de colunas equivalentes
#    - Une colunas semanticamente iguais
#    - Prioriza valores não nulos
# ============================================================

mapa_colunas_equivalentes = {
    "precipitacao_total_mm": [
        "precipitacao_total_horario_mm",
        "precipitacao_total_mm","precipitao_total_horrio_mm","precipitao_total_mm"
    ],
    "temperatura_ar_c": [
        "temperatura_do_ar_bulbo_seco_horaria_c",
        "temperatura_do_ar_bulbo_seco_c"
    ],
    "umidade_relativa_ar": [
        "umidade_relativa_do_ar",
        "umidade_relativa_do_ar_horaria"
    ],
    "pressao_atmosferica_mb": [
        "pressao_atmosferica_ao_nivel_da_estacao_mb",
        "pressao_atmosferica_ao_nivel_da_estacao_horaria_mb"
    ],
    "vento_direcao_gr": [
        "vento_direo_horaria_gr_gr",
        "vento","vento_direcao_gr"
    ],
    "vento_rajada_maxima_m_s": [
        "vento_maxima_m_s",
        "vento_rajada_maxima_m_s"
    ],
    "vento_velocidade_m_s": [
        "vento_velocidade_horaria_m_s",
        "vento_velocidade_m_s"
    ],
}

for coluna_final, colunas_origem in mapa_colunas_equivalentes.items():
    colunas_existentes = [
        col(c) for c in colunas_origem if c in df_numerico.columns
    ]

    if colunas_existentes:
        df_numerico = df_numerico.withColumn(
            coluna_final,
            coalesce(*colunas_existentes)
        ).drop(*[c for c in colunas_origem if c in df_numerico.columns])


# ============================================================
# 7. Identificação das colunas numéricas
# ============================================================

colunas_numericas = [
    c for c, t in df_numerico.dtypes
    if t not in ("string", "date")
]


# ============================================================
# 8. Análise de qualidade dos dados
# ============================================================

total_registros = df_numerico.count()

dfs_stats = []

for c in colunas_numericas:
    df_stats = df_numerico.select(
        lit(c).alias("coluna"),
        min(c).alias("minimo"),
        max(c).alias("maximo"),
        (lit(total_registros) - count(c)).alias("dados_faltantes"),
        lit(total_registros).alias("total_registros")
    )
    dfs_stats.append(df_stats)


# ============================================================
# 9. Consolidação da tabela de qualidade
# ============================================================

df_qualidade = reduce(
    lambda a, b: a.unionByName(b),
    dfs_stats
)

display(df_qualidade)
# ============================================================
# 10. Ranking dos extremos climáticos
# ============================================================

# 10.1. 10 dias mais chuvosos
df_chuva_dia = df_numerico.groupBy("data").sum("precipitacao_total_mm") \
    .withColumnRenamed("sum(precipitacao_total_mm)", "precipitacao_total_dia_mm") \
    .orderBy(col("precipitacao_total_dia_mm").desc()) \
    .limit(10)

display(df_chuva_dia)

# 10.2. 10 dias mais quentes
df_temp_dia = df_numerico.groupBy("data").avg("temperatura_ar_c") \
    .withColumnRenamed("avg(temperatura_ar_c)", "temperatura_media_dia_c") \
    .orderBy(col("temperatura_media_dia_c").desc()) \
    .limit(10)

display(df_temp_dia)

# 10.3. 10 dias mais frios
df_temp_fria_dia = (
    df_numerico
    .groupBy("data")
    .avg("temperatura_ar_c")
    .withColumnRenamed("avg(temperatura_ar_c)", "temperatura_media_dia_c")
    .filter(col("temperatura_media_dia_c").isNotNull())  # remove valores nulos
    .orderBy(col("temperatura_media_dia_c").asc())
    .limit(10)
)

display(df_temp_fria_dia)
