In [None]:
import sys
import warnings
from functools import lru_cache
from io import StringIO
from pathlib import Path
from typing import Optional


import altair as alt
import folium
import geopandas as gpd
import numpy as np
import pandas as pd
import shapely
from branca.colormap import linear
from folium.plugins import MarkerCluster
from pandas.api.types import is_datetime64_any_dtype as is_datetime

alt.renderers.set_embed_options(format_locale="pt_BR", time_format_locale="pt_BR")
sys.path.append(str(Path().resolve()))

RendererRegistry.enable('default')

In [5]:
import utils

In [None]:
import warnings
from functools import lru_cache
from io import StringIO
from typing import Optional

import altair as alt
import folium
import geopandas as gpd
import numpy as np
import pandas as pd
import shapely
from branca.colormap import linear
from folium.plugins import MarkerCluster
from pandas.api.types import is_datetime64_any_dtype as is_datetime

alt.renderers.set_embed_options(format_locale="pt_BR", time_format_locale="pt_BR")

# fmt: off
CODIGOS_ESTADOS = {
    "AC": 12, "AL": 27, "AP": 16, "AM": 13, "BA": 29, "CE": 23, "DF": 53, "ES": 32,
    "GO": 52, "MA": 21, "MT": 51, "MS": 50, "MG": 31, "PA": 15, "PB": 25, "PR": 41,
    "PE": 26, "PI": 22, "RJ": 33, "RN": 24, "RS": 43, "RO": 11, "RR": 14, "SC": 42,
    "SP": 35, "SE": 28, "TO": 17
}
# fmt: on


def cria_mapa_geodataframe(
    gdf: gpd.GeoDataFrame, cols: list[str], aliases: Optional[list[str]] = None
) -> folium.Map:
    """Cria um mapa interativo com marcadores para cada ponto no GeoDataFrame.
    Args:
        gdf (gpd.GeoDataFrame): GeoDataFrame contendo os dados a serem plotados.
        cols (list): Lista de colunas do GeoDataFrame a serem exibidas no tooltip.
        aliases (list, optional): Lista de aliases para as colunas no tooltip. \
            Se None, usa os nomes originais das colunas.
    Returns:
        folium.Map: Mapa interativo com os pontos plotados.
    """
    if not isinstance(gdf, gpd.GeoDataFrame):
        raise TypeError("O argumento gdf deve ser um GeoDataFrame.")
    if gdf.empty:
        raise ValueError("O GeoDataFrame está vazio.")
    if not all(col in gdf.columns for col in cols):
        raise ValueError(
            "Uma ou mais colunas especificadas em cols não existem no GeoDataFrame."
        )
    if aliases is not None and len(cols) != len(aliases):
        raise ValueError(
            "A lista de aliases deve ter o mesmo tamanho que a lista de cols."
        )

    m = folium.Map(location=[-15, -55], zoom_start=4)

    datetime_cols = gdf.select_dtypes(include=["datetime64[ns]"]).columns.tolist()
    if datetime_cols:
        gdf = transforma_colunas_datetime_para_string(gdf, datetime_cols)

    # Converte para GeoJSON e adiciona ao mapa
    folium.GeoJson(
        gdf.to_json(),
        marker=folium.CircleMarker(radius=6, fill=True, fill_opacity=0.7),
        tooltip=folium.GeoJsonTooltip(
            fields=cols,
            aliases=aliases if aliases else cols,
            localize=True,
        ),
    ).add_to(m)

    return m


def transforma_colunas_datetime_para_string(
    gdf: gpd.GeoDataFrame, cols: list[str], formato: str = "%d/%m/%Y"
) -> gpd.GeoDataFrame:
    """Transforma uma coluna datetime de um GeoDataFrame para o formato string.
    Args:
        gdf (gpd.GeoDataFrame): GeoDataFrame contendo os dados a serem transformados.
        cols (list): Nomes das colunas a serem transformadas.
        formato (str, optional): Formato da string de data. Padrão é "%d/%m/%Y".
    Returns:
        gpd.GeoDataFrame: GeoDataFrame com as colunas transformadas para string.
    """
    if not isinstance(gdf, gpd.GeoDataFrame):
        raise TypeError("O argumento gdf deve ser um GeoDataFrame.")

    missing_cols = [col for col in cols if col not in gdf.columns]

    if missing_cols == cols:
        raise ValueError(f"Nenhuma das colunas '{cols}' existe no GeoDataFrame.")
    if missing_cols:
        warnings.warn(f"As colunas {missing_cols} não existem no GeoDataFrame.")

    for col in cols:
        if is_datetime(gdf[col]):
            gdf[col] = gdf[col].dt.strftime(formato)

    return gdf


