## Catálogo de Dados — MVP INMET (camada Gold)
## 
Este notebook reúne o Catálogo de Dados do MVP, com foco na camada Gold (mvp.gold).
O catálogo inclui:

- descrição das tabelas e colunas (comentários no Databricks)
- perfil dos dados por coluna (mínimo, máximo, % nulos e cardinalidade)
- linhagem dos dados (origem e principais transformações)

O pipeline completo foi implementado no notebook MVP_Dados_INMET.

In [0]:
%sql
USE CATALOG mvp;
USE SCHEMA gold;

SHOW TABLES;


database,tableName,isTemporary
gold,agg_anual_estacao,False
gold,agg_mensal_estacao,False
gold,data_lineage,False
gold,observacoes_gold,False


In [0]:
%sql
USE CATALOG mvp;
USE SCHEMA gold;

CREATE TABLE IF NOT EXISTS data_lineage (
  dataset_name STRING,
  source_platform STRING,
  source_url STRING,
  data_description STRING,
  granularity STRING,
  spatial_scope STRING,
  temporal_scope STRING,
  operational_note STRING,
  transformations_summary STRING,
  created_at TIMESTAMP
);

-- garante que não fica duplicado (caso já exista alguma linha com o mesmo nome)
DELETE FROM data_lineage
WHERE dataset_name = 'INMET (automáticas) — capitais 2020–2024 (diário)';

INSERT INTO data_lineage VALUES (
  'INMET (automáticas) — capitais 2020–2024 (diário)',
  'Kaggle',
  'https://www.kaggle.com/datasets/gregoryoliveira/brazil-weather-information-by-inmet',
  'Observações meteorológicas de estações automáticas do INMET disponibilizadas em dataset no Kaggle (com agregação diária).',
  'Diária (agregada)',
  'Capitais do Brasil (estações selecionadas por otimização operacional do MVP)',
  '2020-01-01 a 2024-12-31 (período selecionado por otimização operacional do MVP)',
  'O recorte para capitais e para o período 2020–2024 foi realizado para reduzir volume de dados e tempo de processamento no Databricks Free, mantendo representatividade geográfica para as análises.',
  'Pipeline no Databricks: staging (download CSV) → bronze (ingestão Delta) → silver (tipagem e validações) → gold (enriquecimento com metadados de estação e agregações mensal/anual).',
  current_timestamp()
);

SELECT * FROM data_lineage;


dataset_name,source_platform,source_url,data_description,granularity,spatial_scope,temporal_scope,operational_note,transformations_summary,created_at
INMET (automáticas) — capitais 2020–2024 (diário),Kaggle,https://www.kaggle.com/datasets/gregoryoliveira/brazil-weather-information-by-inmet,Observações meteorológicas de estações automáticas do INMET disponibilizadas em dataset no Kaggle (com agregação diária).,Diária (agregada),Capitais do Brasil (estações selecionadas por otimização operacional do MVP),2020-01-01 a 2024-12-31 (período selecionado por otimização operacional do MVP),"O recorte para capitais e para o período 2020–2024 foi realizado para reduzir volume de dados e tempo de processamento no Databricks Free, mantendo representatividade geográfica para as análises.",Pipeline no Databricks: staging (download CSV) → bronze (ingestão Delta) → silver (tipagem e validações) → gold (enriquecimento com metadados de estação e agregações mensal/anual).,2025-12-20T19:44:29.895Z


## Linhagem dos dados

Os dados utilizados neste MVP têm origem em observações horárias de estações meteorológicas automáticas do Instituto Nacional de Meteorologia (INMET). Para este trabalho, foi utilizado um dataset disponibilizado no Kaggle, já em formato CSV com agregação diária.

Para viabilizar a execução no ambiente gratuito (Databricks Free) e evitar tempos excessivos de processamento, foi realizado um recorte operacional:

- escopo espacial: estações localizadas nas capitais brasileiras
- escopo temporal: período de 2020 a 2024, que abrange condições meteorológicas de grande escala diferentes (El Niño, La Niña e Neutralidade)

No Databricks, foi construído um pipeline em camadas (staging → bronze → silver → gold), e a camada Gold foi enriquecida com metadados das estações (região, UF, cidade e coordenadas).

https://www.kaggle.com/datasets/gregoryoliveira/brazil-weather-information-by-inmet

In [0]:
%sql
USE CATALOG mvp;
USE SCHEMA gold;

CREATE OR REPLACE VIEW data_lineage_vertical AS
SELECT dataset_name AS dataset, 'Fonte' AS item, source_platform AS detalhe FROM data_lineage
UNION ALL
SELECT dataset_name, 'URL', source_url FROM data_lineage
UNION ALL
SELECT dataset_name, 'Descrição', data_description FROM data_lineage
UNION ALL
SELECT dataset_name, 'Granularidade', granularity FROM data_lineage
UNION ALL
SELECT dataset_name, 'Escopo espacial', spatial_scope FROM data_lineage
UNION ALL
SELECT dataset_name, 'Escopo temporal', temporal_scope FROM data_lineage
UNION ALL
SELECT dataset_name, 'Nota operacional', operational_note FROM data_lineage
UNION ALL
SELECT dataset_name, 'Transformações (resumo)', transformations_summary FROM data_lineage;

SELECT *
FROM data_lineage_vertical
ORDER BY dataset, item;


dataset,item,detalhe
INMET (automáticas) — capitais 2020–2024 (diário),Descrição,Observações meteorológicas de estações automáticas do INMET disponibilizadas em dataset no Kaggle (com agregação diária).
INMET (automáticas) — capitais 2020–2024 (diário),Escopo espacial,Capitais do Brasil (estações selecionadas por otimização operacional do MVP)
INMET (automáticas) — capitais 2020–2024 (diário),Escopo temporal,2020-01-01 a 2024-12-31 (período selecionado por otimização operacional do MVP)
INMET (automáticas) — capitais 2020–2024 (diário),Fonte,Kaggle
INMET (automáticas) — capitais 2020–2024 (diário),Granularidade,Diária (agregada)
INMET (automáticas) — capitais 2020–2024 (diário),Nota operacional,"O recorte para capitais e para o período 2020–2024 foi realizado para reduzir volume de dados e tempo de processamento no Databricks Free, mantendo representatividade geográfica para as análises."
INMET (automáticas) — capitais 2020–2024 (diário),Transformações (resumo),Pipeline no Databricks: staging (download CSV) → bronze (ingestão Delta) → silver (tipagem e validações) → gold (enriquecimento com metadados de estação e agregações mensal/anual).
INMET (automáticas) — capitais 2020–2024 (diário),URL,https://www.kaggle.com/datasets/gregoryoliveira/brazil-weather-information-by-inmet


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import NumericType, StringType, DateType, TimestampType

spark.sql("USE CATALOG mvp")
spark.sql("USE SCHEMA gold")

gold_tables = [
    "mvp.gold.observacoes_gold",
    "mvp.gold.agg_anual_estacao",
    "mvp.gold.agg_mensal_estacao",
]