def gpd_merge(left_gdf, right_df, **merge_kwargs):
    """Realiza um merge entre um GeoDataFrame e um DataFrame, preservando a geometria.
    Args:
        left_gdf (gpd.GeoDataFrame): GeoDataFrame à esquerda do merge.
        right_df (pd.DataFrame): DataFrame à direita do merge.
        **merge_kwargs: Argumentos adicionais para o pd.merge().
    Returns:
        gpd.GeoDataFrame: Resultado do merge como um GeoDataFrame.
    """
    if all(type(df) is pd.DataFrame for df in [left_gdf, right_df]):
        warnings.warn(
            "Ambos os argumentos são DataFrames. Considere usar pd.merge() diretamente.",
            UserWarning,
        )
    if not isinstance(left_gdf, gpd.GeoDataFrame):
        raise TypeError("O argumento left_gdf deve ser um GeoDataFrame.")
    if not isinstance(right_df, pd.DataFrame):
        raise TypeError("O argumento right_df deve ser um DataFrame.")
    merged = pd.merge(left_gdf, right_df, **merge_kwargs)
    return gpd.GeoDataFrame(merged, geometry=left_gdf.geometry.name, crs=left_gdf.crs)


@lru_cache(maxsize=5)
def carrega_locale_altair(locale: str = "pt-BR") -> alt.Locale:
    """Carrega a configuração de localidade (locale) para Altair a partir dos arquivos JSON do D3.

    Args:
        locale (str, optional): Código da localidade a ser carregada. Padrão é "pt_BR".

    Returns:
        alt.Locale: Objeto de localidade para uso em gráficos Altair.

    Raises:
        ValueError: Se os arquivos de locale não forem encontrados ou forem inválidos.
    """
    import json
    from urllib import error, request

    base_format_url = "https://raw.githubusercontent.com/d3/d3-format/refs/heads/main/locale/{locale}.json"
    base_time_url = "https://raw.githubusercontent.com/d3/d3-time-format/refs/heads/main/locale/{locale}.json"

    try:
        with request.urlopen(base_format_url.format(locale=locale)) as f:
            format_json = json.load(f)
        with request.urlopen(base_time_url.format(locale=locale)) as f:
            time_format_json = json.load(f)
    except error.HTTPError as e:
        raise ValueError(
            f"Locale '{locale}' não encontrado nos repositórios D3."
        ) from e
    except Exception as e:
        raise ValueError("Erro ao carregar locale para Altair.") from e

    return alt.Locale(number=format_json, time=time_format_json)


def json_municipios(ufs: list[str] | str) -> gpd.GeoDataFrame:
    """Carrega os dados geográficos dos municípios brasileiros para os estados especificados.

    Args:
        ufs (list[str] | str): Lista de siglas dos estados ou uma única sigla.

    Returns:
        gpd.GeoDataFrame: GeoDataFrame contendo os dados dos municípios dos estados especificados.

    Raises:
        ValueError: Se nenhum estado válido for fornecido.
    """
    url_municipios = "https://raw.githubusercontent.com/tbrugz/geodata-br/refs/heads/master/geojson/geojs-{codigo}-mun.json"

    if isinstance(ufs, str):
        ufs = [ufs.upper()]
    else:
        ufs = [uf.upper() for uf in ufs]

    ufs_validos = [uf for uf in ufs if uf in CODIGOS_ESTADOS]
    ufs_invalidos = set(ufs) - set(ufs_validos)

    if ufs_invalidos:
        warnings.warn(f"Estados inválidos ignorados: {', '.join(ufs_invalidos)}")

    if not ufs_validos:
        raise ValueError("Nenhum estado válido fornecido.")

    gdfs = []
    for uf in ufs_validos:
        gdf = gpd.read_file(url_municipios.format(codigo=CODIGOS_ESTADOS[uf]))
        gdf.columns = ["id", "city", "desc", "geometry"]
        gdf["state"] = uf
        gdf = gdf[["id", "city", "state", "geometry"]]
        gdfs.append(gdf)

    municipios = gpd.GeoDataFrame(pd.concat(gdfs, ignore_index=True), crs="EPSG:4326")
    municipios["geometry"] = shapely.make_valid(municipios["geometry"])

    return municipios

In [6]:
# Lê o arquivo CSV removendo aspas duplas
# O CSV original é um CSV "sujo", criado com Excel, que adiciona aspas duplas
# em torno de cada campo, o que causa problemas na leitura.
with open(
    "../data/dados_exemplo_poluentes_no_acentos.csv", "r", encoding="utf-8-sig"
) as f:
    linhas = [linha.replace('"', "") for linha in f]

# Converte as linhas limpas para um buffer em memória
csv_buffer = StringIO("".join(linhas))