# ----------------------------
# 1) Mapa de descrições (dicionário)
# ----------------------------
desc_map = {
    "observacoes_gold": {
        "id_station": ("Identificador da estação (código INMET) usado para junção com o cadastro.", "-", "Formato típico A###; não nulo"),
        "region": ("Região geográfica associada à estação.", "-", "Categorias: N, NE, CO, SE, S"),
        "state": ("Unidade federativa (UF) associada à estação.", "-", "Siglas das UFs"),
        "city_station": ("Nome da cidade/localidade associada à estação.", "-", "Texto"),
        "lat": ("Latitude da estação.", "graus", "[-90, 90]"),
        "lon": ("Longitude da estação.", "graus", "[-180, 180]"),
        "lvl": ("Altitude da estação.", "m", ">= 0 (esperado)"),
        "date": ("Data de referência da observação diária.", "data", "Recorte do MVP: 2020–2024"),
        "year": ("Ano extraído de date para agregações.", "ano", "2020–2024"),
        "month": ("Mês extraído de date para agregações.", "mês", "1–12"),
        "rain_max": ("Precipitação acumulada diária (total do dia).", "mm", ">= 0"),
        "rad_max": ("Radiação diária (métrica conforme dataset fonte).", "(conforme fonte)", ">= 0 (esperado)"),
        "temp_avg": ("Temperatura média diária.", "°C", "Faixa esperada (aprox.) [-10, 45]"),
        "temp_max": ("Temperatura máxima diária.", "°C", "Faixa esperada (aprox.) [-10, 50]"),
        "temp_min": ("Temperatura mínima diária.", "°C", "Faixa esperada (aprox.) [-15, 40]"),
        "hum_max": ("Umidade relativa máxima diária.", "%", "[0, 100]"),
        "hum_min": ("Umidade relativa mínima diária.", "%", "[0, 100]"),
        "wind_max": ("Vento máximo diário (métrica conforme dataset fonte).", "(conforme fonte)", ">= 0 (esperado)"),
        "wind_avg": ("Vento médio diário (métrica conforme dataset fonte).", "(conforme fonte)", ">= 0 (esperado)"),
    },
    "agg_anual_estacao": {
        "id_station": ("Identificador da estação (código INMET).", "-", "Formato típico A###"),
        "region": ("Região associada à estação.", "-", "N, NE, CO, SE, S"),
        "state": ("UF associada à estação.", "-", "Siglas das UFs"),
        "city_station": ("Cidade/localidade da estação.", "-", "Texto"),
        "year": ("Ano de referência da agregação.", "ano", "2020–2024"),
        "n_dias": ("Número de dias com registros no ano.", "dias", ">= 0"),
        "chuva_total_ano": ("Precipitação acumulada no ano (soma do acumulado diário).", "mm", ">= 0"),
        "temp_media_ano": ("Temperatura média anual (média das médias diárias).", "°C", "Faixa esperada (aprox.)"),
        "temp_max_ano": ("Maior temperatura máxima diária observada no ano.", "°C", "Faixa esperada (aprox.)"),
        "temp_min_ano": ("Menor temperatura mínima diária observada no ano.", "°C", "Faixa esperada (aprox.)"),
        "vento_medio_ano": ("Vento médio anual (média do vento médio diário).", "(conforme fonte)", ">= 0 (esperado)"),
        "vento_max_ano": ("Maior vento máximo diário observado no ano.", "(conforme fonte)", ">= 0 (esperado)"),
    },
    "agg_mensal_estacao": {
        "id_station": ("Identificador da estação (código INMET).", "-", "Formato típico A###"),
        "region": ("Região associada à estação.", "-", "N, NE, CO, SE, S"),
        "state": ("UF associada à estação.", "-", "Siglas das UFs"),
        "city_station": ("Cidade/localidade da estação.", "-", "Texto"),
        "year": ("Ano de referência da agregação.", "ano", "2020–2024"),
        "month": ("Mês de referência da agregação.", "mês", "1–12"),
        "n_dias": ("Número de dias com registros no mês.", "dias", ">= 0"),
        "chuva_total_mes": ("Precipitação acumulada no mês (soma do acumulado diário).", "mm", ">= 0"),
        "temp_media_mes": ("Temperatura média mensal (média das médias diárias).", "°C", "Faixa esperada (aprox.)"),
        "temp_max_mes": ("Maior temperatura máxima diária observada no mês.", "°C", "Faixa esperada (aprox.)"),
        "temp_min_mes": ("Menor temperatura mínima diária observada no mês.", "°C", "Faixa esperada (aprox.)"),
        "umid_max_media_mes": ("Média mensal da umidade máxima diária.", "%", "[0, 100]"),
        "umid_min_media_mes": ("Média mensal da umidade mínima diária.", "%", "[0, 100]"),
    }
}

# ----------------------------
# 2) Gera data_catalog (perfil: min/max/%null/distinct)
# ----------------------------
catalog_rows = []
dict_rows = []

for full in gold_tables:
    df = spark.table(full)
    schema_name, table_name = full.split(".")[0] + "." + full.split(".")[1], full.split(".")[2]
    total = df.count()

    for field in df.schema.fields:
        c = field.name
        dtype = field.dataType
        dtype_str = str(dtype)

        # perfil
        nulls = df.filter(F.col(c).isNull()).count()
        pct_null = round(100.0 * nulls / total, 2) if total else 0.0

        minv = maxv = None
        distinct_count = None

        if isinstance(dtype, NumericType):
            agg = df.select(F.min(F.col(c)).alias("minv"), F.max(F.col(c)).alias("maxv")).collect()[0]
            minv, maxv = agg["minv"], agg["maxv"]

        if isinstance(dtype, (StringType, DateType, TimestampType)):
            distinct_count = df.select(c).distinct().count()

        catalog_rows.append((
            schema_name, table_name, c, dtype_str,
            total, nulls, pct_null,
            str(minv) if minv is not None else None,
            str(maxv) if maxv is not None else None,
            distinct_count
        ))

        # dicionário (descrição/uni/domínio esperado)
        d = desc_map.get(table_name, {}).get(c, None)
        if d is None:
            # fallback razoável quando a coluna não está mapeada
            d = (f"Coluna da tabela {table_name}.", "-", "Conforme dataset/pipeline")
        description, unit, expected_domain = d

        dict_rows.append((
            table_name, c, dtype_str,
            description, unit, expected_domain
        ))