# Lê o CSV normalmente após a limpeza
df = pd.read_csv(csv_buffer, sep=",")
#
# Remove a coluna "unit" se todos os valores forem iguais
# Neste caso, todos os valores são "mg/L", então a coluna é desnecessária
if "unit" in df.columns and df["unit"].nunique() == 1:
    df = df.drop(columns=["unit"])

# Converte as colunas de latitude e longitude para o tipo numérico
df["lat"] = pd.to_numeric(df["lat"], errors="coerce")
df["lon"] = pd.to_numeric(df["lon"], errors="coerce")

# Remove Estacao do nome das estações e converte para categoria
df["station_name"] = df["station_name"].str.replace("Estacao", "")
df["station_name"] = df["station_name"].str.strip()
df["station_name"] = df["station_name"].astype("category")

# Converte a coluna de data para o tipo datetime
df["sample_dt"] = pd.to_datetime(df["sample_dt"], format="%Y-%m-%d")

# Remove a coluna station_id, já que station_name é suficiente para identificar os pontos de coleta
# Cada station_id tem um station_name único
df = df.drop(columns=["station_id"])

# Transforma o DataFrame para o formato longo (long format) para facilitar a plotagem
# com bibliotecas de visualização
df_longo = df.melt(
    id_vars=["station_name", "lat", "lon", "sample_dt"],
    value_vars=["pol_a", "pol_b"],
    var_name="pollutant",
    value_name="value",
)

gdf = gpd.GeoDataFrame(
    df, geometry=gpd.points_from_xy(df["lon"], df["lat"]), crs="EPSG:4326"
)

gdf_longo = gpd.GeoDataFrame(
    df_longo,
    geometry=gpd.points_from_xy(df_longo["lon"], df_longo["lat"]),
    crs="EPSG:4326",
)

In [7]:
# Criar GeoDataFrame
geo = gpd.GeoDataFrame(gdf, geometry="geometry", crs="EPSG:4326")
pol_a_log = np.log(geo["pol_a"])
pol_b_log = np.log(geo["pol_b"])
# Colormap para pol_a
colormap = linear.BuGn_06.scale(pol_a_log.min(), pol_a_log.max())
# colormap = linear.BuGn_06.scale(geo["pol_a"].min(), geo["pol_a"].max())
colormap = colormap.to_step(n=10)

m = folium.Map(location=[-15, -55], zoom_start=4)
marker_cluster = MarkerCluster().add_to(m)

for idx, row in geo.iterrows():
    folium.CircleMarker(
        location=[row["lat"], row["lon"]],
        radius=6,
        fill=True,
        color=colormap(row["pol_a"]),
        fill_opacity=0.7,
        popup=folium.Popup(
            f"""
            <b>Estação:</b> {row["station_name"].replace("Estacao", "")}<br>
            <b>Poluente A:</b> {row["pol_a"]}<br>
            <b>Poluente B:</b> {row["pol_b"]}<br>
            <b>Data:</b> {row["sample_dt"]}
        """,
            max_width=250,
        ),
    ).add_to(marker_cluster)

colormap.caption = "ln do poluente A (mg/L)"
colormap.add_to(m)

m

In [8]:
estados_sudeste = ["ES", "MG", "RJ", "SP"]
municipios_sudeste = json_municipios(estados_sudeste)

pontos_coleta = (
    gdf[["station_name", "geometry"]].drop_duplicates().reset_index(drop=True)
)

pontos_coleta_municipios = gpd.sjoin(
    municipios_sudeste, pontos_coleta, how="inner", predicate="intersects"
).drop(columns="index_right")

pontos_coleta_municipios = gpd_merge(
    gdf,
    pontos_coleta_municipios[["station_name", "city", "state"]],
    on="station_name",
    how="left",
)

non_cat_cols = pontos_coleta_municipios.select_dtypes(exclude=["category"]).columns
pontos_coleta_municipios[non_cat_cols] = pontos_coleta_municipios[non_cat_cols].fillna(
    "N/A"
)

In [9]:
cria_mapa_geodataframe(
    pontos_coleta_municipios,
    cols=["station_name", "city", "state"],
    aliases=["Estação", "Cidade", "Estado"],
)


In [10]:
df_longo_formatado = df_longo.copy(deep=True)
df_longo_formatado["pollutant"] = df_longo_formatado["pollutant"].replace(
    {"pol_a": "A", "pol_b": "B"}
)

source = df_longo_formatado[df_longo["station_name"] == "M1"]