data_catalog_df = spark.createDataFrame(
    catalog_rows,
    schema="""
        schema_name STRING,
        table_name STRING,
        column_name STRING,
        data_type STRING,
        row_count BIGINT,
        null_count BIGINT,
        pct_null DOUBLE,
        min_value STRING,
        max_value STRING,
        distinct_count BIGINT
    """
)

data_dictionary_df = spark.createDataFrame(
    dict_rows,
    schema="""
        table_name STRING,
        column_name STRING,
        data_type STRING,
        description STRING,
        unit STRING,
        expected_domain STRING
    """
).dropDuplicates(["table_name", "column_name"])

data_catalog_df.write.format("delta").mode("overwrite").saveAsTable("data_catalog")
data_dictionary_df.write.format("delta").mode("overwrite").saveAsTable("data_dictionary")

# ----------------------------
# 3) View final para leitura
# ----------------------------
spark.sql("""
CREATE OR REPLACE VIEW catalogo_gold_final AS
SELECT
  d.table_name,
  d.column_name,
  d.data_type,
  d.description,
  d.unit,
  d.expected_domain,
  c.row_count,
  c.null_count,
  c.pct_null,
  c.min_value,
  c.max_value,
  c.distinct_count
FROM data_dictionary d
LEFT JOIN data_catalog c
  ON d.table_name = c.table_name
 AND d.column_name = c.column_name
ORDER BY d.table_name, d.column_name
""")

print("✅ Catálogo gerado: mvp.gold.data_dictionary, mvp.gold.data_catalog e view mvp.gold.catalogo_gold_final")
display(spark.table("mvp.gold.catalogo_gold_final"))


✅ Catálogo gerado: mvp.gold.data_dictionary, mvp.gold.data_catalog e view mvp.gold.catalogo_gold_final


table_name,column_name,data_type,description,unit,expected_domain,row_count,null_count,pct_null,min_value,max_value,distinct_count
agg_anual_estacao,chuva_total_ano,DoubleType(),Precipitação acumulada no ano (soma do acumulado diário).,mm,>= 0,142,1,0.7,0.0,2799.6000000000004,
agg_anual_estacao,city_station,StringType(),Cidade/localidade da estação.,-,Texto,142,0,0.0,,,30.0
agg_anual_estacao,id_station,StringType(),Identificador da estação (código INMET).,-,Formato típico A###,142,0,0.0,,,30.0
agg_anual_estacao,n_dias,LongType(),Número de dias com registros no ano.,dias,>= 0,142,0,0.0,21.0,366.0,
agg_anual_estacao,region,StringType(),Região associada à estação.,-,"N, NE, CO, SE, S",142,0,0.0,,,5.0
agg_anual_estacao,state,StringType(),UF associada à estação.,-,Siglas das UFs,142,0,0.0,,,27.0
agg_anual_estacao,temp_max_ano,DoubleType(),Maior temperatura máxima diária observada no ano.,°C,Faixa esperada (aprox.),142,0,0.0,31.5,43.7,
agg_anual_estacao,temp_media_ano,DoubleType(),Temperatura média anual (média das médias diárias).,°C,Faixa esperada (aprox.),142,0,0.0,17.69965457397261,30.128115878612714,
agg_anual_estacao,temp_min_ano,DoubleType(),Menor temperatura mínima diária observada no ano.,°C,Faixa esperada (aprox.),142,0,0.0,-5.0,22.3,
agg_anual_estacao,vento_max_ano,DoubleType(),Maior vento máximo diário observado no ano.,(conforme fonte),>= 0 (esperado),142,0,0.0,9.9,39.0,