points = (
    alt.Chart(source)
    .mark_line(point=alt.OverlayMarkDef(filled=False, fill="white"))
    .encode(
        x=alt.X("sample_dt:T", axis=alt.Axis(format="%d %b", title="Data")),
        y=alt.Y("value:Q", title="Valor (mg/L)"),
        color=alt.Color(
            "pollutant:N",
            legend=alt.Legend(title="Poluente"),
            scale=alt.Scale(range=["green", "purple"]),
        ),
        tooltip=[
            alt.Tooltip("station_name:N", title="Estação"),
            alt.Tooltip("sample_dt:T", title="Data da coleta", format="%d-%m-%Y"),
            alt.Tooltip("value:Q", title="Valor (mg/L)"),
        ],
    )
).add_params()

points


In [11]:
locale = carrega_locale_altair("pt-BR")


In [None]:
def obtem_centroide(pontos: list[shapely.Point] | gpd.GeoSeries) -> tuple[float, float]:
    """Calcula o centroide de um conjunto de pontos.

    Args:
        pontos (list[shapely.Point]|gpd.GeoSeries[shapely.Point]): Conjunto de pontos.

    Returns:
        tuple[float, float]: Coordenadas (latitude, longitude) do centroide.
    """
    if isinstance(pontos, gpd.GeoSeries) and not all(
        isinstance(p, shapely.Point) for p in pontos
    ):
        raise TypeError("GeoSeries deve conter apenas objetos Point.")

    if pontos.empty:
        raise ValueError("A série de pontos está vazia.")
    from shapely.geometry import Polygon

    poly = Polygon([point for point in pontos])
    return poly.convex_hull.centroid.coords[0][::-1]

(-22.066485749096593, -43.01216006644244)

In [None]:
def cria_mapa_com_graficos(
    df_mapa: gpd.GeoDataFrame, df_longo: pd.DataFrame
) -> folium.Map:
    centroide = obtem_centroide(pontos=pontos_coleta_municipios["geometry"])
    m = folium.Map(location=centroide, zoom_start=8)

    df_longo["pollutant"] = df_longo["pollutant"].replace({"pol_a": "A", "pol_b": "B"})

    for _, row in df_mapa.iterrows():
        nome_estacao = row["station_name"]
        cidade = row["city"]
        estado = row["state"]
        lat = row.geometry.y
        lon = row.geometry.x

        # Filtra dados para a estação
        dados_estacao = df_longo[df_longo["station_name"] == nome_estacao]

        if dados_estacao.empty:
            continue

        # Formata o nome dos poluentes (exemplo)

        # Cria gráfico Altair
        chart = (
            alt.Chart(dados_estacao)
            .mark_line(point=alt.OverlayMarkDef(filled=False, fill="white"))
            .encode(
                x=alt.X("sample_dt:T", axis=alt.Axis(format="%d %b", title="Data")),
                y=alt.Y("value:Q", title="Valor (mg/L)"),
                color=alt.Color(
                    "pollutant:N",
                    legend=alt.Legend(title="Poluente"),
                    scale=alt.Scale(range=["green", "purple"]),
                ),
                tooltip=[
                    alt.Tooltip("station_name:N", title="Estação"),
                    alt.Tooltip(
                        "sample_dt:T", title="Data da coleta", format="%d-%m-%Y"
                    ),
                    alt.Tooltip("value:Q", title="Valor (mg/L)"),
                ],
            )
            .properties(
                width=280,
                height=220,
                title=(
                    f"Estação {nome_estacao}"
                    f"{'' if cidade == 'N/A' else f' - {cidade}'}"
                    f"{'' if estado == 'N/A' else f'/{estado}'}"
                ),
            )
            .configure_title(fontSize=14, font="Courier", color="gray", anchor="start")
            .configure_legend(labelFontSize=10, titleFontSize=12)
        ).configure(locale=locale)

        # popup com espaço suficiente
        vega = folium.VegaLite(chart, width=400, height=260)
        popup = folium.Popup(max_width=400)
        vega.add_to(popup)

        tooltip = f"<b>Estação:</b> {nome_estacao}<br><b>Cidade:</b> {cidade}<br><b>Estado:</b> {estado}"
        # Cria marcador
        marker = folium.Marker(
            location=(lat, lon),
            icon=folium.Icon(icon="flask-vial", prefix="fa", color="green"),
            tooltip=tooltip,
            popup=popup,
        )
        popup.add_to(marker)
        marker.add_to(m)

    return m


In [61]:
m = cria_mapa_com_graficos(pontos_coleta_municipios, df_longo)
m.save("mapa_com_graficos.html")
m

In [14]:
pontos_coleta_municipios[pontos_coleta_municipios["city"] == "N/A"].drop_duplicates(
    subset=["station_name"]
).to_file("pontos_coleta_na.gpkg")

In [15]:
municipios_sudeste[municipios_sudeste["state"] == "RJ"].to_file(
    "municipios_rj_val.gpkg"
)